Design Pattern

[디자인 패턴] Reactor Pattern

꾸준함. 2024. 7. 15. 10:21

Reactor Pattern

  • 이벤트 드리븐 아키텍처에서 널리 사용되는 패턴으로 동시에 들어오는 요청을 처리하는 이벤트 핸들링 패턴
  • 해당 패턴은 이벤트 루프를 사용하여 다양한 I/O 이벤트(읽기, 쓰기, 연결 수락 등)를 처리하며, 각 이벤트는 사전에 등록된 핸들러에 의해 처리
  • 단인 쓰레드와 Selector를 통해 이벤트 처리
  • 네트워크 애플리케이션에서 높은 성능과 확장성을 제공

 

https://velog.io/@daehoon12/Reactive-Programming-Reactor-Pattern

 

주요 구성 요소

 

1. Selector

  • 여러 I/O 채널에서 발생한 이벤트를 감지하는 Event Demultiplexer
  • non-blocking 모드로 설정된 채널들을 감시하고, I/O 이벤트(읽기, 쓰기, 연결 요청 등)가 발생했을 때 이를 감지하고 Dispatcher에게 알림

 

2. Dispatcher

  • Selector가 감지한 이벤트를 적절한 EventHandler로 전달하는 역할
  • Selector로부터 이벤트 알림을 받으면, 해당 이벤트를 처리할 핸들러를 찾아 실행
  • Acceptor와 Handler를 실행

 

3. Acceptor

  • 새로운 클라이언트 연결을 수락하고, 해당 연결을 처리할 핸들러를 생성하여 등록
  • 새로운 연결 요청이 감지되면 SocketChannel을 생성하고, 이를 처리할 핸들러를 Selector에 등록
  • 클라이언트의 연결 요청을 처리하고, 새로운 Handler를 생성하여 Selector에 등록

 

4. Handler

  • 특정 I/O 이벤트를 처리하는 로직을 구현하며 주로 읽기, 쓰기 이벤트를 처리
  • Acceptor가 생성한 SocketChannel의 읽기 또는 쓰기 이벤트를 처리
  • Selector에 의해 읽기 또는 쓰기 이벤트가 발생하면, Dispatcher에 의해 호출되어 데이터를 처리

 

Reactor Pattern 동작 과정

  • 이벤트 감지: Selector가 여러 채널에서 발생한 I/O 이벤트를 감지
  • 이벤트 알림: 감지된 이벤트를 Dispatcher에게 알림
  • 이벤트 분배: Dispatcher는 이벤트 타입에 따라 적절한 핸들러(Acceptor 또는 Handler)를 호출
    • 연결 요청: Acceptor가 새로운 클라이언트 연결을 수락하고, 새로운 Handler를 생성하여 Selector에 등록
    • 읽기/쓰기 이벤트: 해당 이벤트를 처리할 Handler를 호출하여 데이터를 읽거나 씀

 

Selector와 Reactor Pattern을 적용하여 작성한 비동기 이벤트 드리븐 서버 코드 

 

1. EventHandler 인터페이스

  • 모든 이벤트 핸들러가 구현해야 하는 메서드 정의

 

public interface EventHandler {
void handle();
}
view raw .java hosted with ❤ by GitHub

 

2. Acceptor

  • 새로운 연결을 수락하고 새로운 SocketChannel을 생성하여 Selector 등록

 

import java.io.IOException;
import java.nio.channels.*;
public class Acceptor implements EventHandler {
private final Selector selector;
private final ServerSocketChannel serverSocket;
public Acceptor(Selector selector, ServerSocketChannel serverSocket) {
this.selector = selector;
this.serverSocket = serverSocket;
}
@Override
public void handle() {
try {
SocketChannel clientChannel = serverSocket.accept();
clientChannel.configureBlocking(false); // non-blocking 모드 설정
// 클라이언트 채널을 읽기 이벤트로 selector에 등록
clientChannel.register(selector, SelectionKey.OP_READ, new EchoHandler(clientChannel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
view raw .java hosted with ❤ by GitHub

 

3. EchoHandler

  • 클라이언트로부터 데이터를 읽고 다시 돌려주는 에코 서버의 간단한 로직 구현

 

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class EchoHandler implements EventHandler {
private final SocketChannel clientChannel;
private final ByteBuffer buffer = ByteBuffer.allocate(1024);
public EchoHandler(SocketChannel clientChannel) {
this.clientChannel = clientChannel;
}
@Override
public void handle() {
try {
int bytesRead = clientChannel.read(buffer);
if (bytesRead == -1) {
clientChannel.close();
return;
}
// 버퍼를 읽기 모드로 전환
buffer.flip();
clientChannel.write(buffer);
buffer.clear();
} catch (IOException e) {
e.printStackTrace();
try {
clientChannel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
view raw .java hosted with ❤ by GitHub

 

4. EventLoop

 

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class EventLoop implements Runnable {
private static final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final ServerSocketChannel serverSocket;
private final Selector selector;
private final EventHandler acceptor;
@SneakyThrows
public EventLoop(int port) {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress("localhost", port));
// non-blocking 모드 설정
serverSocket.configureBlocking(false);
// Acceptor 생성 및 selector에 등록
acceptor = new Acceptor(selector, serverSocket);
serverSocket.register(selector, SelectionKey.OP_ACCEPT).attach(acceptor);
}
@Override
public void run() {
// 단일 쓰레드로 event loop 실행
executorService.submit(() -> {
while (true) {
try {
// I/O 이벤트 대기
selector.select();
// 준비된 이벤트의 키를 반복자로 처리
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// 이벤트 디스패치
dispatch(key);
}
} catch (IOException e) {
log.error("Error in EventLoop: ", e);
}
}
});
}
// 이벤트 디스패치 메서드
private void dispatch(SelectionKey selectionKey) {
// SelectionKey에 첨부된 이벤트 핸들러 가져오기
EventHandler eventHandler = (EventHandler) selectionKey.attachment();
// 읽기 또는 수락 가능한 상태일 때 이벤트 핸들러 호출
if (selectionKey.isReadable() || selectionKey.isAcceptable()) {
eventHandler.handle();
}
}
}
view raw .java hosted with ❤ by GitHub

 

부연 설명

  • EventLoop 클래스는 Reactor 패턴에서 이벤트 루프의 핵심 역할을 수행
    • Selector를 사용하여 여러 채널에서 발생하는 I/O 이벤트를 감지하고, 준비된 이벤트 키들을 가져와 처리
    • 이벤트 키에 첨부된 핸들러를 호출하여 이벤트를 처리
    • ExecutorService를 사용하여 단일 스레드로 이벤트 루프를 실행하고, 비동기적으로 이벤트를 처리

 

참고

  • 패스트 캠퍼스 - Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지
반응형