1. 概述

当生产者向 Apache Kafka 发送一条消息时,该消息会被追加到日志文件中,并在配置的时间段内保留。

在本篇文章中,我们将重点介绍如何 为 Kafka 主题配置基于时间的消息保留策略

2. 基于时间的保留机制

通过设置保留时间相关的属性,每条消息都会有一个 TTL(Time To Live)。一旦超过这个时间,消息就会被标记为可删除状态,从而释放磁盘空间。

同一个 Kafka 主题下的所有消息都遵循相同的保留策略。此外,✅我们可以在创建主题之前配置这些属性,也可以在主题创建后运行时动态修改

接下来的内容中,我们会学习如何通过 Broker 级别的配置来设定新主题的默认保留时间,并通过主题级别的配置实现在运行时进行调整。

3. Broker 级别配置

Apache Kafka 支持在 Broker 层面统一设置保留策略,可以通过以下三个时间相关属性中的任意一个来进行配置:

  • log.retention.hours
  • log.retention.minutes
  • log.retention.ms

⚠️需要注意的是:Kafka 会以精度更高的配置为准。也就是说,**log.retention.ms 的优先级最高**。

3.1. 默认配置查看

首先,我们可以通过执行 grep 命令来查看 Kafka 配置文件中的默认保留时间设置:

$ grep -i 'log.retention.[hms].*\=' config/server.properties
log.retention.hours=168

可以看到,默认情况下消息保留时间为 7 天(168 小时)

如果我们只想保留消息 10 分钟,可以修改 config/server.properties 文件中的如下配置项:

log.retention.minutes=10

3.2. 新建主题时的保留时间验证

Kafka 提供了多个 Shell 脚本来执行管理操作。为了方便演示,我们先创建一个辅助脚本 functions.sh,用于封装一些常用的命令。

functions.sh 中添加两个函数,分别用于创建主题和查看主题配置:

function create_topic {
    topic_name="$1"
    bin/kafka-topics.sh --create --topic ${topic_name} --if-not-exists \
      --partitions 1 --replication-factor 1 \
      --zookeeper localhost:2181
}

function describe_topic_config {
    topic_name="$1"
    ./bin/kafka-configs.sh --describe --all \
      --bootstrap-server=0.0.0.0:9092 \
      --topic ${topic_name}
}

接着创建两个独立脚本:create-topic.shget-topic-retention-time.sh

bash-5.1# cat create-topic.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
create_topic "${topic_name}"
exit $?
bash-5.1# cat get-topic-retention-time.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
describe_topic_config "${topic_name}" | awk 'BEGIN{IFS="=";IRS=" "} /^[ ]*retention.ms/{print $1}'
exit $?

📌注意:describe_topic_config 会输出主题的所有配置信息,所以我们使用 awk 对结果进行了过滤,只显示 retention.ms 字段。

最后,启动 Kafka 环境并验证新建主题的保留时间是否生效:

bash-5.1# ./create-topic.sh test-topic
Created topic test-topic.
bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=600000

可以看到,retention.ms 设置为 600000(即 10 分钟),这正是从我们在 server.properties 中定义的 log.retention.minutes=10 推导出来的。

4. 主题级别配置

一旦 Kafka Broker 启动完成,**log.retention.{hours|minutes|ms} 这些 Broker 级别的配置就变成只读状态了**。

不过,我们可以对每个主题单独设置 retention.ms 来覆盖默认值。

functions.sh 中添加一个函数用于修改主题配置:

function alter_topic_config {
    topic_name="$1"
    config_name="$2"
    config_value="$3"
    ./bin/kafka-configs.sh --alter \
      --add-config ${config_name}=${config_value} \
      --bootstrap-server=0.0.0.0:9092 \
      --topic ${topic_name}
}

然后创建一个脚本 alter-topic-config.sh 来调用它:

#!/bin/sh
. ./functions.sh

alter_topic_retention_config $1 $2 $3
exit $?

现在我们将 test-topic 的保留时间改为 5 分钟(300000 毫秒)并验证:

bash-5.1# ./alter-topic-config.sh test-topic retention.ms 300000
Completed updating config for topic test-topic.

bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=300000

✅成功更新了主题级别的保留时间配置。

5. 验证消息过期机制

前面我们已经学会了如何配置消息保留时间,现在是时候验证一下消息是否真的会在保留时间到期后自动清除。

5.1. 生产者与消费者脚本准备

继续在 functions.sh 中添加生产和消费消息的函数:

function produce_message {
    topic_name="$1"
    message="$2"
    echo "${message}" | ./bin/kafka-console-producer.sh \
    --bootstrap-server=0.0.0.0:9092 \
    --topic ${topic_name}
}

function consume_message {
    topic_name="$1"
    timeout="$2"
    ./bin/kafka-console-consumer.sh \
    --bootstrap-server=0.0.0.0:9092 \
    --from-beginning \
    --topic ${topic_name} \
    --max-messages 1 \
    --timeout-ms $timeout
}

📌注意:消费者始终从头开始读取消息,因为我们希望它能读取 Kafka 中所有可用的消息。

接着创建两个独立脚本:

bash-5.1# cat producer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
message="$2"

produce_message ${topic_name} ${message}
exit $?
bash-5.1# cat consumer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
timeout="$2"

consume_message ${topic_name} $timeout
exit $?

5.2. 测试消息过期

准备好基础环境后,我们发送一条消息并立即消费两次:

bash-5.1# ./producer.sh "test-topic-2" "message1"
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages

✅可以看到,消费者可以重复读取到同一条消息。

然后我们等待 5 分钟(即保留时间),再尝试消费:

bash-5.1# sleep 300 && ./consumer.sh test-topic 10000
[2021-02-06 21:55:00,896] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

❌正如预期,消费者没有读取到任何消息,因为消息已超过保留时间被清理掉了。

6. 注意事项与限制

Kafka Broker 内部还有一个配置项叫做 log.retention.check.interval.ms,它决定了检查消息是否过期的频率。

⚠️为了确保保留策略能够及时生效,我们必须保证 log.retention.check.interval.ms 的值要小于任何主题的 retention.ms 值。

否则可能会出现消息明明已经过期,但由于检查频率太低而没有及时清理的问题。

7. 总结

在这篇文章中,我们深入探讨了 Apache Kafka 的基于时间的消息保留机制。我们通过编写简单的 Shell 脚本简化了日常管理任务,并搭建了一个简易的生产者/消费者模型来验证消息的自动清理行为。

✅掌握这些知识可以帮助你在实际项目中更好地控制 Kafka 消息的生命周期,避免磁盘空间被无用数据占用。


原始标题:Configuring Message Retention Period in Apache Kafka