-
[Java] 현대적인 병렬처리와 비동기: Fork/Join과 CompletableFutureJava 2025. 12. 1. 16:55
1. Java 7: Fork/Join 프레임워크와 데이터 병렬화
Java 5의 ExecutorService는 훌륭했지만, 작업의 크기가 균일하지 않은 경우(예: 어떤 작업은 1초, 어떤 작업은 10초 소요) 코어 간의 부하 불균형(Load Imbalance)이 발생하는 구조적 한계가 있었습니다.
한가한 스레드가 바쁜 스레드를 도와주지 못하고 낭비되는 현상을 해결하기 위해, Java 7은 Fork/Join Framework를 도입했습니다. 이는 단순히 작업을 나누는 것을 넘어, CPU 자원을 쥐어짜듯 효율적으로 사용하는 매커니즘을 제공합니다.
1) 핵심 원리: 분할 정복 (Divide and Conquer)
Fork/Join 프레임워크는 거대한 작업을 더 이상 쪼갤 수 없는 단위(Threshold)까지 잘게 나눈 뒤 병렬 처리하고, 결과를 다시 합치는 방식을 사용합니다.
- Fork (분할): 큰 작업을 재귀적으로 쪼개어 ForkJoinPool의 큐에 넣습니다(Push).
- Compute (수행): 작업의 크기가 충분히 작아지면(임계값 도달), 실제 로직을 수행합니다.
- Join (병합): 각 하위 작업(Sub-task)의 완료를 기다렸다가(Blocking), 그 결과를 취합하여 상위 작업으로 반환합니다.
이 과정은 개발자가 RecursiveTask (반환값 있음) 또는 RecursiveAction (반환값 없음) 클래스를 상속받아 구현합니다.
2) 내부 알고리즘: Work-Stealing
기존 ThreadPoolExecutor는 모든 스레드가 하나의 공용 큐(Global Queue)를 바라보는 구조였습니다. 이로 인해 작업이 많아질수록 큐에 접근하기 위한 락 경합(Lock Contention)이 심해져 성능 저하의 원인이 되었습니다.
Java 7의 ForkJoinPool은 이 문제를 해결하기 위해 각 스레드가 독립된 큐를 가지는 Work-Stealing 알고리즘을 도입했습니다.
① 이중 끝 큐 (Deque)와 소유자 스레드
ForkJoinPool의 각 워커 스레드(Worker Thread)는 자신만의 Deque (Double-Ended Queue)를 하나씩 할당받습니다.
- 동작 방식 (LIFO): 스레드는 자신이 분할(Fork)한 작업을 자신의 Deque Head에 push하고, 다시 머리에서 pop하여 처리합니다.
- 이유 (Cache Locality): 방금 쪼갠 작업은 CPU 캐시(L1/L2)에 데이터가 남아있을 확률이 매우 높습니다. 이를 LIFO(Last-In-First-Out) 방식으로 즉시 가져와 처리함으로써 캐시 적중률(Cache Hit Ratio)을 극대화하고 메모리 접근 비용을 줄입니다. (마치 스택을 쌓듯이 깊이 우선 탐색(DFS) 형태로 동작합니다.)
② Work-Stealing
만약 어떤 스레드(A)가 자신의 Deque를 모두 비워서 할 일이 없어지면, 다른 바쁜 스레드(B)를 찾아가 작업을 훔쳐옵니다.
- 동작 방식 (FIFO): 도둑 스레드(A)는 피해자 스레드(B)의 Deque Tail에 접근하여 작업을 가져옵니다(Steal).
- 이유 1 (경합 최소화): 주인(B)은 Head에서 작업하고, 도둑(A)은 Tail에서 훔쳐가므로 서로 접근하는 위치가 다릅니다. 이를 통해 동기화 비용과 경합을 최소화할 수 있습니다.
- 이유 2 (큰 작업 훔치기): 분할 정복 알고리즘 특성상, 큐의 Tail(오래전에 들어온 작업)에는 아직 쪼개지지 않은 큰 덩어리의 작업(Large Chunk)이 있을 확률이 높습니다. 도둑이 큰 덩어리를 훔쳐오면, 이후에는 그 작업을 쪼개서 자신의 큐에 채우면 되므로, 자주 훔치러 다니는 오버헤드를 줄일 수 있습니다.
③ 요약: 왜 Work-Stealing 인가?
결과적으로 Work-Stealing은 다음과 같은 이점을 제공합니다.
- 부하 균형 (Load Balancing): 바쁜 스레드의 짐을 한가한 스레드가 자동으로 나눠 가짐으로써, 전체 코어의 가동률을 100%에 가깝게 유지합니다.
- 확장성 (Scalability): 공용 큐에 의존하지 않으므로 스레드 수가 늘어나도 락 경합으로 인한 성능 저하가 적습니다.
3) 구현 클래스: RecursiveTask vs RecursiveAction
Fork/Join 프레임워크를 사용하기 위해서는 Runnable이나 Thread 대신, 프레임워크가 제공하는 ForkJoinTask<V>를 상속받아야 합니다. 우리는 주로 이를 상속받은 다음 두 가지 추상 클래스를 사용합니다.
핵심은 compute() 메서드를 오버라이딩하여, "작업을 쪼갤 것인가(Fork), 아니면 지금 처리할 것인가(Base Case)"를 결정하는 로직을 작성하는 것입니다.
① RecursiveTask<V>: 반환값(Result)이 있는 작업
Callable<V>와 유사하게 작업의 완료 후 결과를 반환해야 할 때 사용합니다. (예: 대용량 배열의 합계, 최대/최소값 찾기, 리스트 필터링 결과 반환 등)
import java.util.concurrent.RecursiveTask; // 1억 개의 숫자가 담긴 배열의 합을 구하는 작업 public class SumTask extends RecursiveTask<Long> { private final long[] array; private final int start, end; private static final int THRESHOLD = 10_000; // 분할 기준 (임계값) public SumTask(long[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } @Override protected Long compute() { // 1. 기저 조건(Base Case): 작업이 충분히 작으면 직접 계산 // 분할 오버헤드가 실제 연산보다 커지는 것을 방지하기 위해 임계값을 설정합니다. if (end - start <= THRESHOLD) { long sum = 0; for (int i = start; i < end; i++) { sum += array[i]; } return sum; } // 2. 분할(Divide): 작업을 절반으로 쪼갬 int mid = (start + end) / 2; SumTask leftTask = new SumTask(array, start, mid); SumTask rightTask = new SumTask(array, mid, end); // 3. 실행 및 병합(Conquer & Combine) - [중요 패턴!] // (A) 왼쪽 작업은 큐에 넣음 (비동기) // fork()를 호출하면 해당 작업은 Work Queue에 들어가고, // 다른 유휴 스레드(Thief)가 훔쳐가서 실행할 수 있게 됩니다. leftTask.fork(); // (B) 오른쪽 작업은 현재 스레드에서 바로 실행 (동기) // 두 작업 모두 fork() 하지 않는 이유: // 현재 스레드도 어차피 결과를 기다려야 하므로, // 굳이 새 스레드를 쓰지 않고 직접 처리하여 스레드 생성/관리 비용을 아낍니다. Long rightResult = rightTask.compute(); // (C) 결과 병합 // join()은 fork된 작업이 끝날 때까지 기다립니다(Blocking). // 만약 작업이 아직 시작 안 됐다면, 현재 스레드가 가져와서 처리하기도 합니다. return rightResult + leftTask.join(); } }② RecursiveAction: 반환값(Result)이 없는 작업
Runnable과 유사하게 반환값이 void인 작업입니다. 데이터 자체를 변경(Side-effect)하거나, 결과를 별도의 공유 자료구조에 저장할 때 사용합니다. (예: 배열 정렬(Sort), 이미지 픽셀 변환, 파일 일괄 처리 등)
import java.util.concurrent.RecursiveAction; // 배열의 모든 요소에 10을 더하는 작업 (반환값 없음) public class TransformAction extends RecursiveAction { private final int[] array; private final int start, end; private static final int THRESHOLD = 10_000; public TransformAction(int[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } @Override protected void compute() { // 1. 기저 조건 if (end - start <= THRESHOLD) { for (int i = start; i < end; i++) { array[i] += 10; } } else { // 2. 분할 및 실행 int mid = (start + end) / 2; // invokeAll() 헬퍼 메서드 사용 // 내부적으로 fork()와 join()을 적절히 조합하여 두 작업을 실행하고, // 두 작업이 모두 끝날 때까지 대기합니다. // 코드가 훨씬 간결해지는 장점이 있습니다. invokeAll( new TransformAction(array, start, mid), new TransformAction(array, mid, end) ); } } }③ 실행 방법 (ForkJoinPool)
정의한 Task는 ForkJoinPool 인스턴스에 제출하여 실행합니다.
public static void main(String[] args) { long[] data = new long[1_000_000]; // ... 데이터 초기화 (1~100만 채우기) ... // 1. 스레드 풀 생성 // 인자를 생략하면 기본적으로 CPU 코어 수(Runtime.getRuntime().availableProcessors())와 // 동일한 병렬 레벨을 가집니다. ForkJoinPool pool = new ForkJoinPool(); // 2. 작업 생성 SumTask task = new SumTask(data, 0, data.length); // 3. 작업 제출 및 결과 대기 (Blocking) // invoke()는 작업을 풀에 넣고, 최종 결과가 나올 때까지 메인 스레드를 대기시킵니다. Long result = pool.invoke(task); System.out.println("총합: " + result); // 참고: Java 8부터는 굳이 pool을 직접 만들지 않고 // ForkJoinPool.commonPool().invoke(task); 를 사용할 수도 있습니다. }
2. Java 8: CompletableFuture
Java 8에서는 Future의 한계(Blocking, 조합 불가)를 극복하기 위해 CompletableFuture 클래스가 도입되었습니다. 이는 Future와 CompletionStage 인터페이스를 모두 구현하여, 비동기 작업을 함수형 스타일(Functional Style)로 정의하고 파이프라인처럼 조립할 수 있게 해줍니다.
1) 비동기 파이프라인 구축 (Chaining)
CompletableFuture는 작업의 결과를 기다리지 않고(Non-blocking), "작업이 성공하면(then), 이어서 ~를 하라"는 흐름을 선언적으로 정의합니다.
① 비동기 작업 시작 (supplyAsync)
Supplier 함수형 인터페이스를 인자로 받아 비동기 작업을 시작합니다. 별도의 Executor를 지정하지 않으면 기본적으로 글로벌 스레드 풀인 ForkJoinPool.commonPool()을 사용합니다.
// 1. 반환값이 있는 비동기 작업 시작 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "User-1234"; // (예: DB에서 유저 ID 조회) });② 작업의 연결 (thenApply, thenAccept)
앞선 작업의 결과값을 받아 다음 작업을 수행합니다. 직관적인 체이닝이 가능합니다.
- thenApply (Map): 앞선 결과값을 받아서 변환(Processing)하여 반환합니다. (Stream의 .map()과 유사)
- thenAccept (Consumer): 앞선 결과값을 받아서 소비(Consume)하고 끝냅니다. 반환값이 없습니다. (로그 출력, DB 저장 등)
CompletableFuture.supplyAsync(() -> { System.out.println("1. 유저 정보 조회 중... " + Thread.currentThread().getName()); return "HongGilDong"; }) .thenApply(name -> { System.out.println("2. 데이터 가공 중..."); return "USER_NAME: " + name; // String -> String 변환 }) .thenAccept(result -> { System.out.println("3. 최종 처리: " + result); // 결과 소비 }); // 메인 스레드는 블로킹되지 않고 즉시 다음 코드로 넘어감2) 작업의 결합: 순차 vs 병렬
실무에서 가장 유용한 기능입니다. 두 개의 비동기 작업을 어떻게 연결할지에 따라 메서드가 나뉩니다.
① thenCompose: 의존적인 작업 연결 (FlatMap)
두 작업이 순서대로(Sequential) 실행되어야 할 때 사용합니다.
- 상황: "유저 정보를 가져온 후, 그 유저의 ID를 가지고 주문 내역을 조회해야 한다." (앞 작업의 결과가 뒤 작업의 인풋이 됨)
public CompletableFuture<String> getUserEmail(int userId) { return CompletableFuture.supplyAsync(() -> "user@test.com"); } public CompletableFuture<String> getReport(String email) { return CompletableFuture.supplyAsync(() -> "Report Data for " + email); } // [사용 예시] // thenApply를 쓰면 Future<Future<String>> 중첩 타입이 되어버림. // thenCompose를 써야 Future<String>으로 평탄화(Flattening)됨. CompletableFuture<String> result = getUserEmail(1) .thenCompose(email -> getReport(email)); // 1번 실행 후 -> 2번 실행② thenCombine: 독립적인 작업의 병렬 실행 및 병합
두 작업이 서로 상관없이 독립적(Independent)일 때 사용합니다. 동시에 실행시키고 둘 다 끝나면 결과를 합칩니다.
- 상황: "상품 정보 조회"와 "현재 환율 조회"를 동시에 실행한 뒤, 최종 가격을 계산한다.
CompletableFuture<Integer> priceTask = CompletableFuture.supplyAsync(() -> { System.out.println("상품 가격 조회 중..."); return 1000; }); CompletableFuture<Double> rateTask = CompletableFuture.supplyAsync(() -> { System.out.println("환율 조회 중..."); return 1300.0; }); // 두 스레드가 동시에 돌고, 둘 다 완료되면 람다식이 실행됨 priceTask.thenCombine(rateTask, (price, rate) -> { return price * rate; // 결과 조합 }) .thenAccept(finalPrice -> System.out.println("최종 가격: " + finalPrice));3) 예외 처리 (Exception Handling)
try-catch 블록 없이, 비동기 파이프라인 안에서 예외를 처리하고 복구할 수 있습니다.
- exceptionally: 예외 발생 시 대체 값(Fallback)을 리턴하여 파이프라인을 유지합니다.
CompletableFuture.supplyAsync(() -> { if (true) throw new RuntimeException("API 서버 다운!"); return "정상 데이터"; }) .exceptionally(ex -> { System.err.println("에러 발생: " + ex.getMessage()); return "기본 캐시 데이터"; // 에러 시 대체 데이터 반환 }) .thenAccept(data -> System.out.println("결과: " + data)); // "기본 캐시 데이터" 출력'Java' 카테고리의 다른 글
[Java] 절차 지향적인 자바 코드를 객체 지향적이게 바꾸기 (0) 2025.12.11 [Java] 절차지향적인 자바 코드 (0) 2025.12.10 [Java] 모던 자바 스레딩의 시작: ExecutorService와 Future (0) 2025.12.01 [Java] 멀티스레드의 위험과 동시성 제어 (1) 2025.12.01 [Java] Thread란? (0) 2025.12.01