Context-Propagation Support
Since 3.5.0, Reactor-Core embeds support for the io.micrometer:context-propagation SPI.
This library is intended as a means to easily adapt between various implementations of the concept of a Context, of which
ContextView/Context is an example, and between ThreadLocal variables as well.
ReactorContextAccessor allows the Context-Propagation library to understand Reactor
Context and ContextView.
It implements the SPI and is loaded via java.util.ServiceLoader.
No user action is required, other than having a dependency on both reactor-core and io.micrometer:context-propagation. The ReactorContextAccessor class is public but shouldn’t generally be accessed by user code.
Reactor-Core supports two modes of operation with io.micrometer:context-propagation:
-
the default (limited) mode,
-
and the automatic mode, enabled via
Hooks.enableAutomaticContextPropagation(). Please note that this mode applies only to new subscriptions, so it is recommended to enable this hook when the application starts.
Their key differences are discussed in the context of either writing data
to Reactor Context, or accessing ThreadLocal state that
reflects the contents of the Context of currently attached Subscriber for reading.
1. Writing to Context
Depending on the individual application, you might either have to store already populated
ThreadLocal state as entries in the Context, or might only need to directly populate
the Context.
1.1. contextWrite Operator
When the values meant to be accessed as ThreadLocal are not (or do not need to be)
present at the time of subscription, they can immediately be stored in the Context:
// assuming TL is known to Context-Propagation as key TLKEY.
static final ThreadLocal<String> TL = new ThreadLocal<>();
// in the main Thread, TL is not set
Mono.deferContextual(ctx ->
Mono.delay(Duration.ofSeconds(1))
// we're now in another thread, TL is not explicitly set
.map(v -> "delayed ctx[" + TLKEY + "]=" + ctx.getOrDefault(TLKEY, "not found") + ", TL=" + TL.get()))
.contextWrite(ctx -> ctx.put(TLKEY, "HELLO"))
.block(); // returns "delayed ctx[TLKEY]=HELLO, TL=null" in default mode
// returns "delayed ctx[TLKEY]=HELLO, TL=HELLO" in automatic mode
1.2. contextCapture Operator
This operator can be used when one needs to capture ThreadLocal value(s) at subscription time and reflect these values in the Reactor Context for the benefit of upstream operators.
In contrast to the manual contextWrite operator, contextCapture uses the
context-propagation API to obtain a ContextSnapshot and then uses that snapshot
to populate the Reactor Context.
As a result, if there were any ThreadLocal values during subscription phase, for which there is a registered ThreadLocalAccessor, their values would now be stored in the Reactor Context and visible
at runtime in upstream operators.
// assuming TL is known to Context-Propagation as key TLKEY.
static final ThreadLocal<String> TL = new ThreadLocal<>();
// in the main Thread, TL is set to "HELLO"
TL.set("HELLO");
Mono.deferContextual(ctx ->
Mono.delay(Duration.ofSeconds(1))
// we're now in another thread, TL is not explicitly set
.map(v -> "delayed ctx[" + TLKEY + "]=" + ctx.getOrDefault(TLKEY, "not found") + ", TL=" + TL.get()))
.contextCapture() // can be skipped in automatic mode when a blocking operator follows
.block(); // returns "delayed ctx[TLKEY]=HELLO, TL=null" in default mode
// returns "delayed ctx[TLKEY]=HELLO, TL=HELLO" in automatic mode
In the automatic mode, blocking operators, such as Flux#blockFirst(),
Flux#blockLast(), Flux#toIterable(), Mono#block(), Mono#blockOptional(), and
relevant overloads, all perform contextCapture() transparently, so in most cases it is
not necessary to add it.
|
2. Accessing ThreadLocal state
Starting from Reactor-Core 3.5.0, ThreadLocal state is restored in a limited set
of operators. We call this behaviour the default (limited) mode. In 3.5.3, a new
mode was added, the automatic mode, which provides access to ThreadLocal values
throughout the reactive chain.
Reactor-Core performs ThreadLocal state restoration using the values
stored in Context and ThreadLocalAccessor instances registered in ContextRegistry
that match by key.
2.1. Default mode operators for snapshot restoration: handle and tap
In the default mode, both Flux and Mono variants of handle and tap will have
their behavior slightly modified if the Context-Propagation library is available at runtime.
Namely, if their downstream ContextView is not empty they will assume a context
capture has occurred (either manually or via the contextCapture() operator) and will attempt to restore ThreadLocals from that snapshot transparently. Any ThreadLocals for keys that are missing in the ContextView are left untouched.
These operators will ensure restoration is performed around the user-provided code, respectively:
-
handlewill wrap theBiConsumerin one which restores `ThreadLocal`s -
tapvariants will wrap theSignalListenerinto one that has the same kind of wrapping around each method (this includes theaddToContextmethod)
The intent is to have a minimalistic set of operators transparently perform restoration. As a result we chose operators with rather general and broad applications (one with transformative capabilities, one with side-effect capabilities)
//assuming TL is known to Context-Propagation.
static final ThreadLocal<String> TL = new ThreadLocal<>();
//in the main thread, TL is set to "HELLO"
TL.set("HELLO");
Mono.delay(Duration.ofSeconds(1))
//we're now in another thread, TL is not set yet
.doOnNext(v -> System.out.println(TL.get()))
//inside the handler however, TL _is_ restored
.handle((v, sink) -> sink.next("handled delayed TL=" + TL.get()))
.contextCapture()
.block(); // prints "null" and returns "handled delayed TL=HELLO"
2.2. Automatic mode
In the automatic mode, all operators restore ThreadLocal state across Thread
boundaries. In contrast, in the default mode only selected operators do so.
Hooks.enableAutomaticContextPropagation() can be called upon application start to
enable the automatic mode. Please note that this mode applies only to new subscriptions,
so it is recommended to enable this hook when the application starts.
It is not an easy task to achieve, as the Reactive Streams specification makes reactive
chains Thread-agnostic. However, Reactor-Core does its best to control sources of
Thread switches and perform snapshot restoration based on the Reactor Context,
which is treated as the source of truth for ThreadLocal state.
While the default mode limits the ThreadLocal state only to the user code
executed as arguments to the chosen operators, the automatic mode allows
ThreadLocal state to cross operator boundaries. This requires proper cleanup to avoid
leaking the state to unrelated code which reuses the same Thread. This requires to
treat absent keys in the Context for registered instances of ThreadLocalAccessor as
signals to clear the corresponding ThreadLocal state. This is especially important for
an empty Context, which clears all state for registered ThreadLocalAccessor instances.
|
3. Which mode should I choose?
Both default and automatic modes have an impact on performance. Accessing
ThreadLocal variables can impact a reactive pipeline significantly. If the highest
scalability and performance is the goal, more verbose approaches for logging and
explicit argument passing can be considered instead of relying on ThreadLocal state. If
access to established libraries in the space of Observability, such as Micrometer and
SLF4J, which use ThreadLocal state for convenience to provide meaningful production
grade features is an understood compromise, the choice of the mode is yet another
compromise to make. The automatic mode, depending on the flow of your application and
the amount of operators used, can be either better or worse than the default mode. The
only recommendation that can be given is to measure how your application behaves and what
scalability and performance characteristics you obtain when presented with a load you
expect.