1. 概述

在并发编程中,Future 和 Promise 是两个高级异步抽象。本教程将带你深入了解它们在 Scala 编程语言中的作用和用法。

2. Future

Future 是一个只读的占位符,用于表示正在进行的计算结果。它充当了一个尚未存在的实际值的代理。比如 IO 或 CPU 密集型操作,这些通常需要较长时间才能完成。

2.1. 创建 Future

要创建异步计算任务,我们可以将我们的计算逻辑放入 Future 的 apply 方法中:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val result = Future {
  println("Long running computation started.")
  val result = {
    Thread.sleep(5000)
    5
  }
  println("Our computation, finally finished.")
  result
}

要运行 Future,我们需要一个 ExecutionContext,它可以让我们将业务逻辑(代码)与执行环境解耦。由于 ExecutionContext 是隐式参数,我们可以在作用域中导入或创建一个并标记为 implicit。这里为了简单起见,我们使用全局 ExecutionContext:

implicit val ec = scala.concurrent.ExecutionContext.Implicits.global
val r1 = Future{ /* computation */ }(ec) // 显式传入 ExecutionContext
val r2 = Future{ /* computation */ }     // 隐式传递

2.2. 组合 Future

使用 Future 的好处在于它支持 mapflatMap 操作,因此我们可以使用惯用的 for-comprehension 来链式组合多个 Future,实现良好的并发组合能力

以下是一个 User 类的例子,其中 createUser() 函数依赖于其他异步函数的结果,如 notExist()avatar()。我们来看看如何组合这些操作:

import java.math.BigInteger
import java.net.URL
import java.security.MessageDigest

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

type Name = String
type Email = String
type Password = String
type Avatar = URL

case class User(name: Name, email: Email, password: Password, avatar: Avatar)

def notExist(email: Email): Future[Boolean] = Future {
  Thread.sleep(100)
  true
}
def md5hash(str: String): String =
  new BigInteger(1,
    MessageDigest
      .getInstance("MD5")
      .digest(str.getBytes)
  ).toString(16)
def avatar(email: Email): Future[Avatar] = Future {
  Thread.sleep(200)
  new Avatar("http://avatar.example.com/user/23k520f23f4.png")
}
def createUser(name: Name, email: Email, password: Password): Future[User] =
  for {
    _ <- notExist(email)
    avatar <- avatar(email)
    hashedPassword = md5hash(password)
  } yield User(name, email, hashedPassword, avatar)

在这个例子中,createUser() 接收 name、email 和 password 参数,并返回一个包含对应 User 实例的 Future(即 Future[User])。

创建用户时需要执行一些涉及 IO 的步骤。首先检查数据库中是否已存在该用户;其次从公共头像服务获取其头像。

我们使用 Thread.sleep() 来模拟数据库操作的延迟。

因为 avatar()notExist() 都是 Future,而每个 Future 都支持 flatMap 操作,所以我们可以通过 for-comprehension 将它们组合起来。

2.3. 获取值

到目前为止,我们已经学会了如何创建 Future 并将其串联起来。那么,该如何获取 Future 的最终结果呢?

在深入访问 Future 值的方式之前,先了解一下 Future 的状态机制

✅ Future 有两种阶段状态:

  1. 未完成(Not Completed):计算仍在进行中。
  2. 已完成(Completed):计算结束,结果可能是成功(Success)或失败(Failure)。

也就是说,在计算结果就绪前,Future 处于未完成状态;完成后则进入成功或失败状态之一。

下面介绍两种获取 Future 结果的方法:

方法一:使用 onComplete 回调

✅ Future 提供了 onComplete 方法,接收一个 Try[T] => U 类型的回调函数,允许我们在完成的不同状态下分别处理:

val userFuture: Future[User] =
  createUser("John", "john.doe@example.com", "secret")

userFuture.onComplete {
  case Success(user) =>
    println(s"User created: $user")
  case Failure(exception) =>
    println(s"Creating user failed due to the exception: $exception")
}

通过向 onComplete() 提供回调函数,一旦 Future 完成,该回调就会被触发。

方法二:阻塞等待结果(不推荐)

⚠️ 另一种方式是通过阻塞线程来等待 Future 完成,但这通常不是推荐做法。Await.result()Await.ready() 会阻塞当前线程直到计算完成。它们的签名如下:

def result[T](awaitable: Awaitable[T], atMost: Duration): T
def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type
  • 第一个参数是要等待的对象(所有 Future 都是 Awaitable 的子类)
  • 第二个参数是指定等待的最大时间

可以这样获取 Future 的值:

val user: User = Await.result(userFuture, Duration.Inf)

⚠️ 注意:使用 result() 时需小心,因为它可能在计算完成前抛出异常。

✅ 如果只是想等待 Future 完成而不提取值,可以使用 ready() 方法:

val completedFuture: Future[User] = Await.ready(userFuture, Duration.Inf)

此时再调用 value.get 一定不会是 None,而是 Some(Success(t))Some(Failure(error))

completedFuture.value.get match {
  case Success(user) =>
    println(s"User created: $user")
  case Failure(exception) =>
    println(s"Creating user failed due to the exception: $exception")
}

3. Promise

Promise 是一个可写的、单次赋值的容器,用于完成一个 Future。虽然 Promise 和 Future 很相似,但它们的角色不同:

  • ✅ Future 负责“读”异步操作的结果
  • ✅ Promise 负责“写”异步操作的结果

换句话说,Promise 是对将来某个时刻可用值的写入句柄。它允许我们将已完成的异步操作的结果写入 Future,从而改变 Future 的状态(从未完成变为完成)。调用 success() 方法即可完成这一过程。

⚠️ 一旦 Promise 被完成,就不能再次调用 success()

现在来看一下如何从 Promise 创建 Future:

✅ 步骤如下:

  1. 为想要返回的类型创建一个 Promise。
  2. 在 ExecutionContext 中运行计算代码块。如果执行成功,则调用 Promise 的 success() 方法设置结果;如果失败,则调用 failure() 方法。
  3. 返回与 Promise 关联的 Future。

来看一个示例函数,它接收一个计算块 block: => T,并返回一个包含 Promise 值的 Future:

def runByPromise[T](block: => T)(implicit ec: ExecutionContext): Future[T] = {
  val p = Promise[T]()
  ec.execute { () =>
    try { 
      p.success(block)
    } catch {
      case NonFatal(e) => p.failure(e)
    }
  }
  p.future
}

✅ Promise 是连接传统 Java 回调风格 API 和 Future 的桥梁。

4. 总结

本文介绍了 Scala 中用于编写非阻塞、异步代码的两种核心构造:

  • ✅ 使用 Future 处理异步操作的读取端
  • ✅ 使用 Promise 处理异步操作的写入端

完整源码见 GitHub


原始标题:Futures and Promises in Scala