Debugging Reactor
Switching from an imperative and synchronous programming paradigm to a reactive and asynchronous one can sometimes be daunting. One of the steepest steps in the learning curve is how to analyze and debug when something goes wrong.
In the imperative world, debugging is usually pretty straightforward. You can read the stacktrace and see where the problem originated. Was it entirely a failure of your code? Did the failure occur in some library code? If so, what part of your code called the library, potentially passing in improper parameters that ultimately caused the failure?
1. The Typical Reactor Stack Trace
When you shift to asynchronous code, things can get much more complicated.
Consider the following stack trace:
java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:445)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:379)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
    at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
    at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:332)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
    at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
    at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
    at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
    at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3096)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:3204)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3090)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3057)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3029)
    at reactor.guide.GuideTests.debuggingCommonStacktrace(GuideTests.java:995)There is a lot going on there. We get an IndexOutOfBoundsException, which tells us that
a source emitted more than one item.
We can probably quickly come to assume that this source is a Flux or a Mono, as confirmed by
the next line, which mentions MonoSingle. So it appears to be some sort of complaint
from a single operator.
Referring to the Javadoc for the Mono#single operator, we see that single has a contract:
The source must emit exactly one element. It appears we had a source that emitted more
than one and thus violated that contract.
Can we dig deeper and identify that source? The following rows are not very helpful. They
take us through the internals of what seems to be a reactive chain, through
multiple calls to subscribe and request.
By skimming over these rows, we can at least start to form a picture of the kind of chain
that went wrong: It seems to involve a MonoSingle, a FluxFlatMap, and a FluxRange
(each gets several rows in the trace, but overall these three classes are involved). So a
range().flatMap().single() chain maybe?
But what if we use that pattern a lot in our application? This still does not tell us
much, and simply searching for single is not going to find the problem. Then the last
line refers to some of our code. Finally, we are getting close.
Hold on, though. When we go to the source file, all we see is that a
pre-existing Flux is subscribed to, as follows:
toDebug
    .subscribeOn(Schedulers.immediate())
    .subscribe(System.out::println, Throwable::printStackTrace);All of this happened at subscription time, but the Flux itself was not
declared there. Worse, when we go to where the variable is declared, we see the following:
public Mono<String> toDebug; //please overlook the public class attributeThe variable is not instantiated where it is declared. We must assume a worst-case scenario where we find out that there could be a few different code paths that set it in the application. We remain unsure of which one caused the issue.
| This is kind of the Reactor equivalent of a runtime error, as opposed to a compilation error. | 
What we want to find out more easily is where the operator was added into the chain -
that is,  where the Flux was declared. We usually refer to that as the “assembly” of
the Flux.
2. Activating Debug Mode - aka tracebacks
| this section describes the easiest but also the slowest way to enable
the debugging capabilities due to the fact that it captures the stacktrace on every operator.
See The checkpoint()Alternative for a more fine grained way of debugging,
and Production-ready Global Debugging for a more advanced and performant global option. | 
Even though the stacktrace was still able to convey some information for someone with a bit of experience, we can see that it is not ideal by itself in more advanced cases.
Fortunately, Reactor comes with assembly-time instrumentation that is designed for debugging.
This is done by activating a global debug mode via the Hooks.onOperatorDebug() method at application start (or at
least before the incriminated Flux or Mono can be instantiated), as follows:
Hooks.onOperatorDebug();This starts instrumenting the calls to Reactor operator methods (where they are assembled into the chain) by wrapping the construction of the operator and capturing a stack trace there. Since this is done when the operator chain is declared, the hook should be activated before that, so the safest way is to activate it right at the start of your application.
Later on, if an exception occurs, the failing operator is able to refer to that capture and to rework the stack trace, appending additional information.
| We call this captured assembly information (and additional information added to the exceptions by Reactor in general) a traceback. | 
In the next section, we see how the stack trace differs and how to interpret that new information.
3. Reading a Stack Trace in Debug Mode
When we reuse our initial example but activate the operatorStacktrace debug feature,
several things happen:
- 
The stack trace, which points to subscription site and is thus less interesting, is cut after the first frame and set aside. 
- 
A special suppressed exception is added to the original exception (or amended if already there). 
- 
A message is constructed for that special exception with several sections. 
- 
First section will trace back to the assembly site of the operator that fails. 
- 
Second section will attempt to display the chain(s) that are built from this operator and have seen the error propagate 
- 
Last section is the original stack trace 
The full stack trace, once printed, is as follows:
java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127) (1)
    Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: (2)
Assembly trace from producer [reactor.core.publisher.MonoSingle] : (3)
    reactor.core.publisher.Flux.single(Flux.java:7915)
    reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1017)
Error has been observed at the following site(s): (4)
    *_______Flux.single ⇢ at reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1017) (5)
    |_ Mono.subscribeOn ⇢ at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1071) (6)
Original Stack Trace: (7)
        at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127)
...
(8)
...
        at reactor.core.publisher.Mono.subscribeWith(Mono.java:4363)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4223)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4159)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4131)
        at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1067)| 1 | The original stack trace is truncated to a single frame. | 
| 2 | This is new: We see the wrapper operator that captures the stack. This is where the traceback starts to appear. | 
| 3 | First, we get some details about where the operator was assembled. | 
| 4 | Second, we get a notion of operator chain(s) through which the error propagated, from first to last (error site to subscribe site). | 
| 5 | Each operator that saw the error is mentioned along with the user class and line where it was used. Here we have a "root". | 
| 6 | Here we have a simple part of the chain. | 
| 7 | The rest of the stack trace is moved at the end… | 
| 8 | …showing a bit of the operator’s internals (so we removed a bit of the snippet here). | 
The captured stack trace is appended to the original error as a
suppressed OnAssemblyException. There are three parts to it, but the first section is the
most interesting. It shows the path of construction for the operator that triggered the
exception. Here, it shows that the single that caused our issue was actually created in the
scatterAndGather method.
Now that we are armed with enough information to find the culprit, we can have
a meaningful look at that scatterAndGather method:
private Mono<String> scatterAndGather(Flux<String> urls) {
    return urls.flatMap(url -> doRequest(url))
           .single(); (1)
}| 1 | Sure enough, here is our single. | 
Now we can see what the root cause of the error was a flatMap that performs
several HTTP calls to a few URLs but that is chained with single, which is too
restrictive. After a short git blame and a quick discussion with the author of
that line, we find out he meant to use the less restrictive take(1) instead.
We have solved our problem.
Now consider the following section in the stack trace:
Error has been observed at the following site(s):That second part of the traceback was not necessarily interesting in
this particular example, because the error was actually happening in the last
operator in the chain (the one closest to subscribe). Considering another
example might make it more clear:
FakeRepository.findAllUserByName(Flux.just("pedro", "simon", "stephane"))
              .transform(FakeUtils1.applyFilters)
              .transform(FakeUtils2.enrichUser)
              .blockLast();Now imagine that, inside findAllUserByName, there is a map that fails. Here,
we would see the following in the second part of the traceback:
Error has been observed at the following site(s):
    *________Flux.map ⇢ at reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:27)
    |_       Flux.map ⇢ at reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:28)
    |_    Flux.filter ⇢ at reactor.guide.FakeUtils1.lambda$static$1(FakeUtils1.java:29)
    |_ Flux.transform ⇢ at reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:39)
    |_   Flux.elapsed ⇢ at reactor.guide.FakeUtils2.lambda$static$0(FakeUtils2.java:30)
    |_ Flux.transform ⇢ at reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:40)This corresponds to the section of the chain(s) of operators that gets notified of the error:
- 
The exception originates in the first map. This one is identified as a root by the*connector and the fact_are used for indentation.
- 
The exception is seen by a second map(both in fact correspond to thefindAllUserByNamemethod).
- 
It is then seen by a filterand atransform, which indicate that part of the chain is constructed by a reusable transformation function (here, theapplyFiltersutility method).
- 
Finally, it is seen by an elapsedand atransform. Once again,elapsedis applied by the transformation function of that second transform.
In some cases where the same exception is propagated through multiple chains, the "root" marker *_
allows us to better separate such chains.
If a site is seen several time, there will be an (observed x times) after the call site information.
For instance, let us consider the following snippet:
public class MyClass {
    public void myMethod() {
        Flux<String> source = Flux.error(sharedError);
        Flux<String> chain1 = source.map(String::toLowerCase).filter(s -> s.length() < 4);
        Flux<String> chain2 = source.filter(s -> s.length() > 5).distinct();
        Mono<Void> when = Mono.when(chain1, chain2);
    }
}In the code above, error propagates to the when, going through two separate chains chain1 and chain2.
It would lead to a traceback containing the following:
Error has been observed at the following site(s):
    *_____Flux.error ⇢ at myClass.myMethod(MyClass.java:3) (observed 2 times)
    |_      Flux.map ⇢ at myClass.myMethod(MyClass.java:4)
    |_   Flux.filter ⇢ at myClass.myMethod(MyClass.java:4)
    *_____Flux.error ⇢ at myClass.myMethod(MyClass.java:3) (observed 2 times)
    |_   Flux.filter ⇢ at myClass.myMethod(MyClass.java:5)
    |_ Flux.distinct ⇢ at myClass.myMethod(MyClass.java:5)
    *______Mono.when ⇢ at myClass.myMethod(MyClass.java:7)We see that:
- 
there are 3 "root" elements (the whenis the true root).
- 
two chains starting from Flux.errorare visible.
- 
both chains seem to be based on the same Flux.errorsource (observed 2 times).
- 
first chain is Flux.error().map().filter
- 
second chain is `Flux.error().filter().distinct() 
| A note on tracebacks and suppressed exceptions:
As tracebacks are appended to original errors as suppressed exceptions, this can somewhat
interfere with another type of exception that uses this mechanism: composite exceptions.
Such exceptions can be created directly via Exceptions.multiple(Throwable…), or by some
operators that might join multiple erroring sources (likeFlux#flatMapDelayError). They
can be unwrapped into aListviaExceptions.unwrapMultiple(Throwable), in which case the traceback
would be considered a component of the composite and be part of the returnedList.
If that is somehow not desirable, tracebacks can be identified thanks toExceptions.isTraceback(Throwable)check, and excluded from such an unwrap by usingExceptions.unwrapMultipleExcludingTracebacks(Throwable)instead. | 
| Static exceptions: Optimizing exception handling sometimes involves reusing exceptions via pre-creating global static instances instead of creating new instances upon an exceptional state occuring. With such reused exceptions, tracebacks will start to accumulate as appended suppressed exceptions and inevitably lead to memory leaks. In most circumstances static exceptions are used in combination with disabling stacktrace capturing which is the biggest contributor to the cost of creating exceptions. However, exceptions can be created dynamically without a stack trace with only the marginal cost of memory allocation and no need for static instances. In case your code still needs to rely on static instances of exceptions, the way to avoid memory leaks with tracebacks is to disable suppression at the expense of not being able to see tracebacks if these exceptions are propagated down the reactive chain. | 
We deal with a form of instrumentation here, and creating a stack trace is costly. That is why this debugging feature should only be activated in a controlled manner, as a last resort.
3.1. The checkpoint() Alternative
The debug mode is global and affects every single operator assembled into a Flux or a
Mono inside the application. This has the benefit of allowing after-the-fact
debugging: Whatever the error, we can obtain additional information to debug it.
As we saw earlier, this global knowledge comes at the cost of an impact on performance (due to the number of populated stack traces). That cost can be reduced if we have an idea of likely problematic operators. However, we usually do not know which operators are likely to be problematic unless we observed an error in the wild, saw we were missing assembly information, and then modified the code to activate assembly tracking, hoping to observe the same error again.
In that scenario, we have to switch into debugging mode and make preparations in order to better observe a second occurrence of the error, this time capturing all the additional information.
If you can identify reactive chains that you assemble in your application for which
serviceability is critical, you can achieve a mix of both techniques with the
checkpoint() operator.
You can chain this operator into a method chain. The checkpoint operator works like the
hook version but only for its link of that particular chain.
There is also a checkpoint(String) variant that lets you add a unique String identifier
to the assembly traceback. This way, the stack trace is omitted and you rely on the
description to identify the assembly site. checkpoint(String) imposes less processing
cost than a regular checkpoint.
Last but not least, if you want to add a more generic description to the checkpoint but
still rely on the stack trace mechanism to identify the assembly site, you can force that
behavior by using the checkpoint("description", true) version. We are now back to the
initial message for the traceback, augmented with a description, as shown in the
following example:
Assembly trace from producer [reactor.core.publisher.ParallelSource], described as [descriptionCorrelation1234] : (1)
	reactor.core.publisher.ParallelFlux.checkpoint(ParallelFlux.java:215)
	reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
Error has been observed at the following site(s):
	|_	ParallelFlux.checkpoint ⇢ reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)| 1 | descriptionCorrelation1234is the description provided in thecheckpoint. | 
The description could be a static identifier or user-readable description or a wider correlation ID (for instance, coming from a header in the case of an HTTP request).
| When global debugging is enabled in conjunction with checkpoints, the global debugging traceback style is applied and checkpoints are only reflected in the "Error has been observed…" section. As a result, the name of heavy checkpoints is not visible in this case. | 
4. Production-ready Global Debugging
Project Reactor comes with a separate Java Agent that instruments your code and adds debugging info without paying the cost of capturing the stacktrace on every operator call. The behaviour is very similar to Activating Debug Mode - aka tracebacks, but without the runtime performance overhead.
To use it in your app, you must add it as a dependency.
The following example shows how to add reactor-tools as a dependency in Maven:
<dependencies><dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-tools</artifactId>
    (1)
</dependency>| 1 | If you use the BOM, you do not need to specify a <version>. | 
The following example shows how to add reactor-tools as a dependency in Gradle:
dependencies blockdependencies {
   compile 'io.projectreactor:reactor-tools'
}It also needs to be explicitly initialized with:
ReactorDebugAgent.init();| Since the implementation will instrument your classes when they are loaded, the best place to put it is before everything else in your main(String[]) method: | 
public static void main(String[] args) {
    ReactorDebugAgent.init();
    SpringApplication.run(Application.class, args);
}You may also re-process existing classes with processExistingClasses() if you cannot run the init eagerly. For example, in JUnit5 tests from a TestExecutionListener or even in the class static initializer block:
ReactorDebugAgent.init();
ReactorDebugAgent.processExistingClasses();| Be aware that the re-processing takes a couple of seconds due to the need to iterate over all loaded classes and apply the transformation. Use it only if you see that some call-sites are not instrumented. | 
4.1. Limitations
ReactorDebugAgent is implemented as a Java Agent and uses ByteBuddy
to perform the self-attach.
Self-attach may not work on some JVMs, please refer to ByteBuddy’s documentation for more details.
4.2. Running ReactorDebugAgent as a Java Agent
If your environment does not support ByteBuddy’s self-attachment, you can run reactor-tools as a
Java Agent:
java -javaagent:reactor-tools.jar -jar app.jar4.3. Running ReactorDebugAgent at build time
It is also possible to run reactor-tools at build time. To do so, you need to apply it as a
plugin for ByteBuddy’s build instrumentation.
| The transformation will only be applied to your project’s classes. The classpath libraries will not be instrumented. | 
<dependencies>
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-tools</artifactId>
		(1)
		<classifier>original</classifier> (2)
		<scope>runtime</scope>
	</dependency>
</dependencies>
<build>
	<plugins>
		<plugin>
			<groupId>net.bytebuddy</groupId>
			<artifactId>byte-buddy-maven-plugin</artifactId>
			<configuration>
				<transformations>
					<transformation>
						<plugin>reactor.tools.agent.ReactorDebugByteBuddyPlugin</plugin>
					</transformation>
				</transformations>
			</configuration>
		</plugin>
	</plugins>
</build>| 1 | If you use the BOM, you do not need to specify a <version>. | 
| 2 | classifierhere is important. | 
plugins {
	id 'net.bytebuddy.byte-buddy-gradle-plugin' version '1.10.9'
}
configurations {
	byteBuddyPlugin
}
dependencies {
	byteBuddyPlugin(
			group: 'io.projectreactor',
			name: 'reactor-tools',
			(1)
			classifier: 'original', (2)
	)
}
byteBuddy {
	transformation {
		plugin = "reactor.tools.agent.ReactorDebugByteBuddyPlugin"
		classPath = configurations.byteBuddyPlugin
	}
}| 1 | If you use the BOM, you do not need to specify a version. | 
| 2 | classifierhere is important. | 
5. Logging a Sequence
In addition to stack trace debugging and analysis, another powerful tool to have in your toolkit is the ability to trace and log events in an asynchronous sequence.
The log() operator can do just that. Chained inside a sequence, it peeks at every
event of the Flux or Mono upstream of it (including onNext, onError, and
onComplete as well as subscriptions, cancellations, and requests).
For instance, suppose we have Logback activated and configured and a chain like
range(1,10).take(3). By placing a log() before the take, we can get some
insight into how it works and what kind of events it propagates upstream to the range,
as the following example shows:
Flux<Integer> flux = Flux.range(1, 10)
                         .log()
                         .take(3);
flux.subscribe();This prints out the following (through the logger’s console appender):
10:45:20.200 [main] INFO  reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) (1)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | request(3) (2)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(1) (3)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(2)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(3)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | cancel() (4)Here, in addition to the logger’s own formatter (time, thread, level, message), the
log() operator outputs a few things in its own format:
| 1 | reactor.Flux.Range.1is an automatic category for the log, in case you use the
operator several times in a chain. It lets you distinguish which operator’s events
are logged (in this case, therange). You can overwrite the identifier with your own
custom category by using thelog(String)method signature. After a few separating
characters, the actual event gets printed. Here, we get anonSubscribecall, arequestcall, threeonNextcalls, and acancelcall. For the first line,onSubscribe, we get the implementation of theSubscriber, which usually corresponds
to the operator-specific implementation. Between square brackets, we get additional
information, including whether the operator can be automatically optimized through
synchronous or asynchronous fusion. | 
| 2 | On the second line, we can see that take limited the request to upstream to 3. | 
| 3 | Then the range sends three values in a row. | 
| 4 | On the last line, we see cancel(). | 
The second (2) and last lines (4) are the most interesting. We can see the take in action there.
It leverages backpressure in order to ask the source for exactly the expected amount of elements.
After having received enough elements, it tells the source no more items will be needed by calling cancel().
Note that if downstream had itself used backpressure, eg. by requesting only 1 element,
the take operator would have honored that (it caps the request when propagating it from downstream
to upstream).