1. 概述

Java 8的CompletableFuture非常适合处理异步计算。例如,当一个网络客户端调用服务器时,它可能会使用CompletableFuture。开始使用并处理单个CompletableFuture响应很容易。但是,如何在执行多个CompletableFuture的同时处理异常并不明显。

在这个教程中,我们将开发一个简单的模拟微服务客户端,它返回一个CompletableFuture,并展示如何多次调用它以生成成功和失败的结果汇总。

2. 示例微服务客户端

为了演示,我们编写一个简单的微服务客户端,负责创建资源并返回资源的标识符。

我们将声明一个简单的接口MicroserviceClient,可以在单元测试中使用Mockito进行模拟:

interface MicroserviceClient {
    CompletableFuture<Long> createResource(String resourceName);
}

虽然测试CompletableFuture本身存在挑战,但测试对MicroserviceClient的一次简单调用还是相对直接的。在这里,我们跳过这个部分,转而处理可能抛出异常的多个客户端调用。

3. 多次调用微服务

首先,让我们创建一个单元测试,并声明一个MicroserviceClient的模拟,对于输入“Good Resource",它返回成功的响应,而对于“Bad Resource",它抛出异常:

@ParameterizedTest
@MethodSource("clientData")
public void givenMicroserviceClient_whenMultipleCreateResource_thenCombineResults(List<String> inputs,
  int expectedSuccess, int expectedFailure) throws ExecutionException, InterruptedException {
    MicroserviceClient mockMicroservice = mock(MicroserviceClient.class);
    when(mockMicroservice.createResource("Good Resource"))
      .thenReturn(CompletableFuture.completedFuture(123L));
    when(mockMicroservice.createResource("Bad Resource"))
      .thenReturn(CompletableFuture.failedFuture(new IllegalArgumentException("Bad Resource")));
}

我们将把这个测试转化为一个参数化测试,通过传递一组不同的数据,使用MethodSource(/parameterized-tests-junit-5#6-method)。我们需要创建一个静态方法来为测试提供一个JUnit的Arguments流:

private static Stream<Arguments> clientData() {
    return Stream.of(
      Arguments.of(List.of("Good Resource"), 1, 0),
      Arguments.of(List.of("Bad Resource"), 0, 1),
      Arguments.of(List.of("Good Resource", "Bad Resource"), 1, 1),
      Arguments.of(List.of("Good Resource", "Bad Resource", "Good Resource", "Bad Resource", 
        "Good Resource"), 3, 2)
    );
}

这将产生四个测试执行,每个执行都会传入一个输入列表和预期的成功和失败次数。

接下来,回到我们的单元测试,使用测试数据调用MicroserviceClient并收集每个结果的CompletableFuture到一个列表中:

List<CompletableFuture<Long>> clientCalls = new ArrayList<>();
for (String resource : inputs) {
    clientCalls.add(mockMicroservice.createResource(resource));
}

现在,我们面临核心问题:一个包含需要完成并收集结果的CompletableFuture对象列表,同时处理遇到的任何异常。

3.1. 处理异常

在深入讨论如何完成每个CompletableFuture之前,我们先定义一个处理异常的辅助方法。我们还需要定义并模拟一个Logger,以模拟实际的错误处理:

private final Logger logger = mock(Logger.class);

private Long handleError(Throwable throwable) {
    logger.error("Encountered error: " + throwable);
    return -1L;
}

interface Logger {
    void error(String message);
}

辅助方法只是“记录”错误消息并返回-1,我们用它来表示无效的资源。

3.2. 带有异常处理的完成CompletableFuture

现在,我们需要完成所有的CompletableFuture并适当地处理任何异常。我们可以利用CompleteableFuture提供的几个工具来实现这一点:

  • exceptionally():如果CompletableFuture以异常完成,它会执行提供的函数。
  • join():在CompletableFuture完成时返回其结果。

然后,我们可以定义一个完成单个CompletableFuture的辅助方法:

private Long handleFuture(CompletableFuture<Long> future) {
    return future
      .exceptionally(this::handleError)
      .join();
}

值得注意的是,我们使用exceptionally()处理MicroserviceClient调用可能通过我们的handleError()辅助方法抛出的任何异常。最后,我们在CompletableFuture上调用join()来等待客户端调用完成并返回其资源标识符。

3.3. 处理CompletableFuture列表

回到我们的单元测试,现在我们可以利用我们的辅助方法以及Java的流API来创建一个简单的语句,解决所有客户端调用:

Map<Boolean, List<Long>> resultsByValidity = clientCalls.stream()
  .map(this::handleFuture)
  .collect(Collectors.partitioningBy(resourceId -> resourceId != -1L));

让我们分解这个语句:

  • 我们使用我们的handleFuture()辅助方法将每个CompletableFuture映射到结果资源标识符。
  • 我们使用Java的Collectors.partitioningBy()实用工具根据有效性将结果资源标识符分割成两个单独的列表。

我们可以通过检查分区列表的大小以及模拟的Logger调用来轻松验证我们的测试:

List<Long> validResults = resultsByValidity.getOrDefault(true, List.of());
assertThat(validResults.size()).isEqualTo(successCount);

List<Long> invalidResults = resultsByValidity.getOrDefault(false, List.of());
assertThat(invalidResults.size()).isEqualTo(errorCount);
verify(logger, times(errorCount))
  .error(eq("Encountered error: java.lang.IllegalArgumentException: Bad Resource"));

运行测试后,我们可以看到分区列表符合我们的预期。

4. 总结

在这篇文章中,我们学习了如何处理完成一个CompletableFuture集合。如果需要,我们可以轻松扩展我们的方法,使用更强大的错误处理或复杂的业务逻辑。

如往常一样,文章的所有代码都可以在GitHub上找到这里