Using Global Hooks
Reactor has another category of configurable callbacks that are invoked by Reactor
operators in various situations. They are all set in the Hooks class, and they fall into
three categories:
1. Dropping Hooks
Dropping hooks are invoked when the source of an operator does not comply with the
Reactive Streams specification. These kind of errors are outside of the normal execution
path (that is, they cannot be propagated through onError).
Typically, a Publisher calls onNext on the operator despite having already called
onCompleted on it previously. In that case, the onNext value is dropped. The same
is true for an extraneous onError signal.
The corresponding hooks, onNextDropped and onErrorDropped, let you provide a global
Consumer for these drops. For example, you can use it to log the drop and clean up
resources associated with a value if needed (as it never makes it to the rest of the
reactive chain).
Setting the hooks twice in a row is additive: every consumer you provide is invoked. The
hooks can be fully reset to their defaults by using the Hooks.resetOn*Dropped() methods.
2. Internal Error Hook
One hook, onOperatorError, is invoked by operators when an unexpected Exception is
thrown during the execution of their onNext, onError, and onComplete methods.
Unlike the previous category, this is still within the normal execution path. A typical
example is the map operator with a map function that throws an Exception (such as
division by zero). It is still possible at this point to go through the usual channel of
onError, and that is what the operator does.
First, it passes the Exception through onOperatorError. The hook lets you inspect the
error (and the incriminating value, if relevant) and change the Exception. Of course,
you can also do something on the side, such as log and return the original Exception.
Note that you can set the onOperatorError hook multiple times. You can provide a
String identifier for a particular BiFunction and subsequent calls with different
keys concatenates the functions, which are all executed. On the other hand, reusing the
same key twice lets you replace a function you previously set.
As a consequence, the default hook behavior can be both fully reset (by using
Hooks.resetOnOperatorError()) or partially reset for a specific key only (by using
Hooks.resetOnOperatorError(String)).
3. Assembly Hooks
These hooks tie in the lifecycle of operators. They are invoked when a chain of operators
is assembled (that is, instantiated). onEachOperator lets you dynamically change each
operator as it is assembled in the chain, by returning a different Publisher.
onLastOperator is similar, except that it is invoked only on the last operator in the
chain before the subscribe call.
If you want to decorate all operators with a cross-cutting Subscriber implementation,
you can look into the Operators#lift* methods to help you deal with the various
types of Reactor Publishers out there (Flux, Mono, ParallelFlux, GroupedFlux, and ConnectableFlux),
as well as their Fuseable versions.
Like onOperatorError, these hooks are cumulative and can be identified with a key. They
can also be reset partially or totally.
4. Hook Presets
The Hooks utility class provides two preset hooks. These are alternatives to
the default behaviors that you can use by calling their corresponding method, rather than
coming up with the hook yourself:
-
onNextDroppedFail():onNextDroppedused to throw aExceptions.failWithCancel()exception. It now defaults to logging the dropped value at the DEBUG level. To go back to the old default behavior of throwing, useonNextDroppedFail(). -
onOperatorDebug(): This method activates debug mode. It ties into theonOperatorErrorhook, so callingresetOnOperatorError()also resets it. You can independently reset it by usingresetOnOperatorDebug(), as it uses a specific key internally.