Adding a Context to a Reactive Sequence
One of the big technical challenges encountered when switching from an imperative programming perspective to a reactive programming mindset lies in how you deal with threading.
Contrary to what you might be used to, in reactive programming, you can use a Thread
to process several asynchronous sequences that run at roughly the same time (actually, in
non-blocking locksteps). The execution can also easily and often jump from one thread to
another.
This arrangement is especially hard for developers who use features dependent on the
threading model being more “stable,” such as ThreadLocal. As it lets you associate
data with a thread, it becomes tricky to use in a reactive context. As a result,
libraries that rely on ThreadLocal at least introduce new challenges when used with
Reactor. At worst, they work badly or even fail. Using the MDC of Logback to store and
log correlation IDs is a prime example of such a situation.
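The thread-hopping problem can be reproduced without Reactor. The following is a minimal sketch that uses only the JDK; the class name `ThreadLocalHopDemo` and the `CORRELATION_ID` holder are hypothetical names chosen for illustration. A value stored in a ThreadLocal on one thread is simply absent when the work continues on another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalHopDemo {

    // Hypothetical holder for a correlation ID, similar in spirit to what an MDC does.
    static final ThreadLocal<String> CORRELATION_ID = new ThreadLocal<>();

    // Sets the value on the calling thread, then reads it back from a worker thread.
    static String readOnAnotherThread() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            CORRELATION_ID.set("abc-123");
            // The supplier runs on the worker thread, which has its own (empty) slot.
            return CompletableFuture
                .supplyAsync(() -> String.valueOf(CORRELATION_ID.get()), worker)
                .join();
        } finally {
            CORRELATION_ID.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        // Prints "seen on worker thread: null"
        System.out.println("seen on worker thread: " + readOnAnotherThread());
    }
}
```

A reactive pipeline can switch threads in much the same way between two operators, which is why data attached to the thread cannot be relied upon.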
The usual workaround for ThreadLocal usage is to move the contextual data, C, along with
your business data, T, in the sequence, by using (for instance) Tuple2<T, C>. This does
not look good and leaks an orthogonal concern (the contextual data) into your method and
Flux signatures.
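To make the drawback concrete, here is a small sketch of that workaround. The `greet` method and the correlation ID value are hypothetical; only `Mono`, `Tuple2` and `Tuples` come from Reactor. Note how the contextual data has to appear in both the parameter and the return type, even though the method only cares about the name.

```java
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

class TupleWorkaroundSketch {

    // The business logic only needs the name (T1), but it must accept and
    // forward the correlation ID (T2) so that later stages can still see it.
    static Mono<Tuple2<String, String>> greet(Mono<Tuple2<String, String>> nameAndCorrelationId) {
        return nameAndCorrelationId
            .map(t -> Tuples.of("Hello " + t.getT1(), t.getT2()));
    }

    static String run() {
        Tuple2<String, String> result =
            greet(Mono.just(Tuples.of("Walter", "id-42"))).block();
        return result.getT1() + " [" + result.getT2() + "]";
    }
}
```

Every additional method in the chain would need the same treatment, which is the leak of an orthogonal concern that the text describes.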
Since version 3.1.0, Reactor comes with an advanced feature that is somewhat comparable
to ThreadLocal but can be applied to a Flux or a Mono instead of a Thread.
This feature is called Context.
As an illustration of what it looks like, the following example both reads from and
writes to Context:
String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap(s -> Mono.deferContextual(ctx ->
         Mono.just(s + " " + ctx.get(key))))
    .contextWrite(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
            .expectNext("Hello World")
            .verifyComplete();
In the following sections, we cover Context and how to use it, so that you
can eventually understand the preceding example.
| This is an advanced feature that is more targeted at library developers. It
requires good understanding of the lifecycle of a Subscription and is intended for
libraries that are responsible for the subscriptions. |
1. The Context API
Context is an interface reminiscent of Map. It stores key-value pairs and lets you
fetch a value you stored by its key. It has a simplified version that only exposes read
methods, the ContextView. More specifically:
- Both keys and values are of type Object, so a Context (and ContextView) instance can contain any number of highly divergent values from different libraries and sources.
- A Context is immutable. It exposes write methods like put and putAll, but they produce a new instance.
- For a read-only API that doesn’t even expose such write methods, there’s the ContextView superinterface since 3.4.0.
- You can check whether the key is present with hasKey(Object key).
- Use getOrDefault(Object key, T defaultValue) to retrieve a value (cast to a T) or fall back to a default one if the Context instance does not have that key.
- Use getOrEmpty(Object key) to get an Optional<T> (the Context instance attempts to cast the stored value to T).
- Use put(Object key, Object value) to store a key-value pair, returning a new Context instance. You can also merge two contexts into a new one by using putAll(ContextView).
- Use delete(Object key) to remove the value associated to a key, returning a new Context.
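| When you create a Context, you can pre-populate it with key-value pairs by using the static Context.of methods. Alternatively, you can also create an empty Context by using Context.empty(). |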
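The methods listed above can be exercised directly on a Context, outside of any sequence. The following is a minimal sketch; the class name `ContextApiSketch` and the keys and values are made up for illustration. It mainly shows that each write returns a new instance and leaves the original untouched.

```java
import java.util.Optional;
import reactor.util.context.Context;

class ContextApiSketch {

    static String describe() {
        Context empty = Context.empty();
        Context one = empty.put("user", "alice");   // new instance, "empty" is unchanged
        Context two = one.put("attempt", 3);        // values can be of any type
        Context without = two.delete("user");       // new instance again, "two" is unchanged

        String user = two.get("user");                           // cast to the expected type
        Integer attempt = two.getOrDefault("attempt", 0);        // key present, default ignored
        Optional<String> missing = without.getOrEmpty("user");   // key was deleted

        return empty.hasKey("user") + " " + one.hasKey("user") + " " + user + " "
            + attempt + " " + missing.isPresent() + " " + two.size();
    }

    public static void main(String[] args) {
        // Prints "false true alice 3 false 2"
        System.out.println(describe());
    }
}
```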
2. Tying a Context to a Flux and Writing
To make a Context useful, it must be tied to a specific sequence and be accessible by
each operator in a chain. Note that the operator must be a Reactor-native operator, as
Context is specific to Reactor.
Actually, a Context is tied to each Subscriber in a chain. It uses the Subscription
propagation mechanism to make itself available to each operator, starting with the final
subscribe and moving up the chain.
In order to populate the Context, which can only be done at subscription time, you need
to use the contextWrite operator.
contextWrite(ContextView) merges the ContextView you provide and the
Context from downstream (remember, the Context is propagated from the bottom of the
chain towards the top). This is done through a call to putAll, resulting in a NEW
Context for upstream.
| You can also use the more advanced contextWrite(Function<Context, Context>).
It receives a copy of the Context from downstream, lets you put or delete values
as you see fit, and returns the new Context to use. You can even decide to return a
completely different instance, although it is really not recommended (doing so might
impact third-party libraries that depend on the Context). |
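As a small illustration of the merge performed by contextWrite(ContextView), consider the sketch below. The class name and the keys "a" and "b" are hypothetical. The write closer to the subscriber runs first, and the write above it is then merged on top of that result, so it wins for the shared key while the other key is kept.

```java
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

class ContextWriteMergeSketch {

    static String run() {
        return Mono.deferContextual(ctx ->
                Mono.just(ctx.get("a") + " " + ctx.get("b")))
            // Applied second (subscription travels upward): merged onto the Context from below.
            .contextWrite(Context.of("a", "upstream-write"))
            // Applied first: merged onto the (empty) Context of the final subscriber.
            .contextWrite(Context.of("a", "downstream-write", "b", "kept"))
            .block();
    }

    public static void main(String[] args) {
        // Prints "upstream-write kept"
        System.out.println(run());
    }
}
```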
3. Reading a Context, through the ContextView
Once you have populated a Context, you may want to peek into it at runtime.
Most of the time, the responsibility of putting information into the Context
is on the end user’s side, while exploiting that information is on the third-party library’s side,
as such libraries are usually upstream of the client code.
The read-oriented operators let you obtain data from the Context in a chain of operators by exposing
its ContextView:
- To access the context from a source-like operator, use the deferContextual factory method.
- To access the context from the middle of an operator chain, use transformDeferredContextual(BiFunction).
- Alternatively, when dealing with an inner sequence (like inside a flatMap), the ContextView can be materialized using Mono.deferContextual(Mono::just). Usually though, you might want to perform meaningful work directly within the defer’s lambda, e.g. Mono.deferContextual(ctx -> doSomethingAsyncWithContextData(v, ctx.get(key))) where v is the value being flatMapped.
| In order to read from the Context without misleading users into thinking one can write to it
while data is running through the pipeline, only the ContextView is exposed by the operators above.
In case one needs to use one of the remaining APIs that still require a Context, one can use Context.of(contextView) for conversion. |
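Reading from the middle of a chain can be sketched as follows. This is only an illustration: the class name and the "prefix" key are assumptions, not part of any library. The BiFunction receives the upstream Flux together with the ContextView and returns the sequence to continue with.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.util.context.Context;

class ReadOperatorsSketch {

    static List<String> run() {
        return Flux.just("a", "b")
            // Invoked at subscription time with the ContextView visible at this point in the chain.
            .transformDeferredContextual((flux, ctx) ->
                flux.map(v -> ctx.getOrDefault("prefix", "?") + v))
            // Written below the reader, so the reader above can see it.
            .contextWrite(Context.of("prefix", ">"))
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        // Prints "[>a, >b]"
        System.out.println(run());
    }
}
```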
4. Simple Context Examples
The examples in this section are meant as ways to better understand some of the caveats of
using a Context.
We first look back at our simple example from the introduction in a bit more detail, as the following example shows:
String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap(s -> Mono.deferContextual(ctx ->
         Mono.just(s + " " + ctx.get(key)))) // (2)
    .contextWrite(ctx -> ctx.put(key, "World")); // (1)
StepVerifier.create(r)
            .expectNext("Hello World") // (3)
            .verifyComplete();
| 1 | The chain of operators ends with a call to contextWrite(Function) that puts "World" into the Context under a key of "message". |
| 2 | We flatMap on the source element, materializing the ContextView with Mono.deferContextual() and directly extract the data associated to "message" and concatenate that with the original word. |
| 3 | The resulting Mono<String> emits "Hello World". |
| The numbering above versus the actual line order is not a mistake. It represents
the execution order. Even though contextWrite is the last piece of the chain, it is
the one that gets executed first (due to its subscription-time nature and the fact that
the subscription signal flows from bottom to top). |
| In your chain of operators, the relative positions of where you write to the Context and where you read from it matter. The Context is immutable and its content can only be seen by operators above it, as demonstrated in
the following example: |
String key = "message";
Mono<String> r = Mono.just("Hello")
    .contextWrite(ctx -> ctx.put(key, "World")) // (1)
    .flatMap( s -> Mono.deferContextual(ctx ->
        Mono.just(s + " " + ctx.getOrDefault(key, "Stranger")))); // (2)
StepVerifier.create(r)
            .expectNext("Hello Stranger") // (3)
            .verifyComplete();
| 1 | The Context is written to too high in the chain. |
| 2 | As a result, in the flatMap, there is no value associated with our key. A default value
is used instead. |
| 3 | The resulting Mono<String> thus emits "Hello Stranger". |
Similarly, in the case of several attempts to write the same key to the Context, the
relative order of the writes matters, too. Operators that read the Context see
the value that was set closest to being under them, as demonstrated in the following example:
String key = "message";
Mono<String> r = Mono
    .deferContextual(ctx -> Mono.just("Hello " + ctx.get(key)))
    .contextWrite(ctx -> ctx.put(key, "Reactor")) // (1)
    .contextWrite(ctx -> ctx.put(key, "World")); // (2)
StepVerifier.create(r)
            .expectNext("Hello Reactor") // (3)
            .verifyComplete();
| 1 | A write attempt on key "message". |
| 2 | Another write attempt on key "message". |
| 3 | The deferContextual only saw the value set closest to it (and below it): "Reactor". |
In the preceding example, the Context is populated with "World" during subscription.
Then the subscription signal moves upstream and another write happens. This produces a
second immutable Context with a value of "Reactor". After that, data starts flowing.
The deferContextual sees the Context closest to it, which is our second Context with the
"Reactor" value (exposed to the user as a ContextView).
You might wonder if the Context is propagated along with the data signal. If that was
the case, putting another flatMap between these two writes would use the value from
the top Context. But this is not the case, as demonstrated by the following example:
String key = "message";
Mono<String> r = Mono
    .deferContextual(ctx -> Mono.just("Hello " + ctx.get(key))) // (3)
    .contextWrite(ctx -> ctx.put(key, "Reactor")) // (2)
    .flatMap( s -> Mono.deferContextual(ctx ->
        Mono.just(s + " " + ctx.get(key)))) // (4)
    .contextWrite(ctx -> ctx.put(key, "World")); // (1)
StepVerifier.create(r)
            .expectNext("Hello Reactor World") // (5)
            .verifyComplete();
| 1 | This is the first write to happen. |
| 2 | This is the second write to happen. |
| 3 | The top context read sees the second write. |
| 4 | The flatMap concatenates the result from the initial read with the value from the first write. |
| 5 | The Mono emits "Hello Reactor World". |
The reason is that the Context is associated to the Subscriber and each operator
accesses the Context by requesting it from its downstream Subscriber.
One last interesting propagation case is the one where the Context is also written to
inside a flatMap, as in the following example:
String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap( s -> Mono
        .deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))
    )
    .flatMap( s -> Mono
        .deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))
        .contextWrite(ctx -> ctx.put(key, "Reactor")) // (1)
    )
    .contextWrite(ctx -> ctx.put(key, "World")); // (2)
StepVerifier.create(r)
            .expectNext("Hello World Reactor")
            .verifyComplete();
| 1 | This contextWrite does not impact anything outside of its flatMap. |
| 2 | This contextWrite impacts the main sequence’s Context. |
In the preceding example, the final emitted value is "Hello World Reactor" and not "Hello
Reactor World", because the contextWrite that writes "Reactor" does so as part of
the inner sequence of the second flatMap. As a consequence, it is not visible or propagated
through the main sequence and the first flatMap does not see it. Propagation and immutability
isolate the Context in operators that create intermediate inner sequences such as flatMap.
5. Full Example
Now we can consider a more realistic example of a library reading information from the Context:
a reactive HTTP client that takes a Mono<String> as the source of data for a PUT but
also looks for a particular Context key to add a correlation ID to the request’s headers.
From the user perspective, it is called as follows:
doPut("www.example.com", Mono.just("Walter"))
In order to propagate a correlation ID, it would be called as follows:
doPut("www.example.com", Mono.just("Walter"))
    .contextWrite(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
As the preceding snippets show, the user code uses contextWrite to populate
a Context with an HTTP_CORRELATION_ID key-value pair. The upstream of the operator is
a Mono<Tuple2<Integer, String>> (a simplistic representation of an HTTP response)
returned by the HTTP client library. So it effectively passes information from the
user code to the library code.
The following example shows mock code from the library’s perspective that reads the context and “augments the request” if it can find the correlation ID:
static final String HTTP_CORRELATION_ID = "reactive.http.library.correlationId";
Mono<Tuple2<Integer, String>> doPut(String url, Mono<String> data) {
  Mono<Tuple2<String, Optional<Object>>> dataAndContext =
      data.zipWith(Mono.deferContextual(c -> // (1)
          Mono.just(c.getOrEmpty(HTTP_CORRELATION_ID))) // (2)
      );
  return dataAndContext.<String>handle((dac, sink) -> {
      if (dac.getT2().isPresent()) { // (3)
        sink.next("PUT <" + dac.getT1() + "> sent to " + url +
            " with header X-Correlation-ID = " + dac.getT2().get());
      }
      else {
        sink.next("PUT <" + dac.getT1() + "> sent to " + url);
      }
      sink.complete();
      })
      .map(msg -> Tuples.of(200, msg));
}
| 1 | Materialize the ContextView through Mono.deferContextual and… |
| 2 | within the defer, extract a value for the correlation ID key, as an Optional. |
| 3 | If the key was present in the context, use the correlation ID as a header. |
The library snippet zips the data Mono with a Mono.deferContextual that reads the
HTTP_CORRELATION_ID key through getOrEmpty. This gives the library a
Tuple2<String, Optional<Object>>, and that Optional holds the HTTP_CORRELATION_ID entry
from downstream, if there is one (as it is on the direct path to the subscriber).
The library code then checks whether the Optional is present and, if
the entry is present, it uses the passed correlation ID as an X-Correlation-ID header.
That last part is simulated by the handle.
The whole test that validates the library code used the correlation ID can be written as follows:
@Test
public void contextForLibraryReactivePut() {
  Mono<String> put = doPut("www.example.com", Mono.just("Walter"))
      .contextWrite(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
      .filter(t -> t.getT1() < 300)
      .map(Tuple2::getT2);
  StepVerifier.create(put)
              .expectNext("PUT <Walter> sent to www.example.com" +
                  " with header X-Correlation-ID = 2-j3r9afaf92j-afkaf")
              .verifyComplete();
}