1. Overview

Apache Kafka, or simply Kafka, is an open-source distributed event streaming platform developed by the Apache Software Foundation. Developers widely use it as a message broker to transmit messages from a producer to one or more consumers. It’s scalable, reliable, and can handle large volumes of data.

In this tutorial, we’ll explore how to integrate a Kafka instance in our ZIO applications using the zio-kafka library.

2. Serializing and Deserializing Data

Before we can send data to our Kafka instance for consumers to read, we first need to serialize it. Conversely, when we receive data, we need to deserialize it.

zio-kafka allows us to do this easily for simple datatypes by using the Serde trait and its companion object. It defines built-in serializers and deserializers for datatypes such as Long, Int, Float, Double, String, Boolean, and UUID. For more complex and custom types, we need to define our own custom serializer and deserializer.
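For instance, we can reference these built-in instances directly from the Serde companion object. Here’s a minimal sketch (the value names are ours, purely for illustration):

import zio.kafka.serde.Serde

// Built-in serdes provided by the companion object
val stringSerde: Serde[Any, String] = Serde.string
val longSerde: Serde[Any, Long] = Serde.long
val uuidSerde: Serde[Any, java.util.UUID] = Serde.uuid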

2.1. Defining a Custom Serde

Let’s create a custom class that we’ll use as our message class produced and consumed by our application:

case class CustomMessage(id: Long, message: String, sender: String)

It’s a simple case class consisting of an id, a message, and the name of the sender. Next, we are going to implement our custom serde object. It has to be an instance of the Serde trait. We can use any format we want, but in our example, we’re going to use the JSON format.

First, we need to define a JSON encoder and decoder for our message inside the companion object. To do that, we need to add the following dependencies in our build.sbt file:

libraryDependencies += "dev.zio" %% "zio" % "2.0.19"
libraryDependencies += "dev.zio" %% "zio-streams" % "2.0.19"
libraryDependencies += "dev.zio" %% "zio-json" % "0.6.2"

Then, we can create our companion object:

object CustomMessage {
  implicit val encoder: JsonEncoder[CustomMessage] =
    DeriveJsonEncoder.gen[CustomMessage]

  implicit val decoder: JsonDecoder[CustomMessage] =
    DeriveJsonDecoder.gen[CustomMessage]
}

Finally, we can create our custom serde object:

object CustomMessageSerde {
  val key: Serde[Any, String] =
    Serde.string

  val value: Serde[Any, CustomMessage] =
    Serde.string.inmapM[Any, CustomMessage](s =>
      ZIO.fromEither(s.fromJson[CustomMessage])
        .mapError(e => new RuntimeException(e))
    )(r => ZIO.succeed(r.toJson))
}

We’re using the inmapM() method defined on the Serde trait to build on the built-in String serde: when deserializing, it decodes the JSON string into our custom class, mapping any decoding error to a RuntimeException; when serializing, it simply converts our class to JSON with the toJson() method.
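As a quick sanity check, we can round-trip a sample message with zio-json directly. This is just an illustrative sketch, independent of Kafka:

import zio.json._

val original = CustomMessage(1, "Hello", "Baeldung")
// Produces {"id":1,"message":"Hello","sender":"Baeldung"}
val json: String = original.toJson
// Returns either a decoding error message or the decoded value
val decoded: Either[String, CustomMessage] = json.fromJson[CustomMessage]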

Now that we have our message class, we can define our producer and consumer pair.

3. Creating a Producer and a Consumer Pair

The zio-kafka library offers two ways of implementing producers and consumers: via ZIO Workflows or ZIO Stream Workflows. We can choose whichever fits our needs. To use the zio-kafka library, we need to add the following line to our build.sbt file:

libraryDependencies += "dev.zio" %% "zio-kafka" % "2.7.0"

3.1. Create a Pair With ZIO Workflow

We can now define our producer. The library allows us to implement it very simply:

object KafkaProducer {
  def produce(
    topic: String,
    key: String,
    value: CustomMessage
  ): RIO[Any with Producer, RecordMetadata] =
    Producer.produce[Any, String, CustomMessage](
      topic = topic,
      key = key,
      value = value,
      keySerializer = CustomMessageSerde.key,
      valueSerializer = CustomMessageSerde.value
    )
}

We’ve defined a helper method that accepts a topic, a key, and a value, serializes them with our custom serde, and forwards them to our Kafka instance.

Our producer also needs a ZLayer that holds its configuration and provides the Producer to the environment:

def producerLayer(
  bootstrapServers: List[String]
): ZLayer[Any, Throwable, Producer] =
  ZLayer.scoped(
    Producer.make(
      ProducerSettings().withBootstrapServers(bootstrapServers)
    )
  )

Here, the only thing we need to pass is our servers’ addresses. We define this method inside the KafkaProducer object as well, since we’ll reference it as KafkaProducer.producerLayer() later.
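For example, we could provide this layer to the produce() helper we defined above. Here’s a minimal sketch with placeholder values:

// Produces a single message and supplies the Producer via the layer
val produceOnce: Task[RecordMetadata] =
  KafkaProducer
    .produce("baeldung", "key", CustomMessage(1, "Hello", "Baeldung"))
    .provide(KafkaProducer.producerLayer(List("localhost:9092")))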

Moving on, we can now define our consumer. The simplest way is to use the Consumer.consumeWith() method, which creates a ZIO workflow that runs forever:

object KafkaConsumer {
  def consume(
    bootstrapServers: List[String],
    groupId: String,
    topic: String
  ): RIO[Any, Unit] =
    Consumer.consumeWith(
      settings = ConsumerSettings(bootstrapServers)
        .withGroupId(groupId),
      subscription = Subscription.topics(topic),
      keyDeserializer = CustomMessageSerde.key,
      valueDeserializer = CustomMessageSerde.value
    )(record =>
      Console
        .printLine(
          s"Consumed message with key: ${record.key()} and value: ${record.value()}"
        )
        .orDie
    )
}

Our consumer is pretty simple. It accepts the addresses of our servers, a group id to distinguish it from other consumer groups, and a topic. We also use our custom deserializer to turn incoming messages into instances of our custom class.

All that’s left is to wire our pair together into a ZIO app:

object KafkaWorkflowApp extends ZIOAppDefault {

  private val BOOTSTRAP_SERVERS = List("localhost:9092")
  private val KAFKA_TOPIC = "baeldung"

  override def run =
    for {
      consumer <- KafkaConsumer
        .consume(BOOTSTRAP_SERVERS, "baeldung-consumer-group", KAFKA_TOPIC)
        .fork
      _ <- KafkaProducer
        .produce(KAFKA_TOPIC, "key", CustomMessage(1, "Hello", "Baeldung"))
        .provide(KafkaProducer.producerLayer(BOOTSTRAP_SERVERS))
        .repeatN(10)
      _ <- consumer.join
    } yield ()

}

Our app is very simple. We run our consumer on a new fiber so that it’s independent of the main fiber. Then, we create our producer and repeat the production ten more times with repeatN(10), giving us 11 messages in total. We can see the results in our console.

Our app assumes that there is a Kafka instance running on the same machine at port 9092, the default Kafka port.

3.2. Create a Pair With ZIO Stream Workflow

After defining our simple pair, we can move on and utilize the ZIO Stream workflow to create another producer-consumer pair. The library provides a very simple way of doing this. In this example, we’ll use ZLayers. The producer layer is the same as before:

object KafkaStreamProducer {

  def producerLayer(
    bootstrapServers: List[String]
  ): ZLayer[Any, Throwable, Producer] =
    ZLayer.scoped(
      Producer.make(
        ProducerSettings(bootstrapServers)
      )
    )
}

And now we can implement our consumer layer:

object KafkaStreamConsumer {

  def consumerLayer(
    bootstrapServers: List[String],
    groupId: String
  ): ZLayer[Any, Throwable, Consumer] =
    ZLayer.scoped(
      Consumer.make(
        ConsumerSettings(bootstrapServers)
          .withGroupId(groupId)
      )
    )

}

Both of these methods create a Producer or Consumer that will be available in the environment. But we still haven’t used any streaming capabilities; that happens in our main app, where we call the appropriate methods:

object KafkaStreamWorkflowApp extends ZIOAppDefault {

  private val BOOTSTRAP_SERVERS = List("localhost:9092")
  private val KAFKA_TOPIC = "baeldung"

  val producer: ZLayer[Any, Throwable, Producer] = KafkaStreamProducer.producerLayer(BOOTSTRAP_SERVERS)
  val consumer: ZLayer[Any, Throwable, Consumer] = KafkaStreamConsumer.consumerLayer(
    BOOTSTRAP_SERVERS,
    "baeldung-consumer-group"
  )

  override def run = {
    val prod: ZStream[Producer, Throwable, Nothing] =
      ZStream
        .repeat("key")
        .schedule(Schedule.spaced(1.second))
        .map(key =>
          new ProducerRecord(
            KAFKA_TOPIC,
            key,
            CustomMessage(1, "Hello", "Baeldung")
          )
        )
        .via(
          Producer.produceAll(CustomMessageSerde.key, CustomMessageSerde.value)
        )
        .drain

    val cons: ZStream[Consumer, Throwable, Nothing] =
      Consumer
        .plainStream(
          Subscription.topics(KAFKA_TOPIC),
          CustomMessageSerde.key,
          CustomMessageSerde.value
        )
        .tap(record =>
          Console
            .printLine(
              s"Consumed message with key: ${record.key} and value: ${record.value}"
            )
            .orDie
        )
        .map(_.offset)
        .aggregateAsync(Consumer.offsetBatches)
        .mapZIO(_.commit)
        .drain

    (prod merge cons).runDrain.provide(producer, consumer)
  }

}

First, we create our producer and consumer layers, assuming again that there is a Kafka instance available on the same machine at port 9092. Then, we create the actual producer and consumer streams.

Our producer produces messages indefinitely, spaced one second apart. The produceAll() method accepts a stream of ProducerRecord values, sends them to our Kafka instance, and returns a stream of RecordMetadata.

The plainStream() method in our consumer consumes these events and returns a stream of CommittableRecord. This class represents a Kafka record that can be committed, meaning we can inform Kafka that the record was consumed. We tap into the stream to print each message.

To improve performance, we use the aggregateAsync() method with Consumer.offsetBatches to group the offsets into batches and commit them all at once.
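For comparison, a simpler but slower variant could commit every offset individually. This is a rough sketch, not part of the pipeline above:

// Commits each offset one by one instead of batching
val consSimple: ZStream[Consumer, Throwable, Nothing] =
  Consumer
    .plainStream(
      Subscription.topics(KAFKA_TOPIC),
      CustomMessageSerde.key,
      CustomMessageSerde.value
    )
    .mapZIO(record => record.offset.commit)
    .drain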

Finally, we merge the two streams, run them with the runDrain() method, and wire in our ZLayers.

4. Conclusion

In this article, we learned how to use the zio-kafka library to produce and consume messages using Apache Kafka. We demonstrated how to implement a simple producer-consumer pair and one that utilizes streams.

As always, the full source code for the examples is available over on GitHub.