Type Parameters:
T - the value type
public final class ReplayProcessor<T> extends FluxProcessor<T,T> implements Fuseable
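Nested classes/interfaces inherited from interface Fuseable: Fuseable.ConditionalSubscriber<T>, Fuseable.QueueSubscription<T>, Fuseable.ScalarCallable<T>, Fuseable.SynchronousSubscription<T>
Nested classes/interfaces inherited from interface Disposable: Disposable.Composite, Disposable.Swap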
Modifier and Type | Method and Description |
static <T> ReplayProcessor<T> | cacheLast(): Create a ReplayProcessor that caches the last element it has pushed, replaying it to late subscribers. |
static <T> ReplayProcessor<T> | cacheLastOrDefault(T value): Create a ReplayProcessor that caches the last element it has pushed, replaying it to late subscribers. |
static <E> ReplayProcessor<E> | create(): Create a new ReplayProcessor that replays an unbounded number of elements, using a default internal Queue. |
static <E> ReplayProcessor<E> | create(int historySize): Create a new ReplayProcessor that replays up to historySize elements. |
static <E> ReplayProcessor<E> | create(int historySize, boolean unbounded): Create a new ReplayProcessor that either replays all the elements or a limited amount of elements depending on the unbounded parameter. |
static <T> ReplayProcessor<T> | createSizeAndTimeout(int size, Duration maxAge): Creates a time- and size-bounded replay processor. |
static <T> ReplayProcessor<T> | createSizeAndTimeout(int size, Duration maxAge, Scheduler scheduler): Creates a time- and size-bounded replay processor. |
static <T> ReplayProcessor<T> | createTimeout(Duration maxAge): Creates a time-bounded replay processor. |
static <T> ReplayProcessor<T> | createTimeout(Duration maxAge, Scheduler scheduler): Creates a time-bounded replay processor. |
Context | currentContext(): Request a Context from dependent components which can include downstream operators during subscribing or a terminal Subscriber. |
long | downstreamCount(): Return the number of active Subscribers or -1 if untracked. |
Throwable | getError(): Current error if any, defaults to null. |
int | getPrefetch(): The prefetch configuration of the Flux. |
Stream<? extends Scannable> | inners(): Return a Stream of referenced inners (flatmap, multicast etc). |
boolean | isTerminated(): Has this upstream finished or "completed" / "failed"? |
void | onComplete() |
void | onError(Throwable t) |
void | onNext(T t) |
void | onSubscribe(Subscription s): Implementors should initialize any state used by Subscriber.onNext(Object) before calling Subscription.request(long). |
Object | scanUnsafe(Scannable.Attr key): This method is used internally by components to define their key-value mappings in a single place. |
void | subscribe(CoreSubscriber<? super T> actual): An internal Publisher.subscribe(Subscriber) that will bypass the Hooks.onLastOperator(Function) pointcut. |
Methods inherited from class FluxProcessor:
dispose, getBufferSize, hasCompleted, hasDownstreams, hasError, isSerialized, serialize, serializeAlways, sink, sink, switchOnNext, wrap
Methods inherited from class Flux:
all, any, as, blockFirst, blockFirst, blockLast, blockLast, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferUntil, bufferUntil, bufferWhen, bufferWhen, bufferWhile, cache, cache, cache, cache, cache, cache, cancelOn, cast, checkpoint, checkpoint, checkpoint, collect, collect, collectList, collectMap, collectMap, collectMap, collectMultimap, collectMultimap, collectMultimap, collectSortedList, collectSortedList, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, compose, concat, concat, concat, concat, concatDelayError, concatDelayError, concatDelayError, concatDelayError, concatMap, concatMap, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapIterable, concatMapIterable, concatWith, concatWithValues, count, create, create, defaultIfEmpty, defer, delayElements, delayElements, delaySequence, delaySequence, delaySubscription, delaySubscription, delaySubscription, delayUntil, dematerialize, distinct, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterTerminate, doFinally, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnError, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elapsed, elapsed, elementAt, elementAt, empty, error, error, error, expand, expand, expandDeep, expandDeep, filter, filterWhen, filterWhen, first, first, flatMap, flatMap, flatMap, flatMap, flatMapDelayError, flatMapIterable, flatMapIterable, flatMapSequential, flatMapSequential, flatMapSequential, flatMapSequentialDelayError, from, fromArray, fromIterable, fromStream, fromStream, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupJoin, handle, hasElement, hasElements, hide, ignoreElements, index, index, interval, interval, interval, interval, join, just, just, last, last, limitRate, 
limitRate, limitRequest, log, log, log, log, log, log, map, materialize, merge, merge, merge, merge, merge, merge, mergeDelayError, mergeOrdered, mergeOrdered, mergeOrdered, mergeOrderedWith, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequentialDelayError, mergeSequentialDelayError, mergeSequentialDelayError, mergeWith, metrics, name, never, next, ofType, onAssembly, onAssembly, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorContinue, onErrorContinue, onErrorContinue, onErrorMap, onErrorMap, onErrorMap, onErrorResume, onErrorResume, onErrorResume, onErrorReturn, onErrorReturn, onErrorReturn, onErrorStop, onLastAssembly, onTerminateDetach, or, parallel, parallel, parallel, publish, publish, publish, publish, publishNext, publishOn, publishOn, publishOn, push, push, range, reduce, reduce, reduceWith, repeat, repeat, repeat, repeat, repeatWhen, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retryBackoff, retryBackoff, retryBackoff, retryWhen, sample, sample, sampleFirst, sampleFirst, sampleTimeout, sampleTimeout, scan, scan, scanWith, share, single, single, singleOrEmpty, skip, skip, skip, skipLast, skipUntil, skipUntilOther, skipWhile, sort, sort, startWith, startWith, startWith, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscriberContext, subscriberContext, subscribeWith, switchIfEmpty, switchMap, switchMap, switchOnNext, switchOnNext, tag, take, take, take, takeLast, takeUntil, takeUntilOther, takeWhile, then, then, thenEmpty, thenMany, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timestamp, timestamp, toIterable, toIterable, toIterable, toStream, toStream, toString, transform, using, using, usingWhen, usingWhen, 
usingWhen, window, window, window, window, window, window, window, windowTimeout, windowTimeout, windowUntil, windowUntil, windowUntil, windowWhen, windowWhile, windowWhile, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipWith, zipWith, zipWith, zipWith, zipWithIterable, zipWithIterable
Methods inherited from class Object:
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface Scannable:
actuals, from, isScanAvailable, name, parents, scan, scanOrDefault, stepName, steps, tags
public static <T> ReplayProcessor<T> cacheLast()
Create a ReplayProcessor that caches the last element it has pushed, replaying it to late subscribers. This is a buffer-based ReplayProcessor with a history size of 1.
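The "history size of 1" behavior can be pictured with a small plain-Java model. This is only a sketch of the caching rule described above, not Reactor's implementation; the class LastValueCacheModel and its methods are hypothetical names introduced for illustration, and backpressure, termination and threading are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java model of a history-size-1 replay; not Reactor's code.
final class LastValueCacheModel<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private T last;          // the single cached element
    private boolean hasLast; // distinguishes "nothing pushed yet" from a cached value

    // Push a value: remember it and forward it to the current subscribers.
    void onNext(T value) {
        last = value;
        hasLast = true;
        for (Consumer<T> s : subscribers) {
            s.accept(value);
        }
    }

    // A late subscriber first receives the cached element, if any, then live values.
    void subscribe(Consumer<T> subscriber) {
        if (hasLast) {
            subscriber.accept(last);
        }
        subscribers.add(subscriber);
    }

    // Collects what a subscriber sees when it arrives after "a" and "b" but before "c".
    static List<String> demo() {
        LastValueCacheModel<String> model = new LastValueCacheModel<>();
        model.onNext("a");
        model.onNext("b");
        List<String> seen = new ArrayList<>();
        model.subscribe(seen::add);
        model.onNext("c");
        return seen;
    }
}
```

In this model the late subscriber misses "a", is replayed "b", and then observes "c" live.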
Type Parameters:
T - the type of the pushed elements
Returns:
a new ReplayProcessor that replays its last pushed element to each new Subscriber
public static <T> ReplayProcessor<T> cacheLastOrDefault(@Nullable T value)
Create a ReplayProcessor that caches the last element it has pushed, replaying it to late subscribers. If a Subscriber comes in before any value has been pushed, then the default value is emitted instead.
This is a buffer-based ReplayProcessor with a history size of 1.
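The default-value rule amounts to seeding the single cache slot before anything is pushed. The following is a minimal plain-Java sketch of that idea under simplifying assumptions (no subscribers, no signals); LastOrDefaultModel and replayForNewSubscriber are hypothetical names, not part of Reactor.

```java
import java.util.Optional;

// Hypothetical plain-Java model of "last value or default"; not Reactor's code.
final class LastOrDefaultModel<T> {
    private T cached; // starts as the default, later overwritten by pushed values

    LastOrDefaultModel(T defaultValue) {
        this.cached = defaultValue; // may be null, mirroring the @Nullable parameter
    }

    // A pushed value replaces whatever was cached, including the default.
    void onNext(T value) {
        cached = value;
    }

    // What a subscriber arriving right now would be replayed, if anything.
    Optional<T> replayForNewSubscriber() {
        return Optional.ofNullable(cached);
    }
}
```

A subscriber arriving before any push is handed the default; once a value has been pushed, the default is never seen again.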
Type Parameters:
T - the type of the pushed elements
Parameters:
value - a default value to start the sequence with in case nothing has been cached yet.
Returns:
a new ReplayProcessor that replays its last pushed element to each new Subscriber, or a default one if nothing was pushed yet
public static <E> ReplayProcessor<E> create()
Create a new ReplayProcessor that replays an unbounded number of elements, using a default internal Queue.
Type Parameters:
E - the type of the pushed elements
Returns:
a new ReplayProcessor that replays the whole history to each new Subscriber.
public static <E> ReplayProcessor<E> create(int historySize)
Create a new ReplayProcessor that replays up to historySize elements.
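A bounded history behaves like a fixed-capacity queue that drops its oldest entry when a new one arrives. The sketch below models only that retention rule in plain Java; BoundedHistoryModel is a hypothetical name and the code is not taken from Reactor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical plain-Java model of a size-bounded replay history; not Reactor's code.
final class BoundedHistoryModel<T> {
    private final int historySize;
    private final Deque<T> history = new ArrayDeque<>();

    BoundedHistoryModel(int historySize) {
        if (historySize <= 0) {
            throw new IllegalArgumentException("historySize must be positive");
        }
        this.historySize = historySize;
    }

    // Retain the new element, dropping the oldest one once the backlog is full.
    void onNext(T value) {
        if (history.size() == historySize) {
            history.removeFirst();
        }
        history.addLast(value);
    }

    // The elements a new subscriber would be replayed, oldest first.
    List<T> replay() {
        return new ArrayList<>(history);
    }
}
```

With a history size of 3, pushing 1 through 5 leaves 3, 4 and 5 available for replay.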
Type Parameters:
E - the type of the pushed elements
Parameters:
historySize - the backlog size, i.e. maximum items retained for replay.
Returns:
a new ReplayProcessor that replays a limited history to each new Subscriber.
public static <E> ReplayProcessor<E> create(int historySize, boolean unbounded)
Create a new ReplayProcessor that either replays all the elements or a limited amount of elements depending on the unbounded parameter.
Type Parameters:
E - the type of the pushed elements
Parameters:
historySize - maximum items retained if bounded, or initial link size if unbounded
unbounded - true if "unlimited" data store must be supplied
Returns:
a new ReplayProcessor that replays the whole history to each new Subscriber if configured as unbounded, a limited history otherwise.
public static <T> ReplayProcessor<T> createTimeout(Duration maxAge)
Creates a time-bounded replay processor.
In this setting, the ReplayProcessor internally tags each observed item with a timestamp value supplied by Schedulers.parallel() and keeps only those whose age is less than the supplied time value converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5 this first item is then evicted by any subsequent item or termination signal, leaving the buffer empty.
Once the processor is terminated, subscribers subscribing to it will receive items that remained in the buffer after the terminal signal, regardless of their age.
If a subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time, and each item observed thereafter, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a subscriber subscribes, it observes items without gaps in the sequence except for any outdated items at the beginning of the sequence.
Note that terminal signals (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete signal arrives at T=10. If a subscriber subscribes at T=11, it will find an empty ReplayProcessor with just an onComplete signal.
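The eviction rules above can be traced with a small plain-Java model in which time is passed in explicitly. It is a sketch of the documented behavior only, assuming a single thread and abstract time units; TimeBoundedHistoryModel and its method names are hypothetical and do not reflect Reactor's internals.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical plain-Java model of time-bounded replay; not Reactor's code.
final class TimeBoundedHistoryModel<T> {
    private static final class Timed<V> {
        final long time;
        final V value;
        Timed(long time, V value) { this.time = time; this.value = value; }
    }

    private final long maxAge;
    private final Deque<Timed<T>> buffer = new ArrayDeque<>();
    private boolean terminated;

    TimeBoundedHistoryModel(long maxAge) { this.maxAge = maxAge; }

    // A new item first evicts outdated entries, then is stored with its timestamp.
    void onNext(long now, T value) {
        evict(now);
        buffer.addLast(new Timed<>(now, value));
    }

    // Terminal signals trigger eviction as well.
    void onComplete(long now) {
        evict(now);
        terminated = true;
    }

    // Items replayed to a subscriber arriving at 'now'.
    List<T> replay(long now) {
        List<T> out = new ArrayList<>();
        for (Timed<T> t : buffer) {
            // After termination, whatever survived the terminal eviction is replayed
            // regardless of age; while active, only sufficiently young items are.
            if (terminated || now - t.time < maxAge) {
                out.add(t.value);
            }
        }
        return out;
    }

    private void evict(long now) {
        while (!buffer.isEmpty() && now - buffer.peekFirst().time >= maxAge) {
            buffer.removeFirst();
        }
    }
}
```

Replaying the documented scenario (max age 5, item at T=0, onComplete at T=10, subscription at T=11) yields an empty replay in this model, because the terminal signal evicted the only item.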
Type Parameters:
T - the type of items observed and emitted by the Processor
Parameters:
maxAge - the maximum age of the contained items
Returns:
a new ReplayProcessor that replays elements based on their age.
public static <T> ReplayProcessor<T> createTimeout(Duration maxAge, Scheduler scheduler)
Creates a time-bounded replay processor.
In this setting, the ReplayProcessor internally tags each observed item with a timestamp value supplied by the Scheduler and keeps only those whose age is less than the supplied time value converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5 this first item is then evicted by any subsequent item or termination signal, leaving the buffer empty.
Once the processor is terminated, subscribers subscribing to it will receive items that remained in the buffer after the terminal signal, regardless of their age.
If a subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time, and each item observed thereafter, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a subscriber subscribes, it observes items without gaps in the sequence except for any outdated items at the beginning of the sequence.
Note that terminal signals (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete signal arrives at T=10. If a subscriber subscribes at T=11, it will find an empty ReplayProcessor with just an onComplete signal.
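Supplying the time source explicitly makes age-based filtering deterministic, which is convenient in tests. The sketch below illustrates that idea in plain Java with a LongSupplier standing in for the Scheduler's clock; ClockedReplayModel and ManualClock are hypothetical names, and the model deliberately omits eviction and terminal signals to focus on timestamp tagging.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical plain-Java model; a LongSupplier stands in for the Scheduler's clock.
final class ClockedReplayModel<T> {
    // Manually advanced clock, useful for deterministic tests.
    static final class ManualClock implements LongSupplier {
        private long nowMillis;
        @Override public long getAsLong() { return nowMillis; }
        void advance(long millis) { nowMillis += millis; }
    }

    private static final class Stamped<V> {
        final long millis;
        final V value;
        Stamped(long millis, V value) { this.millis = millis; this.value = value; }
    }

    private final long maxAgeMillis;
    private final LongSupplier clock;
    private final Deque<Stamped<T>> buffer = new ArrayDeque<>();

    ClockedReplayModel(long maxAgeMillis, LongSupplier clock) {
        this.maxAgeMillis = maxAgeMillis;
        this.clock = clock;
    }

    // Tag the item with the injected clock's current time.
    void onNext(T value) {
        buffer.addLast(new Stamped<>(clock.getAsLong(), value));
    }

    // Items an active subscriber arriving now would see: only those younger than maxAge.
    List<T> replay() {
        long now = clock.getAsLong();
        List<T> out = new ArrayList<>();
        for (Stamped<T> s : buffer) {
            if (now - s.millis < maxAgeMillis) {
                out.add(s.value);
            }
        }
        return out;
    }
}
```

With a max age of 5, pushing "a" at 0 and "b" at 3, then reading at 6, replays only "b" in this model.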
Type Parameters:
T - the type of items observed and emitted by the Processor
Parameters:
maxAge - the maximum age of the contained items
Returns:
a new ReplayProcessor that replays elements based on their age.
public static <T> ReplayProcessor<T> createSizeAndTimeout(int size, Duration maxAge)
Creates a time- and size-bounded replay processor.
In this setting, the ReplayProcessor internally tags each received item with a timestamp value supplied by Schedulers.parallel() and holds at most size items in its internal buffer. It evicts items from the start of the buffer if their age becomes greater than or equal to the supplied age in milliseconds or the buffer reaches its size limit.
When subscribers subscribe to a terminated ReplayProcessor, they observe the items that remained in the buffer after the terminal signal, regardless of their age, but at most size items.
If a subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time and each subsequent item, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a subscriber subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning of the sequence.
Note that terminal signals (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete signal arrives at T=10. If a Subscriber subscribes at T=11, it will find an empty ReplayProcessor with just an onComplete signal.
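Combining both bounds means an item leaves the buffer as soon as either limit is hit. The plain-Java sketch below models that combined rule with explicit timestamps, assuming eviction runs when an item is added; SizeAndTimeBoundedModel is a hypothetical name and the code is not Reactor's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical plain-Java model of size- and time-bounded replay; not Reactor's code.
final class SizeAndTimeBoundedModel<T> {
    private static final class Timed<V> {
        final long time;
        final V value;
        Timed(long time, V value) { this.time = time; this.value = value; }
    }

    private final int size;
    private final long maxAge;
    private final Deque<Timed<T>> buffer = new ArrayDeque<>();

    SizeAndTimeBoundedModel(int size, long maxAge) {
        this.size = size;
        this.maxAge = maxAge;
    }

    // Store the item, then evict from the head while the buffer is over capacity
    // or the head item has reached the maximum age.
    void onNext(long now, T value) {
        buffer.addLast(new Timed<>(now, value));
        while (!buffer.isEmpty()
                && (buffer.size() > size || now - buffer.peekFirst().time >= maxAge)) {
            buffer.removeFirst();
        }
    }

    // Items an active subscriber arriving at 'now' would be replayed.
    List<T> replay(long now) {
        List<T> out = new ArrayList<>();
        for (Timed<T> t : buffer) {
            if (now - t.time < maxAge) {
                out.add(t.value);
            }
        }
        return out;
    }
}
```

With size 2 and max age 5, pushing "a", "b", "c" at T=0, 1, 2 drops "a" for capacity; a subscriber at T=3 is replayed "b" and "c", while one at T=6 is replayed only "c".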
Type Parameters:
T - the type of items observed and emitted by the Processor
Parameters:
maxAge - the maximum age of the contained items
size - the maximum number of buffered items
Returns:
a new ReplayProcessor that replays up to size elements, but will evict them from its history based on their age.
public static <T> ReplayProcessor<T> createSizeAndTimeout(int size, Duration maxAge, Scheduler scheduler)
Creates a time- and size-bounded replay processor.
In this setting, the ReplayProcessor internally tags each received item with a timestamp value supplied by the Scheduler and holds at most size items in its internal buffer. It evicts items from the start of the buffer if their age becomes greater than or equal to the supplied age in milliseconds or the buffer reaches its size limit.
When subscribers subscribe to a terminated ReplayProcessor, they observe the items that remained in the buffer after the terminal signal, regardless of their age, but at most size items.
If a subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time and each subsequent item, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a subscriber subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning of the sequence.
Note that terminal signals (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete signal arrives at T=10. If a Subscriber subscribes at T=11, it will find an empty ReplayProcessor with just an onComplete signal.
Type Parameters:
T - the type of items observed and emitted by the Processor
Parameters:
maxAge - the maximum age of the contained items
size - the maximum number of buffered items
scheduler - the Scheduler that provides the current time
Returns:
a new ReplayProcessor that replays up to size elements, but will evict them from its history based on their age.
public void subscribe(CoreSubscriber<? super T> actual)
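An internal Publisher.subscribe(Subscriber) that will bypass the Hooks.onLastOperator(Function) pointcut.
In addition to behaving as expected by Publisher.subscribe(Subscriber) in a controlled manner, it supports direct subscribe-time Context passing.
subscribe in class Flux<T>
Parameters:
actual - the Subscriber interested in the published sequence
See Also:
Flux.subscribe(Subscriber)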
@Nullable public Throwable getError()
Current error if any, defaults to null.
getError in class FluxProcessor<T,T>
@Nullable public Object scanUnsafe(Scannable.Attr key)
This method is used internally by components to define their key-value mappings in a single place. Implementors should take care to return values of the correct type, and return null if no specific value is available.
For public consumption of attributes, prefer using Scannable.scan(Attr), which will return a typed value and fall back to the key's default if the component didn't define any mapping.
scanUnsafe in interface Scannable
scanUnsafe in class FluxProcessor<T,T>
Parameters:
key - a Scannable.Attr to resolve for the component.
public Stream<? extends Scannable> inners()
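Return a Stream of referenced inners (flatmap, multicast etc)
public long downstreamCount()
Return the number of active Subscribers or -1 if untracked.
downstreamCount in class FluxProcessor<T,T>
Returns:
the number of active Subscribers or -1 if untracked
public boolean isTerminated()
Has this upstream finished or "completed" / "failed"?
isTerminated in class FluxProcessor<T,T>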
public void onSubscribe(Subscription s)
Implementors should initialize any state used by Subscriber.onNext(Object) before calling Subscription.request(long). Should further onNext state modification occur, thread-safety will be required.
Note that an invalid request <= 0 will not produce an onError and will simply be ignored or reported through a debug-enabled Logger.
onSubscribe in interface Subscriber<T>
onSubscribe in interface CoreSubscriber<T>
public Context currentContext()
Request a Context from dependent components which can include downstream operators during subscribing or a terminal Subscriber.
currentContext in interface CoreSubscriber<T>
public int getPrefetch()
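The prefetch configuration of the Flux
getPrefetch in class Flux<T>
Returns:
the prefetch configuration of the Flux, -1 if unspecified
public void onNext(T t)
onNext in interface Subscriber<T>
public void onError(Throwable t)
onError in interface Subscriber<T>
public void onComplete()
onComplete in interface Subscriber<T>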