1. Overview

In Kafka, consumers read messages from partitions. While reading messages, there are some concerns to consider, such as determining which message to read next from a partition, or preventing duplicate reads and message loss in case of failure. The solution to these concerns is the use of offsets.

In this tutorial, we’ll learn about offsets in Kafka. We’ll see how to commit offsets to manage message consumption and discuss its methods and drawbacks.

2. What Is an Offset?

We know that Kafka stores messages in topics, and each topic can have multiple partitions. Each consumer reads messages from the partitions of a topic assigned to it, and Kafka uses offsets to keep track of the messages that consumers have already read. Offsets are integers starting from zero that increment by one for each message stored in a partition.

Let’s say a consumer has read five messages from a partition. Then, based on the configuration, Kafka marks the offsets up to 4 as committed (offsets are zero-based). The next time the consumer attempts to read messages, it consumes the messages from offset 5 onwards.

Without offsets, there’s no way to avoid duplicate processing or data loss. That’s why they’re so crucial.

We can make an analogy with database storage. In a database, we commit after executing SQL statements to persist the changes. In the same way, after reading from the partition, we commit offsets to mark the position of the processed message.

3. Ways to Commit Offsets

There are four ways to commit offsets. We’ll look at each in detail and discuss their use cases, advantages, and disadvantages.

Let’s start by adding the Kafka Client API dependency in the pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version>
</dependency>

3.1. Auto Commit

This is the simplest way to commit offsets. Kafka, by default, uses auto-commit: every five seconds, it commits the largest offset returned by the poll() method. In the code below, poll() returns a batch of messages, waiting up to 10 seconds for data to arrive:

KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<Long, String> message : messages) {
  // process the message
}
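
The snippet above assumes a props object that isn’t shown. Here’s a minimal sketch of what it might contain, with a placeholder broker address and group id (the last two entries simply spell out the auto-commit defaults):

Properties props = new Properties();
// placeholder broker address and group id - adjust for the actual environment
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-demo-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// auto-commit is enabled by default; the two properties below only make the defaults explicit
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

With these defaults in place, the consumer commits the latest polled offsets automatically every five seconds, without any explicit commit call in our code.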

The problem with auto-commit is the risk of data loss if the application fails. Once poll() returns the messages, Kafka may commit the largest returned offset before the consumer has finished processing them.

Let’s say poll() returns 100 messages, and the consumer has processed only 60 of them when the auto-commit fires and commits the offset of the last returned message. Then, due to some failure, the consumer crashes. When a new consumer takes over, it starts reading from message 101, so messages 61 through 100 are never processed and are effectively lost.

Thus, we need other approaches that don’t have this drawback. The answer is the manual commit.

3.2. Manual Sync Commit

For manual commits, whether sync or async, it’s necessary to disable auto-commit by setting the enable.auto.commit property to false:

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

After disabling auto-commit, let’s now look at the use of commitSync():

KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
// process the messages
consumer.commitSync();

This method prevents data loss by committing the offset only after processing the messages. However, it doesn’t prevent duplicate reading when a consumer crashes before committing the offset. Besides this, it also impacts application performance.

The commitSync() method blocks the code until the commit completes. Also, in case of a recoverable error, it keeps on retrying. This decreases the throughput of the application, which we don’t want. So, Kafka provides another solution, the async commit, that deals with these drawbacks.
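
Before looking at the async alternative, note that when a commit can no longer be retried (for example, because the consumer group has rebalanced), commitSync() throws a CommitFailedException. A minimal sketch of handling it, with the logging only as a placeholder:

try {
  consumer.commitSync();
} catch (CommitFailedException e) {
  // the commit cannot be retried, e.g., the group rebalanced during processing;
  // the uncommitted records will be redelivered, so we only log the failure here
  System.err.println("Offset commit failed: " + e.getMessage());
}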

3.3. Manual Async Commit

Kafka provides commitAsync() to commit offsets asynchronously. It overcomes the performance overhead of manual sync commits by sending the commit request without waiting for the broker’s response, so the poll loop isn’t blocked. Let’s implement an async commit to understand this:

KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props); 
consumer.subscribe(KafkaConfigProperties.getTopic()); 
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
// process the messages
consumer.commitAsync();

The problem with the async commit is that it doesn’t retry in case of failure. Instead, it relies on the next call of commitAsync(), which commits the latest offset.

To see why retrying would be dangerous, suppose 300 is the largest offset we want to commit, but our commitAsync() call fails due to some issue. Because it’s asynchronous, another call of commitAsync() could commit a larger offset of 400 before the failed one is retried. If the retried commit of offset 300 then succeeds, it overwrites the previous commit of 400, resulting in duplicate reading. That’s why commitAsync() doesn’t retry.
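
If we still want to be notified when an async commit fails, commitAsync() accepts an OffsetCommitCallback that’s invoked once the result is known. A minimal sketch that simply logs the failure:

consumer.commitAsync((offsets, exception) -> {
  if (exception != null) {
    // the client does not retry; we log the failure and rely on a later commit
    System.err.println("Async commit failed for " + offsets + ": " + exception.getMessage());
  }
});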

3.4. Commit Specific Offset

Sometimes, we need more control over offsets. Let’s say we’re processing messages in small batches and want to commit the offsets as soon as each batch is processed. We can use the overloaded versions of commitSync() and commitAsync() that take a map argument to commit specific offsets:

KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int messageProcessed = 0;
while (true) {
  ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
  for (ConsumerRecord<Long, String> message : messages) {
    // process one message
    messageProcessed++;
    // the committed offset points to the next message to read, hence offset + 1
    currentOffsets.put(
      new TopicPartition(message.topic(), message.partition()),
      new OffsetAndMetadata(message.offset() + 1));
    if (messageProcessed % 50 == 0) {
      consumer.commitSync(currentOffsets);
    }
  }
}

In this code, we maintain a currentOffsets map with TopicPartition as the key and OffsetAndMetadata as the value. As we process each message, we put its TopicPartition and OffsetAndMetadata into the currentOffsets map. Every fifty processed messages, we call commitSync() with the currentOffsets map to mark those messages as committed.

This approach behaves the same way as the sync and async commits. The only difference is that here, we decide which offsets to commit rather than letting Kafka pick the latest ones.
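
For completeness, the async variant with specific offsets looks almost identical. A minimal sketch that could replace the commitSync() call inside the loop above (passing null means we don’t handle the commit result):

if (messageProcessed % 50 == 0) {
  // commit only the offsets collected so far, without blocking the poll loop
  consumer.commitAsync(currentOffsets, null);
}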

4. Conclusion

In this article, we learned about offsets and their importance in Kafka. Further, we explored four ways to commit offsets, both manual and automatic. Lastly, we analyzed their respective pros and cons. We can conclude that there’s no single best way to commit offsets in Kafka; rather, the right choice depends on the specific use case.

All the code examples used in this article are available over on GitHub.