JAVA/RxJava

[Java] 동기화 도구

꾸준함. 2024. 3. 14. 09:00

개요

앞선 게시물을 읽고 오시는 것을 추천드립니다!

https://jaimemin.tistory.com/2411

 

[Java] Lock, ReentrantLock, ReadWriteLock, ReentrantReadWriteLock

개요 앞선 게시물을 읽고 오시는 것을 추천드립니다! https://jaimemin.tistory.com/2409 [Java] synchronized, wait() & notify(), volatile, Deadlock 개요 앞선 게시물을 읽고 오시는 것을 추천드립니다! https://jaimemin.tist

jaimemin.tistory.com

 

CAS(Compare and Swap)

  • 멀티 쓰레드 환경에서 락을 사용하지 않고도 공유 변수의 값을 원자적으로 변경하는 방법 제공
  • CPU 캐시와 메인 메모리의 두 값을 비교하고 그 값이 동일해야지만 새로운 값으로 교체하는 동기화 연산
    • 여러 쓰레드가 공유하는 메모리 영역을 보호하는데 쓰임

 

  • 락을 사용하지 않기 때문에 대기하지 않는 non-blocking 실행이 가능하고 race condition과 데드락을 방지할 수 있음
    • 다만, 조건에 따라 실패할 경우 성공할 때까지 다시 시도해야 할 수 있는데 동시적으로 접근하는 요청의 수가 많은 race condition일 경우 자칫하면 무한 루프에 가깝게 빠질 수 있으므로 효율성 저하될 수 있음
    • thread가 다수일 때는 오히려 락을 사용하는 synchronized 블록이 유리

 

  • 주호 CPU에서 지원되는 연산으로 자바에서 제공하는 java.util.concurrent.atomic 패키지는 원자적 연산을 통해 CAS 지원

 

AtomicInteger에서 제공하는 메서드 중 하나

 

1. CAS 동작 방식

 

동작 방식 설명에 앞서 핵심 매개변수 세 가지를 소개하겠습니다.

  • 메인 메모리에 저장된 변수의 실제 값 (value)
  • CPU 캐시에 저장된 변수의 현재 기대 값 (expected)
  • 변경하려는 값 (new value)

 

CAS 동작 방식은 다음과 같습니다.

  • value와 expected가 일치하는지 확인
    • 일치할 경우 변수의 값을 new value로 업데이트 후 true로 반환
    • 일치하지 않을 경우 아무 작업도 수행하지 않고 false 반환

 

2. CAS 예시 코드

 

java.util.concurrent.atomic 패키지에서 제공하는 클래스 내 메서드는 내부적으로 CPU가 제공하는 native 메서드를 호출하여 원자적 연산을 통해 CAS를 제공합니다.

 

 

 

코드 부연 설명

  • 3개의 쓰레드가 각각 MAX번씩 CAS 연산 시도
  • compareAndSet 메서드를 호출해서 현재 값과 변경하려는 값을 비교하여 일치할 때까지 시도
  • 실행 결과 멀티 쓰레드 환경에서 동시에 여러 쓰레드에서 CAS 연산을 수행하더라도 적절히 값을 증가시키는 것을 확인 가능
    • synchronized 블록과 달리 락이 없더라도 동시성 보장
    • 다만 while문을 통해 메모리에 저장된 값과 CPU 캐시에 저장된 값이 일치할 때까지 시도하므로 너무 많은 쓰레드가 동시에 요청할 경우 락을 사용하는 기법보다 비효율적일 수 있음

 

단일 연산 변수(Atomic Variables)

  • 락을 사용하지 않고도 여러 쓰레드 간에 안전하게 값을 공유하고 동기화하는 데 사용
  • 기본적으로 volatile 속성을 가지고 있어 가시성을 지원
  • 보통은 읽기에 대해서만 원자성을 보장하는데 단일 연산 변수는 읽기 뿐만 아니라 modify와 write에 대해서도 원자적 연산 지원
    • 내부적으로 CAS 연산을 사용하여 데이터의 일관성과 안전성 유지

 

  • 간단한 연산의 경우 락을 사용하는 것보다 월등히 빠른 성능을 보임
    • 다만, 연산이 복잡하거나 시간이 오래 걸리는 작업은 오히려 락을 사용하는 것보다 오버헤드 커질 수 있음
    • 앞서 CAS 예제 코드에서 확인했다시피 실제 값과 기대하는 값이 일치할 때까지 루프를 타기 때문

 

  • 단일 연산 변수는 단일 연산에 대해 원자성 보장
    • 하지만 여러 연산을 조합한 복잡한 동작에 대해서는 원자성이 보장되지 않음
    • 예를 들어, 여러 변수를 함께 수정하거나 여러 단계로 이루어진 작업에서는 원자성을 보장하기 어려울 수 있으며 이 경우에는 여러 연산을 조합하는 동안 일부 연산이 실행되고 나서 다른 연산이 끼어들어서 데이터의 일관성이 깨지거나 예기치 않은 결과가 발생할 수 있음
    • 이러한 이슈를 해결하기 위해서는 여러 연산이 조합된 복잡한 동작에서도 원자성을 보장할 수 있는 방법을 고려해야 함
    • ex) 트랜잭션(Transaction)이나 락(Lock) 등의 메커니즘을 활용하여 여러 연산을 원자적으로 처리하는 방법을 고려할 수 있음

 

1. 단일 연산 클래스(Atomic Class)

 

  • java.util.concurrent.atomic 패키지는 단일 연산 변수를 사용하기 위한 여러 종류의 단일 연산 클래스 제공
    • AtomicBoolean
    • AtomicInteger
    • AtomicLong
    • AtomicReference
    • AtomicIntegerArray
    • AtomicLongArray
    • AtomicReferenceArray

 

1.1 AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference 공통 API

  • get(): 현재 값을 반환
  • void set(T newValue): 새로운 값으로 설정
  • getAndSet(T newValue): 현재 값을 반환하고 새로운 값을 설정
  • boolean compareAndSet(boolean expect, boolean update)
    • 현재 값이 기대값과 같으면 새 값을 설정한 후 true 반환
    • 현재 값이 기대값과 상이하면 변경 없이 false 반환

 

1.2 AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray 공통 API

  • get(int i): 인덱스 i번째 값을 반환
  • void set(int i, T newValue): 인덱스 i번째에 새로운 값을 설정
  • getAndSet(int i, T newValue): 인덱스 i번째 값을 반환하고 새로운 값을 설정
  • boolean compareAndSet(int i, boolean expect, boolean update)
    • 인덱스 i번째 값이 기대한 값과 같으면 새 값을 설정한 후 true 반환
    • 인덱스 i번째 값이 기대한 값과 상이하면 변경없이 false 반환

 

이처럼 단일 연산 클래스는 타입만 다르고 대부분 제공하는 메서드가 비슷하므로 AtomicBoolean, AtomicInteger, 그리고 AtomicReference 클래스만 잘 살펴보면 다른 클래스도 쉽게 사용할 수 있습니다.

 

1.3 AtomicBoolean 

 

 

 

1.4 AtomicInteger

 

 

1.5 AtomicReference

 

1.5.1 Reference가 자바에서 제공하는 클래스인 코드 (String)

 

 

1.5.2 Reference가 사용자 정의 클래스인 경우 (User)

 

 

코드 부연 설명

  • 두 쓰레드가 동시에 실행되고 있지만, AtomicReference의 compareAndSet 메서드를 사용하여 한 번에 하나의 쓰레드만 참조를 변경할 수 있기 때문에, 변경에 성공한 쓰레드만이 실제로 참조를 변경
  • AtomicReference 생성자에 user2를 넘겼으므로 쓰레드 2에 의해 David로 변경 성공
  • 만약 생성자에 user1을 넘겼더라면 쓰레드 1에 의해 Carol로 변경되었을 것

 

 

단일 연산 필드 업데이터(AtomicFieldUpdater)

  • 지정된 클래스의 volatile 필드에 대한 원자적 업데이트를 가능하게 하는 리플렉션 기반 유틸리티
    • 리플렉션은 클래스나 함수의 메타정보를 사용해서 동적으로 호출하는 메서드를 변경 가능하도록 하는 기술

 

  • 주로 클래스 내부의 필드를 원자적으로 변경하는 경우에 사용

 

1. 단일 연산 필드 업데이터 클래스

 

  • AtomicIntegerFieldUpdater<T>: 지정된 클래스의 int volatile 필드에 대한 원자적 업데이트를 가능하게 지원
  • AtomicLongFieldUpdater<T>: 지정된 클래스의 long volatile 필드에 대한 원자적 업데이트를 가능하게 지원
  • AtomicReferenceFieldUpdater<U, W>: 지정된 클래스의 참조 volatile 필드에 대한 원자적 업데이트를 가능하게 지원

 

2. 정적 팩토리 메서드

 

  • static newUpdater(Class<T> class, String fieldName): 적용하고자 하는 클래스의 타입과 필드명을 설정
  • static newUpdater(Class<U> class, Class<W> class, String fieldName): 적용하고자 하는 클래스의 타입과 필드타입과 필드명을 설정

 

 

3. 공통 API

 

  • get(T obj): 현재 필드 값을 반환
  • void set(T obj, newValue): 필드에 새로운 값 설정
  • getAndSet(T obj, newValue): 현재 필드 값을 반환하고 새로운 값을 설정
  • boolean compareAndSet(T obj, boolean expect, boolean update)
    • 현재 필드 값이 기대하는 값과 일치할 경우 새 값을 설정한 후 true 반환
    • 현재 필드 값이 기대하는 값과 불일치하는 경우 변경 없이 false 반환

 

 

 

4. FieldUpdater를 통해 CAS 연산하는 코드

 

 

* 여태까지 코드를 보면 알 수 있다시피 CAS 연산을 지원하는 메서드명은 보통 다 compareAndSet

 

5. AtomicFieldUpdater vs AtomicVariable

 

일반적으로 많은 객체가 생성되고 각 객체의 특정 필드를 원자적으로 갱신해야 하는 경우에는 AtomicFieldUpdater가 유리합니다.

이는 AtomicFieldUpdater가 리플렉션을 사용하여 객체의 필드를 원자적으로 갱신하기 때문에 각 객체에 대해 동작하기 때문입니다.

여러 객체가 생성되고 필드 갱신이 필요한 상황에서 AtomicFieldUpdater를 사용하는 것은 메모리 측면에서 특히 효율적입니다.

  • 메모리 사용: AtomicFieldUpdater는 객체의 필드를 대상으로 하므로, 객체의 상태를 변경하는 데 필요한 추가적인 메모리 공간이 필요하지 않기 때문에 많은 객체가 생성될 때 메모리 사용을 최소화하는 데 도움이 됨

 

그러나 명확하게 객체로 구분해서 간단하게 사용할 경우 쓰기 더 간편한 AtomicVariable을 사용하는 것을 권장합니다.

 

CountDownLatch

  • 하나 이상의 쓰레드가 다른 쓰레드에서 수행되는 일련의 작업이 완료될 때까지 기다릴 수 있게 해주는 동기화 보조 도구
    • ex) thread-1은 나머지 쓰레드 작업이 완료되어야 작업 시작
    • 즉, 여러 쓰레드가 초기화 작업을 마칠 때까지 기다렸다가 모든 쓰레드가 완료되면 마무리 작업을 수행하는 경우 사용

 

  • CountDownLatch는 주어진 카운트로 초기화되고 await 메서드는 현재 카운트가 countDown 메서드의 호출로 인해 0이 될 때까지 blocking 하는 역할
    • 현재 카운트가 0이 되면 대기 중인 쓰레드가 해제되고 await() 이후 처리가 이루어짐

 

  • CountDownLatch는 일회성으로 처리되며 카운트를 재설정할 수 없음
    • 카운트를 재설정하고 싶을 경우 CyclicBarrier 사용

 

  • 한 쓰레드가 countDown() 메서드를 N번 호출 가능

 

1. 핵심 메서드

 

void await() throws InterruptedException

  • 현재 카운트가 0이라면 해당 메서드는 즉시 반환
  • 카운트가 0보다 크면 다음 두 가지 사항 중 하나가 발생할 때까지 대기 상태
    • countDown 메서드의 호출로 인해 카운트가 0이 됨
    • 다른 쓰레드가 현재 쓰레드를 interrupt 시킴

 

boolean await(long timeout, TimeUnit unit) throws InterruptedException

  • 현재 카운트가 0이라면 해당 메서드는 즉시 반환
  • 카운트가 0보다 크면 다음 세 가지 사항 중 하나가 발생할 때까지 대기 상태
    • countDown 메서드의 호출로 인해 카운트가 0이 됨
    • 다른 쓰레드가 현재 쓰레드를 interrupt 시킴
    • 지정된 대기 시간을 경과

 

void countDown()

  • CountDownLatch의 카운트를 감소시킴
  • 카운트가 0이 되면 모든 대기 중인 쓰레드들 대기 해제

 

long getCount()

  • 현재 카운트를 반환
  • 주로 디버깅 및 테스트 목적으로 사용

 

2. 예제 코드

 

 

 

코드 부연 설명

  • startSignal은 모든 쓰레드가 작업을 시작할 때까지 대기
    • startSignal 카운트를 1로 초기화했기 때문에 startSignal.countDown()이 호출되자마자 카운트가 0이 되어 모든 쓰레드가 작업을 시작

 

  • doneSignal은 모든 쓰레드의 작업이 완료될 때까지 대기
  • doneSignal.await()을 호출하여 모든 쓰레드의 작업이 완료될 때까지 대기
    • 작업을 완료한 후에는 doneSignal.countDown()을 호출하여 doneSignal을 1만큼 감소시킴
    • doneSignal의 count가 0이 되었다는 것은 모든 쓰레드가 작업을 완료했음을 의미
    • count가 0이 되면 doneSignal.await() 이후 라인 수행하여 `모든 쓰레드의 작업이 완료되었습니다.`를 출력

 

CyclicBarrier

  • 공통된 장벽 지점에 도달할 때까지 일련의 쓰레드가 서로 기다리도록 하는 동기화 보조 도구
    • 공통된 장벽 지점은 프로그램에서 지정한 지점을 의미
    • 일련의 쓰레드는 이 장벽 지점에 도달하기 위해 일을 수행하며 모든 쓰레드가 도달할 때까지 대기
    • 모든 쓰레드가 해당 지점에 도달해야지만 모든 쓰레드가 동시에 계속해서 실행될 수 있음
    • 여러 쓰레드가 병렬로 작업을 수행하다가 특정 단계에 도달하거나 모든 쓰레드가 특정 작업을 완료하고 모이는 지점에 사용
    • ex) 병렬 계산 작업 중 중간 결과를 모두 계산한 후에 다음 단계로 진행하기 위해 쓰레드들이 모이는 경우에 유용

 

  • 대기 중인 쓰레드가 해제된 후 재사용할 수 있음
  • CyclicBarrier는 옵션으로 Runnable 명령을 지원하는데 해당 명령은 마지막 쓰레드가 도착한 후 각 장벽 지점마다 한 번씩 실행되는 barrierAction 역할을 수행
    • CyclicBarrier가 설정된 카운트(쓰레드 수)만큼 쓰레드가 await() 메서드를 호출하여 장벽 지점에 도달하면, 그때 barrierAction이 실행된다는 것을 의미
    • 장벽 지점을 모든 쓰레드가 도달할 때마다 실행되는 것이 아니라, 모든 쓰레드가 도착한 후 한 번 실행된다는 것을 의미
    • 이 Runnable은 쓰레드가 장벽 이후  실행을 계속하기 전 공유 상태를 업데이트하는데 유용함

 

1. 핵심 메서드

 

int await() throws InterruptedException, BrokenBarrierException

  • 장벽에서 모든 쓰레드가 await을 호출할 때까지 대기
  • 현재 쓰레드가 마지막으로 도착한 것이 아닌 경우 다음 중 하나가 발생할 때까지 대기
    • 마지막 쓰레드가 도착
    • 다른 쓰레드가 현재 쓰레드를 interrupt
    • 다른 쓰레드가 다른 대기 중인 쓰레드 중 하나를 interrupt
    • 다른 쓰레드가 장벽 대기 중에 timeout
    • 다른 쓰레드가 해당 장벽에 대해 reset() 메서드를 호출

 

* 메서드 진입 시 인터럽트 상태가 설정되거나 혹은 대기 중인 동안 인터럽트 되면 InterruptedException 발생

* 어떤 쓰레드가 대기 중일 때 장벽이 리셋, 끊어지는 경우 혹은 await가 호출될 때  장벽이 깨지는 경우 BrokenBarrierException 발생

* 어떤 쓰레드가 대기 중일 때 인터럽트가 발생할 경우 다른 대기 중인 쓰레드들도 모두 BrokenBarrierException을 던지고 장벽이 깨진 상태로 전환

 

boolean await(long timeout, TimeUnit unit) throws InterruptedException

  • 장벽에서 모든 쓰레드가 await을 호출할 때까지 대기
  • 현재 쓰레드가 마지막으로 도착한 것이 아닌 경우 다음 중 하나가 발생할 때까지 대기
    • 마지막 쓰레드가 도착
    • 다른 쓰레드가 현재 쓰레드를 interrupt
    • 다른 쓰레드가 다른 대기 중인 쓰레드 중 하나를 interrupt
    • 다른 쓰레드가 장벽 대기 중에 timeout
    • 다른 쓰레드가 해당 장벽에 대해 reset() 메서드를 호출
    • 지정된 타임아웃을 경과 (TimeoutException 발생)

 

int getParties()

  • 장벽을 작동시키기 위해 필요한 쓰레드 수를 반환

 

boolean isBroken()

  • 장벽이 깨진 상태인지 확인하는 메서드
  • 만약 생성 이후 혹은 마지막 재설정 이후 interrupt나 timeout으로 하나 이상의 쓰레드가 이 장벽에서 벗어나거나 예외로 인해 barrierAction이 실패했다면 true를 반환
  • 위 조건에 해당하지 않으면 false 반환

 

void reset()

  • 장벽을 초기 상태로 재설정
  • 현재 장벽에서 대기 중인 쓰레드가 있는 경우 이들은 모두 BrokenBarrierException 던짐

 

2. 예제 코드

 

 

코드 부연 설명

  • BarrierAction 클래스
    • CyclicBarrier의 장벽 지점에 도달할 때 실행될 작업을 정의
    • BarrierAction은 생성될 때 parallelSum 배열을 받아서, 해당 배열의 합을 계산하여 출력

 

  • CyclicBarrier는 2개의 쓰레드가 장벽 지점에 도달할 때마다 BarrierAction을 실행
  • 각 쓰레드는 주어진 숫자 배열의 일부분을 합산하고, parallelSum 배열에 결과를 저장
  • 모든 쓰레드가 장벽 지점에 도달하면, BarrierAction이 실행되어 parallelSum 배열의 합을 계산하고 출력
  • CyclicBarrier를 사용하여 병렬 작업을 조율하고, 모든 작업이 완료된 후에 추가적인 작업을 수행할 수 있도록 하는 방법을 보여줌

 

참고

자바 동시성 프로그래밍 [리액티브 프로그래밍 Part.1] - 정수원 강사님

반응형