本系列文章索引《响应式Spring的道法术器》
前情提要 Reactor 3快速上手 | 响应式流规范
本文测试源码
在响应式编程中,调试是块难啃的骨头,这也是从命令式编程到响应式编程的切换过程中,学习曲线最陡峭的地方。
在命令式编程中,方法的调用关系摆在面上,我们通常可以通过stack trace追踪的问题出现的位置。但是在异步的响应式编程中,一方面有诸多的调用是在水面以下的,作为响应式开发库的使用者是不需要了解的;另一方面,基于事件的异步响应机制导致stack trace并非很容易在代码中按图索骥的。
比如下边的例子:
@Test
public void testBug() {
getMonoWithException()
.subscribe();
}
single()
方法只能接收一个元素,多了的话就会导致异常。上边的代码会报出如下的异常stack trace:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item
Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.tryOnNext(FluxFilterFuseable.java:129)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.tryOnNext(FluxMapFuseable.java:284)
at reactor.core.publisher.FluxRange$RangeSubscriptionConditional.fastPath(FluxRange.java:273)
at reactor.core.publisher.FluxRange$RangeSubscriptionConditional.request(FluxRange.java:251)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:316)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:170)
at reactor.core.publisher.MonoSingle$SingleSubscriber.request(MonoSingle.java:94)
at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:87)
at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:114)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:79)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:236)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:65)
at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:60)
at reactor.core.publisher.FluxFilterFuseable.subscribe(FluxFilterFuseable.java:51)
at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
at reactor.core.publisher.Mono.subscribe(Mono.java:3077)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:3185)
at reactor.core.publisher.Mono.subscribe(Mono.java:2962)
at com.getset.Test_2_7.testBug(Test_2_7.java:19)
...
比较明显的信息大概就是那句“Source emitted more than one item”。下边的内容基本都是在Reactor库内部的调用,而且上边的stack trace的问题是出自.subscribe()
那一行的。
如果对响应式流内部的Publisher、Subscriber和Subscription的机制比较熟悉,大概可以根据subscribe()
或request()
的顺序大概猜测出来getMonoWithException()
方法内大约经过了.map.filter.range
的操作链,但是除此之外,确实获取不到太多信息。
另一方面,命令式编程的方式比较容易使用IDE的调试工具进行单步或断点调试,而在异步编程方式下,通常也不太好使。
以上这些都是在异步的响应式编程中可能会遇到的窘境。解铃还须系铃人,对于响应式编程的调试还需要响应式编程库本身提供调试工具。
Reactor提供了开启调试模式的方法。
Hooks.onOperatorDebug();
这个方法能够开启调试模式,从而在抛出异常时打印出一些有用的信息。把这一行加上:
@Test
public void testBug() {
Hooks.onOperatorDebug();
getMonoWithException()
.subscribe();
}
这时候,除了上边的那一套stack trace之外,增加了以下内容:
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
reactor.core.publisher.Flux.single(Flux.java:6473)
com.getset.Test_2_7.getMonoWithException(Test_2_7.java:13)
com.getset.Test_2_7.testBug(Test_2_7.java:19)
Error has been observed by the following operator(s):
|_ Flux.single(Test_2_7.java:13)
这里就可以明确找出问题根源了。
Hooks.onOperatorDebug()
的实现原理在于在组装期包装各个操作符的构造方法,加入一些监测功能,所以这个 hook 应该在早于声明的时候被激活,最保险的方式就是在你程序的最开始就激活它。以map
操作符为例:
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) {
if (this instanceof Fuseable) {
return onAssembly(new FluxMapFuseable<>(this, mapper));
}
return onAssembly(new FluxMap<>(this, mapper));
}
可以看到,每次在返回新的Flux对象的时候,都会调用onAssembly
方法,这里就是Reactor可以在组装期插手“搞事情”的地方。
Hooks.onOperatorDebug()
是一种全局性的Hook,会影响到应用中所有的操作符,所以其带来的性能成本也是比较大的。如果我们大概知道可能的问题在哪,而对整个应用开启调试模式,也容易被茫茫多的调试信息淹没。这时候,我们需要一种更加精准且廉价的定位方式。
如果你知道问题出在哪个链上,但是由于这个链的上游或下游来自其他的调用,就可以针对这个链使用checkpoint()进行问题定位。
checkpoint()
操作符就像一个Hook,不过它的作用范围仅限于这个链上。
@Test
public void checkBugWithCheckPoint() {
getMonoWithException()
.checkpoint()
.subscribe();
}
通过增加checkpoint()
操作符,仍然可以打印出调试信息:
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
reactor.core.publisher.Mono.checkpoint(Mono.java:1367)
reactor.core.publisher.Mono.checkpoint(Mono.java:1317)
com.getset.Test_2_7.checkBugWithCheckPoint(Test_2_7.java:25)
Error has been observed by the following operator(s):
|_ Mono.checkpoint(Test_2_7.java:25)
checkpoint()
方法还有变体checkpoint(String description)
,你可以传入一个独特的字符串以方便在 assembly traceback 中进行识别。 这样会省略掉stack trace,不过你可以依赖这个字符串来定位到出问题的组装点。checkpoint(String) 比 checkpoint 有更低的执行成本。如下:
@Test
public void checkBugWithCheckPoint2() {
getMonoWithException()
.checkpoint("checkBugWithCheckPoint2")
.subscribe();
}
加入用于标识的字符串(方法名),输出如下:
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly site of producer [reactor.core.publisher.MonoSingle] is identified by light checkpoint [I_HATE_BUGS]."description" : "checkBugWithCheckPoint2"
可以看到这里确实省略了调试的assembly traceback,但是我们通过上边的信息也可以定位到是single
的问题。
上边的例子比较简单,当有许多的调试信息打印出来的时候,这个标识字符串能够方便我们在许多的控制台输出中定位到问题。
如果既希望有调试信息assembly traceback,也希望用上标识字符串,还可以checkpoint(description, true)
来实现,第二个参数true
标识要打印assembly traceback。
最后一个方便调试的工具就是我们前边多次用到的log()
操作符了,它能够记录其上游的Flux或 Mono的事件(包括onNext
、onError
、onComplete
, 以及onSubscribe
、cancel
、和request
)。
log
操作符可以通过SLF4J使用类似Log4J和Logback这样的公共的日志工具来记录日志,如果SLF4J不存在的话,则直接将日志输出到控制台。
控制台使用 System.err 记录WARN
和ERROR
级别的日志,使用 System.out 记录其他级别的日志。
(17)Reactor的调试——响应式Spring的道法术器
原文地址:http://blog.51cto.com/liukang/2096220