Class Operators
Subscription.request(long) handling.
Combine utils available to operator implementations, @see https://github.com/reactor/reactive-streams-commons-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | Operators.DeferredSubscription | Base class for Subscribers that will receive their Subscriptions at any time, yet they might also need to be cancelled or requested at any time.
static class | Operators.MonoSubscriber<I,O> | A Subscriber/Subscription barrier that holds a single value at most and properly gates asynchronous behaviors resulting from concurrent request or cancel and onXXX signals.
Method Summary
Modifier and Type | Method | Description
static long | addCap(long a, long b) | Cap an addition to Long.MAX_VALUE.
static <T> long | addCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd) | Concurrent addition bound to Long.MAX_VALUE.
static <T> @Nullable Fuseable.QueueSubscription<T> | as(Subscription s) | Returns the subscription as QueueSubscription if possible or null.
static boolean | canAppearAfterOnSubscribe(Subscription subscription) | Check whether the provided Subscription is the one used to satisfy Spec's §1.9 rule before signalling an error.
static Subscription | cancelledSubscription() | A singleton Subscription that represents a cancelled subscription instance and should not be leaked to clients as it represents a terminal state.
static void | complete(Subscriber<?> s) | Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onComplete.
static <T> CoreSubscriber<T> | drainSubscriber() | Return a singleton Subscriber that does not check for double onSubscribe and purely requests Long.MAX_VALUE.
static <T> CoreSubscriber<T> | emptySubscriber() | A Subscriber that is expected to be used as a placeholder and never actually be called.
static Subscription | emptySubscription() | A singleton enumeration that represents a no-op Subscription instance that can be freely given out to clients.
static final Context | enableOnDiscard(@Nullable Context target, Consumer<?> discardConsumer) | Utility method to activate the onDiscard feature (see Flux.doOnDiscard(Class, Consumer)) in a target Context.
static void | error(Subscriber<?> s, Throwable e) | Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onError with the supplied error.
static <I,O> Function<? super Publisher<I>, ? extends Publisher<O>> | lift(BiFunction<Scannable, ? super CoreSubscriber<? super O>, ? extends CoreSubscriber<? super I>> lifter) | Create a function that can be used to support a custom operator via CoreSubscriber decoration.
static <O> Function<? super Publisher<O>, ? extends Publisher<O>> | lift(Predicate<Scannable> filter, BiFunction<Scannable, ? super CoreSubscriber<? super O>, ? extends CoreSubscriber<? super O>> lifter) | Create a function that can be used to support a custom operator via CoreSubscriber decoration.
static <I,O> Function<? super Publisher<I>, ? extends Publisher<O>> | liftPublisher(BiFunction<Publisher, ? super CoreSubscriber<? super O>, ? extends CoreSubscriber<? super I>> lifter) | Create a function that can be used to support a custom operator via CoreSubscriber decoration.
static <O> Function<? super Publisher<O>, ? extends Publisher<O>> | liftPublisher(Predicate<Publisher> filter, BiFunction<Publisher, ? super CoreSubscriber<? super O>, ? extends CoreSubscriber<? super O>> lifter) | Create a function that can be used to support a custom operator via CoreSubscriber decoration.
static long | multiplyCap(long a, long b) | Cap a multiplication to Long.MAX_VALUE.
static <T> void | onDiscard(@Nullable T element, Context context) | Invoke a (local or global) hook that processes elements that get discarded.
static void | onDiscardMultiple(@Nullable Collection<?> multiple, Context context) | Invoke a (local or global) hook that processes elements that get discarded en masse.
static void | onDiscardMultiple(@Nullable Iterator<?> multiple, boolean knownToBeFinite, Context context) | Invoke a (local or global) hook that processes elements that remain in an Iterator.
static void | onDiscardMultiple(@Nullable Spliterator<?> multiple, boolean knownToBeFinite, Context context) | Invoke a (local or global) hook that processes elements that remain in a Spliterator.
static void | onDiscardMultiple(Stream<?> multiple, Context context) | Invoke a (local or global) hook that processes elements that get discarded en masse.
static <T> void | onDiscardQueueWithClear(@Nullable Queue<T> queue, Context context, @Nullable Function<T, Stream<?>> extract) | Invoke a (local or global) hook that processes elements that get discarded en masse after having been enqueued, due to cancellation or error.
static void | onErrorDropped(Throwable e, Context context) | An unexpected exception is about to be dropped.
static <T> CorePublisher<T> | onLastAssembly(CorePublisher<T> source) | Applies the hooks registered with Hooks.onLastOperator(Function) and returns a CorePublisher ready to be subscribed on.
static <T> void | onNextDropped(T t, Context context) | An unexpected event is about to be dropped.
static <T> @Nullable Throwable | onNextError(@Nullable T value, Throwable error, Context context) | Find the OnNextFailureStrategy to apply to the calling async operator (which could be a local error mode defined in the Context) and apply it.
static <T> @Nullable Throwable | onNextError(@Nullable T value, Throwable error, Context context, Subscription subscriptionForCancel) | Find the OnNextFailureStrategy to apply to the calling operator (which could be a local error mode defined in the Context) and apply it.
static final BiFunction<? super Throwable, Object, ? extends Throwable> | onNextErrorFunction(Context context) |
static <T> @Nullable Throwable | onNextInnerError(Throwable error, Context context, @Nullable Subscription subscriptionForCancel) | Find the OnNextFailureStrategy to apply to the calling operator (which could be a local error mode defined in the Context) and apply it.
static <T> @Nullable RuntimeException | onNextPollError(@Nullable T value, Throwable error, Context context) | Find the OnNextFailureStrategy to apply to the calling async operator (which could be a local error mode defined in the Context) and apply it.
static Throwable | onOperatorError(@Nullable Subscription subscription, Throwable error, @Nullable Object dataSignal, Context context) | Map an "operator" error given an operator parent Subscription.
static Throwable | onOperatorError(@Nullable Subscription subscription, Throwable error, Context context) | Map an "operator" error given an operator parent Subscription.
static Throwable | onOperatorError(Throwable error, Context context) | Map an "operator" error.
static RuntimeException | onRejectedExecution(Throwable original, @Nullable Subscription subscription, @Nullable Throwable suppressed, @Nullable Object dataSignal, Context context) | Return a wrapped RejectedExecutionException which can be thrown by the operator.
static RuntimeException | onRejectedExecution(Throwable original, Context context) | Return a wrapped RejectedExecutionException which can be thrown by the operator.
static <T> long | produced(AtomicLongFieldUpdater<T> updater, T instance, long toSub) | Concurrent subtraction bound to 0, mostly used to decrement a request tracker by the amount produced by the operator.
static <F> boolean | replace(AtomicReferenceFieldUpdater<F, @Nullable Subscription> field, F instance, Subscription s) | A generic utility to atomically replace a subscription or cancel the replacement if the current subscription is marked as already cancelled (as in cancelledSubscription()).
static void | reportBadRequest(long n) | Log an IllegalArgumentException if the request is zero or negative.
static void | reportMoreProduced() | Log an IllegalStateException that indicates more than the requested amount was produced.
static void | reportSubscriptionSet() | Log a duplicate subscription error.
static void | reportThrowInSubscribe(CoreSubscriber<?> subscriber, Throwable e) | Report a Throwable that was thrown from a call to Publisher.subscribe(Subscriber), attempting to notify the Subscriber by providing a special Subscription via Subscriber.onSubscribe(Subscription) and immediately delivering an onError signal after that.
static <T> Subscription | scalarSubscription(CoreSubscriber<? super T> subscriber, T value) | Represents a fuseable Subscription that emits a single constant value synchronously to a Subscriber or consumer.
static <T> Subscription | scalarSubscription(CoreSubscriber<? super T> subscriber, T value, String stepName) | Represents a fuseable Subscription that emits a single constant value synchronously to a Subscriber or consumer.
static <T> CoreSubscriber<T> | serialize(CoreSubscriber<? super T> subscriber) | Safely gate a Subscriber by making sure onNext signals are delivered sequentially (serialized).
static <F> boolean | set(AtomicReferenceFieldUpdater<F, @Nullable Subscription> field, F instance, Subscription s) | A generic utility to atomically replace a subscription or cancel the replacement if the current subscription is marked as cancelled (as in cancelledSubscription()) or was concurrently updated before.
static <F> boolean | setOnce(AtomicReferenceFieldUpdater<F, @Nullable Subscription> field, F instance, Subscription s) | Sets the given subscription once and returns true if successful, false if the field has a subscription already or has been cancelled.
static long | subOrZero(long a, long b) | Cap a subtraction to 0.
static <F> boolean | terminate(AtomicReferenceFieldUpdater<F, @Nullable Subscription> field, F instance) | Atomically terminates the subscription if it is not already a cancelledSubscription(), cancelling the subscription and setting the field to the singleton cancelledSubscription().
static <T> Fuseable.ConditionalSubscriber<? super T> | toConditionalSubscriber(CoreSubscriber<? super T> actual) | If the actual CoreSubscriber is not a Fuseable.ConditionalSubscriber, it will apply an adapter which directly maps all Fuseable.ConditionalSubscriber.tryOnNext(Object) to Subscriber.onNext(Object) and always returns true as the result.
static <T> CoreSubscriber<? super T> | toCoreSubscriber(Subscriber<? super T> actual) | If the actual Subscriber is not a CoreSubscriber, it will apply safe strict wrapping to apply all reactive streams rules including the ones relaxed by internal operators based on CoreSubscriber.
static <T> void | toFluxOrMono(Publisher<? extends T>[] sources) |
static <T> CorePublisher<T> | toFluxOrMono(Publisher<T> publisher) |
static boolean | validate(long n) | Evaluate if a request is strictly positive, otherwise reportBadRequest(long).
static boolean | validate(@Nullable Subscription current, Subscription next) | Check Subscription current state and cancel new Subscription if current is set, or return true if ready to subscribe.
-
Method Details
-
addCap
public static long addCap(long a, long b)
Cap an addition to Long.MAX_VALUE.
Parameters:
a - left operand
b - right operand
Returns:
Addition result or Long.MAX_VALUE if overflow
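A capped addition of this kind can be sketched with plain JDK arithmetic. The class name `CappedAddition` is hypothetical, and the sketch assumes both operands are non-negative demand values; it is not taken from the Reactor sources.

```java
// Hypothetical helper, assuming non-negative operands (request amounts).
final class CappedAddition {
    private CappedAddition() {}

    // Adds a and b; a negative sum means the long overflowed, so cap it.
    static long addCap(long a, long b) {
        long sum = a + b;
        return sum < 0L ? Long.MAX_VALUE : sum;
    }
}
```

Capping instead of overflowing lets Long.MAX_VALUE act as an "unbounded demand" marker that further additions cannot wrap around.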
-
addCap
public static <T> long addCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd)
Concurrent addition bound to Long.MAX_VALUE. Any concurrent write will "happen before" this operation.
Type Parameters:
T - the parent instance type
Parameters:
updater - current field updater
instance - current instance to update
toAdd - delta to add
Returns:
value before addition or Long.MAX_VALUE
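The concurrent variant is typically built as a compare-and-set retry loop. The sketch below uses only the JDK; the `RequestTracker` class and its `requested` field are hypothetical names, and the loop is an illustration of the technique under the assumption of non-negative deltas rather than the library's actual code.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical holder of a demand counter updated through a field updater.
final class RequestTracker {
    volatile long requested;

    static final AtomicLongFieldUpdater<RequestTracker> REQUESTED =
            AtomicLongFieldUpdater.newUpdater(RequestTracker.class, "requested");

    // Adds toAdd to the field, capping at Long.MAX_VALUE, and returns the
    // value observed before the addition (or Long.MAX_VALUE if already capped).
    static <T> long addCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd) {
        for (;;) {
            long current = updater.get(instance);
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE; // already unbounded, nothing to add
            }
            long sum = current + toAdd;
            long next = sum < 0L ? Long.MAX_VALUE : sum;
            if (updater.compareAndSet(instance, current, next)) {
                return current;
            }
            // another thread changed the field in between: retry
        }
    }
}
```

Returning the previous value lets a caller detect the 0 to n transition, which is often the signal to start draining.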
-
as
public static <T> @Nullable Fuseable.QueueSubscription<T> as(Subscription s)
Returns the subscription as QueueSubscription if possible or null.
Type Parameters:
T - the value type of the QueueSubscription.
Parameters:
s - the source subscription to try to convert.
Returns:
the QueueSubscription instance or null
-
cancelledSubscription
public static Subscription cancelledSubscription()
A singleton Subscription that represents a cancelled subscription instance and should not be leaked to clients as it represents a terminal state.
If algorithms need to hand out a subscription, replace this with a singleton subscription because there is no standard way to tell if a Subscription is cancelled or not otherwise.
Returns:
a singleton noop Subscription to be used as an inner representation of the cancelled state
-
complete
public static void complete(Subscriber<?> s)
Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onComplete.
Parameters:
s - the target subscriber
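The "onSubscribe with an empty instance, then terminate" sequence can be illustrated with the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones. `TerminalSignals` and its `EMPTY` constant are hypothetical names used for this sketch only.

```java
import java.util.concurrent.Flow;

// Hypothetical helper mirroring the described signal order with JDK Flow types.
final class TerminalSignals {
    private TerminalSignals() {}

    // A no-op subscription that is safe to hand to any subscriber.
    static final Flow.Subscription EMPTY = new Flow.Subscription() {
        @Override public void request(long n) { /* nothing to emit */ }
        @Override public void cancel() { /* nothing to cancel */ }
    };

    // onSubscribe must come first, then the terminal onComplete.
    static void complete(Flow.Subscriber<?> s) {
        s.onSubscribe(EMPTY);
        s.onComplete();
    }

    // Same order, but terminating with the supplied error.
    static void error(Flow.Subscriber<?> s, Throwable e) {
        s.onSubscribe(EMPTY);
        s.onError(e);
    }
}
```

Sending onSubscribe first keeps the subscriber within the protocol, which requires a Subscription before any other signal.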
-
drainSubscriber
public static <T> CoreSubscriber<T> drainSubscriber()
Return a singleton Subscriber that does not check for double onSubscribe and purely requests Long.MAX_VALUE. If an error is received it will raise an Exceptions.errorCallbackNotImplemented(Throwable) in the receiving thread.
Returns:
a singleton Subscriber whose sole purpose is to request Long.MAX_VALUE
-
emptySubscriber
public static <T> CoreSubscriber<T> emptySubscriber()
A Subscriber that is expected to be used as a placeholder and never actually be called. All methods log an error.
Type Parameters:
T - the type of data (ignored)
Returns:
a placeholder subscriber
-
emptySubscription
public static Subscription emptySubscription()
A singleton enumeration that represents a no-op Subscription instance that can be freely given out to clients.
The enum also implements Fuseable.QueueSubscription so operators expecting a QueueSubscription from a Fuseable source don't have to double-check the Subscription received in onSubscribe.
Returns:
a singleton noop Subscription
-
canAppearAfterOnSubscribe
public static boolean canAppearAfterOnSubscribe(Subscription subscription)
Check whether the provided Subscription is the one used to satisfy Spec's §1.9 rule before signalling an error.
Parameters:
subscription - the subscription to test.
Returns:
true if the passed subscription is a subscription created in reportThrowInSubscribe(CoreSubscriber, Throwable).
-
error
public static void error(Subscriber<?> s, Throwable e)
Calls onSubscribe on the target Subscriber with the empty instance followed by a call to onError with the supplied error.
Parameters:
s - target Subscriber to error
e - the actual error
-
reportThrowInSubscribe
public static void reportThrowInSubscribe(CoreSubscriber<?> subscriber, Throwable e)
Report a Throwable that was thrown from a call to Publisher.subscribe(Subscriber), attempting to notify the Subscriber by:
- providing a special Subscription via Subscriber.onSubscribe(Subscription)
- immediately delivering an onError signal after that
As at that point the subscriber MAY have already been provided with a Subscription, we assume most well formed subscribers will ignore this second Subscription per Reactive Streams rule 1.9. Subscribers that don't usually ignore may recognize this special case and ignore it by checking canAppearAfterOnSubscribe(Subscription).
Note that if the onSubscribe attempt throws, fatal exceptions are thrown. Other exceptions are added as suppressed on the original exception, which is then directly notified as an onError signal (again assuming that such exceptions occur because a Subscription is already set).
Parameters:
subscriber - the Subscriber being subscribed when the error happened
e - the Throwable that was thrown from Publisher.subscribe(Subscriber)
-
lift
public static <I,O> Function<? super Publisher<I>, ? extends Publisher<O>> lift(BiFunction<Scannable, ? super CoreSubscriber<? super O>, ? extends CoreSubscriber<? super I>> lifter)
Create a function that can be used to support a custom operator via CoreSubscriber decoration. The function is compatible with Flux.transform(Function), Mono.transform(Function), Hooks.onEachOperator(Function) and Hooks.onLastOperator(Function), but requires that the original Publisher be Scannable.
This variant attempts to expose the Publisher as a Scannable for convenience of introspection. You should however avoid instanceof checks or any other processing that depends on identity of the Publisher, as it might get hidden if Scannable.isScanAvailable() returns false. Use liftPublisher(BiFunction) instead for that kind of use case.
Type Parameters:
I - the input type
O - the output type
Parameters:
lifter - the bifunction taking Scannable from the enclosing publisher (assuming it is compatible) and consuming CoreSubscriber. It must return a receiving CoreSubscriber that will immediately subscribe to the applied Publisher.
Returns:
a new Function
-
lift
public static <O> Function<? super Publisher<O>, ? extends Publisher<O>> lift(Predicate<Scannable> filter, BiFunction<Scannable, ? super CoreSubscriber<? super O>, ? extends CoreSubscriber<? super O>> lifter)
Create a function that can be used to support a custom operator via CoreSubscriber decoration. The function is compatible with Flux.transform(Function), Mono.transform(Function), Hooks.onEachOperator(Function) and Hooks.onLastOperator(Function), but requires that the original Publisher be Scannable.
This variant attempts to expose the Publisher as a Scannable for convenience of introspection. You should however avoid instanceof checks or any other processing that depends on identity of the Publisher, as it might get hidden if Scannable.isScanAvailable() returns false. Use liftPublisher(Predicate, BiFunction) instead for that kind of use case.
The function will be invoked only if the passed Predicate matches. Therefore the transformed type O must be the same as the input type, since an unmatched predicate returns the applied Publisher unchanged.
Type Parameters:
O - the input and output type
Parameters:
filter - the predicate to match, taking Scannable from the applied publisher to operate on. Assumes the original is scan-compatible.
lifter - the bifunction taking Scannable from the enclosing publisher and consuming CoreSubscriber. It must return a receiving CoreSubscriber that will immediately subscribe to the applied Publisher. Assumes the original is scan-compatible.
Returns:
a new Function
-
liftPublisher
public static <I,O> Function<? super Publisher<I>, ? extends Publisher<O>> liftPublisher(BiFunction<Publisher, ? super CoreSubscriber<? super O>, ? extends CoreSubscriber<? super I>> lifter)
Create a function that can be used to support a custom operator via CoreSubscriber decoration. The function is compatible with Flux.transform(Function), Mono.transform(Function), Hooks.onEachOperator(Function) and Hooks.onLastOperator(Function), and works with the raw Publisher as input, which is useful if you need to detect the precise type of the source (e.g. instanceof checks to detect Mono, Flux, true Scannable, etc.).
Type Parameters:
I - the input type
O - the output type
Parameters:
lifter - the bifunction taking the raw Publisher and CoreSubscriber. The publisher can be double-checked (including with instanceof), and the function must return a receiving CoreSubscriber that will immediately subscribe to the Publisher.
Returns:
a new Function
-
liftPublisher
public static <O> Function<? super Publisher<O>, ? extends Publisher<O>> liftPublisher(Predicate<Publisher> filter, BiFunction<Publisher, ? super CoreSubscriber<? super O>, ? extends CoreSubscriber<? super O>> lifter)
Create a function that can be used to support a custom operator via CoreSubscriber decoration. The function is compatible with Flux.transform(Function), Mono.transform(Function), Hooks.onEachOperator(Function) and Hooks.onLastOperator(Function), and works with the raw Publisher as input, which is useful if you need to detect the precise type of the source (e.g. instanceof checks to detect Mono, Flux, true Scannable, etc.).
The function will be invoked only if the passed Predicate matches. Therefore the transformed type O must be the same as the input type, since an unmatched predicate returns the applied Publisher unchanged.
Type Parameters:
O - the input and output type
Parameters:
filter - the Predicate that the raw Publisher must pass for the transformation to occur
lifter - the BiFunction taking the raw Publisher and CoreSubscriber. The publisher can be double-checked (including with instanceof), and the function must return a receiving CoreSubscriber that will immediately subscribe to the Publisher.
Returns:
a new Function
-
multiplyCap
public static long multiplyCap(long a, long b)
Cap a multiplication to Long.MAX_VALUE.
Parameters:
a - left operand
b - right operand
Returns:
Product result or Long.MAX_VALUE if overflow
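One way to sketch a capped multiplication with the JDK alone is to let Math.multiplyExact detect the overflow. The class name `CappedProduct` is hypothetical, and the sketch assumes non-negative operands; the library may well use a different overflow check.

```java
// Hypothetical helper, assuming non-negative operands (e.g. prefetch * count).
final class CappedProduct {
    private CappedProduct() {}

    // Multiplies a and b, returning Long.MAX_VALUE when the product overflows.
    static long multiplyCap(long a, long b) {
        try {
            return Math.multiplyExact(a, b);
        } catch (ArithmeticException overflow) {
            return Long.MAX_VALUE;
        }
    }
}
```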
-
enableOnDiscard
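public static final Context enableOnDiscard(@Nullable Context target, Consumer<?> discardConsumer)
Utility method to activate the onDiscard feature (see Flux.doOnDiscard(Class, Consumer)) in a target Context. Prefer using the Flux API, and reserve this for testing purposes.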
onDiscard
public static <T> void onDiscard(@Nullable T element, Context context)
Invoke a (local or global) hook that processes elements that get discarded. This includes elements that are dropped (for malformed sources), but also filtered out (e.g. not passing a filter() predicate).
For elements that are buffered or enqueued, but subsequently discarded due to cancellation or error, see onDiscardMultiple(Stream, Context) and onDiscardQueueWithClear(Queue, Context, Function).
Type Parameters:
T - the type of the element
Parameters:
element - the element that is being discarded
context - the context in which to look for a local hook
-
onDiscardQueueWithClear
public static <T> void onDiscardQueueWithClear(@Nullable Queue<T> queue, Context context, @Nullable Function<T, Stream<?>> extract)
Invoke a (local or global) hook that processes elements that get discarded en masse after having been enqueued, due to cancellation or error. This method also empties the Queue (either by repeated Queue.poll() calls if a hook is defined, or by Collection.clear() as a shortcut if no hook is defined).
Type Parameters:
T - the type of the element
Parameters:
queue - the queue that is being discarded and cleared
context - the context in which to look for a local hook
extract - an optional extractor method for cases where the queue doesn't directly contain the elements to discard
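The poll-or-clear behaviour described above can be sketched with JDK types only. In this sketch the hook is passed directly as a `Consumer<Object>` instead of being looked up in a Context, and `DiscardSketch` is a hypothetical name; error handling around the hook is deliberately left out.

```java
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical illustration: the hook is a plain Consumer rather than a Context lookup.
final class DiscardSketch {
    private DiscardSketch() {}

    static <T> void discardQueueWithClear(Queue<T> queue,
                                          Consumer<Object> hook,
                                          Function<T, Stream<?>> extract) {
        if (queue == null) {
            return;
        }
        if (hook == null) {
            queue.clear(); // shortcut: nobody is interested in the elements
            return;
        }
        for (;;) {
            T polled = queue.poll();
            if (polled == null) {
                break; // queue is now empty
            }
            if (extract != null) {
                // the queue holds containers; discard what they contain
                extract.apply(polled).forEach(hook);
            } else {
                hook.accept(polled);
            }
        }
    }
}
```

The extractor covers queues of wrappers, for example a queue of lists where the individual list items are what needs releasing.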
-
onDiscardMultiple
public static void onDiscardMultiple(Stream<?> multiple, Context context)
Invoke a (local or global) hook that processes elements that get discarded en masse. This includes elements that are buffered but subsequently discarded due to cancellation or error.
Parameters:
multiple - the collection of elements to discard (possibly extracted from other collections/arrays/queues)
context - the Context in which to look for local hook
-
onDiscardMultiple
public static void onDiscardMultiple(@Nullable Collection<?> multiple, Context context)
Invoke a (local or global) hook that processes elements that get discarded en masse. This includes elements that are buffered but subsequently discarded due to cancellation or error.
Parameters:
multiple - the collection of elements to discard
context - the Context in which to look for local hook
-
onDiscardMultiple
public static void onDiscardMultiple(@Nullable Iterator<?> multiple, boolean knownToBeFinite, Context context)
Invoke a (local or global) hook that processes elements that remain in an Iterator. Since iterators can be infinite, this method requires that you explicitly ensure the iterator is knownToBeFinite. Typically, when operating on an Iterable, one can get such a guarantee by looking at the Spliterator's Spliterator.getExactSizeIfKnown().
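As a rough illustration of how a caller might derive the knownToBeFinite flag from an Iterable, the following JDK-only sketch checks whether the spliterator reports an exact size. `FiniteCheck` is a hypothetical helper; a negative result from getExactSizeIfKnown() only means the size is unknown, so treating it as "not known to be finite" is the conservative choice.

```java
// Hypothetical helper deriving the flag from the Iterable's Spliterator.
final class FiniteCheck {
    private FiniteCheck() {}

    // getExactSizeIfKnown() returns -1 when the spliterator is not SIZED.
    static boolean knownToBeFinite(Iterable<?> source) {
        return source.spliterator().getExactSizeIfKnown() >= 0;
    }
}
```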
onDiscardMultiple
public static void onDiscardMultiple(@Nullable Spliterator<?> multiple, boolean knownToBeFinite, Context context)
Invoke a (local or global) hook that processes elements that remain in a Spliterator. Since spliterators can be infinite, this method requires that you explicitly ensure the spliterator is knownToBeFinite. Typically, one can get such a guarantee by looking at Spliterator.getExactSizeIfKnown().
Parameters:
multiple - the Spliterator whose remainder to discard
knownToBeFinite - whether the caller guarantees that the spliterator is finite and can be iterated over
context - the Context in which to look for local hook
-
onErrorDropped
public static void onErrorDropped(Throwable e, Context context)
An unexpected exception is about to be dropped.
If no hook is registered for Hooks.onErrorDropped(Consumer), the dropped error is logged at ERROR level.
Parameters:
e - the dropped exception
context - a context that might hold a local error consumer
-
onNextDropped
public static <T> void onNextDropped(T t, Context context)
An unexpected event is about to be dropped.
If no hook is registered for Hooks.onNextDropped(Consumer), the dropped element is just logged at DEBUG level.
Type Parameters:
T - the dropped value type
Parameters:
t - the dropped data
context - a context that might hold a local next consumer
-
onOperatorError
public static Throwable onOperatorError(Throwable error, Context context)
Map an "operator" error. The result error will be passed via onError to the operator downstream after checking for fatal error via Exceptions.throwIfFatal(Throwable).
Parameters:
error - the callback or operator error
context - a context that might hold a local error consumer
Returns:
mapped Throwable
-
onOperatorError
public static Throwable onOperatorError(@Nullable Subscription subscription, Throwable error, Context context)
Map an "operator" error given an operator parent Subscription. The result error will be passed via onError to the operator downstream. The Subscription will be cancelled after checking for fatal error via Exceptions.throwIfFatal(Throwable).
Parameters:
subscription - the linked operator parent Subscription
error - the callback or operator error
context - a context that might hold a local error consumer
Returns:
mapped Throwable
-
onOperatorError
public static Throwable onOperatorError(@Nullable Subscription subscription, Throwable error, @Nullable Object dataSignal, Context context)
Map an "operator" error given an operator parent Subscription. The result error will be passed via onError to the operator downstream. The Subscription will be cancelled after checking for fatal error via Exceptions.throwIfFatal(Throwable). Takes an additional signal, which can be added as a suppressed exception if it is a Throwable and the default hook is in place.
Parameters:
subscription - the linked operator parent Subscription
error - the callback or operator error
dataSignal - the value (onNext or onError) signal processed during failure
context - a context that might hold a local error consumer
Returns:
mapped Throwable
-
onRejectedExecution
public static RuntimeException onRejectedExecution(Throwable original, Context context)
Return a wrapped RejectedExecutionException which can be thrown by the operator. This exception denotes that an execution was rejected by a Scheduler, notably when it was already disposed.
Wrapping is done by calling both Exceptions.failWithRejected(Throwable) and onOperatorError(Subscription, Throwable, Object, Context).
Parameters:
original - the original execution error
context - a context that might hold a local error consumer
-
onNextErrorFunction
public static final BiFunction<? super Throwable, Object, ? extends Throwable> onNextErrorFunction(Context context)
onNextError
public static <T> @Nullable Throwable onNextError(@Nullable T value, Throwable error, Context context, Subscription subscriptionForCancel)
Find the OnNextFailureStrategy to apply to the calling operator (which could be a local error mode defined in the Context) and apply it. For poll(), prefer onNextPollError(Object, Throwable, Context) as it returns a RuntimeException.
Cancels the Subscription and returns a Throwable if errors are fatal for the error mode, in which case the operator should call onError with the returned error. On the contrary, if the error mode allows the sequence to continue, does not cancel the Subscription and returns null.
Typical usage pattern differs depending on the calling method:
- onNext: check for a throwable return value and call Subscriber.onError(Throwable) if not null, otherwise perform a direct request(1) on the upstream.
- tryOnNext: check for a throwable return value and call Subscriber.onError(Throwable) if not null, otherwise return false to indicate value was not consumed and more must be tried.
- any of the above where the error is going to be propagated through onError but the subscription shouldn't be cancelled: use onNextError(Object, Throwable, Context) instead.
- poll (where the error will be thrown): use onNextPollError(Object, Throwable, Context) instead.
Type Parameters:
T - The type of the value causing the error.
Parameters:
value - The onNext value that caused an error. Can be null.
error - The error.
context - The most significant Context in which to look for an OnNextFailureStrategy.
subscriptionForCancel - The mandatory Subscription that should be cancelled if the strategy is terminal. See also onNextError(Object, Throwable, Context) and onNextPollError(Object, Throwable, Context) for alternatives that don't cancel a subscription.
Returns:
a Throwable to propagate through onError if the strategy is terminal and cancelled the subscription, null if not.
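The onNext usage pattern listed above (propagate a non-null result, otherwise request one more element) can be mimicked in a JDK-only sketch. Everything here is hypothetical: `MapStageSketch`, its fields, and the `BiFunction` standing in for the failure strategy are illustration devices, and the counters merely record what a real operator would signal upstream and downstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical mapping stage that consults a failure strategy when the mapper throws.
final class MapStageSketch<T, R> {
    private final Function<? super T, ? extends R> mapper;
    // Stand-in strategy: returns a Throwable when terminal, null to continue.
    private final BiFunction<Object, Throwable, Throwable> strategy;

    final List<R> emitted = new ArrayList<>();
    Throwable terminalError;    // what would be passed to downstream onError
    boolean upstreamCancelled;  // whether the upstream would have been cancelled
    long replenished;           // number of request(1) calls replacing skipped values

    MapStageSketch(Function<? super T, ? extends R> mapper,
                   BiFunction<Object, Throwable, Throwable> strategy) {
        this.mapper = mapper;
        this.strategy = strategy;
    }

    void onNext(T value) {
        if (terminalError != null) {
            return; // already terminated, ignore late signals
        }
        R mapped;
        try {
            mapped = mapper.apply(value);
        } catch (RuntimeException failure) {
            Throwable terminal = strategy.apply(value, failure);
            if (terminal != null) {
                upstreamCancelled = true; // terminal: cancel and propagate
                terminalError = terminal;
            } else {
                replenished++;            // recoverable: ask for a replacement
            }
            return;
        }
        emitted.add(mapped);
    }
}
```

The extra request on the recoverable path matters because the failed element consumed one unit of demand without producing anything downstream.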
-
onNextError
public static <T> @Nullable Throwable onNextError(@Nullable T value, Throwable error, Context context)
Find the OnNextFailureStrategy to apply to the calling async operator (which could be a local error mode defined in the Context) and apply it.
This variant never cancels a Subscription. It returns a Throwable if the error is fatal for the error mode, in which case the operator should call onError with the returned error. On the contrary, if the error mode allows the sequence to continue, this method returns null.
Type Parameters:
T - The type of the value causing the error.
Parameters:
value - The onNext value that caused an error.
error - The error.
context - The most significant Context in which to look for an OnNextFailureStrategy.
Returns:
a Throwable to propagate through onError if the strategy is terminal, null if not.
-
onNextInnerError
public static <T> @Nullable Throwable onNextInnerError(Throwable error, Context context, @Nullable Subscription subscriptionForCancel)
Find the OnNextFailureStrategy to apply to the calling operator (which could be a local error mode defined in the Context) and apply it.
Type Parameters:
T - The type of the value causing the error.
Parameters:
error - The error.
context - The most significant Context in which to look for an OnNextFailureStrategy.
subscriptionForCancel - The Subscription that should be cancelled if the strategy is terminal. Null to ignore (for poll, use onNextPollError(Object, Throwable, Context) rather than passing null).
Returns:
a Throwable to propagate through onError if the strategy is terminal and cancelled the subscription, null if not.
-
onNextPollError
public static <T> @Nullable RuntimeException onNextPollError(@Nullable T value, Throwable error, Context context)
Find the OnNextFailureStrategy to apply to the calling async operator (which could be a local error mode defined in the Context) and apply it.
Returns a RuntimeException if the error is fatal for the error mode, in which case the operator poll should throw the returned error. On the contrary, if the error mode allows the sequence to continue, returns null, in which case the operator should retry the poll().
Note that this method wraps checked exceptions in order to return a RuntimeException that can be thrown from an arbitrary method. If you don't want to throw the returned exception and this wrapping behavior is undesirable, but you still don't want to cancel a subscription, you can use onNextError(Object, Throwable, Context) instead.
Type Parameters:
T - The type of the value causing the error.
Parameters:
value - The onNext value that caused an error.
error - The error.
context - The most significant Context in which to look for an OnNextFailureStrategy.
Returns:
a RuntimeException to be thrown (e.g. within Queue.poll()) if the error is terminal in the strategy, null if not.
-
onLastAssembly
public static <T> CorePublisher<T> onLastAssembly(CorePublisher<T> source)
Applies the hooks registered with Hooks.onLastOperator(Function) and returns a CorePublisher ready to be subscribed on.
Type Parameters:
T - the type of the value.
Parameters:
source - the original CorePublisher.
Returns:
a CorePublisher to subscribe on.
-
toFluxOrMono
-
toFluxOrMono
-
onRejectedExecution
public static RuntimeException onRejectedExecution(Throwable original, @Nullable Subscription subscription, @Nullable Throwable suppressed, @Nullable Object dataSignal, Context context)
Return a wrapped RejectedExecutionException which can be thrown by the operator. This exception denotes that an execution was rejected by a Scheduler, notably when it was already disposed.
Wrapping is done by calling both Exceptions.failWithRejected(Throwable) and onOperatorError(Subscription, Throwable, Object, Context) (with the passed Subscription).
Parameters:
original - the original execution error
subscription - the subscription to pass to onOperatorError.
suppressed - a Throwable to be suppressed by the RejectedExecutionException (or null if not relevant)
dataSignal - a value to be passed to onOperatorError(Subscription, Throwable, Object, Context) (or null if not relevant)
context - a context that might hold a local error consumer
-
produced
public static <T> long produced(AtomicLongFieldUpdater<T> updater, T instance, long toSub)
Concurrent subtraction bound to 0, mostly used to decrement a request tracker by the amount produced by the operator. Any concurrent write will "happen before" this operation.
Type Parameters:
T - the parent instance type
Parameters:
updater - current field updater
instance - current instance to update
toSub - delta to subtract
Returns:
value after subtraction or zero
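A floor-at-zero concurrent subtraction can be sketched as another compare-and-set loop. `DemandCounter` and its `requested` field are hypothetical, and leaving a Long.MAX_VALUE counter untouched (treating it as unbounded demand) is an assumption of this sketch that the text above does not state.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical demand counter decremented as elements are produced.
final class DemandCounter {
    volatile long requested;

    static final AtomicLongFieldUpdater<DemandCounter> REQUESTED =
            AtomicLongFieldUpdater.newUpdater(DemandCounter.class, "requested");

    // Subtracts toSub without going below zero and returns the new value.
    static <T> long produced(AtomicLongFieldUpdater<T> updater, T instance, long toSub) {
        for (;;) {
            long current = updater.get(instance);
            // Assumption: 0 stays 0, and Long.MAX_VALUE means "unbounded" and is never decremented.
            if (current == 0L || current == Long.MAX_VALUE) {
                return current;
            }
            long next = Math.max(0L, current - toSub);
            if (updater.compareAndSet(instance, current, next)) {
                return next;
            }
            // lost a race with a concurrent request or production: retry
        }
    }
}
```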
-
replace
public static <F> boolean replace(AtomicReferenceFieldUpdater<F, @Nullable Subscription> field, F instance, Subscription s)
A generic utility to atomically replace a subscription or cancel the replacement if the current subscription is marked as already cancelled (as in cancelledSubscription()).
Type Parameters:
F - the instance type
Parameters:
field - The Atomic container
instance - the instance reference
s - the subscription
Returns:
true if replaced
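The replace-or-cancel behaviour relies on a sentinel that marks the cancelled state. The sketch below reproduces the idea with JDK Flow types; `SubscriptionHolder`, its `upstream` field and the `CANCELLED` sentinel are hypothetical names, and the previous subscription is intentionally left uncancelled, as the description of replace implies.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical holder of an upstream subscription guarded by a sentinel.
final class SubscriptionHolder {
    // Sentinel for the terminal "cancelled" state; never handed out to clients.
    static final Flow.Subscription CANCELLED = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    volatile Flow.Subscription upstream;

    static final AtomicReferenceFieldUpdater<SubscriptionHolder, Flow.Subscription> UPSTREAM =
            AtomicReferenceFieldUpdater.newUpdater(
                    SubscriptionHolder.class, Flow.Subscription.class, "upstream");

    // Swaps in s unless the holder is already cancelled, in which case s is cancelled instead.
    static <F> boolean replace(AtomicReferenceFieldUpdater<F, Flow.Subscription> field,
                               F instance, Flow.Subscription s) {
        for (;;) {
            Flow.Subscription current = field.get(instance);
            if (current == CANCELLED) {
                s.cancel();
                return false;
            }
            if (field.compareAndSet(instance, current, s)) {
                return true; // the previous subscription is not cancelled here
            }
        }
    }
}
```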
-
reportBadRequest
public static void reportBadRequest(long n)
Log an IllegalArgumentException if the request is zero or negative.
- Parameters:
n - the failing demand
-
reportMoreProduced
public static void reportMoreProduced()
Log an IllegalStateException that indicates more than the requested amount was produced.
-
reportSubscriptionSet
public static void reportSubscriptionSet()
Log a duplicate subscription error.
-
scalarSubscription
Represents a fuseable Subscription that emits a single constant value synchronously to a Subscriber or consumer.
- Type Parameters:
T - the value type
- Parameters:
subscriber - the delegate Subscriber that will be requesting the value
value - the single value to be emitted
- Returns:
a new scalar Subscription
-
scalarSubscription
public static <T> Subscription scalarSubscription(CoreSubscriber<? super T> subscriber, T value, String stepName)
Represents a fuseable Subscription that emits a single constant value synchronously to a Subscriber or consumer. Also gives the subscription a user-defined stepName for the purpose of Scannable.stepName().
- Type Parameters:
T - the value type
- Parameters:
subscriber - the delegate Subscriber that will be requesting the value
value - the single value to be emitted
stepName - the String to represent the Subscription in Scannable.stepName()
- Returns:
a new scalar Subscription
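The "single constant value, emitted synchronously" behaviour described for scalarSubscription can be sketched with the JDK's `java.util.concurrent.Flow` types standing in for the Reactive Streams interfaces. Everything here is illustrative: `ScalarSubscriptionSketch` and its `demo()` helper are hypothetical names, the queue-fusion and `stepName` aspects are left out, and skipping `onComplete` after a cancel is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in; not Reactor's fuseable implementation.
final class ScalarSubscriptionSketch<T> implements Flow.Subscription {
    private static final int FRESH = 0, EMITTED = 1, CANCELLED = 2;

    private final Flow.Subscriber<? super T> subscriber;
    private final T value;
    private final AtomicInteger state = new AtomicInteger(FRESH);

    ScalarSubscriptionSketch(Flow.Subscriber<? super T> subscriber, T value) {
        this.subscriber = subscriber;
        this.value = value;
    }

    @Override
    public void request(long n) {
        // Only the first positive request emits; later requests are no-ops.
        if (n > 0L && state.compareAndSet(FRESH, EMITTED)) {
            subscriber.onNext(value);
            // Assumption: no completion signal if cancelled during onNext.
            if (state.get() != CANCELLED) {
                subscriber.onComplete();
            }
        }
    }

    @Override
    public void cancel() {
        state.set(CANCELLED);
    }

    // Records the signals seen by a subscriber that requests twice.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Flow.Subscriber<String> recorder = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                log.add("onSubscribe");
                s.request(1);
                s.request(1); // second request must not re-emit
            }
            @Override public void onNext(String item) { log.add("onNext " + item); }
            @Override public void onError(Throwable t) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        };
        recorder.onSubscribe(new ScalarSubscriptionSketch<>(recorder, "hello"));
        return log;
    }
}
```

Calling `ScalarSubscriptionSketch.demo()` yields the signals `onSubscribe`, `onNext hello`, `onComplete`, in that order.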
-
serialize
Safely gate a Subscriber by making sure onNext signals are delivered sequentially (serialized). Serialization uses thread-stealing and a potentially unbounded queue, which might starve a calling thread if contention is high and the Subscriber is slow.
- Type Parameters:
T - the relayed type
- Parameters:
subscriber - the subscriber to serialize
- Returns:
a serializing Subscriber
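To make "thread-stealing and a potentially unbounded queue" concrete, here is a minimal sketch of the common queue-drain pattern using only the JDK. It is not Reactor's serializing subscriber: `SerializingGateSketch` and `reentrantDemo()` are hypothetical names, the downstream is modelled as a plain `Consumer`, and only onNext-style signals are covered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical illustration of serialized delivery; not Reactor's implementation.
final class SerializingGateSketch<T> {
    private final Consumer<? super T> downstream;
    private final Queue<T> queue = new ConcurrentLinkedQueue<>(); // unbounded
    private final AtomicInteger wip = new AtomicInteger();        // work-in-progress counter

    SerializingGateSketch(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    void onNext(T value) {
        queue.offer(value);
        if (wip.getAndIncrement() != 0) {
            // Another caller is already draining and will deliver this value
            // on its own thread ("thread-stealing").
            return;
        }
        int missed = 1;
        do {
            T next;
            while ((next = queue.poll()) != null) {
                downstream.accept(next);
            }
            // Account for the work handled; loop again if more arrived meanwhile.
            missed = wip.addAndGet(-missed);
        } while (missed != 0);
    }

    // Shows that a re-entrant onNext is queued instead of being delivered inline.
    static List<String> reentrantDemo() {
        List<String> log = new ArrayList<>();
        AtomicReference<SerializingGateSketch<Integer>> self = new AtomicReference<>();
        SerializingGateSketch<Integer> gate = new SerializingGateSketch<>(v -> {
            log.add("start " + v);
            if (v == 1) {
                self.get().onNext(2);
            }
            log.add("end " + v);
        });
        self.set(gate);
        gate.onNext(1);
        return log;
    }
}
```

The draining thread keeps working for as long as other callers keep enqueueing, which is the starvation risk the description warns about.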
-
set
public static <F> boolean set(AtomicReferenceFieldUpdater<F, @Nullable Subscription> field, F instance, Subscription s)
A generic utility to atomically replace a subscription or cancel the replacement if the current subscription is marked as cancelled (as in cancelledSubscription()) or was concurrently updated before.
The replaced subscription is itself cancelled.
- Type Parameters:
F - the instance type
- Parameters:
field - The Atomic container
instance - the instance reference
s - the subscription
- Returns:
true if replaced
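The difference between replace and set, as described, is whether the subscription being swapped out is cancelled. A minimal JDK-only sketch of both follows. The names `SwapSketch`, `replaceSketch`, `setSketch` and `Recording` are hypothetical, `Flow.Subscription` stands in for the Reactive Streams type, `CANCELLED` stands in for the singleton returned by cancelledSubscription(), and retrying after a lost race is an assumption of this sketch.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical illustration; not Reactor's implementation.
final class SwapSketch {
    // Stand-in marker for the terminal "cancelled" state.
    static final Flow.Subscription CANCELLED = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    // Test helper that remembers whether cancel() was called.
    static final class Recording implements Flow.Subscription {
        volatile boolean cancelled;
        @Override public void request(long n) { }
        @Override public void cancel() { cancelled = true; }
    }

    volatile Flow.Subscription current;

    static final AtomicReferenceFieldUpdater<SwapSketch, Flow.Subscription> CURRENT =
            AtomicReferenceFieldUpdater.newUpdater(SwapSketch.class, Flow.Subscription.class, "current");

    private static <F> boolean swap(AtomicReferenceFieldUpdater<F, Flow.Subscription> field,
                                    F instance, Flow.Subscription s, boolean cancelPrevious) {
        for (;;) {
            Flow.Subscription previous = field.get(instance);
            if (previous == CANCELLED) {
                s.cancel(); // the holder is terminated: reject the replacement
                return false;
            }
            if (field.compareAndSet(instance, previous, s)) {
                if (cancelPrevious && previous != null) {
                    previous.cancel();
                }
                return true;
            }
            // Lost a race with another writer: re-read and retry (assumption).
        }
    }

    // replace: swaps in s and leaves the previous subscription untouched.
    static <F> boolean replaceSketch(AtomicReferenceFieldUpdater<F, Flow.Subscription> field,
                                     F instance, Flow.Subscription s) {
        return swap(field, instance, s, false);
    }

    // set: swaps in s and cancels the previous subscription.
    static <F> boolean setSketch(AtomicReferenceFieldUpdater<F, Flow.Subscription> field,
                                 F instance, Flow.Subscription s) {
        return swap(field, instance, s, true);
    }
}
```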
-
setOnce
public static <F> boolean setOnce(AtomicReferenceFieldUpdater<F, @Nullable Subscription> field, F instance, Subscription s)
Sets the given subscription once and returns true if successful, false if the field has a subscription already or has been cancelled.
If the field already has a subscription, it is cancelled and the duplicate subscription is reported (see reportSubscriptionSet()).
- Type Parameters:
F - the instance type containing the field
- Parameters:
field - the field accessor
instance - the parent instance
s - the subscription to set once
- Returns:
true if successful, false if the target was not empty or has been cancelled
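A set-once field of this kind is typically built on a single compare-and-set from null. The sketch below uses only the JDK and hypothetical names (`SetOnceSketch`, `setOnceSketch`, `Recording`, `DUPLICATES`). It reads "it is cancelled" as applying to the incoming subscription, which is an assumption, and it counts duplicate reports instead of logging them.

```java
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical illustration; not Reactor's implementation.
final class SetOnceSketch {
    // Stand-in marker for the terminal "cancelled" state.
    static final Flow.Subscription CANCELLED = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    // Test helper that remembers whether cancel() was called.
    static final class Recording implements Flow.Subscription {
        volatile boolean cancelled;
        @Override public void request(long n) { }
        @Override public void cancel() { cancelled = true; }
    }

    // Counts duplicate-subscription reports (the sketch does not log).
    static final AtomicInteger DUPLICATES = new AtomicInteger();

    volatile Flow.Subscription upstream;

    static final AtomicReferenceFieldUpdater<SetOnceSketch, Flow.Subscription> UPSTREAM =
            AtomicReferenceFieldUpdater.newUpdater(SetOnceSketch.class, Flow.Subscription.class, "upstream");

    static <F> boolean setOnceSketch(AtomicReferenceFieldUpdater<F, Flow.Subscription> field,
                                     F instance, Flow.Subscription s) {
        Objects.requireNonNull(s, "s");
        if (field.compareAndSet(instance, null, s)) {
            return true; // the field was empty: s is now the one and only subscription
        }
        // Assumption: the rejected, incoming subscription is the one cancelled.
        s.cancel();
        if (field.get(instance) != CANCELLED) {
            // The field holds a live subscription, so this is a duplicate.
            DUPLICATES.incrementAndGet();
        }
        return false;
    }
}
```

A field already holding the cancelled marker rejects the new subscription silently, while a field holding a live subscription also records a duplicate.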
-
subOrZero
public static long subOrZero(long a, long b)
Cap a subtraction to 0
- Parameters:
a - left operand
b - right operand
- Returns:
Subtraction result, or 0 if the result would be negative
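As a worked example of capping a subtraction at zero, the following sketch reimplements the arithmetic with plain JDK code. The names `SubOrZeroSketch` and `subOrZeroSketch` are hypothetical, and non-negative operands are assumed, as is usual for demand counters.

```java
// Hypothetical illustration of a subtraction floored at zero.
final class SubOrZeroSketch {
    // Returns a - b, or 0 when the difference would be negative.
    static long subOrZeroSketch(long a, long b) {
        long result = a - b;
        return result < 0L ? 0L : result;
    }

    public static void main(String[] args) {
        System.out.println(subOrZeroSketch(10L, 3L)); // prints 7
        System.out.println(subOrZeroSketch(3L, 10L)); // prints 0
        System.out.println(subOrZeroSketch(5L, 5L));  // prints 0
    }
}
```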
-
terminate
public static <F> boolean terminate(AtomicReferenceFieldUpdater<F, @Nullable Subscription> field, F instance)
Atomically terminates the subscription if it is not already a cancelledSubscription(), cancelling the subscription and setting the field to the singleton cancelledSubscription().
- Type Parameters:
F - the instance type containing the field
- Parameters:
field - the field accessor
instance - the parent instance
- Returns:
true if terminated, false if the subscription was already terminated
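The terminate contract amounts to an atomic swap to a terminal marker followed by cancelling whatever was swapped out. Here is a minimal JDK-only sketch with hypothetical names (`TerminateSketch`, `terminateSketch`, `Recording`). Returning false when no subscription had been set yet is a choice of this sketch, not something the description above states.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical illustration; not Reactor's implementation.
final class TerminateSketch {
    // Stand-in marker for the terminal "cancelled" state.
    static final Flow.Subscription CANCELLED = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    // Test helper that counts cancel() calls.
    static final class Recording implements Flow.Subscription {
        volatile int cancelCount;
        @Override public void request(long n) { }
        @Override public void cancel() { cancelCount++; }
    }

    volatile Flow.Subscription upstream;

    static final AtomicReferenceFieldUpdater<TerminateSketch, Flow.Subscription> UPSTREAM =
            AtomicReferenceFieldUpdater.newUpdater(TerminateSketch.class, Flow.Subscription.class, "upstream");

    static <F> boolean terminateSketch(AtomicReferenceFieldUpdater<F, Flow.Subscription> field, F instance) {
        Flow.Subscription previous = field.get(instance);
        if (previous != CANCELLED) {
            // getAndSet guarantees that only one caller observes the live subscription.
            previous = field.getAndSet(instance, CANCELLED);
            if (previous != null && previous != CANCELLED) {
                previous.cancel();
                return true;
            }
        }
        return false;
    }
}
```

Because of the atomic swap, racing callers cannot cancel the same subscription twice.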
-
validate
Check the current Subscription state: cancel the new Subscription if the current one is already set, or return true if ready to subscribe.
- Parameters:
current - current Subscription, expected to be null
next - new Subscription
- Returns:
true if Subscription can be used
-
validate
public static boolean validate(long n)
Evaluate if a request is strictly positive, otherwise call reportBadRequest(long)
- Parameters:
n - the request value
- Returns:
true if valid
-
toCoreSubscriber
If the actual Subscriber is not a CoreSubscriber, it will apply safe strict wrapping to apply all reactive streams rules, including the ones relaxed by internal operators based on CoreSubscriber.
- Type Parameters:
T - passed subscriber type
- Parameters:
actual - the Subscriber to apply hook on
- Returns:
a possibly transformed Subscriber
-
toConditionalSubscriber
public static <T> Fuseable.ConditionalSubscriber<? super T> toConditionalSubscriber(CoreSubscriber<? super T> actual)
If the actual CoreSubscriber is not a Fuseable.ConditionalSubscriber, it will apply an adapter which directly maps all Fuseable.ConditionalSubscriber.tryOnNext(Object) to Subscriber.onNext(Object) and always returns true as the result
- Type Parameters:
T - passed subscriber type
- Parameters:
actual - the Subscriber to adapt
- Returns:
a potentially adapted Fuseable.ConditionalSubscriber
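The adapter described above can be pictured with simplified, hypothetical types. In this JDK-only sketch a `Consumer` plays the role of the subscriber's onNext, and `ConditionalConsumerSketch` plays the role of the conditional variant with `tryOnNext`. None of these names come from Reactor.

```java
import java.util.function.Consumer;

// Hypothetical counterpart of a conditional subscriber: tryOnNext reports
// whether the value was consumed.
interface ConditionalConsumerSketch<T> extends Consumer<T> {
    boolean tryOnNext(T value);
}

// Hypothetical illustration; not Reactor's implementation.
final class ConditionalAdapterSketch {
    static <T> ConditionalConsumerSketch<T> adapt(Consumer<T> actual) {
        if (actual instanceof ConditionalConsumerSketch) {
            // Already conditional: return it unchanged.
            return (ConditionalConsumerSketch<T>) actual;
        }
        return new ConditionalConsumerSketch<T>() {
            @Override
            public void accept(T value) {
                actual.accept(value);
            }

            @Override
            public boolean tryOnNext(T value) {
                // A plain consumer cannot reject a value, so the adapter
                // delegates and always reports it as consumed.
                actual.accept(value);
                return true;
            }
        };
    }
}
```

Always returning true means an upstream that relies on `tryOnNext` to decide whether to request a replacement value will never do so for an adapted consumer.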
-