Spring/스프링으로 시작하는 리액티브 프로그래밍

[Reactor] Backpressure

꾸준함. 2024. 7. 18. 00:11

Backpressure

  • Publisher와 Subscriber 간의 데이터 흐름을 제어하는 메커니즘
  • Publisher가 끊임없이 내보내는 무수히 많은 데이터를 적절하게 제어하여 데이터 처리에 과부하가 걸리지 않도록 제어하는 역할
  • Backpressure가 없다면 시스템이 과부하 되거나 메모리 부족 등의 문제가 발생할 수 있음
    • 과도한 데이터 생성: Publisher가 매우 빠르게 데이터를 생성할 때, 소비자가 그 데이터를 제시간에 처리하지 못하면 데이터가 쌓이게 됨
    • 리소스 제한: Subscriber는 제한된 리소스(메모리, CPU 등)를 사용하여 데이터를 처리하는데 Subscriber의 처리 속도가 Publisher의 생성 속도를 따라가지 못할 때 문제가 발생함
    • 시스템 안정성: Backpressure를 통해 시스템의 안정성을 유지하고, 메모리 부족이나 성능 저하를 방지할 수 있음

 

Reactor에서의 Backpressure 처리 방식

 

1. 데이터 개수 제어

  • 첫 번째 방식은 Subscriber가 적절히 처리할 수 있는 수준의 데이터 개수를 Publisher에게 요청하는 것

 

 

부연 설명

  • Subscriber가 데이터 요청 개수를 직접 제어하기 위해 Subscriber 인터페이스의 구현 클래스인 BaseSubscriber를 사용
  • hookOnSubscribe() 메서드는 Subscriber 인터페이스에 정의된 onSubscribe() 메서드를 대신해 구독 시점에 request() 메서드를 호출해서 최초 데이터 요청 개수를 제어하는 역할
  • hookOnNext() 메서드는 Subscriber 인터페이스에 정의된 onNext() 메서드를 대신해 Publisher가 내보낸 데이터를 전달받아 처리한 후 Publisher에게 또다시 데이터를 요청하는 역할을 수행하며 이때 request() 메서드를 호출해서 데이터 요청 개수 제어
  • 첫 번째 `# doOnRequest: 1`은 hookOnSubscribe() 메서드에서 request(1) 메서드를 호출함으로써 출력된 결과
    • 나머지 `#doOnRequest: 1`은 모두 hookOnNext() 메서드에서 request(1) 메서드를 호출함으로써 출력한 결과

 

2. Backpressure 전략 사용

  • Reactor에서는 Backpressure를 위한 다양한 전략 제공

 

종류 설명
IGNORE Backpressure를 적용하지 않음
ERROR Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우 예외 발생시킴
DROP Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우 버퍼 밖에서 대기하는 먼저 내보내진 데이터부터 Drop 시킴
LATEST Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우 버퍼 밖에서 대기하는 가장 나중에 내보내진 데이터부터 버퍼에 채움
BUFFER Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우 버퍼 안에 있는 데이터부터 Drop 시킴

 

2.1 IGNORE 전략

  • Backpressure를 적용하지 않느 전략
  • Downstream에서의 Backpressure 요청이 무시되기 때문에 IllegalStateException이 발생할 수 있음

 

2.2 ERROR 전략

  • Downstream의 데이터 처리 속도가 느려서 Upstream에 의해 내보내진 속도를 따라가지 못할 경우 IllegalStateException 발생시키며 Publisher는 Error 시그널을 Subscriber에게 전송하고 삭제한 데이터는 폐기함

 

 

부연 설명

  • Subscriber가 전달받은 데이터를 처리하는데 0.005초 시간이 걸리도록 시뮬레이션했고 이에 따라 Publisher에서 데이터를 내보내는 속도와 Subscriber가 전달받은 데이터를 처리하는 속도에 차이가 발생함
    • doOnNext() Operator에서 출력한 로그에서는 Publisher가 약 0.0001초에 한 번씩 데이터를 내보내는 것을 확인 가능
    • Subscriber에서 데이터를 처리하는 onNext 람다 표현식에서는 0.005초에 한 번씩 로그를 출력하다가 255라는 숫자를 출력하고 OverflowException이 발생하면서 Sequence가 종료되는 것을 확인 가능

 

  • ERROR 전략을 적용하기 위해 onBackpressureError() Operator를 사용
  • publishOn() Operator는 Reactor Sequence 중 일부를 별도의 쓰레드에서 실행할 수 있도록 해주는 Operator

 

2.3 Drop 전략

  • Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우 버퍼 밖에서 대기 중인 데이터 중에서 먼저 내보낸 데이터부터 Drop 시키는 전략

 

https://hanseom.tistory.com/364

 

부연 설명

  • Step 3에서 Publisher는 버퍼가 가득 찼는데도 불구하고 데이터를 계속 내보내고 있으며 버퍼가 가득 찼기 때문에 버퍼 밖에서 대기하는 상황 발생
  • Step 4에서는 Downstream에서 데이터 처리가 아직 끝나지 않아 버퍼가 비어 있지 않은 상태이기 때문에 버퍼 밖에서 대기 중인 먼저 내보내진 숫자 11, 12, 13이 Drop 된 것을 확인할 수 있음
  • Step 5에서는 Downstream에서 데이터 처리가 끝나서 버퍼를 비운 상태로 버퍼가 비었기 때문에 Step 6에서 숫자 14부터는 Drop 되지 않고 버퍼에 채워지는 것을 확인할 수 있음

 

 

부연 설명

  • 첫 번째 Drop 구간에서 Drop이 시작되는 데이터는 숫자 256이고 Drop이 끝나는 데이터 숫자는 1086
    • 해당 구간 동안에는 버퍼가 가득 차 있는 상태
    • 숫자 1086까지 Drop 되기 때문에 Subscriber 쪽에서는 숫자 1087부터 전달받아 처리하는 것을 확인할 수 있음
    • 정리하자면 Backpressure DROP 전략을 적용할 경우 버퍼가 가득 찬 상태에서는 버퍼가 비워질 때까지 데이터를 DROP 시키는 것을 확인 가능

 

2.4 LATEST 전략

  • Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기 중인 데이터 중에서 가장 나중에 내보내진 데이터부터 버퍼에 채우는 전략
    • Drop 전략은 버퍼가 가득 찰 경우 버퍼 밖에서 대기 중인 데이터를 하나씩 차례대로 Drop 하면서 폐기
    • 반면 Latest 전략은 새로운 데이터가 들어오는 시점에 가장 최근의 데이터만 남겨두고 나머지 데이터 폐기

 

https://hanseom.tistory.com/364

 

부연 설명

  • Step 3에서는 버퍼가 가득 찬 상태에서 Publisher로부터 데이터가 계속 내보내지기 때문에 버퍼 밖에서 대기
  • Step 4에서는 Downstream에서 데이터 처리가 끝나서 버퍼를 비운 상태이며 이에 따라 가장 나중에 내보내진 숫자 17부터 버퍼에 채워지고 나머지 데이터는 폐기
    • 그림상으로는 Step 4에서 가장 최근에 내보내진 숫자 17 이외의 나머지 숫자들이 한꺼번에 폐기되는 것처럼 표현했으나 실제로는 데이터가 들어올 때마다 이전에 유지하고 있던 데이터가 폐기됨

 

 

부연 설명

  • Subscriber가 숫자 255를 출력하고 곧바로 그다음 숫자 1073을 출력하는 것을 확인할 수 있음

 

2.5 BUFFER 전략

  • 세 가지 전략을 지원함
    • 버퍼의 데이터를 폐기하지 않고 버퍼링을 하는 전략 (이번에 살펴볼 전략)
    • 버퍼가 가득 찰 경우 버퍼 내의 데이터를 폐기하는 전략
    • 버퍼가 가득 차면 에러를 발생시키는 전략

 

2.5.1 DROP_LATEST 전략

  • Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우 가장 나중에 버퍼 안에 채워진 데이터를 Drop 하여 폐기한 후 확보된 공간에 Publisher에 의해 내보내진 데이터를 채우는 전략

 

https://hanseom.tistory.com/364

 

부연 설명

  • Step 3에서는 숫자 11이 내보내져 버퍼에 채워지는데 버퍼가 이미 가득 찬 상태에서 숫자 하나가 더 들어왔기 때문에 버퍼 overflow 발생
    • 따라서 Step 4에서는 overflow를 발생시키는 숫자 11이 Drop 되어 폐기됨

 

 

부연 설명

  • onBackpressureBuffer() Operator를 사용하여 BUFFER 전략을 적용
    • 첫 번째 파라미터 2는 버퍼의 최대 용량을 나타냄
    • 두 번째 파라미터를 통해 버퍼 overflow가 발생했을 때 Drop 되는 데이터를 전달받아 후처리 할 수 있음
    • 세 번째 파라미터는 적용할 Backpressure 전략을 나타내며 여기서는 DROP_LATEST 적용

 

  • publishOn() Operator를 사용해서 쓰레드를 하나 더 추가하는데 세 번째 파라미터에서 prefetch 수를 1로 지정
    • prefetch는 Scheduler가 생성하는 쓰레드의 비동기 경계 시점에 미리 보관할 데이터의 개수를 의미
    • 데이터의 요청 개수에 영향을 끼침

 

  • 첫 번째 doOnNext() Operator를 통해 원본 Flux 즉 interval() Operator에서 생성된 원본 데이터가 내보내지는 과정을 확인 가능
  • 두 번째 doOnNext() Operator를 통해 Buffer에서 Downstream으로 내보내지는 데이터를 확인 가능

 

2.5.2 DROP_OLDEST 전략

  • Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우 버퍼 안에 채워진 데이터 중 가장 오래된 데이터를 Drop 하여 폐기한 후 확보된 공간에 Publisher로부터 내보내진 데이터를 채우는 전략
    • DROP_LATEST와 정반대 되는 전략

 

https://hanseom.tistory.com/364

 

부연 설명

  • Step 3에서는 숫자 11이 내보내져 버퍼에 채워지는데 버퍼가 이미 가득 찬 상태에서 숫자 하나가 더 들어왔기 때문에 버퍼 overflow 발생
    • Step 4에서는 숫자 11이 Drop 되는 것이 아니라 버퍼 제일 앞쪽에 있는 숫자 1이 Drop 됨

 

 

부연 설명

  • DROP_LATEST 전략 예제 코드에서 전략만 바꾼 코드

 

참고

  • 스프링으로 시작하는 리액티브 프로그래밍 (황정식 저자)
반응형