T
- the input and output value typeSinks
through
variations of Sinks.many().multicast().onBackpressureBuffer()
.
This processor was blocking in onNext(Object)
.
This behaviour can be implemented with the Sinks
API by calling
Sinks.Many.tryEmitNext(Object)
and retrying, e.g.:
while (sink.tryEmitNext(v).hasFailed()) {
LockSupport.parkNanos(10);
}
@Deprecated public final class EmitterProcessor<T> extends FluxProcessor<T,T>
The default create()
factories will only produce the new elements observed in
the parent sequence after a given Subscriber
is subscribed.
Scannable.Attr<T>
Disposable.Composite, Disposable.Swap
OPERATOR_NAME_UNRELATED_WORDS_PATTERN
Modifier and Type | Method and Description |
---|---|
Flux<T> |
asFlux()
Deprecated.
Return a
Flux view of this sink. |
static <E> EmitterProcessor<E> |
create()
Deprecated.
Create a new
EmitterProcessor using Queues.SMALL_BUFFER_SIZE
backlog size and auto-cancel. |
static <E> EmitterProcessor<E> |
create(boolean autoCancel)
Deprecated.
Create a new
EmitterProcessor using Queues.SMALL_BUFFER_SIZE
backlog size and the provided auto-cancel. |
static <E> EmitterProcessor<E> |
create(int bufferSize)
Deprecated.
Create a new
EmitterProcessor using the provided backlog size, with auto-cancel. |
static <E> EmitterProcessor<E> |
create(int bufferSize,
boolean autoCancel)
Deprecated.
Create a new
EmitterProcessor using the provided backlog size and auto-cancellation. |
Context |
currentContext()
Deprecated.
Request a
Context from dependent components which can include downstream
operators during subscribing or a terminal Subscriber . |
int |
currentSubscriberCount()
Deprecated.
Get how many
Subscribers are currently subscribed to the sink. |
long |
downstreamCount()
Deprecated.
Return the number of active
Subscriber or -1 if untracked. |
default void |
emitComplete(Sinks.EmitFailureHandler failureHandler)
A simplified attempt at completing via the
Sinks.Many.tryEmitComplete() API, generating an
onComplete signal. |
default void |
emitError(Throwable error,
Sinks.EmitFailureHandler failureHandler)
A simplified attempt at failing the sequence via the
Sinks.Many.tryEmitError(Throwable) API, generating an
onError signal. |
default void |
emitNext(T value,
Sinks.EmitFailureHandler failureHandler)
A simplified attempt at emitting a non-null element via the
Sinks.Many.tryEmitNext(Object) API, generating an
onNext signal. |
int |
getBufferSize()
Deprecated.
Return the processor buffer capacity if any or
Integer.MAX_VALUE |
Throwable |
getError()
Deprecated.
Current error if any, default to null
|
int |
getPending()
Deprecated.
Return the number of parked elements in the emitter backlog.
|
int |
getPrefetch()
Deprecated.
The prefetch configuration of the
Flux |
Stream<? extends Scannable> |
inners()
Deprecated.
Return a
Stream of referenced inners (flatmap, multicast etc) |
boolean |
isCancelled()
Deprecated.
|
boolean |
isDisposed()
Deprecated.
Optionally return true when the resource or task is disposed.
|
protected boolean |
isIdentityProcessor()
Deprecated.
Return true if
FluxProcessor<T, T> |
boolean |
isTerminated()
Deprecated.
Has this upstream finished or "completed" / "failed" ?
|
void |
onComplete()
Deprecated.
|
void |
onError(Throwable throwable)
Deprecated.
|
void |
onNext(T t)
Deprecated.
|
void |
onSubscribe(Subscription s)
Deprecated.
Implementors should initialize any state used by
Subscriber.onNext(Object) before
calling Subscription.request(long) . |
Object |
scanUnsafe(Scannable.Attr key)
Deprecated.
This method is used internally by components to define their key-value mappings
in a single place.
|
void |
subscribe(CoreSubscriber<? super T> actual)
Deprecated.
An internal
Publisher.subscribe(Subscriber) that will bypass
Hooks.onLastOperator(Function) pointcut. |
Sinks.EmitResult |
tryEmitComplete()
Deprecated.
Try to terminate the sequence successfully, generating an
onComplete
signal. |
Sinks.EmitResult |
tryEmitError(Throwable t)
Deprecated.
Try to fail the sequence, generating an
onError
signal. |
Sinks.EmitResult |
tryEmitNext(T t)
Deprecated.
Try emitting a non-null element, generating an
onNext signal. |
dispose, hasCompleted, hasDownstreams, hasError, isSerialized, serialize, serializeAlways, sink, sink, switchOnNext, wrap
all, any, as, blockFirst, blockFirst, blockLast, blockLast, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferUntil, bufferUntil, bufferUntilChanged, bufferUntilChanged, bufferUntilChanged, bufferWhen, bufferWhen, bufferWhile, cache, cache, cache, cache, cache, cache, cancelOn, cast, checkpoint, checkpoint, checkpoint, collect, collect, collectList, collectMap, collectMap, collectMap, collectMultimap, collectMultimap, collectMultimap, collectSortedList, collectSortedList, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, concat, concat, concat, concat, concatDelayError, concatDelayError, concatDelayError, concatDelayError, concatMap, concatMap, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapIterable, concatMapIterable, concatWith, concatWithValues, contextWrite, contextWrite, count, create, create, defaultIfEmpty, defer, deferContextual, deferWithContext, delayElements, delayElements, delaySequence, delaySequence, delaySubscription, delaySubscription, delaySubscription, delayUntil, dematerialize, distinct, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnError, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elapsed, elapsed, elementAt, elementAt, empty, error, error, error, expand, expand, expandDeep, expandDeep, filter, filterWhen, filterWhen, first, first, firstWithSignal, firstWithSignal, firstWithValue, firstWithValue, flatMap, flatMap, flatMap, flatMap, flatMapDelayError, flatMapIterable, flatMapIterable, flatMapSequential, flatMapSequential, flatMapSequential, flatMapSequentialDelayError, from, fromArray, fromIterable, fromStream, fromStream, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupJoin, handle, hasElement, hasElements, hide, ignoreElements, index, index, interval, interval, interval, interval, join, just, just, last, last, limitRate, limitRate, limitRequest, log, log, log, log, log, log, map, materialize, merge, merge, merge, merge, merge, merge, mergeDelayError, mergeOrdered, mergeOrdered, mergeOrdered, mergeOrderedWith, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequentialDelayError, mergeSequentialDelayError, mergeSequentialDelayError, mergeWith, metrics, name, never, next, ofType, onAssembly, onAssembly, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorContinue, onErrorContinue, onErrorContinue, onErrorMap, onErrorMap, onErrorMap, onErrorResume, onErrorResume, onErrorResume, onErrorReturn, onErrorReturn, onErrorReturn, onErrorStop, onTerminateDetach, or, parallel, parallel, parallel, publish, publish, publish, publish, publishNext, publishOn, publishOn, publishOn, push, push, range, reduce, reduce, reduceWith, repeat, repeat, repeat, repeat, repeatWhen, replay, replay, replay, replay, replay, replay, retry, retry, retryWhen, sample, sample, sampleFirst, sampleFirst, sampleTimeout, sampleTimeout, scan, scan, scanWith, share, shareNext, single, single, singleOrEmpty, skip, skip, skip, skipLast, skipUntil, skipUntilOther, skipWhile, sort, sort, startWith, startWith, startWith, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscriberContext, subscriberContext, subscribeWith, switchIfEmpty, switchMap, switchMap, switchOnFirst, switchOnFirst, switchOnNext, switchOnNext, tag, take, take, take, takeLast, takeUntil, takeUntilOther, takeWhile, then, then, thenEmpty, thenMany, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timestamp, timestamp, toIterable, toIterable, toIterable, toStream, toStream, toString, transform, transformDeferred, transformDeferredContextual, using, using, usingWhen, usingWhen, window, window, window, window, window, window, window, windowTimeout, windowTimeout, windowUntil, windowUntil, windowUntil, windowUntilChanged, windowUntilChanged, windowUntilChanged, windowWhen, windowWhile, windowWhile, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipWith, zipWith, zipWith, zipWith, zipWithIterable, zipWithIterable
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
public static <E> EmitterProcessor<E> create()
EmitterProcessor
using Queues.SMALL_BUFFER_SIZE
backlog size and auto-cancel.E
- Type of processed signalspublic static <E> EmitterProcessor<E> create(boolean autoCancel)
EmitterProcessor
using Queues.SMALL_BUFFER_SIZE
backlog size and the provided auto-cancel.E
- Type of processed signalsautoCancel
- automatically cancelpublic static <E> EmitterProcessor<E> create(int bufferSize)
EmitterProcessor
using the provided backlog size, with auto-cancel.E
- Type of processed signalsbufferSize
- the internal buffer size to hold signalspublic static <E> EmitterProcessor<E> create(int bufferSize, boolean autoCancel)
EmitterProcessor
using the provided backlog size and auto-cancellation.E
- Type of processed signalsbufferSize
- the internal buffer size to hold signalsautoCancel
- automatically cancelpublic Stream<? extends Scannable> inners()
Scannable
Stream
of referenced inners (flatmap, multicast etc)public Context currentContext()
CoreSubscriber
Context
from dependent components which can include downstream
operators during subscribing or a terminal Subscriber
.currentContext
in interface CoreSubscriber<T>
currentContext
in class FluxProcessor<T,T>
Context.empty()
public void subscribe(CoreSubscriber<? super T> actual)
Flux
Publisher.subscribe(Subscriber)
that will bypass
Hooks.onLastOperator(Function)
pointcut.
In addition to behave as expected by Publisher.subscribe(Subscriber)
in a controlled manner, it supports direct subscribe-time Context
passing.
subscribe
in interface CorePublisher<T>
subscribe
in class Flux<T>
actual
- the Subscriber
interested into the published sequenceFlux.subscribe(Subscriber)
public void onComplete()
onComplete
in interface Subscriber<T>
public Sinks.EmitResult tryEmitComplete()
Sinks.Many
onComplete
signal. The result of the attempt is represented as an Sinks.EmitResult
, which possibly indicates error cases.
See the list of failure Sinks.EmitResult
in #emitComplete(EmitFailureHandler)
javadoc for an
example of how each of these can be dealt with, to decide if the emit API would be a good enough fit instead.
Sinks.EmitResult
, which should be checked to distinguish different possible failuresSubscriber.onComplete()
public void onError(Throwable throwable)
onError
in interface Subscriber<T>
public Sinks.EmitResult tryEmitError(Throwable t)
Sinks.Many
onError
signal. The result of the attempt is represented as an Sinks.EmitResult
, which possibly indicates error cases.
See the list of failure Sinks.EmitResult
in #emitError(Throwable, EmitFailureHandler)
javadoc for an
example of how each of these can be dealt with, to decide if the emit API would be a good enough fit instead.
t
- the exception to signal, not nullSinks.EmitResult
, which should be checked to distinguish different possible failuresSubscriber.onError(Throwable)
public void onNext(T t)
onNext
in interface Subscriber<T>
public Sinks.EmitResult tryEmitNext(T t)
Sinks.Many
onNext
signal.
The result of the attempt is represented as an Sinks.EmitResult
, which possibly indicates error cases.
See the list of failure Sinks.EmitResult
in #emitNext(Object, EmitFailureHandler)
javadoc for an
example of how each of these can be dealt with, to decide if the emit API would be a good enough fit instead.
Might throw an unchecked exception as a last resort (eg. in case of a fatal error downstream which cannot be propagated to any asynchronous handler, a bubbling exception, ...).
t
- the value to emit, not nullSinks.EmitResult
, which should be checked to distinguish different possible failuresSubscriber.onNext(Object)
public int currentSubscriberCount()
Sinks.Many
Subscribers
are currently subscribed to the sink.
This is a best effort peek at the sink state, and a subsequent attempt at emitting
to the sink might still return Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER
where relevant.
(generally in Sinks.Many.tryEmitNext(Object)
). Request (and lack thereof) isn't taken
into account, all registered subscribers are counted.
public Flux<T> asFlux()
Sinks.Many
Flux
view of this sink. Every call returns the same instance.Flux
view associated to this Sinks.Many
protected boolean isIdentityProcessor()
FluxProcessor
FluxProcessor<T, T>
isIdentityProcessor
in class FluxProcessor<T,T>
FluxProcessor<T, T>
public int getPending()
public boolean isDisposed()
Disposable
Implementations are not required to track disposition and as such may never return true even when disposed. However, they MUST only return true when there's a guarantee the resource or task is disposed.
isDisposed
in interface Disposable
public void onSubscribe(Subscription s)
CoreSubscriber
Subscriber.onNext(Object)
before
calling Subscription.request(long)
. Should further onNext
related
state modification occur, thread-safety will be required.
Note that an invalid request <= 0
will not produce an onError and
will simply be ignored or reported through a debug-enabled
Logger
.
onSubscribe
in interface Subscriber<T>
onSubscribe
in interface CoreSubscriber<T>
@Nullable public Throwable getError()
FluxProcessor
getError
in class FluxProcessor<T,T>
public boolean isCancelled()
public final int getBufferSize()
FluxProcessor
Integer.MAX_VALUE
getBufferSize
in class FluxProcessor<T,T>
Integer.MAX_VALUE
public boolean isTerminated()
FluxProcessor
isTerminated
in class FluxProcessor<T,T>
public int getPrefetch()
Flux
Flux
getPrefetch
in class Flux<T>
Flux
, -1 if unspecified@Nullable public Object scanUnsafe(Scannable.Attr key)
Scannable
Scannable.Attr
key,
implementors should take care to return values of the correct type, and return
null if no specific value is available.
For public consumption of attributes, prefer using Scannable.scan(Attr)
, which will
return a typed value and fall back to the key's default if the component didn't
define any mapping.
scanUnsafe
in interface Scannable
scanUnsafe
in class FluxProcessor<T,T>
key
- a Scannable.Attr
to resolve for the component.public long downstreamCount()
FluxProcessor
Subscriber
or -1 if untracked.downstreamCount
in class FluxProcessor<T,T>
Subscriber
or -1 if untrackedpublic void emitNext(T value, Sinks.EmitFailureHandler failureHandler)
Sinks.Many
Sinks.Many.tryEmitNext(Object)
API, generating an
onNext
signal.
If the result of the attempt is not a success
, implementations SHOULD retry the
Sinks.Many.tryEmitNext(Object)
call IF the provided Sinks.EmitFailureHandler
returns true
.
Otherwise, failures are dealt with in a predefined way that might depend on the actual sink implementation
(see below for the vanilla reactor-core behavior).
Generally, Sinks.Many.tryEmitNext(Object)
is preferable since it allows a custom handling
of error cases, although this implies checking the returned Sinks.EmitResult
and correctly
acting on it. This API is intended as a good default for convenience.
When the Sinks.EmitResult
is not a success, vanilla reactor-core operators have the following behavior:
Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER
: no particular handling. should ideally discard the value but at that
point there's no Subscriber
from which to get a contextual discard handler.
Sinks.EmitResult.FAIL_OVERFLOW
: discard the value (Operators.onDiscard(Object, Context)
)
then call Sinks.Many.emitError(Throwable, Sinks.EmitFailureHandler)
with a Exceptions.failWithOverflow(String)
exception.
Sinks.EmitResult.FAIL_CANCELLED
: discard the value (Operators.onDiscard(Object, Context)
).
Sinks.EmitResult.FAIL_TERMINATED
: drop the value (Operators.onNextDropped(Object, Context)
).
Sinks.EmitResult.FAIL_NON_SERIALIZED
: throw an Sinks.EmissionException
mentioning RS spec rule 1.3.
Note that Sinks.unsafe()
never trigger this result. It would be possible for an Sinks.EmitFailureHandler
to busy-loop and optimistically wait for the contention to disappear to avoid this case for safe sinks...
Might throw an unchecked exception as a last resort (eg. in case of a fatal error downstream which cannot
be propagated to any asynchronous handler, a bubbling exception, a Sinks.EmitResult.FAIL_NON_SERIALIZED
as described above, ...).
emitNext
in interface Sinks.Many<T>
value
- the value to emit, not nullfailureHandler
- the failure handler that allows retrying failed Sinks.EmitResult
.Sinks.Many.tryEmitNext(Object)
,
Subscriber.onNext(Object)
public void emitComplete(Sinks.EmitFailureHandler failureHandler)
Sinks.Many
Sinks.Many.tryEmitComplete()
API, generating an
onComplete
signal.
If the result of the attempt is not a success
, implementations SHOULD retry the
Sinks.Many.tryEmitComplete()
call IF the provided Sinks.EmitFailureHandler
returns true
.
Otherwise, failures are dealt with in a predefined way that might depend on the actual sink implementation
(see below for the vanilla reactor-core behavior).
Generally, Sinks.Many.tryEmitComplete()
is preferable since it allows a custom handling
of error cases, although this implies checking the returned Sinks.EmitResult
and correctly
acting on it. This API is intended as a good default for convenience.
When the Sinks.EmitResult
is not a success, vanilla reactor-core operators have the following behavior:
Sinks.EmitResult.FAIL_OVERFLOW
: irrelevant as onComplete is not driven by backpressure.
Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER
: the completion can be ignored since nobody is listening.
Note that most vanilla reactor sinks never trigger this result for onComplete, replaying the
terminal signal to later subscribers instead (to the exception of Sinks.UnicastSpec.onBackpressureError()
).
Sinks.EmitResult.FAIL_CANCELLED
: the completion can be ignored since nobody is interested.
Sinks.EmitResult.FAIL_TERMINATED
: the extra completion is basically ignored since there was a previous
termination signal, but there is nothing interesting to log.
Sinks.EmitResult.FAIL_NON_SERIALIZED
: throw an Sinks.EmissionException
mentioning RS spec rule 1.3.
Note that Sinks.unsafe()
never trigger this result. It would be possible for an Sinks.EmitFailureHandler
to busy-loop and optimistically wait for the contention to disappear to avoid this case in safe sinks...
Might throw an unchecked exception as a last resort (eg. in case of a fatal error downstream which cannot
be propagated to any asynchronous handler, a bubbling exception, a Sinks.EmitResult.FAIL_NON_SERIALIZED
as described above, ...).
emitComplete
in interface Sinks.Many<T>
failureHandler
- the failure handler that allows retrying failed Sinks.EmitResult
.Sinks.Many.tryEmitComplete()
,
Subscriber.onComplete()
public void emitError(Throwable error, Sinks.EmitFailureHandler failureHandler)
Sinks.Many
Sinks.Many.tryEmitError(Throwable)
API, generating an
onError
signal.
If the result of the attempt is not a success
, implementations SHOULD retry the
Sinks.Many.tryEmitError(Throwable)
call IF the provided Sinks.EmitFailureHandler
returns true
.
Otherwise, failures are dealt with in a predefined way that might depend on the actual sink implementation
(see below for the vanilla reactor-core behavior).
Generally, Sinks.Many.tryEmitError(Throwable)
is preferable since it allows a custom handling
of error cases, although this implies checking the returned Sinks.EmitResult
and correctly
acting on it. This API is intended as a good default for convenience.
When the Sinks.EmitResult
is not a success, vanilla reactor-core operators have the following behavior:
Sinks.EmitResult.FAIL_OVERFLOW
: irrelevant as onError is not driven by backpressure.
Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER
: the error is ignored since nobody is listening. Note that most vanilla reactor sinks
never trigger this result for onError, replaying the terminal signal to later subscribers instead
(to the exception of Sinks.UnicastSpec.onBackpressureError()
).
Sinks.EmitResult.FAIL_CANCELLED
: the error can be ignored since nobody is interested.
Sinks.EmitResult.FAIL_TERMINATED
: the error unexpectedly follows another terminal signal, so it is
dropped via Operators.onErrorDropped(Throwable, Context)
.
Sinks.EmitResult.FAIL_NON_SERIALIZED
: throw an Sinks.EmissionException
mentioning RS spec rule 1.3.
Note that Sinks.unsafe()
never trigger this result. It would be possible for an Sinks.EmitFailureHandler
to busy-loop and optimistically wait for the contention to disappear to avoid this case in safe sinks...
Might throw an unchecked exception as a last resort (eg. in case of a fatal error downstream which cannot
be propagated to any asynchronous handler, a bubbling exception, a Sinks.EmitResult.FAIL_NON_SERIALIZED
as described above, ...).
emitError
in interface Sinks.Many<T>
error
- the exception to signal, not nullfailureHandler
- the failure handler that allows retrying failed Sinks.EmitResult
.Sinks.Many.tryEmitError(Throwable)
,
Subscriber.onError(Throwable)