Adding a Context to a Reactive Sequence

One of the big technical challenges encountered when switching from an imperative programming perspective to a reactive programming mindset lies in how you deal with threading.

Contrary to what you might be used to, in reactive programming, you can use a Thread to process several asynchronous sequences that run at roughly the same time (actually, in non-blocking locksteps). The execution can also easily and often jump from one thread to another.

This arrangement is especially hard for developers who use features that depend on a more “stable” threading model, such as ThreadLocal. Because ThreadLocal associates data with a thread, it becomes tricky to use in a reactive context. As a result, libraries that rely on ThreadLocal at the very least introduce new challenges when used with Reactor. At worst, they work badly or even fail. Using the MDC of Logback to store and log correlation IDs is a prime example of such a situation.
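
To make the problem concrete, the following is a minimal JDK-only sketch (no Reactor involved). It assumes a hypothetical CORRELATION_ID holder standing in for something like a logging MDC, and uses a single-thread executor to imitate execution hopping to another thread:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalHopDemo {

    // Hypothetical holder, standing in for thread-bound state such as an MDC.
    static final ThreadLocal<String> CORRELATION_ID = new ThreadLocal<>();

    // Returns the value seen on the calling thread and on a worker thread.
    static String[] run() {
        CORRELATION_ID.set("abc-123");
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<String> read = CORRELATION_ID::get;
            String onCaller = read.call();               // same thread: value is visible
            String onWorker = worker.submit(read).get(); // other thread: its slot is empty
            return new String[] { onCaller, onWorker };
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
        finally {
            worker.shutdown();
            CORRELATION_ID.remove();
        }
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println("caller=" + seen[0] + ", worker=" + seen[1]);
    }
}
```

The value set on the calling thread is simply absent on the worker thread, which is what happens to thread-bound data whenever a reactive sequence switches threads.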

The usual workaround for ThreadLocal usage is to move the contextual data, C, alongside your business data, T, in the sequence, by using (for instance) Tuple2<T, C>. This does not look good and leaks an orthogonal concern (the contextual data) into your method and Flux signatures.
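
As an illustration of that workaround, here is a minimal sketch assuming reactor-core is on the classpath. The upperCase method and the "abc-123" correlation ID are hypothetical. Note how the business step has to accept and forward the contextual half of the tuple:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

class TupleWorkaroundDemo {

    // Hypothetical business step: it only cares about T1 but must carry T2 along.
    static Flux<Tuple2<String, String>> upperCase(Flux<Tuple2<String, String>> in) {
        return in.map(t -> Tuples.of(t.getT1().toUpperCase(), t.getT2()));
    }

    static List<String> run() {
        String correlationId = "abc-123"; // hypothetical contextual data C
        Flux<Tuple2<String, String>> paired = Flux.just("a", "b")
            .map(v -> Tuples.of(v, correlationId));
        return upperCase(paired)
            .map(t -> t.getT1() + "@" + t.getT2())
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```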

Since version 3.1.0, Reactor comes with an advanced feature that is somewhat comparable to ThreadLocal but can be applied to a Flux or a Mono instead of a Thread. This feature is called Context.

As an illustration of what it looks like, the following example both reads from and writes to Context:

String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap(s -> Mono.deferContextual(ctx ->
         Mono.just(s + " " + ctx.get(key))))
    .contextWrite(ctx -> ctx.put(key, "World"));

StepVerifier.create(r)
            .expectNext("Hello World")
            .verifyComplete();

In the following sections, we cover Context and how to use it, so that you can eventually understand the preceding example.

This is an advanced feature that is more targeted at library developers. It requires a good understanding of the lifecycle of a Subscription and is intended for libraries that are responsible for the subscriptions.

1. The Context API

Context is an interface reminiscent of Map. It stores key-value pairs and lets you fetch a value you stored by its key. It has a simplified version that only exposes read methods, the ContextView. More specifically:

  • Both key and values are of type Object, so a Context (and ContextView) instance can contain any number of highly divergent values from different libraries and sources.

  • A Context is immutable. It exposes write methods like put and putAll but they produce a new instance.

  • For a read-only API that doesn’t even expose such write methods, there’s the ContextView superinterface (since 3.4.0).

  • You can check whether the key is present with hasKey(Object key).

  • Use getOrDefault(Object key, T defaultValue) to retrieve a value (cast to a T) or fall back to a default one if the Context instance does not have that key.

  • Use getOrEmpty(Object key) to get an Optional<T> (the Context instance attempts to cast the stored value to T).

  • Use put(Object key, Object value) to store a key-value pair, returning a new Context instance. You can also merge two contexts into a new one by using putAll(ContextView).

  • Use delete(Object key) to remove the value associated to a key, returning a new Context.

You can create pre-valued Context instances with up to five key-value pairs by using the static Context.of methods. They take 2, 4, 6, 8, or 10 Object instances, each consecutive pair of Object instances being a key-value pair to add to the Context.

Alternatively you can also create an empty Context by using Context.empty().
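
The methods listed above can be exercised together in a short sketch. It assumes reactor-core 3.4 or later; the keys and values ("user", "role", "team") are made up for illustration:

```java
import java.util.Optional;

import reactor.util.context.Context;
import reactor.util.context.ContextView;

class ContextApiDemo {

    static String run() {
        Context empty = Context.empty();
        Context one = empty.put("user", "alice");        // put returns a new instance
        ContextView extra = Context.of("role", "admin"); // pre-valued, read through the view type
        Context two = one.putAll(extra);                 // merge into yet another instance
        Context three = two.delete("user");              // two itself is left untouched

        String role = two.getOrDefault("role", "none");
        String team = two.getOrDefault("team", "none");  // key absent: default is returned
        Optional<String> missing = two.getOrEmpty("team");

        return empty.size() + " " + one.size() + " " + two.size() + " " + three.size()
            + " " + one.hasKey("role") + " " + role + " " + team + " " + missing.isPresent();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each write leaves the earlier instances unchanged, which is why the sizes of empty, one, and two still differ after the later calls.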

2. Tying a Context to a Flux and Writing

To make a Context useful, it must be tied to a specific sequence and be accessible by each operator in a chain. Note that the operator must be a Reactor-native operator, as Context is specific to Reactor.

Actually, a Context is tied to each Subscriber in a chain. It uses the Subscription propagation mechanism to make itself available to each operator, starting with the final subscribe and moving up the chain.

In order to populate the Context, which can only be done at subscription time, you need to use the contextWrite operator.
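
The subscription-time nature of the write can be observed with a small sketch, assuming reactor-core 3.4 or later. The counter and the "n" key are hypothetical; the point is that the function passed to contextWrite is not invoked when the chain is assembled, only once per subscription:

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

class SubscriptionTimeWriteDemo {

    static String run() {
        AtomicInteger subscriptions = new AtomicInteger();
        Mono<String> mono = Mono
            .deferContextual(ctx -> Mono.just("subscription #" + ctx.get("n")))
            .contextWrite(ctx -> ctx.put("n", subscriptions.incrementAndGet()));

        int beforeSubscribing = subscriptions.get(); // nothing has been written yet
        String first = mono.block();                 // first subscription
        String second = mono.block();                // second subscription, new Context
        return beforeSubscribing + "|" + first + "|" + second;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```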

contextWrite(ContextView) merges the ContextView you provide and the Context from downstream (remember, the Context is propagated from the bottom of the chain towards the top). This is done through a call to putAll, resulting in a NEW Context for upstream.

You can also use the more advanced contextWrite(Function<Context, Context>). It receives a copy of the Context from downstream, lets you put or delete values as you see fit, and returns the new Context to use. You can even decide to return a completely different instance, although it is really not recommended (doing so might impact third-party libraries that depend on the Context).
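
The two forms of contextWrite can be combined in one chain. The following sketch assumes reactor-core 3.4 or later and uses made-up keys ("a", "b", "tmp"):

```java
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

class ContextWriteFormsDemo {

    static String run() {
        return Mono
            .deferContextual(ctx -> Mono.just(
                "a=" + ctx.get("a") + ", b=" + ctx.get("b") + ", tmp=" + ctx.hasKey("tmp")))
            // Function form: receives the Context built below, then adds and deletes entries.
            .contextWrite(ctx -> ctx.put("b", "from function").delete("tmp"))
            // ContextView form: merged (putAll) into the Context coming from downstream.
            .contextWrite(Context.of("a", "from view", "tmp", "x"))
            .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The lower contextWrite runs first, so the function form above it sees both "a" and "tmp" and can remove the latter before the read happens.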

3. Reading a Context, through the ContextView

Once you have populated a Context, you may want to peek into it at runtime. Most of the time, the responsibility of putting information into the Context is on the end user’s side, while exploiting that information is on the third-party library’s side, as such libraries are usually upstream of the client code.

The read-oriented operators let you obtain data from the Context in a chain of operators by exposing its ContextView:

  • To access the context from a source-like operator, use the deferContextual factory method.

  • To access the context from the middle of an operator chain, use transformDeferredContextual(BiFunction).

  • Alternatively, when dealing with an inner sequence (like inside a flatMap), the ContextView can be materialized by using Mono.deferContextual(Mono::just). Usually, though, you might want to perform meaningful work directly within the defer’s lambda, e.g., Mono.deferContextual(ctx -> doSomethingAsyncWithContextData(v, ctx.get(key))), where v is the value being flatMapped.
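
Since the later examples focus on deferContextual, here is a minimal sketch of the mid-chain variant, transformDeferredContextual, assuming reactor-core 3.4 or later. The "prefix" key is hypothetical:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.util.context.Context;

class TransformDeferredContextualDemo {

    static List<String> run() {
        return Flux.just("a", "b")
            // The BiFunction receives the upstream Flux and the ContextView at subscription time.
            .transformDeferredContextual((flux, ctx) ->
                flux.map(v -> ctx.getOrDefault("prefix", "") + v))
            .contextWrite(Context.of("prefix", "item-"))
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```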

In order to read from the Context without misleading users into thinking one can write to it while data is running through the pipeline, only the ContextView is exposed by the operators above. In case one needs to use one of the remaining APIs that still require a Context, one can use Context.of(contextView) for conversion.
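
The conversion mentioned above might look like the following sketch, assuming reactor-core 3.4 or later. The "extra" key is made up, and the derived Context is only a local copy: it does not change what other operators see.

```java
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

class ContextViewConversionDemo {

    static String run() {
        return Mono
            .deferContextual(view -> {
                // Turn the read-only view back into a Context for an API that needs one.
                Context copy = Context.of(view).put("extra", "added");
                return Mono.just(view.size() + " -> " + copy.size());
            })
            .contextWrite(Context.of("message", "World"))
            .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```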

4. Simple Context Examples

The examples in this section are meant as ways to better understand some of the caveats of using a Context.

We first look back at our simple example from the introduction in a bit more detail, as the following example shows:

String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap(s -> Mono.deferContextual(ctx ->
         Mono.just(s + " " + ctx.get(key)))) (2)
    .contextWrite(ctx -> ctx.put(key, "World")); (1)

StepVerifier.create(r)
            .expectNext("Hello World") (3)
            .verifyComplete();
1 The chain of operators ends with a call to contextWrite(Function) that puts "World" into the Context under a key of "message".
2 We flatMap on the source element, materializing the ContextView with Mono.deferContextual() and directly extract the data associated to "message" and concatenate that with the original word.
3 The resulting Mono<String> emits "Hello World".

The numbering above versus the actual line order is not a mistake. It represents the execution order. Even though contextWrite is the last piece of the chain, it is the one that gets executed first (due to its subscription-time nature and the fact that the subscription signal flows from bottom to top).

In your chain of operators, the relative positions of where you write to the Context and where you read from it matter. The Context is immutable and its content can only be seen by operators above it, as demonstrated in the following example:

String key = "message";
Mono<String> r = Mono.just("Hello")
    .contextWrite(ctx -> ctx.put(key, "World")) (1)
    .flatMap( s -> Mono.deferContextual(ctx ->
        Mono.just(s + " " + ctx.getOrDefault(key, "Stranger")))); (2)

StepVerifier.create(r)
            .expectNext("Hello Stranger") (3)
            .verifyComplete();
1 The Context is written to too high in the chain.
2 As a result, in the flatMap, there is no value associated with our key. A default value is used instead.
3 The resulting Mono<String> thus emits "Hello Stranger".

Similarly, in the case of several attempts to write the same key to the Context, the relative order of the writes matters, too. Operators that read the Context see the value that was set closest to being under them, as demonstrated in the following example:

String key = "message";
Mono<String> r = Mono
    .deferContextual(ctx -> Mono.just("Hello " + ctx.get(key)))
    .contextWrite(ctx -> ctx.put(key, "Reactor")) (1)
    .contextWrite(ctx -> ctx.put(key, "World")); (2)

StepVerifier.create(r)
            .expectNext("Hello Reactor") (3)
            .verifyComplete();
1 A write attempt on key "message".
2 Another write attempt on key "message".
3 The deferContextual only saw the value set closest to it (and below it): "Reactor".

In the preceding example, the Context is populated with "World" during subscription. Then the subscription signal moves upstream and another write happens. This produces a second immutable Context with a value of "Reactor". After that, data starts flowing. The deferContextual sees the Context closest to it, which is our second Context with the "Reactor" value (exposed to the user as a ContextView).

You might wonder if the Context is propagated along with the data signal. If that were the case, putting another flatMap between these two writes would use the value from the top Context. But this is not the case, as demonstrated by the following example:

String key = "message";
Mono<String> r = Mono
    .deferContextual(ctx -> Mono.just("Hello " + ctx.get(key))) (3)
    .contextWrite(ctx -> ctx.put(key, "Reactor")) (2)
    .flatMap( s -> Mono.deferContextual(ctx ->
        Mono.just(s + " " + ctx.get(key)))) (4)
    .contextWrite(ctx -> ctx.put(key, "World")); (1)

StepVerifier.create(r)
            .expectNext("Hello Reactor World") (5)
            .verifyComplete();
1 This is the first write to happen.
2 This is the second write to happen.
3 The top context read sees the second write.
4 The flatMap concatenates the result from initial read with the value from the first write.
5 The Mono emits "Hello Reactor World".

The reason is that the Context is associated to the Subscriber and each operator accesses the Context by requesting it from its downstream Subscriber.
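
One way to see that the Context comes from the Subscriber is to provide it at the final subscribe call rather than through contextWrite. The sketch below assumes the subscribe overload that accepts an initial Context as its fourth argument is available in your reactor-core version (3.4 or later is assumed here):

```java
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Mono;
import reactor.util.context.Context;

class SubscriberContextDemo {

    static String run() {
        AtomicReference<String> seen = new AtomicReference<>();
        Mono.deferContextual(ctx ->
                Mono.just("Hello " + ctx.getOrDefault("message", "Stranger")))
            // No contextWrite in the chain: the Context is attached to the final Subscriber.
            .subscribe(seen::set, e -> {}, () -> {}, Context.of("message", "World"));
        return seen.get(); // the chain above is synchronous, so the value is already set
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```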

One last interesting propagation case is the one where the Context is also written to inside a flatMap, as in the following example:

String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap( s -> Mono
        .deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))
    )
    .flatMap( s -> Mono
        .deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))
        .contextWrite(ctx -> ctx.put(key, "Reactor")) (1)
    )
    .contextWrite(ctx -> ctx.put(key, "World")); (2)

StepVerifier.create(r)
            .expectNext("Hello World Reactor")
            .verifyComplete();
1 This contextWrite does not impact anything outside of its flatMap.
2 This contextWrite impacts the main sequence’s Context.

In the preceding example, the final emitted value is "Hello World Reactor" and not "Hello Reactor World", because the contextWrite that writes "Reactor" does so as part of the inner sequence of the second flatMap. As a consequence, it is not visible or propagated through the main sequence and the first flatMap does not see it. Propagation and immutability isolate the Context in operators that create intermediate inner sequences such as flatMap.

5. Full Example

Now we can consider a more realistic example of a library reading information from the Context: a reactive HTTP client that takes a Mono<String> as the source of data for a PUT but also looks for a particular Context key to add a correlation ID to the request’s headers.

From the user perspective, it is called as follows:

doPut("www.example.com", Mono.just("Walter"))

In order to propagate a correlation ID, it would be called as follows:

doPut("www.example.com", Mono.just("Walter"))
	.contextWrite(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))

As the preceding snippets show, the user code uses contextWrite to populate a Context with an HTTP_CORRELATION_ID key-value pair. The upstream of the operator is a Mono<Tuple2<Integer, String>> (a simplistic representation of an HTTP response) returned by the HTTP client library. So it effectively passes information from the user code to the library code.

The following example shows mock code from the library’s perspective that reads the context and “augments the request” if it can find the correlation ID:

static final String HTTP_CORRELATION_ID = "reactive.http.library.correlationId";

Mono<Tuple2<Integer, String>> doPut(String url, Mono<String> data) {
  Mono<Tuple2<String, Optional<Object>>> dataAndContext =
      data.zipWith(Mono.deferContextual(c -> (1)
          Mono.just(c.getOrEmpty(HTTP_CORRELATION_ID))) (2)
      );

  return dataAndContext.<String>handle((dac, sink) -> {
      if (dac.getT2().isPresent()) { (3)
        sink.next("PUT <" + dac.getT1() + "> sent to " + url +
            " with header X-Correlation-ID = " + dac.getT2().get());
      }
      else {
        sink.next("PUT <" + dac.getT1() + "> sent to " + url);
      }
      sink.complete();
      })
      .map(msg -> Tuples.of(200, msg));
}
1 Materialize the ContextView through Mono.deferContextual and…​
2 within the defer, extract a value for the correlation ID key, as an Optional.
3 If the key was present in the context, use the correlation ID as a header.

The library snippet zips the data Mono with Mono.deferContextual, and the lambda passed to it extracts the value for the HTTP_CORRELATION_ID key with getOrEmpty. This gives the library a Tuple2<String, Optional<Object>>, and that Optional holds the HTTP_CORRELATION_ID entry from downstream when there is one (as the library is on the direct path to the subscriber).

The library code then checks that Optional and, if the entry is present, it uses the passed correlation ID as an X-Correlation-ID header. That last part is simulated by the handle.
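
The same idea can also be written by materializing the whole ContextView with Mono.deferContextual(Mono::just) and extracting the Optional in a later step. The following is only a sketch of that variant, assuming reactor-core 3.4 or later. The doPutWithView name is hypothetical and, like the mock above, it only simulates the request:

```java
import java.util.Optional;

import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

class DoPutVariantDemo {

    static final String HTTP_CORRELATION_ID = "reactive.http.library.correlationId";

    // Hypothetical variant of doPut: zip with the full ContextView, then read the key.
    static Mono<Tuple2<Integer, String>> doPutWithView(String url, Mono<String> data) {
        Mono<ContextView> view = Mono.deferContextual(Mono::just);
        return data
            .zipWith(view)
            .map(dac -> {
                Optional<Object> id = dac.getT2().getOrEmpty(HTTP_CORRELATION_ID);
                String msg = "PUT <" + dac.getT1() + "> sent to " + url;
                return id.map(v -> msg + " with header X-Correlation-ID = " + v).orElse(msg);
            })
            .map(msg -> Tuples.of(200, msg));
    }

    static String run() {
        return doPutWithView("www.example.com", Mono.just("Walter"))
            .contextWrite(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
            .map(Tuple2::getT2)
            .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```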

The whole test that validates the library code used the correlation ID can be written as follows:

@Test
public void contextForLibraryReactivePut() {
  Mono<String> put = doPut("www.example.com", Mono.just("Walter"))
      .contextWrite(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
      .filter(t -> t.getT1() < 300)
      .map(Tuple2::getT2);

  StepVerifier.create(put)
              .expectNext("PUT <Walter> sent to www.example.com" +
                  " with header X-Correlation-ID = 2-j3r9afaf92j-afkaf")
              .verifyComplete();
}