1. 概述

本教程将使用 NATS Java 客户端 连接到 NATS 服务器,实现消息的发布和接收。

NATS 提供三种核心消息交换模式:

  • 发布/订阅:将消息传递给主题的所有订阅者
  • 请求/回复:向主题的所有订阅者发送请求,并将响应路由回请求方(仅处理第一个响应)
  • 队列组:订阅者加入队列组后,消息仅传递给组内一个订阅者(可用于发布/订阅或请求/回复)

2. 环境准备

2.1 Maven 依赖

pom.xml 中添加 NATS 库:

<dependency>
    <groupId>io.nats</groupId>
    <artifactId>jnats</artifactId>
    <version>2.17.6</version>
</dependency>

ℹ️ 详细安装说明参考 Maven 指南,Gradle 用户参考 Gradle 指南

2.2 NATS 服务器

需要运行 NATS 服务器,安装指南见 NATS 文档。默认假设服务器运行在 localhost:4222

3. 连接 NATS 服务器

3.1 基础连接与自定义配置

通过 Nats.connect() 创建连接:

Connection natsConnection = Nats.connect(); // 默认连接 localhost:4222

自定义配置示例(包含连接监听器和错误监听器):

Options options = new Options.Builder()
  .server("nats://localhost:4222")
  .connectionListener((conn, event) -> log.info("连接事件: " + event))
  .errorListener(new CustomErrorListener())
  .build();

Connection natsConnection = Nats.connect(options);

⚠️ NATS 连接具有自动重连能力。连接监听器会报告断开/重连事件,错误监听器处理异常。

测试连接稳定性

Connection conn = createConnection("nats://localhost:4222");
Thread.sleep(60000); // 保持运行60秒

重启服务器时输出示例:

连接事件: nats: 连接已打开
异常: java.io.IOException: 读取通道关闭
连接事件: nats: 连接断开
异常: java.net.ConnectException: 连接被拒绝
连接事件: nats: 连接重连成功
连接事件: nats: 订阅关系已恢复

4. 消息交换

4.1 订阅消息基础

NATS 消息是字节数组容器,支持设置/获取主题和回复主题。

4.2 异步订阅

使用 Dispatcher 处理异步消息:

Dispatcher dispatcher = natsConnection.createDispatcher();
Subscription subscription = dispatcher.subscribe("subject", 
  msg -> log.info("异步订阅收到消息: " + msg));

// 队列组订阅
Subscription qSubscription = dispatcher.subscribe("subject", "queueGroup", 
  msg -> log.info("队列订阅收到消息: " + msg));

// 取消订阅
dispatcher.unsubscribe(subscription);

✅ 多个订阅可共享一个 Dispatcher(低负载场景),高负载建议独立 Dispatcher

4.3 同步订阅

通过轮询获取消息:

Subscription subscription = natsConnection.subscribe("mySubject");
Message message = subscription.nextMessage(1000); // 超时返回null

// 取消订阅
subscription.unsubscribe();

4.4 发布消息

"即发即弃"模式,不关心订阅者是否存在:

natsConnection.publish("mySubject", "消息内容".getBytes());

4.5 消息响应

两种响应模式:

  1. 多响应:使用 reply-to 字段
  2. 单响应:使用内置 request 方法

4.6 获取多个响应

发布方设置 reply-to 主题并订阅响应:

// 发布方
String replyTo = "responseSubject";
natsConnection.publish("requestSubject", replyTo, "请回复".getBytes());
Subscription responseSub = natsConnection.subscribe(replyTo);

// 接收多个响应
Message m1 = responseSub.nextMessage(1000);
Message m2 = responseSub.nextMessage(1000);

响应方订阅请求主题:

natsConnection.subscribe("requestSubject", msg -> {
  natsConnection.publish(msg.getReplyTo(), "回复内容".getBytes());
});

4.7 获取单个响应

使用 request 方法(自动处理订阅和超时):

Message response = natsConnection.request(
  "requestSubject", 
  "请回复".getBytes(), 
  Duration.ofMillis(1000)
);

✅ 仅返回第一个响应,其他响应自动丢弃

4.8 通配符订阅

支持两种通配符:

  • * 匹配单个主题段
  • > 匹配剩余所有主题段

示例测试:

// 订阅 segment.*
Subscription starSub = natsConnection.subscribe("segment.*");
natsConnection.publish("segment.another", "消息1".getBytes());
Message msg1 = starSub.nextMessage(200); // ✅ 匹配成功

natsConnection.publish("segment.one.two", "消息2".getBytes());
Message msg2 = starSub.nextMessage(200); // ❌ 不匹配

// 订阅 segment.>
Subscription gtSub = natsConnection.subscribe("segment.>");
natsConnection.publish("segment.one.two", "消息3".getBytes());
Message msg3 = gtSub.nextMessage(200); // ✅ 匹配成功

5. 消息队列

5.1 发布到队列

发布方式与普通消息相同:

natsConnection.publish("mySubject", "队列消息".getBytes());

5.2 订阅队列

指定队列组名称:

// 同步订阅
Subscription queueSub = natsConnection.subscribe("mySubject", "myQueue");

// 异步订阅
Dispatcher dispatcher = connection.createDispatcher();
Subscription asyncSub = dispatcher.subscribe("mySubject", "myQueue", 
  msg -> log.info("队列消息: " + msg));

✅ 队列组特性:

  • 消息仅传递给组内一个订阅者
  • 无订阅者时消息直接丢弃
  • 可与普通订阅共存

订阅行为对比

// 队列订阅(负载均衡)
Subscription queue1 = natsConnection.subscribe("mySubject", "myQueue");
Subscription queue2 = natsConnection.subscribe("mySubject", "myQueue");

// 普通订阅(广播)
Subscription sub1 = natsConnection.subscribe("mySubject");
Subscription sub2 = natsConnection.subscribe("mySubject");

6. 总结

我们实现了:

  • ✅ NATS 服务器连接(含自动重连)
  • ✅ 发布/订阅消息
  • ✅ 请求/回复模式
  • ✅ 通配符订阅
  • ✅ 队列组负载均衡

NATS 的轻量级设计和高效通信机制,使其成为分布式系统中消息传递的理想选择。


原始标题:Publish and Receive Messages with Nats Java Client | Baeldung