1. 概述
在实现Kafka生产者或消费者(例如,使用Spring时)时,我们需要配置一个名为"bootstrap.servers"
的属性[1]。
在这篇教程中,我们将了解这个设置的含义及其用途。
2. Kafka架构
Kafka的设计旨在实现可扩展性和高可用性。因此,它由一组服务器(称为broker)组成,这些服务器之间复制主题分区。每个分区有一个领导者broker和多个跟随者。
生产者将消息发送到分区领导者,然后将记录传播到每个副本。消费者通常也会连接到分区领导者,因为消费消息会改变消费者的偏移量(consumer offset)。
复制因子(replication factor)是副本的数量。推荐值为3,因为它能在性能和容错性之间找到合适的平衡,而且云提供商通常会在一个区域中提供三个数据中心(availability zones)用于部署。
以下是一个示例,展示了包含两个分区和复制因子为3的四个broker的集群:
当一个分区领导者发生故障时,Kafka会选择另一个broker作为新的分区领导者。这时,消费者和生产者(即客户端)也需要切换到新的领导者。如果broker 1发生故障,场景可能会变为:
3. 启动与发现
如前所见,整个集群是动态的,客户端需要知道当前的拓扑状态,以便连接到正确的分区领导者来发送和接收消息。这就是启动服务(bootstrapping)的作用所在。
"bootstrap.servers"
配置是一个包含"hostname:port"对的列表,其中指定了一个或多个(甚至所有)broker的地址。客户端通过以下步骤使用这个列表:
- 从列表中选择第一个broker。
- 向该broker发送请求,获取包含有关主题、分区以及每个分区的领导者broker信息的集群元数据(每个broker都可以提供这种元数据)。
- 连接到所选主题分区的领导者broker。
当然,在列表中指定多个broker是有意义的,因为如果第一个broker不可用,客户端可以选择第二个来进行启动。
Kafka使用Kraft(早期的Zookeeper)来管理此类协调工作。
4. 示例
假设我们在开发环境中使用了一个简单的Docker镜像,包含了Kafka和Kraft(例如,bashj79/kafka-kraft)。我们可以使用以下命令安装这个Docker镜像:
docker run -p 9092:9092 -d bashj79/kafka-kraft
这将在容器内部及主机上运行一个通过端口9092访问的单个Kafka实例。
4.1. 使用Kafka命令行工具
连接到Kafka的一种方法是使用内置的Kafka命令行工具,该工具随Kafka安装提供。首先,我们创建一个名为samples
的主题。在容器的Bash中,可以运行以下命令:
$ cd /opt/kafka/bin
$ sh kafka-topics.sh --bootstrap-server localhost:9092 --create --topic samples --partitions 1 --replication-factor 1
如果我们想开始消费这个主题,我们需要再次指定bootstrap服务器:
$ sh kafka-console-consumer.sh --bootstrap-server localhost:9092,another-host.com:29092 --topic samples
我们还可以探索元数据,就像虚拟文件系统一样。我们通过kafka-metadata-shell
脚本连接到元数据:
$ sh kafka-metadata-shell.sh --snapshot /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000167.log
4.2. 使用Java
在Java应用程序中,我们可以使用Kafka客户端:
static Consumer<Long, String> createConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092,another-host.com:29092");
props.put(ConsumerConfig.GROUP_ID_CONFIG,
"MySampleConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// Create the consumer using props.
final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
// Subscribe to the topic.
consumer.subscribe(Arrays.asList("samples"));
return consumer;
}
使用Spring Boot和Spring的Kafka集成,我们只需简单地在application.properties
中进行配置:
spring.kafka.bootstrap-servers=localhost:9092,another-host.com:29092
5. 总结
在这篇文章中,我们了解到Kafka是一个由多个broker组成的分布式系统,它们复制主题分区以确保高可用性、可扩展性和容错性。
客户端需要从一个broker获取元数据,以找到当前的分区领导者并连接到它。这个broker就是引导服务器,我们通常提供一个引导服务器列表,以防主服务器不可用时提供备选方案。
如往常一样,所有的代码实现可以在GitHub上找到。