Adding a Context to a Reactive Sequence
One of the big technical challenges encountered when switching from an imperative programming perspective to a reactive programming mindset lies in how you deal with threading.
Contrary to what you might be used to, in reactive programming, you can use a Thread
to process several asynchronous sequences that run at roughly the same time (actually, in
non-blocking locksteps). The execution can also easily and often jump from one thread to
another.
This arrangement is especially hard for developers who use features that depend on the threading model being more “stable,” such as ThreadLocal. Because ThreadLocal lets you associate data with a thread, it becomes tricky to use in a reactive context. As a result, libraries that rely on ThreadLocal at least introduce new challenges when used with Reactor. At worst, they work badly or even fail. Using the MDC of Logback to store and log correlation IDs is a prime example of such a situation.
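To make the problem concrete, the following minimal sketch uses only the JDK. A plain second Thread stands in for the thread hop that a reactive pipeline may perform, and the CORRELATION_ID holder and its value are hypothetical names chosen for illustration.

```java
public class ThreadLocalHopSketch {

    // Hypothetical holder for a correlation ID, similar in spirit to a logging MDC.
    static final ThreadLocal<String> CORRELATION_ID = new ThreadLocal<>();

    // Sets the value and reads it back on the same thread.
    static String readOnSameThread() {
        CORRELATION_ID.set("abc-123");
        try {
            return CORRELATION_ID.get();
        } finally {
            CORRELATION_ID.remove();
        }
    }

    // Sets the value on the calling thread, then reads it from a second thread.
    static String readOnOtherThread() {
        String[] seen = new String[1];
        CORRELATION_ID.set("abc-123");
        try {
            // The second thread has its own, empty ThreadLocal slot.
            Thread other = new Thread(() -> seen[0] = CORRELATION_ID.get());
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            CORRELATION_ID.remove();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("same thread:  " + readOnSameThread());
        System.out.println("other thread: " + readOnOtherThread());
    }
}
```

The value is visible only on the thread that set it, so the second thread reads null. Any step of a sequence that executes on a different thread faces the same situation.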
The usual workaround for ThreadLocal usage is to move the contextual data, C, along with your business data, T, in the sequence, by using (for instance) Tuple2<T, C>. This does not look good and leaks an orthogonal concern (the contextual data) into your method and Flux signatures.
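As a rough sketch of that workaround, assuming a String payload and a String correlation ID (the method names and values below are hypothetical), the contextual data ends up in every signature it passes through:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class TupleWorkaroundSketch {

    // The correlation ID (C) has to appear in the signature next to the business data (T).
    static Flux<Tuple2<String, String>> attachCorrelationId(Flux<String> data, String correlationId) {
        return data.map(value -> Tuples.of(value, correlationId));
    }

    // Every later step has to unpack the tuple, even when it only cares about one side.
    static List<String> describe() {
        return attachCorrelationId(Flux.just("order-1", "order-2"), "corr-42")
            .map(t -> t.getT1() + " [" + t.getT2() + "]")
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        describe().forEach(System.out::println);
    }
}
```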
Since version 3.1.0, Reactor comes with an advanced feature that is somewhat comparable to ThreadLocal but can be applied to a Flux or a Mono instead of a Thread. This feature is called Context.
As an illustration of what it looks like, the following example both reads from and writes to Context:
String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap(s -> Mono.deferContextual(ctx ->
        Mono.just(s + " " + ctx.get(key))))
    .contextWrite(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
            .expectNext("Hello World")
            .verifyComplete();
In the following sections, we cover Context and how to use it, so that you can eventually understand the preceding example.
This is an advanced feature that is more targeted at library developers. It requires a good understanding of the lifecycle of a Subscription and is intended for libraries that are responsible for the subscriptions.
1. The Context API
Context is an interface reminiscent of Map. It stores key-value pairs and lets you fetch a value you stored by its key. It has a simplified version that only exposes read methods, the ContextView. More specifically:
- Both keys and values are of type Object, so a Context (and ContextView) instance can contain any number of highly divergent values from different libraries and sources.
- A Context is immutable. It exposes write methods like put and putAll, but they produce a new instance.
- For a read-only API that doesn’t even expose such write methods, there’s the ContextView superinterface since 3.4.0.
- You can check whether the key is present with hasKey(Object key).
- Use getOrDefault(Object key, T defaultValue) to retrieve a value (cast to a T) or fall back to a default one if the Context instance does not have that key.
- Use getOrEmpty(Object key) to get an Optional<T> (the Context instance attempts to cast the stored value to T).
- Use put(Object key, Object value) to store a key-value pair, returning a new Context instance. You can also merge two contexts into a new one by using putAll(ContextView).
- Use delete(Object key) to remove the value associated to a key, returning a new Context.
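When you create a Context, you can create pre-valued instances by using the static Context.of methods, which take key-value pairs as alternating arguments. Alternatively, you can also create an empty Context by using Context.empty().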
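The following sketch exercises the methods listed above outside of any reactive sequence. The keys and values ("user", "attempt", "tenant") are made up for illustration.

```java
import java.util.Optional;

import reactor.util.context.Context;
import reactor.util.context.ContextView;

public class ContextApiSketch {

    static String summarize() {
        Context empty = Context.empty();
        // Each write returns a new instance and leaves the previous one untouched.
        Context one = empty.put("user", "alice");
        Context two = one.put("attempt", 3);
        Context removed = two.delete("user");

        // A Context can be handed out as a read-only ContextView.
        ContextView view = two;
        String user = view.get("user");
        Integer attempt = view.getOrDefault("attempt", 0);
        Optional<String> tenant = view.getOrEmpty("tenant");

        return empty.size() + "," + one.size() + "," + two.size() + ","
            + removed.hasKey("user") + "," + user + "," + attempt + "," + tenant.isPresent();
    }

    public static void main(String[] args) {
        System.out.println(summarize());
    }
}
```

Because every write produces a new instance, empty, one, and two keep their original sizes (0, 1, and 2) even after later calls to put and delete.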
2. Tying a Context to a Flux and Writing
For a Context to be useful, it must be tied to a specific sequence and be accessible by each operator in a chain. Note that the operator must be a Reactor-native operator, as Context is specific to Reactor.
Actually, a Context is tied to each Subscriber in a chain. It uses the Subscription propagation mechanism to make itself available to each operator, starting with the final subscribe and moving up the chain.
In order to populate the Context, which can only be done at subscription time, you need to use the contextWrite operator.
contextWrite(ContextView) merges the ContextView you provide and the Context from downstream (remember, the Context is propagated from the bottom of the chain towards the top). This is done through a call to putAll, resulting in a NEW Context for upstream.
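A small sketch of that merge, assuming three made-up keys ("a", "b", "c") written by two contextWrite(ContextView) calls, might look as follows:

```java
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class ContextWriteMergeSketch {

    static String read() {
        return Mono.deferContextual(ctx ->
                Mono.just("a=" + ctx.get("a") + ",b=" + ctx.get("b") + ",c=" + ctx.get("c")))
            // Applied second: merged on top of what comes from downstream, so "b" is overwritten.
            .contextWrite(Context.of("a", "near-a", "b", "near-b"))
            // Applied first: it is the closest to the final subscriber.
            .contextWrite(Context.of("b", "far-b", "c", "far-c"))
            .block();
    }

    public static void main(String[] args) {
        System.out.println(read());
    }
}
```

The reader at the top sees "c" from the lower write and "a" and "b" from the upper one, since the entries you provide are put over the entries received from downstream.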
You can also use the more advanced contextWrite(Function<Context, Context>).
It receives a copy of the Context from downstream, lets you put or delete values
as you see fit, and returns the new Context to use. You can even decide to return a
completely different instance, although it is really not recommended (doing so might
impact third-party libraries that depend on the Context).
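As a hedged illustration of the Function variant, the sketch below removes one entry received from downstream and adds another. The "debug" and "user" keys are hypothetical.

```java
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class ContextWriteFunctionSketch {

    static String read() {
        return Mono.deferContextual(ctx ->
                Mono.just("user=" + ctx.getOrDefault("user", "anonymous")
                    + ",debug=" + ctx.hasKey("debug")))
            // Receives the downstream Context, drops "debug" and adds "user" for upstream.
            .contextWrite(ctx -> ctx.delete("debug").put("user", "alice"))
            // Written closer to the subscriber, so it is applied first.
            .contextWrite(Context.of("debug", true))
            .block();
    }

    public static void main(String[] args) {
        System.out.println(read());
    }
}
```

Deriving the new Context from the one that was passed in, as done here, keeps any entries that other libraries may have stored.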
3. Reading a Context, through the ContextView
Once you have populated a Context, you may want to peek into it at runtime. Most of the time, the responsibility of putting information into the Context is on the end user’s side, while exploiting that information is on the third-party library’s side, as such libraries are usually upstream of the client code.
The read-oriented operators let you obtain data from the Context in a chain of operators by exposing its ContextView:
- To access the context from a source-like operator, use the deferContextual factory method.
- To access the context from the middle of an operator chain, use transformDeferredContextual(BiFunction).
- Alternatively, when dealing with an inner sequence (like inside a flatMap), the ContextView can be materialized using Mono.deferContextual(Mono::just). Usually though, you might want to perform meaningful work directly within the defer’s lambda, e.g. Mono.deferContextual(ctx -> doSomethingAsyncWithContextData(v, ctx.get(key))), where v is the value being flatMapped.
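The middle-of-the-chain case can be sketched as follows, assuming a hypothetical "prefix" key that the caller writes and the transformation reads:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.util.context.Context;

public class TransformDeferredContextualSketch {

    static List<String> tagged() {
        return Flux.just("a", "b")
            // The BiFunction receives the upstream Flux and the ContextView at subscription time.
            .transformDeferredContextual((flux, ctx) ->
                flux.map(v -> ctx.getOrDefault("prefix", "?") + ":" + v))
            .contextWrite(Context.of("prefix", "req-1"))
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(tagged());
    }
}
```

Without the contextWrite call, the same pipeline would fall back to the "?" default for each element.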
In order to read from the Context without misleading users into thinking one can write to it
while data is running through the pipeline, only the ContextView is exposed by the operators above.
In case one needs to use one of the remaining APIs that still require a Context, one can use Context.of(contextView) for conversion.
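A minimal sketch of that conversion, with made-up keys, shows that the derived Context can be written to while the ContextView itself stays as it was:

```java
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class ContextViewConversionSketch {

    static String convert() {
        return Mono.deferContextual(view -> {
                // Convert the read-only view, then derive a new Context with one more entry.
                Context writable = Context.of(view).put("extra", "x");
                return Mono.just(view.hasKey("extra") + "->" + writable.hasKey("extra"));
            })
            .contextWrite(Context.of("k", "v"))
            .block();
    }

    public static void main(String[] args) {
        System.out.println(convert());
    }
}
```

The derived instance is only a local value here. It does not change what other operators in the chain see.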
4. Simple Context Examples
The examples in this section are meant as ways to better understand some of the caveats of using a Context.
We first look back at our simple example from the introduction in a bit more detail, as the following example shows:
String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap(s -> Mono.deferContextual(ctx ->
        Mono.just(s + " " + ctx.get(key)))) // (2)
    .contextWrite(ctx -> ctx.put(key, "World")); // (1)
StepVerifier.create(r)
            .expectNext("Hello World") // (3)
            .verifyComplete();
(1) The chain of operators ends with a call to contextWrite(Function) that puts "World" into the Context under a key of "message".
(2) We flatMap on the source element, materializing the ContextView with Mono.deferContextual(), directly extract the data associated with "message", and concatenate that with the original word.
(3) The resulting Mono<String> emits "Hello World".
The numbering above versus the actual line order is not a mistake. It represents
the execution order. Even though contextWrite is the last piece of the chain, it is
the one that gets executed first (due to its subscription-time nature and the fact that
the subscription signal flows from bottom to top).
In your chain of operators, the relative positions of where you write to the
Context and where you read from it matter. The Context
is immutable and its content can only be seen by operators above it, as demonstrated in
the following example:
String key = "message";
Mono<String> r = Mono.just("Hello")
    .contextWrite(ctx -> ctx.put(key, "World")) // (1)
    .flatMap(s -> Mono.deferContextual(ctx ->
        Mono.just(s + " " + ctx.getOrDefault(key, "Stranger")))); // (2)
StepVerifier.create(r)
            .expectNext("Hello Stranger") // (3)
            .verifyComplete();
(1) The Context is written to too high in the chain.
(2) As a result, in the flatMap, there is no value associated with our key. A default value is used instead.
(3) The resulting Mono<String> thus emits "Hello Stranger".
Similarly, in the case of several attempts to write the same key to the Context, the relative order of the writes matters, too. Operators that read the Context see the value that was set closest to being under them, as demonstrated in the following example:
String key = "message";
Mono<String> r = Mono
    .deferContextual(ctx -> Mono.just("Hello " + ctx.get(key)))
    .contextWrite(ctx -> ctx.put(key, "Reactor")) // (1)
    .contextWrite(ctx -> ctx.put(key, "World")); // (2)
StepVerifier.create(r)
            .expectNext("Hello Reactor") // (3)
            .verifyComplete();
(1) A write attempt on key "message".
(2) Another write attempt on key "message".
(3) The deferContextual only saw the value set closest to it (and below it): "Reactor".
In the preceding example, the Context is populated with "World" during subscription. Then the subscription signal moves upstream and another write happens. This produces a second immutable Context with a value of "Reactor". After that, data starts flowing. The deferContextual sees the Context closest to it, which is our second Context with the "Reactor" value (exposed to the user as a ContextView).
You might wonder if the Context is propagated along with the data signal. If that were the case, putting another flatMap between these two writes would use the value from the top Context. But this is not the case, as demonstrated by the following example:
String key = "message";
Mono<String> r = Mono
    .deferContextual(ctx -> Mono.just("Hello " + ctx.get(key))) // (3)
    .contextWrite(ctx -> ctx.put(key, "Reactor")) // (2)
    .flatMap(s -> Mono.deferContextual(ctx ->
        Mono.just(s + " " + ctx.get(key)))) // (4)
    .contextWrite(ctx -> ctx.put(key, "World")); // (1)
StepVerifier.create(r)
            .expectNext("Hello Reactor World") // (5)
            .verifyComplete();
(1) This is the first write to happen.
(2) This is the second write to happen.
(3) The top context read sees the second write.
(4) The flatMap concatenates the result from the initial read with the value from the first write.
(5) The Mono emits "Hello Reactor World".
The reason is that the Context is associated to the Subscriber and each operator accesses the Context by requesting it from its downstream Subscriber.
One last interesting propagation case is the one where the Context is also written to inside a flatMap, as in the following example:
String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap(s -> Mono
        .deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))
    )
    .flatMap(s -> Mono
        .deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))
        .contextWrite(ctx -> ctx.put(key, "Reactor")) // (1)
    )
    .contextWrite(ctx -> ctx.put(key, "World")); // (2)
StepVerifier.create(r)
            .expectNext("Hello World Reactor")
            .verifyComplete();
(1) This contextWrite does not impact anything outside of its flatMap.
(2) This contextWrite impacts the main sequence’s Context.
In the preceding example, the final emitted value is "Hello World Reactor" and not "Hello Reactor World", because the contextWrite that writes "Reactor" does so as part of the inner sequence of the second flatMap. As a consequence, it is not visible or propagated through the main sequence and the first flatMap does not see it. Propagation and immutability isolate the Context in operators that create intermediate inner sequences such as flatMap.
5. Full Example
Now we can consider a more real-life example of a library reading information from the Context: a reactive HTTP client that takes a Mono<String> as the source of data for a PUT but also looks for a particular Context key to add a correlation ID to the request’s headers.
From the user perspective, it is called as follows:
doPut("www.example.com", Mono.just("Walter"))
In order to propagate a correlation ID, it would be called as follows:
doPut("www.example.com", Mono.just("Walter"))
    .contextWrite(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
As the preceding snippets show, the user code uses contextWrite to populate a Context with an HTTP_CORRELATION_ID key-value pair. The upstream of the operator is a Mono<Tuple2<Integer, String>> (a simplistic representation of an HTTP response) returned by the HTTP client library. So it effectively passes information from the user code to the library code.
The following example shows mock code from the library’s perspective that reads the context and “augments the request” if it can find the correlation ID:
static final String HTTP_CORRELATION_ID = "reactive.http.library.correlationId";

Mono<Tuple2<Integer, String>> doPut(String url, Mono<String> data) {
    Mono<Tuple2<String, Optional<Object>>> dataAndContext =
        data.zipWith(Mono.deferContextual(c -> // (1)
            Mono.just(c.getOrEmpty(HTTP_CORRELATION_ID))) // (2)
        );

    return dataAndContext.<String>handle((dac, sink) -> {
            if (dac.getT2().isPresent()) { // (3)
                sink.next("PUT <" + dac.getT1() + "> sent to " + url +
                    " with header X-Correlation-ID = " + dac.getT2().get());
            }
            else {
                sink.next("PUT <" + dac.getT1() + "> sent to " + url);
            }
            sink.complete();
        })
        .map(msg -> Tuples.of(200, msg));
}
(1) Materialize the ContextView through Mono.deferContextual and…
(2) within the defer, extract a value for the correlation ID key, as an Optional.
(3) If the key was present in the context, use the correlation ID as a header.
The library snippet zips the data Mono with Mono.deferContextual. Within the deferContextual lambda, it calls getOrEmpty to extract an Optional for the HTTP_CORRELATION_ID key. This gives the library a Tuple2<String, Optional<Object>>, and that Optional holds the HTTP_CORRELATION_ID entry from downstream when one was written (as it is on the direct path to the subscriber).
If the entry is present, the library code uses the passed correlation ID as an X-Correlation-ID header. That last part is simulated by the handle.
The whole test that validates that the library code uses the correlation ID can be written as follows:
@Test
public void contextForLibraryReactivePut() {
    Mono<String> put = doPut("www.example.com", Mono.just("Walter"))
        .contextWrite(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
        .filter(t -> t.getT1() < 300)
        .map(Tuple2::getT2);

    StepVerifier.create(put)
                .expectNext("PUT <Walter> sent to www.example.com" +
                    " with header X-Correlation-ID = 2-j3r9afaf92j-afkaf")
                .verifyComplete();
}