1. 概述

Java 8 的并发API引入了 CompletableFuture,这是一个简化异步和非阻塞编程的强大工具。本文将探讨Java的CompletableFuture及其所利用的线程池。我们将深入研究其异步和非异步方法的区别,并学习如何充分利用CompletableFuture API 的潜力。

2. 非异步方法

CompletableFuture 提供了一个包含超过50个方法的广泛API,其中许多方法有非异步和异步两种版本。我们先从非异步版本开始,通过thenApply()方法进行实际示例:


了解更多

使用thenApply()时,我们将一个函数作为参数传递,该函数接收CompletableFuture的前一个值,执行操作并返回新值。因此,会创建一个新的CompletableFuture来封装结果值。让我们以一个简单的例子来说明,我们将一个String值转换为表示其大小的Integer,同时打印执行此操作的线程名称:

@Test
void whenUsingNonAsync_thenMainThreadIsUsed() throws Exception {
    CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");

    CompletableFuture<Integer> nameLength = name.thenApply(value -> {
        printCurrentThread(); // will print "main"
        return value.length();
    });

   assertThat(nameLength.get()).isEqualTo(8);
}

private static void printCurrentThread() {
    System.out.println(Thread.currentThread().getName());
}

传递给thenApply()的函数将在直接与CompletableFuture API交互的线程中执行,对我们来说是主线程。但如果我们将对CompletableFuture的交互提取出来,并从其他线程调用,我们将会注意到变化:

@Test
void whenUsingNonAsync_thenUsesCallersThread() throws Exception {
    Runnable test = () -> {
        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");

        CompletableFuture<Integer> nameLength = name.thenApply(value -> {
            printCurrentThread(); // will print "test-thread"
            return value.length();
        });

        try {
            assertThat(nameLength.get()).isEqualTo(8);
        } catch (Exception e) {
            fail(e.getMessage());
        }
    };

    new Thread(test, "test-thread").start();
    Thread.sleep(100l);
}

3. 异步方法

API中的大多数方法都有异步对应版本。我们可以使用这些异步变体,确保中间操作在单独的线程池上执行。让我们改变之前的代码示例,将thenApply()更改为thenApplyAsync()

@Test
void whenUsingAsync_thenUsesCommonPool() throws Exception {
    CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");

    CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
        printCurrentThread(); // will print "ForkJoinPool.commonPool-worker-1"
        return value.length();
    });

    assertThat(nameLength.get()).isEqualTo(8);
}

**根据官方文档(https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/CompletableFuture.html_),如果我们不显式提供Executor就使用异步方法,函数将在ForkJoinPool.commonPool()中执行。**因此,如果运行代码片段,我们应该看到其中一个常见的ForkJoinPool工作者:在我的情况下,可能是“ForkJoinPool.commonPool-worker-1`”。

4. 异步方法与自定义Executor

我们可以注意到所有异步方法都被重载,提供了接受执行代码和Executor的选项。我们可以利用这一点,为异步操作使用明确的线程池。让我们进一步更新测试,为thenApplyAsync()方法提供一个自定义线程池:

@Test
void whenUsingAsync_thenUsesCustomExecutor() throws Exception {
    Executor testExecutor = Executors.newFixedThreadPool(5);
    CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");

    CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
        printCurrentThread(); // will print "pool-2-thread-1"
        return value.length();
    }, testExecutor);

   assertThat(nameLength.get()).isEqualTo(8);
}

正如预期的那样,当我们使用重载方法时,CompletableFuture将不再使用通用的ForkJoinPool

5. 扩展CompletableFuture

最后,我们可以扩展CompletableFuture并重写defaultExecutor(),封装一个自定义线程池。这样,我们就可以在不指定Executor的情况下使用异步方法,而函数将由我们的线程池而不是通用的ForkJoinPool来调用。

让我们创建一个名为CustomCompletableFuture的扩展,使用newSingleThreadExecutor创建一个具有易于在测试时识别名称的线程。此外,我们将重写defaultExecutor()方法,使CompletableFuture能够无缝使用我们的自定义线程池:

public class CustomCompletableFuture<T> extends CompletableFuture<T> {
    private static final Executor executor = Executors.newSingleThreadExecutor(
        runnable -> new Thread(runnable, "Custom-Single-Thread")
    );

    @Override
    public Executor defaultExecutor() {
        return executor;
    }
}

另外,我们添加一个静态工厂方法,遵循CompletableFuture模式。这将使我们能够轻松创建并完成CustomCompletableFuture对象:

public static <TYPE> CustomCompletableFuture<TYPE> supplyAsync(Supplier<TYPE> supplier) {
    CustomCompletableFuture<TYPE> future = new CustomCompletableFuture<>();
    executor.execute(() -> {
        try {
            future.complete(supplier.get());
        } catch (Exception ex) {
            future.completeExceptionally(ex);
        }
    });
    return future;
}

现在,让我们创建CustomCompletableFuture的实例,在thenSupplyAsync()中对String值执行同样的转换。不过,这次我们不再指定Executor,但仍期望函数由我们的专用线程“Custom-Single-Thread”调用:

@Test
void whenOverridingDefaultThreadPool_thenUsesCustomExecutor() throws Exception {
    CompletableFuture<String> name = CustomCompletableFuture.supplyAsync(() -> "Baeldung");

    CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
        printCurrentThread(); // will print "Custom-Single-Thread"
        return value.length();
    });

   assertThat(nameLength.get()).isEqualTo(8);
}

6. 总结

在这篇文章中,我们了解到CompletableFuture API 的大多数方法都支持异步和非异步执行。调用非异步版本时,调用CompletableFuture的线程也将执行所有中间操作和转换。另一方面,异步对应方法将使用不同的线程池,默认的是通用的ForkJoinPool

接下来,我们讨论了如何进一步定制执行,为每个异步步骤使用自定义Executor。最后,我们学习了如何创建自定义的CompletableFuture对象并重写defaultExecutor()方法。这使得我们可以在不每次指定自定义Executor的情况下使用异步方法。

一如既往,您可以在GitHub上的示例代码库找到工作代码示例。