E - Type of dispatched signal
@Deprecated public final class WorkQueueProcessor<E> extends FluxProcessor<IN,IN>
Created from share(String, int), the WorkQueueProcessor will authorize concurrent publishing (multi-producer) from its receiving side Subscriber.onNext(Object).
WorkQueueProcessor is able to replay up to its buffer size number of failed signals (either dropped or fatally throwing on child Subscriber.onNext(T)).
The processor is very similar to TopicProcessor but only partially respects the Reactive Streams contract.
The purpose of this processor is to distribute each signal to only one of the subscribed subscribers and to share the demand amongst all subscribers. The scenario is akin to Executor or round-robin distribution. However, there is no guarantee that the distribution will be strictly round-robin at all times.
The core use for this component is to scale up easily without the overhead of an Executor and without dedicated per-subscriber queues, which means less memory use and less GC pressure.
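The distribution described above can be illustrated without the processor itself. The following is a minimal JDK-only sketch, not the WorkQueueProcessor implementation: one bounded queue stands in for the backlog, and several consumer threads stand in for subscribers. The class name WorkQueueSketch, the method distribute, the queue capacity of 16 and the sentinel value are all assumptions made for illustration. Each value is taken by exactly one consumer, and the split between consumers depends on thread scheduling, so it is not guaranteed to be round-robin.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration class; it is not part of Reactor.
final class WorkQueueSketch {

    // Sentinel telling a worker to stop; it is never recorded as a value.
    private static final int POISON = Integer.MIN_VALUE;

    /**
     * Pushes the values 0..count-1 through one bounded queue shared by
     * `workers` consumer threads and returns what each consumer received.
     */
    static Map<String, List<Integer>> distribute(int count, int workers) {
        // Assumed capacity; plays the role of the processor's buffer size.
        BlockingQueue<Integer> backlog = new ArrayBlockingQueue<>(16);
        Map<String, List<Integer>> received = new LinkedHashMap<>();
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < workers; i++) {
            String name = "worker-" + i;
            List<Integer> mine = new ArrayList<>(); // written by one thread only
            received.put(name, mine);
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        int value = backlog.take(); // blocks until a value is available
                        if (value == POISON) {
                            return;
                        }
                        mine.add(value);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, name);
            threads.add(t);
            t.start();
        }

        try {
            for (int v = 0; v < count; v++) {
                backlog.put(v); // blocks when the backlog is full (slow consumers)
            }
            for (int i = 0; i < workers; i++) {
                backlog.put(POISON); // one stop signal per worker
            }
            for (Thread t : threads) {
                t.join(); // join makes each worker's list safely visible here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while distributing", e);
        }
        return received;
    }

    public static void main(String[] args) {
        distribute(100, 3).forEach((name, values) ->
                System.out.println(name + " handled " + values.size() + " values"));
    }
}
```

Running main prints how many values each worker handled. The counts always sum to 100, but the split between workers varies from run to run.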
Modifier and Type | Class and Description |
---|---|
static class | reactor.extra.processor.EventLoopProcessor.Slot<T> Deprecated. A simple reusable data container. |
static class | WorkQueueProcessor.Builder<T> Deprecated. WorkQueueProcessor builder that can be used to create new processors. |
Modifier and Type | Method and Description |
---|---|
boolean | alive() Deprecated. Determine whether this Processor can be used. |
boolean | awaitAndShutdown() Deprecated. Block until all submitted tasks have completed, then do a normal EventLoopProcessor.dispose(). |
boolean | awaitAndShutdown(java.time.Duration timeout) Deprecated. Block until all submitted tasks have completed, then do a normal EventLoopProcessor.dispose(). |
boolean | awaitAndShutdown(long timeout, TimeUnit timeUnit) Deprecated. Use awaitAndShutdown(Duration) instead. |
static <T> WorkQueueProcessor.Builder<T> | builder() Deprecated. Create a new WorkQueueProcessor.Builder with default properties. |
static <E> WorkQueueProcessor<E> | create() Deprecated. Create a new WorkQueueProcessor using Queues.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and auto-cancel. |
static <E> WorkQueueProcessor<E> | create(String name, int bufferSize) Deprecated. Create a new WorkQueueProcessor using the passed buffer size, blockingWait Strategy and auto-cancel. |
protected static String | defaultName(ThreadFactory threadFactory, Class<? extends reactor.extra.processor.EventLoopProcessor> clazz) Deprecated. A method to extract a name from the ThreadFactory if it turns out to be a Supplier (in which case the supplied value string representation is used). |
protected static ExecutorService | defaultRequestTaskExecutor(String name) Deprecated. A method to create a suitable default ExecutorService for use in implementors' requestTask(Subscription) (a cached thread pool), reusing a main name and appending [request-task] suffix. |
protected void | doComplete() Deprecated. |
protected void | doError(Throwable t) Deprecated. |
long | downstreamCount() Deprecated. |
Flux<E> | drain() Deprecated. Drain is a hot replication of the current buffer delivered if supported. |
Flux<IN> | forceShutdown() Deprecated. Shutdown this Processor, forcibly halting any work currently executing and discarding any tasks that have not yet been executed. |
long | getAvailableCapacity() Deprecated. |
int | getBufferSize() Deprecated. |
Throwable | getError() Deprecated. |
long | getPending() Deprecated. Return the number of parked elements in the emitter backlog. |
int | hashCode() Deprecated. |
java.util.stream.Stream<? extends Scannable> | inners() Deprecated. |
boolean | isSerialized() Deprecated. |
boolean | isTerminated() Deprecated. |
void | onComplete() Deprecated. |
void | onError(Throwable t) Deprecated. |
void | onNext(IN o) Deprecated. |
void | onSubscribe(Subscription s) Deprecated. |
protected void | requestTask(Subscription s) Deprecated. |
void | run() Deprecated. |
Object | scanUnsafe(Scannable.Attr key) Deprecated. |
protected boolean | serializeAlways() Deprecated. |
static <E> WorkQueueProcessor<E> | share(String name, int bufferSize) Deprecated. Create a new shared WorkQueueProcessor using the passed buffer size, blockingWait Strategy and auto-cancel. |
void | shutdown() Deprecated. Shutdown this active Processor such that it can no longer be used. |
void | subscribe(CoreSubscriber<? super E> actual) Deprecated. |
String | toString() Deprecated. |
public static final <T> WorkQueueProcessor.Builder<T> builder()
Create a new WorkQueueProcessor.Builder with default properties.
public static <E> WorkQueueProcessor<E> create()
Create a new WorkQueueProcessor using Queues.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and auto-cancel. A new Cached ThreadExecutorPool will be implicitly created.
E - Type of processed signals
public static <E> WorkQueueProcessor<E> create(String name, int bufferSize)
Create a new WorkQueueProcessor using the passed buffer size, blockingWait Strategy and auto-cancel. A new Cached ThreadExecutorPool will be implicitly created and will use the passed name to qualify the created threads.
E - Type of processed signals
name - Use a new Cached ExecutorService and assign this name to the created threads
bufferSize - A Backlog Size to mitigate slow subscribers
public static <E> WorkQueueProcessor<E> share(String name, int bufferSize)
Create a new shared WorkQueueProcessor using the passed buffer size, blockingWait Strategy and auto-cancel. A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publishers that fan in data. A new Cached ThreadExecutorPool will be implicitly created and will use the passed name to qualify the created threads.
E - Type of processed signals
name - Use a new Cached ExecutorService and assign this name to the created threads
bufferSize - A Backlog Size to mitigate slow subscribers
public void subscribe(CoreSubscriber<? super E> actual)
public Flux<E> drain()
Drain is a hot replication of the current buffer delivered if supported.
Returns a Flux sequence, possibly unbounded, of incoming buffered values, or empty if not supported.
protected void doError(Throwable t)
protected void doComplete()
protected void requestTask(Subscription s)
public long getPending()
public void run()
@Nullable public Object scanUnsafe(Scannable.Attr key)
Specified by: scanUnsafe in interface Scannable
Overrides: scanUnsafe in class FluxProcessor<IN,IN>
protected static String defaultName(@Nullable ThreadFactory threadFactory, Class<? extends reactor.extra.processor.EventLoopProcessor> clazz)
A method to extract a name from the ThreadFactory if it turns out to be a Supplier (in which case the supplied value string representation is used).
threadFactory - the factory to test for a supplied name
clazz -
protected static ExecutorService defaultRequestTaskExecutor(String name)
A method to create a suitable default ExecutorService for use in implementors' requestTask(Subscription) (a cached thread pool), reusing a main name and appending [request-task] suffix.
name - the main thread name used by the processor.
Returns an ExecutorService for requestTask.
public final boolean alive()
Determine whether this Processor can be used.
Returns true if this Resource is alive and can be used, false otherwise.
public final boolean awaitAndShutdown()
Block until all submitted tasks have completed, then do a normal EventLoopProcessor.dispose().
@Deprecated public final boolean awaitAndShutdown(long timeout, TimeUnit timeUnit)
Deprecated. Use awaitAndShutdown(Duration) instead.
Block until all submitted tasks have completed, then do a normal EventLoopProcessor.dispose().
timeout - the timeout value
timeUnit - the unit for timeout
public final boolean awaitAndShutdown(java.time.Duration timeout)
Block until all submitted tasks have completed, then do a normal EventLoopProcessor.dispose().
timeout - the timeout value as a Duration. Note this is converted to a Long of nanoseconds (which amounts to roughly 292 years maximum timeout).
public java.util.stream.Stream<? extends Scannable> inners()
Specified by: inners in interface Scannable
Overrides: inners in class FluxProcessor<IN,IN>
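The 292-year figure in the awaitAndShutdown(Duration) timeout note above follows from the range of a signed 64-bit count of nanoseconds. The sketch below checks that arithmetic with java.time.Duration only. It does not show how the processor performs the conversion, and the class and method names (TimeoutNanosSketch, maxTimeoutYears, fitsInNanos) are hypothetical. Duration.toNanos() throws ArithmeticException on overflow, which the helper uses to test whether a timeout fits.

```java
import java.time.Duration;

// Hypothetical illustration class; it is not part of Reactor.
final class TimeoutNanosSketch {

    /** Largest timeout, in whole 365-day years, that fits in a long of nanoseconds. */
    static long maxTimeoutYears() {
        return Duration.ofNanos(Long.MAX_VALUE).toDays() / 365;
    }

    /** Returns true when the duration can be expressed as a long count of nanoseconds. */
    static boolean fitsInNanos(Duration timeout) {
        try {
            timeout.toNanos(); // throws ArithmeticException on numeric overflow
            return true;
        } catch (ArithmeticException overflow) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(maxTimeoutYears());                        // 292
        System.out.println(fitsInNanos(Duration.ofDays(365L * 100))); // true
        System.out.println(fitsInNanos(Duration.ofDays(365L * 300))); // false
    }
}
```

A caller that accepts arbitrary Duration values may want a check of this kind before passing very large timeouts, since anything beyond roughly 292 years cannot be represented as nanoseconds in a long.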
public final Flux<IN> forceShutdown()
Shutdown this Processor, forcibly halting any work currently executing and discarding any tasks that have not yet been executed.
public final long getAvailableCapacity()
@Nullable public final Throwable getError()
Overrides: getError in class FluxProcessor<IN,IN>
public boolean isSerialized()
Overrides: isSerialized in class FluxProcessor<IN,IN>
public final boolean isTerminated()
Overrides: isTerminated in class FluxProcessor<IN,IN>
public final void onComplete()
Specified by: onComplete in interface Subscriber<IN>
public final void onError(Throwable t)
Specified by: onError in interface Subscriber<IN>
public final void onNext(IN o)
Specified by: onNext in interface Subscriber<IN>
public final void onSubscribe(Subscription s)
Specified by: onSubscribe in interface Subscriber<IN>
Specified by: onSubscribe in interface CoreSubscriber<IN>
protected boolean serializeAlways()
Overrides: serializeAlways in class FluxProcessor<IN,IN>
public final void shutdown()
Shutdown this active Processor such that it can no longer be used. If the resource carries any work, it will wait (without blocking the caller) for all the remaining tasks to complete before closing the resource.
public final int getBufferSize()
Overrides: getBufferSize in class FluxProcessor<IN,IN>
public long downstreamCount()
Overrides: downstreamCount in class FluxProcessor<IN,IN>