T - the input and output typepublic final class UnicastProcessor<T> extends FluxProcessor<T,T> implements Fuseable.QueueSubscription<T>, Fuseable
Note: UnicastProcessor does not respect the actual subscriber's
demand as it is described in
Reactive Streams Spec. However,
UnicastProcessor embraces configurable Queue internally which allows enabling
backpressure support and preventing of consumer's overwhelming.
Hence, interaction model between producers and UnicastProcessor will be PUSH
only. In opposite, interaction model between UnicastProcessor and consumer will be
PUSH-PULL as defined in
Reactive Streams Spec.
In the case when upstream's signals overflow the bound of internal Queue,
UnicastProcessor will fail with signaling onError(
Exceptions.OverflowException).
Note: The implementation keeps the order of signals. That means that in case of terminal signal (completion or error signals) it will be postponed until all of the previous signals has been consumed.
Fuseable.ConditionalSubscriber<T>, Fuseable.QueueSubscription<T>, Fuseable.ScalarCallable<T>, Fuseable.SynchronousSubscription<T>Scannable.Attr<T>Disposable.Composite, Disposable.SwapNOT_SUPPORTED_MESSAGEOPERATOR_NAME_UNRELATED_WORDS_PATTERN| Constructor and Description |
|---|
UnicastProcessor(Queue<T> queue) |
UnicastProcessor(Queue<T> queue,
Consumer<? super T> onOverflow,
Disposable onTerminate) |
UnicastProcessor(Queue<T> queue,
Disposable onTerminate) |
| Modifier and Type | Method and Description |
|---|---|
CoreSubscriber<? super T> |
actual() |
void |
cancel() |
void |
clear() |
static <E> UnicastProcessor<E> |
create()
Create a new
UnicastProcessor that will buffer on an internal queue in an
unbounded fashion. |
static <E> UnicastProcessor<E> |
create(Queue<E> queue)
Create a new
UnicastProcessor that will buffer on a provided queue in an
unbounded fashion. |
static <E> UnicastProcessor<E> |
create(Queue<E> queue,
Consumer<? super E> onOverflow,
Disposable endcallback)
Create a new
UnicastProcessor that will buffer on a provided queue in an
unbounded fashion. |
static <E> UnicastProcessor<E> |
create(Queue<E> queue,
Disposable endcallback)
Create a new
UnicastProcessor that will buffer on a provided queue in an
unbounded fashion. |
Context |
currentContext()
Request a
Context from dependent components which can include downstream
operators during subscribing or a terminal Subscriber. |
long |
downstreamCount()
Return the number of active
Subscriber or -1 if untracked. |
int |
getBufferSize()
Return the processor buffer capacity if any or
Integer.MAX_VALUE |
Throwable |
getError()
Current error if any, default to null
|
int |
getPrefetch()
The prefetch configuration of the
Flux |
boolean |
hasDownstreams()
Return true if any
Subscriber is actively subscribed |
boolean |
isDisposed()
Optionally return true when the resource or task is disposed.
|
boolean |
isEmpty() |
boolean |
isTerminated()
Has this upstream finished or "completed" / "failed" ?
|
void |
onComplete() |
void |
onError(Throwable t) |
void |
onNext(T t) |
void |
onSubscribe(Subscription s)
Implementors should initialize any state used by
Subscriber.onNext(Object) before
calling Subscription.request(long). |
T |
poll() |
void |
request(long n) |
int |
requestFusion(int requestedMode)
Request a specific fusion mode from this QueueSubscription.
|
Object |
scanUnsafe(Scannable.Attr key)
This method is used internally by components to define their key-value mappings
in a single place.
|
int |
size() |
default String |
stepName()
Return a meaningful
String representation of this Scannable in
its chain of Scannable.parents() and Scannable.actuals(). |
void |
subscribe(CoreSubscriber<? super T> actual)
An internal
Publisher.subscribe(Subscriber) that will bypass
Hooks.onLastOperator(Function) pointcut. |
dispose, hasCompleted, hasError, inners, isSerialized, serialize, serializeAlways, sink, sink, switchOnNext, wrapall, any, as, blockFirst, blockFirst, blockLast, blockLast, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferUntil, bufferUntil, bufferUntilChanged, bufferUntilChanged, bufferUntilChanged, bufferWhen, bufferWhen, bufferWhile, cache, cache, cache, cache, cache, cache, cancelOn, cast, checkpoint, checkpoint, checkpoint, collect, collect, collectList, collectMap, collectMap, collectMap, collectMultimap, collectMultimap, collectMultimap, collectSortedList, collectSortedList, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, concat, concat, concat, concat, concatDelayError, concatDelayError, concatDelayError, concatDelayError, concatMap, concatMap, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapIterable, concatMapIterable, concatWith, concatWithValues, count, create, create, defaultIfEmpty, defer, deferWithContext, delayElements, delayElements, delaySequence, delaySequence, delaySubscription, delaySubscription, delaySubscription, delayUntil, dematerialize, distinct, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnError, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elapsed, elapsed, elementAt, elementAt, empty, error, error, error, expand, expand, expandDeep, expandDeep, filter, filterWhen, filterWhen, first, first, flatMap, flatMap, flatMap, flatMap, flatMapDelayError, flatMapIterable, flatMapIterable, flatMapSequential, flatMapSequential, flatMapSequential, flatMapSequentialDelayError, from, fromArray, fromIterable, fromStream, fromStream, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupJoin, handle, hasElement, hasElements, hide, ignoreElements, index, index, interval, interval, interval, interval, join, just, just, last, last, limitRate, limitRate, limitRequest, log, log, log, log, log, log, map, materialize, merge, merge, merge, merge, merge, merge, mergeDelayError, mergeOrdered, mergeOrdered, mergeOrdered, mergeOrderedWith, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequentialDelayError, mergeSequentialDelayError, mergeSequentialDelayError, mergeWith, metrics, name, never, next, ofType, onAssembly, onAssembly, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorContinue, onErrorContinue, onErrorContinue, onErrorMap, onErrorMap, onErrorMap, onErrorResume, onErrorResume, onErrorResume, onErrorReturn, onErrorReturn, onErrorReturn, onErrorStop, onTerminateDetach, or, parallel, parallel, parallel, publish, publish, publish, publish, publishNext, publishOn, publishOn, publishOn, push, push, range, reduce, reduce, reduceWith, repeat, repeat, repeat, repeat, repeatWhen, replay, replay, replay, replay, replay, replay, retry, retry, retryWhen, sample, sample, sampleFirst, sampleFirst, sampleTimeout, sampleTimeout, scan, scan, scanWith, share, single, single, singleOrEmpty, skip, skip, skip, skipLast, skipUntil, skipUntilOther, skipWhile, sort, sort, startWith, startWith, startWith, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscriberContext, subscriberContext, subscribeWith, switchIfEmpty, switchMap, switchMap, switchOnFirst, switchOnFirst, switchOnNext, switchOnNext, tag, take, take, take, takeLast, takeUntil, takeUntilOther, takeWhile, then, then, thenEmpty, thenMany, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timestamp, timestamp, toIterable, toIterable, toIterable, toStream, toStream, toString, transform, transformDeferred, using, using, usingWhen, usingWhen, window, window, window, window, window, window, window, windowTimeout, windowTimeout, windowUntil, windowUntil, windowUntil, windowUntilChanged, windowUntilChanged, windowUntilChanged, windowWhen, windowWhile, windowWhile, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipWith, zipWith, zipWith, zipWith, zipWithIterable, zipWithIterableclone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, waitadd, addAll, contains, containsAll, element, iterator, offer, peek, remove, remove, removeAll, retainAll, toArray, toArrayequals, hashCode, parallelStream, removeIf, spliterator, streampublic UnicastProcessor(Queue<T> queue, Consumer<? super T> onOverflow, Disposable onTerminate)
public UnicastProcessor(Queue<T> queue, Disposable onTerminate)
public CoreSubscriber<? super T> actual()
public void cancel()
cancel in interface Subscriptionpublic void clear()
clear in interface Collection<T>public static <E> UnicastProcessor<E> create()
UnicastProcessor that will buffer on an internal queue in an
unbounded fashion.E - the relayed typeFluxProcessorpublic static <E> UnicastProcessor<E> create(Queue<E> queue)
UnicastProcessor that will buffer on a provided queue in an
unbounded fashion.E - the relayed typequeue - the buffering queueFluxProcessorpublic static <E> UnicastProcessor<E> create(Queue<E> queue, Consumer<? super E> onOverflow, Disposable endcallback)
UnicastProcessor that will buffer on a provided queue in an
unbounded fashion.E - the relayed typequeue - the buffering queueendcallback - called on any terminal signalonOverflow - called when queue.offer return false and unicastProcessor is
about to emit onError.FluxProcessorpublic static <E> UnicastProcessor<E> create(Queue<E> queue, Disposable endcallback)
UnicastProcessor that will buffer on a provided queue in an
unbounded fashion.E - the relayed typequeue - the buffering queueendcallback - called on any terminal signalFluxProcessorpublic Context currentContext()
CoreSubscriberContext from dependent components which can include downstream
operators during subscribing or a terminal Subscriber.currentContext in interface CoreSubscriber<T>currentContext in class FluxProcessor<T,T>Context.empty()public long downstreamCount()
FluxProcessorSubscriber or -1 if untracked.downstreamCount in class FluxProcessor<T,T>Subscriber or -1 if untrackedpublic int getBufferSize()
FluxProcessorInteger.MAX_VALUEgetBufferSize in class FluxProcessor<T,T>Integer.MAX_VALUE@Nullable public Throwable getError()
FluxProcessorgetError in class FluxProcessor<T,T>public int getPrefetch()
FluxFluxgetPrefetch in class Flux<T>Flux, -1 if unspecifiedpublic boolean hasDownstreams()
FluxProcessorSubscriber is actively subscribedhasDownstreams in class FluxProcessor<T,T>Subscriber is actively subscribedpublic boolean isDisposed()
DisposableImplementations are not required to track disposition and as such may never return true even when disposed. However, they MUST only return true when there's a guarantee the resource or task is disposed.
isDisposed in interface Disposablepublic boolean isEmpty()
isEmpty in interface Collection<T>public boolean isTerminated()
FluxProcessorisTerminated in class FluxProcessor<T,T>public void onComplete()
onComplete in interface Subscriber<T>public void onError(Throwable t)
onError in interface Subscriber<T>public void onNext(T t)
onNext in interface Subscriber<T>public void onSubscribe(Subscription s)
CoreSubscriberSubscriber.onNext(Object) before
calling Subscription.request(long). Should further onNext related
state modification occur, thread-safety will be required.
Note that an invalid request <= 0 will not produce an onError and
will simply be ignored or reported through a debug-enabled
Logger.
onSubscribe in interface Subscriber<T>onSubscribe in interface CoreSubscriber<T>public void request(long n)
request in interface Subscriptionpublic int requestFusion(int requestedMode)
Fuseable.QueueSubscriptionOne should request either SYNC, ASYNC or ANY modes (never NONE) and the implementor should return NONE, SYNC or ASYNC (never ANY).
For example, if a source supports only ASYNC fusion but the intermediate operator supports only SYNC fuseable sources, the operator may request SYNC fusion and the source can reject it via NONE, thus the operator can return NONE as well to downstream and the fusion doesn't happen.
requestFusion in interface Fuseable.QueueSubscription<T>requestedMode - the mode requested by the intermediate operatorpublic Object scanUnsafe(Scannable.Attr key)
ScannableScannable.Attr key,
implementors should take care to return values of the correct type, and return
null if no specific value is available.
For public consumption of attributes, prefer using Scannable.scan(Attr), which will
return a typed value and fall back to the key's default if the component didn't
define any mapping.
scanUnsafe in interface ScannablescanUnsafe in class FluxProcessor<T,T>key - a Scannable.Attr to resolve for the component.public int size()
size in interface Collection<T>public void subscribe(CoreSubscriber<? super T> actual)
FluxPublisher.subscribe(Subscriber) that will bypass
Hooks.onLastOperator(Function) pointcut.
In addition to behave as expected by Publisher.subscribe(Subscriber)
in a controlled manner, it supports direct subscribe-time Context passing.
subscribe in interface CorePublisher<T>subscribe in class Flux<T>actual - the Subscriber interested into the published sequenceFlux.subscribe(Subscriber)public String stepName()
ScannableString representation of this Scannable in
its chain of Scannable.parents() and Scannable.actuals().