1. Introduction

ksqlDB can be described as a real-time event-streaming database built on top of Apache Kafka and Kafka Streams. It combines powerful stream processing with a relational database model using SQL syntax.

In this tutorial, we’ll cover the fundamental concepts of ksqlDB and build a sample application to demonstrate a practical use case.

2. Overview

Since ksqlDB is an event streaming database, streams and tables are its core abstractions. Essentially, these are collections of data that can be transformed and processed in real-time.

Stream processing enables continuous computations over these unbounded streams of events. We can transform, filter, aggregate, and join the collections to derive new collections or materialized views using SQL. Furthermore, new events continuously update these collections and views to provide real-time data.

Finally, queries publish the results of the various stream processing operations. ksqlDB queries support both asynchronous real-time application flows and synchronous request/response flows, similar to a traditional database.

3. Setup

To see ksqlDB in action, we’ll build an event-driven Java application. This will aggregate and query an unbounded stream of readings from various sensor sources.

The main use case is to detect situations when the average value of the readings exceeds a specified threshold, within a specific time period. Furthermore, a key requirement is that the application must provide real-time information that could, for example, be used when building a dashboard or warning system.

We’ll be using the ksqlDB Java client to interact with the server in order to create streams, tables, and aggregating queries, and to execute various queries against them.

3.1. Docker

As ksqlDB runs on top of Kafka, we’ll use Docker Compose to run the Kafka components, ksqlDB server, and the ksqlDB CLI client:

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    ...

  broker:
    image: confluentinc/cp-kafka:6.2.0
    hostname: broker
    ...

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.19.0
    hostname: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    healthcheck:
      test: curl -f http://ksqldb-server:8088/ || exit 1
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.19.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

We’ll also use this docker-compose.yml file in our Java application to spin up an environment for our integration tests using the Testcontainers framework.

First, let’s bring up the stack by running:

docker-compose up

Next, once all the services have started, let’s connect to the interactive CLI. This is useful for testing and interacting with the server:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

We’ll also tell ksqlDB to start all queries from the earliest point in each topic:

ksql> SET 'auto.offset.reset' = 'earliest';

3.2. Dependencies

In this project, we’ll primarily be using the Java client to interact with ksqlDB. More specifically, we’ll be using ksqlDB for Confluent Platform (CP), so we’ll need to add the CP Maven repository to our POM file:

<repository>
    <id>confluent</id>
    <name>confluent-repo</name>
    <url>https://packages.confluent.io/maven/</url>
</repository>

Now, let’s add the dependency for the client:

<dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-api-client</artifactId>
    <version>6.2.0</version>
</dependency>
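Since the integration tests shown later rely on Testcontainers and Awaitility, we can also add these test dependencies (the versions here are illustrative):

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>1.16.0</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <version>4.1.0</version>
    <scope>test</scope>
</dependency>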

4. Real-Time Data Aggregation

In this section, we’ll see how to create a materialized view that represents the real-time aggregation required by our application.

4.1. Creating the Stream

In Kafka, a topic stores a collection of events. Similarly, in ksqlDB, a stream represents events and is backed by a Kafka topic.

Let’s begin by creating our stream to store the incoming sensor data:

CREATE STREAM readings (sensor_id VARCHAR KEY, timestamp VARCHAR, reading INT)
  WITH (KAFKA_TOPIC = 'readings',
        VALUE_FORMAT = 'JSON',
        TIMESTAMP = 'timestamp',
        TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss',
        PARTITIONS = 1);

Here, ksqlDB creates the readings topic to store the stream data in JSON format. Since the events represent temporal data, it’s important that each reading contains a timestamp indicating the event time. The timestamp field stores this data in the specified format. This ensures that ksqlDB applies event-time semantics for time-related operations and out-of-order events.
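For instance, since sensor_id is declared as the KEY column, a single event on the readings topic carries the sensor id as the Kafka record key and the remaining columns as the JSON value (an illustrative payload):

key:   sensor-1
value: { "timestamp": "2021-08-01 09:00:00", "reading": 22 }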

Next, we’ll create an instance of the Client with ksqlDB server connection details and use this to execute our SQL statement:

ClientOptions options = ClientOptions.create()
  .setHost(KSQLDB_SERVER_HOST)
  .setPort(KSQLDB_SERVER_PORT);

Client client = Client.create(options);

Map<String, Object> properties = Collections.singletonMap(
  "auto.offset.reset", "earliest"
);

CompletableFuture<ExecuteStatementResult> result = 
  client.executeStatement(CREATE_READINGS_STREAM, properties);

As with the CLI earlier, we set the value of the auto.offset.reset property to “earliest“. This ensures that, in the absence of a committed Kafka offset, the query reads the relevant topic from the earliest offset.

The executeStatement method is part of the async API provided by the client. It sends the request to the server and immediately returns a CompletableFuture, without waiting for the response. The calling code may then decide to block and wait for completion (by invoking the get or join method) or to perform other non-blocking operations.
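For example, to block until the server acknowledges the statement, we can simply join the future (a minimal sketch, with error handling omitted for brevity):

ExecuteStatementResult statementResult = result.join();
// For persistent queries, the server also returns the id of the new query
statementResult.queryId().ifPresent(id -> log.info("Query ID: {}", id));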

4.2. Creating the Materialized View

Now that we have the underlying event stream, we can derive a new alerts table from the readings stream. This persistent query (or materialized view) runs on the server indefinitely and processes events from the source stream or table.

In our case, it should raise an alert when the average reading, per sensor, exceeds a value of 25 over a 30-minute period:

CREATE TABLE alerts AS
  SELECT
    sensor_id,
    TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') 
      AS start_period,
    TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss', 'UTC') 
      AS end_period,
    AVG(reading) AS average_reading
  FROM readings
  WINDOW TUMBLING (SIZE 30 MINUTES)
  GROUP BY sensor_id
  HAVING AVG(reading) > 25
  EMIT CHANGES;

In this query, we’re aggregating new incoming events, per sensor, in a tumbling window of 30 minutes. For example, a reading timestamped at 09:45:00 falls into the window from 09:30:00 to 10:00:00. We’ve also used the TIMESTAMPTOSTRING function to convert the UNIX window timestamps into something more readable.

Importantly, the materialized view emits an update only when a new event changes the result of the aggregation and that result satisfies the HAVING condition.

As previously, let’s use the client to execute this statement asynchronously and create our materialized view:

CompletableFuture<ExecuteStatementResult> result = 
  client.executeStatement(CREATE_ALERTS_TABLE, properties);

Once created, such views are updated incrementally as new events arrive. This is the key to efficient, highly performant queries that serve real-time updates.

4.3. Inserting Sample Data

Before we can run queries, let’s produce some sample events that represent various readings at 10-minute intervals.

Let’s provide key/value mappings for the stream columns using KsqlObject:

List<KsqlObject> rows = Arrays.asList(
  new KsqlObject().put("sensor_id", "sensor-1")
    .put("timestamp", "2021-08-01 09:00:00").put("reading", 22),
  new KsqlObject().put("sensor_id", "sensor-1")
    .put("timestamp", "2021-08-01 09:10:00").put("reading", 20),
  new KsqlObject().put("sensor_id", "sensor-2")
    .put("timestamp", "2021-08-01 10:00:00").put("reading", 26),
  
  // additional rows
);

CompletableFuture<Void> result = CompletableFuture.allOf(
  rows.stream()
    .map(row -> client.insertInto(READINGS_TABLE, row))
    .toArray(CompletableFuture[]::new)
);

Here, we combine all the individual insert operations into a single Future for convenience. This completes upon the successful completion of all underlying CompletableFuture instances.

5. Querying the Data

Queries allow callers to bring the materialized view data into the application. They can be classified into two types: push queries and pull queries.

5.1. Push Query

This type of query pushes a continuous stream of updates to the client. These queries are particularly suitable for asynchronous application flows as they enable the clients to react to new information in real-time.

However, unlike persistent queries, the server does not store the results of such queries in a Kafka topic. Therefore, we should keep these queries as simple as possible while moving all the heavy lifting into persistent queries.

Let’s create a simple push query to subscribe to the results from our alerts materialized view, created earlier:

SELECT * FROM alerts EMIT CHANGES;

Here, it’s important to note the EMIT CHANGES clause, which pushes each change to the client. As the query contains no limit, it will continue to stream results until it’s terminated.
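If we only need a finite number of updates, we can instead bound the push query with a LIMIT clause, after which it completes on its own:

SELECT * FROM alerts EMIT CHANGES LIMIT 5;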

Next, we subscribe to the results of the query in order to receive streaming data:

public CompletableFuture<Void> subscribeOnAlerts(Subscriber<Row> subscriber) {
    return client.streamQuery(ALERTS_QUERY, PROPERTIES)
      .thenAccept(streamedQueryResult -> streamedQueryResult.subscribe(subscriber))
      .whenComplete((result, ex) -> {
          if (ex != null) {
              log.error("Alerts push query failed", ex);
          }
      });
}

Here, we’ve invoked the streamQuery method, which returns a StreamedQueryResult for obtaining streaming data. This extends the Publisher interface from Reactive Streams, so we’re able to consume the results asynchronously using a reactive Subscriber. In fact, the subscriber is a simple Reactive Streams implementation that receives the ksqlDB rows as JSON and converts them to Alert POJOs.
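A minimal version of such a subscriber might look like the following sketch. Error handling is simplified, and since ksqlDB uppercases column identifiers by default, the target POJO is assumed to map those names accordingly:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.confluent.ksql.api.client.Row;

public class RowSubscriber<T> implements Subscriber<Row> {

    private static final Logger log = LoggerFactory.getLogger(RowSubscriber.class);

    public final List<T> consumedItems = Collections.synchronizedList(new ArrayList<>());

    private final Class<T> clazz;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private Subscription subscription;

    public RowSubscriber(Class<T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        // Request the first row; we'll request more as each one is processed
        subscription.request(1);
    }

    @Override
    public void onNext(Row row) {
        try {
            // Each row arrives as JSON; convert it to the target POJO
            consumedItems.add(objectMapper.readValue(row.asObject().toJsonString(), clazz));
        } catch (JsonProcessingException e) {
            log.error("Failed to map row", e);
        }
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        log.error("Push query failed", t);
    }

    @Override
    public void onComplete() {
        // The push query was terminated on the server
    }
}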

We can now test this using our Compose file and the DockerComposeContainer from Testcontainers:

@Testcontainers
class KsqlDBApplicationLiveTest {

    @Container
    public static DockerComposeContainer dockerComposeContainer =
      new DockerComposeContainer<>(KSQLDB_COMPOSE_FILE)
        .withServices("zookeeper", "broker", "ksqldb-server")
        .withExposedService("ksqldb-server", 8088,
          Wait.forHealthcheck().withStartupTimeout(Duration.ofMinutes(5)))
        .withLocalCompose(true);

    // setup and teardown

    @Test
    void givenSensorReadings_whenSubscribedToAlerts_thenAlertsAreConsumed() {
        createAlertsMaterializedView();
        
        // Reactive Streams Subscriber impl for receiving streaming data
        RowSubscriber<Alert> alertSubscriber = new RowSubscriber<>(Alert.class);

        ksqlDBApplication.subscribeOnAlerts(alertSubscriber);
        insertSampleData();

        await().atMost(Duration.ofMinutes(3)).untilAsserted(() ->
          assertThat(alertSubscriber.consumedItems)
            .containsOnly(
              expectedAlert("sensor-1", "2021-08-01 09:30:00", "2021-08-01 10:00:00", 28.0),
              expectedAlert("sensor-2", "2021-08-01 10:00:00", "2021-08-01 10:30:00", 26.0)
            )
        );
    }
}

Here, we’ve spun up a complete ksqlDB environment for the integration tests. The test inserts sample rows into the stream and ksqlDB performs the windowed aggregation. Finally, we assert that our subscriber consumes the latest alerts, as expected.

5.2. Pull Query

In contrast to push queries, pull queries retrieve data that does not update dynamically, much like a traditional RDBMS. Such queries return immediately with a finite result set. Hence, pull queries are well suited to synchronous request/response application flows.

As a simple example, let’s create a query to retrieve all the alerts triggered for a particular sensor id:

String pullQuery = "SELECT * FROM alerts WHERE sensor_id = 'sensor-2';";

List<Row> rows = client.executeQuery(pullQuery, PROPERTIES).get();

In contrast to the push query, this query returns all the available data from the materialized view at execution time. This is useful for querying the current state of the materialized view.
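We can then process the returned rows directly. Note again that ksqlDB uppercases column identifiers by default:

for (Row row : rows) {
    log.info("Sensor {} averaged {} between {} and {}",
      row.getString("SENSOR_ID"),
      row.getDouble("AVERAGE_READING"),
      row.getString("START_PERIOD"),
      row.getString("END_PERIOD"));
}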

5.3. Miscellaneous Operations

The API docs for the client provide further information on other operations, such as describing sources; listing streams, tables, and topics; and terminating queries.
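For instance, reusing our existing client instance, a few of these operations look like this (checked exceptions omitted for brevity):

// List all existing streams and topics
List<StreamInfo> streams = client.listStreams().get();
List<TopicInfo> topics = client.listTopics().get();

// Describe the readings source, including its schema and the queries using it
SourceDescription description = client.describeSource("readings").get();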

6. Conclusion

In this article, we covered the core concepts of streams, tables, and queries that support ksqlDB as an efficient event-streaming database.

Along the way, we built a simple, reactive application using concise and composable SQL constructs. We also saw how to use the Java client to create streams and tables, issue queries against materialized views, and retrieve real-time data.

As always, the full source code is available over on GitHub.