1. FAQ, Best Practices, and "How do I…?"
This section covers the following content:
1.1. How Do I Wrap a Synchronous, Blocking Call?
It is often the case that a source of information is synchronous and blocking. To deal with such sources in your Reactor applications, apply the following pattern:
Mono blockingWrapper = Mono.fromCallable(() -> { // (1)
    return /* make a remote synchronous call */ // (2)
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.boundedElastic()); // (3)
1 | Create a new Mono by using fromCallable. |
2 | Return the synchronous, blocking resource. |
3 | Ensure each subscription happens on a dedicated worker from Schedulers.boundedElastic(). |
You should use a Mono, because the source returns one value. You should use Schedulers.boundedElastic, because it creates a dedicated thread to wait for the blocking resource without impacting other non-blocking processing, while also ensuring that there is a limit to the number of threads that can be created and to the number of blocking tasks that can be enqueued and deferred during a spike.
Note that subscribeOn does not subscribe to the Mono. It specifies what kind of Scheduler to use when a subscribe call happens.
Also note that the subscribeOn operator should immediately follow the source, and any further operators should be defined after the subscribeOn wrapper.
1.2. I Used an Operator on my Flux but it Doesn’t Seem to Apply. What Gives?
Make sure that the variable you .subscribe() to has been affected by the operators you think should have been applied to it.
Reactor operators are decorators. They return a different instance that wraps the source sequence and adds behavior. That is why the preferred way of using operators is to chain the calls.
Compare the following two examples:
Flux<String> flux = Flux.just("something", "chain");
flux.map(secret -> secret.replaceAll(".", "*")); // (1)
flux.subscribe(next -> System.out.println("Received: " + next));
1 | The mistake is here. The result is not attached to the flux variable. |
Flux<String> flux = Flux.just("something", "chain");
flux = flux.map(secret -> secret.replaceAll(".", "*"));
flux.subscribe(next -> System.out.println("Received: " + next));
The following sample is even better (because it is simpler):
Flux.just("something", "chain")
    .map(secret -> secret.replaceAll(".", "*"))
    .subscribe(next -> System.out.println("Received: " + next));
The first version outputs the following:
Received: something
Received: chain
The two other versions output the expected values, as follows:
Received: *********
Received: *****
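The decorator behavior is not specific to Reactor. As a minimal sketch, the same mistake can be reproduced with the JDK's java.util.function.Function, whose andThen method also returns a new instance instead of modifying the receiver. Reactor is not used here, and the class name DecoratorDemo and its methods are hypothetical names chosen for illustration.

```java
import java.util.function.Function;

class DecoratorDemo {
    // Masks every character; "." is a regex that matches any single character.
    static final Function<String, String> MASK = s -> s.replaceAll(".", "*");

    // Mistake: the decorated function is created and then discarded.
    static String resultDiscarded(String input) {
        Function<String, String> pipeline = Function.identity();
        pipeline.andThen(MASK); // returns a NEW function, which is dropped here
        return pipeline.apply(input);
    }

    // Fix: keep the instance returned by andThen.
    static String resultKept(String input) {
        Function<String, String> pipeline = Function.identity();
        pipeline = pipeline.andThen(MASK);
        return pipeline.apply(input);
    }

    public static void main(String[] args) {
        System.out.println("Received: " + resultDiscarded("something")); // Received: something
        System.out.println("Received: " + resultKept("something"));      // Received: *********
    }
}
```

As with Flux operators, chaining the calls in one expression avoids the problem, because there is no intermediate variable to forget to reassign.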
1.3. My Mono zipWith or zipWhen is never called
Consider the following example:
myMethod.process("a") // this method returns Mono<Void>
    .zipWith(myMethod.process("b"), combinator) // this is never called
    .subscribe();
If the source Mono is either empty or a Mono<Void> (a Mono<Void> is empty for all intents and purposes), some combinations are never called.
This is the typical case for any transformer such as the zip static method or the zipWith and zipWhen operators, which (by definition) need an element from each source to produce their output.
Using data-suppressing operators on sources of zip is thus problematic. Examples of data-suppressing operators include then(), thenEmpty(Publisher<Void>), ignoreElements() and ignoreElement(), and when(Publisher…).
Similarly, operators that use a Function<T,?> to tune their behavior, such as flatMap, need at least one element to be emitted for the Function to have a chance to apply. Applying these on an empty (or <Void>) sequence never produces an element.
You can use .defaultIfEmpty(T) and .switchIfEmpty(Publisher<T>) to replace an empty sequence of T with a default value or a fallback Publisher<T> (respectively), which could help avoid some of these situations. Note that this does not apply to Flux<Void>/Mono<Void> sources, as you can only switch to another Publisher<Void>, which is still guaranteed to be empty. The following example uses defaultIfEmpty:
defaultIfEmpty before zipWhen
myMethod.emptySequenceForKey("a") // this method returns empty Mono<String>
    .defaultIfEmpty("") // this converts empty sequence to just the empty String
    .zipWhen(aString -> myMethod.process("b")) // this is called with the empty String
    .subscribe();
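The rule that a combinator only runs when every source provides an element can be illustrated by analogy in plain Java. The following sketch uses java.util.Optional as a stand-in for a Mono that emits zero or one element; it is not Reactor code. The class EmptyZipSketch and its zip helper are hypothetical, and orElse plays the role that defaultIfEmpty plays above.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

class EmptyZipSketch {
    // Counts how many times the combinator actually runs.
    static final AtomicInteger COMBINATOR_CALLS = new AtomicInteger();

    // Combines two Optionals only when BOTH hold a value.
    static <A, B, R> Optional<R> zip(Optional<A> left, Optional<B> right,
                                     BiFunction<A, B, R> combinator) {
        return left.flatMap(a -> right.map(b -> {
            COMBINATOR_CALLS.incrementAndGet();
            return combinator.apply(a, b);
        }));
    }

    public static void main(String[] args) {
        Optional<String> empty = Optional.empty();

        // Empty left side: the combinator never runs and the result is empty.
        Optional<String> skipped = zip(empty, Optional.of("b"), (a, b) -> a + b);
        System.out.println(skipped.isPresent() + ", calls=" + COMBINATOR_CALLS.get()); // false, calls=0

        // Substituting a default value first lets the combinator run.
        Optional<String> withDefault =
            zip(Optional.of(empty.orElse("")), Optional.of("b"), (a, b) -> a + b);
        System.out.println(withDefault.get() + ", calls=" + COMBINATOR_CALLS.get()); // b, calls=1
    }
}
```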
1.4. Using zip along with empty-completed publishers
When using the zip operator along with empty-completed publishers (i.e., publishers completing without emitting an item), it is important to be aware of the following behavior.
Consider the following test case:
@Test
public void testZipEmptyCompletionAllSubscribed() {
    AtomicInteger cnt = new AtomicInteger();
    Mono<Integer> mono1 = Mono.create(sink -> {
        cnt.incrementAndGet();
        sink.success();
    });
    Mono<Integer> mono2 = Mono.create(sink -> {
        cnt.incrementAndGet();
        sink.success();
    });
    Mono<Integer> zippedMono = Mono.zip(mono1, mono2, (v1, v2) -> v1);
    zippedMono.subscribe();
    assertEquals(2, cnt.get());
}
While in this case the resulting zippedMono subscribes to both mono1 and mono2, such behavior is not guaranteed for all cases. For instance, consider the following test case:
@Test
public void testZipEmptyCompletionOneSubscribed() {
    AtomicInteger cnt = new AtomicInteger();
    Mono<Integer> mono1 = Mono.create(sink -> {
        cnt.incrementAndGet();
        sink.success();
    });
    Mono<Integer> mono2 = Mono.create(sink -> {
        cnt.incrementAndGet();
        sink.success();
    });
    Mono<Integer> mono3 = Mono.create(sink -> {
        cnt.incrementAndGet();
        sink.success();
    });
    Mono<Integer> zippedMono = Mono.zip(mono1, Mono.zip(mono2, mono3, (v1, v2) -> v1), (v1, v2) -> v1);
    zippedMono.subscribe();
    assertEquals(1, cnt.get());
}
In this case, upon empty completion of mono1, zippedMono completes immediately and does not subscribe to mono2 and mono3.
Therefore, in cases where the zip operator is used to combine empty-completed publishers, it is not guaranteed that the resulting publisher will subscribe to all the empty-completed publishers.
If it is necessary to keep the semantics as shown in the second test case and to ensure subscription to all the publishers to be zipped, consider using the singleOptional operator, as demonstrated in the test case below:
@Test
public void testZipOptionalAllSubscribed() {
    AtomicInteger cnt = new AtomicInteger();
    Mono<Integer> mono1 = Mono.create(sink -> {
        cnt.incrementAndGet();
        sink.success();
    });
    Mono<Integer> mono2 = Mono.create(sink -> {
        cnt.incrementAndGet();
        sink.success();
    });
    Mono<Integer> mono3 = Mono.create(sink -> {
        cnt.incrementAndGet();
        sink.success();
    });
    Mono<Optional<Integer>> zippedMono =
        Mono.zip(
            mono1.singleOptional(),
            Mono.zip(mono2.singleOptional(), mono3.singleOptional(), (v1, v2) -> v1),
            (v1, v2) -> v1);
    zippedMono.subscribe();
    assertEquals(3, cnt.get());
}
1.5. How to Use retryWhen to Emulate retry(3)?
The retryWhen operator can be quite complex. Hopefully the following snippet of code can help you understand how it works by attempting to emulate a simpler retry(3):
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
    Flux.<String>error(new IllegalArgumentException())
        .doOnError(e -> errorCount.incrementAndGet())
        .retryWhen(Retry.from(companion -> // (1)
            companion.map(rs -> { // (2)
                if (rs.totalRetries() < 3) return rs.totalRetries(); // (3)
                else throw Exceptions.propagate(rs.failure()); // (4)
            })
        ));
1 | We customize Retry by adapting from a Function lambda rather than providing a concrete class. |
2 | The companion emits RetrySignal objects, which carry the number of retries so far and the last failure. |
3 | To allow for three retries, we consider indexes < 3 and return a value to emit (here we simply return the index). |
4 | In order to terminate the sequence in error, we throw the original exception after these three retries. |
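To make the counting in callouts (3) and (4) concrete, the following plain-Java sketch applies the same decision rule to a synchronous task: retry while fewer than three retries have happened, otherwise rethrow the original failure. It does not use Reactor, and RetryThreeSketch and callWithRetries are hypothetical names. Under this rule, a task that always fails runs four times: the initial attempt plus three retries.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

class RetryThreeSketch {
    // Runs the task, retrying while fewer than maxRetries retries have been made.
    static <T> T callWithRetries(Callable<T> task, long maxRetries) throws Exception {
        long totalRetries = 0; // plays the role of rs.totalRetries()
        while (true) {
            try {
                return task.call();
            } catch (Exception failure) {
                if (totalRetries < maxRetries) {
                    totalRetries++; // allow another attempt
                } else {
                    throw failure;  // retries exhausted: propagate the original failure
                }
            }
        }
    }

    // Counts how often an always-failing task is attempted with three retries.
    static int countErrors() {
        AtomicInteger errorCount = new AtomicInteger();
        try {
            RetryThreeSketch.<String>callWithRetries(() -> {
                errorCount.incrementAndGet();
                throw new IllegalArgumentException();
            }, 3);
        } catch (Exception expected) {
            // The original IllegalArgumentException surfaces after the last retry.
        }
        return errorCount.get();
    }

    public static void main(String[] args) {
        System.out.println("errors seen: " + countErrors()); // errors seen: 4
    }
}
```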
1.6. How can I use retryWhen for Exponential Backoff?
Exponential backoff produces retry attempts with a growing delay between each of the attempts, so as not to overload the source systems and risk an all-out crash. The rationale is that, if the source produces an error, it is already in an unstable state and is not likely to immediately recover from it. So blindly retrying immediately is likely to produce yet another error and add to the instability.
Since 3.3.4.RELEASE, Reactor comes with a builder for such a retry, to be used with Flux#retryWhen: Retry.backoff.
The following example showcases a simple use of the builder, with hooks that log messages right before and after the retry attempt delays. It delays retries and increases the delay between each attempt (pseudocode: delay = 100ms * 2^attempt_number_starting_at_zero):
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
    Flux.<String>error(new IllegalStateException("boom"))
        .doOnError(e -> { // (1)
            errorCount.incrementAndGet();
            System.out.println(e + " at " + LocalTime.now());
        })
        .retryWhen(Retry
            .backoff(3, Duration.ofMillis(100)).jitter(0d) // (2)
            .doAfterRetry(rs -> System.out.println("retried at " + LocalTime.now() + ", attempt " + rs.totalRetries())) // (3)
            .onRetryExhaustedThrow((spec, rs) -> rs.failure()) // (4)
        );
1 | We will log the time of errors emitted by the source and count them. |
2 | We configure an exponential backoff retry with at most 3 attempts and no jitter. |
3 | We also log the time at which the retry happens, and the retry attempt number (starting from 0). |
4 | By default, an Exceptions.retryExhausted exception would be thrown, with the last failure() as a cause. Here we customize that to directly emit the cause as onError. |
When subscribed to, this fails and terminates after printing out the following:
java.lang.IllegalStateException: boom at 00:00:00.0
retried at 00:00:00.101, attempt 0 (1)
java.lang.IllegalStateException: boom at 00:00:00.101
retried at 00:00:00.304, attempt 1 (2)
java.lang.IllegalStateException: boom at 00:00:00.304
retried at 00:00:00.702, attempt 2 (3)
java.lang.IllegalStateException: boom at 00:00:00.702
1 | First retry after about 100ms |
2 | Second retry after about 200ms |
3 | Third retry after about 400ms |
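The delays in this output follow directly from the pseudocode formula delay = 100ms * 2^attempt. The following sketch only reproduces that arithmetic in plain Java. It does not call Reactor, it assumes no jitter and no maximum backoff cap, and BackoffSchedule and its methods are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class BackoffSchedule {
    // delay = minBackoff * 2^attempt, with attempt starting at zero and no jitter.
    static Duration delayFor(Duration minBackoff, int attempt) {
        return minBackoff.multipliedBy(1L << attempt);
    }

    // Lists the delay that precedes each retry attempt.
    static List<Duration> delays(Duration minBackoff, int maxAttempts) {
        List<Duration> result = new ArrayList<>();
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            result.add(delayFor(minBackoff, attempt));
        }
        return result;
    }

    public static void main(String[] args) {
        Duration elapsed = Duration.ZERO;
        for (Duration delay : delays(Duration.ofMillis(100), 3)) {
            elapsed = elapsed.plus(delay);
            System.out.println("delay " + delay.toMillis() + " ms, retry at about " + elapsed.toMillis() + " ms");
        }
        // delay 100 ms, retry at about 100 ms
        // delay 200 ms, retry at about 300 ms
        // delay 400 ms, retry at about 700 ms
    }
}
```

The cumulative times of roughly 100 ms, 300 ms, and 700 ms line up with the timestamps 00:00:00.101, 00:00:00.304, and 00:00:00.702 in the log above, allowing for a few milliseconds of scheduling overhead.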
1.7. How Do I Ensure Thread Affinity when I Use publishOn()?
As described in Schedulers, publishOn() can be used to switch execution contexts. The publishOn operator influences the threading context where the rest of the operators in the chain below it run, up to a new occurrence of publishOn. So the placement of publishOn is significant.
Consider the following example:
Sinks.Many<Integer> dataSinks = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> source = dataSinks.asFlux();
source.publishOn(scheduler1)
    .map(i -> transform(i))
    .publishOn(scheduler2)
    .doOnNext(i -> processNext(i))
    .subscribe();
The transform function in map() is run on a worker of scheduler1, and the processNext method in doOnNext() is run on a worker of scheduler2. Each subscription gets its own worker, so all elements pushed to the corresponding subscriber are published on the same Thread.
You can use single-threaded schedulers to ensure thread affinity for different stages in the chain or for different subscribers.
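The guarantee that a single-threaded scheduler provides can be observed with the JDK alone. The sketch below submits many tasks to a single-threaded ExecutorService and counts the distinct threads that ran them. It is an illustration of the underlying property rather than Reactor code; ThreadAffinitySketch is a hypothetical name. In a Reactor application, such an executor could be wrapped with Schedulers.fromExecutorService, or a scheduler could be created with Schedulers.newSingle.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadAffinitySketch {
    // Runs taskCount tasks on a single-threaded executor and reports
    // how many distinct threads executed them.
    static int distinctThreadsUsed(int taskCount) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        try {
            for (int i = 0; i < taskCount; i++) {
                single.execute(() -> seen.add(Thread.currentThread()));
            }
            single.shutdown(); // no new tasks; already queued tasks still run
            if (!single.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            single.shutdownNow();
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct threads: " + distinctThreadsUsed(100)); // distinct threads: 1
    }
}
```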
1.8. What Is a Good Pattern for Contextual Logging? (MDC)
Most logging frameworks allow contextual logging, letting users store variables that are reflected in the logging pattern, generally by way of a Map called the MDC ("Mapped Diagnostic Context").
This is one of the most common uses of ThreadLocal in Java, and as a consequence this pattern assumes that the code being logged is tied in a one-to-one relationship with a Thread.
That might have been a safe assumption to make before Java 8, but with the advent of functional programming elements in the Java language things have changed a bit…
Let’s take the example of an API that was imperative and used the template method pattern, then switched to a more functional style.
With the template method pattern, inheritance was at play. Now in its more functional approach, higher order functions are passed to define the "steps" of the algorithm.
Things are now more declarative than imperative, and that frees the library to make decisions about where each step should run.
For instance, knowing which steps of the underlying algorithm can be parallelized, the library can use an ExecutorService to execute some of the steps in parallel.
One concrete example of such a functional API is the Stream API introduced in Java 8 and its parallel() flavor.
Logging with an MDC in a parallel Stream is not a free lunch: one needs to ensure the MDC is captured and reapplied in each step.
The functional style enables such optimizations, because each step is thread-agnostic and referentially transparent, but it can break the MDC assumption of a single Thread.
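The broken assumption can be reproduced in a few lines of plain Java. In the sketch below, a ThreadLocal stands in for the MDC (an assumption made for illustration; no logging framework is involved). A value set on the calling thread is not visible to a step that runs on a worker thread, unless it is captured and reapplied there. ThreadLocalContextSketch and REQUEST_ID are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalContextSketch {
    // Stands in for an MDC entry, which is typically backed by a ThreadLocal.
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    // Returns what a worker thread sees: [0] without propagation, [1] with capture and reapply.
    static String[] readOnWorker(String value) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        REQUEST_ID.set(value); // set on the calling thread only
        try {
            // Naive step: reads the worker thread's own, still empty, slot.
            Callable<String> naiveRead = () -> REQUEST_ID.get();

            // Explicit propagation: capture on the caller, reapply on the worker, then clean up.
            final String captured = REQUEST_ID.get();
            Callable<String> propagatedRead = () -> {
                REQUEST_ID.set(captured);
                try {
                    return REQUEST_ID.get();
                } finally {
                    REQUEST_ID.remove();
                }
            };

            return new String[] {
                worker.submit(naiveRead).get(),
                worker.submit(propagatedRead).get()
            };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            REQUEST_ID.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = readOnWorker("req-42");
        System.out.println("without propagation: " + seen[0]); // without propagation: null
        System.out.println("with propagation: " + seen[1]);    // with propagation: req-42
    }
}
```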
The most idiomatic way of ensuring any kind of contextual information is accessible to all stages would be to pass that context around through the composition chain.
During the development of Reactor we encountered the same general class of problem, and we wanted to avoid this very hands-on and explicit approach.
This is why the Context was introduced: it propagates through the execution chain as long as Flux and Mono are used as the return value, by letting stages (operators) peek at the Context of their downstream stage.
So instead of using ThreadLocal, Reactor offers this map-like object that is tied to a Subscription and not a Thread.
Now that we’ve established that MDC "just working" is not the best assumption to make in a declarative API, how can we perform contextualized log statements in relation to events in a Reactive Stream (onNext, onError, and onComplete)?
This entry of the FAQ offers a possible intermediate solution when one wants to log in relation to these signals in a straightforward and explicit manner. Make sure to read the Adding a Context to a Reactive Sequence section beforehand, and especially how a write must happen towards the bottom of the operator chain for operators above it to see it.
To get contextual information from the Context to the MDC, the simplest way is to wrap logging statements in a doOnEach operator with a little bit of boilerplate code.
This boilerplate depends on both the logging framework/abstraction of your choice and the information you want to put in the MDC, so it has to be in your codebase.
The following is an example of such a helper function around a single MDC variable and focused on logging onNext events, using the Java 9 enhanced Optional API:
public static <T> Consumer<Signal<T>> logOnNext(Consumer<T> logStatement) {
    return signal -> {
        if (!signal.isOnNext()) return; // (1)
        Optional<String> toPutInMdc = signal.getContextView().getOrEmpty("CONTEXT_KEY"); // (2)
        toPutInMdc.ifPresentOrElse(tpim -> {
                try (MDC.MDCCloseable cMdc = MDC.putCloseable("MDC_KEY", tpim)) { // (3)
                    logStatement.accept(signal.get()); // (4)
                }
            },
            () -> logStatement.accept(signal.get())); // (5)
    };
}
1 | doOnEach signals include onComplete and onError. In this example we’re only interested in logging onNext. |
2 | We will extract one interesting value from the Reactor Context (see The Context API section). |
3 | We use the MDCCloseable from SLF4J 2 in this example, allowing try-with-resources syntax for automatic cleanup of the MDC after the log statement is executed. |
4 | The proper log statement is provided by the caller as a Consumer<T> (consumer of the onNext value). |
5 | In case the expected key wasn’t set in the Context, we use the alternative path where nothing is put in the MDC. |
Using this boilerplate code ensures that we are good citizens with the MDC: we set a key right before we execute a logging statement and remove it immediately after. There is no risk of polluting the MDC for subsequent logging statements.
Of course, this is a suggestion. You might be interested in extracting multiple values from the Context or in logging things in case of onError.
You might want to create additional helper methods for these cases or craft a single method that makes use of additional lambdas to cover more ground.
In any case, the usage of the preceding helper method could look like the following reactive web controller:
@GetMapping("/byPrice")
public Flux<Restaurant> byPrice(@RequestParam Double maxPrice, @RequestHeader(required = false, name = "X-UserId") String userId) {
    String apiId = userId == null ? "" : userId; // (1)
    return restaurantService.byPrice(maxPrice)
        .doOnEach(logOnNext(r -> LOG.debug("found restaurant {} for ${}", // (2)
            r.getName(), r.getPricePerPerson())))
        .contextWrite(Context.of("CONTEXT_KEY", apiId)); // (3)
}
1 | We need to get the contextual information from the request header to put it in the Context. |
2 | Here we apply our helper method to the Flux, using doOnEach. Remember: operators see Context values defined below them. |
3 | We write the value from the header to the Context using the chosen key CONTEXT_KEY. |
In this configuration, the restaurantService can emit its data on a shared thread, yet the logs will still reference the correct X-UserId for each request.
For completeness, we can also see what an error-logging helper could look like:
public static Consumer<Signal<?>> logOnError(Consumer<Throwable> errorLogStatement) {
    return signal -> {
        if (!signal.isOnError()) return;
        Optional<String> toPutInMdc = signal.getContextView().getOrEmpty("CONTEXT_KEY");
        toPutInMdc.ifPresentOrElse(tpim -> {
                try (MDC.MDCCloseable cMdc = MDC.putCloseable("MDC_KEY", tpim)) {
                    errorLogStatement.accept(signal.getThrowable());
                }
            },
            () -> errorLogStatement.accept(signal.getThrowable()));
    };
}
Nothing much has changed, except for the fact that we check that the Signal is effectively an onError, and that we provide said error (a Throwable) to the log statement lambda.
Applying this helper in the controller is very similar to what we’ve done before:
@GetMapping("/byPrice")
public Flux<Restaurant> byPrice(@RequestParam Double maxPrice, @RequestHeader(required = false, name = "X-UserId") String userId) {
    String apiId = userId == null ? "" : userId;
    return restaurantService.byPrice(maxPrice)
        .doOnEach(logOnNext(v -> LOG.info("found restaurant {}", v)))
        .doOnEach(logOnError(e -> LOG.error("error when searching restaurants", e))) // (1)
        .contextWrite(Context.of("CONTEXT_KEY", apiId));
}
1 | In case the restaurantService emits an error, it will be logged with MDC context here |