1. Overview

In this tutorial, we’ll review an interesting pattern that isn’t part of the classical GoF patterns: the Pipeline pattern.

It’s powerful and can help resolve tricky problems and improve an application’s design. Java also has some built-in solutions that help implement this pattern; we’ll discuss them at the end.

2. Related Patterns

The Pipeline pattern is usually compared to the Chain of Responsibility. It also has a lot in common with the Decorator, and in some respects it’s closer to the Decorator than to the Chain of Responsibility. Let’s review the similarities and differences between these patterns.

2.1. Chain of Responsibility

The Pipeline and the Chain of Responsibility are often compared because both patterns explicitly declare a step-by-step process. The first difference between the Pipeline and the Chain of Responsibility is that the latter usually has no return value from its handleRequest() method:

[Figure: Chain of Responsibility diagram (Concrete Handler)]

However, nothing stops us from returning values from the handleRequest() method. In this case, it would be defined as part of the Handler interface.
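As an illustration of a handler that does return a value, here’s a minimal sketch. The Handler signature, the PrefixHandler class, and the request strings are hypothetical names made up for this example; they aren’t part of the GoF definition. Note that every handler in the chain has to return the same type declared in the interface:

```java
import java.util.Optional;

// Hypothetical Handler whose handleRequest() returns a value
interface Handler {
    Optional<String> handleRequest(String request);
}

// Handles requests that start with a given prefix, otherwise delegates to the next handler
class PrefixHandler implements Handler {
    private final String prefix;
    private final Handler next; // null marks the end of the chain

    PrefixHandler(String prefix, Handler next) {
        this.prefix = prefix;
        this.next = next;
    }

    @Override
    public Optional<String> handleRequest(String request) {
        if (request.startsWith(prefix)) {
            return Optional.of(prefix + " handled: " + request);
        }
        // Nobody left to ask: report that the request wasn't handled
        return next == null ? Optional.empty() : next.handleRequest(request);
    }
}

class HandlerDemo {
    static Optional<String> dispatch(String request) {
        Handler chain = new PrefixHandler("GET", new PrefixHandler("POST", null));
        return chain.handleRequest(request);
    }

    public static void main(String[] args) {
        System.out.println(dispatch("POST /users").isPresent());   // true
        System.out.println(dispatch("DELETE /users").isPresent()); // false
    }
}
```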

2.2. Decorator

The Decorator doesn’t immediately resemble the Pipeline pattern because it isn’t explicit about its chain-like structure. However, with its delegation and recursive nesting, its behavior is quite similar to the Chain of Responsibility or the Pipeline:

[Figure: Decorator diagram]

In the classical (GoF) implementation, the Decorator adds behavior, and its operations have no return values. It’s still a reasonable choice for altering the state of an object or processing data with different components. State-altering solutions, however, are often more complex than necessary, since we can achieve the same result with a more straightforward structure. At the same time, the Decorator manages temporal dependencies and maintains the order of execution.
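To make the delegation and nesting concrete, here’s a small sketch. It assumes a hypothetical TextComponent interface whose operation returns a String, which differs from the classical void operation; all class names are invented for illustration. The order of execution follows from the nesting: the innermost component runs first.

```java
// Hypothetical Component; every decorator must keep this String -> String signature
interface TextComponent {
    String transform(String text);
}

// The concrete component at the core of the nesting
class PlainText implements TextComponent {
    @Override
    public String transform(String text) {
        return text;
    }
}

// Base decorator: holds the wrapped component it delegates to
abstract class TextDecorator implements TextComponent {
    protected final TextComponent inner;

    TextDecorator(TextComponent inner) {
        this.inner = inner;
    }
}

class TrimDecorator extends TextDecorator {
    TrimDecorator(TextComponent inner) {
        super(inner);
    }

    @Override
    public String transform(String text) {
        return inner.transform(text).trim();
    }
}

class UpperCaseDecorator extends TextDecorator {
    UpperCaseDecorator(TextComponent inner) {
        super(inner);
    }

    @Override
    public String transform(String text) {
        return inner.transform(text).toUpperCase();
    }
}

class DecoratorDemo {
    static String run(String text) {
        // Executes inside out: PlainText, then trim, then upper case
        TextComponent component = new UpperCaseDecorator(new TrimDecorator(new PlainText()));
        return component.transform(text);
    }

    public static void main(String[] args) {
        System.out.println(run("  pipeline ")); // PIPELINE
    }
}
```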

3. Pipeline

The main idea behind the Pipeline pattern is to create a set of operations (a pipeline) and pass data through it. The Chain of Responsibility and the Decorator can handle this task only partially; the main strength of the Pipeline is that it’s flexible about the type of its result.

The Chain of Responsibility and the Decorator return only the type defined in the Handler and the Component interfaces, respectively. The Pipeline, on the other hand, can work with the input and output of any type. The flexibility of this pattern is its main feature.

3.1. Immutable Pipeline

Let’s create a simple example of an immutable pipeline. We’ll start with the Pipe interface:

public interface Pipe<IN, OUT> {
    OUT process(IN input);
}

It’s a simple interface with a single method that takes an input and produces an output. The interface is parameterized, so an implementation can use any input and output types. Note that the examples in this article deviate from the official naming convention for type parameters (single uppercase letters) to better distinguish method-level from class-level parameters.

Now let’s create a class that holds the pipes of a pipeline:

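import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

public class Pipeline<IN, OUT> {

    // Stored with wildcards because each pipe may have different type parameters
    private final Collection<Pipe<?, ?>> pipes;

    private Pipeline(Pipe<IN, OUT> pipe) {
        pipes = Collections.singletonList(pipe);
    }

    private Pipeline(Collection<Pipe<?, ?>> pipes) {
        this.pipes = new ArrayList<>(pipes);
    }

    public static <IN, OUT> Pipeline<IN, OUT> of(Pipe<IN, OUT> pipe) {
        return new Pipeline<>(pipe);
    }

    // Returns a new Pipeline; the current one is left unchanged
    public <NEW_OUT> Pipeline<IN, NEW_OUT> withNextPipe(Pipe<OUT, NEW_OUT> pipe) {
        final ArrayList<Pipe<?, ?>> newPipes = new ArrayList<>(pipes);
        newPipes.add(pipe);
        return new Pipeline<>(newPipes);
    }

    // Raw types are needed to call pipes with differing type parameters
    @SuppressWarnings({"rawtypes", "unchecked"})
    public OUT process(IN input) {
        Object output = input;
        for (final Pipe pipe : pipes) {
            output = pipe.process(output);
        }
        return (OUT) output;
    }
}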

The constructors and the static factory are pretty straightforward, so let’s concentrate on the withNextPipe method:

public <NEW_OUT> Pipeline<IN, NEW_OUT> withNextPipe(Pipe<OUT, NEW_OUT> pipe) {
    final ArrayList<Pipe<?, ?>> newPipes = new ArrayList<>(pipes);
    newPipes.add(pipe);
    return new Pipeline<>(newPipes);
}

Because we want a level of type safety and don’t want to allow pipes that would break the pipeline, we need to store information about the current input and output types. This information is stored in the type parameters of the Pipeline object. When we add a new Pipe, the output type changes, and we can’t change the type parameters of an existing object. That’s why the Pipeline is immutable, and adding a new Pipe produces a new, separate Pipeline.

The process part of the Pipeline is quite simple:

public OUT process(IN input) {
    Object output = input;
    for (final Pipe pipe : pipes) {
        output = pipe.process(output);
    }
    return (OUT) output;
}

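However, we have to use raw types here, because the collection stores pipes with different type parameters. Since withNextPipe only accepts a Pipe whose input type matches the current output type, the pipes are guaranteed to fit together, and the final unchecked cast of the result to OUT is safe.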
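The following sketch shows how such a pipeline might be used. To keep the snippet compilable on its own, it repeats compact, package-private versions of Pipe and Pipeline; the demo class and the three example pipes are hypothetical and only serve as an illustration:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

interface Pipe<IN, OUT> {
    OUT process(IN input);
}

// Compact copy of the immutable Pipeline, repeated so the example is self-contained
class Pipeline<IN, OUT> {
    private final Collection<Pipe<?, ?>> pipes;

    private Pipeline(Collection<Pipe<?, ?>> pipes) {
        this.pipes = new ArrayList<>(pipes);
    }

    static <IN, OUT> Pipeline<IN, OUT> of(Pipe<IN, OUT> pipe) {
        return new Pipeline<IN, OUT>(Collections.<Pipe<?, ?>>singletonList(pipe));
    }

    <NEW_OUT> Pipeline<IN, NEW_OUT> withNextPipe(Pipe<OUT, NEW_OUT> pipe) {
        ArrayList<Pipe<?, ?>> newPipes = new ArrayList<>(pipes);
        newPipes.add(pipe);
        return new Pipeline<>(newPipes);
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    OUT process(IN input) {
        Object output = input;
        for (Pipe pipe : pipes) {
            output = pipe.process(output);
        }
        return (OUT) output;
    }
}

class PipelineUsageDemo {
    // Hypothetical pipes: each one may change the type flowing through the pipeline
    static final Pipe<String, Integer> PARSE = s -> Integer.parseInt(s.trim());
    static final Pipe<Integer, Integer> SQUARE = n -> n * n;
    static final Pipe<Integer, String> DESCRIBE = n -> "result=" + n;

    static final Pipeline<String, Integer> PARSING = Pipeline.of(PARSE);

    // Each withNextPipe call returns a new Pipeline; PARSING itself is not modified
    static final Pipeline<String, String> FULL = PARSING.withNextPipe(SQUARE).withNextPipe(DESCRIBE);

    // PARSING.withNextPipe(PARSE) would not compile:
    // a Pipe<String, Integer> can't consume the Integer produced by PARSING

    public static void main(String[] args) {
        System.out.println(FULL.process(" 7 "));    // result=49
        System.out.println(PARSING.process(" 7 ")); // 7
    }
}
```

The types of the intermediate pipelines change with every added pipe, which is what the Handler and Component interfaces of the related patterns can’t express.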

3.2. Simple Pipes

We can simplify the example above a bit and get rid of the Pipeline class entirely:

public interface Pipe<IN, OUT> {
    OUT process(IN input);

    // Chains this pipe with the next one: the output of this pipe becomes the input of the next
    default <NEW_OUT> Pipe<IN, NEW_OUT> add(Pipe<OUT, NEW_OUT> pipe) {
        return input -> pipe.process(process(input));
    }
}

This implementation is closer to the patterns discussed previously (Decorator and Chain of Responsibility), as it has the recursive structure of delegating from one pipe to another. However, all the Pipes are hidden inside a method call in this implementation, so getting the entire pipeline is hard. At the same time, this solution is quite simple and flexible compared to the previous implementation with a Pipeline.
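Here’s a minimal usage sketch of this variant. It assumes the default add method delegates through process, the interface’s only abstract method; the demo class and the three pipes are hypothetical:

```java
interface Pipe<IN, OUT> {
    OUT process(IN input);

    // Wraps this pipe and the next one in a new lambda
    default <NEW_OUT> Pipe<IN, NEW_OUT> add(Pipe<OUT, NEW_OUT> pipe) {
        return input -> pipe.process(process(input));
    }
}

class SimplePipeDemo {
    static int run(String input) {
        Pipe<String, String> trim = String::trim;
        Pipe<String, Integer> length = String::length;
        Pipe<Integer, Integer> doubled = n -> n * 2;

        // The resulting Pipe only exposes process(); the individual steps are hidden inside it
        Pipe<String, Integer> pipeline = trim.add(length).add(doubled);
        return pipeline.process(input);
    }

    public static void main(String[] args) {
        System.out.println(run("  pipe  ")); // 8
    }
}
```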

3.3. Functional Solution

We can iterate over the previous solution and improve it using vanilla Java. Let’s take a look at the Pipe interface once again:

public interface Pipe<IN, OUT> {
    OUT process(IN input);

    default <NEW_OUT> Pipe<IN, NEW_OUT> add(Pipe<OUT, NEW_OUT> pipe) {
        return input -> pipe.process(process(input));
    }
}

This is a functional interface with one abstract method and one default method. We can replace it with the existing java.util.function.Function interface:

public interface Function<T, R> {
    //...
    R apply(T t);
    //...
}

The Function interface also contains a couple of useful default methods. One of them is andThen:

default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) {
    Objects.requireNonNull(after);
    return (T t) -> after.apply(apply(t));
}

We can use it instead of our previous add method. Additionally, the Function interface provides a way to add a function at the beginning of our pipeline:

default <V> Function<V, R> compose(Function<? super V, ? extends T> before) {
    Objects.requireNonNull(before);
    return (V v) -> apply(before.apply(v));
}
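The difference between the two methods is only the order in which the functions run. A short sketch with hypothetical example functions:

```java
import java.util.function.Function;

class ComposeDemo {
    static final Function<Integer, Integer> TIMES_TWO = n -> n * 2;
    static final Function<Integer, Integer> PLUS_ONE = n -> n + 1;
    static final Function<String, Integer> PARSE = Integer::parseInt;

    // andThen appends a step: TIMES_TWO runs first, PLUS_ONE second
    static int timesTwoThenPlusOne(int n) {
        return TIMES_TWO.andThen(PLUS_ONE).apply(n);
    }

    // compose prepends a step: PLUS_ONE runs first, TIMES_TWO second
    static int plusOneThenTimesTwo(int n) {
        return TIMES_TWO.compose(PLUS_ONE).apply(n);
    }

    // compose can also change the input type of the pipeline
    static int parseThenTimesTwo(String s) {
        Function<String, Integer> pipeline = TIMES_TWO.compose(PARSE);
        return pipeline.apply(s);
    }

    public static void main(String[] args) {
        System.out.println(timesTwoThenPlusOne(5));  // 11
        System.out.println(plusOneThenTimesTwo(5));  // 12
        System.out.println(parseThenTimesTwo("21")); // 42
    }
}
```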

By using Function, we can create very flexible and easy-to-use pipelines:

@Test
void whenCombiningThreeFunctions_andInitializingPipeline_thenResultIsCorrect() {
    Function<Integer, Integer> square = s -> s * s;
    Function<Integer, Integer> half = s -> s / 2;
    Function<Integer, String> toString = Object::toString;
    Function<Integer, String> pipeline = square.andThen(half)
        .andThen(toString);
    String result = pipeline.apply(5);
    String expected = "12";
    assertEquals(expected, result);
}

The pipeline takes the parameters directly, making this approach quite clean. As a bonus, we can extend our pipelines with BiFunctions:

@Test
void whenCombiningFunctionAndBiFunctions_andInitializingPipeline_thenResultIsCorrect() {
    BiFunction<Integer, Integer, Integer> add = Integer::sum;
    BiFunction<Integer, Integer, Integer> mul = (a, b) -> a * b;
    Function<Integer, String> toString = Object::toString;
    BiFunction<Integer, Integer, String> pipeline = add.andThen(a -> mul.apply(a, 2))
        .andThen(toString);
    String result = pipeline.apply(1, 2);
    String expected = "6";
    assertEquals(expected, result);
}

Because the andThen method takes a Function, we have to turn the mul BiFunction into a Function by fixing one of its arguments inside a lambda (partial application). Although we provide that argument inside the function and not when invoking the pipeline, this solution is still straightforward and clear. The same approach is used in the Stream API, where the sequence of operations in a stream is referred to as a pipeline.
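For comparison, here’s roughly how the square, half, and toString steps from the earlier test might look as a stream pipeline. This is only a sketch; the class and method names are hypothetical:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class StreamPipelineDemo {
    static List<String> run(List<Integer> numbers) {
        return numbers.stream()
            .map(n -> n * n)          // square
            .map(n -> n / 2)          // half (integer division)
            .map(Object::toString)    // Integer -> String
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(5, 6))); // [12, 18]
    }
}
```

Each map call plays the role of a pipe, and the element type can change from one step to the next, just as with andThen.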

4. Conclusion

In this article, we discussed the Pipeline pattern, a potent tool even though it’s less popular and isn’t included in the classical (GoF) list of patterns.

We can implement this pattern in various ways, and Java itself provides an excellent option through the Function interface and the Stream API. In most cases, the solutions provided by Java are sufficient. For more specific pipelines, it’s possible to implement them from scratch.

The main benefit of this pattern is that it streamlines logic and makes the code more maintainable while keeping it concise and clear. The complete source code for this example is available over on GitHub.