1. 概述
在本教程中,我们将介绍如何在使用 Play Framework 编写的路由处理器中调度一个异步任务。
首先,我们会说明哪些场景适合使用异步任务;接着,介绍两种在 Play Framework 中调度任务的方法。
2. 为什么需要异步任务?
在 Play Framework 应用中,使用异步任务至少有以下三个原因:
✅ 启动一个涉及多个应用的长时间运行任务
✅ 触发应用内部状态的更新
✅ 记录用户行为而不增加响应时间
有时候,我们希望在接收到 REST API 请求后,启动一个耗时任务。例如,某个任务需要从数据库获取数据、生成 PDF 报告并发送给用户。对于调用我们 REST API 的客户端来说,等待邮件发送完成显然不合理。更好的做法是立即返回 HTTP 202 Accepted 状态码,然后在后台处理任务。
我们可能还需要清除缓存、重新加载数据并预热缓存以提升响应速度。这种情况下,也不应该让客户端等待缓存刷新完成,因为 HTTP 连接很可能会超时。
此外,我们可能需要将用户行为事件发送到消息队列。如果这些事件主要用于监控,并不是业务流程的核心部分,可以选择在后台任务中发送。
3. 将函数作为异步任务调度
Play Framework 3.0 使用 Apache Pekko 作为底层的 Actor 系统。而 Play 2.x 则使用 Akka。
在控制器代码中,我们可以访问 Actor 系统及其调度器。当我们拿到 Scheduler 后,就可以将任意函数作为后台任务运行。
首先,我们将 ActorSystem 添加为控制器的依赖项,并将 ExecutionContext 作为隐式依赖:
class AsyncTaskController @Inject()(val controllerComponents: ControllerComponents, val actorSystem: ActorSystem)(implicit ec: ExecutionContext) extends BaseController {
def runAsync(): Action[AnyContent] = Action {
...
}
}
为了访问该接口,我们需要在 routes 文件中添加配置:
GET /async controllers.AsyncTaskController.runAsync()
现在,我们可以 获取 ActorSystem 的调度器,并用它调度一个匿名函数在 30 秒后运行:
import scala.concurrent.duration._
import scala.language.postfixOps
def runAsync(): Action[AnyContent] = Action {
Console.println(s"In route handler: ${DateTime.now()}")
actorSystem.scheduler.scheduleOnce(30 seconds) {
Console.println(s"30 seconds later: ${DateTime.now()}")
}
...
}
注意,30 seconds
是一个有效的 FiniteDuration 对象,因为我们导入了 scala.concurrent.duration._
的隐式转换,并通过 import scala.language.postfixOps
启用了后缀表达式。
为了验证该匿名函数确实是在后台延迟运行的,我们可以启动 Play 应用并访问 http://localhost:9000/async。在日志中,你将看到如下输出:
In route handler: 2021-02-10T18:09:22.639+01:00
30 seconds later: 2021-02-10T18:09:52.794+01:00
4. 使用 Actor
使用 ActorSystem 有两个主要优势:
✅ Actor 可以是有状态的,同时我们无需担心并发访问问题,因为 Actor 一次只处理一个消息
✅ Actor 可以返回响应
要在 Play Framework 中使用 Actor,我们需要定义一个 Guice 模块,将 Actor 添加到 Actor 系统中。此外,我们还需要向控制器注入一个 Actor 引用。我们先从 注入 Actor 引用并向其发送消息 开始:
class AsyncTaskController @Inject()(..., @Named("async-job-actor") actor: ActorRef)(implicit ec: ExecutionContext) extends BaseController {
def runAsync(): Action[AnyContent] = Action {
actor ! "THIS IS THE MESSAGE SENT TO THE ACTOR"
...
}
}
@Named("async-job-actor")
注解告诉 Play Framework 应该将哪个 Actor 注入到控制器中。在 runAsync
函数中,我们调用 !
方法将对象发送给 Actor。我们可以传递任何可序列化的对象作为消息。
为了让这段代码运行起来,我们还需要做几件事。首先,定义一个接收消息的 Actor:
class AsyncTaskInActor extends Actor {
override def receive: Receive = {
case msg: String =>
Console.println(s"Message ${msg} received at ${DateTime.now()}")
}
}
接着,在 Guice 依赖注入模块中添加绑定:
class ActorsModule extends AbstractModule with PekkoGuiceSupport {
override def configure(): Unit = {
bindActor(classOf[AsyncTaskInActor], "async-job-actor")
}
}
最后,打开 application.conf 文件,将新模块添加到 Play Framework 使用的模块列表中:
play.modules.enabled += "actors.ActorsModule"
4.1. 延迟发送消息给 Actor
如果我们想延迟发送消息给 Actor,可以把前面两种方法结合起来,使用 scheduleOnce 函数,它可以接受一个 Actor 引用作为参数:
actorSystem.scheduler.scheduleOnce(
30 seconds,
actor,
"A TEST MESSAGE!!"
)
4.2. 调度周期性任务
有时我们需要执行周期性任务,这时可以使用 scheduleAtFixedRate 函数,它接受初始延迟、后续运行间隔、Actor 引用和消息作为参数:
val cancellable = actorSystem.scheduler.scheduleAtFixedRate(
10 seconds,
5 minutes,
actor,
"recurring task message"
)
该函数返回一个 Cancellable 实例,我们可以调用 cancel()
方法来停止周期性任务:
cancellable.cancel()
5. 总结
本教程演示了如何在 Play Framework 中通过直接使用 Scheduler 来异步运行匿名函数,以及如何访问 ActorSystem 来执行异步任务。我们还展示了如何定义 Guice 模块,将自定义 Actor 添加到 ActorSystem 中,并注入 Actor 引用。
完整示例代码可以在 GitHub 上找到。