1. 概述

本文将介绍如何将 List<CompletableFuture<T>> 转换为 CompletableFuture<List<T>>。这种转换在实际开发中非常实用,典型场景是:需要调用多个远程服务接口(异步操作),并将结果聚合到单个List中。最终只需等待一个 CompletableFuture 对象,当所有操作完成时返回结果列表,或当任意操作失败时抛出异常。

我们将先分析一种"踩坑"式的实现方式,再介绍更简单可靠的方案。

2. 链式调用CompletableFuture

通过 thenCompose() 方法链式调用 CompletableFuture 是一种可行方案。这种方式能创建一个单一对象,当所有前置任务按顺序完成时(类似多米诺骨牌)触发解析。

2.1 实现方式

首先创建一个模拟异步操作的类:

public class Application {
    ScheduledExecutorService asyncOperationEmulation;
    Application initialize() {
        asyncOperationEmulation = Executors.newScheduledThreadPool(10);
        return this;
    }

    CompletableFuture<String> asyncOperation(String operationId) {
        CompletableFuture<String> cf = new CompletableFuture<>();
        asyncOperationEmulation.submit(() -> {
            Thread.sleep(100);
            cf.complete(operationId);
        });
        return cf;
    }
}

这个 Application 类包含:

  • 使用10线程的 Executor 调度异步任务
  • asyncOperation() 方法模拟耗时100ms的异步操作

接下来通过链式调用聚合结果:

void startNaive() {
    List<CompletableFuture<String>> futures = new ArrayList<>();
    for (int i = 1; i <= 1000; i++) {
        String operationId = "Naive-Operation-" + i;
        futures.add(asyncOperation(operationId));
    }
    CompletableFuture<List<String>> aggregate = CompletableFuture.completedFuture(new ArrayList<>());
    for (CompletableFuture<String> future : futures) {
        aggregate = aggregate.thenCompose(list -> {
            list.add(future.get());
            return CompletableFuture.completedFuture(list);
        });
    }
    final List<String> results = aggregate.join();
    for (int i = 0; i < 10; i++) {
        System.out.println("Finished " + results.get(i));
    }

    close();
}

关键步骤解析:

  1. 创建已完成的 CompletableFuture 作为初始值(空List)
  2. 遍历 futures 列表,通过 thenCompose() 链式处理:
    • **thenCompose() 返回新的 CompletableFuture**,仅当当前任务和前置任务都完成时才解析
    • 每次迭代将结果添加到List并传递给下一个链
  3. 最终调用 join() 获取结果(实际场景应改用 thenAccept() 等回调)

⚠️ 必须关闭线程池

void close() {
    asyncOperationEmulation.shutdownNow();
}

应用退出时必须关闭所有 Executor,否则Java进程会挂起

2.2 实现缺陷

这种方案存在明显问题:

  1. 链式调用增加复杂度:创建大量不必要的中间 CompletableFuture 对象
  2. 操作数量多时可能栈溢出:当某个任务失败时,递归解析链可能触发 StackOverflowError

模拟异常场景(修改 asyncOperation):

CompletableFuture<String> asyncOperation(String operationId) {
    CompletableFuture<String> cf = new CompletableFuture<>();
    asyncOperationEmulation.submit(() -> {
        if (operationId.endsWith("567")) {
            cf.completeExceptionally(new Exception("Error on operation " + operationId));
            return;
        }
        Thread.sleep(100);
        cf.complete(operationId);
    });
    return cf;
}

当第567个操作失败时,aggregate.join() 会抛出运行时异常。

3. 使用CompletableFuture.allOf()

更优方案是使用 CompletableFuture.allOf() 方法。该方法接收 CompletableFuture 数组,创建新的聚合对象,当所有任务完成时触发解析。若任意任务失败,聚合对象也会失败。

3.1 实现方式

void start() {
    List<CompletableFuture<String>> futures = new ArrayList<>();
    for (int i = 1; i <= 1000; i++) {
        String operationId = "Operation-" + i;
        futures.add(asyncOperation(operationId));
    }
    CompletableFuture<?>[] futuresArray = futures.toArray(new CompletableFuture<?>[0]);
    CompletableFuture<List<String>> listFuture = CompletableFuture.allOf(futuresArray)
      .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    final List<String> results = listFuture.join();
    System.out.println("Printing first 10 results");
    for (int i = 0; i < 10; i++) {
        System.out.println("Finished " + results.get(i));
    }

    close();
}

核心改进:

  1. List 转换为数组传递给 allOf()
  2. 通过 thenApply() 聚合结果
    • 使用 join() 获取每个任务结果
    • 通过Stream收集为List
  3. 最终 listFuture 包含完整结果列表

3.2 allOf()的优势

相比链式调用,allOf() 方案优势明显: ✅ 代码简洁:避免复杂的链式调用和中间对象
原子性保证:所有任务成功则整体成功,任意失败则整体失败
非阻塞等待:实际场景中只需依赖回调,无需显式调用 join()
避免部分处理:防止因部分任务失败导致结果不一致

4. 总结

本文探讨了将 List<CompletableFuture<T>> 转换为 CompletableFuture<List<T>> 的两种方案:

  1. 链式调用方案:实现复杂且存在性能风险,不推荐
  2. allOf() 方案:简洁高效,是Java官方推荐的最佳实践

完整示例代码请参考 GitHub仓库


原始标题:Convert From List of CompletableFuture to CompletableFuture List | Baeldung