1. Overview

Debugging reactive streams is probably one of the main challenges we’ll have to face once we start using these data structures.

And having in mind that Reactive Streams have been gaining popularity over the last years, it’s a good idea to know how we can carry out this task efficiently.

Let’s start by setting up a project using a reactive stack to see why this is often troublesome.

2. Scenario with Bugs

We want to simulate a real-case scenario, where several asynchronous processes are running, and where we’ve introduced some defects in the code that will eventually trigger exceptions.

To understand the big picture, we’ll mention that our application will be consuming and processing streams of simple Foo objects which contain only an id, a formattedName, and a quantity field.

2.1. Analyzing the Log Output

Now, let’s examine a snippet and the output it generates when an unhandled error shows up:

public void processFoo(Flux<Foo> flux) {
    flux.map(FooNameHelper::concatFooName)
      .map(FooNameHelper::substringFooName)
      .map(FooReporter::reportResult)
      .subscribe();
}

public void processFooInAnotherScenario(Flux<Foo> flux) {
    flux.map(FooNameHelper::substringFooName)
      .map(FooQuantityHelper::divideFooQuantity)
      .subscribe();
}

After running our application for a few seconds, we’ll realize that it’s logging exceptions from time to time.

Having a close look at one of the errors, we’ll find something similar to this:

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
    at j.l.String.substring(String.java:1963)
    at com.baeldung.debugging.consumer.service.FooNameHelper
      .lambda$1(FooNameHelper.java:38)
    at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:100)
    at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at r.c.p.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:275)
    at r.c.p.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:849)
    at r.c.p.Operators$MonoSubscriber.complete(Operators.java:1476)
    at r.c.p.MonoDelayUntil$DelayUntilCoordinator.signal(MonoDelayUntil.java:211)
    at r.c.p.MonoDelayUntil$DelayUntilTrigger.onComplete(MonoDelayUntil.java:290)
    at r.c.p.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:118)
    at r.c.s.SchedulerTask.call(SchedulerTask.java:50)
    at r.c.s.SchedulerTask.call(SchedulerTask.java:27)
    at j.u.c.FutureTask.run(FutureTask.java:266)
    at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
      .access$201(ScheduledThreadPoolExecutor.java:180)
    at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
      .run(ScheduledThreadPoolExecutor.java:293)
    at j.u.c.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at j.u.c.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at j.l.Thread.run(Thread.java:748)

Based on the root cause, and noticing the FooNameHelper class mentioned in the stack trace, we can imagine that on some occasions, our Foo objects are being processed with a formattedName value that is shorter than expected.

Of course, this is just a simplified case, and the solution seems rather obvious.

But let’s imagine this was a real-case scenario where the exception itself doesn’t help us solve the issue without some context information.

Was the exception triggered as a part of the processFoo, or of the processFooInAnotherScenario method?

Did other previous steps affect the formattedName field before arriving at this stage?

The log entry wouldn’t help us figure out these questions.

To make things worse, sometimes the exception isn’t even thrown from within our functionality.

For example, imagine we rely on a reactive repository to persist our Foo objects. If an error rises at that point, we might not even have a clue on where to get started to debug our code.

We need tools to debug reactive streams efficiently.

3. Using a Debug Session

One option to figure out what’s going on with our application is to start a debugging session using our favorite IDE.

We’ll have to set up a couple of conditional breakpoints and analyze the flow of data when each step in the stream gets executed.

Indeed, this might be a cumbersome task, especially when we’ve got a lot of reactive processes running and sharing resources.

Additionally, there are many circumstances where we can’t start a debugging session for security reasons.

4. Logging Information With the doOnErrorMethod or Using the Subscribe Parameter

Sometimes, we can add useful context information, by providing a Consumer as a second parameter of the subscribe method:

public void processFoo(Flux<Foo> flux) {

    // ...

    flux.subscribe(foo -> {
        logger.debug("Finished processing Foo with Id {}", foo.getId());
    }, error -> {
        logger.error(
          "The following error happened on processFoo method!",
           error);
    });
}

Note: It’s worth mentioning that if we don’t need to carry out further processing on the subscribe method, we can chain the doOnError function on our publisher:

flux.doOnError(error -> {
    logger.error("The following error happened on processFoo method!", error);
}).subscribe();

Now we’ll have some guidance on where the error might be coming from, even though we still don’t have much information about the actual element that generated the exception.

5. Activating Reactor’s Global Debug Configuration

The Reactor library provides a Hooks class that lets us configure the behavior of Flux and Mono operators.

By just adding the following statement, our application will instrument the calls to the publishers’ methods, wrap the construction of the operator, and capture a stack trace:

Hooks.onOperatorDebug();

After the debug mode gets activated, our exception logs will include some helpful information:

16:06:35.334 [parallel-1] ERROR c.b.d.consumer.service.FooService
  - The following error happened on processFoo method!
java.lang.StringIndexOutOfBoundsException: String index out of range: 15
    at j.l.String.substring(String.java:1963)
    at c.d.b.c.s.FooNameHelper.lambda$1(FooNameHelper.java:38)
    ...
    at j.l.Thread.run(Thread.java:748)
    Suppressed: r.c.p.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :
    reactor.core.publisher.Flux.map(Flux.java:5653)
    c.d.b.c.s.FooNameHelper.substringFooName(FooNameHelper.java:32)
    c.d.b.c.s.FooService.processFoo(FooService.java:24)
    c.d.b.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
    o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    o.s.s.s.DelegatingErrorHandlingRunnable
      .run(DelegatingErrorHandlingRunnable.java:54)
    o.u.c.Executors$RunnableAdapter.call(Executors.java:511)
    o.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
    |_    Flux.map ⇢ c.d.b.c.s.FooNameHelper
            .substringFooName(FooNameHelper.java:32)
    |_    Flux.map ⇢ c.d.b.c.s.FooReporter.reportResult(FooReporter.java:15)

As we can see, the first section remains relatively the same, but the following sections provide information about:

  1. The assembly trace of the publisher — here we can confirm that the error was first generated in the processFoo method.
  2. The operators that observed the error after it was first triggered, with the user class where they were chained.

Note: In this example, mainly to see this clearly, we’re adding the operations on different classes.

We can toggle the debug mode on or off at any time, but it won’t affect Flux and Mono objects that have already been instantiated.

5.1. Executing Operators on Different Threads

One other aspect to keep in mind is that the assembly trace is generated properly even if there are different threads operating on the stream.

Let’s have a look at the following example:

public void processFoo(Flux<Foo> flux) {
    flux.publishOn(Schedulers.newSingle("foo-thread"))
       // ...
      .publishOn(Schedulers.newSingle("bar-thread"))
      .map(FooReporter::reportResult)
      .subscribeOn(Schedulers.newSingle("starter-thread"))
      .subscribe();
}

Now if we check the logs we’ll appreciate that in this case, the first section might change a little bit, but the last two remain fairly the same.

The first part is the thread stack trace, therefore it’ll show only the operations carried out by a particular thread.

As we’ve seen, that’s not the most important section when we’re debugging the application, so this change is acceptable.

6. Activating the Debug Output on a Single Process

Instrumenting and generating a stack trace in every single reactive process is costly.

Thus, we should implement the former approach only in critical cases.

Anyhow, Reactor provides a way to enable the debug mode on single crucial processes, which is less memory-consuming.

We’re referring to the checkpoint operator:

public void processFoo(Flux<Foo> flux) {
    
    // ...

    flux.checkpoint("Observed error on processFoo", true)
      .subscribe();
}

Note that in this manner, the assembly trace will be logged at the checkpoint stage:

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
    ...
Assembly trace from producer [reactor.core.publisher.FluxMap],
  described as [Observed error on processFoo] :
    r.c.p.Flux.checkpoint(Flux.java:3096)
    c.b.d.c.s.FooService.processFoo(FooService.java:26)
    c.b.d.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
    o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    o.s.s.s.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    j.u.c.Executors$RunnableAdapter.call(Executors.java:511)
    j.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
    |_    Flux.checkpoint ⇢ c.b.d.c.s.FooService.processFoo(FooService.java:26)

We should implement the checkpoint method towards the end of the reactive chain.

Otherwise, the operator won’t be able to observe errors occurring downstream.

Also, let’s note that the library offers an overloaded method. We can avoid:

  • specifying a description for the observed error if we use the no-args option
  • generating a filled stack trace (which is the most costly operation), by providing just the custom description

7. Logging a Sequence of Elements

Finally, Reactor publishers offer one more method that could potentially come in handy in some cases.

By calling the log method in our reactive chain, the application will log each element in the flow with the state that it has at that stage.

Let’s try it out in our example:

public void processFoo(Flux<Foo> flux) {
    flux.map(FooNameHelper::concatFooName)
      .map(FooNameHelper::substringFooName)
      .log();
      .map(FooReporter::reportResult)
      .doOnError(error -> {
        logger.error("The following error happened on processFoo method!", error);
      })
      .subscribe();
}

And check the logs:

INFO  reactor.Flux.OnAssembly.1 - onSubscribe(FluxMap.MapSubscriber)
INFO  reactor.Flux.OnAssembly.1 - request(unbounded)
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=0, formattedName=theFo, quantity=8))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=1, formattedName=theFo, quantity=3))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=2, formattedName=theFo, quantity=5))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=3, formattedName=theFo, quantity=6))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=4, formattedName=theFo, quantity=6))
INFO  reactor.Flux.OnAssembly.1 - cancel()
ERROR c.b.d.consumer.service.FooService 
  - The following error happened on processFoo method!
...

We can easily see the state of each Foo object at this stage, and how the framework cancels the flow when an exception happens.

Of course, this approach is also costly, and we’ll have to use it in moderation.

8. Conclusion

We can consume a lot of our time and effort troubleshooting problems if we don’t know the tools and mechanisms to debug our application properly.

This is especially true if we’re not used to handling reactive and asynchronous data structures, and we need extra help to figure out how things work.

As always, the full example is available over on the GitHub repo.