[Reactor] 자주 사용되는 Operator
Operator
- 리액티브 프로그래밍은 Operator로 시작해서 Operator로 끝난다고 해도 과언이 아님
- Operator는 데이터 스트림의 요소에 대해 변환, 필터링, 결합 등의 다양한 작업을 수행할 수 있는 함수형 프로그래밍 구성 요소
- Operator는 주어진 데이터 스트림(또는 Publisher)의 각 요소에 대해 특정 작업을 수행한 후, 그 결과를 새로운 데이터 스트림으로 반환하는 함수
- 새로운 데이터 스트림 역시 다른 Operator의 입력으로 사용될 수 있으며, 이를 통해 복잡한 데이터 처리 파이프라인을 구축할 수 있음
Sequence 생성을 위한 Operator
1. justOrEmpty
- just()의 확장 Operator
- just() Operator와 달리 emit 할 데이터가 null일 경우 NPE를 발생시키지 않고 onComplete Signal을 전송
- emit 할 데이터가 null이 아닐 경우 해당 데이터를 emit 하는 Mono를 생성
justOrEmpty 예제 코드
2. fromIterable
- Iterable에 포함된 데이터를 emit 하는 Flux를 생성
- 자바에서 제공하는 Iterable을 구현한 구현체를 fromIterable()의 파라미터로 전달 가능
fromIterable 예제 코드
3. fromStream
- Stream에 포함된 데이터를 emit 하는 Flux를 생성
- 자바의 Stream 특성상 재사용할 수 없으며 cacnel, error, complete 시 Stream이 자동으로 닫힘
fromStream 예제 코드
4. range
- n부터 1씩 증가한 연속된 수를 m개 emit하는 Flux를 생성
- 명령한 언어의 for문처럼 특정 횟수만큼 어떤 작업을 처리하고자 할 경우 주로 사용
range 예제 코드
5. defer
- Operator를 선언한 시점에 데이터를 emit하는emit 하는 것이 아니라 구독하는 시점에 데이터를 emit 하는 Flux 또는 Mono를 생성
- 데이터 emit을 지연시키기 때문에 꼭 필요한 시점에 데이터를 emit 하여 불필요한 프로세스를 줄일 수 있음
defer 예제 코드
부연 설명
- justMono는 Mono.just(LocalDateTime.now())를 사용하여 현재 시간을 Mono로 즉시 wrapping 하고 이때 LocalDateTime.now()가 호출되어, justMono는 해당 시점의 LocalDateTime 객체를 캡처
- deferMono는 Mono.defer(() -> Mono.just(LocalDateTime.now()))를 사용하여 Mono를 생성하는데 이 경우에는 LocalDateTime.now() 호출이 미뤄짐
- 실제로 deferMono가 구독될 때마다 새로운 Mono가 생성되고, 그때 LocalDateTime.now()가 호출되어 현재 시각을 캡처
6. using
- 파라미터로 전달받은 resource를 emit하는 Flux를 생성
- 첫 번째 파라미터는 읽어 올 resource, 두 번째 파라미터는 읽어 온 resource를 emit 하는 Flux, 그리고 마지막 세 번째 파라미터는 종료 Signal(onComplete 혹은 onError)이 발생할 경우 resource를 해제하는 등의 후처리 지원
using 예제 코드
부연 설명
- using_example.txt 파일을 한 라인씩 읽어오고 읽어 온 라인 데이터는 Stream<String> 형태의 Stream 객체
- 읽어 온 Stream 객체를 fromStream() Operator의 데이터소스로 전달 후 라인 문자열 데이터를 emit
- 라인 문자열 데이터의 emit이 끝나면 onComplete Signal 이벤트가 발생하므로 다 사용한 Stream 객체를 전달받아 Stream을 닫음
7. generate
- 프로그래밍 방식으로 Signal 이벤트를 발생
- 특히 동기적으로 데이터를 하나씩 순차적으로 emit하고자 할 경우 사용
마블 다이어그램 부연 설명
- generate()의 첫 번째 파라미터는 emit할 숫자의 초깃값을 지정
- 숫자 0 옆에 붙는 +/- 기호는 초깃값으로 음수도 가능하고 양수도 가능하다는 의미
- 두 번째 파라미터의 람다 표현식에 보이는 S는 State의 약자
- generate() Operator는 초깃값으로 지정한 숫자부터 emit 하고 emit 한 숫자를 1씩 증가시켜 다시 emit 하는 작업을 반복
- 1씩 증가하는 숫자를 상태(State) 값으로 정의
generate 예제 코드
8. create
- generate() Operator처럼 프로그래밍 방식으로 Signal 이벤트를 발생시키지만 몇 가지 차이점 존재
- generate() Operator는 데이터를 동기적으로 한 번에 한 건씩 emit 할 수 있는 반면
- create() Operator는 한 번에 여러 건의 데이터를 비동기적으로 emit 할 수 있음
create 예제 코드
부연 설명
- Backpressure 전략 중 DROP 전략 지정
- publishOn()으로 Scheduler를 설정하면서 prefetch 수를 2로 지정한 뒤 start와 end 변수를 사용해서 매번 4개의 데이터를 emit 하게 함으로써 코드 실행 결과에서 Backpressure DROP 전략이 적용되는 것을 조금 더 쉽게 확인 가능
동작 과정
- 구독 발생 시 publishOn()에서 설정한 숫자만큼의 데이터 요청
- create() Operator 내부에서 sink.onRequest() 메서드의 람다 표현식이 실행됨
- 요청한 개수보다 2개 더 많은 데이터를 emit
- Subscriber가 emit된 데이터를 전달받아 로그로 출력
- Backpressure DROP 전략이 적용되었기 때문에 요청 개수를 초과하여 emit 된 데이터는 DROP
- 다시 publishOn()에서 지정한 숫자만큼의 데이터를 요청
- create() Operator 내부에서 onComplete 시그널이 발생하지 않았기 때문에 앞선 과정을 반복하다가 설정한 지연 시간이 지나면 메인 쓰레드가 종료되어 코드 실행이 종료됨
Sequence 필터링 Operator
1. filter
- Upstream에서 emit된 데이터 중 조건에 일치하는 데이터만 Downstream으로 emit
- 파라미터로 입력 받은 Predicate의 반환 값이 true인 데이터만 Downstream으로 emit
filter 예제 코드
2. filterWhen
- 내부에서 Inner Sequence를 통해 조건에 맞는 데이터인지를 비동기적으로 테스트한 후 테스트 결과가 true라면 filterWhen()의 Upstream으로부터 전달받은 데이터를 Downstream으로 emit
filterWhen 예제 코드
3. skip
- Upstream에서 emit 된 데이터 중 파라미터로 입력받은 숫자만큼 건너뛴 후 나머지 데이터를 Downstream으로 emit
skip 예제 코드
4. take
- Upstream에서 emit 되는 데이터 중 파라미터로 입력받은 숫자만큼만 Downstream으로 emit
take 예제 코드
5. takeLast
- Upstream에서 emit 된 데이터 중 파라미터로 입력한 개수만큼 가장 마지막에 emit 된 데이터를 Downstream으로 emit
takeLast 예제 코드
6. takeUntil
- 파라미터로 입력한 람다 표현식(Predicate)이 true가 될 때까지 Upstream에서 emit 된 데이터를 Downstream으로 emit
- Upstream에서 emit된 데이터에는 Predicate을 평가할 때 사용한 데이터가 포함됨
- 조건을 만족하는 첫 번째 요소까지 포함하여 발행하고, 그 이후의 요소들은 무시\
takeUntil 예제 코드
7. takeWhile
- takeUntil() Operator와 달리 파라미터로 입력한 람다 표현식이 true가 되는 동안에만 Upstream에서 emit 된 데이터를 Downstream으로 emit
- Predicate을 평가할 때 사용한 데이터가 Downstream으로 emit 되지 않음
- Upstream에서 emit된 데이터가 false라면 Sequence가 종료됨
takeWhile 예제 코드
8. next
- Upstream에서 emit되는 데이터 중 첫 번째 데이터만 Downstream으로 emit
- 만약 Upstream에서 emit되는 데이터가 empty라면 Downstream으로 empty Mono를 emit
next 예제 코드
Sequence 변환 Operator
1. map
- Upstream에서 emit된 데이터를 mapper Function을 사용하여 변환 후 Downstream으로 emit
- map() Operator 내부에서 에러 발생 시 Sequence가 종료되지 않고 계속 진행되도록 하는 기능 지원
map 예제 코드
2. flatMap
- Upstream에서 emit 된 데이터 한 건이 Inner Sequence에 여러 건의 데이터로 변환
- Upstream에서 emit 된 데이터는 이렇게 InnerSequence에서 평탄화 작업을 거치면서 하나의 Sequence로 병합되어 Downstream으로 emit
- 단, flatMap() 내부의 Inner Sequence를 비동기적으로 실행하면 데이터 emit의 순서를 보장하지 않음
flatMap 예제 코드
3. concat
- 파라미터로 입력되는 Publisher의 Sequence를 연결해서 데이터를 순차적으로 emit
- 먼저 입력된 Publisher의 Sequence가 종료될 때까지 나머지 Publisher의 Sequence는 구독되지 않고 대기하는 특성을 지님
concat 예제 코드
4. merge
- 파라미터로 입력되는 Publisher의 Sequence에서 emit 된 데이터를 인터리빙 방식으로 병합
- 인터리브(interleave)는 `교차로 배치하다`라는 뜻을 지니며 컴퓨터 하드디스크 상의 데이터를 서로 인접하지 않게 배열해 성능을 향상하거나 인접한 메모리 위치를 서로 다른 메모리 뱅크에 두어 동시에 여러 곳을 접근할 수 있게 해주는 것을 인터리빙이라고 함
- concat() Operator와 달리 모든 Publisher의 Sequence가 즉시 구독됨
merge 예제 코드
부연 설명
- delayElements() Operator를 사용해서 첫 번째 파라미터의 Flux는 0.3초에 한 번씩 데이터를 emit, 두 번째 파라미터의 Flux는 0.5초에 한 번씩 데이터를 emit
5. zip
- 파라미터로 입력되는 Publisher Sequence에서 emit 된 데이터를 결합
- 각 Publisher가 데이터를 하나씩 emit 할 때까지 대기했다가 결합
zip 예제 코드
부연 설명
- merge() 예제 코드와 달리 zip() Operator는 두 개의 Flux가 emit 하는 시간이 다름에도 불구하고 각 Flux에서 하나씩 emit 할 때까지 대기했다가 emit 된 데이터를 Tuple2 객체로 묶어서 구독자에게 전달
6. and
- Mono의 Complete 시그널과 파라미터로 입력된 Publisher의 Complete 시그널을 결합하여 새로운 Mono<Void>를 반환
- Mono와 파라미터로 입력된 Publisher의 Sequence가 모두 종료되었음을 Subscriber에게 알릴 수 있음
and 예제 코드
부연 설명
- and() Operator를 기준으로 Upstream에서 1초의 지연 시간을 가진 뒤 `Task 1`을 emit
- and() Operator 내부의 Inner Sequence에서는 0.6초에 한 번씩 `Task2`, `Task3`을 emit
- 결과적으로 Subscriber에게 onComplete 시그널만 전달되고 Upstream에서 emit된 데이터는 전달되지 않음
- and() Operator는 모든 Sequence가 종료되길 기다렸다가 최종적으로 onComplete 시그널만 전송
7. collectList
- Flux에서 emit 된 데이터를 모아서 List로 변환한 후 변환된 List를 emit 하는 Mono를 반환
- Upstream Sequence가 비어있다면 비어 있는 List를 Downstream으로 emit
collectList 예제 코드
8. collectMap
- Flux에서 emit 된 데이터를 기반으로 key-value를 생성하여 Map의 Element로 추가한 후 최종적으로 Map을 emit 하는 Mono를 반환
- Upstream Sequence가 비어있다면 비어 있는 List를 Downstream으로 emit
collectMap 예제 코드
Sequence 내부 동작 확인을 위한 Operator
- Upstream Publisher에서 emit 되는 데이터의 변경 없이 부수 효과만을 수행하기 위한 Operator들이 존재하고 이들은 모두 doOnXXXX()으로 시작하는 Operator
- doOnXXXX()으로 시작하는 Operator는 Consumer 또는 Runnable 타입의 함수형 인터페이스를 파라미터로 가지기 때문엔 별도의 반환 값이 없음
- Upstream Publisher로부터 emit 되는 데이터를 이용해 Upstream Publisher의 내부 동작을 디버깅하는 용도로 많이 사용 (로그 출력 등)
- 데이터 emit 과정에서 에러가 발생하면 해당 에러에 대한 알림을 전송하는 로직을 추가하는 등 부수 효과를 위한 다양한 로직을 적용할 수 있음
Operator | 설명 |
doOnSubscribe() | Publisher가 구독 중일 때 trigger 되는 동작 추가 가능 |
doOnRequest() | Publisher가 요청을 수신할 때 trigger 되는 동작 추가 가능 |
doOnNext() | Publisher가 데이터를 emit 할 때 trigger 되는 동작 추가 가능 |
doOnComplete() | Publisher가 성공적으로 완료되었을 때 trigger 되는 동작 추가 가능 |
doOnError() | Publisher가 에러가 발생한 상태로 종료되었을 때 trigger 되는 동작 추가 가능 |
doOnCancel() | Publisher가 취소 되었을 때 trigger 되는 동작 추가 가능 |
doOnTerminate() | Publisher가 성공적으로 완료되었을 때 또는 에러가 발생한 상태로 종료되었을 때 trigger되는 동작 추가 가능 |
doOnEach() | Publisher가 데이터를 emit 할 때 성공적으로 완료되었을 때와 에러가 발생한 상태를 종료되었을 때 trigger 되는 동작 추가 가능 |
doOnDiscard() | Upstream에 있는 전체 Operator 체인의 동작 중에서 Operator에 의해 폐기되는 요소를 조건부로 정리 가능 |
doAfterTerminate() | Downstream을 성공적으로 완료한 직후 또는 에러가 발생하여 Publisher가 종료된 직후 trigger 되는 동작 추가 가능 |
doFirst() | Publisher가 구독되기 전에 trigger 되는 동작 추가 가능 |
doFinallly() | 에러를 포함해서 어떤 이유이든 간에 Publisher가 종료된 후 trigger 되는 동작 추가 가능 |
doOnXXXX 예제 코드
에러 처리를 위한 Operator
1. error
- 파라미터로 지정된 에러로 종료하는 Flux를 생성
- 자바의 throw 키워드를 사용해서 에외를 의도적으로 던지는 것 같은 역할을 수행
- 주로 Checked Exception을 catch 해서 다시 throw 해야 하는 경우 사용
error 예제 코드
2. onErrorReturn
- 에러 이벤트가 발생했을 때 에러 이벤트를 Downstream으로 전파하지 않고 대체 값을 emit
- 자바에서 예외가 발생했을 때 try-catch 문의 catch 블록에서 예외에 해당하는 대체 값을 반환하는 방식과 유사함
onErrorReturn 예제 코드
3. onErrorResume
- 에러 이벤트가 발생했을 때 에러 이벤트를 Downstream으로 전파하지 않고 대체 Publisher를 반환
- 자바에서 예외가 발생했을 때 try-catch 문의 catch 블록에서 예외가 발생한 메서드를 대체할 수 있는 또 다른 메서드를 호출하는 방식
onErrorResume 예제 코드
부연 설명
- "three" 문자열을 Integer.parseInt() 메서드로 변환하려고 할 때 예외가 발생하면, onErrorResume() 연산자가 호출되면서 에러를 처리하고 새로운 대체 Flux를 반환
- 이 시점에서 원래의 Flux 스트림은 중단되고, onErrorResume()에서 제공한 대체 Flux로 흐름이 넘어가기 때문에 "4"가 처리되지 않고, 대신 "Error handled: fallback value"가 출력
4. onErrorContinue
- 에러가 발생했을 때 에러 영역 내에 있는 데이터를 제거하고 Upstream에서 후속 데이터를 emit 하는 방식으로 에러를 복구할 수 있도록 지원
- onErrorContinue() Operator의 파라미터인 BiConsumer 함수형 인터페이스를 통해 에러 메시지와 에러가 발생했을 때 emit 된 데이터를 전달받아 로그를 기록하는 등의 후처리 진행 가능
onErrorContinue 예제 코드
5. retry
- Publisher가 데이터를 emit 하는 과정에서 에러가 발생할 경우 파라미터로 입력한 횟수만큼 원본 Flux의 Sequence를 재구독
- 파라미터로 Long.MAX_VALUE를 전달하면 재구독을 무한 반복
- 특히 timeout() Operator와 함께 사용하여 네트워크 지연으로 인해 정해진 시간 내 응답을 받지 못하면 일정 횟수만큼 재요청해야 하는 상황에서 유용하게 사용
retry 예제 코드
Sequence의 동작 시간 측정을 위한 Operator
1. elapsed
- emit 된 데이터 사이의 경과 시간을 측정해서 Tuple<Long, T> 형태로 Downstream으로 emit
- emit 되는 첫 번째 데이터는 onSubscribe 시그널과 첫 번째 데이터 사이를 기준으로 시간을 측정하며 측정된 시간의 단위는 milliseconds
elapsed 예제 코드
Flux Sequence 분할을 위한 Operator
1. window
- Upstream에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 포함하는 새로운 Flux로 분할
- Reactor에서는 이렇게 분할된 Flux를 window라고 칭함
- 마지막 window에 포함된 데이터의 개수는 maxSize보다 같거나 적음
window 예제 코드
2. buffer
- Upstream 에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 List 버퍼로 한 번에 emit
- 마지막 버퍼에 포함된 데이터의 개수는 maxSize보다 같거나 적음
- 높은 처리량을 요구하는 애플리케이션일 경우 인입되는 데이터를 순차적으로 처리하기보다는 batch insert 같은 일괄 작업에 buffer() Operator를 이용해서 성능 향상을 도모할 수 있음
buffer 예제 코드
3. bufferTimeout
- Upstream에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터 혹은 maxTime 내 emit 된 데이터를 List 버퍼로 한 번에 emit
- maxSize나 maxTime 중 먼저 조건에 부합할 때까지 emit 된 데이터를 List 버퍼로 emit
bufferTimeout 예제 코드
부연 설명
- bufferTimeout() Operator의 maxSize가 3이고 maxTime은 0.4초
- range() Operator에서 emit 된 숫자 중 10보다 더 작은 숫자는 0.1초의 지연 시간을 가지므로 0.4초가 지나기 전에 버퍼의 maxSize에 도달하는 반면 10 이상의 숫자는 0.3초의 지연 시간을 가지기 때문에 버퍼에 2개가 담긴 시점에 emit 되는 것을 확인 가능
4. groupBy
- emit 되는 데이터를 keyMapper로 생성한 key를 기준으로 그룹화 한 GruopedFlux를 반환하며 해당 GroupedFlux를 통해서 그룹 별로 작업을 수행 가능
groupBy 예제 코드
다수의 구독자에게 Flux를 Multicasting 하기 위한 Operator
1. publish
- 구독을 하더라도 구독 시점에 즉시 데이터를 emit 하지 않고 connect()를 호출하는 시점에 비로소 데이터를 emit
- Host Sequence로 변환되기 때문에 구독 시점 이후 emit 된 데이터만 전달받을 수 있음
publish 예제 코드
부연 설명
- publish() Operator를 이용해 0.3초에 한 번씩 1~5까지 emit 하는 ConnectableFlux<Integer>를 반환받음
- 이때 publish() Operator를 호출했지만 아직 connect()를 호출하지 않았기 때문에 해당 시점에 emit 되는 데이터는 없음
- 0.5초 뒤 첫 번째 구독 발생
- 0.2초 뒤 두 번째 구독 발생
- connect()가 호출되고 해당 시점부터 데이터가 0.3초에 한 번씩 emit
- 1초 뒤에 세 번째 구독이 발생하며 connect()가 호출된 시점부터 0.3초에 한 번씩 데이터가 emit 되기 때문에 숫자 1부터 3까지는 세 번째 구독 전에 이미 emit 된 상태라 세 번째 Subscriber는 전달받지 못함
2. autoConnect
- 파라미터로 지정하는 숫자만큼의 구독이 발생하는 시점에 Upstream 소스에 자동으로 연결되기 때문에 별도의 connect() 호출이 필요 없음
- 반면, publish() Operator의 경우 구독이 발생하더라도 connect()를 직접 호출하기 전까지는 데이터를 emit 하지 않기 때문에 코드 상에서 명시적으로 connect()를 직접 호출해야 함
autoConnect 예제 코드
부연 설명
- autoConnect(2) Operator를 추가해서 두 번째 구독이 발생하는 시점에 데이터가 emit 되도록 설정
3. refCount
- 파라미터로 입력된 숫자만큼의 구독이 발생하는 시점에 Upstream 소스에 연결되며 모든 구독이 취소되거나 Upstream의 데이터 emit 이 종료되면 연결이 해제됨
- 주로 무한 스트림 상황에서 모든 구독이 취소될 경우 연결을 해제하는 데 사용
refCount 예제 코드
부연 설명
- 첫 번째 구독이 발생한 이후 2.1초 후에 구독을 해제했고 해당 시점에는 모든 구독이 취소된 상태이기 때문에 연결이 해제되고 두 번째 구독이 발생할 경우에는 Upstream 소스에 다시 연결
- 따라서 0부터 다시 숫자가 증가
- 반면, refCount(1)이 대신 autoConnect(1);이였다면 다음과 같은 결과였을 것
참고
- 스프링으로 시작하는 리액티브 프로그래밍 (황정식 저자)