1. 概述
有些操作,比如数据库查询或调用另一个 HTTP 服务,可能需要较长时间才能完成。如果在主线程上运行这些操作,会阻塞程序继续执行,从而降低性能。
本教程将重点介绍 Future,它是 Scala 中用于在后台运行操作并解决上述问题的一种方式。
2. Future 是什么?
✅ Future 表示一个异步计算的结果,这个结果可能已经可用,也可能尚未准备好。
当我们创建一个新的 Future 时,Scala 会启动一个新线程来执行其代码。一旦执行完成,计算的结果(值或异常)将被赋给这个 Future。
3. 创建 Future
3.1. ExecutionContext
在创建任何 Future 之前,我们需要提供一个隐式的 ExecutionContext。它指定了 Future 的代码将在哪个线程池上、以何种方式执行。我们可以从 Executor 或 ExecutorService 创建它:
val forkJoinPool: ExecutorService = new ForkJoinPool(4)
implicit val forkJoinExecutionContext: ExecutionContext =
ExecutionContext.fromExecutorService(forkJoinPool)
val singleThread: Executor = Executors.newSingleThreadExecutor()
implicit val singleThreadExecutionContext: ExecutionContext =
ExecutionContext.fromExecutor(singleThread)
还有一种内置的 global ExecutionContext,它使用 ForkJoinPool,其并行级别设置为可用处理器数量:
implicit val globalExecutionContext: ExecutionContext = ExecutionContext.global
在接下来的示例中,我们将使用 ExecutionContext.global
,只需一行导入即可使其生效:
import scala.concurrent.ExecutionContext.Implicits.global
3.2. 启动 Future
有了 ExecutionContext 后,我们就可以创建一个 Future 来在后台运行耗时操作了。我们用 Thread.sleep
来模拟耗时操作:
def generateMagicNumber(): Int = {
Thread.sleep(3000L)
23
}
val generatedMagicNumberF: Future[Int] = Future {
generateMagicNumber()
}
当我们调用 Future.apply
并传入 generateMagicNumber()
时,Future 运行时会在另一个线程中执行它。看起来像是把方法的返回值传给了 Future,但其实 Future.apply
接收的是一个 by-name 参数,也就是说,它会把执行过程推迟到由隐式 ExecutionContext 提供的线程中进行。
3.3. 已完成的 Future
如果已经有了计算结果,就没必要再启动异步计算来获取 Future。我们可以直接创建一个已完成的 Future:
def multiply(multiplier: Int): Future[Int] =
if (multiplier == 0) {
Future.successful(0)
} else {
Future(multiplier * generateMagicNumber())
}
或者创建一个已完成并带有失败结果的 Future:
def divide(divider: Int): Future[Int] =
if (divider == 0) {
Future.failed(new IllegalArgumentException("Don't divide by zero"))
} else {
Future(generateMagicNumber() / divider)
}
为了简化这种条件处理,我们可以使用 Future.fromTry() 方法:
def tryDivide(divider: Int): Future[Int] = Future.fromTry(Try {
generateMagicNumber() / divider
})
当传给 Try {}
的表达式抛出异常时,Future.fromTry()
等价于 Future.failed()
。否则,它类似于 Future.successful()
。
4. 阻塞等待 Future
我们已经创建了 Future,现在需要一种方式来等待它的结果:
val maxWaitTime: FiniteDuration = Duration(5, TimeUnit.SECONDS)
val magicNumber: Int = Await.result(generatedMagicNumberF, maxWaitTime)
Await.result
会阻塞主线程,并等待给定的 Future 在指定时间内完成。如果超时或 Future 失败,它会抛出异常。
在这个例子中,我们最多等待 5 秒钟获取 generatedMagicNumberF
的结果。
如果想无限期等待,可以使用 Duration.Inf
。
⚠️ 由于 Await.result
会阻塞主线程,因此只应在确实需要等待时使用。 如果只是想转换 Future 的结果或与其他 Future 组合,应该使用非阻塞方式,我们将在后面介绍。
5. 回调处理
5.1. onComplete
与其阻塞主线程等待 Future 结果,不如通过 onComplete
方法注册回调:
def printResult[A](result: Try[A]): Unit = result match {
case Failure(exception) => println("Failed with: " + exception.getMessage)
case Success(number) => println("Succeed with: " + number)
}
magicNumberF.onComplete(printResult)
在这个例子中,当 magicNumberF
完成后(无论成功或失败),Future 会执行 printResult
方法。
5.2. foreach
如果只想在 Future 成功完成时执行回调,可以使用 foreach
方法:
def printSucceedResult[A](result: A): Unit = println("Succeed with: " + result)
magicNumberF.foreach(printSucceedResult)
与 onComplete
不同,如果 magicNumberF
失败,则不会执行回调函数。
它的语义与 Try 或 Option 的 foreach
方法相同。
6. 错误处理
6.1. failed
如果我们希望把失败的 Future 转换为成功的 Future(携带异常信息),可以使用 failed
方法:
val failedF: Future[Int] = Future.failed(new IllegalArgumentException("Boom!"))
val failureF: Future[Throwable] = failedF.failed
它尝试将失败的 Future 转换为一个成功的 Future,其结果是 Throwable
。如果原 Future 成功完成,则结果 Future 会失败,并抛出 NoSuchElementException。
6.2. fallbackTo
假设我们有一个 DatabaseRepository
可以从数据库读取魔数,还有一个 FileBackup
可以从备份文件中读取魔数(每天备份一次):
trait DatabaseRepository {
def readMagicNumber(): Future[Int]
def updateMagicNumber(number: Int): Future[Boolean]
}
trait FileBackup {
def readMagicNumberFromLatestBackup(): Future[Int]
}
由于这个魔数对业务至关重要,当数据库出问题时,我们希望从备份中读取。这时可以使用 fallbackTo
方法:
trait MagicNumberService {
val repository: DatabaseRepository
val backup: FileBackup
val magicNumberF: Future[Int] =
repository.readMagicNumber()
.fallbackTo(backup.readMagicNumberFromLatestBackup())
}
它会在当前 Future 失败时,尝试使用备选的 Future。如果两个都失败,最终的 Future 会失败,并使用当前 Future 的异常。
6.3. recover
如果想通过提供替代值来处理特定异常,可以使用 recover
方法:
val recoveredF: Future[Int] = Future(3 / 0).recover {
case _: ArithmeticException => 0
}
它接收一个偏函数,将匹配的异常转换为成功的值。否则,保持原异常。
6.4. recoverWith
如果想用另一个 Future 来处理特定异常,应该使用 recoverWith
:
val recoveredWithF: Future[Int] = Future(3 / 0).recoverWith {
case _: ArithmeticException => magicNumberF
}
7. 转换 Future
7.1. map
当我们有一个 Future 实例时,可以用 map
方法 在不阻塞主线程的情况下转换其成功结果:
def increment(number: Int): Int = number + 1
val nextMagicNumberF: Future[Int] = magicNumberF.map(increment)
它会创建一个新的 Future,将 increment
方法应用到 magicNumberF
成功的结果上。如果原 Future 失败,则新 Future 也会失败,并携带相同的异常。
increment
方法的执行是在 ExecutionContext 提供的另一个线程中进行的。
7.2. flatMap
如果想使用一个返回 Future 的函数来转换 Future,应该使用 flatMap
方法:
val updatedMagicNumberF: Future[Boolean] =
nextMagicNumberF.flatMap(repository.updateMagicNumber)
它与 map
类似,但会保持结果扁平,返回 Future[Boolean]
而不是 Future[Future[Boolean]]
。
有了 flatMap
和 map
,我们可以写出更清晰、易读的代码。
7.3. transform
与 map()
不同,**transform(f: Try[T] => Try[S])
可以同时映射成功和失败的情况**:
val value = Future.successful(42)
val transformed = value.transform {
case Success(value) => Success(s"Successfully computed the $value")
case Failure(cause) => Failure(new IllegalStateException(cause))
}
如上所示,transform()
接收一个函数,输入是 Future 的结果,输出是一个 Try
实例。在这个例子中,我们将 Future[Int]
转换为 Future[String]
。
该方法还有一个重载版本,分别接收两个函数,一个处理成功,一个处理失败:
val overloaded = value.transform(
value => s"Successfully computed $value",
cause => new IllegalStateException(cause)
)
第一个函数处理成功结果,第二个处理异常。
7.4. transformWith
与 transform()
类似,transformWith(f: Try[T] => Future[S])
接收一个函数,但它将 Try
直接转换为 Future
:
value.transformWith {
case Success(value) => Future.successful(s"Successfully computed the $value")
case Failure(cause) => Future.failed(new IllegalStateException(cause))
}
如上所示,输入函数的返回类型是 Future
而不是 Try
。
7.5. andThen
andThen()
函数对给定的 Future 应用副作用函数,并返回相同的 Future:
Future.successful(42).andThen {
case Success(v) => println(s"The answer is $v")
}
如上所示,andThen()
在这里消费了成功结果。由于 andThen()
返回的是同一个 Future,我们可以链式调用多个 andThen()
:
val f = Future.successful(42).andThen {
case Success(v) => println(s"The answer is $v")
} andThen {
case Success(_) => // 发送 HTTP 请求表示成功
case Failure(_) => // 发送 HTTP 请求表示失败
}
f.onComplete { v =>
println(s"The original future has returned: ${v}")
}
如上所示,变量 f
在应用了两个副作用函数后,仍然包含原始的 Future。
8. 组合 Future
8.1. zip
如果想将两个独立的 Future 的结果组合成一个 Tuple2,可以使用 zip
方法:
val pairOfMagicNumbersF: Future[(Int, Int)] =
repository.readMagicNumber()
.zip(backup.readMagicNumberFromLatestBackup())
它会尝试将两个 Future 的成功结果组合成一个元组。如果其中任何一个失败,结果 Future 也会失败,且失败原因与左边第一个失败的 Future 相同。
8.2. zipWith
如果想将两个 Future 的结果组合成非元组的其他结构,可以使用 zipWith
方法:
def areEqual(x: Int, y: Int): Boolean = x == y
val areMagicNumbersEqualF: Future[Boolean] =
repository.readMagicNumber()
.zipWith(backup.readMagicNumberFromLatestBackup())(areEqual)
它使用给定的函数组合两个 Future 的成功结果。
在上面的例子中,我们使用 areEqual
方法判断两个魔数是否相等。
8.3. traverse
假设我们有一个魔数列表,并希望使用 Publisher
发布每一个:
val magicNumbers: List[Int] = List(1, 2, 3, 4)
trait Publisher {
def publishMagicNumber(number: Int): Future[Boolean]
}
在这种情况下,我们可以使用 Future.traverse
方法,它 并行地对多个元素进行映射:
val published: Future[List[Boolean]] =
Future.traverse(magicNumbers)(publisher.publishMagicNumber)
它会对每个魔数调用 publishMagicNumber
方法,并将它们组合成一个单一的 Future。每个调用都在 ExecutionContext 提供的不同线程中执行。
如果其中任何一个失败,结果 Future 也会失败。
9. 总结
在这篇文章中,我们探索了 Scala 的 Future API。
我们了解了如何使用 Future 启动异步计算,以及如何等待其结果。接着学习了多个用于转换和组合 Future 结果的有用方法,且不会阻塞主线程。
最后,我们展示了如何处理成功结果和错误,以及如何组合多个 Future 的结果。
一如既往,本文的完整源代码可在 GitHub 上找到。