1. 引言

在 Actor 模型中,邮箱(Mailbox)是其核心组件之一。通过邮箱机制,Actor 可以将消息的接收与处理解耦。

那么,我们来看看最知名的 Actor 系统实现 Akka Typed 是如何实现 邮箱 这一概念的。

2. Akka 依赖配置

我们将使用 SBT 来配置项目依赖。要使用 Akka Typed 库,需要引入 akka-actor-typed 模块:

libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.8.0"

为了测试 Akka Typed 的 Actor,还需要引入测试工具包:

libraryDependencies += "com.typesafe.akka" %% "akka-actor-testkit-typed" % "2.8.0" % Test

3. 记录用户浏览行为

简单来说,一个 Actor 是一个响应消息进行动作的对象。而在 Akka Typed 中,这些通信就是 Actor 的 Behavior 所定义的消息。

举个例子,假设我们刚开了一家电商网站卖鞋子,为了更好地了解用户行为,我们需要收集他们在网站上的操作信息,比如鼠标移动、点击以及键盘输入:

sealed trait UserEvent {
  val usedId: String
}
case class MouseClick(override val usedId: String, x: Int, y: Int) extends Event
case class TextSniffing(override val usedId: String, text: String) extends Event
case class MouseMove(override val usedId: String, x: Int, y: Int) extends Event

然后我们定义一个 Actor 来收集这些事件:

val eventCollector: Behavior[UserEvent] = Behaviors.receive { (ctx, msg) =>
  msg match {
    case MouseClick(user, x, y) => 
      ctx.log.info(s"The user $user just clicked point ($x, $y)")
    case MouseMove(user, x, y) => 
      ctx.log.info(s"The user $user just move the mouse to point ($x, $y)")
    case TextSniffing(user, text) =>
      ctx.log.info(s"The user $user just entered the text '$text'")
  }
  Behaviors.same
}

4. 系统扩容(或崩溃)的风险

如前所述,Actor eventCollector 是从自己的邮箱中读取消息进行处理的。而邮箱本质上就是一个存放消息的数据结构。ActorSystem 负责把消息投递到这个邮箱里。

有一天,我们的广告登上了 Vogue America,第二天访问量暴涨十倍。此时我们的 eventCollector Actor 会发生什么?它能应对这突如其来的负载吗?

很遗憾,答案是否定的。由于默认的邮箱类型是 SingleConsumerOnlyUnboundedMailbox,也就是说它是无界队列,不会拒绝任何消息。同时,Akka 并没有内置背压机制。如果消息生产速度远超消费速度,系统很快就会因内存耗尽抛出 OutOfMemoryError

⚠️ 更糟糕的是,这种邮箱不允许被多个 Actor 共享,只能是单生产者多消费者模型。

所以问题来了:怎么避免让 eventCollector 搞垮整个业务?下面我们就来看看如何为 Actor 配置合适的邮箱。

5. 多种邮箱可选

幸运的是,Akka Typed 提供了多种类型的邮箱供我们选择。我们可以主动指定邮箱类型,而不是被动接受默认值

比如,我们可以使用 MailboxSelector 工厂方法来指定邮箱:

ctx.spawn(eventCollector, id, MailboxSelector.bounded(1000))

上面的例子中,我们给 Actor 设置了一个容量为 1000 的有界邮箱。

此外,也可以通过配置文件的方式读取邮箱设置:

val props = MailboxSelector.fromConfig("mailboxes.event-collector-mailbox")
ctx.spawn(eventCollector, s"{$id}_1", props)

对应的配置如下:

mailboxes {
  event-collector-mailbox {
    mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
  }
}

强烈建议不要硬编码邮箱配置,因为未来需求可能变化,灵活配置才是王道。

5.1. 有界 vs 无界邮箱

顾名思义,无界邮箱可以无限增长,如果消息生产者远快于消费者,它会吃光所有内存。因此,这类邮箱只适用于轻量级、低并发的场景。

相反,有界邮箱只保留固定数量的消息。一旦邮箱满了,新消息会被丢弃,从而避免 OOM。我们可以通过如下方式设置:

mailboxes { 
  event-collector-mailbox { 
    mailbox-type = "akka.dispatch.BoundedMailbox"
    mailbox-capacity = 100
  } 
}

在这个场景下,丢弃部分用户行为数据是可以接受的,毕竟比起系统崩溃,这点损失微不足道。

那么问题又来了:被丢弃的消息去哪了?真的就直接扔掉了吗?

好消息是,Akka 提供了“死信”机制来追踪这些消息,后面我们会详细讲。

5.2. 阻塞 vs 非阻塞邮箱

虽然听起来有些反直觉,但 Akka 确实提供了阻塞型邮箱。在这种情况下,发送方会被阻塞,直到消息成功进入邮箱为止。

当然,永远阻塞发送方是不可取的。因此,Akka 提供了带超时机制的有界邮箱,配置项为 mailbox-push-timeout-time

mailboxes { 
  event-collector-mailbox { 
    mailbox-type = "akka.dispatch.BoundedMailbox" 
    mailbox-capacity = 100 
    mailbox-push-timeout-time = 1s
  }
}

发送方最多等待 1 秒钟,若仍无法投递,则消息会被转到死信通道,并释放发送方继续执行。如果将该超时设为 0,则退化为非阻塞模式。

✅ 因此,使用带超时的有界阻塞邮箱,正是 Akka 实现背压的一种方式

6. 死信机制

当消息无法写入目标 Actor 的邮箱时,Akka 会将其重定向到一个特殊的虚拟 Actor:/deadLetters

死信消息的传递保证与其他消息一致,但由于它们已经“走投无路”,主要用于调试和监控

我们可以通过以下方式获取默认的死信监听器引用:

val defaultDeadLettersActor: ActorRef[DeadLetter] = system.deadLetters[DeadLetter]

同时,也可以让自定义 Actor 接收死信消息。Akka 使用 Event Stream 机制来分发死信:

val deadLettersListener: Behavior[DeadLetter] = Behaviors.receive { (ctx, msg) =>
  msg match {
    case DeadLetter(message, sender, recipient) =>
      ctx.log.debug(s"Dead letter received: ($message, $sender, $recipient)")
      Behaviors.same
  }
}

然后订阅事件流:

val deadLettersActor: ActorRef[DeadLetter] = 
  system.systemActorOf(deadLettersListener, "deadLettersListener")
system.eventStream.tell(EventStream.Subscribe[DeadLetter](deadLettersActor))

⚠️ 注意:死信不会在网络间传播,所以订阅者只能收到本地系统的死信。

7. 小结

本文介绍了 Akka Typed 中 Actor 的邮箱机制。我们通过实际案例讲解了无界邮箱的潜在风险,以及何时应选用有界邮箱。同时也探讨了阻塞与非阻塞邮箱的区别,以及它们与背压的关系。

除此之外,还有很多进阶话题值得深入研究,例如:

一如既往,完整代码可在 GitHub 上找到。


原始标题:Typed Mailboxes in Scala