1. 概述
Java 8 的并发API引入了 CompletableFuture
,这是一个简化异步和非阻塞编程的强大工具。本文将探讨Java的CompletableFuture
及其所利用的线程池。我们将深入研究其异步和非异步方法的区别,并学习如何充分利用CompletableFuture
API 的潜力。
2. 非异步方法
CompletableFuture
提供了一个包含超过50个方法的广泛API,其中许多方法有非异步和异步两种版本。我们先从非异步版本开始,通过thenApply()
方法进行实际示例:
使用thenApply()
时,我们将一个函数作为参数传递,该函数接收CompletableFuture
的前一个值,执行操作并返回新值。因此,会创建一个新的CompletableFuture
来封装结果值。让我们以一个简单的例子来说明,我们将一个String
值转换为表示其大小的Integer
,同时打印执行此操作的线程名称:
@Test
void whenUsingNonAsync_thenMainThreadIsUsed() throws Exception {
CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");
CompletableFuture<Integer> nameLength = name.thenApply(value -> {
printCurrentThread(); // will print "main"
return value.length();
});
assertThat(nameLength.get()).isEqualTo(8);
}
private static void printCurrentThread() {
System.out.println(Thread.currentThread().getName());
}
传递给thenApply()
的函数将在直接与CompletableFuture
API交互的线程中执行,对我们来说是主线程。但如果我们将对CompletableFuture
的交互提取出来,并从其他线程调用,我们将会注意到变化:
@Test
void whenUsingNonAsync_thenUsesCallersThread() throws Exception {
Runnable test = () -> {
CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");
CompletableFuture<Integer> nameLength = name.thenApply(value -> {
printCurrentThread(); // will print "test-thread"
return value.length();
});
try {
assertThat(nameLength.get()).isEqualTo(8);
} catch (Exception e) {
fail(e.getMessage());
}
};
new Thread(test, "test-thread").start();
Thread.sleep(100l);
}
3. 异步方法
API中的大多数方法都有异步对应版本。我们可以使用这些异步变体,确保中间操作在单独的线程池上执行。让我们改变之前的代码示例,将thenApply()
更改为thenApplyAsync()
:
@Test
void whenUsingAsync_thenUsesCommonPool() throws Exception {
CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");
CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
printCurrentThread(); // will print "ForkJoinPool.commonPool-worker-1"
return value.length();
});
assertThat(nameLength.get()).isEqualTo(8);
}
**根据官方文档(https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/CompletableFuture.html_),如果我们不显式提供
Executor就使用异步方法,函数将在
ForkJoinPool.commonPool()中执行。**因此,如果运行代码片段,我们应该看到其中一个常见的
ForkJoinPool工作者:在我的情况下,可能是“
ForkJoinPool.commonPool-worker-1`”。
4. 异步方法与自定义Executor
我们可以注意到所有异步方法都被重载,提供了接受执行代码和Executor
的选项。我们可以利用这一点,为异步操作使用明确的线程池。让我们进一步更新测试,为thenApplyAsync()
方法提供一个自定义线程池:
@Test
void whenUsingAsync_thenUsesCustomExecutor() throws Exception {
Executor testExecutor = Executors.newFixedThreadPool(5);
CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");
CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
printCurrentThread(); // will print "pool-2-thread-1"
return value.length();
}, testExecutor);
assertThat(nameLength.get()).isEqualTo(8);
}
正如预期的那样,当我们使用重载方法时,CompletableFuture
将不再使用通用的ForkJoinPool
。
5. 扩展CompletableFuture
最后,我们可以扩展CompletableFuture
并重写defaultExecutor()
,封装一个自定义线程池。这样,我们就可以在不指定Executor
的情况下使用异步方法,而函数将由我们的线程池而不是通用的ForkJoinPool
来调用。
让我们创建一个名为CustomCompletableFuture
的扩展,使用newSingleThreadExecutor
创建一个具有易于在测试时识别名称的线程。此外,我们将重写defaultExecutor()
方法,使CompletableFuture
能够无缝使用我们的自定义线程池:
public class CustomCompletableFuture<T> extends CompletableFuture<T> {
private static final Executor executor = Executors.newSingleThreadExecutor(
runnable -> new Thread(runnable, "Custom-Single-Thread")
);
@Override
public Executor defaultExecutor() {
return executor;
}
}
另外,我们添加一个静态工厂方法,遵循CompletableFuture
模式。这将使我们能够轻松创建并完成CustomCompletableFuture
对象:
public static <TYPE> CustomCompletableFuture<TYPE> supplyAsync(Supplier<TYPE> supplier) {
CustomCompletableFuture<TYPE> future = new CustomCompletableFuture<>();
executor.execute(() -> {
try {
future.complete(supplier.get());
} catch (Exception ex) {
future.completeExceptionally(ex);
}
});
return future;
}
现在,让我们创建CustomCompletableFuture
的实例,在thenSupplyAsync()
中对String
值执行同样的转换。不过,这次我们不再指定Executor
,但仍期望函数由我们的专用线程“Custom-Single-Thread
”调用:
@Test
void whenOverridingDefaultThreadPool_thenUsesCustomExecutor() throws Exception {
CompletableFuture<String> name = CustomCompletableFuture.supplyAsync(() -> "Baeldung");
CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
printCurrentThread(); // will print "Custom-Single-Thread"
return value.length();
});
assertThat(nameLength.get()).isEqualTo(8);
}
6. 总结
在这篇文章中,我们了解到CompletableFuture
API 的大多数方法都支持异步和非异步执行。调用非异步版本时,调用CompletableFuture
的线程也将执行所有中间操作和转换。另一方面,异步对应方法将使用不同的线程池,默认的是通用的ForkJoinPool
。
接下来,我们讨论了如何进一步定制执行,为每个异步步骤使用自定义Executor
。最后,我们学习了如何创建自定义的CompletableFuture
对象并重写defaultExecutor()
方法。这使得我们可以在不每次指定自定义Executor
的情况下使用异步方法。
一如既往,您可以在GitHub上的示例代码库找到工作代码示例。