JAVA/Effective Java

[아이템 80] 쓰레드보다는 실행자, 태스크, 스트림을 애용하라

꾸준함. 2024. 4. 7. 15:09

실행자 프레임워크(Executor Framework)

java.util.concurrent 패키지는 실행자 프레임워크(Executor Framework)라고 하는 인터페이스 기반의 유연한 태스크 실행 기능을 담고 있습니다.

과거에는 단순한 작업 큐(work queue)를 만들기 위해 수많은 코드를 작성했어야 했지만, 이제는 다음과 같이 간단하게 작업 큐를 생성할 수 있습니다.

 

public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
exec.execute(() -> System.out.println("task")); // 태스크 실행
exec.shutdown(); // 실행자 종료
}
view raw .java hosted with ❤ by GitHub

 

1. 실행자 서비스(ExecutorService) 주요 기능

 

public class Example {
public static void main(String[] args) throws InterruptedException {
// 1. newSingleThreadExecutor
ExecutorService executor = Executors.newSingleThreadExecutor();
// 2. execute
executor.execute(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello from " + threadName);
});
try {
// 4. get (Using Future)
Future<String> futureResult = executor.submit(() -> {
TimeUnit.SECONDS.sleep(2);
return "Result of Task";
});
String result = futureResult.get();
System.out.println("Task result: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
if (!executor.isTerminated()) {
System.err.println("Cancel non-finished tasks");
}
executor.shutdownNow();
System.out.println("Shutdown finished");
}
// 3. shutdown
executor.shutdown();
// 5. invokeAny, invokeAll
ExecutorService executor2 = Executors.newFixedThreadPool(2);
try {
Callable<String> task1 = () -> {
TimeUnit.SECONDS.sleep(2);
return "Result of Task1";
};
Callable<String> task2 = () -> {
TimeUnit.SECONDS.sleep(1);
return "Result of Task2";
};
String result = executor2.invokeAny(List.of(task1, task2));
System.out.println("Result of invokeAny: " + result);
// invokeAll
executor2.invokeAll(List.of(task1, task2))
.stream()
.map(future -> {
try {
return future.get();
} catch (Exception e) {
throw new IllegalStateException(e);
}
})
.forEach(System.out::println);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor2.shutdown();
}
// 6. awaitTermination
ExecutorService executor3 = Executors.newSingleThreadExecutor();
try {
executor3.awaitTermination(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (!executor3.isTerminated()) {
executor3.shutdownNow();
}
}
// 7. ExecutorCompletionService
ExecutorService executor4 = Executors.newFixedThreadPool(3);
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor4);
for (int i = 1; i <= 5; i++) {
int task = i;
completionService.submit(() -> task);
}
try {
for (int i = 0; i < 5; i++) {
System.out.println(completionService.take().get());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor4.shutdown();
}
// 8. ScheduledThreadPoolExecutor
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
scheduledExecutor.schedule(() -> {
System.out.println("Scheduled Task executed");
}, 3, TimeUnit.SECONDS);
scheduledExecutor.shutdown();
}
}
view raw .java hosted with ❤ by GitHub

 

 

코드 부연 설명

  • get 메서드: 특정 태스크가 완료되기를 기다림
  • invokeAny 메서드: 태스크 모음 중 아무것 하나가 완료되기를 기다림
  • invokeAll 메서드: 모든 태스크가 완료되기를 기다림
  • awaitTermination 메서드: 실행자 서비스가 종료하기를 기다림
  • ExecutorCompletionService: 완료된 태스크들의 결과를 차례로 받음
  • ScheduledThreadPoolExecutor: 태스크를 특정 시간에 혹은 주기적으로 실행

 

2. java.util.concurrent.Executors

  • 큐를 둘 이상의 쓰레드가 처리하게 하고 싶을 경우 간단히 다른 정적 팩토리를 이용하여 다른 종류의 실행자 서비스(ThreadPool)를 생성
  • 쓰레드 풀의 쓰레드 개수는 고정할 수도 있고, 유동적으로 상황에 맞게 설정되게 할 수도 있음
  • 실행자 대부분은 java.util.concurrent.Executors의 정적 팩토리들을 이용해 생성할 수 있음
    • 위 클래스로 쓰레드 풀 동작을 결정하는 거의 모든 속성을 설정 가능

 

public static void main(String[] args) {
// 쓰레드 풀의 크기를 고정하고 두 개의 쓰레드로 이루어진 쓰레드 풀 생성
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 작업을 제출
executorService.submit(() -> {
try {
System.out.println("Task 1 started by thread: " + Thread.currentThread().getName());
// 작업 시뮬레이션을 위해 잠시 대기
Thread.sleep(2000);
System.out.println("Task 1 completed by thread: " + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.submit(() -> {
try {
System.out.println("Task 2 started by thread: " + Thread.currentThread().getName());
// 작업 시뮬레이션을 위해 잠시 대기
Thread.sleep(1000);
System.out.println("Task 2 completed by thread: " + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 모든 작업이 완료될 때까지 대기
executorService.shutdown();
try {
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
view raw .java hosted with ❤ by GitHub

 

 

3. Executors.newCachedThreadPool

  • 특별히 설정할 것이 없고 일반적인 용도에 적합
  • CachedThreadPool에서는 요청받은 태스크들이 큐에 쌓이지 않고 즉시 쓰레드에 위임되어 실행
    • 가용한 쓰레드가 없다면 새로 하나를 생성

 

  • 무거운 상용 서버에는 좋지 않음
    • "무겁운 상용 서버"는 주로 시스템 자원을 많이 사용하거나 처리량이 많은 작업을 수행하는 서버
    • 서버가 아주 무겁다면 CPU 이용률이 100%에 치닫고, 새로운 태스크가 도착하는 족족 또 다른 쓰레드들을 생성하여 상황을 악화시킬 것
    • 따라서 무거운 상용 서버에서는 쓰레드 개수를 고정한 Executors.newFixedThreadPool을 선택하거나 완전히 통제할 수 있는 ThreadPoolExecutor를 직접 사용하는 것을 권장

 

public class CachedThreadPoolExample {
public static void main(String[] args) {
// CachedThreadPool 생성
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
int taskNumber = i + 1;
executorService.submit(() -> {
System.out.println("Task " + taskNumber + " started by thread: " + Thread.currentThread().getName());
// 작업 시뮬레이션을 위해 잠시 대기
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskNumber + " completed by thread: " + Thread.currentThread().getName());
});
}
// 모든 작업이 완료될 때까지 대기
executorService.shutdown();
try {
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
view raw .java hosted with ❤ by GitHub

 

4. Task

  • 쓰레드를 직접 다루면 Thread가 작업 단위와 실행 메커니즘 역할을 모두 수행
  • 실행자 프레임워크에서는 작업 단위와 실행 매커니즘을 분리
  • Task는 작업 단위를 나타내는 핵심 추상 개념이며 두 가지 분류로 구분 가능
    • Runnable
    • Callable

 

  • Callable은 Runnable과 유사한 개념이지만 값을 반환하고 임의의 예외를 던질 수 있음
  • Task 수행을 ExecutorService에 맡기면 원하는 태스크 수행 정책을 선택할 수 있고, 생각이 바뀌면 언제든 변경할 수 있음
  • 여기서 핵심은 실행자 프레임워크가 작업 수행을 담당해준다는 것

 

public class TaskExample {
public static void main(String[] args) {
// Runnable task
Runnable runnableTask = () -> {
String threadName = Thread.currentThread().getName();
System.out.println("Runnable task executed by thread: " + threadName);
};
// Callable task
Callable<Integer> callableTask = () -> {
String threadName = Thread.currentThread().getName();
System.out.println("Callable task executed by thread: " + threadName);
// 작업 시뮬레이션을 위해 잠시 대기 후 결과 반환
TimeUnit.SECONDS.sleep(2);
return 123;
};
// ExecutorService 생성
ExecutorService executorService = Executors.newFixedThreadPool(2);
// Runnable task 제출
executorService.submit(runnableTask);
// Callable task 제출 및 결과 획득
Future<Integer> future = executorService.submit(callableTask);
try {
// 작업이 완료될 때까지 기다리고 결과 획득
int result = future.get();
System.out.println("Callable task result: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// ExecutorService 종료
executorService.shutdown();
}
}
view raw .java hosted with ❤ by GitHub

 

5. Fork-Join Framework

  • 자바 7+ 버전부터 지원
  • Fork-Join Task는 ForkJoinPool이라는 특별한 실행자 서비스가 실행
  • Fork-Join Task의 인스턴스는 작은 하위 Task로 나뉠 수 있으며, ForkJoinPool을 구성하는 쓰레드들이 이 Task들을 처리하며, 일을 먼저 끝낸 쓰레드는 다른 쓰레드의 남은 Task를 가져와 대신 처리 가능
    • 모든 쓰레드가 바쁘게 움직여 CPU를 최대한 활용하면서 높은 처리량과 낮은 지연 시간을 달성

 

public class ForkJoinExample {
static class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 1000;
private final long[] array;
private final int start;
private final int end;
SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
int mid = (start + end) >>> 1;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);
leftTask.fork();
long rightResult = rightTask.compute();
long leftResult = leftTask.join();
return leftResult + rightResult;
}
}
}
public static void main(String[] args) {
long[] array = new long[1000000000];
for (int i = 0; i < array.length; i++) {
array[i] = i + 1;
}
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
long result = forkJoinPool.invoke(new SumTask(array, 0, array.length));
System.out.println("합: " + result);
long end = System.currentTimeMillis();
System.out.println(String.format("ForkJoinPool 실행시간: %d", (end - start)));
start = System.currentTimeMillis();
long sum = 0;
for (int i = 0; i < array.length; i++) {
sum += array[i];
}
end = System.currentTimeMillis();
System.out.println("합: " + sum);
System.out.println(String.format("단순 반복문 실행시간: %d", (end - start)));
}
}
view raw .java hosted with ❤ by GitHub

 

참고하면 좋은 카테고리

 

https://jaimemin.tistory.com/category/JAVA/RxJava

 

 

참고

이펙티브 자바

반응형