이 글에서는 Java 멀티스레딩 API의 역사를 따라가며, 각 기술이 왜 등장했고 어떻게 사용하는지 알아보겠습니다.
1. Runnable (Java 1.0, 1996)
등장 배경
1990년대 중반, 컴퓨터 프로그래밍은 대부분 순차적(sequential) 실행 모델을 따랐습니다. 프로그램은 한 번에 한 가지 일만 수행했고, 이는 다음과 같은 심각한 문제를 야기했습니다:
문제 1: 사용자 인터페이스 프리징 GUI 애플리케이션에서 파일을 다운로드하거나 데이터베이스에서 데이터를 가져오는 동안, 전체 UI가 멈춰버렸습니다. 사용자는 작업이 완료될 때까지 아무것도 할 수 없었습니다. 이는 현대적인 사용자 경험과는 거리가 멀었습니다.
문제 2: CPU 자원의 낭비 프로그램이 I/O 작업(파일 읽기, 네트워크 통신 등)을 수행하는 동안 CPU는 놀고 있었습니다. 디스크에서 데이터를 읽어오는 동안 CPU가 수백만 사이클을 낭비하는 것은 엄청난 비효율이었습니다.
문제 3: 서버 확장성의 한계 단일 스레드로 동작하는 서버는 한 번에 하나의 클라이언트만 처리할 수 있었습니다. 두 번째 클라이언트는 첫 번째 클라이언트의 요청이 완전히 끝날 때까지 기다려야 했습니다. 이는 실용적인 서버 애플리케이션 개발을 사실상 불가능하게 만들었습니다.
문제 4: 멀티코어 CPU의 등장 1990년대 중반부터 멀티프로세서 시스템이 등장하기 시작했습니다. 하지만 순차적 프로그램은 여러 CPU 코어 중 하나만 사용할 수 있었습니다. 나머지 코어는 유휴 상태로 낭비되었습니다.
해결책: Runnable 인터페이스의 도입
Java는 이러한 문제를 해결하기 위해 멀티스레딩을 언어 차원에서 지원하기로 결정했습니다. Runnable 인터페이스는 다음과 같은 혁신을 가져왔습니다:
- 작업과 실행의 분리: 실행할 코드(Runnable)와 그것을 실행하는 메커니즘(Thread)을 분리했습니다.
- 동시 실행 가능: 여러 작업을 동시에 실행하여 UI 응답성을 유지하고 I/O 대기 시간을 활용할 수 있게 되었습니다.
- 이식성: OS별로 다른 스레드 구현을 Java가 추상화하여, 개발자는 플랫폼에 관계없이 동일한 코드를 작성할 수 있었습니다.
핵심 개념
Runnable은 단 하나의 메서드만 가진 매우 단순한 인터페이스이지만, 그 의미는 혁명적이었습니다.
설계 철학:
- 작업의 추상화: Runnable은 "실행 가능한 무언가"를 나타냅니다. 파일 다운로드, 계산 작업, 네트워크 요청 등 어떤 작업이든 Runnable로 표현할 수 있습니다.
- 실행 방법의 독립성: 누가, 언제, 어떻게 실행할지는 Runnable 자체가 결정하지 않습니다. 이는 나중에 등장하는 Executor 프레임워크의 기반이 되었습니다.
- 함수형 프로그래밍의 시초: Java 8의 람다가 등장하기 18년 전에, Runnable은 이미 "코드를 값처럼 전달"하는 개념을 구현했습니다.
왜 이 설계가 중요한가:
- 재사용성: 동일한 Runnable 인스턴스를 여러 Thread에서 실행할 수 있습니다.
- 테스트 가능성: Runnable의 로직을 스레드 없이 독립적으로 테스트할 수 있습니다.
- 확장성의 기반: 이 단순한 인터페이스가 나중에 등장하는 모든 고급 동시성 API의 기초가 되었습니다.
주요 메서드
public interface Runnable {
void run();
}
사용법
// 방법 1: 익명 클래스를 사용한 Runnable 구현
Runnable task1 = new Runnable() {
@Override
public void run() {
// run() 메서드 내부의 코드가 별도의 스레드에서 실행됨
// 현재 실행 중인 스레드의 이름을 출력
System.out.println("Task 1 실행 중: " + Thread.currentThread().getName());
}
};
// 방법 2: 람다 표현식 사용 (Java 8+)
// Runnable은 단 하나의 추상 메서드만 가지므로 함수형 인터페이스
// 따라서 람다로 간결하게 표현 가능
Runnable task2 = () -> {
System.out.println("Task 2 실행 중: " + Thread.currentThread().getName());
};
// Thread 객체 생성 및 Runnable 전달
// Thread는 실제로 OS 스레드를 관리하는 래퍼 클래스
Thread thread1 = new Thread(task1);
thread1.start(); // start()를 호출해야 새 스레드가 생성되고 run()이 실행됨
// run()을 직접 호출하면 현재 스레드에서 실행되므로 의미 없음
// 두 번째 스레드 생성 및 시작
Thread thread2 = new Thread(task2);
thread2.start(); // 각 스레드는 독립적으로 실행됨
// 실전 예제: 백그라운드에서 파일 다운로드
Runnable downloadTask = () -> {
System.out.println("다운로드 시작...");
try {
// 다운로드 작업 시뮬레이션 (실제로는 네트워크 I/O 수행)
Thread.sleep(3000); // 3초 대기
System.out.println("다운로드 완료!");
} catch (InterruptedException e) {
// 다른 스레드가 이 스레드를 중단시킬 수 있음
// 이 경우 InterruptedException이 발생
System.out.println("다운로드가 중단되었습니다.");
// interrupted 플래그를 다시 설정하여 호출 스택 상위에 전파
Thread.currentThread().interrupt();
}
};
// 다운로드를 백그라운드 스레드에서 실행
Thread downloadThread = new Thread(downloadTask);
downloadThread.start();
// 메인 스레드는 계속 다른 작업 수행 가능
System.out.println("메인 스레드는 다른 작업을 계속합니다...");
한계점과 문제점
Runnable과 Thread를 직접 사용하는 방식은 여러 심각한 문제를 가지고 있었습니다:
1. 반환값이 없음 - 결과를 받을 방법이 없다
// 문제: 계산 결과를 어떻게 받아올까?
Runnable calculation = () -> {
int result = 1 + 1; // 2를 계산했지만...
// result를 어떻게 반환? run()은 void 반환!
// 해결책: 공유 변수 사용 (하지만 이는 동기화 문제 발생)
};
run() 메서드는 void를 반환하므로, 작업의 결과를 받으려면 공유 변수를 사용해야 했습니다. 이는 스레드 안전성 문제를 야기했습니다.
2. 예외 처리의 어려움 - Checked Exception을 던질 수 없다
// 컴파일 에러!
Runnable badTask = () -> {
// run() 메서드는 throws 절이 없으므로 checked exception을 던질 수 없음
throw new IOException("파일을 찾을 수 없습니다"); // 컴파일 에러!
};
// 어쩔 수 없이 try-catch로 감싸야 함
Runnable workaround = () -> {
try {
// 파일 작업
throw new IOException("파일을 찾을 수 없습니다");
} catch (IOException e) {
// 예외를 어떻게 처리? 로깅만 하고 삼켜버림?
e.printStackTrace(); // 예외 정보가 사라짐!
}
};
이는 에러 처리를 매우 어렵게 만들었고, 많은 예외가 조용히 삼켜져 버그를 찾기 어려웠습니다.
3. 스레드 관리의 어려움 - 직접 생성과 관리의 복잡성
// 100개의 작업을 처리하려면?
for (int i = 0; i < 100; i++) {
Thread thread = new Thread(() -> {
// 작업 수행
});
thread.start();
}
// 문제: 100개의 스레드가 동시에 생성됨!
// - 각 스레드는 약 1-2MB의 메모리 사용
// - 컨텍스트 스위칭 오버헤드
// - 완료 대기를 어떻게 관리?
스레드 생성은 비싼 작업입니다. 각 스레드는 OS 커널 스레드와 1:1로 매핑되며, 스택 메모리를 할당받고, 컨텍스트 스위칭 비용이 발생합니다.
4. 자원 낭비 - 무한정 스레드 생성
// 웹 서버 예제 (안티패턴!)
ServerSocket server = new ServerSocket(8080);
while (true) {
Socket client = server.accept();
// 클라이언트 하나당 스레드 하나 생성
new Thread(() -> {
handleClient(client); // 요청 처리
}).start();
}
// 문제: 동시 접속자가 10,000명이면 10,000개 스레드 생성
// → OutOfMemoryError 발생!
5. 스레드 생명주기 관리의 부재
Thread worker = new Thread(() -> {
while (true) {
// 무한 루프
doWork();
}
});
worker.start();
// 문제: 이 스레드를 어떻게 종료시킬까?
// worker.stop()은 deprecated (안전하지 않음)
// 적절한 종료 메커니즘이 없음
6. 작업 대기 및 결과 수집의 어려움
// 여러 작업을 시작하고 모두 완료될 때까지 기다리기
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Thread t = new Thread(() -> { /* 작업 */ });
t.start();
threads.add(t);
}
// 모든 스레드가 끝날 때까지 대기
for (Thread t : threads) {
try {
t.join(); // 이 스레드가 끝날 때까지 대기
} catch (InterruptedException e) {
// 인터럽트 처리...
}
}
// 코드가 복잡하고 에러가 발생하기 쉬움
이러한 문제들이 Java 5의 Executor 프레임워크 등장으로 이어졌습니다.
2. Executor 인터페이스 (Java 5, 2004)
등장 배경
Java 1.0에서 1.4까지 약 8년간, 개발자들은 Thread를 직접 생성하고 관리하면서 많은 고통을 겪었습니다. 실전 프로젝트에서 다음과 같은 패턴이 반복적으로 나타났습니다:
문제 상황 1: 중복되는 스레드 관리 코드
// 프로젝트 A의 코드
public void processRequests(List<Request> requests) {
for (Request req : requests) {
new Thread(() -> handle(req)).start();
}
}
// 프로젝트 B의 코드
public void executeTasks(List<Task> tasks) {
for (Task task : tasks) {
new Thread(() -> task.run()).start();
}
}
// 프로젝트 C의 코드
public void runJobs(List<Job> jobs) {
for (Job job : jobs) {
new Thread(() -> job.execute()).start();
}
}
모든 코드가 본질적으로 같은 패턴입니다: "작업을 받아서 새 스레드로 실행". 하지만 매번 new Thread()를 호출하고 start()를 호출하는 보일러플레이트 코드가 반복되었습니다.
문제 상황 2: 작업 제출과 실행 방법의 강한 결합
// 개발 초기: 각 작업을 새 스레드로 실행
public void submitTask(Runnable task) {
new Thread(task).start();
}
// 문제 발견: 너무 많은 스레드 생성으로 시스템 과부하
// 해결책: 스레드 풀로 변경하고 싶지만...
// submitTask를 호출하는 수백 곳의 코드를 모두 수정해야 함!
작업을 "어떻게" 실행할지가 작업 제출 코드에 하드코딩되어 있어서, 실행 전략을 변경하려면 코드를 대대적으로 수정해야 했습니다.
문제 상황 3: 테스트의 어려움
public class DataProcessor {
public void process(Data data) {
// 데이터 처리를 별도 스레드에서 수행
new Thread(() -> {
// 복잡한 처리 로직
transform(data);
validate(data);
save(data);
}).start();
}
}
// 단위 테스트를 어떻게 작성할까?
@Test
public void testProcess() {
DataProcessor processor = new DataProcessor();
processor.process(testData);
// 문제: 스레드가 언제 끝나는지 알 수 없음!
// Thread.sleep()으로 기다려야 하나? (나쁜 방법)
}
비동기 작업을 테스트하기가 매우 어려웠고, 테스트가 불안정했습니다.
근본 원인 분석:
모든 문제의 근본 원인은 관심사의 분리(Separation of Concerns) 실패였습니다:
- 무엇을 실행할까? (작업의 정의) ← Runnable이 담당
- 어떻게 실행할까? (실행 전략) ← 매번 new Thread().start()로 하드코딩됨
Java 5의 해결책: Executor 패턴
Doug Lea가 이끄는 JSR-166 전문가 그룹은 이 문제를 해결하기 위해 명령 패턴(Command Pattern) 과 전략 패턴(Strategy Pattern) 을 결합한 Executor 인터페이스를 설계했습니다:
public interface Executor {
void execute(Runnable command);
}
이 단순해 보이는 인터페이스가 가져온 변화:
- 작업 제출과 실행의 분리: 작업을 제출하는 코드는 그 작업이 어떻게 실행될지 몰라도 됩니다.
- 실행 전략의 교체 가능성: 동일한 코드로 다양한 실행 방식을 사용할 수 있습니다.
- 테스트 용이성: 테스트에서는 동기식 Executor를 주입하여 제어 가능합니다.
핵심 개념
Executor는 겨우 한 줄짜리 인터페이스지만, 그 설계 철학과 의의는 깊습니다.
설계 철학 1: 최소 인터페이스의 힘
// 이것이 전부입니다
public interface Executor {
void execute(Runnable command);
}
왜 이렇게 단순할까요?
- 명확한 책임: "작업을 실행하라"는 단 하나의 책임만 가집니다.
- 유연성: 구현 방법을 전혀 제약하지 않아, 무한한 가능성을 열어둡니다.
- 조합 가능성: 단순한 인터페이스는 다양한 방식으로 조합하고 확장할 수 있습니다.
설계 철학 2: 다양한 구현 전략
동일한 Executor 인터페이스로 완전히 다른 실행 전략을 구현할 수 있습니다:
// 전략 1: 동기 실행 (호출자의 스레드에서 직접 실행)
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run(); // 새 스레드 없음, 즉시 실행
}
}
// 전략 2: 매번 새 스레드 생성 (기존 방식)
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
// 전략 3: 단일 백그라운드 스레드에서 순차 실행
class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<>();
Runnable active;
public synchronized void execute(final Runnable r) {
tasks.offer(() -> {
try {
r.run();
} finally {
scheduleNext();
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
new Thread(active).start();
}
}
}
이것이 중요한 이유:
- 클라이언트 코드는 변경 없이 실행 전략만 교체 가능
- 개발 환경에서는 DirectExecutor, 프로덕션에서는 스레드 풀 사용 가능
- 테스트에서는 동기 실행으로 제어 가능
설계 철학 3: 의존성 역전 (Dependency Inversion)
// Before: 구체적인 Thread 클래스에 의존
public class Service {
public void handleRequest(Request req) {
new Thread(() -> process(req)).start(); // Thread에 직접 의존!
}
}
// After: Executor 추상화에 의존
public class Service {
private final Executor executor;
public Service(Executor executor) {
this.executor = executor; // 추상화에 의존
}
public void handleRequest(Request req) {
executor.execute(() -> process(req)); // 구현 세부사항을 모름
}
}
이제 Service 클래스는:
- 스레드 생성 방법을 몰라도 됩니다
- 다양한 실행 전략을 주입받을 수 있습니다
- 단위 테스트가 쉬워집니다
실전에서의 의의:
- 점진적 마이그레이션: 기존 new Thread().start() 코드를 한 번에 하나씩 executor.execute()로 교체 가능
- 성능 튜닝 용이: 코드 변경 없이 설정만으로 실행 전략 변경
- 표준화: 모든 비동기 작업이 동일한 인터페이스를 통해 제출됨
왜 ExecutorService가 아닌 Executor를 먼저 만들었나?
Executor는 의도적으로 최소한의 기능만 제공합니다. 이는:
- 구현이 쉽고
- 이해하기 쉬우며
- 확장 가능한 기반을 제공합니다
더 풍부한 기능(shutdown, submit 등)은 ExecutorService로 확장되었습니다.
주요 메서드
public interface Executor {
void execute(Runnable command);
}
사용법과 실전 적용
// 간단한 Executor 구현 예제
// 이 구현은 교육 목적이며, 실전에서는 ExecutorService를 사용해야 함
Executor executor = new Executor() {
@Override
public void execute(Runnable command) {
// 매번 새로운 스레드를 생성하여 작업 실행
// 주의: 이는 리소스 낭비가 심한 방식 (실전에서 사용 금지)
new Thread(command).start();
}
};
// 작업 제출
// 중요: execute() 호출자는 작업이 어떻게 실행되는지 알 필요 없음
executor.execute(() -> {
// 이 코드가 별도 스레드에서 실행되는지,
// 현재 스레드에서 실행되는지,
// 스레드 풀에서 실행되는지는
// executor 구현체에 따라 결정됨
System.out.println("작업 실행: " + Thread.currentThread().getName());
});
// 실전 예제: 의존성 주입을 통한 유연한 설계
public class TaskManager {
private final Executor executor;
// 생성자 주입: 실행 전략을 외부에서 주입받음
public TaskManager(Executor executor) {
this.executor = executor;
}
public void processTask(Task task) {
// 작업 제출: 실행 방법은 주입된 executor에 위임
executor.execute(() -> {
System.out.println("작업 처리 시작: " + task.getId());
task.process(); // 실제 작업 수행
System.out.println("작업 처리 완료: " + task.getId());
});
}
}
// 프로덕션 환경: 스레드 풀 사용
Executor productionExecutor = Executors.newFixedThreadPool(10);
TaskManager productionManager = new TaskManager(productionExecutor);
// 테스트 환경: 동기 실행으로 테스트 제어
Executor testExecutor = new Executor() {
@Override
public void execute(Runnable command) {
command.run(); // 새 스레드 없이 즉시 실행 (동기)
}
};
TaskManager testManager = new TaskManager(testExecutor);
// 개발 환경: 디버깅을 위한 로깅 추가
Executor debugExecutor = new Executor() {
private final Executor delegate = Executors.newCachedThreadPool();
@Override
public void execute(Runnable command) {
System.out.println("[DEBUG] 작업 제출됨");
delegate.execute(() -> {
System.out.println("[DEBUG] 작업 실행 시작");
try {
command.run();
} finally {
System.out.println("[DEBUG] 작업 실행 완료");
}
});
}
};
TaskManager debugManager = new TaskManager(debugExecutor);
의미
Executor는 매우 간단한 인터페이스지만, 작업 제출자가 스레드 관리 세부사항을 알 필요가 없게 만들었습니다.
3. ExecutorService (Java 5, 2004)
등장 배경
Executor 인터페이스는 작업 제출과 실행을 분리하는 혁신을 가져왔지만, 실전 프로젝트에서는 금방 한계에 부딪혔습니다:
문제 상황 1: 종료 관리의 부재
Executor executor = Executors.newCachedThreadPool();
// 수많은 작업 제출
for (int i = 0; i < 10000; i++) {
executor.execute(() -> doWork());
}
// 문제: 프로그램을 종료하려면?
// - executor.shutdown() 같은 메서드가 없음
// - 스레드가 계속 살아있어서 JVM이 종료되지 않음
// - 강제 종료하면 실행 중인 작업이 손실될 수 있음
// 해결책이 없어서 결국 System.exit()를 호출해야 함 (나쁜 방법!)
System.exit(0);
애플리케이션을 우아하게 종료할 방법이 없었습니다. 특히 서버 애플리케이션에서는 치명적이었습니다:
- 배포를 위해 서버를 재시작할 때
- 처리 중인 요청을 완료하고 종료하고 싶을 때
- 더 이상 새 작업을 받지 않으면서 기존 작업은 완료하고 싶을 때
문제 상황 2: 작업 결과를 받을 방법이 없음
Executor executor = getExecutor();
// 데이터베이스에서 사용자 정보를 비동기로 조회하고 싶음
executor.execute(() -> {
User user = database.findUser(userId);
// 문제: user 객체를 어떻게 반환?
// execute()는 void를 반환하고, Runnable.run()도 void
});
// 어쩔 수 없이 공유 변수 사용 (동기화 문제 발생)
final User[] result = new User[1]; // 배열로 감싸야 람다에서 접근 가능
executor.execute(() -> {
result[0] = database.findUser(userId);
});
// 결과를 언제 사용할 수 있을까?
Thread.sleep(1000); // 나쁜 방법: 추측으로 대기
User user = result[0]; // null일 수도 있음!
비동기 작업의 결과를 받는 것이 매우 번거롭고 에러가 발생하기 쉬웠습니다.
문제 상황 3: 작업 완료 대기의 어려움
Executor executor = getExecutor();
List<Runnable> tasks = createTasks();
// 모든 작업을 제출
for (Runnable task : tasks) {
executor.execute(task);
}
// 문제: 모든 작업이 완료되었는지 어떻게 알 수 있나?
// - 각 작업의 완료 상태를 추적할 방법이 없음
// - CountDownLatch 같은 별도 동기화 메커니즘 필요
// - 코드가 복잡해지고 에러가 발생하기 쉬움
문제 상황 4: 예외 처리의 불확실성
executor.execute(() -> {
if (someCondition) {
throw new RuntimeException("작업 실패!");
}
// 정상 처리
});
// 문제: 작업이 예외를 던졌는지 어떻게 알 수 있나?
// - 예외가 조용히 삼켜짐
// - 작업이 실패했는지, 성공했는지 알 수 없음
문제 상황 5: 작업 취소 불가능
executor.execute(() -> {
for (int i = 0; i < 1_000_000; i++) {
// 오래 걸리는 작업
processItem(i);
}
});
// 문제: 사용자가 "취소" 버튼을 눌렀을 때
// 실행 중인 작업을 중단할 방법이 없음!
근본 원인 분석:
Executor는 일방향 통신(fire-and-forget) 만 지원했습니다:
- 작업을 제출할 수는 있지만
- 작업의 상태를 확인할 수 없고
- 작업의 결과를 받을 수 없으며
- 작업을 제어할 수 없었습니다
Java 5의 해결책: ExecutorService
ExecutorService는 Executor를 확장하여 다음 기능들을 추가했습니다:
- 라이프사이클 관리: shutdown(), shutdownNow(), awaitTermination()
- 작업 추적: submit()이 Future를 반환하여 작업 상태 추적 가능
- 일괄 처리: invokeAll(), invokeAny()로 여러 작업을 한 번에 처리
- 예외 처리: Future.get()을 통해 작업에서 발생한 예외 확인 가능
핵심 개념
ExecutorService는 단순히 메서드 몇 개를 추가한 것이 아니라, 비동기 작업을 일급 객체(first-class citizen)로 만들었습니다.
핵심 개념 1: 작업의 라이프사이클 관리
public interface ExecutorService extends Executor {
// 우아한 종료: 새 작업 거부, 기존 작업은 완료
void shutdown();
// 즉시 종료: 실행 중인 작업 중단 시도
List<Runnable> shutdownNow();
// 종료 여부 확인
boolean isShutdown();
boolean isTerminated();
// 종료될 때까지 대기
boolean awaitTermination(long timeout, TimeUnit unit);
}
왜 이것이 중요한가:
실전 애플리케이션은 항상 시작과 종료가 있습니다. ExecutorService는 이를 명시적으로 관리할 수 있게 해줍니다:
ExecutorService executor = Executors.newFixedThreadPool(10);
try {
// 애플리케이션 실행
runApplication(executor);
} finally {
// 우아한 종료
executor.shutdown(); // 새 작업 거부
try {
// 기존 작업이 60초 안에 완료되기를 기다림
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
// 타임아웃 발생: 강제 종료 시도
executor.shutdownNow();
// 다시 기다림
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Executor가 종료되지 않았습니다");
}
}
} catch (InterruptedException e) {
// 대기 중 인터럽트: 강제 종료
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
핵심 개념 2: Future를 통한 작업 추적
public interface ExecutorService extends Executor {
// Callable 제출: 결과를 반환하는 작업
<T> Future<T> submit(Callable<T> task);
// Runnable 제출: 결과는 null
Future<?> submit(Runnable task);
// Runnable 제출: 완료 시 지정된 결과 반환
<T> Future<T> submit(Runnable task, T result);
}
왜 Callable이 필요했나:
Runnable은 결과를 반환할 수 없고 checked exception을 던질 수 없었습니다. Callable은 이를 해결합니다:
// Runnable: 결과 없음, checked exception 불가
interface Runnable {
void run();
}
// Callable: 결과 반환, checked exception 가능
interface Callable<V> {
V call() throws Exception;
}
Future의 의의:
Future는 "미래의 결과"를 나타내는 일급 객체입니다:
- 작업이 완료되었는지 확인 가능
- 완료될 때까지 대기 가능
- 작업 취소 가능
- 작업 결과 또는 예외 획득 가능
// 비동기 작업 제출
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "결과";
});
// Future를 통한 제어
if (!future.isDone()) {
System.out.println("아직 실행 중...");
}
// 결과 대기 및 획득
try {
String result = future.get(); // 완료될 때까지 블로킹
System.out.println(result);
} catch (ExecutionException e) {
// 작업에서 발생한 예외 처리
System.out.println("작업 실패: " + e.getCause());
}
// 또는 타임아웃과 함께
try {
String result = future.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true); // 작업 취소
}
핵심 개념 3: 일괄 작업 처리
// 모든 작업을 실행하고 완료될 때까지 대기
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
// 하나라도 성공하면 나머지 취소하고 그 결과 반환
<T> T invokeAny(Collection<? extends Callable<T>> tasks);
invokeAll의 의의:
여러 독립적인 작업을 병렬로 실행하고 모든 결과를 수집하는 패턴이 매우 흔합니다:
List<Callable<Integer>> tasks = Arrays.asList(
() -> queryDatabase1(),
() -> queryDatabase2(),
() -> queryDatabase3()
);
// 모든 쿼리를 병렬 실행
List<Future<Integer>> futures = executor.invokeAll(tasks);
// 모든 결과 수집
int total = 0;
for (Future<Integer> future : futures) {
total += future.get(); // 이미 완료되어 있으므로 즉시 반환
}
invokeAny의 의의:
여러 서버나 미러 사이트 중 가장 빠른 응답을 사용하는 패턴:
List<Callable<String>> tasks = Arrays.asList(
() -> fetchFromServer1(),
() -> fetchFromServer2(),
() -> fetchFromServer3()
);
// 가장 빠른 서버의 응답 사용, 나머지는 자동 취소
String fastestResponse = executor.invokeAny(tasks);
전체적인 설계 의의:
ExecutorService는 다음을 가능하게 만들었습니다:
- 비동기 프로그래밍의 체계화: 임시방편이 아닌 체계적인 비동기 처리
- 리소스 관리: 스레드 풀의 명시적인 생명주기 관리
- 오류 처리: 비동기 작업의 예외를 안전하게 전파
- 조합 가능성: Future를 통해 비동기 작업을 조합 (CompletableFuture의 기반)
Executor에서 ExecutorService로의 진화:
- Executor: "작업을 실행해라" (일방향)
- ExecutorService: "작업을 실행하고 추적/관리해라" (양방향)
주요 메서드
public interface ExecutorService extends Executor {
// 라이프사이클 관리
void shutdown(); // 새 작업 거부, 기존 작업 완료 대기
List<Runnable> shutdownNow(); // 즉시 종료 시도
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit);
// 작업 제출 (결과 반환)
<T> Future<T> submit(Callable<T> task); // Callable 제출
Future<?> submit(Runnable task); // Runnable 제출
<T> Future<T> submit(Runnable task, T result);
// 일괄 작업 실행
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
<T> T invokeAny(Collection<? extends Callable<T>> tasks);
}
사용법
// ExecutorService 생성 (보통 Executors 팩토리 사용)
// newFixedThreadPool: 고정된 개수의 스레드를 가진 풀 생성
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
// 1. Runnable 제출 (결과 없음)
// submit()은 Future<?>를 반환하여 작업 상태 추적 가능
Future<?> future1 = executorService.submit(() -> {
System.out.println("Runnable 작업 실행");
// Runnable이므로 반환값 없음
});
// 2. Callable 제출 (결과 있음)
// Callable은 결과를 반환하고 checked exception을 던질 수 있음
Future<Integer> future2 = executorService.submit(() -> {
// 시간이 걸리는 작업 시뮬레이션
Thread.sleep(1000);
// 결과 계산
return 42;
});
// 3. 작업 완료 대기 및 결과 가져오기
try {
// get()은 작업이 완료될 때까지 현재 스레드를 블로킹
Integer result = future2.get();
System.out.println("결과: " + result);
// 타임아웃과 함께 결과 가져오기
// 5초 안에 완료되지 않으면 TimeoutException 발생
// Integer resultWithTimeout = future2.get(5, TimeUnit.SECONDS);
} catch (ExecutionException e) {
// 작업 실행 중 발생한 예외를 포착
// e.getCause()로 원본 예외 확인 가능
System.out.println("작업 실행 중 예외: " + e.getCause());
} catch (InterruptedException e) {
// get() 대기 중 현재 스레드가 인터럽트됨
System.out.println("대기 중 인터럽트 발생");
Thread.currentThread().interrupt(); // 인터럽트 상태 복원
}
} finally {
// ExecutorService는 반드시 종료해야 함
// shutdown()을 호출하지 않으면 JVM이 종료되지 않음
// 1단계: 우아한 종료 시작
executorService.shutdown(); // 새 작업 거부, 기존 작업은 완료
try {
// 2단계: 기존 작업이 완료되기를 기다림
// 5초 안에 모든 작업이 완료되면 true 반환
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
// 3단계: 타임아웃 발생 시 강제 종료
// 실행 중인 작업에 인터럽트 전송
List<Runnable> notExecuted = executorService.shutdownNow();
System.out.println("강제 종료됨. 실행되지 않은 작업 수: " + notExecuted.size());
// 4단계: 강제 종료 후 다시 대기
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
System.err.println("ExecutorService가 종료되지 않았습니다");
}
}
} catch (InterruptedException e) {
// awaitTermination 대기 중 인터럽트
// 즉시 강제 종료
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
Callable vs Runnable 비교
ExecutorService executor = Executors.newFixedThreadPool(2);
// Runnable: 반환값 없음, checked exception 불가
Runnable runnableTask = () -> {
System.out.println("Runnable 실행");
// return "값"; // 컴파일 에러: void 반환
// throw new IOException(); // 컴파일 에러: checked exception 불가
};
// Callable: 반환값 있음, checked exception 가능
Callable<String> callableTask = () -> {
System.out.println("Callable 실행");
// checked exception을 던질 수 있음
if (Math.random() > 0.5) {
throw new Exception("에러 발생!");
}
// 결과 반환
return "성공";
};
// 제출 및 결과 처리
Future<?> runnableFuture = executor.submit(runnableTask);
Future<String> callableFuture = executor.submit(callableTask);
try {
// Runnable의 결과는 null
Object runnableResult = runnableFuture.get();
System.out.println("Runnable 결과: " + runnableResult); // null
// Callable의 결과는 실제 반환값
String callableResult = callableFuture.get();
System.out.println("Callable 결과: " + callableResult); // "성공"
} catch (ExecutionException e) {
// Callable에서 던진 예외를 여기서 처리
System.out.println("작업 중 예외: " + e.getCause().getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
executor.shutdown();
invokeAll 예제: 모든 작업 병렬 실행
ExecutorService executor = Executors.newFixedThreadPool(3);
// 여러 개의 독립적인 작업 정의
List<Callable<String>> tasks = Arrays.asList(
() -> {
Thread.sleep(1000);
return "작업 1 완료";
},
() -> {
Thread.sleep(1500);
return "작업 2 완료";
},
() -> {
Thread.sleep(800);
return "작업 3 완료";
}
);
try {
// 모든 작업을 병렬로 실행하고 모두 완료될 때까지 대기
// invokeAll()은 모든 작업이 완료될 때까지 블로킹
long startTime = System.currentTimeMillis();
List<Future<String>> futures = executor.invokeAll(tasks);
long endTime = System.currentTimeMillis();
// 모든 작업이 완료되었으므로 (약 1.5초 소요)
// get()은 즉시 반환됨 (추가 대기 없음)
System.out.println("총 소요 시간: " + (endTime - startTime) + "ms");
for (int i = 0; i < futures.size(); i++) {
// 이미 완료된 상태이므로 get()은 블로킹하지 않음
String result = futures.get(i).get();
System.out.println("작업 " + (i + 1) + " 결과: " + result);
}
// 타임아웃과 함께 사용
// 10초 안에 완료되지 않은 작업은 취소됨
List<Future<String>> futuresWithTimeout =
executor.invokeAll(tasks, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
System.out.println("작업 실행 중 인터럽트");
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
System.out.println("작업 실패: " + e.getCause());
} finally {
executor.shutdown();
}
invokeAny 예제: 가장 빠른 결과 사용
ExecutorService executor = Executors.newFixedThreadPool(3);
// 같은 작업을 수행하지만 속도가 다른 여러 방법
List<Callable<String>> tasks = Arrays.asList(
() -> {
Thread.sleep(3000); // 느린 서버
return "서버 1 응답";
},
() -> {
Thread.sleep(500); // 빠른 서버
return "서버 2 응답";
},
() -> {
Thread.sleep(2000); // 중간 서버
return "서버 3 응답";
}
);
try {
// 가장 빠르게 완료된 작업의 결과를 반환
// 나머지 작업은 자동으로 취소됨
long startTime = System.currentTimeMillis();
String fastestResult = executor.invokeAny(tasks);
long endTime = System.currentTimeMillis();
System.out.println("가장 빠른 응답: " + fastestResult); // "서버 2 응답"
System.out.println("소요 시간: " + (endTime - startTime) + "ms"); // 약 500ms
// 실전 활용: 여러 미러 사이트에서 다운로드
List<Callable<byte[]>> downloadTasks = Arrays.asList(
() -> downloadFrom("mirror1.com"),
() -> downloadFrom("mirror2.com"),
() -> downloadFrom("mirror3.com")
);
// 가장 빠른 미러에서 다운로드, 나머지는 취소
byte[] data = executor.invokeAny(downloadTasks);
} catch (InterruptedException e) {
System.out.println("작업 중 인터럽트");
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
// 모든 작업이 실패한 경우
System.out.println("모든 작업 실패: " + e.getCause());
} finally {
executor.shutdown();
}
// 헬퍼 메서드
static byte[] downloadFrom(String mirror) throws Exception {
// 다운로드 로직
return new byte[0];
}
4. Executors 유틸리티 클래스 (Java 5, 2004)
등장 배경
ExecutorService 인터페이스의 다양한 구현체를 쉽게 생성할 수 있도록 팩토리 메서드를 제공하는 유틸리티 클래스가 필요했습니다.
핵심 개념
일반적으로 사용되는 스레드 풀 설정을 미리 정의해둔 팩토리 메서드들을 제공합니다.
주요 메서드
public class Executors {
// 고정 크기 스레드 풀
static ExecutorService newFixedThreadPool(int nThreads)
// 단일 스레드 풀
static ExecutorService newSingleThreadExecutor()
// 캐시 스레드 풀 (필요시 생성, 60초 idle 후 제거)
static ExecutorService newCachedThreadPool()
// 스케줄링 가능한 스레드 풀
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
// Work Stealing Pool (Java 8+)
static ExecutorService newWorkStealingPool()
}
사용법 및 특징
1) newFixedThreadPool
// 고정된 3개의 스레드로 작업 처리
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " - " +
Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
특징:
- 스레드 수가 고정되어 있어 자원 사용량 예측 가능
- 큐에 작업이 쌓일 수 있음
2) newSingleThreadExecutor
// 순차적으로 작업을 처리 (하나의 스레드만 사용)
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> System.out.println("작업 1"));
executor.submit(() -> System.out.println("작업 2"));
executor.submit(() -> System.out.println("작업 3"));
executor.shutdown();
특징:
- 작업이 순차적으로 실행됨을 보장
- 스레드 안전성이 중요한 경우 유용
3) newCachedThreadPool
// 필요에 따라 스레드를 생성하고 재사용
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
executor.submit(() -> {
System.out.println("작업 실행: " + Thread.currentThread().getName());
});
}
executor.shutdown();
특징:
- 단기 비동기 작업에 적합
- 스레드 수가 무한정 증가할 수 있어 주의 필요
4) newScheduledThreadPool
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
// 3초 후 한 번 실행
scheduler.schedule(() -> {
System.out.println("3초 후 실행");
}, 3, TimeUnit.SECONDS);
// 초기 1초 대기 후, 2초마다 반복 실행
scheduler.scheduleAtFixedRate(() -> {
System.out.println("주기적 실행: " + System.currentTimeMillis());
}, 1, 2, TimeUnit.SECONDS);
// 10초 후 종료
scheduler.schedule(() -> {
scheduler.shutdown();
}, 10, TimeUnit.SECONDS);
특징:
- 지연 실행 및 주기적 실행 가능
- Timer보다 안전하고 유연함
5. ThreadPoolExecutor (Java 5, 2004)
등장 배경
Executors 팩토리 메서드로 생성되는 대부분의 스레드 풀은 실제로 ThreadPoolExecutor를 반환합니다. 더 세밀한 제어가 필요한 경우를 위해 직접 설정 가능한 구현체가 제공되었습니다.
핵심 개념
ExecutorService의 가장 강력하고 유연한 구현체로, 스레드 풀의 모든 측면을 세밀하게 제어할 수 있습니다.
주요 생성자 파라미터
public ThreadPoolExecutor(
int corePoolSize, // 기본 스레드 수
int maximumPoolSize, // 최대 스레드 수
long keepAliveTime, // 유휴 스레드 유지 시간
TimeUnit unit, // keepAliveTime의 단위
BlockingQueue<Runnable> workQueue, // 작업 대기 큐
ThreadFactory threadFactory, // 스레드 생성 팩토리
RejectedExecutionHandler handler // 거부 정책
)
동작 원리
- 작업 제출 시 로직:
- 현재 스레드 수 < corePoolSize → 새 스레드 생성
- 현재 스레드 수 >= corePoolSize → 큐에 작업 추가
- 큐가 가득 참 && 스레드 수 < maximumPoolSize → 새 스레드 생성
- 큐가 가득 참 && 스레드 수 >= maximumPoolSize → 거부 정책 실행
- 스레드 정리:
- corePoolSize보다 많은 스레드가 keepAliveTime 동안 유휴 상태면 종료
거부 정책 (RejectedExecutionHandler)
// 1. AbortPolicy (기본값): RejectedExecutionException 발생
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new ThreadPoolExecutor.AbortPolicy()
);
// 2. CallerRunsPolicy: 호출한 스레드에서 직접 실행
new ThreadPoolExecutor.CallerRunsPolicy()
// 3. DiscardPolicy: 조용히 무시
new ThreadPoolExecutor.DiscardPolicy()
// 4. DiscardOldestPolicy: 가장 오래된 작업 제거 후 재시도
new ThreadPoolExecutor.DiscardOldestPolicy()
사용 예제
// 커스텀 ThreadPoolExecutor 생성
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 최소 2개 스레드 유지
5, // 최대 5개 스레드
60L, TimeUnit.SECONDS, // 60초 후 유휴 스레드 제거
new LinkedBlockingQueue<>(10), // 최대 10개 작업 대기
new ThreadFactory() { // 커스텀 스레드 팩토리
private AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "CustomPool-" + threadNumber.getAndIncrement());
t.setDaemon(false);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 포화 시 호출자가 실행
);
// 작업 제출
for (int i = 0; i < 20; i++) {
final int taskId = i;
try {
executor.execute(() -> {
System.out.println("Task " + taskId + " - " +
Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
} catch (RejectedExecutionException e) {
System.out.println("Task " + taskId + " rejected!");
}
}
// 모니터링
System.out.println("Active threads: " + executor.getActiveCount());
System.out.println("Pool size: " + executor.getPoolSize());
System.out.println("Queue size: " + executor.getQueue().size());
System.out.println("Completed tasks: " + executor.getCompletedTaskCount());
executor.shutdown();
다양한 큐 전략
// 1. 무제한 큐 (LinkedBlockingQueue)
// - maximumPoolSize 무시됨 (corePoolSize만 사용)
// - OOM 위험
new LinkedBlockingQueue<>()
// 2. 제한된 큐 (ArrayBlockingQueue)
// - 큐 크기 제한
// - 예측 가능한 메모리 사용
new ArrayBlockingQueue<>(100)
// 3. 동기 큐 (SynchronousQueue)
// - 큐에 저장하지 않고 직접 전달
// - cachedThreadPool이 사용
new SynchronousQueue<>()
// 4. 우선순위 큐 (PriorityBlockingQueue)
// - 우선순위에 따라 작업 실행
new PriorityBlockingQueue<>()
Executors 팩토리 메서드의 내부 구현
// newFixedThreadPool의 실제 구현
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads, nThreads, // core = max
0L, TimeUnit.MILLISECONDS, // 유휴 스레드 즉시 제거
new LinkedBlockingQueue<Runnable>() // 무제한 큐
);
}
// newCachedThreadPool의 실제 구현
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0, Integer.MAX_VALUE, // 필요시 무제한 생성
60L, TimeUnit.SECONDS, // 60초 후 제거
new SynchronousQueue<Runnable>() // 큐에 저장 안 함
);
}
실전 활용: 동적 스레드 풀 조정
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100)
);
// 런타임에 풀 크기 조정
executor.setCorePoolSize(10);
executor.setMaximumPoolSize(20);
// 사전 시작 (미리 스레드 생성)
executor.prestartAllCoreThreads();
// 통계 수집
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {
System.out.println("====== Thread Pool Stats ======");
System.out.println("Active: " + executor.getActiveCount());
System.out.println("Pool Size: " + executor.getPoolSize());
System.out.println("Queue Size: " + executor.getQueue().size());
System.out.println("Completed: " + executor.getCompletedTaskCount());
}, 0, 5, TimeUnit.SECONDS);
6. Future (Java 5, 2004)
등장 배경
비동기 작업의 결과를 나중에 받아야 하는 경우가 많았지만, Runnable로는 반환값을 받을 수 없었습니다. 작업의 상태를 확인하고 결과를 가져올 방법이 필요했습니다.
핵심 개념
비동기 계산의 결과를 나타내는 인터페이스입니다. 계산이 완료되었는지 확인하고, 완료될 때까지 대기하고, 결과를 가져올 수 있습니다.
주요 메서드
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning); // 작업 취소
boolean isCancelled(); // 취소 여부 확인
boolean isDone(); // 완료 여부 확인
V get() throws InterruptedException, ExecutionException; // 결과 가져오기 (블로킹)
V get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException; // 타임아웃과 함께 결과 가져오기
}
사용법
ExecutorService executor = Executors.newFixedThreadPool(2);
// 비동기 작업 제출
Future<String> future = executor.submit(() -> {
Thread.sleep(2000);
return "작업 완료!";
});
System.out.println("작업 제출됨");
// 작업이 완료되었는지 확인
if (!future.isDone()) {
System.out.println("아직 작업 중...");
}
try {
// 결과 가져오기 (완료될 때까지 블로킹)
String result = future.get();
System.out.println("결과: " + result);
// 타임아웃과 함께 결과 가져오기
// String result = future.get(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
System.out.println("대기 중 인터럽트 발생");
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
System.out.println("작업 실행 중 예외 발생: " + e.getCause());
} catch (TimeoutException e) {
System.out.println("타임아웃 발생");
future.cancel(true); // 작업 취소
}
executor.shutdown();
여러 Future 처리하기
ExecutorService executor = Executors.newFixedThreadPool(3);
List<Future<Integer>> futures = new ArrayList<>();
// 여러 작업 제출
for (int i = 0; i < 5; i++) {
final int taskId = i;
Future<Integer> future = executor.submit(() -> {
Thread.sleep((long) (Math.random() * 3000));
return taskId * taskId;
});
futures.add(future);
}
// 모든 결과 수집
for (Future<Integer> future : futures) {
try {
Integer result = future.get();
System.out.println("결과: " + result);
} catch (Exception e) {
e.printStackTrace();
}
}
executor.shutdown();
Future의 한계점
- 콜백 부재: 작업 완료 시 자동으로 실행되는 콜백을 등록할 수 없습니다
- 조합의 어려움: 여러 Future를 조합하거나 체이닝하기 어렵습니다
- 예외 처리: 예외 처리가 복잡합니다
- 블로킹: get() 메서드는 블로킹 방식이므로 리액티브 프로그래밍에 부적합합니다
7. ForkJoinPool (Java 7, 2011)
등장 배경
2000년대 후반, 컴퓨팅 환경에 중대한 변화가 일어났습니다. CPU 클럭 속도의 한계에 부딪히면서, 프로세서 제조사들은 멀티코어로 방향을 전환했습니다. 듀얼코어, 쿼드코어, 옥타코어가 일반화되었고, 서버용 프로세서는 수십 개의 코어를 가지게 되었습니다.
하지만 기존의 스레드 풀은 이러한 멀티코어 환경에서 특정 유형의 작업을 효율적으로 처리하지 못했습니다:
문제 상황 1: 분할 정복 알고리즘의 비효율성
// 대용량 배열의 합계를 계산하는 전통적인 접근
public class ArraySum {
public static long sum(long[] array) {
long sum = 0;
for (long value : array) {
sum += value;
}
return sum;
}
}
// 단일 스레드: 1억 개 요소 처리에 수백 ms 소요
long[] array = new long[100_000_000];
long result = sum(array); // 너무 느림!
// ThreadPoolExecutor로 병렬화 시도
ExecutorService executor = Executors.newFixedThreadPool(8);
int chunkSize = array.length / 8;
List<Future<Long>> futures = new ArrayList<>();
for (int i = 0; i < 8; i++) {
final int start = i * chunkSize;
final int end = (i == 7) ? array.length : (i + 1) * chunkSize;
Future<Long> future = executor.submit(() -> {
long partialSum = 0;
for (int j = start; j < end; j++) {
partialSum += array[j];
}
return partialSum;
});
futures.add(future);
}
// 문제 1: 수동 분할이 복잡하고 에러 발생 가능
// 문제 2: 작업 크기가 균등하지 않으면 부하 불균형
// 문제 3: 재귀적 분할이 어려움 (QuickSort 등)
문제 상황 2: 재귀적 작업의 스레드 기아(Thread Starvation)
// 재귀적 작업: 각 작업이 두 개의 하위 작업을 생성
ExecutorService executor = Executors.newFixedThreadPool(4);
Future<Integer> future = executor.submit(() -> {
// 하위 작업 1 제출
Future<Integer> left = executor.submit(() -> {
// 더 깊은 하위 작업들...
Future<Integer> leftLeft = executor.submit(...);
Future<Integer> leftRight = executor.submit(...);
return leftLeft.get() + leftRight.get(); // 데드락 가능!
});
// 하위 작업 2 제출
Future<Integer> right = executor.submit(() -> { ... });
return left.get() + right.get(); // 여기서 블로킹
});
// 문제: 스레드 풀의 모든 스레드가 하위 작업 완료를 대기
// 하지만 하위 작업은 큐에서 대기 중
// → 데드락! (작업들이 서로를 기다림)
이것이 바로 스레드 기아 문제입니다. 모든 스레드가 자식 작업의 완료를 기다리지만, 자식 작업은 실행될 스레드가 없어서 큐에서 대기합니다.
문제 상황 3: 부하 불균형 (Load Imbalance)
// 작업 크기가 불균등한 경우
List<Callable<Result>> tasks = Arrays.asList(
() -> quickTask(), // 1초
() -> slowTask(), // 10초
() -> mediumTask(), // 5초
() -> quickTask() // 1초
);
// 4개 스레드로 실행
ExecutorService executor = Executors.newFixedThreadPool(4);
List<Future<Result>> futures = executor.invokeAll(tasks);
// 문제:
// - 스레드 1: quickTask (1초) 완료 후 유휴
// - 스레드 2: slowTask (10초) 실행 중 (병목!)
// - 스레드 3: mediumTask (5초) 완료 후 유휴
// - 스레드 4: quickTask (1초) 완료 후 유휴
//
// 전체 실행 시간: 10초 (slowTask에 의해 결정)
// 평균 CPU 활용도: 매우 낮음 (대부분 대기)
문제 상황 4: 깊은 재귀의 스택 오버플로우
// 재귀적 정렬 알고리즘
public void recursiveSort(int[] array, int start, int end) {
if (end - start < THRESHOLD) {
Arrays.sort(array, start, end);
return;
}
int mid = (start + end) / 2;
recursiveSort(array, start, mid); // 재귀 호출
recursiveSort(array, mid, end); // 재귀 호출
merge(array, start, mid, end);
}
// 문제: 매우 큰 배열에서 StackOverflowError 발생 가능
// 특히 불균형 분할 시 재귀 깊이가 매우 깊어짐
근본 원인 분석:
기존 ThreadPoolExecutor의 설계는 독립적인 작업들을 위한 것이었습니다:
- 각 작업은 서로 독립적
- 작업이 하위 작업을 생성하지 않음
- 작업 크기가 대체로 균등
하지만 분할 정복 알고리즘은 다릅니다:
- 작업이 하위 작업을 재귀적으로 생성
- 하위 작업의 결과에 의존
- 작업 크기가 불균등할 수 있음
Doug Lea의 해결책: Work-Stealing 알고리즘
Doug Lea는 Cilk 언어의 work-stealing 스케줄러에서 영감을 받아 ForkJoinPool을 설계했습니다.
핵심 아이디어:
- 각 스레드가 자신의 작업 큐(deque)를 가짐
- 작업을 fork하면 자신의 deque에 추가
- 일이 없는 스레드가 다른 스레드의 deque에서 작업을 "훔쳐옴"
이를 통해:
- 동적 부하 분산: 바쁜 스레드의 작업을 유휴 스레드가 가져감
- 스레드 기아 방지: 하위 작업을 기다리는 동안 다른 작업 실행
- 캐시 지역성: 각 스레드가 자신이 생성한 작업을 먼저 실행 (캐시 효율 향상)
핵심 개념
ForkJoinPool은 단순한 스레드 풀이 아니라, 재귀적 병렬 알고리즘을 위한 특화된 실행 엔진입니다.
핵심 개념 1: Work-Stealing 알고리즘
전통적인 스레드 풀과의 차이:
[전통적인 스레드 풀]
공유 큐: [Task1] [Task2] [Task3] [Task4] [Task5]
↓ ↓ ↓ ↓
Thread1 Thread2 Thread3 Thread4
문제: 큐 접근 시 동기화 필요 (경합 발생)
문제: 작업 크기 불균형 시 일부 스레드만 바쁨
[ForkJoinPool의 Work-Stealing]
Thread1 Deque: [Task1] [Task1.left] [Task1.left.left]
↑ HEAD ↓ TAIL
(자신이 pop) (다른 스레드가 steal)
Thread2 Deque: [Task2] [Task2.left]
↑ ↓
Thread3 Deque: [] (비어있음)
↓
Thread1이나 Thread2의 TAIL에서 훔쳐옴
장점:
- HEAD/TAIL 분리로 경합 최소화
- 자신의 작업은 캐시에 유리한 LIFO (스택처럼)
- 훔치는 작업은 FIFO (오래된 큰 작업 먼저)
왜 이것이 효율적인가:
// 예제: 피보나치 계산
class FibTask extends RecursiveTask<Integer> {
final int n;
FibTask(int n) { this.n = n; }
protected Integer compute() {
if (n <= 1) return n;
// 왼쪽 하위 작업 생성 및 fork
FibTask f1 = new FibTask(n - 1);
f1.fork(); // 자신의 deque TAIL에 추가
// 오른쪽 계산은 현재 스레드에서
FibTask f2 = new FibTask(n - 2);
int rightResult = f2.compute(); // 직접 실행
// 왼쪽 결과 대기
int leftResult = f1.join(); // 가능하면 자신이 실행
return leftResult + rightResult;
}
}
// Work-Stealing의 동작:
// 1. Thread1이 Fib(10)을 실행
// 2. Fib(9)를 fork (Thread1의 deque에 추가)
// 3. Fib(8)을 직접 계산하기 시작
// 4. Thread2가 유휴 상태라면, Thread1의 deque에서 Fib(9)를 steal
// 5. 이제 Thread1과 Thread2가 병렬로 작업
// 6. 계속 재귀적으로 분할되며 모든 코어 활용
핵심 개념 2: Fork/Join 패턴
ForkJoinPool의 작업은 두 가지 기본 연산을 제공합니다:
// fork(): 비동기 실행
// - 작업을 자신의 deque에 추가
// - 즉시 반환 (블로킹 없음)
task.fork();
// join(): 결과 대기
// - 작업이 완료되었으면 결과 즉시 반환
// - 미완료 시, 가능하면 자신이 직접 실행 (work-stealing의 핵심!)
// - 다른 작업을 훔쳐서 실행하며 대기
int result = task.join();
join()의 영리함:
// 나쁜 예 (전통적인 방식)
Future<Integer> future = executor.submit(() -> compute());
Integer result = future.get(); // 그냥 대기만 함 (낭비!)
// 좋은 예 (ForkJoinPool)
FibTask task = new FibTask(40);
task.fork();
// ... 다른 작업 수행 ...
Integer result = task.join(); // 대기하면서도 다른 작업 실행!
join()은 단순히 대기하지 않습니다. 대기하는 동안:
- 자신의 deque에서 다른 작업을 가져와 실행
- deque가 비었으면 다른 스레드에서 작업 훔치기
- 이렇게 계속 일하면서 대기
이것이 스레드 기아를 방지하는 핵심 메커니즘입니다!
핵심 개념 3: RecursiveTask와 RecursiveAction
// RecursiveTask<V>: 결과를 반환하는 작업
abstract class RecursiveTask<V> extends ForkJoinTask<V> {
protected abstract V compute(); // 실제 계산 로직
}
// RecursiveAction: 결과를 반환하지 않는 작업
abstract class RecursiveAction extends ForkJoinTask<Void> {
protected abstract void compute(); // 실제 작업 로직
}
표준 패턴:
class SomeTask extends RecursiveTask<Result> {
protected Result compute() {
if (작업이_충분히_작으면) {
// 직접 계산
return directlyCompute();
}
// 작업 분할
SomeTask leftTask = new SomeTask(왼쪽_절반);
SomeTask rightTask = new SomeTask(오른쪽_절반);
// 최적화된 fork/compute/join 패턴
leftTask.fork(); // 왼쪽을 비동기 실행
Result rightResult = rightTask.compute(); // 오른쪽은 현재 스레드에서
Result leftResult = leftTask.join(); // 왼쪽 결과 대기
// 결과 결합
return combine(leftResult, rightResult);
}
}
왜 이 패턴이 최적인가:
- leftTask.fork(): 다른 스레드에서 실행 가능하도록 준비
- rightTask.compute(): 현재 스레드의 작업 생성 오버헤드 없음
- leftTask.join(): 필요시 직접 실행하여 CPU 활용도 최대화
핵심 개념 4: 임계값(Threshold) 튜닝
private static final int THRESHOLD = 1000; // 중요한 매개변수!
protected Result compute() {
if (end - start <= THRESHOLD) {
// 임계값 이하: 분할하지 않고 직접 처리
return sequentialCompute();
}
// 임계값 초과: 분할 계속
...
}
임계값이 중요한 이유:
- 너무 작으면: fork/join 오버헤드가 실제 작업보다 큼 (성능 저하)
- 너무 크면: 병렬성이 부족하여 일부 코어만 사용 (성능 저하)
- 최적값은 작업 특성과 하드웨어에 따라 다름 (실험 필요)
경험상 가이드:
- CPU 집약적 작업: 수천~수만 개 요소
- 메모리 집약적 작업: 더 큰 임계값 (캐시 미스 고려)
전체적인 설계 의의:
ForkJoinPool은 다음을 가능하게 만들었습니다:
- 자동 부하 분산: 개발자가 수동으로 작업을 분할할 필요 없음
- 최대 CPU 활용: work-stealing으로 모든 코어를 바쁘게 유지
- 재귀적 알고리즘의 자연스러운 표현: 코드가 순차 버전과 유사
- Java 8 Parallel Streams의 기반: parallelStream()은 내부적으로 ForkJoinPool 사용
성능 특성:
시나리오: 1억 개 정수 배열 합계
단일 스레드: ~100ms
ThreadPoolExecutor: ~50ms (2배 향상, 하지만 수동 분할 복잡)
ForkJoinPool: ~15ms (6-7배 향상, 자동 부하 분산)
이는 재귀적 분할과 work-stealing이 얼마나 효과적인지 보여줍니다.
주요 특징
- 재귀적 작업 분할에 최적화
- Work-Stealing으로 자동 부하 분산
- fork()와 join()으로 작업 분할 및 결합
- Java 8 parallel streams의 기반
ForkJoinTask 계층구조
// ForkJoinTask: 추상 기본 클래스
// ├─ RecursiveTask<V>: 결과를 반환하는 작업
// └─ RecursiveAction: 결과를 반환하지 않는 작업
RecursiveTask 예제: 병렬 합계 계산
import java.util.concurrent.*;
class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10_000; // 분할 기준
private final long[] array;
private final int start;
private final int end;
public SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
// 작은 작업은 직접 처리
if (length <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// 큰 작업은 분할
int mid = start + length / 2;
// 왼쪽 절반을 새 작업으로 fork
SumTask leftTask = new SumTask(array, start, mid);
leftTask.fork(); // 비동기 실행
// 오른쪽 절반은 현재 스레드에서 처리
SumTask rightTask = new SumTask(array, mid, end);
Long rightResult = rightTask.compute();
// 왼쪽 결과를 기다려서 합산
Long leftResult = leftTask.join();
return leftResult + rightResult;
}
}
// 사용 예제
public class ForkJoinExample {
public static void main(String[] args) {
long[] array = new long[100_000_000];
for (int i = 0; i < array.length; i++) {
array[i] = i + 1;
}
// ForkJoinPool 생성 (기본값: CPU 코어 수)
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(array, 0, array.length);
long startTime = System.currentTimeMillis();
Long result = pool.invoke(task); // 실행하고 결과 대기
long endTime = System.currentTimeMillis();
System.out.println("Sum: " + result);
System.out.println("Time: " + (endTime - startTime) + "ms");
pool.shutdown();
}
}
RecursiveAction 예제: 병렬 배열 처리
class IncrementTask extends RecursiveAction {
private static final int THRESHOLD = 10_000;
private final long[] array;
private final int start;
private final int end;
public IncrementTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
int length = end - start;
if (length <= THRESHOLD) {
// 직접 처리
for (int i = start; i < end; i++) {
array[i]++;
}
} else {
// 분할
int mid = start + length / 2;
IncrementTask left = new IncrementTask(array, start, mid);
IncrementTask right = new IncrementTask(array, mid, end);
invokeAll(left, right); // 두 작업을 모두 fork하고 join
}
}
}
ForkJoinPool 생성 및 설정
// 1. 기본 생성 (CPU 코어 수만큼 병렬성)
ForkJoinPool pool = new ForkJoinPool();
// 2. 커스텀 병렬성 수준
ForkJoinPool pool = new ForkJoinPool(8);
// 3. 공통 풀 사용 (권장)
ForkJoinPool.commonPool().invoke(task);
// 4. 상세 설정
ForkJoinPool pool = new ForkJoinPool(
4, // 병렬성
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, // 예외 핸들러
false // 비동기 모드
);
실전 예제: 병렬 Quick Sort
class QuickSortTask extends RecursiveAction {
private static final int THRESHOLD = 100;
private final int[] array;
private final int start;
private final int end;
public QuickSortTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
// 작은 배열은 순차 정렬
Arrays.sort(array, start, end);
return;
}
// 피벗 선택 및 분할
int pivot = partition(array, start, end);
// 병렬 정렬
QuickSortTask left = new QuickSortTask(array, start, pivot);
QuickSortTask right = new QuickSortTask(array, pivot + 1, end);
invokeAll(left, right);
}
private int partition(int[] array, int start, int end) {
int pivot = array[end - 1];
int i = start;
for (int j = start; j < end - 1; j++) {
if (array[j] <= pivot) {
swap(array, i, j);
i++;
}
}
swap(array, i, end - 1);
return i;
}
private void swap(int[] array, int i, int j) {
int temp = array[i];
array[i] = array[j];
array[j] = temp;
}
}
// 사용
int[] array = generateRandomArray(10_000_000);
ForkJoinPool.commonPool().invoke(new QuickSortTask(array, 0, array.length));
Java 8 Parallel Streams와의 관계
// Parallel Stream은 내부적으로 ForkJoinPool.commonPool() 사용
List<Integer> numbers = IntStream.range(0, 1_000_000)
.boxed()
.collect(Collectors.toList());
// 병렬 스트림 처리
long sum = numbers.parallelStream()
.mapToLong(i -> i * i)
.sum();
// 커스텀 ForkJoinPool 사용 (트릭)
ForkJoinPool customPool = new ForkJoinPool(4);
long result = customPool.submit(() ->
numbers.parallelStream()
.mapToLong(i -> i * i)
.sum()
).get();
customPool.shutdown();
Work-Stealing 시각화
Thread 1 Deque: [Task A] [Task B] [Task C]
↑ HEAD ↓ TAIL
(pop) (steal)
Thread 2 Deque: [Task D] [Task E]
↑ HEAD
(pop)
Thread 3 Deque: [] (idle)
↓
Steals from Thread 1's TAIL → [Task C]
ForkJoinPool vs ThreadPoolExecutor
| 용도 | 재귀적 분할 정복 작업 | 독립적인 작업들 |
| 알고리즘 | Work-Stealing | 작업 큐 |
| 작업 분할 | 자동 (fork/join) | 수동 |
| 부하 분산 | 동적 (stealing) | 정적 (큐) |
| 최적 시나리오 | CPU 집약적, 병렬 가능 | I/O, 비동기 작업 |
특징 ForkJoinPool ThreadPoolExecutor
모범 사례
// 1. 적절한 THRESHOLD 설정 (너무 작으면 오버헤드)
private static final int THRESHOLD = 1000; // 실험을 통해 조정
// 2. 불필요한 fork 피하기
// 나쁜 예
leftTask.fork();
rightTask.fork();
leftTask.join();
rightTask.join();
// 좋은 예 (한쪽은 현재 스레드에서 처리)
leftTask.fork();
Long rightResult = rightTask.compute();
Long leftResult = leftTask.join();
// 3. ForkJoinPool.commonPool() 활용
// 대부분의 경우 충분하고, 리소스 효율적
ForkJoinPool.commonPool().invoke(task);
// 4. CPU 집약적 작업에만 사용
// I/O 작업은 ThreadPoolExecutor가 더 적합
8. CompletableFuture (Java 8, 2014)
등장 배경
Future의 한계를 극복하고 현대적인 비동기 프로그래밍을 지원하기 위해 등장했습니다. 콜백, 조합, 예외 처리 등을 우아하게 처리할 수 있습니다.
핵심 개념
Future와 CompletionStage 인터페이스를 구현하여, 비동기 파이프라인을 구축할 수 있는 강력한 API를 제공합니다.
주요 메서드 분류
1) 비동기 작업 시작
// 반환값이 없는 비동기 작업
static CompletableFuture<Void> runAsync(Runnable runnable)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 반환값이 있는 비동기 작업
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
// 이미 완료된 Future 생성
static <U> CompletableFuture<U> completedFuture(U value)
2) 결과 변환
// 동기 변환
<U> CompletableFuture<U> thenApply(Function<T,U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<T,U> fn)
// 결과 소비 (반환값 없음)
CompletableFuture<Void> thenAccept(Consumer<T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<T> action)
// 실행만 (입력도 출력도 없음)
CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action)
3) 결과 조합
// 두 Future 조합
<U,V> CompletableFuture<V> thenCombine(CompletableFuture<U> other,
BiFunction<T,U,V> fn)
// 두 Future 중 먼저 완료되는 것 사용
CompletableFuture<Void> acceptEither(CompletableFuture<T> other,
Consumer<T> action)
// 모든 Future 완료 대기
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
// 하나라도 완료되면 진행
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
4) 예외 처리
// 예외 처리
CompletableFuture<T> exceptionally(Function<Throwable,T> fn)
// 결과와 예외 모두 처리
<U> CompletableFuture<U> handle(BiFunction<T,Throwable,U> fn)
// 완료 시 항상 실행 (finally와 유사)
CompletableFuture<T> whenComplete(BiConsumer<T,Throwable> action)
기본 사용법
// 1. 간단한 비동기 작업
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Hello";
});
// 2. 결과 변환
CompletableFuture<String> result = future.thenApply(s -> s + " World");
// 3. 결과 소비
result.thenAccept(s -> System.out.println("결과: " + s));
// 블로킹하여 결과 가져오기
String finalResult = result.join(); // get()과 유사하지만 unchecked exception
System.out.println(finalResult); // "Hello World"
체이닝 예제
CompletableFuture.supplyAsync(() -> {
System.out.println("1. 사용자 ID로 검색");
return "user123";
})
.thenApplyAsync(userId -> {
System.out.println("2. 사용자 정보 조회: " + userId);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "User[name=홍길동, id=" + userId + "]";
})
.thenApplyAsync(user -> {
System.out.println("3. 사용자 정보 가공: " + user);
return user.toUpperCase();
})
.thenAccept(result -> {
System.out.println("4. 최종 결과: " + result);
})
.exceptionally(ex -> {
System.out.println("에러 발생: " + ex.getMessage());
return null;
});
여러 Future 조합하기
두 개의 독립적인 작업 조합
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "결과1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "결과2";
});
// 두 결과를 조합
CompletableFuture<String> combined = future1.thenCombine(future2, (r1, r2) -> {
return r1 + " + " + r2;
});
System.out.println(combined.join()); // "결과1 + 결과2"
이전 결과를 다음 비동기 작업의 입력으로 사용
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "사용자ID";
})
.thenCompose(userId -> { // flatMap과 유사
return CompletableFuture.supplyAsync(() -> {
return "사용자 정보: " + userId;
});
});
System.out.println(future.join());
모든 작업 완료 대기
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
final int index = i;
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 2000));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "작업 " + index + " 완료";
});
futures.add(future);
}
// 모든 작업이 완료될 때까지 대기
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
// 모든 결과 수집
CompletableFuture<List<String>> allResults = allFutures.thenApply(v -> {
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
});
List<String> results = allResults.join();
results.forEach(System.out::println);
가장 빠른 작업 사용
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
return "느린 서버";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
return "빠른 서버";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
CompletableFuture<Object> fastest = CompletableFuture.anyOf(future1, future2);
System.out.println("가장 빠른 응답: " + fastest.join()); // "빠른 서버"
예외 처리 패턴
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("랜덤 에러 발생!");
}
return "성공";
})
.exceptionally(ex -> {
System.out.println("예외 처리: " + ex.getMessage());
return "기본값";
})
.thenApply(result -> {
return "처리 완료: " + result;
});
System.out.println(future.join());
handle을 사용한 예외 처리
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("에러!");
}
return "성공";
})
.handle((result, ex) -> {
if (ex != null) {
System.out.println("예외 발생: " + ex.getMessage());
return "예외 처리됨";
}
return result;
});
System.out.println(future.join());
whenComplete 사용 (결과는 변경하지 않음)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "결과";
})
.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("실패: " + ex.getMessage());
} else {
System.out.println("성공: " + result);
}
// 여기서 로깅이나 리소스 정리 가능
});
실전 예제: 병렬 API 호출
public class ParallelAPIExample {
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
// 세 개의 외부 API를 병렬로 호출
CompletableFuture<String> userFuture =
CompletableFuture.supplyAsync(() -> fetchUserInfo("user123"));
CompletableFuture<String> orderFuture =
CompletableFuture.supplyAsync(() -> fetchOrderInfo("user123"));
CompletableFuture<String> recommendationFuture =
CompletableFuture.supplyAsync(() -> fetchRecommendations("user123"));
// 모든 결과를 조합
CompletableFuture<String> combinedFuture = userFuture
.thenCombine(orderFuture, (user, order) -> user + ", " + order)
.thenCombine(recommendationFuture, (combined, recommendation) ->
combined + ", " + recommendation);
// 결과 출력
String finalResult = combinedFuture.join();
long endTime = System.currentTimeMillis();
System.out.println("결과: " + finalResult);
System.out.println("소요 시간: " + (endTime - startTime) + "ms");
// 병렬 처리로 약 1000ms (순차 처리 시 3000ms)
}
private static String fetchUserInfo(String userId) {
sleep(1000);
return "User[" + userId + "]";
}
private static String fetchOrderInfo(String userId) {
sleep(1000);
return "Orders[10]";
}
private static String fetchRecommendations(String userId) {
sleep(1000);
return "Recommendations[5]";
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
9. Flow API - Reactive Streams (Java 9, 2017)
등장 배경
비동기 스트림 처리에서 백프레셔(backpressure) 문제가 있었습니다. 생산자가 너무 빠르게 데이터를 생성하면 소비자가 처리하지 못해 메모리 문제가 발생했습니다. Reactive Streams 표준을 Java에 도입하여 이를 해결했습니다.
핵심 개념
Publisher-Subscriber 패턴에 백프레셔를 추가하여, 소비자가 처리할 수 있는 만큼만 데이터를 요청하도록 합니다.
주요 인터페이스
// java.util.concurrent.Flow 패키지
// 1. Publisher: 데이터 생산자
@FunctionalInterface
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
// 2. Subscriber: 데이터 소비자
public interface Subscriber<T> {
void onSubscribe(Subscription subscription);
void onNext(T item);
void onError(Throwable throwable);
void onComplete();
}
// 3. Subscription: 구독 관계
public interface Subscription {
void request(long n); // n개의 아이템 요청
void cancel(); // 구독 취소
}
// 4. Processor: Publisher이자 Subscriber
public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
기본 사용 예제
import java.util.concurrent.*;
import java.util.concurrent.Flow.*;
// 간단한 Publisher 구현
class SimplePublisher implements Publisher<Integer> {
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Subscription() {
private boolean cancelled = false;
@Override
public void request(long n) {
if (cancelled) return;
// n개의 아이템 발행
for (int i = 0; i < n; i++) {
subscriber.onNext(i);
}
subscriber.onComplete();
}
@Override
public void cancel() {
cancelled = true;
}
});
}
}
// Subscriber 구현
class SimpleSubscriber implements Subscriber<Integer> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(5); // 처음 5개 요청
}
@Override
public void onNext(Integer item) {
System.out.println("Received: " + item);
// 처리 후 더 요청할 수 있음
// subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.err.println("Error: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Completed!");
}
}
// 사용
Publisher<Integer> publisher = new SimplePublisher();
Subscriber<Integer> subscriber = new SimpleSubscriber();
publisher.subscribe(subscriber);
SubmissionPublisher 사용
// Java 9에서 제공하는 Publisher 구현체
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
// Subscriber 등록
Subscriber<String> subscriber = new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // 한 번에 1개씩 요청
}
@Override
public void onNext(String item) {
System.out.println("Processing: " + item);
try {
Thread.sleep(100); // 처리 시간 시뮬레이션
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
subscription.request(1); // 다음 아이템 요청
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("All done!");
}
};
publisher.subscribe(subscriber);
// 데이터 발행
CompletableFuture.runAsync(() -> {
for (int i = 0; i < 10; i++) {
publisher.submit("Item-" + i);
System.out.println("Submitted: Item-" + i);
}
publisher.close(); // 완료 신호
});
Thread.sleep(2000); // 처리 대기
Processor 예제: 데이터 변환
// Processor: 중간에서 데이터 변환
class TransformProcessor<T, R> extends SubmissionPublisher<R>
implements Flow.Processor<T, R> {
private Function<T, R> transformer;
private Subscription subscription;
public TransformProcessor(Function<T, R> transformer) {
this.transformer = transformer;
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
R transformed = transformer.apply(item);
submit(transformed); // 다음 단계로 발행
subscription.request(1); // 다음 아이템 요청
}
@Override
public void onError(Throwable throwable) {
closeExceptionally(throwable);
}
@Override
public void onComplete() {
close();
}
}
// 사용: Publisher -> Processor -> Subscriber 체인
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
TransformProcessor<Integer, String> processor =
new TransformProcessor<>(i -> "Number: " + (i * 2));
Subscriber<String> subscriber = new SimpleStringSubscriber();
publisher.subscribe(processor);
processor.subscribe(subscriber);
for (int i = 0; i < 5; i++) {
publisher.submit(i);
}
publisher.close();
백프레셔 시연
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(
Executors.newFixedThreadPool(1),
10 // 버퍼 크기
);
Subscriber<Integer> slowSubscriber = new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) {
System.out.println("Processing: " + item);
try {
Thread.sleep(1000); // 느린 처리
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
subscription.request(1); // 다음 요청
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done!");
}
};
publisher.subscribe(slowSubscriber);
// 빠르게 발행 시도
for (int i = 0; i < 20; i++) {
int lag = publisher.offer(
i,
1,
TimeUnit.SECONDS,
(subscriber, value) -> {
System.out.println("Dropped: " + value);
return false;
}
);
System.out.println("Lag: " + lag); // 백프레셔 확인
}
publisher.close();
Flow API vs Reactive Libraries
Flow API는 표준 인터페이스만 제공합니다. 실제 사용을 위해서는:
- Project Reactor: Spring WebFlux의 기반
- RxJava: Android 개발에서 인기
- Akka Streams: Actor 모델 기반
// Project Reactor 예제 (참고용)
Flux.range(1, 10)
.map(i -> i * 2)
.filter(i -> i % 3 == 0)
.subscribe(System.out::println);
10. Virtual Threads (Java 19 Preview, Java 21 LTS, 2023)
등장 배경
2020년대 초반, 클라우드 네이티브 애플리케이션과 마이크로서비스가 보편화되면서, Java는 심각한 도전에 직면했습니다. Node.js, Go, Kotlin의 코루틴 등 경량 동시성 모델을 가진 언어들이 인기를 얻는 반면, Java의 전통적인 스레드 모델은 확장성 한계에 부딪혔습니다.
문제 상황 1: 웹 서버의 C10K 문제
// 전통적인 Java 웹 서버 (Thread-per-Request 모델)
ServerSocket serverSocket = new ServerSocket(8080);
// 각 요청마다 새 스레드 생성
while (true) {
Socket clientSocket = serverSocket.accept();
// 문제: 각 스레드는 약 1-2MB 메모리 사용
new Thread(() -> {
try {
// 요청 처리 (평균 100ms)
handleRequest(clientSocket);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
// 시나리오: 동시 접속자 10,000명
// 필요 메모리: 10,000 스레드 × 2MB = 20GB!
// 결과: OutOfMemoryError 또는 심각한 성능 저하
C10K 문제란 10,000개의 동시 연결을 처리하는 문제입니다. Java의 전통적인 스레드는 OS 스레드와 1:1 매핑되어 너무 무겁습니다:
- 각 스레드: ~2MB 스택 메모리
- 생성 시간: ~1ms (비용이 큼)
- 컨텍스트 스위칭: ~1-10μs (오버헤드)
- 최대 스레드 수: OS 제한 (~수천 개)
문제 상황 2: 비동기 프로그래밍의 복잡성
// 해결책 1: 스레드 풀 사용
ExecutorService executor = Executors.newFixedThreadPool(200);
executor.submit(() -> {
// 각 요청을 200개 스레드로 처리
handleRequest(request);
});
// 문제: 200개 스레드로 10,000개 동시 요청?
// 큐에 9,800개 요청이 대기 (지연 발생)
// 해결책 2: 비동기 논블로킹 (CompletableFuture)
CompletableFuture.supplyAsync(() -> fetchUserFromDB(userId))
.thenCompose(user -> fetchOrdersFromDB(user.id()))
.thenCompose(orders -> fetchRecommendations(orders))
.thenApply(recommendations -> buildResponse(recommendations))
.thenAccept(response -> sendToClient(response))
.exceptionally(error -> handleError(error));
// 문제:
// 1. 콜백 지옥 (Callback Hell)
// 2. 디버깅 어려움 (스택 트레이스가 분절됨)
// 3. 예외 처리 복잡
// 4. 코드 가독성 급격히 저하
// 5. 개발자 러닝 커브가 가파름
문제 상황 3: 블로킹 API와의 불일치
// 기존의 수많은 블로킹 API들
Connection conn = dataSource.getConnection(); // 블로킹
ResultSet rs = stmt.executeQuery("SELECT ..."); // 블로킹
InputStream is = socket.getInputStream(); // 블로킹
String data = bufferedReader.readLine(); // 블로킹
// 비동기 프레임워크에서는 이런 코드를 쓸 수 없음!
// 전부 논블로킹 버전으로 다시 작성해야 함
// 예: JDBC → R2DBC (완전히 다른 API)
// 예: InputStream → Reactive Streams
// 결과: 기존 라이브러리 대부분 사용 불가
수십 년간 축적된 Java의 블로킹 라이브러리 생태계가 모두 쓸모없어지는 문제였습니다.
문제 상황 4: 다른 언어들의 성공
// Go언어의 고루틴 (2009년)
go handleRequest(request) // 경량 "스레드" 생성
// - 2KB 스택
// - 수백만 개 동시 실행 가능
// - 동기식 코드 작성 가능
// Node.js의 이벤트 루프 (2009년)
// - 단일 스레드로 수만 개 연결 처리
// - 하지만 콜백 지옥...
// Kotlin의 코루틴 (2016년)
launch {
val user = fetchUser() // 중단 가능
val orders = fetchOrders(user)
sendResponse(orders)
}
// - 경량
// - 동기식 코드 작성
// - JVM에서 실행
Java는 경쟁력을 잃어가고 있었습니다. 개발자들은 "Java는 현대적인 서버 애플리케이션에 적합하지 않다"고 말하기 시작했습니다.
근본 원인 분석:
문제의 핵심은 Java 스레드 = OS 스레드라는 1:1 매핑이었습니다:
[전통적인 Java 스레드 모델]
Java Thread 1 ←→ OS Thread 1 ←→ Kernel Thread 1
Java Thread 2 ←→ OS Thread 2 ←→ Kernel Thread 2
...
Java Thread N ←→ OS Thread N ←→ Kernel Thread N
문제:
- OS 스레드는 귀한 자원 (수천 개 제한)
- 큰 메모리 오버헤드 (스레드당 ~2MB)
- 컨텍스트 스위칭 비용
- OS 스케줄러에 의존 (Java가 제어 불가)
Project Loom의 해결책: Virtual Threads
2017년 시작된 Project Loom은 이 문제를 근본적으로 해결하기 위한 프로젝트였습니다. 핵심 아이디어:
"스레드를 다시 저렴하게 만들자"
가상 스레드(Virtual Thread)는:
- JVM이 관리하는 경량 스레드
- 수백만 개 생성 가능 (메모리가 허용하는 한)
- 동기식 코드 작성 가능 (기존 API 모두 사용 가능)
- OS 스레드를 효율적으로 공유
[Virtual Threads 모델]
Virtual Thread 1 ┐
Virtual Thread 2 ├→ Carrier Thread 1 ←→ OS Thread 1
Virtual Thread 3 ┘
Virtual Thread 4 ┐
Virtual Thread 5 ├→ Carrier Thread 2 ←→ OS Thread 2
Virtual Thread 6 ┘
...
Virtual Thread 1,000,000 → (대기 중, 메모리만 사용)
장점:
- 가상 스레드는 매우 가벼움 (~1KB)
- OS 스레드(Carrier)는 소수만 필요 (보통 CPU 코어 수)
- 블로킹 시 자동으로 언마운트 (OS 스레드 해제)
- 재개 시 자동으로 다시 마운트
핵심 개념
Virtual Threads는 단순히 "가벼운 스레드"가 아니라, Java 동시성 모델의 패러다임 전환입니다.
핵심 개념 1: 마운트(Mount)와 언마운트(Unmount)
// 가상 스레드의 생명주기
┌─────────────────────────────────────────────┐
│ Virtual Thread 실행 흐름 │
└─────────────────────────────────────────────┘
1. 생성: VirtualThread.start()
- 매우 저렴 (~1KB 메모리)
- 즉시 반환
2. 마운트: 실행 준비 완료
- Carrier Thread (OS 스레드)에 마운트
- 실제 CPU에서 실행 시작
3. 언마운트: 블로킹 작업 만남
- I/O 대기, sleep(), wait() 등
- Carrier Thread에서 자동 분리
- Carrier Thread는 다른 가상 스레드 실행
4. 재마운트: 블로킹 해제
- I/O 완료, 타이머 만료 등
- (가능한) Carrier Thread에 다시 마운트
- 실행 재개
구체적인 예시:
// 가상 스레드에서 실행되는 코드
void handleRequest() {
// CPU 작업: Carrier Thread 사용
String userId = parseRequest();
// I/O 블로킹: 언마운트!
User user = database.query("SELECT ..."); // 여기서 언마운트
// 이 시점에 Carrier Thread는 다른 가상 스레드를 실행
// I/O 완료: 재마운트!
// 실행 재개
// CPU 작업: Carrier Thread 사용
String response = buildResponse(user);
// I/O 블로킹: 언마운트!
socket.write(response); // 여기서 다시 언마운트
}
왜 이것이 혁명적인가:
// 10,000개 요청 처리
[전통적인 플랫폼 스레드]
10,000 OS 스레드 필요
메모리: 20GB
컨텍스트 스위칭: 많음
결과: 불가능 또는 매우 느림
[가상 스레드]
10,000 가상 스레드 생성
Carrier Thread: 8개 (CPU 코어 수)
메모리: ~10MB (가상 스레드) + 16MB (Carrier)
컨텍스트 스위칭: 최소 (JVM 내부)
결과: 빠르고 효율적!
핵심 개념 2: Continuation (연속)
가상 스레드의 마법은 Continuation으로 구현됩니다:
// 개념적으로 (실제 API는 아님)
class VirtualThread {
private Continuation continuation; // 실행 상태 저장
void run() {
while (!done) {
continuation.run(); // 실행
if (blocked) {
continuation.yield(); // 상태 저장 및 양보
// Carrier Thread 해제
}
}
}
}
Continuation이란:
- 실행 중인 코드의 "스냅샷"
- 스택 프레임, 로컬 변수, 실행 위치 등 저장
- 나중에 정확히 같은 지점에서 재개 가능
전통적인 스레드와의 차이:
[Platform Thread]
블로킹 시: OS에게 "나 좀 재워줘" → 컨텍스트 스위칭 (비용 큼)
재개 시: OS가 다시 깨움 → 컨텍스트 스위칭 (비용 큼)
[Virtual Thread]
블로킹 시: Continuation.yield() → Carrier Thread만 바꿈 (JVM 내부)
재개 시: Continuation.run() → 이전 상태에서 재개
핵심 개념 3: Carrier Thread Pool
// 가상 스레드는 Carrier Thread에서 실행됨
// Carrier Thread는 ForkJoinPool
ForkJoinPool carrierPool = ForkJoinPool.commonPool();
// 기본 크기: CPU 코어 수
// 가상 스레드 생성
Thread vt = Thread.ofVirtual().start(() -> {
// 이 코드는 Carrier Thread에 마운트되어 실행
doWork();
});
설계 원칙:
Carrier Thread 수 = CPU 코어 수
이유:
- CPU 집약적 작업은 여전히 코어 수에 제한됨
- I/O 바운드 작업은 언마운트되므로 Carrier 수 무관
- 과도한 Carrier는 오히려 컨텍스트 스위칭 증가
핵심 개념 4: Pinning (고정) 문제
가상 스레드의 유일한 제약사항:
// 문제 상황: synchronized 블록 내에서 블로킹
synchronized (lock) {
Thread.sleep(1000); // 여기서 언마운트 불가!
// Carrier Thread가 고정됨 (pinning)
}
// 왜 문제인가:
// - 1,000개 가상 스레드가 모두 이렇게 하면?
// - 1,000개 Carrier Thread 필요 (가상 스레드의 장점 상실)
Pinning이 발생하는 경우:
- synchronized 블록/메서드 내에서 블로킹
- Native 메서드 실행 중
해결책:
// 나쁜 예
synchronized (lock) {
Thread.sleep(1000); // Pinning!
}
// 좋은 예: ReentrantLock 사용
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
Thread.sleep(1000); // 언마운트 가능!
} finally {
lock.unlock();
}
핵심 개념 5: 구조적 동시성과의 시너지
가상 스레드는 Structured Concurrency와 함께 사용할 때 진가를 발휘합니다:
// 수백만 개의 가상 스레드를 쉽게 관리
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 10,000개 데이터베이스 쿼리를 병렬로
for (int i = 0; i < 10_000; i++) {
scope.fork(() -> queryDatabase(i)); // 각각 가상 스레드
}
scope.join(); // 모두 완료 대기
} // scope 종료 시 모든 가상 스레드 자동 정리
왜 이것이 가능한가:
- 가상 스레드는 저렴하므로 수만 개 생성 가능
- Structured Concurrency가 생명주기를 자동 관리
- 메모리 누수 걱정 없음
전체적인 설계 의의:
Virtual Threads가 가져온 패러다임 전환:
Before (전통적인 방식):
스레드는 귀한 자원
→ 스레드 풀로 재사용
→ 비동기 프로그래밍 필요
→ 콜백 지옥, 복잡한 코드
After (Virtual Threads):
스레드는 저렴한 자원
→ 요청당 스레드 사용 가능
→ 동기식 프로그래밍 가능
→ 간단하고 읽기 쉬운 코드
성능 특성:
시나리오: 10,000개 동시 HTTP 요청 처리 (각 100ms 소요)
Platform Threads (풀 200개):
- 처리 시간: ~5초 (대기 큐로 인한 지연)
- 메모리: ~400MB
- CPU 활용: 낮음 (대부분 I/O 대기)
Virtual Threads:
- 처리 시간: ~100ms (모두 즉시 처리)
- 메모리: ~10MB (가상) + 16MB (Carrier)
- CPU 활용: 높음 (I/O 대기 시 다른 작업)
개선: 50배 빠름, 15배 적은 메모리
Java의 미래:
Virtual Threads는 Java를 다시 서버 프로그래밍의 최고 선택지로 만들었습니다:
- Go의 간결함 (고루틴)
- Node.js의 성능 (고성능 I/O)
- Java의 안정성 (타입 안전성, 생태계)
역사적 의의:
이것은 Java 역사상 가장 큰 변화 중 하나입니다:
- Java 5의 Generics/Concurrency 이후 최대 혁신
- 30년 동안 유지된 스레드 모델의 근본적 변경
- "Java는 느리다"는 편견을 깨뜨림
주요 특징
- 매우 저렴한 생성 비용: 1개당 ~1KB (플랫폼 스레드는 ~2MB)
- 빠른 컨텍스트 스위칭: OS 개입 없이 JVM 내에서 처리
- 블로킹 작업에 최적: I/O 대기 중 다른 작업 실행
- 기존 API와 호환: Thread API 그대로 사용
가상 스레드 생성 방법
// 방법 1: Thread.ofVirtual() 사용
Thread virtualThread = Thread.ofVirtual().start(() -> {
// 이 코드는 가상 스레드에서 실행됨
// 블로킹 호출 시 자동으로 Carrier Thread에서 언마운트
System.out.println("Virtual thread: " + Thread.currentThread());
// 출력 예: Virtual thread: VirtualThread[#21]/runnable@ForkJoinPool-1-worker-1
});
// 가상 스레드가 완료될 때까지 대기
virtualThread.join();
// 방법 2: Thread.startVirtualThread() - 더 간결한 방식
Thread.startVirtualThread(() -> {
System.out.println("Running in virtual thread");
// start()와 동시에 생성 및 시작
});
// 방법 3: Thread.ofVirtual()로 빌더 패턴 사용
Thread.Builder builder = Thread.ofVirtual()
.name("my-virtual-thread-", 0) // 이름 패턴 및 시작 번호
.uncaughtExceptionHandler((t, e) -> {
System.err.println("Thread " + t + " threw exception: " + e);
});
// 빌더로 여러 스레드 생성
Thread vt1 = builder.start(() -> doWork1());
Thread vt2 = builder.start(() -> doWork2());
// 방법 4: Executors.newVirtualThreadPerTaskExecutor() - 가장 실용적
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
// ExecutorService를 사용한 표준 방식
// try-with-resources로 자동 종료 관리
for (int i = 0; i < 10_000; i++) {
final int taskId = i;
// 각 작업마다 새로운 가상 스레드 생성
// 플랫폼 스레드였다면 불가능한 규모!
executor.submit(() -> {
try {
// 블로킹 작업 시뮬레이션
Thread.sleep(Duration.ofSeconds(1));
System.out.println("Task " + taskId + " completed");
return "Result " + taskId;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
});
}
// try 블록 종료 시:
// 1. executor.shutdown() 자동 호출
// 2. 모든 작업 완료 대기
// 3. 모든 가상 스레드 정리
} // 여기서 자동으로 모든 작업 완료 대기
// 방법 5: 커스텀 ThreadFactory
ThreadFactory factory = Thread.ofVirtual()
.name("custom-vthread-", 0)
.factory();
// 팩토리로 스레드 생성
Thread vt = factory.newThread(() -> {
System.out.println("Custom virtual thread: " +
Thread.currentThread().getName());
});
vt.start();
대규모 동시성 예제
// 100만 개의 가상 스레드를 몇 초 안에 생성 및 실행!
// 플랫폼 스레드로는 불가능한 규모
System.out.println("시작...");
long startTime = System.currentTimeMillis();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
// 1,000,000개의 가상 스레드 제출
IntStream.range(0, 1_000_000).forEach(i -> {
executor.submit(() -> {
try {
// 각 가상 스레드가 1초 동안 "sleep"
// sleep 중에는 Carrier Thread에서 언마운트되므로
// 실제로는 소수의 Carrier Thread만 사용됨
Thread.sleep(Duration.ofSeconds(1));
// I/O 작업 시뮬레이션
// 실제로는 DB 쿼리, HTTP 요청 등
return i;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return -1;
}
});
});
// 모든 작업 제출 완료
System.out.println("모든 작업 제출 완료: " +
(System.currentTimeMillis() - startTime) + "ms");
} // try-with-resources가 모든 작업 완료를 기다림
long endTime = System.currentTimeMillis();
System.out.println("모든 작업 완료: " + (endTime - startTime) + "ms");
// 결과 예상:
// 모든 작업 제출 완료: ~100ms (100만 개 가상 스레드 생성)
// 모든 작업 완료: ~1000ms (모두 병렬 실행)
//
// 플랫폼 스레드로 시도하면:
// - OutOfMemoryError 또는
// - 수 분 이상 소요 (순차 처리)
실전 예제: HTTP 서버
/**
* 가상 스레드를 사용한 고성능 HTTP 서버
* 수만 개의 동시 연결을 처리 가능
*/
public class VirtualThreadHttpServer {
public static void main(String[] args) throws IOException {
// 서버 소켓 생성
ServerSocket serverSocket = new ServerSocket(8080);
System.out.println("서버 시작: http://localhost:8080");
// 각 연결마다 가상 스레드 생성
// 이전에는 스레드 풀로 제한했지만, 이제는 무제한!
while (true) {
// 클라이언트 연결 대기 (블로킹)
Socket clientSocket = serverSocket.accept();
// 새로운 가상 스레드에서 요청 처리
// 비용이 저렴하므로 매번 새로 생성
Thread.startVirtualThread(() ->
handleRequest(clientSocket)
);
// 메인 루프는 즉시 다음 연결 대기
// 요청 처리는 백그라운드 가상 스레드에서
}
}
private static void handleRequest(Socket socket) {
try (socket; // try-with-resources로 자동 close
BufferedReader in = new BufferedReader(
new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(
socket.getOutputStream(), true)) {
// HTTP 요청 읽기 (블로킹 I/O)
// 여기서 가상 스레드가 언마운트됨
String requestLine = in.readLine();
System.out.println("요청: " + requestLine);
// 헤더 건너뛰기
String line;
while ((line = in.readLine()) != null && !line.isEmpty()) {
// 헤더 처리
}
// 비즈니스 로직 수행
// 예: 데이터베이스 조회 (블로킹)
String result = performBusinessLogic();
// DB 쿼리 중에도 언마운트되어 Carrier Thread 해제
// HTTP 응답 전송 (블로킹 I/O)
out.println("HTTP/1.1 200 OK");
out.println("Content-Type: text/plain");
out.println("Content-Length: " + result.length());
out.println(); // 빈 줄
out.println(result);
// 연결 종료 (try-with-resources가 처리)
} catch (IOException e) {
System.err.println("요청 처리 오류: " + e.getMessage());
}
}
private static String performBusinessLogic() {
try {
// 데이터베이스 쿼리 시뮬레이션
Thread.sleep(Duration.ofMillis(100));
// 외부 API 호출 시뮬레이션
Thread.sleep(Duration.ofMillis(50));
return "처리 완료";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "오류";
}
}
}
// 성능 특성:
// - 동시 연결 10,000개 처리 가능
// - 메모리 사용량: ~10MB (가상 스레드) + OS 스레드
// - 응답 시간: 일정 (큐 대기 없음)
//
// 플랫폼 스레드 풀 (200개)과 비교:
// - 동시 처리: 200개만
// - 나머지 9,800개는 큐에서 대기
// - 평균 응답 시간: 크게 증가
가상 스레드 vs 플랫폼 스레드 성능 비교
public class VirtualThreadBenchmark {
public static void main(String[] args) throws Exception {
int tasks = 100_000;
System.out.println("=== 10만 개 작업 처리 (각 100ms I/O) ===\n");
// 1. 플랫폼 스레드 풀 (200개)
System.out.println("1. 플랫폼 스레드 풀 (200개):");
long platformTime = measurePlatformThreads(tasks);
System.out.println(" 소요 시간: " + platformTime + "ms");
System.out.println(" 예상 처리: 순차적 (큐 대기)\n");
// 2. 가상 스레드
System.out.println("2. 가상 스레드:");
long virtualTime = measureVirtualThreads(tasks);
System.out.println(" 소요 시간: " + virtualTime + "ms");
System.out.println(" 예상 처리: 모두 병렬\n");
// 3. 성능 비교
System.out.println("=== 결과 ===");
System.out.println("속도 향상: " + (platformTime / virtualTime) + "배");
System.out.println("가상 스레드가 " +
String.format("%.1f%%", (1.0 - (double)virtualTime/platformTime) * 100)
+ " 더 빠름");
}
static long measurePlatformThreads(int tasks) throws Exception {
long start = System.currentTimeMillis();
// 200개 플랫폼 스레드 풀
try (var executor = Executors.newFixedThreadPool(200)) {
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < tasks; i++) {
// 각 작업을 제출
// 200개 스레드만 사용 가능하므로 나머지는 큐에서 대기
Future<?> future = executor.submit(() -> {
try {
// I/O 작업 시뮬레이션 (블로킹)
Thread.sleep(Duration.ofMillis(100));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return null;
});
futures.add(future);
}
// 모든 작업 완료 대기
for (Future<?> future : futures) {
future.get();
}
}
return System.currentTimeMillis() - start;
// 예상: ~50,000ms (10만 작업 / 200 스레드 × 100ms)
}
static long measureVirtualThreads(int tasks) throws Exception {
long start = System.currentTimeMillis();
// 가상 스레드 executor
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < tasks; i++) {
// 각 작업마다 새로운 가상 스레드 생성
// 10만 개 모두 거의 동시에 실행 가능!
Future<?> future = executor.submit(() -> {
try {
// I/O 작업 시뮬레이션
// sleep 중에는 Carrier Thread에서 언마운트
Thread.sleep(Duration.ofMillis(100));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return null;
});
futures.add(future);
}
// 모든 작업 완료 대기
for (Future<?> future : futures) {
future.get();
}
}
return System.currentTimeMillis() - start;
// 예상: ~100ms (모두 병렬 실행)
}
}
// 실행 결과 예시:
// === 10만 개 작업 처리 (각 100ms I/O) ===
//
// 1. 플랫폼 스레드 풀 (200개):
// 소요 시간: 50234ms
// 예상 처리: 순차적 (큐 대기)
//
// 2. 가상 스레드:
// 소요 시간: 156ms
// 예상 처리: 모두 병렬
//
// === 결과 ===
// 속도 향상: 322배
// 가상 스레드가 99.7% 더 빠름
주의사항 및 모범 사례
1. Pinning 문제
// 나쁜 예: synchronized 블록에서 블로킹
synchronized (lock) {
// I/O 작업을 synchronized 안에서 하면
// 가상 스레드가 캐리어 스레드에 고정(pin)됨
Thread.sleep(Duration.ofSeconds(1));
}
// 좋은 예: ReentrantLock 사용
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
Thread.sleep(Duration.ofSeconds(1)); // Pinning 없음
} finally {
lock.unlock();
}
2. ThreadLocal 주의
// 가상 스레드에서 ThreadLocal은 메모리 문제 유발 가능
// 수백만 개 가상 스레드 × ThreadLocal 데이터 = 큰 메모리 사용
// 대안: Scoped Values (Java 20+)
3. CPU 집약적 작업에는 부적합
// 가상 스레드는 I/O 바운드 작업에 최적
// CPU 집약적 작업은 플랫폼 스레드나 ForkJoinPool 사용
// 나쁜 예 (가상 스레드로 CPU 작업)
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> {
// CPU 집약적 계산
for (long i = 0; i < 1_000_000_000; i++) {
Math.sqrt(i);
}
});
}
// 좋은 예 (플랫폼 스레드 또는 ForkJoinPool)
try (var executor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors())) {
// CPU 작업 처리
}
4. 언제 사용해야 하나?
가상 스레드 사용:
- 웹 서버, API 서버
- DB 연결, 파일 I/O
- 네트워크 통신
- 많은 동시 연결 처리
플랫폼 스레드/ForkJoinPool 사용:
- CPU 집약적 계산
- 짧고 빠른 작업
- 분할 정복 알고리즘
11. Structured Concurrency (Java 19 Preview, 진화 중)
등장 배경
가상 스레드로 수많은 스레드를 쉽게 생성할 수 있게 되었지만, 이들의 생명주기 관리가 어려웠습니다. 부모 작업이 끝나도 자식 스레드가 계속 실행되거나, 예외 처리가 복잡한 문제가 있었습니다.
핵심 개념
Structured Concurrency: 동시성 작업을 구조화하여, 작업들이 명확한 생명주기를 가지도록 합니다. 부모 스코프가 끝나면 모든 자식 작업도 종료됩니다.
StructuredTaskScope 사용 (Java 21+)
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.*;
// 모든 하위 작업 완료 대기
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Subtask<String> user = scope.fork(() -> fetchUser());
Subtask<String> order = scope.fork(() -> fetchOrder());
Subtask<String> inventory = scope.fork(() -> fetchInventory());
scope.join(); // 모든 작업 완료 대기
scope.throwIfFailed(); // 실패 시 예외 발생
// 모든 작업 성공 시 결과 사용
String response = combineResults(
user.get(),
order.get(),
inventory.get()
);
} // scope 종료 시 모든 스레드 자동 종료
ShutdownOnSuccess: 첫 성공 시 나머지 취소
// 여러 서버 중 가장 빠른 응답 사용
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
scope.fork(() -> callServer("server1.com"));
scope.fork(() -> callServer("server2.com"));
scope.fork(() -> callServer("server3.com"));
scope.join();
String fastestResponse = scope.result(); // 첫 성공 결과
System.out.println("Fastest: " + fastestResponse);
} // 나머지 작업 자동 취소
전통적 방식 vs Structured Concurrency
// ❌ 전통적 방식: 복잡한 생명주기 관리
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
try {
Future<String> future1 = executor.submit(() -> task1());
Future<String> future2 = executor.submit(() -> task2());
try {
String result1 = future1.get();
String result2 = future2.get();
// 사용
} catch (Exception e) {
future1.cancel(true);
future2.cancel(true);
throw e;
}
} finally {
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
// ✅ Structured Concurrency: 간결하고 안전
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = scope.fork(() -> task1());
var task2 = scope.fork(() -> task2());
scope.join().throwIfFailed();
String result = combine(task1.get(), task2.get());
} // 자동 정리
실전 예제: 병렬 API 조합
record UserData(String name, List<String> orders, List<String> recommendations) {}
UserData fetchUserData(String userId) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var userName = scope.fork(() -> {
Thread.sleep(Duration.ofMillis(100));
return "User-" + userId;
});
var orders = scope.fork(() -> {
Thread.sleep(Duration.ofMillis(150));
return List.of("Order1", "Order2");
});
var recommendations = scope.fork(() -> {
Thread.sleep(Duration.ofMillis(120));
return List.of("Rec1", "Rec2", "Rec3");
});
scope.join().throwIfFailed();
return new UserData(
userName.get(),
orders.get(),
recommendations.get()
);
}
}
커스텀 TaskScope
// 커스텀 정책: 2개 이상 성공하면 종료
class SucceedAfterTwo<T> extends StructuredTaskScope<T> {
private final AtomicInteger successCount = new AtomicInteger(0);
private volatile T firstResult;
@Override
protected void handleComplete(Subtask<? extends T> subtask) {
if (subtask.state() == Subtask.State.SUCCESS) {
if (successCount.incrementAndGet() == 1) {
firstResult = subtask.get();
}
if (successCount.get() >= 2) {
shutdown(); // 2개 성공 시 나머지 취소
}
}
}
public T result() {
return firstResult;
}
}
12. 정리 및 비교
진화 과정 요약
| Java 1.0 (1996) | Runnable, Thread | 기본 스레드 작업 | 학습 목적, 레거시 |
| Java 5 (2004) | Executor, ExecutorService | 스레드 풀, 작업 제출 분리 | 일반적인 비동기 작업 |
| Java 5 (2004) | Executors | 다양한 스레드 풀 팩토리 | 표준 스레드 풀 생성 |
| Java 5 (2004) | ThreadPoolExecutor | 세밀한 스레드 풀 제어 | 커스텀 스레드 풀 |
| Java 5 (2004) | Future, Callable | 비동기 결과 반환 | 결과가 필요한 비동기 작업 |
| Java 7 (2011) | ForkJoinPool | Work-Stealing, 분할 정복 | CPU 집약적 병렬 작업 |
| Java 8 (2014) | CompletableFuture | 비동기 파이프라인, 콜백 | 복잡한 비동기 워크플로우 |
| Java 8 (2014) | Parallel Streams | 선언적 병렬 처리 | 컬렉션 병렬 처리 |
| Java 9 (2017) | Flow API | Reactive Streams, 백프레셔 | 스트리밍 데이터 처리 |
| Java 21 (2023) | Virtual Threads | 경량 스레드, 대규모 동시성 | I/O 바운드 서버 애플리케이션 |
| Java 21 (2023) | Structured Concurrency | 구조화된 동시성 | 가상 스레드 생명주기 관리 |
시기 기술 주요 특징 주요 사용 사례
13. 현대 Java 애플리케이션 권장사항 (2024+)
작업 유형별 선택 가이드
1. I/O 바운드 작업 (웹 서버, API, DB 통신)
// ✅ 최선: Virtual Threads (Java 21+)
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (Request request : requests) {
executor.submit(() -> handleRequest(request));
}
}
// ✅ 차선: CompletableFuture (Java 8+)
CompletableFuture.supplyAsync(() -> fetchFromDB())
.thenApply(data -> process(data))
.thenAccept(result -> sendResponse(result));
이유: 가상 스레드는 블로킹 I/O에서도 효율적으로 작동하며, 코드가 간단합니다.
2. CPU 집약적 작업 (계산, 데이터 처리)
// ✅ 최선: ForkJoinPool (재귀적 분할)
ForkJoinPool.commonPool().invoke(new ComputeTask(data));
// ✅ 차선: 고정 크기 스레드 풀
ExecutorService executor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
// ✅ 컬렉션 처리: Parallel Stream
List<Result> results = largeList.parallelStream()
.map(this::compute)
.collect(Collectors.toList());
이유: CPU 작업은 코어 수만큼만 병렬화하는 것이 최적입니다.
3. 복잡한 비동기 워크플로우
// ✅ CompletableFuture 조합
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> fetchUserData())
.thenCompose(user -> fetchUserOrders(user.id()))
.thenCombine(
CompletableFuture.supplyAsync(() -> fetchRecommendations()),
(orders, recommendations) -> combine(orders, recommendations)
)
.exceptionally(ex -> defaultValue())
.orTimeout(5, TimeUnit.SECONDS);
이유: 비동기 체인, 조합, 예외 처리가 우아하게 표현됩니다.
4. 대규모 동시 연결 (서버 애플리케이션)
// ✅ Virtual Threads + Structured Concurrency
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
for (Connection conn : connections) {
scope.fork(() -> handleConnection(conn));
}
scope.join().throwIfFailed();
}
이유: 수만 개의 동시 연결을 메모리 효율적으로 처리합니다.
5. 실시간 스트리밍 데이터
// ✅ Flow API (또는 Reactor, RxJava)
SubmissionPublisher<Event> publisher = new SubmissionPublisher<>();
publisher.subscribe(new Subscriber<Event>() {
public void onNext(Event event) {
processEvent(event);
subscription.request(1); // 백프레셔
}
// ...
});
이유: 백프레셔로 메모리 오버플로우를 방지합니다.
시나리오별 실전 예제
시나리오 1: 마이크로서비스 API 게이트웨이
// Java 21 Virtual Threads 사용
public class APIGateway {
public Response handleRequest(Request request) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var authTask = scope.fork(() ->
authenticateUser(request.token()));
var dataTask = scope.fork(() ->
fetchData(request.params()));
var cacheTask = scope.fork(() ->
checkCache(request.key()));
scope.join().throwIfFailed();
return buildResponse(
authTask.get(),
dataTask.get(),
cacheTask.get()
);
}
}
}
시나리오 2: 대용량 데이터 병렬 처리
// ForkJoinPool 사용
public class DataProcessor {
public void processLargeDataset(List<Data> dataset) {
ForkJoinPool.commonPool().invoke(
new ProcessTask(dataset, 0, dataset.size())
);
}
class ProcessTask extends RecursiveAction {
private static final int THRESHOLD = 1000;
private List<Data> data;
private int start, end;
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
for (int i = start; i < end; i++) {
processDataItem(data.get(i));
}
} else {
int mid = (start + end) / 2;
invokeAll(
new ProcessTask(data, start, mid),
new ProcessTask(data, mid, end)
);
}
}
}
}
시나리오 3: 비동기 이벤트 처리 파이프라인
// CompletableFuture 체이닝
public class EventPipeline {
public CompletableFuture<Void> processEvent(Event event) {
return CompletableFuture
.supplyAsync(() -> validate(event))
.thenApplyAsync(validated -> enrich(validated))
.thenApplyAsync(enriched -> transform(enriched))
.thenComposeAsync(transformed ->
persistToDatabase(transformed))
.thenAcceptAsync(saved ->
publishToKafka(saved))
.exceptionally(ex -> {
logger.error("Pipeline failed", ex);
handleError(ex);
return null;
})
.orTimeout(30, TimeUnit.SECONDS);
}
}
성능 비교 및 벤치마크
public class PerformanceComparison {
public static void main(String[] args) throws Exception {
int tasks = 10_000;
System.out.println("=== I/O Bound Tasks ===");
benchmarkIOBound(tasks);
System.out.println("\n=== CPU Bound Tasks ===");
benchmarkCPUBound(tasks);
}
static void benchmarkIOBound(int tasks) throws Exception {
// 1. 플랫폼 스레드 풀
long time1 = measure(() -> {
try (var executor = Executors.newFixedThreadPool(200)) {
for (int i = 0; i < tasks; i++) {
executor.submit(() -> simulateIO());
}
}
});
System.out.println("Platform Threads (pool): " + time1 + "ms");
// 2. Virtual Threads
long time2 = measure(() -> {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < tasks; i++) {
executor.submit(() -> simulateIO());
}
}
});
System.out.println("Virtual Threads: " + time2 + "ms");
System.out.println("Speedup: " + (double) time1 / time2 + "x");
}
static void benchmarkCPUBound(int tasks) throws Exception {
// 1. ForkJoinPool
long time1 = measure(() -> {
ForkJoinPool.commonPool().invoke(
new CPUTask(0, tasks)
);
});
System.out.println("ForkJoinPool: " + time1 + "ms");
// 2. Parallel Stream
long time2 = measure(() -> {
IntStream.range(0, tasks)
.parallel()
.forEach(i -> simulateCPU());
});
System.out.println("Parallel Stream: " + time2 + "ms");
}
static void simulateIO() {
try {
Thread.sleep(Duration.ofMillis(10));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
static void simulateCPU() {
double result = 0;
for (int i = 0; i < 1_000_000; i++) {
result += Math.sqrt(i);
}
}
static long measure(Runnable task) throws Exception {
long start = System.currentTimeMillis();
task.run();
return System.currentTimeMillis() - start;
}
}
예상 결과:
=== I/O Bound Tasks ===
Platform Threads (pool): 500ms
Virtual Threads: 15ms
Speedup: 33.3x
=== CPU Bound Tasks ===
ForkJoinPool: 250ms
Parallel Stream: 255ms
안티패턴과 주의사항
❌ 안티패턴 1: Virtual Thread에서 synchronized 블록 내 블로킹
// 나쁜 예
synchronized (lock) {
Thread.sleep(Duration.ofSeconds(1)); // Pinning 발생!
}
// 좋은 예
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
Thread.sleep(Duration.ofSeconds(1)); // OK
} finally {
lock.unlock();
}
❌ 안티패턴 2: CPU 작업을 Virtual Thread로 처리
// 나쁜 예
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> {
// 캐리어 스레드를 오래 점유
for (long i = 0; i < 1_000_000_000; i++) {
Math.sqrt(i);
}
});
}
// 좋은 예
ForkJoinPool.commonPool().submit(() -> {
// CPU 작업
});
❌ 안티패턴 3: Future.get() 체이닝
// 나쁜 예 (블로킹 체인)
String result1 = future1.get();
String result2 = future2.get();
String result3 = future3.get();
// 좋은 예 (CompletableFuture 조합)
CompletableFuture.allOf(future1, future2, future3)
.thenApply(v -> combine(
future1.join(),
future2.join(),
future3.join()
));
❌ 안티패턴 4: ExecutorService 종료 누락
// 나쁜 예
ExecutorService executor = Executors.newFixedThreadPool(10);
executor.submit(() -> task());
// shutdown() 호출 안 함 → 프로그램이 종료되지 않음!
// 좋은 예
try (ExecutorService executor = Executors.newFixedThreadPool(10)) {
executor.submit(() -> task());
} // 자동 shutdown
마이그레이션 가이드
Java 8 → Java 21
1단계: CompletableFuture 활용
// Before
ExecutorService executor = Executors.newFixedThreadPool(10);
Future<String> future = executor.submit(() -> slowTask());
String result = future.get(); // 블로킹
// After
CompletableFuture<String> cf = CompletableFuture
.supplyAsync(() -> slowTask())
.thenApply(result -> process(result));
2단계: Virtual Threads 도입 (Java 21+)
// Before
ExecutorService executor = Executors.newCachedThreadPool();
// After
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
3단계: Structured Concurrency 적용
// Before
List<CompletableFuture<String>> futures = new ArrayList<>();
futures.add(CompletableFuture.supplyAsync(() -> task1()));
futures.add(CompletableFuture.supplyAsync(() -> task2()));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// After
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var t1 = scope.fork(() -> task1());
var t2 = scope.fork(() -> task2());
scope.join().throwIfFailed();
// 사용
}
최신 프레임워크 통합
Spring Boot 3.2+ (Virtual Threads)
// application.properties
spring.threads.virtual.enabled=true
// 자동으로 Virtual Threads 사용
@RestController
public class MyController {
@GetMapping("/users/{id}")
public User getUser(@PathVariable Long id) {
// 자동으로 Virtual Thread에서 실행
return userService.findById(id);
}
}
Quarkus (Reactive)
@Path("/users")
public class UserResource {
@GET
@Path("/{id}")
public Uni<User> getUser(@PathParam Long id) {
return userService.findById(id); // 논블로킹
}
}
마치며
Java의 멀티스레딩 API는 지난 28년 동안 혁명적으로 발전했습니다:
1세대 (1996-2004): Runnable, Thread
→ 기본적인 동시성
2세대 (2004-2011): Executor, ExecutorService, ThreadPoolExecutor, Future
→ 스레드 풀과 작업 관리
3세대 (2011-2014): ForkJoinPool, CompletableFuture
→ 병렬 처리와 비동기 프로그래밍
4세대 (2017-현재): Flow API, Virtual Threads, Structured Concurrency
→ 리액티브 스트림과 대규모 동시성
2024년 현재 권장사항
// I/O 바운드: Virtual Threads (Java 21+)
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> handleRequest());
}
// CPU 바운드: ForkJoinPool 또는 Parallel Stream
largeList.parallelStream()
.map(this::compute)
.collect(Collectors.toList());
// 복잡한 비동기: CompletableFuture
CompletableFuture.supplyAsync(() -> fetchData())
.thenCompose(data -> processData(data))
.thenAccept(result -> saveResult(result));
// 구조화된 동시성: StructuredTaskScope (Java 21+)
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = scope.fork(() -> operation1());
var task2 = scope.fork(() -> operation2());
scope.join().throwIfFailed();
}
핵심 사항
- Java 21+: Virtual Threads를 I/O 작업의 기본으로 사용
- CPU 작업: ForkJoinPool이나 적절한 크기의 스레드 풀
- 비동기 체인: CompletableFuture로 우아하게 표현
- 생명주기 관리: Structured Concurrency로 안전하게 관리
- 항상 종료: ExecutorService는 반드시 shutdown
- 적절한 도구 선택: 작업 특성에 맞는 API 사용
참고 자료
- JEP 444: Virtual Threads
- JEP 453: Structured Concurrency
- Java Concurrency in Practice - Brian Goetz
- Project Loom
- Reactive Streams Specification
'Develop(개발)' 카테고리의 다른 글
| [AWS] 프리티어 VPC 과금 문제 해결법 (RDS Public IP 없이 MySQL 워크벤치 연결) (0) | 2024.07.18 |
|---|---|
| 스프링 동시성 문제 해결법의 종류와 장단점 (5) | 2024.03.17 |
| 스프링이 어떻게 여러 요청을 동시에 처리할 수 있을까? (0) | 2024.03.10 |
| Github Action에서 간단한 테스트 자동화 하기! (4) | 2024.02.25 |
| vscode 터미널에서 실행하는 환경변수 직접 설정하는 방법 (1) | 2023.10.03 |