1. 概述

Apache RocketMQ 是阿里巴巴开源的一个分布式消息队列和流数据平台,本教程我们将使用Spring Boot和RocketMQ,创建一个消息生产者和消费者。

2. Maven依赖

对于Maven项目,我们需要添加Apache RocketMQ Spring Boot Starter依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

生产消息

在我们的示例中,我们将创建一个基础的消息生产者,当用户向购物车添加或移除商品时,会发送事件。

首先,我们在application.properties中设置服务器位置和组名:

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=cart-producer-group

注意,如果有多个名称服务器,我们可以像host:port;host:port一样列出它们。

现在,为了简化,我们创建一个CommandLineRunner应用程序,并在应用启动时生成一些事件:

@SpringBootApplication
public class CartEventProducer implements CommandLineRunner {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(CartEventProducer.class, args);
    }

    public void run(String... args) throws Exception {
        rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1));
        rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2));
        rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1));
    }
}

CartItemEvent只包含两个属性:商品ID和数量:

class CartItemEvent {
    private String itemId;
    private int quantity;

    // constructor, getters and setters
}

在上述示例中,我们使用convertAndSend()方法,这是AbstractMessageSendingTemplate抽象类定义的一个通用方法,用于发送我们的购物车事件。它接受两个参数:目的地,对我们来说是主题名称和消息负载。

消息消费者

接收RocketMQ消息就像创建一个带有@RocketMQMessageListener注解的Spring组件并实现RocketMQListener接口那样简单:

@SpringBootApplication
public class CartEventConsumer {

    public static void main(String[] args) {
        SpringApplication.run(CartEventConsumer.class, args);
    }

    @Service
    @RocketMQMessageListener(
      topic = "cart-item-add-topic",
      consumerGroup = "cart-consumer_cart-item-add-topic"
    )
    public class CardItemAddConsumer implements RocketMQListener<CartItemEvent> {
        public void onMessage(CartItemEvent addItemEvent) {
            log.info("Adding item: {}", addItemEvent);
            // additional logic
        }
    }

    @Service
    @RocketMQMessageListener(
      topic = "cart-item-removed-topic",
      consumerGroup = "cart-consumer_cart-item-removed-topic"
    )
    public class CardItemRemoveConsumer implements RocketMQListener<CartItemEvent> {
        public void onMessage(CartItemEvent removeItemEvent) {
            log.info("Removing item: {}", removeItemEvent);
            // additional logic
        }
    }
}

我们需要为监听的每个消息主题创建一个单独的组件。在这些监听器中,我们通过@RocketMQMessageListener注解定义主题名称和消费者组名称。

同步与异步传输

在之前的例子中,我们使用了convertAndSend方法发送消息。不过,我们还有其他选择。

例如,我们可以调用syncSend,它与convertAndSend不同,因为它返回SendResult对象。

它可以用来验证消息是否成功发送,或者获取其ID:

public void run(String... args) throws Exception { 
    SendResult addBikeResult = rocketMQTemplate.syncSend("cart-item-add-topic", 
      new CartItemEvent("bike", 1)); 
    SendResult addComputerResult = rocketMQTemplate.syncSend("cart-item-add-topic", 
      new CartItemEvent("computer", 2)); 
    SendResult removeBikeResult = rocketMQTemplate.syncSend("cart-item-removed-topic", 
      new CartItemEvent("bike", 1)); 
}

convertAndSend类似,只有在发送过程完成时才会返回这个方法。

在需要高可靠性的场景,如重要通知或短信通知,我们应该使用同步传输。

另一方面,我们可能希望异步发送消息并在发送完成后得到通知。

这可以通过asyncSend来实现,它接受一个SendCallback参数并立即返回:

rocketMQTemplate.asyncSend("cart-item-add-topic", new CartItemEvent("bike", 1), new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        log.error("Successfully sent cart item");
    }

    @Override
    public void onException(Throwable throwable) {
        log.error("Exception during cart item sending", throwable);
    }
});

在需要高吞吐量的情况下,我们会使用异步传输。

最后,对于有极高吞吐量要求的情况,我们可以使用sendOneWay代替asyncSendsendOneWayasyncSend的不同之处在于它不保证消息会被发送。

单向传输也可以用于普通可靠性情况,如收集日志。

在事务中发送消息

RocketMQ提供了在事务中发送消息的能力。我们可以通过使用sendInTransaction()方法来实现:

MessageBuilder.withPayload(new CartItemEvent("bike", 1)).build();
rocketMQTemplate.sendMessageInTransaction("test-transaction", "topic-name", msg, null);

此外,我们还需要实现RocketMQLocalTransactionListener接口:

@RocketMQTransactionListener(txProducerGroup="test-transaction")
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
      @Override
      public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
          // ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN
          return RocketMQLocalTransactionState.UNKNOWN;
      }

      @Override
      public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
          // ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN
          return RocketMQLocalTransactionState.COMMIT;
      }
}

sendMessageInTransaction()中,第一个参数是事务名称,必须与@RocketMQTransactionListener的成员字段txProducerGroup相同。

消息生产者配置

我们还可以配置消息生产者的各个方面:

  • rocketmq.producer.send-message-timeout: 发送消息超时时间(毫秒),默认值为3000
  • rocketmq.producer.compress-message-body-threshold: 当消息大小超过此阈值时,RocketMQ将压缩消息,默认值为1024字节。
  • rocketmq.producer.max-message-size: 最大消息大小(字节),默认值为4096字节。
  • rocketmq.producer.retry-times-when-send-async-failed: 异步模式下内部发送失败的最大重试次数,默认值为2。
  • rocketmq.producer.retry-next-server: 表示在发送失败时内部是否尝试另一个代理,默认值为false
  • rocketmq.producer.retry-times-when-send-failed: 异步模式下内部发送失败的最大重试次数,默认值为2。

总结

在这篇文章中,我们学习了如何使用Apache RocketMQ和Spring Boot发送和消费消息。如往常一样,所有源代码可在GitHub上找到。