1. 概述

上一篇 Kafka Connect 入门文章中,我们已经介绍了 Kafka Connect 的基本概念、连接器类型、核心功能以及 REST API 的使用。

本篇文章将带你构建一个更具“真实感”的 Kafka Connect 示例项目:

✅ 使用 MQTT 作为数据源
✅ 将收集到的数据写入 MongoDB

这其实是一个典型的物联网(IoT)场景:设备通过 MQTT 发送数据,Kafka Connect 负责桥接并落地到 MongoDB 中进行存储。

2. 使用 Docker 快速搭建环境

我们使用 Docker Compose 来搭建整个基础设施。包括以下组件:

  • ✅ MQTT Broker(Eclipse Mosquitto)作为数据源
  • ✅ Zookeeper 和 Kafka Broker 作为消息中间件
  • ✅ Kafka Connect 用于连接源与目标系统
  • ✅ MongoDB 作为数据落盘目标
  • ✅ Mongo Express(Web UI)方便查看数据是否正确写入

2.1. 安装所需连接器

我们这个例子需要两个连接器:

  • MQTT Source Connector
  • MongoDB Sink Connector

这两个连接器 默认不包含在 Kafka 或 Confluent Platform 中

你需要从 Confluent Hub 下载这两个插件:

下载后解压 .jar 文件放到本地目录中,例如:/tmp/custom/jars。这个目录会在后续的 Docker Compose 配置中挂载到 Kafka Connect 容器中。

⚠️ 注意:必须在启动 Docker Compose 前完成此步骤,因为 Kafka Connect 会在启动时加载插件。

2.2. Docker Compose 配置文件

下面是完整的 docker-compose.yml 文件,包含 6 个服务:

version: '3.3'

services:
  mosquitto:
    image: eclipse-mosquitto:1.5.5
    hostname: mosquitto
    container_name: mosquitto
    expose:
      - "1883"
    ports:
      - "1883:1883"
  zookeeper:
    image: zookeeper:3.4.9
    restart: unless-stopped
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./zookeeper/data:/data
      - ./zookeeper/datalog:/datalog
  kafka:
    image: confluentinc/cp-kafka:5.1.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./kafka/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.1.0
    hostname: kafka-connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
      CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - /tmp/custom/jars:/etc/kafka-connect/jars
    depends_on:
      - zookeeper
      - kafka
      - mosquitto
  mongo-db:
    image: mongo:4.0.5
    hostname: mongo-db
    container_name: mongo-db
    expose:
      - "27017"
    ports:
      - "27017:27017"
    command: --bind_ip_all --smallfiles
    volumes:
      - ./mongo-db:/data
  mongoclient:
    image: mongoclient/mongoclient:2.2.0
    container_name: mongoclient
    hostname: mongoclient
    depends_on:
      - mongo-db
    ports:
      - 3000:3000
    environment:
      MONGO_URL: "mongodb://mongo-db:27017"
      PORT: 3000
    expose:
      - "3000"

各服务说明:

  • mosquitto: MQTT Broker
  • zookeeper + kafka: Kafka 集群(单节点)
  • kafka-connect: Kafka Connect 实例(分布式模式)
  • mongo-db: MongoDB 实例
  • mongoclient: Web UI 工具,用于查看 MongoDB 数据

启动方式:

docker-compose up

3. 配置 Kafka Connect 连接器

现在 Kafka Connect 已经运行起来了,我们可以通过 REST API 来配置 Source 和 Sink 连接器。

3.1. 配置 Source 连接器(MQTT)

使用如下命令配置 MQTT Source:

curl -d @<path-to-config-file>/connect-mqtt-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

配置文件 connect-mqtt-source.json 内容如下:

{
    "name": "mqtt-source",
    "config": {
        "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
        "tasks.max": 1,
        "mqtt.server.uri": "tcp://mosquitto:1883",
        "mqtt.topics": "baeldung",
        "kafka.topic": "connect-custom",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "confluent.topic.bootstrap.servers": "kafka:9092",
        "confluent.topic.replication.factor": 1
    }
}

关键配置说明:

配置项 说明
mqtt.server.uri MQTT Broker 地址
mqtt.topics 监听的 MQTT 主题
kafka.topic 接收到的数据将发送到 Kafka 的哪个主题
value.converter 设置为 ByteArrayConverter,因为 MQTT 默认是 Base64 编码
confluent.topic.* 新版连接器必须配置的内部 Topic 参数

3.2. 测试 Source 连接器

发送一条测试消息到 MQTT:

docker run \
-it --rm --name mqtt-publisher --network 04_custom_default \
efrecon/mqtt-client \
pub -h mosquitto -t "baeldung" -m "{\"id\":1234,\"message\":\"This is a test\"}"

然后监听 Kafka 主题 connect-custom

docker run \
--rm \
confluentinc/cp-kafka:5.1.0 \
kafka-console-consumer --network 04_custom_default --bootstrap-server kafka:9092 --topic connect-custom --from-beginning

如果一切正常,你应该看到刚才发送的消息。

3.3. 配置 Sink 连接器(MongoDB)

继续配置 MongoDB Sink:

curl -d @<path-to-config file>/connect-mongodb-sink.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

配置文件 connect-mongodb-sink.json 内容如下:

{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": 1,
        "topics": "connect-custom",
        "mongodb.connection.uri": "mongodb://mongo-db/test?retryWrites=true",
        "mongodb.collection": "MyCollection",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false
    }
}

关键配置说明:

配置项 说明
mongodb.connection.uri MongoDB 连接串
mongodb.collection 存储数据的集合名
value.converter 使用 JsonConverter 保证 JSON 格式
schemas.enable 设置为 false 以避免 schema 信息干扰

3.4. 测试 Sink 连接器

由于 Kafka 主题 connect-custom 中已有数据,MongoDB Sink 连接器应该已经自动消费并写入数据库。

你可以访问 Web UI:http://localhost:3000/,选择左侧的 MyCollection,点击 Execute,就能看到数据。

3.5. 端到端测试

现在可以发送任意 JSON 数据,比如下面这个:

{
    "firstName": "John",
    "lastName": "Smith",
    "age": 25,
    "address": {
        "streetAddress": "21 2nd Street",
        "city": "New York",
        "state": "NY",
        "postalCode": "10021"
    },
    "phoneNumber": [{
        "type": "home",
        "number": "212 555-1234"
    }, {
        "type": "fax",
        "number": "646 555-4567"
    }],
    "gender": {
        "type": "male"
    }
}

MongoDB 支持 schema-free 的 JSON 文档,加上我们关闭了 schema,所以任何结构都能直接通过 Kafka Connect 流转并落库。

3.6. 清理环境

测试完成后,删除两个连接器:

curl -X DELETE http://localhost:8083/connectors/mqtt-source
curl -X DELETE http://localhost:8083/connectors/mongodb-sink

然后使用 Ctrl + C 关闭 Docker Compose。

4. 总结

本篇文章展示了如何使用 Kafka Connect 构建一个从 MQTT 到 MongoDB 的完整数据链路:

✅ MQTT Source Connector 收集数据
✅ Kafka Connect 负责桥接
✅ MongoDB Sink Connector 落地数据

整个流程无需编写任何代码,只需配置好连接器即可实现数据同步。非常适合用于 IoT、日志采集、实时分析等场景。

配置文件已上传至 GitHub:https://github.com/eugenp/tutorials/tree/master/apache-kafka


原始标题:Kafka Connect Example with MQTT and MongoDB

» 下一篇: Java周报,264