public abstract class Operators extends Object
Subscription.request(long) handling.
Combine utils available to operator implementations, @see http://github.com/reactor/reactive-streams-commons

Modifier and Type | Class and Description |
---|---|
static class | `Operators.DeferredSubscription` Base class for Subscribers that may receive their Subscription at any time, yet may need to be cancelled or requested at any time. |
static class | `Operators.MonoSubscriber<I,O>` A Subscriber/Subscription barrier that holds a single value at most and properly gates asynchronous behaviors resulting from concurrent request or cancel and onXXX signals. |
static class | `Operators.SubscriberAdapter<I,O>` A Subscriber with an asymmetric typed wrapped subscriber. |

Modifier and Type | Method and Description |
---|---|
`static <T> long` | `addAndGet(AtomicLongFieldUpdater<T> updater, T instance, long n)` Concurrent addition bound to Long.MAX_VALUE. |
`static long` | `addAndGet(AtomicLong current, long toAdd)` Deprecated. This util is neither used by Core nor consistent with the other `addAndGet` methods present, which use AtomicLongFieldUpdater. In an effort to keep the Operators API consistent, this util will no longer be supported, and users need to port its body into their own application code. |
`static long` | `addCap(long a, long b)` Cap an addition to Long.MAX_VALUE. |
`static <T> Fuseable.QueueSubscription<T>` | `as(Subscription s)` Returns the subscription as a QueueSubscription if possible, or null. |
`static Subscription` | `cancelledSubscription()` A singleton Subscription that represents a cancelled subscription instance and should not be leaked to clients as it represents a terminal state. |
`static void` | `checkRequest(long n)` Throws an exception if the request is 0 or negative, as specified in rule 3.09 of Reactive Streams. |
`static boolean` | `checkRequest(long n, Subscriber<?> subscriber)` Checks that the request is strictly positive, as specified in rule 3.09 of Reactive Streams; otherwise signals onError to the given Subscriber, or throws IllegalArgumentException if the subscriber is null. |
`static void` | `complete(Subscriber<?> s)` Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onComplete. |
`static <T> Subscriber<T>` | `drainSubscriber()` Return a singleton Subscriber that does not check for double onSubscribe and purely requests Long.MAX_VALUE. |
`static Subscription` | `emptySubscription()` A singleton enumeration that represents a no-op Subscription instance that can be freely given out to clients. |
`static void` | `error(Subscriber<?> s, Throwable e)` Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onError with the supplied error. |
`static <T> long` | `getAndAddCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd)` Concurrent addition bound to Long.MAX_VALUE. |
`static long` | `multiplyCap(long a, long b)` Cap a multiplication to Long.MAX_VALUE. |
`static void` | `onErrorDropped(Throwable e)` Take an unsignalled exception that is masking another one due to callback failure. |
`static void` | `onErrorDropped(Throwable e, Throwable root)` Take an unsignalled exception that is masking another one due to callback failure. |
`static <T> void` | `onNextDropped(T t)` An unexpected event is about to be dropped. |
`static Throwable` | `onOperatorError(Subscription subscription, Throwable error)` Map an "operator" error given an operator parent Subscription. |
`static Throwable` | `onOperatorError(Subscription subscription, Throwable error, Object dataSignal)` Map an "operator" error given an operator parent Subscription. |
`static Throwable` | `onOperatorError(Throwable error)` Map an "operator" error. |
`static RuntimeException` | `onRejectedExecution()` Return a wrapped RejectedExecutionException which can be thrown by the operator. |
`static RuntimeException` | `onRejectedExecution(Subscription subscription, Throwable suppressed, Object dataSignal)` Return a wrapped RejectedExecutionException which can be thrown by the operator. |
`static <T> long` | `produced(AtomicLongFieldUpdater<T> updater, T instance, long toSub)` Concurrent subtraction bound to 0. |
`static <F> boolean` | `replace(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)` A generic utility to atomically replace a subscription or cancel if marked by a singleton subscription. |
`static void` | `reportBadRequest(long n)` Throw IllegalArgumentException. |
`static void` | `reportMoreProduced()` Throw IllegalStateException. |
`static void` | `reportSubscriptionSet()` Log reportedSubscriptions. |
`static <T> Subscription` | `scalarSubscription(Subscriber<? super T> subscriber, T value)` Represents a fuseable Subscription that emits a single constant value synchronously to a Subscriber or consumer. |
`static <T> Subscriber<T>` | `serialize(Subscriber<? super T> subscriber)` Safely gate a Subscriber by a serializing Subscriber. |
`static <F> boolean` | `set(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)` A generic utility to atomically replace a subscription or cancel if marked by a singleton subscription or concurrently set before. |
`static <F> boolean` | `setOnce(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)` Sets the given subscription once and returns true if successful, false if the field has a subscription already or has been cancelled. |
`static long` | `subOrZero(long a, long b)` Cap a subtraction to 0. |
`static <F> boolean` | `terminate(AtomicReferenceFieldUpdater<F,Subscription> field, F instance)` Atomically terminates the subscription if it is not already a cancelledSubscription(), cancelling the subscription and setting the field to the singleton cancelledSubscription(). |
`static boolean` | `validate(long n)` Evaluate if a request is strictly positive, otherwise reportBadRequest(long). |
`static boolean` | `validate(Subscription current, Subscription next)` Check the current Subscription state and cancel the new Subscription if the current one is not null, returning true if ready to subscribe. |

@Deprecated public static long addAndGet(AtomicLong current, long toAdd)
Deprecated. This util is neither used by Core nor consistent with the other `addAndGet` methods present, which use AtomicLongFieldUpdater. In an effort to keep the Operators API consistent, this util will no longer be supported, and users need to port its body into their own application code.
current - current atomic to update
toAdd - delta to add

public static <T> long addAndGet(AtomicLongFieldUpdater<T> updater, T instance, long n)
T - the parent instance type
updater - current field updater
instance - current instance to update
n - delta to add

public static long addCap(long a, long b)
a - left operand
b - right operand

public static <T> Fuseable.QueueSubscription<T> as(Subscription s)
T - the value type of the QueueSubscription.
s - the source subscription to try to convert.

public static Subscription cancelledSubscription()
Subscription
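
The capped addition performed by `addCap` and by the AtomicLongFieldUpdater-based `addAndGet` documented above can be illustrated with a small JDK-only sketch. This is not the Reactor source: the class `CappedAddSketch`, its `requested` field, and the choice to return the updated value are assumptions made for illustration, and operands are assumed to be non-negative.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Illustrative sketch only; names and structure are hypothetical, not Reactor's code.
final class CappedAddSketch {
    // Hypothetical demand counter, as an operator might keep one.
    volatile long requested;

    static final AtomicLongFieldUpdater<CappedAddSketch> REQUESTED =
            AtomicLongFieldUpdater.newUpdater(CappedAddSketch.class, "requested");

    // Adds two non-negative longs; an overflowing sum wraps negative, so it is capped.
    static long addCap(long a, long b) {
        long sum = a + b;
        return sum < 0L ? Long.MAX_VALUE : sum;
    }

    // CAS loop that adds n to the field without ever exceeding Long.MAX_VALUE.
    // Assumption: the updated value is returned.
    static <T> long addAndGet(AtomicLongFieldUpdater<T> updater, T instance, long n) {
        for (;;) {
            long current = updater.get(instance);
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE; // already at the cap, nothing to add
            }
            long next = addCap(current, n);
            if (updater.compareAndSet(instance, current, next)) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        CappedAddSketch op = new CappedAddSketch();
        System.out.println(addAndGet(REQUESTED, op, 5));              // 5
        System.out.println(addAndGet(REQUESTED, op, Long.MAX_VALUE)); // 9223372036854775807
    }
}
```

Capping rather than overflowing keeps an accumulated demand counter from wrapping to a negative value when many large requests are added together.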
public static void checkRequest(long n) throws IllegalArgumentException
n - demand to check
IllegalArgumentException - the nullOrNegativeRequestException instance

public static boolean checkRequest(long n, Subscriber<?> subscriber)
n - demand to check
subscriber - Subscriber to onError if n is not strictly positive
IllegalArgumentException - if subscriber is null and demand is negative or 0.

public static void complete(Subscriber<?> s)
s - the target subscriber

public static <T> Subscriber<T> drainSubscriber()
Return a singleton Subscriber that does not check for double onSubscribe and purely requests Long.MAX_VALUE. If an error is received, it will raise an Exceptions.errorCallbackNotImplemented(Throwable) in the receiving thread.
Subscriber whose sole purpose is to request Long.MAX_VALUE

public static Subscription emptySubscription()
The enum also implements Fuseable.QueueSubscription so operators expecting a QueueSubscription from a Fuseable source don't have to double-check their Subscription received in onSubscribe.
Subscription
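
The signal order described for `complete(Subscriber)` and `error(Subscriber, Throwable)`, that is, onSubscribe with the empty instance followed by a terminal signal, can be sketched as follows. The sketch uses the JDK's `java.util.concurrent.Flow` interfaces as a stand-in for the Reactive Streams types, and `CompleteSketch`, `EmptySubscription`, and `Recorder` are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Illustrative sketch only; Flow.* stands in for org.reactivestreams types.
final class CompleteSketch {
    // A no-op subscription held as an enum singleton, in the spirit of emptySubscription().
    enum EmptySubscription implements Flow.Subscription {
        INSTANCE;
        @Override public void request(long n) { /* nothing to emit */ }
        @Override public void cancel() { /* nothing to release */ }
    }

    // onSubscribe with the empty instance, then onComplete.
    static void complete(Flow.Subscriber<?> s) {
        s.onSubscribe(EmptySubscription.INSTANCE);
        s.onComplete();
    }

    // onSubscribe with the empty instance, then onError with the supplied error.
    static void error(Flow.Subscriber<?> s, Throwable e) {
        s.onSubscribe(EmptySubscription.INSTANCE);
        s.onError(e);
    }

    // Hypothetical subscriber that records the order of the signals it receives.
    static final class Recorder implements Flow.Subscriber<Object> {
        final List<String> signals = new ArrayList<>();
        @Override public void onSubscribe(Flow.Subscription s) { signals.add("onSubscribe"); }
        @Override public void onNext(Object item) { signals.add("onNext"); }
        @Override public void onError(Throwable t) { signals.add("onError:" + t.getMessage()); }
        @Override public void onComplete() { signals.add("onComplete"); }
    }

    public static void main(String[] args) {
        Recorder r = new Recorder();
        complete(r);
        System.out.println(r.signals); // [onSubscribe, onComplete]
    }
}
```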
public static void error(Subscriber<?> s, Throwable e)
s - target Subscriber to error
e - the actual error

public static <T> long getAndAddCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd)
T - the parent instance type
updater - current field updater
instance - current instance to update
toAdd - delta to add

public static long multiplyCap(long a, long b)
a - left operand
b - right operand

public static void onErrorDropped(Throwable e, Throwable root)
e - the exception to handle
root - the optional root cause to suppress

public static void onErrorDropped(Throwable e)
e - the exception to handle

public static <T> void onNextDropped(T t)
T - the dropped value type
t - the dropped data

public static Throwable onOperatorError(Throwable error)
Map an "operator" error.
Exceptions.throwIfFatal(Throwable).
error - the callback or operator error
Throwable
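
As a rough illustration of what capping a multiplication to Long.MAX_VALUE means for `multiplyCap`, the sketch below relies on the JDK's `Math.multiplyExact` to detect overflow. This is one possible approach under the assumption that both operands are non-negative, not necessarily how the library implements it; `MultiplyCapSketch` is a hypothetical name.

```java
// Illustrative sketch only; assumes non-negative operands.
final class MultiplyCapSketch {
    // Returns a * b, or Long.MAX_VALUE when the exact product does not fit in a long.
    static long multiplyCap(long a, long b) {
        try {
            return Math.multiplyExact(a, b);
        } catch (ArithmeticException overflow) {
            return Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        System.out.println(multiplyCap(3, 4));              // 12
        System.out.println(multiplyCap(Long.MAX_VALUE, 2)); // 9223372036854775807
    }
}
```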
public static Throwable onOperatorError(Subscription subscription, Throwable error)
Map an "operator" error given an operator parent Subscription. The resulting error will be passed via onError to the operator downstream. The Subscription will be cancelled after checking for fatal errors via Exceptions.throwIfFatal(Throwable).
subscription - the linked operator parent Subscription
error - the callback or operator error
Throwable
public static Throwable onOperatorError(Subscription subscription, Throwable error, Object dataSignal)
Map an "operator" error given an operator parent Subscription. The resulting error will be passed via onError to the operator downstream. The Subscription will be cancelled after checking for fatal errors via Exceptions.throwIfFatal(Throwable).
subscription - the linked operator parent Subscription
error - the callback or operator error
dataSignal - the value (onNext or onError) signal processed during failure
Throwable
public static RuntimeException onRejectedExecution()
Return a wrapped RejectedExecutionException which can be thrown by the operator. This denotes that an execution was rejected by a Scheduler due to dispose.
Wrapping is done by calling both Exceptions.bubble(Throwable) and onOperatorError(Subscription, Throwable, Object).
public static RuntimeException onRejectedExecution(Subscription subscription, Throwable suppressed, Object dataSignal)
Return a wrapped RejectedExecutionException which can be thrown by the operator. This denotes that an execution was rejected by a Scheduler due to dispose.
Wrapping is done by calling both Exceptions.bubble(Throwable) and onOperatorError(Subscription, Throwable, Object) (with the passed Subscription).
subscription - the subscription to pass to onOperatorError.
suppressed - a Throwable to be suppressed by the RejectedExecutionException (or null if not relevant)
dataSignal - a value to be passed to onOperatorError(Subscription, Throwable, Object) (or null if not relevant)

public static <T> long produced(AtomicLongFieldUpdater<T> updater, T instance, long toSub)
T - the parent instance type
updater - current field updater
instance - current instance to update
toSub - delta to subtract

public static <F> boolean replace(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
F - the instance type
field - The Atomic container
instance - the instance reference
s - the subscription

public static void reportBadRequest(long n)
Throw IllegalArgumentException
n - the demand to evaluate

public static void reportMoreProduced()
Throw IllegalStateException
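
The "concurrent subtraction bound to 0" performed by `produced` can be pictured as a compare-and-set loop over the demand field. The following is a minimal JDK-only sketch, not the Reactor source. It assumes that a value of Long.MAX_VALUE denotes unbounded demand and is left untouched, and that the updated value is returned; `ProducedSketch` and its `requested` field are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Illustrative sketch only; names and the "unbounded" convention are assumptions.
final class ProducedSketch {
    // Hypothetical outstanding-demand counter.
    volatile long requested;

    static final AtomicLongFieldUpdater<ProducedSketch> REQUESTED =
            AtomicLongFieldUpdater.newUpdater(ProducedSketch.class, "requested");

    // Subtracts b from a, never going below zero.
    static long subOrZero(long a, long b) {
        long res = a - b;
        return res < 0L ? 0L : res;
    }

    // CAS loop that decrements the field by toSub, bounded at 0.
    static <T> long produced(AtomicLongFieldUpdater<T> updater, T instance, long toSub) {
        for (;;) {
            long current = updater.get(instance);
            // Assumption: 0 has nothing left to subtract, Long.MAX_VALUE means unbounded.
            if (current == 0L || current == Long.MAX_VALUE) {
                return current;
            }
            long next = subOrZero(current, toSub);
            if (updater.compareAndSet(instance, current, next)) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        ProducedSketch op = new ProducedSketch();
        REQUESTED.set(op, 10);
        System.out.println(produced(REQUESTED, op, 3));   // 7
        System.out.println(produced(REQUESTED, op, 100)); // 0
    }
}
```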
public static void reportSubscriptionSet()

public static <T> Subscription scalarSubscription(Subscriber<? super T> subscriber, T value)
T - the value type
subscriber - the delegate Subscriber that will be requesting the value
value - the single value to be emitted
Subscription
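
To make the idea behind `scalarSubscription` more concrete, here is a hedged sketch of a subscription that emits one constant value synchronously on the first positive request and then completes. It leaves out the fuseable (queue) aspect entirely, uses `java.util.concurrent.Flow` as a stand-in for the Reactive Streams types, and silently ignores non-positive requests for brevity. `ScalarSubscriptionSketch` and `Recorder` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch only; fusion support is deliberately omitted.
final class ScalarSubscriptionSketch<T> implements Flow.Subscription {
    private final Flow.Subscriber<? super T> subscriber;
    private final T value;
    // Flips to true once the value was emitted or the subscription was cancelled.
    private final AtomicBoolean done = new AtomicBoolean();

    ScalarSubscriptionSketch(Flow.Subscriber<? super T> subscriber, T value) {
        this.subscriber = subscriber;
        this.value = value;
    }

    @Override
    public void request(long n) {
        if (n <= 0L) {
            return; // simplification: invalid demand is ignored in this sketch
        }
        if (done.compareAndSet(false, true)) {
            subscriber.onNext(value); // emitted synchronously, on the requesting thread
            subscriber.onComplete();
        }
    }

    @Override
    public void cancel() {
        done.set(true);
    }

    // Hypothetical subscriber that records what it receives.
    static final class Recorder<V> implements Flow.Subscriber<V> {
        final List<String> signals = new ArrayList<>();
        @Override public void onSubscribe(Flow.Subscription s) { signals.add("onSubscribe"); }
        @Override public void onNext(V item) { signals.add("onNext:" + item); }
        @Override public void onError(Throwable t) { signals.add("onError"); }
        @Override public void onComplete() { signals.add("onComplete"); }
    }

    public static void main(String[] args) {
        Recorder<String> r = new Recorder<>();
        ScalarSubscriptionSketch<String> s = new ScalarSubscriptionSketch<>(r, "hello");
        r.onSubscribe(s);
        s.request(1);
        s.request(1); // no effect: the single value was already emitted
        System.out.println(r.signals); // [onSubscribe, onNext:hello, onComplete]
    }
}
```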
public static <T> Subscriber<T> serialize(Subscriber<? super T> subscriber)
Safely gate a Subscriber by a serializing Subscriber.
Serialization uses thread-stealing and a potentially unbounded queue that might starve a calling thread if races are too significant and the Subscriber is slower.
T - the relayed type
subscriber - the subscriber to wrap
Subscriber
public static <F> boolean set(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
F - the instance type
field - The Atomic container
instance - the instance reference
s - the subscription

public static <F> boolean setOnce(AtomicReferenceFieldUpdater<F,Subscription> field, F instance, Subscription s)
F - the instance type containing the field
field - the field accessor
instance - the parent instance
s - the subscription to set once

public static long subOrZero(long a, long b)
a - left operand
b - right operand

public static <F> boolean terminate(AtomicReferenceFieldUpdater<F,Subscription> field, F instance)
Atomically terminates the subscription if it is not already a cancelledSubscription(), cancelling the subscription and setting the field to the singleton cancelledSubscription().
F - the instance type containing the field
field - the field accessor
instance - the parent instance

public static boolean validate(Subscription current, Subscription next)
current - current Subscription, expected to be null
next - new Subscription

public static boolean validate(long n)
Evaluate if a request is strictly positive, otherwise reportBadRequest(long).
n - the request value
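
The contracts described for `setOnce` and `terminate` can be sketched with an AtomicReferenceFieldUpdater and a sentinel standing for the cancelled state. This is a simplified illustration under stated assumptions, not the library's code: it uses `java.util.concurrent.Flow.Subscription` in place of the Reactive Streams type, it cancels the rejected subscription in `setOnce`, and `SubscriptionFieldSketch`, `CANCELLED`, `upstream`, and `Tracking` are hypothetical names.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Illustrative sketch only; Flow.Subscription stands in for org.reactivestreams.Subscription.
final class SubscriptionFieldSketch {
    // Sentinel for the terminal state; it should never be handed to clients.
    static final Flow.Subscription CANCELLED = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    // Hypothetical field holding the upstream subscription of an operator.
    volatile Flow.Subscription upstream;

    static final AtomicReferenceFieldUpdater<SubscriptionFieldSketch, Flow.Subscription> UPSTREAM =
            AtomicReferenceFieldUpdater.newUpdater(
                    SubscriptionFieldSketch.class, Flow.Subscription.class, "upstream");

    // Sets s only if the field is still empty. Assumption: a rejected s is cancelled.
    static <F> boolean setOnce(AtomicReferenceFieldUpdater<F, Flow.Subscription> field,
                               F instance, Flow.Subscription s) {
        if (field.compareAndSet(instance, null, s)) {
            return true;
        }
        s.cancel(); // the field already holds a subscription or the CANCELLED sentinel
        return false;
    }

    // Swaps in the CANCELLED sentinel and cancels whatever was there before.
    static <F> boolean terminate(AtomicReferenceFieldUpdater<F, Flow.Subscription> field,
                                 F instance) {
        Flow.Subscription current = field.get(instance);
        if (current != CANCELLED) {
            current = field.getAndSet(instance, CANCELLED);
            if (current != null && current != CANCELLED) {
                current.cancel();
                return true;
            }
        }
        return false;
    }

    // Hypothetical subscription that remembers whether it was cancelled.
    static final class Tracking implements Flow.Subscription {
        volatile boolean cancelled;
        @Override public void request(long n) { }
        @Override public void cancel() { cancelled = true; }
    }

    public static void main(String[] args) {
        SubscriptionFieldSketch holder = new SubscriptionFieldSketch();
        Tracking first = new Tracking();
        System.out.println(setOnce(UPSTREAM, holder, first)); // true
        System.out.println(terminate(UPSTREAM, holder));      // true
        System.out.println(first.cancelled);                  // true
    }
}
```

Using a sentinel instead of null for the terminal state lets a late `setOnce` detect that the holder was already terminated and cancel the incoming subscription rather than store it.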