1. 概述
本文将带你使用 Spring Boot 构建一个响应式应用,并集成 RabbitMQ 消息中间件——AMQP 协议最流行的实现之一。
我们会覆盖两种典型场景:点对点通信(Point-to-Point)和 发布-订阅模式(Publish-Subscribe),并通过一个分布式的结构来突出两者的设计差异。
⚠️ 本文假设你已具备 AMQP、RabbitMQ 和 Spring Boot 的基础知识,例如交换机(Exchange)、队列(Queue)、主题(Topic)等核心概念。如果需要补课,可参考以下文章:
2. RabbitMQ 服务搭建
虽然你可以本地安装 RabbitMQ,但生产环境通常会用更完善的部署方案,比如高可用、监控、安全等特性加持的集群。
为了在开发机上模拟这种环境,我们直接上 Docker,简单粗暴搞定。
启动 RabbitMQ 容器
$ docker run -d --name rabbitmq -p 5672:5672 rabbitmq:3
这个命令会启动一个独立的 RabbitMQ 实例。注意:我们没有挂载持久化卷,所以重启后未消费的消息会丢失。服务暴露在主机的 5672
端口。
查看日志确认启动成功
$ docker logs rabbitmq
输出中应包含类似以下内容,表示服务已就绪:
2018-06-09 13:42:33.491 [info] <0.226.0>
Starting RabbitMQ 3.7.5 on Erlang 20.3.5
Copyright (C) 2007-2018 Pivotal Software, Inc.
Licensed under the MPL. See http://www.rabbitmq.com/
使用 rabbitmqctl 管理服务
RabbitMQ 镜像自带 rabbitmqctl
工具,可用于执行管理操作:
$ docker exec rabbitmq rabbitmqctl status
输出会显示节点状态、运行应用等信息,确认服务正常运行。
常用管理命令一览
✅ 查看交换机
docker exec rabbitmq rabbitmqctl list_exchanges
✅ 查看队列(含未消费消息数)
docker exec rabbitmq rabbitmqctl list_queues
✅ 查看绑定关系(Exchange ↔ Queue + Routing Key)
docker exec rabbitmq rabbitmqctl list_bindings
这些命令在排查消息路由问题时非常有用,建议集合。
3. Spring AMQP 项目配置
RabbitMQ 跑起来了,接下来创建 Spring Boot 项目。目标是:通过 REST 接口收发消息,底层走 AMQP 协议。
核心依赖
在 pom.xml
中加入:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>3.1.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>3.1.5</version>
</dependency>
spring-boot-starter-amqp
:AMQP 支持,封装了 RabbitMQ 客户端spring-boot-starter-webflux
:响应式 Web 框架,支持Flux
/Mono
📌 最新版可在 Maven Central 搜索
spring-boot-starter-amqp
和spring-boot-starter-webflux
获取。
4. 场景一:点对点通信(Point-to-Point)
使用 Direct Exchange,它会把消息精确路由到一个队列。多个消费者可监听同一队列,但每条消息只会被一个消费者消费(竞争消费)。
4.1 交换机与队列初始化
我们通过 @PostConstruct
方法在应用启动时声明交换机、队列并绑定。
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private DestinationsConfig destinationsConfig;
@PostConstruct
public void setupQueueDestinations() {
destinationsConfig.getQueues()
.forEach((key, destination) -> {
Exchange ex = ExchangeBuilder.directExchange(destination.getExchange())
.durable(true)
.build();
amqpAdmin.declareExchange(ex);
Queue q = QueueBuilder.durable(destination.getRoutingKey()).build();
amqpAdmin.declareQueue(q);
Binding b = BindingBuilder.bind(q)
.to(ex)
.with(destination.getRoutingKey())
.noargs();
amqpAdmin.declareBinding(b);
});
}
📌 说明:
DestinationsConfig
是一个@ConfigurationProperties
类,从application.yml
加载配置- 每个 destination 包含 exchange 名和 routing key
- 队列名直接用 routing key,简单直接
4.2 生产者接口
发送消息走 /queue/{name}
,HTTP POST。
@PostMapping(value = "/queue/{name}")
public Mono<ResponseEntity<?>> sendMessageToQueue(
@PathVariable String name,
@RequestBody String payload) {
DestinationInfo d = destinationsConfig.getQueues().get(name);
if (d == null) {
return Mono.just(ResponseEntity.notFound().build());
}
return Mono.fromCallable(() -> {
amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload);
return ResponseEntity.accepted().build();
});
}
✅ 逻辑清晰:
- 校验 destination 是否存在
- 用
amqpTemplate
发送字符串消息 - 返回
202 Accepted
表示已接收
⚠️ 注意:这里用了 Mono.fromCallable()
包装阻塞调用,避免阻塞 Reactor 线程。
4.3 MessageListenerContainer 工厂
Spring AMQP 用 MessageListenerContainer
来异步消费消息。我们封装一个工厂,解耦创建逻辑。
@Component
public class MessageListenerContainerFactory {
@Autowired
private ConnectionFactory connectionFactory;
public MessageListenerContainer createMessageListenerContainer(String queueName) {
SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer(connectionFactory);
mlc.addQueueNames(queueName);
return mlc;
}
}
简单工厂模式,每次返回新的 SimpleMessageListenerContainer
实例。
4.4 消费者接口
消费者通过 /queue/{name}
接收消息,返回 Flux<String>
,支持 Server-Sent Events(SSE)。
@GetMapping(
value = "/queue/{name}",
produces = MediaType.TEXT_EVENT_STREAM_VALUE
)
public Flux<?> receiveMessagesFromQueue(@PathVariable String name) {
DestinationInfo d = destinationsConfig.getQueues().get(name);
if (d == null) {
return Flux.just(ResponseEntity.notFound().build());
}
MessageListenerContainer mlc = messageListenerContainerFactory
.createMessageListenerContainer(d.getRoutingKey());
Flux<String> f = Flux.<String>create(emitter -> {
mlc.setupMessageListener((MessageListener) m -> {
String payload = new String(m.getBody());
emitter.next(payload);
});
emitter.onRequest(v -> mlc.start());
emitter.onDispose(() -> mlc.stop());
});
return Flux.interval(Duration.ofSeconds(5))
.map(v -> "No news is good news")
.mergeWith(f);
}
📌 关键点:
Flux.create()
桥接 AMQP 的监听器模型到响应式流onRequest
启动容器,onDispose
停止容器,资源随流生命周期管理- ✅ 心跳消息:
Flux.interval()
每 5 秒发一个 dummy 消息,防止客户端长时间无消息时连接被误判为断开
⚠️ 踩坑提醒:没有心跳的话,客户端断开可能无法及时感知,导致资源泄漏。
4.5 测试验证
application.yml 配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
destinations:
queues:
NYSE:
exchange: nyse
routing-key: NYSE
发送消息
curl -v -d "Test message" http://localhost:8080/queue/NYSE
返回 202 Accepted
表示消息已接收。
查看队列状态
docker exec rabbitmq rabbitmqctl list_queues
输出:
NYSE 1
说明消息已入队。
消费消息
curl -v http://localhost:8080/queue/NYSE
输出:
data:Test message
data:No news is good news
...
消费后再次 list_queues
,消息数变为 0,验证成功。
5. 场景二:发布-订阅(Publish-Subscribe)
一个消息发给多个消费者。RabbitMQ 支持 Topic Exchange 和 Fanout Exchange。
- Topic:支持通配符路由(如
weather.#
) - Fanout:广播到所有绑定队列
本文以 Topic 为例。
5.1 交换机初始化
只声明交换机,队列由每个订阅者动态创建。
@PostConstruct
public void setupTopicDestinations() {
destinationsConfig.getTopics().forEach((key, destination) -> {
Exchange ex = ExchangeBuilder
.topicExchange(destination.getExchange())
.durable(true)
.build();
amqpAdmin.declareExchange(ex);
});
}
5.2 发布者接口
@PostMapping(value = "/topic/{name}")
public Mono<ResponseEntity<?>> sendMessageToTopic(
@PathVariable String name,
@RequestBody String payload) {
DestinationInfo d = destinationsConfig.getTopics().get(name);
if (d == null) {
return Mono.just(ResponseEntity.notFound().build());
}
return Mono.fromCallable(() -> {
amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload);
return ResponseEntity.accepted().build();
});
}
和点对点几乎一样,只是取配置的地方不同。
5.3 订阅者接口
每个订阅者创建自己的专属队列,并绑定到交换机。
@GetMapping(
value = "/topic/{name}",
produces = MediaType.TEXT_EVENT_STREAM_VALUE
)
public Flux<?> receiveMessagesFromTopic(@PathVariable String name) {
DestinationInfo d = destinationsConfig.getTopics().get(name);
if (d == null) {
return Flux.just(ResponseEntity.notFound().build());
}
Queue topicQueue = createTopicQueue(d);
String qname = topicQueue.getName();
MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(qname);
Flux<String> f = Flux.<String>create(emitter -> {
mlc.setupMessageListener((MessageListener) m -> {
String payload = new String(m.getBody());
emitter.next(payload);
});
emitter.onRequest(v -> mlc.start());
emitter.onDispose(() -> {
amqpAdmin.deleteQueue(qname);
mlc.stop();
});
});
return Flux.interval(Duration.ofSeconds(5))
.map(v -> "No news is good news")
.mergeWith(f);
}
关键差异:
createTopicQueue()
动态创建 非持久化、排他性队列onDispose
中删除队列,避免资源堆积
5.4 动态队列创建
private Queue createTopicQueue(DestinationInfo destination) {
Exchange ex = ExchangeBuilder
.topicExchange(destination.getExchange())
.durable(true)
.build();
amqpAdmin.declareExchange(ex);
Queue q = QueueBuilder.nonDurable().build();
amqpAdmin.declareQueue(q);
Binding b = BindingBuilder.bind(q)
.to(ex)
.with(destination.getRoutingKey())
.noargs();
amqpAdmin.declareBinding(b);
return q;
}
⚠️ 注意:虽然这里再次声明 Exchange,但 RabbitMQ 会去重,不会重复创建。
5.5 测试发布-订阅
配置 application.yml
destinations:
topics:
weather:
exchange: alerts
routing-key: WEATHER
启动多个订阅者
curl -v http://localhost:8080/topic/weather
开两个终端,分别运行。
发布消息
curl -v -H "Content-Type: application/json" -d "Hurricane approaching!" http://localhost:8080/topic/weather
两个订阅者终端会同时收到消息:
data:Hurricane approaching!
查看绑定
docker exec rabbitmq rabbitmqctl list_bindings
你会看到 alerts
交换机绑定了两个自动生成的队列(如 spring.gen-xxx
),每个订阅者一个。
断开订阅者(Ctrl+C),队列自动删除,绑定消失,干净利落。
6. 总结
本文实现了基于 Spring WebFlux 和 Spring AMQP 的响应式消息网关,支持:
✅ 点对点通信(Direct Exchange)
✅ 发布-订阅模式(Topic Exchange)
✅ 动态队列创建与销毁
✅ 心跳保活防止连接假死
代码简洁,扩展性强,后续可轻松集成安全、监控等企业级功能。
完整代码已开源:GitHub 仓库