1. 概述

在使用Apache Kafka集群的事件驱动系统中,我们通常需要获取活跃代理的列表。在这篇教程中,我们将探索几个用于获取运行集群中活跃代理列表的shell命令。

2. 设置

为了本文的目的,我们将使用以下 docker-compose.yml 文件来设置一个两节点Kafka集群

$ cat docker-compose.yml
---
version: '2'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
  
  kafka-1:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper-1
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafka-2:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper-1
    ports:
      - 39092:39092
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

现在,让我们使用 docker-compose 命令启动Kafka集群:

$ docker-compose up -d

我们可以验证Zookeeper服务器正在端口 2181 上监听,而Kafka代理分别在端口 2909239092 上监听:

$ ports=(2181 29092 39092)
$ for port in $ports
do
nc -z localhost $port
done
Connection to localhost port 2181 [tcp/eforward] succeeded!
Connection to localhost port 29092 [tcp/*] succeeded!
Connection to localhost port 39092 [tcp/*] succeeded!

3. 使用Zookeeper API

在Kafka集群中,Zookeeper服务器存储与Kafka代理服务器相关的元数据。因此,我们将使用Zookeeper暴露的文件系统API获取代理详细信息。

3.1. zookeeper-shell 命令

大多数Kafka发行版都附带了 zookeeper-shellzookeeper-shell.sh 可执行文件。因此,这是与Zookeeper服务器交互的既定标准。

首先,让我们连接到运行在 localhost:2181 的Zookeeper服务器:

$ /usr/local/bin/zookeeper-shell localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!

一旦连接到Zookeeper服务器,我们就可以执行诸如 ls 等典型文件系统命令以获取服务器中存储的元数据信息。让我们找出当前存活的代理的ID:

ls /brokers/ids
[1, 2]

我们可以看到当前有两个活跃的代理,ID分别为1和2。使用 get 命令,我们可以为具有给定ID的特定代理检索更多详细信息:

get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1:9092","PLAINTEXT_HOST://localhost:29092"],"jmx_port":-1,"port":9092,"host":"kafka-1","version":5,"timestamp":"1625336133848"}
get /brokers/ids/2
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-2:9092","PLAINTEXT_HOST://localhost:39092"],"jmx_port":-1,"port":9092,"host":"kafka-2","version":5,"timestamp":"1625336133967"}

请注意,ID为 1 的代理正在端口 29092 上监听,而第二个ID为 2 的代理正在端口 39092 上监听。

最后,要退出Zookeeper控制台,我们可以使用 quit 命令:

quit

3.2. zkCli 命令

就像Kafka发行版附带了 zookeeper-shell 可执行文件一样,Zookeeper发行版也附带了 zkClizkCli.sh 可执行文件。

因此,zkCli 交互的方式完全类似于与 zookeeper-shell 交互,让我们继续确认我们能够获取具有ID为 1 的代理所需的详细信息:

$ zkCli -server localhost:2181 get /brokers/ids/1
Connecting to localhost:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1:9092","PLAINTEXT_HOST://localhost:29092"],"jmx_port":-1,"port":9092,"host":"kafka-1","version":5,"timestamp":"1625336133848"}

如预期的那样,我们可以通过 zookeeper-shell 获取的代理详细信息与通过 zkCli 获取的详细信息相匹配。

4. 使用代理版本API

有时,我们可能有一个不完整的活跃代理列表,并且希望获取集群中的所有可用代理。在这种情况下,我们可以使用Kafka发行版提供的 kafka-broker-api-versions 命令。

假设我们知道一个在 localhost:29092 运行的代理,让我们尝试找出所有参与Kafka集群的活跃代理:

$ kafka-broker-api-versions --bootstrap-server localhost:29092 | awk '/id/{print $1}'
localhost:39092
localhost:29092

值得注意的是,我们使用了 awk 命令过滤输出并仅显示代理地址。此外,结果正确地显示了集群中有两个活跃的代理。

尽管这种方法看起来比Zookeeper CLI方法更简单,但 kafka-broker-api-versions 可执行文件只是Kafka发行版的较新添加。

5. Shell 脚本

在实际场景中,手动为每个代理执行 zkClizookeeper-shell 命令会很耗时。因此,让我们编写一个 Shell脚本,该脚本将Zookeeper服务器地址作为输入,并返回集群中所有活跃代理的列表。

5.1. 辅助函数

让我们将所有辅助函数写入 functions.sh 脚本:

$ cat functions.sh
#!/bin/bash
ZOOKEEPER_SERVER="${1:-localhost:2181}"

# Helper Functions Below

首先,让我们编写 **get_broker_ids 函数来获取活动代理ID的集合,该函数将内部调用 zkCli 命令:

function get_broker_ids {
broker_ids_out=$(zkCli -server $ZOOKEEPER_SERVER <<EOF
ls /brokers/ids
quit
EOF
)
broker_ids_csv="$(echo "${broker_ids_out}" | grep '^\[.*\]$')"
echo "$broker_ids_csv" | sed 's/\[//;s/]//;s/,/ /'
}

接下来,让我们编写 get_broker_details 函数来使用 broker_id 获取详细的代理详细信息

function get_broker_details {
broker_id="$1"
echo "$(zkCli -server $ZOOKEEPER_SERVER <<EOF
get /brokers/ids/$broker_id
quit
EOF
)"
}

现在,我们有了详细的代理详细信息,让我们编写 parse_broker_endpoint 函数来获取代理的端点详细信息:

function parse_endpoint_detail {
broker_detail="$1"
json="$(echo "$broker_detail"  | grep '^{.*}$')"
json_endpoints="$(echo $json | jq .endpoints)"
echo "$(echo $json_endpoints |jq . |  grep HOST | tr -d " ")"
}

内部,我们使用了 jq 命令进行JSON解析

5.2. 主脚本

现在,让我们编写名为 get_all_active_brokers.sh 的主脚本,该脚本使用在 functions.sh 中定义的辅助函数:

$ cat get_all_active_brokers.sh
#!/bin/bash
. functions.sh "$1"

function get_all_active_brokers {
broker_ids=$(get_broker_ids)
for broker_id in $broker_ids
do
    broker_details="$(get_broker_details $broker_id)"
    broker_endpoint=$(parse_endpoint_detail "$broker_details")
    echo "broker_id="$broker_id,"endpoint="$broker_endpoint
done
}

get_all_active_brokers

我们可以注意到 **我们在 get_all_active_brokers 函数中迭代了所有 broker_ids,以聚合所有活跃代理的端点**。

最后,让我们执行 get_all_active_brokers.sh 脚本,以便查看我们的两节点Kafka集群中的活跃代理列表:

$ ./get_all_active_brokers.sh localhost:2181
broker_id=1,endpoint="PLAINTEXT_HOST://localhost:29092"
broker_id=2,endpoint="PLAINTEXT_HOST://localhost:39092"

我们可以看到结果是准确的。看起来我们成功了!

6. 结论

在这篇教程中,我们学习了 zookeeper-shellzkClikafka-broker-api-versions 等shell命令,用于获取Kafka集群中的活跃代理列表。此外,我们编写了一个 shell脚本来自动化在真实世界场景中查找代理详细信息的过程