1. Overview

In this tutorial, we’ll take a close look at testing reactive streams with StepVerifier and TestPublisher.

We’ll base our investigation on a Spring Reactor application containing a chain of reactor operations.

2. Maven Dependencies

Spring Reactor comes with several classes for testing reactive streams.

We can get these by adding the reactor-test dependency:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
    <version>3.6.0</version>
</dependency>

3. StepVerifier

In general, reactor-test has two main uses:

  • creating a step-by-step test with StepVerifier
  • producing predefined data with TestPublisher to test downstream operators

The most common case in testing reactive streams is when we have a publisher (a Flux or Mono) defined in our code. We want to know how it behaves when someone subscribes. 

With the StepVerifier API, we can define our expectations of published elements in terms of what elements we expect and what happens when our stream completes.

First of all, let’s create a publisher with some operators.

We’ll use a Flux.just(T elements). This method will create a Flux that emits given elements and then completes.

Since advanced operators are beyond the scope of this article, we’ll just create a simple publisher that outputs only four-letter names mapped to uppercase:

Flux<String> source = Flux.just("John", "Monica", "Mark", "Cloe", "Frank", "Casper", "Olivia", "Emily", "Cate")
  .filter(name -> name.length() == 4)
  .map(String::toUpperCase);

3.1. Step-By-Step Scenario

Now, let’s test our source with StepVerifier in order to test what will happen when someone subscribes:

StepVerifier
  .create(source)
  .expectNext("JOHN")
  .expectNextMatches(name -> name.startsWith("MA"))
  .expectNext("CLOE", "CATE")
  .expectComplete()
  .verify();

First, we create a StepVerifier builder with the create method.

Next, we wrap our Flux source, which is under test. The first signal is verified with expectNext(T element), but really, we can pass any number of elements to expectNext.

We can also use expectNextMatches and provide a Predicate for a more custom match.

For our last expectation, we expect that our stream completes.

And finally, we use verify() to trigger our test.

3.2. Exceptions in StepVerifier

Now, let’s concatenate our Flux publisher with Mono.

We’ll have this Mono terminate immediately with an error when subscribed to:

Flux<String> error = source.concatWith(
  Mono.error(new IllegalArgumentException("Our message"))
);

Now, after four all elements, we expect our stream to terminate with an exception:

StepVerifier
  .create(error)
  .expectNextCount(4)
  .expectErrorMatches(throwable -> throwable instanceof IllegalArgumentException &&
    throwable.getMessage().equals("Our message")
  ).verify();

We can use only one method to verify exceptions. The OnError signal notifies the subscriber that the publisher is closed with an error state. Therefore, we can’t add more expectations afterward.

If it’s not necessary to check the type and message of the exception at once, then we can use one of the dedicated methods:

  • expectError() – expect any kind of error
  • expectError(Class<? extends Throwable> clazz*) –* expect an error of a specific type
  • expectErrorMessage(String errorMessage) – expect an error having a specific message
  • expectErrorMatches(Predicate predicate) – expect an error that matches a given predicate
  • expectErrorSatisfies(Consumer assertionConsumer) – consume a Throwable in order to do a custom assertion

3.3. Testing Time-Based Publishers

Sometimes our publishers are time-based.

For example, suppose that in our real-life application, we have a one-day delay between events. Now, obviously, we don’t want our tests to run for an entire day to verify expected behavior with such a delay.

StepVerifier.withVirtualTime builder is designed to avoid long-running tests.

We create a builder by calling withVirtualTime. Note that this method doesn’t take Flux as input. Instead, it takes a Supplier, which lazily creates an instance of the tested Flux after having the scheduler set up.

To demonstrate how we can test for an expected delay between events, let’s create a Flux with an interval of one second that runs for two seconds. If the timer runs correctly, we should only get two elements:

StepVerifier
  .withVirtualTime(() -> Flux.interval(Duration.ofSeconds(1)).take(2))
  .expectSubscription()
  .expectNoEvent(Duration.ofSeconds(1))
  .expectNext(0L)
  .thenAwait(Duration.ofSeconds(1))
  .expectNext(1L)
  .verifyComplete();

Note that we should avoid instantiating the Flux earlier in the code and then having the Supplier returning this variable. Instead, we should always instantiate Flux inside the lambda.

There are two major expectation methods that deal with time:

  • thenAwait(Duration duration) – pauses the evaluation of the steps; new events may occur during this time
  • expectNoEvent(Duration duration) – fails when any event appears during the duration; the sequence will pass with a given duration

Please notice that the first signal is the subscription event, so every expectNoEvent(Duration duration) should be preceded with expectSubscription().

3.4. Post-Execution Assertions with StepVerifier

So, as we’ve seen, it’s straightforward to describe our expectations step-by-step.

However, sometimes we need to verify additional state after our whole scenario played out successfully.

Let’s create a custom publisher. It will emit a few elements, then complete, pause, and emit one more element, which we’ll drop:

Flux<Integer> source = Flux.<Integer>create(emitter -> {
    emitter.next(1);
    emitter.next(2);
    emitter.next(3);
    emitter.complete();
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    emitter.next(4);
}).filter(number -> number % 2 == 0);

We expect that it will emit a 2, but drop a 4, since we called emitter.complete first.

So, let’s verify this behavior by using verifyThenAssertThat. This method returns StepVerifier.Assertions on which we can add our assertions:

@Test
public void droppedElements() {
    StepVerifier.create(source)
      .expectNext(2)
      .expectComplete()
      .verifyThenAssertThat()
      .hasDropped(4)
      .tookLessThan(Duration.ofMillis(1050));
}

4. Producing Data with TestPublisher

Sometimes, we might need some special data in order to trigger the chosen signals.

For instance, we may have a very particular situation that we want to test.

Alternatively, we may choose to implement our own operator and want to test how it behaves.

For both cases, we can use TestPublisher, which allows us to programmatically trigger miscellaneous signals:

  • next(T value) or next(T value, T rest) – send one or more signals to subscribers
  • emit(T value) – same as next(T) but invokes complete() afterwards
  • complete() – terminates a source with the complete signal
  • error(Throwable tr) – terminates a source with an error
  • flux() – convenient method to wrap a TestPublisher into Flux
  • mono() – same us flux() but wraps to a Mono

4.1. Creating a TestPublisher

Let’s create a simple TestPublisher that emits a few signals and then terminates with an exception:

TestPublisher
  .<String>create()
  .next("First", "Second", "Third")
  .error(new RuntimeException("Message"));

4.2. TestPublisher in Action

As we mentioned earlier, we may sometimes want to trigger a finely chosen signal that closely matches to a particular situation.

Now, it’s especially important in this case that we have complete mastery over the source of the data. To achieve this, we can again rely on TestPublisher.

First, let’s create a class that uses Flux as the constructor parameter to perform the operation getUpperCase():

class UppercaseConverter {
    private final Flux<String> source;

    UppercaseConverter(Flux<String> source) {
        this.source = source;
    }

    Flux<String> getUpperCase() {
        return source
          .map(String::toUpperCase);
    }   
}

Suppose that UppercaseConverter is our class with complex logic and operators, and we need to supply very particular data from the source publisher.

We can easily achieve this with TestPublisher:

final TestPublisher<String> testPublisher = TestPublisher.create();

UppercaseConverter uppercaseConverter = new UppercaseConverter(testPublisher.flux());

StepVerifier.create(uppercaseConverter.getUpperCase())
  .then(() -> testPublisher.emit("aA", "bb", "ccc"))
  .expectNext("AA", "BB", "CCC")
  .verifyComplete();

In this example, we create a test Flux publisher in the UppercaseConverter constructor parameter. Then, our TestPublisher emits three elements and completes.

4.3. Misbehaving TestPublisher

On the other hand, we can create a misbehaving TestPublisher with the createNonCompliant factory method. We need to pass in the constructor one enum value from TestPublisher.Violation. These values specify which parts of specifications our publisher may overlook.

Let’s take a look at a TestPublisher that won’t throw a NullPointerException for the null element:

TestPublisher
  .createNoncompliant(TestPublisher.Violation.ALLOW_NULL)
  .emit("1", "2", null, "3");

In addition to ALLOW_NULL, we can also use TestPublisher.Violation to:

  • REQUEST_OVERFLOW – allows calling next() without throwing an IllegalStateException when there’s an insufficient number of requests
  • CLEANUP_ON_TERMINATE – allows sending any termination signal several times in a row
  • DEFER_CANCELLATION – allows us to ignore cancellation signals and continue with emitting elements

5. Conclusion

In this article, we discussed various ways of testing reactive streams from the Spring Reactor project.

First, we saw how to use StepVerifier to test publishers. Then, we saw how to use TestPublisher. Similarly, we saw how to operate with a misbehaving TestPublisher.

As usual, the implementation of all our examples can be found in the Github project.