1. Introduction
In this tutorial, we’ll explore the concept of fanout and topic exchanges with Spring AMQP and RabbitMQ.
At a high level, fanout exchanges will broadcast the same message to all bound queues, while topic exchanges use a routing key for passing messages to a particular bound queue or queues.
Prior reading of Messaging With Spring AMQP is recommended for this tutorial.
2. Setting Up a Fanout Exchange
Let’s set up one fanout exchange with two queues bound to it. When we send a message to this exchange, both queues receive it. A fanout exchange ignores any routing key included with the message.
Spring AMQP allows us to aggregate all the declarations of queues, exchanges, and bindings in a Declarables object:
@Bean
public Declarables fanoutBindings() {
    Queue fanoutQueue1 = new Queue("fanout.queue1", false);
    Queue fanoutQueue2 = new Queue("fanout.queue2", false);
    FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");

    return new Declarables(
      fanoutQueue1,
      fanoutQueue2,
      fanoutExchange,
      BindingBuilder.bind(fanoutQueue1).to(fanoutExchange),
      BindingBuilder.bind(fanoutQueue2).to(fanoutExchange));
}
3. Setting Up a Topic Exchange
Now, we’ll also set up a topic exchange with two queues, each with a different binding pattern:
@Bean
public Declarables topicBindings() {
    Queue topicQueue1 = new Queue(topicQueue1Name, false);
    Queue topicQueue2 = new Queue(topicQueue2Name, false);
    TopicExchange topicExchange = new TopicExchange(topicExchangeName);

    return new Declarables(
      topicQueue1,
      topicQueue2,
      topicExchange,
      BindingBuilder
        .bind(topicQueue1)
        .to(topicExchange).with("*.important.*"),
      BindingBuilder
        .bind(topicQueue2)
        .to(topicExchange).with("#.error"));
}
A topic exchange allows us to bind queues to it with different key patterns. This is very flexible and allows us to bind multiple queues with the same pattern or even multiple patterns to the same queue.
When the message’s routing key matches the pattern, it will be placed in the queue. If a queue has multiple bindings which match the message’s routing key, only one copy of the message is placed on the queue.
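The single-copy rule can be illustrated with a small in-memory model. This is only a sketch of the behaviour described above, not how RabbitMQ or Spring AMQP implement it: the class SingleCopySketch and its methods are hypothetical names, and plain predicates stand in for real binding patterns.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical in-memory model of an exchange; not part of Spring AMQP.
class SingleCopySketch {

    // Queue name -> all bindings (key tests) attached to that queue.
    private final Map<String, List<Predicate<String>>> bindings = new LinkedHashMap<>();
    // Queue name -> messages delivered so far.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    void bind(String queueName, Predicate<String> keyTest) {
        bindings.computeIfAbsent(queueName, n -> new ArrayList<>()).add(keyTest);
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    void publish(String routingKey, String message) {
        bindings.forEach((queueName, tests) -> {
            // A queue gets the message once if at least one of its bindings matches.
            boolean anyMatch = tests.stream().anyMatch(t -> t.test(routingKey));
            if (anyMatch) {
                queues.get(queueName).add(message);
            }
        });
    }

    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, List.of());
    }

    public static void main(String[] args) {
        SingleCopySketch exchange = new SingleCopySketch();
        // Two bindings on the same queue, both matching the key used below.
        exchange.bind("topic.queue", key -> key.contains(".important."));
        exchange.bind("topic.queue", key -> key.endsWith(".error"));

        exchange.publish("user.important.error", "payload");
        System.out.println(exchange.messagesIn("topic.queue").size()); // 1
    }
}
```

Because the check is made per queue rather than per binding, adding more matching bindings to a queue never multiplies the messages it receives.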
Our binding patterns can use an asterisk (“*”) to match a word in a specific position or a pound sign (“#”) to match zero or more words.
So, our topicQueue1 will receive messages whose routing keys consist of exactly three words with “important” in the middle, for example “user.important.error” or “blog.important.notification”.
And our topicQueue2 will receive messages whose routing keys end in the word “error”; matching examples are “error”, “user.important.error” or “blog.post.save.error”.
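To make the wildcard rules concrete, here is a minimal plain-Java sketch of the matching logic described above. It is an illustration only and doesn't reflect how the broker implements matching; the class TopicPatternSketch and its matches method are hypothetical names introduced for this example.

```java
// Hypothetical helper that mimics topic-exchange pattern matching.
class TopicPatternSketch {

    // Returns true when the routing key satisfies the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.split("\\.");
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(patternWords, 0, keyWords, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it's a match only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // "#" may absorb zero or more words, so try every possible split.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word, but the key has none left.
            return false;
        }
        // "*" matches exactly one word; anything else must match literally.
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.important.*", "user.important.error")); // true
        System.out.println(matches("*.important.*", "important.error"));      // false
        System.out.println(matches("#.error", "error"));                      // true
        System.out.println(matches("#.error", "blog.post.save.error"));       // true
        System.out.println(matches("#.error", "user.important.warn"));        // false
    }
}
```

The key “user.important.error” satisfies both patterns, so a message sent with it would end up in both topic queues.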
4. Setting Up a Producer
We’ll use the convertAndSend method of the RabbitTemplate to send our sample messages:
String message = " payload is broadcast";

return args -> {
    rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", "fanout" + message);
    rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN,
      "topic important warn" + message);
    rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR,
      "topic important error" + message);
};
The RabbitTemplate provides many overloaded convertAndSend() methods. Here we use the variant that takes the exchange name, the routing key, and the message payload.
When we send a message to a fanout exchange, the routing key is ignored, and the message is passed to all bound queues.
When we send a message to the topic exchange, we need to pass a routing key. Based on this routing key the message will be delivered to specific queues.
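The fanout half of this behaviour is simple enough to model in a few lines of plain Java. The sketch below is only an in-memory illustration under our own assumptions; FanoutSketch and its methods are hypothetical names and have nothing to do with the Spring AMQP API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange; not part of Spring AMQP.
class FanoutSketch {

    // Queue name -> messages delivered so far.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // The routing key is accepted but never inspected: every bound queue gets a copy.
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, List.of());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("fanout.queue1");
        exchange.bind("fanout.queue2");

        exchange.publish("", "fanout payload is broadcast");
        exchange.publish("some.ignored.key", "second payload");

        System.out.println(exchange.messagesIn("fanout.queue1").size()); // 2
        System.out.println(exchange.messagesIn("fanout.queue2").size()); // 2
    }
}
```

This is why our producer can pass an empty string as the routing key when sending to the fanout exchange.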
5. Configuring Consumers
Finally, let’s set up four consumers – one for each queue – to pick up the messages produced:
@RabbitListener(queues = {FANOUT_QUEUE_1_NAME})
public void receiveMessageFromFanout1(String message) {
    System.out.println("Received fanout 1 message: " + message);
}

@RabbitListener(queues = {FANOUT_QUEUE_2_NAME})
public void receiveMessageFromFanout2(String message) {
    System.out.println("Received fanout 2 message: " + message);
}

@RabbitListener(queues = {TOPIC_QUEUE_1_NAME})
public void receiveMessageFromTopic1(String message) {
    System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message);
}

@RabbitListener(queues = {TOPIC_QUEUE_2_NAME})
public void receiveMessageFromTopic2(String message) {
    System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message);
}
We configure consumers using the @RabbitListener annotation. The only argument we pass here is the queue name; the consumers aren’t aware of exchanges or routing keys.
6. Running the Example
Our sample project is a Spring Boot application, so on startup it connects to RabbitMQ and sets up all queues, exchanges, and bindings.
By default, our application expects a RabbitMQ instance running on localhost on port 5672. We can modify this and other defaults in application.yaml.
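As a rough sketch, the connection settings might look like this in application.yaml. The property names are Spring Boot’s standard spring.rabbitmq.* keys; the values simply restate the usual defaults and are assumptions about a local setup, not a copy of the sample project’s file.

```yaml
# Assumed local setup; adjust to match the actual broker.
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
```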
Our project exposes an HTTP endpoint at the URI /broadcast that accepts POST requests with a message in the request body.
When we send a request to this URI with body “Test” we should see something similar to this in the output:
Received fanout 1 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important warn payload is broadcast
Received topic 2 (#.error) message: topic important error payload is broadcast
Received fanout 2 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important error payload is broadcast
The order in which we will see these messages is, of course, not guaranteed.
7. Conclusion
In this quick tutorial, we covered fanout and topic exchanges with Spring AMQP and RabbitMQ.
The complete source code and all code snippets for this tutorial are available on the GitHub repository.