public abstract class Operators
extends java.lang.Object
Subscription.request(long) handling.
Combine utils available to operator implementations, @see http://github.com/reactor/reactive-streams-commons| Modifier and Type | Class and Description |
|---|---|
static class |
Operators.DeferredSubscription
Base class for Subscribers that will receive their Subscriptions at any time yet
they need to be cancelled or requested at any time.
|
static class |
Operators.MonoSubscriber<I,O>
A Subscriber/Subscription barrier that holds a single value at most and properly gates asynchronous behaviors
resulting from concurrent request or cancel and onXXX signals.
|
static class |
Operators.SubscriberAdapter<I,O>
A
Subscriber with an asymetric typed wrapped subscriber. |
| Modifier and Type | Method and Description |
|---|---|
static <T> long |
addAndGet(java.util.concurrent.atomic.AtomicLongFieldUpdater<T> updater,
T instance,
long n)
Concurrent addition bound to Long.MAX_VALUE.
|
static long |
addAndGet(java.util.concurrent.atomic.AtomicLong current,
long toAdd)
Concurrent addition bound to Long.MAX_VALUE.
|
static long |
addCap(long a,
long b)
Cap an addition to Long.MAX_VALUE
|
static <T> Fuseable.QueueSubscription<T> |
as(org.reactivestreams.Subscription s)
Returns the subscription as QueueSubscription if possible or null.
|
static org.reactivestreams.Subscription |
cancelledSubscription()
A singleton Subscription that represents a cancelled subscription instance and
should not be leaked to clients as it represents a terminal state.
|
static void |
checkRequest(long n)
Throws an exception if request is 0 or negative as specified in rule 3.09 of Reactive Streams
|
static boolean |
checkRequest(long n,
org.reactivestreams.Subscriber<?> subscriber)
Throws an exception if request is 0 or negative as specified in rule 3.09 of Reactive Streams
|
static void |
complete(org.reactivestreams.Subscriber<?> s)
Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onComplete.
|
static org.reactivestreams.Subscription |
emptySubscription()
A singleton enumeration that represents a no-op Subscription instance that
can be freely given out to clients.
|
static void |
error(org.reactivestreams.Subscriber<?> s,
java.lang.Throwable e)
Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onError with the
supplied error.
|
static <T> long |
getAndAddCap(java.util.concurrent.atomic.AtomicLongFieldUpdater<T> updater,
T instance,
long toAdd)
Concurrent addition bound to Long.MAX_VALUE.
|
static long |
multiplyCap(long a,
long b)
Cap a multiplication to Long.MAX_VALUE
|
static void |
onErrorDropped(java.lang.Throwable e)
Take an unsignalled exception that is masking anowher one due to callback failure.
|
static void |
onErrorDropped(java.lang.Throwable e,
java.lang.Throwable root)
Take an unsignalled exception that is masking anowher one due to callback failure.
|
static <T> void |
onNextDropped(T t)
An unexpected event is about to be dropped.
|
static java.lang.Throwable |
onOperatorError(org.reactivestreams.Subscription subscription,
java.lang.Throwable error)
Map an "operator" error given an operator parent
Subscription. |
static java.lang.Throwable |
onOperatorError(org.reactivestreams.Subscription subscription,
java.lang.Throwable error,
java.lang.Object dataSignal)
Map an "operator" error given an operator parent
Subscription. |
static java.lang.Throwable |
onOperatorError(java.lang.Throwable error)
Map an "operator" error.
|
static <T> long |
produced(java.util.concurrent.atomic.AtomicLongFieldUpdater<T> updater,
T instance,
long toSub)
Concurrent substraction bound to 0.
|
static <F> boolean |
replace(java.util.concurrent.atomic.AtomicReferenceFieldUpdater<F,org.reactivestreams.Subscription> field,
F instance,
org.reactivestreams.Subscription s)
A generic utility to atomically replace a subscription or cancel if marked by a
singleton subscription.
|
static void |
reportBadRequest(long n)
Throw
IllegalArgumentException |
static void |
reportMoreProduced()
Throw
IllegalStateException |
static void |
reportSubscriptionSet()
Log reportedSubscriptions
|
static <T> org.reactivestreams.Subscription |
scalarSubscription(org.reactivestreams.Subscriber<? super T> subscriber,
T value)
Represents a fuseable Subscription that emits a single constant value synchronously
to a Subscriber or consumer.
|
static <T> org.reactivestreams.Subscriber<T> |
serialize(org.reactivestreams.Subscriber<? super T> subscriber)
Safely gate a
Subscriber by a serializing Subscriber. |
static <F> boolean |
set(java.util.concurrent.atomic.AtomicReferenceFieldUpdater<F,org.reactivestreams.Subscription> field,
F instance,
org.reactivestreams.Subscription s)
A generic utility to atomically replace a subscription or cancel if marked by a
singleton subscription or concurrently set before.
|
static <F> boolean |
setOnce(java.util.concurrent.atomic.AtomicReferenceFieldUpdater<F,org.reactivestreams.Subscription> field,
F instance,
org.reactivestreams.Subscription s)
Sets the given subscription once and returns true if successful, false
if the field has a subscription already or has been cancelled.
|
static long |
subOrZero(long a,
long b)
Cap a substraction to 0
|
static <F> boolean |
terminate(java.util.concurrent.atomic.AtomicReferenceFieldUpdater<F,org.reactivestreams.Subscription> field,
F instance) |
static boolean |
validate(long n)
Evaluate if a request is strictly positive otherwise
reportBadRequest(long) |
static boolean |
validate(org.reactivestreams.Subscription current,
org.reactivestreams.Subscription next)
Check Subscription current state and cancel new Subscription if different null, returning true if
ready to subscribe.
|
public static long addAndGet(java.util.concurrent.atomic.AtomicLong current,
long toAdd)
current - current atomic to updatetoAdd - delta to addpublic static <T> long addAndGet(java.util.concurrent.atomic.AtomicLongFieldUpdater<T> updater,
T instance,
long n)
T - the parent instance typeupdater - current field updaterinstance - current instance to updaten - delta to addpublic static long addCap(long a,
long b)
a - left operandb - right operandpublic static <T> Fuseable.QueueSubscription<T> as(org.reactivestreams.Subscription s)
T - the value type of the QueueSubscription.s - the source subscription to try to convert.public static org.reactivestreams.Subscription cancelledSubscription()
Subscriptionpublic static void checkRequest(long n)
throws java.lang.IllegalArgumentException
n - demand to checkjava.lang.IllegalArgumentExceptionpublic static boolean checkRequest(long n,
org.reactivestreams.Subscriber<?> subscriber)
n - demand to checksubscriber - Subscriber to onError if non strict positive njava.lang.IllegalArgumentException - if subscriber is null and demand is negative or 0.public static void complete(org.reactivestreams.Subscriber<?> s)
s - public static org.reactivestreams.Subscription emptySubscription()
The enum also implements Fuseable.QueueSubscription so operators expecting a QueueSubscription from a Fuseable source don't have to double-check their Subscription received in onSubscribe.
Subscriptionpublic static void error(org.reactivestreams.Subscriber<?> s,
java.lang.Throwable e)
s - target Subscriber to errore - the actual errorpublic static <T> long getAndAddCap(java.util.concurrent.atomic.AtomicLongFieldUpdater<T> updater,
T instance,
long toAdd)
T - the parent instance typeupdater - current field updaterinstance - current instance to updatetoAdd - delta to addpublic static <T> long produced(java.util.concurrent.atomic.AtomicLongFieldUpdater<T> updater,
T instance,
long toSub)
T - the parent instance typeupdater - current field updaterinstance - current instance to updatetoSub - delta to subpublic static long multiplyCap(long a,
long b)
a - left operandb - right operandpublic static void onErrorDropped(java.lang.Throwable e,
java.lang.Throwable root)
e - the exception to handleroot - the optional root cause to suppresspublic static void onErrorDropped(java.lang.Throwable e)
e - the exception to handlepublic static <T> void onNextDropped(T t)
T - the dropped value typet - the dropping datapublic static java.lang.Throwable onOperatorError(java.lang.Throwable error)
Exceptions.throwIfFatal(Throwable).error - the callback or operator errorThrowablepublic static java.lang.Throwable onOperatorError(org.reactivestreams.Subscription subscription,
java.lang.Throwable error)
Subscription. The
result error will be passed via onError to the operator downstream.
Subscription will be cancelled after checking for fatal error via
Exceptions.throwIfFatal(Throwable).subscription - the linked operator parent Subscriptionerror - the callback or operator errorThrowablepublic static java.lang.Throwable onOperatorError(org.reactivestreams.Subscription subscription,
java.lang.Throwable error,
java.lang.Object dataSignal)
Subscription. The
result error will be passed via onError to the operator downstream.
Subscription will be cancelled after checking for fatal error via
Exceptions.throwIfFatal(Throwable).subscription - the linked operator parent Subscriptionerror - the callback or operator errordataSignal - the value (onNext or onError) signal processed during failureThrowablepublic static <F> boolean replace(java.util.concurrent.atomic.AtomicReferenceFieldUpdater<F,org.reactivestreams.Subscription> field,
F instance,
org.reactivestreams.Subscription s)
F - the instance typefield - The Atomic containerinstance - the instance references - the subscriptionpublic static void reportBadRequest(long n)
IllegalArgumentExceptionn - the demand to evaluatepublic static void reportMoreProduced()
IllegalStateExceptionpublic static void reportSubscriptionSet()
public static <F> boolean set(java.util.concurrent.atomic.AtomicReferenceFieldUpdater<F,org.reactivestreams.Subscription> field,
F instance,
org.reactivestreams.Subscription s)
F - the instance typefield - The Atomic containerinstance - the instance references - the subscriptionpublic static <F> boolean setOnce(java.util.concurrent.atomic.AtomicReferenceFieldUpdater<F,org.reactivestreams.Subscription> field,
F instance,
org.reactivestreams.Subscription s)
F - the instance type containing the fieldfield - the field accessorinstance - the parent instances - the subscription to set oncepublic static long subOrZero(long a,
long b)
a - left operandb - right operandpublic static <F> boolean terminate(java.util.concurrent.atomic.AtomicReferenceFieldUpdater<F,org.reactivestreams.Subscription> field,
F instance)
public static boolean validate(org.reactivestreams.Subscription current,
org.reactivestreams.Subscription next)
current - current Subscription, expected to be nullnext - new Subscriptionpublic static boolean validate(long n)
reportBadRequest(long)n - the request valuepublic static <T> org.reactivestreams.Subscription scalarSubscription(org.reactivestreams.Subscriber<? super T> subscriber,
T value)
T - the value typesubscriber - the delegate Subscriber that will be requesting the valuevalue - the single value to be emittedSubscriptionpublic static <T> org.reactivestreams.Subscriber<T> serialize(org.reactivestreams.Subscriber<? super T> subscriber)
Subscriber by a serializing Subscriber.
Serialization uses thread-stealing and a potentially unbounded queue that might starve a calling thread if
races are too important and
Subscriber is slower.

T - the relayed typesubscriber - the subscriber to wrapSubscriber