1. 引言

本文将探讨如何将 Java 中的 Future 转换为 CompletableFuture。通过这种转换,我们可以在使用返回 Future 的 API 或库时,充分利用 CompletableFuture 的非阻塞操作、任务链式调用和更强大的异常处理等高级特性。

2. 为什么需要将 Future 转换为 CompletableFuture?

Java 中的 Future 接口表示异步计算的结果,它提供了检查计算是否完成、等待其完成以及检索结果的方法。

然而,Future 存在一些明显限制:

  • 阻塞调用:必须使用 get() 方法获取结果,这会阻塞当前线程
  • 缺乏链式调用支持:无法直接串联多个异步任务
  • 回调机制缺失:无法在任务完成时自动触发后续操作

相比之下,Java 8 引入的 CompletableFuture 解决了这些问题:

  • 非阻塞操作:通过 thenApply()thenAccept() 等方法实现任务链式调用
  • 异常处理:使用 exceptionally() 方法优雅处理异常
  • 函数式编程:支持更灵活的异步编程模式

通过将 Future 转换为 CompletableFuture,我们可以在兼容现有 API 的同时,享受现代异步编程的便利。

3. 转换步骤详解

3.1. 使用 ExecutorService 模拟 Future

首先通过 ExecutorService 模拟一个异步任务,理解 Future 的基本工作原理:

@Test
void givenFuture_whenGet_thenBlockingCall() throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    Future<String> future = executor.submit(() -> {
        Thread.sleep(1000);
        return "Hello from Future!";
    });

    String result = future.get(); // 阻塞调用
    executor.shutdown();

    assertEquals("Hello from Future!", result);
}

⚠️ 关键点future.get() 会阻塞主线程直到任务完成。这种阻塞行为正是 Future 的主要痛点,也是我们转向 CompletableFuture 的主要原因。

3.2. 将 Future 包装为 CompletableFuture

要转换 Future,我们需要桥接其阻塞特性与 CompletableFuture 的非阻塞设计。以下是核心转换方法:

static <T> CompletableFuture<T> toCompletableFuture(Future<T> future, ExecutorService executor) {
    CompletableFuture<T> completableFuture = new CompletableFuture<>();
    executor.submit(() -> {
        try {
            completableFuture.complete(future.get());
        } catch (Exception e) {
            completableFuture.completeExceptionally(e);
        }
    });
    return completableFuture;
}

实现原理

  1. 创建新的 CompletableFuture
  2. 使用独立线程池监控 Future
  3. Future 完成时,通过 complete() 传递结果
  4. 异常时通过 completeExceptionally() 传播错误

使用示例:

@Test
void givenFuture_whenWrappedInCompletableFuture_thenNonBlockingCall() throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    Future<String> future = executor.submit(() -> {
        Thread.sleep(1000);
        return "Hello from Future!";
    });

    CompletableFuture<String> completableFuture = toCompletableFuture(future, executor);

    completableFuture.thenAccept(result -> assertEquals("Hello from Future!", result));

    executor.shutdown();
}

优势:相比 future.get(),这种方案避免了主线程阻塞,且支持链式操作:

@Test
void givenFuture_whenTransformedAndChained_thenCorrectResult() throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    Future<String> future = executor.submit(() -> {
        Thread.sleep(1000);
        return "Hello from Future!";
    });

    CompletableFuture<String> completableFuture = toCompletableFuture(future, executor);

    completableFuture
      .thenApply(result -> result.toUpperCase()) // 转换结果
      .thenAccept(transformedResult -> assertEquals("HELLO FROM FUTURE!", transformedResult));

    executor.shutdown();
}

3.3. 使用 CompletableFuture.supplyAsync() 方法

另一种更简单粗暴的方式是使用 supplyAsync()

@Test
void givenFuture_whenWrappedUsingSupplyAsync_thenNonBlockingCall() throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    Future<String> future = executor.submit(() -> {
        Thread.sleep(1000);
        return "Hello from Future!";
    });

    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        try {
            return future.get(); // 在独立线程中执行阻塞调用
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });

    completableFuture.thenAccept(result -> assertEquals("Hello from Future!", result));

    executor.shutdown();
}

优点

  • 无需手动管理线程
  • CompletableFuture 自动处理异步执行
  • 代码更简洁

⚠️ 注意:虽然内部仍使用 future.get(),但由于它在独立线程中执行,不会阻塞主线程。

4. 合并多个 Future 为单个 CompletableFuture

在实际场景中,我们常需要合并多个 Future 的结果。例如:

  • 聚合不同任务的计算结果
  • 等待所有任务完成后再进行后续处理

实现步骤:

  1. 将所有 Future 转换为 CompletableFuture
  2. 使用 CompletableFuture.allOf() 合并

核心方法:

static CompletableFuture<Void> allOfFutures(List<Future<String>> futures, ExecutorService executor) {
    // 转换所有 Future 为 CompletableFuture
    List<CompletableFuture<String>> completableFutures = futures.stream()
      .map(future -> FutureToCompletableFuture.toCompletableFuture(future, executor))
      .toList();

    return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));
}

使用示例:

@Test
void givenMultipleFutures_whenCombinedWithAllOf_thenAllResultsAggregated() throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(3);

    List<Future<String>> futures = List.of(
        executor.submit(() -> "Task 1 Result"),
        executor.submit(() -> "Task 2 Result"),
        executor.submit(() -> "Task 3 Result")
    );

    CompletableFuture<Void> allOf = allOfFutures(futures, executor);

    allOf.thenRun(() -> {
        try {
            List<String> results = futures.stream()
              .map(future -> {
                  try {
                      return future.get();
                  } catch (Exception e) {
                      throw new RuntimeException(e);
                  }
              })
              .toList();
            assertEquals(3, results.size());
            assertTrue(results.contains("Task 1 Result"));
            assertTrue(results.contains("Task 2 Result"));
            assertTrue(results.contains("Task 3 Result"));
        } catch (Exception e) {
            fail("Unexpected exception: " + e.getMessage());
        }
    }).join();

    executor.shutdown();
}

应用场景

  • 并行处理独立任务
  • 结果聚合计算
  • 微服务调用组合

5. 总结

本文详细介绍了将 Future 转换为 CompletableFuture 的多种方法,包括:

  • 手动包装方案
  • supplyAsync() 简化方案
  • Future 合并方案

通过转换,我们可以:

  • ✅ 摆脱阻塞调用限制
  • ✅ 实现任务链式编排
  • ✅ 获得更强大的异常处理能力
  • ✅ 兼容现有 Future API

这些技术特别适合需要增强异步编程能力的场景,如微服务调用、并行计算等。完整代码示例可在 GitHub 获取。


原始标题:Transform a Future into CompletableFuture | Baeldung