1. 概述
本教程将使用 NATS Java 客户端 连接到 NATS 服务器,实现消息的发布和接收。
NATS 提供三种核心消息交换模式:
- 发布/订阅:将消息传递给主题的所有订阅者
- 请求/回复:向主题的所有订阅者发送请求,并将响应路由回请求方(仅处理第一个响应)
- 队列组:订阅者加入队列组后,消息仅传递给组内一个订阅者(可用于发布/订阅或请求/回复)
2. 环境准备
2.1 Maven 依赖
在 pom.xml
中添加 NATS 库:
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.17.6</version>
</dependency>
2.2 NATS 服务器
需要运行 NATS 服务器,安装指南见 NATS 文档。默认假设服务器运行在 localhost:4222
。
3. 连接 NATS 服务器
3.1 基础连接与自定义配置
通过 Nats.connect()
创建连接:
Connection natsConnection = Nats.connect(); // 默认连接 localhost:4222
自定义配置示例(包含连接监听器和错误监听器):
Options options = new Options.Builder()
.server("nats://localhost:4222")
.connectionListener((conn, event) -> log.info("连接事件: " + event))
.errorListener(new CustomErrorListener())
.build();
Connection natsConnection = Nats.connect(options);
⚠️ NATS 连接具有自动重连能力。连接监听器会报告断开/重连事件,错误监听器处理异常。
测试连接稳定性:
Connection conn = createConnection("nats://localhost:4222");
Thread.sleep(60000); // 保持运行60秒
重启服务器时输出示例:
连接事件: nats: 连接已打开
异常: java.io.IOException: 读取通道关闭
连接事件: nats: 连接断开
异常: java.net.ConnectException: 连接被拒绝
连接事件: nats: 连接重连成功
连接事件: nats: 订阅关系已恢复
4. 消息交换
4.1 订阅消息基础
NATS 消息是字节数组容器,支持设置/获取主题和回复主题。
4.2 异步订阅
使用 Dispatcher
处理异步消息:
Dispatcher dispatcher = natsConnection.createDispatcher();
Subscription subscription = dispatcher.subscribe("subject",
msg -> log.info("异步订阅收到消息: " + msg));
// 队列组订阅
Subscription qSubscription = dispatcher.subscribe("subject", "queueGroup",
msg -> log.info("队列订阅收到消息: " + msg));
// 取消订阅
dispatcher.unsubscribe(subscription);
✅ 多个订阅可共享一个
Dispatcher
(低负载场景),高负载建议独立Dispatcher
4.3 同步订阅
通过轮询获取消息:
Subscription subscription = natsConnection.subscribe("mySubject");
Message message = subscription.nextMessage(1000); // 超时返回null
// 取消订阅
subscription.unsubscribe();
4.4 发布消息
"即发即弃"模式,不关心订阅者是否存在:
natsConnection.publish("mySubject", "消息内容".getBytes());
4.5 消息响应
两种响应模式:
- 多响应:使用
reply-to
字段 - 单响应:使用内置
request
方法
4.6 获取多个响应
发布方设置 reply-to
主题并订阅响应:
// 发布方
String replyTo = "responseSubject";
natsConnection.publish("requestSubject", replyTo, "请回复".getBytes());
Subscription responseSub = natsConnection.subscribe(replyTo);
// 接收多个响应
Message m1 = responseSub.nextMessage(1000);
Message m2 = responseSub.nextMessage(1000);
响应方订阅请求主题:
natsConnection.subscribe("requestSubject", msg -> {
natsConnection.publish(msg.getReplyTo(), "回复内容".getBytes());
});
4.7 获取单个响应
使用 request
方法(自动处理订阅和超时):
Message response = natsConnection.request(
"requestSubject",
"请回复".getBytes(),
Duration.ofMillis(1000)
);
✅ 仅返回第一个响应,其他响应自动丢弃
4.8 通配符订阅
支持两种通配符:
*
匹配单个主题段>
匹配剩余所有主题段
示例测试:
// 订阅 segment.*
Subscription starSub = natsConnection.subscribe("segment.*");
natsConnection.publish("segment.another", "消息1".getBytes());
Message msg1 = starSub.nextMessage(200); // ✅ 匹配成功
natsConnection.publish("segment.one.two", "消息2".getBytes());
Message msg2 = starSub.nextMessage(200); // ❌ 不匹配
// 订阅 segment.>
Subscription gtSub = natsConnection.subscribe("segment.>");
natsConnection.publish("segment.one.two", "消息3".getBytes());
Message msg3 = gtSub.nextMessage(200); // ✅ 匹配成功
5. 消息队列
5.1 发布到队列
发布方式与普通消息相同:
natsConnection.publish("mySubject", "队列消息".getBytes());
5.2 订阅队列
指定队列组名称:
// 同步订阅
Subscription queueSub = natsConnection.subscribe("mySubject", "myQueue");
// 异步订阅
Dispatcher dispatcher = connection.createDispatcher();
Subscription asyncSub = dispatcher.subscribe("mySubject", "myQueue",
msg -> log.info("队列消息: " + msg));
✅ 队列组特性:
- 消息仅传递给组内一个订阅者
- 无订阅者时消息直接丢弃
- 可与普通订阅共存
订阅行为对比:
// 队列订阅(负载均衡)
Subscription queue1 = natsConnection.subscribe("mySubject", "myQueue");
Subscription queue2 = natsConnection.subscribe("mySubject", "myQueue");
// 普通订阅(广播)
Subscription sub1 = natsConnection.subscribe("mySubject");
Subscription sub2 = natsConnection.subscribe("mySubject");
6. 总结
我们实现了:
- ✅ NATS 服务器连接(含自动重连)
- ✅ 发布/订阅消息
- ✅ 请求/回复模式
- ✅ 通配符订阅
- ✅ 队列组负载均衡
NATS 的轻量级设计和高效通信机制,使其成为分布式系统中消息传递的理想选择。