1. Overview

Apache Flink is a Big Data processing framework that allows programmers to process a vast amount of data in a very efficient and scalable manner.

In this article, we’ll introduce some of the core API concepts and standard data transformations available in the Apache Flink Java API. The fluent style of this API makes it easy to work with Flink’s central construct – the distributed collection.

First, we will take a look at Flink’s DataSet API transformations and use them to implement a word count program. Then we will take a brief look at Flink’s DataStream API, which allows you to process streams of events in a real-time fashion.

2. Maven Dependency

To get started, we’ll need to add Maven dependencies for the flink-java and flink-test-utils libraries:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.16.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.16.1</version>
    <scope>test</scope>
</dependency>

3. Core API Concepts

When working with Flink, we need to know a couple of things related to its API:

  • Every Flink program performs transformations on distributed collections of data. A variety of functions for transforming data are provided, including filtering, mapping, joining, grouping, and aggregating
  • A sink operation in Flink triggers the execution of a stream to produce the desired result of the program, such as saving the result to the file system or printing it to the standard output
  • Flink transformations are lazy, meaning that they are not executed until a sink operation is invoked (see the short sketch after this list)
  • The Apache Flink API supports two modes of operation: batch and real-time. If you are dealing with a limited data source that can be processed in batch mode, you will use the DataSet API. Should you want to process unbounded streams of data in real time, you would need to use the DataStream API
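
To make the laziness concrete, here is a minimal sketch (assuming an ExecutionEnvironment named env, which we create in the next section) in which nothing is computed until the print() sink is called:

DataSet<Integer> numbers = env.fromElements(1, 2, 3);

// map() only builds up the transformation graph; no data is processed yet
DataSet<Integer> doubled = numbers.map(n -> n * 2);

// print() is a sink operation, so this call triggers the actual execution
doubled.print();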

4. DataSet API Transformations

The entry point to the Flink program is an instance of the ExecutionEnvironment class — this defines the context in which a program is executed.

Let’s create an ExecutionEnvironment to start our processing:

ExecutionEnvironment env
  = ExecutionEnvironment.getExecutionEnvironment();

Note that when you launch the application on the local machine, it will perform processing on the local JVM. Should you want to start processing on a cluster of machines, you would need to install Apache Flink on those machines and configure the ExecutionEnvironment accordingly.
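
For example, a minimal sketch of pointing the program at a remote cluster could look like this (the host, port, and JAR path below are placeholders, not values from this article’s project):

ExecutionEnvironment remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
  "flink-master-host", 8081, "target/flink-job.jar");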

4.1. Creating a DataSet

To start performing data transformations, we need to supply our program with the data.

Let’s create an instance of the DataSet class using our ExecutionEnvironment:

DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);

You can create a DataSet from multiple sources, such as Apache Kafka, a CSV file, or virtually any other data source.
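
For instance, reading from files is built into the ExecutionEnvironment; the paths below are placeholders:

// each line of the text file becomes one String element of the DataSet
DataSet<String> lines = env.readTextFile("path/to/input.txt");

// CSV files can be parsed directly into tuples
DataSet<Tuple2<String, Integer>> csvData = env
  .readCsvFile("path/to/input.csv")
  .types(String.class, Integer.class);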

4.2. Filter and Reduce

Once you create an instance of the DataSet class, you can apply transformations to it.

Let’s say that you want to filter numbers that are above a certain threshold and then sum them all. You can use the filter() and reduce() transformations to achieve this:

int threshold = 30;
List<Integer> collect = amounts
  .filter(a -> a > threshold)
  .reduce((a, b) -> a + b)
  .collect();

assertThat(collect.get(0)).isEqualTo(90);

Note that the collect() method is a sink operation that triggers the actual data transformations.
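
Other sinks work the same way. For example, a sketch that writes the filtered sum to a text file (the output path is a placeholder) only runs once we call execute() on the environment:

amounts
  .filter(a -> a > threshold)
  .reduce((a, b) -> a + b)
  .writeAsText("path/to/output");

// unlike collect() and print(), writeAsText() does not trigger execution by itself
env.execute();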

4.3. Map

Let’s say that you have a DataSet of Person objects:

private static class Person {
    private int age;
    private String name;

    // standard constructors/getters/setters
}

Next, let’s create a DataSet of these objects:

DataSet<Person> personDataSource = env.fromCollection(
  Arrays.asList(
    new Person(23, "Tom"),
    new Person(75, "Michael")));

Suppose that you want to extract only the age field from every object of the collection. You can use the map() transformation to get only a specific field of the Person class:

List<Integer> ages = personDataSource
  .map(p -> p.age)
  .collect();

assertThat(ages).hasSize(2);
assertThat(ages).contains(23, 75);

4.4. Join

When you have two datasets, you may want to join them on some id field. For this, you can use the join() transformation.

Let’s create collections of transactions and addresses of a user:

Tuple3<Integer, String, String> address
  = new Tuple3<>(1, "5th Avenue", "London");
DataSet<Tuple3<Integer, String, String>> addresses
  = env.fromElements(address);

Tuple2<Integer, String> firstTransaction 
  = new Tuple2<>(1, "Transaction_1");
DataSet<Tuple2<Integer, String>> transactions 
  = env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));

The first field in both tuples is of an Integer type, and this is an id field on which we want to join both data sets.

To perform the actual joining logic, we need to implement a KeySelector interface for address and transaction:

private static class IdKeySelectorTransaction 
  implements KeySelector<Tuple2<Integer, String>, Integer> {
    @Override
    public Integer getKey(Tuple2<Integer, String> value) {
        return value.f0;
    }
}

private static class IdKeySelectorAddress 
  implements KeySelector<Tuple3<Integer, String, String>, Integer> {
    @Override
    public Integer getKey(Tuple3<Integer, String, String> value) {
        return value.f0;
    }
}

Each selector is only returning the field on which the join should be performed.

Unfortunately, it’s not possible to use lambda expressions here because Flink needs generic type info.
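
For tuple-based data sets like ours, a common alternative is to reference the join key by its position in the tuple instead of writing key selector classes; a rough sketch:

// join on field 0 (the id) of each tuple, without custom KeySelector classes
transactions.join(addresses)
  .where(0)
  .equalTo(0);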

Next, let’s implement merging logic using those selectors:

List<Tuple2<Tuple2<Integer, String>, Tuple3<Integer, String, String>>>
  joined = transactions.join(addresses)
  .where(new IdKeySelectorTransaction())
  .equalTo(new IdKeySelectorAddress())
  .collect();

assertThat(joined).hasSize(1);
assertThat(joined).contains(new Tuple2<>(firstTransaction, address));

4.5. Sort

Let’s say that you have the following collection of Tuple2:

Tuple2<Integer, String> secondPerson = new Tuple2<>(4, "Tom");
Tuple2<Integer, String> thirdPerson = new Tuple2<>(5, "Scott");
Tuple2<Integer, String> fourthPerson = new Tuple2<>(200, "Michael");
Tuple2<Integer, String> firstPerson = new Tuple2<>(1, "Jack");
DataSet<Tuple2<Integer, String>> transactions = env.fromElements(
  fourthPerson, secondPerson, thirdPerson, firstPerson);

If you want to sort this collection by the first field of the tuple, you can use the sortPartition() transformation:

List<Tuple2<Integer, String>> sorted = transactions
  .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)
  .collect();

assertThat(sorted)
  .containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);

5. Word Count

The word count problem is one that is commonly used to showcase the capabilities of Big Data processing frameworks. The basic solution involves counting word occurrences in a text input. Let’s use Flink to implement a solution to this problem.

As the first step in our solution, we create a LineSplitter class that splits our input into tokens (words), emitting a Tuple2 key-value pair for each token. In each of these tuples, the key is a word found in the text, and the value is the integer one (1).

This class implements the FlatMapFunction interface that takes String as an input and produces a Tuple2<String, Integer>:

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        Stream.of(value.toLowerCase().split("\\W+"))
          .filter(t -> t.length() > 0)
          .forEach(token -> out.collect(new Tuple2<>(token, 1)));
    }
}

We call the collect() method on the Collector class to push data forward in the processing pipeline.

Our next and final step is to group the tuples by their first elements (words) and then perform a sum aggregate on the second element to produce a count of the word occurrences:

public static DataSet<Tuple2<String, Integer>> startWordCount(
  ExecutionEnvironment env, List<String> lines) throws Exception {
    DataSet<String> text = env.fromCollection(lines);

    return text.flatMap(new LineSplitter())
      .groupBy(0)
      .aggregate(Aggregations.SUM, 1);
}

We are using three types of Flink transformations: flatMap(), groupBy(), and aggregate().

Let’s write a test to assert that the word count implementation is working as expected:

List<String> lines = Arrays.asList(
  "This is a first sentence",
  "This is a second sentence with a one word");

DataSet<Tuple2<String, Integer>> result = WordCount.startWordCount(env, lines);

List<Tuple2<String, Integer>> collect = result.collect();
 
assertThat(collect).containsExactlyInAnyOrder(
  new Tuple2<>("a", 3), new Tuple2<>("sentence", 2), new Tuple2<>("word", 1),
  new Tuple2<>("is", 2), new Tuple2<>("this", 2), new Tuple2<>("second", 1),
  new Tuple2<>("first", 1), new Tuple2<>("with", 1), new Tuple2<>("one", 1));

6. DataStream API

6.1. Creating a DataStream

Apache Flink also supports the processing of streams of events through its DataStream API. If we want to start consuming events, we first need to use the StreamExecutionEnvironment class:

StreamExecutionEnvironment executionEnvironment
 = StreamExecutionEnvironment.getExecutionEnvironment();

Next, we can use the executionEnvironment to create a stream of events from a variety of sources. It could be a message bus like Apache Kafka, but in this example, we will simply create a source from a couple of string elements:

DataStream<String> dataStream = executionEnvironment.fromElements(
  "This is a first sentence", 
  "This is a second sentence with a one word");

We can apply transformations to every element of the DataStream like in the normal DataSet class:

SingleOutputStreamOperator<String> upperCase = dataStream.map(String::toUpperCase);

To trigger the execution, we need to invoke a sink operation such as print(), which will just print the result of the transformations to the standard output, followed by the execute() method on the StreamExecutionEnvironment class:

upperCase.print();
executionEnvironment.execute();

It will produce the following output:

1> THIS IS A FIRST SENTENCE
2> THIS IS A SECOND SENTENCE WITH A ONE WORD

6.2. Windowing of Events

When processing a stream of events in real-time, you may sometimes need to group events together and apply some computation on a window of those events.

Suppose we have a stream of events, where each event is a pair consisting of the event number and the timestamp when the event was sent to our system, and that we can tolerate events that are out-of-order but only if they are no more than twenty seconds late.

For this example, let’s first create a stream simulating two events that are several minutes apart and define a timestamp extractor that specifies our lateness threshold:

SingleOutputStreamOperator<Tuple2<Integer, Long>> windowed
  = executionEnvironment.fromElements(
  new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
  new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor
      <Tuple2<Integer, Long>>(Time.seconds(20)) {
 
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element) {
          return element.f1 * 1000;
        }
    });

Next, let’s define a window operation to group our events into five-second windows and apply a transformation on those events:

SingleOutputStreamOperator<Tuple2<Integer, Long>> reduced = windowed
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
  .maxBy(0, true);
reduced.print();

It will emit the element with the maximum value of the first field from each five-second window, so it prints out:

1> (15,1491221519)

Note that we do not see the second event because it arrived later than the specified lateness threshold.

7. Conclusion

In this article, we introduced the Apache Flink framework and looked at some of the transformations supplied with its API.

We implemented a word count program using Flink’s fluent and functional DataSet API. Then we looked at the DataStream API and implemented a simple real-time transformation on a stream of events.

The implementation of all these examples and code snippets can be found on GitHub – this is a Maven project, so it should be easy to import and run as it is.