1. 概述
当生产者向 Apache Kafka 发送一条消息时,该消息会被追加到日志文件中,并在配置的时间段内保留。
在本篇文章中,我们将重点介绍如何 为 Kafka 主题配置基于时间的消息保留策略。
2. 基于时间的保留机制
通过设置保留时间相关的属性,每条消息都会有一个 TTL(Time To Live)。一旦超过这个时间,消息就会被标记为可删除状态,从而释放磁盘空间。
同一个 Kafka 主题下的所有消息都遵循相同的保留策略。此外,✅我们可以在创建主题之前配置这些属性,也可以在主题创建后运行时动态修改。
接下来的内容中,我们会学习如何通过 Broker 级别的配置来设定新主题的默认保留时间,并通过主题级别的配置实现在运行时进行调整。
3. Broker 级别配置
Apache Kafka 支持在 Broker 层面统一设置保留策略,可以通过以下三个时间相关属性中的任意一个来进行配置:
log.retention.hours
log.retention.minutes
log.retention.ms
⚠️需要注意的是:Kafka 会以精度更高的配置为准。也就是说,**log.retention.ms
的优先级最高**。
3.1. 默认配置查看
首先,我们可以通过执行 grep
命令来查看 Kafka 配置文件中的默认保留时间设置:
$ grep -i 'log.retention.[hms].*\=' config/server.properties
log.retention.hours=168
可以看到,默认情况下消息保留时间为 7 天(168 小时)。
如果我们只想保留消息 10 分钟,可以修改 config/server.properties
文件中的如下配置项:
log.retention.minutes=10
3.2. 新建主题时的保留时间验证
Kafka 提供了多个 Shell 脚本来执行管理操作。为了方便演示,我们先创建一个辅助脚本 functions.sh
,用于封装一些常用的命令。
在 functions.sh
中添加两个函数,分别用于创建主题和查看主题配置:
function create_topic {
topic_name="$1"
bin/kafka-topics.sh --create --topic ${topic_name} --if-not-exists \
--partitions 1 --replication-factor 1 \
--zookeeper localhost:2181
}
function describe_topic_config {
topic_name="$1"
./bin/kafka-configs.sh --describe --all \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
接着创建两个独立脚本:create-topic.sh
和 get-topic-retention-time.sh
:
bash-5.1# cat create-topic.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
create_topic "${topic_name}"
exit $?
bash-5.1# cat get-topic-retention-time.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
describe_topic_config "${topic_name}" | awk 'BEGIN{IFS="=";IRS=" "} /^[ ]*retention.ms/{print $1}'
exit $?
📌注意:describe_topic_config
会输出主题的所有配置信息,所以我们使用 awk
对结果进行了过滤,只显示 retention.ms
字段。
最后,启动 Kafka 环境并验证新建主题的保留时间是否生效:
bash-5.1# ./create-topic.sh test-topic
Created topic test-topic.
bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=600000
可以看到,retention.ms
设置为 600000
(即 10 分钟),这正是从我们在 server.properties
中定义的 log.retention.minutes=10
推导出来的。
4. 主题级别配置
一旦 Kafka Broker 启动完成,**log.retention.{hours|minutes|ms}
这些 Broker 级别的配置就变成只读状态了**。
不过,我们可以对每个主题单独设置 retention.ms
来覆盖默认值。
在 functions.sh
中添加一个函数用于修改主题配置:
function alter_topic_config {
topic_name="$1"
config_name="$2"
config_value="$3"
./bin/kafka-configs.sh --alter \
--add-config ${config_name}=${config_value} \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
然后创建一个脚本 alter-topic-config.sh
来调用它:
#!/bin/sh
. ./functions.sh
alter_topic_retention_config $1 $2 $3
exit $?
现在我们将 test-topic
的保留时间改为 5 分钟(300000 毫秒)并验证:
bash-5.1# ./alter-topic-config.sh test-topic retention.ms 300000
Completed updating config for topic test-topic.
bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=300000
✅成功更新了主题级别的保留时间配置。
5. 验证消息过期机制
前面我们已经学会了如何配置消息保留时间,现在是时候验证一下消息是否真的会在保留时间到期后自动清除。
5.1. 生产者与消费者脚本准备
继续在 functions.sh
中添加生产和消费消息的函数:
function produce_message {
topic_name="$1"
message="$2"
echo "${message}" | ./bin/kafka-console-producer.sh \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
function consume_message {
topic_name="$1"
timeout="$2"
./bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning \
--topic ${topic_name} \
--max-messages 1 \
--timeout-ms $timeout
}
📌注意:消费者始终从头开始读取消息,因为我们希望它能读取 Kafka 中所有可用的消息。
接着创建两个独立脚本:
bash-5.1# cat producer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
message="$2"
produce_message ${topic_name} ${message}
exit $?
bash-5.1# cat consumer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
timeout="$2"
consume_message ${topic_name} $timeout
exit $?
5.2. 测试消息过期
准备好基础环境后,我们发送一条消息并立即消费两次:
bash-5.1# ./producer.sh "test-topic-2" "message1"
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
✅可以看到,消费者可以重复读取到同一条消息。
然后我们等待 5 分钟(即保留时间),再尝试消费:
bash-5.1# sleep 300 && ./consumer.sh test-topic 10000
[2021-02-06 21:55:00,896] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
❌正如预期,消费者没有读取到任何消息,因为消息已超过保留时间被清理掉了。
6. 注意事项与限制
Kafka Broker 内部还有一个配置项叫做 log.retention.check.interval.ms
,它决定了检查消息是否过期的频率。
⚠️为了确保保留策略能够及时生效,我们必须保证 log.retention.check.interval.ms
的值要小于任何主题的 retention.ms
值。
否则可能会出现消息明明已经过期,但由于检查频率太低而没有及时清理的问题。
7. 总结
在这篇文章中,我们深入探讨了 Apache Kafka 的基于时间的消息保留机制。我们通过编写简单的 Shell 脚本简化了日常管理任务,并搭建了一个简易的生产者/消费者模型来验证消息的自动清理行为。
✅掌握这些知识可以帮助你在实际项目中更好地控制 Kafka 消息的生命周期,避免磁盘空间被无用数据占用。