1. Introduction

Apache Curator is a Java client for Apache Zookeeper, the popular coordination service for distributed applications.

In this tutorial, we’ll introduce some of the most relevant features provided by Curator:

  • Connection Management – managing connections and retry policies
  • Async – enhancing existing client by adding async capabilities and the use of Java 8 lambdas
  • Configuration Management – having a centralized configuration for the system
  • Strongly-Typed Models – working with typed models
  • Recipes – implementing leader election, distributed locks or counters

2. Prerequisites

To start with, it’s recommended to take a quick look at the Apache Zookeeper and its features.

For this tutorial, we assume that there’s already a standalone Zookeeper instance running on 127.0.0.1:2181; here are instructions on how to install and run it, if you’re just getting started.

First, we’ll need to add the curator-x-async dependency to our pom.xml:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-async</artifactId>
    <version>4.0.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

The latest version of Apache Curator 4.X.X has a hard dependency with Zookeeper 3.5.X which is still in beta right now.

And so, in this article, we’re going to use the currently latest stable Zookeeper 3.4.11 instead.

So we need to exclude the Zookeeper dependency and add the dependency for our Zookeeper version to our pom.xml:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.11</version>
</dependency>

For more information about compatibility, please refer to this link.

3. Connection Management

The basic use case of Apache Curator is connecting to a running Apache Zookeeper instance.

The tool provides a factory to build connections to Zookeeper using retry policies:

int sleepMsBetweenRetries = 100;
int maxRetries = 3;
RetryPolicy retryPolicy = new RetryNTimes(
  maxRetries, sleepMsBetweenRetries);

CuratorFramework client = CuratorFrameworkFactory
  .newClient("127.0.0.1:2181", retryPolicy);
client.start();
 
assertThat(client.checkExists().forPath("/")).isNotNull();

In this quick example, we’ll retry 3 times and will wait 100 ms between retries in case of connectivity issues.

Once connected to Zookeeper using the CuratorFramework client, we can now browse paths, get/set data and essentially interact with the server.

4. Async

The Curator Async module wraps the above CuratorFramework client to provide non-blocking capabilities using the CompletionStage Java 8 API.

Let’s see how the previous example looks like using the Async wrapper:

int sleepMsBetweenRetries = 100;
int maxRetries = 3;
RetryPolicy retryPolicy 
  = new RetryNTimes(maxRetries, sleepMsBetweenRetries);

CuratorFramework client = CuratorFrameworkFactory
  .newClient("127.0.0.1:2181", retryPolicy);

client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);

AtomicBoolean exists = new AtomicBoolean(false);

async.checkExists()
  .forPath("/")
  .thenAcceptAsync(s -> exists.set(s != null));

await().until(() -> assertThat(exists.get()).isTrue());

Now, the checkExists() operation works in asynchronous mode, not blocking the main thread. We can also chain actions one after each other using the thenAcceptAsync() method instead, which uses the CompletionStage API.

5. Configuration Management

In a distributed environment, one of the most common challenges is to manage shared configuration among many applications. We can use Zookeeper as a data store where to keep our configuration.

Let’s see an example using Apache Curator to get and set data:

CuratorFramework client = newClient();
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
String key = getKey();
String expected = "my_value";

client.create().forPath(key);

async.setData()
  .forPath(key, expected.getBytes());

AtomicBoolean isEquals = new AtomicBoolean();
async.getData()
  .forPath(key)
  .thenAccept(data -> isEquals.set(new String(data).equals(expected)));

await().until(() -> assertThat(isEquals.get()).isTrue());

In this example, we create the node path, set the data in Zookeeper, and then we recover it checking that the value is the same. The key field could be a node path like /config/dev/my_key.

5.1. Watchers

Another interesting feature in Zookeeper is the ability to watch keys or nodes. It allows us to listen to changes in the configuration and update our applications without needing to redeploy.

Let’s see how the above example looks like when using watchers:

CuratorFramework client = newClient()
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
String key = getKey();
String expected = "my_value";

async.create().forPath(key);

List<String> changes = new ArrayList<>();

async.watched()
  .getData()
  .forPath(key)
  .event()
  .thenAccept(watchedEvent -> {
    try {
        changes.add(new String(client.getData()
          .forPath(watchedEvent.getPath())));
    } catch (Exception e) {
        // fail ...
    }});

// Set data value for our key
async.setData()
  .forPath(key, expected.getBytes());

await()
  .until(() -> assertThat(changes.size()).isEqualTo(1));

We configure the watcher, set the data, and then confirm the watched event was triggered. We can watch one node or a set of nodes at once.

6. Strongly Typed Models

Zookeeper primarily works with byte arrays, so we need to serialize and deserialize our data. This allows us some flexibility to work with any serializable instance, but it can be hard to maintain.

To help here, Curator adds the concept of typed models which delegates the serialization/deserialization and allows us to work with our types directly. Let’s see how that works.

First, we need a serializer framework. Curator recommends using the Jackson implementation, so let’s add the Jackson dependency to our pom.xml:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.13.0</version>
</dependency>

Now, let’s try to persist our custom class HostConfig:

public class HostConfig {
    private String hostname;
    private int port;

    // getters and setters
}

We need to provide the model specification mapping from the HostConfig class to a path, and use the modeled framework wrapper provided by Apache Curator:

ModelSpec<HostConfig> mySpec = ModelSpec.builder(
  ZPath.parseWithIds("/config/dev"), 
  JacksonModelSerializer.build(HostConfig.class))
  .build();

CuratorFramework client = newClient();
client.start();

AsyncCuratorFramework async 
  = AsyncCuratorFramework.wrap(client);
ModeledFramework<HostConfig> modeledClient 
  = ModeledFramework.wrap(async, mySpec);

modeledClient.set(new HostConfig("host-name", 8080));

modeledClient.read()
  .whenComplete((value, e) -> {
     if (e != null) {
          fail("Cannot read host config", e);
     } else {
          assertThat(value).isNotNull();
          assertThat(value.getHostname()).isEqualTo("host-name");
          assertThat(value.getPort()).isEqualTo(8080);
     }
   });

The whenComplete() method when reading the path /config/dev will return the HostConfig instance in Zookeeper.

7. Recipes

Zookeeper provides this guideline to implement high-level solutions or recipes such as leader election, distributed locks or shared counters.

Apache Curator provides an implementation for most of these recipes. To see the full list, visit the Curator Recipes documentation.

All of these recipes are available in a separate module:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.1</version>
</dependency>

Let’s jump right in and start understanding these with some simple examples.

7.1. Leader Election

In a distributed environment, we may need one master or leader node to coordinate a complex job.

This is how the usage of the Leader Election recipe in Curator looks like:

CuratorFramework client = newClient();
client.start();
LeaderSelector leaderSelector = new LeaderSelector(client, 
  "/mutex/select/leader/for/job/A", 
  new LeaderSelectorListener() {
      @Override
      public void stateChanged(
        CuratorFramework client, 
        ConnectionState newState) {
      }

      @Override
      public void takeLeadership(
        CuratorFramework client) throws Exception {
      }
  });

// join the members group
leaderSelector.start();

// wait until the job A is done among all members
leaderSelector.close();

When we start the leader selector, our node joins a members group within the path /mutex/select/leader/for/job/A. Once our node becomes the leader, the takeLeadership method will be invoked, and we as leaders can resume the job.

7.2. Shared Locks

The Shared Lock recipe is about having a fully distributed lock:

CuratorFramework client = newClient();
client.start();
InterProcessSemaphoreMutex sharedLock = new InterProcessSemaphoreMutex(
  client, "/mutex/process/A");

sharedLock.acquire();

// do process A

sharedLock.release();

When we acquire the lock, Zookeeper ensures that there’s no other application acquiring the same lock at the same time.

7.3. Counters

The Counters recipe coordinates a shared Integer among all the clients:

CuratorFramework client = newClient();
client.start();

SharedCount counter = new SharedCount(client, "/counters/A", 0);
counter.start();

counter.setCount(counter.getCount() + 1);

assertThat(counter.getCount()).isEqualTo(1);

In this example, Zookeeper stores the Integer value in the path /counters/A and initializes the value to 0 if the path has not been created yet.

8. Conclusion

In this article, we’ve seen how to use Apache Curator to connect to Apache Zookeeper and take benefit of its main features.

We’ve also introduced a few of the main recipes in Curator.

As usual, sources can be found over on GitHub.