public abstract class Operators extends Object
Subscription.request(long) handling.
Combines utilities available to operator implementations; see http://github.com/reactor/reactive-streams-commons
Modifier and Type | Class and Description |
---|---|
static class | Operators.DeferredSubscription: Base class for Subscribers that will receive their Subscriptions at any time, yet they might also need to be cancelled or requested at any time. |
static class | Operators.MonoSubscriber<I,O>: A Subscriber/Subscription barrier that holds a single value at most and properly gates asynchronous behaviors resulting from concurrent request or cancel and onXXX signals. |
Modifier and Type | Method and Description |
---|---|
static <T> long | addCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd): Concurrent addition bound to Long.MAX_VALUE. |
static long | addCap(long a, long b): Cap an addition to Long.MAX_VALUE. |
static <T> Fuseable.QueueSubscription<T> | as(Subscription s): Returns the subscription as QueueSubscription if possible or null. |
static Subscription | cancelledSubscription(): A singleton Subscription that represents a cancelled subscription instance and should not be leaked to clients as it represents a terminal state. |
static void | complete(Subscriber<?> s): Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onComplete. |
static <T> CoreSubscriber<T> | drainSubscriber(): Returns a singleton Subscriber that does not check for double onSubscribe and simply requests Long.MAX_VALUE. |
static <T> CoreSubscriber<T> | emptySubscriber(): A Subscriber that is expected to be used as a placeholder and never actually be called. |
static Subscription | emptySubscription(): A singleton enumeration that represents a no-op Subscription instance that can be freely given out to clients. |
static void | error(Subscriber<?> s, Throwable e): Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onError with the supplied error. |
static <I,O> Function<? super Publisher<I>,? extends Publisher<O>> | lift(BiFunction<Scannable,? super CoreSubscriber<? super O>,? extends CoreSubscriber<? super I>> lifter): Create a function that can be used to support a custom operator via CoreSubscriber decoration. |
static <O> Function<? super Publisher<O>,? extends Publisher<O>> | lift(Predicate<Scannable> filter, BiFunction<Scannable,? super CoreSubscriber<? super O>,? extends CoreSubscriber<? super O>> lifter): Create a function that can be used to support a custom operator via CoreSubscriber decoration. |
static long | multiplyCap(long a, long b): Cap a multiplication to Long.MAX_VALUE. |
static void | onErrorDropped(Throwable e, Context context): An unexpected exception is about to be dropped. |
static <T> void | onNextDropped(T t, Context context): An unexpected event is about to be dropped. |
static Throwable | onOperatorError(Subscription subscription, Throwable error, Context context): Map an "operator" error given an operator parent Subscription. |
static Throwable | onOperatorError(Subscription subscription, Throwable error, Object dataSignal, Context context): Map an "operator" error given an operator parent Subscription. |
static Throwable | onOperatorError(Throwable error, Context context): Map an "operator" error. |
static RuntimeException | onRejectedExecution(Throwable original, Context context): Return a wrapped RejectedExecutionException which can be thrown by the operator. |
static RuntimeException | onRejectedExecution(Throwable original, Subscription subscription, Throwable suppressed, Object dataSignal, Context context): Return a wrapped RejectedExecutionException which can be thrown by the operator. |
static <T> long | produced(AtomicLongFieldUpdater<T> updater, T instance, long toSub): Concurrent subtraction bound to 0, mostly used to decrement a request tracker by the amount produced by the operator. |
static <F> boolean | replace(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s): A generic utility to atomically replace a subscription or cancel the replacement if the current subscription is marked as already cancelled (as in cancelledSubscription()). |
static void | reportBadRequest(long n): Log an IllegalArgumentException if the request is zero or negative. |
static void | reportMoreProduced(): Log an IllegalStateException that indicates more than the requested amount was produced. |
static void | reportSubscriptionSet(): Log a duplicate subscription error. |
static <T> Subscription | scalarSubscription(CoreSubscriber<? super T> subscriber, T value): Represents a fuseable Subscription that emits a single constant value synchronously to a Subscriber or consumer. |
static <T> CoreSubscriber<T> | serialize(CoreSubscriber<? super T> subscriber): Safely gate a Subscriber by making sure onNext signals are delivered sequentially (serialized). |
static <F> boolean | set(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s): A generic utility to atomically replace a subscription or cancel the replacement if the current subscription is marked as cancelled (as in cancelledSubscription()) or was concurrently updated before. |
static <F> boolean | setOnce(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s): Sets the given subscription once and returns true if successful, false if the field has a subscription already or has been cancelled. |
static long | subOrZero(long a, long b): Cap a subtraction to 0. |
static <F> boolean | terminate(AtomicReferenceFieldUpdater<F,Subscription> field, F instance): Atomically terminates the subscription if it is not already a cancelledSubscription(), cancelling the subscription and setting the field to the singleton cancelledSubscription(). |
static <T> CoreSubscriber<? super T> | toCoreSubscriber(Subscriber<? super T> actual): If the actual Subscriber is not a CoreSubscriber, it will apply safe strict wrapping to apply all reactive streams rules including the ones relaxed by internal operators based on CoreSubscriber. |
static boolean | validate(long n): Evaluates whether a request is strictly positive; otherwise calls reportBadRequest(long). |
static boolean | validate(Subscription current, Subscription next): Check Subscription current state and cancel new Subscription if current is set, or return true if ready to subscribe. |
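The capped arithmetic helpers listed in the table (addCap(long, long), multiplyCap and subOrZero) can be illustrated with a small stand-alone model. This is only a sketch of the saturating behavior the descriptions imply, assuming non-negative operands as is typical for request amounts; the class name CappedMath is hypothetical and the code is not taken from the library.

```java
/** Stand-alone model of capped long arithmetic; hypothetical class, not the library source. */
final class CappedMath {
    private CappedMath() {
    }

    /** Adds two non-negative longs, saturating at Long.MAX_VALUE on overflow. */
    static long addCap(long a, long b) {
        long sum = a + b;
        // With non-negative inputs, an overflowing sum wraps around to a negative value.
        return sum < 0L ? Long.MAX_VALUE : sum;
    }

    /** Multiplies two non-negative longs, saturating at Long.MAX_VALUE on overflow. */
    static long multiplyCap(long a, long b) {
        try {
            return Math.multiplyExact(a, b);
        } catch (ArithmeticException overflow) {
            return Long.MAX_VALUE;
        }
    }

    /** Subtracts b from a, never going below zero. */
    static long subOrZero(long a, long b) {
        long diff = a - b;
        return diff < 0L ? 0L : diff;
    }
}
```

Saturating instead of wrapping matters because Long.MAX_VALUE is conventionally treated as unbounded demand, so an overflowing request total should stay at that value rather than turn negative.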
public static long addCap(long a, long b)
Cap an addition to Long.MAX_VALUE.
a - left operand
b - right operand

public static <T> long addCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd)
Concurrent addition bound to Long.MAX_VALUE.
T - the parent instance type
updater - current field updater
instance - current instance to update
toAdd - delta to add

@Nullable public static <T> Fuseable.QueueSubscription<T> as(Subscription s)
Returns the subscription as QueueSubscription if possible or null.
T - the value type of the QueueSubscription
s - the source subscription to try to convert

public static Subscription cancelledSubscription()
A singleton Subscription that represents a cancelled subscription instance and should not be leaked to clients as it represents a terminal state.
Returns: a Subscription to be used as an inner representation of the cancelled state

public static void complete(Subscriber<?> s)
Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onComplete.
s - the target subscriber

public static <T> CoreSubscriber<T> drainSubscriber()
Returns a singleton Subscriber that does not check for double onSubscribe and simply requests Long.MAX_VALUE. If an error is received, it will raise an Exceptions.errorCallbackNotImplemented(Throwable) in the receiving thread.
Returns: a Subscriber whose sole purpose is to request Long.MAX_VALUE

public static <T> CoreSubscriber<T> emptySubscriber()
A Subscriber that is expected to be used as a placeholder and never actually be called. All methods log an error.
T - the type of data (ignored)

public static Subscription emptySubscription()
A singleton enumeration that represents a no-op Subscription instance that can be freely given out to clients.
The enum also implements Fuseable.QueueSubscription so operators expecting a QueueSubscription from a Fuseable source don't have to double-check the Subscription they receive in onSubscribe.
Returns: a Subscription
public static void error(Subscriber<?> s, Throwable e)
Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onError with the supplied error.
s - target Subscriber to error
e - the actual error

public static <I,O> Function<? super Publisher<I>,? extends Publisher<O>> lift(BiFunction<Scannable,? super CoreSubscriber<? super O>,? extends CoreSubscriber<? super I>> lifter)
Create a function that can be used to support a custom operator via CoreSubscriber decoration. The function is compatible with Flux.transform(Function), Mono.transform(Function), Hooks.onEachOperator(Function) and Hooks.onLastOperator(Function).
I - the input type
O - the output type
lifter - the bifunction taking Scannable from the enclosing publisher and consuming CoreSubscriber. It must return a receiving CoreSubscriber that will immediately subscribe to the applied Publisher.
Returns: a Function
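The idea behind lift, turning a subscriber decorator into a publisher-to-publisher function, can be sketched with the JDK's java.util.concurrent.Flow interfaces standing in for the Reactive Streams and Reactor types. Everything named here (LiftSketch, MappingSubscriber) is hypothetical, and the sketch leaves out the Scannable argument that the real lifter receives; it only shows the shape of the decoration.

```java
import java.util.concurrent.Flow;
import java.util.function.Function;

/** Hypothetical model of subscriber decoration; uses JDK Flow types, not Reactor types. */
final class LiftSketch {
    private LiftSketch() {
    }

    /**
     * Builds a function that, given a source publisher, returns a publisher whose
     * subscribers are wrapped by the lifter before being handed to the source.
     */
    static <I, O> Function<Flow.Publisher<I>, Flow.Publisher<O>> lift(
            Function<Flow.Subscriber<? super O>, Flow.Subscriber<? super I>> lifter) {
        return source -> downstream -> source.subscribe(lifter.apply(downstream));
    }

    /** A decorating subscriber that converts each element before forwarding it. */
    static final class MappingSubscriber<I, O> implements Flow.Subscriber<I> {
        private final Flow.Subscriber<? super O> actual;
        private final Function<? super I, ? extends O> mapper;

        MappingSubscriber(Flow.Subscriber<? super O> actual, Function<? super I, ? extends O> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            actual.onSubscribe(subscription);
        }

        @Override
        public void onNext(I item) {
            actual.onNext(mapper.apply(item));
        }

        @Override
        public void onError(Throwable throwable) {
            actual.onError(throwable);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }
    }
}
```

In this model the returned function plays the role of the value passed to a transform-style method: applying it to a source yields a new publisher, and the decorator only runs when something subscribes.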
public static <O> Function<? super Publisher<O>,? extends Publisher<O>> lift(Predicate<Scannable> filter, BiFunction<Scannable,? super CoreSubscriber<? super O>,? extends CoreSubscriber<? super O>> lifter)
Create a function that can be used to support a custom operator via CoreSubscriber decoration. The function is compatible with Flux.transform(Function), Mono.transform(Function), Hooks.onEachOperator(Function) and Hooks.onLastOperator(Function).
The function will be invoked only if the passed Predicate matches. Therefore the transformed type O must be the same as the input type, since an unmatched predicate will return the applied Publisher.
O - the input and output type
filter - the predicate to match, taking Scannable from the applied publisher to operate on
lifter - the bifunction taking Scannable from the enclosing publisher and consuming CoreSubscriber. It must return a receiving CoreSubscriber that will immediately subscribe to the applied Publisher.
Returns: a Function
public static long multiplyCap(long a, long b)
Cap a multiplication to Long.MAX_VALUE.
a - left operand
b - right operand

public static void onErrorDropped(Throwable e, Context context)
An unexpected exception is about to be dropped.
If no hook is registered for Hooks.onErrorDropped(Consumer), the dropped error is logged at ERROR level and thrown (via Exceptions.bubble(Throwable)).
e - the dropped exception
context - a context that might hold a local error consumer

public static <T> void onNextDropped(T t, Context context)
An unexpected event is about to be dropped.
If no hook is registered for Hooks.onNextDropped(Consumer), the dropped element is just logged at DEBUG level.
T - the dropped value type
t - the dropped data
context - a context that might hold a local next consumer

public static Throwable onOperatorError(Throwable error, Context context)
Map an "operator" error, checking for fatal errors via Exceptions.throwIfFatal(Throwable).
error - the callback or operator error
context - a context that might hold a local error consumer
Returns: a Throwable
public static Throwable onOperatorError(@Nullable Subscription subscription, Throwable error, Context context)
Map an "operator" error given an operator parent Subscription. The resulting error will be passed via onError to the operator downstream. The Subscription will be cancelled after checking for fatal error via Exceptions.throwIfFatal(Throwable).
subscription - the linked operator parent Subscription
error - the callback or operator error
context - a context that might hold a local error consumer
Returns: a Throwable
public static Throwable onOperatorError(@Nullable Subscription subscription, Throwable error, @Nullable Object dataSignal, Context context)
Map an "operator" error given an operator parent Subscription. The resulting error will be passed via onError to the operator downstream. The Subscription will be cancelled after checking for fatal error via Exceptions.throwIfFatal(Throwable). Takes an additional signal, which can be added as a suppressed exception if it is a Throwable and the default hook is in place.
subscription - the linked operator parent Subscription
error - the callback or operator error
dataSignal - the value (onNext or onError) signal processed during failure
context - a context that might hold a local error consumer
Returns: a Throwable
public static RuntimeException onRejectedExecution(Throwable original, Context context)
Return a wrapped RejectedExecutionException which can be thrown by the operator. This exception denotes that an execution was rejected by a Scheduler, notably when it was already disposed.
Wrapping is done by calling both Exceptions.bubble(Throwable) and onOperatorError(Subscription, Throwable, Object, Context).
original - the original execution error
context - a context that might hold a local error consumer

public static RuntimeException onRejectedExecution(Throwable original, @Nullable Subscription subscription, @Nullable Throwable suppressed, @Nullable Object dataSignal, Context context)
Return a wrapped RejectedExecutionException which can be thrown by the operator. This exception denotes that an execution was rejected by a Scheduler, notably when it was already disposed.
Wrapping is done by calling both Exceptions.failWithRejected(Throwable) and onOperatorError(Subscription, Throwable, Object, Context) (with the passed Subscription).
original - the original execution error
subscription - the subscription to pass to onOperatorError
suppressed - a Throwable to be suppressed by the RejectedExecutionException (or null if not relevant)
dataSignal - a value to be passed to onOperatorError(Subscription, Throwable, Object, Context) (or null if not relevant)
context - a context that might hold a local error consumer

public static <T> long produced(AtomicLongFieldUpdater<T> updater, T instance, long toSub)
Concurrent subtraction bound to 0, mostly used to decrement a request tracker by the amount produced by the operator.
T - the parent instance type
updater - current field updater
instance - current instance to update
toSub - delta to subtract

public static <F> boolean replace(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
A generic utility to atomically replace a subscription or cancel the replacement if the current subscription is marked as already cancelled (as in cancelledSubscription()).
F - the instance type
field - the Atomic container
instance - the instance reference
s - the subscription

public static void reportBadRequest(long n)
Log an IllegalArgumentException if the request is zero or negative.
n - the failing demand
See also: Exceptions.nullOrNegativeRequestException(long)
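The relationship between validate(long) and reportBadRequest(long) in the summary (a request must be strictly positive, otherwise it is reported) can be modeled as below. This is a sketch under stated assumptions: RequestValidation and its REPORTED list are hypothetical, and the list stands in for whatever logging the library performs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of request validation; the REPORTED list stands in for logging. */
final class RequestValidation {
    /** Collects reported errors so the behavior can be observed in this sketch. */
    static final List<Throwable> REPORTED = new ArrayList<>();

    private RequestValidation() {
    }

    /** Records an IllegalArgumentException for a zero or negative request amount. */
    static void reportBadRequest(long n) {
        REPORTED.add(new IllegalArgumentException(
                "request amount must be strictly positive, but was " + n));
    }

    /** Returns true for a strictly positive request; otherwise reports it and returns false. */
    static boolean validate(long n) {
        if (n <= 0L) {
            reportBadRequest(n);
            return false;
        }
        return true;
    }
}
```

A Subscription.request(long) implementation would typically guard its body with such a check and return early when it yields false.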
public static void reportMoreProduced()
Log an IllegalStateException that indicates more than the requested amount was produced.
See also: Exceptions.failWithOverflow()
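Producing more than was requested is only detectable if the operator tracks outstanding demand. The sketch below models that bookkeeping with an AtomicLongFieldUpdater, in the spirit of the concurrent addCap and produced helpers. DemandTracker is a hypothetical class; treating Long.MAX_VALUE as unbounded demand that is never decremented, and returning the updated value from both methods, are assumptions of this sketch rather than documented behavior.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

/** Hypothetical demand tracker; a model of capped concurrent request accounting. */
final class DemandTracker {
    private volatile long requested;

    private static final AtomicLongFieldUpdater<DemandTracker> REQUESTED =
            AtomicLongFieldUpdater.newUpdater(DemandTracker.class, "requested");

    /** Adds positive demand n, saturating at Long.MAX_VALUE; returns the updated demand. */
    long request(long n) {
        for (;;) {
            long current = REQUESTED.get(this);
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE; // already unbounded
            }
            long sum = current + n;
            long next = sum < 0L ? Long.MAX_VALUE : sum;
            if (REQUESTED.compareAndSet(this, current, next)) {
                return next;
            }
            // Lost a race with another thread; re-read and retry.
        }
    }

    /** Subtracts n produced items, flooring at zero; returns the remaining demand. */
    long produced(long n) {
        for (;;) {
            long current = REQUESTED.get(this);
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE; // assumption: unbounded demand is never decremented
            }
            long next = Math.max(0L, current - n);
            if (REQUESTED.compareAndSet(this, current, next)) {
                return next;
            }
        }
    }
}
```

The compare-and-set loop is what makes the update safe when request and emission happen on different threads.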
public static void reportSubscriptionSet()
Log a duplicate subscription error.

public static <T> Subscription scalarSubscription(CoreSubscriber<? super T> subscriber, T value)
Represents a fuseable Subscription that emits a single constant value synchronously to a Subscriber or consumer.
T - the value type
subscriber - the delegate Subscriber that will be requesting the value
value - the single value to be emitted
Returns: a Subscription
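A subscription that emits one constant value on demand can be sketched with the JDK's Flow types as stand-ins. SingleValueSubscription is a hypothetical name, and the sketch ignores the fusion (QueueSubscription) aspect mentioned above; it only shows the emit-once-then-complete behavior.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical single-value subscription built on JDK Flow types; no fusion support. */
final class SingleValueSubscription<T> implements Flow.Subscription {
    private final Flow.Subscriber<? super T> subscriber;
    private final T value;
    // Flips to true on the first emission or on cancel, whichever comes first.
    private final AtomicBoolean once = new AtomicBoolean();

    SingleValueSubscription(Flow.Subscriber<? super T> subscriber, T value) {
        this.subscriber = subscriber;
        this.value = value;
    }

    @Override
    public void request(long n) {
        // Only the first strictly positive request emits; later requests are no-ops.
        if (n > 0L && once.compareAndSet(false, true)) {
            subscriber.onNext(value);
            subscriber.onComplete();
        }
    }

    @Override
    public void cancel() {
        once.set(true); // prevents any later emission
    }
}
```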
public static <T> CoreSubscriber<T> serialize(CoreSubscriber<? super T> subscriber)
Safely gate a Subscriber by making sure onNext signals are delivered sequentially (serialized).
Serialization uses thread-stealing and a potentially unbounded queue that might starve a calling thread if contention is heavy and the Subscriber is slow.
T - the relayed type
subscriber - the subscriber to serialize
Returns: a Subscriber
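One common way to serialize concurrent onNext calls with thread-stealing and an unbounded queue is the queue-drain pattern: every caller enqueues its value, and whichever caller finds no drain in progress delivers everything on behalf of the others. The sketch below shows that pattern with plain JDK classes; SerializingGate is hypothetical, and this is not claimed to be how serialize is implemented internally.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical queue-drain gate that delivers values to the downstream one at a time. */
final class SerializingGate<T> {
    private final Consumer<? super T> downstream;
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    // Work-in-progress counter: non-zero means some thread is currently draining.
    private final AtomicInteger wip = new AtomicInteger();

    SerializingGate(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    void onNext(T value) {
        queue.offer(value);
        if (wip.getAndIncrement() != 0) {
            return; // another thread is draining and will deliver this value
        }
        int missed = 1;
        for (;;) {
            T next;
            while ((next = queue.poll()) != null) {
                downstream.accept(next);
            }
            // Account for the work seen so far; loop again if more arrived meanwhile.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
```

The starvation risk mentioned above is visible here: the draining thread keeps looping for as long as other threads keep enqueueing faster than the downstream consumes.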
public static <F> boolean set(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
A generic utility to atomically replace a subscription or cancel the replacement if the current subscription is marked as cancelled (as in cancelledSubscription()) or was concurrently updated before.
The replaced subscription is itself cancelled.
F - the instance type
field - the Atomic container
instance - the instance reference
s - the subscription

public static <F> boolean setOnce(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
Sets the given subscription once and returns true if successful, false if the field has a subscription already or has been cancelled.
If the field already has a subscription, it is cancelled and the duplicate subscription is reported (see reportSubscriptionSet()).
F - the instance type containing the field
field - the field accessor
instance - the parent instance
s - the subscription to set once

public static long subOrZero(long a, long b)
Cap a subtraction to 0.
a - left operand
b - right operand

public static <F> boolean terminate(AtomicReferenceFieldUpdater<F,Subscription> field, F instance)
Atomically terminates the subscription if it is not already a cancelledSubscription(), cancelling the subscription and setting the field to the singleton cancelledSubscription().
F - the instance type containing the field
field - the field accessor
instance - the parent instance

public static boolean validate(@Nullable Subscription current, Subscription next)
Check Subscription current state and cancel new Subscription if current is set, or return true if ready to subscribe.
current - current Subscription, expected to be null
next - new Subscription

public static boolean validate(long n)
Evaluates whether a request is strictly positive; otherwise calls reportBadRequest(long).
n - the request value

public static <T> CoreSubscriber<? super T> toCoreSubscriber(Subscriber<? super T> actual)
If the actual Subscriber is not a CoreSubscriber, it will apply safe strict wrapping to apply all reactive streams rules including the ones relaxed by internal operators based on CoreSubscriber.
T - passed subscriber type
actual - the Subscriber to apply hook on
Returns: a Subscriber
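The subscription-field helpers described above (setOnce and terminate in particular) revolve around one idea: a volatile field updated atomically, with a sentinel instance marking the cancelled state. The sketch below models that with an AtomicReferenceFieldUpdater and the JDK's Flow.Subscription as a stand-in type. SubscriptionHolder and its CANCELLED sentinel are hypothetical, and the sketch skips the duplicate-subscription reporting step.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/** Hypothetical holder modeling set-once and terminate semantics on a subscription field. */
final class SubscriptionHolder {
    /** Sentinel marking the terminal, cancelled state; never handed to clients. */
    static final Flow.Subscription CANCELLED = new Flow.Subscription() {
        @Override
        public void request(long n) {
        }

        @Override
        public void cancel() {
        }
    };

    private volatile Flow.Subscription subscription;

    private static final AtomicReferenceFieldUpdater<SubscriptionHolder, Flow.Subscription> FIELD =
            AtomicReferenceFieldUpdater.newUpdater(
                    SubscriptionHolder.class, Flow.Subscription.class, "subscription");

    /** Stores s if the field is still empty; otherwise cancels s and returns false. */
    boolean setOnce(Flow.Subscription s) {
        if (FIELD.compareAndSet(this, null, s)) {
            return true;
        }
        s.cancel(); // field already set or terminated: the newcomer is rejected
        return false;
    }

    /** Swaps in the sentinel and cancels whatever was held; false if already terminated. */
    boolean terminate() {
        Flow.Subscription previous = FIELD.getAndSet(this, CANCELLED);
        if (previous == CANCELLED) {
            return false;
        }
        if (previous != null) {
            previous.cancel();
        }
        return true;
    }
}
```

Because the sentinel occupies the field after termination, a late setOnce fails and cancels its argument, which is how a subscription arriving after cancellation gets cleaned up in this model.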