1. 概述
一旦开始使用响应式流(Reactive Streams),调试这类数据结构将成为我们面临的主要挑战之一。
考虑到响应式编程近年来的流行趋势,掌握如何高效地进行调试显得尤为重要。
我们先从一个基于响应式技术栈的项目入手,看看为什么调试响应式流常常会变得复杂。
2. 含有缺陷的场景示例
我们模拟一个真实场景:多个异步流程在运行,代码中人为引入了一些缺陷,最终导致异常发生。
为了便于理解,我们的应用会消费并处理一些简单的 Foo
对象流,每个对象包含 id
、formattedName
和 quantity
字段。
2.1. 分析日志输出
来看下面这段代码及其运行后产生的异常日志:
public void processFoo(Flux<Foo> flux) {
flux.map(FooNameHelper::concatFooName)
.map(FooNameHelper::substringFooName)
.map(FooReporter::reportResult)
.subscribe();
}
public void processFooInAnotherScenario(Flux<Foo> flux) {
flux.map(FooNameHelper::substringFooName)
.map(FooQuantityHelper::divideFooQuantity)
.subscribe();
}
运行应用几秒钟后,我们会发现日志中时不时出现异常:
Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
at j.l.String.substring(String.java:1963)
at com.baeldung.debugging.consumer.service.FooNameHelper
.lambda$1(FooNameHelper.java:38)
at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:100)
at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at r.c.p.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:275)
at r.c.p.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:849)
at r.c.p.Operators$MonoSubscriber.complete(Operators.java:1476)
at r.c.p.MonoDelayUntil$DelayUntilCoordinator.signal(MonoDelayUntil.java:211)
at r.c.p.MonoDelayUntil$DelayUntilTrigger.onComplete(MonoDelayUntil.java:290)
at r.c.p.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:118)
at r.c.s.SchedulerTask.call(SchedulerTask.java:50)
at r.c.s.SchedulerTask.call(SchedulerTask.java:27)
at j.u.c.FutureTask.run(FutureTask.java:266)
at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
.access$201(ScheduledThreadPoolExecutor.java:180)
at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
.run(ScheduledThreadPoolExecutor.java:293)
at j.u.c.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at j.u.c.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at j.l.Thread.run(Thread.java:748)
从根异常和堆栈信息中可以看到 FooNameHelper
类,可以推测某些 Foo
对象的 formattedName
字段长度小于预期。
✅ 虽然这是个简化场景,解决方案也显而易见,但在真实项目中,异常本身往往不足以定位问题。
⚠️ 问题是:这个异常是在 processFoo
还是 processFooInAnotherScenario
方法中触发的?有没有其他步骤影响了 formattedName
的值?
这些信息在日志中很难直接看出来。
更糟糕的是,有时异常甚至不是我们代码引发的。例如,如果依赖响应式 Repository 持久化 Foo
对象时出错,我们甚至不知道从哪开始调试。
因此,我们需要高效的调试工具来处理响应式流。
3. 使用 IDE 调试器进行调试
一种方法是使用 IDE 启动调试会话。
我们可以通过设置条件断点,逐步分析每个操作符执行时的数据流。
⚠️ 但这在多个异步流程并行运行时会变得非常繁琐。
而且,出于安全原因,我们有时无法直接调试。
4. 利用 doOnError
或 subscribe
参数记录日志
有时候,我们可以通过 subscribe
方法的第二个参数(Consumer
)来添加上下文信息:
public void processFoo(Flux<Foo> flux) {
// ...
flux.subscribe(foo -> {
logger.debug("Finished processing Foo with Id {}", foo.getId());
}, error -> {
logger.error(
"The following error happened on processFoo method!",
error);
});
}
✅ 如果不需要在 subscribe
中处理数据,也可以使用 doOnError
:
flux.doOnError(error -> {
logger.error("The following error happened on processFoo method!", error);
}).subscribe();
这样至少能知道错误来自哪个方法,但仍无法知道具体是哪个数据元素触发了异常。
5. 激活 Reactor 的全局调试配置
Reactor 提供了 Hooks
类,用于配置 Flux
和 Mono
的行为。
只需添加以下语句,Reactor 就会为每个操作符调用生成堆栈追踪:
Hooks.onOperatorDebug();
激活后,异常日志中会包含更有用的信息:
16:06:35.334 [parallel-1] ERROR c.b.d.consumer.service.FooService
- The following error happened on processFoo method!
java.lang.StringIndexOutOfBoundsException: String index out of range: 15
at j.l.String.substring(String.java:1963)
at c.d.b.c.s.FooNameHelper.lambda$1(FooNameHelper.java:38)
...
at j.l.Thread.run(Thread.java:748)
Suppressed: r.c.p.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :
reactor.core.publisher.Flux.map(Flux.java:5653)
c.d.b.c.s.FooNameHelper.substringFooName(FooNameHelper.java:32)
c.d.b.c.s.FooService.processFoo(FooService.java:24)
c.d.b.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
o.s.s.s.DelegatingErrorHandlingRunnable
.run(DelegatingErrorHandlingRunnable.java:54)
o.u.c.Executors$RunnableAdapter.call(Executors.java:511)
o.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
|_ Flux.map ⇢ c.d.b.c.s.FooNameHelper
.substringFooName(FooNameHelper.java:32)
|_ Flux.map ⇢ c.d.b.c.s.FooReporter.reportResult(FooReporter.java:15)
📌 从中我们可以看到:
- Publisher 的组装路径(Assembly trace):确认错误首次发生在
processFoo
方法中。 - 观察到错误的操作符列表:包含每个操作符的位置和类。
5.1. 多线程场景下的调试
即使流在多个线程中执行,组装路径依然能正确生成。
例如:
public void processFoo(Flux<Foo> flux) {
flux.publishOn(Schedulers.newSingle("foo-thread"))
// ...
.publishOn(Schedulers.newSingle("bar-thread"))
.map(FooReporter::reportResult)
.subscribeOn(Schedulers.newSingle("starter-thread"))
.subscribe();
}
📌 虽然线程堆栈可能略有变化,但关键的组装路径和操作符追踪依然有效。
6. 单个流程的调试输出
为每个响应式流程都开启调试模式会带来性能开销。
因此,Reactor 提供了 checkpoint
操作符,用于为关键流程开启调试,更轻量:
public void processFoo(Flux<Foo> flux) {
// ...
flux.checkpoint("Observed error on processFoo", true)
.subscribe();
}
📌 日志输出中会包含:
Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
...
Assembly trace from producer [reactor.core.publisher.FluxMap],
described as [Observed error on processFoo] :
r.c.p.Flux.checkpoint(Flux.java:3096)
c.b.d.c.s.FooService.processFoo(FooService.java:26)
c.b.d.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
o.s.s.s.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
j.u.c.Executors$RunnableAdapter.call(Executors.java:511)
j.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
|_ Flux.checkpoint ⇢ c.b.d.c.s.FooService.processFoo(FooService.java:26)
✅ 建议将 checkpoint
放在链的末尾,以确保能捕获下游异常。
📌 checkpoint
提供多个重载版本:
- 无参数版本:不提供描述,不生成完整堆栈(节省性能)
- 一个参数版本:提供描述,不生成完整堆栈
- 两个参数版本:提供描述并生成完整堆栈(最详细,也最耗性能)
7. 记录元素序列
Reactor 还提供了一个 log()
方法,用于记录每个元素在流中的状态:
public void processFoo(Flux<Foo> flux) {
flux.map(FooNameHelper::concatFooName)
.map(FooNameHelper::substringFooName)
.log()
.map(FooReporter::reportResult)
.doOnError(error -> {
logger.error("The following error happened on processFoo method!", error);
})
.subscribe();
}
📌 输出示例:
INFO reactor.Flux.OnAssembly.1 - onSubscribe(FluxMap.MapSubscriber)
INFO reactor.Flux.OnAssembly.1 - request(unbounded)
INFO reactor.Flux.OnAssembly.1 - onNext(Foo(id=0, formattedName=theFo, quantity=8))
INFO reactor.Flux.OnAssembly.1 - onNext(Foo(id=1, formattedName=theFo, quantity=3))
INFO reactor.Flux.OnAssembly.1 - onNext(Foo(id=2, formattedName=theFo, quantity=5))
INFO reactor.Flux.OnAssembly.1 - onNext(Foo(id=3, formattedName=theFo, quantity=6))
INFO reactor.Flux.OnAssembly.1 - onNext(Foo(id=4, formattedName=theFo, quantity=6))
INFO reactor.Flux.OnAssembly.1 - cancel()
ERROR c.b.d.consumer.service.FooService
- The following error happened on processFoo method!
...
✅ 通过 log()
可以清晰看到每个元素的状态变化,以及异常发生时的中断点。
⚠️ 但要注意,log()
也会带来性能开销,应谨慎使用。
8. 总结
如果不掌握正确的调试工具和机制,我们在排查响应式流问题时会浪费大量时间和精力。
特别是对于不熟悉响应式和异步数据结构的开发者来说,这些技巧尤为重要。
📌 本文完整示例代码见 GitHub 仓库。