Testing
Whether you have written a simple chain of Reactor operators or your own operator, automated testing is always a good idea.
Reactor comes with a few elements dedicated to testing, gathered into their own
artifact: reactor-test. You can find that project on GitHub, inside of the
reactor-core repository.
To use it in your tests, you must add it as a test dependency.
The following example shows how to add reactor-test
as a dependency in Maven:
<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope> (1)
    </dependency>
</dependencies>
1 | If you use the BOM, you do not need to specify a <version>. |
The following example shows how to add reactor-test
as a dependency in Gradle:
dependencies {
    testImplementation 'io.projectreactor:reactor-test'
}
The three main uses of reactor-test are as follows:
- Testing that a sequence follows a given scenario, step-by-step, with StepVerifier.
- Producing data in order to test the behavior of downstream operators (including your own operators) with TestPublisher.
- In sequences that can go through several alternative Publishers (for example, a chain that uses switchIfEmpty), probing such a Publisher to ensure it was used (that is, subscribed to) with PublisherProbe.
1. Testing a Scenario with StepVerifier
The most common case for testing a Reactor sequence is to have a Flux
or a Mono
defined
in your code (for example, it might be returned by a method) and to want to test how it
behaves when subscribed to.
This situation translates well to defining a “test scenario,” where you define your expectations in terms of events, step-by-step. You can ask and answer questions such as the following:
- What is the next expected event?
- Do you expect the Flux to emit a particular value?
- Or maybe to do nothing for the next 300ms?
You can express all of that through the StepVerifier
API.
For instance, you could have the following utility method in your codebase that
decorates a Flux:
public <T> Flux<T> appendBoomError(Flux<T> source) {
    return source.concatWith(Mono.error(new IllegalArgumentException("boom")));
}
In order to test it, you want to verify the following scenario:
I expect this Flux to first emit thing1, then emit thing2, and then produce an error with the message, boom. Subscribe and verify these expectations.
In the StepVerifier
API, this translates to the following test:
@Test
public void testAppendBoomError() {
    Flux<String> source = Flux.just("thing1", "thing2"); (1)
    StepVerifier.create( (2)
            appendBoomError(source)) (3)
        .expectNext("thing1") (4)
        .expectNext("thing2")
        .expectErrorMessage("boom") (5)
        .verify(); (6)
}
1 | Since our method needs a source Flux, define a simple one for testing purposes. |
2 | Create a StepVerifier builder that wraps and verifies a Flux. |
3 | Pass the Flux to be tested (the result of calling our utility method). |
4 | The first signal we expect to happen upon subscription is an onNext, with a value of thing1. |
5 | The last signal we expect to happen is a termination of the sequence with an onError. The exception should have boom as a message. |
6 | It is important to trigger the test by calling verify(). |
The API is a builder. You start by creating a StepVerifier and passing the
sequence to be tested. This offers a choice of methods that let you:
- Express expectations about the next signals to occur. If any other signal is received (or the content of the signal does not match the expectation), the whole test fails with a meaningful AssertionError. For example, you might use expectNext(T…) and expectNextCount(long).
- Consume the next signal. This is used when you want to skip part of the sequence or when you want to apply a custom assertion on the content of the signal (for example, to check that there is an onNext event and assert that the emitted item is a list of size 5). For example, you might use consumeNextWith(Consumer<T>).
- Take miscellaneous actions such as pausing or running arbitrary code (for example, when you want to manipulate test-specific state or context). To that effect, you might use thenAwait(Duration) and then(Runnable).
For terminal events, the corresponding expectation methods (expectComplete() and
expectError() and all their variants) switch to an API where you cannot express
expectations anymore. In that last step, all you can do is perform some additional
configuration on the StepVerifier and then trigger the verification, often
with verify() or one of its variants.
What happens at this point is that the StepVerifier
subscribes to the tested Flux
or
Mono
and plays the sequence, comparing each new signal with the next step in the
scenario. As long as these match, the test is considered a success. As soon as there is a
discrepancy, an AssertionError
is thrown.
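As a rough illustration of the three kinds of steps described above, the following sketch chains an exact-value expectation, a consuming step with a custom assertion, an arbitrary action, and a counting expectation before the terminal step. The class name, the method name, and the buffered source are made up for this example and are not part of reactor-test.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

// Hypothetical sketch class: not part of reactor-test.
class StepKindsSketch {

    // Plays the scenario and returns the real time the verification took.
    static Duration verifyBufferedRange() {
        // Assumed source for the example: 20 integers grouped into 4 lists of 5.
        Flux<List<Integer>> source = Flux.range(1, 20).buffer(5);

        return StepVerifier.create(source)
            // Expectation on the exact value of the next signal.
            .expectNext(Arrays.asList(1, 2, 3, 4, 5))
            // Consume the next signal and apply a custom assertion to it.
            .consumeNextWith(list -> {
                if (list.size() != 5) {
                    throw new AssertionError("expected a list of size 5 but got " + list);
                }
            })
            // Miscellaneous action: run arbitrary code between two steps.
            .then(() -> System.out.println("first two buffers verified"))
            // Skip the remaining two buffers by counting them.
            .expectNextCount(2)
            // Terminal expectation, after which only verification is possible.
            .expectComplete()
            .verify();
    }
}
```

Because the custom assertion inside consumeNextWith throws an AssertionError itself, a failure there would be reported as is.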
Remember the verify() step, which triggers the verification. To
help, the API includes a few shortcut methods that combine the terminal expectations with
a call to verify(): verifyComplete(), verifyError(), verifyErrorMessage(String),
and others.
Note that, if one of the lambda-based expectations throws an AssertionError, it is
reported as is, failing the test. This is useful for custom assertions.
By default, the verify() method and derived shortcut methods (verifyThenAssertThat(),
verifyComplete(), and so on) have no timeout. They can block indefinitely. You can use
StepVerifier.setDefaultTimeout(Duration) to globally set a timeout for these methods,
or specify one on a per-call basis with verify(Duration).
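The per-call timeout can be sketched as follows. The example assumes a source that never emits (Flux.never()), so an unbounded verify() would block forever, whereas verify(Duration) gives up and fails. The class and method names are hypothetical.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

// Hypothetical sketch class: not part of reactor-test.
class VerifyTimeoutSketch {

    // Returns true if the bounded verification gave up with an AssertionError.
    static boolean timesOutOnSilentSource() {
        try {
            StepVerifier.create(Flux.<String>never()) // never emits any signal
                .expectNext("thing1")
                .expectComplete()
                .verify(Duration.ofMillis(200)); // per-call timeout
            return false;
        } catch (AssertionError timeout) {
            // The verification did not finish within 200ms.
            return true;
        }
    }
}
```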
1.1. Better Identifying Test Failures
StepVerifier provides two options to better identify exactly which expectation step caused
a test to fail:
- as(String): Used after most expect* methods to give a description to the preceding expectation. If the expectation fails, its error message contains the description. Terminal expectations and verify cannot be described that way.
- StepVerifierOptions.create().scenarioName(String): By using StepVerifierOptions to create your StepVerifier, you can use the scenarioName method to give the whole scenario a name, which is also used in assertion error messages.
Note that, in both cases, the use of the description or name in messages is only guaranteed for
StepVerifier
methods that produce their own AssertionError
(for example, throwing an exception
manually or through an assertion library in assertNext
does not add the description or name to
the error’s message).
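A minimal sketch of both options, assuming a scenario that is wrong on purpose so that the failure message can be inspected. The scenario name, the step descriptions, and the class name are invented for the example.

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.test.StepVerifierOptions;

// Hypothetical sketch class: not part of reactor-test.
class FailureDescriptionSketch {

    // Runs a failing scenario and returns its failure message (or null if it passed).
    static String describeFailure() {
        try {
            StepVerifier.create(Flux.just("thing1", "thing2"),
                    StepVerifierOptions.create().scenarioName("two things scenario"))
                .expectNext("thing1").as("first item")
                .expectNext("thing3").as("second item") // wrong on purpose
                .verifyComplete();
            return null;
        } catch (AssertionError failure) {
            return failure.getMessage();
        }
    }
}
```

The returned message should mention both the scenario name and the description of the step that failed, which makes it easier to locate the failing step in a long scenario.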
2. Manipulating Time
You can use StepVerifier
with time-based operators to avoid long run times for
corresponding tests. You can do so through the StepVerifier.withVirtualTime
builder.
It looks like the following example:
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
//... continue expectations here
This virtual time feature plugs in a custom Scheduler
in Reactor’s Schedulers
factory. Since these timed operators usually use the default Schedulers.parallel()
scheduler, replacing it with a VirtualTimeScheduler
does the trick. However, an
important prerequisite is that the operator be instantiated after the virtual time
scheduler has been activated.
To increase the chances that this happens correctly, the StepVerifier does not take
a simple Flux as input. withVirtualTime takes a Supplier, which guides you into lazily
creating the instance of the tested flux after having done the scheduler set up.
Take extra care to ensure the Supplier<Publisher<T>> can be used in a lazy
fashion. Otherwise, virtual time is not guaranteed. Especially avoid instantiating the
Flux earlier in the test code and having the Supplier return that variable. Instead,
always instantiate the Flux inside the lambda.
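One way to keep the instantiation lazy is to pass a method reference (or a lambda) that builds the sequence, as in the sketch below. The delayedGreeting method stands in for whatever code you want to test; it and the class name are hypothetical.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

// Hypothetical sketch class: not part of reactor-test.
class LazySupplierSketch {

    // Hypothetical method under test: builds a delayed Mono each time it is called.
    static Mono<String> delayedGreeting() {
        return Mono.delay(Duration.ofHours(2)).map(tick -> "hello");
    }

    static Duration verifyDelayedGreeting() {
        // The method reference is only invoked by withVirtualTime, so Mono.delay
        // is instantiated after the virtual time scheduler has been set up.
        return StepVerifier.withVirtualTime(LazySupplierSketch::delayedGreeting)
            .expectSubscription()
            .expectNoEvent(Duration.ofHours(2))
            .expectNext("hello")
            .verifyComplete();
    }
}
```

By contrast, assigning delayedGreeting() to a local variable first and returning that variable from the Supplier would create the Mono.delay before the virtual clock is installed.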
There are two expectation methods that deal with time, and they are both valid with or without virtual time:
- thenAwait(Duration): Pauses the evaluation of steps (allowing a few signals to occur or delays to run out).
- expectNoEvent(Duration): Also lets the sequence play out for a given duration but fails the test if any signal occurs during that time.
Both methods pause the thread for the given duration in classic mode and advance the virtual clock instead in virtual mode.
expectNoEvent also considers the subscription as an event. If you use it as a
first step, it usually fails because the subscription signal is detected. Use
expectSubscription().expectNoEvent(duration) instead.
In order to quickly evaluate the behavior of our Mono.delay
above, we can finish
writing our code as follows:
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
    .expectSubscription() (1)
    .expectNoEvent(Duration.ofDays(1)) (2)
    .expectNext(0L) (3)
    .verifyComplete(); (4)
1 | See the preceding tip. |
2 | Expect nothing to happen for a full day. |
3 | Then expect a delay that emits 0. |
4 | Then expect completion (and trigger the verification). |
We could have used thenAwait(Duration.ofDays(1))
above, but expectNoEvent
has the
benefit of guaranteeing that nothing happened earlier than it should have.
Note that verify()
returns a Duration
value. This is the real-time duration of the
entire test.
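The following sketch uses thenAwait with virtual time and returns the Duration reported by the verification. The hourly interval and the class name are assumptions made for the example.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

// Hypothetical sketch class: not part of reactor-test.
class VirtualTicksSketch {

    // Returns the real-time duration of a scenario that covers three virtual hours.
    static Duration verifyHourlyTicks() {
        return StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofHours(1)).take(3))
            .expectSubscription()
            // Advance the virtual clock by three hours without checking for events.
            .thenAwait(Duration.ofHours(3))
            // The three ticks emitted during that period are now available.
            .expectNext(0L, 1L, 2L)
            .verifyComplete();
    }
}
```

The returned value measures wall-clock time, so it should be far shorter than the three virtual hours that the scenario spans.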
Virtual time is not a silver bullet. All Schedulers are
replaced with the same VirtualTimeScheduler. In some cases, you can lock the
verification process because the virtual clock has not moved forward before an
expectation is expressed, resulting in the expectation waiting on data that can only be
produced by advancing time. In most cases, you need to advance the virtual clock for
sequences to emit. Virtual time also gets very limited with infinite sequences, which
might hog the thread on which both the sequence and its verification run.
3. Performing Post-execution Assertions with StepVerifier
After having described the final expectation of your scenario, you can switch to a
complementary assertion API instead of triggering verify(). To do so, use
verifyThenAssertThat() instead.
verifyThenAssertThat() returns a StepVerifier.Assertions object, which you can use to
assert a few elements of state once the whole scenario has played out successfully
(because it also calls verify()). Typical (albeit advanced) usage is to capture
elements that have been dropped by some operator and assert them (see the section on
Hooks).
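A sketch of such a post-execution assertion follows. It assumes a hand-written source that breaks the Reactive Streams rules by emitting after completion, so that the map operator drops the late element. The source and the class name are invented for the example.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.test.StepVerifier;

// Hypothetical sketch class: not part of reactor-test.
class DroppedElementsSketch {

    static void verifyDroppedElement() {
        // Assumed misbehaving source: it emits "bar" after having completed.
        Flux<String> misbehaving = Flux.<String>from(subscriber -> {
            subscriber.onSubscribe(Operators.emptySubscription());
            subscriber.onNext("foo");
            subscriber.onComplete();
            subscriber.onNext("bar"); // too late, the sequence is already terminated
        });

        StepVerifier.create(misbehaving.map(String::toUpperCase))
            .expectNext("FOO")
            .expectComplete()
            // Runs the verification, then exposes state captured while it ran.
            .verifyThenAssertThat()
            .hasDroppedElements()
            .hasDropped("bar");
    }
}
```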
4. Testing the Context
For more information about the Context
, see Adding a Context to a Reactive Sequence.
StepVerifier comes with a couple of expectations around the propagation of a Context:
- expectAccessibleContext: Returns a ContextExpectations object that you can use to set up expectations on the propagated Context. Be sure to call then() to return to the set of sequence expectations.
- expectNoAccessibleContext: Sets up an expectation that NO Context can be propagated up the chain of operators under test. This most likely occurs when the Publisher under test is not a Reactor one or does not have any operator that can propagate the Context (for example, a generator source).
Additionally, you can associate a test-specific initial Context
to a StepVerifier
by
using StepVerifierOptions
to create the verifier.
These features are demonstrated in the following snippet:
StepVerifier.create(Mono.just(1).map(i -> i + 10),
        StepVerifierOptions.create().withInitialContext(Context.of("thing1", "thing2"))) (1)
    .expectAccessibleContext() (2)
    .contains("thing1", "thing2") (3)
    .then() (4)
    .expectNext(11)
    .verifyComplete(); (5)
1 | Create the StepVerifier by using StepVerifierOptions and pass in an initial Context. |
2 | Start setting up expectations about Context propagation. This alone ensures that a Context was propagated. |
3 | An example of a Context-specific expectation. It must contain value "thing2" for key "thing1". |
4 | We then() switch back to setting up normal expectations on the data. |
5 | Let us not forget to verify() the whole set of expectations. |
5. Manually Emitting with TestPublisher
For more advanced test cases, it might be useful to have complete mastery over the source of data, to trigger finely chosen signals that closely match the particular situation you want to test.
Another situation is when you have implemented your own operator and you want to verify how it behaves with regard to the Reactive Streams specification, especially if its source is not well behaved.
For both cases, reactor-test
offers the TestPublisher
class. This is a Publisher<T>
that lets you programmatically trigger various signals:
- next(T) and next(T, T…) trigger 1-n onNext signals.
- emit(T…) triggers 1-n onNext signals and does complete().
- complete() terminates with an onComplete signal.
- error(Throwable) terminates with an onError signal.
You can get a well behaved TestPublisher through the create factory method. Also,
you can create a misbehaving TestPublisher by using the createNoncompliant factory
method. The latter takes a value or multiple values from the TestPublisher.Violation
enum. The values define which parts of the specification the publisher can overlook.
These enum values include:
- REQUEST_OVERFLOW: Allows next calls to be made despite an insufficient request, without triggering an IllegalStateException.
- ALLOW_NULL: Allows next calls to be made with a null value without triggering a NullPointerException.
- CLEANUP_ON_TERMINATE: Allows termination signals to be sent several times in a row. This includes complete(), error(), and emit().
- DEFER_CANCELLATION: Allows the TestPublisher to ignore cancellation signals and continue emitting signals as if the cancellation lost the race against said signals.
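To make the REQUEST_OVERFLOW violation concrete, the sketch below subscribes with a demand of one and then pushes two values through a non-compliant TestPublisher. The subscriber, the list used to record values, and the class name are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.test.publisher.TestPublisher;

// Hypothetical sketch class: not part of reactor-test.
class RequestOverflowSketch {

    // Returns the values that reached a subscriber that only asked for one.
    static List<String> emitBeyondDemand() {
        TestPublisher<String> publisher =
            TestPublisher.createNoncompliant(TestPublisher.Violation.REQUEST_OVERFLOW);

        List<String> received = new ArrayList<>();
        publisher.flux().subscribe(new BaseSubscriber<String>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(1); // only one element is requested
            }

            @Override
            protected void hookOnNext(String value) {
                received.add(value);
            }
        });

        publisher.next("a", "b"); // the second value exceeds the demand
        publisher.assertRequestOverflow(); // the publisher recorded the overflow
        return received;
    }
}
```

With a compliant publisher obtained through create, the second value would instead lead to the IllegalStateException mentioned above.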
Finally, the TestPublisher keeps track of internal state after subscription, which can
be asserted through its various assert* methods.
You can use it as a Flux or Mono by using the conversion methods, flux() and mono().
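Put together, a test driven by a well behaved TestPublisher might look like the following sketch. The upper-casing map stands in for the operator chain under test, and the class name is hypothetical.

```java
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

// Hypothetical sketch class: not part of reactor-test.
class TestPublisherSketch {

    static void verifyUppercasing() {
        TestPublisher<String> publisher = TestPublisher.create();

        // The map operator is a stand-in for the downstream chain being tested.
        StepVerifier.create(publisher.flux().map(String::toUpperCase))
            // Signals are triggered from within the scenario, once subscribed.
            .then(() -> publisher.assertSubscribers(1))
            .then(() -> publisher.next("thing1", "thing2"))
            .expectNext("THING1", "THING2")
            .then(() -> publisher.error(new IllegalStateException("boom")))
            .verifyErrorMessage("boom");

        // State tracked by the publisher can be asserted after the fact.
        publisher.assertWasSubscribed();
        publisher.assertNoRequestOverflow();
    }
}
```

Emitting from then(Runnable) steps keeps the signals ordered with respect to the expectations, since nothing is sent before the StepVerifier has subscribed.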
6. Checking the Execution Path with PublisherProbe
When building complex chains of operators, you could come across cases where there are several possible execution paths, materialized by distinct sub-sequences.
Most of the time, these sub-sequences produce a specific-enough onNext
signal
that you can assert that it was executed by looking at the end result.
For instance, consider the following method, which builds a chain of operators from a
source and uses a switchIfEmpty
to fall back to a particular alternative if the source
is empty:
public Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
    return source
        .flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))
        .switchIfEmpty(fallback);
}
You can test which logical branch of the switchIfEmpty was used, as follows:
@Test
public void testSplitPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.just("just a phrase with tabs!"),
            Mono.just("EMPTY_PHRASE")))
        .expectNext("just", "a", "phrase", "with", "tabs!")
        .verifyComplete();
}
@Test
public void testEmptyPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.empty(), Mono.just("EMPTY_PHRASE")))
        .expectNext("EMPTY_PHRASE")
        .verifyComplete();
}
However, think about an example where the method produces a Mono<Void> instead. It waits
for the source to complete, performs an additional task, and completes. If the source
is empty, a fallback Runnable-like task must be performed instead. The following example
shows such a case:
private Mono<String> executeCommand(String command) {
    return Mono.just(command + " DONE");
}
public Mono<Void> processOrFallback(Mono<String> commandSource, Mono<Void> doWhenEmpty) {
    return commandSource
        .flatMap(command -> executeCommand(command).then()) (1)
        .switchIfEmpty(doWhenEmpty); (2)
}
1 | then() forgets about the command result. It cares only that it was completed. |
2 | How to distinguish between two cases that are both empty sequences? |
To verify that your processOrFallback method does indeed go through the doWhenEmpty path,
you need to write a bit of boilerplate. Namely, you need a Mono<Void> that:
- Captures the fact that it has been subscribed to.
- Lets you assert that fact after the whole process has terminated.
Before version 3.1, you would need to manually maintain one AtomicBoolean per state you
wanted to assert and attach a corresponding doOn* callback to the publisher you wanted
to evaluate. This could be a lot of boilerplate when having to apply this pattern
regularly. Fortunately, 3.1.0 introduced an alternative with PublisherProbe. The
following example shows how to use it:
@Test
public void testCommandEmptyPathIsUsed() {
    PublisherProbe<Void> probe = PublisherProbe.empty(); (1)
    StepVerifier.create(processOrFallback(Mono.empty(), probe.mono())) (2)
        .verifyComplete();
    probe.assertWasSubscribed(); (3)
    probe.assertWasRequested(); (4)
    probe.assertWasNotCancelled(); (5)
}
1 | Create a probe that translates to an empty sequence. |
2 | Use the probe in place of Mono<Void> by calling probe.mono(). |
3 | After completion of the sequence, the probe lets you assert that it was used. You can check that it was subscribed to… |
4 | …as well as actually requested data… |
5 | …and whether or not it was cancelled. |
You can also use the probe in place of a Flux<T> by calling .flux() instead of
.mono(). For cases where you need to probe an execution path but also need the
probe to emit data, you can wrap any Publisher<T> by using PublisherProbe.of(Publisher).
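As a sketch of that last case, the example below wraps a data-emitting fallback with PublisherProbe.of and checks both branches of a switchIfEmpty. The processOrFallback method mirrors the Flux-returning one shown earlier and is repeated only to keep the sketch self-contained; the class and test method names are hypothetical.

```java
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.publisher.PublisherProbe;

// Hypothetical sketch class: not part of reactor-test.
class DataProbeSketch {

    static Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
        return source
            .flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))
            .switchIfEmpty(fallback);
    }

    // Empty source: the probed fallback emits its data and records the subscription.
    static boolean fallbackUsedWhenSourceIsEmpty() {
        PublisherProbe<String> probe = PublisherProbe.of(Mono.just("EMPTY_PHRASE"));

        StepVerifier.create(processOrFallback(Mono.empty(), probe.flux()))
            .expectNext("EMPTY_PHRASE")
            .verifyComplete();

        probe.assertWasSubscribed();
        probe.assertWasRequested();
        return probe.wasSubscribed();
    }

    // Non-empty source: the probed fallback must never be subscribed to.
    static boolean fallbackSkippedWhenSourceHasData() {
        PublisherProbe<String> probe = PublisherProbe.of(Mono.just("EMPTY_PHRASE"));

        StepVerifier.create(processOrFallback(Mono.just("a phrase"), probe.flux()))
            .expectNext("a", "phrase")
            .verifyComplete();

        probe.assertWasNotSubscribed();
        return !probe.wasSubscribed();
    }
}
```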