자바 IO
- 자바 1 버전에 최초 도입
- 파일과 네트워크에 데이터를 읽고 쓸 수 있는 API 제공
- InputStream, OutputStream과 같이 byte 단위로 읽고 쓸 수 있는 stream
- blocking 방식으로 동작
1. InputStream
- 바이트 스트림을 통해 데이터를 읽는 데 사용되는 추상 클래스
- Closable 구현체이기 때문에 명시적으로 스트림을 닫거나 try-with-resources 사용 가능
- 파일, 네트워크 연결, 메모리 버퍼 등 다양한 소스로부터 데이터를 읽을 수 있는 표준화된 방법을 제공
- 어떤 source로부터 데이터를 읽을지에 따라 다양한 구현체 존재
- FileInputStream
- ByteArrayInputStream
- BufferedInputStream
- SocketInputStream
1.1 주요 메서드
- read(): stream으로 데이터를 읽고 읽은 값을 반환, -1이라면 끝에 도달했다는 것을 의미
- close(): stream을 닫고 더 이상 데이터를 읽지 않음
- available(): 스트림에서 읽을 수 있는 바이트 수를 반환 (읽지 않고도 읽을 수 있는 데이터 양 확인 가능)
- skip(long n): 스트림에서 n 바이트를 건너뛰고 실제 건너뛴 바이트 수 반환
1.2 자바 IO를 이용한 소켓 통신 (InputStream 관점)
- serverSocket을 open 하여 외부의 요청을 수신
- bind.accept를 통해 serverSocket open을 준비
- accept가 끝나면 반환값으로 클라이언트의 socket을 전달
- 클라이언트 socket의 getInputStream으로 소켓의 inputStream 접근
- SocketInputStream은 public이 아니기 때문에 직접 접근이 불가하며 socket.getInputStream()으로만 접근 가능
- blocking 발생
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.BufferedInputStream; | |
import java.io.IOException; | |
import java.net.ServerSocket; | |
import java.net.Socket; | |
public class Server { | |
public static void main(String[] args) { | |
int port = <포트>; // 사용할 포트 번호 | |
// 서버 소켓 생성 및 포트 바인딩 | |
try (ServerSocket serverSocket = new ServerSocket(port)) { | |
System.out.println("서버가 시작되었습니다. 클라이언트를 기다리는 중..."); | |
// 클라이언트 접속 대기 (blocking) | |
try (Socket clientSocket = serverSocket.accept(); | |
BufferedInputStream bis = new BufferedInputStream(clientSocket.getInputStream())) { | |
System.out.println("클라이언트가 접속되었습니다."); | |
byte[] buffer = new byte[1024]; | |
int bytesRead; | |
StringBuilder inputBuilder = new StringBuilder(); | |
// 클라이언트로부터 데이터를 읽어들임 (blocking) | |
while ((bytesRead = bis.read(buffer)) != -1) { | |
inputBuilder.append(new String(buffer, 0, bytesRead)); | |
} | |
String inputLine = inputBuilder.toString(); | |
System.out.println("클라이언트로부터 받은 데이터: " + inputLine); | |
} | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
} |

2. OutputStream
- 바이트 스트림을 통해 데이터를 쓰기 위해 사용되는 추상 클래스로 write 시 바로 전송하지 않고 버퍼에 저장한 다음 일정량의 데이터가 모이면 한 번에 전달
- Closable 구현체이기 때문에 명시적으로 스트림을 닫거나 try-with-resources 사용 가능
- 파일, 네트워크 연결, 메모리 버퍼 등 다양한 목적지에 데이터를 쓰는 데 표준화된 방법을 제공
- 여러 desination에 데이터를 쓸지 여부에 따라 다양한 구현체 존재
- FileOutputStream
- ByteArrayOutputStream
- BufferedOutputStream
- SocketOutputStream
2.1 주요 메서드
- write(): stream으로 데이터를 출력
- flush(): 출력 스트림을 강제로 비워 버퍼에 저장된 모든 출력을 목적지로 보냄
- close(): 출력 스트림을 닫고 시스템 자원을 해제
2.2 자바 IO를 이용한 소켓 통신 (OutputStream 관점)
- serverSocket을 open 하여 외부의 요청을 수신
- bind.accept를 통해 serverSocket을 open할 준비
- serverSocket.accept() 호출로 클라이언트 접속을 대기하며 이 부분에서 blocking이 발생
- accept가 끝나면 반환값으로 클라이언트의 socket을 전달
- 클라이언트 socket의 getOutputStream을 호출하여 소켓의 OutputStream에 접근
- SocketOutputStream은 public이 아니기 때문에 직접 접근이 불가하며 socket.getOutputStream()으로만 접근 가능
- BufferedOutputStream을 사용하여 데이터를 클라이언트에게 전송하며 데이터를 전송하는 동안 blocking이 발생할 수 있음
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.BufferedOutputStream; | |
import java.io.IOException; | |
import java.io.OutputStream; | |
import java.net.ServerSocket; | |
import java.net.Socket; | |
public class Server { | |
public static void main(String[] args) { | |
int port = <포트>; // 사용할 포트 번호 | |
// serverSocket을 open 하여 외부의 요청을 수신 | |
try (ServerSocket serverSocket = new ServerSocket(port)) { | |
System.out.println("서버가 시작되었습니다. 클라이언트를 기다리는 중..."); | |
// bind.accept를 통해 serverSocket open을 준비 | |
// 클라이언트 접속 대기 (blocking) | |
try (Socket clientSocket = serverSocket.accept(); | |
OutputStream outputStream = clientSocket.getOutputStream(); | |
BufferedOutputStream bos = new BufferedOutputStream(outputStream)) { | |
System.out.println("클라이언트가 접속되었습니다."); | |
// 클라이언트에게 보낼 데이터 | |
String response = "Hello, Client!"; | |
byte[] responseBytes = response.getBytes(); | |
// 데이터를 클라이언트로 전송 (blocking) | |
bos.write(responseBytes); | |
bos.flush(); // 데이터를 모두 보냄 | |
System.out.println("클라이언트로 데이터를 전송했습니다."); | |
} | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
3. 자바 IO의 한계
- 앞서 소켓 통신 예제에서 살펴봤듯이 동기 blocking으로 동작
- 애플리케이션이 read 호출 시 kernel이 응답을 돌려줄 때까지 아무것도 할 수 없음
- I/O 요청이 발생할 때마다 쓰레드를 새로 할당할 경우 쓰레드 생성 및 관리하는 비용과 context switching으로 인한 cpu 자원 소모
- 커널 버퍼에 직접 접근 불가하기 때문에 메모리 copy가 발생하고 이에 따른 지연이 발생
- 자바 IO는 사용자 공간의 버퍼에서 커널 공간의 버퍼로, 또는 그 반대로 데이터를 복사해야 하며 이는 시스템 호출을 통해 이루어지며, 메모리 복사가 빈번하게 발생
- 대용량 데이터를 전송하거나 여러 입출력 작업을 동시에 수행할 때 지연 발생

부연 설명
- 하드웨어에서 값을 읽어오면 disk controlller가 DMA를 통해 커널 버퍼에 값을 복사
- 커널 버퍼에서 jvm 버퍼로 복사하며 이 과정에서 cpu 자원 소모
- jvm 버퍼, jvm 메모리에 있기 때문에 gc 대상이 되고 이 또한 cpu 자원 소모
자바 NIO
- Java New Input/Output의 약자로 자바 4 버전에 최초 도입
- 기존의 java.io 패키지보다 더 효율적이고 유연한 I/O 작업을 제공하는 것을 목표로 구현됨
- non-blocking 지원
- selector, channel 도입으로 높은 성능 보장
자바 IO | 자바 NIO | |
데이터의 흐름 | 단방향 | 양방향 |
종류 | InputStream, OutputStream | Channel |
데이터 단위 | byte/character | buffer |
blocking 여부 | 동기 blocking만 가능 | non-blocking 지원 (blocking API도 존재) |
특이사항 | selector 지원 |
1. Channel과 Buffer
- 데이터 읽을 때 적절한 크기의 버퍼를 생성하고 채널의 read() 메서드를 사용하여 데이터를 버퍼에 저장
- 데이터를 쓸 때 먼저 버퍼에 데이터를 저장하고 채널의 write() 메서드를 사용하여 목적지로 전달
- 버퍼를 clear() 메서드로 초기화하여 다시 사용 가능

1.1 Buffer 위치 속성
- capacity: 버퍼가 저장할 수 있는 데이터의 최대 크기로 버퍼 생성 시 결정되며 변경 불가
- position: 버퍼에서 현재 위치를 가리키며 버퍼에서 데이터를 읽거나 쓸 때 해당 위치로부터 시작
- limit: 버퍼에서 데이털르 읽거나 쓸 수 있는 마지막 위치, limit 이후로 데이터를 읽거나 쓰기 불가
- mark: 현재 position 위치를 mark()로 지정할 수 있고 reset() 호출 시 position을 mark로 이동
- 0 <= mark <= position <= limit <= capacity

2. DirectByteBuffer, HeapByteBuffer
2.1 DirectByteBuffer
- JVM 힙 외부의 메모리 영역을 사용하여 버퍼를 직접 할당하는 버퍼
- native 메모리에 저장
- 커널 메모리에 복사를 하지 않으므로 데이터를 읽고 쓰는 속도가 빠름
- 힙 메모리를 덜 사용하므로, JVM의 gc 부하를 줄일 수 있음
- 비용이 많이 드는 system call을 사용하므로 네이티브 메모리 할당은 JVM 힙 메모리 할당보다 시간이 더 걸림
2.2 HeapByteBuffer
- JVM 힙 메모리에 저장하며 byte array를 랩핑
- 자바 IO처럼 커널 메모리에 복사가 일어나므로 데이터를 읽고 쓰는 속도가 느림
- 단, JVM의 gc에서 관리가 되므로 메모리 할당이 상대적으로 빠름
2.3 ByteBuffer 구분 방법
- DirectByteBuffer: allocateDirect() 함수로 생성 가능
- HeapByteBuffer: allocate(), wrap() 함수로 생성 가능
- isDirect() 메서드를 통해 구분 가능
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.ByteBuffer; | |
public class ByteBufferExample { | |
public static void main(String[] args) { | |
// DirectByteBuffer 생성 | |
ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024); | |
// HeapByteBuffer 생성 | |
ByteBuffer heapBuffer = ByteBuffer.allocate(1024); | |
// wrap()을 사용하여 HeapByteBuffer 생성 | |
byte[] byteArray = new byte[1024]; | |
ByteBuffer wrappedHeapBuffer = ByteBuffer.wrap(byteArray); | |
System.out.println("directBuffer is direct: " + directBuffer.isDirect()); // true | |
System.out.println("heapBuffer is direct: " + heapBuffer.isDirect()); // false | |
System.out.println("wrappedHeapBuffer is direct: " + wrappedHeapBuffer.isDirect()); // false | |
} | |
} |
3. SelectableChannel
- 자바 NIO의 비동기 I/O 기능을 지원하는 채널
- SelectableChannel은 non-blocking 모드로 설정 가능
- SelectableChannel은 Selector와 통합되어 사용되며 이를 통해 여러 채널의 I/O 이벤트를 감지하고 처리 가능

3.1 SelectableChannel 주요 메서드
- configureBlocking(false): serverSocketChannel의 accept, socketChannel의 connect 등이 non-blocking으로 동작
- register(Selector sel, int ops): 채널을 Selector에 등록하고, 감지할 I/O 작업을 지정

This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import lombok.SneakyThrows; | |
import lombok.extern.slf4j.Slf4j; | |
import java.net.InetSocketAddress; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.ServerSocketChannel; | |
import java.nio.channels.SocketChannel; | |
import java.nio.charset.StandardCharsets; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.atomic.AtomicInteger; | |
@Slf4j | |
public class JavaNIONonBlockingMultiServer { | |
// 요청을 처리한 횟수를 추적하기 위한 AtomicInteger | |
private static AtomicInteger count = new AtomicInteger(0); | |
// 클라이언트 요청을 비동기적으로 처리하기 위한 고정된 쓰레드 풀을 가진 ExecutorService (50개의 스레드 사용) | |
private static ExecutorService executorService = Executors.newFixedThreadPool(50); | |
@SneakyThrows | |
public static void main(String[] args) { | |
// non-blocking 서버 소켓 채널을 생성하고, 로컬호스트의 8080 포트에 바인딩 | |
try (ServerSocketChannel serverSocket = ServerSocketChannel.open()) { | |
serverSocket.bind(new InetSocketAddress("localhost", 8080)); | |
serverSocket.configureBlocking(false); // non-blocking 처리 | |
// 클라이언트 연결을 계속해서 대기 | |
while (true) { | |
SocketChannel clientSocket = serverSocket.accept(); | |
// 클라이언트 연결이 없으면 100밀리초 동안 대기하고 루프 계속 | |
if (clientSocket == null) { | |
Thread.sleep(100); | |
continue; | |
} | |
// CompletableFuture를 사용하여 클라이언트 요청을 비동기적으로 처리 | |
CompletableFuture.runAsync(() -> { | |
handleRequest(clientSocket); | |
}); | |
} | |
} | |
} | |
@SneakyThrows | |
private static void handleRequest(SocketChannel clientSocket) { | |
// DirectByteBuffer | |
ByteBuffer requestByteBuffer = ByteBuffer.allocateDirect(1024); | |
// 클라이언트 소켓에서 데이터를 읽어서 버퍼에 채울 때까지 대기 | |
while (clientSocket.read(requestByteBuffer) == 0) { | |
log.info("읽어오는 중..."); | |
} | |
// 버퍼를 쓰기 모드에서 읽기 모드로 전환 | |
requestByteBuffer.flip(); | |
String requestBody = StandardCharsets.UTF_8.decode(requestByteBuffer).toString(); | |
log.info("요청: {}", requestBody); | |
Thread.sleep(10); | |
// 응답 메시지를 가진 바이트 버퍼를 준비하고, 클라이언트 소켓에 작성 | |
ByteBuffer responseByteBuffer = ByteBuffer.wrap("멀티 클라이언트를 수용할 수 있는 서버".getBytes()); | |
clientSocket.write(responseByteBuffer); | |
clientSocket.close(); | |
int c = count.incrementAndGet(); | |
log.info("요청 횟수: {}", c); | |
} | |
} |
자바 NIO의 한계
- Polling 문제: 클라이언트 연결을 기다리는 while 루프와 같은 구조에서는 새로운 클라이언트가 연결될 때까지 CPU가 계속해서 루프를 돌며 accept를 호출, 각각의 쓰레드에서 read 가능한지 주기적으로 확인
- Thread Sleep: clientSocket == null인 경우 Thread.sleep(100)을 호출하여 CPU 사용을 줄이지만, 이는 정확한 시간 조절이 어렵고, 연결 지연을 초래
- 동기 I/O 작업: clientSocket.read(requestByteBuffer)와 같은 호출은 블로킹 I/O와 유사하게 작동하며 데이터를 읽을 수 있을 때까지 계속해서 대기
- 성능 문제: 동시에 발생하는 요청이 증가하는 경우 연결 처리가 순차적으로 발생하여 성능 저하
자바 AIO (NIO2)
- Java 7부터 도입된 NIO.2(New I/O 2) 라이브러리의 일부로, 비동기적 입출력을 지원하여 성능을 향상시키고 자원을 효율적으로 사용하도록 지원
- 기존의 블로킹 I/O와 대비하여, AIO는 스레드가 입출력 작업이 완료될 때까지 블로킹되지 않도록 하여 높은 동시성을 제공
- AsynchronousChannel 지원

1. 자바 AIO의 특징
- I/O가 준비되었을 때 Future 혹은 callback으로 비동기적인 로직 처리 가능
- 쓰레드 풀과 epool, kqueue 등의 이벤트 알림 system call 이용
- 이러한 시스템 콜은 커널에서 I/O 이벤트를 모니터링하고, I/O 작업이 준비되었을 때 이를 알림
- CPU 사용량을 최소화하고, 높은 성능을 제공
2. 자바 AIO vs 자바 NIO
자바 AIO | 자바 NIO | |
비동기 I/O 처리 | 1. 자바 AIO는 진정한 비동기 I/O 처리를 제공하여, I/O 작업이 완료될 때까지 기다리지 않고 바로 다른 작업을 수행할 수 있음 2. Future와 CompletionHandler를 사용하여 비동기 작업을 처리할 수 있어, 코드가 간결하고 이해하기 쉬움 |
1. 자바 NIO에서는 I/O 작업이 완료될 때까지 대기하는 블로킹 방식으로 데이터를 읽고 쓸 수 있으며 이는 큰 데이터를 처리할 때 성능 병목을 초래할 수 있음 2. Selector와 Channel을 사용하여 비동기적으로 처리할 수 있지만, 코드가 복잡해지고 관리가 어려움 |
자원 효율성 및 성능 | 1. 자바 AIO는 내부적으로 쓰레드 풀을 사용하여 비동기 작업을 처리하므로, 많은 클라이언트 요청을 효율적으로 처리 2. epoll, kqueue와 같은 시스템 콜을 사용하여 I/O 이벤트를 모니터링하므로, CPU 사용량을 최소화하고 높은 성능을 제공 |
1. 자바 NIO는 많은 클라이언트 연결을 처리할 수 있지만, 클라이언트 수가 많아질수록 Selector와 Channel을 관리하는 데 부담이 증가함 2. 이벤트 처리 루프에서 발생하는 불필요한 폴링과 블로킹으로 인해 CPU 사용량이 증가 |
코드 간결성 및 유지보수성 | 1. 자바 AIO는 CompletionHandler와 Future를 사용하여 비동기 작업을 간단하게 처리할 수 있으며 코드가 간결하고 명확하여 유지보수가 용이해짐 2. 콜백 메서드를 사용하여 이벤트 발생 시 필요한 작업을 처리하므로, 코드 구조가 더 직관적이고 이해하기 쉬움 |
1. 비동기 I/O 작업을 구현하려면 Selector와 Channel을 사용하여 복잡한 이벤트 처리 루프를 작성해야 하므로, 코드가 복잡해지고 유지보수가 어려움 |
3. 자바 NIO로 작성한 멀티 클라이언트 수용 서버 코드에 자바 AIO 적용
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import lombok.SneakyThrows; | |
import lombok.extern.slf4j.Slf4j; | |
import java.net.InetSocketAddress; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.SelectionKey; | |
import java.nio.channels.Selector; | |
import java.nio.channels.ServerSocketChannel; | |
import java.nio.channels.SocketChannel; | |
import java.nio.charset.StandardCharsets; | |
import java.util.Iterator; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
@Slf4j | |
public class SelectorMultiServer { | |
private static ExecutorService executorService = Executors.newFixedThreadPool(50); | |
@SneakyThrows | |
public static void main(String[] args) { | |
try (ServerSocketChannel serverSocket = ServerSocketChannel.open(); | |
Selector selector = Selector.open(); | |
) { | |
serverSocket.bind(new InetSocketAddress("localhost", 8080)); | |
// 서버 소켓을 논블로킹 모드로 설정 | |
serverSocket.configureBlocking(false); | |
// 서버 소켓을 selector에 등록하여 연결 수락 이벤트를 대기 | |
serverSocket.register(selector, SelectionKey.OP_ACCEPT); | |
while (true) { | |
// selector에서 이벤트가 발생할 때까지 대기 | |
selector.select(); | |
// 선택된 키의 집합을 가져옴 | |
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); | |
while (selectedKeys.hasNext()) { | |
// 다음 키를 가져와서 집합에서 제거 | |
SelectionKey key = selectedKeys.next(); | |
selectedKeys.remove(); | |
if (key.isAcceptable()) { | |
// 들어오는 연결을 수락하고 논블로킹 모드로 설정 | |
SocketChannel clientSocket = ((ServerSocketChannel)key.channel()).accept(); | |
clientSocket.configureBlocking(false); | |
// 클라이언트 소켓을 읽기 작업을 위해 selector에 등록 | |
clientSocket.register(selector, SelectionKey.OP_READ); | |
} else if (key.isReadable()) { | |
// 클라이언트 소켓으로부터 데이터를 읽음 | |
SocketChannel clientSocket = (SocketChannel) key.channel(); | |
String requestBody = handleRequest(clientSocket); | |
// 클라이언트에게 응답을 보냄 | |
sendResponse(clientSocket, requestBody); | |
} | |
} | |
} | |
} | |
} | |
@SneakyThrows | |
private static String handleRequest(SocketChannel clientSocket) { | |
ByteBuffer requestByteBuffer = ByteBuffer.allocateDirect(1024); | |
clientSocket.read(requestByteBuffer); | |
requestByteBuffer.flip(); | |
String requestBody = StandardCharsets.UTF_8.decode(requestByteBuffer).toString(); | |
log.info("요청: {}", requestBody); | |
return requestBody; | |
} | |
@SneakyThrows | |
private static void sendResponse(SocketChannel clientSocket, String requestBody) { | |
// 응답 처리를 ExecutorService를 사용하여 비동기로 실행 | |
CompletableFuture.runAsync(() -> { | |
try { | |
Thread.sleep(10); | |
// 응답 내용을 준비 | |
String content = "received: " + requestBody; | |
ByteBuffer responeByteBuffer = ByteBuffer.wrap(content.getBytes()); | |
clientSocket.write(responeByteBuffer); | |
clientSocket.close(); | |
} catch (Exception e) { } | |
}, executorService); | |
} | |
} |
참고
- 패스트 캠퍼스 - Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지
반응형
'JAVA > 비동기 프로그래밍' 카테고리의 다른 글
[Netty] Netty 개요 및 간단한 예제 (0) | 2024.07.27 |
---|---|
[Java] CompletableFuture (0) | 2024.05.12 |
[Java] ThreadPoolExecutor (1) | 2024.04.28 |
[Java] 자바 동시성 프레임워크 (4) | 2024.04.20 |
[Java] 동기화 도구 (0) | 2024.03.14 |