1. 概述
在使用Apache Kafka集群的事件驱动系统中,我们通常需要获取活跃代理的列表。在这篇教程中,我们将探索几个用于获取运行集群中活跃代理列表的shell命令。
2. 设置
为了本文的目的,我们将使用以下 docker-compose.yml 文件来设置一个两节点Kafka集群:
$ cat docker-compose.yml
---
version: '2'
services:
zookeeper-1:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 2181:2181
kafka-1:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper-1
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
kafka-2:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper-1
ports:
- 39092:39092
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
现在,让我们使用 docker-compose 命令启动Kafka集群:
$ docker-compose up -d
我们可以验证Zookeeper服务器正在端口 2181 上监听,而Kafka代理分别在端口 29092 和 39092 上监听:
$ ports=(2181 29092 39092)
$ for port in $ports
do
nc -z localhost $port
done
Connection to localhost port 2181 [tcp/eforward] succeeded!
Connection to localhost port 29092 [tcp/*] succeeded!
Connection to localhost port 39092 [tcp/*] succeeded!
3. 使用Zookeeper API
在Kafka集群中,Zookeeper服务器存储与Kafka代理服务器相关的元数据。因此,我们将使用Zookeeper暴露的文件系统API获取代理详细信息。
3.1. zookeeper-shell 命令
大多数Kafka发行版都附带了 zookeeper-shell 或 zookeeper-shell.sh 可执行文件。因此,这是与Zookeeper服务器交互的既定标准。
首先,让我们连接到运行在 localhost:2181 的Zookeeper服务器:
$ /usr/local/bin/zookeeper-shell localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!
一旦连接到Zookeeper服务器,我们就可以执行诸如 ls 等典型文件系统命令以获取服务器中存储的元数据信息。让我们找出当前存活的代理的ID:
ls /brokers/ids
[1, 2]
我们可以看到当前有两个活跃的代理,ID分别为1和2。使用 get 命令,我们可以为具有给定ID的特定代理检索更多详细信息:
get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1:9092","PLAINTEXT_HOST://localhost:29092"],"jmx_port":-1,"port":9092,"host":"kafka-1","version":5,"timestamp":"1625336133848"}
get /brokers/ids/2
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-2:9092","PLAINTEXT_HOST://localhost:39092"],"jmx_port":-1,"port":9092,"host":"kafka-2","version":5,"timestamp":"1625336133967"}
请注意,ID为 1 的代理正在端口 29092 上监听,而第二个ID为 2 的代理正在端口 39092 上监听。
最后,要退出Zookeeper控制台,我们可以使用 quit 命令:
quit
3.2. zkCli 命令
就像Kafka发行版附带了 zookeeper-shell 可执行文件一样,Zookeeper发行版也附带了 zkCli 或 zkCli.sh 可执行文件。
因此,与 zkCli 交互的方式完全类似于与 zookeeper-shell 交互,让我们继续确认我们能够获取具有ID为 1 的代理所需的详细信息:
$ zkCli -server localhost:2181 get /brokers/ids/1
Connecting to localhost:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1:9092","PLAINTEXT_HOST://localhost:29092"],"jmx_port":-1,"port":9092,"host":"kafka-1","version":5,"timestamp":"1625336133848"}
如预期的那样,我们可以通过 zookeeper-shell 获取的代理详细信息与通过 zkCli 获取的详细信息相匹配。
4. 使用代理版本API
有时,我们可能有一个不完整的活跃代理列表,并且希望获取集群中的所有可用代理。在这种情况下,我们可以使用Kafka发行版提供的 kafka-broker-api-versions 命令。
假设我们知道一个在 localhost:29092 运行的代理,让我们尝试找出所有参与Kafka集群的活跃代理:
$ kafka-broker-api-versions --bootstrap-server localhost:29092 | awk '/id/{print $1}'
localhost:39092
localhost:29092
值得注意的是,我们使用了 awk 命令过滤输出并仅显示代理地址。此外,结果正确地显示了集群中有两个活跃的代理。
尽管这种方法看起来比Zookeeper CLI方法更简单,但 kafka-broker-api-versions 可执行文件只是Kafka发行版的较新添加。
5. Shell 脚本
在实际场景中,手动为每个代理执行 zkCli 或 zookeeper-shell 命令会很耗时。因此,让我们编写一个 Shell脚本,该脚本将Zookeeper服务器地址作为输入,并返回集群中所有活跃代理的列表。
5.1. 辅助函数
让我们将所有辅助函数写入 functions.sh 脚本:
$ cat functions.sh
#!/bin/bash
ZOOKEEPER_SERVER="${1:-localhost:2181}"
# Helper Functions Below
首先,让我们编写 **get_broker_ids 函数来获取活动代理ID的集合,该函数将内部调用 zkCli 命令:
function get_broker_ids {
broker_ids_out=$(zkCli -server $ZOOKEEPER_SERVER <<EOF
ls /brokers/ids
quit
EOF
)
broker_ids_csv="$(echo "${broker_ids_out}" | grep '^\[.*\]$')"
echo "$broker_ids_csv" | sed 's/\[//;s/]//;s/,/ /'
}
接下来,让我们编写 get_broker_details 函数来使用 broker_id 获取详细的代理详细信息:
function get_broker_details {
broker_id="$1"
echo "$(zkCli -server $ZOOKEEPER_SERVER <<EOF
get /brokers/ids/$broker_id
quit
EOF
)"
}
现在,我们有了详细的代理详细信息,让我们编写 parse_broker_endpoint 函数来获取代理的端点详细信息:
function parse_endpoint_detail {
broker_detail="$1"
json="$(echo "$broker_detail" | grep '^{.*}$')"
json_endpoints="$(echo $json | jq .endpoints)"
echo "$(echo $json_endpoints |jq . | grep HOST | tr -d " ")"
}
内部,我们使用了 jq 命令进行JSON解析。
5.2. 主脚本
现在,让我们编写名为 get_all_active_brokers.sh 的主脚本,该脚本使用在 functions.sh 中定义的辅助函数:
$ cat get_all_active_brokers.sh
#!/bin/bash
. functions.sh "$1"
function get_all_active_brokers {
broker_ids=$(get_broker_ids)
for broker_id in $broker_ids
do
broker_details="$(get_broker_details $broker_id)"
broker_endpoint=$(parse_endpoint_detail "$broker_details")
echo "broker_id="$broker_id,"endpoint="$broker_endpoint
done
}
get_all_active_brokers
我们可以注意到 **我们在 get_all_active_brokers 函数中迭代了所有 broker_ids,以聚合所有活跃代理的端点**。
最后,让我们执行 get_all_active_brokers.sh 脚本,以便查看我们的两节点Kafka集群中的活跃代理列表:
$ ./get_all_active_brokers.sh localhost:2181
broker_id=1,endpoint="PLAINTEXT_HOST://localhost:29092"
broker_id=2,endpoint="PLAINTEXT_HOST://localhost:39092"
我们可以看到结果是准确的。看起来我们成功了!
6. 结论
在这篇教程中,我们学习了 zookeeper-shell、zkCli 和 kafka-broker-api-versions 等shell命令,用于获取Kafka集群中的活跃代理列表。此外,我们编写了一个 shell脚本来自动化在真实世界场景中查找代理详细信息的过程。