public abstract class BaseSubscriber<T> extends Object implements CoreSubscriber<T>, Subscription, Disposable
Subscriber
implementation that lets the user
perform a request(long)
and cancel()
on it directly. As the targeted
use case is to manually handle requests, the hookOnSubscribe(Subscription)
and
hookOnNext(Object)
hooks are expected to be implemented, but they nonetheless
default to an unbounded request at subscription time. If you need to define a Context
for this BaseSubscriber
, simply override its CoreSubscriber.currentContext()
method.
Override the other optional hooks hookOnComplete()
,
hookOnError(Throwable)
and hookOnCancel()
to customize the base behavior. You also have a termination hook,
hookFinally(SignalType)
.
Most of the time, exceptions triggered inside hooks are propagated to
onError(Throwable)
(unless there is a fatal exception). The class is in the
reactor.core.publisher
package, as this subscriber is tied to a single
Publisher
.
Disposable.Composite, Disposable.Swap
Constructor and Description |
---|
BaseSubscriber() |
Modifier and Type | Method and Description |
---|---|
void |
cancel() |
void |
dispose()
|
protected void |
hookFinally(SignalType type)
Optional hook executed after any of the termination events (onError, onComplete,
cancel).
|
protected void |
hookOnCancel()
Optional hook executed when the subscription is cancelled by calling this
Subscriber's
cancel() method. |
protected void |
hookOnComplete()
Optional hook for completion processing.
|
protected void |
hookOnError(Throwable throwable)
Optional hook for error processing.
|
protected void |
hookOnNext(T value)
Hook for processing of onNext values.
|
protected void |
hookOnSubscribe(Subscription subscription)
Hook for further processing of onSubscribe's Subscription.
|
boolean |
isDisposed()
Optionally return true when the resource or task is disposed.
|
void |
onComplete() |
void |
onError(Throwable t) |
void |
onNext(T value) |
void |
onSubscribe(Subscription s)
Implementors should initialize any state used by
Subscriber.onNext(Object) before
calling Subscription.request(long) . |
void |
request(long n) |
void |
requestUnbounded()
Request an unbounded amount. |
String |
toString() |
protected Subscription |
upstream()
Return current
Subscription |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
currentContext
protected Subscription upstream()
Subscription
Subscription
public boolean isDisposed()
Disposable
Implementations are not required to track disposition and as such may never return true even when disposed. However, they MUST only return true when there's a guarantee the resource or task is disposed.
isDisposed
in interface Disposable
public void dispose()
dispose
in interface Disposable
protected void hookOnSubscribe(Subscription subscription)
request(long)
as an initial request. Values other than the
unbounded Long.MAX_VALUE
imply that you'll also call request in
hookOnNext(Object)
.
Defaults to request unbounded Long.MAX_VALUE as in requestUnbounded()
subscription
- the subscription to optionally processprotected void hookOnNext(T value)
request(long)
here
to further request data from the source Publisher
if
the initial request
wasn't unbounded.
Defaults to doing nothing.
value
- the emitted value to processprotected void hookOnComplete()
protected void hookOnError(Throwable throwable)
Exceptions.errorCallbackNotImplemented(Throwable)
.throwable
- the error to processprotected void hookOnCancel()
cancel()
method. Defaults to doing nothing.protected void hookFinally(SignalType type)
hookOnError(Throwable)
,
hookOnComplete()
and hookOnCancel()
hooks, even if these callbacks
fail. Defaults to doing nothing. A failure of the callback will be caught by
Operators.onErrorDropped(Throwable, reactor.util.context.Context)
.type
- the type of termination event that triggered the hook
(SignalType.ON_ERROR
, SignalType.ON_COMPLETE
or
SignalType.CANCEL
)public final void onSubscribe(Subscription s)
CoreSubscriber
Subscriber.onNext(Object)
before
calling Subscription.request(long)
. Should further onNext
related
state modification occur, thread-safety will be required.
Note that an invalid request <= 0
will not produce an onError and
will simply be ignored or reported through a debug-enabled
Logger
.
onSubscribe
in interface Subscriber<T>
onSubscribe
in interface CoreSubscriber<T>
public final void onNext(T value)
onNext
in interface Subscriber<T>
public final void onError(Throwable t)
onError
in interface Subscriber<T>
public final void onComplete()
onComplete
in interface Subscriber<T>
public final void request(long n)
request
in interface Subscription
public final void requestUnbounded()
Request
an unbounded amount.public final void cancel()
cancel
in interface Subscription