Class BaseSubscriber<T>

java.lang.Object
reactor.core.publisher.BaseSubscriber<T>
All Implemented Interfaces:
Subscriber<T>, Subscription, CoreSubscriber<T>, Disposable

public abstract class BaseSubscriber<T> extends Object implements CoreSubscriber<T>, Subscription, Disposable
A simple base class for a Subscriber implementation that lets the user perform a request(long) and cancel() on it directly. As the targeted use case is to manually handle requests, the hookOnSubscribe(Subscription) and hookOnNext(Object) hooks are expected to be implemented, but they nonetheless default to an unbounded request at subscription time. If you need to define a Context for this BaseSubscriber, simply override its CoreSubscriber.currentContext() method.

Override the other optional hooks hookOnComplete(), hookOnError(Throwable) and hookOnCancel() to customize the base behavior. You also have a termination hook, hookFinally(SignalType).

Most of the time, exceptions triggered inside hooks are propagated to onError(Throwable) (unless there is a fatal exception). The class is in the reactor.core.publisher package, as this subscriber is tied to a single Publisher.

Author:
Simon Baslé
  • Constructor Details

    • BaseSubscriber

      public BaseSubscriber()
  • Method Details

    • upstream

      protected Subscription upstream()
      Return current Subscription
      Returns:
      current Subscription
    • isDisposed

      public boolean isDisposed()
      Description copied from interface: Disposable
      Optionally return true when the resource or task is disposed.

      Implementations are not required to track disposition and as such may never return true even when disposed. However, they MUST only return true when there's a guarantee the resource or task is disposed.

      Specified by:
      isDisposed in interface Disposable
      Returns:
      true when there's a guarantee the resource or task is disposed.
    • dispose

      public void dispose()
      Specified by:
      dispose in interface Disposable
    • hookOnSubscribe

      protected void hookOnSubscribe(Subscription subscription)
      Hook for further processing of onSubscribe's Subscription. Implement this method to call request(long) as an initial request. Values other than the unbounded Long.MAX_VALUE imply that you'll also call request in hookOnNext(Object).

      Defaults to request unbounded Long.MAX_VALUE as in requestUnbounded()

      Parameters:
      subscription - the subscription to optionally process
    • hookOnNext

      protected void hookOnNext(T value)
      Hook for processing of onNext values. You can call request(long) here to further request data from the source Publisher if the initial request wasn't unbounded.

      Defaults to doing nothing.

      Parameters:
      value - the emitted value to process
    • hookOnComplete

      protected void hookOnComplete()
      Optional hook for completion processing. Defaults to doing nothing.
    • hookOnError

      protected void hookOnError(Throwable throwable)
      Optional hook for error processing. Default is to call Exceptions.errorCallbackNotImplemented(Throwable).
      Parameters:
      throwable - the error to process
    • hookOnCancel

      protected void hookOnCancel()
      Optional hook executed when the subscription is cancelled by calling this Subscriber's cancel() method. Defaults to doing nothing.
    • hookFinally

      protected void hookFinally(SignalType type)
      Optional hook executed after any of the termination events (onError, onComplete, cancel). The hook is executed in addition to and after hookOnError(Throwable), hookOnComplete() and hookOnCancel() hooks, even if these callbacks fail. Defaults to doing nothing. A failure of the callback will be caught by Operators.onErrorDropped(Throwable, reactor.util.context.Context).
      Parameters:
      type - the type of termination event that triggered the hook (SignalType.ON_ERROR, SignalType.ON_COMPLETE or SignalType.CANCEL)
    • onSubscribe

      public final void onSubscribe(Subscription s)
      Description copied from interface: CoreSubscriber
      Implementors should initialize any state used by Subscriber.onNext(Object) before calling Subscription.request(long). Should further onNext related state modification occur, thread-safety will be required.

      Note that an invalid request <= 0 will not produce an onError and will simply be ignored or reported through a debug-enabled Logger.

      Specified by:
      onSubscribe in interface CoreSubscriber<T>
      Specified by:
      onSubscribe in interface Subscriber<T>
    • onNext

      public final void onNext(T value)
      Specified by:
      onNext in interface Subscriber<T>
    • onError

      public final void onError(Throwable t)
      Specified by:
      onError in interface Subscriber<T>
    • onComplete

      public final void onComplete()
      Specified by:
      onComplete in interface Subscriber<T>
    • request

      public final void request(long n)
      Specified by:
      request in interface Subscription
    • requestUnbounded

      public final void requestUnbounded()
      Request an unbounded amount.
    • cancel

      public final void cancel()
      Specified by:
      cancel in interface Subscription
    • toString

      public String toString()
      Overrides:
      toString in class Object