1. 概述

ListenableFutureCompletableFuture 基于 Java 的 Future 接口构建。前者是 Google 的 Guava 库的一部分,后者则是 Java 8 的组成部分。

我们都知道,Future 接口本身并不提供回调功能。ListenableFutureCompletableFuture 都弥补了这一空白。在这篇教程中,我们将学习如何使用这两种方式实现回调机制。

2. 异步任务中的回调

假设我们需要从远程服务器下载图像文件并将文件名存储到数据库。由于这个任务涉及网络操作且耗时,因此非常适合使用 Java 的异步能力。

我们可以创建一个函数,从远程服务器下载文件,并添加一个监听器,当下载完成后将数据推送到数据库。

接下来,我们将分别使用 ListenableFutureCompletableFuture 来实现这个任务。

3. ListenableFuture 中的回调

首先,在 pom.xml 文件中添加 Google Guava 库的依赖:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>32.1.3-jre</version>
</dependency>

然后,模仿一个从远程 Web 服务器下载文件的 Future

ExecutorService executorService = Executors.newFixedThreadPool(1);
ListeningExecutorService pool = MoreExecutors.listeningDecorator(executorService);
ListenableFuture<String> listenableFuture = pool.submit(downloadFile());

private static Callable<String> downloadFile() {
  return () -> {
    // Mimicking the downloading of a file by adding a sleep call
    Thread.sleep(5000);
    return "pic.jpg";
  };
}

上述代码创建了一个包含在 MoreExecutors 中的线程池的 ExecutorService。在 ListenableFutureServicesubmit 方法中,我们传递了一个 Callable<String>,它负责下载文件并返回文件名,返回一个 ListenableFuture

为了在 ListenableFuture 实例上添加回调,Guava 提供了一个 Future 的实用方法:

Futures.addCallback(
    listenableFuture,
    new FutureCallback<>() {
        @Override
        public void onSuccess(String fileName) {
            // code to push fileName to DB
        }

        @Override
        public void onFailure(Throwable throwable) {
            // code to take appropriate action when there is an error
        }
    },
    executorService);
}

在这个回调中,成功和异常情况都得到了处理。这种使用回调的方式相当自然。

我们还可以直接将监听器添加到 ListenableFuture 上:

listenableFuture.addListener(
    new Runnable() {
        @Override
        public void run() {
            // logic to download file
        }
    },
    executorService
);

这个回调没有访问 Future 结果的能力,因为它接受的是 Runnable。无论 Future 是否完成,这个回调方法都会执行。

了解了 ListenableFuture 的回调后,下一部分我们将探讨 CompletableFuture 实现相同功能的方法。

4. CompletableFuture 中的回调

CompletableFuture 中,有许多方法可以添加回调。最流行的方法包括使用链式方法,如 thenApply()thenAccept()thenCompose()exceptionally() 等,它们会在正常或异常情况下执行。

本节我们将学习 whenComplete() 方法。这个方法的优点是可以在任何希望它完成的线程上完成。以刚才的文件下载示例为例,来看看如何使用 whenComplete()

CompletableFuture<String> completableFuture = new CompletableFuture<>();
Runnable runnable = downloadFile(completableFuture);
completableFuture.whenComplete(
  (res, error) -> {
      if (error != null) {
          // handle the exception scenario
      } else if (res != null) {
          // send data to DB
      }
  });
new Thread(runnable).start();

private static Runnable downloadFile(CompletableFuture<String> completableFuture) {
  return () -> {
      try {
          // logic to download to file;
      } catch (InterruptedException e) {
          log.error("exception is {} "+e);
      }
      completableFuture.complete("pic.jpg");
  };
}

当文件下载完成后,whenComplete() 方法会执行成功或失败的条件。

5. 总结

在这篇文章中,我们学习了 ListenableFutureCompletableFuture 中的回调机制。

我们发现,与 CompletableFuture 相比,ListenableFuture 提供了一个更自然、流畅的回调 API。

根据我们的具体需求,可以选择最适合的方式,因为 CompletableFuture 是 Java 核心库的一部分,而 ListenableFuture 则是流行的 Guava 库的一部分。

本文所有代码示例可在 GitHub 查看。