1. Overview
In this tutorial, we’ll use the Java Client for NATS to connect to a NATS Server so that we can publish and receive messages.
NATS offers three primary modes of message exchange:
- Publish/Subscribe semantics deliver messages to all subscribers of a subject.
- Request/Reply messaging sends requests to all subscribers of a subject and routes responses back to the requestor. Only the first response to reach the requestor is processed. We can easily build a Request/Reply-like system in which all responses are handled; built-in support for this may be added in future versions.
- Subscribers can also join message queue groups when they subscribe to a subject. Messages sent to the associated subject are only delivered to one subscriber in the queue group. This semantic can be used in either Publish/Subscribe or Request/Reply.
2. Setup
This section discusses the basic setup required to run the example. There are regular updates, so check the client GitHub page for the latest version.
2.1. Maven Dependency
First, we need to add the NATS library to our pom.xml file:
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.17.6</version>
</dependency>
There are more detailed instructions in the Maven section of the Java Client read-me. Gradle users can find instructions in the Gradle section of the read-me.
2.2. NATS Server
Second, we’ll need a NATS Server for exchanging messages. There are instructions for all major platforms on the NATS Docs site.
We’ll assume there’s a server running on localhost:4222.
3. Connect to a NATS Server
To exchange messages, we first need to connect to the server. Once the client is connected, messages can be exchanged by publishing and subscribing to subjects.
3.1. Connecting With or Without Custom Options
The static Nats.connect() method of the Nats class creates a Connection.
To create a Connection, we can provide information such as the host and port of the server. There are also other configurable connection options such as username and password, how to handle a secure connection, and many other options that influence how the connection behaves. If we don’t provide an Options object to the Nats.connect() method, it will use all defaults and try to connect to a server on localhost, port 4222:
Connection natsConnection = Nats.connect();
To demonstrate creating a connection with custom options, let’s write an example that builds a custom Options object with:
- a custom ConnectionListener that receives connection events and prints them out
- a custom ErrorListener that receives error events and prints them out
This function creates an Options object for a specific URI and passes it to Nats.connect. If the URI is null or empty, the Options will assume the default server location:
Options options = new Options.Builder()
.server(uri)
.connectionListener((connection, event) -> log.info("Connection Event: " + event))
.errorListener(new CustomErrorListener())
.build();
The next thing to do would be to connect using that Options instance:
Connection natsConnection = Nats.connect(options);
NATS connections are durable. The API will automatically attempt to reconnect a lost connection.
As part of the connection options, we’ve installed a ConnectionListener implementation lambda to notify us when a connection event like disconnect or reconnect occurs. We’ve also installed an ErrorListener. The ErrorListener interface contains multiple methods, so it cannot be created with a lambda. The example code has an implementation called CustomErrorListener.
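Since CustomErrorListener isn’t shown in this article, here is a minimal sketch of what it and the createConnection helper used in the quick test might look like. The class name NatsConnectionFactory, the use of System.out instead of a logger, and the exact message prefixes are assumptions made for illustration; the real example code may differ:

```java
import io.nats.client.Connection;
import io.nats.client.Consumer;
import io.nats.client.ErrorListener;
import io.nats.client.Nats;
import io.nats.client.Options;

import java.io.IOException;

// Hypothetical stand-in for the CustomErrorListener mentioned above.
// It prints to System.out instead of using the article's logger.
class CustomErrorListener implements ErrorListener {

    @Override
    public void errorOccurred(Connection conn, String error) {
        // Errors that arrive as plain text
        System.out.println("Error Occurred: " + error);
    }

    @Override
    public void exceptionOccurred(Connection conn, Exception exp) {
        // Exceptions raised inside the client, such as I/O failures
        System.out.println("Exception Occurred: " + exp);
    }

    @Override
    public void slowConsumerDetected(Connection conn, Consumer consumer) {
        // A subscription or dispatcher is not keeping up with incoming messages
        System.out.println("Slow Consumer Detected");
    }
}

// Assumed shape of a createConnection helper; the name is illustrative.
class NatsConnectionFactory {

    static Connection createConnection(String uri) throws IOException, InterruptedException {
        Options.Builder builder = new Options.Builder()
          .connectionListener((connection, event) -> System.out.println("Connection Event: " + event))
          .errorListener(new CustomErrorListener());

        // Only set the server when a URI was supplied; otherwise the defaults apply.
        if (uri != null && !uri.isEmpty()) {
            builder.server(uri);
        }
        return Nats.connect(builder.build());
    }
}
```

The explicit null check is a defensive choice in this sketch, so that the helper never passes a missing URI to the builder.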
Let’s run a quick test. First, we call a createConnection helper, which initializes a connection using the options described above. Then, we add a sleep for 60 seconds to keep the process running:
Connection conn = createConnection("nats://localhost:4222");
Thread.sleep(60000);
When we run this, the connection listener will report the connection opened:
Connection Event: nats: connection opened
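Then, let’s stop and restart our NATS server. The listeners report the lost connection, the failed reconnect attempts, and finally the reconnection: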
Exception Occurred: java.io.IOException: Read channel closed.
Connection Event: nats: connection disconnected
Exception Occurred: java.net.ConnectException: Connection refused: no further information
Connection Event: nats: connection disconnected
Exception Occurred: java.net.ConnectException: Connection refused: no further information
Connection Event: nats: connection disconnected
Connection Event: nats: connection reconnected
Connection Event: nats: subscriptions re-established
We can see the ConnectionListener and the CustomErrorListener receive and handle events such as exceptions, disconnection, and re-connection.
4. Exchange Messages
Now that we have a Connection, we can work on message processing. Exchanging messages requires publishing and subscribing on a subject.
When we publish a message to a subject, programs that have subscribed to the subject will receive a copy of the message. The subscribers must be ready before the message is published.
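A NATS Message is a container for an array of bytes. In addition to the expected byte[] getData() method, there are getSubject() and getReplyTo() methods for reading the message destination subject and reply-to subject. When we need to create a Message ourselves, NatsMessage.builder() lets us set the subject, the reply-to subject, and the data.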
4.1. Subscribe to Messages
We subscribe to subjects, which are Strings.
NATS supports both synchronous and asynchronous subscriptions.
4.2. Asynchronous Subscriptions
Asynchronous subscriptions require a Dispatcher. The Dispatcher contains the thread that gives incoming messages to the asynchronous handler.
Since a Connection can have multiple subscriptions, we can decide whether to have a Dispatcher for each subscription or share a Dispatcher among multiple subscriptions. Each Dispatcher has a run loop executing on its own thread. The run loop calls the message handler’s onMessage(Message) method and waits for the handler to return.
If we have multiple subscriptions with a high load or our message handler takes a long time to process each message, it’s probably better to have a Dispatcher for each subscription. If there are occasional messages or very short processing time, it’s fine to have them share a Dispatcher. This is typically tuned for each application based on the use case.
In this example, let’s create a Dispatcher and use it for multiple subscriptions. Creating the Dispatcher is identical regardless of whether it’s used for multiple subscriptions or not:
Dispatcher dispatcher = natsConnection.createDispatcher();
Subscription subscription = dispatcher.subscribe(subject, msg -> log.info("Subscription received message " + msg));
Subscription qSubscription = dispatcher.subscribe(subject, queueGroup, msg -> log.info("Queue subscription received message " + msg));
When we’re done with a subscription, it’s best to unsubscribe. If we don’t unsubscribe, the server will continue to send messages for the subject until the connection is closed. To unsubscribe from a subscription that was made with a Dispatcher, we call unsubscribe(…):
dispatcher.unsubscribe(subscription);
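For the opposite choice discussed earlier in this section, one Dispatcher per subscription, a rough sketch could look like the following. The subject names orders and audit are made up for illustration, and the code assumes a server is running on localhost:4222:

```java
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;

import java.nio.charset.StandardCharsets;
import java.time.Duration;

class DispatcherPerSubscriptionExample {

    public static void main(String[] args) throws Exception {
        try (Connection natsConnection = Nats.connect()) {
            // Each Dispatcher runs its handler on its own thread, so a slow
            // handler for one subject does not hold up the other subject.
            Dispatcher ordersDispatcher = natsConnection.createDispatcher();
            Dispatcher auditDispatcher = natsConnection.createDispatcher();

            ordersDispatcher.subscribe("orders",
              msg -> System.out.println("orders: " + new String(msg.getData(), StandardCharsets.UTF_8)));
            auditDispatcher.subscribe("audit",
              msg -> System.out.println("audit: " + new String(msg.getData(), StandardCharsets.UTF_8)));

            natsConnection.publish("orders", "order-1".getBytes(StandardCharsets.UTF_8));
            natsConnection.publish("audit", "audit-1".getBytes(StandardCharsets.UTF_8));

            // Push the outgoing messages to the server, then give the handlers a moment to run.
            natsConnection.flush(Duration.ofSeconds(1));
            Thread.sleep(200);
        }
    }
}
```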
4.3. Synchronous Subscriptions
Synchronous subscriptions require that each subscriber poll for messages using the nextMessage method in its own thread. Here, we create a synchronous subscription. Notice a dispatcher is not required:
Subscription subscription = natsConnection.subscribe("mySubject");
Message message = subscription.nextMessage(1000);
The nextMessage(…) method call waits for the specified number of milliseconds or until it receives a message. If no message is available within the timeout, nextMessage returns null.
We’ll use synchronous subscriptions for our tests to keep the test cases simple. When we’re done with a synchronous subscription, we call unsubscribe() directly on the subscription, for example subscription.unsubscribe().
4.4. Publishing Messages
Publishing a Message is a “fire and forget” operation — we don’t know if anyone was subscribing to the Message. There are several different APIs we can use for publishing a message. The simplest method requires only a subject String and the message bytes:
natsConnection.publish("mySubject", "Hi there!".getBytes());
There are also overloads for a few other combinations of publish such as passing in a Message instead of bytes.
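As an illustration of the Message-based overload, the sketch below builds a message with NatsMessage.builder() and publishes it. The class and method names are hypothetical, and the main method assumes a server on localhost:4222:

```java
import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.impl.NatsMessage;

import java.nio.charset.StandardCharsets;
import java.time.Duration;

class PublishMessageExample {

    // Builds a Message that carries its own subject and payload.
    static Message buildGreeting() {
        return NatsMessage.builder()
          .subject("mySubject")
          .data("Hi there!".getBytes(StandardCharsets.UTF_8))
          .build();
    }

    public static void main(String[] args) throws Exception {
        try (Connection natsConnection = Nats.connect()) {
            // Equivalent to publish("mySubject", bytes), but the subject travels with the Message.
            natsConnection.publish(buildGreeting());
            natsConnection.flush(Duration.ofSeconds(1));
        }
    }
}
```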
4.5. Message Responses
There are two ways to get responses to a published Message. We can do a regular publish with a reply-to, or we can use the built-in request functionality. If we want to receive more than one response to any given subject, then we need to use the publish pattern. If we only need the first response to any given subject, then we can use the request pattern.
4.6. Publish to Get Many Responses
The reply-to field in a Message lets the publisher supply a subject to which responders publish their replies. We can supply the reply-to subject ourselves when we publish a Message, using the publish(subject, replyTo, data) overload. As the publisher, we then subscribe to that reply-to subject and wait for as many responses as we want.
The reply-side subscribers subscribe to the publish subject. They don’t have to hard-code the reply-to subject because it arrives with each message and is available through getReplyTo().
The publish-side subscription can get all replies. The order of replies will be in the same order that the server receives them:
Message m1 = publishSideSubscription.nextMessage(1000);
Message m2 = publishSideSubscription.nextMessage(1000);
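Putting the steps of this section together, the whole exchange might be sketched as follows. The subject names, the respond helper, and the single-threaded layout are assumptions made to keep the example short; in practice, the responders would usually run in separate processes. The code assumes a server on localhost:4222:

```java
import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.Subscription;

import java.nio.charset.StandardCharsets;

class PublishWithReplyToExample {

    public static void main(String[] args) throws Exception {
        try (Connection natsConnection = Nats.connect()) {
            // The publisher listens on the reply-to subject before sending anything.
            Subscription publishSideSubscription = natsConnection.subscribe("replyToSubject");

            // Two independent responders subscribe to the publish subject.
            Subscription replySideSubscription1 = natsConnection.subscribe("publishSubject");
            Subscription replySideSubscription2 = natsConnection.subscribe("publishSubject");

            natsConnection.publish("publishSubject", "replyToSubject",
              "Please respond!".getBytes(StandardCharsets.UTF_8));

            respond(natsConnection, replySideSubscription1, "Response from responder 1");
            respond(natsConnection, replySideSubscription2, "Response from responder 2");

            // Both replies arrive on the publisher's subscription.
            Message m1 = publishSideSubscription.nextMessage(1000);
            Message m2 = publishSideSubscription.nextMessage(1000);
            System.out.println(m1 == null ? "no first reply" : new String(m1.getData(), StandardCharsets.UTF_8));
            System.out.println(m2 == null ? "no second reply" : new String(m2.getData(), StandardCharsets.UTF_8));
        }
    }

    // Reads one request and answers on the reply-to subject carried by that request.
    static void respond(Connection connection, Subscription subscription, String text)
      throws InterruptedException {
        Message request = subscription.nextMessage(1000);
        if (request != null && request.getReplyTo() != null) {
            connection.publish(request.getReplyTo(), text.getBytes(StandardCharsets.UTF_8));
        }
    }
}
```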
4.7. Use Request (/Reply) to Get Only One Response
When we use the request method, we cannot provide the reply-to subject — the client does it for us. It waits for the very first response, meaning even if multiple subscribers reply, only the first response received is the one returned. All other responses get ignored.
Message response = natsConnection.request("requestSubject", "Please respond!".getBytes(), Duration.ofMillis(1000));
The reply-side subscriptions are identical, but the requestor (publisher) doesn’t need to set up a subscription; it just waits for the response. There are also overloads of the request method that return a CompletableFuture<Message> instead of blocking.
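A small sketch of the CompletableFuture-based variant, with a Dispatcher acting as the responder so the request has someone to answer it. The class name and the response text are illustrative, and a server on localhost:4222 is assumed:

```java
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Message;
import io.nats.client.Nats;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class AsyncRequestExample {

    public static void main(String[] args) throws Exception {
        try (Connection natsConnection = Nats.connect()) {
            // Responder: answers every request on the reply-to subject chosen by the client.
            Dispatcher responder = natsConnection.createDispatcher(msg ->
              natsConnection.publish(msg.getReplyTo(), "Here is a response".getBytes(StandardCharsets.UTF_8)));
            responder.subscribe("requestSubject");

            // The request returns immediately; the future completes with the first response.
            CompletableFuture<Message> future = natsConnection.request(
              "requestSubject", "Please respond!".getBytes(StandardCharsets.UTF_8));

            // get(...) throws a TimeoutException if no response arrives in time.
            Message response = future.get(1, TimeUnit.SECONDS);
            System.out.println(new String(response.getData(), StandardCharsets.UTF_8));
        }
    }
}
```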
4.8. Wildcard Subscriptions
NATS server supports subject wildcards.
Wildcards operate on subject tokens separated by the ‘.’ character. The asterisk character ‘*’ matches a single token. The greater-than symbol ‘>’ matches the remainder of a subject, which must be at least one token and may be several.
For example:
- foo.* matches foo.bar, but not foo.bar.requests
- foo.> matches foo.bar, foo.bar.requests, foo.bar.baeldung, and so on
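To make the token rules concrete, here is a small stand-alone model of the matching logic. It is only an illustration of the rules above, not the server’s actual implementation; the SubjectMatcher class is hypothetical and assumes well-formed subjects with ‘>’ appearing only as the last token:

```java
class SubjectMatcher {

    // Returns true when the subject matches the pattern under the token rules above.
    static boolean matches(String pattern, String subject) {
        String[] patternTokens = pattern.split("\\.");
        String[] subjectTokens = subject.split("\\.");

        for (int i = 0; i < patternTokens.length; i++) {
            if (patternTokens[i].equals(">")) {
                // '>' needs at least one remaining token to match
                return i < subjectTokens.length;
            }
            if (i >= subjectTokens.length) {
                return false; // the subject ran out of tokens
            }
            if (!patternTokens[i].equals("*") && !patternTokens[i].equals(subjectTokens[i])) {
                return false; // a literal token differs
            }
        }
        // Without '>', both must have the same number of tokens
        return patternTokens.length == subjectTokens.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("foo.*", "foo.bar"));          // true
        System.out.println(matches("foo.*", "foo.bar.requests")); // false
        System.out.println(matches("foo.>", "foo.bar.requests")); // true
        System.out.println(matches("foo.>", "foo"));              // false
    }
}
```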
Let’s try a few tests:
// '*' matches exactly one token after "segment"
Subscription segmentStarSubscription = natsConnection.subscribe("segment.*");
natsConnection.publish("segment.another", convertStringToBytes("hello segment star sub"));
Message message = segmentStarSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello segment star sub", convertMessageDataBytesToString(message));
// Two tokens after "segment" do not match "segment.*"
natsConnection.publish("segment.one.two", convertStringToBytes("hello there"));
message = segmentStarSubscription.nextMessage(200);
assertNull("Got message!", message);
// '>' matches one or more remaining tokens
Subscription segmentGreaterSubscription = natsConnection.subscribe("segment.>");
natsConnection.publish("segment.one.two", convertStringToBytes("hello segment greater sub"));
message = segmentGreaterSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello segment greater sub", new String(message.getData()));
5. Message Queues
Subscribers may specify queue groups at subscription time. When a message is published to the associated subject, NATS delivers it to exactly one subscriber in each queue group.
Queue groups do not persist messages. If no listeners are available, the message is discarded.
5.1. Publishing to Queues
Publishing messages to queue groups is no different than normal publishing — it simply requires publishing to the associated subject:
natsConnection.publish("mySubject", convertStringToBytes("data"));
5.2. Subscribing to Queues
Subscribers specify a queue group name as a String:
Subscription subscription = natsConnection.subscribe("mySubject", "myQueue");
There’s also an asynchronous version, of course:
List<Message> messages = new ArrayList<>();
Dispatcher dispatcher = natsConnection.createDispatcher();
Subscription subscription = dispatcher.subscribe("mySubject", "myQueue", messages::add);
The subscription creates the queue on the NATS server. The NATS server will route the message to the queue and select a message receiver:
Subscription queue1 = natsConnection.subscribe("mySubject", "myQueue");
Subscription queue2 = natsConnection.subscribe("mySubject", "myQueue");
Only one of the subscriptions, queue1 or queue2, will receive any specific message. If we subscribe to the same subject without a queue, as in:
Subscription subscription1 = natsConnection.subscribe("mySubject");
Subscription subscription2 = natsConnection.subscribe("mySubject");
Then both subscription1 and subscription2 will receive all messages. If all four subscriptions queue1, queue2, subscription1, and subscription2 are subscribed at the same time, queue1 and queue2 will still work as a queue, and subscription1 and subscription2 will still get all messages.
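One way to observe this behavior is to subscribe all four at once, publish a batch of messages, and count what each side receives. The sketch below assumes a server on localhost:4222; the drain helper and the batch size of 10 are illustrative choices. We’d expect the two queue subscriptions to share the batch between them, while each plain subscription sees every message:

```java
import io.nats.client.Connection;
import io.nats.client.Nats;
import io.nats.client.Subscription;

import java.nio.charset.StandardCharsets;

class QueueGroupExample {

    public static void main(String[] args) throws Exception {
        try (Connection natsConnection = Nats.connect()) {
            Subscription queue1 = natsConnection.subscribe("mySubject", "myQueue");
            Subscription queue2 = natsConnection.subscribe("mySubject", "myQueue");
            Subscription subscription1 = natsConnection.subscribe("mySubject");
            Subscription subscription2 = natsConnection.subscribe("mySubject");

            int published = 10;
            for (int i = 0; i < published; i++) {
                natsConnection.publish("mySubject", ("message " + i).getBytes(StandardCharsets.UTF_8));
            }

            // Each message goes to only one member of the queue group,
            // so the two queue counts together cover the batch.
            int queueTotal = drain(queue1) + drain(queue2);
            System.out.println("queue1 + queue2 received: " + queueTotal);
            System.out.println("subscription1 received: " + drain(subscription1));
            System.out.println("subscription2 received: " + drain(subscription2));
        }
    }

    // Counts messages until none arrives within 200 ms.
    static int drain(Subscription subscription) throws InterruptedException {
        int count = 0;
        while (subscription.nextMessage(200) != null) {
            count++;
        }
        return count;
    }
}
```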
6. Conclusion
In this brief introduction, we connected to a NATS server and sent both pub/sub messages and load-balanced queue messages. We looked at NATS support for wildcard subscriptions. We also used request/reply messaging.
Code samples, as always, can be found over on GitHub.