마블 다이어그램
- 리액티브 프로그래밍에서 사용되는 시각적 도구로, 스트림의 흐름과 데이터 변환을 이해하기 쉽게 표현한 것
- 주로 RxJava나 Reactor와 같은 리액티브 라이브러리의 문서에서 자주 사용
- Reactor에서 지원하는 Operator를 이해하는 데 중요한 역할
- 마블 다이어그램을 먼저 보고 난 후 API 문서를 읽을 때와 그렇지 않을 때의 이해도에는 상당한 차이가 존재
- 처음 사용해 보는 Operator를 올바르게 이해하고 사용하기 위해서 해당 Operator의 API 설명을 보기 전 마블 다이어그램부터 먼저 확인하는 것을 권장

1. Publisher가 데이터를 내보내는 타임라인
- 그림에서는 단순히 Publisher의 타임라인으로 표시했지만 해당 Publisher는 데이터 소스를 최초로 내보내는 Publisher일 수도 있고 그렇지 않을 수도 있음
- Operator 함수인 map(x => x + 1)을 기준으로 Upstream의 Publisher라고 보는 것이 적절
- Reactor에서는 Flux의 경우 Source Flux라고도 부름
2. Publisher가 내보내는 데이터
- 타임라인은 왼쪽에서 오른쪽으로 시간이 흐르는 것을 의미
- 가장 왼쪽에 있는 1번 구슬이 시간상으로 가장 먼저 내보낸 데이터
3. 파이프 기호
- Publisher 타임라인 끝에 위치한 | 기호
- Publisher가 정상적으로 데이터를 내보냈음을 의미
- Signal로 표현하면 onComplete 시그널에 해당
4. Operator 함수 쪽으로 들어가는 점선 화살표
- Publisher가 내보낸 데이터가 Operator 함수의 입력으로 전달되는 것을 의미
5. Operator 함수
- Publisher로부터 전달받은 데이터를 처리하는 함수
- Reactor는 많은 수의 Operator를 지원하며 각각의 Operator마다 해당 Operator를 잘 설명하는 마블 다이어그램을 가짐
- 위 그림에서는 map() Operator 함수를 보여주는 마블 다이어그램
6. Operator에서 나가는 점선 화살표
- Publisher로부터 전달받은 데이터를 가공 처리한 후 출력으로 내보내는 것을 의미
- Operator 함수에서 반환하는 새로운 Publisher를 이용해 다운스트림에 가공된 데이터를 전달하는 것을 의미
7. Operator 함수에서 가공 처리되어 출력으로 내보내진 데이터의 타임라인
- Reactor에서는 Operator의 출력으로 반환된 Flux의 경우 Output Flux라고도 부름
8. Operator 함수에서 가공 처리된 데이터
9. Operator 타임라인의 X 기호
- 에러가 발생해 데이터 처리가 종료되었음을 의미
- Signal로 표현하면 onError 시그널
마블 다이어그램으로 알아보는 Reactor Publisher
1. Mono 마블 다이어그램

부연 설명
- Mono는 0개 또는 단 하나의 데이터를 내보내는 Publisher이기 때문에 다이어그램에서 3이라고 적혀 있는 구슬 데이터가 내보내지지 않고 onComplete 시그널만 전송된 것을 확인 가능
- RxJava의 Maybe와 같은 기능 수행
1.1 Mono 예제 #1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) { | |
Mono.just("Hello Reactor") | |
.subscribe(System.out::println); | |
} |

부연 설명
- 위 코드처럼 데이터 한 건을 내보내기 위해 Mono를 사용할 수 있음
- just() Operator는 한 개 이상의 데이터를 내보내기 위한 대표적인 Operator로서 2개 이상의 데이터를 파라미터로 전달할 경우 내부적으로 fromArray() Operator를 이용해 데이터를 내보냄
1.2 Mono 예제 #2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) { | |
Mono | |
.empty() | |
.subscribe( | |
none -> System.out.println("# emitted onNext signal"), | |
error -> {}, | |
() -> System.out.println("# emitted onComplete signal") | |
); | |
} |

부연 설명
- 데이터를 한 건도 내보내지 않는 예제 코드 (Mono는 0개 혹은 1개의 데이터를 내보내는 역할)
- empty() Operator를 사용하면 데이터를 내보내지 않고 onComplete 시그널을 전송
- empty() Operator는 주로 어떤 특정 작업을 통해 데이터를 전달받을 필요는 없지만 작업이 끝났음을 알리고 이에 따른 후처리를 진행하고 싶을 때 사용
- 첫 번째 람다 함수는 onNext 시그널을 처리하는데 Mono.empty()는 데이터 아이템을 방출하지 않으므로, 이 람다 함수는 호출되지 않음
- 두 번째 람다 함수는 onError 시그널을 처리하는데 Mono.empty()는 오류를 방출하지 않으므로, 이 람다 함수 역시 호출되지 않음
- 따라서 none과 error는 무시되며, 오직 onComplete 시그널을 처리하는 람다 함수만 호출
1.3 Mono 예제 #3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) { | |
URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http") | |
.host("worldtimeapi.org") | |
.port(80) | |
.path("/api/timezone/Asia/Seoul") | |
.build() | |
.encode() | |
.toUri(); | |
RestTemplate restTemplate = new RestTemplate(); | |
HttpHeaders headers = new HttpHeaders(); | |
headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON)); | |
Mono.just( | |
restTemplate | |
.exchange(worldTimeUri, | |
HttpMethod.GET, | |
new HttpEntity<String>(headers), | |
String.class) | |
) | |
.map(response -> { | |
DocumentContext jsonContext = JsonPath.parse(response.getBody()); | |
String dateTime = jsonContext.read("$.datetime"); | |
return dateTime; | |
}) | |
.subscribe( | |
data -> System.out.println("# emitted data: " + data), | |
error -> { | |
System.out.println(error); | |
}, | |
() -> System.out.println("# emitted onComplete signal") | |
); | |
} |

부연 설명
- World Time Open API를 사용해서 특정 지역의 현재 날짜/시간을 JSON 형태의 응답으로 수신
- map() Operator에서 응답으로 수신한 JSON 형식의 데이터를 파싱 해서 현재 날짜/시간 정보를 Subscriber에게 전달한 후 로그로 출력
- 이처럼 Mono는 단 한 건의 데이터를 응답으로 보내는 HTTP 요청/응답에 사용하기 아주 적합한 Publisher 타입
- 위 코드는 non-blocking I/O 방식의 통신이 아니기 때문에 non-blocking 통신의 이점을 얻을 수 없지만 Mono를 사용하여 HTTP 요청/응답을 처리할 경우 요청과 응답을 하나의 Operator 체인으로 깔끔하게 처리할 수 있음
2. Flux 마블 다이어그램

부연 설명
- Flux는 여러 건의 데이터를 내보낼 수 있는 Publisher 타입
- Mono의 마블 다이어그램에서는 내보내는 구슬 모양의 데이터가 하나인 반면, Flux의 마블 다이어그램에서는 내보내는 구슬 모양의 데이터가 N개
2.1 Flux 예제 #1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) { | |
Flux.just(6, 9, 13) | |
.map(num -> num % 2) | |
.subscribe(System.out::println); | |
} |

부연 설명
- just() Operator에서 내보내는 세 개의 숫자들을 전달받은 후 map() Operator에서 2로 나눈 나머지를 Subscriber에게 전달하여 출력
2.2 Flux 에제 #2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) { | |
Flux.fromArray(new Integer[]{3, 6, 7, 9}) | |
.filter(num -> num > 6) | |
.map(num -> num * 2) | |
.subscribe(System.out::println); | |
} |

부연 설명
- 데이터 소스로 제공되는 배열 데이터를 처리하기 위해 fromArray() Operator 사용
- 전달받은 배열의 원소를 하나씩 차례대로 내보내면 filter() Operator에서 해당 배열 원소를 전달받아 6보다 큰 숫자만 필터링한 후 map() Operator로 전달
2.3 Flux 예제 #3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) { | |
Flux<String> flux = | |
Mono.justOrEmpty("Steve") | |
.concatWith(Mono.justOrEmpty("Jobs")); | |
flux.subscribe(System.out::println); | |
} |

부연 설명
- just() Operator의 경우 파라미터의 값으로 null 값을 허용하지 않지만 justOrEmpty()는 null 값을 허용
- justOrEmpty()의 파라미터로 null이 전달되면 내부적으로 empty() Operator를 호출하도록 구현
- concatWith() Operator는 concatWith()를 호출하는 Publisher와 concatWith()의 파라미터로 전달되는 Publisher가 각각 내보내는 데이터들을 하나로 연결해서 새로운 Publisher의 데이터 소스로 만들어주는 Operator
- 아래 다이어그램은 concatWith 마블 다이어그램

2.4 Flux 예제 #4
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) { | |
Flux.concat( | |
Flux.just("Mercury", "Venus", "Earth"), | |
Flux.just("Mars", "Jupiter", "Saturn"), | |
Flux.just("Uranus", "Neptune", "Pluto")) | |
.collectList() | |
.subscribe(planets -> System.out.println(planets)); | |
} |

부연 설명
- concatWith() Operator의 경우 두 개의 데이터 소스만 연결할 수 있는 반면 concat() Operator는 여러 개의 데이터 소스를 원하는만큼 연결 가능
- collectList() Operator는 Upstream Publisher에서 내보내는 데이터를 모아 List의 원소로 포함시킨 새로운 데이터 소스로 만들어주는 Operator
참고
- 스프링으로 시작하는 리액티브 프로그래밍 (황정식 저자)
반응형
'Spring > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
[Reactor] Sinks (0) | 2024.07.23 |
---|---|
[Reactor] Backpressure (0) | 2024.07.18 |
[Reactor] Cold Sequence, Hot Sequence (0) | 2024.07.17 |
[Java] Reactor 간단 정리 (0) | 2024.07.14 |
[Java] 리액티브 프로그래밍 (1) | 2024.06.11 |