1. Overview
In this tutorial, we’ll explore Kafka using Scala. First, we’ll produce messages to a topic. Next, our consumer application will read those messages. Finally, we’ll use Avro to store our message schema in a schema registry.
2. Setup
Let’s set up our Kafka cluster and dependencies.
2.1. Cluster
Our Kafka cluster will consist of one broker and a schema registry. Client applications will connect to the broker on port 9094:
kafka:
  image: docker.io/bitnami/kafka:3.4.0-debian-11-r23
  networks:
    - kafka_net
  ports:
    - "9094:9094"
  volumes:
    - "kafka_data:/bitnami"
  environment:
    - BITNAMI_DEBUG=yes
    - ALLOW_PLAINTEXT_LISTENER=yes
    - KAFKA_KRAFT_CLUSTER_ID=9YoavaRpTCOitT3Dm2OQFQ
    - KAFKA_CFG_LISTENERS=CLIENT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
    - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9094
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
    - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
kafka-ui lets us visualize our cluster state in the browser on port 8088:
kafka-ui:
  image: provectuslabs/kafka-ui:v0.7.0
  networks:
    - kafka_net
  ports:
    - "8088:8080"
  depends_on:
    kafka:
      condition: service_started
    kafka-schema-registry:
      condition: service_started
  environment:
    KAFKA_CLUSTERS_0_NAME: baeldung
    KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://kafka-schema-registry:8081
    DYNAMIC_CONFIG_ENABLED: 'true'
Later on, we’ll use the schema registry to register our application schema using Avro Serde:
kafka-schema-registry:
  image: bitnami/schema-registry:6.0
  networks:
    - kafka_net
  ports:
    - "8081:8081"
  depends_on:
    kafka:
      condition: service_started
  environment:
    SCHEMA_REGISTRY_DEBUG: true
    SCHEMA_REGISTRY_KAFKA_BROKERS: PLAINTEXT://kafka:9092
We’ll run docker compose up in the directory containing the docker-compose.yml file to bring the cluster up.
Alternatively, the cluster can be created using the Kafka binaries directly. In that case, we should make sure the broker and the schema registry listen on the same ports as above.
2.2. Dependencies
We’ll need four main libraries in our application. First, we’ll add the Kafka clients dependency:
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "7.5.1-ce"
It lets us produce and consume messages. Additionally, we’ll include jackson-databind:
libraryDependencies += "com.fasterxml.jackson.core" % "jackson-databind" % "2.15.1"
It will be used to serialize Scala case classes to JSON strings and back. Finally, let’s add avro4s and kafka-avro-serializer:
libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "4.1.1"
libraryDependencies += "io.confluent" % "kafka-avro-serializer" % "7.5.1"
Those two dependencies help us serialize and deserialize our Kafka messages using the Avro format.
3. Producer
Our application will produce messages from an Article case class:
case class Article(
  id: String,
  title: String,
  content: String,
  created: LocalDate,
  author: Author
)
Let’s configure our producer before sending messages.
3.1. Configuration
First, we’ll set up producer properties in kafka-intro.conf:
bootstrap-servers = "localhost:9094"
topic = "scala-articles"

producer {
  client.id = baeldung-scala-kafka-producer
  bootstrap.servers = ${bootstrap-servers}
}
Messages will reside in a topic named scala-articles. Although the client.id property isn’t required, it’s useful for distinguishing between different producer applications. It’s important to ensure the topic exists before running the producer.
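One way to create the topic up front is with Kafka’s AdminClient. Here’s a minimal sketch; the partition count and replication factor below are illustrative assumptions, not values from our setup:
import java.util.Properties

import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewTopic}

import scala.jdk.CollectionConverters._

object CreateArticlesTopic extends App {
  // connect to the broker exposed by our Docker cluster
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094")

  val admin = Admin.create(props)
  try {
    // two partitions, replication factor of 1 (we only run a single broker)
    val topic = new NewTopic("scala-articles", 2, 1.toShort)
    admin.createTopics(List(topic).asJava).all().get()
  } finally {
    admin.close()
  }
}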
To load our configuration, we’ll use the ProducerConfig case class and its companion object:
case class ProducerConfig(producer: Config, topic: String)

object ProducerConfig extends ClientConfig {
  def getConfig(resource: String): (util.Map[String, AnyRef], String) = {
    val source =
      ConfigSource.resources(resource).loadOrThrow[ProducerConfig]
    val config = source.producer.asJavaMap
    val topic = source.topic
    (config, topic)
  }
}
Here, PureConfig’s loadOrThrow throws an exception if a required key is missing, so the application fails fast on a bad configuration.
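The asJavaMap conversion comes from the ClientConfig trait, which isn’t listed in this article. A minimal sketch of what it might look like, assuming it simply flattens a Typesafe Config block into the java.util.Map expected by the Kafka clients:
import java.util

import com.typesafe.config.Config

import scala.jdk.CollectionConverters._

trait ClientConfig {
  // hypothetical helper: turns a Config block into a flat java.util.Map
  implicit class ConfigOps(config: Config) {
    def asJavaMap: util.Map[String, AnyRef] =
      config
        .entrySet()
        .asScala
        .map(entry => entry.getKey -> config.getAnyRef(entry.getKey))
        .toMap
        .asJava
  }
}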
3.2. Producing Messages
Next, we produce messages to our scala-articles topic:
object ArticleJsonStringProducer
  extends App
  with ProducerUtils[Article]
  with JsonStringSerializer[Article] {

  private val (config, topic) = ProducerConfig.getConfig("kafka-intro.conf")

  private val producer =
    new KafkaProducer(config, keySerializer, valueSerializer)

  private val articles = Generator.articles

  for (article <- articles) {
    produce(producer, topic, article.id, article.toJsonString)
  }

  producer.close()
}
We send a stringified Article as the value and its id as the key to Kafka.
Let’s understand what’s going on here in detail. To start, the keySerializer and valueSerializer implicits both use Kafka’s StringSerializer. They are defined in JsonStringSerializer:
trait JsonStringSerializer[T] {
  implicit val keySerializer: StringSerializer = new StringSerializer()
  implicit val valueSerializer: StringSerializer = new StringSerializer()

  implicit val jsonMapper: JsonMapper = JsonMapper
    .builder()
    .addModule(DefaultScalaModule)
    .addModule(new JavaTimeModule())
    .build()

  implicit class ValueOps(value: T) {
    def toJsonString()(implicit jsonMapper: JsonMapper): String = {
      jsonMapper.writeValueAsString(value)
    }
  }
}
It’s important to note that the article is serialized to a String using Jackson’s JsonMapper.
Continuing, the produce() function sends each Article to Kafka:
trait ProducerUtils[T] {
  def produce[K, V](
    producer: KafkaProducer[K, V],
    topic: String,
    key: K,
    value: V
  ): Unit = {
    val record = new ProducerRecord(topic, key, value)
    producer.send(record, implicitly[Callback])
  }
}
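The produce() function passes an implicit Callback to send(). That callback isn’t listed in this article; here’s a minimal sketch of it, assuming it simply logs the record metadata on success (matching the output below) and the error otherwise:
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
import org.slf4j.LoggerFactory

trait ProducerCallback {
  private val logger = LoggerFactory.getLogger(getClass)

  // hypothetical implicit resolved by implicitly[Callback] in produce()
  implicit val producerCallback: Callback =
    (metadata: RecordMetadata, exception: Exception) =>
      Option(exception) match {
        case Some(error) =>
          logger.error("Failed to produce a record to Kafka", error)
        case None =>
          logger.info(
            s"Successfully produced a new record to Kafka: topic: ${metadata.topic()}, " +
              s"partition: ${metadata.partition()}, offset: ${metadata.offset()}, " +
              s"key size: ${metadata.serializedKeySize()}, value size: ${metadata.serializedValueSize()}"
          )
      }
}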
Finally, when we run the producer application, our messages are successfully sent:
[kafka-producer-network-thread | baeldung-scala-kafka-producer] INFO com.baeldung.scala.kafka.intro.producer.ArticleJsonStringProducer$ - Successfully produced a new record to Kafka: topic: scala-articles, partition: 1, offset: 6, key size: 36, value size: 36
[kafka-producer-network-thread | baeldung-scala-kafka-producer] INFO com.baeldung.scala.kafka.intro.producer.ArticleJsonStringProducer$ - Successfully produced a new record to Kafka: topic: scala-articles, partition: 0, offset: 7, key size: 36, value size: 36
[kafka-producer-network-thread | baeldung-scala-kafka-producer] INFO com.baeldung.scala.kafka.intro.producer.ArticleJsonStringProducer$ - Successfully produced a new record to Kafka: topic: scala-articles, partition: 0, offset: 8, key size: 36, value size: 36
4. Consumer
Now let’s consume the published messages. As with the producer, we’ll define some configuration beforehand.
4.1. Configuration
We’ll add a consumer section in the kafka-intro.conf file defined above:
consumer {
  group.id = baeldung-scala-kafka-consumer
  bootstrap.servers = ${bootstrap-servers}
}
The ConsumerConfig loader is similar to ProducerConfig.
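The class itself isn’t listed in this article, but a sketch of it, mirroring ProducerConfig, might look like this:
import java.util

import com.typesafe.config.Config
import pureconfig._
import pureconfig.generic.auto._

case class ConsumerConfig(consumer: Config, topic: String)

object ConsumerConfig extends ClientConfig {
  def getConfig(resource: String): (util.Map[String, AnyRef], String) = {
    val source =
      ConfigSource.resources(resource).loadOrThrow[ConsumerConfig]
    (source.consumer.asJavaMap, source.topic)
  }
}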
4.2. Consuming Messages
After the configurations are done, let’s define our consumer application:
object ArticleJsonStringConsumer
  extends App
  with ConsumerUtils[Article]
  with JsonStringDeSerializer[Article] {

  private val (config, topic) = ConsumerConfig.getConfig("kafka-intro.conf")

  private val consumer =
    new KafkaConsumer(config, keyDeSerializer, valueDeSerializer)

  consumer.subscribe(asJavaCollection(List(topic)))

  while (true) {
    val messages = pool(consumer, FiniteDuration(1, SECONDS))
    for ((_, value) <- messages) {
      val article = fromJsonString(value)
      logger.info(
        s"New article received. Title: ${article.title}. Author: ${article.author.name} "
      )
    }
    consumer.commitAsync()
  }
}
The keyDeSerializer and valueDeSerializer both use Kafka’s StringDeserializer. Since the record value is a JSON string, we deserialize it using fromJsonString():
trait JsonStringDeSerializer[T] {
  def fromJsonString(
    str: String
  )(implicit jsonMapper: JsonMapper, classTag: ClassTag[T]): T = {
    jsonMapper.readValue(str, classTag.runtimeClass).asInstanceOf[T]
  }
}
It’s important to note that we preserve the generic type information at runtime with a Scala ClassTag. Additionally, the consumer polls the cluster every second for new messages:
trait ConsumerUtils[T] extends Logging {
  def pool[K, V](
    consumer: KafkaConsumer[K, V],
    timeout: FiniteDuration = FiniteDuration(5, TimeUnit.SECONDS)
  ): Iterable[(K, V)] = {
    val records: ConsumerRecords[K, V] =
      consumer.poll(new ScalaDurationOps(timeout).toJava)
    val messages = records.asScala.map(record => {
      logger.debug(
        s"received record from topic ${record.topic}. Key: ${record.key} value: ${record.value.toString}"
      )
      (record.key, record.value)
    })
    messages
  }
}
Type conversions between Java and Scala are handled by the CollectionConverters implicits. Finally, the running consumer reads messages from the cluster:
[main] INFO com.baeldung.scala.kafka.intro.consumer.ArticleJsonStringConsumer$ - New article received. Title: Introduction to Kafka. Author: Foo Bar
[main] INFO com.baeldung.scala.kafka.intro.consumer.ArticleJsonStringConsumer$ - New article received. Title: Introduction to Scala Spire. Author: Jane Doe
[main] INFO com.baeldung.scala.kafka.intro.consumer.ArticleJsonStringConsumer$ - New article received. Title: Introduction to Scala Programming. Author: John Doe
5. Avro Serde
So far, we’ve used Kafka’s native StringSerializer and StringDeserializer, relying on Jackson to convert our article values to and from JSON.
That approach has limitations in a real-world scenario: it’d be hard to evolve our data and keep track of schema changes. Fortunately, schema registries help us overcome those limits.
They validate our messages against a registered schema according to a compatibility mode. To store the schema of our Article messages, we’ll serialize that class using Avro Serde.
Let’s rewire our configurations and applications to meet these new needs.
5.1. Configuration
Let’s define a new configuration file. It’ll use a new topic named scala-articles-avro:
bootstrap-servers = "localhost:9094"
schema-registry-url = "http://localhost:8081"
topic = "scala-articles-avro"

producer {
  client.id = baeldung-scala-kafka-producer
  bootstrap.servers = ${bootstrap-servers}
  transactional.id = "baeldung-scala-kafka-producer"
}

serde {
  schema.registry.url = ${schema-registry-url}
}

consumer {
  group.id = baeldung-scala-kafka-consumer
  bootstrap.servers = ${bootstrap-servers}
}
The main addition here is the serde block. It holds the schema registry URL defined in our initial setup. When Avro messages are sent to Kafka, they will be checked against the schema in the registry.
Next, we add a new SerdeConfig:
object SerdeConfig extends ClientConfig {
  def getConfig(resource: String): util.Map[String, AnyRef] = {
    val source =
      ConfigSource.resources(resource).loadOrThrow[SerdeConfig]
    val serde = source.serde.asJavaMap
    serde
  }
}
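The corresponding case class isn’t shown above; presumably, it only needs to expose the serde block, along these lines:
import com.typesafe.config.Config

// assumed shape: only the serde block is read by SerdeConfig.getConfig
case class SerdeConfig(serde: Config)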
Now that we have all configurations ready, let’s update our producer and consumer applications.
5.2. Producer
The ArticleAvroProducer uses AvroSerializer to serialize articles. The key remains a String:
object ArticleAvroProducer
  extends App
  with ProducerUtils[Article]
  with AvroSerializer {

  private val (config, topic) =
    ProducerConfig.getConfig("kafka-intro-avro.conf")
  private val serde = SerdeConfig.getConfig("kafka-intro-avro.conf")

  val keySerializer: StringSerializer = new StringSerializer()
  implicit lazy val Valueformat: RecordFormat[Article] = RecordFormat[Article]
  val valueSerializer: Serializer[Article] = AvroSerializer[Article]
  valueSerializer.configure(serde, false)

  private val producer =
    new KafkaProducer(config, keySerializer, valueSerializer)

  private val articles = Generator.articles

  Try {
    producer.initTransactions()
    producer.beginTransaction()
    for (article <- articles) {
      produce(producer, topic, article.id, article)
    }
    producer.commitTransaction()
    logger.info("Successfully completed Kafka transaction.")
  }.recover { case error =>
    logger.error(error)
    logger.error(
      "Something went wrong during Kafka transaction processing. Aborting"
    )
    producer.abortTransaction()
  }

  producer.close()
}
This time, we send the messages inside a transaction. For transactions to work, we must define the transactional.id property in the producer configuration.
Interestingly, AvroSerializer uses KafkaAvroSerializer under the hood:
trait AvroSerializer {
  def AvroSerializer[T](implicit format: RecordFormat[T]): Serializer[T] =
    new Serializer[T] {
      val ser = new KafkaAvroSerializer()

      override def configure(
        configs: java.util.Map[String, _],
        isKey: Boolean
      ): Unit =
        ser.configure(configs, isKey)

      override def serialize(topic: String, data: T): Array[Byte] = Option(data)
        .map(data => ser.serialize(topic, format.to(data)))
        .getOrElse(Array.emptyByteArray)

      override def close(): Unit = ser.close()
    }
}
KafkaAvroSerializer manages the connection to the schema registry. In return, the registry stores the schema of the Article record:
{
  "type": "record",
  "name": "Article",
  "namespace": "com.baeldung.scala.kafka.intro.common",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "content",
      "type": "string"
    },
    {
      "name": "created",
      "type": {
        "type": "int",
        "logicalType": "date"
      }
    },
    {
      "name": "author",
      "type": {
        "type": "record",
        "name": "Author",
        "fields": [
          {
            "name": "id",
            "type": "int"
          },
          {
            "name": "name",
            "type": "string"
          }
        ]
      }
    }
  ]
}
Thanks to avro4s, we didn’t have to write any boilerplate to generate the schema. If we want to opt out of the schema registry but still use Avro, we can use avro4s’ GenericSerde:
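As a sketch, assuming we also add the avro4s-kafka module to our build, a registry-free Serde for Article could be created like this:
// Sketch: Avro (de)serialization without a schema registry, via avro4s' GenericSerde.
// Assumes an extra dependency: "com.sksamuel.avro4s" %% "avro4s-kafka" % "4.1.1"
import com.sksamuel.avro4s.kafka.GenericSerde
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}

object ArticleGenericSerde {
  // schema, encoder and decoder are derived from the Article case class at compile time
  val serde: Serde[Article] = new GenericSerde[Article]()

  // these can be passed to KafkaProducer/KafkaConsumer instead of the registry-backed serializers
  val serializer: Serializer[Article] = serde.serializer()
  val deserializer: Deserializer[Article] = serde.deserializer()
}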
5.3. Consumer
Likewise, the consumer keeps the key deserialization unchanged. As for the value, it’s deserialized with the AvroDeSerializer trait:
trait AvroDeSerializer {
  def deserializer[T](implicit
    format: RecordFormat[T]
  ): Deserializer[T] = new Deserializer[T] {
    val deser = new KafkaAvroDeserializer()

    override def configure(
      configs: java.util.Map[String, _],
      isKey: Boolean
    ): Unit =
      deser.configure(configs, isKey)

    override def deserialize(topic: String, data: Array[Byte]): T = Option(data)
      .filter(_.nonEmpty)
      .map { data =>
        format
          .from(deser.deserialize(topic, data).asInstanceOf[IndexedRecord])
      }
      .getOrElse(null.asInstanceOf[T])

    override def close(): Unit = deser.close()
  }
}
With that deserializer, let’s define our KafkaConsumer:
object ArticleAvroConsumer
  extends App
  with ConsumerUtils[Article]
  with AvroDeSerializer {

  private val (config, topic) =
    ConsumerConfig.getConfig("kafka-intro-avro.conf")
  private val serde = SerdeConfig.getConfig("kafka-intro-avro.conf")

  val keyDeSerializer: StringDeserializer = new StringDeserializer()
  implicit lazy val Valueformat: RecordFormat[Article] = RecordFormat[Article]
  val valueDeSerializer: Deserializer[Article] = deserializer[Article]
  valueDeSerializer.configure(serde, false)

  private val consumer =
    new KafkaConsumer(config, keyDeSerializer, valueDeSerializer)

  consumer.subscribe(asJavaCollection(List(topic)))
  consumer.seekToBeginning(Nil.asJava)

  Try {
    while (true) {
      val messages = pool(consumer, FiniteDuration(1, MILLISECONDS))
      for ((_, article) <- messages) {
        logger.info(
          s"New article received. Title: ${article.title} . Author: ${article.author.name}, Date: ${article.created} "
        )
      }
    }
  }.recover { case error =>
    logger.error(error)
    logger.error(
      "Something went wrong when seeking messages from the beginning. Unsubscribing"
    )
    consumer.unsubscribe()
  }

  consumer.close()
}
In this new consumer, we read messages from the beginning of the topic, polling with a short timeout. Passing an empty collection to consumer.seekToBeginning() makes it seek to the first offset of all currently assigned partitions.
6. Conclusion
In this article, we’ve had an overview of Kafka using Scala. First, we set up our cluster and dependencies. Next, we produced and consumed messages using Kafka native serializers. Finally, we used Avro Serde to store our Article schema in a schema registry.
As always, the source code for the examples is available over on GitHub.