public abstract class Operators extends Object
Subscription.request(long)
handling.
Combine utils available to operator implementations, @see https://github.com/reactor/reactive-streams-commonsModifier and Type | Class and Description |
---|---|
static class |
Operators.DeferredSubscription
Base class for Subscribers that will receive their Subscriptions at any time, yet
they might also need to be cancelled or requested at any time.
|
static class |
Operators.MonoSubscriber<I,O>
A Subscriber/Subscription barrier that holds a single value at most and properly gates asynchronous behaviors
resulting from concurrent request or cancel and onXXX signals.
|
Modifier and Type | Method and Description |
---|---|
static <T> long |
addCap(AtomicLongFieldUpdater<T> updater,
T instance,
long toAdd)
Concurrent addition bound to Long.MAX_VALUE.
|
static long |
addCap(long a,
long b)
Cap an addition to Long.MAX_VALUE
|
static <T> Fuseable.QueueSubscription<T> |
as(Subscription s)
Returns the subscription as QueueSubscription if possible or null.
|
static boolean |
canAppearAfterOnSubscribe(Subscription subscription)
Check whether the provided
Subscription is the one used to satisfy Spec's ยง1.9 rule
before signalling an error. |
static Subscription |
cancelledSubscription()
A singleton Subscription that represents a cancelled subscription instance and
should not be leaked to clients as it represents a terminal state.
|
static void |
complete(Subscriber<?> s)
Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onComplete.
|
static <T> CoreSubscriber<T> |
drainSubscriber()
Return a singleton
Subscriber that does not check for double onSubscribe
and purely request Long.MAX. |
static <T> CoreSubscriber<T> |
emptySubscriber()
A
Subscriber that is expected to be used as a placeholder and
never actually be called. |
static Subscription |
emptySubscription()
A singleton enumeration that represents a no-op Subscription instance that
can be freely given out to clients.
|
static Context |
enableOnDiscard(Context target,
Consumer<?> discardConsumer)
Utility method to activate the onDiscard feature (see
Flux.doOnDiscard(Class, Consumer) )
in a target Context . |
static void |
error(Subscriber<?> s,
Throwable e)
Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onError with the
supplied error.
|
static <I,O> Function<? super Publisher<I>,? extends Publisher<O>> |
lift(BiFunction<Scannable,? super CoreSubscriber<? super O>,? extends CoreSubscriber<? super I>> lifter)
Create a function that can be used to support a custom operator via
CoreSubscriber decoration. |
static <O> Function<? super Publisher<O>,? extends Publisher<O>> |
lift(Predicate<Scannable> filter,
BiFunction<Scannable,? super CoreSubscriber<? super O>,? extends CoreSubscriber<? super O>> lifter)
Create a function that can be used to support a custom operator via
CoreSubscriber decoration. |
static <I,O> Function<? super Publisher<I>,? extends Publisher<O>> |
liftPublisher(BiFunction<Publisher,? super CoreSubscriber<? super O>,? extends CoreSubscriber<? super I>> lifter)
Create a function that can be used to support a custom operator via
CoreSubscriber decoration. |
static <O> Function<? super Publisher<O>,? extends Publisher<O>> |
liftPublisher(Predicate<Publisher> filter,
BiFunction<Publisher,? super CoreSubscriber<? super O>,? extends CoreSubscriber<? super O>> lifter)
Create a function that can be used to support a custom operator via
CoreSubscriber decoration. |
static long |
multiplyCap(long a,
long b)
Cap a multiplication to Long.MAX_VALUE
|
static <T> void |
onDiscard(T element,
Context context)
Invoke a (local or global) hook that processes elements that get discarded.
|
static void |
onDiscardMultiple(Collection<?> multiple,
Context context)
Invoke a (local or global) hook that processes elements that get discarded en masse.
|
static void |
onDiscardMultiple(Iterator<?> multiple,
boolean knownToBeFinite,
Context context)
Invoke a (local or global) hook that processes elements that remains in an
Iterator . |
static void |
onDiscardMultiple(Spliterator<?> multiple,
boolean knownToBeFinite,
Context context)
Invoke a (local or global) hook that processes elements that remains in an
Spliterator . |
static void |
onDiscardMultiple(Stream<?> multiple,
Context context)
Invoke a (local or global) hook that processes elements that get discarded en masse.
|
static <T> void |
onDiscardQueueWithClear(Queue<T> queue,
Context context,
Function<T,Stream<?>> extract)
Invoke a (local or global) hook that processes elements that get discarded
en masse after having been enqueued, due to cancellation or error.
|
static void |
onErrorDropped(Throwable e,
Context context)
An unexpected exception is about to be dropped.
|
static <T> CorePublisher<T> |
onLastAssembly(CorePublisher<T> source)
Applies the hooks registered with
Hooks.onLastOperator(java.util.function.Function<? super org.reactivestreams.Publisher<java.lang.Object>, ? extends org.reactivestreams.Publisher<java.lang.Object>>) and returns
CorePublisher ready to be subscribed on. |
static <T> void |
onNextDropped(T t,
Context context)
An unexpected event is about to be dropped.
|
static <T> Throwable |
onNextError(T value,
Throwable error,
Context context)
Find the
OnNextFailureStrategy to apply to the calling async operator (which could be
a local error mode defined in the Context ) and apply it. |
static <T> Throwable |
onNextError(T value,
Throwable error,
Context context,
Subscription subscriptionForCancel)
Find the
OnNextFailureStrategy to apply to the calling operator (which could be a local
error mode defined in the Context ) and apply it. |
static BiFunction<? super Throwable,Object,? extends Throwable> |
onNextErrorFunction(Context context) |
static <T> Throwable |
onNextInnerError(Throwable error,
Context context,
Subscription subscriptionForCancel)
Find the
OnNextFailureStrategy to apply to the calling operator (which could be a local
error mode defined in the Context ) and apply it. |
static <T> RuntimeException |
onNextPollError(T value,
Throwable error,
Context context)
Find the
OnNextFailureStrategy to apply to the calling async operator (which could be
a local error mode defined in the Context ) and apply it. |
static Throwable |
onOperatorError(Subscription subscription,
Throwable error,
Context context)
Map an "operator" error given an operator parent
Subscription . |
static Throwable |
onOperatorError(Subscription subscription,
Throwable error,
Object dataSignal,
Context context)
Map an "operator" error given an operator parent
Subscription . |
static Throwable |
onOperatorError(Throwable error,
Context context)
Map an "operator" error.
|
static RuntimeException |
onRejectedExecution(Throwable original,
Context context)
Return a wrapped
RejectedExecutionException which can be thrown by the
operator. |
static RuntimeException |
onRejectedExecution(Throwable original,
Subscription subscription,
Throwable suppressed,
Object dataSignal,
Context context)
Return a wrapped
RejectedExecutionException which can be thrown by the
operator. |
static <T> long |
produced(AtomicLongFieldUpdater<T> updater,
T instance,
long toSub)
Concurrent subtraction bound to 0, mostly used to decrement a request tracker by
the amount produced by the operator.
|
static <F> boolean |
replace(AtomicReferenceFieldUpdater<F,Subscription> field,
F instance,
Subscription s)
A generic utility to atomically replace a subscription or cancel the replacement
if the current subscription is marked as already cancelled (as in
cancelledSubscription() ). |
static void |
reportBadRequest(long n)
Log an
IllegalArgumentException if the request is null or negative. |
static void |
reportMoreProduced()
Log an
IllegalStateException that indicates more than the requested
amount was produced. |
static void |
reportSubscriptionSet()
Log a
duplicate subscription error. |
static void |
reportThrowInSubscribe(CoreSubscriber<?> subscriber,
Throwable e)
Report a
Throwable that was thrown from a call to Publisher.subscribe(Subscriber) ,
attempting to notify the Subscriber by:
providing a special Subscription via Subscriber.onSubscribe(Subscription)
immediately delivering an onError signal after that
|
static <T> Subscription |
scalarSubscription(CoreSubscriber<? super T> subscriber,
T value)
Represents a fuseable Subscription that emits a single constant value synchronously
to a Subscriber or consumer.
|
static <T> Subscription |
scalarSubscription(CoreSubscriber<? super T> subscriber,
T value,
String stepName)
Represents a fuseable Subscription that emits a single constant value synchronously
to a Subscriber or consumer.
|
static <T> CoreSubscriber<T> |
serialize(CoreSubscriber<? super T> subscriber)
Safely gate a
Subscriber by making sure onNext signals are delivered
sequentially (serialized). |
static <F> boolean |
set(AtomicReferenceFieldUpdater<F,Subscription> field,
F instance,
Subscription s)
A generic utility to atomically replace a subscription or cancel the replacement
if current subscription is marked as cancelled (as in
cancelledSubscription() )
or was concurrently updated before. |
static <F> boolean |
setOnce(AtomicReferenceFieldUpdater<F,Subscription> field,
F instance,
Subscription s)
Sets the given subscription once and returns true if successful, false
if the field has a subscription already or has been cancelled.
|
static long |
subOrZero(long a,
long b)
Cap a subtraction to 0
|
static <F> boolean |
terminate(AtomicReferenceFieldUpdater<F,Subscription> field,
F instance)
Atomically terminates the subscription if it is not already a
cancelledSubscription() , cancelling the subscription and setting the field
to the singleton cancelledSubscription() . |
static <T> Fuseable.ConditionalSubscriber<? super T> |
toConditionalSubscriber(CoreSubscriber<? super T> actual)
If the actual
CoreSubscriber is not Fuseable.ConditionalSubscriber ,
it will apply an adapter which directly maps all
Fuseable.ConditionalSubscriber.tryOnNext(Object) to
Subscriber.onNext(Object)
and always returns true as the result |
static <T> CoreSubscriber<? super T> |
toCoreSubscriber(Subscriber<? super T> actual)
If the actual
Subscriber is not a CoreSubscriber , it will apply
safe strict wrapping to apply all reactive streams rules including the ones
relaxed by internal operators based on CoreSubscriber . |
static boolean |
validate(long n)
Evaluate if a request is strictly positive otherwise
reportBadRequest(long) |
static boolean |
validate(Subscription current,
Subscription next)
Check Subscription current state and cancel new Subscription if current is set,
or return true if ready to subscribe.
|
public static long addCap(long a, long b)
a
- left operandb
- right operandpublic static <T> long addCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd)
T
- the parent instance typeupdater
- current field updaterinstance
- current instance to updatetoAdd
- delta to add@Nullable public static <T> Fuseable.QueueSubscription<T> as(Subscription s)
T
- the value type of the QueueSubscription.s
- the source subscription to try to convert.public static Subscription cancelledSubscription()
Subscription
to be used as an inner representation
of the cancelled statepublic static void complete(Subscriber<?> s)
s
- the target subscriberpublic static <T> CoreSubscriber<T> drainSubscriber()
Subscriber
that does not check for double onSubscribe
and purely request Long.MAX. If an error is received it will raise a
Exceptions.errorCallbackNotImplemented(Throwable)
in the receiving thread.Subscriber
whose sole purpose is to request Long.MAXpublic static <T> CoreSubscriber<T> emptySubscriber()
Subscriber
that is expected to be used as a placeholder and
never actually be called. All methods log an error.T
- the type of data (ignored)public static Subscription emptySubscription()
The enum also implements Fuseable.QueueSubscription so operators expecting a QueueSubscription from a Fuseable source don't have to double-check their Subscription received in onSubscribe.
Subscription
public static boolean canAppearAfterOnSubscribe(Subscription subscription)
Subscription
is the one used to satisfy Spec's ยง1.9 rule
before signalling an error.subscription
- the subscription to test.reportThrowInSubscribe(CoreSubscriber, Throwable)
.public static void error(Subscriber<?> s, Throwable e)
s
- target Subscriber to errore
- the actual errorpublic static void reportThrowInSubscribe(CoreSubscriber<?> subscriber, Throwable e)
Throwable
that was thrown from a call to Publisher.subscribe(Subscriber)
,
attempting to notify the Subscriber
by:
Subscription
via Subscriber.onSubscribe(Subscription)
onError
signal after that
As at that point the subscriber MAY have already been provided with a Subscription
, we
assume most well formed subscribers will ignore this second Subscription
per Reactive
Streams rule 1.9. Subscribers that don't usually ignore may recognize this special case and ignore
it by checking canAppearAfterOnSubscribe(Subscription)
.
Note that if the onSubscribe
attempt throws,
fatal
exceptions are thrown. Other exceptions
are added as suppressed
on the original exception,
which is then directly notified as an onError
signal
(again assuming that such exceptions occur because a Subscription
is already set).
subscriber
- the Subscriber
being subscribed when the error happenede
- the Throwable
that was thrown from Publisher.subscribe(Subscriber)
canAppearAfterOnSubscribe(Subscription)
public static <I,O> Function<? super Publisher<I>,? extends Publisher<O>> lift(BiFunction<Scannable,? super CoreSubscriber<? super O>,? extends CoreSubscriber<? super I>> lifter)
CoreSubscriber
decoration. The function is compatible with
Flux.transform(Function)
, Mono.transform(Function)
,
Hooks.onEachOperator(Function)
and Hooks.onLastOperator(Function)
,
but requires that the original Publisher
be Scannable
.
This variant attempts to expose the Publisher
as a Scannable
for
convenience of introspection. You should however avoid instanceof checks or any
other processing that depends on identity of the Publisher
, as it might
get hidden if Scannable.isScanAvailable()
returns false
.
Use liftPublisher(BiFunction)
instead for that kind of use case.
I
- the input typeO
- the output typelifter
- the bifunction taking Scannable
from the enclosing
publisher (assuming it is compatible) and consuming CoreSubscriber
.
It must return a receiving CoreSubscriber
that will immediately subscribe
to the applied Publisher
.Function
liftPublisher(BiFunction)
public static <O> Function<? super Publisher<O>,? extends Publisher<O>> lift(Predicate<Scannable> filter, BiFunction<Scannable,? super CoreSubscriber<? super O>,? extends CoreSubscriber<? super O>> lifter)
CoreSubscriber
decoration. The function is compatible with
Flux.transform(Function)
, Mono.transform(Function)
,
Hooks.onEachOperator(Function)
and Hooks.onLastOperator(Function)
,
but requires that the original Publisher
be Scannable
.
This variant attempts to expose the Publisher
as a Scannable
for
convenience of introspection. You should however avoid instanceof checks or any
other processing that depends on identity of the Publisher
, as it might
get hidden if Scannable.isScanAvailable()
returns false
.
Use liftPublisher(Predicate, BiFunction)
instead for that kind of use case.
The function will be invoked only if the passed Predicate
matches.
Therefore the transformed type O must be the same than the input type since
unmatched predicate will return the applied Publisher
.
O
- the input and output typefilter
- the predicate to match taking Scannable
from the applied
publisher to operate on. Assumes original is scan-compatible.lifter
- the bifunction taking Scannable
from the enclosing
publisher and consuming CoreSubscriber
. It must return a receiving
CoreSubscriber
that will immediately subscribe to the applied
Publisher
. Assumes the original is scan-compatible.Function
liftPublisher(Predicate, BiFunction)
public static <I,O> Function<? super Publisher<I>,? extends Publisher<O>> liftPublisher(BiFunction<Publisher,? super CoreSubscriber<? super O>,? extends CoreSubscriber<? super I>> lifter)
CoreSubscriber
decoration. The function is compatible with
Flux.transform(Function)
, Mono.transform(Function)
,
Hooks.onEachOperator(Function)
and Hooks.onLastOperator(Function)
,
and works with the raw Publisher
as input, which is useful if you need to
detect the precise type of the source (eg. instanceof checks to detect Mono, Flux,
true Scannable, etc...).I
- the input typeO
- the output typelifter
- the bifunction taking the raw Publisher
and
CoreSubscriber
. The publisher can be double-checked (including with
instanceof
, and the function must return a receiving CoreSubscriber
that will immediately subscribe to the Publisher
.Function
public static <O> Function<? super Publisher<O>,? extends Publisher<O>> liftPublisher(Predicate<Publisher> filter, BiFunction<Publisher,? super CoreSubscriber<? super O>,? extends CoreSubscriber<? super O>> lifter)
CoreSubscriber
decoration. The function is compatible with
Flux.transform(Function)
, Mono.transform(Function)
,
Hooks.onEachOperator(Function)
and Hooks.onLastOperator(Function)
,
and works with the raw Publisher
as input, which is useful if you need to
detect the precise type of the source (eg. instanceof checks to detect Mono, Flux,
true Scannable, etc...).
The function will be invoked only if the passed Predicate
matches.
Therefore the transformed type O must be the same than the input type since
unmatched predicate will return the applied Publisher
.
O
- the input and output typefilter
- the Predicate
that the raw Publisher
must pass for
the transformation to occurlifter
- the BiFunction
taking the raw Publisher
and
CoreSubscriber
. The publisher can be double-checked (including with
instanceof
, and the function must return a receiving CoreSubscriber
that will immediately subscribe to the Publisher
.Function
public static long multiplyCap(long a, long b)
a
- left operandb
- right operandpublic static final Context enableOnDiscard(@Nullable Context target, Consumer<?> discardConsumer)
Flux.doOnDiscard(Class, Consumer)
)
in a target Context
. Prefer using the Flux
API, and reserve this for
testing purposes.public static <T> void onDiscard(@Nullable T element, Context context)
filter()
predicate).
For elements that are buffered or enqueued, but subsequently discarded due to
cancellation or error, see onDiscardMultiple(Stream, Context)
and
onDiscardQueueWithClear(Queue, Context, Function)
.
T
- the type of the elementelement
- the element that is being discardedcontext
- the context in which to look for a local hookonDiscardMultiple(Stream, Context)
,
onDiscardMultiple(Collection, Context)
,
onDiscardQueueWithClear(Queue, Context, Function)
public static <T> void onDiscardQueueWithClear(@Nullable Queue<T> queue, Context context, @Nullable Function<T,Stream<?>> extract)
Queue
(either by repeated Queue.poll()
calls if
a hook is defined, or by Collection.clear()
as a shortcut if no hook is defined).T
- the type of the elementqueue
- the queue that is being discarded and clearedcontext
- the context in which to look for a local hookextract
- an optional extractor method for cases where the queue doesn't
directly contain the elements to discardonDiscardMultiple(Stream, Context)
,
onDiscardMultiple(Collection, Context)
,
onDiscard(Object, Context)
public static void onDiscardMultiple(Stream<?> multiple, Context context)
multiple
- the collection of elements to discard (possibly extracted from other
collections/arrays/queues)context
- the Context
in which to look for local hookonDiscard(Object, Context)
,
onDiscardMultiple(Collection, Context)
,
onDiscardQueueWithClear(Queue, Context, Function)
public static void onDiscardMultiple(@Nullable Collection<?> multiple, Context context)
multiple
- the collection of elements to discardcontext
- the Context
in which to look for local hookonDiscard(Object, Context)
,
onDiscardMultiple(Stream, Context)
,
onDiscardQueueWithClear(Queue, Context, Function)
public static void onDiscardMultiple(@Nullable Iterator<?> multiple, boolean knownToBeFinite, Context context)
Iterator
.
Since iterators can be infinite, this method requires that you explicitly ensure the iterator is
knownToBeFinite
. Typically, operating on an Iterable
one can get such a
guarantee by looking at the Spliterator's
Spliterator.getExactSizeIfKnown()
.multiple
- the Iterator
whose remainder to discardknownToBeFinite
- is the caller guaranteeing that the iterator is finite and can be iterated overcontext
- the Context
in which to look for local hookonDiscard(Object, Context)
,
onDiscardMultiple(Collection, Context)
,
onDiscardQueueWithClear(Queue, Context, Function)
public static void onDiscardMultiple(@Nullable Spliterator<?> multiple, boolean knownToBeFinite, Context context)
Spliterator
.
Since spliterators can be infinite, this method requires that you explicitly ensure the spliterator is
knownToBeFinite
. Typically, one can get such a guarantee by looking at the Spliterator.getExactSizeIfKnown()
.multiple
- the Spliterator
whose remainder to discardknownToBeFinite
- is the caller guaranteeing that the iterator is finite and can be iterated overcontext
- the Context
in which to look for local hookonDiscard(Object, Context)
,
onDiscardMultiple(Collection, Context)
,
onDiscardQueueWithClear(Queue, Context, Function)
public static void onErrorDropped(Throwable e, Context context)
If no hook is registered for Hooks.onErrorDropped(Consumer)
, the dropped
error is logged at ERROR level.
e
- the dropped exceptioncontext
- a context that might hold a local error consumerpublic static <T> void onNextDropped(T t, Context context)
If no hook is registered for Hooks.onNextDropped(Consumer)
, the dropped
element is just logged at DEBUG level.
T
- the dropped value typet
- the dropped datacontext
- a context that might hold a local next consumerpublic static Throwable onOperatorError(Throwable error, Context context)
Exceptions.throwIfFatal(Throwable)
.error
- the callback or operator errorcontext
- a context that might hold a local error consumerThrowable
public static Throwable onOperatorError(@Nullable Subscription subscription, Throwable error, Context context)
Subscription
. The
result error will be passed via onError to the operator downstream.
Subscription
will be cancelled after checking for fatal error via
Exceptions.throwIfFatal(Throwable)
.subscription
- the linked operator parent Subscription
error
- the callback or operator errorcontext
- a context that might hold a local error consumerThrowable
public static Throwable onOperatorError(@Nullable Subscription subscription, Throwable error, @Nullable Object dataSignal, Context context)
Subscription
. The
result error will be passed via onError to the operator downstream.
Subscription
will be cancelled after checking for fatal error via
Exceptions.throwIfFatal(Throwable)
. Takes an additional signal, which
can be added as a suppressed exception if it is a Throwable
and the
default hook
is in place.subscription
- the linked operator parent Subscription
error
- the callback or operator errordataSignal
- the value (onNext or onError) signal processed during failurecontext
- a context that might hold a local error consumerThrowable
public static RuntimeException onRejectedExecution(Throwable original, Context context)
RejectedExecutionException
which can be thrown by the
operator. This exception denotes that an execution was rejected by a
Scheduler
, notably when it was already disposed.
Wrapping is done by calling both Exceptions.failWithRejected(Throwable)
and
onOperatorError(Subscription, Throwable, Object, Context)
.
original
- the original execution errorcontext
- a context that might hold a local error consumerpublic static final BiFunction<? super Throwable,Object,? extends Throwable> onNextErrorFunction(Context context)
@Nullable public static <T> Throwable onNextError(@Nullable T value, Throwable error, Context context, Subscription subscriptionForCancel)
OnNextFailureStrategy
to apply to the calling operator (which could be a local
error mode defined in the Context
) and apply it. For poll(), prefer
onNextPollError(Object, Throwable, Context)
as it returns a RuntimeException
.
Cancels the Subscription
and return a Throwable
if errors are
fatal for the error mode, in which case the operator should call onError with the
returned error. On the contrary, if the error mode allows the sequence to
continue, does not cancel the Subscription and returns null
.
Typical usage pattern differs depending on the calling method:
onNext
: check for a throwable return value and call
Subscriber.onError(Throwable)
if not null, otherwise perform a direct
request(1)
on the upstream.tryOnNext
: check for a throwable return value and call
Subscriber.onError(Throwable)
if not null, otherwise
return false
to indicate value was not consumed and more must be
tried.onNextError(Object, Throwable, Context)
instead.poll
(where the error will be thrown): use onNextPollError(Object, Throwable, Context)
instead.T
- The type of the value causing the error.value
- The onNext value that caused an error. Can be null.error
- The error.context
- The most significant Context
in which to look for an OnNextFailureStrategy
.subscriptionForCancel
- The mandatory Subscription
that should be cancelled if the
strategy is terminal. See also onNextError(Object, Throwable, Context)
and
onNextPollError(Object, Throwable, Context)
for alternatives that don't cancel a subscriptionThrowable
to propagate through onError if the strategy is
terminal and cancelled the subscription, null if not.@Nullable public static <T> Throwable onNextError(@Nullable T value, Throwable error, Context context)
OnNextFailureStrategy
to apply to the calling async operator (which could be
a local error mode defined in the Context
) and apply it.
This variant never cancels a Subscription
. It returns a Throwable
if the error is
fatal for the error mode, in which case the operator should call onError with the
returned error. On the contrary, if the error mode allows the sequence to
continue, this method returns null
.
T
- The type of the value causing the error.value
- The onNext value that caused an error.error
- The error.context
- The most significant Context
in which to look for an OnNextFailureStrategy
.Throwable
to propagate through onError if the strategy is terminal, null if not.onNextError(Object, Throwable, Context, Subscription)
public static <T> Throwable onNextInnerError(Throwable error, Context context, @Nullable Subscription subscriptionForCancel)
OnNextFailureStrategy
to apply to the calling operator (which could be a local
error mode defined in the Context
) and apply it.T
- The type of the value causing the error.error
- The error.context
- The most significant Context
in which to look for an OnNextFailureStrategy
.subscriptionForCancel
- The Subscription
that should be cancelled if the
strategy is terminal. Null to ignore (for poll, use onNextPollError(Object, Throwable, Context)
rather than passing null).Throwable
to propagate through onError if the strategy is
terminal and cancelled the subscription, null if not.@Nullable public static <T> RuntimeException onNextPollError(@Nullable T value, Throwable error, Context context)
OnNextFailureStrategy
to apply to the calling async operator (which could be
a local error mode defined in the Context
) and apply it.
Returns a RuntimeException
if the error is fatal for the error mode, in which
case the operator poll should throw the returned error. On the contrary if the
error mode allows the sequence to continue, returns null
in which case
the operator should retry the poll()
.
Note that this method wraps
checked exceptions in order to
return a RuntimeException
that can be thrown from an arbitrary method. If you don't want to
throw the returned exception and this wrapping behavior is undesirable, but you still don't want to
cancel a subscription, you can use onNextError(Object, Throwable, Context)
instead.
T
- The type of the value causing the error.value
- The onNext value that caused an error.error
- The error.context
- The most significant Context
in which to look for an OnNextFailureStrategy
.RuntimeException
to be thrown (eg. within Queue.poll()
if the error is terminal in
the strategy, null if not.onNextError(Object, Throwable, Context)
public static <T> CorePublisher<T> onLastAssembly(CorePublisher<T> source)
Hooks.onLastOperator(java.util.function.Function<? super org.reactivestreams.Publisher<java.lang.Object>, ? extends org.reactivestreams.Publisher<java.lang.Object>>)
and returns
CorePublisher
ready to be subscribed on.T
- the type of the value.source
- the original CorePublisher
.CorePublisher
to subscribe on.public static RuntimeException onRejectedExecution(Throwable original, @Nullable Subscription subscription, @Nullable Throwable suppressed, @Nullable Object dataSignal, Context context)
RejectedExecutionException
which can be thrown by the
operator. This exception denotes that an execution was rejected by a
Scheduler
, notably when it was already disposed.
Wrapping is done by calling both Exceptions.failWithRejected(Throwable)
and
onOperatorError(Subscription, Throwable, Object, Context)
(with the passed
Subscription
).
original
- the original execution errorsubscription
- the subscription to pass to onOperatorError.suppressed
- a Throwable to be suppressed by the RejectedExecutionException
(or null if not relevant)dataSignal
- a value to be passed to onOperatorError(Subscription, Throwable, Object, Context)
(or null if not relevant)context
- a context that might hold a local error consumerpublic static <T> long produced(AtomicLongFieldUpdater<T> updater, T instance, long toSub)
T
- the parent instance typeupdater
- current field updaterinstance
- current instance to updatetoSub
- delta to subtractpublic static <F> boolean replace(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
cancelledSubscription()
).F
- the instance typefield
- The Atomic containerinstance
- the instance references
- the subscriptionpublic static void reportBadRequest(long n)
IllegalArgumentException
if the request is null or negative.n
- the failing demandExceptions.nullOrNegativeRequestException(long)
public static void reportMoreProduced()
IllegalStateException
that indicates more than the requested
amount was produced.Exceptions.failWithOverflow()
public static void reportSubscriptionSet()
duplicate subscription
error.public static <T> Subscription scalarSubscription(CoreSubscriber<? super T> subscriber, T value)
T
- the value typesubscriber
- the delegate Subscriber
that will be requesting the valuevalue
- the single value to be emittedSubscription
public static <T> Subscription scalarSubscription(CoreSubscriber<? super T> subscriber, T value, String stepName)
stepName
for the purpose of Scannable.stepName()
.T
- the value typesubscriber
- the delegate Subscriber
that will be requesting the valuevalue
- the single value to be emittedstepName
- the String
to represent the Subscription
in Scannable.stepName()
Subscription
public static <T> CoreSubscriber<T> serialize(CoreSubscriber<? super T> subscriber)
Subscriber
by making sure onNext signals are delivered
sequentially (serialized).
Serialization uses thread-stealing and a potentially unbounded queue that might
starve a calling thread if races are too important and Subscriber
is slower.
T
- the relayed typesubscriber
- the subscriber to serializeSubscriber
public static <F> boolean set(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
cancelledSubscription()
)
or was concurrently updated before.
The replaced subscription is itself cancelled.
F
- the instance typefield
- The Atomic containerinstance
- the instance references
- the subscriptionpublic static <F> boolean setOnce(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
If the field already has a subscription, it is cancelled and the duplicate
subscription is reported (see reportSubscriptionSet()
).
F
- the instance type containing the fieldfield
- the field accessorinstance
- the parent instances
- the subscription to set oncepublic static long subOrZero(long a, long b)
a
- left operandb
- right operandpublic static <F> boolean terminate(AtomicReferenceFieldUpdater<F,Subscription> field, F instance)
cancelledSubscription()
, cancelling the subscription and setting the field
to the singleton cancelledSubscription()
.F
- the instance type containing the fieldfield
- the field accessorinstance
- the parent instancepublic static boolean validate(@Nullable Subscription current, Subscription next)
current
- current Subscription, expected to be nullnext
- new Subscriptionpublic static boolean validate(long n)
reportBadRequest(long)
n
- the request valuepublic static <T> CoreSubscriber<? super T> toCoreSubscriber(Subscriber<? super T> actual)
Subscriber
is not a CoreSubscriber
, it will apply
safe strict wrapping to apply all reactive streams rules including the ones
relaxed by internal operators based on CoreSubscriber
.T
- passed subscriber typeactual
- the Subscriber
to apply hook onSubscriber
public static <T> Fuseable.ConditionalSubscriber<? super T> toConditionalSubscriber(CoreSubscriber<? super T> actual)
CoreSubscriber
is not Fuseable.ConditionalSubscriber
,
it will apply an adapter which directly maps all
Fuseable.ConditionalSubscriber.tryOnNext(Object)
to
Subscriber.onNext(Object)
and always returns true as the resultT
- passed subscriber typeactual
- the Subscriber
to adaptFuseable.ConditionalSubscriber