1. 概述

ExecutorService 框架让多线程任务处理变得简单。本文将演示几种等待线程完成执行的场景,并展示如何优雅关闭 ExecutorService,同时确保已运行线程完成执行。

2. 关闭 Executor 后等待

使用 Executor 时,可通过 shutdown()shutdownNow() 关闭线程池。但注意:这些方法不会等待所有线程执行完毕!

要等待现有线程完成执行,需使用 awaitTermination() 方法。该方法会阻塞线程,直到所有任务完成或超时:

public void awaitTerminationAfterShutdown(ExecutorService threadPool) {
    threadPool.shutdown();
    try {
        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
            threadPool.shutdownNow();
        }
    } catch (InterruptedException ex) {
        threadPool.shutdownNow();
        Thread.currentThread().interrupt();
    }
}

3. 使用 CountDownLatch

另一种解决方案是使用 CountDownLatch 标记任务完成。初始化时指定计数值,当该值减至零时,所有调用 await() 的线程将被唤醒。

例如,让当前线程等待另外 N 个线程完成执行:

ExecutorService WORKER_THREAD_POOL 
  = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    WORKER_THREAD_POOL.submit(() -> {
        try {
            // 业务逻辑...
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

// 等待门闩计数归零
latch.await();

4. 使用 invokeAll()

invokeAll() 是批量执行任务的首选方法。该方法在所有任务完成或超时后,返回 Future 对象列表

注意:返回的 Future 列表顺序与提交的 Callable 顺序一致:

ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10);

List<Callable<String>> callables = Arrays.asList(
  new DelayedCallable("fast thread", 100), 
  new DelayedCallable("slow thread", 3000));

long startProcessingTime = System.currentTimeMillis();
List<Future<String>> futures = WORKER_THREAD_POOL.invokeAll(callables);

awaitTerminationAfterShutdown(WORKER_THREAD_POOL);

long totalProcessingTime = System.currentTimeMillis() - startProcessingTime;
 
assertTrue(totalProcessingTime >= 3000);

String firstThreadResponse = futures.get(0).get();
 
assertTrue("fast thread".equals(firstThreadResponse));

String secondThreadResponse = futures.get(1).get();
assertTrue("slow thread".equals(secondThreadResponse));

5. 使用 ExecutorCompletionService

ExecutorCompletionService 提供另一种多线程执行方案。它使用指定的 ExecutorService 执行任务。

invokeAll() 的关键区别在于 Future 返回顺序:ExecutorCompletionService 按任务完成顺序存储结果,而 invokeAll() 按任务提交顺序返回:

CompletionService<String> service
  = new ExecutorCompletionService<>(WORKER_THREAD_POOL);

List<Callable<String>> callables = Arrays.asList(
  new DelayedCallable("fast thread", 100), 
  new DelayedCallable("slow thread", 3000));

for (Callable<String> callable : callables) {
    service.submit(callable);
}

通过 take() 方法获取结果:

long startProcessingTime = System.currentTimeMillis();

Future<String> future = service.take();
String firstThreadResponse = future.get();
long totalProcessingTime
  = System.currentTimeMillis() - startProcessingTime;

assertTrue("First response should be from the fast thread", 
  "fast thread".equals(firstThreadResponse));
assertTrue(totalProcessingTime >= 100
  && totalProcessingTime < 1000);
LOG.debug("Thread finished after: " + totalProcessingTime
  + " milliseconds");

future = service.take();
String secondThreadResponse = future.get();
totalProcessingTime
  = System.currentTimeMillis() - startProcessingTime;

assertTrue(
  "Last response should be from the slow thread", 
  "slow thread".equals(secondThreadResponse));
assertTrue(
  totalProcessingTime >= 3000
  && totalProcessingTime < 4000);
LOG.debug("Thread finished after: " + totalProcessingTime
  + " milliseconds");

awaitTerminationAfterShutdown(WORKER_THREAD_POOL);

6. 总结

根据场景不同,有多种等待线程完成的方式:

CountDownLatch 适用场景:需要通知一个或多个线程,其他线程的操作集已完成时
ExecutorCompletionService 适用场景:需要尽快获取任务结果时
invokeAll() 适用场景:需要等待所有任务完成时

踩坑提醒:使用 shutdown() 后务必配合 awaitTermination(),否则可能丢失未完成任务!

完整代码示例可在 GitHub 获取。


原始标题:ExecutorService - Waiting for Threads to Finish | Baeldung