Using Global Hooks

Reactor has another category of configurable callbacks that are invoked by Reactor operators in various situations. They are all set in the Hooks class, and they fall into three categories:

1. Dropping Hooks

Dropping hooks are invoked when the source of an operator does not comply with the Reactive Streams specification. These kind of errors are outside of the normal execution path (that is, they cannot be propagated through onError).

Typically, a Publisher calls onNext on the operator despite having already called onCompleted on it previously. In that case, the onNext value is dropped. The same is true for an extraneous onError signal.

The corresponding hooks, onNextDropped and onErrorDropped, let you provide a global Consumer for these drops. For example, you can use it to log the drop and clean up resources associated with a value if needed (as it never makes it to the rest of the reactive chain).

Setting the hooks twice in a row is additive: every consumer you provide is invoked. The hooks can be fully reset to their defaults by using the Hooks.resetOn*Dropped() methods.

2. Internal Error Hook

One hook, onOperatorError, is invoked by operators when an unexpected Exception is thrown during the execution of their onNext, onError, and onComplete methods.

Unlike the previous category, this is still within the normal execution path. A typical example is the map operator with a map function that throws an Exception (such as division by zero). It is still possible at this point to go through the usual channel of onError, and that is what the operator does.

First, it passes the Exception through onOperatorError. The hook lets you inspect the error (and the incriminating value, if relevant) and change the Exception. Of course, you can also do something on the side, such as log and return the original Exception.

Note that you can set the onOperatorError hook multiple times. You can provide a String identifier for a particular BiFunction and subsequent calls with different keys concatenates the functions, which are all executed. On the other hand, reusing the same key twice lets you replace a function you previously set.

As a consequence, the default hook behavior can be both fully reset (by using Hooks.resetOnOperatorError()) or partially reset for a specific key only (by using Hooks.resetOnOperatorError(String)).

3. Assembly Hooks

These hooks tie in the lifecycle of operators. They are invoked when a chain of operators is assembled (that is, instantiated). onEachOperator lets you dynamically change each operator as it is assembled in the chain, by returning a different Publisher. onLastOperator is similar, except that it is invoked only on the last operator in the chain before the subscribe call.

If you want to decorate all operators with a cross-cutting Subscriber implementation, you can look into the Operators#lift* methods to help you deal with the various types of Reactor Publishers out there (Flux, Mono, ParallelFlux, GroupedFlux, and ConnectableFlux), as well as their Fuseable versions.

Like onOperatorError, these hooks are cumulative and can be identified with a key. They can also be reset partially or totally.

4. Hook Presets

The Hooks utility class provides two preset hooks. These are alternatives to the default behaviors that you can use by calling their corresponding method, rather than coming up with the hook yourself:

  • onNextDroppedFail(): onNextDropped used to throw a Exceptions.failWithCancel() exception. It now defaults to logging the dropped value at the DEBUG level. To go back to the old default behavior of throwing, use onNextDroppedFail().

  • onOperatorDebug(): This method activates debug mode. It ties into the onOperatorError hook, so calling resetOnOperatorError() also resets it. You can independently reset it by using resetOnOperatorDebug(), as it uses a specific key internally.