1. Overview

In this article, we’ll explore a few strategies to purge data from an Apache Kafka topic.

2. Clean-Up Scenario

Before we learn the strategies to clean up the data, let’s acquaint ourselves with a simple scenario that demands a purging activity.

2.1. Scenario

Messages in Apache Kafka automatically expire after a configured retention time. Nonetheless, in a few cases, we might want the message deletion to happen immediately.

Let’s imagine that a defect has been introduced in the application code that produces messages to a Kafka topic. By the time a bug fix is integrated, the topic already holds many corrupt messages that are ready for consumption.

Such issues are most common in a development environment, where we want quick results, so bulk deletion of the messages is a reasonable thing to do.

2.2. Simulation

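To simulate the scenario, let’s start by creating a purge-scenario topic from the Kafka installation directory. Note that kafka-topics.sh has accepted --bootstrap-server since Kafka 2.2 and that the --zookeeper option was removed in Kafka 3.0, so on newer releases we’d pass --bootstrap-server localhost:9092 instead: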

$ bin/kafka-topics.sh \
  --create --topic purge-scenario --if-not-exists \
  --partitions 2 --replication-factor 1 \
  --zookeeper localhost:2181

Next, let’s use the shuf command to generate random data and feed it to the kafka-console-producer.sh script:

$ /usr/bin/shuf -i 1-100000 -n 50000000 \
  | tee -a /tmp/kafka-random-data \
  | bin/kafka-console-producer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario

We must note that we’ve used the tee command to save the simulation data for later use.
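
One detail of the generator is worth checking. Assuming GNU shuf, the command emits each number of the range at most once unless --repeat (-r) is given, so -n 50000000 over the range 1-100000 yields 100000 lines rather than fifty million. A minimal sketch of a generator that can exceed the range size follows; generate_random_data is a hypothetical helper name, not part of Kafka or coreutils:

```shell
# Hypothetical helper: print COUNT random integers in [1, MAX], one per line.
# With -r (--repeat), shuf may emit a value more than once, so COUNT is
# allowed to be larger than MAX.
generate_random_data() {
  local max="$1" count="$2"
  shuf -r -i "1-${max}" -n "${count}"
}
```

Piping generate_random_data 100000 50000000 through the same tee and kafka-console-producer.sh stages would then produce the full fifty million messages, at the cost of a much longer run.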

Finally, let’s verify that a consumer can consume messages from the topic:

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --max-messages 3
76696
49425
1744
Processed a total of 3 messages

3. Message Expiry

The messages produced to the purge-scenario topic have a default retention period of seven days. To purge them, we can temporarily set the retention.ms topic-level property to ten seconds and wait for the messages to expire:

$ bin/kafka-configs.sh --alter \
  --add-config retention.ms=10000 \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario \
  && sleep 10

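Next, let’s verify that the messages have expired from the topic. The broker only deletes expired log segments when its periodic retention check runs (log.retention.check.interval.ms, five minutes by default), so the purge may take longer than the ten-second sleep above: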

$ bin/kafka-console-consumer.sh  \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --max-messages 1 --timeout-ms 1000
[2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

Finally, we can restore the original retention period of seven days for the topic:

$ bin/kafka-configs.sh --alter \
  --add-config retention.ms=604800000 \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario

With this approach, Kafka will purge messages across all the partitions for the purge-scenario topic.
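
The steps in this section (shrink the retention, wait, restore it) can be wrapped in a small function. This is only a sketch under assumptions: purge_by_retention and days_to_ms are hypothetical helper names, KAFKA_BIN and BOOTSTRAP are environment variables invented for this example, and the wait time is left to the caller because it depends on how often the broker checks for expired segments.

```shell
# Convert a number of days to milliseconds, the unit used by retention.ms.
days_to_ms() {
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

# Hypothetical wrapper: set retention.ms to ten seconds, wait, then restore it.
# Usage: purge_by_retention TOPIC WAIT_SECONDS [RESTORE_DAYS]
# KAFKA_BIN and BOOTSTRAP are assumptions of this sketch, not Kafka settings.
purge_by_retention() {
  local topic="$1" wait_seconds="$2" restore_days="${3:-7}"
  local bin="${KAFKA_BIN:-bin}" server="${BOOTSTRAP:-localhost:9092}"

  # Shrink the retention so that existing messages become eligible for deletion.
  "${bin}/kafka-configs.sh" --alter \
    --add-config retention.ms=10000 \
    --bootstrap-server "${server}" \
    --topic "${topic}" || return 1

  # Give the broker time to run its retention check.
  sleep "${wait_seconds}"

  # Restore the retention period (seven days unless told otherwise).
  "${bin}/kafka-configs.sh" --alter \
    --add-config "retention.ms=$(days_to_ms "${restore_days}")" \
    --bootstrap-server "${server}" \
    --topic "${topic}"
}
```

For example, purge_by_retention purge-scenario 300 would wait five minutes before restoring the seven-day retention, and days_to_ms 7 prints the 604800000 value used above.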

4. Selective Record Deletion

At times, we might want to delete records selectively within one or more partitions from a specific topic. We can satisfy such requirements by using the kafka-delete-records.sh script.

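First, we need to specify the partition-level offset in the delete-config.json configuration file. The script deletes every record before the given offset, and an offset of -1 stands for the partition’s high watermark, which means all records currently in the partition.

Let’s purge all messages from partition=1 by using offset=-1: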

{
  "partitions": [
    {
      "topic": "purge-scenario",
      "partition": 1,
      "offset": -1
    }
  ],
  "version": 1
}

Next, let’s proceed with record deletion:

$ bin/kafka-delete-records.sh \
  --bootstrap-server localhost:9092 \
  --offset-json-file delete-config.json
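
When several partitions need the same treatment, writing the JSON by hand gets tedious. The following sketch generates the file content instead; make_delete_config is a hypothetical helper name, and it assumes that one offset is applied to every listed partition of a single topic:

```shell
# Hypothetical helper: print a delete-records JSON document for one topic.
# Usage: make_delete_config TOPIC OFFSET PARTITION [PARTITION...]
make_delete_config() {
  local topic="$1" offset="$2" partition sep=""
  shift 2
  printf '{"partitions":['
  for partition in "$@"; do
    # Emit one entry per partition, comma-separated after the first.
    printf '%s{"topic":"%s","partition":%d,"offset":%d}' \
      "${sep}" "${topic}" "${partition}" "${offset}"
    sep=","
  done
  printf '],"version":1}\n'
}
```

Running make_delete_config purge-scenario -1 0 1 > delete-config.json would produce a file that purges both partitions of our topic when passed to kafka-delete-records.sh.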

We can verify that we’re still able to read from partition=0:

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario --partition=0 \
  --max-messages 1 --timeout-ms 1000
44017
Processed a total of 1 messages

However, when we read from partition=1, there will be no records to process:

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --partition=1 \
  --max-messages 1 --timeout-ms 1000
[2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
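
A consumer timeout is an indirect signal, so it can help to compare offsets as well. The sketch below assumes we have saved the output of Kafka’s GetOffsetShell tool twice, once for the earliest offsets (--time -2) and once for the latest (--time -1), in its topic:partition:offset line format. How the tool is invoked differs between Kafka versions, so that step is left out. remaining_records is a hypothetical helper name:

```shell
# Hypothetical helper: print "partition count" for every partition, where
# count = latest offset - earliest offset, i.e. the records still readable.
# Usage: remaining_records EARLIEST_FILE LATEST_FILE
# Both files are assumed to hold lines of the form topic:partition:offset.
remaining_records() {
  # The first file fills start[]; the second prints the difference.
  awk -F: 'NR == FNR { start[$2] = $3; next }
           { print $2, $3 - start[$2] }' "$1" "$2"
}
```

After the deletion above, we’d expect a count of zero for partition 1 and a positive count for partition 0.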

5. Delete and Recreate the Topic

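Another workaround to purge all messages of a Kafka topic is to delete and recreate it. However, this is only possible if the delete.topic.enable broker property is true. It has defaulted to true since Kafka 1.0; on older brokers, or if it has been disabled, we can set it while starting the Kafka server: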

$ bin/kafka-server-start.sh config/server.properties \
  --override delete.topic.enable=true

To delete the topic, we can use the kafka-topics.sh script:

$ bin/kafka-topics.sh \
  --delete --topic purge-scenario \
  --zookeeper localhost:2181
Topic purge-scenario is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

Let’s verify the deletion by listing the topics:

$ bin/kafka-topics.sh --zookeeper localhost:2181 --list

After confirming that the topic is no longer listed, we can now go ahead and recreate it.
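
Since topic deletion is asynchronous, a script may need to poll the topic list before recreating the topic. The following is a minimal sketch under assumptions: list_topics, topic_listed and wait_until_deleted are hypothetical helper names, KAFKA_BIN is an environment variable invented for this example, and list_topics simply wraps the listing command shown above.

```shell
# Wraps the listing command from this section; adjust the connection
# option to match the Kafka version in use.
list_topics() {
  "${KAFKA_BIN:-bin}/kafka-topics.sh" --zookeeper localhost:2181 --list
}

# Succeeds if TOPIC appears as an exact line in the list read from stdin.
topic_listed() {
  grep -Fxq -- "$1"
}

# Polls once per second until TOPIC is no longer listed.
# Returns 0 when the topic is gone, 1 when ATTEMPTS run out,
# and 2 when the listing command itself fails.
wait_until_deleted() {
  local topic="$1" attempts="${2:-10}" topics
  while [ "${attempts}" -gt 0 ]; do
    topics="$(list_topics)" || return 2
    printf '%s\n' "${topics}" | topic_listed "${topic}" || return 0
    attempts=$((attempts - 1))
    sleep 1
  done
  return 1
}
```

A script could then run wait_until_deleted purge-scenario 30 and only issue the kafka-topics.sh --create command from the simulation step once it succeeds.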

6. Conclusion

In this article, we simulated a scenario in which we need to purge an Apache Kafka topic. We then explored several strategies to purge it, either completely or selectively across partitions.