Spring/스프링으로 시작하는 리액티브 프로그래밍

[Reactor] Scheduler

꾸준함. 2024. 7. 24. 01:01

Scheduler

  • OS에서 사용되는 Scheduler와 의미가 비슷
  • OS 레벨에서의 Scheduler는 실행되는 프로그램인 프로세스를 선택하고 실행하는 등 프로세스의 라이프 사이클을 관리해 주는 관리자 역할
  • Reactor의 Scheduler는 비동기 프로그래밍을 위해 사용되는 쓰레드를 관리해 주는 역할
    • Scheduler를 사용하여 어떤 쓰레드에서 무엇을 처리할지 제어
    • Reactor Scheduler를 사용하면 코드 자체가 매우 간결해지고 Scheduler가 쓰레드의 제어를 대신해 주기 때문에 멀티 쓰레드 환경에서의 예상치 못한 오류가 발생할 확률을 최소화시킬 수 있음

 

Scheduler를 위한 전용 Operator

  • Reactor에서 Scheduler는 Scheduler 전용 Operator를 통해 사용할 수 있음
    • subscribeOn() Operator와 publishOn() Operator가 대표적인 예시이며 해당 Operator의 파라미터로 적절한 Scheduler를 전달하면 해당 Scheduler의 특성에 맞는 쓰레드가 Reactor Sequence에 할당됨
    • parallel()이라는 특별한 Operator도 있음

 

1. subscribeOn() Operator

  • 구독이 발생한 직후 실행될 쓰레드를 지정하는 Operator
  • 구독이 발생하면 원본 Publisher가 데이터를 최초로 내보내게 되는데 subscribeOn() Operator는 구독 시점 직후에 실행되기 때문에 원본 Publisher의 동작을 수행하기 위한 쓰레드

 

 

부연 설명

  • subscribeOn() Operator를 추가했기 때문에 구독이 발생한 직후 원본 Publisher의 동작을 처리하기 위한 쓰레드를 할당
  • doOnNext() Operator를 사용해 원본 Flux에서 emit 되는 데이터를 로그로 출력
  • doOnSubscribe() Operator를 사용해 구독이 발생한 시점에 추가적인 어떤 처리가 필요할 경우 해당 처리 동작을 추가할 수 있으며 여기서는 구독이 발생한 시점에 실행되는 쓰레드가 무엇인지 확인

 

2. publishOn() Operator

  • Downstream으로 Signal을 전송할 때 실행되는 쓰레드를 제어하는 역할을 하는 Operator
  • publishOn()을 기준으로 아래쪽인 Downstream의 실행 쓰레드를 변경
  • subscribeOn()과 마찬가지로 파라미터로 Scheduler를 지정함으로써 해당 Scheduler의 특성을 가진 쓰레드로 변경 가능

 

 

 

부연 설명

  • publishOn() Operator를 사용했기 때문에 Downstream으로 데이터를 내보내는 쓰레드를 변경하며 나머지 코드는 subscribeOn() Operator 예제 코드와 유사함
  • 최초 실행 쓰레드는 main 쓰레드이기 때문에 doOnSubscribe()는 main 쓰레드에서 실행되며 doOnNext()의 경우 subscribeOn() Operator를 사용하지 않았기 때문에 여전히 main 쓰레드에서 실행
  • publishOn() Operator를 추가했기 때문에 publishOn()을 기준으로 Downstream의 실행 쓰레드가 변경되어 parallel-1 쓰레드에서 실행

 

3. parallel() Operator

  • subscribeOn() Operator와 publishOn() Operator의 경우 동시성을 가지는 논리적인 쓰레드
    • 동시성이란 무수히 많은 논리적이니 쓰레드가 N개의 물리적인 쓰레드를 아주 빠른 속도로 번갈아 가며 사용하면서 마치 동시에 실행되는 것처럼 보이는 효과

 

  • parallel() Operator는 병렬성을 가지는 물리적인 쓰레드
    • 병렬성은 물리적인 쓰레드가 실제로 동시에 실행되기 때문에 여러 작업을 동시에 처리함을 의미
    • 라운드 로빈 방식으로 CPU 코어 개수만큼의 쓰레드를 병렬로 실행

 

 

 

부연 설명

  • 병렬 작업을 처리하기 위해 4개의 쓰레드를 지정
  • 코드 실행 결과를 보면 4개의 쓰레드가 병렬로 실행되는 것을 확인 가능

 

publishOn()과 subscribeOn()의 동작 이해

 

1. 두 개의 publishOn() Operator를 사용할 경우, Operator 체인에서 실행되는 스레드의 동작 과정

 

https://liltdevs.tistory.com/210

 

부연 설명

  • 첫 번째 publishOn()을 추가함으로써 filter() Operator는 parallel-2 쓰레드에서 실행
  • 두 번째 publishOn()을 추가함으로써 map Operator부터는 parallel-1 쓰레드에서 실행
  • 이처럼 Operator 체인상에서 한 개 이상의 publishOn() Operator를 사용하여 실행 쓰레드를 목적에 맞게 적절하게 분리할 수 있음

 

2. subscribeOn()과 publishOn()을 함께 사용할 경우, Operator 체인에서 실행되는 쓰레드의 동작 과정

 

https://liltdevs.tistory.com/210

 

부연 설명

  • publishOn() 이전까지의 Operator 체인은 subscribeOn()에서 지정한 boundedElastic-1 쓰레드에서 실행
  • publishOn() 이후의 Operator 체인은 parallel-1 쓰레드에서 실행
  • 이처럼 subscribe() Operator와 publishOn() Operator를 함께 사용하면 원본 Publisher에서 데이터를 내보내는 쓰레드와 emit 된 데이터를 가공 처리하는 쓰레드를 적절하게 분리할 수 있음

 

Scheduler 종류

 

1. Scheduler.immediate()

  • 별도의 쓰레드를 추가적으로 생성하지 않고 현재 쓰레드에서 작업을 처리하고자 할 때 사용

 

 

부연 설명

  • Schedulers.parallel()을 사용했기 때문에 parallel-1 쓰레드가 현재 쓰레드가 됨
  • Schedulers.immediate()는 추가 쓰레드를 생성하지 않으므로 map() Operator에서의 처리 작업과 Subscriber에서의 처리 작업이 별도의 추가 쓰레드가 아닌 parallel-1이라는 현재 쓰레드에서 실행된 것을 확인할 수 있음

 

2. Scheduler.single()

  • 쓰레드 하나만 생성해서 Scheduler가 제거되기 전까지 재사용하는 방식

 

 

부연 설명

  • doTask()라는 메서드를 호출해서 각자 다른 작업을 처리한다고 가정
  • Schedulers.single()을 사용했기 때문에 doTask()를 두 번 호출하더라도 첫 번째 호출에서 이미 생성된 쓰레드를 재사용
  • 이처럼 Schedulers.single()을 통해 하나의 쓰레드를 재사용하면서 다수의 작업을 처리할 수 있는데 하나의 쓰레드로 다수의 작업을 처리해야 되므로 지연 시간이 짧은 작업을 처리하는 것이 효과적

 

3. Schedulers.newSingle()

  • 호출할 때마다 매번 새로운 쓰레드 하나를 생성

 

 

부연 설명

  • Schedulers.newSingle() 메서드의 첫 번째 파라미터에는 생성할 쓰레드명을 지정하고 두 번째 파라미터는 해당 쓰레드를 데몬 쓰레드로 동작하게 할지 여부를 설정
    • 데몬 쓰레드는 보조 쓰레드라고도 불리는데 main 쓰레드가 종료되면 자동으로 종료되는 특성이 있음

 

  • doTask() 메서드를 호출할 때마다 새로운 쓰레드 하나를 생성해서 각각의 작업을 처리하는 것을 확인 가능

 

4. Schedulers.boundedElastic()

  • ExecutorService 기반의 쓰레드 풀을 생성한 후 그 안에서 정해진 수만큼의 쓰레드를 사용하여 작업을 처리하고 작업이 종료된 쓰레드는 반납하여 재사용하는 방식
  • 기본적으로 CPU 코어수의 10배만큼의 쓰레드를 생성하여 풀에 속한 모든 쓰레드가 작업을 처리하고 있다면 이용 가능한 쓰레드가 생길 때까지 최대 10만개의 작업이 큐에서 대기할 수 있음
  • Schedulers.boundedElastic()은 Blocking I/O 작업을 효과적으로 처리하기 위한 방식
    • 실행 시간이 긴 Blocking I/O 작업이 포함된 경우 다른 Non-Blocking 처리에 영향을 주지 않도록 전용 쓰레드를 할당해서 Blocking I/O 작업을 처리

 

5. Schedulers.parallel()

  • Non-Blocking I/O에 최적화되어 있는 Scheduler로써 CPU 코어 수만큼의 쓰레드를 생성

 

6. Schedulers.fromExecutorService()

  • 기존에 이미 사용하고 있는 ExecutorService가 있다면 해당 ExecutorService로부터 Scheduler를 생성하는 방식
  • ExecutorService로부터 직접 생성할 수도 있지만 Reactor에서는 해당 방식을 권장하지 않음

 

7. Schedulers.newXXXX()

  • Schedulers.single(), Schedulers.boundedElastic(), Schedulers.parallel()은 Reactor에서 제공하는 디폴트 Scheduler 인스턴스를 사용하지만 필요할 경우 Schedulers.newSingle(), Schedulers.newBoundedElastic(), Schedulers.newParallel() 메서드를 사용해서 새로운 Scheduler 인스턴스를 생성할 수 있음
    • 쓰레드명, 생성 가능한 디폴트 쓰레드의 개수, 쓰레드의 유휴 시간, 데몬 쓰레드로의 동작 여부 등을 직접 지정해서 커스텀 쓰레드 풀을 새로 생성할 수 있음

 

참고

  • 스프링으로 시작하는 리액티브 프로그래밍 (황정식 저자)
반응형