1. Overview

Apache Flink is a stream processing framework that can be used easily with Java. Apache Kafka is a distributed stream processing system supporting high fault-tolerance.

In this tutorial, we-re going to have a look at how to build a data pipeline using those two technologies.

2. Installation

To install and configure Apache Kafka, please refer to the official guide. After installing, we can use the following commands to create the new topics called flink_input and flink_output:

 bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic flink_output

 bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic flink_input

For the sake of this tutorial, we’ll use default configuration and default ports for Apache Kafka.

Apache Flink allows a real-time stream processing technology. The framework allows using multiple third-party systems as stream sources or sinks.

In Flink – there are various connectors available :

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)

To add Flink to our project, we need to include the following Maven dependencies :

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.16.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.16.1</version>
</dependency>

Adding those dependencies will allow us to consume and produce to and from Kafka topics. You can find the current version of Flink on Maven Central.

4. Kafka String Consumer

To consume data from Kafka with Flink we need to provide a topic and a Kafka address. We should also provide a group id which will be used to hold offsets so we won’t always read the whole data from the beginning.

Let’s create a static method that will make the creation of FlinkKafkaConsumer easier:

public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
  String topic, String kafkaAddress, String kafkaGroup ) {
 
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafkaAddress);
    props.setProperty("group.id",kafkaGroup);
    FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
      topic, new SimpleStringSchema(), props);

    return consumer;
}

This method takes a topic, kafkaAddress, and kafkaGroup and creates the FlinkKafkaConsumer that will consume data from given topic as a String since we have used SimpleStringSchema to decode data.

The number 011 in the name of class refers to the Kafka version.

5. Kafka String Producer

To produce data to Kafka, we need to provide Kafka address and topic that we want to use. Again, we can create a static method that will help us to create producers for different topics:

public static FlinkKafkaProducer011<String> createStringProducer(
  String topic, String kafkaAddress){

    return new FlinkKafkaProducer011<>(kafkaAddress,
      topic, new SimpleStringSchema());
}

This method takes only topic and kafkaAddress as arguments since there’s no need to provide group id when we are producing to Kafka topic.

6. String Stream Processing

When we have a fully working consumer and producer, we can try to process data from Kafka and then save our results back to Kafka. The full list of functions that can be used for stream processing can be found here.

In this example, we’re going to capitalize words in each Kafka entry and then write it back to Kafka.

For this purpose we need to create a custom MapFunction:

public class WordsCapitalizer implements MapFunction<String, String> {
    @Override
    public String map(String s) {
        return s.toUpperCase();
    }
}

After creating the function, we can use it in stream processing:

public static void capitalize() {
    String inputTopic = "flink_input";
    String outputTopic = "flink_output";
    String consumerGroup = "baeldung";
    String address = "localhost:9092";
    StreamExecutionEnvironment environment = StreamExecutionEnvironment
      .getExecutionEnvironment();
    FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(
      inputTopic, address, consumerGroup);
    DataStream<String> stringInputStream = environment
      .addSource(flinkKafkaConsumer);

    FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(
      outputTopic, address);

    stringInputStream
      .map(new WordsCapitalizer())
      .addSink(flinkKafkaProducer);
}

The application will read data from the flink_input topic, perform operations on the stream and then save the results to the flink_output topic in Kafka.

We’ve seen how to deal with Strings using Flink and Kafka. But often it’s required to perform operations on custom objects. We’ll see how to do this in the next chapters.

7. Custom Object Deserialization

The following class represents a simple message with information about sender and recipient:

@JsonSerialize
public class InputMessage {
    String sender;
    String recipient;
    LocalDateTime sentAt;
    String message;
}

Previously, we were using SimpleStringSchema to deserialize messages from Kafka, but now we want to deserialize data directly to custom objects.

To do this, we need a custom DeserializationSchema:

public class InputMessageDeserializationSchema implements
  DeserializationSchema<InputMessage> {

    static ObjectMapper objectMapper = new ObjectMapper()
      .registerModule(new JavaTimeModule());

    @Override
    public InputMessage deserialize(byte[] bytes) throws IOException {
        return objectMapper.readValue(bytes, InputMessage.class);
    }

    @Override
    public boolean isEndOfStream(InputMessage inputMessage) {
        return false;
    }

    @Override
    public TypeInformation&lt;InputMessage&gt; getProducedType() {
        return TypeInformation.of(InputMessage.class);
    }
}

We are assuming here that the messages are held as JSON in Kafka.

Since we have a field of type LocalDateTime, we need to specify the JavaTimeModule, which takes care of mapping LocalDateTime objects to JSON.

Flink schemas can’t have fields that aren’t serializable because all operators (like schemas or functions) are serialized at the start of the job.

There are similar issues in Apache Spark. One of the known fixes for this issue is initializing fields as static, as we did with ObjectMapper above. It isn’t the prettiest solution, but it’s relatively simple and does the job.

The method isEndOfStream can be used for the special case when stream should be processed only until some specific data is received. But it isn’t needed in our case.

8. Custom Object Serialization

Now, let’s assume that we want our system to have a possibility of creating a backup of messages. We want the process to be automatic, and each backup should be composed of messages sent during one whole day.

Also, a backup message should have a unique id assigned.

For this purpose, we can create the following class:

public class Backup {
    @JsonProperty("inputMessages")
    List<InputMessage> inputMessages;
    @JsonProperty("backupTimestamp")
    LocalDateTime backupTimestamp;
    @JsonProperty("uuid")
    UUID uuid;

    public Backup(List<InputMessage> inputMessages, 
      LocalDateTime backupTimestamp) {
        this.inputMessages = inputMessages;
        this.backupTimestamp = backupTimestamp;
        this.uuid = UUID.randomUUID();
    }
}

Please mind that the UUID generation mechanism isn’t perfect, as it allows duplicates. However, this is enough for the scope of this example.

We want to save our Backup object as JSON to Kafka, so we need to create our SerializationSchema:

public class BackupSerializationSchema
  implements SerializationSchema<Backup> {

    ObjectMapper objectMapper;
    Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class);

    @Override
    public byte[] serialize(Backup backupMessage) {
        if(objectMapper == null) {
            objectMapper = new ObjectMapper()
              .registerModule(new JavaTimeModule());
        }
        try {
            return objectMapper.writeValueAsString(backupMessage).getBytes();
        } catch (com.fasterxml.jackson.core.JsonProcessingException e) {
            logger.error("Failed to parse JSON", e);
        }
        return new byte[0];
    }
}

9. Timestamping Messages

Since we want to create a backup for all messages of each day, messages need a timestamp.

Flink provides the three different time characteristics EventTime, ProcessingTime, and IngestionTime.

In our case, we need to use the time at which the message has been sent, so we’ll use EventTime.

To use EventTime we need a TimestampAssigner which will extract timestamps from our input data:

public class InputMessageTimestampAssigner 
  implements AssignerWithPunctuatedWatermarks<InputMessage> {
 
    @Override
    public long extractTimestamp(InputMessage element, 
      long previousElementTimestamp) {
        ZoneId zoneId = ZoneId.systemDefault();
        return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
    }

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(InputMessage lastElement, 
      long extractedTimestamp) {
        return new Watermark(extractedTimestamp - 1500);
    }
}

We need to transform our LocalDateTime to EpochSecond as this is the format expected by Flink. After assigning timestamps, all time-based operations will use time from sentAt field to operate.

Since Flink expects timestamps to be in milliseconds and toEpochSecond()  returns time in seconds we needed to multiply it by 1000, so Flink will create windows correctly.

Flink defines the concept of a Watermark. Watermarks are useful in case of data that don’t arrive in the order they were sent. A watermark defines the maximum lateness that is allowed for elements to be processed.

Elements that have timestamps lower than the watermark won’t be processed at all.

10. Creating Time Windows

To assure that our backup gathers only messages sent during one day, we can use the timeWindowAll method on the stream, which will split messages into windows.

However, we’ll still need to aggregate messages from each window and return them as Backup.

To do this, we’ll need a custom AggregateFunction:

public class BackupAggregator 
  implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
 
    @Override
    public List<InputMessage> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<InputMessage> add(
      InputMessage inputMessage,
      List<InputMessage> inputMessages) {
        inputMessages.add(inputMessage);
        return inputMessages;
    }

    @Override
    public Backup getResult(List<InputMessage> inputMessages) {
        return new Backup(inputMessages, LocalDateTime.now());
    }

    @Override
    public List<InputMessage> merge(List<InputMessage> inputMessages,
      List<InputMessage> acc1) {
        inputMessages.addAll(acc1);
        return inputMessages;
    }
}

11. Aggregating Backups

After assigning proper timestamps and implementing our AggregateFunction, we can finally take our Kafka input and process it:

public static void createBackup () throws Exception {
    String inputTopic = "flink_input";
    String outputTopic = "flink_output";
    String consumerGroup = "baeldung";
    String kafkaAddress = "192.168.99.100:9092";
    StreamExecutionEnvironment environment
      = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer
      = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
    flinkKafkaConsumer.setStartFromEarliest();

    flinkKafkaConsumer.assignTimestampsAndWatermarks(
      new InputMessageTimestampAssigner());
    FlinkKafkaProducer011<Backup> flinkKafkaProducer
      = createBackupProducer(outputTopic, kafkaAddress);

    DataStream<InputMessage> inputMessagesStream
      = environment.addSource(flinkKafkaConsumer);

    inputMessagesStream
      .timeWindowAll(Time.hours(24))
      .aggregate(new BackupAggregator())
      .addSink(flinkKafkaProducer);

    environment.execute();
}

12. Conclusion

In this article, we’ve presented how to create a simple data pipeline with Apache Flink and Apache Kafka.

As always, the code can be found over on Github.