T
- the input and output typepublic final class UnicastProcessor<T> extends FluxProcessor<T,T> implements Fuseable.QueueSubscription<T>, Fuseable
Note: UnicastProcessor does not respect the actual subscriber's
demand as it is described in
Reactive Streams Spec. However,
UnicastProcessor embraces configurable Queue internally which allows enabling
backpressure support and preventing of consumer's overwhelming.
Hence, interaction model between producers and UnicastProcessor will be PUSH
only. In opposite, interaction model between UnicastProcessor and consumer will be
PUSH-PULL as defined in
Reactive Streams Spec.
In the case when upstream's signals overflow the bound of internal Queue,
UnicastProcessor will fail with signaling onError(
Exceptions.OverflowException
).
Note: The implementation keeps the order of signals. That means that in case of terminal signal (completion or error signals) it will be postponed until all of the previous signals has been consumed.
Fuseable.ConditionalSubscriber<T>, Fuseable.QueueSubscription<T>, Fuseable.ScalarCallable<T>, Fuseable.SynchronousSubscription<T>
Scannable.Attr<T>
Disposable.Composite, Disposable.Swap
NOT_SUPPORTED_MESSAGE
OPERATOR_NAME_UNRELATED_WORDS_PATTERN
Constructor and Description |
---|
UnicastProcessor(Queue<T> queue) |
UnicastProcessor(Queue<T> queue,
Consumer<? super T> onOverflow,
Disposable onTerminate) |
UnicastProcessor(Queue<T> queue,
Disposable onTerminate) |
Modifier and Type | Method and Description |
---|---|
CoreSubscriber<? super T> |
actual() |
void |
cancel() |
void |
clear() |
static <E> UnicastProcessor<E> |
create()
Create a new
UnicastProcessor that will buffer on an internal queue in an
unbounded fashion. |
static <E> UnicastProcessor<E> |
create(Queue<E> queue)
Create a new
UnicastProcessor that will buffer on a provided queue in an
unbounded fashion. |
static <E> UnicastProcessor<E> |
create(Queue<E> queue,
Consumer<? super E> onOverflow,
Disposable endcallback)
Create a new
UnicastProcessor that will buffer on a provided queue in an
unbounded fashion. |
static <E> UnicastProcessor<E> |
create(Queue<E> queue,
Disposable endcallback)
Create a new
UnicastProcessor that will buffer on a provided queue in an
unbounded fashion. |
Context |
currentContext()
Request a
Context from dependent components which can include downstream
operators during subscribing or a terminal Subscriber . |
long |
downstreamCount()
Return the number of active
Subscriber or -1 if untracked. |
int |
getBufferSize()
Return the processor buffer capacity if any or
Integer.MAX_VALUE |
Throwable |
getError()
Current error if any, default to null
|
int |
getPrefetch()
The prefetch configuration of the
Flux |
boolean |
hasDownstreams()
Return true if any
Subscriber is actively subscribed |
boolean |
isDisposed()
Optionally return true when the resource or task is disposed.
|
boolean |
isEmpty() |
boolean |
isTerminated()
Has this upstream finished or "completed" / "failed" ?
|
void |
onComplete() |
void |
onError(Throwable t) |
void |
onNext(T t) |
void |
onSubscribe(Subscription s)
Implementors should initialize any state used by
Subscriber.onNext(Object) before
calling Subscription.request(long) . |
T |
poll() |
void |
request(long n) |
int |
requestFusion(int requestedMode)
Request a specific fusion mode from this QueueSubscription.
|
Object |
scanUnsafe(Scannable.Attr key)
This method is used internally by components to define their key-value mappings
in a single place.
|
int |
size() |
default String |
stepName()
Return a meaningful
String representation of this Scannable in
its chain of Scannable.parents() and Scannable.actuals() . |
void |
subscribe(CoreSubscriber<? super T> actual)
An internal
Publisher.subscribe(Subscriber) that will bypass
Hooks.onLastOperator(Function) pointcut. |
dispose, hasCompleted, hasError, inners, isSerialized, serialize, serializeAlways, sink, sink, switchOnNext, wrap
all, any, as, blockFirst, blockFirst, blockLast, blockLast, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferUntil, bufferUntil, bufferUntilChanged, bufferUntilChanged, bufferUntilChanged, bufferWhen, bufferWhen, bufferWhile, cache, cache, cache, cache, cache, cache, cancelOn, cast, checkpoint, checkpoint, checkpoint, collect, collect, collectList, collectMap, collectMap, collectMap, collectMultimap, collectMultimap, collectMultimap, collectSortedList, collectSortedList, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, concat, concat, concat, concat, concatDelayError, concatDelayError, concatDelayError, concatDelayError, concatMap, concatMap, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapIterable, concatMapIterable, concatWith, concatWithValues, count, create, create, defaultIfEmpty, defer, deferWithContext, delayElements, delayElements, delaySequence, delaySequence, delaySubscription, delaySubscription, delaySubscription, delayUntil, dematerialize, distinct, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnError, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elapsed, elapsed, elementAt, elementAt, empty, error, error, error, expand, expand, expandDeep, expandDeep, filter, filterWhen, filterWhen, first, first, flatMap, flatMap, flatMap, flatMap, flatMapDelayError, flatMapIterable, flatMapIterable, flatMapSequential, flatMapSequential, flatMapSequential, flatMapSequentialDelayError, from, fromArray, fromIterable, fromStream, fromStream, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupJoin, handle, hasElement, hasElements, hide, ignoreElements, index, index, interval, interval, interval, interval, join, just, just, last, last, limitRate, limitRate, limitRequest, log, log, log, log, log, log, map, materialize, merge, merge, merge, merge, merge, merge, mergeDelayError, mergeOrdered, mergeOrdered, mergeOrdered, mergeOrderedWith, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequentialDelayError, mergeSequentialDelayError, mergeSequentialDelayError, mergeWith, metrics, name, never, next, ofType, onAssembly, onAssembly, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorContinue, onErrorContinue, onErrorContinue, onErrorMap, onErrorMap, onErrorMap, onErrorResume, onErrorResume, onErrorResume, onErrorReturn, onErrorReturn, onErrorReturn, onErrorStop, onTerminateDetach, or, parallel, parallel, parallel, publish, publish, publish, publish, publishNext, publishOn, publishOn, publishOn, push, push, range, reduce, reduce, reduceWith, repeat, repeat, repeat, repeat, repeatWhen, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retryBackoff, retryBackoff, retryBackoff, retryBackoff, retryBackoff, retryWhen, sample, sample, sampleFirst, sampleFirst, sampleTimeout, sampleTimeout, scan, scan, scanWith, share, single, single, singleOrEmpty, skip, skip, skip, skipLast, skipUntil, skipUntilOther, skipWhile, sort, sort, startWith, startWith, startWith, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscriberContext, subscriberContext, subscribeWith, switchIfEmpty, switchMap, switchMap, switchOnFirst, switchOnNext, switchOnNext, tag, take, take, take, takeLast, takeUntil, takeUntilOther, takeWhile, then, then, thenEmpty, thenMany, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timestamp, timestamp, toIterable, toIterable, toIterable, toStream, toStream, toString, transform, transformDeferred, using, using, usingWhen, usingWhen, window, window, window, window, window, window, window, windowTimeout, windowTimeout, windowUntil, windowUntil, windowUntil, windowUntilChanged, windowUntilChanged, windowUntilChanged, windowWhen, windowWhile, windowWhile, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipWith, zipWith, zipWith, zipWith, zipWithIterable, zipWithIterable
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
add, addAll, contains, containsAll, element, iterator, offer, peek, remove, remove, removeAll, retainAll, toArray, toArray
equals, hashCode, parallelStream, removeIf, spliterator, stream
public UnicastProcessor(Queue<T> queue, Consumer<? super T> onOverflow, Disposable onTerminate)
public UnicastProcessor(Queue<T> queue, Disposable onTerminate)
public CoreSubscriber<? super T> actual()
public void cancel()
cancel
in interface Subscription
public void clear()
clear
in interface Collection<T>
public static <E> UnicastProcessor<E> create()
UnicastProcessor
that will buffer on an internal queue in an
unbounded fashion.E
- the relayed typeFluxProcessor
public static <E> UnicastProcessor<E> create(Queue<E> queue)
UnicastProcessor
that will buffer on a provided queue in an
unbounded fashion.E
- the relayed typequeue
- the buffering queueFluxProcessor
public static <E> UnicastProcessor<E> create(Queue<E> queue, Consumer<? super E> onOverflow, Disposable endcallback)
UnicastProcessor
that will buffer on a provided queue in an
unbounded fashion.E
- the relayed typequeue
- the buffering queueendcallback
- called on any terminal signalonOverflow
- called when queue.offer return false and unicastProcessor is
about to emit onError.FluxProcessor
public static <E> UnicastProcessor<E> create(Queue<E> queue, Disposable endcallback)
UnicastProcessor
that will buffer on a provided queue in an
unbounded fashion.E
- the relayed typequeue
- the buffering queueendcallback
- called on any terminal signalFluxProcessor
public Context currentContext()
CoreSubscriber
Context
from dependent components which can include downstream
operators during subscribing or a terminal Subscriber
.currentContext
in interface CoreSubscriber<T>
currentContext
in class FluxProcessor<T,T>
Context.empty()
public long downstreamCount()
FluxProcessor
Subscriber
or -1 if untracked.downstreamCount
in class FluxProcessor<T,T>
Subscriber
or -1 if untrackedpublic int getBufferSize()
FluxProcessor
Integer.MAX_VALUE
getBufferSize
in class FluxProcessor<T,T>
Integer.MAX_VALUE
@Nullable public Throwable getError()
FluxProcessor
getError
in class FluxProcessor<T,T>
public int getPrefetch()
Flux
Flux
getPrefetch
in class Flux<T>
Flux
, -1 if unspecifiedpublic boolean hasDownstreams()
FluxProcessor
Subscriber
is actively subscribedhasDownstreams
in class FluxProcessor<T,T>
Subscriber
is actively subscribedpublic boolean isDisposed()
Disposable
Implementations are not required to track disposition and as such may never return true even when disposed. However, they MUST only return true when there's a guarantee the resource or task is disposed.
isDisposed
in interface Disposable
public boolean isEmpty()
isEmpty
in interface Collection<T>
public boolean isTerminated()
FluxProcessor
isTerminated
in class FluxProcessor<T,T>
public void onComplete()
onComplete
in interface Subscriber<T>
public void onError(Throwable t)
onError
in interface Subscriber<T>
public void onNext(T t)
onNext
in interface Subscriber<T>
public void onSubscribe(Subscription s)
CoreSubscriber
Subscriber.onNext(Object)
before
calling Subscription.request(long)
. Should further onNext
related
state modification occur, thread-safety will be required.
Note that an invalid request <= 0
will not produce an onError and
will simply be ignored or reported through a debug-enabled
Logger
.
onSubscribe
in interface Subscriber<T>
onSubscribe
in interface CoreSubscriber<T>
public void request(long n)
request
in interface Subscription
public int requestFusion(int requestedMode)
Fuseable.QueueSubscription
One should request either SYNC, ASYNC or ANY modes (never NONE) and the implementor should return NONE, SYNC or ASYNC (never ANY).
For example, if a source supports only ASYNC fusion but the intermediate operator supports only SYNC fuseable sources, the operator may request SYNC fusion and the source can reject it via NONE, thus the operator can return NONE as well to downstream and the fusion doesn't happen.
requestFusion
in interface Fuseable.QueueSubscription<T>
requestedMode
- the mode requested by the intermediate operatorpublic Object scanUnsafe(Scannable.Attr key)
Scannable
Scannable.Attr
key,
implementors should take care to return values of the correct type, and return
null if no specific value is available.
For public consumption of attributes, prefer using Scannable.scan(Attr)
, which will
return a typed value and fall back to the key's default if the component didn't
define any mapping.
scanUnsafe
in interface Scannable
scanUnsafe
in class FluxProcessor<T,T>
key
- a Scannable.Attr
to resolve for the component.public int size()
size
in interface Collection<T>
public void subscribe(CoreSubscriber<? super T> actual)
Flux
Publisher.subscribe(Subscriber)
that will bypass
Hooks.onLastOperator(Function)
pointcut.
In addition to behave as expected by Publisher.subscribe(Subscriber)
in a controlled manner, it supports direct subscribe-time Context
passing.
subscribe
in interface CorePublisher<T>
subscribe
in class Flux<T>
actual
- the Subscriber
interested into the published sequenceFlux.subscribe(Subscriber)
public String stepName()
Scannable
String
representation of this Scannable
in
its chain of Scannable.parents()
and Scannable.actuals()
.