public abstract class Operators extends Object

Combined utilities available to operator implementations, including helpers for Subscription.request(long) handling. See http://github.com/reactor/reactive-streams-commons

Modifier and Type | Class and Description |
---|---|
static class | Operators.DeferredSubscription - Base class for Subscribers that will receive their Subscriptions at any time, yet might also need to be cancelled or requested at any time. |
static class | Operators.MonoSubscriber<I,O> - A Subscriber/Subscription barrier that holds a single value at most and properly gates asynchronous behaviors resulting from concurrent request or cancel and onXXX signals. |

Modifier and Type | Method and Description |
---|---|
static <T> long | addAndGet(AtomicLongFieldUpdater<T> updater, T instance, long n) - Concurrent addition bound to Long.MAX_VALUE. |
static long | addCap(long a, long b) - Cap an addition to Long.MAX_VALUE. |
static <T> Fuseable.QueueSubscription<T> | as(Subscription s) - Returns the subscription as a QueueSubscription if possible, or null. |
static Subscription | cancelledSubscription() - A singleton Subscription that represents a cancelled subscription instance and should not be leaked to clients as it represents a terminal state. |
static void | checkRequest(long n) - Throws an exception if the request is 0 or negative, as specified in rule 3.9 of Reactive Streams. |
static boolean | checkRequest(long n, Subscriber<?> subscriber) - Propagates an exception to a subscriber if the request is 0 or negative, as specified in rule 3.9 of Reactive Streams. |
static void | complete(Subscriber<?> s) - Calls onSubscribe on the target Subscriber with the empty instance, followed by a call to onComplete. |
static <T> Subscriber<T> | drainSubscriber() - Returns a singleton Subscriber that does not check for double onSubscribe and simply requests Long.MAX_VALUE. |
static Subscription | emptySubscription() - A singleton enumeration that represents a no-op Subscription instance that can be freely given out to clients. |
static void | error(Subscriber<?> s, Throwable e) - Calls onSubscribe on the target Subscriber with the empty instance, followed by a call to onError with the supplied error. |
static <T> long | getAndAddCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd) - Concurrent addition bound to Long.MAX_VALUE. |
static long | multiplyCap(long a, long b) - Cap a multiplication to Long.MAX_VALUE. |
static void | onErrorDropped(Throwable e) - An unexpected exception is about to be dropped. |
static void | onErrorDropped(Throwable e, Throwable root) - An unexpected exception is about to be dropped, and it additionally masks another one due to callback failure. |
static <T> void | onNextDropped(T t) - An unexpected event is about to be dropped. |
static Throwable | onOperatorError(Subscription subscription, Throwable error) - Map an "operator" error given an operator parent Subscription. |
static Throwable | onOperatorError(Subscription subscription, Throwable error, Object dataSignal) - Map an "operator" error given an operator parent Subscription. |
static Throwable | onOperatorError(Throwable error) - Map an "operator" error. |
static RuntimeException | onRejectedExecution() - Returns a wrapped RejectedExecutionException which can be thrown by the operator. |
static RuntimeException | onRejectedExecution(Subscription subscription, Throwable suppressed, Object dataSignal) - Returns a wrapped RejectedExecutionException which can be thrown by the operator. |
static <T> long | produced(AtomicLongFieldUpdater<T> updater, T instance, long toSub) - Concurrent subtraction bound to 0, mostly used to decrement a request tracker by the amount produced by the operator. |
static <F> boolean | replace(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s) - A generic utility to atomically replace a subscription or cancel the replacement if the current subscription is marked as already cancelled (as in cancelledSubscription()). |
static void | reportBadRequest(long n) - Throws an IllegalArgumentException if the request is zero or negative. |
static void | reportMoreProduced() - Throws an IllegalStateException that indicates more than the requested amount was produced. |
static void | reportSubscriptionSet() - Logs a duplicate subscription error. |
static <T> Subscription | scalarSubscription(Subscriber<? super T> subscriber, T value) - Represents a fuseable Subscription that emits a single constant value synchronously to a Subscriber or consumer. |
static <T> Subscriber<T> | serialize(Subscriber<? super T> subscriber) - Safely gates a Subscriber by making sure onNext signals are delivered sequentially (serialized). |
static <F> boolean | set(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s) - A generic utility to atomically replace a subscription or cancel the replacement if the current subscription is marked as cancelled (as in cancelledSubscription()) or was concurrently updated before. |
static <F> boolean | setOnce(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s) - Sets the given subscription once and returns true if successful, false if the field has a subscription already or has been cancelled. |
static long | subOrZero(long a, long b) - Cap a subtraction to 0. |
static <F> boolean | terminate(AtomicReferenceFieldUpdater<F,Subscription> field, F instance) - Atomically terminates the subscription if it is not already a cancelledSubscription(), cancelling the subscription and setting the field to the singleton cancelledSubscription(). |
static boolean | validate(long n) - Evaluates whether a request is strictly positive; otherwise calls reportBadRequest(long). |
static boolean | validate(Subscription current, Subscription next) - Checks the current Subscription state and cancels the new Subscription if current is set, or returns true if ready to subscribe. |

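public static <T> long addAndGet(AtomicLongFieldUpdater<T> updater, T instance, long n)
Concurrent addition bound to Long.MAX_VALUE.
T - the parent instance type
updater - current field updater
instance - current instance to update
n - delta to add

public static long addCap(long a, long b)
Cap an addition to Long.MAX_VALUE.
a - left operand
b - right operand

public static <T> Fuseable.QueueSubscription<T> as(Subscription s)
Returns the subscription as a QueueSubscription if possible, or null.
T - the value type of the QueueSubscription
s - the source subscription to try to convert

public static Subscription cancelledSubscription()
A singleton Subscription that represents a cancelled subscription instance and should not be leaked to clients as it represents a terminal state.
Returns: a Subscription to be used as an inner representation of the cancelled state

public static void checkRequest(long n) throws IllegalArgumentException
Throws an exception if the request is 0 or negative, as specified in rule 3.9 of Reactive Streams.
n - demand to check
Throws: IllegalArgumentException - the nullOrNegativeRequestException instance

public static boolean checkRequest(long n, Subscriber<?> subscriber)
Propagates an exception to a subscriber if the request is 0 or negative, as specified in rule 3.9 of Reactive Streams.
n - demand to check
subscriber - the Subscriber to signal onError to if n is not strictly positive
Throws: IllegalArgumentException - if subscriber is null and demand is negative or 0

public static void complete(Subscriber<?> s)
Calls onSubscribe on the target Subscriber with the empty instance, followed by a call to onComplete.
s - the target subscriber

public static <T> Subscriber<T> drainSubscriber()
Returns a singleton Subscriber that does not check for double onSubscribe and simply requests Long.MAX_VALUE. If an error is received, it will raise an Exceptions.errorCallbackNotImplemented(Throwable) in the receiving thread.
Returns: a Subscriber whose sole purpose is to request Long.MAX_VALUE

public static Subscription emptySubscription()
A singleton enumeration that represents a no-op Subscription instance that can be freely given out to clients.
The enum also implements Fuseable.QueueSubscription, so operators expecting a QueueSubscription from a Fuseable source don't have to double-check the Subscription received in onSubscribe.
Returns: a singleton no-op Subscription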
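
The capped, concurrent addition described for addCap and addAndGet can be sketched as follows. This is an illustrative stand-in under stated assumptions, not the Reactor source: the class name CappedAddSketch and its requested field are hypothetical, operands are assumed non-negative, and the early return when the counter already sits at Long.MAX_VALUE is an assumption of this sketch.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical holder class used only for illustration.
final class CappedAddSketch {

    // Demand counter, updated through a field updater.
    volatile long requested;

    static final AtomicLongFieldUpdater<CappedAddSketch> REQUESTED =
            AtomicLongFieldUpdater.newUpdater(CappedAddSketch.class, "requested");

    // Adds two non-negative longs; an overflowing sum wraps negative and is capped.
    static long addCap(long a, long b) {
        long sum = a + b;
        return sum < 0L ? Long.MAX_VALUE : sum;
    }

    // CAS loop: adds n to the field, capped at Long.MAX_VALUE, and returns the new value.
    static <T> long addAndGet(AtomicLongFieldUpdater<T> updater, T instance, long n) {
        for (;;) {
            long current = updater.get(instance);
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE; // assumption: MAX_VALUE is treated as "unbounded"
            }
            long next = addCap(current, n);
            if (updater.compareAndSet(instance, current, next)) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        CappedAddSketch op = new CappedAddSketch();
        System.out.println(addAndGet(REQUESTED, op, 10L));            // 10
        System.out.println(addAndGet(REQUESTED, op, Long.MAX_VALUE)); // 9223372036854775807
    }
}
```

Capping rather than overflowing matters because a request counter that wraps to a negative number would look like exhausted demand, while Long.MAX_VALUE is conventionally read as unbounded demand.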
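
public static void error(Subscriber<?> s, Throwable e)
Calls onSubscribe on the target Subscriber with the empty instance, followed by a call to onError with the supplied error.
s - target Subscriber to error
e - the actual error

public static <T> long getAndAddCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd)
Concurrent addition bound to Long.MAX_VALUE.
T - the parent instance type
updater - current field updater
instance - current instance to update
toAdd - delta to add

public static long multiplyCap(long a, long b)
Cap a multiplication to Long.MAX_VALUE.
a - left operand
b - right operand

public static void onErrorDropped(Throwable e, Throwable root)
An unexpected exception is about to be dropped, and it additionally masks another one due to callback failure.
e - the exception to handle
root - the optional root cause to suppress

public static void onErrorDropped(Throwable e)
An unexpected exception is about to be dropped.
e - the dropped exception

public static <T> void onNextDropped(T t)
An unexpected event is about to be dropped.
T - the dropped value type
t - the dropped data

public static Throwable onOperatorError(Throwable error)
Map an "operator" error. Fatal errors are checked via Exceptions.throwIfFatal(Throwable).
error - the callback or operator error
Returns: the mapped Throwable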
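
To make the difference between the capped helpers concrete, here is a minimal sketch of multiplyCap and getAndAddCap. It is not the library's code: the class CapSketch and its field are hypothetical, operands are assumed non-negative, and returning the previous value from getAndAddCap is inferred from the method name rather than stated in the descriptions above.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical holder class used only for illustration.
final class CapSketch {

    volatile long requested;

    static final AtomicLongFieldUpdater<CapSketch> REQUESTED =
            AtomicLongFieldUpdater.newUpdater(CapSketch.class, "requested");

    // Multiplies two non-negative longs, returning Long.MAX_VALUE on overflow.
    static long multiplyCap(long a, long b) {
        try {
            return Math.multiplyExact(a, b);
        } catch (ArithmeticException overflow) {
            return Long.MAX_VALUE;
        }
    }

    // CAS loop: adds toAdd (capped) and returns the value seen BEFORE the addition.
    static <T> long getAndAddCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd) {
        for (;;) {
            long current = updater.get(instance);
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE; // assumption: already unbounded, nothing to add
            }
            long sum = current + toAdd;
            long next = sum < 0L ? Long.MAX_VALUE : sum;
            if (updater.compareAndSet(instance, current, next)) {
                return current;
            }
        }
    }

    public static void main(String[] args) {
        CapSketch op = new CapSketch();
        System.out.println(getAndAddCap(REQUESTED, op, 5L));  // 0
        System.out.println(getAndAddCap(REQUESTED, op, 5L));  // 5
        System.out.println(op.requested);                     // 10
        System.out.println(multiplyCap(4L, 8L));              // 32
        System.out.println(multiplyCap(Long.MAX_VALUE, 2L));  // 9223372036854775807
    }
}
```

Returning the previous value lets a caller detect the transition from zero demand, which is a common trigger for starting a drain loop.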

public static Throwable onOperatorError(Subscription subscription, Throwable error)
Map an "operator" error given an operator parent Subscription. The resulting error will be passed via onError to the operator downstream. The Subscription will be cancelled after checking for fatal errors via Exceptions.throwIfFatal(Throwable).
subscription - the linked operator parent Subscription
error - the callback or operator error
Returns: the mapped Throwable

public static Throwable onOperatorError(Subscription subscription, Throwable error, Object dataSignal)
Map an "operator" error given an operator parent Subscription. The resulting error will be passed via onError to the operator downstream. The Subscription will be cancelled after checking for fatal errors via Exceptions.throwIfFatal(Throwable). Takes an additional signal, which can be added as a suppressed exception if it is a Throwable and the default hook is in place.
subscription - the linked operator parent Subscription
error - the callback or operator error
dataSignal - the value (onNext or onError) signal processed during failure
Returns: the mapped Throwable
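
The sequence described for the three-argument onOperatorError (check for fatal errors, cancel the parent, attach a Throwable data signal as suppressed) might look like the sketch below. It uses the JDK's java.util.concurrent.Flow.Subscription as a stand-in for the Reactive Streams Subscription, and its throwIfFatal is a hypothetical simplification that treats only VirtualMachineError and LinkageError as fatal. Hooks are not modelled at all.

```java
import java.util.concurrent.Flow;

// Illustrative sketch only; names mirror the documented methods but the bodies are assumptions.
final class OperatorErrorSketch {

    // Test double that records whether cancel() was called.
    static final class RecordingSubscription implements Flow.Subscription {
        boolean cancelled;
        @Override public void request(long n) { }
        @Override public void cancel() { cancelled = true; }
    }

    // Hypothetical notion of "fatal": rethrow JVM-level errors immediately.
    static void throwIfFatal(Throwable t) {
        if (t instanceof VirtualMachineError) throw (VirtualMachineError) t;
        if (t instanceof LinkageError) throw (LinkageError) t;
    }

    static Throwable onOperatorError(Flow.Subscription subscription, Throwable error, Object dataSignal) {
        throwIfFatal(error);              // 1. fatal errors are never mapped
        if (subscription != null) {
            subscription.cancel();        // 2. stop the upstream
        }
        // 3. keep a Throwable data signal visible as a suppressed exception
        if (dataSignal instanceof Throwable && dataSignal != error) {
            error.addSuppressed((Throwable) dataSignal);
        }
        return error;                     // the caller forwards this via onError
    }

    public static void main(String[] args) {
        RecordingSubscription sub = new RecordingSubscription();
        Throwable mapped = onOperatorError(sub,
                new IllegalStateException("boom"), new RuntimeException("in flight"));
        System.out.println(sub.cancelled);                 // true
        System.out.println(mapped.getSuppressed().length); // 1
    }
}
```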

public static RuntimeException onRejectedExecution()
Returns a wrapped RejectedExecutionException which can be thrown by the operator. This exception denotes that an execution was rejected by a Scheduler, notably when it was already disposed.
Wrapping is done by calling both Exceptions.bubble(Throwable) and onOperatorError(Subscription, Throwable, Object).

public static RuntimeException onRejectedExecution(Subscription subscription, Throwable suppressed, Object dataSignal)
Returns a wrapped RejectedExecutionException which can be thrown by the operator. This exception denotes that an execution was rejected by a Scheduler, notably when it was already disposed.
Wrapping is done by calling both Exceptions.bubble(Throwable) and onOperatorError(Subscription, Throwable, Object) (with the passed Subscription).
subscription - the subscription to pass to onOperatorError
suppressed - a Throwable to be suppressed by the RejectedExecutionException (or null if not relevant)
dataSignal - a value to be passed to onOperatorError(Subscription, Throwable, Object) (or null if not relevant)

public static <T> long produced(AtomicLongFieldUpdater<T> updater, T instance, long toSub)
Concurrent subtraction bound to 0, mostly used to decrement a request tracker by the amount produced by the operator.
T - the parent instance type
updater - current field updater
instance - current instance to update
toSub - delta to subtract

public static <F> boolean replace(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
A generic utility to atomically replace a subscription or cancel the replacement if the current subscription is marked as already cancelled (as in cancelledSubscription()).
F - the instance type
field - the Atomic container
instance - the instance reference
s - the subscription

public static void reportBadRequest(long n)
Throws an IllegalArgumentException if the request is zero or negative.
n - the demand to evaluate
See also: Exceptions.nullOrNegativeRequestException(long)
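
The concurrent subtraction bound to 0 that produced describes can be sketched with a compare-and-set loop, as below. The class ProducedSketch and its field are hypothetical, operands are assumed non-negative, and returning the updated value is an assumption of this sketch since the description above does not state what the method returns.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical holder class used only for illustration.
final class ProducedSketch {

    // Outstanding demand that has not been fulfilled yet.
    volatile long requested;

    static final AtomicLongFieldUpdater<ProducedSketch> REQUESTED =
            AtomicLongFieldUpdater.newUpdater(ProducedSketch.class, "requested");

    // Subtracts b from a, never going below zero (non-negative operands assumed).
    static long subOrZero(long a, long b) {
        long result = a - b;
        return result < 0L ? 0L : result;
    }

    // CAS loop: decrements the tracker by toSub, floored at 0, and returns the new value.
    static <T> long produced(AtomicLongFieldUpdater<T> updater, T instance, long toSub) {
        for (;;) {
            long current = updater.get(instance);
            if (current == 0L) {
                return 0L; // nothing left to decrement
            }
            long next = subOrZero(current, toSub);
            if (updater.compareAndSet(instance, current, next)) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        ProducedSketch op = new ProducedSketch();
        op.requested = 10L;
        System.out.println(produced(REQUESTED, op, 3L));   // 7
        System.out.println(produced(REQUESTED, op, 100L)); // 0
    }
}
```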

public static void reportMoreProduced()
Throws an IllegalStateException that indicates more than the requested amount was produced.
See also: Exceptions.failWithOverflow()
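
public static void reportSubscriptionSet()
Logs a duplicate subscription error.

public static <T> Subscription scalarSubscription(Subscriber<? super T> subscriber, T value)
Represents a fuseable Subscription that emits a single constant value synchronously to a Subscriber or consumer.
T - the value type
subscriber - the delegate Subscriber that will be requesting the value
value - the single value to be emitted
Returns: a Subscription emitting the single value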
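
As a rough illustration of what a scalar subscription does, the sketch below emits one constant value followed by onComplete on the first valid request. It is a simplified stand-in, not Reactor's implementation: it uses the JDK's java.util.concurrent.Flow interfaces instead of Reactive Streams, omits the fusion (QueueSubscription) side entirely, and the class name ScalarSubscriptionSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified, non-fuseable sketch of a single-value Subscription.
final class ScalarSubscriptionSketch<T> implements Flow.Subscription {

    private final Flow.Subscriber<? super T> subscriber;
    private final T value;
    // Flips to true once the value was emitted, an error was signalled, or cancel() was called.
    private final AtomicBoolean done = new AtomicBoolean();

    ScalarSubscriptionSketch(Flow.Subscriber<? super T> subscriber, T value) {
        this.subscriber = subscriber;
        this.value = value;
    }

    @Override
    public void request(long n) {
        if (!done.compareAndSet(false, true)) {
            return; // already emitted or cancelled
        }
        if (n <= 0L) {
            subscriber.onError(new IllegalArgumentException("request must be > 0 but was " + n));
            return;
        }
        subscriber.onNext(value);   // synchronous emission on the requesting thread
        subscriber.onComplete();
    }

    @Override
    public void cancel() {
        done.set(true);
    }

    // Demo: subscribes a recording Subscriber and returns the signals it observed.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Flow.Subscriber<Integer> recorder = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                events.add("onSubscribe");
                s.request(1); // triggers the single emission
                s.request(1); // ignored: the value was already delivered
            }
            @Override public void onNext(Integer item) { events.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        };
        recorder.onSubscribe(new ScalarSubscriptionSketch<Integer>(recorder, 42));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [onSubscribe, onNext(42), onComplete]
    }
}
```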

public static <T> Subscriber<T> serialize(Subscriber<? super T> subscriber)
Safely gates a Subscriber by making sure onNext signals are delivered sequentially (serialized).
Serialization uses thread-stealing and a potentially unbounded queue, which might starve a calling thread if contention is high and the Subscriber is slow.
T - the relayed type
subscriber - the subscriber to serialize
Returns: a serializing Subscriber

public static <F> boolean set(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
A generic utility to atomically replace a subscription or cancel the replacement if the current subscription is marked as cancelled (as in cancelledSubscription()) or was concurrently updated before.
The replaced subscription is itself cancelled.
F - the instance type
field - the Atomic container
instance - the instance reference
s - the subscription

public static <F> boolean setOnce(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
Sets the given subscription once and returns true if successful, false if the field has a subscription already or has been cancelled.
If the field already has a subscription, it is cancelled and the duplicate subscription is reported (see reportSubscriptionSet()).
F - the instance type containing the field
field - the field accessor
instance - the parent instance
s - the subscription to set once

public static long subOrZero(long a, long b)
Cap a subtraction to 0.
a - left operand
b - right operand

public static <F> boolean terminate(AtomicReferenceFieldUpdater<F,Subscription> field, F instance)
Atomically terminates the subscription if it is not already a cancelledSubscription(), cancelling the subscription and setting the field to the singleton cancelledSubscription().
F - the instance type containing the field
field - the field accessor
instance - the parent instance

public static boolean validate(Subscription current, Subscription next)
Checks the current Subscription state and cancels the new Subscription if current is set, or returns true if ready to subscribe.
current - current Subscription, expected to be null
next - new Subscription

public static boolean validate(long n)
Evaluates whether a request is strictly positive; otherwise calls reportBadRequest(long).
n - the request value
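
The field-based subscription management described for setOnce and terminate follows a common pattern: a volatile field, an AtomicReferenceFieldUpdater, and a sentinel object marking the cancelled state. The sketch below shows that pattern with the JDK's Flow.Subscription as a stand-in. The class SubscriptionFieldSketch, the CANCELLED sentinel, and the return value of terminate are assumptions of this sketch; it also cancels the incoming subscription on a duplicate set, which is one reading of the setOnce description, and it omits the duplicate-subscription report.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Illustrative sketch only; not the Reactor implementation.
final class SubscriptionFieldSketch {

    // Test double that records whether cancel() was called.
    static final class Recording implements Flow.Subscription {
        volatile boolean cancelled;
        @Override public void request(long n) { }
        @Override public void cancel() { cancelled = true; }
    }

    // Sentinel playing the role of cancelledSubscription(); never handed to user code.
    static final Flow.Subscription CANCELLED = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    volatile Flow.Subscription upstream;

    static final AtomicReferenceFieldUpdater<SubscriptionFieldSketch, Flow.Subscription> UPSTREAM =
            AtomicReferenceFieldUpdater.newUpdater(
                    SubscriptionFieldSketch.class, Flow.Subscription.class, "upstream");

    // Installs s only if the field is still empty; otherwise cancels s and returns false.
    static <F> boolean setOnce(AtomicReferenceFieldUpdater<F, Flow.Subscription> field,
                               F instance, Flow.Subscription s) {
        if (field.compareAndSet(instance, null, s)) {
            return true;
        }
        s.cancel(); // field already set or already terminated
        return false;
    }

    // Swaps in the sentinel and cancels whatever real subscription was held.
    static <F> boolean terminate(AtomicReferenceFieldUpdater<F, Flow.Subscription> field, F instance) {
        Flow.Subscription current = field.get(instance);
        if (current != CANCELLED) {
            current = field.getAndSet(instance, CANCELLED);
            if (current != null && current != CANCELLED) {
                current.cancel();
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        SubscriptionFieldSketch holder = new SubscriptionFieldSketch();
        Recording first = new Recording();
        Recording second = new Recording();
        System.out.println(setOnce(UPSTREAM, holder, first));  // true
        System.out.println(setOnce(UPSTREAM, holder, second)); // false
        System.out.println(second.cancelled);                  // true
        System.out.println(terminate(UPSTREAM, holder));       // true
        System.out.println(first.cancelled);                   // true
        System.out.println(terminate(UPSTREAM, holder));       // false
    }
}
```

Using a sentinel rather than null for the terminal state lets a late setOnce distinguish "never subscribed" from "already cancelled" with a single atomic read.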