Spring/스프링으로 시작하는 리액티브 프로그래밍

[Reactor] Sinks

꾸준함. 2024. 7. 23. 23:36

Sinks

  • Reactor 3.4.0 버전부터 지원되기 시작했으며 리액티브 스트림즈의 구성 요소 중 하나로 Processor의 기능을 개선했음
  • Sink는 Reactor에서 이벤트를 방출하는 엔터티를 의미

 

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Sinks.html

 

부연 설명

  • Sink는 리액티브 스트림즈의 Signal을 프로그래밍 방식으로 푸시할 수 있는 구조이며 Flux 또는 Mono의 의미 체계를 가짐
  • Sinks를 사용하면 프로그래밍 코드를 통해 명시적으로 Signal을 전송할 수 있음
  • Sinks는 멀티 쓰레드 방식으로 Signal을 전송해도 thread-safe하기 때문에 예기치 않은 동작으로 이어지는 것을 방지해줌
    • generate() Operator나 create() Operator는 싱글 쓰레드 기반에서 Signal을 전송하는 데 사용

 

1. create() Operator를 사용하는 예제

 

 

부연 설명

  • create() Operator가 처리해야 할 작업의 개수만큼 doTask() 메서드를 호출해 작업을 처리한 후 결과를 반환받음
  • 처리 결과를 map() Operator를 사용해 추가적으로 가공 처리한 후 최종적으로 Subscriber에게 전달하는데 각 단계마다 별도의 쓰레드에서 실행되도록 구성
    • 작업을 처리하는 단계 (subscribeOn() Operator, boundedElastic-1 쓰레드)
    • 처리 결과를 가공하는 단계 (첫 번째 publishOn() Operator, parallel-2 쓰레드)
    • 가공된 결과를 Subscriber에게 전달하는 단계 (두 번째 publishOn() Operator, parallel-1 쓰레드)

 

  • 원본 데이터를 생성하는 create() Operator에서는 subscribeOn() Operator에서 지정한 쓰레드를 사용해서 생성한 데이터를 내보냄
  • 위 코드에서는 doTask() 메서드가 여러 개의 스레드에서 각각의 작업을 처리한 후 결과 값을 반환할 수 있음
    • 이로 인해 싱글 스레드가 아닌, 병렬로 작업이 처리되어 각기 다른 작업들의 결과가 반환되는 상황이 발생할 수 있음
    • 이 같은 상황에서 적절하게 사용할 수 있는 방식이 Sinks

 

2. Sinks를 사용하는 예제

 

 

부연 설명

  • 앞선 create() Operator 예제와 달리 doTask() 메서드가 반복문을 돌 때마다 새로운 쓰레드에서 실행됨
    • doTask() 메서드는 총 5개의 쓰레드에서 실행 (Thread-0 ~ Thread-4)
    • map() Operator에서의 가공 처리는 parallel-2
    • Subscriber에서 전달받은 데이터의 처리는 parallel-1에서 처리

 

  • doTask() 메서드의 작업 처리 결과를 Sinks를 통해 Downstream에 내보냄
  • 이처럼 Sinks는 프로그래밍 방식으로 Signal을 전송할 수 있으며 멀티 쓰레드 환경에서 thread-safe 함을 보장받을 수 있는 장점이 있음

 

Sinks 종류 및 특징

 

Reactor에서 Sinks를 사용하여 프로그래밍 방식으로 Singal을 전송할 수 있는 방법은 크게 두 가지입니다.

  • Sinks.One을 사용하는 것
  • Sinks.Many를 사용하는 것

 

1. Sinks.One

  • Sinks.one() 메서드를 사용해서 한 건의 데이터를 전송하는 방법을 정의해 둔 기능 명세

 

Sinks.one()

 

부연 설명

  • Sinks.One은 Sinks 클래스 내부에 인터페이스로 정의된 Sinks의 스펙 또는 사양으로
    • 한 건의 데이터를 프로그래밍 방식으로 내보내는 역할을 하기도 하고
    • Mono 방식으로 Subscriber가 데이터를 소비할 수 있음

 

  • 정리하면 Sinks.one() 메서드를 호출하는 것은 한 건의 데이터를 프로그래밍 방식으로 내보내는 기능을 사용하고 싶으니 거기에 맞는 적당한 기능 명세를 달라고 요청하는 것

 

 

 

부연 설명

  • Sinks.one() 메서드를 호출하면 Sinks.One이라는 기능 명세를 반환하며 Sinks.One 객체로 데이터를 내보냄
  • emitValue() 메서드의 두 번째 파라미터는 emit 도중에 에러가 발생할 경우 어떻게 처리할 것인지에 대한 핸들러
  • FAIL_FAST는 람다 표현식으로 표현한 EmitFailureHandler 인터페이스의 구현 객체
    • 해당 EmitFailureHandler 객체를 통해 emit 도중 발생한 에러에 대해 빠르게 실패 처리
    • 빠르게 실패 처리한다는 의미는 에러가 발생했을 때 재시도를 하지 않고 즉시 실패 처리를 함으로써 쓰레드 간의 경합 등으로 발생하는 데드락 상태 등을 미연에 방지할 수 있으며 궁극적으로 thread-safe 함을 보장

 

  • Sinks.One으로 아무리 많은 수의 데이터를 내보낸다 하더라도 처음 내보낸 데이터(Hello Reactor)는 정상적으로 내보내지지만 나머지 데이터(Hi Reactor)들은 Drop 된다는 것을 확인할 수 있음

 

2. Sinks.Many

  • Sinks.many() 메서드를 사용해서 여러 건의 데이터를 여러 가지 방식으로 전송하는 기능을 정의해 둔 기능 명세

 

Sinks.many()

 

부연 설명

  • Sinks.many() 메서드는 Sinks.Many를 반환하지 않고 ManySpec이라는 인터페이스를 반환
    • Sinks.One의 경우 단순히 한 건의 데이터를 내보내는 한 가지 기능만 가지기 때문에 별도의 Spec이 정의되는 게 아니라 Default Spec을 사용
    • 반면 Sinks.Many의 경우 데이터를 내보내기 위한 여러 가지 기능이 정의된 ManySpec을 반환

 

ManySpec

 

2.1 Sinks.Many unicast() 예제

 

 

부연 설명

  • ManySpec의 구현 메서드 중 하나인 unicast() 메서드를 호출
  • unicast() 메서드를 호출하면 반환 값으로 UnicastSpec을 리턴하고 최종적으로 UnicastSpec에 정의된 기능을 사용
    • UnicastSpec에 정의된 기능은 onBackpressureBuffer() 메서드를 호출함으로써 사용

 

  • 마지막 라인의 주석을 해제하고 구독을 한 번 더 발생하게 할 경우 IllegalStateException 발생
    • UnicastProcessor는 하나의 Subscriber만 허용하기 때문

 

 

2.2 Sinks.Many multicast() 예제

 

 

 

부연 설명

  •  예제 코드에서는 ManySpec의 구현 메서드 중 하나인 multicast() 메서드를 호출
    • multicast() 메서드를 호출하면 반환 값으로 MulticastSpec을 반환하고 MulticastSpec의 구현 메서드인 onBackpressureBuffer() 메서드를 호출
    • MulticastSpec의 기능은 하나 이상의 Subscriber에게 데이터를 내보내는 것

 

  • Subscriber1은 세 개의 데이터를 모두 전달받은 반면 Subscriber2는 세 번째 데이터만 전달받음
    • Sinks가 Publisher의 역할을 수행할 경우 기본적으로 Hot Publisher로 동작하며 특히 onBackpressureBuffer() 메서드는 Warm up의 특징을 가지는 Hot Sequence로 동작하기 때문에 첫 번째 구독이 발생한 시점에 Downstream 쪽으로 데이터가 전달됨

 

2.3 Sinks.Many replay() 예제

 

 

부연 설명

  • replay() 메서드를 호출하면 반환 값으로 MulticastReplaySpec을 반환하고 MulticastReplaySpec의 구현 메서드 중 하나인 limit() 메서드를 호출
  • MulticastReplaySpec에는 emit 된 데이터를 다시 replay해서 구독 전에 이미 내보내진 데이터라도 Subscriber가 전달받을 수 있도록 다양한 메서드들이 정의되어 있음
    • all() 메서드는 구독 전에 이미 내보내진 데이터가 있더라도 처음 emit된 데이터부터 모든 데이터들이 Subscriber에게 전달됨

 

  • limit() 메서드는 emit된 데이터 중 파라미터로 입력한 개수만큼 가장 나중에 내보내진 데이터부터 Subscriber에게 전달하는 기능
    • 내보내진 데이터 중 2개만 뒤로 돌려서 전달하겠다는 의미이기 때문에 Subscriber1 입장에서는 구독 시점에 이미 세 개의 데이터가 emit 되었기 때문에 1은 생략되고 2, 3이 Subscriber1에 전달된 것을 확인할 수 있음
    • Subscriber2의 경우 구독 전에 4개의 숫자가 내보내졌기 때문에 마지막 두 개인 3, 4가 전달된 것을 확인할 수 있음

 

참고

  • 스프링으로 시작하는 리액티브 프로그래밍 (황정식 저자)
반응형