운영체제

스레드 풀과 Executor 프레임워크 - 2

으엉어엉 2024. 12. 21. 15:46
728x90

ExecutorService 우아한 종료

서비스를 안정적으로 종료하는 것도 매우 중요하다. 이렇게 문제 없이 우아하게 종료하는 방식을 graceful shutdown 이라 한다.

 

서비스 종료

void shutdown()

  • 새로운 작업을 받지 않고, 이미 제출된 작업을 모두 완료한 후에 종료한다.
  • 논 블로킹 메서드(이 메서드를 호출한 스레드는 대기하지 않고 즉시 다음 코드를 호출한다.)

 

List shutdownNow()

  • 실행 중인 작업을 중단하고, 대기 중인 작업을 반환하며 즉시 종료한다.
  • 실행 중인 작업을 중단하기 위해 인터럽트를 발생시킨다.
  • 논 블로킹 메서드

서비스 상태 확인

boolean isShutdown(): 서비스가 종료되었는지 확인한다.

boolean isTerminated(): shutdown(), shutdownNow() 호출 후, 모든 작업이 완료되었는지 확인한다.

boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException: 서비스 종료시 모든 작업이 완료될 때까지 대기한다. 이때 지정된 시간까지만 대기한다. 블록킹 매서드

 

close() 는 자바 19부터 지원하는 서비즈 종료 메서드이다. 이 메서드는 shutdown()와 같다고 생각하면 된다.

 

 

 

shutdown() 을 호출해서 이미 들어온 모든 작업을 다 처리하고 서비스를 우아하게 종료(graceful shutdown)하는 것이 가장 이상적이지만, 갑자기 요청이 너무 많이 들어와서 큐에 대기중인 작업이 너무 많아 작업 완료 어렵거나, 작업 이 너무 오래 걸리거나, 또는 버그가 발생해서 특정 작업이 끝나지 않을 수 있다. 이렇게 되면 서비스가 너무 늦게 종료 되거나, 종료되지 않는 문제가 발생할 수 있다.

public class ExecutorShutdownMain {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(2);
        
        es.execute(new RunnableTask("taskA"));
        es.execute(new RunnableTask("taskB"));
        es.execute(new RunnableTask("taskC"));
       
        es.execute(new RunnableTask("longTask", 100_000)); // 100초 대기
        printState(es);
        log("== shutdown 시작 ==");
        
        shutdownAndAwaitTermination(es);
        log("== shutdown 완료 ==");
        printState(es);
    }
    static void shutdownAndAwaitTermination(ExecutorService es) {
        es.shutdown(); // non-blocking, 새로운 작업을 받지 않는다. 처리 중이거나, 큐에 이미대기중인 작업은 처리한다. 이후에 풀의 스레드를 종료한다.
        try {
            // 이미 대기중인 작업들을 모두 완료할 때 까지 10초 기다린다.
            log("서비스 정상 종료 시도");
            if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
                // 정상 종료가 너무 오래 걸리면...
                log("서비스 정상 종료 실패 -> 강제 종료 시도");
                es.shutdownNow();
                // 작업이 취소될 때 까지 대기한다.
                if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
                    log("서비스가 종료되지 않았습니다.");
                }
            }
        }
        catch (InterruptedException ex) {
            // awaitTermination()으로 대기중인 현재 스레드가 인터럽트 될 수 있다.
            es.shutdownNow();
        }
    }
}
public class PoolSizeMainV1 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ExecutorService es = new ThreadPoolExecutor(2, 4, 3000, TimeUnit.MILLISECONDS, workQueue);
        printState(es);
        es.execute(new RunnableTask("task1"));
        printState(es, "task1");
        es.execute(new RunnableTask("task2"));
        printState(es, "task2");
        es.execute(new RunnableTask("task3"));
        printState(es, "task3");
        es.execute(new RunnableTask("task4"));
        printState(es, "task4");
        es.execute(new RunnableTask("task5"));
        printState(es, "task5");
        es.execute(new RunnableTask("task6"));
        printState(es, "task6");
        try {
            es.execute(new RunnableTask("task7"));
        }
        catch (RejectedExecutionException e) {
            log("task7 실행 거절 예외 발생: " + e);
        }
        sleep(3000);
        log("== 작업 수행 완료 ==");
        printState(es);
        sleep(3000);
        log("== maximumPoolSize 대기 시간 초과 ==");
        printState(es);
        es.close();
        log("== shutdown 완료 ==");
        printState(es);
    }
}

15:10:21.482 [     main] [pool=0, active=0, queuedTasks=0, completedTasks=0]
15:10:21.487 [pool-1-thread-1] task1 시작
15:10:21.496 [     main] task1 -> [pool=1, active=1, queuedTasks=0, completedTasks=0]
15:10:21.496 [     main] task2 -> [pool=2, active=2, queuedTasks=0, completedTasks=0]
15:10:21.496 [     main] task3 -> [pool=2, active=1, queuedTasks=1, completedTasks=0]
15:10:21.496 [pool-1-thread-2] task2 시작
15:10:21.497 [     main] task4 -> [pool=2, active=2, queuedTasks=2, completedTasks=0]
15:10:21.497 [     main] task5 -> [pool=3, active=3, queuedTasks=2, completedTasks=0]
15:10:21.497 [pool-1-thread-3] task5 시작
15:10:21.497 [     main] task6 -> [pool=4, active=4, queuedTasks=2, completedTasks=0]
15:10:21.498 [pool-1-thread-4] task6 시작
15:10:21.498 [     main] task7 실행 거절 예외 발생: java.util.concurrent.RejectedExecutionException: Task thread.executor.RunnableTask rejected from java.util.concurrent.ThreadPoolExecutor[Running, pool size = 4, active threads = 4, queued tasks = 2, completed tasks = 0]
15:10:22.503 [pool-1-thread-3] task5 완료
15:10:22.503 [pool-1-thread-4] task6 완료
15:10:22.503 [pool-1-thread-2] task2 완료
15:10:22.503 [pool-1-thread-1] task1 완료
15:10:22.505 [pool-1-thread-3] task3 시작
15:10:22.505 [pool-1-thread-4] task4 시작
15:10:23.509 [pool-1-thread-3] task3 완료
15:10:23.509 [pool-1-thread-4] task4 완료
15:10:24.503 [     main] == 작업 수행 완료 ==
15:10:24.504 [     main] [pool=4, active=0, queuedTasks=0, completedTasks=6]
15:10:27.514 [     main] == maximumPoolSize 대기 시간 초과 ==
15:10:27.517 [     main] [pool=2, active=0, queuedTasks=0, completedTasks=6]
15:10:27.519 [     main] == shutdown 완료 ==
15:10:27.519 [     main] [pool=0, active=0, queuedTasks=0, completedTasks=6]

 

 

 

1. 작업을 요청하면 core 사이즈 만큼 스레드를 만든다.

2. core 사이즈를 초과하면 큐에 작업을 넣는다.

3. 큐를 초과하면 max 사이즈 만큼 스레드를 만든다. 임시로 사용되는 초과 스레드가 생성된다. 큐가 가득차서 큐에 넣을 수도 없다. 초과 스레드가 바로 수행해야 한다.

4. max 사이즈를 초과하면 요청을 거절한다. 예외가 발생한다. 큐도 가득차고, 풀에 최대 생성 가능한 스레드 수도 가득 찼다. 작업을 받을 수 없다.

 

 

 

자바는 Executors 클래스를 통해 3가지 기본 전력을 제공한다.

newSingleThreadPool() : 단일 스레드 풀 전략

newFixedThreadPool(nThreads): 고정 스레드 풀 전략

newCachedThreadPool() : 캐시 스레드 풀 전략

 

 

public class PoolSizeMainV2 {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(2);
        //ExecutorService es = new ThreadPoolExecutor(2, 2, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>())
        log("pool 생성");
        printState(es);
        for (int i = 1; i <= 6; i++) {
            String taskName = "task" + i;
            es.execute(new RunnableTask(taskName));
            printState(es, taskName);
        }
        es.close();
        log("== shutdown 완료 ==");
    }
}

스레드 수가 고정되어 있기 때문에 CPU, 메모리 리소스가 어느정도 예측 가능한 안정적인 방식이다. 큐 사이즈도 제한 이 없어서 작업을 많이 담아두어도 문제가 없다.

 이 방식의 가장 큰 장점은 스레드 수가 고정되어서 CPU, 메모리 리소스가 어느정도 예측 가능하다는 점이다. 따라서 일 반적인 상황에 가장 안정적으로 서비스를 운영할 수 있다. 하지만 상황에 따라 장점이 가장 큰 단점이 되기도 한다.

 

 

Executor 전략 - 캐시 풀 전략

기본 스레드를 사용하지 않고, 60초 생존 주기를 가진 초과 스레드만 사용한다. 초과 스레드의 수는 제한이 없다. 큐에 작업을 저장하지 않는다. 대신에 생산자의 요청을 스레드 풀의 소비자 스레드가 직접 받아서 바로 처리한다. 모든 요청이 대기하지 않고 스레드가 바로바로 처리한다. 따라서 빠른 처리가 가능하다.

캐시 스레드 풀 전략은 매우 빠르고, 유연한 전략이다. 이 전략은 기본 스레드도 없고, 대기 큐에 작업도 쌓이지 않는다. 대신에 작업 요청이 오면 초과 스레드로 작업을 바로바 로 처리한다. 따라서 빠른 처리가 가능하다. 초과 스레드의 수도 제한이 없기 때문에 CPU, 메모리 자원만 허용한다면 시스템의 자원을 최대로 사용할 수 있다. 추가로 초과 스레드는 60초간 생존하기 때문에 작업 수에 맞추어 적절한 수의 스레드가 재사용된다. 이런 특징 때문에 요청이 갑자기 증가하면 스레드도 갑자기 증가하고, 요청이 줄어들면 스레드도 점점 줄어든다. 이 전략은 작업의 요청 수에 따라서 스레드도 증가하고 감소하므로, 매우 유연한 전략이다

public class PoolSizeMain {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();
        
        log("pool 생성");
        printState(es);
        for (int i = 1; i <= 4; i++) {
            String taskName = "task" + i;
            es.execute(new RunnableTask(taskName));
            printState(es, taskName);
        }
        sleep(3000);
        log("== 작업 수행 완료 ==");
        printState(es);
        sleep(3000);
        log("== maximumPoolSize 대기 시간 초과 ==");
        printState(es);
        es.close();
        log("== shutdown 완료 ==");
        printState(es);
    }
}

Executor 스레드 풀 관리

1. 작업을 요청하면 core 사이즈 만큼 스레드를 만든다. core 사이즈가 없다. 바로 core 사이즈를 초과한다.

2. core 사이즈를 초과하면 큐에 작업을 넣는다. 큐에 작업을 넣을 수 없다.

3. 큐를 초과하면 max 사이즈 만큼 스레드를 만든다. 임시로 사용되는 초과 스레드가 생성된다. 초과 스레드가 생성된다. 물론 풀에 대기하는 초과 스레드가 있으면 재사용된다.

4. max 사이즈를 초과하면 요청을 거절한다. 예외가 발생한다. 참고로 max 사이즈가 무제한이다. 따라서 초과 스레드를 무제한으로 만들 수 있다.

 

728x90

'운영체제' 카테고리의 다른 글

스레드 풀과 Executor 프레임워크  (1) 2024.12.21
원자적 연산  (2) 2024.12.20
생산자 소비자 문제  (0) 2024.12.18
concurrent Lock  (0) 2024.12.16
Synchronized  (0) 2024.12.15