Scheduler
- OS에서 사용되는 Scheduler와 의미가 비슷
- OS 레벨에서의 Scheduler는 실행되는 프로그램인 프로세스를 선택하고 실행하는 등 프로세스의 라이프 사이클을 관리해 주는 관리자 역할
- Reactor의 Scheduler는 비동기 프로그래밍을 위해 사용되는 쓰레드를 관리해 주는 역할
- Scheduler를 사용하여 어떤 쓰레드에서 무엇을 처리할지 제어
- Reactor Scheduler를 사용하면 코드 자체가 매우 간결해지고 Scheduler가 쓰레드의 제어를 대신해 주기 때문에 멀티 쓰레드 환경에서의 예상치 못한 오류가 발생할 확률을 최소화시킬 수 있음
Scheduler를 위한 전용 Operator
- Reactor에서 Scheduler는 Scheduler 전용 Operator를 통해 사용할 수 있음
- subscribeOn() Operator와 publishOn() Operator가 대표적인 예시이며 해당 Operator의 파라미터로 적절한 Scheduler를 전달하면 해당 Scheduler의 특성에 맞는 쓰레드가 Reactor Sequence에 할당됨
- parallel()이라는 특별한 Operator도 있음
1. subscribeOn() Operator
- 구독이 발생한 직후 실행될 쓰레드를 지정하는 Operator
- 구독이 발생하면 원본 Publisher가 데이터를 최초로 내보내게 되는데 subscribeOn() Operator는 구독 시점 직후에 실행되기 때문에 원본 Publisher의 동작을 수행하기 위한 쓰레드
부연 설명
- subscribeOn() Operator를 추가했기 때문에 구독이 발생한 직후 원본 Publisher의 동작을 처리하기 위한 쓰레드를 할당
- doOnNext() Operator를 사용해 원본 Flux에서 emit 되는 데이터를 로그로 출력
- doOnSubscribe() Operator를 사용해 구독이 발생한 시점에 추가적인 어떤 처리가 필요할 경우 해당 처리 동작을 추가할 수 있으며 여기서는 구독이 발생한 시점에 실행되는 쓰레드가 무엇인지 확인
2. publishOn() Operator
- Downstream으로 Signal을 전송할 때 실행되는 쓰레드를 제어하는 역할을 하는 Operator
- publishOn()을 기준으로 아래쪽인 Downstream의 실행 쓰레드를 변경
- subscribeOn()과 마찬가지로 파라미터로 Scheduler를 지정함으로써 해당 Scheduler의 특성을 가진 쓰레드로 변경 가능
부연 설명
- publishOn() Operator를 사용했기 때문에 Downstream으로 데이터를 내보내는 쓰레드를 변경하며 나머지 코드는 subscribeOn() Operator 예제 코드와 유사함
- 최초 실행 쓰레드는 main 쓰레드이기 때문에 doOnSubscribe()는 main 쓰레드에서 실행되며 doOnNext()의 경우 subscribeOn() Operator를 사용하지 않았기 때문에 여전히 main 쓰레드에서 실행
- publishOn() Operator를 추가했기 때문에 publishOn()을 기준으로 Downstream의 실행 쓰레드가 변경되어 parallel-1 쓰레드에서 실행
3. parallel() Operator
- subscribeOn() Operator와 publishOn() Operator의 경우 동시성을 가지는 논리적인 쓰레드
- 동시성이란 무수히 많은 논리적이니 쓰레드가 N개의 물리적인 쓰레드를 아주 빠른 속도로 번갈아 가며 사용하면서 마치 동시에 실행되는 것처럼 보이는 효과
- parallel() Operator는 병렬성을 가지는 물리적인 쓰레드
- 병렬성은 물리적인 쓰레드가 실제로 동시에 실행되기 때문에 여러 작업을 동시에 처리함을 의미
- 라운드 로빈 방식으로 CPU 코어 개수만큼의 쓰레드를 병렬로 실행
부연 설명
- 병렬 작업을 처리하기 위해 4개의 쓰레드를 지정
- 코드 실행 결과를 보면 4개의 쓰레드가 병렬로 실행되는 것을 확인 가능
publishOn()과 subscribeOn()의 동작 이해
1. 두 개의 publishOn() Operator를 사용할 경우, Operator 체인에서 실행되는 스레드의 동작 과정
부연 설명
- 첫 번째 publishOn()을 추가함으로써 filter() Operator는 parallel-2 쓰레드에서 실행
- 두 번째 publishOn()을 추가함으로써 map Operator부터는 parallel-1 쓰레드에서 실행
- 이처럼 Operator 체인상에서 한 개 이상의 publishOn() Operator를 사용하여 실행 쓰레드를 목적에 맞게 적절하게 분리할 수 있음
2. subscribeOn()과 publishOn()을 함께 사용할 경우, Operator 체인에서 실행되는 쓰레드의 동작 과정
부연 설명
- publishOn() 이전까지의 Operator 체인은 subscribeOn()에서 지정한 boundedElastic-1 쓰레드에서 실행
- publishOn() 이후의 Operator 체인은 parallel-1 쓰레드에서 실행
- 이처럼 subscribe() Operator와 publishOn() Operator를 함께 사용하면 원본 Publisher에서 데이터를 내보내는 쓰레드와 emit 된 데이터를 가공 처리하는 쓰레드를 적절하게 분리할 수 있음
Scheduler 종류
1. Scheduler.immediate()
- 별도의 쓰레드를 추가적으로 생성하지 않고 현재 쓰레드에서 작업을 처리하고자 할 때 사용
부연 설명
- Schedulers.parallel()을 사용했기 때문에 parallel-1 쓰레드가 현재 쓰레드가 됨
- Schedulers.immediate()는 추가 쓰레드를 생성하지 않으므로 map() Operator에서의 처리 작업과 Subscriber에서의 처리 작업이 별도의 추가 쓰레드가 아닌 parallel-1이라는 현재 쓰레드에서 실행된 것을 확인할 수 있음
2. Scheduler.single()
- 쓰레드 하나만 생성해서 Scheduler가 제거되기 전까지 재사용하는 방식
부연 설명
- doTask()라는 메서드를 호출해서 각자 다른 작업을 처리한다고 가정
- Schedulers.single()을 사용했기 때문에 doTask()를 두 번 호출하더라도 첫 번째 호출에서 이미 생성된 쓰레드를 재사용
- 이처럼 Schedulers.single()을 통해 하나의 쓰레드를 재사용하면서 다수의 작업을 처리할 수 있는데 하나의 쓰레드로 다수의 작업을 처리해야 되므로 지연 시간이 짧은 작업을 처리하는 것이 효과적
3. Schedulers.newSingle()
- 호출할 때마다 매번 새로운 쓰레드 하나를 생성
부연 설명
- Schedulers.newSingle() 메서드의 첫 번째 파라미터에는 생성할 쓰레드명을 지정하고 두 번째 파라미터는 해당 쓰레드를 데몬 쓰레드로 동작하게 할지 여부를 설정
- 데몬 쓰레드는 보조 쓰레드라고도 불리는데 main 쓰레드가 종료되면 자동으로 종료되는 특성이 있음
- doTask() 메서드를 호출할 때마다 새로운 쓰레드 하나를 생성해서 각각의 작업을 처리하는 것을 확인 가능
4. Schedulers.boundedElastic()
- ExecutorService 기반의 쓰레드 풀을 생성한 후 그 안에서 정해진 수만큼의 쓰레드를 사용하여 작업을 처리하고 작업이 종료된 쓰레드는 반납하여 재사용하는 방식
- 기본적으로 CPU 코어수의 10배만큼의 쓰레드를 생성하여 풀에 속한 모든 쓰레드가 작업을 처리하고 있다면 이용 가능한 쓰레드가 생길 때까지 최대 10만개의 작업이 큐에서 대기할 수 있음
- Schedulers.boundedElastic()은 Blocking I/O 작업을 효과적으로 처리하기 위한 방식
- 실행 시간이 긴 Blocking I/O 작업이 포함된 경우 다른 Non-Blocking 처리에 영향을 주지 않도록 전용 쓰레드를 할당해서 Blocking I/O 작업을 처리
5. Schedulers.parallel()
- Non-Blocking I/O에 최적화되어 있는 Scheduler로써 CPU 코어 수만큼의 쓰레드를 생성
6. Schedulers.fromExecutorService()
- 기존에 이미 사용하고 있는 ExecutorService가 있다면 해당 ExecutorService로부터 Scheduler를 생성하는 방식
- ExecutorService로부터 직접 생성할 수도 있지만 Reactor에서는 해당 방식을 권장하지 않음
7. Schedulers.newXXXX()
- Schedulers.single(), Schedulers.boundedElastic(), Schedulers.parallel()은 Reactor에서 제공하는 디폴트 Scheduler 인스턴스를 사용하지만 필요할 경우 Schedulers.newSingle(), Schedulers.newBoundedElastic(), Schedulers.newParallel() 메서드를 사용해서 새로운 Scheduler 인스턴스를 생성할 수 있음
- 쓰레드명, 생성 가능한 디폴트 쓰레드의 개수, 쓰레드의 유휴 시간, 데몬 쓰레드로의 동작 여부 등을 직접 지정해서 커스텀 쓰레드 풀을 새로 생성할 수 있음
참고
- 스프링으로 시작하는 리액티브 프로그래밍 (황정식 저자)
반응형
'Spring > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
[Reactor] Reactor Sequence 디버깅 방법 (0) | 2024.07.31 |
---|---|
[Reactor] Context (0) | 2024.07.30 |
[Reactor] Sinks (0) | 2024.07.23 |
[Reactor] Backpressure (0) | 2024.07.18 |
[Reactor] Cold Sequence, Hot Sequence (0) | 2024.07.17 |