1. 概述

一旦开始使用响应式流(Reactive Streams),调试这类数据结构将成为我们面临的主要挑战之一。

考虑到响应式编程近年来的流行趋势,掌握如何高效地进行调试显得尤为重要。

我们先从一个基于响应式技术栈的项目入手,看看为什么调试响应式流常常会变得复杂。

2. 含有缺陷的场景示例

我们模拟一个真实场景:多个异步流程在运行,代码中人为引入了一些缺陷,最终导致异常发生。

为了便于理解,我们的应用会消费并处理一些简单的 Foo 对象流,每个对象包含 idformattedNamequantity 字段。

2.1. 分析日志输出

来看下面这段代码及其运行后产生的异常日志:

public void processFoo(Flux<Foo> flux) {
    flux.map(FooNameHelper::concatFooName)
      .map(FooNameHelper::substringFooName)
      .map(FooReporter::reportResult)
      .subscribe();
}

public void processFooInAnotherScenario(Flux<Foo> flux) {
    flux.map(FooNameHelper::substringFooName)
      .map(FooQuantityHelper::divideFooQuantity)
      .subscribe();
}

运行应用几秒钟后,我们会发现日志中时不时出现异常:

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
    at j.l.String.substring(String.java:1963)
    at com.baeldung.debugging.consumer.service.FooNameHelper
      .lambda$1(FooNameHelper.java:38)
    at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:100)
    at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at r.c.p.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:275)
    at r.c.p.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:849)
    at r.c.p.Operators$MonoSubscriber.complete(Operators.java:1476)
    at r.c.p.MonoDelayUntil$DelayUntilCoordinator.signal(MonoDelayUntil.java:211)
    at r.c.p.MonoDelayUntil$DelayUntilTrigger.onComplete(MonoDelayUntil.java:290)
    at r.c.p.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:118)
    at r.c.s.SchedulerTask.call(SchedulerTask.java:50)
    at r.c.s.SchedulerTask.call(SchedulerTask.java:27)
    at j.u.c.FutureTask.run(FutureTask.java:266)
    at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
      .access$201(ScheduledThreadPoolExecutor.java:180)
    at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
      .run(ScheduledThreadPoolExecutor.java:293)
    at j.u.c.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at j.u.c.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at j.l.Thread.run(Thread.java:748)

从根异常和堆栈信息中可以看到 FooNameHelper 类,可以推测某些 Foo 对象的 formattedName 字段长度小于预期。

✅ 虽然这是个简化场景,解决方案也显而易见,但在真实项目中,异常本身往往不足以定位问题。

⚠️ 问题是:这个异常是在 processFoo 还是 processFooInAnotherScenario 方法中触发的?有没有其他步骤影响了 formattedName 的值?

这些信息在日志中很难直接看出来。

更糟糕的是,有时异常甚至不是我们代码引发的。例如,如果依赖响应式 Repository 持久化 Foo 对象时出错,我们甚至不知道从哪开始调试。

因此,我们需要高效的调试工具来处理响应式流。

3. 使用 IDE 调试器进行调试

一种方法是使用 IDE 启动调试会话。

我们可以通过设置条件断点,逐步分析每个操作符执行时的数据流。

⚠️ 但这在多个异步流程并行运行时会变得非常繁琐。

而且,出于安全原因,我们有时无法直接调试。

4. 利用 doOnErrorsubscribe 参数记录日志

有时候,我们可以通过 subscribe 方法的第二个参数(Consumer)来添加上下文信息

public void processFoo(Flux<Foo> flux) {

    // ...

    flux.subscribe(foo -> {
        logger.debug("Finished processing Foo with Id {}", foo.getId());
    }, error -> {
        logger.error(
          "The following error happened on processFoo method!",
           error);
    });
}

✅ 如果不需要在 subscribe 中处理数据,也可以使用 doOnError

flux.doOnError(error -> {
    logger.error("The following error happened on processFoo method!", error);
}).subscribe();

这样至少能知道错误来自哪个方法,但仍无法知道具体是哪个数据元素触发了异常。

5. 激活 Reactor 的全局调试配置

Reactor 提供了 Hooks 类,用于配置 FluxMono 的行为。

只需添加以下语句,Reactor 就会为每个操作符调用生成堆栈追踪

Hooks.onOperatorDebug();

激活后,异常日志中会包含更有用的信息:

16:06:35.334 [parallel-1] ERROR c.b.d.consumer.service.FooService
  - The following error happened on processFoo method!
java.lang.StringIndexOutOfBoundsException: String index out of range: 15
    at j.l.String.substring(String.java:1963)
    at c.d.b.c.s.FooNameHelper.lambda$1(FooNameHelper.java:38)
    ...
    at j.l.Thread.run(Thread.java:748)
    Suppressed: r.c.p.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :
    reactor.core.publisher.Flux.map(Flux.java:5653)
    c.d.b.c.s.FooNameHelper.substringFooName(FooNameHelper.java:32)
    c.d.b.c.s.FooService.processFoo(FooService.java:24)
    c.d.b.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
    o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    o.s.s.s.DelegatingErrorHandlingRunnable
      .run(DelegatingErrorHandlingRunnable.java:54)
    o.u.c.Executors$RunnableAdapter.call(Executors.java:511)
    o.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
    |_    Flux.map ⇢ c.d.b.c.s.FooNameHelper
            .substringFooName(FooNameHelper.java:32)
    |_    Flux.map ⇢ c.d.b.c.s.FooReporter.reportResult(FooReporter.java:15)

📌 从中我们可以看到:

  1. Publisher 的组装路径(Assembly trace):确认错误首次发生在 processFoo 方法中。
  2. 观察到错误的操作符列表:包含每个操作符的位置和类。

5.1. 多线程场景下的调试

即使流在多个线程中执行,组装路径依然能正确生成。

例如:

public void processFoo(Flux<Foo> flux) {
    flux.publishOn(Schedulers.newSingle("foo-thread"))
       // ...
      .publishOn(Schedulers.newSingle("bar-thread"))
      .map(FooReporter::reportResult)
      .subscribeOn(Schedulers.newSingle("starter-thread"))
      .subscribe();
}

📌 虽然线程堆栈可能略有变化,但关键的组装路径和操作符追踪依然有效。

6. 单个流程的调试输出

为每个响应式流程都开启调试模式会带来性能开销。

因此,Reactor 提供了 checkpoint 操作符,用于为关键流程开启调试,更轻量

public void processFoo(Flux<Foo> flux) {
    
    // ...

    flux.checkpoint("Observed error on processFoo", true)
      .subscribe();
}

📌 日志输出中会包含:

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
    ...
Assembly trace from producer [reactor.core.publisher.FluxMap],
  described as [Observed error on processFoo] :
    r.c.p.Flux.checkpoint(Flux.java:3096)
    c.b.d.c.s.FooService.processFoo(FooService.java:26)
    c.b.d.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
    o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    o.s.s.s.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    j.u.c.Executors$RunnableAdapter.call(Executors.java:511)
    j.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
    |_    Flux.checkpoint ⇢ c.b.d.c.s.FooService.processFoo(FooService.java:26)

✅ 建议将 checkpoint 放在链的末尾,以确保能捕获下游异常。

📌 checkpoint 提供多个重载版本:

  • 无参数版本:不提供描述,不生成完整堆栈(节省性能)
  • 一个参数版本:提供描述,不生成完整堆栈
  • 两个参数版本:提供描述并生成完整堆栈(最详细,也最耗性能)

7. 记录元素序列

Reactor 还提供了一个 log() 方法,用于记录每个元素在流中的状态

public void processFoo(Flux<Foo> flux) {
    flux.map(FooNameHelper::concatFooName)
      .map(FooNameHelper::substringFooName)
      .log()
      .map(FooReporter::reportResult)
      .doOnError(error -> {
        logger.error("The following error happened on processFoo method!", error);
      })
      .subscribe();
}

📌 输出示例:

INFO  reactor.Flux.OnAssembly.1 - onSubscribe(FluxMap.MapSubscriber)
INFO  reactor.Flux.OnAssembly.1 - request(unbounded)
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=0, formattedName=theFo, quantity=8))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=1, formattedName=theFo, quantity=3))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=2, formattedName=theFo, quantity=5))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=3, formattedName=theFo, quantity=6))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=4, formattedName=theFo, quantity=6))
INFO  reactor.Flux.OnAssembly.1 - cancel()
ERROR c.b.d.consumer.service.FooService 
  - The following error happened on processFoo method!
...

✅ 通过 log() 可以清晰看到每个元素的状态变化,以及异常发生时的中断点。

⚠️ 但要注意,log() 也会带来性能开销,应谨慎使用。

8. 总结

如果不掌握正确的调试工具和机制,我们在排查响应式流问题时会浪费大量时间和精力。

特别是对于不熟悉响应式和异步数据结构的开发者来说,这些技巧尤为重要。

📌 本文完整示例代码见 GitHub 仓库


原始标题:Debugging Reactive Streams in Spring 5