1. 概述

在本文中,我们将介绍几种清理 Apache Kafka Topic 中数据的策略。这些策略适用于不同的场景,比如需要立即清除错误消息、测试环境重置数据,或是为了节省存储空间。

2. 数据清理场景

在学习具体的清理策略之前,我们先来看一个常见的清理场景。

2.1. 场景说明

Kafka 中的消息默认会在配置的 retention time 后自动过期。但在某些情况下,我们可能希望立即删除消息

举个例子:应用代码中引入了一个 bug,导致向 Kafka Topic 中发送了大量无效消息。在修复上线之前,这些消息已经堆积在 Topic 中,等待被消费。这种情况下,快速批量删除消息是最直接有效的处理方式。

这类问题在开发或测试环境中尤为常见,因此快速清空 Topic 成为一种合理操作。

2.2. 模拟场景

我们先创建一个名为 purge-scenario 的 Topic:

$ bin/kafka-topics.sh \
  --create --topic purge-scenario --if-not-exists \
  --partitions 2 --replication-factor 1 \
  --zookeeper localhost:2181

然后使用 shuf 命令生成随机数据,并通过 kafka-console-producer.sh 发送到 Topic 中:

$ /usr/bin/shuf -i 1-100000 -n 50000000 \
  | tee -a /tmp/kafka-random-data \
  | bin/kafka-console-producer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario

⚠️ 注意:我们使用了 tee 命令将生成的数据保存下来,方便后续使用。

验证消费者是否能正常消费消息:

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --max-messages 3
76696
49425
1744
Processed a total of 3 messages

✅ 成功消费,说明数据已经写入。

3. 消息过期策略

Kafka Topic 中的消息默认保留 7 天(604800000 ms)。如果我们要清理这些消息,可以通过临时修改 retention.ms 配置项,让消息快速过期:

$ bin/kafka-configs.sh --alter \
  --add-config retention.ms=10000 \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario \
  && sleep 10

检查消息是否已过期:

$ bin/kafka-console-consumer.sh  \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --max-messages 1 --timeout-ms 1000
[2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

✅ 消息已过期无法消费。

恢复默认的保留时间:

$ bin/kafka-configs.sh --alter \
  --add-config retention.ms=604800000 \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario

这种方式会清理 Topic 中所有分区的数据,适合全量清理。

4. 分区级选择性删除

有时候我们只想删除某个 Topic 中特定分区的数据。可以使用 kafka-delete-records.sh 工具来实现。

首先,创建一个 delete-config.json 文件,指定要删除的分区及偏移量:

{
  "partitions": [
    {
      "topic": "purge-scenario",
      "partition": 1,
      "offset": -1
    }
  ],
  "version": 1
}

⚠️ offset: -1 表示删除该分区中所有消息。

执行删除命令:

$ bin/kafka-delete-records.sh \
  --bootstrap-server localhost:9092 \
  --offset-json-file delete-config.json

验证 partition=0 是否仍有数据:

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario --partition=0 \
  --max-messages 1 --timeout-ms 1000
44017
Processed a total of 1 messages

✅ 仍有数据。

验证 partition=1 是否已清空:

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --partition=1 \
  --max-messages 1 --timeout-ms 1000
[2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

✅ 分区 1 已成功清空。

5. 删除并重建 Topic

如果 Kafka 配置中启用了 delete.topic.enable=true,我们可以直接删除 Topic,再重新创建,达到清理目的。

启动 Kafka 时启用 Topic 删除功能:

$ bin/kafka-server-start.sh config/server.properties \
  --override delete.topic.enable=true

删除 Topic:

$ bin/kafka-topics.sh \
  --delete --topic purge-scenario \
  --zookeeper localhost:2181
Topic purge-scenario is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

确认 Topic 已删除:

$ bin/kafka-topics.sh --zookeeper localhost:2181 --list

如果列表中已无 purge-scenario,说明删除成功,之后可以重新创建。

⚠️ 注意:此方法适用于允许重建 Topic 的场景,生产环境慎用

6. 总结

本文通过模拟场景,介绍了清理 Kafka Topic 的几种常见方式:

  • ✅ 通过设置 retention.ms 实现消息自动过期
  • ✅ 使用 kafka-delete-records.sh 实现分区级选择性删除
  • ✅ 直接删除并重建 Topic(需配置支持)

根据实际需求选择合适的方法,既高效又安全。


原始标题:Guide to Purging an Apache Kafka Topic