Class ReplayProcessor<T>

java.lang.Object
reactor.core.publisher.Flux<OUT>
reactor.core.publisher.FluxProcessor<T,T>
reactor.core.publisher.ReplayProcessor<T>
Type Parameters:
T - the value type
All Implemented Interfaces:
Processor<T,T>, Publisher<T>, Subscriber<T>, CorePublisher<T>, CoreSubscriber<T>, Disposable, Fuseable, Sinks.Many<T>, Scannable

@Deprecated public final class ReplayProcessor<T> extends FluxProcessor<T,T> implements Fuseable
Deprecated.
To be removed in 3.5, prefer clear cut usage of Sinks through variations under Sinks.many().replay().
Replays all or the last N items to Subscribers.

  • Method Details

    • cacheLast

      @Deprecated public static <T> ReplayProcessor<T> cacheLast()
      Deprecated.
      use Sinks.many().replay().latest() (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
      Create a ReplayProcessor that caches the last element it has pushed, replaying it to late subscribers. This is a buffer-based ReplayProcessor with a history size of 1.

      Type Parameters:
      T - the type of the pushed elements
      Returns:
      a new ReplayProcessor that replays its last pushed element to each new Subscriber
    • cacheLastOrDefault

      @Deprecated public static <T> ReplayProcessor<T> cacheLastOrDefault(@Nullable T value)
      Deprecated.
      use Sinks.many().replay().latestOrDefault(value) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
      Create a ReplayProcessor that caches the last element it has pushed, replaying it to late subscribers. If a Subscriber comes in before any value has been pushed, then the defaultValue is emitted instead. This is a buffer-based ReplayProcessor with a history size of 1.

      Type Parameters:
      T - the type of the pushed elements
      Parameters:
      value - a default value to start the sequence with in case nothing has been cached yet.
      Returns:
      a new ReplayProcessor that replays its last pushed element to each new Subscriber, or a default one if nothing was pushed yet
    • create

      @Deprecated public static <E> ReplayProcessor<E> create()
      Deprecated.
      use Sinks.many().replay().all() (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
      Create a new ReplayProcessor that replays an unbounded number of elements, using a default internal Queue.
      Type Parameters:
      E - the type of the pushed elements
      Returns:
      a new ReplayProcessor that replays the whole history to each new Subscriber.
    • create

      @Deprecated public static <E> ReplayProcessor<E> create(int historySize)
      Deprecated.
      use Sinks.many().replay().limit(historySize) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
      Create a new ReplayProcessor that replays up to historySize elements.
      Type Parameters:
      E - the type of the pushed elements
      Parameters:
      historySize - the backlog size, ie. maximum items retained for replay.
      Returns:
      a new ReplayProcessor that replays a limited history to each new Subscriber.
    • create

      @Deprecated public static <E> ReplayProcessor<E> create(int historySize, boolean unbounded)
      Deprecated.
      use Sinks.many().replay().limit(historySize) for bounded cases (unbounded == false) or Sinks.many().replay().all(bufferSize) otherwise (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
      Create a new ReplayProcessor that either replay all the elements or a limited amount of elements depending on the unbounded parameter.
      Type Parameters:
      E - the type of the pushed elements
      Parameters:
      historySize - maximum items retained if bounded, or initial link size if unbounded
      unbounded - true if "unlimited" data store must be supplied
      Returns:
      a new ReplayProcessor that replays the whole history to each new Subscriber if configured as unbounded, a limited history otherwise.
    • createTimeout

      @Deprecated public static <T> ReplayProcessor<T> createTimeout(Duration maxAge)
      Deprecated.
      use Sinks.many().replay().limit(maxAge) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
      Creates a time-bounded replay processor.

      In this setting, the ReplayProcessor internally tags each observed item with a timestamp value supplied by the Schedulers.parallel() and keeps only those whose age is less than the supplied time value converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5 this first item is then evicted by any subsequent item or termination signal, leaving the buffer empty.

      Once the processor is terminated, subscribers subscribing to it will receive items that remained in the buffer after the terminal signal, regardless of their age.

      If an subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time, and each item observed thereafter, even if the buffer evicts items due to the time constraint in the mean time. In other words, once an subscriber subscribes, it observes items without gaps in the sequence except for any outdated items at the beginning of the sequence.

      Type Parameters:
      T - the type of items observed and emitted by the Processor
      Parameters:
      maxAge - the maximum age of the contained items
      Returns:
      a new ReplayProcessor that replays elements based on their age.
    • createTimeout

      @Deprecated public static <T> ReplayProcessor<T> createTimeout(Duration maxAge, Scheduler scheduler)
      Deprecated.
      use Sinks.many().replay().limit(maxAge, scheduler) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
      Creates a time-bounded replay processor.

      In this setting, the ReplayProcessor internally tags each observed item with a timestamp value supplied by the Scheduler and keeps only those whose age is less than the supplied time value converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5 this first item is then evicted by any subsequent item or termination signal, leaving the buffer empty.

      Once the processor is terminated, subscribers subscribing to it will receive items that remained in the buffer after the terminal signal, regardless of their age.

      If an subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time, and each item observed thereafter, even if the buffer evicts items due to the time constraint in the mean time. In other words, once an subscriber subscribes, it observes items without gaps in the sequence except for any outdated items at the beginning of the sequence.

      Type Parameters:
      T - the type of items observed and emitted by the Processor
      Parameters:
      maxAge - the maximum age of the contained items
      Returns:
      a new ReplayProcessor that replays elements based on their age.
    • createSizeAndTimeout

      @Deprecated public static <T> ReplayProcessor<T> createSizeAndTimeout(int size, Duration maxAge)
      Deprecated.
      use Sinks.many().replay().limit(size, maxAge) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
      Creates a time- and size-bounded replay processor.

      In this setting, the ReplayProcessor internally tags each received item with a timestamp value supplied by the Schedulers.parallel() and holds at most size items in its internal buffer. It evicts items from the start of the buffer if their age becomes less-than or equal to the supplied age in milliseconds or the buffer reaches its size limit.

      When subscribers subscribe to a terminated ReplayProcessor, they observe the items that remained in the buffer after the terminal signal, regardless of their age, but at most size items.

      If an subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have age less than the specified time and each subsequent item, even if the buffer evicts items due to the time constraint in the mean time. In other words, once an subscriber subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning of the sequence.

      Type Parameters:
      T - the type of items observed and emitted by the Processor
      Parameters:
      maxAge - the maximum age of the contained items
      size - the maximum number of buffered items
      Returns:
      a new ReplayProcessor that replay up to size elements, but will evict them from its history based on their age.
    • createSizeAndTimeout

      @Deprecated public static <T> ReplayProcessor<T> createSizeAndTimeout(int size, Duration maxAge, Scheduler scheduler)
      Deprecated.
      use Sinks.many().replay().limit(size, maxAge, scheduler) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
      Creates a time- and size-bounded replay processor.

      In this setting, the ReplayProcessor internally tags each received item with a timestamp value supplied by the Scheduler and holds at most size items in its internal buffer. It evicts items from the start of the buffer if their age becomes less-than or equal to the supplied age in milliseconds or the buffer reaches its size limit.

      When subscribers subscribe to a terminated ReplayProcessor, they observe the items that remained in the buffer after the terminal signal, regardless of their age, but at most size items.

      If an subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have age less than the specified time and each subsequent item, even if the buffer evicts items due to the time constraint in the mean time. In other words, once an subscriber subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning of the sequence.

      Type Parameters:
      T - the type of items observed and emitted by the Processor
      Parameters:
      maxAge - the maximum age of the contained items in milliseconds
      size - the maximum number of buffered items
      scheduler - the Scheduler that provides the current time
      Returns:
      a new ReplayProcessor that replay up to size elements, but will evict them from its history based on their age.
    • subscribe

      public void subscribe(CoreSubscriber<? super T> actual)
      Deprecated.
      Description copied from class: Flux
      An internal Publisher.subscribe(Subscriber) that will bypass Hooks.onLastOperator(Function) pointcut.

      In addition to behave as expected by Publisher.subscribe(Subscriber) in a controlled manner, it supports direct subscribe-time Context passing.

      Specified by:
      subscribe in interface CorePublisher<T>
      Specified by:
      subscribe in class Flux<T>
      Parameters:
      actual - the Subscriber interested into the published sequence
      See Also:
    • getError

      public @Nullable Throwable getError()
      Deprecated.
      Description copied from class: FluxProcessor
      Current error if any, default to null
      Overrides:
      getError in class FluxProcessor<T,T>
      Returns:
      Current error if any, default to null
    • scanUnsafe

      public @Nullable Object scanUnsafe(Scannable.Attr key)
      Deprecated.
      Description copied from interface: Scannable
      This method is used internally by components to define their key-value mappings in a single place. Although it is ignoring the generic type of the Scannable.Attr key, implementors should take care to return values of the correct type, and return null if no specific value is available.

      For public consumption of attributes, prefer using Scannable.scan(Attr), which will return a typed value and fall back to the key's default if the component didn't define any mapping.

      Specified by:
      scanUnsafe in interface Scannable
      Overrides:
      scanUnsafe in class FluxProcessor<T,T>
      Parameters:
      key - a Scannable.Attr to resolve for the component.
      Returns:
      the value associated to the key for that specific component, or null if none.
    • inners

      public Stream<? extends Scannable> inners()
      Deprecated.
      Description copied from interface: Scannable
      Return a Stream of referenced inners (flatmap, multicast etc)
      Specified by:
      inners in interface Scannable
      Overrides:
      inners in class FluxProcessor<T,T>
      Returns:
      a Stream of referenced inners (flatmap, multicast etc)
    • downstreamCount

      public long downstreamCount()
      Deprecated.
      Description copied from class: FluxProcessor
      Return the number of active Subscriber or -1 if untracked.
      Overrides:
      downstreamCount in class FluxProcessor<T,T>
      Returns:
      the number of active Subscriber or -1 if untracked
    • isTerminated

      public boolean isTerminated()
      Deprecated.
      Description copied from class: FluxProcessor
      Has this upstream finished or "completed" / "failed" ?
      Overrides:
      isTerminated in class FluxProcessor<T,T>
      Returns:
      has this upstream finished or "completed" / "failed" ?
    • onSubscribe

      public void onSubscribe(Subscription s)
      Deprecated.
      Description copied from interface: CoreSubscriber
      Implementors should initialize any state used by Subscriber.onNext(Object) before calling Subscription.request(long). Should further onNext related state modification occur, thread-safety will be required.

      Note that an invalid request <= 0 will not produce an onError and will simply be ignored or reported through a debug-enabled Logger.

      Specified by:
      onSubscribe in interface CoreSubscriber<T>
      Specified by:
      onSubscribe in interface Subscriber<T>
    • currentContext

      public Context currentContext()
      Deprecated.
      Description copied from interface: CoreSubscriber
      Request a Context from dependent components which can include downstream operators during subscribing or a terminal Subscriber.
      Specified by:
      currentContext in interface CoreSubscriber<T>
      Overrides:
      currentContext in class FluxProcessor<T,T>
      Returns:
      a resolved context or Context.empty()
    • getPrefetch

      public int getPrefetch()
      Deprecated.
      Description copied from class: Flux
      The prefetch configuration of the Flux
      Overrides:
      getPrefetch in class Flux<T>
      Returns:
      the prefetch configuration of the Flux, -1 if unspecified
    • onComplete

      public void onComplete()
      Deprecated.
      Specified by:
      onComplete in interface Subscriber<T>
    • tryEmitComplete

      public Sinks.EmitResult tryEmitComplete()
      Deprecated.
      Description copied from interface: Sinks.Many
      Try to terminate the sequence successfully, generating an onComplete signal. The result of the attempt is represented as an Sinks.EmitResult, which possibly indicates error cases.

      See the list of failure Sinks.EmitResult in Sinks.Many.emitComplete(EmitFailureHandler) javadoc for an example of how each of these can be dealt with, to decide if the emit API would be a good enough fit instead.

      Specified by:
      tryEmitComplete in interface Sinks.Many<T>
      Returns:
      an Sinks.EmitResult, which should be checked to distinguish different possible failures
      See Also:
    • onError

      public void onError(Throwable throwable)
      Deprecated.
      Specified by:
      onError in interface Subscriber<T>
    • tryEmitError

      public Sinks.EmitResult tryEmitError(Throwable t)
      Deprecated.
      Description copied from interface: Sinks.Many
      Try to fail the sequence, generating an onError signal. The result of the attempt is represented as an Sinks.EmitResult, which possibly indicates error cases.

      See the list of failure Sinks.EmitResult in Sinks.Many.emitError(Throwable, EmitFailureHandler) javadoc for an example of how each of these can be dealt with, to decide if the emit API would be a good enough fit instead.

      Specified by:
      tryEmitError in interface Sinks.Many<T>
      Parameters:
      t - the exception to signal, not null
      Returns:
      an Sinks.EmitResult, which should be checked to distinguish different possible failures
      See Also:
    • onNext

      public void onNext(T t)
      Deprecated.
      Specified by:
      onNext in interface Subscriber<T>
    • tryEmitNext

      public Sinks.EmitResult tryEmitNext(T t)
      Deprecated.
      Description copied from interface: Sinks.Many
      Try emitting a non-null element, generating an onNext signal. The result of the attempt is represented as an Sinks.EmitResult, which possibly indicates error cases.

      See the list of failure Sinks.EmitResult in Sinks.Many.emitNext(Object, EmitFailureHandler) javadoc for an example of how each of these can be dealt with, to decide if the emit API would be a good enough fit instead.

      Might throw an unchecked exception as a last resort (eg. in case of a fatal error downstream which cannot be propagated to any asynchronous handler, a bubbling exception, ...).

      Specified by:
      tryEmitNext in interface Sinks.Many<T>
      Parameters:
      t - the value to emit, not null
      Returns:
      an Sinks.EmitResult, which should be checked to distinguish different possible failures
      See Also:
    • currentSubscriberCount

      public int currentSubscriberCount()
      Deprecated.
      Description copied from interface: Sinks.Many
      Get how many Subscribers are currently subscribed to the sink.

      This is a best effort peek at the sink state, and a subsequent attempt at emitting to the sink might still return Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER where relevant. (generally in Sinks.Many.tryEmitNext(Object)). Request (and lack thereof) isn't taken into account, all registered subscribers are counted.

      Specified by:
      currentSubscriberCount in interface Sinks.Many<T>
      Returns:
      the number of subscribers at the time of invocation
    • asFlux

      public Flux<T> asFlux()
      Deprecated.
      Description copied from interface: Sinks.Many
      Return a Flux view of this sink. Every call returns the same instance.
      Specified by:
      asFlux in interface Sinks.Many<T>
      Returns:
      the Flux view associated to this Sinks.Many
    • isIdentityProcessor

      protected boolean isIdentityProcessor()
      Deprecated.
      Description copied from class: FluxProcessor
      Return true if FluxProcessor<T, T>
      Overrides:
      isIdentityProcessor in class FluxProcessor<T,T>
      Returns:
      true if FluxProcessor<T, T>
    • emitNext

      default void emitNext(T value, Sinks.EmitFailureHandler failureHandler)
      Description copied from interface: Sinks.Many
      A simplified attempt at emitting a non-null element via the Sinks.Many.tryEmitNext(Object) API, generating an onNext signal. If the result of the attempt is not a success, implementations SHOULD retry the Sinks.Many.tryEmitNext(Object) call IF the provided Sinks.EmitFailureHandler returns true. Otherwise, failures are dealt with in a predefined way that might depend on the actual sink implementation (see below for the vanilla reactor-core behavior).

      Generally, Sinks.Many.tryEmitNext(Object) is preferable since it allows a custom handling of error cases, although this implies checking the returned Sinks.EmitResult and correctly acting on it. This API is intended as a good default for convenience.

      When the Sinks.EmitResult is not a success, vanilla reactor-core operators have the following behavior:

      Might throw an unchecked exception as a last resort (eg. in case of a fatal error downstream which cannot be propagated to any asynchronous handler, a bubbling exception, a Sinks.EmitResult.FAIL_NON_SERIALIZED as described above, ...).

      Specified by:
      emitNext in interface Sinks.Many<T>
      Parameters:
      value - the value to emit, not null
      failureHandler - the failure handler that allows retrying failed Sinks.EmitResult.
      See Also:
    • emitComplete

      default void emitComplete(Sinks.EmitFailureHandler failureHandler)
      Description copied from interface: Sinks.Many
      A simplified attempt at completing via the Sinks.Many.tryEmitComplete() API, generating an onComplete signal. If the result of the attempt is not a success, implementations SHOULD retry the Sinks.Many.tryEmitComplete() call IF the provided Sinks.EmitFailureHandler returns true. Otherwise, failures are dealt with in a predefined way that might depend on the actual sink implementation (see below for the vanilla reactor-core behavior).

      Generally, Sinks.Many.tryEmitComplete() is preferable since it allows a custom handling of error cases, although this implies checking the returned Sinks.EmitResult and correctly acting on it. This API is intended as a good default for convenience.

      When the Sinks.EmitResult is not a success, vanilla reactor-core operators have the following behavior:

      Might throw an unchecked exception as a last resort (eg. in case of a fatal error downstream which cannot be propagated to any asynchronous handler, a bubbling exception, a Sinks.EmitResult.FAIL_NON_SERIALIZED as described above, ...).

      Specified by:
      emitComplete in interface Sinks.Many<T>
      Parameters:
      failureHandler - the failure handler that allows retrying failed Sinks.EmitResult.
      See Also:
    • emitError

      default void emitError(Throwable error, Sinks.EmitFailureHandler failureHandler)
      Description copied from interface: Sinks.Many
      A simplified attempt at failing the sequence via the Sinks.Many.tryEmitError(Throwable) API, generating an onError signal. If the result of the attempt is not a success, implementations SHOULD retry the Sinks.Many.tryEmitError(Throwable) call IF the provided Sinks.EmitFailureHandler returns true. Otherwise, failures are dealt with in a predefined way that might depend on the actual sink implementation (see below for the vanilla reactor-core behavior).

      Generally, Sinks.Many.tryEmitError(Throwable) is preferable since it allows a custom handling of error cases, although this implies checking the returned Sinks.EmitResult and correctly acting on it. This API is intended as a good default for convenience.

      When the Sinks.EmitResult is not a success, vanilla reactor-core operators have the following behavior:

      Might throw an unchecked exception as a last resort (eg. in case of a fatal error downstream which cannot be propagated to any asynchronous handler, a bubbling exception, a Sinks.EmitResult.FAIL_NON_SERIALIZED as described above, ...).

      Specified by:
      emitError in interface Sinks.Many<T>
      Parameters:
      error - the exception to signal, not null
      failureHandler - the failure handler that allows retrying failed Sinks.EmitResult.
      See Also: