public abstract class Operators extends Object
Subscription.request(long) handling.
Combine utils available to operator implementations; see http://github.com/reactor/reactive-streams-commons

| Modifier and Type | Class and Description |
|---|---|
| `static class` | `Operators.DeferredSubscription`: Base class for Subscribers that will receive their Subscriptions at any time, yet need to be cancelled or requested at any time. |
| `static class` | `Operators.MonoSubscriber<I,O>`: A Subscriber/Subscription barrier that holds a single value at most and properly gates asynchronous behaviors resulting from concurrent request or cancel and onXXX signals. |
| `static class` | `Operators.SubscriberAdapter<I,O>`: A Subscriber with an asymmetric typed wrapped subscriber. |

| Modifier and Type | Method and Description |
|---|---|
| `static <T> long` | `addAndGet(AtomicLongFieldUpdater<T> updater, T instance, long n)`: Concurrent addition bound to Long.MAX_VALUE. |
| `static long` | `addAndGet(AtomicLong current, long toAdd)`: Deprecated. This util is neither used by Core nor consistent with the other `addAndGet` methods present, which use AtomicLongFieldUpdater. In an effort to keep the Operators API consistent, this util will not be supported anymore and users need to port its body into their app code. |
| `static long` | `addCap(long a, long b)`: Cap an addition to Long.MAX_VALUE. |
| `static <T> Fuseable.QueueSubscription<T>` | `as(Subscription s)`: Returns the subscription as QueueSubscription if possible or null. |
| `static Subscription` | `cancelledSubscription()`: A singleton Subscription that represents a cancelled subscription instance and should not be leaked to clients as it represents a terminal state. |
| `static void` | `checkRequest(long n)`: Throws an exception if request is 0 or negative as specified in rule 3.09 of Reactive Streams. |
| `static boolean` | `checkRequest(long n, Subscriber<?> subscriber)`: Throws an exception if request is 0 or negative as specified in rule 3.09 of Reactive Streams. |
| `static void` | `complete(Subscriber<?> s)`: Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onComplete. |
| `static <T> Subscriber<T>` | `drainSubscriber()`: Return a singleton Subscriber that does not check for double onSubscribe and only requests Long.MAX_VALUE. |
| `static Subscription` | `emptySubscription()`: A singleton enumeration that represents a no-op Subscription instance that can be freely given out to clients. |
| `static void` | `error(Subscriber<?> s, Throwable e)`: Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onError with the supplied error. |
| `static <T> long` | `getAndAddCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd)`: Concurrent addition bound to Long.MAX_VALUE. |
| `static long` | `multiplyCap(long a, long b)`: Cap a multiplication to Long.MAX_VALUE. |
| `static void` | `onErrorDropped(Throwable e)`: Take an unsignalled exception that is masking another one due to callback failure. |
| `static void` | `onErrorDropped(Throwable e, Throwable root)`: Take an unsignalled exception that is masking another one due to callback failure. |
| `static <T> void` | `onNextDropped(T t)`: An unexpected event is about to be dropped. |
| `static Throwable` | `onOperatorError(Subscription subscription, Throwable error)`: Map an "operator" error given an operator parent Subscription. |
| `static Throwable` | `onOperatorError(Subscription subscription, Throwable error, Object dataSignal)`: Map an "operator" error given an operator parent Subscription. |
| `static Throwable` | `onOperatorError(Throwable error)`: Map an "operator" error. |
| `static RuntimeException` | `onRejectedExecution()`: Return a wrapped RejectedExecutionException which can be thrown by the operator. |
| `static RuntimeException` | `onRejectedExecution(Subscription subscription, Throwable suppressed, Object dataSignal)`: Return a wrapped RejectedExecutionException which can be thrown by the operator. |
| `static <T> long` | `produced(AtomicLongFieldUpdater<T> updater, T instance, long toSub)`: Concurrent subtraction bound to 0. |
| `static <F> boolean` | `replace(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)`: A generic utility to atomically replace a subscription or cancel if marked by a singleton subscription. |
| `static void` | `reportBadRequest(long n)`: Throw IllegalArgumentException. |
| `static void` | `reportMoreProduced()`: Throw IllegalStateException. |
| `static void` | `reportSubscriptionSet()`: Log reportedSubscriptions. |
| `static <T> Subscription` | `scalarSubscription(Subscriber<? super T> subscriber, T value)`: Represents a fuseable Subscription that emits a single constant value synchronously to a Subscriber or consumer. |
| `static <T> Subscriber<T>` | `serialize(Subscriber<? super T> subscriber)`: Safely gate a Subscriber by a serializing Subscriber. |
| `static <F> boolean` | `set(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)`: A generic utility to atomically replace a subscription or cancel if marked by a singleton subscription or concurrently set before. |
| `static <F> boolean` | `setOnce(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)`: Sets the given subscription once and returns true if successful, false if the field has a subscription already or has been cancelled. |
| `static long` | `subOrZero(long a, long b)`: Cap a subtraction to 0. |
| `static <F> boolean` | `terminate(AtomicReferenceFieldUpdater<F,Subscription> field, F instance)`: Atomically terminates the subscription if it is not already a cancelledSubscription(), cancelling the subscription and setting the field to the singleton cancelledSubscription(). |
| `static boolean` | `validate(long n)`: Evaluate if a request is strictly positive, otherwise reportBadRequest(long). |
| `static boolean` | `validate(Subscription current, Subscription next)`: Check the current Subscription state and cancel the new Subscription if current is not null, returning true if ready to subscribe. |

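The capped helpers listed above (`addCap`, `multiplyCap`, `subOrZero`) can be pictured with plain long arithmetic. The following is a minimal sketch, not Reactor's source: the class name `CappedMath` is hypothetical, and the code assumes both operands are non-negative, as demand counters normally are.

```java
/** Illustrative stand-ins for the capped helpers; not Reactor's source. */
final class CappedMath {

    /** Adds two non-negative longs, saturating at Long.MAX_VALUE on overflow. */
    static long addCap(long a, long b) {
        long sum = a + b;
        // With non-negative operands, a negative sum can only mean overflow.
        return sum < 0L ? Long.MAX_VALUE : sum;
    }

    /** Multiplies two non-negative longs, saturating at Long.MAX_VALUE. */
    static long multiplyCap(long a, long b) {
        if (a == 0L || b == 0L) {
            return 0L;
        }
        // For positive operands, a * b exceeds Long.MAX_VALUE exactly when a > MAX / b.
        return a > Long.MAX_VALUE / b ? Long.MAX_VALUE : a * b;
    }

    /** Subtracts b from a, flooring the result at 0. */
    static long subOrZero(long a, long b) {
        long diff = a - b;
        return diff < 0L ? 0L : diff;
    }

    public static void main(String[] args) {
        System.out.println(addCap(Long.MAX_VALUE, 1L) == Long.MAX_VALUE);
        System.out.println(multiplyCap(1L << 62, 4L) == Long.MAX_VALUE);
        System.out.println(subOrZero(3L, 5L));
    }
}
```

Saturating instead of wrapping matters because Long.MAX_VALUE conventionally stands for unbounded demand, so an overflowing sum should stay unbounded rather than turn negative.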
@Deprecated public static long addAndGet(AtomicLong current, long toAdd)
Deprecated. This util is neither used by Core nor consistent with the other `addAndGet` methods present, which use AtomicLongFieldUpdater. In an effort to keep the Operators API consistent, this util will not be supported anymore and users need to port its body into their app code.
Parameters:
current - current atomic to update
toAdd - delta to add

public static <T> long addAndGet(AtomicLongFieldUpdater<T> updater, T instance, long n)
Concurrent addition bound to Long.MAX_VALUE.
Type Parameters:
T - the parent instance type
Parameters:
updater - current field updater
instance - current instance to update
n - delta to add

public static long addCap(long a, long b)
Cap an addition to Long.MAX_VALUE.
Parameters:
a - left operand
b - right operand

public static <T> Fuseable.QueueSubscription<T> as(Subscription s)
Returns the subscription as QueueSubscription if possible or null.
Type Parameters:
T - the value type of the QueueSubscription.
Parameters:
s - the source subscription to try to convert.

public static Subscription cancelledSubscription()
A singleton Subscription that represents a cancelled subscription instance and should not be leaked to clients as it represents a terminal state.
Returns:
Subscription

public static void checkRequest(long n) throws IllegalArgumentException
Throws an exception if request is 0 or negative as specified in rule 3.09 of Reactive Streams.
Parameters:
n - demand to check
Throws:
IllegalArgumentException - the nullOrNegativeRequestException instance

public static boolean checkRequest(long n, Subscriber<?> subscriber)
Throws an exception if request is 0 or negative as specified in rule 3.09 of Reactive Streams.
Parameters:
n - demand to check
subscriber - Subscriber to onError if n is not strictly positive
Throws:
IllegalArgumentException - if subscriber is null and demand is negative or 0.

public static void complete(Subscriber<?> s)
Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onComplete.
Parameters:
s - the target subscriber

public static <T> Subscriber<T> drainSubscriber()
Return a singleton Subscriber that does not check for double onSubscribe and only requests Long.MAX_VALUE. If an error is received it will raise an Exceptions.errorCallbackNotImplemented(Throwable) in the receiving thread.
Returns:
Subscriber whose sole purpose is to request Long.MAX_VALUE

public static Subscription emptySubscription()
A singleton enumeration that represents a no-op Subscription instance that can be freely given out to clients.
The enum also implements Fuseable.QueueSubscription so operators expecting a QueueSubscription from a Fuseable source don't have to double-check their Subscription received in onSubscribe.
Returns:
Subscription

public static void error(Subscriber<?> s, Throwable e)
Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onError with the supplied error.
Parameters:
s - target Subscriber to error
e - the actual error

public static <T> long getAndAddCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd)
Concurrent addition bound to Long.MAX_VALUE.
Type Parameters:
T - the parent instance type
Parameters:
updater - current field updater
instance - current instance to update
toAdd - delta to add

public static long multiplyCap(long a, long b)
Cap a multiplication to Long.MAX_VALUE.
Parameters:
a - left operand
b - right operand

public static void onErrorDropped(Throwable e, Throwable root)
Take an unsignalled exception that is masking another one due to callback failure.
Parameters:
e - the exception to handle
root - the optional root cause to suppress

public static void onErrorDropped(Throwable e)
Take an unsignalled exception that is masking another one due to callback failure.
Parameters:
e - the exception to handle

public static <T> void onNextDropped(T t)
An unexpected event is about to be dropped.
Type Parameters:
T - the dropped value type
Parameters:
t - the dropping data

public static Throwable onOperatorError(Throwable error)
Map an "operator" error, after checking for fatal error via Exceptions.throwIfFatal(Throwable).
Parameters:
error - the callback or operator error
Returns:
Throwable

public static Throwable onOperatorError(Subscription subscription, Throwable error)
Map an "operator" error given an operator parent Subscription. The result error will be passed via onError to the operator downstream. Subscription will be cancelled after checking for fatal error via Exceptions.throwIfFatal(Throwable).
Parameters:
subscription - the linked operator parent Subscription
error - the callback or operator error
Returns:
Throwable

public static Throwable onOperatorError(Subscription subscription, Throwable error, Object dataSignal)
Map an "operator" error given an operator parent Subscription. The result error will be passed via onError to the operator downstream. Subscription will be cancelled after checking for fatal error via Exceptions.throwIfFatal(Throwable).
Parameters:
subscription - the linked operator parent Subscription
error - the callback or operator error
dataSignal - the value (onNext or onError) signal processed during failure
Returns:
Throwable

public static RuntimeException onRejectedExecution()
Return a wrapped RejectedExecutionException which can be thrown by the operator. That denotes that an execution was rejected by a Scheduler due to dispose.
Wrapping is done by calling both Exceptions.bubble(Throwable) and onOperatorError(Subscription, Throwable, Object).

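The concurrent capped addition described for `getAndAddCap`, combined with the rule 3.09 check on non-positive requests, can be sketched as a compare-and-set loop over an `AtomicLongFieldUpdater`. This is an illustration under assumptions, not Reactor's implementation: the class `DemandTracker`, its `requested` field and its `request` method are hypothetical, and returning the previous value is inferred from the method name only.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

/** Hypothetical holder of outstanding demand; not a Reactor class. */
final class DemandTracker {

    volatile long requested;

    static final AtomicLongFieldUpdater<DemandTracker> REQUESTED =
            AtomicLongFieldUpdater.newUpdater(DemandTracker.class, "requested");

    /** CAS loop: adds toAdd, saturating at Long.MAX_VALUE; returns the previous value. */
    static <T> long getAndAddCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd) {
        for (;;) {
            long current = updater.get(instance);
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE; // already unbounded, nothing to add
            }
            long sum = current + toAdd;
            long next = sum < 0L ? Long.MAX_VALUE : sum; // overflow is capped
            if (updater.compareAndSet(instance, current, next)) {
                return current;
            }
            // Another thread changed the field in between: retry with the fresh value.
        }
    }

    /** Request handling: reject non-positive demand, then accumulate it. */
    void request(long n) {
        if (n <= 0L) {
            throw new IllegalArgumentException("request must be strictly positive, but was " + n);
        }
        getAndAddCap(REQUESTED, this, n);
    }
}
```

A field updater is used here rather than an `AtomicLong` instance because it avoids allocating one extra object per tracked counter.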
public static RuntimeException onRejectedExecution(Subscription subscription, Throwable suppressed, Object dataSignal)
Return a wrapped RejectedExecutionException which can be thrown by the operator. That denotes that an execution was rejected by a Scheduler due to dispose.
Wrapping is done by calling both Exceptions.bubble(Throwable) and onOperatorError(Subscription, Throwable, Object) (with the passed Subscription).
Parameters:
subscription - the subscription to pass to onOperatorError.
suppressed - a Throwable to be suppressed by the RejectedExecutionException (or null if not relevant)
dataSignal - a value to be passed to onOperatorError(Subscription, Throwable, Object) (or null if not relevant)

public static <T> long produced(AtomicLongFieldUpdater<T> updater, T instance, long toSub)
Concurrent subtraction bound to 0.
Type Parameters:
T - the parent instance type
Parameters:
updater - current field updater
instance - current instance to update
toSub - delta to subtract

public static <F> boolean replace(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
A generic utility to atomically replace a subscription or cancel if marked by a singleton subscription.
Type Parameters:
F - the instance type
Parameters:
field - the Atomic container
instance - the instance reference
s - the subscription

public static void reportBadRequest(long n)
Throw IllegalArgumentException.
Parameters:
n - the demand to evaluate

public static void reportMoreProduced()
Throw IllegalStateException.

public static void reportSubscriptionSet()
Log reportedSubscriptions.

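The "concurrent subtraction bound to 0" performed by `produced` is the counterpart of the capped addition: after emitting items, an operator lowers its outstanding demand without letting it go negative. A minimal sketch follows, assuming a hypothetical `ProducedCounter` class and assuming the updated value is what gets returned (the text above does not state the return value).

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

/** Hypothetical demand counter; not a Reactor class. */
final class ProducedCounter {

    volatile long requested;

    static final AtomicLongFieldUpdater<ProducedCounter> REQUESTED =
            AtomicLongFieldUpdater.newUpdater(ProducedCounter.class, "requested");

    /** CAS loop: subtracts toSub from the field, flooring at 0; returns the new value. */
    static <T> long produced(AtomicLongFieldUpdater<T> updater, T instance, long toSub) {
        for (;;) {
            long current = updater.get(instance);
            // Both values are assumed non-negative, so the difference cannot overflow.
            long next = Math.max(0L, current - toSub);
            if (updater.compareAndSet(instance, current, next)) {
                return next;
            }
            // Lost the race against a concurrent update: re-read and retry.
        }
    }
}
```

For instance, with `requested` at 10, `produced(REQUESTED, counter, 4)` leaves 6 outstanding, and a further `produced(REQUESTED, counter, 10)` leaves 0 rather than -4.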
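public static <T> Subscription scalarSubscription(Subscriber<? super T> subscriber, T value)
Represents a fuseable Subscription that emits a single constant value synchronously to a Subscriber or consumer.
Type Parameters:
T - the value type
Parameters:
subscriber - the delegate Subscriber that will be requesting the value
value - the single value to be emitted
Returns:
Subscription

public static <T> Subscriber<T> serialize(Subscriber<? super T> subscriber)
Safely gate a Subscriber by a serializing Subscriber. Serialization uses thread-stealing and a potentially unbounded queue that might starve a calling thread if races are too frequent and the Subscriber is slower.
Type Parameters:
T - the relayed type
Parameters:
subscriber - the subscriber to wrap
Returns:
Subscriber

public static <F> boolean set(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
A generic utility to atomically replace a subscription or cancel if marked by a singleton subscription or concurrently set before.
Type Parameters:
F - the instance type
Parameters:
field - the Atomic container
instance - the instance reference
s - the subscription

public static <F> boolean setOnce(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
Sets the given subscription once and returns true if successful, false if the field has a subscription already or has been cancelled.
Type Parameters:
F - the instance type containing the field
Parameters:
field - the field accessor
instance - the parent instance
s - the subscription to set once

public static long subOrZero(long a, long b)
Cap a subtraction to 0.
Parameters:
a - left operand
b - right operand

public static <F> boolean terminate(AtomicReferenceFieldUpdater<F,Subscription> field, F instance)
Atomically terminates the subscription if it is not already a cancelledSubscription(), cancelling the subscription and setting the field to the singleton cancelledSubscription().
Type Parameters:
F - the instance type containing the field
Parameters:
field - the field accessor
instance - the parent instance

public static boolean validate(Subscription current, Subscription next)
Check the current Subscription state and cancel the new Subscription if current is not null, returning true if ready to subscribe.
Parameters:
current - current Subscription, expected to be null
next - new Subscription

public static boolean validate(long n)
Evaluate if a request is strictly positive, otherwise reportBadRequest(long).
Parameters:
n - the request value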
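The `setOnce` and `terminate` contracts above rely on a singleton cancelled marker stored in the subscription field. The sketch below illustrates that pattern with the JDK's `java.util.concurrent.Flow.Subscription` standing in for the Reactive Streams `Subscription`. The names `SubscriptionHolder`, `CANCELLED` and `Recording` are hypothetical. Cancelling the rejected newcomer in `setOnce` and the boolean returned by `terminate` are assumptions that the text above does not spell out.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/** Hypothetical operator state holding its upstream Subscription; not a Reactor class. */
final class SubscriptionHolder {

    /** Sentinel marking the terminal, cancelled state; it must never be handed to clients. */
    static final Flow.Subscription CANCELLED = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    /** Test helper that records whether cancel() was called. */
    static final class Recording implements Flow.Subscription {
        volatile boolean cancelled;
        @Override public void request(long n) { }
        @Override public void cancel() { cancelled = true; }
    }

    volatile Flow.Subscription upstream;

    static final AtomicReferenceFieldUpdater<SubscriptionHolder, Flow.Subscription> UPSTREAM =
            AtomicReferenceFieldUpdater.newUpdater(
                    SubscriptionHolder.class, Flow.Subscription.class, "upstream");

    /** Stores s only if the field is still null; otherwise cancels s (assumption) and returns false. */
    static <F> boolean setOnce(AtomicReferenceFieldUpdater<F, Flow.Subscription> field,
                               F instance, Flow.Subscription s) {
        if (field.compareAndSet(instance, null, s)) {
            return true;
        }
        s.cancel(); // field already set or already cancelled: reject the newcomer
        return false;
    }

    /** Swaps in the CANCELLED marker and cancels whatever subscription was held before. */
    static <F> boolean terminate(AtomicReferenceFieldUpdater<F, Flow.Subscription> field, F instance) {
        Flow.Subscription previous = field.getAndSet(instance, CANCELLED);
        if (previous == CANCELLED) {
            return false; // someone else already terminated
        }
        if (previous != null) {
            previous.cancel();
        }
        return true;
    }
}
```

Because the marker is swapped in atomically, at most one caller of `terminate` observes a non-cancelled previous value, so the held subscription is cancelled exactly once even under concurrent cancellation.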