1. 引言

异步消息通信是一种松耦合的分布式通信方式,近年来在实现事件驱动架构中越来越流行。Spring 框架提供了 Spring AMQP 项目,帮助我们快速构建基于 AMQP 的消息系统。

然而,在异步消息环境中处理错误并非易事,尤其是在分布式场景下,异常来源更复杂。本文将系统性地介绍几种主流的错误处理策略,帮你避开常见“坑位”。

2. 环境准备

本文使用 RabbitMQ 作为 AMQP 消息中间件,Spring AMQP 提供了 spring-rabbit 模块,集成非常方便。

我们通过 Docker 快速启动 RabbitMQ 服务:

docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management

RabbitMQ 管理界面默认运行在 http://localhost:15672,用户名密码均为 guest

详细的项目依赖和配置可参考我们的 Spring AMQP 入门文章

3. 常见失败场景

相比单体应用,消息系统由于其分布式特性,可能遇到更多类型的异常,主要包括:

  • 网络或 I/O 异常:连接中断、读写超时等底层通信问题
  • 协议或基础设施异常:消息中间件配置错误,如交换机类型不匹配
  • Broker 相关异常:客户端与 Broker 通信问题,如认证失败、队列达到上限
  • 应用或消息异常:业务逻辑校验失败、消息格式非法等

⚠️ 值得注意的是,Spring AMQP 已内置处理连接层和低级异常(如自动重试、重入队列),大多数底层异常会被封装为 AmqpException 或其子类。

本文重点讨论 应用级异常全局错误处理策略,这些才是业务开发中最常踩的坑。

4. 项目基础配置

先定义一个简单的队列和交换机:

public static final String QUEUE_MESSAGES = "baeldung-messages-queue";
public static final String EXCHANGE_MESSAGES = "baeldung-messages-exchange";

@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .build();
}
 
@Bean
DirectExchange messagesExchange() {
    return new DirectExchange(EXCHANGE_MESSAGES);
}
 
@Bean
Binding bindingMessages() {
    return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
}

生产者发送消息:

public void sendMessage() {
    rabbitTemplate
      .convertAndSend(SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES,
        SimpleDLQAmqpConfiguration.QUEUE_MESSAGES, "Some message id:" + messageNumber++);
}

消费者故意抛出异常:

@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)
public void receiveMessage(Message message) throws BusinessException {
    throw new BusinessException();
}

默认行为:失败消息会被立即重新入队,且排在队列头部,导致无限重试

运行应用后,日志会疯狂刷屏:

WARN 22260 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
Caused by: com.baeldung.springamqp.errorhandling.errorhandler.BusinessException: null

要打破这个死循环,有两种方式:

  • spring.rabbitmq.listener.simple.default-requeue-rejected=false:设置监听器不重新入队
  • ✅ 抛出 AmqpRejectAndDontRequeueException:明确拒绝消息,不再重试

接下来我们介绍更优雅的处理方式。

5. 死信队列(Dead Letter Queue)

死信队列(DLQ)用于存放无法被正常消费的消息,是构建高可用消息系统的关键组件。

它的核心价值在于:

  • ✅ 避免消息无限重试拖垮系统
  • ✅ 隔离问题消息,便于后续分析或人工干预
  • ✅ 支持失败消息的延迟重试或归档

核心概念有两个:

  • DLX(Dead Letter Exchange):一个普通交换机,可以是 directtopicfanout 类型
  • DLQ(Dead Letter Queue):绑定到 DLX 的队列,专门接收死信

⚠️ 关键点:生产者只关心交换机和路由键,不直接操作队列。消息的最终投递由交换机和路由规则决定。

5.1 基础配置

为队列添加死信参数:

@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .withArgument("x-dead-letter-exchange", "") // 使用默认交换机
      .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
      .build();
}
 
@Bean
Queue deadLetterQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}

说明:

  • x-dead-letter-exchange="":空字符串表示使用 AMQP 默认交换机(""
  • x-dead-letter-routing-key:指定死信消息的新路由键

5.2 失败消息的路由机制

当消息消费失败,会被发送到 DLX。但 DLX 是普通交换机,必须匹配路由规则才能投递到 DLQ

例如:

Exchange: (AMQP default)
Routing Key: baeldung-messages-queue.dlq

如果省略 x-dead-letter-routing-key,消息可能因路由不匹配而丢失或陷入重试循环。

失败消息的原始信息会记录在 x-death 消息头中:

x-death:
  count: 1
  exchange: baeldung-messages-exchange
  queue: baeldung-messages-queue 
  reason: rejected
  routing-keys: baeldung-messages-queue 
  time: 1571232954

这些信息可在 RabbitMQ 管理界面(http://localhost:15672)查看。

💡 提示:若使用 Spring Cloud Stream,可通过 republishToDlq=trueautoBindDlq=true 简化配置。

5.3 自定义死信交换机

有时我们不希望修改路由键。可通过自定义 fanout 类型的 DLX 实现:

public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
 
@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
      .build(); // 不设置 routing-key,保留原始值
}
 
@Bean
FanoutExchange deadLetterExchange() {
    return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
}
 
@Bean
Queue deadLetterQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}
 
@Bean
Binding deadLetterBinding() {
    return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}

此时失败消息的路由信息为:

Exchange: baeldung-messages-queue.dlx
Routing Key: baeldung-messages-queue  // 保持不变

5.4 死信队列消息处理

定义 DLQ 消费者:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessages(Message message) {
    log.info("Received failed message: {}", message.toString());
}

日志输出:

WARN 11752 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
INFO 11752 --- [ntContainer#1-1] c.b.s.e.consumer.SimpleDLQAmqpContainer  : 
  Received failed message:

接下来怎么处理?常见策略有:

策略1:直接丢弃

适用于无业务价值的消息。

策略2:立即重试

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRequeue(Message failedMessage) {
    log.info("Received failed message, requeueing: {}", failedMessage.toString());
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

⚠️ 问题:这与默认重试无本质区别,仍可能陷入循环。

策略3:有限重试 + 丢弃

利用消息头记录重试次数:

public static final String HEADER_X_RETRIES_COUNT = "x-retries-count";
public static final int MAX_RETRIES_COUNT = 3;

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRetryHeaders(Message failedMessage) {
    Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
      .getHeaders().get(HEADER_X_RETRIES_COUNT);
    if (retriesCnt == null) retriesCnt = 1;
    
    if (retriesCnt > MAX_RETRIES_COUNT) {
        log.info("Discarding message after {} retries", MAX_RETRIES_COUNT);
        return;
    }
    
    log.info("Retrying message for the {} time", retriesCnt);
    failedMessage.getMessageProperties()
      .getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
    
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

日志验证:

WARN ... Execution of Rabbit message listener failed.
INFO ... Retrying message for the 1 time
WARN ... Execution of Rabbit message listener failed.
INFO ... Retrying message for the 2 time
WARN ... Execution of Rabbit message listener failed.
INFO ... Discarding message after 3 retries

💡 可结合 x-message-ttl 设置消息存活时间,防止队列无限增长。

5.5 停车场队列(Parking Lot Queue)

当消息不能丢弃(如金融交易),或需要人工介入时,可引入 停车场队列

思路:将超过最大重试次数的消息转入专用队列,供人工或定时任务处理。

定义停车场队列:

public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot";
public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + ".exchange.parking-lot";
 
@Bean
FanoutExchange parkingLotExchange() {
    return new FanoutExchange(EXCHANGE_PARKING_LOT);
}
 
@Bean
Queue parkingLotQueue() {
    return QueueBuilder.durable(QUEUE_PARKING_LOT).build();
}
 
@Bean
Binding parkingLotBinding() {
    return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange());
}

更新 DLQ 消费者逻辑:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRetryWithParkingLot(Message failedMessage) {
    Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
      .getHeaders().get(HEADER_X_RETRIES_COUNT);
    if (retriesCnt == null) retriesCnt = 1;
    
    if (retriesCnt > MAX_RETRIES_COUNT) {
        log.info("Sending message to the parking lot queue");
        rabbitTemplate.send(EXCHANGE_PARKING_LOT, "", failedMessage); // fanout 不关心 routing-key
        return;
    }
    
    log.info("Retrying message for the {} time", retriesCnt);
    failedMessage.getMessageProperties()
      .getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
    
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

停车场队列消费者:

@RabbitListener(queues = QUEUE_PARKING_LOT)
public void processParkingLotQueue(Message failedMessage) {
    log.info("Received message in parking lot queue: {}", failedMessage.getMessageProperties().getHeaders());
    // 保存到数据库、发送告警邮件等
    sendAlertEmail("Failed message requires manual intervention: " + failedMessage.toString());
}

private void sendAlertEmail(String content) {
    // mock: 发送邮件通知
    log.warn("ALERT: {}", content);
}

测试输出:

WARN ... Execution of Rabbit message listener failed.
INFO ... Retrying message for the 1 time
WARN ... Execution of Rabbit message listener failed.
INFO ... Retrying message for the 2 time
WARN ... Execution of Rabbit message listener failed.
INFO ... Sending message to the parking lot queue
INFO ... Received message in parking lot queue

6. 自定义错误处理

除了队列机制,有时我们需要全局捕获异常,比如做统一日志记录或监控。

6.1 全局 ErrorHandler

Spring AMQP 默认使用 ConditionalRejectingErrorHandler,它会判断异常是否致命,决定是否拒绝消息(不再重试)。

我们可以自定义 ErrorHandler,仅对特定异常重试:

public class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable t) {
        // 只有 BusinessException 才重试,其他都拒绝
        if (!(t.getCause() instanceof BusinessException)) {
            throw new AmqpRejectAndDontRequeueException("Rejecting non-business exception", t);
        }
    }
}

⚠️ 注意:监听器抛出的异常会被包装为 ListenerExecutionFailedException,需通过 getCause() 获取原始异常。

6.2 FatalExceptionStrategy

更优雅的方式是自定义 FatalExceptionStrategy

public class CustomFatalExceptionStrategy 
      extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
    
    @Override
    public boolean isFatal(Throwable t) {
        // BusinessException 不是致命异常,允许重试
        return !(t.getCause() instanceof BusinessException);
    }
}

注入自定义策略:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
  ConnectionFactory connectionFactory,
  SimpleRabbitListenerContainerFactoryConfigurer configurer) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setErrorHandler(errorHandler());
    return factory;
}

@Bean
public ErrorHandler errorHandler() {
    return new ConditionalRejectingErrorHandler(customExceptionStrategy());
}

@Bean
FatalExceptionStrategy customExceptionStrategy() {
    return new CustomFatalExceptionStrategy();
}

这样,只有非 BusinessException 的异常才会被拒绝,避免无意义重试。

7. 总结

本文系统介绍了 Spring AMQP 中的错误处理方案:

  • 死信队列(DLQ):隔离失败消息,避免系统雪崩
  • 重试计数 + TTL:控制重试次数,防止无限循环
  • 停车场队列:处理需人工介入的关键消息
  • 自定义 ErrorHandler:全局异常控制,灵活决定重试策略

实际项目中,建议 组合使用多种策略,构建健壮的消息处理流水线。例如:先有限重试 → 进 DLQ → 超限进停车场 → 告警通知。

💡 完整代码示例已上传至 GitHub:https://github.com/eugenp/tutorials/tree/master/messaging-modules/spring-amqp


原始标题:Error Handling with Spring AMQP