1. Overview

In a previous article, we had a quick introduction to Kafka Connect, including the different types of connectors, basic features of Connect, as well as the REST API.

In this tutorial, we’ll use Kafka connectors to build a more “real world” example.

We’ll use a connector to collect data via MQTT, and we’ll write the gathered data to MongoDB.

2. Setup Using Docker

We’ll use Docker Compose to set up the infrastructure. That includes an MQTT broker as the source, Zookeeper, one Kafka broker, and Kafka Connect as middleware, and finally a MongoDB instance including a GUI tool as the sink.

2.1. Connector Installation

The connectors required for our example, an MQTT source as well as a MongoDB sink connector, are not included in plain Kafka or the Confluent Platform.

As we discussed in the previous article, we can download the connectors (MQTT as well as MongoDB) from the Confluent Hub. After that, we have to unpack the jars into a folder, which we’ll mount into the Kafka Connect container in the following section.

Let’s use the folder /tmp/custom/jars for that. We have to move the jars there before starting the Compose stack in the following section, as Kafka Connect only loads connector plugins during startup.
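
For example, assuming we’ve downloaded the two archives from the Confluent Hub into the current directory (the file names below are just placeholders; the actual names depend on the versions we picked), unpacking them could look roughly like this:

# create the plugin folder that we'll later mount into the Kafka Connect container
mkdir -p /tmp/custom/jars

# unzip the downloaded archives (actual names and versions may differ)
unzip confluentinc-kafka-connect-mqtt-*.zip
unzip hpgrahsl-kafka-connect-mongodb-*.zip

# move the contained jars into the plugin folder
cp confluentinc-kafka-connect-mqtt-*/lib/*.jar /tmp/custom/jars/
cp hpgrahsl-kafka-connect-mongodb-*/lib/*.jar /tmp/custom/jars/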

2.2. Docker Compose File

We describe our setup as a simple Docker Compose file consisting of six containers:

version: '3.3'

services:
  mosquitto:
    image: eclipse-mosquitto:1.5.5
    hostname: mosquitto
    container_name: mosquitto
    expose:
      - "1883"
    ports:
      - "1883:1883"
  zookeeper:
    image: zookeeper:3.4.9
    restart: unless-stopped
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./zookeeper/data:/data
      - ./zookeeper/datalog:/datalog
  kafka:
    image: confluentinc/cp-kafka:5.1.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./kafka/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.1.0
    hostname: kafka-connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
      CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - /tmp/custom/jars:/etc/kafka-connect/jars
    depends_on:
      - zookeeper
      - kafka
      - mosquitto
  mongo-db:
    image: mongo:4.0.5
    hostname: mongo-db
    container_name: mongo-db
    expose:
      - "27017"
    ports:
      - "27017:27017"
    command: --bind_ip_all --smallfiles
    volumes:
      - ./mongo-db:/data
  mongoclient:
    image: mongoclient/mongoclient:2.2.0
    container_name: mongoclient
    hostname: mongoclient
    depends_on:
      - mongo-db
    ports:
      - 3000:3000
    environment:
      MONGO_URL: "mongodb://mongo-db:27017"
      PORT: 3000
    expose:
      - "3000"

The mosquitto container provides a simple MQTT broker based on Eclipse Mosquitto.

The containers zookeeper and kafka define a single-node Kafka cluster.

kafka-connect defines our Connect application in distributed mode.

And finally, mongo-db defines our sink database, as well as the web-based mongoclient, which helps us to verify whether the sent data arrived correctly in the database.

We can start the stack using the following command:

docker-compose up
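
Before moving on, we can check that the Connect worker is reachable and that it picked up the two connector plugins we mounted, by querying its REST API (it can take a moment after startup until port 8083 answers):

curl http://localhost:8083/connector-plugins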

3. Connector Configuration

As Kafka Connect is now up and running, we can configure the connectors.

3.1. Configure Source Connector

Let’s configure the source connector using the REST API:

curl -d @<path-to-config-file>/connect-mqtt-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Our connect-mqtt-source.json file looks like this:

{
    "name": "mqtt-source",
    "config": {
        "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
        "tasks.max": 1,
        "mqtt.server.uri": "tcp://mosquitto:1883",
        "mqtt.topics": "baeldung",
        "kafka.topic": "connect-custom",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "confluent.topic.bootstrap.servers": "kafka:9092",
        "confluent.topic.replication.factor": 1
    }
}

There are a few properties that we haven’t used before:

  • mqtt.server.uri is the endpoint our connector will connect to
  • mqtt.topics is the MQTT topic our connector will subscribe to
  • kafka.topic defines the Kafka topic the connector will send the received data to
  • value.converter defines a converter which will be applied to the received payload. We need the ByteArrayConverter, as the MQTT Connector uses Base64 by default, while we want to use plain text
  • confluent.topic.bootstrap.servers is required by the newest version of the connector
  • The same applies to confluent.topic.replication.factor: it defines the replication factor for a Confluent-internal topic – as we have only one node in our cluster, we have to set that value to 1
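
Before testing, it’s worth verifying that the connector was created successfully and that its task is running; a quick look at the REST API:

curl http://localhost:8083/connectors/mqtt-source/status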

3.2. Test Source Connector

Let’s run a quick test by publishing a short message to the MQTT broker:

docker run \
-it --rm --name mqtt-publisher --network 04_custom_default \
efrecon/mqtt-client \
pub -h mosquitto  -t "baeldung" -m "{\"id\":1234,\"message\":\"This is a test\"}"

And if we listen to the topic, connect-custom:

docker run \
--rm \
confluentinc/cp-kafka:5.1.0 \
kafka-console-consumer --network 04_custom_default --bootstrap-server kafka:9092 --topic connect-custom --from-beginning

then we should see our test message.

3.3. Set Up Sink Connector

Next, we need our sink connector. Let’s again use the REST API:

curl -d @<path-to-config-file>/connect-mongodb-sink.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Our connect-mongodb-sink.json file looks like this:

{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": 1,
        "topics": "connect-custom",
        "mongodb.connection.uri": "mongodb://mongo-db/test?retryWrites=true",
        "mongodb.collection": "MyCollection",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false
    }
}

We have the following MongoDB-specific properties here:

  • mongodb.connection.uri contains the connection string for our MongoDB instance
  • mongodb.collection defines the collection
  • Since the MongoDB connector expects JSON, we have to set JsonConverter for key.converter and value.converter
  • And we also need schemaless JSON for MongoDB, so we have to set key.converter.schemas.enable and value.converter.schemas.enable to false
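
As before, we can quickly confirm that both connectors are now registered with the Connect worker:

curl http://localhost:8083/connectors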

3.4. Test Sink Connector

Since our topic connect-custom already contains messages from the MQTT connector test, the MongoDB connector should have fetched them directly after creation.

Hence, we should find them immediately in our MongoDB. We can use the web interface for that by opening the URL http://localhost:3000/. After login, we can select our MyCollection on the left, hit Execute, and our test message should be displayed.
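
Alternatively, we can query the database from the command line; a small sketch using the mongo shell inside the mongo-db container (the sink writes to the test database, as defined by our connection URI):

docker exec mongo-db mongo test --eval "db.MyCollection.find().pretty()"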

3.5. End-to-end Test

Now, we can send any JSON struct using the MQTT client:

{
    "firstName": "John",
    "lastName": "Smith",
    "age": 25,
    "address": {
        "streetAddress": "21 2nd Street",
        "city": "New York",
        "state": "NY",
        "postalCode": "10021"
    },
    "phoneNumber": [{
        "type": "home",
        "number": "212 555-1234"
    }, {
        "type": "fax",
        "number": "646 555-4567"
    }],
    "gender": {
        "type": "male"
    }
}
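
One way to publish such a document is to save it to a file, say person.json (a name we just pick here), mount it into the MQTT client container, and let mosquitto_pub send the file contents via its -f option; here's a sketch:

docker run \
-it --rm --name mqtt-publisher --network 04_custom_default \
-v "$(pwd)/person.json:/person.json" \
efrecon/mqtt-client \
pub -h mosquitto -t "baeldung" -f /person.json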

MongoDB supports schema-free JSON documents, and as we disabled schemas for our converter, any struct is immediately passed through our connector chain and stored in the database.

Again, we can use the web interface at http://localhost:3000/.

3.6. Clean Up

Once we’re done, we can clean up our experiment and remove the two connectors:

curl -X DELETE http://localhost:8083/connectors/mqtt-source
curl -X DELETE http://localhost:8083/connectors/mongodb-sink

After that, we can shut down the Compose stack with Ctrl + C.
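
If we’d rather also remove the containers and the network that Compose created, we can run:

docker-compose down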

4. Conclusion

In this tutorial, we built an example using Kafka Connect to collect data via MQTT and write the gathered data to MongoDB.

As always, the config files can be found over on GitHub.