1. 概述
本文将介绍如何将 List<CompletableFuture<T>>
转换为 CompletableFuture<List<T>>
。这种转换在实际开发中非常实用,典型场景是:需要调用多个远程服务接口(异步操作),并将结果聚合到单个List中。最终只需等待一个 CompletableFuture
对象,当所有操作完成时返回结果列表,或当任意操作失败时抛出异常。
我们将先分析一种"踩坑"式的实现方式,再介绍更简单可靠的方案。
2. 链式调用CompletableFuture
通过 thenCompose()
方法链式调用 CompletableFuture
是一种可行方案。这种方式能创建一个单一对象,当所有前置任务按顺序完成时(类似多米诺骨牌)触发解析。
2.1 实现方式
首先创建一个模拟异步操作的类:
public class Application {
ScheduledExecutorService asyncOperationEmulation;
Application initialize() {
asyncOperationEmulation = Executors.newScheduledThreadPool(10);
return this;
}
CompletableFuture<String> asyncOperation(String operationId) {
CompletableFuture<String> cf = new CompletableFuture<>();
asyncOperationEmulation.submit(() -> {
Thread.sleep(100);
cf.complete(operationId);
});
return cf;
}
}
这个 Application
类包含:
- 使用10线程的
Executor
调度异步任务 asyncOperation()
方法模拟耗时100ms的异步操作
接下来通过链式调用聚合结果:
void startNaive() {
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 1; i <= 1000; i++) {
String operationId = "Naive-Operation-" + i;
futures.add(asyncOperation(operationId));
}
CompletableFuture<List<String>> aggregate = CompletableFuture.completedFuture(new ArrayList<>());
for (CompletableFuture<String> future : futures) {
aggregate = aggregate.thenCompose(list -> {
list.add(future.get());
return CompletableFuture.completedFuture(list);
});
}
final List<String> results = aggregate.join();
for (int i = 0; i < 10; i++) {
System.out.println("Finished " + results.get(i));
}
close();
}
关键步骤解析:
- 创建已完成的
CompletableFuture
作为初始值(空List) - 遍历
futures
列表,通过thenCompose()
链式处理:- **
thenCompose()
返回新的CompletableFuture
**,仅当当前任务和前置任务都完成时才解析 - 每次迭代将结果添加到List并传递给下一个链
- **
- 最终调用
join()
获取结果(实际场景应改用thenAccept()
等回调)
⚠️ 必须关闭线程池:
void close() {
asyncOperationEmulation.shutdownNow();
}
应用退出时必须关闭所有
Executor
,否则Java进程会挂起
2.2 实现缺陷
这种方案存在明显问题:
- 链式调用增加复杂度:创建大量不必要的中间
CompletableFuture
对象 - 操作数量多时可能栈溢出:当某个任务失败时,递归解析链可能触发
StackOverflowError
模拟异常场景(修改 asyncOperation
):
CompletableFuture<String> asyncOperation(String operationId) {
CompletableFuture<String> cf = new CompletableFuture<>();
asyncOperationEmulation.submit(() -> {
if (operationId.endsWith("567")) {
cf.completeExceptionally(new Exception("Error on operation " + operationId));
return;
}
Thread.sleep(100);
cf.complete(operationId);
});
return cf;
}
当第567个操作失败时,aggregate.join()
会抛出运行时异常。
3. 使用CompletableFuture.allOf()
更优方案是使用 CompletableFuture.allOf()
方法。该方法接收 CompletableFuture
数组,创建新的聚合对象,当所有任务完成时触发解析。若任意任务失败,聚合对象也会失败。
3.1 实现方式
void start() {
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 1; i <= 1000; i++) {
String operationId = "Operation-" + i;
futures.add(asyncOperation(operationId));
}
CompletableFuture<?>[] futuresArray = futures.toArray(new CompletableFuture<?>[0]);
CompletableFuture<List<String>> listFuture = CompletableFuture.allOf(futuresArray)
.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
final List<String> results = listFuture.join();
System.out.println("Printing first 10 results");
for (int i = 0; i < 10; i++) {
System.out.println("Finished " + results.get(i));
}
close();
}
核心改进:
- 将
List
转换为数组传递给allOf()
- 通过
thenApply()
聚合结果:- 使用
join()
获取每个任务结果 - 通过Stream收集为List
- 使用
- 最终
listFuture
包含完整结果列表
3.2 allOf()的优势
相比链式调用,allOf()
方案优势明显:
✅ 代码简洁:避免复杂的链式调用和中间对象
✅ 原子性保证:所有任务成功则整体成功,任意失败则整体失败
✅ 非阻塞等待:实际场景中只需依赖回调,无需显式调用 join()
✅ 避免部分处理:防止因部分任务失败导致结果不一致
4. 总结
本文探讨了将 List<CompletableFuture<T>>
转换为 CompletableFuture<List<T>>
的两种方案:
- 链式调用方案:实现复杂且存在性能风险,不推荐
allOf()
方案:简洁高效,是Java官方推荐的最佳实践
完整示例代码请参考 GitHub仓库