Sinks
- Reactor 3.4.0 버전부터 지원되기 시작했으며 리액티브 스트림즈의 구성 요소 중 하나로 Processor의 기능을 개선했음
- Sink는 Reactor에서 이벤트를 방출하는 엔터티를 의미

부연 설명
- Sink는 리액티브 스트림즈의 Signal을 프로그래밍 방식으로 푸시할 수 있는 구조이며 Flux 또는 Mono의 의미 체계를 가짐
- Sinks를 사용하면 프로그래밍 코드를 통해 명시적으로 Signal을 전송할 수 있음
- Sinks는 멀티 쓰레드 방식으로 Signal을 전송해도 thread-safe하기 때문에 예기치 않은 동작으로 이어지는 것을 방지해줌
- generate() Operator나 create() Operator는 싱글 쓰레드 기반에서 Signal을 전송하는 데 사용
1. create() Operator를 사용하는 예제
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) throws InterruptedException { | |
int tasks = 6; | |
Flux | |
.create((FluxSink<String> sink) -> { | |
IntStream | |
.range(1, tasks) | |
.forEach(n -> sink.next(doTask(n))); | |
}) | |
.subscribeOn(Schedulers.boundedElastic()) | |
.doOnNext(n -> log.info("# create(): {}", n)) | |
.publishOn(Schedulers.parallel()) | |
.map(result -> result + " success!") | |
.doOnNext(n -> log.info("# map(): {}", n)) | |
.publishOn(Schedulers.parallel()) | |
.subscribe(data -> log.info("# onNext: {}", data)); | |
Thread.sleep(500L); | |
} | |
private static String doTask(int taskNumber) { | |
return "task " + taskNumber + " result"; | |
} |

부연 설명
- create() Operator가 처리해야 할 작업의 개수만큼 doTask() 메서드를 호출해 작업을 처리한 후 결과를 반환받음
- 처리 결과를 map() Operator를 사용해 추가적으로 가공 처리한 후 최종적으로 Subscriber에게 전달하는데 각 단계마다 별도의 쓰레드에서 실행되도록 구성
- 작업을 처리하는 단계 (subscribeOn() Operator, boundedElastic-1 쓰레드)
- 처리 결과를 가공하는 단계 (첫 번째 publishOn() Operator, parallel-2 쓰레드)
- 가공된 결과를 Subscriber에게 전달하는 단계 (두 번째 publishOn() Operator, parallel-1 쓰레드)
- 원본 데이터를 생성하는 create() Operator에서는 subscribeOn() Operator에서 지정한 쓰레드를 사용해서 생성한 데이터를 내보냄
- 위 코드에서는 doTask() 메서드가 여러 개의 스레드에서 각각의 작업을 처리한 후 결과 값을 반환할 수 있음
- 이로 인해 싱글 스레드가 아닌, 병렬로 작업이 처리되어 각기 다른 작업들의 결과가 반환되는 상황이 발생할 수 있음
- 이 같은 상황에서 적절하게 사용할 수 있는 방식이 Sinks
2. Sinks를 사용하는 예제
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) throws InterruptedException { | |
int tasks = 6; | |
Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer(); | |
Flux<String> fluxView = unicastSink.asFlux(); | |
IntStream | |
.range(1, tasks) | |
.forEach(n -> { | |
try { | |
new Thread(() -> { | |
unicastSink.emitNext(doTask(n), Sinks.EmitFailureHandler.FAIL_FAST); | |
log.info("# emitted: {}", n); | |
}).start(); | |
Thread.sleep(100L); | |
} catch (InterruptedException e) { | |
log.error(e.getMessage()); | |
} | |
}); | |
fluxView | |
.publishOn(Schedulers.parallel()) | |
.map(result -> result + " success!") | |
.doOnNext(n -> log.info("# map(): {}", n)) | |
.publishOn(Schedulers.parallel()) | |
.subscribe(data -> log.info("# onNext: {}", data)); | |
Thread.sleep(200L); | |
} | |
private static String doTask(int taskNumber) { | |
return "task " + taskNumber + " result"; | |
} |

부연 설명
- 앞선 create() Operator 예제와 달리 doTask() 메서드가 반복문을 돌 때마다 새로운 쓰레드에서 실행됨
- doTask() 메서드는 총 5개의 쓰레드에서 실행 (Thread-0 ~ Thread-4)
- map() Operator에서의 가공 처리는 parallel-2
- Subscriber에서 전달받은 데이터의 처리는 parallel-1에서 처리
- doTask() 메서드의 작업 처리 결과를 Sinks를 통해 Downstream에 내보냄
- 이처럼 Sinks는 프로그래밍 방식으로 Signal을 전송할 수 있으며 멀티 쓰레드 환경에서 thread-safe 함을 보장받을 수 있는 장점이 있음
Sinks 종류 및 특징
Reactor에서 Sinks를 사용하여 프로그래밍 방식으로 Singal을 전송할 수 있는 방법은 크게 두 가지입니다.
- Sinks.One을 사용하는 것
- Sinks.Many를 사용하는 것
1. Sinks.One
- Sinks.one() 메서드를 사용해서 한 건의 데이터를 전송하는 방법을 정의해 둔 기능 명세

부연 설명
- Sinks.One은 Sinks 클래스 내부에 인터페이스로 정의된 Sinks의 스펙 또는 사양으로
- 한 건의 데이터를 프로그래밍 방식으로 내보내는 역할을 하기도 하고
- Mono 방식으로 Subscriber가 데이터를 소비할 수 있음
- 정리하면 Sinks.one() 메서드를 호출하는 것은 한 건의 데이터를 프로그래밍 방식으로 내보내는 기능을 사용하고 싶으니 거기에 맞는 적당한 기능 명세를 달라고 요청하는 것
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) throws InterruptedException { | |
Sinks.One<String> sinkOne = Sinks.one(); | |
Mono<String> mono = sinkOne.asMono(); | |
sinkOne.emitValue("Hello Reactor", FAIL_FAST); | |
sinkOne.emitValue("Hi Reactor", FAIL_FAST); | |
sinkOne.emitValue(null, FAIL_FAST); | |
mono.subscribe(data -> log.info("# Subscriber1 {}", data)); | |
mono.subscribe(data -> log.info("# Subscriber2 {}", data)); | |
} |

부연 설명
- Sinks.one() 메서드를 호출하면 Sinks.One이라는 기능 명세를 반환하며 Sinks.One 객체로 데이터를 내보냄
- emitValue() 메서드의 두 번째 파라미터는 emit 도중에 에러가 발생할 경우 어떻게 처리할 것인지에 대한 핸들러
- FAIL_FAST는 람다 표현식으로 표현한 EmitFailureHandler 인터페이스의 구현 객체
- 해당 EmitFailureHandler 객체를 통해 emit 도중 발생한 에러에 대해 빠르게 실패 처리
- 빠르게 실패 처리한다는 의미는 에러가 발생했을 때 재시도를 하지 않고 즉시 실패 처리를 함으로써 쓰레드 간의 경합 등으로 발생하는 데드락 상태 등을 미연에 방지할 수 있으며 궁극적으로 thread-safe 함을 보장
- Sinks.One으로 아무리 많은 수의 데이터를 내보낸다 하더라도 처음 내보낸 데이터(Hello Reactor)는 정상적으로 내보내지지만 나머지 데이터(Hi Reactor)들은 Drop 된다는 것을 확인할 수 있음
2. Sinks.Many
- Sinks.many() 메서드를 사용해서 여러 건의 데이터를 여러 가지 방식으로 전송하는 기능을 정의해 둔 기능 명세

부연 설명
- Sinks.many() 메서드는 Sinks.Many를 반환하지 않고 ManySpec이라는 인터페이스를 반환
- Sinks.One의 경우 단순히 한 건의 데이터를 내보내는 한 가지 기능만 가지기 때문에 별도의 Spec이 정의되는 게 아니라 Default Spec을 사용
- 반면 Sinks.Many의 경우 데이터를 내보내기 위한 여러 가지 기능이 정의된 ManySpec을 반환

2.1 Sinks.Many unicast() 예제
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) throws InterruptedException { | |
Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer(); | |
Flux<Integer> fluxView = unicastSink.asFlux(); | |
unicastSink.emitNext(1, FAIL_FAST); | |
unicastSink.emitNext(2, FAIL_FAST); | |
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data)); | |
unicastSink.emitNext(3, FAIL_FAST); | |
// fluxView.subscribe(data -> log.info("# Subscriber2: {}", data)); | |
} |

부연 설명
- ManySpec의 구현 메서드 중 하나인 unicast() 메서드를 호출
- unicast() 메서드를 호출하면 반환 값으로 UnicastSpec을 리턴하고 최종적으로 UnicastSpec에 정의된 기능을 사용
- UnicastSpec에 정의된 기능은 onBackpressureBuffer() 메서드를 호출함으로써 사용
- 마지막 라인의 주석을 해제하고 구독을 한 번 더 발생하게 할 경우 IllegalStateException 발생
- UnicastProcessor는 하나의 Subscriber만 허용하기 때문

2.2 Sinks.Many multicast() 예제
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) { | |
Sinks.Many<Integer> multicastSink = | |
Sinks.many().multicast().onBackpressureBuffer(); | |
Flux<Integer> fluxView = multicastSink.asFlux(); | |
multicastSink.emitNext(1, FAIL_FAST); | |
multicastSink.emitNext(2, FAIL_FAST); | |
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data)); | |
fluxView.subscribe(data -> log.info("# Subscriber2: {}", data)); | |
multicastSink.emitNext(3, FAIL_FAST); | |
} |

부연 설명
- 예제 코드에서는 ManySpec의 구현 메서드 중 하나인 multicast() 메서드를 호출
- multicast() 메서드를 호출하면 반환 값으로 MulticastSpec을 반환하고 MulticastSpec의 구현 메서드인 onBackpressureBuffer() 메서드를 호출
- MulticastSpec의 기능은 하나 이상의 Subscriber에게 데이터를 내보내는 것
- Subscriber1은 세 개의 데이터를 모두 전달받은 반면 Subscriber2는 세 번째 데이터만 전달받음
- Sinks가 Publisher의 역할을 수행할 경우 기본적으로 Hot Publisher로 동작하며 특히 onBackpressureBuffer() 메서드는 Warm up의 특징을 가지는 Hot Sequence로 동작하기 때문에 첫 번째 구독이 발생한 시점에 Downstream 쪽으로 데이터가 전달됨
2.3 Sinks.Many replay() 예제
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) { | |
Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2); | |
Flux<Integer> fluxView = replaySink.asFlux(); | |
replaySink.emitNext(1, FAIL_FAST); | |
replaySink.emitNext(2, FAIL_FAST); | |
replaySink.emitNext(3, FAIL_FAST); | |
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data)); | |
replaySink.emitNext(4, FAIL_FAST); | |
fluxView.subscribe(data -> log.info("# Subscriber2: {}", data)); | |
} |

부연 설명
- replay() 메서드를 호출하면 반환 값으로 MulticastReplaySpec을 반환하고 MulticastReplaySpec의 구현 메서드 중 하나인 limit() 메서드를 호출
- MulticastReplaySpec에는 emit 된 데이터를 다시 replay해서 구독 전에 이미 내보내진 데이터라도 Subscriber가 전달받을 수 있도록 다양한 메서드들이 정의되어 있음
- all() 메서드는 구독 전에 이미 내보내진 데이터가 있더라도 처음 emit된 데이터부터 모든 데이터들이 Subscriber에게 전달됨
- limit() 메서드는 emit된 데이터 중 파라미터로 입력한 개수만큼 가장 나중에 내보내진 데이터부터 Subscriber에게 전달하는 기능
- 내보내진 데이터 중 2개만 뒤로 돌려서 전달하겠다는 의미이기 때문에 Subscriber1 입장에서는 구독 시점에 이미 세 개의 데이터가 emit 되었기 때문에 1은 생략되고 2, 3이 Subscriber1에 전달된 것을 확인할 수 있음
- Subscriber2의 경우 구독 전에 4개의 숫자가 내보내졌기 때문에 마지막 두 개인 3, 4가 전달된 것을 확인할 수 있음
참고
- 스프링으로 시작하는 리액티브 프로그래밍 (황정식 저자)
반응형
'Spring > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
[Reactor] Context (0) | 2024.07.30 |
---|---|
[Reactor] Scheduler (0) | 2024.07.24 |
[Reactor] Backpressure (0) | 2024.07.18 |
[Reactor] Cold Sequence, Hot Sequence (0) | 2024.07.17 |
마블 다이어그램 (Marble Diagram) (0) | 2024.07.15 |