1. 概述

有些操作,比如数据库查询或调用另一个 HTTP 服务,可能需要较长时间才能完成。如果在主线程上运行这些操作,会阻塞程序继续执行,从而降低性能。

本教程将重点介绍 Future,它是 Scala 中用于在后台运行操作并解决上述问题的一种方式。

2. Future 是什么?

Future 表示一个异步计算的结果,这个结果可能已经可用,也可能尚未准备好。

当我们创建一个新的 Future 时,Scala 会启动一个新线程来执行其代码。一旦执行完成,计算的结果(值或异常)将被赋给这个 Future。

3. 创建 Future

3.1. ExecutionContext

在创建任何 Future 之前,我们需要提供一个隐式的 ExecutionContext。它指定了 Future 的代码将在哪个线程池上、以何种方式执行。我们可以从 ExecutorExecutorService 创建它:

val forkJoinPool: ExecutorService = new ForkJoinPool(4)
implicit val forkJoinExecutionContext: ExecutionContext = 
  ExecutionContext.fromExecutorService(forkJoinPool)

val singleThread: Executor = Executors.newSingleThreadExecutor()
implicit val singleThreadExecutionContext: ExecutionContext = 
  ExecutionContext.fromExecutor(singleThread)

还有一种内置的 global ExecutionContext,它使用 ForkJoinPool,其并行级别设置为可用处理器数量:

implicit val globalExecutionContext: ExecutionContext = ExecutionContext.global

在接下来的示例中,我们将使用 ExecutionContext.global,只需一行导入即可使其生效:

import scala.concurrent.ExecutionContext.Implicits.global

3.2. 启动 Future

有了 ExecutionContext 后,我们就可以创建一个 Future 来在后台运行耗时操作了。我们用 Thread.sleep 来模拟耗时操作:

def generateMagicNumber(): Int = {
  Thread.sleep(3000L)
  23
}
val generatedMagicNumberF: Future[Int] = Future {
  generateMagicNumber()
}

当我们调用 Future.apply 并传入 generateMagicNumber() 时,Future 运行时会在另一个线程中执行它。看起来像是把方法的返回值传给了 Future,但其实 Future.apply 接收的是一个 by-name 参数,也就是说,它会把执行过程推迟到由隐式 ExecutionContext 提供的线程中进行

3.3. 已完成的 Future

如果已经有了计算结果,就没必要再启动异步计算来获取 Future。我们可以直接创建一个已完成的 Future:

def multiply(multiplier: Int): Future[Int] =
  if (multiplier == 0) {
    Future.successful(0)
  } else {
    Future(multiplier * generateMagicNumber())
  }

或者创建一个已完成并带有失败结果的 Future:

def divide(divider: Int): Future[Int] =
  if (divider == 0) {
    Future.failed(new IllegalArgumentException("Don't divide by zero"))
  } else {
    Future(generateMagicNumber() / divider)
  }

为了简化这种条件处理,我们可以使用 Future.fromTry() 方法:

def tryDivide(divider: Int): Future[Int] = Future.fromTry(Try {
  generateMagicNumber() / divider
})

当传给 Try {} 的表达式抛出异常时,Future.fromTry() 等价于 Future.failed()。否则,它类似于 Future.successful()

4. 阻塞等待 Future

我们已经创建了 Future,现在需要一种方式来等待它的结果:

val maxWaitTime: FiniteDuration = Duration(5, TimeUnit.SECONDS)
val magicNumber: Int = Await.result(generatedMagicNumberF, maxWaitTime)

Await.result 会阻塞主线程,并等待给定的 Future 在指定时间内完成。如果超时或 Future 失败,它会抛出异常。

在这个例子中,我们最多等待 5 秒钟获取 generatedMagicNumberF 的结果。

如果想无限期等待,可以使用 Duration.Inf

⚠️ 由于 Await.result 会阻塞主线程,因此只应在确实需要等待时使用。 如果只是想转换 Future 的结果或与其他 Future 组合,应该使用非阻塞方式,我们将在后面介绍。

5. 回调处理

5.1. onComplete

与其阻塞主线程等待 Future 结果,不如通过 onComplete 方法注册回调:

def printResult[A](result: Try[A]): Unit = result match {
  case Failure(exception) => println("Failed with: " + exception.getMessage)
  case Success(number)    => println("Succeed with: " + number)
}
magicNumberF.onComplete(printResult)

在这个例子中,当 magicNumberF 完成后(无论成功或失败),Future 会执行 printResult 方法。

5.2. foreach

如果只想在 Future 成功完成时执行回调,可以使用 foreach 方法:

def printSucceedResult[A](result: A): Unit = println("Succeed with: " + result)
magicNumberF.foreach(printSucceedResult)

onComplete 不同,如果 magicNumberF 失败,则不会执行回调函数。

它的语义与 TryOptionforeach 方法相同。

6. 错误处理

6.1. failed

如果我们希望把失败的 Future 转换为成功的 Future(携带异常信息),可以使用 failed 方法:

val failedF: Future[Int] = Future.failed(new IllegalArgumentException("Boom!"))
val failureF: Future[Throwable] = failedF.failed

它尝试将失败的 Future 转换为一个成功的 Future,其结果是 Throwable。如果原 Future 成功完成,则结果 Future 会失败,并抛出 NoSuchElementException

6.2. fallbackTo

假设我们有一个 DatabaseRepository 可以从数据库读取魔数,还有一个 FileBackup 可以从备份文件中读取魔数(每天备份一次):

trait DatabaseRepository {
  def readMagicNumber(): Future[Int]
  def updateMagicNumber(number: Int): Future[Boolean]
}
trait FileBackup {
  def readMagicNumberFromLatestBackup(): Future[Int]
}

由于这个魔数对业务至关重要,当数据库出问题时,我们希望从备份中读取。这时可以使用 fallbackTo 方法:

trait MagicNumberService {
  val repository: DatabaseRepository
  val backup: FileBackup

  val magicNumberF: Future[Int] =
    repository.readMagicNumber()
      .fallbackTo(backup.readMagicNumberFromLatestBackup())
}

它会在当前 Future 失败时,尝试使用备选的 Future。如果两个都失败,最终的 Future 会失败,并使用当前 Future 的异常

6.3. recover

如果想通过提供替代值来处理特定异常,可以使用 recover 方法:

val recoveredF: Future[Int] = Future(3 / 0).recover {
  case _: ArithmeticException => 0
}

它接收一个偏函数,将匹配的异常转换为成功的值。否则,保持原异常。

6.4. recoverWith

如果想用另一个 Future 来处理特定异常,应该使用 recoverWith

val recoveredWithF: Future[Int] = Future(3 / 0).recoverWith {
  case _: ArithmeticException => magicNumberF
}

7. 转换 Future

7.1. map

当我们有一个 Future 实例时,可以用 map 方法 在不阻塞主线程的情况下转换其成功结果

def increment(number: Int): Int = number + 1
val nextMagicNumberF: Future[Int] = magicNumberF.map(increment)

它会创建一个新的 Future,将 increment 方法应用到 magicNumberF 成功的结果上。如果原 Future 失败,则新 Future 也会失败,并携带相同的异常。

increment 方法的执行是在 ExecutionContext 提供的另一个线程中进行的。

7.2. flatMap

如果想使用一个返回 Future 的函数来转换 Future,应该使用 flatMap 方法:

val updatedMagicNumberF: Future[Boolean] =
  nextMagicNumberF.flatMap(repository.updateMagicNumber)

它与 map 类似,但会保持结果扁平,返回 Future[Boolean] 而不是 Future[Future[Boolean]]

有了 flatMapmap,我们可以写出更清晰、易读的代码。

7.3. transform

map() 不同,**transform(f: Try[T] => Try[S]) 可以同时映射成功和失败的情况**:

val value = Future.successful(42)
val transformed = value.transform {
  case Success(value) => Success(s"Successfully computed the $value")
  case Failure(cause) => Failure(new IllegalStateException(cause))
}

如上所示,transform() 接收一个函数,输入是 Future 的结果,输出是一个 Try 实例。在这个例子中,我们将 Future[Int] 转换为 Future[String]

该方法还有一个重载版本,分别接收两个函数,一个处理成功,一个处理失败:

val overloaded = value.transform(
  value => s"Successfully computed $value", 
  cause => new IllegalStateException(cause)
)

第一个函数处理成功结果,第二个处理异常。

7.4. transformWith

transform() 类似,transformWith(f: Try[T] => Future[S]) 接收一个函数,但它将 Try 直接转换为 Future

value.transformWith {
  case Success(value) => Future.successful(s"Successfully computed the $value")
  case Failure(cause) => Future.failed(new IllegalStateException(cause))
}

如上所示,输入函数的返回类型是 Future 而不是 Try

7.5. andThen

andThen() 函数对给定的 Future 应用副作用函数,并返回相同的 Future:

Future.successful(42).andThen {
  case Success(v) => println(s"The answer is $v")
}

如上所示,andThen() 在这里消费了成功结果。由于 andThen() 返回的是同一个 Future,我们可以链式调用多个 andThen()

val f = Future.successful(42).andThen {
  case Success(v) => println(s"The answer is $v")
} andThen {
  case Success(_) => // 发送 HTTP 请求表示成功
  case Failure(_) =>  // 发送 HTTP 请求表示失败
}
  
f.onComplete { v =>
  println(s"The original future has returned: ${v}")
}

如上所示,变量 f 在应用了两个副作用函数后,仍然包含原始的 Future。

8. 组合 Future

8.1. zip

如果想将两个独立的 Future 的结果组合成一个 Tuple2,可以使用 zip 方法:

val pairOfMagicNumbersF: Future[(Int, Int)] =
  repository.readMagicNumber()
    .zip(backup.readMagicNumberFromLatestBackup())

它会尝试将两个 Future 的成功结果组合成一个元组。如果其中任何一个失败,结果 Future 也会失败,且失败原因与左边第一个失败的 Future 相同

8.2. zipWith

如果想将两个 Future 的结果组合成非元组的其他结构,可以使用 zipWith 方法:

def areEqual(x: Int, y: Int): Boolean = x == y
val areMagicNumbersEqualF: Future[Boolean] =
  repository.readMagicNumber()
    .zipWith(backup.readMagicNumberFromLatestBackup())(areEqual)

它使用给定的函数组合两个 Future 的成功结果。

在上面的例子中,我们使用 areEqual 方法判断两个魔数是否相等。

8.3. traverse

假设我们有一个魔数列表,并希望使用 Publisher 发布每一个:

val magicNumbers: List[Int] = List(1, 2, 3, 4)
trait Publisher {
  def publishMagicNumber(number: Int): Future[Boolean]
}

在这种情况下,我们可以使用 Future.traverse 方法,它 并行地对多个元素进行映射

val published: Future[List[Boolean]] =
  Future.traverse(magicNumbers)(publisher.publishMagicNumber)

它会对每个魔数调用 publishMagicNumber 方法,并将它们组合成一个单一的 Future。每个调用都在 ExecutionContext 提供的不同线程中执行。

如果其中任何一个失败,结果 Future 也会失败。

9. 总结

在这篇文章中,我们探索了 Scala 的 Future API。

我们了解了如何使用 Future 启动异步计算,以及如何等待其结果。接着学习了多个用于转换和组合 Future 结果的有用方法,且不会阻塞主线程。

最后,我们展示了如何处理成功结果和错误,以及如何组合多个 Future 的结果。

一如既往,本文的完整源代码可在 GitHub 上找到。


原始标题:Initializing Empty Mutable Collections in Kotlin