1. 引言
在Apache Kafka中传输消息时,客户端和服务器会协商使用一种通用的语法格式。Kafka提供了默认的转换器(如String
和Long
),同时也支持针对特定场景的自定义序列化器。在这篇教程中,我们将学习如何实现它们。
2. Apache Kafka中的序列化
序列化是将对象转换为字节的过程,反序列化则是将字节流转换回对象。简而言之,它将内容转化为可读和可解释的信息。
正如我们提到的,Kafka为一些基本类型提供了默认的序列化器,并允许我们实现自定义序列化器:
上图展示了通过网络向Kafka主题发送消息的过程。在这个过程中,自定义序列化器会在生产者将消息发送到主题之前将对象转换为字节。同样,它也展示了解序列化器如何将字节流转换回对象,以便消费者能够正确处理它。
2.1. 自定义序列化器
Apache Kafka为几种基本类型提供了预构建的序列化器和反序列化器:
-
StringSerializer
-
ShortSerializer
-
IntegerSerializer
-
LongSerializer
-
DoubleSerializer
-
BytesSerializer
但它还提供了实现自定义(反)序列化器的能力。为了序列化我们自己的对象,我们将实现Serializer
接口。同样,为了创建自定义的反序列化器,我们将实现Deserializer
接口。
这两个接口都有以下方法可以重写:
-
configure
: 用于实现配置细节 -
serialize/deserialize
: 这些方法包含了我们自定义序列化和反序列化的实际实现。 -
close
: 使用此方法关闭Kafka会话
3. 在Apache Kafka中实现自定义序列化器
Apache Kafka提供了定制序列化器的功能。不仅可以为消息值实现特定的转换器,还可以为键实现。
3.1. 依赖项
为了实现示例,我们只需在pom.xml
中添加Apache Kafka消费者API的依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
3.2. 自定义序列化器
首先,我们将使用Lombok指定通过Kafka发送的自定义对象:
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MessageDto {
private String message;
private String version;
}
接下来,我们将为生产者实现Kafka提供的Serializer
接口,以便发送消息:
public class CustomSerializer implements Serializer<MessageDto> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, MessageDto data) {
try {
if (data == null){
System.out.println("Null received at serializing");
return null;
}
System.out.println("Serializing...");
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error when serializing MessageDto to byte[]");
}
}
@Override
public void close() {
}
}
我们将重写接口的serialize
方法。因此,在我们的实现中,我们将使用Jackson ObjectMapper来转换自定义对象,然后返回字节流,以便正确发送消息到网络。
3.3. 自定义反序列化器
同样,我们将为消费者实现Deserializer
接口:
@Slf4j
public class CustomDeserializer implements Deserializer<MessageDto> {
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public MessageDto deserialize(String topic, byte[] data) {
try {
if (data == null){
System.out.println("Null received at deserializing");
return null;
}
System.out.println("Deserializing...");
return objectMapper.readValue(new String(data, "UTF-8"), MessageDto.class);
} catch (Exception e) {
throw new SerializationException("Error when deserializing byte[] to MessageDto");
}
}
@Override
public void close() {
}
}
如同前一节,我们将重写接口的deserialize
方法。因此,我们将使用相同的Jackson ObjectMapper将字节流转换回自定义对象。
3.4. 接收一个示例消息
现在,让我们看一个工作示例,演示如何使用自定义序列化器和反序列化器发送和接收一个示例消息。
首先,我们将创建并配置Kafka生产者:
private static KafkaProducer<String, MessageDto> createKafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomSerializer");
return new KafkaProducer(props);
}
我们将配置值序列化器属性为我们自定义类,并使用默认的StringSerializer
作为键序列化器。
其次,我们将创建Kafka消费者:
private static KafkaConsumer<String, MessageDto> createKafkaConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomDeserializer");
return new KafkaConsumer<>(props);
}
除了使用我们自定义类的键和值反序列化器外,必须包含组ID。除此之外,我们需要设置自动偏移重置为earliest
,以确保在消费者开始之前,生产者已经发送了所有消息。
创建完生产者和消费者客户端后,是时候发送一个示例消息了:
MessageDto msgProd = MessageDto.builder().message("test").version("1.0").build();
KafkaProducer<String, MessageDto> producer = createKafkaProducer();
producer.send(new ProducerRecord<String, MessageDto>(TOPIC, "1", msgProd));
System.out.println("Message sent " + msgProd);
producer.close();
然后,我们可以通过订阅主题来接收消息:
AtomicReference<MessageDto> msgCons = new AtomicReference<>();
KafkaConsumer<String, MessageDto> consumer = createKafkaConsumer();
consumer.subscribe(Arrays.asList(TOPIC));
ConsumerRecords<String, MessageDto> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
msgCons.set(record.value());
System.out.println("Message received " + record.value());
});
consumer.close();
控制台输出的结果如下:
Serializing...
Message sent MessageDto(message=test, version=1.0)
Deserializing...
Message received MessageDto(message=test, version=1.0)
4. 总结
在这篇教程中,我们展示了生产者如何使用Kafka中的序列化器通过网络发送消息。同样,我们也展示了消费者如何使用反序列化器解析接收到的消息。
此外,我们了解了可用的默认序列化器,并且最重要的是,学会了如何实现自定义序列化器和反序列化器。
如往常一样,代码可以在GitHub上找到。