一、简介

Guava 为我们提供了 ListenableFuture, 它比默认的 Java Future 提供了更丰富的 API。 让我们看看如何利用这一点来发挥我们的优势。

2.FutureListenableFutureFuture

让我们简要了解一下这些不同的类是什么以及它们之间的关系。

2.1. 未来

Java 5开始, 我们可以使用 java.util.concurrent.Future 来表示异步任务。

Future 允许我们访问已经完成或将来可能完成的任务的结果,并支持取消它们。

2.2. 聆听未来

使用 java.util.concurrent.Future 时缺少的一个功能是添加侦听器以在完成时运行的能力,这是最流行的异步框架提供的一项常见功能。

Guava 通过允许我们将侦听器附加到其 com.google.common.util.concurrent.ListenableFuture 来解决这个问题

2.3. 期货

Guava 为我们提供了方便的类 com.google.common.util.concurrent.Futures ,以便更轻松地使用其 ListenableFuture。

此类提供了与 ListenableFuture 交互的各种方式, 其中包括 支持添加成功/失败回调,并允许我们通过聚合或转换来协调多个 future。

3. 使用简单

现在让我们看看如何以最简单的方式使用 ListenableFuture ;创建和添加回调。

3.1.创造 可听的未来

我们获取 ListenableFuture 的 最简单方法是将任务提交给 ListeningExecutorService (非常类似于我们使用普通 ExecutorService 获取普通 Future 的 方式):

ExecutorService execService = Executors.newSingleThreadExecutor();
ListeningExecutorService lExecService = MoreExecutors.listeningDecorator(execService);

ListenableFuture<Integer> asyncTask = lExecService.submit(() -> {
    TimeUnit.MILLISECONDS.sleep(500); // long running task
    return 5;
});

请注意我们如何使用 MoreExecutors 类将 ExecutorService 装饰为 ListeningExecutorService。 我们可以参考Guava中线程池的实现来了解更多关于 MoreExecutors 的 信息。

如果我们已经有一个返回 Future 的 API,并且需要将其转换为 ListenableFuture ,则可以通过初始化其具体实现 ListenableFutureTask 轻松完成此操作:

// old api
public FutureTask<String> fetchConfigTask(String configKey) {
    return new FutureTask<>(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}

// new api
public ListenableFutureTask<String> fetchConfigListenableTask(String configKey) {
    return ListenableFutureTask.create(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}

我们需要注意,除非将这些任务提交给 执行者,否则它们不会运行。 直接与 ListenableFutureTask 交互并不常见,仅在极少数情况下才会使用 (例如:实现我们自己的 ExecutorService )。实际使用可以参考Guava的 AbstractListeningExecutorService

如果我们的异步任务无法使用 ListeningExecutorService 或提供的 Futures 实用方法,并且需要手动设置未来值,我们也可以使用 com.google.common.util.concurrent.SettableFuture 。对于更复杂的用法,我们还可以考虑 com.google.common.util.concurrent.AbstractFuture。

3.2.添加监听器/回调

我们 ListenableFuture 添加侦听器的一种方法是使用 Futures.addCallback() 注册回调, 以便我们在成功或失败发生时访问结果或异常:

Executor listeningExecutor = Executors.newSingleThreadExecutor();

ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
Futures.addCallback(asyncTask, new FutureCallback<Integer>() {
    @Override
    public void onSuccess(Integer result) {
        // do on success
    }

    @Override
    public void onFailure(Throwable t) {
        // do on failure
    }
}, listeningExecutor);

我们还可以 通过直接将侦听器添加到 ListenableFuture 来添加侦听器。 请注意,当 future 成功或异常完成时,此侦听器将运行。另请注意,我们无权访问异步任务的结果:

Executor listeningExecutor = Executors.newSingleThreadExecutor();

int nextTask = 1;
Set<Integer> runningTasks = ConcurrentHashMap.newKeySet();
runningTasks.add(nextTask);

ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
asyncTask.addListener(() -> runningTasks.remove(nextTask), listeningExecutor);

4. 复杂的用法

现在让我们看看如何在更复杂的场景中使用这些 future。

4.1.扇入

有时我们可能需要调用多个异步任务并收集它们的结果,通常称为扇入操作。

Guava 为我们提供了两种实现此目的的方法。但是,我们应该根据我们的要求谨慎选择正确的方法。假设我们需要协调以下异步任务:

ListenableFuture<String> task1 = service.fetchConfig("config.0");
ListenableFuture<String> task2 = service.fetchConfig("config.1");
ListenableFuture<String> task3 = service.fetchConfig("config.2");

扇入多个 future 的一种方法是使用 Futures.allAsList() 方法。这使我们能够按照提供的 future 的顺序收集所有 future 的结果(如果所有 future 都成功) 。如果这些 future 中的任何一个失败,那么整个结果就是一个失败的 future:

ListenableFuture<List<String>> configsTask = Futures.allAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
    @Override
    public void onSuccess(@Nullable List<String> configResults) {
        // do on all futures success
    }

    @Override
    public void onFailure(Throwable t) {
        // handle on at least one failure
    }
}, someExecutor);

如果我们需要收集所有异步任务的结果,无论它们是否失败,我们可以使用 Futures.successfulAsList() 。这将返回一个列表,其结果将与传递给参数的任务具有相同的顺序,并且失败的任务将 null 分配给它们在列表中各自的位置:

ListenableFuture<List<String>> configsTask = Futures.successfulAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
    @Override
    public void onSuccess(@Nullable List<String> configResults) {
        // handle results. If task2 failed, then configResults.get(1) == null
    }

    @Override
    public void onFailure(Throwable t) {
        // handle failure
    }
}, listeningExecutor);

在上面的用法中我们应该小心, 如果未来任务通常在成功时返回 null ,那么它将与失败的任务(也将结果设置为 null )无法区分。

4.2.带组合器的扇入

如果我们需要协调返回不同结果的多个 future,则上述解决方案可能不够。在这种情况下,我们可以使用扇入操作的组合器变体来协调这种未来的组合。

与简单的扇入操作类似, Guava 为我们提供了两种变体;一种是在所有任务成功完成时成功,另一种是即使某些任务失败也成功,分别使用 Futures.whenAllSucceed()Futures.whenAllComplete() 方法。

让我们看看如何使用 Futures.whenAllSucceed() 来组合来自多个 future 的不同结果类型:

ListenableFuture<Integer> cartIdTask = service.getCartId();
ListenableFuture<String> customerNameTask = service.getCustomerName();
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();

ListenableFuture<CartInfo> cartInfoTask = Futures.whenAllSucceed(cartIdTask, customerNameTask, cartItemsTask)
    .call(() -> {
        int cartId = Futures.getDone(cartIdTask);
        String customerName = Futures.getDone(customerNameTask);
        List<String> cartItems = Futures.getDone(cartItemsTask);
        return new CartInfo(cartId, customerName, cartItems);
    }, someExecutor);

Futures.addCallback(cartInfoTask, new FutureCallback<CartInfo>() {
    @Override
    public void onSuccess(@Nullable CartInfo result) {
        //handle on all success and combination success
    }

    @Override
    public void onFailure(Throwable t) {
        //handle on either task fail or combination failed
    }
}, listeningExecService);

如果我们需要允许某些任务失败,我们可以使用 Futures.whenAllComplete() 。虽然语义大多与上面类似,但我们应该意识到,当对 Futures.getDone() 调用时,失败的 future 将抛出 ExecutionException

4.3.转换

有时我们需要将一次成功后的结果转换为future。 Guava 为我们提供了两种方法 : Futures.transform()Futures.lazyTransform()

让我们看看如何 使用 Futures.transform() 来转换 future 的结果。只要变换计算量不大就可以使用:

ListenableFuture<List<String>> cartItemsTask = service.getCartItems();

Function<List<String>, Integer> itemCountFunc = cartItems -> {
    assertNotNull(cartItems);
    return cartItems.size();
};

ListenableFuture<Integer> itemCountTask = Futures.transform(cartItemsTask, itemCountFunc, listenExecService);

我们还可以使用 Futures.lazyTransform() 将转换函数应用于 java.util.concurrent.Future。 我们需要记住,此选项不会返回 ListenableFuture, 而是返回普通的 java.util.concurrent.Future ,并且每次在结果 future 上调用 get() 时都会应用转换函数。

4.4.链式期货

我们可能会遇到我们的 future 需要调用其他 future 的情况。在这种情况下,Guava 为我们提供了 async() 变体来安全地将这些 future 链接起来依次执行。

让我们看看如何 使用 Futures.submitAsync() 从提交的 Callable 内部调用 future:

AsyncCallable<String> asyncConfigTask = () -> {
    ListenableFuture<String> configTask = service.fetchConfig("config.a");
    TimeUnit.MILLISECONDS.sleep(500); //some long running task
    return configTask;
};

ListenableFuture<String> configTask = Futures.submitAsync(asyncConfigTask, executor);

* 如果我们想要真正的链接,即一个 future 的结果被输入到另一个 future 的计算中,我们可以使用 Futures.transformAsync() : *

ListenableFuture<String> usernameTask = service.generateUsername("john");
AsyncFunction<String, String> passwordFunc = username -> {
    ListenableFuture<String> generatePasswordTask = service.generatePassword(username);
    TimeUnit.MILLISECONDS.sleep(500); // some long running task
    return generatePasswordTask;
};

ListenableFuture<String> passwordTask = Futures.transformAsync(usernameTask, passwordFunc, executor);

Guava 还为我们提供了 Futures.scheduleAsync()Futures.writingAsync() 分别用于提交计划任务和提供错误恢复时的后备任务。虽然它们满足不同的场景,但我们不会讨论它们,因为它们与其他 async() 调用类似。

5. 使用注意事项

现在让我们研究一下使用 future 时可能遇到的一些常见陷阱以及如何避免它们。

5.1.工作执行者与倾听执行者

使用 Guava 期货时,了解工作执行器和监听执行器之间的区别非常重要。例如,假设我们有一个异步任务来获取配置:

public ListenableFuture<String> fetchConfig(String configKey) {
    return lExecService.submit(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}

假设我们想为上面的 future 附加一个监听器:

ListenableFuture<String> configsTask = service.fetchConfig("config.0");
Futures.addCallback(configsTask, someListener, listeningExecutor);

请注意,这里的 lExecService 是运行异步任务的执行器,而 listeningExecutor 是调用侦听器的执行器。

如上所述, 我们应该始终考虑将这两个执行器分开,以避免监听器和工作线程竞争相同的线程池资源。 共享相同的执行器可能会导致我们的繁重任务使侦听器执行匮乏。或者一个写得不好的重量级监听器最终会阻碍我们重要的繁重任务。

5.2.小心 directExecutor()

虽然我们可以在单元测试中使用 MoreExecutors.directExecutor()MoreExecutors.newDirectExecutorService() 来更轻松地处理异步执行,但我们在生产代码中使用它们时应该小心。

当我们从上述方法获取执行器时,我们提交给它的任何任务,无论是重量级任务还是侦听器,都将在当前线程上执行。如果当前执行上下文需要高吞吐量,这可能会很危险。

例如,使用 directExecutor 并在 UI 线程中向其提交重量级任务将自动阻塞我们的 UI 线程。

我们还可能面临这样一种情况:我们的侦听器最终会减慢所有其他侦听器的速度(甚至是那些不涉及 directExecutor 的侦听器 )。这是因为 Guava 在各自的 Executor 中以 while 循环的方式执行所有监听器 但是 directExecutor 会导致监听器与 while 循环运行在同一个线程中。

5.3.嵌套期货不好

使用链式 future 时,我们应该小心,不要从另一个 future 内部调用一个 future,从而创建嵌套 future:

public ListenableFuture<String> generatePassword(String username) {
    return lExecService.submit(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return username + "123";
    });
}

String firstName = "john";
ListenableFuture<ListenableFuture<String>> badTask = lExecService.submit(() -> {
    final String username = firstName.replaceAll("[^a-zA-Z]+", "")
        .concat("@service.com");
    return generatePassword(username);
});

如果我们看到包含 ListenableFuture<ListenableFuture* 的代码** ***>, 那么我们应该知道这是一个写得很糟糕的未来 ,因为外部未来的取消和完成可能会发生竞争,并且取消可能不会传播到内部未来。

如果我们看到上述场景,我们应该始终使用 Futures.async() 变体以连接的方式安全地解开这些链式 future。

5.4.小心 JdkFutureAdapters.listenInPoolThread()

Guava 建议我们利用 ListenableFuture 的最佳方式是将所有使用 Future 的 代码转换为 ListenableFuture。

如果这种转换在某些情况下不可行, Guava 为我们提供了适配器,可以使用 JdkFutureAdapters.listenInPoolThread() 重写来完成此转换。 虽然这看起来很有帮助, 但 Guava 警告我们这些是重量级适配器,应尽可能避免。

六,结论

在本文中,我们了解了如何使用 Guava 的 ListenableFuture 来丰富我们对 future 的使用,以及如何使用 Futures API 来更轻松地使用这些 future。

我们还看到了在使用这些 future 和提供的执行器时可能会犯的一些常见错误。

与往常一样,包含示例的完整源代码可在 GitHub 上获取。