1. Overview
In this article, we’ll explore a few strategies to purge data from an Apache Kafka topic.
2. Clean-Up Scenario
Before we learn the clean-up strategies, let’s look at a simple scenario that calls for purging.
2.1. Scenario
Messages in Apache Kafka automatically expire after a configured retention time. Nonetheless, in a few cases, we might want the message deletion to happen immediately.
Let’s imagine that a defect has been introduced in the application code that produces messages to a Kafka topic. By the time a bug fix is integrated, the topic already holds many corrupt messages that are ready for consumption.
Such issues are most common in a development environment, where we want quick results, so deleting the messages in bulk is a reasonable thing to do.
2.2. Simulation
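To simulate the scenario, let’s start by creating a purge-scenario topic from the Kafka installation directory. The --zookeeper option used here works on older Kafka releases; it was removed from kafka-topics.sh in Kafka 3.0, where --bootstrap-server takes its place: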
$ bin/kafka-topics.sh \
--create --topic purge-scenario --if-not-exists \
--partitions 2 --replication-factor 1 \
--zookeeper localhost:2181
Next, let’s use the shuf command to generate random data and feed it to the kafka-console-producer.sh script:
$ /usr/bin/shuf -i 1-100000 -n 50000000 \
| tee -a /tmp/kafka-random-data \
| bin/kafka-console-producer.sh \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario
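Note that we’ve used the tee command to save the simulation data for later use. Also, because shuf is invoked without the -r option, it emits each number in the range only once, so the topic receives 100000 messages regardless of the larger -n value.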
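A scaled-down check of how GNU shuf treats -n can make the size of the simulated data set easier to reason about. The sketch below assumes GNU coreutils is installed; the function names are illustrative and not part of the article’s setup.

```shell
# Illustrative helpers (names are assumptions): count the lines that
# GNU shuf emits for the range 1..10 when asked for 50 of them.

count_without_repeat() {
  # No -r: shuf permutes the 10 inputs and stops when they run out.
  shuf -i 1-10 -n 50 | wc -l
}

count_with_repeat() {
  # -r: values may repeat, so all 50 requested lines are produced.
  shuf -r -i 1-10 -n 50 | wc -l
}

echo "without -r: $(count_without_repeat)"
echo "with -r: $(count_with_repeat)"
```

Adding -r to the original pipeline would therefore be one way to produce the full 50000000 messages, at the cost of a much longer run.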
Finally, let’s verify that a consumer can consume messages from the topic:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--max-messages 3
76696
49425
1744
Processed a total of 3 messages
3. Message Expiry
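Messages produced to the purge-scenario topic have the default retention period of seven days. To purge them, we can temporarily set the retention.ms topic-level property to ten seconds and wait for the messages to expire. Expiry isn’t instantaneous: the broker looks for expired log segments once every log.retention.check.interval.ms (five minutes by default), so we may need to wait longer than the ten-second sleep used here: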
$ bin/kafka-configs.sh --alter \
--add-config retention.ms=10000 \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario \
&& sleep 10
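Since a fixed sleep is only a guess at when the broker will have removed the data, a script might poll instead. The following is a minimal sketch of a generic retry helper; wait_until is a hypothetical name, and the check passed to it is left to the reader because it depends on the setup.

```shell
# Hypothetical helper: run a check command repeatedly until it succeeds
# or the attempts run out.
# Usage: wait_until MAX_ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
wait_until() {
  max_attempts=$1
  delay=$2
  shift 2
  attempt=1
  while :; do
    # Stop as soon as the check command exits with status 0.
    if "$@"; then
      return 0
    fi
    # Give up once the last allowed attempt has failed.
    if [ "$attempt" -ge "$max_attempts" ]; then
      return 1
    fi
    attempt=$((attempt + 1))
    sleep "$delay"
  done
}

# Example with a placeholder check; replace `true` with a command that
# succeeds only once the topic is empty.
wait_until 40 15 true && echo "check passed"
```

With 40 attempts spaced 15 seconds apart, the loop would cover about ten minutes, which is an arbitrary choice meant to outlast one default retention check.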
Next, let’s verify that the messages have expired from the topic:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--max-messages 1 --timeout-ms 1000
[2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
Finally, we can restore the original retention period of seven days for the topic:
$ bin/kafka-configs.sh --alter \
--add-config retention.ms=604800000 \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario
With this approach, Kafka will purge messages across all the partitions for the purge-scenario topic.
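The retention values used in this section are plain millisecond counts, which are easy to mistype. As a small sketch, a helper such as the following (days_to_ms is an illustrative name, not a Kafka tool) can derive them from a number of days:

```shell
# Illustrative helper (not part of Kafka): convert whole days to milliseconds.
days_to_ms() {
  # 24 hours * 60 minutes * 60 seconds * 1000 milliseconds
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

# Prints 604800000, the seven-day value restored above.
days_to_ms 7
```

The result can be substituted directly into the command, for example as --add-config retention.ms=$(days_to_ms 7).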
4. Selective Record Deletion
At times, we might want to delete records selectively within one or more partitions of a specific topic. We can satisfy such requirements by using the kafka-delete-records.sh script.
First, we need to specify the partition-level offset in the delete-config.json configuration file. For each listed partition, the script deletes every record with an offset lower than the one given.
Let’s purge all messages from partition=1 by using offset=-1, which stands for the partition’s current end (its high watermark):
{
"partitions": [
{
"topic": "purge-scenario",
"partition": 1,
"offset": -1
}
],
"version": 1
}
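When several partitions need different offsets, writing this file by hand gets tedious. The sketch below generates the same structure from PARTITION:OFFSET pairs; make_delete_config is a hypothetical helper, and the offset 100 in the example is an arbitrary illustration.

```shell
# Hypothetical helper: print an offsets file in the format shown above.
# Usage: make_delete_config TOPIC PARTITION:OFFSET [PARTITION:OFFSET ...]
make_delete_config() {
  topic=$1
  shift
  printf '{\n  "partitions": [\n'
  first=1
  for spec in "$@"; do
    partition=${spec%%:*}   # text before the first colon
    offset=${spec#*:}       # text after the first colon
    # Separate entries with a comma, but not before the first one.
    if [ "$first" -eq 0 ]; then
      printf ',\n'
    fi
    first=0
    printf '    {"topic": "%s", "partition": %s, "offset": %s}' \
      "$topic" "$partition" "$offset"
  done
  printf '\n  ],\n  "version": 1\n}\n'
}

# Delete records below offset 100 in partition 0 and everything in partition 1.
make_delete_config purge-scenario 0:100 1:-1
```

Redirecting the output to delete-config.json gives a file that can be passed to the script through --offset-json-file.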
Next, let’s proceed with record deletion:
$ bin/kafka-delete-records.sh \
--bootstrap-server localhost:9092 \
--offset-json-file delete-config.json
We can verify that we’re still able to read from partition=0:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario --partition=0 \
--max-messages 1 --timeout-ms 1000
44017
Processed a total of 1 messages
However, when we read from partition=1, there will be no records to process:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--partition=1 \
--max-messages 1 --timeout-ms 1000
[2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
5. Delete and Recreate the Topic
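Another workaround to purge all messages of a Kafka topic is to delete and recreate it. However, this is only possible if the delete.topic.enable broker property is true. It has defaulted to true since Kafka 1.0; on older brokers, or where it has been switched off, we can set it while starting the Kafka server: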
$ bin/kafka-server-start.sh config/server.properties \
--override delete.topic.enable=true
To delete the topic, we can use the kafka-topics.sh script:
$ bin/kafka-topics.sh \
--delete --topic purge-scenario \
--zookeeper localhost:2181
Topic purge-scenario is marked for deletion.
Note: The delete command has no effect if delete.topic.enable is false.
Let’s verify the deletion by listing the topics:
$ bin/kafka-topics.sh --zookeeper localhost:2181 --list
After confirming that the topic is no longer listed, we can now go ahead and recreate it.
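In a script, the "no longer listed" check can be automated by filtering the listing. The sketch below compares only the first whitespace-separated field of each line, so that a similarly named topic such as purge-scenario-2 doesn’t count as a match and any trailing status text on a line is ignored. topic_in_list is a hypothetical helper name.

```shell
# Hypothetical helper: read a topic listing on stdin and succeed if the
# given topic name appears as the first field of any line.
topic_in_list() {
  awk -v topic="$1" '$1 == topic { found = 1 } END { exit !found }'
}

# Example with a canned listing instead of a live cluster:
if printf 'orders\npurge-scenario-2\n' | topic_in_list purge-scenario; then
  echo "topic still listed"
else
  echo "topic not listed"
fi
```

Against a running cluster, the listing would come from the command above, for example bin/kafka-topics.sh --zookeeper localhost:2181 --list | topic_in_list purge-scenario.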
6. Conclusion
In this tutorial, we simulated a scenario where we’d need to purge an Apache Kafka topic. Moreover, we explored multiple strategies to purge it completely or selectively across partitions.