1. 概述
软件组件解耦是软件设计的重要组成部分。实现这一目标的一种方法是使用消息系统,它提供组件(服务)之间异步通信的方式。本文将介绍其中一种系统:RabbitMQ。
RabbitMQ 是一个实现了高级消息队列协议 (AMQP)[^1] 的消息代理。它为主要编程语言提供了客户端库。
除了用于解耦软件组件外,RabbitMQ 还可以用于:
- 执行后台操作
- 执行异步操作
2. 消息模型
首先,让我们从高层次上快速了解一下消息传递的工作原理。
简单来说,与消息系统交互的应用有两种类型:生产者和消费者。生产者负责将消息(发布)发送到代理,而消费者则从代理接收消息。通常,这些程序(软件组件)运行在不同的机器上,RabbitMQ 在它们之间充当通信中间件。
在这篇文章中,我们将讨论两个服务通过RabbitMQ进行通信的简单示例。其中一个服务将向RabbitMQ发布消息,另一个服务则负责消费消息。
3. 配置
首先,让我们按照官方指南这里运行RabbitMQ。
我们将自然地使用Java客户端与RabbitMQ服务器交互;这个客户端的Maven依赖项如下:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.0</version>
</dependency>
运行官方指南中的RabbitMQ代理后,我们需要使用Java客户端连接到它:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
我们使用ConnectionFactory
来设置与服务器的连接,它处理协议(AMQP)和身份验证。在这里,我们在本地主机上连接,可以通过setHost
函数修改主机名。
如果RabbitMQ服务器没有使用默认端口,我们可以使用setPort
来设置端口;RabbitMQ的默认端口是15672:
factory.setPort(15678);
我们可以设置用户名和密码:
factory.setUsername("user1");
factory.setPassword("MyPassword");
接下来,我们将使用这个连接来发布和消费消息。
4. 生产者
假设一个简单的场景:一个网站允许用户添加新产品。每当有新产品添加时,我们需要向客户发送一封电子邮件。
首先,我们定义一个队列:
channel.queueDeclare("products_queue", false, false, false, null);
每次用户添加新产品时,我们将向队列发布一条消息:
String message = "product details";
channel.basicPublish("", "products_queue", null, message.getBytes());
最后,关闭通道和连接:
channel.close();
connection.close();
这条消息将被另一个负责向客户发送电子邮件的服务消费。
5. 消费者
现在来看看我们在消费者端能实现什么;我们将声明相同的队列:
channel.queueDeclare("products_queue", false, false, false, null);
这是如何定义处理队列消息的异步消费者:
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
// process the message
}
};
channel.basicConsume("products_queue", true, consumer);
6. 总结
这篇简短的文章涵盖了RabbitMQ的基本概念,并讨论了一个使用它的简单示例。
完整教程的实现可以在GitHub项目中找到。