1. Overview

In this article, we'll take a look at the Mantis platform developed by Netflix.

We'll explore the main Mantis concepts by creating, running, and investigating a stream processing job.

2. What Is Mantis?

Mantis is a platform for building stream-processing applications (jobs). It provides an easy way to manage the deployment and life-cycle of jobs. Moreover, it facilitates resource allocation, discovery, and communication between these jobs.

Therefore, developers can focus on actual business logic, all the while having the support of a robust and scalable platform to run their high-volume, low-latency, non-blocking applications.

A Mantis job consists of three distinct parts:

  • the source, responsible for retrieving the data from an external source
  • one or more stages, responsible for processing the incoming event streams
  • and a sink that collects the processed data

Let's now explore each of them.

3. Setup and Dependencies

Let's start by adding the mantis-runtime and jackson-databind dependencies:

<dependency>
    <groupId>io.mantisrx</groupId>
    <artifactId>mantis-runtime</artifactId>
</dependency>

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

Now, for setting up our job's data source, let's implement the Mantis Source interface:

public class RandomLogSource implements Source<String> {

    @Override
    public Observable<Observable<String>> call(Context context, Index index) {
        return Observable.just(
          Observable
            .interval(250, TimeUnit.MILLISECONDS)
            .map(this::createRandomLogEvent));
    }

    private String createRandomLogEvent(Long tick) {
        // generate a random log entry string
        ...
    }

}

As we can see, it simply emits a random log entry every 250 milliseconds.
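The exact generation logic isn't important for the rest of the article. Still, here's a minimal sketch of createRandomLogEvent(), assuming the index#level#message layout that the stage we build in the next section will split on (the LEVELS and MESSAGES arrays are purely illustrative):

private static final String[] LEVELS = {"INFO", "WARN", "ERROR"};
private static final String[] MESSAGES = {"login attempt", "user created"};

private String createRandomLogEvent(Long tick) {
    // pick a random level and message and join them with the tick using '#',
    // matching the format the downstream stage will split on
    String level = LEVELS[ThreadLocalRandom.current().nextInt(LEVELS.length)];
    String message = MESSAGES[ThreadLocalRandom.current().nextInt(MESSAGES.length)];
    return tick + "#" + level + "#" + message;
}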

4. Our First Job

Let's now create a Mantis job that simply collects log events from our RandomLogSource. Later on, we'll add grouping and aggregation transformations for a more complex and interesting result.

To begin with, let's create a LogEvent entity:

public class LogEvent implements JsonType {
    private Long index;
    private String level;
    private String message;

    // ...
}

Then, let's add our TransformLogStage.

It's a simple stage that implements the ScalarComputation interface and splits a log entry to build a LogEvent. Also, it filters out any improperly formatted strings:

public class TransformLogStage implements ScalarComputation<String, LogEvent> {

    @Override
    public Observable<LogEvent> call(Context context, Observable<String> logEntry) {
        return logEntry
          .map(log -> log.split("#"))
          .filter(parts -> parts.length == 3)
          .map(LogEvent::new);
    }

}
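For this mapping to compile, LogEvent needs a constructor that accepts the split parts, and the SSE sink we'll plug in below relies on a toJsonString() method. Here's a minimal sketch of both, assuming Jackson (the jackson-databind dependency we added earlier) handles the JSON serialization:

private static final ObjectMapper MAPPER = new ObjectMapper();

public LogEvent(String[] parts) {
    // parts come from splitting an index#level#message entry
    this.index = Long.valueOf(parts[0]);
    this.level = parts[1];
    this.message = parts[2];
}

public String toJsonString() {
    try {
        return MAPPER.writeValueAsString(this);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}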

4.1. Running the Job

At this point, we have enough building blocks for putting together our Mantis job:

public class LogCollectingJob extends MantisJobProvider<LogEvent> {

    @Override
    public Job<LogEvent> getJobInstance() {
        return MantisJob
          .source(new RandomLogSource())
          .stage(new TransformLogStage(), new ScalarToScalar.Config<>())
          .sink(Sinks.eagerSubscribe(Sinks.sse(LogEvent::toJsonString)))
          .metadata(new Metadata.Builder().build())
          .create();
    }

}

Let's take a closer look at our job.

As we can see, it extends MantisJobProvider. First, it fetches data from our RandomLogSource and applies the TransformLogStage to the fetched data. Finally, it sends the processed data to the built-in sink that eagerly subscribes and delivers data over SSE.

Now, let's configure our job to execute locally on startup:

@SpringBootApplication
public class MantisApplication implements CommandLineRunner {

    // ...
 
    @Override
    public void run(String... args) {
        LocalJobExecutorNetworked.execute(new LogCollectingJob().getJobInstance());
    }
}

Let's run the application. We'll see a log message like:

...
Serving modern HTTP SSE server sink on port: 86XX

Let's now connect to the sink using curl:

$ curl localhost:86XX
data: {"index":86,"level":"WARN","message":"login attempt"}
data: {"index":87,"level":"ERROR","message":"user created"}
data: {"index":88,"level":"INFO","message":"user created"}
data: {"index":89,"level":"INFO","message":"login attempt"}
data: {"index":90,"level":"INFO","message":"user created"}
data: {"index":91,"level":"ERROR","message":"user created"}
data: {"index":92,"level":"WARN","message":"login attempt"}
data: {"index":93,"level":"INFO","message":"user created"}
...

4.2. Configuring the Sink

So far, we've used the built-in sink for collecting our processed data. Let's see if we can add more flexibility to our scenario by providing a custom sink.

What if, for example, we'd like to filter logs by message?

Let's create a LogSink that implements the Sink interface:

public class LogSink implements Sink<LogEvent> {

    @Override
    public void call(Context context, PortRequest portRequest, Observable<LogEvent> logEventObservable) {
        SelfDocumentingSink<LogEvent> sink = new ServerSentEventsSink.Builder<LogEvent>()
          .withEncoder(LogEvent::toJsonString)
          .withPredicate(filterByLogMessage())
          .build();
        // eagerly subscribe, as Sinks.eagerSubscribe did for the built-in sink
        logEventObservable.subscribe();
        sink.call(context, portRequest, logEventObservable);
    }

    private Predicate<LogEvent> filterByLogMessage() {
        return new Predicate<>("filter by message",
          parameters -> {
            if (parameters != null && parameters.containsKey("filter")) {
                return logEvent -> logEvent.getMessage().contains(parameters.get("filter").get(0));
            }
            return logEvent -> true;
        });
    }
}
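To try it out, we can swap the custom sink in for the built-in one in the job definition. Here's a minimal sketch; the LogFilteringJob class name is just illustrative:

public class LogFilteringJob extends MantisJobProvider<LogEvent> {

    @Override
    public Job<LogEvent> getJobInstance() {
        return MantisJob
          .source(new RandomLogSource())
          .stage(new TransformLogStage(), new ScalarToScalar.Config<>())
          .sink(new LogSink())
          .metadata(new Metadata.Builder().build())
          .create();
    }
}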

In this sink implementation, we've configured a predicate that uses the filter query parameter to deliver only the logs whose message contains the given text:

$ curl localhost:8874?filter=login
data: {"index":93,"level":"ERROR","message":"login attempt"}
data: {"index":95,"level":"INFO","message":"login attempt"}
data: {"index":97,"level":"ERROR","message":"login attempt"}
...

Note that Mantis also offers a powerful query language, MQL, which can be used to query, transform, and analyze stream data in a SQL-like fashion.

5. Stage Chaining

Let's now suppose we're interested in knowing how many ERROR, WARN, or INFO log entries we have in a given time interval. For this, we'll add two more stages to our job and chain them together.

5.1. Grouping

Firstly, let's create a GroupLogStage.

This stage is a ToGroupComputation implementation that receives the LogEvent stream from the existing TransformLogStage. It then groups entries by logging level and sends them to the next stage:

public class GroupLogStage implements ToGroupComputation<LogEvent, String, LogEvent> {

    @Override
    public Observable<MantisGroup<String, LogEvent>> call(Context context, Observable<LogEvent> logEvent) {
        return logEvent.map(log -> new MantisGroup<>(log.getLevel(), log));
    }

    public static ScalarToGroup.Config<LogEvent, String, LogEvent> config(){
        return new ScalarToGroup.Config<LogEvent, String, LogEvent>()
          .description("Group event data by level")
          .codec(JacksonCodecs.pojo(LogEvent.class))
          .concurrentInput();
    }
    
}

We've also created a custom stage config that provides a description and the codec to use for serializing the output, and we've allowed this stage's call method to run concurrently by using concurrentInput().

One thing to note is that this stage is horizontally scalable, meaning we can run as many instances of it as needed. Also worth mentioning: when deployed in a Mantis cluster, this stage sends data to the next stage in such a way that all events belonging to a particular group land on the same worker of the next stage.

5.2. Aggregating

Before we move on and create the next stage, let's first add a LogAggregate entity:

public class LogAggregate implements JsonType {

    private final Integer count;
    private final String level;

    // ...
}

Now, let's create the last stage in the chain.

This stage implements GroupToScalarComputation and transforms a stream of log groups to a scalar LogAggregate. It does this by counting how many times each type of log appears in the stream. In addition, it also has a LogAggregationDuration parameter, which can be used to control the size of the aggregation window:

public class CountLogStage implements GroupToScalarComputation<String, LogEvent, LogAggregate> {

    private int duration;

    @Override
    public void init(Context context) {
        // read the window size parameter, falling back to one second if it isn't set
        duration = (int) context.getParameters().get("LogAggregationDuration", 1000);
    }

    @Override
    public Observable<LogAggregate> call(Context context, Observable<MantisGroup<String, LogEvent>> mantisGroup) {
        return mantisGroup
          .window(duration, TimeUnit.MILLISECONDS)
          .flatMap(o -> o.groupBy(MantisGroup::getKeyValue)
            .flatMap(group -> group.reduce(0, (count, value) -> count + 1)
              .map(count -> new LogAggregate(count, group.getKey()))
            ));
    }

    public static GroupToScalar.Config<String, LogEvent, LogAggregate> config(){
        return new GroupToScalar.Config<String, LogEvent, LogAggregate>()
          .description("sum events for a log level")
          .codec(JacksonCodecs.pojo(LogAggregate.class))
          .withParameters(getParameters());
    }

    public static List<ParameterDefinition<?>> getParameters() {
        List<ParameterDefinition<?>> params = new ArrayList<>();

        params.add(new IntParameter()
          .name("LogAggregationDuration")
          .description("window size for aggregation in milliseconds")
          .validator(Validators.range(100, 10000))
          .defaultValue(5000)
          .build());

        return params;
    }
    
}

5.3. Configure and Run the Job

The only thing left to do now is to configure our job:

public class LogAggregationJob extends MantisJobProvider<LogAggregate> {

    @Override
    public Job<LogAggregate> getJobInstance() {

        return MantisJob
          .source(new RandomLogSource())
          .stage(new TransformLogStage(), TransformLogStage.stageConfig())
          .stage(new GroupLogStage(), GroupLogStage.config())
          .stage(new CountLogStage(), CountLogStage.config())
          .sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString)))
          .metadata(new Metadata.Builder().build())
          .create();
    }
}
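One detail worth noting: we now pass TransformLogStage.stageConfig() instead of a bare ScalarToScalar.Config, since the stage's output now travels on to another stage and needs a codec for serialization. We haven't shown that method so far; a minimal sketch, following the same codec pattern as the other stage configs:

public static ScalarToScalar.Config<String, LogEvent> stageConfig() {
    // serialize LogEvent objects between this stage and the next one
    return new ScalarToScalar.Config<String, LogEvent>()
      .codec(JacksonCodecs.pojo(LogEvent.class));
}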

As soon as we run the application and execute our new job, we can see the log counts being retrieved every few seconds:

$ curl localhost:8133
data: {"count":3,"level":"ERROR"}
data: {"count":13,"level":"INFO"}
data: {"count":4,"level":"WARN"}

data: {"count":8,"level":"ERROR"}
data: {"count":5,"level":"INFO"}
data: {"count":7,"level":"WARN"}
...

6. Conclusion

To sum up, in this article, we've seen what Netflix Mantis is and what it can be used for. Furthermore, we looked at the main concepts, used them to build jobs, and explored custom configurations for different scenarios.

As always, the complete code is available over on GitHub.