E
- Type of dispatched signalpublic final class TopicProcessor<E> extends FluxProcessor<IN,IN>
Created from share(boolean)
, the TopicProcessor
will authorize concurrent publishing (multi-producer)
from its receiving side Subscriber.onNext(Object)
.
Additionally, any of the TopicProcessor
will stop the event loop thread if an error occurs.
The processor
respects the Reactive Streams contract and must not be signalled concurrently on any
onXXXX method if share(boolean)
has not been used. Each subscriber will be assigned a unique thread that will only
stop on
terminal event: Complete, Error or Cancel. If Auto-Cancel is enabled, when all
subscribers are unregistered, a cancel signal is sent to the upstream Publisher if any.
Executor can be customized and will define how many concurrent subscribers are allowed
(fixed thread). When a Subscriber requests Long.MAX, there won't be any backpressure
applied and the producer will run at risk of being throttled if the subscribers don't
catch up. With any other strictly positive demand, a subscriber will stop reading new
Next signals (Complete and Error will still be read) as soon as the demand has been
fully consumed by the publisher.
When more than 1 subscriber listens to that processor, they will all receive the exact same events if their respective demand is still strictly positive, very much like a Fan-Out scenario.
When the backlog has been completely booked and no subscribers is draining the signals, the publisher will start throttling. In effect the smaller the backlog size is defined, the smaller the difference in processing rate between subscribers must remain. Since the sequence for each subscriber will point to various ringBuffer locations, the processor knows when a backlog can't override the previously occupied slot.
Modifier and Type | Class and Description |
---|---|
static class |
reactor.core.publisher.EventLoopProcessor.Slot<T>
A simple reusable data container.
|
Modifier and Type | Field and Description |
---|---|
static boolean |
TRACEABLE_RING_BUFFER_PROCESSOR
Whether the RingBuffer*Processor can be graphed by wrapping the individual Sequence with the target downstream
|
UNSPECIFIED
Modifier and Type | Method and Description |
---|---|
boolean |
alive()
Determine whether this
Processor can be used. |
boolean |
awaitAndShutdown()
Block until all submitted tasks have completed, then do a normal
EventLoopProcessor.shutdown() . |
boolean |
awaitAndShutdown(long timeout,
java.util.concurrent.TimeUnit timeUnit)
Block until all submitted tasks have completed, then do a normal
EventLoopProcessor#shutdown() . |
static <E> TopicProcessor<E> |
create()
Create a new TopicProcessor using
QueueSupplier.SMALL_BUFFER_SIZE backlog size,
blockingWait Strategy and auto-cancel. |
static <E> TopicProcessor<E> |
create(boolean autoCancel)
Create a new TopicProcessor using
QueueSupplier.SMALL_BUFFER_SIZE backlog size,
blockingWait Strategy and the passed auto-cancel setting. |
static <E> TopicProcessor<E> |
create(java.util.concurrent.ExecutorService service)
Create a new TopicProcessor using
QueueSupplier.SMALL_BUFFER_SIZE backlog size,
blockingWait Strategy and auto-cancel. |
static <E> TopicProcessor<E> |
create(java.util.concurrent.ExecutorService service,
boolean autoCancel)
Create a new TopicProcessor using
QueueSupplier.SMALL_BUFFER_SIZE backlog size,
blockingWait Strategy and the passed auto-cancel setting. |
static <E> TopicProcessor<E> |
create(java.util.concurrent.ExecutorService service,
int bufferSize)
Create a new TopicProcessor using passed backlog size, blockingWait Strategy
and will auto-cancel.
|
static <E> TopicProcessor<E> |
create(java.util.concurrent.ExecutorService service,
int bufferSize,
boolean autoCancel)
Create a new TopicProcessor using passed backlog size, blockingWait Strategy
and the auto-cancel argument.
|
static <E> TopicProcessor<E> |
create(java.util.concurrent.ExecutorService service,
int bufferSize,
WaitStrategy strategy)
Create a new TopicProcessor using passed backlog size, wait strategy and will
auto-cancel.
|
static <E> TopicProcessor<E> |
create(java.util.concurrent.ExecutorService service,
int bufferSize,
WaitStrategy strategy,
boolean autoCancel)
Create a new TopicProcessor using passed backlog size, wait strategy and
auto-cancel settings.
|
static <E> TopicProcessor<E> |
create(java.lang.String name)
Create a new
TopicProcessor using QueueSupplier.SMALL_BUFFER_SIZE backlog size, blockingWait
Strategy and auto-cancel. |
static <E> TopicProcessor<E> |
create(java.lang.String name,
int bufferSize)
Create a new TopicProcessor using
QueueSupplier.SMALL_BUFFER_SIZE backlog size,
blockingWait Strategy and the passed auto-cancel setting. |
static <E> TopicProcessor<E> |
create(java.lang.String name,
int bufferSize,
boolean autoCancel)
Create a new TopicProcessor using the blockingWait Strategy, passed backlog
size, and auto-cancel settings.
|
static <E> TopicProcessor<E> |
create(java.lang.String name,
int bufferSize,
WaitStrategy strategy)
Create a new TopicProcessor using passed backlog size, wait strategy and will
auto-cancel.
|
static <E> TopicProcessor<E> |
create(java.lang.String name,
int bufferSize,
WaitStrategy strategy,
boolean autoCancel)
Create a new TopicProcessor using passed backlog size, wait strategy and
auto-cancel settings.
|
static <E> TopicProcessor<E> |
create(java.lang.String name,
int bufferSize,
WaitStrategy strategy,
java.util.function.Supplier<E> signalSupplier)
Create a new TopicProcessor using passed backlog size, wait strategy, signal
supplier.
|
protected void |
doComplete() |
protected void |
doError(java.lang.Throwable t) |
long |
downstreamCount()
the number of downstream receivers
|
java.util.Iterator<?> |
downstreams()
the connected data receivers
|
Flux<E> |
drain()
Drain is a hot replication of the current buffer delivered if supported.
|
Flux<IN> |
forceShutdown()
Shutdown this
Processor , forcibly halting any work currently executing and discarding any tasks that have
not yet been executed. |
long |
getAvailableCapacity() |
long |
getCapacity()
Return defined element capacity
|
java.lang.Throwable |
getError()
Current error if any, default to null
|
long |
getPending()
Return current used space in buffer
|
int |
hashCode() |
boolean |
isCancelled() |
boolean |
isStarted()
Has this upstream started or "onSubscribed" ?
|
boolean |
isTerminated()
Has this upstream finished or "completed" / "failed" ?
|
void |
onComplete() |
void |
onError(java.lang.Throwable t) |
void |
onNext(IN o) |
void |
onSubscribe(org.reactivestreams.Subscription s) |
protected void |
requestTask(org.reactivestreams.Subscription s) |
void |
run() |
static <E> TopicProcessor<E> |
share(boolean autoCancel)
Create a new TopicProcessor using
QueueSupplier.SMALL_BUFFER_SIZE backlog size,
blockingWait Strategy and the passed auto-cancel setting. |
static <E> TopicProcessor<E> |
share(java.util.concurrent.ExecutorService service)
Create a new TopicProcessor using
QueueSupplier.SMALL_BUFFER_SIZE backlog size,
blockingWait Strategy and auto-cancel. |
static <E> TopicProcessor<E> |
share(java.util.concurrent.ExecutorService service,
boolean autoCancel)
Create a new TopicProcessor using
QueueSupplier.SMALL_BUFFER_SIZE backlog size,
blockingWait Strategy and the passed auto-cancel setting. |
static <E> TopicProcessor<E> |
share(java.util.concurrent.ExecutorService service,
int bufferSize)
Create a new TopicProcessor using passed backlog size, blockingWait Strategy
and will auto-cancel.
|
static <E> TopicProcessor<E> |
share(java.util.concurrent.ExecutorService service,
int bufferSize,
boolean autoCancel)
Create a new TopicProcessor using passed backlog size, blockingWait Strategy
and the auto-cancel argument.
|
static <E> TopicProcessor<E> |
share(java.util.concurrent.ExecutorService service,
int bufferSize,
WaitStrategy strategy)
Create a new TopicProcessor using passed backlog size, wait strategy and will
auto-cancel.
|
static <E> TopicProcessor<E> |
share(java.util.concurrent.ExecutorService service,
int bufferSize,
WaitStrategy strategy,
boolean autoCancel)
Create a new TopicProcessor using passed backlog size, wait strategy and
auto-cancel settings.
|
static <E> TopicProcessor<E> |
share(java.lang.String name,
int bufferSize)
Create a new TopicProcessor using
QueueSupplier.SMALL_BUFFER_SIZE backlog size,
blockingWait Strategy and the passed auto-cancel setting. |
static <E> TopicProcessor<E> |
share(java.lang.String name,
int bufferSize,
boolean autoCancel)
Create a new TopicProcessor using the blockingWait Strategy, passed backlog
size, and auto-cancel settings.
|
static <E> TopicProcessor<E> |
share(java.lang.String name,
int bufferSize,
java.util.function.Supplier<E> signalSupplier)
Create a new TopicProcessor using passed backlog size, wait strategy and
signal supplier.
|
static <E> TopicProcessor<E> |
share(java.lang.String name,
int bufferSize,
WaitStrategy strategy)
Create a new TopicProcessor using passed backlog size, wait strategy and will
auto-cancel.
|
static <E> TopicProcessor<E> |
share(java.lang.String name,
int bufferSize,
WaitStrategy strategy,
boolean autoCancel)
Create a new TopicProcessor using passed backlog size, wait strategy and
auto-cancel settings.
|
static <E> TopicProcessor<E> |
share(java.lang.String name,
int bufferSize,
WaitStrategy waitStrategy,
java.util.function.Supplier<E> signalSupplier)
Create a new TopicProcessor using passed backlog size, wait strategy and
signal supplier.
|
void |
shutdown()
Shutdown this active
Processor such that it can no longer be used. |
void |
subscribe(org.reactivestreams.Subscriber<? super E> subscriber) |
java.lang.String |
toString() |
org.reactivestreams.Subscription |
upstream()
Return the direct source of data, Supports reference.
|
connect, connectSink, connectSink, serialize, switchOnNext, wrap
all, any, as, awaitOnSubscribe, blockFirst, blockFirst, blockFirstMillis, blockLast, blockLast, blockLastMillis, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferMillis, bufferMillis, bufferMillis, bufferMillis, bufferMillis, bufferMillis, bufferMillis, cache, cache, cache, cache, cancelOn, cast, collect, collect, collectList, collectMap, collectMap, collectMap, collectMultimap, collectMultimap, collectMultimap, collectSortedList, collectSortedList, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, compose, concat, concat, concat, concat, concatDelayError, concatDelayError, concatDelayError, concatDelayError, concatMap, concatMap, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapIterable, concatMapIterable, concatWith, count, create, create, defaultIfEmpty, defer, delay, delayMillis, delayMillis, delaySubscription, delaySubscription, delaySubscriptionMillis, delaySubscriptionMillis, dematerialize, distinct, distinct, distinctUntilChanged, distinctUntilChanged, doAfterTerminate, doOnCancel, doOnComplete, doOnError, doOnError, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elapsed, elapsed, elementAt, elementAt, empty, error, error, filter, firstEmitting, firstEmitting, firstEmittingWith, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapIterable, flatMapIterable, from, fromArray, fromIterable, fromStream, generate, generate, generate, getPrefetch, groupBy, groupBy, groupJoin, handle, hasElement, hasElements, hide, ignoreElements, interval, interval, intervalMillis, intervalMillis, intervalMillis, intervalMillis, join, just, just, last, last, log, log, log, log, map, mapError, mapError, mapError, materialize, merge, merge, merge, merge, merge, merge, mergeWith, never, next, ofType, onAssembly, onAssembly, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorResumeWith, onErrorResumeWith, onErrorResumeWith, onErrorReturn, onErrorReturn, onErrorReturn, onTerminateDetach, parallel, parallel, parallel, publish, publish, publish, publish, publishNext, publishOn, publishOn, range, reduce, reduce, reduceWith, repeat, repeat, repeat, repeat, repeatWhen, replay, replay, replay, replay, replayMillis, replayMillis, retry, retry, retry, retry, retryWhen, sample, sample, sampleFirst, sampleFirst, sampleFirstMillis, sampleMillis, sampleTimeout, sampleTimeout, scan, scan, scanWith, share, single, single, singleOrEmpty, skip, skip, skipLast, skipMillis, skipMillis, skipUntil, skipUntilOther, skipWhile, sort, sort, startWith, startWith, startWith, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchOnError, switchOnError, switchOnError, switchOnNext, switchOnNext, take, take, takeLast, takeMillis, takeMillis, takeUntil, takeUntilOther, takeWhile, then, then, then, thenMany, thenMany, timeout, timeout, timeout, timeout, timeout, timeoutMillis, timeoutMillis, timeoutMillis, timeoutMillis, timestamp, timestamp, toIterable, toIterable, toIterable, toStream, toStream, transform, using, using, window, window, window, window, window, window, window, window, windowMillis, windowMillis, windowMillis, windowMillis, windowMillis, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipWith, zipWith, zipWith, zipWith, zipWithIterable, zipWithIterable
clone, equals, finalize, getClass, notify, notifyAll, wait, wait, wait
hasDownstreams
expectedFromUpstream, limit, requestedFromDownstream
public static final boolean TRACEABLE_RING_BUFFER_PROCESSOR
public static <E> TopicProcessor<E> create()
QueueSupplier.SMALL_BUFFER_SIZE
backlog size,
blockingWait Strategy and auto-cancel. A new Cached ThreadExecutorPool will be implicitely created.
E
- Type of processed signalspublic static <E> TopicProcessor<E> create(java.lang.String name)
TopicProcessor
using QueueSupplier.SMALL_BUFFER_SIZE
backlog size, blockingWait
Strategy and auto-cancel. A new Cached ThreadExecutorPool will be implicitely created.
E
- Type of processed signalsname
- processor thread logical namepublic static <E> TopicProcessor<E> create(boolean autoCancel)
QueueSupplier.SMALL_BUFFER_SIZE
backlog size,
blockingWait Strategy and the passed auto-cancel setting. A new Cached ThreadExecutorPool will be implicitely created.
E
- Type of processed signalsautoCancel
- Should this propagate cancellation when unregistered by all
subscribers ?public static <E> TopicProcessor<E> create(java.util.concurrent.ExecutorService service)
QueueSupplier.SMALL_BUFFER_SIZE
backlog size,
blockingWait Strategy and auto-cancel. The passed ExecutorService
will execute as many event-loop consuming the
ringbuffer as subscribers.
E
- Type of processed signalsservice
- A provided ExecutorService to manage threading infrastructurepublic static <E> TopicProcessor<E> create(java.util.concurrent.ExecutorService service, boolean autoCancel)
QueueSupplier.SMALL_BUFFER_SIZE
backlog size,
blockingWait Strategy and the passed auto-cancel setting. The passed ExecutorService
will execute as many event-loop consuming the
ringbuffer as subscribers.
E
- Type of processed signalsservice
- A provided ExecutorService to manage threading infrastructureautoCancel
- Should this propagate cancellation when unregistered by all
subscribers ?public static <E> TopicProcessor<E> create(java.lang.String name, int bufferSize)
QueueSupplier.SMALL_BUFFER_SIZE
backlog size,
blockingWait Strategy and the passed auto-cancel setting. A new Cached ThreadExecutorPool will be implicitely created and will use the passed name to qualify the created threads.
E
- Type of processed signalsname
- Use a new Cached ExecutorService and assign this name to the created
threadsbufferSize
- A Backlog Size to mitigate slow subscriberspublic static <E> TopicProcessor<E> create(java.lang.String name, int bufferSize, boolean autoCancel)
The passed ExecutorService
will execute as many event-loop consuming the ringbuffer as subscribers.
E
- Type of processed signalsname
- Use a new Cached ExecutorService and assign this name to the created
threadsbufferSize
- A Backlog Size to mitigate slow subscribersautoCancel
- Should this propagate cancellation when unregistered by all
subscribers ?public static <E> TopicProcessor<E> create(java.util.concurrent.ExecutorService service, int bufferSize)
The passed ExecutorService
will execute as many event-loop consuming the ringbuffer as subscribers.
E
- Type of processed signalsservice
- A provided ExecutorService to manage threading infrastructurebufferSize
- A Backlog Size to mitigate slow subscriberspublic static <E> TopicProcessor<E> create(java.util.concurrent.ExecutorService service, int bufferSize, boolean autoCancel)
The passed ExecutorService
will execute as many event-loop consuming the ringbuffer as subscribers.
E
- Type of processed signalsservice
- A provided ExecutorService to manage threading infrastructurebufferSize
- A Backlog Size to mitigate slow subscribersautoCancel
- Should this propagate cancellation when unregistered by all
subscribers ?public static <E> TopicProcessor<E> create(java.lang.String name, int bufferSize, WaitStrategy strategy)
A new Cached ThreadExecutorPool will be implicitely created and will use the passed name to qualify the created threads.
E
- Type of processed signalsname
- Use a new Cached ExecutorService and assign this name to the created
threadsbufferSize
- A Backlog Size to mitigate slow subscribersstrategy
- A RingBuffer WaitStrategy to use instead of the default
blocking wait strategy.public static <E> TopicProcessor<E> create(java.lang.String name, int bufferSize, WaitStrategy strategy, java.util.function.Supplier<E> signalSupplier)
A new Cached ThreadExecutorPool will be implicitely created and will use the passed name to qualify the created threads.
E
- Type of processed signalsname
- Use a new Cached ExecutorService and assign this name to the created
threadsbufferSize
- A Backlog Size to mitigate slow subscribersstrategy
- A RingBuffer WaitStrategy to use instead of the default
blocking wait strategy.signalSupplier
- A supplier of dispatched signals to preallocate in the ring
bufferpublic static <E> TopicProcessor<E> create(java.lang.String name, int bufferSize, WaitStrategy strategy, boolean autoCancel)
A new Cached ThreadExecutorPool will be implicitely created and will use the passed name to qualify the created threads.
E
- Type of processed signalsname
- Use a new Cached ExecutorService and assign this name to the created
threadsbufferSize
- A Backlog Size to mitigate slow subscribersstrategy
- A RingBuffer WaitStrategy to use instead of the default
blocking wait strategy.autoCancel
- Should this propagate cancellation when unregistered by all
subscribers ?public static <E> TopicProcessor<E> create(java.util.concurrent.ExecutorService service, int bufferSize, WaitStrategy strategy)
The passed ExecutorService
will
execute as many event-loop consuming the ringbuffer as subscribers.
E
- Type of processed signalsservice
- A provided ExecutorService to manage threading infrastructurebufferSize
- A Backlog Size to mitigate slow subscribersstrategy
- A RingBuffer WaitStrategy to use instead of the default
blocking wait strategy.public static <E> TopicProcessor<E> create(java.util.concurrent.ExecutorService service, int bufferSize, WaitStrategy strategy, boolean autoCancel)
The passed ExecutorService
will execute as many event-loop consuming the ringbuffer as subscribers.
E
- Type of processed signalsservice
- A provided ExecutorService to manage threading infrastructurebufferSize
- A Backlog Size to mitigate slow subscribersstrategy
- A RingBuffer WaitStrategy to use instead of the default
blocking wait strategy.autoCancel
- Should this propagate cancellation when unregistered by all
subscribers ?public static <E> TopicProcessor<E> share(boolean autoCancel)
QueueSupplier.SMALL_BUFFER_SIZE
backlog size,
blockingWait Strategy and the passed auto-cancel setting. A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publisher that will fan-in data.
A new Cached ThreadExecutorPool will be implicitely created.
E
- Type of processed signalsautoCancel
- Should this propagate cancellation when unregistered by all
subscribers ?public static <E> TopicProcessor<E> share(java.util.concurrent.ExecutorService service)
QueueSupplier.SMALL_BUFFER_SIZE
backlog size,
blockingWait Strategy and auto-cancel. A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publisher that will fan-in data.
The passed ExecutorService
will execute as many
event-loop consuming the ringbuffer as subscribers.
E
- Type of processed signalsservice
- A provided ExecutorService to manage threading infrastructurepublic static <E> TopicProcessor<E> share(java.util.concurrent.ExecutorService service, boolean autoCancel)
QueueSupplier.SMALL_BUFFER_SIZE
backlog size,
blockingWait Strategy and the passed auto-cancel setting. A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publisher that will fan-in data.
The passed ExecutorService
will
execute as many event-loop consuming the ringbuffer as subscribers.
E
- Type of processed signalsservice
- A provided ExecutorService to manage threading infrastructureautoCancel
- Should this propagate cancellation when unregistered by all
subscribers ?public static <E> TopicProcessor<E> share(java.lang.String name, int bufferSize)
QueueSupplier.SMALL_BUFFER_SIZE
backlog size,
blockingWait Strategy and the passed auto-cancel setting. A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publisher that will fan-in data.
A new Cached ThreadExecutorPool will be implicitely created and will use the passed name to qualify the created threads.
E
- Type of processed signalsname
- Use a new Cached ExecutorService and assign this name to the created
threadsbufferSize
- A Backlog Size to mitigate slow subscriberspublic static <E> TopicProcessor<E> share(java.lang.String name, int bufferSize, boolean autoCancel)
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publisher that will fan-in data.
The
passed ExecutorService
will execute as many event-loop
consuming the ringbuffer as subscribers.
E
- Type of processed signalsname
- Use a new Cached ExecutorService and assign this name to the created
threadsbufferSize
- A Backlog Size to mitigate slow subscribersautoCancel
- Should this propagate cancellation when unregistered by all
subscribers ?public static <E> TopicProcessor<E> share(java.util.concurrent.ExecutorService service, int bufferSize)
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publisher that will fan-in data.
The passed ExecutorService
will execute as many event-loop consuming the
ringbuffer as subscribers.
E
- Type of processed signalsservice
- A provided ExecutorService to manage threading infrastructurebufferSize
- A Backlog Size to mitigate slow subscriberspublic static <E> TopicProcessor<E> share(java.util.concurrent.ExecutorService service, int bufferSize, boolean autoCancel)
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publisher that will fan-in data.
The
passed ExecutorService
will execute as many event-loop
consuming the ringbuffer as subscribers.
E
- Type of processed signalsservice
- A provided ExecutorService to manage threading infrastructurebufferSize
- A Backlog Size to mitigate slow subscribersautoCancel
- Should this propagate cancellation when unregistered by all
subscribers ?public static <E> TopicProcessor<E> share(java.lang.String name, int bufferSize, WaitStrategy strategy)
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publisher that will fan-in data.
A new Cached ThreadExecutorPool will be implicitely created and will use the passed name to qualify the created threads.
E
- Type of processed signalsname
- Use a new Cached ExecutorService and assign this name to the created
threadsbufferSize
- A Backlog Size to mitigate slow subscribersstrategy
- A RingBuffer WaitStrategy to use instead of the default
blocking wait strategy.public static <E> TopicProcessor<E> share(java.lang.String name, int bufferSize, java.util.function.Supplier<E> signalSupplier)
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publisher that will fan-in data.
A new Cached ThreadExecutorPool will be implicitely created and will use the passed name to qualify the created threads.
E
- Type of processed signalsname
- Use a new Cached ExecutorService and assign this name to the created
threadsbufferSize
- A Backlog Size to mitigate slow subscriberssignalSupplier
- A supplier of dispatched signals to preallocate in the ring
bufferpublic static <E> TopicProcessor<E> share(java.lang.String name, int bufferSize, WaitStrategy waitStrategy, java.util.function.Supplier<E> signalSupplier)
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publisher that will fan-in data.
A new Cached ThreadExecutorPool will be implicitely created and will use the passed name to qualify the created threads.
E
- Type of processed signalsname
- Use a new Cached ExecutorService and assign this name to the created
threadsbufferSize
- A Backlog Size to mitigate slow subscriberswaitStrategy
- A RingBuffer WaitStrategy to use instead of the default
blocking wait strategy.
buffersignalSupplier
- the supplier of signalspublic static <E> TopicProcessor<E> share(java.lang.String name, int bufferSize, WaitStrategy strategy, boolean autoCancel)
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publisher that will fan-in data.
A new Cached ThreadExecutorPool will be implicitely created and will use the passed name to qualify the created threads.
E
- Type of processed signalsname
- Use a new Cached ExecutorService and assign this name to the created
threadsbufferSize
- A Backlog Size to mitigate slow subscribersstrategy
- A RingBuffer WaitStrategy to use instead of the default
blocking wait strategy.autoCancel
- Should this propagate cancellation when unregistered by all
subscribers ?public static <E> TopicProcessor<E> share(java.util.concurrent.ExecutorService service, int bufferSize, WaitStrategy strategy)
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publisher that will fan-in data.
The passed ExecutorService
will execute as many event-loop consuming the
ringbuffer as subscribers.
E
- Type of processed signalsservice
- A provided ExecutorService to manage threading infrastructurebufferSize
- A Backlog Size to mitigate slow subscribersstrategy
- A RingBuffer WaitStrategy to use instead of the default
blocking wait strategy.public static <E> TopicProcessor<E> share(java.util.concurrent.ExecutorService service, int bufferSize, WaitStrategy strategy, boolean autoCancel)
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publisher that will fan-in data.
The passed ExecutorService
will execute as many event-loop consuming the
ringbuffer as subscribers.
E
- Type of processed signalsservice
- A provided ExecutorService to manage threading infrastructurebufferSize
- A Backlog Size to mitigate slow subscribersstrategy
- A RingBuffer WaitStrategy to use instead of the default
blocking wait strategy.autoCancel
- Should this propagate cancellation when unregistered by all
subscribers ?public void subscribe(org.reactivestreams.Subscriber<? super E> subscriber)
subscribe
in interface org.reactivestreams.Publisher<E>
subscribe
in class FluxProcessor<E,E>
public Flux<E> drain()
Flux
sequence possibly unbounded of incoming buffered values or empty if not supported.protected void doError(java.lang.Throwable t)
protected void doComplete()
public long getPending()
Trackable
protected void requestTask(org.reactivestreams.Subscription s)
public long downstreamCount()
MultiProducer
public void run()
public final boolean alive()
Processor
can be used.Resource
is alive and can be used, false otherwise.public final boolean awaitAndShutdown()
EventLoopProcessor.shutdown()
.public final boolean awaitAndShutdown(long timeout, java.util.concurrent.TimeUnit timeUnit)
EventLoopProcessor#shutdown()
.timeout
- the timeout valuetimeUnit
- the unit for timeoutpublic final Flux<IN> forceShutdown()
Processor
, forcibly halting any work currently executing and discarding any tasks that have
not yet been executed.public long getAvailableCapacity()
public java.util.Iterator<?> downstreams()
MultiProducer
downstreams
in interface MultiProducer
public final java.lang.Throwable getError()
Trackable
public final int hashCode()
hashCode
in class java.lang.Object
public final boolean isCancelled()
isCancelled
in interface Trackable
public final boolean isStarted()
Trackable
public final boolean isTerminated()
Trackable
isTerminated
in interface Trackable
public final void onComplete()
onComplete
in interface org.reactivestreams.Subscriber<IN>
public final void onError(java.lang.Throwable t)
onError
in interface org.reactivestreams.Subscriber<IN>
public final void onNext(IN o)
onNext
in interface org.reactivestreams.Subscriber<IN>
public final void onSubscribe(org.reactivestreams.Subscription s)
onSubscribe
in interface org.reactivestreams.Subscriber<IN>
public final void shutdown()
Processor
such that it can no longer be used. If the resource carries any work, it
will wait (but NOT blocking the caller) for all the remaining tasks to perform before closing the resource.public final org.reactivestreams.Subscription upstream()
Receiver
public final long getCapacity()
Trackable
getCapacity
in interface Trackable
getCapacity
in class FluxProcessor<IN,IN>