1. 概述
简单来说,MBassador 是一个基于发布-订阅模式的高性能事件总线。
消息会被广播给一个或多个订阅者,而发布者无需知道订阅者数量或如何处理消息。
2. Maven 依赖
使用前需添加 mbassador 依赖:
<dependency>
<groupId>net.engio</groupId>
<artifactId>mbassador</artifactId>
<version>1.3.1</version>
</dependency>
3. 基础事件处理
3.1 简单示例
先看一个基础的消息发布示例:
private MBassador<Object> dispatcher = new MBassador<>();
private String messageString;
@Before
public void prepareTests() {
dispatcher.subscribe(this);
}
@Test
public void whenStringDispatched_thenHandleString() {
dispatcher.post("TestString").now();
assertNotNull(messageString);
assertEquals("TestString", messageString);
}
@Handler
public void handleString(String message) {
messageString = message;
}
关键点解析:
- 测试类顶部创建默认构造的 MBassador 实例
- @Before 方法中调用 subscribe() 注册当前类
- 调用 dispatcher.post(…).now() 触发消息分发
- 分发后 handleString() 方法被调用
✅ 核心概念:
- 任何对象都可成为订阅者,只需包含 @Handler 注解的方法
- 订阅者可包含任意数量的处理器
- 处理器方法只能有一个参数(消息本身),且不能抛出受检异常
- post() 方法接受任何 Object 类型作为消息
⚠️ 实际生产中发布者和消费者通常在不同类中,测试中为简化使用自身订阅。
3.2 死信消息
当消息没有匹配的处理器时会发生什么?添加新处理器后发送第三种消息类型:
private Object deadEvent;
@Test
public void whenLongDispatched_thenDeadEvent() {
dispatcher.post(42L).now();
assertNull(messageString);
assertNull(messageInteger);
assertNotNull(deadEvent);
assertTrue(deadEvent instanceof Long);
assertTrue(42L == (Long) deadEvent);
}
@Handler
public void handleDeadEvent(DeadMessage message) {
deadEvent = message.getMessage();
}
当消息没有处理器时,会被包装成 DeadMessage 对象。我们通过添加 DeadMessage 处理器捕获了它。
❌ 如果应用不需要跟踪死信消息,可以安全忽略它们。
4. 使用事件继承体系
仅发送 String 和 Integer 事件太局限,先定义几个消息类:
public class Message {}
public class AckMessage extends Message {}
public class RejectMessage extends Message {
int code;
// setters and getters
}
4.1 发送基类 Message
private MBassador<Message> dispatcher = new MBassador<>();
private Message message;
private AckMessage ackMessage;
private RejectMessage rejectMessage;
@Before
public void prepareTests() {
dispatcher.subscribe(this);
}
@Test
public void whenMessageDispatched_thenMessageHandled() {
dispatcher.post(new Message()).now();
assertNotNull(message);
assertNull(ackMessage);
assertNull(rejectMessage);
}
@Handler
public void handleMessage(Message message) {
this.message = message;
}
@Handler
public void handleRejectMessage(RejectMessage message) {
rejectMessage = message;
}
@Handler
public void handleAckMessage(AckMessage message) {
ackMessage = message;
}
使用泛型 MBassador
- 发送 Message 时,只有 handleMessage() 被调用
- 其他处理器不会触发
4.2 发送子类消息
发送 RejectMessage 时:
@Test
public void whenRejectDispatched_thenMessageAndRejectHandled() {
dispatcher.post(new RejectMessage()).now();
assertNotNull(message);
assertNotNull(rejectMessage);
assertNull(ackMessage);
}
✅ 两个处理器都被触发:
- *handleRejectMessage()*(精确匹配)
- *handleMessage()*(继承关系匹配)
验证 AckMessage 行为:
@Test
public void whenAckDispatched_thenMessageAndAckHandled() {
dispatcher.post(new AckMessage()).now();
assertNotNull(message);
assertNotNull(ackMessage);
assertNull(rejectMessage);
}
5. 消息过滤
按类型组织消息已是强大功能,但还可进一步过滤。
5.1 按类和子类过滤
发送子类消息时,基类处理器也会触发。可通过过滤器解决:
private Message baseMessage;
private Message subMessage;
@Test
public void whenMessageDispatched_thenMessageFiltered() {
dispatcher.post(new Message()).now();
assertNotNull(baseMessage);
assertNull(subMessage);
}
@Test
public void whenRejectDispatched_thenRejectFiltered() {
dispatcher.post(new RejectMessage()).now();
assertNotNull(subMessage);
assertNull(baseMessage);
}
@Handler(filters = { @Filter(Filters.RejectSubtypes.class) })
public void handleBaseMessage(Message message) {
this.baseMessage = message;
}
@Handler(filters = { @Filter(Filters.SubtypesOnly.class) })
public void handleSubMessage(Message message) {
this.subMessage = message;
}
@Handler 的 filters 参数接受实现 IMessageFilter 的类:
- Filters.RejectSubtypes:过滤所有子类
- Filters.SubtypesOnly:仅接受子类
5.2 IMessageFilter 接口
RejectSubTypes 和 SubtypesOnly 都实现了 IMessageFilter:
- RejectSubTypes:消息类必须精确匹配,拒绝子类
5.3 条件过滤
MBassador 支持使用 Java EL 表达式进行条件过滤,更简单灵活。
按字符串长度过滤:
private String testString;
@Test
public void whenLongStringDispatched_thenStringFiltered() {
dispatcher.post("foobar!").now();
assertNull(testString);
}
@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
this.testString = message;
}
"foobar!"(7字符)被过滤,发送更短的字符串:
@Test
public void whenShortStringDispatched_thenStringHandled() {
dispatcher.post("foobar").now();
assertNotNull(testString);
}
"foobar"(6字符)通过过滤。
过滤带字段的 RejectMessage:
private RejectMessage rejectMessage;
@Test
public void whenWrongRejectDispatched_thenRejectFiltered() {
RejectMessage testReject = new RejectMessage();
testReject.setCode(-1);
dispatcher.post(testReject).now();
assertNull(rejectMessage);
assertNotNull(subMessage);
assertEquals(-1, ((RejectMessage) subMessage).getCode());
}
@Handler(condition = "msg.getCode() != -1")
public void handleRejectMessage(RejectMessage rejectMessage) {
this.rejectMessage = rejectMessage;
}
5.4 捕获被过滤消息
类似 DeadEvents,可捕获被过滤的消息:
private String testString;
private FilteredMessage filteredMessage;
private DeadMessage deadMessage;
@Test
public void whenLongStringDispatched_thenStringFiltered() {
dispatcher.post("foobar!").now();
assertNull(testString);
assertNotNull(filteredMessage);
assertTrue(filteredMessage.getMessage() instanceof String);
assertNull(deadMessage);
}
@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
this.testString = message;
}
@Handler
public void handleFilterMessage(FilteredMessage message) {
this.filteredMessage = message;
}
@Handler
public void handleDeadMessage(DeadMessage deadMessage) {
this.deadMessage = deadMessage;
}
过滤事件与死信事件处理机制不同:
- 过滤消息进入 FilteredMessage 处理器
- 死信消息进入 DeadMessage 处理器
6. 异步消息分发与处理
目前所有示例使用同步分发,调用 post.now() 会阻塞直到所有消息处理完成。
6.1 异步分发
MBassador.post() 返回 SyncAsyncPostCommand,提供方法:
- *now()*:同步分发,阻塞直到完成
- *asynchronously()*:异步执行消息发布
使用异步分发示例(借助 Awaitility):
private MBassador<Message> dispatcher = new MBassador<>();
private String testString;
private AtomicBoolean ready = new AtomicBoolean(false);
@Test
public void whenAsyncDispatched_thenMessageReceived() {
dispatcher.post("foobar").asynchronously();
await().untilAtomic(ready, equalTo(true));
assertNotNull(testString);
}
@Handler
public void handleStringMessage(String message) {
this.testString = message;
ready.set(true);
}
⚠️ 注释掉 await() 可能导致测试失败,因为可能在分发线程完成前就检查 testString。
6.2 异步处理器调用
异步分发允许发布者提前返回,但处理器仍按顺序串行执行。若某处理器耗时较长,会阻塞后续处理器。
MBassador 支持处理器异步调用,每个处理器在独立线程执行:
private Integer testInteger;
private String invocationThreadName;
private AtomicBoolean ready = new AtomicBoolean(false);
@Test
public void whenHandlerAsync_thenHandled() {
dispatcher.post(42).now();
await().untilAtomic(ready, equalTo(true));
assertNotNull(testInteger);
assertFalse(Thread.currentThread().getName().equals(invocationThreadName));
}
@Handler(delivery = Invoke.Asynchronously)
public void handleIntegerMessage(Integer message) {
this.invocationThreadName = Thread.currentThread().getName();
this.testInteger = message;
ready.set(true);
}
通过 delivery = Invoke.Asynchronously 声明异步调用,测试中比较线程名验证。
7. 自定义 MBassador
目前使用默认配置的 MBassador,可通过注解修改行为。
7.1 异常处理
处理器不能抛出受检异常。可通过构造函数传入 IPublicationErrorHandler:
public class MBassadorConfigurationTest
implements IPublicationErrorHandler {
private MBassador dispatcher;
private String messageString;
private Throwable errorCause;
@Before
public void prepareTests() {
dispatcher = new MBassador<String>(this);
dispatcher.subscribe(this);
}
@Test
public void whenErrorOccurs_thenErrorHandler() {
dispatcher.post("Error").now();
assertNull(messageString);
assertNotNull(errorCause);
}
@Test
public void whenNoErrorOccurs_thenStringHandler() {
dispatcher.post("Error").now();
assertNull(errorCause);
assertNotNull(messageString);
}
@Handler
public void handleString(String message) {
if ("Error".equals(message)) {
throw new Error("BOOM");
}
messageString = message;
}
@Override
public void handleError(PublicationError error) {
errorCause = error.getCause().getCause();
}
}
当 handleString() 抛出错误时,错误信息被保存到 errorCause。
7.2 处理器优先级
处理器默认按注册顺序逆序调用,但不应依赖此行为。即使使用异步调用,仍可能需要明确调用顺序。
可通过优先级显式控制:
private LinkedList<Integer> list = new LinkedList<>();
@Test
public void whenRejectDispatched_thenPriorityHandled() {
dispatcher.post(new RejectMessage()).now();
// 应按优先级逆序弹出
assertTrue(1 == list.pop());
assertTrue(3 == list.pop());
assertTrue(5 == list.pop());
}
@Handler(priority = 5)
public void handleRejectMessage5(RejectMessage rejectMessage) {
list.push(5);
}
@Handler(priority = 3)
public void handleRejectMessage3(RejectMessage rejectMessage) {
list.push(3);
}
@Handler(priority = 2, rejectSubtypes = true)
public void handleMessage(Message rejectMessage)
logger.error("Reject handler #3");
list.push(3);
}
@Handler(priority = 0)
public void handleRejectMessage0(RejectMessage rejectMessage) {
list.push(1);
}
✅ 调用顺序:
- 高优先级 → 低优先级
- 默认优先级为 0,最后调用
- 测试中按优先级逆序弹出(5→3→1)
7.3 子类过滤的简单方式
上例中 handleMessage() 为何未触发?无需使用 RejectSubTypes.class 过滤子类:
rejectSubtypes 布尔标志提供相同过滤功能,且性能优于 IMessageFilter 实现。
⚠️ 但仅接受子类时仍需使用过滤器实现。
8. 总结
MBassador 是简洁高效的对象间消息传递库:
- 支持多种消息组织方式
- 支持同步/异步分发
- 提供灵活的过滤机制
完整示例见 GitHub 项目。