JAVA/비동기 프로그래밍

[Java] CompletableFuture

꾸준함. 2024. 5. 12. 04:21

동시성 프레임워크 정리 글에서 언급했다시피 Future는 작업의 결과를 가져올 때까지 blocking 되고, 작업을 조합하거나 예외 처리하는 것이 어려웠습니다.

이 문제를 해결하기 위해 자바 8에서 CompletableFuture가 등장했고 이번 게시글에서는 다음 내용을 간단히 정리해 보겠습니다.

  1. 동기 vs 비동기
  2. CompletableFuture 개요
  3. CompletableFuture API
  4. ForkJoinPool

 

1. 동기 vs 비동기

 

1.1 동기(Synchronous)

  • 작업이 순차적으로 실행되며 한 작업의 시작과 완료가 다른 작업의 시작과 완료와 밀접하게 연결된 방식
    • 하나의 작업이 실행 중인 동안 다른 작업은 대기해야 함
    • 작업의 결과를 기다린 후 다음 작업이 진행
    • 작업은 한 번에 하나씩 진행하며 작업을 건너뛰거나 빠뜨릴 수 없음
    • 일반적으로 동기는 싱글 쓰레드일 경우 성립되는 방식

 

1.2 비동기(Asynchronous)

  • 작업이 순차적으로 실행되지 않고 각 작업이 다른 작업의 완료를 기다리지 않고 독립적으로 실행되는 방식
    • 한 작업이 시작된 후에도 다음 작업이 바로 시작될 수 있음
    • 작업의 결과에 영향을 받지 않기 때문에 기다리지 않으며 다른 작업 실행 가능
    • 주로 I/O 작업과 같이 시간이 오래 걸리는 작업을 다룰 때 유용
    • 다수의 작업을 동시에 처리하거나 빠른 응답 시간을 보장해야 하는 경우 활용
    • 비동기는 멀티 쓰레드일 경우 성립되는 방식

 

  • 비동기 상황에서도 동기식 처리가 가능
    • 작업이 완료되지 않은 상태에서 Future의 get() 메서드를 호출하면 blocking
    • 이처럼 작업끼리 독립적이지 않을 경우 별도의 쓰레드에서 작업을 진행하더라도 동기식 처리

 

1.3 블록킹(Blocking)

  • 동기 작업에서 나타나는 현상으로 작업이 완료될 때까지 실행 흐름을 멈추고 대기하는 상태
    • ex) 작업이 완료되지 않은 상태에서 Future의 get() 메서드를 호출하는 경우
    • ex) 파일을 읽거나 네트워크에서 데이터를 받아오는 I/O 작업
    • 위와 같은 상황에서는 작업이 완료될 때까지 다른 작업은 차단되고 대기 상태에 놓임

 

1.4 논블록킹(Non-Blocking)

  • 비동기 작업에서 나타나는 현상으로 블록킹 되지 않고 실행 흐름이 지속되는 특성
    • 특정 작업이 진행 중일 때에도 다른 작업이 계속 실행됨
    • 작업이 완료되지 않았더라도 대기하지 않고 다음 작업을 처리하는 방식
    • 다른 작업들과 동시에 진행될 수 있어 전체 시스템의 응답성을 향상할 수 있음

 

1.5 함수 관점에서 동기 & 비동기

  • 함수를 호출한 Caller와 호출된 함수를 수행하는 주체 Callee
    • Caller와 Callee가 동일 쓰레드라면 동기 관계
    • Caller와 Callee가 서로 다른 쓰레드라면 비동기 관계
    • Caller가 Callee의 작업 결과에 종속적이라면 동기 관계
    • Caller가 Callee의 작업 결과와 상관없이 실행된다면 비동기 관계

 

1.6 함수 관점에서 블록킹 & 논블록킹

  • 함수를 호출한 Caller와 호출된 함수를 수행하는 주체 Callee
    • Caller와 Callee가 동일 쓰레드라면 블록킹
    • Caller와 Callee가 서로 다른 쓰레드라면 논블록킹
    • Caller와 Callee가 순차적으로 작업을 진행하면 블록킹
    • Caller와 Callee가 작업을 동시에 진행하면 논블록킹

 

2. CompletableFuture 개요

  • 자바 8 버전부터 추가된 복잡한 비동기 작업을 효과적으로 처리할 수 있도록 해주는 도구
  • 코드의 가독성을 높이고 비동기 작업의 조합을 간단하게 처리 가능
  • 자바에서 비동기 및 병렬 프로그래밍의 능력을 향상하는데 중요한 역학을 수행하는 클래스
  • CompletableFuture는 사용자가 직접 Future의 값과 상태를 조작 및 제어할 수 있는 구조로 개선
    • 기존의 Future는 쓰레드 풀 내부적으로 자동 생성되고 반환된 결과를 받아 사용
    • ex) 가령, 회원 목록을 가져오고, 스터디 목록을 가져와서, 회원 중에 스터디에 참여하지 않았던 사람들을 추리고, 각 회원에게 추천 스터디 목록을 가져온 다음, 추천 스터디를 이메일로 전송하는 로직이라면 이 중에 일부는 비동기적으로 동시에 처리할 수도 있고 일부는 순차적으로 이어져야 하는데 그런 것들을 CompletableFuture가 제공하는 메서드로 잘 표현할 수 있음 (출처)

 

Future vs CompletableFuture

 

2.1 CompletableFuture vs Future

 

특징 CompletableFuture Future
개요 ForkJoinPool 내장
병렬 처리 지원
ExecutorService의 ThreadPool은 동기 처리 특화
비동기 및 병렬 처리 작업을 병렬로 실행
여러 작업을 동시에 처리 가능
지원하지만 병렬 처리를 위해 추가 작업 필요
Callback 지원 유무 작업이 완료되면 콜백 함수를 실행할 수 있는 기능 제공 미지원
조합 및 체이닝 여러 CompletableFuture 인스턴스를 조합하고 체인 형식으로 연결 가능 여러 Future를 별도로 추적해야 조합 가능
예외 처리 단계적 예외 처리 지원
작업 중 발생한 예외를 적절하게 처리 가능
예외 처리가 번거롭고 구조적이지 않음
동기 및 비동기 메서드 작업을 동기적/비동기적으로 실행할 수 있는 다양한 메서드 제공 주로 동기 메서드인 get() 메서드 사용
명시적 완료 명시적으로 값을 저장하고 완료 가능 미지원
정상 & 오류 상태 구분 정상과 오류를 구분해서 처리 가능 미지원
정상, 오류 모두 완료 상태
재사용 생성한 CompletableFuture는 계속 사용 가능 한번 사용 후 재사용 불가능
다중 작업 처리 다중작업처리 메서드 활용 가능 다중 Future 작업을 위한 처리 로직 필요

 

결론: 비동기 및 병렬 작업의 효율적인 제어와 조작을 위해서는 Future보다 CompletableFuture를 사용하는 것을 권장

 

3. CompletableFuture API

  • CompletableFuture는 비동기 작업과 함수형 프로그래밍의 콜백 패턴을 조합한 Future라고 정의할 수 있으며 두 가지 유형의 API로 구분 가능
    • Future
    • CompletionStage

 

 

3.1 CompletionStage

  • 비동기 작업을 위한 콜백 함수 API를 제공
  • 어떤 작업이 완료된 후에 실행되어야 하는 후속 작업들을 정의하는 데 사용
  • 연쇄적인 비동기 작업을 표현하기 위한 도구로 사용
  • 한 작업의 완료는 자동으로 다음 작업의 시작을 트리거할 수 있어 여러 비동기 작업들을 연속적으로 연결하여 실행할 수 있게 해 줌
    • 앞서 Future vs CompletableFuture 코드 이미지 참고

 

3.2 CopmpletableFuture API

  • 비동기 작업 생성 및 시작
    • CompletableFuture<T> supplyAsync(Supplier<T> supplier)
    • CompletableFuture<Void> funAsync(Runnable runnable)

 

  • 비동기 작업 완료 설정
    • CompletableFuture<T> completedFuture(T value)
    • boolean complete(T value)
    • boolean completeExceptionally(Throwable ex)

 

  • 다중 비동기 작업 조합
    • CompletableFuture<Void> allOf(CompletableFuture<?>.... cfs)
    • CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

 

  • 비동기 작업 대기
    • V join()

 

3.3 CompletionStage API

  • 단일 CompletableFuture 비동기 작업
    • CompletionStage<Void> thenAccept(Consumer action)
    • CompletionStage<Void> thenAcceptAsync(Consumer action)
    • CompletionStage<U> thenApply(Function fn)
    • CompletionStage<U> thenApplyAsync(Function fn)
    • CompletionStage<Void> thenRun(Runnable action)
    • CompletionStage<Void> thenRunAsync(Runnable action)

 

  • 복합 CompletableFuture 비동기 작업
    • CompletionStage<U> thenCombine(CompletionStage other, BiFunction fn)
    • CompletionStage<U> thenCombineAsync(CompletionStage other, BiFunction fn)
    • CompletionStage<U> thenCompose(Function fn)
    • CompletionStage<U> thenComposeAsync(Function fn)

 

  • 비동기 작업 예외 핸들링
    • CompletionStage<U> exceptionally(Function fn)
    • CompletionStage<U> handle(BiFunction fn)
    • CompletionStage<U> whenComplete(BiConsumer action)

 

3.4 비동기 작업 유형

 

메서드 인수(함수형 인터페이스) 인수 추상 메서드 개념
supplyAsync() Supplier<T> T get() 비동기 작업을 생성하고 결과 제공
runAsync() Runnable void run() 비동기 작업을 생성하고 결과가 없는 작업 실행
thenAccept()
thenAcceptAsync()
Consumer<T> void accept(T t) 이전 작업의 결과를 소비하고 새로운 작업 실행
thenApply()
thenApplyAsync()
Function<T, R> R apply(T t) 이전 작업의 결과를 가공하고 새로운 작업 실행
thenRun()
thenRunAsync()
Runnable void run() 이전 작업의 결과를 사용하지 않고 새로운 작업 실행
thenCombine()
thenCombineAsync()
BiFunction<T, U, R> R apply(T t, U u) 두 개의 CompletableFuture의 결과를 조합하고 새로운 작업 실행
thenCompose()
thenComposeAysnc()
Function<T, CompletableFuture<U>> CompletableFuture<U> apply(T t) 이전 작업의 결과에 따라 새로운 CompletableFuture를 생성하고 조합
allOf() CompletableFuture<?>... cfs   모든 CompletableFuture가 완료될 때 새로운 CompletableFuture 반환
anyOf() CompletableFuture<?>... cfs   CompletableFutures 중 하나가 완료될 때 새로운 CompletableFuture 반환

 

https://blog.csdn.net/weixin_43820556/article/details/125380251

 

4. CompletableFuture API (비동기 작업 시작)

  • CompletableFuture는 비동기 작업을 생성하고 실행하는 시작 메서드로 supplyAsync()와 runAsync() 제공
    • 비동기 작업을 실행하기 위해 내부적으로 ForkJoinPool.commonPool()의 쓰레드 풀을 사용
    • 선택적으로 ThreadPoolExcecutor 사용도 가능

 

4.1 supplyAsync(Supplier s)

  • 정적 메서드로서 비동기 작업 시작
  • 작업 수행 후 결과를 반환
  • 다음 작업으로 결과를 제공하기 위해 비동기적으로 수행해야 할 경우 사용
  • Supplier<T> 함수를 인수로 받아 작업 결과 반환
  • 새로운 CompletableFuture<T> 객체를 반환
    • CompletableFuture에 비동기 작업의 결과를 저장

 

4.2 runAsync(Runnable command)

  • 정적 메서드로서 비동기 작업 시작
  • 작업 수행 후 완료 (결과 반환 X)
  • 보통 실행 로그를 남기거나 독립적인 백그라운드 작업 혹은 다음 작업에서 결과를 기다리지 않고 다른 작업을 수행해야 할 경우 사용
  • Runnable 객체를 인수로 받아 작업 실행
  • 새로운 CompletableFuture<T> 객체를 반환
    • 작업 결과 저장 X

 

 

 

5. CompletableFuture API(비동기 결과 조작)

  • CompletableFuture는 비동기 작업을 조작할 수 있도록 다음 메서드들 제공
    • thenApply()
    • thenApplyAsync()
    • thenAccept()
    • thenAcceptAsync()
    • thenRun()
    • thenRunAsync()

 

5.1 thenApply() & thenApplyAsync()

  • 인스턴스 메서드로서 동기 혹은 비동기적으로 작업의 결과를 조작하고 반환
    • Stream의 map()과 유사

 

  • Function<T, R> 함수를 인수로 받고 작업 결과를 반환
  • 새로운 CompletableFuture<T> 객체를 반환하며 CompletableFuture에 작업 결과 저장
  • thenApply() 메서드는 작업을 실행하는 시점에서 이전 작업 결과가 완료되었을 경우 main 쓰레드에서 처리되고 그렇지 않으면 이전과 동일한 쓰레드에서 처리
    • 이전 작업 결과가 완료되지 않았을 경우 이전과 동일한 쓰레드에서 결과받은 후 작업 진행해야 하므로 이전과 동일한 쓰레드에서 처리

 

  • thenApplyAsync() 메서드는 이전 작업 결과와 상관없이 무조건 풀 쓰레드에서 비동기 처리
    • 풀 쓰레드는 이전과 동일한 쓰레드 혹은 새롭게 생성된 쓰레드가 될 수 있음

 

5.1.1 thenApply() 메서드에서 작업을 실행하는 시점에서 이전 작업 결과가 완료되지 않은 케이스

 

 

 

5.1.2 thenApply() 메서드에서 작업을 실행하는 시점에서 이전 작업 결과가 이미 완료되었을 경우

 

 

5.2 thenAccept() & thenAcceptAsync()

  • 인스턴스 메서드로서 동기 혹은 비동기적으로 작업의 결과를 소비하고 추가 수행
  • Consumer<T> 함수를 인수로 받고 작업 수행
  • 새로운 CompletableFuture<Void> 객체 반환
  • thenAccept()는 이전 작업 결과가 완료되었을 경우 메인 쓰레드에서 동기 처리되고 그렇지 않을 경우 이전과 동일한 쓰레드에서 비동기 처리
    • 이전 작업 결과가 완료되지 않았을 경우 이전과 동일한 쓰레드에서 결과받은 후 작업 진행해야 하므로 이전과 동일한 쓰레드에서 처리

 

  • thenAcceptAsync()는 이전 작업 결과와 상관없이 무조건 풀 쓰레드에서 비동기 처리
    • 풀 쓰레드는 이전과 동일한 쓰레드 혹은 새롭게 생성된 쓰레드가 될 수 있음

 

 

5.3 thenRun() & thenRunAsync()

  • 인스턴스 메서드로서 동기 혹은 비동기적으로 이전 결과를 무시하고 단순한 후속 작업 수행
  • Runnable 함수를 인수로 받고 작업 수행
  • 새로운 CompletableFuture<Void> 객체 반환

 

 

 

6. CompletableFuture API(비동기 작업 조합)

 

6.1 thenCompose() & thenComposeAsync()

  • 인스턴스 메서드로써 동기/비동기적으로 하나의 CompletableFuture가 완료될 경우 그 결과를 다음 작업으로 전달하고 이어서 다음 작업을 수행할 수 있도록 지원하며 이를 통해 비동기 작업을 연속적으로 실행하고 조합 가능
  • Fuction<T, CompletionStage<U>> 함수를 인수로 받고 작업 결과 반환
  • 새로운 CompletableFuture<T> 객체를 반환하며 CompletableFuture에 작업 결과 저장

 

 

 

부연 설명

  • 파란색 CompletableFuture를 cf, 빨간색 CompletableFuture를 cf2, 그리고 연두색 CompletableFuture를 cf3라고 별칭을 부여했을 때
  • 전체적인 흐름은 cf -> cf3 -> cf2 -> cf
  • cf3가 cf의 결과를 받아서 새로운 CompletableFuture인 cf2를 시작하고 결과를 조작한 다음 그 결과를 처음 CompletableFuture인 cf에 반환
  • 따라서 결과는 10 * 10 = 100

 

6.2 thenCombine() & thenCombineAsync()

  • 두 개의 CompletableFuture가 모두 완료되었을 경우 특정 함수를 실행하고 그 결과를 새로운 CompletableFuture에 저장하고 반환
  • CompletionStage<U>, BiFunction<T, U, V> 함수를 인수로 받고 최종 작업 결과를 반환
  • 새로운 CompletableFuture<T> 객체를 반환하며 CompletableFuture에 작업 결과 저장

 

 

6.3 allOf()

  • 여러 개의 CompletableFuture를 동시에 실행하고 모든 CompletableFuture가 완료될 때까지 대기하는 데 사용
  • allOf() 메서드 실행 시 CompletableFuture 중에서 가장 오래 걸리는 작업이 완료되기 전까지 대기
    • ExecutorService의 invokeAll()과 유사한 개념

 

  • CompletableFuture<?>의 배열을 인수로 받음
  • 새로운 CompletableFuture<Void> 객체 반환

 

 

부연 설명

  • 비동기 작업 1과 3은 약 1초, 비동기 작업 2는 약 2초 소요
  • allOf()는 비동기적으로 작업을 병렬로 실행하므로 제일 오래 걸리는 작업에 영향을 받아 약 2초 걸림
  • 정리하자면 메인 쓰레드가 join()으로 대기하고 있다가 해제되는 기준은 allOf()로 반환된 CompletableFuture의 결과가 null 아닌 값인 AltResult로 채워져야 함
    • AltResult는 모든 CompletableFuture의 비동기 작업이 완료되었을 때 allOf()의 CompletableFuture에 저장되는 null 개념의 객체

 

6.4 anyOf()

  • 여러 개의 CompletableFuture 중에서 가장 먼저 완료되는 하나의 CompletableFuture를 반환하는 메서드로써 병렬 작업을 수행하고 가장 먼저 결과를 얻고자 할 때 유용
    • ExecutorService의 invokeAny()와 유사한 개념

 

  • CompletableFuture<?>의 배열을 인수로 받음
  • 새로운 CompletableFuture<Object> 객체를 반환

 

 

부연 설명

  • 비동기 작업 1이 0.5초 이후 시작되고 비동기 작업 2는 1초 이후, 비동기 작업 3은 2초 이후 실행 됨
  • anyOf()는 제일 먼저 완료되는 하나의 CompletableFuture를 반환하므로 비동기 작업 1만 실행되고 나머지 작업은 실행도 안되며 실행 시간은 약 0.5초
  • 반환된 값에 대해 체이닝 메서드인 thenApply를 부여해 10을 곱했으므로 최종 결과는 100
  • 정리하자면 메인 쓰레드가 join()으로 대기하고 있다가 해제되는 기준은 anyOf()로 반환된 CompletableFuture의 결과 값이 채워져야 함
    • CompletableFuture의 결과 값이 먼저 채워지면 다른 값으로 수정되거나 변경될 수 없음

 

7. 비동기 예외처리

 

7.1 exceptionally() & exceptionallyAsync()

  • 작업이 예외로 완료된 경우 해당 예외를 처리하고 새로운 결과나 대체 예외를 반환할 수 있음
    • 작업이 예외로 완료된 경우에만 실행됨
    • 정상적으로 완료된 경우 위 함수는 실행되지 않음

 

  • Function<Throwable, T> 형식의 함수를 인수로 받음
  • 새로운 CompletableFuture<T> 객체를 반환하며 CompletableFuture에 결과 또는 예외를 저장 가능

 

 

 

예외가 발생했을 때 exceptionally 메서드가 호출됨

 

 

예외가 발생하지 않았을 때 exceptionally 메서드가 호출되지 않음

 

 

7.2 handle() & handleAsync()

  • 결과와 예외 처리를 모두 다룰 수 있는 메서드로 비동기 작업의 예외 처리와 결과 처리를 동시에 수행할 때 유용
  • BiFunction<T, Throwable, U> 형식의 함수를 인수로 받으며 의미는 다음과 같음
    • 첫 번째 인수는 비동기 작업이 성공적으로 완료된 경우 결과
    • 두 번째 인수는 비동기 작업이 예외로 완료된 경우 해당 예외를 나타냄
    • BiFunction을 인수로 받기 때문에 결과를 반환

 

  • 새로운 CompletableFuture<T> 객체를 반환하며 CompletableFuture에 정상 결과 또는 예외 저장

 

 

비동기 작업들이 모두 성공인 경우

 

 

비동기 작업들 중 하나라도 실패인 경우

 

 

7.3 whenComplete() & whenCompleteAsync()

  •  handle()과 같이 결과와 예외 처리를 모두 다룰 수 있는 메서드로 지동기 작업의 예외 처리와 결과 처리를 동시에 수행할 때 유용
  • BiConsumer<T, Throwable> 형식의 함수를 인수로 받으며 의미는 다음과 같음
    • 첫 번째 인수는 비동기 작업이 성공적으로 완료된 경우 결과
    • 두 번째 인수는 비동기 작업이 예외로 완료된 경우 해당 예외를 나타냄
    • BiConsumer이기 때문에 반환값이 없음 (handle()과의 차이점)

 

  • 새로운 CompletableFuture<T> 객체를 반환하며 CompletableFuture에 정상 결과 또는 예외를 저장

 

 

 

예외 발생했을 경우

 

 

예외 발생하지 않았을 경우

 

 

7.4  exceptionally() vs handle() vs whenComplete()

 

메서드 정상적인 결과 예외 발생 get() 호출 시 예외 처리 결과 타입 변환 정상 결과 호출 예외 호출
exceptionally() 접근 불가능 접근 가능 필요 없음 변환 가능 호출 불가능 호출 가능
handle() 접근 가능 접근 가능 필요 없음 변환 가능 호출 가능 호출 가능
whenComplete() 접근 가능 접근 가능 필요 변환 불가능 호출 가능 호출 가능

 

* whenComplete()가 CompletableFuture.get() 호출 시 예외 처리가 필요한 이유는 반환 값이 없기 때문, 이에 따라 결과 타입 변환도 불가능

 

8. 비동기 완료 처리

비동기 완료 처리는 크게 완료 설정 메서드와 완료 상태 확인 메서드로 나뉩니다.

 

8.1 완료 설정 메서드

  • CompletableFuture의 핵심 기능 중 하나로써 비동기 작업 결과를 사용자가 직접 완료 가능

 

8.1.1 complete()

  • CompletableFuture가 완료되지 않았다면 주어진 값으로 설정하고 CompletableFuture를 완료 상태로 전환
    • 이미 완료 상태일 경우 다른 값으로 다시 완료시킬 수 없으며 완료 상태로 전환하면 true, 그렇지 않으면 false 반환

 

 

8.1.2 completedFuture()

  • 주어진 값으로 이미 완료된 새로운 CompletableFuture를 반환
  • 비동기 작업을 수행하지 않고 미리 계산된 결과를 반환해야 할 때 유용

 

 

8.1.3 completeOnTimeout()

  • 지정된 타임아웃 이전에 완료되지 않는 경우 주어진 기본 값으로 완료

 

 

8.1.4 completeExceptionally()

  • CompletableFuture를 예외 상태로 완료시키는 데 사용되는 메서드로 주어진 예외 객체를 사용하여 비동기 작업을 예외로 처리하는 데 사용
  • exceptionally() 혹은 handle() 메서드를 사용하여 예외 처리 작업을 수행할 수 있으며 예외를 처리하고 대체 결과를 반환하거나 다른 작업을 수행할 수 있음

 

 

8.2 완료 상태 확인 메서드

  • CompletableFuture는 완료 상태를 명확하게 구분할 수 있도록 isCompletedExceptionally() 메서드를 추가 제공

 

8.2.1 isCancelled()

  • 작업이 정상적으로 완료되기 전 취소되었는지 여부를 반환
  • Future에서도 제공된 메서드

 

 

8.2.2 isDone()

  • 정상적으로, 예외적으로, 또는 취소를 통해 완료된 경우 등 어떤 방식으로든 완료된 경우 true를 반환
  • Future에서도 제공된 메서드

 

 

8.2.3 isCompletedExceptionally()

  • CompletableFuture가 취소되거나 completeExceptionally()의 명시적 호출, CompletionStage 실행의 갑작스러운 종료 등 예외적으로 완료된 경우 true 반환
  • CompletableFuture에서만 제공하는 메서드 (Future에서 제공 안 함)

 

 

8.3 CompletableFuture 완료 원리

  • CompletableFuture는 비동기 작업을 관리하고 완료 상태를 처리하는 도구이며 completeValue(), completeNull(), completeThrowable() 등을 통해 다양한 방식으로 작업을 완료시킴
  • CompletionException은 이러한 비동기 작업 중 발생한 예외를 처리하는 데 사용

 

8.3.1 completeValue()

  • CompletableFuture의 complete 메서드는 주어진 값을 사용하여 비동기 작업을 완료시킴
  • 이는 명시적으로 값을 설정하여 비동기 작업을 종료하고 이후 작업이 해당 값으로 처리
  • ex) cf.complete(value)를 호출하면 cf는 value로 완료됨

 

8.3.2 AltResult

  • CompletableFuture 내부 클래스 중 하나로 실패 또는 특별한 완료 상태를 나타내기 위해 사용
    • 주로 completeThrowable() 또는 completeNull()과 같은 상황에서 사용

 

  • AltResult는 표준 결과 대신 사용되며 CompletableFuture의 내부 구현 세부 사항

 

8.3.3 completeNull()

  • CompletableFuture를 null 값으로 완료시키는 것을 의미
  • 이는 명시적으로 null 값을 설정하여 작업을 완료시키고 이후 작업을 null로 처리

 

8.3.4 completeThrowable()

  • 예외를 사용하여 CompletableFuture를 완료시키는 것을 의미
  • cf.completeExceptionally(throwable)을 호출하여 특정 예외를 발생시키며 이후 작업은 해당 예외를 처리

 

8.3.5 CompletionException

  • CompletableFuture의 비동기 작업이 실패할 때 발생하는 예외
  • 주로 비동기 작업 중 발생한 예외를 감싸서 전달하며 예외의 원인을 추적

 

8.3.6 전체적인 플로우

  • CompletableFuture가 결과를 반환하는지 여부
    • 반환하지 않을 경우 completeNull() 메서드를 통해 AltResult에 null 세팅
    • 반환할 경우 completeValue() 메서드를 통해 CompletableFuture에 value 세팅
    • 예외가 발생할 경우 completeThrowable() 메서드를 통해 AltResult 내 CompletionException에 예외 세팅

 

9. 비동기 대기 / 취소 처리

CompletableFuture는 비동기 작업의 대기, 취소를 위한 메서드를 제공합니다.

 

9.1 대기 메서드

 

9.1.1 get() & get(timeout, unit)

  • 작업이 완료될 때까지 혹은 지정된 시간까지 대기하고 결과 반환
  • 예외로 작업이 완료된 경우 CancellationException, ExecutionException, InterruptedException과 같은 Checked Exception을 발생시키며 이 때문에 try-catch 문으로 묶어 예외를 처리해줘야 함


 

9.1.2 join()

  • 작업이 완료될 때까지 대기하고 결과 반환
  • 예외로 작업이 완료된 경우 InterruptedExceptionExecutionException을 던지지 않고, CompletionException으로 Wrapping된 예외를 발생
    • Unchecked Exception이기 때문에 get()과 달리 별도 예외 처리 안 해도 컴파일 오류는 발생하지 않지만 예외 처리하는 것을 권장

 

9.2 취소 메서드

 

9.2.1 cancel(boolean mayInterruptIfRunning)

  • 해당 메서드는 Future 인터페이스로부터 상속되었으며, 비동기 작업이 실행 중인 경우 해당 작업을 중단하려고 시도
  • CompletableFuture가 취소되었을 경우 해당 작업은 더 이상 실행되지 않으며, 이후에 결과를 기다리는 호출(get(), join() 등)은 CancellationException 예외 발생
  • mayInterruptIfRunning이 true일 경우, 작업이 실행 중이라면 인터럽트를 시도하지만 인터럽트가 실제로 작업을 중단하는지 여부는 작업의 구현에 따라 상이함


 

10. ForkJoinPool

  • 자바에서 병렬 처리를 지원하는 쓰레드 풀로써 자바 7부터 도입된 개념
  • 작업을 작은 조각으로 나누고 병렬로 처리하여 다중 코어 프로세서에서 효율적으로 작업을 수행할 수 있도록 지원

 

10.1 ForkJoinPool 특징

  • 병렬 처리: 쓰레드 풀은 시스템의 프로세서 코어 수에 따라 쓰레드를 생성하고 작업은 분할되어 멀티 쓰레드에서 병렬로 실행
  • Divide and Conquer 알고리즘: 복잡한 문제를 작은 하위 문제로 분할하고 각 하위 뭄ㄴ제를 병렬로 처리한 다음 다시 병합하여 처리 ex) MergeSort
  • Work-Stealing 알고리즘: 작업이 끝난 쓰레드가 다른 쓰레드의 대기 중인 작업을 훔쳐 실행함으로써 쓰레드 간 작업의 균형과 효율적인 병렬처리 가능
  • Fork-Join 구현: RecursiveTask 및 RecursiveAction을 사용하여 작업을 정의하고 ForkJoinPool을 통해 병렬로 실행 가능

 

10.1.1 Work-Stealing 알고리즘

  • 쓰레드는 Global Shared Queue에 작업이 존재하면 자신의 WorkQueue의 head에 push 하고 작업 처리 시 head에 위치한 작업을 pop 해서 가져옴
    • 쓰레드가 자신만의 WorkQueue의 head에 push 하고 pop 하기 때문에 별도의 동기화 처리 필요 X
    • 다른 쓰레드가 자신의 WorkQueue에 Work Stealing을 하는 경우 WorkQueue의 tail에 위치한 작업을 pop 하기 때문에 동기화 없이 Lock-Free 하게 구현 가능 (deque)
    • Work-Stealing Queue가 deque 구조이기 때문에 쓰레드는 자신의 WorkQueue의 head에서 대부분 작업을 수행하고 Work-Stealing 하는 타 쓰레드는 tail에서 작업이 수행되므로 쓰레드 간 경합이 현저히 줄어듦
    • head와 tail이 동일한 케이스만 경합하며 쓰레드의 개수가 많아질 경우 Work-Stealing 하려는 쓰레드 간 경합이 심해질 수 있음

 

https://krishnakishorev.medium.com/java-multithreading-concurrency-and-parallelism-part-22-3-9bc0429adc2b

 

10.2 Fork-Join 구현

  • ForkJoinPool 프레임워크에서 사용되는 작업 유형으로 RecursiveTask와 RecursiveAction가 있으며 ForkJoinTask를 상속하는 구조 (병렬 작업 수행)
    • 추상 클래스인 RecursiveTask 또는 RecursiveAction을 상속해서 Fork-Join 기능 구현 가능

 

 

10.2.1 RecursiveTask

  • 병렬 작업을 수행하고 작업 결과 반환
    • compute() 메서드를 재정의하여 결과 반환

 

  • Divide and Conquer 알고리즘에서 큰 작업을 작은 하위 작업으로 분할하고 각 하위 작업의 결과를 합산하여 최종 결과를 반환할 때 사용

 

RecursiveTask

 

10.2.2 RecursiveAction

  • 병렬 작업을 수행하지만 작업 결과를 반환하지 않음
    • compute() 메서드를 재정의하지만 반환 값이 없는 void 형식

 

  • 배열의 요소를 병렬로 업데이트하거나 로그를 기록하는 작업 등에서 사용

 

RecursiveAction

 

10.3 ForkJoinPool 생성

  • 기본적으로 어플리케이션에서 공용으로 사용하는 쓰레드는 (CPU 코어 개수 - 1) 만큼 생성됨

 

공용으로 사용하는 쓰레드

 

  • ForkJoinPool.commonPool()은 전체 애플리케이션에서 쓰레드를 공용으로 사용하기 때문에 쓰레드 블로킹과 Starvation을 주의해야 함
    • I/O 바운드 작업을 commonPool에서 실행할 경우 쓰레드 부족으로 다른 작업이 지연될 수 있으며 I/O 작업이 지속적으로 블록 될 경우 CPU 작업이 실행 기회를 얻지 못하고 starvation 상태에 빠질 수 있음
    • 별도의 쓰레드 풀을 사용하여 I/O 작업과 CPU 작업을 분리하고 I/O 작업을 별도의 쓰레드에서 처리하는 것을 고려해야 함
    • 이는 Kotlin Coroutine에도 해당하는 내용

 

I/O 작업이 별도 쓰레드 풀에서 도는 것을 확인 가능

 

부연 설명

  • api마다 ThreadPool 사용할지 ForkJoinPool 사용할지 설정 가능
    • 아무런 인자 전달하지 않으면 ForkJoinPool의 common pool
    • 인자 전달하면 전달한 pool에서 동작

 

  • I/O Bound 작업은 동시성에 특화된 ThreadPool에서
  • Cpu Bound 작업은 ForkJoinPool에서

 

참고

자바 동시성 프로그래밍 [리액티브 프로그래밍 Part.1] - 정수원 강사님

 

반응형

'JAVA > 비동기 프로그래밍' 카테고리의 다른 글

[Netty] Netty 개요 및 간단한 예제  (0) 2024.07.27
[Java] 자바 IO, NIO, AIO  (0) 2024.07.14
[Java] ThreadPoolExecutor  (1) 2024.04.28
[Java] 자바 동시성 프레임워크  (4) 2024.04.20
[Java] 동기화 도구  (0) 2024.03.14