public abstract class Hooks extends Object
Modifier and Type | Method and Description |
---|---|
`static <T,P extends Publisher<T>> Publisher<T>` | `addCallSiteInfo(P publisher, String callSite)` Deprecated. Should only be used by the instrumentation; DOES NOT guarantee any compatibility. |
`static void` | `addQueueWrapper(String key, Function<Queue<?>,Queue<?>> decorator)` Adds a wrapper for every Queue used in Reactor. |
`static <T,P extends Publisher<T>> Publisher<T>` | `addReturnInfo(P publisher, String method)` Deprecated. Should only be used by the instrumentation; DOES NOT guarantee any compatibility. |
`static <T> Flux<T>` | `convertToFluxBypassingHooks(Publisher<T> publisher)` |
`static <T> Mono<T>` | `convertToMonoBypassingHooks(Publisher<T> publisher, boolean enforceMonoContract)` |
`static void` | `disableAutomaticContextPropagation()` Globally disables automatic context propagation to ThreadLocals. |
`static void` | `disableContextLossTracking()` Globally disables the Context loss detection that was previously enabled by enableContextLossTracking(). |
`static void` | `enableAutomaticContextPropagation()` Globally enables automatic context propagation to ThreadLocals. |
`static void` | `enableContextLossTracking()` Globally enables the Context loss detection in operators like Flux.transform(Function) or Mono.transformDeferred(Function) when non-Reactor types are used. |
`static void` | `onEachOperator(Function<? super Publisher<Object>,? extends Publisher<Object>> onEachOperator)` |
`static void` | `onEachOperator(String key, Function<? super Publisher<Object>,? extends Publisher<Object>> onEachOperator)` |
`static void` | `onErrorDropped(Consumer<? super Throwable> c)` Override global error dropped strategy, which by default bubbles back the error. |
`static void` | `onLastOperator(Function<? super Publisher<Object>,? extends Publisher<Object>> onLastOperator)` |
`static void` | `onLastOperator(String key, Function<? super Publisher<Object>,? extends Publisher<Object>> onLastOperator)` |
`static void` | `onNextDropped(Consumer<Object> c)` Override global data dropped strategy, which by default logs at DEBUG level. |
`static void` | `onNextDroppedFail()` Resets onNextDropped hook(s) and applies a strategy of throwing Exceptions.failWithCancel() instead. |
`static void` | `onNextError(BiFunction<? super Throwable,Object,? extends Throwable> onNextError)` Set the custom global error mode hook for operators that support resuming during an error in their Subscriber.onNext(Object). |
`static void` | `onOperatorDebug()` Enable operator stack recorder that captures a declaration stack whenever an operator is instantiated. |
`static void` | `onOperatorError(BiFunction<? super Throwable,Object,? extends Throwable> onOperatorError)` Add a custom error mapping, overriding the default one. |
`static void` | `onOperatorError(String key, BiFunction<? super Throwable,Object,? extends Throwable> onOperatorError)` Add or replace a named custom error mapping, overriding the default one. |
`static void` | `removeQueueWrapper(String key)` Removes existing Queue wrapper by key. |
`static void` | `removeQueueWrappers()` Remove all queue wrappers. |
`static void` | `resetOnEachOperator()` Reset global "assembly" hook tracking. |
`static void` | `resetOnEachOperator(String key)` Remove the sub-hook with key `key` from the onEachOperator hook. |
`static void` | `resetOnErrorDropped()` Reset global error dropped strategy to bubbling back the error. |
`static void` | `resetOnLastOperator()` Reset global "subscriber" hook tracking. |
`static void` | `resetOnLastOperator(String key)` Remove the sub-hook with key `key` from the onLastOperator hook. |
`static void` | `resetOnNextDropped()` Reset global data dropped strategy to the default of logging at DEBUG level. |
`static void` | `resetOnNextError()` Reset global onNext error handling strategy to terminating the sequence with an onError and cancelling upstream (OnNextFailureStrategy.STOP). |
`static void` | `resetOnOperatorDebug()` Reset global operator debug. |
`static void` | `resetOnOperatorError()` Reset global operator error mapping to the default behavior. |
`static void` | `resetOnOperatorError(String key)` Remove the sub-hook with key `key` from the onOperatorError hook. |
`static <T> Queue<T>` | `wrapQueue(Queue<T> queue)` Applies the Queue wrappers that were previously registered. |
public static <T> Flux<T> convertToFluxBypassingHooks(Publisher<T> publisher)
public static <T> Mono<T> convertToMonoBypassingHooks(Publisher<T> publisher, boolean enforceMonoContract)
Converts a Publisher to a Mono without applying Hooks. Can optionally perform a "direct" (or unsafe) conversion when the caller is certain the Publisher has Mono semantics.
Type Parameters:
T - the type of data emitted by the Publisher
Parameters:
publisher - the Publisher to convert to a Mono
enforceMonoContract - true to ensure Mono semantics (by cancelling on first onNext if source isn't already a Mono), false to perform a direct conversion (see Mono.fromDirect(Publisher)).
Returns:
the Publisher wrapped as a Mono, or the original if it was a Mono
public static void onEachOperator(Function<? super Publisher<Object>,? extends Publisher<Object>> onEachOperator)
Add a Publisher operator interceptor for each operator created (Flux or Mono). The passed function is applied to the original operator Publisher and can return a different Publisher, on the condition that it generically maintains the same data type as the original. Use of the Flux/Mono APIs is discouraged as it will recursively call this hook, leading to StackOverflowError.
Note that sub-hooks are cumulative, but invoking this method twice with the same instance (or any instance that has the same `toString`) will result in only a single instance being applied. See onEachOperator(String, Function) for a variant that allows you to name the sub-hooks (and thus replace them or remove them individually later on). Can be fully reset via resetOnEachOperator().
This pointcut function cannot make use of Flux, Mono or ParallelFlux APIs as it would lead to a recursive call to the hook: the operator calls would effectively invoke onEachOperator from onEachOperator. See convertToFluxBypassingHooks(Publisher) and convertToMonoBypassingHooks(Publisher, boolean).
Parameters:
onEachOperator - the sub-hook: a function to intercept each operation call (e.g. map(fn) and map(fn2) in flux.map(fn).map(fn2).subscribe())
See Also:
onEachOperator(String, Function), resetOnEachOperator(String), resetOnEachOperator(), onLastOperator(Function)
public static void onEachOperator(String key, Function<? super Publisher<Object>,? extends Publisher<Object>> onEachOperator)
Add or replace a named Publisher operator interceptor for each operator created (Flux or Mono). The passed function is applied to the original operator Publisher and can return a different Publisher, on the condition that it generically maintains the same data type as the original. Use of the Flux/Mono APIs is discouraged as it will recursively call this hook, leading to StackOverflowError.
Note that sub-hooks are cumulative. Invoking this method twice with the same key will replace the old sub-hook with that name, but keep the execution order (e.g. A-h1, B-h2, A-h3 will keep A-B execution order, leading to hooks h3 then h2 being executed). Removing a particular key using resetOnEachOperator(String) then adding it back will result in the execution order changing (the later sub-hook being executed last). Can be fully reset via resetOnEachOperator().
This pointcut function cannot make use of Flux, Mono or ParallelFlux APIs as it would lead to a recursive call to the hook: the operator calls would effectively invoke onEachOperator from onEachOperator. See convertToFluxBypassingHooks(Publisher) and convertToMonoBypassingHooks(Publisher, boolean).
Parameters:
key - the key for the sub-hook to add/replace
onEachOperator - the sub-hook: a function to intercept each operation call (e.g. map(fn) and map(fn2) in flux.map(fn).map(fn2).subscribe())
See Also:
onEachOperator(Function), resetOnEachOperator(String), resetOnEachOperator(), onLastOperator(String, Function)
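
The ordering rules for keyed sub-hooks (replacing a key keeps its slot, removing and re-adding moves it last) can be illustrated with a small stand-alone model. This is only a sketch under assumptions: the class `KeyedSubHooks` and its methods are hypothetical, it works on strings rather than Publishers, and it does not claim to reproduce Reactor's internal implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a keyed hook registry; NOT Reactor's implementation.
final class KeyedSubHooks {

    // A LinkedHashMap keeps a key's position when its value is replaced,
    // and appends the key at the end when it is added again after removal.
    private final Map<String, Function<String, String>> subHooks = new LinkedHashMap<>();

    void add(String key, Function<String, String> hook) {
        subHooks.put(key, hook);
    }

    void remove(String key) {
        subHooks.remove(key);
    }

    // Applies every registered sub-hook in order and returns the final value.
    String apply(String input) {
        String current = input;
        for (Function<String, String> hook : subHooks.values()) {
            current = hook.apply(current);
        }
        return current;
    }

    // A-h1, B-h2, A-h3: h3 replaces h1 but stays in A's slot, ahead of B.
    static String replaceKeepsOrder() {
        KeyedSubHooks hooks = new KeyedSubHooks();
        hooks.add("A", s -> s + ">h1");
        hooks.add("B", s -> s + ">h2");
        hooks.add("A", s -> s + ">h3");
        return hooks.apply("op");
    }

    // Removing A and adding it back makes A run after B.
    static String removeThenAddMovesLast() {
        KeyedSubHooks hooks = new KeyedSubHooks();
        hooks.add("A", s -> s + ">h1");
        hooks.add("B", s -> s + ">h2");
        hooks.remove("A");
        hooks.add("A", s -> s + ">h3");
        return hooks.apply("op");
    }

    public static void main(String[] args) {
        System.out.println(replaceKeepsOrder());      // op>h3>h2
        System.out.println(removeThenAddMovesLast()); // op>h2>h3
    }
}
```

With the real API, the equivalent calls would be onEachOperator("A", ...), onEachOperator("B", ...) and resetOnEachOperator("A"), as documented above.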
public static void resetOnEachOperator(String key)
Remove the sub-hook with key key from the onEachOperator hook. No-op if no such key has been registered, and equivalent to calling resetOnEachOperator() if it was the last sub-hook.
Parameters:
key - the key of the sub-hook to remove
public static void resetOnEachOperator()
Reset global "assembly" hook tracking.
public static void onErrorDropped(Consumer<? super Throwable> c)
Override global error dropped strategy, which by default bubbles back the error.
The hook is cumulative, so calling this method several times will set up the hook for as many consumer invocations (even if called with the same consumer instance).
Parameters:
c - the Consumer to apply to dropped errors
public static void onLastOperator(Function<? super Publisher<Object>,? extends Publisher<Object>> onLastOperator)
Add a Publisher operator interceptor for the last operator created in every flow (Flux or Mono). The passed function is applied to the original operator Publisher and can return a different Publisher, on the condition that it generically maintains the same data type as the original.
Note that sub-hooks are cumulative, but invoking this method twice with the same instance (or any instance that has the same `toString`) will result in only a single instance being applied. See onLastOperator(String, Function) for a variant that allows you to name the sub-hooks (and thus replace them or remove them individually later on). Can be fully reset via resetOnLastOperator().
This pointcut function cannot make use of Flux, Mono or ParallelFlux APIs as it would lead to a recursive call to the hook: the operator calls would effectively invoke onEachOperator from onEachOperator. See convertToFluxBypassingHooks(Publisher) and convertToMonoBypassingHooks(Publisher, boolean).
Parameters:
onLastOperator - the sub-hook: a function to intercept last operation call (e.g. map(fn2) in flux.map(fn).map(fn2).subscribe())
See Also:
onLastOperator(String, Function), resetOnLastOperator(String), resetOnLastOperator(), onEachOperator(Function)
public static void onLastOperator(String key, Function<? super Publisher<Object>,? extends Publisher<Object>> onLastOperator)
Add or replace a named Publisher operator interceptor for the last operator created in every flow (Flux or Mono). The passed function is applied to the original operator Publisher and can return a different Publisher, on the condition that it generically maintains the same data type as the original. Use of the Flux/Mono APIs is discouraged as it will recursively call this hook, leading to StackOverflowError.
Note that sub-hooks are cumulative. Invoking this method twice with the same key will replace the old sub-hook with that name, but keep the execution order (e.g. A-h1, B-h2, A-h3 will keep A-B execution order, leading to hooks h3 then h2 being executed). Removing a particular key using resetOnLastOperator(String) then adding it back will result in the execution order changing (the later sub-hook being executed last). Can be fully reset via resetOnLastOperator().
This pointcut function cannot make use of Flux, Mono or ParallelFlux APIs as it would lead to a recursive call to the hook: the operator calls would effectively invoke onEachOperator from onEachOperator. See convertToFluxBypassingHooks(Publisher) and convertToMonoBypassingHooks(Publisher, boolean).
Parameters:
key - the key for the sub-hook to add/replace
onLastOperator - the sub-hook: a function to intercept last operation call (e.g. map(fn2) in flux.map(fn).map(fn2).subscribe())
See Also:
onLastOperator(Function), resetOnLastOperator(String), resetOnLastOperator(), onEachOperator(String, Function)
public static void resetOnLastOperator(String key)
Remove the sub-hook with key key from the onLastOperator hook. No-op if no such key has been registered, and equivalent to calling resetOnLastOperator() if it was the last sub-hook.
Parameters:
key - the key of the sub-hook to remove
public static void resetOnLastOperator()
Reset global "subscriber" hook tracking.
public static void onNextDropped(Consumer<Object> c)
Override global data dropped strategy, which by default logs at DEBUG level.
The hook is cumulative, so calling this method several times will set up the hook for as many consumer invocations (even if called with the same consumer instance).
Parameters:
c - the Consumer to apply to data (onNext) that is dropped
See Also:
onNextDroppedFail()
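
The "cumulative" behaviour described for onNextDropped (and onErrorDropped) can be pictured as chaining consumers, so that registering the same instance twice means it runs twice per dropped value. The following is a minimal sketch under that assumption; `CumulativeDropHook` is a hypothetical model class, not part of Reactor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a cumulative drop hook; not Reactor's actual code.
final class CumulativeDropHook {

    private Consumer<Object> hook; // null means no custom hook is installed

    // Each call chains the new consumer after those already registered.
    void onNextDropped(Consumer<Object> c) {
        hook = (hook == null) ? c : hook.andThen(c);
    }

    // Drops all registered consumers at once.
    void reset() {
        hook = null;
    }

    // Simulates an operator dropping a value.
    void drop(Object value) {
        if (hook != null) {
            hook.accept(value);
        }
    }

    // Registers the same consumer instance twice, then drops a single value.
    static int invocationsForOneDrop() {
        List<Object> seen = new ArrayList<>();
        Consumer<Object> recorder = seen::add;
        CumulativeDropHook hooks = new CumulativeDropHook();
        hooks.onNextDropped(recorder);
        hooks.onNextDropped(recorder);
        hooks.drop("late-value");
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(invocationsForOneDrop()); // 2
    }
}
```

In practice this means a consumer that should run once per dropped signal should be registered once, with resetOnNextDropped() called before re-registering.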
public static void onNextDroppedFail()
Resets onNextDropped hook(s) and applies a strategy of throwing Exceptions.failWithCancel() instead.
Use resetOnNextDropped() to reset to the default strategy of logging.
public static void onOperatorDebug()
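Enable operator stack recorder that captures a declaration stack whenever an operator is instantiated.
This is added as a specifically-keyed sub-hook in onEachOperator(String, Function).
public static void resetOnOperatorDebug()
Reset global operator debug.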
public static void onNextError(BiFunction<? super Throwable,Object,? extends Throwable> onNextError)
Set the custom global error mode hook for operators that support resuming during an error in their Subscriber.onNext(Object).
The hook is a BiFunction of Throwable and potentially null Object. If it is also a BiPredicate, its test method should be used to determine if an error should be processed (matching predicate) or completely skipped (non-matching predicate). Typical usage, as in Operators, is to check if the predicate matches and fall back to Operators.onOperatorError(Throwable, Context) if it doesn't.
Parameters:
onNextError - the new BiFunction to use.
public static void onOperatorError(BiFunction<? super Throwable,Object,? extends Throwable> onOperatorError)
Add a custom error mapping, overriding the default one.
Note that sub-hooks are cumulative, but invoking this method twice with the same instance (or any instance that has the same `toString`) will result in only a single instance being applied. See onOperatorError(String, BiFunction) for a variant that allows you to name the sub-hooks (and thus replace them or remove them individually later on). Can be fully reset via resetOnOperatorError().
For reference, the default mapping is to unwrap the exception and, if the second parameter is another exception, to add it to the first as suppressed.
Parameters:
onOperatorError - an operator error BiFunction mapper, returning an arbitrary exception given the failure and optionally some original context (data or error).
See Also:
onOperatorError(String, BiFunction), resetOnOperatorError(String), resetOnOperatorError()
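
To make the shape of an error mapper concrete, here is a hedged sketch of a BiFunction that mirrors the second half of the default mapping described above: when the context argument is itself an exception, it is attached as suppressed. The class name `SuppressingErrorMapper` and the helper `suppressedCount` are hypothetical, and the Reactor-specific "unwrap" step is deliberately left out.

```java
import java.util.function.BiFunction;

// Hypothetical example mapper; only the BiFunction shape comes from the documentation.
final class SuppressingErrorMapper {

    // If the context is another Throwable, attach it to the failure as suppressed.
    // The unwrapping step of the default mapping is not reproduced here.
    static final BiFunction<Throwable, Object, Throwable> MAPPER = (error, context) -> {
        if (context instanceof Throwable && context != error) {
            error.addSuppressed((Throwable) context);
        }
        return error;
    };

    // Applies the mapper to a fresh failure and reports how many
    // suppressed exceptions the mapped error carries.
    static int suppressedCount(Object context) {
        Throwable mapped = MAPPER.apply(new IllegalStateException("boom"), context);
        return mapped.getSuppressed().length;
    }

    public static void main(String[] args) {
        System.out.println(suppressedCount(new RuntimeException("ctx"))); // 1
        System.out.println(suppressedCount("some data"));                 // 0
        // Registration, assuming reactor-core is on the classpath:
        // Hooks.onOperatorError("suppress-context", SuppressingErrorMapper.MAPPER);
    }
}
```

Registering it under a key, as in the commented line, allows it to be removed later with resetOnOperatorError(String).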
public static void onOperatorError(String key, BiFunction<? super Throwable,Object,? extends Throwable> onOperatorError)
Add or replace a named custom error mapping, overriding the default one.
Note that invoking this method twice with the same key will replace the old sub-hook with that name, but keep the execution order (e.g. A-h1, B-h2, A-h3 will keep A-B execution order, leading to hooks h3 then h2 being executed). Removing a particular key using resetOnOperatorError(String) then adding it back will result in the execution order changing (the later sub-hook being executed last). Can be fully reset via resetOnOperatorError().
For reference, the default mapping is to unwrap the exception and, if the second parameter is another exception, to add it to the first as suppressed.
Parameters:
key - the key for the sub-hook to add/replace
onOperatorError - an operator error BiFunction mapper, returning an arbitrary exception given the failure and optionally some original context (data or error).
See Also:
onOperatorError(String, BiFunction), resetOnOperatorError(String), resetOnOperatorError()
public static void resetOnOperatorError(String key)
Remove the sub-hook with key key from the onOperatorError hook. No-op if no such key has been registered, and equivalent to calling resetOnOperatorError() if it was the last sub-hook.
Parameters:
key - the key of the sub-hook to remove
public static void resetOnOperatorError()
Reset global operator error mapping to the default behavior.
For reference, the default mapping is to unwrap the exception and, if the second parameter is another exception, to add it to the first as suppressed.
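public static void resetOnErrorDropped()
Reset global error dropped strategy to bubbling back the error.
public static void resetOnNextDropped()
Reset global data dropped strategy to the default of logging at DEBUG level.
public static void resetOnNextError()
Reset global onNext error handling strategy to terminating the sequence with an onError and cancelling upstream (OnNextFailureStrategy.STOP).
public static void enableContextLossTracking()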
Globally enables the Context loss detection in operators like Flux.transform(Function) or Mono.transformDeferred(Function) when non-Reactor types are used.
An exception will be thrown upon applying the transformation if the original Context isn't reachable (i.e. it has been replaced by a totally different Context, or no Context at all).
public static void disableContextLossTracking()
Globally disables the Context loss detection that was previously enabled by enableContextLossTracking().
public static void enableAutomaticContextPropagation()
Globally enables automatic context propagation to ThreadLocals.
It requires the context-propagation library to be on the classpath to have an effect. Using the implicit global ContextRegistry it reads entries present in the modified Context using Flux.contextWrite(ContextView) (or Mono.contextWrite(ContextView)) and Flux.contextWrite(Function) (or Mono.contextWrite(Function)) and restores all ThreadLocals associated via the same keys for which ThreadLocalAccessors are registered.
The ThreadLocals are present in the upstream operators from the contextWrite(...) call and the unmodified (downstream) Context is used when signals are delivered downstream, making the contextWrite(...) a logical boundary for the context propagation mechanism.
This mechanism automatically performs Flux.contextCapture() and Mono.contextCapture() in Flux.blockFirst(), Flux.blockLast(), Flux.toIterable(), and Mono.block() (and their overloads).
public static void disableAutomaticContextPropagation()
Globally disables automatic context propagation to ThreadLocals.
See Also:
enableAutomaticContextPropagation()
@Nullable @Deprecated public static <T,P extends Publisher<T>> Publisher<T> addReturnInfo(@Nullable P publisher, String method)
@Nullable @Deprecated public static <T,P extends Publisher<T>> Publisher<T> addCallSiteInfo(@Nullable P publisher, String callSite)
public static void addQueueWrapper(String key, Function<Queue<?>,Queue<?>> decorator)
Adds a wrapper for every Queue used in Reactor. Note that it won't affect existing instances of Queue.
Hint: one may use AbstractQueue to reduce the number of methods to implement.
See Also:
removeQueueWrapper(String)
Implementation Requirements:
The wrapper applied to a Queue MUST NOT change the Queue's behavior. Only side effects are allowed.
public static void removeQueueWrapper(String key)
Removes existing Queue wrapper by key.
See Also:
addQueueWrapper(String, Function)
public static void removeQueueWrappers()
public static <T> Queue<T> wrapQueue(Queue<T> queue)
Applies the Queue wrappers that were previously registered. SHOULD NOT change the behavior of the provided Queue.
Parameters:
queue - the Queue to wrap.
Returns:
the result of applying the Queue wrappers registered with addQueueWrapper(String, Function).
See Also:
addQueueWrapper(String, Function), removeQueueWrapper(String)
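
As a hedged illustration of the AbstractQueue hint given for addQueueWrapper(String, Function), the sketch below shows one possible decorator that only adds a side effect (counting offers) and delegates everything else. The class `CountingQueue`, its counter and the `demo` helper are hypothetical names introduced for this example.

```java
import java.util.AbstractQueue;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical decorator: counts offers as a side effect, delegates all behavior.
final class CountingQueue<T> extends AbstractQueue<T> {

    static final AtomicLong OFFERS = new AtomicLong();

    private final Queue<T> delegate;

    CountingQueue(Queue<T> delegate) {
        this.delegate = delegate;
    }

    @Override public boolean offer(T value) {
        OFFERS.incrementAndGet();      // side effect only
        return delegate.offer(value);  // behavior is unchanged
    }

    @Override public T poll() { return delegate.poll(); }
    @Override public T peek() { return delegate.peek(); }
    @Override public int size() { return delegate.size(); }
    @Override public Iterator<T> iterator() { return delegate.iterator(); }

    // Generic helper so a Queue<?> can be wrapped without unchecked casts.
    static <T> Queue<T> wrap(Queue<T> queue) {
        return new CountingQueue<>(queue);
    }

    // Has the Function<Queue<?>,Queue<?>> shape expected by addQueueWrapper.
    static final Function<Queue<?>, Queue<?>> DECORATOR = q -> wrap(q);

    // Wraps a plain queue, performs two offers and one poll, and reports the state.
    static String demo() {
        OFFERS.set(0);
        Queue<String> wrapped = wrap(new ArrayDeque<>());
        wrapped.offer("a");
        wrapped.offer("b");
        wrapped.poll();
        return "offers=" + OFFERS.get() + ",size=" + wrapped.size();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // offers=2,size=1
        // Registration, assuming reactor-core is on the classpath:
        // Hooks.addQueueWrapper("counting", CountingQueue.DECORATOR);
    }
}
```

Because AbstractQueue derives add, remove and element from offer, poll and peek, only five methods need to be written, and since wrappers do not affect existing Queue instances, registration would have to happen before the relevant operators are created.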