Class ReplayProcessor<T>
- Type Parameters:
T - the value type
- All Implemented Interfaces:
Processor<T,T>, Publisher<T>, Subscriber<T>, CorePublisher<T>, CoreSubscriber<T>, Disposable, Fuseable, Sinks.Many<T>, Scannable
-
Nested Class Summary
Nested classes/interfaces inherited from interface reactor.core.Disposable
Disposable.Composite, Disposable.Swap
Nested classes/interfaces inherited from interface reactor.core.Fuseable
Fuseable.ConditionalSubscriber<T>, Fuseable.QueueSubscription<T>, Fuseable.ScalarCallable<T>, Fuseable.SynchronousSubscription<T>
Nested classes/interfaces inherited from interface reactor.core.Scannable
Scannable.Attr<T>
-
Field Summary
Fields inherited from interface reactor.core.Scannable
OPERATOR_NAME_UNRELATED_WORDS_PATTERN
-
Method Summary
Modifier and Type | Method | Description
Flux<T> | asFlux() | Deprecated. Return a Flux view of this sink.
static <T> ReplayProcessor<T> | cacheLast() | Deprecated. Use Sinks.many().replay().latest() (or the unsafe variant if you're sure about external synchronization).
static <T> ReplayProcessor<T> | cacheLastOrDefault(@Nullable T value) | Deprecated. Use Sinks.many().replay().latestOrDefault(value) (or the unsafe variant if you're sure about external synchronization).
static <E> ReplayProcessor<E> | create() | Deprecated. Use Sinks.many().replay().all() (or the unsafe variant if you're sure about external synchronization).
static <E> ReplayProcessor<E> | create(int historySize) | Deprecated. Use Sinks.many().replay().limit(historySize) (or the unsafe variant if you're sure about external synchronization).
static <E> ReplayProcessor<E> | create(int historySize, boolean unbounded) | Deprecated. Use Sinks.many().replay().limit(historySize) for bounded cases (unbounded == false) or Sinks.many().replay().all(bufferSize) otherwise (or the unsafe variant if you're sure about external synchronization).
static <T> ReplayProcessor<T> | createSizeAndTimeout(int size, Duration maxAge) | Deprecated. Use Sinks.many().replay().limit(size, maxAge) (or the unsafe variant if you're sure about external synchronization).
static <T> ReplayProcessor<T> | createSizeAndTimeout(int size, Duration maxAge, Scheduler scheduler) | Deprecated. Use Sinks.many().replay().limit(size, maxAge, scheduler) (or the unsafe variant if you're sure about external synchronization).
static <T> ReplayProcessor<T> | createTimeout(Duration maxAge) | Deprecated. Use Sinks.many().replay().limit(maxAge) (or the unsafe variant if you're sure about external synchronization).
static <T> ReplayProcessor<T> | createTimeout(Duration maxAge, Scheduler scheduler) | Deprecated. Use Sinks.many().replay().limit(maxAge, scheduler) (or the unsafe variant if you're sure about external synchronization).
Context | currentContext() | Deprecated. Request a Context from dependent components which can include downstream operators during subscribing or a terminal Subscriber.
int | currentSubscriberCount() | Deprecated. Get how many Subscribers are currently subscribed to the sink.
long | downstreamCount() | Deprecated. Return the number of active Subscribers or -1 if untracked.
default void | emitComplete(Sinks.EmitFailureHandler failureHandler) | A simplified attempt at completing via the Sinks.Many.tryEmitComplete() API, generating an onComplete signal.
default void | emitError(Throwable error, Sinks.EmitFailureHandler failureHandler) | A simplified attempt at failing the sequence via the Sinks.Many.tryEmitError(Throwable) API, generating an onError signal.
default void | emitNext(T value, Sinks.EmitFailureHandler failureHandler) | A simplified attempt at emitting a non-null element via the Sinks.Many.tryEmitNext(Object) API, generating an onNext signal.
Throwable | getError() | Deprecated. Current error if any, defaults to null.
int | getPrefetch() | Deprecated. The prefetch configuration of the Flux.
Stream<? extends Scannable> | inners() | Deprecated. Return a Stream of referenced inners (flatmap, multicast etc).
protected boolean | isIdentityProcessor() | Deprecated. Return true if FluxProcessor<T, T>.
boolean | isTerminated() | Deprecated. Has this upstream finished or "completed" / "failed"?
void | onComplete() | Deprecated.
void | onError(Throwable t) | Deprecated.
void | onNext(T t) | Deprecated.
void | onSubscribe(Subscription s) | Deprecated. Implementors should initialize any state used by Subscriber.onNext(Object) before calling Subscription.request(long).
Object | scanUnsafe(Scannable.Attr key) | Deprecated. This method is used internally by components to define their key-value mappings in a single place.
void | subscribe(CoreSubscriber<? super T> actual) | Deprecated. An internal Publisher.subscribe(Subscriber) that will bypass the Hooks.onLastOperator(Function) pointcut.
Sinks.EmitResult | tryEmitComplete() | Deprecated. Try to terminate the sequence successfully, generating an onComplete signal.
Sinks.EmitResult | tryEmitError(Throwable t) | Deprecated. Try to fail the sequence, generating an onError signal.
Sinks.EmitResult | tryEmitNext(T t) | Deprecated. Try emitting a non-null element, generating an onNext signal.
Methods inherited from class reactor.core.publisher.FluxProcessor
dispose, getBufferSize, hasCompleted, hasDownstreams, hasError, isSerialized, serialize, serializeAlways, sink, sink, switchOnNext, wrap
Methods inherited from class reactor.core.publisher.Flux
all, any, as, blockFirst, blockFirst, blockLast, blockLast, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferUntil, bufferUntil, bufferUntilChanged, bufferUntilChanged, bufferUntilChanged, bufferWhen, bufferWhen, bufferWhile, cache, cache, cache, cache, cache, cache, cancelOn, cast, checkpoint, checkpoint, checkpoint, collect, collect, collectList, collectMap, collectMap, collectMap, collectMultimap, collectMultimap, collectMultimap, collectSortedList, collectSortedList, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, concat, concat, concat, concat, concatDelayError, concatDelayError, concatDelayError, concatDelayError, concatMap, concatMap, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapIterable, concatMapIterable, concatWith, concatWithValues, contextCapture, contextWrite, contextWrite, count, create, create, defaultIfEmpty, defer, deferContextual, delayElements, delayElements, delaySequence, delaySequence, delaySubscription, delaySubscription, delaySubscription, delayUntil, dematerialize, distinct, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnError, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elapsed, elapsed, elementAt, elementAt, empty, error, error, error, expand, expand, expandDeep, expandDeep, filter, filterWhen, filterWhen, first, first, firstWithSignal, firstWithSignal, firstWithValue, firstWithValue, flatMap, flatMap, flatMap, flatMap, flatMapDelayError, flatMapIterable, flatMapIterable, flatMapSequential, flatMapSequential, flatMapSequential, flatMapSequentialDelayError, from, fromArray, fromIterable, 
fromStream, fromStream, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupJoin, handle, hasElement, hasElements, hide, ignoreElements, index, index, interval, interval, interval, interval, join, just, just, last, last, limitRate, limitRate, limitRequest, log, log, log, log, log, log, map, mapNotNull, materialize, merge, merge, merge, merge, merge, merge, mergeComparing, mergeComparing, mergeComparing, mergeComparingDelayError, mergeComparingWith, mergeDelayError, mergeOrdered, mergeOrdered, mergeOrdered, mergeOrderedWith, mergePriority, mergePriority, mergePriority, mergePriorityDelayError, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequentialDelayError, mergeSequentialDelayError, mergeSequentialDelayError, mergeWith, metrics, name, never, next, ofType, onAssembly, onAssembly, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorComplete, onErrorComplete, onErrorComplete, onErrorContinue, onErrorContinue, onErrorContinue, onErrorMap, onErrorMap, onErrorMap, onErrorResume, onErrorResume, onErrorResume, onErrorReturn, onErrorReturn, onErrorReturn, onErrorStop, onTerminateDetach, or, parallel, parallel, parallel, publish, publish, publish, publish, publishNext, publishOn, publishOn, publishOn, push, push, range, reduce, reduce, reduceWith, repeat, repeat, repeat, repeat, repeatWhen, replay, replay, replay, replay, replay, replay, retry, retry, retryWhen, sample, sample, sampleFirst, sampleFirst, sampleTimeout, sampleTimeout, scan, scan, scanWith, share, shareNext, single, single, singleOrEmpty, skip, skip, skip, skipLast, skipUntil, skipUntilOther, skipWhile, sort, sort, startWith, startWith, startWith, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, 
subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchOnFirst, switchOnFirst, switchOnNext, switchOnNext, tag, take, take, take, take, takeLast, takeUntil, takeUntilOther, takeWhile, tap, tap, tap, then, then, thenEmpty, thenMany, timed, timed, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timestamp, timestamp, toIterable, toIterable, toIterable, toStream, toStream, toString, transform, transformDeferred, transformDeferredContextual, using, using, using, using, usingWhen, usingWhen, window, window, window, window, window, window, window, windowTimeout, windowTimeout, windowTimeout, windowTimeout, windowUntil, windowUntil, windowUntil, windowUntilChanged, windowUntilChanged, windowUntilChanged, windowWhen, windowWhile, windowWhile, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipWith, zipWith, zipWith, zipWith, zipWithIterable, zipWithIterable
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface reactor.core.Disposable
isDisposed
Methods inherited from interface reactor.core.Scannable
actuals, isScanAvailable, name, parents, scan, scanOrDefault, stepName, steps, tags, tagsDeduplicated
-
Method Details
-
cacheLast
Deprecated. Use Sinks.many().replay().latest() (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
Create a ReplayProcessor that caches the last element it has pushed, replaying it to late subscribers. This is a buffer-based ReplayProcessor with a history size of 1.
- Type Parameters:
T - the type of the pushed elements
- Returns:
a new ReplayProcessor that replays its last pushed element to each new Subscriber
-
cacheLastOrDefault
Deprecated. Use Sinks.many().replay().latestOrDefault(value) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
Create a ReplayProcessor that caches the last element it has pushed, replaying it to late subscribers. If a Subscriber comes in before any value has been pushed, then the default value is emitted instead. This is a buffer-based ReplayProcessor with a history size of 1.
- Type Parameters:
T - the type of the pushed elements
- Parameters:
value - a default value to start the sequence with in case nothing has been cached yet.
- Returns:
a new ReplayProcessor that replays its last pushed element to each new Subscriber, or a default one if nothing was pushed yet
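As an illustration of the replacements named in the deprecation notes for cacheLast and cacheLastOrDefault, here is a minimal sketch that assumes reactor-core 3.4 or later on the classpath. The class and method names are hypothetical, and the sink is only used from a single thread.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Sinks;

public class LatestReplayExample {

    // Emits 1, 2, 3 before anyone subscribes, then returns what a late subscriber receives.
    static List<Integer> lateSubscriberWithLatest() {
        Sinks.Many<Integer> sink = Sinks.many().replay().latest();
        sink.tryEmitNext(1);
        sink.tryEmitNext(2);
        sink.tryEmitNext(3);

        List<Integer> seen = new ArrayList<>();
        sink.asFlux().subscribe(seen::add); // only the cached last element is replayed
        return seen;
    }

    // Subscribes before any value is pushed, so the default value is delivered first.
    static List<String> earlySubscriberWithDefault() {
        Sinks.Many<String> sink = Sinks.many().replay().latestOrDefault("fallback");

        List<String> seen = new ArrayList<>();
        sink.asFlux().subscribe(seen::add);
        sink.tryEmitNext("first");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriberWithLatest());   // [3]
        System.out.println(earlySubscriberWithDefault()); // [fallback, first]
    }
}
```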
-
create
Deprecated. Use Sinks.many().replay().all() (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
Create a new ReplayProcessor that replays an unbounded number of elements, using a default internal Queue.
- Type Parameters:
E - the type of the pushed elements
- Returns:
a new ReplayProcessor that replays the whole history to each new Subscriber.
-
create
Deprecated. Use Sinks.many().replay().limit(historySize) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
Create a new ReplayProcessor that replays up to historySize elements.
- Type Parameters:
E - the type of the pushed elements
- Parameters:
historySize - the backlog size, i.e. maximum items retained for replay.
- Returns:
a new ReplayProcessor that replays a limited history to each new Subscriber.
-
create
Deprecated. Use Sinks.many().replay().limit(historySize) for bounded cases (unbounded == false) or Sinks.many().replay().all(bufferSize) otherwise (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
Create a new ReplayProcessor that replays either all the elements or a limited number of elements, depending on the unbounded parameter.
- Type Parameters:
E - the type of the pushed elements
- Parameters:
historySize - maximum items retained if bounded, or initial link size if unbounded
unbounded - true if "unlimited" data store must be supplied
- Returns:
a new ReplayProcessor that replays the whole history to each new Subscriber if configured as unbounded, a limited history otherwise.
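To make the bounded versus unbounded distinction concrete, the following sketch contrasts the two replacement sinks named in the deprecation note. It assumes reactor-core 3.4 or later on the classpath and single-threaded use; the class and helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Sinks;

public class HistorySizeExample {

    // Pushes 1..5 into the sink, then returns what a late subscriber is replayed.
    static List<Integer> replayAfterEmitting(Sinks.Many<Integer> sink) {
        for (int i = 1; i <= 5; i++) {
            sink.tryEmitNext(i);
        }
        List<Integer> seen = new ArrayList<>();
        sink.asFlux().subscribe(seen::add);
        return seen;
    }

    // Bounded case: only the last two elements are retained for replay.
    static List<Integer> boundedReplay() {
        return replayAfterEmitting(Sinks.many().replay().limit(2));
    }

    // Unbounded case: the whole history is retained.
    static List<Integer> unboundedReplay() {
        return replayAfterEmitting(Sinks.many().replay().all());
    }

    public static void main(String[] args) {
        System.out.println(boundedReplay());   // [4, 5]
        System.out.println(unboundedReplay()); // [1, 2, 3, 4, 5]
    }
}
```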
-
createTimeout
Deprecated. Use Sinks.many().replay().limit(maxAge) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
Creates a time-bounded replay processor.
In this setting, the ReplayProcessor internally tags each observed item with a timestamp value supplied by Schedulers.parallel() and keeps only those whose age is less than the supplied time value converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5 this first item is then evicted by any subsequent item or termination signal, leaving the buffer empty.
Once the processor is terminated, subscribers subscribing to it will receive items that remained in the buffer after the terminal signal, regardless of their age.
If a subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time, and each item observed thereafter, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a subscriber subscribes, it observes items without gaps in the sequence except for any outdated items at the beginning of the sequence.
- Type Parameters:
T - the type of items observed and emitted by the Processor
- Parameters:
maxAge - the maximum age of the contained items
- Returns:
a new ReplayProcessor that replays elements based on their age.
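The T=0, max age 5 example above can be traced with a toy model. This is a plain-JDK sketch and not Reactor's implementation: the class name, the explicit `now` arguments standing in for the Scheduler's clock, and the `retained()` helper are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a time-bounded replay buffer; not Reactor's implementation. */
public class TimeBoundedBufferSketch {

    /** An item paired with the time at which it was observed. */
    private static final class Timed {
        final String value;
        final long timestamp;

        Timed(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Deque<Timed> buffer = new ArrayDeque<>();
    private final long maxAge;

    TimeBoundedBufferSketch(long maxAge) {
        this.maxAge = maxAge;
    }

    /** Evicts items whose age has reached maxAge, then stores the new item. */
    void onNext(String value, long now) {
        while (!buffer.isEmpty() && now - buffer.peekFirst().timestamp >= maxAge) {
            buffer.removeFirst();
        }
        buffer.addLast(new Timed(value, now));
    }

    /** Items a subscriber arriving at time now would be replayed: those younger than maxAge. */
    List<String> replayAt(long now) {
        List<String> out = new ArrayList<>();
        for (Timed t : buffer) {
            if (now - t.timestamp < maxAge) {
                out.add(t.value);
            }
        }
        return out;
    }

    /** Number of items physically held, including outdated ones not yet evicted. */
    int retained() {
        return buffer.size();
    }

    public static void main(String[] args) {
        TimeBoundedBufferSketch b = new TimeBoundedBufferSketch(5);
        b.onNext("a", 0);
        System.out.println(b.replayAt(4)); // [a]
        System.out.println(b.replayAt(5)); // [] because "a" is now outdated
        b.onNext("b", 6);                  // the subsequent item evicts "a"
        System.out.println(b.retained());  // 1
        System.out.println(b.replayAt(7)); // [b]
    }
}
```

In this model eviction is lazy: an outdated item stays in the buffer until the next item arrives, while a subscriber arriving in between is simply not shown it.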
-
createTimeout
@Deprecated public static <T> ReplayProcessor<T> createTimeout(Duration maxAge, Scheduler scheduler)
Deprecated. Use Sinks.many().replay().limit(maxAge, scheduler) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
Creates a time-bounded replay processor.
In this setting, the ReplayProcessor internally tags each observed item with a timestamp value supplied by the Scheduler and keeps only those whose age is less than the supplied time value converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5 this first item is then evicted by any subsequent item or termination signal, leaving the buffer empty.
Once the processor is terminated, subscribers subscribing to it will receive items that remained in the buffer after the terminal signal, regardless of their age.
If a subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time, and each item observed thereafter, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a subscriber subscribes, it observes items without gaps in the sequence except for any outdated items at the beginning of the sequence.
- Type Parameters:
T - the type of items observed and emitted by the Processor
- Parameters:
maxAge - the maximum age of the contained items
scheduler - the Scheduler that provides the current time
- Returns:
a new ReplayProcessor that replays elements based on their age.
-
createSizeAndTimeout
Deprecated. Use Sinks.many().replay().limit(size, maxAge) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
Creates a time- and size-bounded replay processor.
In this setting, the ReplayProcessor internally tags each received item with a timestamp value supplied by Schedulers.parallel() and holds at most size items in its internal buffer. It evicts items from the start of the buffer if their age becomes greater than or equal to the supplied age in milliseconds or the buffer reaches its size limit.
When subscribers subscribe to a terminated ReplayProcessor, they observe the items that remained in the buffer after the terminal signal, regardless of their age, but at most size items.
If a subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time and each subsequent item, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a subscriber subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning of the sequence.
- Type Parameters:
T - the type of items observed and emitted by the Processor
- Parameters:
maxAge - the maximum age of the contained items
size - the maximum number of buffered items
- Returns:
a new ReplayProcessor that replays up to size elements, but will evict them from its history based on their age.
-
createSizeAndTimeout
@Deprecated public static <T> ReplayProcessor<T> createSizeAndTimeout(int size, Duration maxAge, Scheduler scheduler)
Deprecated. Use Sinks.many().replay().limit(size, maxAge, scheduler) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
Creates a time- and size-bounded replay processor.
In this setting, the ReplayProcessor internally tags each received item with a timestamp value supplied by the Scheduler and holds at most size items in its internal buffer. It evicts items from the start of the buffer if their age becomes greater than or equal to the supplied age in milliseconds or the buffer reaches its size limit.
When subscribers subscribe to a terminated ReplayProcessor, they observe the items that remained in the buffer after the terminal signal, regardless of their age, but at most size items.
If a subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time and each subsequent item, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a subscriber subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning of the sequence.
- Type Parameters:
T - the type of items observed and emitted by the Processor
- Parameters:
maxAge - the maximum age of the contained items
size - the maximum number of buffered items
scheduler - the Scheduler that provides the current time
- Returns:
a new ReplayProcessor that replays up to size elements, but will evict them from its history based on their age.
-
subscribe
Deprecated.
Description copied from class: Flux
An internal Publisher.subscribe(Subscriber) that will bypass the Hooks.onLastOperator(Function) pointcut.
In addition to behaving as expected by Publisher.subscribe(Subscriber) in a controlled manner, it supports direct subscribe-time Context passing.
- Specified by:
subscribe in interface CorePublisher<T>
- Specified by:
subscribe in class Flux<T>
- Parameters:
actual - the Subscriber interested in the published sequence
-
getError
Deprecated.
Description copied from class: FluxProcessor
Current error if any, defaults to null.
- Overrides:
getError in class FluxProcessor<T,T>
- Returns:
the current error if any, defaults to null
-
scanUnsafe
Deprecated.
Description copied from interface: Scannable
This method is used internally by components to define their key-value mappings in a single place. Although it ignores the generic type of the Scannable.Attr key, implementors should take care to return values of the correct type, and return null if no specific value is available.
For public consumption of attributes, prefer using Scannable.scan(Attr), which will return a typed value and fall back to the key's default if the component didn't define any mapping.
- Specified by:
scanUnsafe in interface Scannable
- Overrides:
scanUnsafe in class FluxProcessor<T,T>
- Parameters:
key - a Scannable.Attr to resolve for the component.
- Returns:
the value associated to the key for that specific component, or null if none.
-
inners
Deprecated.
Description copied from interface: Scannable
Return a Stream of referenced inners (flatmap, multicast etc)
-
downstreamCount
public long downstreamCount()
Deprecated.
Description copied from class: FluxProcessor
Return the number of active Subscribers or -1 if untracked.
- Overrides:
downstreamCount in class FluxProcessor<T,T>
- Returns:
the number of active Subscribers or -1 if untracked
-
isTerminated
public boolean isTerminated()
Deprecated.
Description copied from class: FluxProcessor
Has this upstream finished or "completed" / "failed"?
- Overrides:
isTerminated in class FluxProcessor<T,T>
- Returns:
has this upstream finished or "completed" / "failed"?
-
onSubscribe
Deprecated.
Description copied from interface: CoreSubscriber
Implementors should initialize any state used by Subscriber.onNext(Object) before calling Subscription.request(long). Should further onNext related state modification occur, thread-safety will be required.
Note that an invalid request <= 0 will not produce an onError and will simply be ignored or reported through a debug-enabled Logger.
- Specified by:
onSubscribe in interface CoreSubscriber<T>
- Specified by:
onSubscribe in interface Subscriber<T>
-
currentContext
Deprecated.
Description copied from interface: CoreSubscriber
Request a Context from dependent components which can include downstream operators during subscribing or a terminal Subscriber.
- Specified by:
currentContext in interface CoreSubscriber<T>
- Overrides:
currentContext in class FluxProcessor<T,T>
- Returns:
a resolved context or Context.empty()
-
getPrefetch
public int getPrefetch()
Deprecated.
Description copied from class: Flux
The prefetch configuration of the Flux
- Overrides:
getPrefetch in class Flux<T>
- Returns:
the prefetch configuration of the Flux, -1 if unspecified
-
onComplete
public void onComplete()
Deprecated.
- Specified by:
onComplete in interface Subscriber<T>
-
tryEmitComplete
Deprecated.
Description copied from interface: Sinks.Many
Try to terminate the sequence successfully, generating an onComplete signal. The result of the attempt is represented as a Sinks.EmitResult, which possibly indicates error cases.
See the list of failure Sinks.EmitResult in the Sinks.Many.emitComplete(EmitFailureHandler) javadoc for an example of how each of these can be dealt with, to decide if the emit API would be a good enough fit instead.
- Specified by:
tryEmitComplete in interface Sinks.Many<T>
- Returns:
a Sinks.EmitResult, which should be checked to distinguish different possible failures
-
onError
Deprecated.
- Specified by:
onError in interface Subscriber<T>
-
tryEmitError
Deprecated.
Description copied from interface: Sinks.Many
Try to fail the sequence, generating an onError signal. The result of the attempt is represented as a Sinks.EmitResult, which possibly indicates error cases.
See the list of failure Sinks.EmitResult in the Sinks.Many.emitError(Throwable, EmitFailureHandler) javadoc for an example of how each of these can be dealt with, to decide if the emit API would be a good enough fit instead.
- Specified by:
tryEmitError in interface Sinks.Many<T>
- Parameters:
t - the exception to signal, not null
- Returns:
a Sinks.EmitResult, which should be checked to distinguish different possible failures
-
onNext
Deprecated.
- Specified by:
onNext in interface Subscriber<T>
-
tryEmitNext
Deprecated.
Description copied from interface: Sinks.Many
Try emitting a non-null element, generating an onNext signal. The result of the attempt is represented as a Sinks.EmitResult, which possibly indicates error cases.
See the list of failure Sinks.EmitResult in the Sinks.Many.emitNext(Object, EmitFailureHandler) javadoc for an example of how each of these can be dealt with, to decide if the emit API would be a good enough fit instead.
Might throw an unchecked exception as a last resort (e.g. in case of a fatal error downstream which cannot be propagated to any asynchronous handler, a bubbling exception, ...).
- Specified by:
tryEmitNext in interface Sinks.Many<T>
- Parameters:
t - the value to emit, not null
- Returns:
a Sinks.EmitResult, which should be checked to distinguish different possible failures
-
currentSubscriberCount
public int currentSubscriberCount()
Deprecated.
Description copied from interface: Sinks.Many
Get how many Subscribers are currently subscribed to the sink.
This is a best effort peek at the sink state, and a subsequent attempt at emitting to the sink might still return Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER where relevant (generally in Sinks.Many.tryEmitNext(Object)). Request (and lack thereof) isn't taken into account; all registered subscribers are counted.
- Specified by:
currentSubscriberCount in interface Sinks.Many<T>
- Returns:
the number of subscribers at the time of invocation
-
asFlux
Deprecated.
Description copied from interface: Sinks.Many
Return a Flux view of this sink. Every call returns the same instance.
- Specified by:
asFlux in interface Sinks.Many<T>
- Returns:
the Flux view associated to this Sinks.Many
-
isIdentityProcessor
protected boolean isIdentityProcessor()
Deprecated.
Description copied from class: FluxProcessor
Return true if FluxProcessor<T, T>
- Overrides:
isIdentityProcessor in class FluxProcessor<T,T>
- Returns:
true if FluxProcessor<T, T>
-
emitNext
Description copied from interface: Sinks.Many
A simplified attempt at emitting a non-null element via the Sinks.Many.tryEmitNext(Object) API, generating an onNext signal. If the result of the attempt is not a success, implementations SHOULD retry the Sinks.Many.tryEmitNext(Object) call IF the provided Sinks.EmitFailureHandler returns true. Otherwise, failures are dealt with in a predefined way that might depend on the actual sink implementation (see below for the vanilla reactor-core behavior).
Generally, Sinks.Many.tryEmitNext(Object) is preferable since it allows a custom handling of error cases, although this implies checking the returned Sinks.EmitResult and correctly acting on it. This API is intended as a good default for convenience.
When the Sinks.EmitResult is not a success, vanilla reactor-core operators have the following behavior:
- Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER: no particular handling. The value should ideally be discarded, but at that point there's no Subscriber from which to get a contextual discard handler.
- Sinks.EmitResult.FAIL_OVERFLOW: discard the value (Operators.onDiscard(Object, Context)) then call Sinks.Many.emitError(Throwable, Sinks.EmitFailureHandler) with an Exceptions.failWithOverflow(String) exception.
- Sinks.EmitResult.FAIL_CANCELLED: discard the value (Operators.onDiscard(Object, Context)).
- Sinks.EmitResult.FAIL_TERMINATED: drop the value (Operators.onNextDropped(Object, Context)).
- Sinks.EmitResult.FAIL_NON_SERIALIZED: throw a Sinks.EmissionException mentioning RS spec rule 1.3. Note that Sinks.unsafe() never triggers this result. It would be possible for a Sinks.EmitFailureHandler to busy-loop and optimistically wait for the contention to disappear to avoid this case for safe sinks...
Might throw an unchecked exception as a last resort (e.g. in case of a fatal error downstream which cannot be propagated to any asynchronous handler, a bubbling exception, a Sinks.EmitResult.FAIL_NON_SERIALIZED as described above, ...).
- Specified by:
emitNext in interface Sinks.Many<T>
- Parameters:
value - the value to emit, not null
failureHandler - the failure handler that allows retrying failed Sinks.EmitResult.
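The retry contract described for emitNext (retry the try-emit call while the failure handler returns true, otherwise fall back to the per-result handling) can be sketched as a plain-JDK toy model. Nothing here is Reactor's code: the `Result` enum, the `FailureHandler` interface and the `emit` helper are hypothetical stand-ins, and unlike the real void method this helper returns the final result so the outcome can be observed.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Toy model of the emit/retry loop; not Reactor's implementation. */
public class EmitRetrySketch {

    /** Hypothetical subset of the possible emit results. */
    enum Result { OK, FAIL_NON_SERIALIZED, FAIL_TERMINATED }

    /** Hypothetical stand-in for a failure handler: return true to retry. */
    interface FailureHandler {
        boolean onEmitFailure(Result result);
    }

    /** Calls tryEmit until it succeeds or the handler declines to retry. */
    static Result emit(Supplier<Result> tryEmit, FailureHandler handler) {
        for (;;) {
            Result result = tryEmit.get();
            if (result == Result.OK) {
                return result;
            }
            if (!handler.onEmitFailure(result)) {
                // A real sink would now apply the per-result fallback (discard, drop, throw...).
                return result;
            }
        }
    }

    public static void main(String[] args) {
        // Simulated sink that is contended for two attempts, then accepts the value.
        Iterator<Result> attempts =
                List.of(Result.FAIL_NON_SERIALIZED, Result.FAIL_NON_SERIALIZED, Result.OK).iterator();
        Result retried = emit(attempts::next, r -> r == Result.FAIL_NON_SERIALIZED);
        System.out.println(retried); // OK

        // A handler that never retries surfaces the first failure.
        Result failFast = emit(() -> Result.FAIL_TERMINATED, r -> false);
        System.out.println(failFast); // FAIL_TERMINATED
    }
}
```

A handler that always returns true for a permanent failure such as a terminated sink would spin forever in this loop, which is why retrying is usually limited to transient results.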
-
-
emitComplete
Description copied from interface: Sinks.Many
A simplified attempt at completing via the Sinks.Many.tryEmitComplete() API, generating an onComplete signal. If the result of the attempt is not a success, implementations SHOULD retry the Sinks.Many.tryEmitComplete() call IF the provided Sinks.EmitFailureHandler returns true. Otherwise, failures are dealt with in a predefined way that might depend on the actual sink implementation (see below for the vanilla reactor-core behavior).
Generally, Sinks.Many.tryEmitComplete() is preferable since it allows a custom handling of error cases, although this implies checking the returned Sinks.EmitResult and correctly acting on it. This API is intended as a good default for convenience.
When the Sinks.EmitResult is not a success, vanilla reactor-core operators have the following behavior:
- Sinks.EmitResult.FAIL_OVERFLOW: irrelevant as onComplete is not driven by backpressure.
- Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER: the completion can be ignored since nobody is listening. Note that most vanilla reactor sinks never trigger this result for onComplete, replaying the terminal signal to later subscribers instead (with the exception of Sinks.UnicastSpec.onBackpressureError()).
- Sinks.EmitResult.FAIL_CANCELLED: the completion can be ignored since nobody is interested.
- Sinks.EmitResult.FAIL_TERMINATED: the extra completion is basically ignored since there was a previous termination signal, but there is nothing interesting to log.
- Sinks.EmitResult.FAIL_NON_SERIALIZED: throw a Sinks.EmissionException mentioning RS spec rule 1.3. Note that Sinks.unsafe() never triggers this result. It would be possible for a Sinks.EmitFailureHandler to busy-loop and optimistically wait for the contention to disappear to avoid this case in safe sinks...
Might throw an unchecked exception as a last resort (e.g. in case of a fatal error downstream which cannot be propagated to any asynchronous handler, a bubbling exception, a Sinks.EmitResult.FAIL_NON_SERIALIZED as described above, ...).
- Specified by:
emitComplete in interface Sinks.Many<T>
- Parameters:
failureHandler - the failure handler that allows retrying failed Sinks.EmitResult.
-
-
emitError
Description copied from interface: Sinks.Many
A simplified attempt at failing the sequence via the Sinks.Many.tryEmitError(Throwable) API, generating an onError signal. If the result of the attempt is not a success, implementations SHOULD retry the Sinks.Many.tryEmitError(Throwable) call IF the provided Sinks.EmitFailureHandler returns true. Otherwise, failures are dealt with in a predefined way that might depend on the actual sink implementation (see below for the vanilla reactor-core behavior).
Generally, Sinks.Many.tryEmitError(Throwable) is preferable since it allows a custom handling of error cases, although this implies checking the returned Sinks.EmitResult and correctly acting on it. This API is intended as a good default for convenience.
When the Sinks.EmitResult is not a success, vanilla reactor-core operators have the following behavior:
- Sinks.EmitResult.FAIL_OVERFLOW: irrelevant as onError is not driven by backpressure.
- Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER: the error is ignored since nobody is listening. Note that most vanilla reactor sinks never trigger this result for onError, replaying the terminal signal to later subscribers instead (with the exception of Sinks.UnicastSpec.onBackpressureError()).
- Sinks.EmitResult.FAIL_CANCELLED: the error can be ignored since nobody is interested.
- Sinks.EmitResult.FAIL_TERMINATED: the error unexpectedly follows another terminal signal, so it is dropped via Operators.onErrorDropped(Throwable, Context).
- Sinks.EmitResult.FAIL_NON_SERIALIZED: throw a Sinks.EmissionException mentioning RS spec rule 1.3. Note that Sinks.unsafe() never triggers this result. It would be possible for a Sinks.EmitFailureHandler to busy-loop and optimistically wait for the contention to disappear to avoid this case in safe sinks...
Might throw an unchecked exception as a last resort (e.g. in case of a fatal error downstream which cannot be propagated to any asynchronous handler, a bubbling exception, a Sinks.EmitResult.FAIL_NON_SERIALIZED as described above, ...).
- Specified by:
emitError in interface Sinks.Many<T>
- Parameters:
error - the exception to signal, not null
failureHandler - the failure handler that allows retrying failed Sinks.EmitResult.
-
-
Sinks through variations under Sinks.many().replay().