Debugging Reactor

Switching from an imperative and synchronous programming paradigm to a reactive and asynchronous one can sometimes be daunting. One of the steepest steps in the learning curve is how to analyze and debug when something goes wrong.

In the imperative world, debugging is usually pretty straightforward. You can read the stacktrace and see where the problem originated. Was it entirely a failure of your code? Did the failure occur in some library code? If so, what part of your code called the library, potentially passing in improper parameters that ultimately caused the failure?

1. The Typical Reactor Stack Trace

When you shift to asynchronous code, things can get much more complicated.

Consider the following stack trace:

A typical Reactor stack trace
java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:445)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:379)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
    at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
    at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:332)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
    at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
    at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
    at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
    at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3096)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:3204)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3090)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3057)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3029)
    at reactor.guide.GuideTests.debuggingCommonStacktrace(GuideTests.java:995)

There is a lot going on there. We get an IndexOutOfBoundsException, which tells us that a source emitted more than one item.

We can probably quickly come to assume that this source is a Flux or a Mono, as confirmed by the next line, which mentions MonoSingle. So it appears to be some sort of complaint from a single operator.

Referring to the Javadoc for the Mono#single operator, we see that single has a contract: The source must emit exactly one element. It appears we had a source that emitted more than one and thus violated that contract.

Can we dig deeper and identify that source? The following rows are not very helpful. They take us through the internals of what seems to be a reactive chain, through multiple calls to subscribe and request.

By skimming over these rows, we can at least start to form a picture of the kind of chain that went wrong: It seems to involve a MonoSingle, a FluxFlatMap, and a FluxRange (each gets several rows in the trace, but overall these three classes are involved). So a range().flatMap().single() chain maybe?

But what if we use that pattern a lot in our application? This still does not tell us much, and simply searching for single is not going to find the problem. Then the last line refers to some of our code. Finally, we are getting close.

Hold on, though. When we go to the source file, all we see is that a pre-existing Flux is subscribed to, as follows:

toDebug
    .subscribeOn(Schedulers.immediate())
    .subscribe(System.out::println, Throwable::printStackTrace);

All of this happened at subscription time, but the Flux itself was not declared there. Worse, when we go to where the variable is declared, we see the following:

public Mono<String> toDebug; //please overlook the public class attribute

The variable is not instantiated where it is declared. We must assume a worst-case scenario where we find out that there could be a few different code paths that set it in the application. We remain unsure of which one caused the issue.

This is kind of the Reactor equivalent of a runtime error, as opposed to a compilation error.

What we want to find out more easily is where the operator was added into the chain - that is, where the Flux was declared. We usually refer to that as the “assembly” of the Flux.

2. Activating Debug Mode - aka tracebacks

this section describes the easiest but also the slowest way to enable the debugging capabilities due to the fact that it captures the stacktrace on every operator. See The checkpoint() Alternative for a more fine grained way of debugging, and Production-ready Global Debugging for a more advanced and performant global option.

Even though the stacktrace was still able to convey some information for someone with a bit of experience, we can see that it is not ideal by itself in more advanced cases.

Fortunately, Reactor comes with assembly-time instrumentation that is designed for debugging.

This is done by activating a global debug mode via the Hooks.onOperatorDebug() method at application start (or at least before the incriminated Flux or Mono can be instantiated), as follows:

Hooks.onOperatorDebug();

This starts instrumenting the calls to Reactor operator methods (where they are assembled into the chain) by wrapping the construction of the operator and capturing a stack trace there. Since this is done when the operator chain is declared, the hook should be activated before that, so the safest way is to activate it right at the start of your application.

Later on, if an exception occurs, the failing operator is able to refer to that capture and to rework the stack trace, appending additional information.

We call this captured assembly information (and additional information added to the exceptions by Reactor in general) a traceback.

In the next section, we see how the stack trace differs and how to interpret that new information.

3. Reading a Stack Trace in Debug Mode

When we reuse our initial example but activate the operatorStacktrace debug feature, several things happen:

  1. The stack trace, which points to subscription site and is thus less interesting, is cut after the first frame and set aside.

  2. A special suppressed exception is added to the original exception (or amended if already there).

  3. A message is constructed for that special exception with several sections.

  4. First section will trace back to the assembly site of the operator that fails.

  5. Second section will attempt to display the chain(s) that are built from this operator and have seen the error propagate

  6. Last section is the original stack trace

The full stack trace, once printed, is as follows:

java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127) (1)
    Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: (2)
Assembly trace from producer [reactor.core.publisher.MonoSingle] : (3)
    reactor.core.publisher.Flux.single(Flux.java:7915)
    reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1017)
Error has been observed at the following site(s): (4)
    *_______Flux.single ⇢ at reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1017) (5)
    |_ Mono.subscribeOn ⇢ at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1071) (6)
Original Stack Trace: (7)
        at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127)
...
(8)
...
        at reactor.core.publisher.Mono.subscribeWith(Mono.java:4363)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4223)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4159)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4131)
        at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1067)
1 The original stack trace is truncated to a single frame.
2 This is new: We see the wrapper operator that captures the stack. This is where the traceback starts to appear.
3 First, we get some details about where the operator was assembled.
4 Second, we get a notion of operator chain(s) through which the error propagated, from first to last (error site to subscribe site).
5 Each operator that saw the error is mentioned along with the user class and line where it was used. Here we have a "root".
6 Here we have a simple part of the chain.
7 The rest of the stack trace is moved at the end…​
8 …​showing a bit of the operator’s internals (so we removed a bit of the snippet here).

The captured stack trace is appended to the original error as a suppressed OnAssemblyException. There are three parts to it, but the first section is the most interesting. It shows the path of construction for the operator that triggered the exception. Here, it shows that the single that caused our issue was actually created in the scatterAndGather method.

Now that we are armed with enough information to find the culprit, we can have a meaningful look at that scatterAndGather method:

private Mono<String> scatterAndGather(Flux<String> urls) {
    return urls.flatMap(url -> doRequest(url))
           .single(); (1)
}
1 Sure enough, here is our single.

Now we can see what the root cause of the error was a flatMap that performs several HTTP calls to a few URLs but that is chained with single, which is too restrictive. After a short git blame and a quick discussion with the author of that line, we find out he meant to use the less restrictive take(1) instead.

We have solved our problem.

Now consider the following section in the stack trace:

Error has been observed at the following site(s):

That second part of the traceback was not necessarily interesting in this particular example, because the error was actually happening in the last operator in the chain (the one closest to subscribe). Considering another example might make it more clear:

FakeRepository.findAllUserByName(Flux.just("pedro", "simon", "stephane"))
              .transform(FakeUtils1.applyFilters)
              .transform(FakeUtils2.enrichUser)
              .blockLast();

Now imagine that, inside findAllUserByName, there is a map that fails. Here, we would see the following in the second part of the traceback:

Error has been observed at the following site(s):
    *________Flux.map ⇢ at reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:27)
    |_       Flux.map ⇢ at reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:28)
    |_    Flux.filter ⇢ at reactor.guide.FakeUtils1.lambda$static$1(FakeUtils1.java:29)
    |_ Flux.transform ⇢ at reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:39)
    |_   Flux.elapsed ⇢ at reactor.guide.FakeUtils2.lambda$static$0(FakeUtils2.java:30)
    |_ Flux.transform ⇢ at reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:40)

This corresponds to the section of the chain(s) of operators that gets notified of the error:

  1. The exception originates in the first map. This one is identified as a root by the * connector and the fact _ are used for indentation.

  2. The exception is seen by a second map (both in fact correspond to the findAllUserByName method).

  3. It is then seen by a filter and a transform, which indicate that part of the chain is constructed by a reusable transformation function (here, the applyFilters utility method).

  4. Finally, it is seen by an elapsed and a transform. Once again, elapsed is applied by the transformation function of that second transform.

In some cases where the same exception is propagated through multiple chains, the "root" marker *_ allows us to better separate such chains. If a site is seen several time, there will be an (observed x times) after the call site information.

For instance, let us consider the following snippet:

public class MyClass {
    public void myMethod() {
        Flux<String> source = Flux.error(sharedError);
        Flux<String> chain1 = source.map(String::toLowerCase).filter(s -> s.length() < 4);
        Flux<String> chain2 = source.filter(s -> s.length() > 5).distinct();

        Mono<Void> when = Mono.when(chain1, chain2);
    }
}

In the code above, error propagates to the when, going through two separate chains chain1 and chain2. It would lead to a traceback containing the following:

Error has been observed at the following site(s):
    *_____Flux.error ⇢ at myClass.myMethod(MyClass.java:3) (observed 2 times)
    |_      Flux.map ⇢ at myClass.myMethod(MyClass.java:4)
    |_   Flux.filter ⇢ at myClass.myMethod(MyClass.java:4)
    *_____Flux.error ⇢ at myClass.myMethod(MyClass.java:3) (observed 2 times)
    |_   Flux.filter ⇢ at myClass.myMethod(MyClass.java:5)
    |_ Flux.distinct ⇢ at myClass.myMethod(MyClass.java:5)
    *______Mono.when ⇢ at myClass.myMethod(MyClass.java:7)

We see that:

  1. there are 3 "root" elements (the when is the true root).

  2. two chains starting from Flux.error are visible.

  3. both chains seem to be based on the same Flux.error source (observed 2 times).

  4. first chain is Flux.error().map().filter

  5. second chain is `Flux.error().filter().distinct()

A note on tracebacks and suppressed exceptions: As tracebacks are appended to original errors as suppressed exceptions, this can somewhat interfere with another type of exception that uses this mechanism: composite exceptions. Such exceptions can be created directly via Exceptions.multiple(Throwable…​), or by some operators that might join multiple erroring sources (like Flux#flatMapDelayError). They can be unwrapped into a List via Exceptions.unwrapMultiple(Throwable), in which case the traceback would be considered a component of the composite and be part of the returned List. If that is somehow not desirable, tracebacks can be identified thanks to Exceptions.isTraceback(Throwable) check, and excluded from such an unwrap by using Exceptions.unwrapMultipleExcludingTracebacks(Throwable) instead.

We deal with a form of instrumentation here, and creating a stack trace is costly. That is why this debugging feature should only be activated in a controlled manner, as a last resort.

3.1. The checkpoint() Alternative

The debug mode is global and affects every single operator assembled into a Flux or a Mono inside the application. This has the benefit of allowing after-the-fact debugging: Whatever the error, we can obtain additional information to debug it.

As we saw earlier, this global knowledge comes at the cost of an impact on performance (due to the number of populated stack traces). That cost can be reduced if we have an idea of likely problematic operators. However, we usually do not know which operators are likely to be problematic unless we observed an error in the wild, saw we were missing assembly information, and then modified the code to activate assembly tracking, hoping to observe the same error again.

In that scenario, we have to switch into debugging mode and make preparations in order to better observe a second occurrence of the error, this time capturing all the additional information.

If you can identify reactive chains that you assemble in your application for which serviceability is critical, you can achieve a mix of both techniques with the checkpoint() operator.

You can chain this operator into a method chain. The checkpoint operator works like the hook version but only for its link of that particular chain.

There is also a checkpoint(String) variant that lets you add a unique String identifier to the assembly traceback. This way, the stack trace is omitted and you rely on the description to identify the assembly site. checkpoint(String) imposes less processing cost than a regular checkpoint.

Last but not least, if you want to add a more generic description to the checkpoint but still rely on the stack trace mechanism to identify the assembly site, you can force that behavior by using the checkpoint("description", true) version. We are now back to the initial message for the traceback, augmented with a description, as shown in the following example:

Assembly trace from producer [reactor.core.publisher.ParallelSource], described as [descriptionCorrelation1234] : (1)
	reactor.core.publisher.ParallelFlux.checkpoint(ParallelFlux.java:215)
	reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
Error has been observed at the following site(s):
	|_	ParallelFlux.checkpoint ⇢ reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
1 descriptionCorrelation1234 is the description provided in the checkpoint.

The description could be a static identifier or user-readable description or a wider correlation ID (for instance, coming from a header in the case of an HTTP request).

When global debugging is enabled in conjunction with checkpoints, the global debugging traceback style is applied and checkpoints are only reflected in the "Error has been observed…​" section. As a result, the name of heavy checkpoints is not visible in this case.

4. Production-ready Global Debugging

Project Reactor comes with a separate Java Agent that instruments your code and adds debugging info without paying the cost of capturing the stacktrace on every operator call. The behaviour is very similar to Activating Debug Mode - aka tracebacks, but without the runtime performance overhead.

To use it in your app, you must add it as a dependency.

The following example shows how to add reactor-tools as a dependency in Maven:

reactor-tools in Maven, in <dependencies>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-tools</artifactId>
    (1)
</dependency>
1 If you use the BOM, you do not need to specify a <version>.

The following example shows how to add reactor-tools as a dependency in Gradle:

reactor-tools in Gradle, amend the dependencies block
dependencies {
   compile 'io.projectreactor:reactor-tools'
}

It also needs to be explicitly initialized with:

ReactorDebugAgent.init();
Since the implementation will instrument your classes when they are loaded, the best place to put it is before everything else in your main(String[]) method:
public static void main(String[] args) {
    ReactorDebugAgent.init();
    SpringApplication.run(Application.class, args);
}

You may also re-process existing classes with processExistingClasses() if you cannot run the init eagerly. For example, in JUnit5 tests from a TestExecutionListener or even in the class static initializer block:

ReactorDebugAgent.init();
ReactorDebugAgent.processExistingClasses();
Be aware that the re-processing takes a couple of seconds due to the need to iterate over all loaded classes and apply the transformation. Use it only if you see that some call-sites are not instrumented.

4.1. Limitations

ReactorDebugAgent is implemented as a Java Agent and uses ByteBuddy to perform the self-attach. Self-attach may not work on some JVMs, please refer to ByteBuddy’s documentation for more details.

4.2. Running ReactorDebugAgent as a Java Agent

If your environment does not support ByteBuddy’s self-attachment, you can run reactor-tools as a Java Agent:

java -javaagent reactor-tools.jar -jar app.jar

4.3. Running ReactorDebugAgent at build time

It is also possible to run reactor-tools at build time. To do so, you need to apply it as a plugin for ByteBuddy’s build instrumentation.

The transformation will only be applied to your project’s classes. The classpath libraries will not be instrumented.
reactor-tools with ByteBuddy’s Maven plugin
<dependencies>
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-tools</artifactId>
		(1)
		<classifier>original</classifier> (2)
		<scope>runtime</scope>
	</dependency>
</dependencies>

<build>
	<plugins>
		<plugin>
			<groupId>net.bytebuddy</groupId>
			<artifactId>byte-buddy-maven-plugin</artifactId>
			<configuration>
				<transformations>
					<transformation>
						<plugin>reactor.tools.agent.ReactorDebugByteBuddyPlugin</plugin>
					</transformation>
				</transformations>
			</configuration>
		</plugin>
	</plugins>
</build>
1 If you use the BOM, you do not need to specify a <version>.
2 classifier here is important.
reactor-tools with ByteBuddy’s Gradle plugin
plugins {
	id 'net.bytebuddy.byte-buddy-gradle-plugin' version '1.10.9'
}

configurations {
	byteBuddyPlugin
}

dependencies {
	byteBuddyPlugin(
			group: 'io.projectreactor',
			name: 'reactor-tools',
			(1)
			classifier: 'original', (2)
	)
}

byteBuddy {
	transformation {
		plugin = "reactor.tools.agent.ReactorDebugByteBuddyPlugin"
		classPath = configurations.byteBuddyPlugin
	}
}
1 If you use the BOM, you do not need to specify a version.
2 classifier here is important.

5. Logging a Sequence

In addition to stack trace debugging and analysis, another powerful tool to have in your toolkit is the ability to trace and log events in an asynchronous sequence.

The log() operator can do just that. Chained inside a sequence, it peeks at every event of the Flux or Mono upstream of it (including onNext, onError, and onComplete as well as subscriptions, cancellations, and requests).

A note on logging implementation

The log operator uses the Loggers utility class, which picks up common logging frameworks such as Log4J and Logback through SLF4J and defaults to logging to the console if SLF4J is unavailable.

The console fallback uses System.err for the WARN and ERROR log levels and System.out for everything else.

If you prefer a JDK java.util.logging fallback, as in 3.0.x, you can get it by setting the reactor.logging.fallback system property to JDK.

In all cases, when logging in production you should take care to configure the underlying logging framework to use its most asynchronous and non-blocking approach — for instance, an AsyncAppender in Logback or AsyncLogger in Log4j 2.

For instance, suppose we have Logback activated and configured and a chain like range(1,10).take(3). By placing a log() before the take, we can get some insight into how it works and what kind of events it propagates upstream to the range, as the following example shows:

Flux<Integer> flux = Flux.range(1, 10)
                         .log()
                         .take(3);
flux.subscribe();

This prints out the following (through the logger’s console appender):

10:45:20.200 [main] INFO  reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) (1)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | request(3) (2)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(1) (3)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(2)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(3)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | cancel() (4)

Here, in addition to the logger’s own formatter (time, thread, level, message), the log() operator outputs a few things in its own format:

1 reactor.Flux.Range.1 is an automatic category for the log, in case you use the operator several times in a chain. It lets you distinguish which operator’s events are logged (in this case, the range). You can overwrite the identifier with your own custom category by using the log(String) method signature. After a few separating characters, the actual event gets printed. Here, we get an onSubscribe call, a request call, three onNext calls, and a cancel call. For the first line, onSubscribe, we get the implementation of the Subscriber, which usually corresponds to the operator-specific implementation. Between square brackets, we get additional information, including whether the operator can be automatically optimized through synchronous or asynchronous fusion.
2 On the second line, we can see that take limited the request to upstream to 3.
3 Then the range sends three values in a row.
4 On the last line, we see cancel().

The second (2) and last lines (4) are the most interesting. We can see the take in action there. It leverages backpressure in order to ask the source for exactly the expected amount of elements. After having received enough elements, it tells the source no more items will be needed by calling cancel(). Note that if downstream had itself used backpressure, eg. by requesting only 1 element, the take operator would have honored that (it caps the request when propagating it from downstream to upstream).