실행자 프레임워크(Executor Framework)
java.util.concurrent 패키지는 실행자 프레임워크(Executor Framework)라고 하는 인터페이스 기반의 유연한 태스크 실행 기능을 담고 있습니다.
과거에는 단순한 작업 큐(work queue)를 만들기 위해 수많은 코드를 작성했어야 했지만, 이제는 다음과 같이 간단하게 작업 큐를 생성할 수 있습니다.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) { | |
ExecutorService exec = Executors.newSingleThreadExecutor(); | |
exec.execute(() -> System.out.println("task")); // 태스크 실행 | |
exec.shutdown(); // 실행자 종료 | |
} |
1. 실행자 서비스(ExecutorService) 주요 기능
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Example { | |
public static void main(String[] args) throws InterruptedException { | |
// 1. newSingleThreadExecutor | |
ExecutorService executor = Executors.newSingleThreadExecutor(); | |
// 2. execute | |
executor.execute(() -> { | |
String threadName = Thread.currentThread().getName(); | |
System.out.println("Hello from " + threadName); | |
}); | |
try { | |
// 4. get (Using Future) | |
Future<String> futureResult = executor.submit(() -> { | |
TimeUnit.SECONDS.sleep(2); | |
return "Result of Task"; | |
}); | |
String result = futureResult.get(); | |
System.out.println("Task result: " + result); | |
} catch (InterruptedException | ExecutionException e) { | |
e.printStackTrace(); | |
} finally { | |
if (!executor.isTerminated()) { | |
System.err.println("Cancel non-finished tasks"); | |
} | |
executor.shutdownNow(); | |
System.out.println("Shutdown finished"); | |
} | |
// 3. shutdown | |
executor.shutdown(); | |
// 5. invokeAny, invokeAll | |
ExecutorService executor2 = Executors.newFixedThreadPool(2); | |
try { | |
Callable<String> task1 = () -> { | |
TimeUnit.SECONDS.sleep(2); | |
return "Result of Task1"; | |
}; | |
Callable<String> task2 = () -> { | |
TimeUnit.SECONDS.sleep(1); | |
return "Result of Task2"; | |
}; | |
String result = executor2.invokeAny(List.of(task1, task2)); | |
System.out.println("Result of invokeAny: " + result); | |
// invokeAll | |
executor2.invokeAll(List.of(task1, task2)) | |
.stream() | |
.map(future -> { | |
try { | |
return future.get(); | |
} catch (Exception e) { | |
throw new IllegalStateException(e); | |
} | |
}) | |
.forEach(System.out::println); | |
} catch (InterruptedException | ExecutionException e) { | |
e.printStackTrace(); | |
} finally { | |
executor2.shutdown(); | |
} | |
// 6. awaitTermination | |
ExecutorService executor3 = Executors.newSingleThreadExecutor(); | |
try { | |
executor3.awaitTermination(3, TimeUnit.SECONDS); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} finally { | |
if (!executor3.isTerminated()) { | |
executor3.shutdownNow(); | |
} | |
} | |
// 7. ExecutorCompletionService | |
ExecutorService executor4 = Executors.newFixedThreadPool(3); | |
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor4); | |
for (int i = 1; i <= 5; i++) { | |
int task = i; | |
completionService.submit(() -> task); | |
} | |
try { | |
for (int i = 0; i < 5; i++) { | |
System.out.println(completionService.take().get()); | |
} | |
} catch (InterruptedException | ExecutionException e) { | |
e.printStackTrace(); | |
} finally { | |
executor4.shutdown(); | |
} | |
// 8. ScheduledThreadPoolExecutor | |
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); | |
scheduledExecutor.schedule(() -> { | |
System.out.println("Scheduled Task executed"); | |
}, 3, TimeUnit.SECONDS); | |
scheduledExecutor.shutdown(); | |
} | |
} |

코드 부연 설명
- get 메서드: 특정 태스크가 완료되기를 기다림
- invokeAny 메서드: 태스크 모음 중 아무것 하나가 완료되기를 기다림
- invokeAll 메서드: 모든 태스크가 완료되기를 기다림
- awaitTermination 메서드: 실행자 서비스가 종료하기를 기다림
- ExecutorCompletionService: 완료된 태스크들의 결과를 차례로 받음
- ScheduledThreadPoolExecutor: 태스크를 특정 시간에 혹은 주기적으로 실행
2. java.util.concurrent.Executors
- 큐를 둘 이상의 쓰레드가 처리하게 하고 싶을 경우 간단히 다른 정적 팩토리를 이용하여 다른 종류의 실행자 서비스(ThreadPool)를 생성
- 쓰레드 풀의 쓰레드 개수는 고정할 수도 있고, 유동적으로 상황에 맞게 설정되게 할 수도 있음
- 실행자 대부분은 java.util.concurrent.Executors의 정적 팩토리들을 이용해 생성할 수 있음
- 위 클래스로 쓰레드 풀 동작을 결정하는 거의 모든 속성을 설정 가능
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) { | |
// 쓰레드 풀의 크기를 고정하고 두 개의 쓰레드로 이루어진 쓰레드 풀 생성 | |
ExecutorService executorService = Executors.newFixedThreadPool(2); | |
// 작업을 제출 | |
executorService.submit(() -> { | |
try { | |
System.out.println("Task 1 started by thread: " + Thread.currentThread().getName()); | |
// 작업 시뮬레이션을 위해 잠시 대기 | |
Thread.sleep(2000); | |
System.out.println("Task 1 completed by thread: " + Thread.currentThread().getName()); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}); | |
executorService.submit(() -> { | |
try { | |
System.out.println("Task 2 started by thread: " + Thread.currentThread().getName()); | |
// 작업 시뮬레이션을 위해 잠시 대기 | |
Thread.sleep(1000); | |
System.out.println("Task 2 completed by thread: " + Thread.currentThread().getName()); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}); | |
// 모든 작업이 완료될 때까지 대기 | |
executorService.shutdown(); | |
try { | |
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} |

3. Executors.newCachedThreadPool
- 특별히 설정할 것이 없고 일반적인 용도에 적합
- CachedThreadPool에서는 요청받은 태스크들이 큐에 쌓이지 않고 즉시 쓰레드에 위임되어 실행
- 가용한 쓰레드가 없다면 새로 하나를 생성
- 무거운 상용 서버에는 좋지 않음
- "무겁운 상용 서버"는 주로 시스템 자원을 많이 사용하거나 처리량이 많은 작업을 수행하는 서버
- 서버가 아주 무겁다면 CPU 이용률이 100%에 치닫고, 새로운 태스크가 도착하는 족족 또 다른 쓰레드들을 생성하여 상황을 악화시킬 것
- 따라서 무거운 상용 서버에서는 쓰레드 개수를 고정한 Executors.newFixedThreadPool을 선택하거나 완전히 통제할 수 있는 ThreadPoolExecutor를 직접 사용하는 것을 권장
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class CachedThreadPoolExample { | |
public static void main(String[] args) { | |
// CachedThreadPool 생성 | |
ExecutorService executorService = Executors.newCachedThreadPool(); | |
for (int i = 0; i < 5; i++) { | |
int taskNumber = i + 1; | |
executorService.submit(() -> { | |
System.out.println("Task " + taskNumber + " started by thread: " + Thread.currentThread().getName()); | |
// 작업 시뮬레이션을 위해 잠시 대기 | |
try { | |
Thread.sleep(2000); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
System.out.println("Task " + taskNumber + " completed by thread: " + Thread.currentThread().getName()); | |
}); | |
} | |
// 모든 작업이 완료될 때까지 대기 | |
executorService.shutdown(); | |
try { | |
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} |

4. Task
- 쓰레드를 직접 다루면 Thread가 작업 단위와 실행 메커니즘 역할을 모두 수행
- 실행자 프레임워크에서는 작업 단위와 실행 매커니즘을 분리
- Task는 작업 단위를 나타내는 핵심 추상 개념이며 두 가지 분류로 구분 가능
- Runnable
- Callable
- Callable은 Runnable과 유사한 개념이지만 값을 반환하고 임의의 예외를 던질 수 있음
- Task 수행을 ExecutorService에 맡기면 원하는 태스크 수행 정책을 선택할 수 있고, 생각이 바뀌면 언제든 변경할 수 있음
- 여기서 핵심은 실행자 프레임워크가 작업 수행을 담당해준다는 것
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TaskExample { | |
public static void main(String[] args) { | |
// Runnable task | |
Runnable runnableTask = () -> { | |
String threadName = Thread.currentThread().getName(); | |
System.out.println("Runnable task executed by thread: " + threadName); | |
}; | |
// Callable task | |
Callable<Integer> callableTask = () -> { | |
String threadName = Thread.currentThread().getName(); | |
System.out.println("Callable task executed by thread: " + threadName); | |
// 작업 시뮬레이션을 위해 잠시 대기 후 결과 반환 | |
TimeUnit.SECONDS.sleep(2); | |
return 123; | |
}; | |
// ExecutorService 생성 | |
ExecutorService executorService = Executors.newFixedThreadPool(2); | |
// Runnable task 제출 | |
executorService.submit(runnableTask); | |
// Callable task 제출 및 결과 획득 | |
Future<Integer> future = executorService.submit(callableTask); | |
try { | |
// 작업이 완료될 때까지 기다리고 결과 획득 | |
int result = future.get(); | |
System.out.println("Callable task result: " + result); | |
} catch (InterruptedException | ExecutionException e) { | |
e.printStackTrace(); | |
} | |
// ExecutorService 종료 | |
executorService.shutdown(); | |
} | |
} |

5. Fork-Join Framework
- 자바 7+ 버전부터 지원
- Fork-Join Task는 ForkJoinPool이라는 특별한 실행자 서비스가 실행
- Fork-Join Task의 인스턴스는 작은 하위 Task로 나뉠 수 있으며, ForkJoinPool을 구성하는 쓰레드들이 이 Task들을 처리하며, 일을 먼저 끝낸 쓰레드는 다른 쓰레드의 남은 Task를 가져와 대신 처리 가능
- 모든 쓰레드가 바쁘게 움직여 CPU를 최대한 활용하면서 높은 처리량과 낮은 지연 시간을 달성
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class ForkJoinExample { | |
static class SumTask extends RecursiveTask<Long> { | |
private static final int THRESHOLD = 1000; | |
private final long[] array; | |
private final int start; | |
private final int end; | |
SumTask(long[] array, int start, int end) { | |
this.array = array; | |
this.start = start; | |
this.end = end; | |
} | |
@Override | |
protected Long compute() { | |
if (end - start <= THRESHOLD) { | |
long sum = 0; | |
for (int i = start; i < end; i++) { | |
sum += array[i]; | |
} | |
return sum; | |
} else { | |
int mid = (start + end) >>> 1; | |
SumTask leftTask = new SumTask(array, start, mid); | |
SumTask rightTask = new SumTask(array, mid, end); | |
leftTask.fork(); | |
long rightResult = rightTask.compute(); | |
long leftResult = leftTask.join(); | |
return leftResult + rightResult; | |
} | |
} | |
} | |
public static void main(String[] args) { | |
long[] array = new long[1000000000]; | |
for (int i = 0; i < array.length; i++) { | |
array[i] = i + 1; | |
} | |
long start = System.currentTimeMillis(); | |
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); | |
long result = forkJoinPool.invoke(new SumTask(array, 0, array.length)); | |
System.out.println("합: " + result); | |
long end = System.currentTimeMillis(); | |
System.out.println(String.format("ForkJoinPool 실행시간: %d", (end - start))); | |
start = System.currentTimeMillis(); | |
long sum = 0; | |
for (int i = 0; i < array.length; i++) { | |
sum += array[i]; | |
} | |
end = System.currentTimeMillis(); | |
System.out.println("합: " + sum); | |
System.out.println(String.format("단순 반복문 실행시간: %d", (end - start))); | |
} | |
} |

참고하면 좋은 카테고리
https://jaimemin.tistory.com/category/JAVA/RxJava
참고
이펙티브 자바
반응형
'JAVA > Effective Java' 카테고리의 다른 글
[아이템 82] 쓰레드 안전성 수준을 문서화하라 (0) | 2024.04.07 |
---|---|
[아이템 81] wait와 notify보다는 동시성 유틸리티를 애용하라 (0) | 2024.04.07 |
[아이템 79] 과도한 동기화는 피하라 (0) | 2024.04.07 |
[아이템 78] 공유 중인 가변 데이터는 동기화해 사용하라 (0) | 2024.03.29 |
[아이템 77] 예외를 무시하지 말라 (2) | 2024.03.29 |