1. Introduction
Apache Pulsar is a distributed, open source publish/subscribe messaging system originally developed at Yahoo.
It was created to power Yahoo’s critical applications such as Yahoo Mail, Yahoo Finance and Yahoo Sports. Yahoo open sourced it in 2016, and it was later donated to the Apache Software Foundation.
2. Architecture
Pulsar is a multi-tenant, high-performance solution for server-to-server messaging. It’s composed of a set of brokers and bookies, along with an inbuilt Apache ZooKeeper for configuration and management. The bookies come from Apache BookKeeper and store the messages until they are consumed.
In a cluster we’ll have:
- Multiple brokers to handle incoming messages from producers and dispatch them to consumers
- Apache BookKeeper to support message persistence
- Apache ZooKeeper to store the cluster configuration
The architecture diagram in the Pulsar documentation shows how these components fit together.
3. Key Features
Let’s start with a quick look at some of the key features:
- Inbuilt support for multiple clusters
- Support for Geo-replication of messages across multiple clusters
- Multiple subscription modes
- Scalable to millions of topics
- Uses Apache BookKeeper to guarantee message delivery
- Low latency
Now, let’s discuss some of the key features in detail.
3.1. Messaging Model
Pulsar provides a flexible messaging model. Messaging architectures generally follow one of two models: queuing or publish/subscribe. Publish/subscribe is a broadcast model in which each message is sent to all consumers. Queuing, on the other hand, is point-to-point communication in which each message is delivered to a single consumer.
Pulsar combines both concepts in one generalized API. A publisher publishes messages to a topic, and each message is then broadcast to all of that topic’s subscriptions.
Consumers subscribe to receive messages. Pulsar lets consumers choose how messages are consumed within the same subscription: exclusive, shared or failover. We’ll discuss these subscription types in more detail in later sections.
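To make the combined model more concrete, here is a minimal in-memory sketch in plain Java. It does not use the Pulsar client API; the class names (SubscriptionModelSketch, Topic, Subscription) and the subscription and consumer names are hypothetical. It assumes a shared subscription hands out messages round-robin, and it only illustrates two ideas: every subscription sees every message, while consumers inside one shared subscription split the messages between them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a topic with several subscriptions; not the Pulsar API. */
class SubscriptionModelSketch {

    /** A shared-style subscription: attached consumers take turns receiving messages. */
    static class Subscription {
        final Map<String, List<String>> received = new LinkedHashMap<>();
        private final List<String> consumers = new ArrayList<>();
        private int next = 0; // round-robin cursor

        void attach(String consumer) {
            consumers.add(consumer);
            received.put(consumer, new ArrayList<>());
        }

        // Each message goes to exactly one consumer of this subscription.
        void dispatch(String message) {
            String target = consumers.get(next % consumers.size());
            received.get(target).add(message);
            next++;
        }
    }

    /** A topic that forwards every published message to all of its subscriptions. */
    static class Topic {
        final Map<String, Subscription> subscriptions = new LinkedHashMap<>();

        Subscription subscribe(String subscriptionName, String consumer) {
            Subscription s = subscriptions.computeIfAbsent(subscriptionName, n -> new Subscription());
            s.attach(consumer);
            return s;
        }

        // Publish/subscribe behaviour: every subscription sees every message.
        void publish(String message) {
            subscriptions.values().forEach(s -> s.dispatch(message));
        }
    }

    /** Builds a small scenario and returns what each consumer received. */
    static Map<String, List<String>> demo() {
        Topic topic = new Topic();
        topic.subscribe("audit", "audit-1");    // single consumer: sees everything
        topic.subscribe("workers", "worker-1"); // two consumers: share the load
        topic.subscribe("workers", "worker-2");
        for (int i = 1; i <= 4; i++) {
            topic.publish("m" + i);
        }
        Map<String, List<String>> result = new LinkedHashMap<>();
        topic.subscriptions.values().forEach(s -> result.putAll(s.received));
        return result;
    }

    public static void main(String[] args) {
        // prints {audit-1=[m1, m2, m3, m4], worker-1=[m1, m3], worker-2=[m2, m4]}
        System.out.println(demo());
    }
}
```

The "audit" subscription behaves like publish/subscribe, while the "workers" subscription behaves like a queue, which is the combination described above.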
3.2. Deployment Modes
Pulsar has inbuilt support for deployment in different environments. This means we can run it on standard on-premise machines, in a Kubernetes cluster, or on Google Cloud or AWS.
It can be executed as a single node for development and testing purposes. In this case, all the components (broker, BookKeeper, and ZooKeeper) run in a single process.
3.3. Geo-Replication
Pulsar provides out-of-the-box support for geo-replication of data. We can enable replication of messages between multiple clusters by configuring different geographical regions.
Message data is replicated in near real time. If the network between clusters fails, the data remains safely stored in BookKeeper, and the replication system keeps retrying until replication succeeds.
The geo-replication feature also allows an organization to deploy Pulsar across different cloud providers and replicate the data between them. This helps it avoid depending on proprietary cloud provider APIs.
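The "store locally, then retry until the remote cluster has the data" behaviour can be sketched with a simple backlog. This is a plain Java model under our own assumptions, not Pulsar’s replicator: the names ReplicationRetrySketch, LocalCluster, RemoteCluster and the reachable flag are all hypothetical and exist only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of replication with a retry backlog; not Pulsar's implementation. */
class ReplicationRetrySketch {

    /** A remote cluster whose network link can be switched on and off. */
    static class RemoteCluster {
        boolean reachable = true;
        final List<String> stored = new ArrayList<>();

        // Returns false when the link is down, so the sender knows to retry later.
        boolean receive(String message) {
            if (!reachable) {
                return false;
            }
            stored.add(message);
            return true;
        }
    }

    /** The local cluster persists first and replicates afterwards. */
    static class LocalCluster {
        final List<String> stored = new ArrayList<>();
        final Deque<String> backlog = new ArrayDeque<>();

        void publish(String message) {
            stored.add(message);      // safe locally, whatever happens to the link
            backlog.addLast(message); // still pending for the remote cluster
        }

        // Sends pending messages in order and stops at the first failure,
        // so nothing is skipped or dropped. Returns how many were replicated.
        int replicateTo(RemoteCluster remote) {
            int sent = 0;
            while (!backlog.isEmpty() && remote.receive(backlog.peekFirst())) {
                backlog.removeFirst();
                sent++;
            }
            return sent;
        }
    }

    public static void main(String[] args) {
        LocalCluster local = new LocalCluster();
        RemoteCluster remote = new RemoteCluster();

        remote.reachable = false;           // simulate a network failure
        local.publish("m1");
        local.publish("m2");
        System.out.println(local.replicateTo(remote)); // prints 0

        remote.reachable = true;            // link restored, the retry succeeds
        System.out.println(local.replicateTo(remote)); // prints 2
        System.out.println(remote.stored);             // prints [m1, m2]
    }
}
```

In this model a failed attempt leaves the backlog untouched, so calling replicateTo again after the link recovers delivers the same messages in the same order.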
3.4. Permanence
Once Pulsar has received and acknowledged a message, it guarantees that the message will not be lost. Data durability depends on the number of disks configured to store the data.
Pulsar ensures durability by using bookies (Apache BookKeeper instances) running on storage nodes. Whenever a bookie receives a message, it saves a copy in memory and also writes the data to a WAL (Write Ahead Log). This log works in the same way as a database WAL. Bookies operate on database transaction principles and ensure that data is not lost even in case of machine failure.
Apart from the above, Pulsar can also withstand multiple node failures. It replicates data to multiple bookies and only then sends an acknowledgment to the producer. This mechanism guarantees zero data loss even in case of multiple hardware failures.
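The "replicate first, acknowledge second" rule can be illustrated with a small plain Java model. This is a simplified sketch and not the BookKeeper protocol: the class QuorumWriteSketch, its Bookie type and the ackQuorum parameter are hypothetical names, and a bookie’s journal is reduced to an in-memory list standing in for the WAL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of quorum-based acknowledgment; not the BookKeeper protocol. */
class QuorumWriteSketch {

    /** A storage node that is either up or down. */
    static class Bookie {
        final String name;
        final boolean up;
        final List<String> journal = new ArrayList<>(); // stands in for the WAL

        Bookie(String name, boolean up) {
            this.name = name;
            this.up = up;
        }

        // Returns true only if the entry was appended to the journal.
        boolean write(String entry) {
            if (!up) {
                return false;
            }
            journal.add(entry);
            return true;
        }
    }

    /**
     * Writes the entry to every bookie and reports whether at least
     * ackQuorum of them stored it. Only a true result would be
     * acknowledged to the producer.
     */
    static boolean replicate(String entry, List<Bookie> bookies, int ackQuorum) {
        int stored = 0;
        for (Bookie bookie : bookies) {
            if (bookie.write(entry)) {
                stored++;
            }
        }
        return stored >= ackQuorum;
    }

    public static void main(String[] args) {
        List<Bookie> oneDown = Arrays.asList(
            new Bookie("bookie-1", true),
            new Bookie("bookie-2", false),
            new Bookie("bookie-3", true));
        // Two of three bookies stored the entry, so the write is acknowledged.
        System.out.println(replicate("entry-1", oneDown, 2)); // prints true

        List<Bookie> twoDown = Arrays.asList(
            new Bookie("bookie-1", true),
            new Bookie("bookie-2", false),
            new Bookie("bookie-3", false));
        // Only one copy exists, so the producer is not acknowledged.
        System.out.println(replicate("entry-2", twoDown, 2)); // prints false
    }
}
```

Because the acknowledgment is only sent once several copies exist, losing a single bookie after the acknowledgment does not lose the message in this model.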
4. Single Node Setup
Now let’s see how to set up a single node cluster of Apache Pulsar.
Pulsar also provides a simple client API with bindings for Java, Python, and C++. Later, we’ll create a simple Java producer and consumer example.
4.1. Installation
Apache Pulsar is available as a binary distribution. Let’s start by downloading it:
wget https://archive.apache.org/dist/incubator/pulsar/pulsar-2.1.1-incubating/apache-pulsar-2.1.1-incubating-bin.tar.gz
When the download is complete, we can extract the tar.gz archive. The extracted distribution contains the bin, conf, example, licenses and lib folders.
After that, we need to download the inbuilt connectors. These now ship as a separate package:
wget https://archive.apache.org/dist/incubator/pulsar/pulsar-2.1.1-incubating/apache-pulsar-io-connectors-2.1.1-incubating-bin.tar.gz
Let’s extract the connectors archive and copy the connectors folder into the Pulsar folder.
4.2. Starting an Instance
To start a standalone instance we can execute:
bin/pulsar standalone
5. Java Client
Now we’ll create a Java project to produce and consume messages. We’ll also create examples for different subscription types.
5.1. Setting up the Project
We’ll start by adding the pulsar-client dependency to our project:
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.1.1-incubating</version>
</dependency>
5.2. Producer
Let’s continue by creating a Producer example. Here, we’ll create a topic and a producer.
First, we need to create a PulsarClient which will connect to a Pulsar service on a specific host and port, using its own protocol. Many producers and consumers can share a single client object.
Now, we’ll create a Producer with the specific topic name:
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "test-topic";

PulsarClient client = PulsarClient.builder()
  .serviceUrl(SERVICE_URL)
  .build();

Producer<byte[]> producer = client.newProducer()
  .topic(TOPIC_NAME)
  .compressionType(CompressionType.LZ4)
  .create();
The producer will send 5 messages:
for (int i = 1; i <= 5; i++) {
    String content = String.format("hi-pulsar-%d", i);
    Message<byte[]> msg = MessageBuilder.create()
      .setContent(content.getBytes())
      .build();
    // send() blocks until the broker acknowledges the message and
    // throws the checked PulsarClientException on failure
    MessageId msgId = producer.send(msg);
}
5.3. Consumer
Next, we’ll create the consumer to get the messages created by the producer. The consumer also requires the same PulsarClient to connect with our server:
// example value; any subscription name works
private static final String SUBSCRIPTION_NAME = "test-subscription";

Consumer<byte[]> consumer = client.newConsumer()
  .topic(TOPIC_NAME)
  .subscriptionType(SubscriptionType.Shared)
  .subscriptionName(SUBSCRIPTION_NAME)
  .subscribe();
Here we’ve created the consumer with a Shared subscription type. This allows multiple consumers to attach to the same subscription and receive messages.
5.4. Subscription Types for Consumer
In the above example of the consumer, we have created a subscription with shared type. We can also create exclusive and failover subscriptions.
The exclusive subscription allows only one consumer to be subscribed.
On the other hand, a failover subscription allows the user to define a fallback consumer that takes over in case the active consumer fails. The Apache Pulsar documentation illustrates this with a diagram.
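The difference between the exclusive and failover types can also be sketched in plain Java. This is a toy model rather than the Pulsar client API: the class names are hypothetical, and we assume the fallback order is simply the order in which consumers attached, whereas the real broker’s selection rules are more detailed.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of exclusive and failover subscriptions; not the Pulsar API. */
class FailoverSketch {

    /** Accepts a single consumer and rejects any further one. */
    static class ExclusiveSubscription {
        private String consumer;

        // Returns false if another consumer is already attached.
        boolean attach(String name) {
            if (consumer != null) {
                return false;
            }
            consumer = name;
            return true;
        }
    }

    /** Accepts several consumers, but only one of them is active at a time. */
    static class FailoverSubscription {
        private final List<String> consumers = new ArrayList<>();

        void attach(String name) {
            consumers.add(name);
        }

        // Assumption: the earliest attached consumer that is still connected is active.
        String active() {
            return consumers.isEmpty() ? null : consumers.get(0);
        }

        // When a consumer disconnects, the next one in line becomes active.
        void disconnect(String name) {
            consumers.remove(name);
        }
    }

    public static void main(String[] args) {
        ExclusiveSubscription exclusive = new ExclusiveSubscription();
        System.out.println(exclusive.attach("consumer-1")); // prints true
        System.out.println(exclusive.attach("consumer-2")); // prints false

        FailoverSubscription failover = new FailoverSubscription();
        failover.attach("consumer-1");
        failover.attach("consumer-2");
        System.out.println(failover.active()); // prints consumer-1
        failover.disconnect("consumer-1");
        System.out.println(failover.active()); // prints consumer-2
    }
}
```

With the real client, the subscription type is chosen by passing SubscriptionType.Exclusive or SubscriptionType.Failover to subscriptionType() instead of SubscriptionType.Shared.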
6. Conclusion
In this article, we’ve highlighted the features of the Pulsar messaging system such as the messaging model, geo-replication and strong durability guarantees.
We also learned how to set up a single node and how to use the Java client.
As always, the full implementation of this tutorial can be found over on GitHub.