public abstract class Operators extends Object
Subscription.request(long) handling.
Combine utils available to operator implementations, @see http://github.com/reactor/reactive-streams-commons| Modifier and Type | Class and Description |
|---|---|
static class |
Operators.DeferredSubscription
Base class for Subscribers that will receive their Subscriptions at any time, yet
they might also need to be cancelled or requested at any time.
|
static class |
Operators.MonoSubscriber<I,O>
A Subscriber/Subscription barrier that holds a single value at most and properly gates asynchronous behaviors
resulting from concurrent request or cancel and onXXX signals.
|
| Modifier and Type | Method and Description |
|---|---|
static <T> long |
addAndGet(AtomicLongFieldUpdater<T> updater,
T instance,
long n)
Concurrent addition bound to Long.MAX_VALUE.
|
static long |
addCap(long a,
long b)
Cap an addition to Long.MAX_VALUE
|
static <T> Fuseable.QueueSubscription<T> |
as(Subscription s)
Returns the subscription as QueueSubscription if possible or null.
|
static Subscription |
cancelledSubscription()
A singleton Subscription that represents a cancelled subscription instance and
should not be leaked to clients as it represents a terminal state.
|
static void |
checkRequest(long n)
Throws an exception if request is 0 or negative as specified in rule 3.09 of Reactive Streams
|
static boolean |
checkRequest(long n,
Subscriber<?> subscriber)
Propagate an exception to a subscriber if request is 0 or negative,
as specified in rule 3.09 of Reactive Streams
|
static void |
complete(Subscriber<?> s)
Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onComplete.
|
static <T> Subscriber<T> |
drainSubscriber()
Return a singleton
Subscriber that does not check for double onSubscribe
and purely request Long.MAX. |
static Subscription |
emptySubscription()
A singleton enumeration that represents a no-op Subscription instance that
can be freely given out to clients.
|
static void |
error(Subscriber<?> s,
Throwable e)
Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onError with the
supplied error.
|
static <T> long |
getAndAddCap(AtomicLongFieldUpdater<T> updater,
T instance,
long toAdd)
Concurrent addition bound to Long.MAX_VALUE.
|
static long |
multiplyCap(long a,
long b)
Cap a multiplication to Long.MAX_VALUE
|
static void |
onErrorDropped(Throwable e)
An unexpected exception is about to be dropped.
|
static void |
onErrorDropped(Throwable e,
Throwable root)
An unexpected exception is about to be dropped, and it additionally
masks another one due to callback failure.
|
static <T> void |
onNextDropped(T t)
An unexpected event is about to be dropped.
|
static Throwable |
onOperatorError(Subscription subscription,
Throwable error)
Map an "operator" error given an operator parent
Subscription. |
static Throwable |
onOperatorError(Subscription subscription,
Throwable error,
Object dataSignal)
Map an "operator" error given an operator parent
Subscription. |
static Throwable |
onOperatorError(Throwable error)
Map an "operator" error.
|
static RuntimeException |
onRejectedExecution()
Return a wrapped
RejectedExecutionException which can be thrown by the
operator. |
static RuntimeException |
onRejectedExecution(Subscription subscription,
Throwable suppressed,
Object dataSignal)
Return a wrapped
RejectedExecutionException which can be thrown by the
operator. |
static <T> long |
produced(AtomicLongFieldUpdater<T> updater,
T instance,
long toSub)
Concurrent subtraction bound to 0, mostly used to decrement a request tracker by
the amount produced by the operator.
|
static <F> boolean |
replace(AtomicReferenceFieldUpdater<F,Subscription> field,
F instance,
Subscription s)
A generic utility to atomically replace a subscription or cancel the replacement
if the current subscription is marked as already cancelled (as in
cancelledSubscription()). |
static void |
reportBadRequest(long n)
Throw an
IllegalArgumentException if the request is null or negative. |
static void |
reportMoreProduced()
Throw an
IllegalStateException that indicates more than the requested
amount was produced. |
static void |
reportSubscriptionSet()
Log a
duplicate subscription error. |
static <T> Subscription |
scalarSubscription(Subscriber<? super T> subscriber,
T value)
Represents a fuseable Subscription that emits a single constant value synchronously
to a Subscriber or consumer.
|
static <T> Subscriber<T> |
serialize(Subscriber<? super T> subscriber)
Safely gate a
Subscriber by making sure onNext signals are delivered
sequentially (serialized). |
static <F> boolean |
set(AtomicReferenceFieldUpdater<F,Subscription> field,
F instance,
Subscription s)
A generic utility to atomically replace a subscription or cancel the replacement
if current subscription is marked as cancelled (as in
cancelledSubscription())
or was concurrently updated before. |
static <F> boolean |
setOnce(AtomicReferenceFieldUpdater<F,Subscription> field,
F instance,
Subscription s)
Sets the given subscription once and returns true if successful, false
if the field has a subscription already or has been cancelled.
|
static long |
subOrZero(long a,
long b)
Cap a subtraction to 0
|
static <F> boolean |
terminate(AtomicReferenceFieldUpdater<F,Subscription> field,
F instance)
Atomically terminates the subscription if it is not already a
cancelledSubscription(), cancelling the subscription and setting the field
to the singleton cancelledSubscription(). |
static boolean |
validate(long n)
Evaluate if a request is strictly positive otherwise
reportBadRequest(long) |
static boolean |
validate(Subscription current,
Subscription next)
Check Subscription current state and cancel new Subscription if current is set,
or return true if ready to subscribe.
|
public static <T> long addAndGet(AtomicLongFieldUpdater<T> updater, T instance, long n)
T - the parent instance typeupdater - current field updaterinstance - current instance to updaten - delta to addpublic static long addCap(long a,
long b)
a - left operandb - right operandpublic static <T> Fuseable.QueueSubscription<T> as(Subscription s)
T - the value type of the QueueSubscription.s - the source subscription to try to convert.public static Subscription cancelledSubscription()
Subscription to be used as an inner representation
of the cancelled statepublic static void checkRequest(long n)
throws IllegalArgumentException
n - demand to checkIllegalArgumentException - the nullOrNegativeRequestException instancepublic static boolean checkRequest(long n,
Subscriber<?> subscriber)
n - demand to checksubscriber - Subscriber to onError if non strict positive nIllegalArgumentException - if subscriber is null and demand is negative or 0.public static void complete(Subscriber<?> s)
s - the target subscriberpublic static <T> Subscriber<T> drainSubscriber()
Subscriber that does not check for double onSubscribe
and purely request Long.MAX. If an error is received it will raise a
Exceptions.errorCallbackNotImplemented(Throwable) in the receiving thread.Subscriber whose sole purpose is to request Long.MAXpublic static Subscription emptySubscription()
The enum also implements Fuseable.QueueSubscription so operators expecting a QueueSubscription from a Fuseable source don't have to double-check their Subscription received in onSubscribe.
Subscriptionpublic static void error(Subscriber<?> s, Throwable e)
s - target Subscriber to errore - the actual errorpublic static <T> long getAndAddCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd)
T - the parent instance typeupdater - current field updaterinstance - current instance to updatetoAdd - delta to addpublic static long multiplyCap(long a,
long b)
a - left operandb - right operandpublic static void onErrorDropped(Throwable e, Throwable root)
e - the exception to handleroot - the optional root cause to suppresspublic static void onErrorDropped(Throwable e)
e - the dropped exceptionpublic static <T> void onNextDropped(T t)
T - the dropped value typet - the dropped datapublic static Throwable onOperatorError(Throwable error)
Exceptions.throwIfFatal(Throwable).error - the callback or operator errorThrowablepublic static Throwable onOperatorError(Subscription subscription, Throwable error)
Subscription. The
result error will be passed via onError to the operator downstream.
Subscription will be cancelled after checking for fatal error via
Exceptions.throwIfFatal(Throwable).subscription - the linked operator parent Subscriptionerror - the callback or operator errorThrowablepublic static Throwable onOperatorError(Subscription subscription, Throwable error, Object dataSignal)
Subscription. The
result error will be passed via onError to the operator downstream.
Subscription will be cancelled after checking for fatal error via
Exceptions.throwIfFatal(Throwable). Takes an additional signal, which
can be added as a suppressed exception if it is a Throwable and the
default hook is in place.subscription - the linked operator parent Subscriptionerror - the callback or operator errordataSignal - the value (onNext or onError) signal processed during failureThrowablepublic static RuntimeException onRejectedExecution()
RejectedExecutionException which can be thrown by the
operator. This exception denotes that an execution was rejected by a
Scheduler, notably when it was already disposed.
Wrapping is done by calling both Exceptions.bubble(Throwable) and
onOperatorError(Subscription, Throwable, Object).
public static RuntimeException onRejectedExecution(Subscription subscription, Throwable suppressed, Object dataSignal)
RejectedExecutionException which can be thrown by the
operator. This exception denotes that an execution was rejected by a
Scheduler, notably when it was already disposed.
Wrapping is done by calling both Exceptions.bubble(Throwable) and
onOperatorError(Subscription, Throwable, Object) (with the passed
Subscription).
subscription - the subscription to pass to onOperatorError.suppressed - a Throwable to be suppressed by the RejectedExecutionException (or null if not relevant)dataSignal - a value to be passed to onOperatorError(Subscription, Throwable, Object) (or null if not relevant)public static <T> long produced(AtomicLongFieldUpdater<T> updater, T instance, long toSub)
T - the parent instance typeupdater - current field updaterinstance - current instance to updatetoSub - delta to subtractpublic static <F> boolean replace(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
cancelledSubscription()).F - the instance typefield - The Atomic containerinstance - the instance references - the subscriptionpublic static void reportBadRequest(long n)
IllegalArgumentException if the request is null or negative.n - the demand to evaluateExceptions.nullOrNegativeRequestException(long)public static void reportMoreProduced()
IllegalStateException that indicates more than the requested
amount was produced.Exceptions.failWithOverflow()public static void reportSubscriptionSet()
duplicate subscription error.public static <T> Subscription scalarSubscription(Subscriber<? super T> subscriber, T value)
T - the value typesubscriber - the delegate Subscriber that will be requesting the valuevalue - the single value to be emittedSubscriptionpublic static <T> Subscriber<T> serialize(Subscriber<? super T> subscriber)
Subscriber by making sure onNext signals are delivered
sequentially (serialized).
Serialization uses thread-stealing and a potentially unbounded queue that might
starve a calling thread if races are too important and Subscriber is slower.

T - the relayed typesubscriber - the subscriber to serializeSubscriberpublic static <F> boolean set(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
cancelledSubscription())
or was concurrently updated before.
The replaced subscription is itself cancelled.
F - the instance typefield - The Atomic containerinstance - the instance references - the subscriptionpublic static <F> boolean setOnce(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
If the field already has a subscription, it is cancelled and the duplicate
subscription is reported (see reportSubscriptionSet()).
F - the instance type containing the fieldfield - the field accessorinstance - the parent instances - the subscription to set oncepublic static long subOrZero(long a,
long b)
a - left operandb - right operandpublic static <F> boolean terminate(AtomicReferenceFieldUpdater<F,Subscription> field, F instance)
cancelledSubscription(), cancelling the subscription and setting the field
to the singleton cancelledSubscription().F - the instance type containing the fieldfield - the field accessorinstance - the parent instancepublic static boolean validate(Subscription current, Subscription next)
current - current Subscription, expected to be nullnext - new Subscriptionpublic static boolean validate(long n)
reportBadRequest(long)n - the request value