Context-Propagation Support
Since 3.5.0, Reactor-Core embeds support for the io.micrometer:context-propagation
SPI.
This library is intended as a means to easily adapt between various implementations of the concept of a Context, of which
ContextView
/Context
is an example, and between ThreadLocal
variables as well.
ReactorContextAccessor
allows the Context-Propagation library to understand Reactor
Context
and ContextView
.
It implements the SPI and is loaded via java.util.ServiceLoader
.
No user action is required, other than having a dependency on both reactor-core and io.micrometer:context-propagation
. The ReactorContextAccessor
class is public but shouldn’t generally be accessed by user code.
Reactor-Core supports two modes of operation with io.micrometer:context-propagation
:
-
the default (limited) mode,
-
and the automatic mode, enabled via
Hooks.enableAutomaticContextPropagation()
. Please note that this mode applies only to new subscriptions, so it is recommended to enable this hook when the application starts.
Their key differences are discussed in the context of either writing data
to Reactor Context
, or accessing ThreadLocal
state that
reflects the contents of the Context
of currently attached Subscriber
for reading.
1. Writing to Context
Depending on the individual application, you might either have to store already populated
ThreadLocal
state as entries in the Context
, or might only need to directly populate
the Context
.
1.1. contextWrite
Operator
When the values meant to be accessed as ThreadLocal
are not (or do not need to be)
present at the time of subscription, they can immediately be stored in the Context
:
// assuming TL is known to Context-Propagation as key TLKEY.
static final ThreadLocal<String> TL = new ThreadLocal<>();
// in the main Thread, TL is not set
Mono.deferContextual(ctx ->
Mono.delay(Duration.ofSeconds(1))
// we're now in another thread, TL is not explicitly set
.map(v -> "delayed ctx[" + TLKEY + "]=" + ctx.getOrDefault(TLKEY, "not found") + ", TL=" + TL.get()))
.contextWrite(ctx -> ctx.put(TLKEY, "HELLO"))
.block(); // returns "delayed ctx[TLKEY]=HELLO, TL=null" in default mode
// returns "delayed ctx[TLKEY]=HELLO, TL=HELLO" in automatic mode
1.2. contextCapture
Operator
This operator can be used when one needs to capture ThreadLocal
value(s) at subscription time and reflect these values in the Reactor Context
for the benefit of upstream operators.
In contrast to the manual contextWrite
operator, contextCapture
uses the
context-propagation
API to obtain a ContextSnapshot
and then uses that snapshot
to populate the Reactor Context
.
As a result, if there were any ThreadLocal
values during subscription phase, for which there is a registered ThreadLocalAccessor
, their values would now be stored in the Reactor Context
and visible
at runtime in upstream operators.
// assuming TL is known to Context-Propagation as key TLKEY.
static final ThreadLocal<String> TL = new ThreadLocal<>();
// in the main Thread, TL is set to "HELLO"
TL.set("HELLO");
Mono.deferContextual(ctx ->
Mono.delay(Duration.ofSeconds(1))
// we're now in another thread, TL is not explicitly set
.map(v -> "delayed ctx[" + TLKEY + "]=" + ctx.getOrDefault(TLKEY, "not found") + ", TL=" + TL.get()))
.contextCapture() // can be skipped in automatic mode when a blocking operator follows
.block(); // returns "delayed ctx[TLKEY]=HELLO, TL=null" in default mode
// returns "delayed ctx[TLKEY]=HELLO, TL=HELLO" in automatic mode
In the automatic mode, blocking operators, such as Flux#blockFirst() ,
Flux#blockLast() , Flux#toIterable() , Mono#block() , Mono#blockOptional() , and
relevant overloads, all perform contextCapture() transparently, so in most cases it is
not necessary to add it.
|
2. Accessing ThreadLocal
state
Starting from Reactor-Core 3.5.0, ThreadLocal
state is restored in a limited set
of operators. We call this behaviour the default (limited) mode. In 3.5.3, a new
mode was added, the automatic mode, which provides access to ThreadLocal
values
throughout the reactive chain.
Reactor-Core performs ThreadLocal
state restoration using the values
stored in Context
and ThreadLocalAccessor
instances registered in ContextRegistry
that match by key.
2.1. Default mode operators for snapshot restoration: handle
and tap
In the default mode, both Flux
and Mono
variants of handle
and tap
will have
their behavior slightly modified if the Context-Propagation library is available at runtime.
Namely, if their downstream ContextView
is not empty they will assume a context
capture has occurred (either manually or via the contextCapture()
operator) and will attempt to restore ThreadLocals from that snapshot transparently. Any ThreadLocals for keys that are missing in the ContextView
are left untouched.
These operators will ensure restoration is performed around the user-provided code, respectively:
-
handle
will wrap theBiConsumer
in one which restores `ThreadLocal`s -
tap
variants will wrap theSignalListener
into one that has the same kind of wrapping around each method (this includes theaddToContext
method)
The intent is to have a minimalistic set of operators transparently perform restoration. As a result we chose operators with rather general and broad applications (one with transformative capabilities, one with side-effect capabilities)
//assuming TL is known to Context-Propagation.
static final ThreadLocal<String> TL = new ThreadLocal<>();
//in the main thread, TL is set to "HELLO"
TL.set("HELLO");
Mono.delay(Duration.ofSeconds(1))
//we're now in another thread, TL is not set yet
.doOnNext(v -> System.out.println(TL.get()))
//inside the handler however, TL _is_ restored
.handle((v, sink) -> sink.next("handled delayed TL=" + TL.get()))
.contextCapture()
.block(); // prints "null" and returns "handled delayed TL=HELLO"
2.2. Automatic mode
In the automatic mode, all operators restore ThreadLocal
state across Thread
boundaries. In contrast, in the default mode only selected operators do so.
Hooks.enableAutomaticContextPropagation()
can be called upon application start to
enable the automatic mode. Please note that this mode applies only to new subscriptions,
so it is recommended to enable this hook when the application starts.
It is not an easy task to achieve, as the Reactive Streams specification makes reactive
chains Thread
-agnostic. However, Reactor-Core does its best to control sources of
Thread
switches and perform snapshot restoration based on the Reactor Context
,
which is treated as the source of truth for ThreadLocal
state.
While the default mode limits the ThreadLocal state only to the user code
executed as arguments to the chosen operators, the automatic mode allows
ThreadLocal state to cross operator boundaries. This requires proper cleanup to avoid
leaking the state to unrelated code which reuses the same Thread . This requires to
treat absent keys in the Context for registered instances of ThreadLocalAccessor as
signals to clear the corresponding ThreadLocal state. This is especially important for
an empty Context , which clears all state for registered ThreadLocalAccessor instances.
|
3. Which mode should I choose?
Both default and automatic modes have an impact on performance. Accessing
ThreadLocal
variables can impact a reactive pipeline significantly. If the highest
scalability and performance is the goal, more verbose approaches for logging and
explicit argument passing can be considered instead of relying on ThreadLocal
state. If
access to established libraries in the space of Observability, such as Micrometer and
SLF4J, which use ThreadLocal
state for convenience to provide meaningful production
grade features is an understood compromise, the choice of the mode is yet another
compromise to make. The automatic mode, depending on the flow of your application and
the amount of operators used, can be either better or worse than the default mode. The
only recommendation that can be given is to measure how your application behaves and what
scalability and performance characteristics you obtain when presented with a load you
expect.