1. 概述

在实现Kafka生产者或消费者(例如,使用Spring时)时,我们需要配置一个名为"bootstrap.servers"的属性[1]。

在这篇教程中,我们将了解这个设置的含义及其用途。

2. Kafka架构

Kafka的设计旨在实现可扩展性和高可用性。因此,它由一组服务器(称为broker)组成,这些服务器之间复制主题分区。每个分区有一个领导者broker和多个跟随者。

生产者将消息发送到分区领导者,然后将记录传播到每个副本。消费者通常也会连接到分区领导者,因为消费消息会改变消费者的偏移量(consumer offset)。

复制因子(replication factor)是副本的数量。推荐值为3,因为它能在性能和容错性之间找到合适的平衡,而且云提供商通常会在一个区域中提供三个数据中心(availability zones)用于部署。

以下是一个示例,展示了包含两个分区和复制因子为3的四个broker的集群:

当一个分区领导者发生故障时,Kafka会选择另一个broker作为新的分区领导者。这时,消费者和生产者(即客户端)也需要切换到新的领导者。如果broker 1发生故障,场景可能会变为:

3. 启动与发现

如前所见,整个集群是动态的,客户端需要知道当前的拓扑状态,以便连接到正确的分区领导者来发送和接收消息。这就是启动服务(bootstrapping)的作用所在。

"bootstrap.servers"配置是一个包含"hostname:port"对的列表,其中指定了一个或多个(甚至所有)broker的地址。客户端通过以下步骤使用这个列表:

  1. 从列表中选择第一个broker。
  2. 向该broker发送请求,获取包含有关主题、分区以及每个分区的领导者broker信息的集群元数据(每个broker都可以提供这种元数据)。
  3. 连接到所选主题分区的领导者broker。

当然,在列表中指定多个broker是有意义的,因为如果第一个broker不可用,客户端可以选择第二个来进行启动。

Kafka使用Kraft(早期的Zookeeper)来管理此类协调工作。

4. 示例

假设我们在开发环境中使用了一个简单的Docker镜像,包含了Kafka和Kraft(例如,bashj79/kafka-kraft)。我们可以使用以下命令安装这个Docker镜像:

docker run -p 9092:9092 -d bashj79/kafka-kraft

这将在容器内部及主机上运行一个通过端口9092访问的单个Kafka实例。

4.1. 使用Kafka命令行工具

连接到Kafka的一种方法是使用内置的Kafka命令行工具,该工具随Kafka安装提供。首先,我们创建一个名为samples的主题。在容器的Bash中,可以运行以下命令:

$ cd /opt/kafka/bin
$ sh kafka-topics.sh --bootstrap-server localhost:9092 --create --topic samples --partitions 1 --replication-factor 1

如果我们想开始消费这个主题,我们需要再次指定bootstrap服务器:

$ sh kafka-console-consumer.sh --bootstrap-server localhost:9092,another-host.com:29092 --topic samples

我们还可以探索元数据,就像虚拟文件系统一样。我们通过kafka-metadata-shell脚本连接到元数据:

$ sh kafka-metadata-shell.sh --snapshot /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000167.log

4.2. 使用Java

在Java应用程序中,我们可以使用Kafka客户端

static Consumer<Long, String> createConsumer() {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
        "localhost:9092,another-host.com:29092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, 
        "MySampleConsumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
        LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class.getName());
      // Create the consumer using props.
      final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
      // Subscribe to the topic.
      consumer.subscribe(Arrays.asList("samples"));
      return consumer;
  }

使用Spring Boot和Spring的Kafka集成,我们只需简单地在application.properties中进行配置

spring.kafka.bootstrap-servers=localhost:9092,another-host.com:29092

5. 总结

在这篇文章中,我们了解到Kafka是一个由多个broker组成的分布式系统,它们复制主题分区以确保高可用性、可扩展性和容错性。

客户端需要从一个broker获取元数据,以找到当前的分区领导者并连接到它。这个broker就是引导服务器,我们通常提供一个引导服务器列表,以防主服务器不可用时提供备选方案。

如往常一样,所有的代码实现可以在GitHub上找到。