1. 概述
ExecutorService 框架让多线程任务处理变得简单。本文将演示几种等待线程完成执行的场景,并展示如何优雅关闭 ExecutorService,同时确保已运行线程完成执行。
2. 关闭 Executor 后等待
使用 Executor 时,可通过 shutdown()
或 shutdownNow()
关闭线程池。但注意:这些方法不会等待所有线程执行完毕!
要等待现有线程完成执行,需使用 awaitTermination()
方法。该方法会阻塞线程,直到所有任务完成或超时:
public void awaitTerminationAfterShutdown(ExecutorService threadPool) {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
}
} catch (InterruptedException ex) {
threadPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
3. 使用 CountDownLatch
另一种解决方案是使用 CountDownLatch 标记任务完成。初始化时指定计数值,当该值减至零时,所有调用 await()
的线程将被唤醒。
例如,让当前线程等待另外 N 个线程完成执行:
ExecutorService WORKER_THREAD_POOL
= Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
WORKER_THREAD_POOL.submit(() -> {
try {
// 业务逻辑...
latch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 等待门闩计数归零
latch.await();
4. 使用 invokeAll()
invokeAll()
是批量执行任务的首选方法。该方法在所有任务完成或超时后,返回 Future 对象列表。
注意:返回的 Future 列表顺序与提交的 Callable 顺序一致:
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10);
List<Callable<String>> callables = Arrays.asList(
new DelayedCallable("fast thread", 100),
new DelayedCallable("slow thread", 3000));
long startProcessingTime = System.currentTimeMillis();
List<Future<String>> futures = WORKER_THREAD_POOL.invokeAll(callables);
awaitTerminationAfterShutdown(WORKER_THREAD_POOL);
long totalProcessingTime = System.currentTimeMillis() - startProcessingTime;
assertTrue(totalProcessingTime >= 3000);
String firstThreadResponse = futures.get(0).get();
assertTrue("fast thread".equals(firstThreadResponse));
String secondThreadResponse = futures.get(1).get();
assertTrue("slow thread".equals(secondThreadResponse));
5. 使用 ExecutorCompletionService
ExecutorCompletionService 提供另一种多线程执行方案。它使用指定的 ExecutorService 执行任务。
与 invokeAll()
的关键区别在于 Future 返回顺序:ExecutorCompletionService 按任务完成顺序存储结果,而 invokeAll()
按任务提交顺序返回:
CompletionService<String> service
= new ExecutorCompletionService<>(WORKER_THREAD_POOL);
List<Callable<String>> callables = Arrays.asList(
new DelayedCallable("fast thread", 100),
new DelayedCallable("slow thread", 3000));
for (Callable<String> callable : callables) {
service.submit(callable);
}
通过 take()
方法获取结果:
long startProcessingTime = System.currentTimeMillis();
Future<String> future = service.take();
String firstThreadResponse = future.get();
long totalProcessingTime
= System.currentTimeMillis() - startProcessingTime;
assertTrue("First response should be from the fast thread",
"fast thread".equals(firstThreadResponse));
assertTrue(totalProcessingTime >= 100
&& totalProcessingTime < 1000);
LOG.debug("Thread finished after: " + totalProcessingTime
+ " milliseconds");
future = service.take();
String secondThreadResponse = future.get();
totalProcessingTime
= System.currentTimeMillis() - startProcessingTime;
assertTrue(
"Last response should be from the slow thread",
"slow thread".equals(secondThreadResponse));
assertTrue(
totalProcessingTime >= 3000
&& totalProcessingTime < 4000);
LOG.debug("Thread finished after: " + totalProcessingTime
+ " milliseconds");
awaitTerminationAfterShutdown(WORKER_THREAD_POOL);
6. 总结
根据场景不同,有多种等待线程完成的方式:
✅ CountDownLatch 适用场景:需要通知一个或多个线程,其他线程的操作集已完成时
✅ ExecutorCompletionService 适用场景:需要尽快获取任务结果时
✅ invokeAll() 适用场景:需要等待所有任务完成时
踩坑提醒:使用
shutdown()
后务必配合awaitTermination()
,否则可能丢失未完成任务!
完整代码示例可在 GitHub 获取。