1. 概述

本文将带你使用 Spring Boot 构建一个响应式应用,并集成 RabbitMQ 消息中间件——AMQP 协议最流行的实现之一。

我们会覆盖两种典型场景:点对点通信(Point-to-Point)和 发布-订阅模式(Publish-Subscribe),并通过一个分布式的结构来突出两者的设计差异。

⚠️ 本文假设你已具备 AMQP、RabbitMQ 和 Spring Boot 的基础知识,例如交换机(Exchange)、队列(Queue)、主题(Topic)等核心概念。如果需要补课,可参考以下文章:


2. RabbitMQ 服务搭建

虽然你可以本地安装 RabbitMQ,但生产环境通常会用更完善的部署方案,比如高可用、监控、安全等特性加持的集群。

为了在开发机上模拟这种环境,我们直接上 Docker,简单粗暴搞定。

启动 RabbitMQ 容器

$ docker run -d --name rabbitmq -p 5672:5672 rabbitmq:3

这个命令会启动一个独立的 RabbitMQ 实例。注意:我们没有挂载持久化卷,所以重启后未消费的消息会丢失。服务暴露在主机的 5672 端口。

查看日志确认启动成功

$ docker logs rabbitmq

输出中应包含类似以下内容,表示服务已就绪:

2018-06-09 13:42:33.491 [info] <0.226.0>
 Starting RabbitMQ 3.7.5 on Erlang 20.3.5
 Copyright (C) 2007-2018 Pivotal Software, Inc.
 Licensed under the MPL.  See http://www.rabbitmq.com/

使用 rabbitmqctl 管理服务

RabbitMQ 镜像自带 rabbitmqctl 工具,可用于执行管理操作:

$ docker exec rabbitmq rabbitmqctl status

输出会显示节点状态、运行应用等信息,确认服务正常运行。

常用管理命令一览

查看交换机

docker exec rabbitmq rabbitmqctl list_exchanges

查看队列(含未消费消息数)

docker exec rabbitmq rabbitmqctl list_queues

查看绑定关系(Exchange ↔ Queue + Routing Key)

docker exec rabbitmq rabbitmqctl list_bindings

这些命令在排查消息路由问题时非常有用,建议集合。


3. Spring AMQP 项目配置

RabbitMQ 跑起来了,接下来创建 Spring Boot 项目。目标是:通过 REST 接口收发消息,底层走 AMQP 协议。

核心依赖

pom.xml 中加入:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>3.1.5</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>3.1.5</version> 
</dependency>
  • spring-boot-starter-amqp:AMQP 支持,封装了 RabbitMQ 客户端
  • spring-boot-starter-webflux:响应式 Web 框架,支持 Flux / Mono

📌 最新版可在 Maven Central 搜索 spring-boot-starter-amqpspring-boot-starter-webflux 获取。


4. 场景一:点对点通信(Point-to-Point)

使用 Direct Exchange,它会把消息精确路由到一个队列。多个消费者可监听同一队列,但每条消息只会被一个消费者消费(竞争消费)。

4.1 交换机与队列初始化

我们通过 @PostConstruct 方法在应用启动时声明交换机、队列并绑定。

@Autowired
private AmqpAdmin amqpAdmin;
    
@Autowired
private DestinationsConfig destinationsConfig;

@PostConstruct
public void setupQueueDestinations() {
    destinationsConfig.getQueues()
        .forEach((key, destination) -> {
            Exchange ex = ExchangeBuilder.directExchange(destination.getExchange())
                .durable(true)
                .build();
            amqpAdmin.declareExchange(ex);
            
            Queue q = QueueBuilder.durable(destination.getRoutingKey()).build();
            amqpAdmin.declareQueue(q);
            
            Binding b = BindingBuilder.bind(q)
                .to(ex)
                .with(destination.getRoutingKey())
                .noargs();
            amqpAdmin.declareBinding(b);
        });
}

📌 说明:

  • DestinationsConfig 是一个 @ConfigurationProperties 类,从 application.yml 加载配置
  • 每个 destination 包含 exchange 名和 routing key
  • 队列名直接用 routing key,简单直接

4.2 生产者接口

发送消息走 /queue/{name},HTTP POST。

@PostMapping(value = "/queue/{name}")
public Mono<ResponseEntity<?>> sendMessageToQueue(
    @PathVariable String name, 
    @RequestBody String payload) {

    DestinationInfo d = destinationsConfig.getQueues().get(name);
    if (d == null) {
        return Mono.just(ResponseEntity.notFound().build());
    }
    
    return Mono.fromCallable(() -> {
        amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload);  
        return ResponseEntity.accepted().build();
    });
}

✅ 逻辑清晰:

  1. 校验 destination 是否存在
  2. amqpTemplate 发送字符串消息
  3. 返回 202 Accepted 表示已接收

⚠️ 注意:这里用了 Mono.fromCallable() 包装阻塞调用,避免阻塞 Reactor 线程。


4.3 MessageListenerContainer 工厂

Spring AMQP 用 MessageListenerContainer 来异步消费消息。我们封装一个工厂,解耦创建逻辑。

@Component
public class MessageListenerContainerFactory {

    @Autowired
    private ConnectionFactory connectionFactory;

    public MessageListenerContainer createMessageListenerContainer(String queueName) {
        SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer(connectionFactory);
        mlc.addQueueNames(queueName);
        return mlc;
    }
}

简单工厂模式,每次返回新的 SimpleMessageListenerContainer 实例。


4.4 消费者接口

消费者通过 /queue/{name} 接收消息,返回 Flux<String>,支持 Server-Sent Events(SSE)。

@GetMapping(
    value = "/queue/{name}",
    produces = MediaType.TEXT_EVENT_STREAM_VALUE
)
public Flux<?> receiveMessagesFromQueue(@PathVariable String name) {

    DestinationInfo d = destinationsConfig.getQueues().get(name);
    if (d == null) {
        return Flux.just(ResponseEntity.notFound().build());
    }

    MessageListenerContainer mlc = messageListenerContainerFactory
        .createMessageListenerContainer(d.getRoutingKey());

    Flux<String> f = Flux.<String>create(emitter -> {
        mlc.setupMessageListener((MessageListener) m -> {
            String payload = new String(m.getBody());
            emitter.next(payload);
        });
        emitter.onRequest(v -> mlc.start());
        emitter.onDispose(() -> mlc.stop());
    });

    return Flux.interval(Duration.ofSeconds(5))
        .map(v -> "No news is good news")
        .mergeWith(f);
}

📌 关键点:

  • Flux.create() 桥接 AMQP 的监听器模型到响应式流
  • onRequest 启动容器,onDispose 停止容器,资源随流生命周期管理
  • 心跳消息Flux.interval() 每 5 秒发一个 dummy 消息,防止客户端长时间无消息时连接被误判为断开

⚠️ 踩坑提醒:没有心跳的话,客户端断开可能无法及时感知,导致资源泄漏。


4.5 测试验证

application.yml 配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

destinations:
  queues:
    NYSE:
      exchange: nyse
      routing-key: NYSE

发送消息

curl -v -d "Test message" http://localhost:8080/queue/NYSE

返回 202 Accepted 表示消息已接收。

查看队列状态

docker exec rabbitmq rabbitmqctl list_queues

输出:

NYSE    1

说明消息已入队。

消费消息

curl -v http://localhost:8080/queue/NYSE

输出:

data:Test message
data:No news is good news
...

消费后再次 list_queues,消息数变为 0,验证成功。


5. 场景二:发布-订阅(Publish-Subscribe)

一个消息发给多个消费者。RabbitMQ 支持 Topic ExchangeFanout Exchange

  • Topic:支持通配符路由(如 weather.#
  • Fanout:广播到所有绑定队列

本文以 Topic 为例。

5.1 交换机初始化

只声明交换机,队列由每个订阅者动态创建。

@PostConstruct
public void setupTopicDestinations() {
    destinationsConfig.getTopics().forEach((key, destination) -> {
        Exchange ex = ExchangeBuilder
            .topicExchange(destination.getExchange())
            .durable(true)
            .build();
        amqpAdmin.declareExchange(ex);
    });
}

5.2 发布者接口

@PostMapping(value = "/topic/{name}")
public Mono<ResponseEntity<?>> sendMessageToTopic(
    @PathVariable String name, 
    @RequestBody String payload) {

    DestinationInfo d = destinationsConfig.getTopics().get(name);
    if (d == null) {
        return Mono.just(ResponseEntity.notFound().build());
    }      
    
    return Mono.fromCallable(() -> {
        amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload);   
        return ResponseEntity.accepted().build();
    });
}

和点对点几乎一样,只是取配置的地方不同。


5.3 订阅者接口

每个订阅者创建自己的专属队列,并绑定到交换机。

@GetMapping(
    value = "/topic/{name}",
    produces = MediaType.TEXT_EVENT_STREAM_VALUE
)
public Flux<?> receiveMessagesFromTopic(@PathVariable String name) {
    DestinationInfo d = destinationsConfig.getTopics().get(name);
    if (d == null) {
        return Flux.just(ResponseEntity.notFound().build());
    }

    Queue topicQueue = createTopicQueue(d);
    String qname = topicQueue.getName();
    MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(qname);

    Flux<String> f = Flux.<String>create(emitter -> {
        mlc.setupMessageListener((MessageListener) m -> {
            String payload = new String(m.getBody());
            emitter.next(payload);
        });
        emitter.onRequest(v -> mlc.start());
        emitter.onDispose(() -> {
            amqpAdmin.deleteQueue(qname);
            mlc.stop();
        });            
    });
    
    return Flux.interval(Duration.ofSeconds(5))
        .map(v -> "No news is good news")
        .mergeWith(f);
}

关键差异:

  • createTopicQueue() 动态创建 非持久化、排他性队列
  • onDispose 中删除队列,避免资源堆积

5.4 动态队列创建

private Queue createTopicQueue(DestinationInfo destination) {
    Exchange ex = ExchangeBuilder
        .topicExchange(destination.getExchange())
        .durable(true)
        .build();
    amqpAdmin.declareExchange(ex);

    Queue q = QueueBuilder.nonDurable().build();     
    amqpAdmin.declareQueue(q);

    Binding b = BindingBuilder.bind(q)
        .to(ex)
        .with(destination.getRoutingKey())
        .noargs();        
    amqpAdmin.declareBinding(b);
    return q;
}

⚠️ 注意:虽然这里再次声明 Exchange,但 RabbitMQ 会去重,不会重复创建。


5.5 测试发布-订阅

配置 application.yml

destinations:
  topics:
    weather:
      exchange: alerts
      routing-key: WEATHER

启动多个订阅者

curl -v http://localhost:8080/topic/weather

开两个终端,分别运行。

发布消息

curl -v -H "Content-Type: application/json" -d "Hurricane approaching!" http://localhost:8080/topic/weather

两个订阅者终端会同时收到消息:

data:Hurricane approaching!

查看绑定

docker exec rabbitmq rabbitmqctl list_bindings

你会看到 alerts 交换机绑定了两个自动生成的队列(如 spring.gen-xxx),每个订阅者一个。

断开订阅者(Ctrl+C),队列自动删除,绑定消失,干净利落。


6. 总结

本文实现了基于 Spring WebFlux 和 Spring AMQP 的响应式消息网关,支持:

✅ 点对点通信(Direct Exchange)
✅ 发布-订阅模式(Topic Exchange)
✅ 动态队列创建与销毁
✅ 心跳保活防止连接假死

代码简洁,扩展性强,后续可轻松集成安全、监控等企业级功能。

完整代码已开源:GitHub 仓库


原始标题:Spring AMQP in Reactive Applications