1. 概述
在上一篇 Kafka Connect 入门文章中,我们已经介绍了 Kafka Connect 的基本概念、连接器类型、核心功能以及 REST API 的使用。
本篇文章将带你构建一个更具“真实感”的 Kafka Connect 示例项目:
✅ 使用 MQTT 作为数据源
✅ 将收集到的数据写入 MongoDB
这其实是一个典型的物联网(IoT)场景:设备通过 MQTT 发送数据,Kafka Connect 负责桥接并落地到 MongoDB 中进行存储。
2. 使用 Docker 快速搭建环境
我们使用 Docker Compose 来搭建整个基础设施。包括以下组件:
- ✅ MQTT Broker(Eclipse Mosquitto)作为数据源
- ✅ Zookeeper 和 Kafka Broker 作为消息中间件
- ✅ Kafka Connect 用于连接源与目标系统
- ✅ MongoDB 作为数据落盘目标
- ✅ Mongo Express(Web UI)方便查看数据是否正确写入
2.1. 安装所需连接器
我们这个例子需要两个连接器:
- MQTT Source Connector
- MongoDB Sink Connector
这两个连接器 默认不包含在 Kafka 或 Confluent Platform 中。
你需要从 Confluent Hub 下载这两个插件:
下载后解压 .jar
文件放到本地目录中,例如:/tmp/custom/jars
。这个目录会在后续的 Docker Compose 配置中挂载到 Kafka Connect 容器中。
⚠️ 注意:必须在启动 Docker Compose 前完成此步骤,因为 Kafka Connect 会在启动时加载插件。
2.2. Docker Compose 配置文件
下面是完整的 docker-compose.yml
文件,包含 6 个服务:
version: '3.3'
services:
mosquitto:
image: eclipse-mosquitto:1.5.5
hostname: mosquitto
container_name: mosquitto
expose:
- "1883"
ports:
- "1883:1883"
zookeeper:
image: zookeeper:3.4.9
restart: unless-stopped
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zookeeper:2888:3888
volumes:
- ./zookeeper/data:/data
- ./zookeeper/datalog:/datalog
kafka:
image: confluentinc/cp-kafka:5.1.0
hostname: kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./kafka/data:/var/lib/kafka/data
depends_on:
- zookeeper
kafka-connect:
image: confluentinc/cp-kafka-connect:5.1.0
hostname: kafka-connect
container_name: kafka-connect
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 1
volumes:
- /tmp/custom/jars:/etc/kafka-connect/jars
depends_on:
- zookeeper
- kafka
- mosquitto
mongo-db:
image: mongo:4.0.5
hostname: mongo-db
container_name: mongo-db
expose:
- "27017"
ports:
- "27017:27017"
command: --bind_ip_all --smallfiles
volumes:
- ./mongo-db:/data
mongoclient:
image: mongoclient/mongoclient:2.2.0
container_name: mongoclient
hostname: mongoclient
depends_on:
- mongo-db
ports:
- 3000:3000
environment:
MONGO_URL: "mongodb://mongo-db:27017"
PORT: 3000
expose:
- "3000"
各服务说明:
mosquitto
: MQTT Brokerzookeeper
+kafka
: Kafka 集群(单节点)kafka-connect
: Kafka Connect 实例(分布式模式)mongo-db
: MongoDB 实例mongoclient
: Web UI 工具,用于查看 MongoDB 数据
启动方式:
docker-compose up
3. 配置 Kafka Connect 连接器
现在 Kafka Connect 已经运行起来了,我们可以通过 REST API 来配置 Source 和 Sink 连接器。
3.1. 配置 Source 连接器(MQTT)
使用如下命令配置 MQTT Source:
curl -d @<path-to-config-file>/connect-mqtt-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors
配置文件 connect-mqtt-source.json
内容如下:
{
"name": "mqtt-source",
"config": {
"connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max": 1,
"mqtt.server.uri": "tcp://mosquitto:1883",
"mqtt.topics": "baeldung",
"kafka.topic": "connect-custom",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"confluent.topic.bootstrap.servers": "kafka:9092",
"confluent.topic.replication.factor": 1
}
}
关键配置说明:
配置项 | 说明 |
---|---|
mqtt.server.uri |
MQTT Broker 地址 |
mqtt.topics |
监听的 MQTT 主题 |
kafka.topic |
接收到的数据将发送到 Kafka 的哪个主题 |
value.converter |
设置为 ByteArrayConverter ,因为 MQTT 默认是 Base64 编码 |
confluent.topic.* |
新版连接器必须配置的内部 Topic 参数 |
3.2. 测试 Source 连接器
发送一条测试消息到 MQTT:
docker run \
-it --rm --name mqtt-publisher --network 04_custom_default \
efrecon/mqtt-client \
pub -h mosquitto -t "baeldung" -m "{\"id\":1234,\"message\":\"This is a test\"}"
然后监听 Kafka 主题 connect-custom
:
docker run \
--rm \
confluentinc/cp-kafka:5.1.0 \
kafka-console-consumer --network 04_custom_default --bootstrap-server kafka:9092 --topic connect-custom --from-beginning
如果一切正常,你应该看到刚才发送的消息。
3.3. 配置 Sink 连接器(MongoDB)
继续配置 MongoDB Sink:
curl -d @<path-to-config file>/connect-mongodb-sink.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors
配置文件 connect-mongodb-sink.json
内容如下:
{
"name": "mongodb-sink",
"config": {
"connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
"tasks.max": 1,
"topics": "connect-custom",
"mongodb.connection.uri": "mongodb://mongo-db/test?retryWrites=true",
"mongodb.collection": "MyCollection",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false
}
}
关键配置说明:
配置项 | 说明 |
---|---|
mongodb.connection.uri |
MongoDB 连接串 |
mongodb.collection |
存储数据的集合名 |
value.converter |
使用 JsonConverter 保证 JSON 格式 |
schemas.enable |
设置为 false 以避免 schema 信息干扰 |
3.4. 测试 Sink 连接器
由于 Kafka 主题 connect-custom
中已有数据,MongoDB Sink 连接器应该已经自动消费并写入数据库。
你可以访问 Web UI:http://localhost:3000/,选择左侧的 MyCollection
,点击 Execute,就能看到数据。
3.5. 端到端测试
现在可以发送任意 JSON 数据,比如下面这个:
{
"firstName": "John",
"lastName": "Smith",
"age": 25,
"address": {
"streetAddress": "21 2nd Street",
"city": "New York",
"state": "NY",
"postalCode": "10021"
},
"phoneNumber": [{
"type": "home",
"number": "212 555-1234"
}, {
"type": "fax",
"number": "646 555-4567"
}],
"gender": {
"type": "male"
}
}
MongoDB 支持 schema-free 的 JSON 文档,加上我们关闭了 schema,所以任何结构都能直接通过 Kafka Connect 流转并落库。
3.6. 清理环境
测试完成后,删除两个连接器:
curl -X DELETE http://localhost:8083/connectors/mqtt-source
curl -X DELETE http://localhost:8083/connectors/mongodb-sink
然后使用 Ctrl + C
关闭 Docker Compose。
4. 总结
本篇文章展示了如何使用 Kafka Connect 构建一个从 MQTT 到 MongoDB 的完整数据链路:
✅ MQTT Source Connector 收集数据
✅ Kafka Connect 负责桥接
✅ MongoDB Sink Connector 落地数据
整个流程无需编写任何代码,只需配置好连接器即可实现数据同步。非常适合用于 IoT、日志采集、实时分析等场景。
配置文件已上传至 GitHub:https://github.com/eugenp/tutorials/tree/master/apache-kafka