1. 概述

在本教程中,我们将了解消息队列和发布者/订阅者的使用。这些是分布式系统中用于两个或多个服务相互通信的常见模式。

在本教程中,所有示例都将使用 RabbitMQ 消息代理进行展示,因此首先按照 RabbitMQ 的教程在本地启动并运行。要更深入地了解 RabbitMQ,请查看我们的其他教程

注意:RabbitMQ 有许多替代方案可用于本教程中的相同示例,例如KafkaGoogle Cloud Pub-SubAmazon SQS等。

2.什么是消息队列?

让我们首先看看消息队列。 消息队列由一个发布服务和多个通过队列进行通信的消费者服务组成。 这种 通信通常是发布者向消费者发出命令的一种方式 。发布服务通常会将消息放入队列或交换器中,并且单个消费者服务将使用该消息并基于该消息执行操作。

考虑以下交换:

1-1

由此,我们可以看到发布者服务正在将消息“m n+1”放入队列中。另外,我们还可以看到队列中已经存在多条消息等待被消费。在右侧,我们有 2 个消费服务“A”和“B”,它们正在侦听队列中的消息。

现在让我们考虑一段时间后的相同交换:

2-1

首先,我们可以看到Publisher的消息已经被推送到了队列的尾部。接下来,要考虑的重要部分是图像的右侧。我们可以看到消费者“A”已读取消息“m 1”,因此队列中的其他服务“B”不再可以使用该消息。

2.1.在哪里使用消息队列

消息队列经常用在我们想要从服务中委派工作的地方。 这样做时,我们希望确保该工作仅执行一次。

使用消息队列在微服务架构中以及开发基于云或无服务器应用程序时很流行,因为它允许我们根据负载水平扩展应用程序。

例如,如果队列中有许多消息等待处理,我们可以启动多个消费者服务,它们监听同一个消息队列并处理消息的涌入。一旦处理完消息,就可以在流量最小时关闭服务,以节省运行成本。

2.2.使用 RabbitMQ 的示例

为了清楚起见,让我们看一个例子。我们的示例将采用披萨餐厅的形式。想象一下,人们可以通过应用程序订购披萨,披萨店的厨师会在顾客进来时接货。在这个例子中,客户是我们的发布者,厨师是我们的消费者。

首先,让我们定义我们的队列:

private static final String MESSAGE_QUEUE = "pizza-message-queue";

@Bean
public Queue queue() {
    return new Queue(MESSAGE_QUEUE);
}

使用 Spring AMQP,我们创建了一个名为“pizza-message-queue”的队列。接下来,让我们定义将消息发布到新定义的队列的发布者:

public class Publisher {

    private RabbitTemplate rabbitTemplate;
    private String queue;

    public Publisher(RabbitTemplate rabbitTemplate, String queue) {
        this.rabbitTemplate = rabbitTemplate;
        this.queue = queue;
    }

    @PostConstruct
    public void postMessages() {
        rabbitTemplate.convertAndSend(queue, "1 Pepperoni");
        rabbitTemplate.convertAndSend(queue, "3 Margarita");
        rabbitTemplate.convertAndSend(queue, "1 Ham and Pineapple (yuck)");
    }
}

Spring AMQP 将为我们创建一个 RabbitTemplate bean,它连接到我们的 RabbitMQ 交换以减少配置开销。我们的发布者通过向我们的队列发送 3 条消息来利用这一点。

现在我们的披萨订单已经收到,我们需要一个单独的消费者应用程序。这将在示例中充当我们的厨师并读取消息:

public class Consumer {
    public void receiveOrder(String message) {
        System.out.printf("Order received: %s%n", message);
    }
}

现在让我们为队列创建一个 MessageListenerAdapter ,它将使用反射调用 Consumer 的接收订单方法:

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(MESSAGE_QUEUE);
    container.setMessageListener(listenerAdapter);
    return container;
}

@Bean
public MessageListenerAdapter listenerAdapter(Consumer consumer) {
    return new MessageListenerAdapter(consumer, "receiveOrder");
}

从队列中读取的消息现在将被路由到 Consumer 类的 receiveOrder 方法。为了运行这个应用程序,我们可以创建任意数量的消费者应用程序来履行传入的订单。例如,如果将 400 个披萨订单放入队列,那么我们可能需要 1 个以上的消费者“厨师”,否则订单会很慢。在这种情况下,我们可能会启动 10 个消费者实例来及时履行订单。

3. 什么是发布-订阅?

现在我们已经介绍了消息队列,让我们看看发布-订阅。 相反,对于消息队列,在发布-订阅架构中,我们希望所有消费(订阅)应用程序 至少获得发布者发布到交换器的消息的 1 个 副本。

考虑以下交换:

3-1

在左侧,我们有一个发布者向主题发送消息“m n+1”。该主题将向其订阅者广播此消息。这些订阅绑定到队列。每个队列都有一个监听订阅者服务等待消息。

现在让我们考虑一段时间后的相同交换:

4

两个订阅服务都在消耗“m 1”,因为两者都收到了该消息的副本。此外,该主题正在向其所有订阅者分发新消息“m n+1”。

当我们需要保证每个订阅者都能获得消息的副本时,应该使用 Pub sub。

3.1.使用 RabbitMQ 的示例

想象一下我们有一个服装网站。该网站能够向用户发送推送通知以通知他们优惠。我们的系统可以通过电子邮件或短信警报发送通知。在这种情况下,网站是我们的发布者,文本和电子邮件提醒服务是我们的订阅者。

首先,让我们定义主题交换并为其绑定 2 个队列:

private static final String PUB_SUB_TOPIC = "notification-topic";
private static final String PUB_SUB_EMAIL_QUEUE = "email-queue";
private static final String PUB_SUB_TEXT_QUEUE = "text-queue";

@Bean
public Queue emailQueue() {
    return new Queue(PUB_SUB_EMAIL_QUEUE);
}

@Bean
public Queue textQueue() {
    return new Queue(PUB_SUB_TEXT_QUEUE);
}

@Bean
public TopicExchange exchange() {
    return new TopicExchange(PUB_SUB_TOPIC);
}

@Bean
public Binding emailBinding(Queue emailQueue, TopicExchange exchange) {
    return BindingBuilder.bind(emailQueue).to(exchange).with("notification");
}

@Bean
public Binding textBinding(Queue textQueue, TopicExchange exchange) {
    return BindingBuilder.bind(textQueue).to(exchange).with("notification");
}

我们现在使用路由键“通知”绑定了 2 个队列,这意味着使用此路由键在主题上发布的任何消息都将发送到两个队列。更新我们之前创建的 Publisher 类,我们可以向我们的交换发送一些消息:

rabbitTemplate.convertAndSend(topic, "notification", "New Deal on T-Shirts: 95% off!");
rabbitTemplate.convertAndSend(topic, "notification", "2 for 1 on all Jeans!");

4. 比较

现在我们已经接触了这两个领域,让我们简要比较一下这两种类型的交换。

如前所述, 消息队列和发布-订阅架构模式都是分解应用程序以使其更具水平可扩展性的好方法。

使用发布-订阅或消息队列的另一个好处是通信比传统的同步通信模式更持久。 例如,如果应用程序 A 通过异步 HTTP 调用与应用程序 B 进行通信,那么如果其中一个应用程序出现故障,数据就会丢失,并且必须重试请求。

使用消息队列,如果消费者应用程序实例出现故障,则另一个消费者将能够处理该消息。使用发布-订阅,如果订阅者宕机,那么一旦它恢复了丢失的消息,就可以在其订阅队列中使用。

最后,背景是关键。选择是否使用发布-订阅或消息队列架构归结为准确定义您希望消费服务的行为方式。 要记住的最重要的因素是问“每个消费者都收到每条消息是否重要?

5. 结论

在本教程中,我们了解了发布-订阅和消息队列以及它们各自的一些特征。本教程中提到的所有代码都可以在 GitHub 上找到。