1. 概述

简单来说,MBassador一个基于发布-订阅模式的高性能事件总线

消息会被广播给一个或多个订阅者,而发布者无需知道订阅者数量或如何处理消息。

2. Maven 依赖

使用前需添加 mbassador 依赖:

<dependency>
    <groupId>net.engio</groupId>
    <artifactId>mbassador</artifactId>
    <version>1.3.1</version>
</dependency>

3. 基础事件处理

3.1 简单示例

先看一个基础的消息发布示例:

private MBassador<Object> dispatcher = new MBassador<>();
private String messageString;

@Before
public void prepareTests() {
    dispatcher.subscribe(this);
}

@Test
public void whenStringDispatched_thenHandleString() {
    dispatcher.post("TestString").now();
 
    assertNotNull(messageString);
    assertEquals("TestString", messageString);
}

@Handler
public void handleString(String message) {
    messageString = message;
}

关键点解析:

  • 测试类顶部创建默认构造的 MBassador 实例
  • @Before 方法中调用 subscribe() 注册当前类
  • 调用 dispatcher.post(…).now() 触发消息分发
  • 分发后 handleString() 方法被调用

✅ 核心概念:

  1. 任何对象都可成为订阅者,只需包含 @Handler 注解的方法
  2. 订阅者可包含任意数量的处理器
  3. 处理器方法只能有一个参数(消息本身),且不能抛出受检异常
  4. post() 方法接受任何 Object 类型作为消息

⚠️ 实际生产中发布者和消费者通常在不同类中,测试中为简化使用自身订阅。

3.2 死信消息

当消息没有匹配的处理器时会发生什么?添加新处理器后发送第三种消息类型:

private Object deadEvent; 

@Test
public void whenLongDispatched_thenDeadEvent() {
    dispatcher.post(42L).now();
 
    assertNull(messageString);
    assertNull(messageInteger);
    assertNotNull(deadEvent);
    assertTrue(deadEvent instanceof Long);
    assertTrue(42L == (Long) deadEvent);
} 

@Handler
public void handleDeadEvent(DeadMessage message) {
    deadEvent = message.getMessage();
}

当消息没有处理器时,会被包装成 DeadMessage 对象。我们通过添加 DeadMessage 处理器捕获了它。

❌ 如果应用不需要跟踪死信消息,可以安全忽略它们。

4. 使用事件继承体系

仅发送 StringInteger 事件太局限,先定义几个消息类:

public class Message {}

public class AckMessage extends Message {}

public class RejectMessage extends Message {
    int code;

    // setters and getters
}

4.1 发送基类 Message

private MBassador<Message> dispatcher = new MBassador<>();

private Message message;
private AckMessage ackMessage;
private RejectMessage rejectMessage;

@Before
public void prepareTests() {
    dispatcher.subscribe(this);
}

@Test
public void whenMessageDispatched_thenMessageHandled() {
    dispatcher.post(new Message()).now();
    assertNotNull(message);
    assertNull(ackMessage);
    assertNull(rejectMessage);
}

@Handler
public void handleMessage(Message message) {
    this.message = message;
}

@Handler
public void handleRejectMessage(RejectMessage message) {
   rejectMessage = message;
}

@Handler
public void handleAckMessage(AckMessage message) {
    ackMessage = message;
}

使用泛型 MBassador 增强类型安全性:

  • 发送 Message 时,只有 handleMessage() 被调用
  • 其他处理器不会触发

4.2 发送子类消息

发送 RejectMessage 时:

@Test
public void whenRejectDispatched_thenMessageAndRejectHandled() {
    dispatcher.post(new RejectMessage()).now();
 
    assertNotNull(message);
    assertNotNull(rejectMessage);
    assertNull(ackMessage);
}

✅ 两个处理器都被触发:

  1. *handleRejectMessage()*(精确匹配)
  2. *handleMessage()*(继承关系匹配)

验证 AckMessage 行为:

@Test
public void whenAckDispatched_thenMessageAndAckHandled() {
    dispatcher.post(new AckMessage()).now();
 
    assertNotNull(message);
    assertNotNull(ackMessage);
    assertNull(rejectMessage);
}

5. 消息过滤

按类型组织消息已是强大功能,但还可进一步过滤。

5.1 按类和子类过滤

发送子类消息时,基类处理器也会触发。可通过过滤器解决:

private Message baseMessage;
private Message subMessage;

@Test
public void whenMessageDispatched_thenMessageFiltered() {
    dispatcher.post(new Message()).now();
 
    assertNotNull(baseMessage);
    assertNull(subMessage);
}

@Test
public void whenRejectDispatched_thenRejectFiltered() {
    dispatcher.post(new RejectMessage()).now();
 
    assertNotNull(subMessage);
    assertNull(baseMessage);
}

@Handler(filters = { @Filter(Filters.RejectSubtypes.class) })
public void handleBaseMessage(Message message) {
    this.baseMessage = message;
}

@Handler(filters = { @Filter(Filters.SubtypesOnly.class) })
public void handleSubMessage(Message message) {
    this.subMessage = message;
}

@Handler 的 filters 参数接受实现 IMessageFilter 的类

  • Filters.RejectSubtypes:过滤所有子类
  • Filters.SubtypesOnly:仅接受子类

5.2 IMessageFilter 接口

RejectSubTypesSubtypesOnly 都实现了 IMessageFilter

  • RejectSubTypes:消息类必须精确匹配,拒绝子类

5.3 条件过滤

MBassador 支持使用 Java EL 表达式进行条件过滤,更简单灵活。

按字符串长度过滤:

private String testString;

@Test
public void whenLongStringDispatched_thenStringFiltered() {
    dispatcher.post("foobar!").now();
 
    assertNull(testString);
}

@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
    this.testString = message;
}

"foobar!"(7字符)被过滤,发送更短的字符串:

    
@Test
public void whenShortStringDispatched_thenStringHandled() {
    dispatcher.post("foobar").now();
 
    assertNotNull(testString);
}

"foobar"(6字符)通过过滤。

过滤带字段的 RejectMessage

private RejectMessage rejectMessage;

@Test
public void whenWrongRejectDispatched_thenRejectFiltered() {

    RejectMessage testReject = new RejectMessage();
    testReject.setCode(-1);

    dispatcher.post(testReject).now();
 
    assertNull(rejectMessage);
    assertNotNull(subMessage);
    assertEquals(-1, ((RejectMessage) subMessage).getCode());
}

@Handler(condition = "msg.getCode() != -1")
public void handleRejectMessage(RejectMessage rejectMessage) {
    this.rejectMessage = rejectMessage;
}

5.4 捕获被过滤消息

类似 DeadEvents,可捕获被过滤的消息:

private String testString;
private FilteredMessage filteredMessage;
private DeadMessage deadMessage;

@Test
public void whenLongStringDispatched_thenStringFiltered() {
    dispatcher.post("foobar!").now();
 
    assertNull(testString);
    assertNotNull(filteredMessage);
    assertTrue(filteredMessage.getMessage() instanceof String);
    assertNull(deadMessage);
}

@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
    this.testString = message;
}

@Handler
public void handleFilterMessage(FilteredMessage message) {
    this.filteredMessage = message;
}

@Handler
public void handleDeadMessage(DeadMessage deadMessage) {
    this.deadMessage = deadMessage;
}

过滤事件与死信事件处理机制不同

  • 过滤消息进入 FilteredMessage 处理器
  • 死信消息进入 DeadMessage 处理器

6. 异步消息分发与处理

目前所有示例使用同步分发,调用 post.now() 会阻塞直到所有消息处理完成。

6.1 异步分发

MBassador.post() 返回 SyncAsyncPostCommand,提供方法:

  • *now()*:同步分发,阻塞直到完成
  • *asynchronously()*:异步执行消息发布

使用异步分发示例(借助 Awaitility):

private MBassador<Message> dispatcher = new MBassador<>();
private String testString;
private AtomicBoolean ready = new AtomicBoolean(false);

@Test
public void whenAsyncDispatched_thenMessageReceived() {
    dispatcher.post("foobar").asynchronously();
 
    await().untilAtomic(ready, equalTo(true));
    assertNotNull(testString);
}

@Handler
public void handleStringMessage(String message) {
    this.testString = message;
    ready.set(true);
}

⚠️ 注释掉 await() 可能导致测试失败,因为可能在分发线程完成前就检查 testString

6.2 异步处理器调用

异步分发允许发布者提前返回,但处理器仍按顺序串行执行。若某处理器耗时较长,会阻塞后续处理器。

MBassador 支持处理器异步调用,每个处理器在独立线程执行:

private Integer testInteger;
private String invocationThreadName;
private AtomicBoolean ready = new AtomicBoolean(false);

@Test
public void whenHandlerAsync_thenHandled() {
    dispatcher.post(42).now();
 
    await().untilAtomic(ready, equalTo(true));
    assertNotNull(testInteger);
    assertFalse(Thread.currentThread().getName().equals(invocationThreadName));
}

@Handler(delivery = Invoke.Asynchronously)
public void handleIntegerMessage(Integer message) {
 
    this.invocationThreadName = Thread.currentThread().getName();
    this.testInteger = message;
    ready.set(true);
}

通过 delivery = Invoke.Asynchronously 声明异步调用,测试中比较线程名验证。

7. 自定义 MBassador

目前使用默认配置的 MBassador,可通过注解修改行为。

7.1 异常处理

处理器不能抛出受检异常。可通过构造函数传入 IPublicationErrorHandler

public class MBassadorConfigurationTest
  implements IPublicationErrorHandler {

    private MBassador dispatcher;
    private String messageString;
    private Throwable errorCause;

    @Before
    public void prepareTests() {
        dispatcher = new MBassador<String>(this);
        dispatcher.subscribe(this);
    }

    @Test
    public void whenErrorOccurs_thenErrorHandler() {
        dispatcher.post("Error").now();
 
        assertNull(messageString);
        assertNotNull(errorCause);
    }

    @Test
    public void whenNoErrorOccurs_thenStringHandler() {
        dispatcher.post("Error").now();
 
        assertNull(errorCause);
        assertNotNull(messageString);
    }

    @Handler
    public void handleString(String message) {
        if ("Error".equals(message)) {
            throw new Error("BOOM");
        }
        messageString = message;
    }

    @Override
    public void handleError(PublicationError error) {
        errorCause = error.getCause().getCause();
    }
}

handleString() 抛出错误时,错误信息被保存到 errorCause

7.2 处理器优先级

处理器默认按注册顺序逆序调用,但不应依赖此行为。即使使用异步调用,仍可能需要明确调用顺序。

可通过优先级显式控制:

private LinkedList<Integer> list = new LinkedList<>();

@Test
public void whenRejectDispatched_thenPriorityHandled() {
    dispatcher.post(new RejectMessage()).now();

    // 应按优先级逆序弹出
    assertTrue(1 == list.pop());
    assertTrue(3 == list.pop());
    assertTrue(5 == list.pop());
}

@Handler(priority = 5)
public void handleRejectMessage5(RejectMessage rejectMessage) {
    list.push(5);
}

@Handler(priority = 3)
public void handleRejectMessage3(RejectMessage rejectMessage) {
    list.push(3);
}

@Handler(priority = 2, rejectSubtypes = true)
public void handleMessage(Message rejectMessage) 
    logger.error("Reject handler #3");
    list.push(3);
}

@Handler(priority = 0)
public void handleRejectMessage0(RejectMessage rejectMessage) {
    list.push(1);
}

✅ 调用顺序:

  1. 高优先级 → 低优先级
  2. 默认优先级为 0,最后调用
  3. 测试中按优先级逆序弹出(5→3→1)

7.3 子类过滤的简单方式

上例中 handleMessage() 为何未触发?无需使用 RejectSubTypes.class 过滤子类:

rejectSubtypes 布尔标志提供相同过滤功能,且性能优于 IMessageFilter 实现。

⚠️ 但仅接受子类时仍需使用过滤器实现。

8. 总结

MBassador 是简洁高效的对象间消息传递库:

  • 支持多种消息组织方式
  • 支持同步/异步分发
  • 提供灵活的过滤机制

完整示例见 GitHub 项目


原始标题:Introduction to MBassador | Baeldung