T - the value type.

public class TestSubscriber<T> extends java.lang.Object implements org.reactivestreams.Subscriber<T>, org.reactivestreams.Subscription, Trackable, Receiver
To create a new instance of TestSubscriber, you have the choice between these static methods:
- subscribe(Publisher): creates a new TestSubscriber, subscribes it to the specified Publisher and requests an unbounded number of elements.
- subscribe(Publisher, long): creates a new TestSubscriber, subscribes it to the specified Publisher and requests n elements (can be 0 if you want no initial demand).
- create(): creates a new TestSubscriber that requests an unbounded number of elements.
- create(long): creates a new TestSubscriber that requests n elements (can be 0 if you want no initial demand).
If you are testing asynchronous publishers, don't forget to use one of the await*() methods to wait for the data to assert.
You can extend this class, but only the onNext, onError and onComplete methods can be overridden.
You can call request(long) and cancel() from any thread or from within the overridable methods, but you should avoid calling the assertXXX methods asynchronously.
Usage:
TestSubscriber
.subscribe(publisher)
.await()
.assertValues("ABC", "DEF");
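The advice to await before asserting can be illustrated without Reactor on the classpath. The following is a minimal sketch using only the JDK (Java 9+ `java.util.concurrent.Flow` and `SubmissionPublisher`); `AsyncAwaitSketch`, `LatchSubscriber` and `publishAndAwait` are hypothetical names, and the latch-based wait is only an assumed stand-in for what an `await()` method does, not TestSubscriber's implementation.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class AsyncAwaitSketch {

    // Hypothetical subscriber: records values and releases a latch on termination.
    static final class LatchSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> values = new CopyOnWriteArrayList<>();
        final CountDownLatch terminated = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(T item) { values.add(item); }
        @Override public void onError(Throwable t) { terminated.countDown(); }
        @Override public void onComplete() { terminated.countDown(); }
    }

    // Publishes two values asynchronously and waits for termination before returning them.
    static List<String> publishAndAwait() {
        LatchSubscriber<String> subscriber = new LatchSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("ABC");
            publisher.submit("DEF");
        } // close() signals onComplete after the submitted items
        try {
            // Without this wait, the values may not have arrived yet on the other thread.
            if (!subscriber.terminated.await(5, TimeUnit.SECONDS)) {
                throw new AssertionError("timed out waiting for termination");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("interrupted while waiting", e);
        }
        return subscriber.values;
    }

    public static void main(String[] args) {
        System.out.println(publishAndAwait());
    }
}
```

Asserting on `subscriber.values` right after `submit` would race with the delivery thread, which is the situation the await*() methods are meant to avoid.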
Modifier and Type | Field and Description |
---|---|
static java.time.Duration | DEFAULT_VALUES_TIMEOUT Default timeout for waiting for the next values to be received. |

Fields inherited from interface Trackable: UNSPECIFIED
Modifier and Type | Method and Description |
---|---|
TestSubscriber<T> | assertComplete() Assert that a successful completion signal has been received. |
TestSubscriber<T> | assertContainValues(java.util.Set<? extends T> expectedValues) Assert the specified values have been received. |
TestSubscriber<T> | assertError() Assert an error signal has been received. |
TestSubscriber<T> | assertError(java.lang.Class<? extends java.lang.Throwable> clazz) Assert an error signal has been received. |
TestSubscriber<T> | assertErrorMessage(java.lang.String message) |
TestSubscriber<T> | assertErrorWith(java.util.function.Consumer<? super java.lang.Throwable> expectation) Assert an error signal has been received. |
TestSubscriber<T> | assertFuseableSource() Assert that the upstream was a Fuseable source. |
TestSubscriber<T> | assertFusionEnabled() Assert that the fusion mode was granted. |
TestSubscriber<T> | assertFusionMode(int expectedMode) |
TestSubscriber<T> | assertFusionRejected() Assert that the fusion mode was rejected. |
protected void | assertionError(java.lang.String message, java.lang.Throwable cause) Prepares and throws an AssertionError exception based on the message, cause, the active state and the potential errors so far. |
TestSubscriber<T> | assertNoError() Assert no error signal has been received. |
TestSubscriber<T> | assertNonFuseableSource() Assert that the upstream was not a Fuseable source. |
TestSubscriber<T> | assertNotComplete() Assert that no successful completion signal has been received. |
TestSubscriber<T> | assertNotSubscribed() Assert no subscription occurred. |
TestSubscriber<T> | assertNotTerminated() Assert that neither a successful completion nor an error signal has been received. |
TestSubscriber<T> | assertNoValues() Assert no values have been received. |
TestSubscriber<T> | assertSubscribed() Assert subscription occurred (once). |
TestSubscriber<T> | assertTerminated() Assert that either a successful completion or an error signal has been received. |
TestSubscriber<T> | assertValueCount(long n) Assert n values have been received. |
TestSubscriber<T> | assertValues(T... expectedValues) Assert the specified values have been received in the declared order. |
TestSubscriber<T> | assertValueSequence(java.lang.Iterable<? extends T> expectedSequence) Assert the specified values have been received in the same order read by the passed Iterable. |
TestSubscriber<T> | assertValuesWith(java.util.function.Consumer<T>... expectations) Assert the specified values have been received in the declared order. |
TestSubscriber<T> | await() Blocking method that waits until a successful completion or error signal is received. |
TestSubscriber<T> | await(java.time.Duration timeout) Blocking method that waits until a successful completion or error signal is received or until a timeout occurs. |
static void | await(java.time.Duration timeout, java.lang.String errorMessage, java.util.function.BooleanSupplier conditionSupplier) Blocking method that waits until conditionSupplier returns true, or if it does not before the specified timeout, throws an AssertionError with the specified error message. |
static void | await(java.time.Duration timeout, java.util.function.Supplier<java.lang.String> errorMessageSupplier, java.util.function.BooleanSupplier conditionSupplier) Blocking method that waits until conditionSupplier returns true, or if it does not before the specified timeout, throws an AssertionError with the specified error message supplier. |
TestSubscriber<T> | awaitAndAssertNextValueCount(long n) Blocking method that waits until n next values have been received. |
TestSubscriber<T> | awaitAndAssertNextValues(T... values) Blocking method that waits until n next values have been received (n is the number of values provided) to assert them. |
TestSubscriber<T> | awaitAndAssertNextValuesWith(java.util.function.Consumer<T>... expectations) Blocking method that waits until n next values have been received (n is the number of expectations provided) to assert them. |
void | cancel() |
TestSubscriber<T> | configureValuesStorage(boolean enabled) Enable or disable the values storage. |
TestSubscriber<T> | configureValuesTimeout(java.time.Duration timeout) Configure the timeout for waiting for the next values to be received (3 seconds by default). |
static <T> TestSubscriber<T> | create() Create a new TestSubscriber that requests an unbounded number of elements. |
static <T> TestSubscriber<T> | create(long n) Create a new TestSubscriber that initially requests n elements. |
int | establishedFusionMode() Returns the established fusion mode or -1 if it was not enabled. |
protected java.lang.String | fusionModeName(int mode) |
boolean | isCancelled() |
boolean | isStarted() Has this upstream started or "onSubscribed"? |
boolean | isTerminated() Has this upstream finished or "completed" / "failed"? |
protected void | normalRequest(long n) |
void | onComplete() |
void | onError(java.lang.Throwable t) |
void | onNext(T t) |
void | onSubscribe(org.reactivestreams.Subscription s) |
void | request(long n) |
protected void | requestDeferred() Requests the deferred amount if not zero. |
long | requestedFromDownstream() Return defined element capacity, used to drive new Subscription request needs. |
TestSubscriber<T> | requestedFusionMode(int requestMode) Set up what fusion mode should be requested from the incoming Subscription if it happens to be a QueueSubscription. |
protected boolean | set(org.reactivestreams.Subscription s) Atomically sets the single subscription and requests the missed amount from it. |
protected boolean | setWithoutRequesting(org.reactivestreams.Subscription s) Sets the Subscription once but does not request anything. |
static <T> TestSubscriber<T> | subscribe(org.reactivestreams.Publisher<T> publisher) Create a new TestSubscriber that requests an unbounded number of elements, and subscribe it to the specified publisher. |
static <T> TestSubscriber<T> | subscribe(org.reactivestreams.Publisher<T> publisher, long n) Create a new TestSubscriber that initially requests n elements, and subscribe it to the specified publisher. |
org.reactivestreams.Subscription | upstream() Return the direct source of data. Supports reference. |
protected java.lang.String | valueAndClass(java.lang.Object o) |
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface Trackable: expectedFromUpstream, getCapacity, getError, getPending, limit
public static final java.time.Duration DEFAULT_VALUES_TIMEOUT
public static void await(java.time.Duration timeout, java.util.function.Supplier<java.lang.String> errorMessageSupplier, java.util.function.BooleanSupplier conditionSupplier)
Blocking method that waits until conditionSupplier returns true, or if it does not before the specified timeout, throws an AssertionError with the specified error message supplier.
timeout - the timeout duration
errorMessageSupplier - the error message supplier
conditionSupplier - condition to break out of the wait loop
Throws: java.lang.AssertionError

public static void await(java.time.Duration timeout, java.lang.String errorMessage, java.util.function.BooleanSupplier conditionSupplier)
Blocking method that waits until conditionSupplier returns true, or if it does not before the specified timeout, throws an AssertionError with the specified error message.
timeout - the timeout duration
errorMessage - the error message
conditionSupplier - condition to break out of the wait loop
Throws: java.lang.AssertionError
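
The contract of the static await method (wait until the condition is true, otherwise fail with an AssertionError after the timeout) can be sketched as a simple polling loop. This is an illustrative JDK-only sketch, not Reactor's actual implementation; the class name `AwaitSketch` and the 10 ms polling interval are assumptions.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

class AwaitSketch {

    // Hypothetical helper mirroring the documented contract; not Reactor's code.
    static void await(Duration timeout, String errorMessage, BooleanSupplier conditionSupplier) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!conditionSupplier.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                // Timeout elapsed before the condition became true.
                throw new AssertionError(errorMessage);
            }
            try {
                Thread.sleep(10); // polling interval is an arbitrary choice for this sketch
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // The condition becomes true after roughly 50 ms, well before the 1 s timeout.
        await(Duration.ofSeconds(1), "condition not met in time",
              () -> System.currentTimeMillis() - start >= 50);
        System.out.println("condition met");
    }
}
```

A Supplier-based variant would differ only in building the message lazily, at the moment the timeout is detected.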

public static <T> TestSubscriber<T> create()
Create a new TestSubscriber that requests an unbounded number of elements.
Be sure at least one publisher has subscribed to it via Publisher.subscribe(Subscriber) before using the assert methods.
T - the observed value type
See Also: subscribe(Publisher)

public static <T> TestSubscriber<T> create(long n)
Create a new TestSubscriber that initially requests n elements. You can then manage the demand with Subscription.request(long).
Be sure at least one publisher has subscribed to it via Publisher.subscribe(Subscriber) before using the assert methods.
T - the observed value type
n - number of elements to request (can be 0 if you want no initial demand)
See Also: subscribe(Publisher, long)
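
To make the idea of initial demand concrete, here is a minimal JDK-only sketch (Java 9+ `java.util.concurrent.Flow`) of a recording subscriber that requests n elements on subscription and then receives more only when request(long) is called. `DemandSketch`, `RecordingSubscriber` and `range` are hypothetical names introduced for illustration; the sketch mirrors the behaviour described above and is not TestSubscriber's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class DemandSketch {

    // Hypothetical subscriber: requests initialDemand on subscribe and stores received values.
    static final class RecordingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> values = new ArrayList<>();
        final long initialDemand;
        Flow.Subscription subscription;
        boolean completed;

        RecordingSubscriber(long initialDemand) { this.initialDemand = initialDemand; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            if (initialDemand > 0) s.request(initialDemand); // 0 means no initial demand
        }
        @Override public void onNext(T item) { values.add(item); }
        @Override public void onError(Throwable t) { throw new AssertionError(t); }
        @Override public void onComplete() { completed = true; }
    }

    // Simplified synchronous publisher emitting 1..count and honouring demand.
    // It skips the validation of non-positive requests that a full implementation needs.
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = 1;
            boolean done;

            @Override public void request(long n) {
                while (n-- > 0 && !done && next <= count) {
                    subscriber.onNext(next++);
                }
                if (!done && next > count) {
                    done = true;
                    subscriber.onComplete();
                }
            }
            @Override public void cancel() { done = true; }
        });
    }

    public static void main(String[] args) {
        RecordingSubscriber<Integer> ts = new RecordingSubscriber<>(0);
        range(3).subscribe(ts);
        System.out.println(ts.values); // [] because nothing was requested yet
        ts.subscription.request(2);
        System.out.println(ts.values); // [1, 2]
        ts.subscription.request(1);
        System.out.println(ts.values + " completed=" + ts.completed); // [1, 2, 3] completed=true
    }
}
```

Starting with zero demand is useful when a test needs to check that a publisher emits nothing until it is asked to.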

public static <T> TestSubscriber<T> subscribe(org.reactivestreams.Publisher<T> publisher)
Create a new TestSubscriber that requests an unbounded number of elements, and subscribe it to the specified publisher.
T - the observed value type
publisher - the publisher to subscribe to

public static <T> TestSubscriber<T> subscribe(org.reactivestreams.Publisher<T> publisher, long n)
Create a new TestSubscriber that initially requests n elements, and subscribe it to the specified publisher. You can then manage the demand with Subscription.request(long).
T - the observed value type
publisher - the publisher to subscribe to
n - number of elements to request (can be 0 if you want no initial demand)

public final TestSubscriber<T> configureValuesStorage(boolean enabled)
enabled - enable value storage?

public final TestSubscriber<T> configureValuesTimeout(java.time.Duration timeout)
timeout - the new default value timeout duration

public final int establishedFusionMode()
public final TestSubscriber<T> assertComplete()
public final TestSubscriber<T> assertContainValues(java.util.Set<? extends T> expectedValues)
expectedValues - the values to assert
See Also: configureValuesStorage(boolean)

public final TestSubscriber<T> assertError()

public final TestSubscriber<T> assertError(java.lang.Class<? extends java.lang.Throwable> clazz)
clazz - the class of the exception contained in the error signal

public final TestSubscriber<T> assertErrorMessage(java.lang.String message)

public final TestSubscriber<T> assertErrorWith(java.util.function.Consumer<? super java.lang.Throwable> expectation)
expectation - a method that can verify the exception contained in the error signal and throw an exception (like an AssertionError) if the exception is not valid

public final TestSubscriber<T> assertFuseableSource()
public final TestSubscriber<T> assertFusionEnabled()
public final TestSubscriber<T> assertFusionMode(int expectedMode)
public final TestSubscriber<T> assertFusionRejected()
public final TestSubscriber<T> assertNoError()
public final TestSubscriber<T> assertNoValues()
public final TestSubscriber<T> assertNonFuseableSource()
public final TestSubscriber<T> assertNotComplete()
public final TestSubscriber<T> assertNotSubscribed()
public final TestSubscriber<T> assertNotTerminated()
public final TestSubscriber<T> assertSubscribed()
public final TestSubscriber<T> assertTerminated()
public final TestSubscriber<T> assertValueCount(long n)
Assert n values have been received.
n - the expected value count

public final TestSubscriber<T> assertValueSequence(java.lang.Iterable<? extends T> expectedSequence)
Assert the specified values have been received in the same order read by the passed Iterable. Values storage should be enabled to use this method.
expectedSequence - the values to assert
See Also: configureValuesStorage(boolean)

@SafeVarargs public final TestSubscriber<T> assertValues(T... expectedValues)
expectedValues - the values to assert
See Also: configureValuesStorage(boolean)

@SafeVarargs public final TestSubscriber<T> assertValuesWith(java.util.function.Consumer<T>... expectations)
expectations - one or more methods that can verify the values and throw an exception (like an AssertionError) if the value is not valid
See Also: configureValuesStorage(boolean)
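
The Consumer-based expectations described above can be pictured as one check per received value, applied in order. The sketch below is a JDK-only illustration with hypothetical names (`ExpectationSketch` and its `assertValuesWith` stand-in). The count check before the per-value checks is an assumption about reasonable behaviour, not a statement about Reactor's code.

```java
import java.util.List;
import java.util.function.Consumer;

class ExpectationSketch {

    // Hypothetical stand-in: applies the i-th expectation to the i-th received value.
    @SafeVarargs
    static <T> void assertValuesWith(List<T> received, Consumer<T>... expectations) {
        if (received.size() != expectations.length) {
            throw new AssertionError("expected " + expectations.length
                    + " values but received " + received.size());
        }
        for (int i = 0; i < expectations.length; i++) {
            // An expectation signals failure by throwing, typically an AssertionError.
            expectations[i].accept(received.get(i));
        }
    }

    public static void main(String[] args) {
        List<String> received = List.of("ABC", "DEF");
        assertValuesWith(received,
                v -> { if (!v.startsWith("A")) throw new AssertionError("unexpected first value: " + v); },
                v -> { if (v.length() != 3) throw new AssertionError("unexpected length: " + v); });
        System.out.println("all expectations passed");
    }
}
```

Compared with assertValues, this style lets a test check a property of each value (a prefix, a range, a field) instead of requiring full equality.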
public final TestSubscriber<T> await()
public final TestSubscriber<T> await(java.time.Duration timeout)
timeout - the timeout value

public final TestSubscriber<T> awaitAndAssertNextValueCount(long n)
Blocking method that waits until n next values have been received.
n - the value count to assert

@SafeVarargs public final TestSubscriber<T> awaitAndAssertNextValues(T... values)
Blocking method that waits until n next values have been received (n is the number of values provided) to assert them.
values - the values to assert

@SafeVarargs public final TestSubscriber<T> awaitAndAssertNextValuesWith(java.util.function.Consumer<T>... expectations)
Blocking method that waits until n next values have been received (n is the number of expectations provided) to assert them.
expectations - one or more methods that can verify the values and throw an exception (like an AssertionError) if the value is not valid

public void cancel()
Specified by: cancel in interface org.reactivestreams.Subscription

public final boolean isCancelled()
Specified by: isCancelled in interface Trackable

public final boolean isStarted()
Description copied from interface Trackable: Has this upstream started or "onSubscribed"?

public final boolean isTerminated()
Description copied from interface Trackable: Has this upstream finished or "completed" / "failed"?
Specified by: isTerminated in interface Trackable

public void onComplete()
Specified by: onComplete in interface org.reactivestreams.Subscriber<T>

public void onError(java.lang.Throwable t)
Specified by: onError in interface org.reactivestreams.Subscriber<T>

public void onSubscribe(org.reactivestreams.Subscription s)
Specified by: onSubscribe in interface org.reactivestreams.Subscriber<T>

public void request(long n)
Specified by: request in interface org.reactivestreams.Subscription

public final long requestedFromDownstream()
Description copied from interface Trackable: Return defined element capacity, used to drive new Subscription request needs. This is the maximum in-flight data allowed to transit to this element.
Specified by: requestedFromDownstream in interface Trackable

public final TestSubscriber<T> requestedFusionMode(int requestMode)
requestMode - the mode to request, see Fuseable constants

public org.reactivestreams.Subscription upstream()
Description copied from interface Receiver: Return the direct source of data. Supports reference.

protected final void normalRequest(long n)
protected final void requestDeferred()
protected final boolean set(org.reactivestreams.Subscription s)
s -

protected final boolean setWithoutRequesting(org.reactivestreams.Subscription s)
s - the Subscription to set

protected final void assertionError(java.lang.String message, java.lang.Throwable cause)
message - the message
cause - the optional Throwable cause
Throws: java.lang.AssertionError - as expected

protected final java.lang.String fusionModeName(int mode)
protected final java.lang.String valueAndClass(java.lang.Object o)