public interface TestSubscriber<T> extends CoreSubscriber<T>, Scannable
A CoreSubscriber that can be attached to any Publisher to later assert which events occurred at runtime. This can be used as an alternative to StepVerifier for more complex scenarios, e.g. more than one possible outcome, racing...
The subscriber can be fine-tuned with a builder(), which also allows producing a Fuseable.ConditionalSubscriber variant if needed.
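As a minimal sketch of the builder in use, the following configures a bounded initial demand and then requests the remainder manually. It assumes reactor-core and reactor-test are on the classpath; the BuilderExample class and its run() helper are hypothetical names used for illustration only.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.test.subscriber.TestSubscriber;

// Hypothetical example class, not part of reactor-test.
class BuilderExample {

    static List<Integer> run() {
        // Tune the subscriber: request only 2 elements upon subscription.
        TestSubscriber<Integer> testSubscriber = TestSubscriber.builder()
                .initialRequest(2)
                .build();

        Flux.range(1, 5).subscribe(testSubscriber);

        // The source still has elements to emit, so no terminal signal is expected yet.
        if (testSubscriber.isTerminated()) {
            throw new IllegalStateException("unexpected early termination");
        }

        // Request the 3 remaining elements, then wait for the end state.
        testSubscriber.request(3);
        testSubscriber.block();

        // A copy of the onNext values received so far.
        return testSubscriber.getReceivedOnNext();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Driving the demand by hand like this is the kind of scenario where a TestSubscriber can be more convenient than a scripted StepVerifier.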
Subscriber-inherited methods never throw, but a few failure conditions might be met, which fall into two categories.
The first category is "protocol errors": when the occurrence of an incoming signal doesn't follow the Reactive Streams specification. The case must be covered explicitly in the specification, and leads to the signal being added to the getProtocolErrors() list. All protocol errors imply that the Publisher has terminated already. The list of detected protocol errors is:
- TestSubscriber has already terminated (onComplete or onError), but a Subscriber.onNext(Object) is received: onNext signal added to protocol errors
- TestSubscriber has already terminated, but a Subscriber.onComplete() is received: onComplete signal added to protocol errors
- TestSubscriber has already terminated, but a Subscriber.onError(Throwable) is received: onError signal added to protocol errors
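The first case in the list above can be sketched with a deliberately misbehaving source. This is only an illustration: the hand-written Publisher lambda, the no-op Subscription and the ProtocolErrorExample class are hypothetical, and reactor-test plus reactive-streams are assumed to be on the classpath.

```java
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

import reactor.test.subscriber.TestSubscriber;

// Hypothetical example class, not part of reactor-test.
class ProtocolErrorExample {

    static TestSubscriber<String> run() {
        TestSubscriber<String> testSubscriber = TestSubscriber.create();

        // A source that violates the specification by emitting after onComplete.
        Publisher<String> misbehaving = subscriber -> {
            subscriber.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    // demand is ignored by this hand-written source
                }

                @Override
                public void cancel() {
                    // cancellation is ignored as well
                }
            });
            subscriber.onNext("a");
            subscriber.onComplete();
            subscriber.onNext("late"); // arrives after termination
        };

        misbehaving.subscribe(testSubscriber);
        return testSubscriber;
    }

    public static void main(String[] args) {
        TestSubscriber<String> testSubscriber = run();
        System.out.println("received: " + testSubscriber.getReceivedOnNext());
        System.out.println("protocol errors: " + testSubscriber.getProtocolErrors());
    }
}
```

The late element is not mixed into the regular onNext values; it is only visible through getProtocolErrors().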
The second category is "subscription failures", which are the only ones for which TestSubscriber internally performs an assertion. These failure conditions always lead to a cancellation of the subscription and are represented as an AssertionError. The assertion error is thrown by all the getXxx and isXxx accessors, the block() methods and the expectTerminalError()/expectTerminalSignal() methods. The possible subscription failures are:
- TestSubscriber has already received a Subscription (i.e. it is being reused). Both subscriptions are cancelled.
- Subscription is not capable of fusion, but fusion was required by the user
- Subscription is capable of fusion, but this was forbidden by the user
- Subscription is capable of fusion, but the negotiated fusion mode is not the one required by the user

Modifier and Type | Interface and Description |
---|---|
static class | TestSubscriber.FusionRequirement: An enum representing the 3 broad expectations around fusion. |
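Nested classes/interfaces inherited from interface Scannable: Scannable.Attr<T>
Fields inherited from interface Scannable: OPERATOR_NAME_UNRELATED_WORDS_PATTERN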
Modifier and Type | Method and Description |
---|---|
void | block(): Block until an assertable end state has been reached. |
void | block(Duration timeout): Block until an assertable end state has been reached, or a timeout Duration has elapsed. |
static TestSubscriberBuilder | builder(): Create a TestSubscriber with tuning. |
void | cancel() |
static <T> TestSubscriber<T> | create(): Create a simple plain TestSubscriber which will make an unbounded demand on subscription, has an empty Context and makes no attempt at fusion negotiation. |
Throwable | expectTerminalError(): Expect the TestSubscriber to be terminated with a Subscriber.onError(Throwable) and return the terminating Throwable if so. |
Signal<T> | expectTerminalSignal() |
int | getFusionMode(): Return an int code that represents the negotiated fusion mode for this TestSubscriber. |
List<Signal<T>> | getProtocolErrors(): Return a List of Signal which represent detected protocol errors from the source Publisher, that is to say signals that were emitted to this TestSubscriber in violation of the Reactive Streams specification. |
List<T> | getReceivedOnNext(): Return the List of all elements that have correctly been emitted to the TestSubscriber (onNext signals) so far. |
List<T> | getReceivedOnNextAfterCancellation(): Return the List of elements that have been emitted to the TestSubscriber (onNext signals) so far, after a cancel() was triggered. |
Signal<T> | getTerminalSignal() |
boolean | isCancelled(): Check if this TestSubscriber has been cancelled, which implies isTerminatedOrCancelled() is also true. |
boolean | isTerminated(): Check if this TestSubscriber has received a terminal signal, i.e. onComplete or onError. |
boolean | isTerminatedComplete(): Check if this TestSubscriber has received a terminal signal that is specifically onComplete. |
boolean | isTerminatedError(): Check if this TestSubscriber has received a terminal signal that is specifically onError. |
boolean | isTerminatedOrCancelled(): Check if this TestSubscriber has either been cancelled (isCancelled() would return true) or been terminated, having been signalled with onComplete or onError (isTerminated() would return true and getTerminalSignal() would return a non-null Signal). The third possible failure condition, subscription failure, results in an AssertionError being thrown by this method (like all other accessors, see also TestSubscriber javadoc). |
void | request(long n) |
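Methods inherited from interface CoreSubscriber: currentContext, onSubscribe
Methods inherited from interface Subscriber: onComplete, onError, onNext
Methods inherited from interface Scannable: actuals, from, inners, isScanAvailable, name, parents, scan, scanOrDefault, scanUnsafe, stepName, steps, tags, tagsDeduplicated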
static <T> TestSubscriber<T> create()
Create a simple plain TestSubscriber which will make an unbounded demand on subscription, has an empty Context and makes no attempt at fusion negotiation.
Type Parameters: T - the type of data received by this subscriber
Returns: a new TestSubscriber
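A plain subscriber created this way could be used as follows. This is a sketch that assumes reactor-core and reactor-test are on the classpath; CreateExample and its run() helper are hypothetical names.

```java
import reactor.core.publisher.Flux;
import reactor.test.subscriber.TestSubscriber;

// Hypothetical example class, not part of reactor-test.
class CreateExample {

    static TestSubscriber<String> run() {
        // Unbounded demand, empty Context, no fusion negotiation.
        TestSubscriber<String> testSubscriber = TestSubscriber.create();

        Flux.just("a", "b").subscribe(testSubscriber);

        // Wait for an assertable end state before inspecting the subscriber.
        testSubscriber.block();
        return testSubscriber;
    }

    public static void main(String[] args) {
        TestSubscriber<String> testSubscriber = run();
        System.out.println("received: " + testSubscriber.getReceivedOnNext());
        System.out.println("completed: " + testSubscriber.isTerminatedComplete());
    }
}
```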
static TestSubscriberBuilder builder()
Create a TestSubscriber with tuning. See TestSubscriberBuilder.
Returns: a TestSubscriberBuilder to fine tune the TestSubscriber to produce
void cancel()
void request(long n)
Request n elements from the Publisher's Subscription. If this method is called before the TestSubscriber has subscribed to the Publisher, pre-request is accumulated (including the configured initial request) and replayed in a single batch upon subscription.
Note that if/once Fuseable.SYNC fusion mode is established, this method MUST NOT be used, and this will throw an IllegalStateException.
Parameters: n - the additional amount to request
boolean isTerminatedOrCancelled()
Check if this TestSubscriber has either:
- been cancelled: isCancelled() would return true
- been terminated, having been signalled with onComplete or onError: isTerminated() would return true and getTerminalSignal() would return a non-null Signal
The third possible failure condition, subscription failure, results in an AssertionError being thrown by this method (like all other accessors, see also TestSubscriber javadoc).
Once this method starts returning true, any pending block() calls should finish, and subsequent block calls will return immediately.
Returns: true if the TestSubscriber has reached an end state
Throws: AssertionError - in case of failure at subscription time
boolean isTerminated()
Check if this TestSubscriber has received a terminal signal, i.e. onComplete or onError. When returning true, implies:
- isTerminatedOrCancelled() is also true
- getTerminalSignal() returns a non-null Signal
- expectTerminalSignal() returns the Signal
- expectTerminalError() returns the terminating Throwable in case of onError but throws in case of onComplete
Returns: true if the TestSubscriber has been terminated via onComplete or onError
Throws: AssertionError - in case of failure at subscription time
boolean isTerminatedComplete()
Check if this TestSubscriber has received a terminal signal that is specifically onComplete. When returning true, implies:
- isTerminatedOrCancelled() is also true
- isTerminated() is also true
- getTerminalSignal() returns a non-null onComplete Signal
- expectTerminalSignal() returns the same onComplete Signal
- expectTerminalError() throws
Returns: true if the TestSubscriber has been terminated via onComplete
Throws: AssertionError - in case of failure at subscription time
boolean isTerminatedError()
Check if this TestSubscriber has received a terminal signal that is specifically onError. When returning true, implies:
- isTerminatedOrCancelled() is also true
- isTerminated() is also true
- getTerminalSignal() returns a non-null onError Signal
- expectTerminalSignal() returns the same onError Signal
- expectTerminalError() returns the terminating Throwable
Returns: true if the TestSubscriber has been terminated via onError
Throws: AssertionError - in case of failure at subscription time
boolean isCancelled()
Check if this TestSubscriber has been cancelled, which implies isTerminatedOrCancelled() is also true.
Returns: true if the TestSubscriber has been cancelled
Throws: AssertionError - in case of failure at subscription time
@Nullable Signal<T> getTerminalSignal()
Return the terminal Signal if this TestSubscriber isTerminated(), or null otherwise. See also expectTerminalSignal() as a stricter way of asserting the terminal state.
Returns: the terminal Signal or null if not terminated
Throws: AssertionError - in case of failure at subscription time
See Also: isTerminated(), expectTerminalSignal()
Signal<T> expectTerminalSignal()
Expect the TestSubscriber to be terminated, and return the terminal Signal if so. Otherwise, cancel the subscription and throw an AssertionError.
Note that if there was already a subscription failure, the corresponding AssertionError is raised by this method instead.
Returns: the terminal Signal (cannot be null)
Throws: AssertionError - in case of failure at subscription time, or if the subscriber hasn't terminated yet
See Also: isTerminated(), getTerminalSignal()
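The interaction with subscription failures can be illustrated by reusing a subscriber, which is one of the documented failure conditions. This is a hedged sketch: ReuseFailureExample and its helper are hypothetical names, and the exact message of the AssertionError is not assumed here.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;
import reactor.test.subscriber.TestSubscriber;

// Hypothetical example class, not part of reactor-test.
class ReuseFailureExample {

    static boolean failsAfterReuse() {
        TestSubscriber<Integer> testSubscriber = TestSubscriber.create();

        // First subscription: the source completes, so a terminal signal is available.
        Flux.just(1).subscribe(testSubscriber);
        Signal<Integer> terminal = testSubscriber.expectTerminalSignal();
        if (!terminal.isOnComplete()) {
            throw new IllegalStateException("expected onComplete, got " + terminal);
        }

        // Second subscription on the same instance: a subscription failure is recorded.
        Flux.just(2).subscribe(testSubscriber);

        try {
            testSubscriber.expectTerminalSignal();
            return false;
        }
        catch (AssertionError subscriptionFailure) {
            // The recorded failure is raised even though the subscriber had terminated.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("fails after reuse: " + failsAfterReuse());
    }
}
```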
Throwable expectTerminalError()
Expect the TestSubscriber to be terminated with a Subscriber.onError(Throwable) and return the terminating Throwable if so. Otherwise, cancel the subscription and throw an AssertionError.
Returns: the terminating Throwable (cannot be null)
Throws: AssertionError - in case of failure at subscription time, or if the subscriber hasn't errored.
See Also: isTerminated(), isTerminatedError(), getTerminalSignal()
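Both outcomes of this method can be sketched as follows, assuming reactor-core and reactor-test are on the classpath. TerminalErrorExample and its two helpers are hypothetical names.

```java
import reactor.core.publisher.Flux;
import reactor.test.subscriber.TestSubscriber;

// Hypothetical example class, not part of reactor-test.
class TerminalErrorExample {

    // Source terminates with onError: the Throwable is returned.
    static Throwable erroringSource() {
        TestSubscriber<Integer> testSubscriber = TestSubscriber.create();
        Flux.<Integer>error(new IllegalStateException("boom")).subscribe(testSubscriber);
        testSubscriber.block();
        return testSubscriber.expectTerminalError();
    }

    // Source terminates with onComplete: the expectation is not met.
    static boolean throwsWhenCompleted() {
        TestSubscriber<Integer> testSubscriber = TestSubscriber.create();
        Flux.just(1).subscribe(testSubscriber);
        testSubscriber.block();
        try {
            testSubscriber.expectTerminalError();
            return false;
        }
        catch (AssertionError notAnError) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("error: " + erroringSource());
        System.out.println("throws on complete: " + throwsWhenCompleted());
    }
}
```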
List<T> getReceivedOnNext()
Return the List of all elements that have correctly been emitted to the TestSubscriber (onNext signals) so far. This returns a new list that is not backed by the TestSubscriber.
Note that this includes elements that would arrive after cancel(), as this is allowed by the Reactive Streams specification (cancellation is not necessarily synchronous and some elements may already be in flight when the source takes notice of the cancellation). These elements are also mirrored in the getReceivedOnNextAfterCancellation() getter.
Returns: the List of all elements received by the TestSubscriber as part of normal operation
Throws: AssertionError - in case of failure at subscription time
See Also: getReceivedOnNextAfterCancellation(), getProtocolErrors()
List<T> getReceivedOnNextAfterCancellation()
Return the List of elements that have been emitted to the TestSubscriber (onNext signals) so far, after a cancel() was triggered. This returns a new list that is not backed by the TestSubscriber.
Note that this is allowed by the Reactive Streams specification (cancellation is not necessarily synchronous and some elements may already be in flight when the source takes notice of the cancellation). This is a sub-list of the one returned by getReceivedOnNext() (in the conceptual sense, as the two lists are independent copies).
Returns: the List of elements of getReceivedOnNext() that were received by the TestSubscriber after cancel() was triggered
Throws: AssertionError - in case of failure at subscription time
See Also: getReceivedOnNext(), getProtocolErrors()
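The relation between the two lists can be sketched with a source that keeps emitting after cancellation. The hand-written Publisher lambda, its no-op Subscription and the AfterCancellationExample class are hypothetical and only simulate an element that is already in flight when cancel() is called.

```java
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

import reactor.test.subscriber.TestSubscriber;

// Hypothetical example class, not part of reactor-test.
class AfterCancellationExample {

    static TestSubscriber<String> run() {
        TestSubscriber<String> testSubscriber = TestSubscriber.create();

        Publisher<String> slowToNoticeCancel = subscriber -> {
            subscriber.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    // demand is ignored by this hand-written source
                }

                @Override
                public void cancel() {
                    // the source does not react to cancellation
                }
            });
            subscriber.onNext("before");
            testSubscriber.cancel();
            subscriber.onNext("in flight"); // emitted although cancel() was called
        };

        slowToNoticeCancel.subscribe(testSubscriber);
        return testSubscriber;
    }

    public static void main(String[] args) {
        TestSubscriber<String> testSubscriber = run();
        System.out.println("all: " + testSubscriber.getReceivedOnNext());
        System.out.println("after cancel: " + testSubscriber.getReceivedOnNextAfterCancellation());
    }
}
```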
List<Signal<T>> getProtocolErrors()
Return a List of Signal which represent detected protocol errors from the source Publisher, that is to say signals that were emitted to this TestSubscriber in violation of the Reactive Streams specification. An example would be a Subscriber.onNext(Object) signal emitted after a Subscriber.onComplete() signal.
Note that the Signals in the collection don't bear any ContextView, since they would all be the configured CoreSubscriber.currentContext().
Returns: a List of Signal representing the detected protocol errors from the source Publisher
Throws: AssertionError - in case of failure at subscription time
int getFusionMode()
Return an int code that represents the negotiated fusion mode for this TestSubscriber. Fusion codes can be converted to a human-readable value for display via Fuseable.fusionModeName(int). If no particular fusion has been requested, returns Fuseable.NONE.
Note that as long as this TestSubscriber hasn't been subscribed to a Publisher, this method will return -1. It will also throw an AssertionError if the configured fusion mode couldn't be negotiated at subscription.
Returns: -1 if not subscribed, 0 (Fuseable.NONE) if no fusion negotiated, a relevant fusion code otherwise
Throws: AssertionError - in case of failure at subscription time
void block()
Block until an assertable end state has been reached. This can be either a cancellation (isCancelled()), a "normal" termination (isTerminated()) or subscription failure. In the latter case only, this method throws the corresponding AssertionError.
An AssertionError is also thrown if the thread is interrupted.
Throws: AssertionError - in case of failure at subscription time (or thread interruption)
void block(Duration timeout)
Block until an assertable end state has been reached, or a timeout Duration has elapsed. End state can be either a cancellation (isCancelled()), a "normal" termination (isTerminated()) or a subscription failure. In the latter case only, this method throws the corresponding AssertionError.
In case of timeout, an AssertionError with a message reflecting the configured duration is thrown. An AssertionError is also thrown if the thread is interrupted.
Throws: AssertionError - in case of failure at subscription time (or thread interruption)
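The timeout case can be sketched with a source that never signals. The example assumes reactor-core and reactor-test are on the classpath; BlockTimeoutExample, its helper and the 100 ms duration are hypothetical choices.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.test.subscriber.TestSubscriber;

// Hypothetical example class, not part of reactor-test.
class BlockTimeoutExample {

    static boolean timesOut() {
        TestSubscriber<Integer> testSubscriber = TestSubscriber.create();

        // Flux.never() emits no element and no terminal signal.
        Flux.<Integer>never().subscribe(testSubscriber);

        try {
            testSubscriber.block(Duration.ofMillis(100));
            return false;
        }
        catch (AssertionError timeout) {
            // No end state was reached within the configured duration.
            return true;
        }
        finally {
            // Release the subscription to the never-ending source.
            testSubscriber.cancel();
        }
    }

    public static void main(String[] args) {
        System.out.println("timed out: " + timesOut());
    }
}
```

Using the timed variant in tests avoids hanging the test thread indefinitely when a source unexpectedly fails to terminate.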