
Whether you have written a simple chain of Reactor operators or your own operator, automated testing is always a good idea.

Reactor comes with a few elements dedicated to testing, gathered into their own artifact: reactor-test. You can find that project on Github, inside of the reactor-core repository.

To use it in your tests, you must add it as a test dependency. The following example shows how to add reactor-test as a dependency in Maven:

reactor-test in Maven, in <dependencies>
1 If you use the BOM, you do not need to specify a <version>.

The following example shows how to add reactor-test as a dependency in Gradle:

reactor-test in Gradle, amend the dependencies block
dependencies {
   testCompile 'io.projectreactor:reactor-test'

The three main uses of reactor-test are as follows:

  • Testing that a sequence follows a given scenario, step-by-step, with StepVerifier.

  • Producing data in order to test the behavior of downstream operators (including you own operators) with TestPublisher.

  • In sequences that can go through several alternative Publisher (for example, a chain that uses switchIfEmpty, probing such a Publisher to ensure it was used (that is, subscribed to).

1. Testing a Scenario with StepVerifier

The most common case for testing a Reactor sequence is to have a Flux or a Mono defined in your code (for example, it might be returned by a method) and to want to test how it behaves when subscribed to.

This situation translates well to defining a “test scenario,” where you define your expectations in terms of events, step-by-step. You can ask and answer questions such as the following:

  • What is the next expected event?

  • Do you expect the Flux to emit a particular value?

  • Or maybe to do nothing for the next 300ms?

You can express all of that through the StepVerifier API.

For instance, you could have the following utility method in your codebase that decorates a Flux:

public <T> Flux<T> appendBoomError(Flux<T> source) {
  return source.concatWith(Mono.error(new IllegalArgumentException("boom")));

In order to test it, you want to verify the following scenario:

I expect this Flux to first emit thing1, then emit thing2, and then produce an error with the message, boom. Subscribe and verify these expectations.

In the StepVerifier API, this translates to the following test:

public void testAppendBoomError() {
  Flux<String> source = Flux.just("thing1", "thing2"); (1)

  StepVerifier.create( (2)
    appendBoomError(source)) (3)
    .expectNext("thing1") (4)
    .expectErrorMessage("boom") (5)
    .verify(); (6)
1 Since our method needs a source Flux, define a simple one for testing purposes.
2 Create a StepVerifier builder that wraps and verifies a Flux.
3 Pass the Flux to be tested (the result of calling our utility method).
4 The first signal we expect to happen upon subscription is an onNext, with a value of thing1.
5 The last signal we expect to happen is a termination of the sequence with an onError. The exception should have boom as a message.
6 It is important to trigger the test by calling verify().

The API is a builder. You start by creating a StepVerifier and passing the sequence to be tested. This offers a choice of methods that let you:

  • Express expectations about the next signals to occur. If any other signal is received (or the content of the signal does not match the expectation), the whole test fails with a meaningful AssertionError. For example, you might use expectNext(T…​) and expectNextCount(long).

  • Consume the next signal. This is used when you want to skip part of the sequence or when you want to apply a custom assertion on the content of the signal (for example, to check that there is an onNext event and assert that the emitted item is a list of size 5). For example, you might use consumeNextWith(Consumer<T>).

  • Take miscellaneous actions such as pausing or running arbitrary code. For example, if you want to manipulate a test-specific state or context. To that effect, you might use thenAwait(Duration) and then(Runnable).

For terminal events, the corresponding expectation methods (expectComplete() and expectError() and all their variants) switch to an API where you cannot express expectations anymore. In that last step, all you can do is perform some additional configuration on the StepVerifier and then trigger the verification, often with verify() or one of its variants.

What happens at this point is that the StepVerifier subscribes to the tested Flux or Mono and plays the sequence, comparing each new signal with the next step in the scenario. As long as these match, the test is considered a success. As soon as there is a discrepancy, an AssertionError is thrown.

Remember the verify() step, which triggers the verification. To help, the API includes a few shortcut methods that combine the terminal expectations with a call to verify(): verifyComplete(), verifyError(), verifyErrorMessage(String), and others.

Note that, if one of the lambda-based expectations throws an AssertionError, it is reported as is, failing the test. This is useful for custom assertions.

By default, the verify() method and derived shortcut methods (verifyThenAssertThat, verifyComplete(), and so on) have no timeout. They can block indefinitely. You can use StepVerifier.setDefaultTimeout(Duration) to globally set a timeout for these methods, or specify one on a per-call basis with verify(Duration).

1.1. Better Identifying Test Failures

StepVerifier provides two options to better identify exactly which expectation step caused a test to fail:

  • as(String): Used after most expect* methods to give a description to the preceding expectation. If the expectation fails, its error message contains the description. Terminal expectations and verify cannot be described that way.

  • StepVerifierOptions.create().scenarioName(String): By using StepVerifierOptions to create your StepVerifier, you can use the scenarioName method to give the whole scenario a name, which is also used in assertion error messages.

Note that, in both cases, the use of the description or name in messages is only guaranteed for StepVerifier methods that produce their own AssertionError (for example, throwing an exception manually or through an assertion library in assertNext does not add the description or name to the error’s message).

2. Manipulating Time

You can use StepVerifier with time-based operators to avoid long run times for corresponding tests. You can do so through the StepVerifier.withVirtualTime builder.

It looks like the following example:

StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
//... continue expectations here

This virtual time feature plugs in a custom Scheduler in Reactor’s Schedulers factory. Since these timed operators usually use the default Schedulers.parallel() scheduler, replacing it with a VirtualTimeScheduler does the trick. However, an important prerequisite is that the operator be instantiated after the virtual time scheduler has been activated.

To increase the chances that this happens correctly, the StepVerifier does not take a simple Flux as input. withVirtualTime takes a Supplier, which guides you into lazily creating the instance of the tested flux after having done the scheduler set up.

Take extra care to ensure the Supplier<Publisher<T>> can be used in a lazy fashion. Otherwise, virtual time is not guaranteed. Especially avoid instantiating the Flux earlier in the test code and having the Supplier return that variable. Instead, always instantiate the Flux inside the lambda.

There are two expectation methods that deal with time, and they are both valid with or without virtual time:

  • thenAwait(Duration): Pauses the evaluation of steps (allowing a few signals to occur or delays to run out).

  • expectNoEvent(Duration): Also lets the sequence play out for a given duration but fails the test if any signal occurs during that time.

Both methods pause the thread for the given duration in classic mode and advance the virtual clock instead in virtual mode.

expectNoEvent also considers the subscription as an event. If you use it as a first step, it usually fails because the subscription signal is detected. Use expectSubscription().expectNoEvent(duration) instead.

In order to quickly evaluate the behavior of our Mono.delay above, we can finish writing our code as follows:

StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
    .expectSubscription() (1)
    .expectNoEvent(Duration.ofDays(1)) (2)
    .expectNext(0L) (3)
    .verifyComplete(); (4)
1 See the preceding tip.
2 Expect nothing to happen for a full day.
3 Then expect a delay that emits 0.
4 Then expect completion (and trigger the verification).

We could have used thenAwait(Duration.ofDays(1)) above, but expectNoEvent has the benefit of guaranteeing that nothing happened earlier than it should have.

Note that verify() returns a Duration value. This is the real-time duration of the entire test.

Virtual time is not a silver bullet. All Schedulers are replaced with the same VirtualTimeScheduler. In some cases, you can lock the verification process because the virtual clock has not moved forward before an expectation is expressed, resulting in the expectation waiting on data that can only be produced by advancing time. In most cases, you need to advance the virtual clock for sequences to emit. Virtual time also gets very limited with infinite sequences, which might hog the thread on which both the sequence and its verification run.

3. Performing Post-execution Assertions with StepVerifier

After having described the final expectation of your scenario, you can switch to a complementary assertion API instead of triggering verify(). To do so, use verifyThenAssertThat() instead.

verifyThenAssertThat() returns a StepVerifier.Assertions object, which you can use to assert a few elements of state once the whole scenario has played out successfully (because it also calls verify()). Typical (albeit advanced) usage is to capture elements that have been dropped by some operator and assert them (see the section on Hooks).

4. Testing the Context

For more information about the Context, see Adding a Context to a Reactive Sequence.

StepVerifier comes with a couple of expectations around the propagation of a Context:

  • expectAccessibleContext: Returns a ContextExpectations object that you can use to set up expectations on the propagated Context. Be sure to call then() to return to the set of sequence expectations.

  • expectNoAccessibleContext: Sets up an expectation that NO Context can be propagated up the chain of operators under test. This most likely occurs when the Publisher under test is not a Reactor one or does not have any operator that can propagate the Context (for example, a generator source).

Additionally, you can associate a test-specific initial Context to a StepVerifier by using StepVerifierOptions to create the verifier.

These features are demonstrated in the following snippet:

StepVerifier.create(Mono.just(1).map(i -> i + 10),
				StepVerifierOptions.create().withInitialContext(Context.of("thing1", "thing2"))) (1)
		            .expectAccessibleContext() (2)
		            .contains("thing1", "thing2") (3)
		            .then() (4)
		            .verifyComplete(); (5)
1 Create the StepVerifier by using StepVerifierOptions and pass in an initial Context
2 Start setting up expectations about Context propagation. This alone ensures that a Context was propagated.
3 An example of a Context-specific expectation. It must contain value "thing2" for key "thing1".
4 We then() switch back to setting up normal expectations on the data.
5 Let us not forget to verify() the whole set of expectations.

5. Manually Emitting with TestPublisher

For more advanced test cases, it might be useful to have complete mastery over the source of data, to trigger finely chosen signals that closely match the particular situation you want to test.

Another situation is when you have implemented your own operator and you want to verify how it behaves with regards to the Reactive Streams specification, especially if its source is not well behaved.

For both cases, reactor-test offers the TestPublisher class. This is a Publisher<T> that lets you programmatically trigger various signals:

  • next(T) and next(T, T…​) triggers 1-n onNext signals.

  • emit(T…​) triggers 1-n onNext signals and does complete().

  • complete() terminates with an onComplete signal.

  • error(Throwable) terminates with an onError signal.

You can get a well behaved TestPublisher through the create factory method. Also, you can create a misbehaving TestPublisher by using the createNonCompliant factory method. The latter takes a value or multiple values from the TestPublisher.Violation enum. The values define which parts of the specification the publisher can overlook. These enum values include:

  • REQUEST_OVERFLOW: Allows next calls to be made despite an insufficient request, without triggering an IllegalStateException.

  • ALLOW_NULL: Allows next calls to be made with a null value without triggering a NullPointerException.

  • CLEANUP_ON_TERMINATE: Allows termination signals to be sent several times in a row. This includes complete(), error(), and emit().

  • DEFER_CANCELLATION: Allows the TestPublisher to ignore cancellation signals and continue emitting signals as if the cancellation lost the race against said signals.

Finally, the TestPublisher keeps track of internal state after subscription, which can be asserted through its various assert* methods.

You can use it as a Flux or Mono by using the conversion methods, flux() and mono().

6. Checking the Execution Path with PublisherProbe

When building complex chains of operators, you could come across cases where there are several possible execution paths, materialized by distinct sub-sequences.

Most of the time, these sub-sequences produce a specific-enough onNext signal that you can assert that it was executed by looking at the end result.

For instance, consider the following method, which builds a chain of operators from a source and uses a switchIfEmpty to fall back to a particular alternative if the source is empty:

public Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
    return source
            .flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))

You can test which logical branch of the switchIfEmpty was used, as follows:

public void testSplitPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.just("just a  phrase with    tabs!"),
                .expectNext("just", "a", "phrase", "with", "tabs!")

public void testEmptyPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.empty(), Mono.just("EMPTY_PHRASE")))

However, think about an example where the method produces a Mono<Void> instead. It waits for the source to complete, performs an additional task, and completes. If the source is empty, a fallback Runnable-like task must be performed instead. The following example shows such a case:

private Mono<String> executeCommand(String command) {
    return Mono.just(command + " DONE");

public Mono<Void> processOrFallback(Mono<String> commandSource, Mono<Void> doWhenEmpty) {
    return commandSource
            .flatMap(command -> executeCommand(command).then()) (1)
            .switchIfEmpty(doWhenEmpty); (2)
1 then() forgets about the command result. It cares only that it was completed.
2 How to distinguish between two cases that are both empty sequences?

To verify that your processOrFallback method does indeed go through the doWhenEmpty path, you need to write a bit of boilerplate. Namely you need a Mono<Void> that:

  • Captures the fact that it has been subscribed to.

  • Lets you assert that fact after the whole process has terminated.

Before version 3.1, you would need to manually maintain one AtomicBoolean per state you wanted to assert and attach a corresponding doOn* callback to the publisher you wanted to evaluate. This could be a lot of boilerplate when having to apply this pattern regularly. Fortunately, 3.1.0 introduced an alternative with PublisherProbe. The following example shows how to use it:

public void testCommandEmptyPathIsUsed() {
    PublisherProbe<Void> probe = PublisherProbe.empty(); (1)

    StepVerifier.create(processOrFallback(Mono.empty(), probe.mono())) (2)

    probe.assertWasSubscribed(); (3)
    probe.assertWasRequested(); (4)
    probe.assertWasNotCancelled(); (5)
1 Create a probe that translates to an empty sequence.
2 Use the probe in place of Mono<Void> by calling probe.mono().
3 After completion of the sequence, the probe lets you assert that it was used. You can check that is was subscribed to…​
4 …​as well as actually requested data…​
5 …​and whether or not it was cancelled.

You can also use the probe in place of a Flux<T> by calling .flux() instead of .mono(). For cases where you need to probe an execution path but also need the probe to emit data, you can wrap any Publisher<T> by using PublisherProbe.of(Publisher).