Using Global Hooks
Reactor has another category of configurable callbacks that are invoked by Reactor
operators in various situations. They are all set in the Hooks class, and they fall into
three categories:
1. Dropping Hooks
Dropping hooks are invoked when the source of an operator does not comply with the
Reactive Streams specification. These kinds of errors are outside of the normal execution
path (that is, they cannot be propagated through onError).
Typically, a Publisher calls onNext on the operator despite having already called
onComplete on it previously. In that case, the onNext value is dropped. The same is true
for an extraneous onError signal.
The corresponding hooks, onNextDropped and onErrorDropped, let you provide a global
Consumer for these drops. For example, you can use it to log the drop and clean up
resources associated with a value if needed (as it never makes it to the rest of the
reactive chain).
Setting the hooks twice in a row is additive: every consumer you provide is invoked. The
hooks can be fully reset to their defaults by using the Hooks.resetOn*Dropped()
methods.
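
The following sketch illustrates the dropping hooks described above. It assumes
reactor-core is on the classpath. The class name DroppedSignalHooksDemo and the
deliberately misbehaving Publisher are hypothetical and exist only for illustration; a
compliant source would never signal after onComplete.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Operators;

class DroppedSignalHooksDemo {

    // Registers drop hooks, subscribes to a misbehaving source, and returns what
    // the hooks recorded.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Setting onNextDropped twice is additive: both consumers are invoked.
        Hooks.onNextDropped(value -> log.add("first:" + value));
        Hooks.onNextDropped(value -> log.add("second:" + value));
        Hooks.onErrorDropped(error -> log.add("error:" + error.getMessage()));

        try {
            // Hypothetical source that violates the specification by signalling
            // onNext and onError after it has already called onComplete.
            Publisher<String> misbehaving = subscriber -> {
                subscriber.onSubscribe(Operators.emptySubscription());
                subscriber.onComplete();
                subscriber.onNext("late");
                subscriber.onError(new IllegalStateException("too late"));
            };

            // The map operator detects the late signals and hands them to the hooks.
            Flux.from(misbehaving)
                .map(String::toUpperCase)
                .subscribe();
        }
        finally {
            // Restore the default behavior so other code is not affected.
            Hooks.resetOnNextDropped();
            Hooks.resetOnErrorDropped();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the hooks are global, the sketch resets them in a finally block. In a test suite,
the same reset would typically go into a tear-down method.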
2. Internal Error Hook
One hook, onOperatorError, is invoked by operators when an unexpected Exception is
thrown during the execution of their onNext, onError, and onComplete methods.
Unlike the previous category, this is still within the normal execution path. A typical
example is the map operator with a map function that throws an Exception (such as
division by zero). It is still possible at this point to go through the usual channel of
onError, and that is what the operator does.
First, it passes the Exception through onOperatorError. The hook lets you inspect the
error (and the incriminating value, if relevant) and change the Exception. Of course,
you can also do something on the side, such as log and return the original Exception.
Note that you can set the onOperatorError hook multiple times. You can provide a
String identifier for a particular BiFunction, and subsequent calls with different
keys concatenate the functions, which are all executed. On the other hand, reusing the
same key twice lets you replace a function you previously set.
As a consequence, the default hook behavior can be either fully reset (by using
Hooks.resetOnOperatorError()) or partially reset for a specific key only (by using
Hooks.resetOnOperatorError(String)).
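
As a minimal sketch of keyed onOperatorError hooks, assuming reactor-core is on the
classpath, the example below registers one hook that only records the offending value and
another that replaces the Exception. The class name OperatorErrorHookDemo and the keys
"record" and "wrap" are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;

class OperatorErrorHookDemo {

    // Values observed by the "record" hook.
    static final List<String> SEEN = new ArrayList<>();

    // Registers two hooks under different keys; both are executed on failure.
    static void installHooks() {
        // Side effect only: note the value and return the original error.
        Hooks.onOperatorError("record", (error, value) -> {
            SEEN.add("value=" + value);
            return error;
        });
        // Replace the error with a more descriptive one, keeping the cause.
        Hooks.onOperatorError("wrap", (error, value) ->
                new IllegalStateException("map failed on value " + value, error));
    }

    // Runs a map function that divides by zero and returns the error that the
    // subscriber receives through onError.
    static Throwable failingMap() {
        AtomicReference<Throwable> received = new AtomicReference<>();
        Flux.just(1, 0)
            .map(i -> 10 / i)
            .subscribe(v -> { }, received::set);
        return received.get();
    }

    public static void main(String[] args) {
        installHooks();
        System.out.println(failingMap());      // error after both hooks ran

        Hooks.resetOnOperatorError("wrap");    // partial reset: remove one key
        System.out.println(failingMap());      // only the "record" hook remains

        Hooks.resetOnOperatorError();          // full reset
    }
}
```

Calling Hooks.onOperatorError("wrap", ...) again with a different function would replace
the wrapping hook rather than add a third one.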
3. Assembly Hooks
These hooks tie into the lifecycle of operators. They are invoked when a chain of operators
is assembled (that is, instantiated). onEachOperator lets you dynamically change each
operator as it is assembled in the chain, by returning a different Publisher.
onLastOperator is similar, except that it is invoked only on the last operator in the
chain before the subscribe call.
If you want to decorate all operators with a cross-cutting Subscriber implementation,
you can look into the Operators#lift* methods to help you deal with the various
types of Reactor Publishers out there (Flux, Mono, ParallelFlux, GroupedFlux, and
ConnectableFlux), as well as their Fuseable versions.
Like onOperatorError, these hooks are cumulative and can be identified with a key. They
can also be reset partially or totally.
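
The sketch below, which assumes reactor-core is on the classpath, shows when the two
assembly hooks fire. The hooks here only count invocations and return the Publisher
unchanged; a real hook could return a decorated Publisher instead. The class name
AssemblyHooksDemo and the keys "countEach" and "countLast" are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;

class AssemblyHooksDemo {

    // Returns { onEachOperator calls after assembly,
    //           onLastOperator calls after assembly,
    //           onLastOperator calls after subscribe }.
    static int[] run() {
        AtomicInteger each = new AtomicInteger();
        AtomicInteger last = new AtomicInteger();

        // Keyed registration, so each hook can be removed individually later.
        Hooks.onEachOperator("countEach", publisher -> {
            each.incrementAndGet();
            return publisher; // unchanged; a decorated Publisher could be returned
        });
        Hooks.onLastOperator("countLast", publisher -> {
            last.incrementAndGet();
            return publisher;
        });

        try {
            // Assembling the chain triggers onEachOperator for the operators in it.
            Flux<Integer> chain = Flux.just(1, 2, 3)
                                      .map(i -> i * 2)
                                      .filter(i -> i > 2);
            int eachAfterAssembly = each.get();
            int lastAfterAssembly = last.get();

            // onLastOperator is applied only when the chain is subscribed to.
            chain.subscribe();
            return new int[] { eachAfterAssembly, lastAfterAssembly, last.get() };
        }
        finally {
            // Partial reset: remove only the hooks registered under these keys.
            Hooks.resetOnEachOperator("countEach");
            Hooks.resetOnLastOperator("countLast");
        }
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("onEachOperator calls after assembly: " + counts[0]);
        System.out.println("onLastOperator calls after assembly: " + counts[1]);
        System.out.println("onLastOperator calls after subscribe: " + counts[2]);
    }
}
```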
4. Hook Presets
The Hooks
utility class provides two preset hooks. These are alternatives to
the default behaviors that you can use by calling their corresponding method, rather than
coming up with the hook yourself:
- onNextDroppedFail(): onNextDropped used to throw an Exceptions.failWithCancel()
  exception. It now defaults to logging the dropped value at the DEBUG level. To go back
  to the old default behavior of throwing, use onNextDroppedFail().
- onOperatorDebug(): This method activates debug mode. It ties into the onOperatorError
  hook, so calling resetOnOperatorError() also resets it. You can independently reset it
  by using resetOnOperatorDebug(), as it uses a specific key internally.
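
A minimal sketch of both presets follows, assuming a recent reactor-core on the classpath.
The class name HookPresetsDemo is hypothetical. To keep the first method short,
Operators.onNextDropped is called directly to stand in for an operator that has detected a
late onNext. The second method assumes that debug mode attaches its assembly information
to the error as a suppressed exception, which may differ between Reactor versions.

```java
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

class HookPresetsDemo {

    // With onNextDroppedFail(), a dropped value throws instead of being logged.
    static boolean droppedValueThrows() {
        Hooks.onNextDroppedFail();
        try {
            // Stand-in for an operator reporting a value received after completion.
            Operators.onNextDropped("late", Context.empty());
            return false;
        }
        catch (RuntimeException e) {
            // The preset throws the exception created by Exceptions.failWithCancel().
            return Exceptions.isCancel(e);
        }
        finally {
            Hooks.resetOnNextDropped();
        }
    }

    // With onOperatorDebug(), errors carry extra assembly information.
    // Returns how many suppressed exceptions the received error holds.
    static int suppressedTracesInDebugMode() {
        Hooks.onOperatorDebug();
        try {
            AtomicReference<Throwable> received = new AtomicReference<>();
            // The chain must be assembled after debug mode has been activated.
            Flux.just(1, 0)
                .map(i -> 10 / i)
                .subscribe(v -> { }, received::set);
            return received.get().getSuppressed().length;
        }
        finally {
            // Debug mode can be reset independently of other hooks.
            Hooks.resetOnOperatorDebug();
        }
    }

    public static void main(String[] args) {
        System.out.println("dropped value throws: " + droppedValueThrows());
        System.out.println("suppressed traces: " + suppressedTracesInDebugMode());
    }
}
```

Debug mode captures a stack trace for every assembled operator, so it is usually enabled
only while diagnosing a problem.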