1. 概述
Java 8的CompletableFuture
非常适合处理异步计算。例如,当一个网络客户端调用服务器时,它可能会使用CompletableFuture
。开始使用并处理单个CompletableFuture
响应很容易。但是,如何在执行多个CompletableFuture
的同时处理异常并不明显。
在这个教程中,我们将开发一个简单的模拟微服务客户端,它返回一个CompletableFuture
,并展示如何多次调用它以生成成功和失败的结果汇总。
2. 示例微服务客户端
为了演示,我们编写一个简单的微服务客户端,负责创建资源并返回资源的标识符。
我们将声明一个简单的接口MicroserviceClient
,可以在单元测试中使用Mockito进行模拟:
interface MicroserviceClient {
CompletableFuture<Long> createResource(String resourceName);
}
虽然测试CompletableFuture
本身存在挑战,但测试对MicroserviceClient
的一次简单调用还是相对直接的。在这里,我们跳过这个部分,转而处理可能抛出异常的多个客户端调用。
3. 多次调用微服务
首先,让我们创建一个单元测试,并声明一个MicroserviceClient
的模拟,对于输入“Good Resource",它返回成功的响应,而对于“Bad Resource",它抛出异常:
@ParameterizedTest
@MethodSource("clientData")
public void givenMicroserviceClient_whenMultipleCreateResource_thenCombineResults(List<String> inputs,
int expectedSuccess, int expectedFailure) throws ExecutionException, InterruptedException {
MicroserviceClient mockMicroservice = mock(MicroserviceClient.class);
when(mockMicroservice.createResource("Good Resource"))
.thenReturn(CompletableFuture.completedFuture(123L));
when(mockMicroservice.createResource("Bad Resource"))
.thenReturn(CompletableFuture.failedFuture(new IllegalArgumentException("Bad Resource")));
}
我们将把这个测试转化为一个参数化测试,通过传递一组不同的数据,使用MethodSource
(/parameterized-tests-junit-5#6-method)。我们需要创建一个静态方法来为测试提供一个JUnit的Arguments流:
private static Stream<Arguments> clientData() {
return Stream.of(
Arguments.of(List.of("Good Resource"), 1, 0),
Arguments.of(List.of("Bad Resource"), 0, 1),
Arguments.of(List.of("Good Resource", "Bad Resource"), 1, 1),
Arguments.of(List.of("Good Resource", "Bad Resource", "Good Resource", "Bad Resource",
"Good Resource"), 3, 2)
);
}
这将产生四个测试执行,每个执行都会传入一个输入列表和预期的成功和失败次数。
接下来,回到我们的单元测试,使用测试数据调用MicroserviceClient
并收集每个结果的CompletableFuture
到一个列表中:
List<CompletableFuture<Long>> clientCalls = new ArrayList<>();
for (String resource : inputs) {
clientCalls.add(mockMicroservice.createResource(resource));
}
现在,我们面临核心问题:一个包含需要完成并收集结果的CompletableFuture
对象列表,同时处理遇到的任何异常。
3.1. 处理异常
在深入讨论如何完成每个CompletableFuture
之前,我们先定义一个处理异常的辅助方法。我们还需要定义并模拟一个Logger
,以模拟实际的错误处理:
private final Logger logger = mock(Logger.class);
private Long handleError(Throwable throwable) {
logger.error("Encountered error: " + throwable);
return -1L;
}
interface Logger {
void error(String message);
}
辅助方法只是“记录”错误消息并返回-1
,我们用它来表示无效的资源。
3.2. 带有异常处理的完成CompletableFuture
现在,我们需要完成所有的CompletableFuture
并适当地处理任何异常。我们可以利用CompleteableFuture
提供的几个工具来实现这一点:
-
exceptionally()
:如果CompletableFuture
以异常完成,它会执行提供的函数。 -
join()
:在CompletableFuture
完成时返回其结果。
然后,我们可以定义一个完成单个CompletableFuture
的辅助方法:
private Long handleFuture(CompletableFuture<Long> future) {
return future
.exceptionally(this::handleError)
.join();
}
值得注意的是,我们使用exceptionally()
处理MicroserviceClient
调用可能通过我们的handleError()
辅助方法抛出的任何异常。最后,我们在CompletableFuture
上调用join()
来等待客户端调用完成并返回其资源标识符。
3.3. 处理CompletableFuture
列表
回到我们的单元测试,现在我们可以利用我们的辅助方法以及Java的流API来创建一个简单的语句,解决所有客户端调用:
Map<Boolean, List<Long>> resultsByValidity = clientCalls.stream()
.map(this::handleFuture)
.collect(Collectors.partitioningBy(resourceId -> resourceId != -1L));
让我们分解这个语句:
- 我们使用我们的
handleFuture()
辅助方法将每个CompletableFuture
映射到结果资源标识符。 - 我们使用Java的
Collectors.partitioningBy()
实用工具根据有效性将结果资源标识符分割成两个单独的列表。
我们可以通过检查分区列表的大小以及模拟的Logger
调用来轻松验证我们的测试:
List<Long> validResults = resultsByValidity.getOrDefault(true, List.of());
assertThat(validResults.size()).isEqualTo(successCount);
List<Long> invalidResults = resultsByValidity.getOrDefault(false, List.of());
assertThat(invalidResults.size()).isEqualTo(errorCount);
verify(logger, times(errorCount))
.error(eq("Encountered error: java.lang.IllegalArgumentException: Bad Resource"));
运行测试后,我们可以看到分区列表符合我们的预期。
4. 总结
在这篇文章中,我们学习了如何处理完成一个CompletableFuture
集合。如果需要,我们可以轻松扩展我们的方法,使用更强大的错误处理或复杂的业务逻辑。
如往常一样,文章的所有代码都可以在GitHub上找到这里。