1. FAQ, Best Practices, and "How do I…​?"

This section covers the following content:

1.1. How Do I Wrap a Synchronous, Blocking Call?

It is often the case that a source of information is synchronous and blocking. To deal with such sources in your Reactor applications, apply the following pattern:

Mono blockingWrapper = Mono.fromCallable(() -> { (1)
    return /* make a remote synchronous call */ (2)
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.boundedElastic()); (3)
1 Create a new Mono by using fromCallable.
2 Return the asynchronous, blocking resource.
3 Ensure each subscription happens on a dedicated worker from Schedulers.boundedElastic().

You should use a Mono, because the source returns one value. You should use Schedulers.boundedElastic, because it creates a dedicated thread to wait for the blocking resource without impacting other non-blocking processing, while also ensuring that there is a limit to the amount of threads that can be created, and blocking tasks that can be enqueued and deferred during a spike.

Note that subscribeOn does not subscribe to the Mono. It specifies what kind of Scheduler to use when a subscribe call happens.

Also, note that subscribeOn operator should immediately follow the source and any further operators are defined after the subscribeOn wrapper.

1.2. I Used an Operator on my Flux but it Doesn’t Seem to Apply. What Gives?

Make sure that the variable you .subscribe() to has been affected by the operators you think should have been applied to it.

Reactor operators are decorators. They return a different instance that wraps the source sequence and add behavior. That is why the preferred way of using operators is to chain the calls.

Compare the following two examples:

without chaining (incorrect)
Flux<String> flux = Flux.just("something", "chain");
flux.map(secret -> secret.replaceAll(".", "*")); (1)
flux.subscribe(next -> System.out.println("Received: " + next));
1 The mistake is here. The result is not attached to the flux variable.
without chaining (correct)
Flux<String> flux = Flux.just("something", "chain");
flux = flux.map(secret -> secret.replaceAll(".", "*"));
flux.subscribe(next -> System.out.println("Received: " + next));

The following sample is even better (because it is simpler):

with chaining (best)
Flux.just("something", "chain")
    .map(secret -> secret.replaceAll(".", "*"))
    .subscribe(next -> System.out.println("Received: " + next));

The first version outputs the following:

Received: something
Received: chain

The two other versions output the expected values, as follows:

Received: *********
Received: *****

1.3. My Mono zipWith or zipWhen is never called

Consider the following example:

myMethod.process("a") // this method returns Mono<Void>
        .zipWith(myMethod.process("b"), combinator) //this is never called
        .subscribe();

If the source Mono is either empty or a Mono<Void> (a Mono<Void> is empty for all intents and purposes), some combinations are never called.

This is the typical case for any transformer such as the zip static method or the zipWith zipWhen operators, which (by definition) need an element from each source to produce their output.

Using data-suppressing operators on sources of zip is thus problematic. Examples of data-suppressing operators include then(), thenEmpty(Publisher<Void>), ignoreElements() and ignoreElement(), and when(Publisher…​).

Similarly, operators that use a Function<T,?> to tune their behavior, such as flatMap, need at least one element to be emitted for the Function to have a chance to apply. Applying these on an empty (or <Void>) sequence nevers produce an element.

You can use .defaultIfEmpty(T) and .switchIfEmpty(Publisher<T>) to replace an empty sequence of T with a default value or a fallback Publisher<T> (respectively), which could help avoid some of these situations. Note that this does not apply to Flux<Void>/Mono<Void> sources, as you can only switch to another Publisher<Void>, which is still guaranteed to be empty. The following example uses defaultIfEmpty:

use defaultIfEmpty before zipWhen
myMethod.emptySequenceForKey("a") // this method returns empty Mono<String>
        .defaultIfEmpty("") // this converts empty sequence to just the empty String
        .zipWhen(aString -> myMethod.process("b")) //this is called with the empty String
        .subscribe();

1.4. Using zip along with empty-completed publishers

When using the zip operator along with empty-completed publishers (i.e., publishers completing without emitting an item), it is important to be aware of the following behavior.

Consider the following test case:

    @Test
    public void testZipEmptyCompletionAllSubscribed() {
        AtomicInteger cnt = new AtomicInteger();
        Mono<Integer> mono1 = Mono.create(sink -> {
            cnt.incrementAndGet();
            sink.success();
        });
        Mono<Integer> mono2 = Mono.create(sink -> {
            cnt.incrementAndGet();
            sink.success();
        });
        Mono<Integer> zippedMono = Mono.zip(mono1, mono2, (v1, v2) -> v1);
        zippedMono.subscribe();
        assertEquals(2, cnt.get());
    }

While in this case the resulting zippedMono subscribes to both mono1 and mono2, such behaviour is not guaranteed for all cases. For instance, consider the following test case:

    @Test
    public void testZipEmptyCompletionOneSubscribed() {
        AtomicInteger cnt = new AtomicInteger();
        Mono<Integer> mono1 = Mono.create(sink -> {
            cnt.incrementAndGet();
            sink.success();
        });
        Mono<Integer> mono2 = Mono.create(sink -> {
            cnt.incrementAndGet();
            sink.success();
        });
        Mono<Integer> mono3 = Mono.create(sink -> {
            cnt.incrementAndGet();
            sink.success();
        });
        Mono<Integer> zippedMono = Mono.zip(mono1, Mono.zip(mono2, mono3, (v1, v2) -> v1), (v1, v2) -> v1);
        zippedMono.subscribe();
        assertEquals(1, cnt.get());
    }

In this case upon empty completion of mono1, zippedMono completes immediately and does not subscribe to mono2 and mono3.

Therefore, in cases where zip operator is used to combine empty-completed publishers, it is not guaranteed that the resulting publisher will subscribe to all the empty-completed publishers.

If it is necessary to keep the semantics as shown in the second test case and to ensure subscription to all the publishers to be zipped, consider using singleOptional operator, as demonstrated in the test case below:

@Test
public void testZipOptionalAllSubscribed() {
	AtomicInteger cnt = new AtomicInteger();
	Mono<Integer> mono1 = Mono.create(sink -> {
		cnt.incrementAndGet();
		sink.success();
	});
	Mono<Integer> mono2 = Mono.create(sink -> {
		cnt.incrementAndGet();
		sink.success();
	});
	Mono<Integer> mono3 = Mono.create(sink -> {
		cnt.incrementAndGet();
		sink.success();
	});
	Mono<Optional<Integer>> zippedMono =
			Mono.zip(
					mono1.singleOptional(),
					Mono.zip(mono2.singleOptional(), mono3.singleOptional(), (v1, v2) -> v1),
					(v1, v2) -> v1);
	zippedMono.subscribe();
	assertEquals(3, cnt.get());
}

1.5. How to Use retryWhen to Emulate retry(3)?

The retryWhen operator can be quite complex. Hopefully the following snippet of code can help you understand how it works by attempting to emulate a simpler retry(3):

AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
		Flux.<String>error(new IllegalArgumentException())
				.doOnError(e -> errorCount.incrementAndGet())
				.retryWhen(Retry.from(companion -> (1)
						companion.map(rs -> { (2)
							if (rs.totalRetries() < 3) return rs.totalRetries(); (3)
							else throw Exceptions.propagate(rs.failure()); (4)
						})
				));
1 We customize Retry by adapting from a Function lambda rather than providing a concrete class
2 The companion emits RetrySignal objects, which bear number of retries so far and last failure
3 To allow for three retries, we consider indexes < 3 and return a value to emit (here we simply return the index).
4 In order to terminate the sequence in error, we throw the original exception after these three retries.

1.6. How can I use retryWhen for Exponential Backoff?

Exponential backoff produces retry attempts with a growing delay between each of the attempts, so as not to overload the source systems and risk an all-out crash. The rationale is that, if the source produces an error, it is already in an unstable state and is not likely to immediately recover from it. So blindly retrying immediately is likely to produce yet another error and add to the instability.

Since 3.3.4.RELEASE, Reactor comes with a builder for such a retry, to be used with Flux#retryWhen: Retry.backoff.

The following example showcases a simple use of the builder, with hooks logging message right before and after the retry attempt delays. It delays retries and increases the delay between each attempt (pseudocode: delay = 100ms * 2^attempt_number_starting_at_zero):

AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalStateException("boom"))
		.doOnError(e -> { (1)
			errorCount.incrementAndGet();
			System.out.println(e + " at " + LocalTime.now());
		})
		.retryWhen(Retry
				.backoff(3, Duration.ofMillis(100)).jitter(0d) (2)
				.doAfterRetry(rs -> System.out.println("retried at " + LocalTime.now() + ", attempt " + rs.totalRetries())) (3)
				.onRetryExhaustedThrow((spec, rs) -> rs.failure()) (4)
		);
1 We will log the time of errors emitted by the source and count them.
2 We configure an exponential backoff retry with at most 3 attempts and no jitter.
3 We also log the time at which the retry happens, and the retry attempt number (starting from 0).
4 By default, an Exceptions.retryExhausted exception would be thrown, with the last failure() as a cause. Here we customize that to directly emit the cause as onError.

When subscribed to, this fails and terminates after printing out the following:

java.lang.IllegalStateException: boom at 00:00:00.0
retried at 00:00:00.101, attempt 0 (1)
java.lang.IllegalStateException: boom at 00:00:00.101
retried at 00:00:00.304, attempt 1 (2)
java.lang.IllegalStateException: boom at 00:00:00.304
retried at 00:00:00.702, attempt 2 (3)
java.lang.IllegalStateException: boom at 00:00:00.702
1 First retry after about 100ms
2 Second retry after about 200ms
3 Third retry after about 400ms

1.7. How Do I Ensure Thread Affinity when I Use publishOn()?

As described in Schedulers, publishOn() can be used to switch execution contexts. The publishOn operator influences the threading context where the rest of the operators in the chain below it run, up to a new occurrence of publishOn. So the placement of publishOn is significant.

Consider the following example:

Sinks.Many<Integer> dataSinks = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> source = dataSinks.asFlux();
source.publishOn(scheduler1)
	  .map(i -> transform(i))
	  .publishOn(scheduler2)
	  .doOnNext(i -> processNext(i))
	  .subscribe();

The transform function in map() is run on a worker of scheduler1, and the processNext method in doOnNext() is run on a worker of scheduler2. Each subscription gets its own worker, so all elements pushed to the corresponding subscriber are published on the same Thread.

You can use single-threaded schedulers to ensure thread affinity for different stages in the chain or for different subscribers.

1.8. What Is a Good Pattern for Contextual Logging? (MDC)

Most logging frameworks allow contextual logging, letting users store variables that are reflected in the logging pattern, generally by way of a Map called the MDC ("Mapped Diagnostic Context"). This is one of the most recurring use of ThreadLocal in Java, and as a consequence this pattern assumes that the code being logged is tied in a one-to-one relationship with a Thread.

That might have been a safe assumption to make before Java 8, but with the advent of functional programming elements in the Java language things have changed a bit…​

Let’s take the example of an API that was imperative and used the template method pattern, then switched to a more functional style. With the template method pattern, inheritance was at play. Now in its more functional approach, higher order functions are passed to define the "steps" of the algorithm. Things are now more declarative than imperative, and that frees the library to make decisions about where each step should run. For instance, knowing which steps of the underlying algorithm can be parallelized, the library can use an ExecutorService to execute some of the steps in parallel.

One concrete example of such a functional API is the Stream API introduced in Java 8 and its parallel() flavor. Logging with a MDC in a parallel Stream is not a free lunch: one need to ensure the MDC is captured and reapplied in each step.

The functional style enables such optimizations, because each step is thread-agnostic and referentially transparent, but it can break the MDC assumption of a single Thread. The most idiomatic way of ensuring any kind of contextual information is accessible to all stages would be to pass that context around through the composition chain. During the development of Reactor we encountered the same general class of problem, and we wanted to avoid this very hands-down and explicit approach. This is why the Context was introduced: it propagates through the execution chain as long as Flux and Mono are used as the return value, by letting stages (operators) peek at the Context of their downstream stage. So instead of using ThreadLocal, Reactor offers this map-like object that is tied to a Subscription and not a Thread.

Now that we’ve established that MDC "just working" is not the best assumption to make in a declarative API, how can we perform contextualized log statements in relation to events in a Reactive Stream (onNext, onError, and onComplete)?

This entry of the FAQ offers a possible intermediate solution when one wants to log in relation to these signals in a straightforward and explicit manner. Make sure to read the Adding a Context to a Reactive Sequence section beforehand, and especially how a write must happen towards the bottom of the operator chain for operators above it to see it.

To get contextual information from the Context to the MDC, the simplest way is to wrap logging statements in a doOnEach operator with a little bit of boilerplate code. This boilerplate depends on both the logging framework/abstraction of your choice and the information you want to put in the MDC, so it has to be in your codebase.

The following is an example of such a helper function around a single MDC variable and focused on logging onNext events, using Java 9 enhanced Optional API:

public static <T> Consumer<Signal<T>> logOnNext(Consumer<T> logStatement) {
	return signal -> {
		if (!signal.isOnNext()) return; (1)
		Optional<String> toPutInMdc = signal.getContextView().getOrEmpty("CONTEXT_KEY"); (2)

		toPutInMdc.ifPresentOrElse(tpim -> {
			try (MDC.MDCCloseable cMdc = MDC.putCloseable("MDC_KEY", tpim)) { (3)
				logStatement.accept(signal.get()); (4)
			}
		},
		() -> logStatement.accept(signal.get())); (5)
	};
}
1 doOnEach signals include onComplete and onError. In this example we’re only interested in logging onNext
2 We will extract one interesting value from the Reactor Context (see the The Context API section)
3 We use the MDCCloseable from SLF4J 2 in this example, allowing try-with-resource syntax for automatic cleanup of the MDC after the log statement is executed
4 Proper log statement is provided by the caller as a Consumer<T> (consumer of the onNext value)
5 In case the expected key wasn’t set in the Context we use the alternative path where nothing is put in the MDC

Using this boilerplate code ensures that we are good citizens with the MDC: we set a key right before we execute a logging statement and remove it immediately after. There is no risk of polluting the MDC for subsequent logging statements.

Of course, this is a suggestion. You might be interested in extracting multiple values from the Context or in logging things in case of onError. You might want to create additional helper methods for these cases or craft a single method that makes use of additional lambdas to cover more ground.

In any case, the usage of the preceding helper method could look like the following reactive web controller:

@GetMapping("/byPrice")
public Flux<Restaurant> byPrice(@RequestParam Double maxPrice, @RequestHeader(required = false, name = "X-UserId") String userId) {
	String apiId = userId == null ? "" : userId; (1)

	return restaurantService.byPrice(maxPrice))
			   .doOnEach(logOnNext(r -> LOG.debug("found restaurant {} for ${}", (2)
					r.getName(), r.getPricePerPerson())))
			   .contextWrite(Context.of("CONTEXT_KEY", apiId)); (3)
}
1 We need to get the contextual information from the request header to put it in the Context
2 Here we apply our helper method to the Flux, using doOnEach. Remember: operators see Context values defined below them.
3 We write the value from the header to the Context using the chosen key CONTEXT_KEY.

In this configuration, the restaurantService can emit its data on a shared thread, yet the logs will still reference the correct X-UserId for each request.

For completeness, we can also see what an error-logging helper could look like:

public static Consumer<Signal<?>> logOnError(Consumer<Throwable> errorLogStatement) {
	return signal -> {
		if (!signal.isOnError()) return;
		Optional<String> toPutInMdc = signal.getContextView().getOrEmpty("CONTEXT_KEY");

		toPutInMdc.ifPresentOrElse(tpim -> {
			try (MDC.MDCCloseable cMdc = MDC.putCloseable("MDC_KEY", tpim)) {
				errorLogStatement.accept(signal.getThrowable());
			}
		},
		() -> errorLogStatement.accept(signal.getThrowable()));
	};
}

Nothing much has changed, except for the fact that we check that the Signal is effectively an onError, and that we provide said error (a Throwable) to the log statement lambda.

Applying this helper in the controller is very similar to what we’ve done before:

@GetMapping("/byPrice")
public Flux<Restaurant> byPrice(@RequestParam Double maxPrice, @RequestHeader(required = false, name = "X-UserId") String userId) {
	String apiId = userId == null ? "" : userId;

	return restaurantService.byPrice(maxPrice))
			   .doOnEach(logOnNext(v -> LOG.info("found restaurant {}", v))
			   .doOnEach(logOnError(e -> LOG.error("error when searching restaurants", e)) (1)
			   .contextWrite(Context.of("CONTEXT_KEY", apiId));
}
1 In case the restaurantService emits an error, it will be logged with MDC context here