1. 概述
在本文中,我们将介绍几种清理 Apache Kafka Topic 中数据的策略。这些策略适用于不同的场景,比如需要立即清除错误消息、测试环境重置数据,或是为了节省存储空间。
2. 数据清理场景
在学习具体的清理策略之前,我们先来看一个常见的清理场景。
2.1. 场景说明
Kafka 中的消息默认会在配置的 retention time 后自动过期。但在某些情况下,我们可能希望立即删除消息。
举个例子:应用代码中引入了一个 bug,导致向 Kafka Topic 中发送了大量无效消息。在修复上线之前,这些消息已经堆积在 Topic 中,等待被消费。这种情况下,快速批量删除消息是最直接有效的处理方式。
这类问题在开发或测试环境中尤为常见,因此快速清空 Topic 成为一种合理操作。
2.2. 模拟场景
我们先创建一个名为 purge-scenario
的 Topic:
$ bin/kafka-topics.sh \
--create --topic purge-scenario --if-not-exists \
--partitions 2 --replication-factor 1 \
--zookeeper localhost:2181
然后使用 shuf
命令生成随机数据,并通过 kafka-console-producer.sh
发送到 Topic 中:
$ /usr/bin/shuf -i 1-100000 -n 50000000 \
| tee -a /tmp/kafka-random-data \
| bin/kafka-console-producer.sh \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario
⚠️ 注意:我们使用了 tee
命令将生成的数据保存下来,方便后续使用。
验证消费者是否能正常消费消息:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--max-messages 3
76696
49425
1744
Processed a total of 3 messages
✅ 成功消费,说明数据已经写入。
3. 消息过期策略
Kafka Topic 中的消息默认保留 7 天(604800000 ms)。如果我们要清理这些消息,可以通过临时修改 retention.ms
配置项,让消息快速过期:
$ bin/kafka-configs.sh --alter \
--add-config retention.ms=10000 \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario \
&& sleep 10
检查消息是否已过期:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--max-messages 1 --timeout-ms 1000
[2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
✅ 消息已过期无法消费。
恢复默认的保留时间:
$ bin/kafka-configs.sh --alter \
--add-config retention.ms=604800000 \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario
这种方式会清理 Topic 中所有分区的数据,适合全量清理。
4. 分区级选择性删除
有时候我们只想删除某个 Topic 中特定分区的数据。可以使用 kafka-delete-records.sh
工具来实现。
首先,创建一个 delete-config.json
文件,指定要删除的分区及偏移量:
{
"partitions": [
{
"topic": "purge-scenario",
"partition": 1,
"offset": -1
}
],
"version": 1
}
⚠️ offset: -1
表示删除该分区中所有消息。
执行删除命令:
$ bin/kafka-delete-records.sh \
--bootstrap-server localhost:9092 \
--offset-json-file delete-config.json
验证 partition=0
是否仍有数据:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario --partition=0 \
--max-messages 1 --timeout-ms 1000
44017
Processed a total of 1 messages
✅ 仍有数据。
验证 partition=1
是否已清空:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--partition=1 \
--max-messages 1 --timeout-ms 1000
[2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
✅ 分区 1 已成功清空。
5. 删除并重建 Topic
如果 Kafka 配置中启用了 delete.topic.enable=true
,我们可以直接删除 Topic,再重新创建,达到清理目的。
启动 Kafka 时启用 Topic 删除功能:
$ bin/kafka-server-start.sh config/server.properties \
--override delete.topic.enable=true
删除 Topic:
$ bin/kafka-topics.sh \
--delete --topic purge-scenario \
--zookeeper localhost:2181
Topic purge-scenario is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
确认 Topic 已删除:
$ bin/kafka-topics.sh --zookeeper localhost:2181 --list
如果列表中已无 purge-scenario
,说明删除成功,之后可以重新创建。
⚠️ 注意:此方法适用于允许重建 Topic 的场景,生产环境慎用。
6. 总结
本文通过模拟场景,介绍了清理 Kafka Topic 的几种常见方式:
- ✅ 通过设置
retention.ms
实现消息自动过期 - ✅ 使用
kafka-delete-records.sh
实现分区级选择性删除 - ✅ 直接删除并重建 Topic(需配置支持)
根据实际需求选择合适的方法,既高效又安全。