1. Overview

In this tutorial, we’ll explore Kafka using Scala. First, we’ll produce messages on a topic. Next, our consumer application will read those messages. Finally, we’ll use Avro to store our message schema in a schema registry.

2. Setup

Let’s set up our Kafka cluster and dependencies.

2.1. Cluster

Our Kafka cluster will consist of one broker and a schema registry. Client applications will connect to the broker on port 9094:

kafka:
  image: docker.io/bitnami/kafka:3.4.0-debian-11-r23
  networks:
    - kafka_net
  ports:
    - "9094:9094"
  volumes:
    - "kafka_data:/bitnami"
  environment:
    - BITNAMI_DEBUG=yes
    - ALLOW_PLAINTEXT_LISTENER=yes
    - KAFKA_KRAFT_CLUSTER_ID=9YoavaRpTCOitT3Dm2OQFQ
    - KAFKA_CFG_LISTENERS=CLIENT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
    - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9094
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
    - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT

kafka-ui lets us visualize our cluster state in the browser on port 8088:

kafka-ui:
  image: provectuslabs/kafka-ui:v0.7.0
  networks:
    - kafka_net
  ports:
    - "8088:8080"
  depends_on:
    kafka:
      condition: service_started
    kafka-schema-registry:
      condition: service_started
  environment:
    KAFKA_CLUSTERS_0_NAME: baeldung
    KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://kafka-schema-registry:8081
    DYNAMIC_CONFIG_ENABLED: 'true'

Later on, we’ll use the schema registry to register our application schema using Avro Serde:

kafka-schema-registry:
  image: bitnami/schema-registry:6.0
  networks:
    - kafka_net
  ports:
    - "8081:8081"
  depends_on:
    kafka:
      condition: service_started
  environment:
    SCHEMA_REGISTRY_DEBUG: true
    SCHEMA_REGISTRY_KAFKA_BROKERS: PLAINTEXT://kafka:9092

We’ll run docker compose up in the directory containing the docker-compose.yml file to bring the cluster up.

Alternatively, the cluster can be created using Kafka binaries. In this case, we should make sure to map the correct ports to the ones mentioned above.

2.2. Dependencies

We’ll need four main libraries in our application. First, we’ll install Kafka client dependency:

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "7.5.1-ce"

Thanks to it, we can produce and consume messages. Additionally, we’ll include jackson-data-bind:

libraryDependencies += "com.fasterxml.jackson.core" % "jackson-databind" % "2.15.1"

It will be used to serialize Scala case classes to JSON strings and back. Finally, let’s add avro4s and kafka-avro-serializer:

libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "4.1.1"
libraryDependencies += "io.confluent" % "kafka-avro-serializer" % "7.5.1" 

Those two dependencies help us serialize and deserialize our Kafka messages using the Avro format.

3. Producer

Our application will produce messages from an Article case class :

case class Article(
  id: String,
  title: String,
  content: String,
  created: LocalDate,
  author: Author
)

Let’s configure our producer before sending messages.

3.1. Configuration

First, we’ll set up producer properties in kafka-intro.conf:

bootstrap-servers = "localhost:9094"
topic="scala-articles"
producer {
    client.id = baeldung-scala-kafka-producer
    bootstrap.servers = ${bootstrap-servers}
}

Messages will reside in a topic named scala-articles. It’s important to ensure the topic exists before running the producer. Although the client.id property isn’t required, it’s useful to distinguish different producer applications.

To load our configuration, we’ll use the ProducerConfig companion class :

case class ProducerConfig(producer: Config, topic: String)

object ProducerConfig extends ClientConfig {
  def getConfig(resource: String): (util.Map[String, AnyRef], String) = {
    val source =
      ConfigSource.resources(resource).loadOrThrow[ProducerConfig]
    val config = source.producer.asJavaMap
    val topic = source.topic
    (config, topic)
  }
}

Here, loadOrThrow from PureConfig will fail to load the configuration if a key isn’t found.

3.2. Producing Messages

Next, we produce messages to our scala-articles topic:

object ArticleJsonStringProducer
  extends App
  with ProducerUtils[Article]
  with JsonStringSerializer[Article] {

  private val (config, topic) = ProducerConfig.getConfig("kafka-intro.conf")

  private val producer =
    new KafkaProducer(config, keySerializer, valueSerializer)

  private val articles = Generator.articles

  for (article <- articles) {
    produce(producer, topic, article.id, article.toJsonString)
  }

  producer.close()
}

We send a stringified Article class as value and its id as key to Kafka.

Let’s understand what’s going on here in detail. To start, the keySerializer and valueSerializer implicits are both using Kafka StringSerializer. They are defined in JsonStringSerializer:

trait JsonStringSerializer[T] {

  implicit val keySerializer: StringSerializer = new StringSerializer()
  implicit val valueSerializer: StringSerializer = new StringSerializer()

  implicit val jsonMapper: JsonMapper = JsonMapper
    .builder()
    .addModule(DefaultScalaModule)
    .addModule(new JavaTimeModule())
    .build()

  implicit class ValueOps(value: T) {
    def toJsonString()(implicit jsonMapper: JsonMapper): String = {
      jsonMapper.writeValueAsString(value)
    }
  }
}

It’s important to note that the article is serialized to String using Jackson ObjectMapper.

Continuing, the produce() function sends each Article to Kafka:

trait ProducerUtils[T]  {

  def produce[K, V](
    producer: KafkaProducer[K, V],
    topic: String,
    key: K,
    value: V
  ): Unit = {
    val record = new ProducerRecord(topic, key, value)
    producer.send(record, implicitly[Callback])
  }

}

Finally, when we run the producer application, our messages are successfully sent:

[kafka-producer-network-thread | baeldung-scala-kafka-producer] INFO  com.baeldung.scala.kafka.intro.producer.ArticleJsonStringProducer$ - Successfully produced a new record to Kafka: topic: scala-articles, partition: 1, offset: 6, key size: 36, value size: 36
[kafka-producer-network-thread | baeldung-scala-kafka-producer] INFO  com.baeldung.scala.kafka.intro.producer.ArticleJsonStringProducer$ - Successfully produced a new record to Kafka: topic: scala-articles, partition: 0, offset: 7, key size: 36, value size: 36
[kafka-producer-network-thread | baeldung-scala-kafka-producer] INFO  com.baeldung.scala.kafka.intro.producer.ArticleJsonStringProducer$ - Successfully produced a new record to Kafka: topic: scala-articles, partition: 0, offset: 8, key size: 36, value size: 36

4. Consumer

Now let’s consume published messages. Like the producer, we’ll define some configurations beforehand.

4.1. Configuration

We’ll add a consumer section in the kafka-intro.conf file defined above:

consumer {
    group.id = baeldung-scala-kafka-consumer
    bootstrap.servers = ${bootstrap-servers}
}

The ConsumerConfig configuration file is similar to the ProducerConfig.

4.2. Consuming Messages

After the configurations are done, let’s define our consumer application:

object ArticleJsonStringConsumer
  extends App
  with ConsumerUtils[Article]
  with JsonStringDeSerializer[Article] {

  private val (config, topic) = ConsumerConfig.getConfig("kafka-intro.conf")

  private val consumer =
    new KafkaConsumer(config, keyDeSerializer, valueDeSerializer)

  consumer.subscribe(asJavaCollection(List(topic)))

  while (true) {
    val messages = pool(consumer, FiniteDuration(1, SECONDS))
    for ((_, value) <- messages) {
      val article = fromJsonString(value)
      logger.info(
        s"New article received. Title: ${article.title}.  Author: ${article.author.name} "
      )
    }
    consumer.commitAsync()
  }

}

KeyDeserializer and valueDerializer use Kafka StringDeserializer. Since the record value is a JSON string, we deserialize it using fromJsonString():

trait JsonStringDeSerializer[T] {

  def fromJsonString(
    str: String
  )(implicit jsonMapper: JsonMapper, classTag: ClassTag[T]): T = {
    jsonMapper.readValue(str, classTag.runtimeClass).asInstanceOf[T]
  }
}

It’s important to note that we conserve the generic type information at runtime with Scala ClassTag. Besides, the consumer polls the cluster every second for new messages:

trait ConsumerUtils[T] extends Logging {
  def pool[K, V](
    consumer: KafkaConsumer[K, V],
    timeout: FiniteDuration = FiniteDuration(5, TimeUnit.SECONDS)
  ): Iterable[(K, V)] = {
    val records: ConsumerRecords[K, V] =
      consumer.poll(new ScalaDurationOps(timeout).toJava)
    val messages = records.asScala.map(record => {
      logger.debug(
        s"received record from topic ${record.topic}. Key:  ${record.key} value: ${record.value.toString}"
      )
      (record.key, record.value)
    })
    messages
  }
}

Type conversions between Java and Scala are handled by CollectionConverters implicits. Finally, the running consumer duly reads messages from the cluster:

[main] INFO  com.baeldung.scala.kafka.intro.consumer.ArticleJsonStringConsumer$ - New article received. Title: Introduction to Kafka.  Author: Foo Bar 
[main] INFO  com.baeldung.scala.kafka.intro.consumer.ArticleJsonStringConsumer$ - New article received. Title: Introduction to Scala Spire.  Author: Jane Doe 
[main] INFO  com.baeldung.scala.kafka.intro.consumer.ArticleJsonStringConsumer$ - New article received. Title: Introduction to Scala Programming.  Author: John Doe 

5. Avro Serde

Up to here, we’ve used Kafka native StringSerializer and StringDeserializer. Then we managed to send and read our article value thanks to Jackson ObjectMapper.

That approach has limitations in a real-world scenario. It’d be hard to evolve our data and keep track of its changes. Fortunately, schema registries help us overcome those limits.

They allow us to test our messages against a schema depending on compatibility. To store our schema for the Article message, we’ll serialize that class using Avro Serde.

Let’s rewire our configurations and applications to meet these new needs.

5.1. Configuration

Let’s define a new configuration file. It’ll use a new topic named scala-articles-avro:

bootstrap-servers = "localhost:9094"
schema-registry-url = "http://localhost:8081"
topic="scala-articles-avro"

producer {
    client.id = baeldung-scala-kafka-producer
    bootstrap.servers = ${bootstrap-servers}
    transactional.id = "baeldung-scala-kafka-producer"
}

serde {
    schema.registry.url = ${schema-registry-url}
}

consumer {
    group.id = baeldung-scala-kafka-consumer
    bootstrap.servers = ${bootstrap-servers}
}

The main addition here is the serde block. It holds the schema registry URL defined in our initial setup. When Avro messages are sent to Kafka, they will be checked against the schema in the registry.

Next, we add a new SerdeConfig:

object SerdeConfig extends ClientConfig {
  def getConfig(resource: String): util.Map[String, AnyRef] = {
    val source =
      ConfigSource.resources(resource).loadOrThrow[SerdeConfig]
    val serde = source.serde.asJavaMap
    serde
  }
}

Now that we have all configurations ready, let’s update our producer and consumer applications.

5.2. Producer

The ArticleAvroProducer uses AvroSerializer to serialize articles. The key remains a String:

object ArticleAvroProducer
  extends App
  with ProducerUtils[Article]
  with AvroSerializer {

  private val (config, topic) =
    ProducerConfig.getConfig("kafka-intro-avro.conf")
  private val serde = SerdeConfig.getConfig("kafka-intro-avro.conf")

  val keySerializer: StringSerializer = new StringSerializer()

  implicit lazy val Valueformat: RecordFormat[Article] = RecordFormat[Article]
  val valueSerializer: Serializer[Article] = AvroSerializer[Article]
  valueSerializer.configure(serde, false)

  private val producer =
    new KafkaProducer(config, keySerializer, valueSerializer)
  private val articles = Generator.articles

  Try {
    producer.initTransactions()
    producer.beginTransaction()
    for (article <- articles) {
      produce(producer, topic, article.id, article)
    }
    producer.commitTransaction()
    logger.info("Successfully completed Kafka transaction.")
  }.recover { case error =>
    logger.error(error)
    logger.error(
      "Something went wrong during Kafka transaction processing. Aborting"
    )
    producer.abortTransaction();
  }
  producer.close()

}

This time, we send transactional messages. It’s mandatory to define the transactional.id property in the producer config for it to work properly.

Interestingly, AvroSerializer uses KafkaAvroSerializer under the hood:

trait AvroSerializer {
  def AvroSerializer[T](implicit format: RecordFormat[T]): Serializer[T] =
    new Serializer[T] {
      val ser = new KafkaAvroSerializer()

      override def configure(
        configs: java.util.Map[String, _],
        isKey: Boolean
      ): Unit =
        ser.configure(configs, isKey)

      override def serialize(topic: String, data: T): Array[Byte] = Option(data)
        .map(data => ser.serialize(topic, format.to(data)))
        .getOrElse(Array.emptyByteArray)

      override def close(): Unit = ser.close()
    }
}

KafkaAvroSerializer defines the connection with the schema registry. In return, the registry stores the schema of the Article record:

{
    "type":"record",
    "name":"Article",
    "namespace":"com.baeldung.scala.kafka.intro.common",
    "fields":[
        {
            "name":"id",
            "type":"string"
        },
        {
            "name":"title",
            "type":"string"
        },
        {
            "name":"content",
            "type":"string"
        },
        {
            "name":"created",
            "type":{
                "type":"int",
                "logicalType":"date"
            }
        },
        {
            "name":"author",
            "type":{
                "type":"record",
                "name":"Author",
                "fields":[
                    {
                        "name":"id",
                        "type":"int"
                    },
                    {
                        "name":"name",
                        "type":"string"
                    }
                ]
            }
        }
    ]
}

Thanks to avro4s, we didn’t have to write the boilerplate to generate the schema. If we want to opt out of the schema registry but still use Avro, we can use Avro4s GenericSerde.

5.3. Consumer

Likewise, the consumer keeps the key deserialization unchanged. As for the value, it’s deserialized with AvroDeserializer:

trait AvroDeSerializer {
  def deserializer[T](implicit
    format: RecordFormat[T]
  ): Deserializer[T] = new Deserializer[T] {
    val deser = new KafkaAvroDeserializer()

    override def configure(
      configs: java.util.Map[String, _],
      isKey: Boolean
    ): Unit =
      deser.configure(configs, isKey)

    override def deserialize(topic: String, data: Array[Byte]): T = Option(data)
      .filter(_.nonEmpty)
      .map { data =>
        format
          .from(deser.deserialize(topic, data).asInstanceOf[IndexedRecord])
      }
      .getOrElse(null.asInstanceOf[T])

    override def close(): Unit = deser.close()
  }
}

With that deserializer, let’s define our KafkaConsumer:

object ArticleAvroConsumer
  extends App
  with ConsumerUtils[Article]
  with AvroDeSerializer {

  private val (config, topic) =
    ConsumerConfig.getConfig("kafka-intro-avro.conf")
  private val serde = SerdeConfig.getConfig("kafka-intro-avro.conf")

  val keyDeSerializer: StringDeserializer = new StringDeserializer()

  implicit lazy val Valueformat: RecordFormat[Article] = RecordFormat[Article]
  val valueDeSerializer: Deserializer[Article] = deserializer[Article]
  valueDeSerializer.configure(serde, false)

  private val consumer =
    new KafkaConsumer(config, keyDeSerializer, valueDeSerializer)

  consumer.subscribe(asJavaCollection(List(topic)))

  consumer.seekToBeginning(Nil.asJava)

  Try {
    while (true) {
      val messages = pool(consumer, FiniteDuration(1, MILLISECONDS))

      for ((_, article) <- messages) {
        logger.info(
          s"New article received. Title: ${article.title} .  Author: ${article.author.name}, Date: ${article.created}  "
        )
      }
    }
  }.recover { case error =>
    logger.error(error)
    logger.error(
      "Something went wrong when seeking messages from the beginning. Unsubscribing"
    )
    consumer.unsubscribe();
  }
  consumer.close()
}

In this new consumer, we’re polling messages from the beginning at a rate of one second. *Passing an empty collection to consumer.seekToBeginning(), it’ll seek the first offset for all currently assigned partitions.*

6. Conclusion

In this article, we’ve had an overview of Kafka using Scala. First, we set up our cluster and dependencies. Next, we produced and consumed messages using Kafka native serializers. Finally, we used Avro Serde to store our Article schema in a schema registry.

As always, the source code for the examples is available over on GitHub.