Introducing Reactor

Reactor is a foundational library for building demanding, realtime Data-Streaming applications and micro, nano, or pico-services that must exhibit Low-Latency and be Fault-Tolerant.
— Preface
TL;DR

What is Reactor?

So you came to have a look at Reactor. Maybe you typed some keywords into your favorite search engine like Reactive, Spring+Reactive, Asynchronous+Java or just What the heck is Reactor?. In a nutshell, Reactor is a lightweight, foundational library for the JVM that helps your service or application pass messages efficiently and asynchronously.

What do you mean by "efficiently"?
  • Little to no memory garbage created just to pass a message from A to B.

  • Handle overflow when consumers are slower at processing messages than the producer is at producing them.

  • Provide for asynchronous flow, without blocking, if at all possible.

From empirical studies (mostly #rage and #drunk tweets), we know that asynchronous programming is hard, especially when a platform like the JVM offers so many options. Reactor aims to be truly non-blocking for a majority of use cases, and we offer an API that is measurably more efficient than relying on low-level primitives from the JDK's java.util.concurrent package. Reactor provides alternatives to (and discourages the use of):

  • Blocking wait: e.g. Future.get()

  • Unsafe data access: e.g. ReentrantLock.lock()

  • Exception Bubbles: e.g. try…catch…finally

  • Synchronization blocks: e.g. synchronized{ }

  • Wrapper Allocation (GC Pressure): e.g. new Wrapper<T>(event)

Being non-blocking matters, especially when scaling message-passing becomes critical (10k msg/s, 100k msg/s, 1M msg/s…). There is some theory behind this (see Amdahl's Law), but we get bored and distracted easily, so let's first appeal to common sense.

Let’s say you use a pure Executor approach:

private ExecutorService  threadPool = Executors.newFixedThreadPool(8);

final List<T> batches = new ArrayList<T>();

Callable<T> t = new Callable<T>() { (1)

    public T call() throws Exception {
        synchronized(batches) { (2)
            T result = callDatabase(msg); (3)
            batches.add(result);
            return result;
        }
    }
};

Future<T> f = threadPool.submit(t); (4)
T result = f.get(); (5)
1 Allocate the Callable; it might lead to GC pressure.
2 Synchronization forces a stop-and-check for every thread.
3 Potentially consumes more slowly than the producer produces.
4 Use a ThreadPool to pass the task to the target thread; it definitely produces GC pressure via FutureTask.
5 Block until callDatabase() replies.

In this simple example, it’s easy to point out why scale-up is very limited:

  • Allocating objects might cause GC pauses, especially if the tasks stick around too long.

    • Every GC Pause will degrade performance globally.

  • A Queue is unbounded by default. Because of the database call, tasks will pile up.

    • A backlog is not really a Memory Leak, but the side effects are just as nasty: more objects to scan during GC pauses; the risk of losing important bits of data; etc.

    • Classic Linked Queues generate memory pressure by allocating Nodes. Lots of them.

  • A vicious cycle kicks in when blocking replies are used.

    • Blocking replies will cause producer slow-down. In practice, the flow becomes basically synchronous since we have to wait for each reply before submitting more tasks.

    • Any Exception thrown during the conversation with the datastore will be passed in an uncontrolled fashion to the producer, negating any fault-tolerance normally available by segregating work around a Thread boundary.

Being fully and truly non-blocking is hard to achieve, especially in a world of distributed systems with fashionable monikers like Micro-Service Architectures. Reactor, however, makes few compromises and tries to leverage the best patterns available, so the developer doesn't have to feel like they're writing a mathematical thesis rather than an asynchronous nanoservice.

Nothing travels faster than light (besides gossip and viral cat videos) and latency is a real-world concern every system has to deal with at some point. To that end:

Reactor offers a framework that helps you mitigate nasty latency-induced side effects in your application, and do it with minimal overhead, by:
  • Leveraging smart structures: we trade the allocation issue at runtime for pre-allocation at startup time;

  • Keeping the main message-passing structures bounded, so we don't pile up tasks infinitely;

  • Using popular patterns such as Reactive and Event-Driven Architectures to offer non-blocking end-to-end flows, including replies;

  • Implementing the new Reactive Streams Standard, to make bounded structures efficient by not requesting more than their current capacity;

  • Applying these concepts to IPC and providing non-blocking IO drivers that understand flow control;

  • Exposing a Functional API to help developers organize their code in a side-effect-free way, which helps you determine where you are thread-safe and fault-tolerant.

About the Project

The project started in 2012, with a long internal incubation time. Reactor 1.x appeared in 2013. Reactor 1 has been deployed successfully by various organizations, both Open Source (e.g. Meltdown) and Commercial (e.g. Pivotal RTI). In 2014 we started collaborating on the emerging Reactive Streams Standard and began a massive re-engineering targeting April 2015 for version 2.0. The Reactive Streams Standard closed the last gap in our dispatching mechanism: controlling how much in-flight data was hitting Thread boundaries.

Parallel to that work we also decided to re-align some of our Event-Driven and Task Coordination API to the increasingly popular and documented Reactive Extensions.

Reactor is sponsored by Pivotal, where the two core committers are employed. Since Pivotal is also the home of the Spring Framework, and many of our colleagues are core committers to the various Spring efforts, we provide integration support from Reactor to Spring as well as support some important functionality of the Spring Framework, like the STOMP broker relay in spring-messaging. That said, we don't force anyone to adopt Spring just to use Reactor. We remain an embeddable toolkit "for the Reactive masses". In fact, one of the goals of Reactor is to stay un-opinionated about the ways you solve asynchronous and functional problems.

Reactor is Apache 2.0 licensed and available on GitHub.

Requirements

  • Reactor needs at minimum Java 7 to execute.

    • But the full expressive potential of functional composition is unlocked with Java 8 Lambdas.

    • As a fallback, have a look at the Spring, Clojure or Groovy extensions.

  • Reactor runs at full capacity when the JVM supports Unsafe access (e.g., not the case for Android).

    • RingBuffer-based features will not work when Unsafe is missing.

  • Reactor is packaged as traditional JAR archives in Maven Central and can be pulled into any JVM project as a dependency using your preferred build tool.

Architecture Overview

Module Organization
Figure 1. The main modules present in Reactor 2.0

The Reactor codebase is divided into several submodules to help you pick the ones that suit your needs while not burdening you with functionality you don’t need.

Following are some examples of how one might mix and match reactive technologies and Reactor modules to achieve your asynchronous goals:

  • Spring XD + Reactor-Net (Core/Stream) : Use Reactor as a Sink/Source IO driver.

  • Grails | Spring + Reactor-Stream (Core) : Use Stream and Promise for background Processing.

  • Spring Data + Reactor-Bus (Core) : Emit Database Events (Save/Delete/…​).

  • Spring Integration Java DSL + Reactor Stream (Core) : Microbatch MessageChannel from Spring Integration.

  • RxJavaReactiveStreams + RxJava + Reactor-Core : Combine rich composition with efficient asynchronous IO Processors.

  • RxJavaReactiveStreams + RxJava + Reactor-Net (Core/Stream) : Compose input data with RxJava and gate with Async IO drivers.

Architecture Overview
Figure 2. A quick overview of how Reactor modules depend on one another

Reactive Streams

Reactive Streams is a new standard, adopted by various vendors and tech industry leaders including Netflix, Oracle, Pivotal and Typesafe, with a target of including the specification in Java 9 and onwards.

The aim of the standard is to provide (a)synchronous data sequences with a flow-control mechanism. The specification is fairly light and first targets the JVM. It comes with 4 Java interfaces, a TCK and a handful of examples. Implementing the 4 interfaces is quite straightforward, but the meat of the project is actually the behavior verified by the TCK. A provider is qualified as Reactive Streams Ready once it has successfully passed the TCK for its implementing classes, which fortunately we did.

The Reactive Streams Contract
Figure 3. The Reactive Streams Contract
The Reactive Streams Interfaces
  • org.reactivestreams.Publisher: A source of data (from 0 to N signals, where N can be unlimited). It optionally provides for 2 terminal events: error and completion.

  • org.reactivestreams.Subscriber: A consumer of a data sequence (from 0 to N signals, where N can be unlimited). It receives a subscription on initialization, through which it requests how many elements it wants to process next. The other callbacks interact with the data sequence signals: next (new message) and the optional completion/error.

  • org.reactivestreams.Subscription: A small tracker passed to the Subscriber on initialization. It controls how many elements we are ready to consume and when we want to stop consuming (cancel).

  • org.reactivestreams.Processor: A marker for components that are both Subscriber and Publisher!

The Publishing Sequence
Figure 4. The Reactive Streams publishing protocol
There are two ways to request data from a Publisher in a Subscriber, through the passed Subscription:
  • Unbounded: On subscribe, just call Subscription#request(Long.MAX_VALUE).

  • Bounded: On subscribe, keep a reference to the Subscription and hit its request(long) method when the Subscriber is ready to process data.

    • Typically, Subscribers will request an initial set of data, or even a single element, on subscribe

    • Then, after onNext has been deemed successful (e.g. after a commit, flush, etc.), request more data

    • It is encouraged to use a linear number of requests. Try to avoid overlapping requests, e.g. requesting 10 more elements on every next signal. A bounded Subscriber is sketched below.
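To make the bounded mode concrete, here is a minimal sketch of a Subscriber requesting one element at a time; the class name and the process() callback are illustrative, not part of the specification:

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

//Illustrative bounded Subscriber: requests a single element, then one more
//after each successful onNext
public class OneByOneSubscriber<T> implements Subscriber<T> {

    private Subscription subscription;

    public void onSubscribe(Subscription s) {
        this.subscription = s;
        s.request(1); //initial demand: a single element
    }

    public void onNext(T data) {
        process(data);           //e.g. commit, flush, etc.
        subscription.request(1); //only request more once onNext succeeded
    }

    public void onError(Throwable t) { t.printStackTrace(); }

    public void onComplete() { System.out.println("done"); }

    private void process(T data) { /* application logic */ }
}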

Table 1. The Reactive Streams artifacts that Reactor uses directly so far:

Processor
  Reactor module(s): reactor-core, reactor-stream
  Implementation(s): reactor.core.processor.*, reactor.rx.*
  Description: In Core, we offer backpressure-ready RingBuffer*Processors and more; in Stream, we have a full set of Operations and Broadcasters.

Publisher
  Reactor module(s): reactor-core, reactor-bus, reactor-stream, reactor-net
  Implementation(s): reactor.core.processor.*, reactor.rx.stream.*, reactor.rx.action.*, reactor.io.net.*
  Description: In Core, processors implement Publisher. In Bus, we publish an unbounded emission of routed events. In Stream, our Stream extensions directly implement Publisher. In Net, Channels implement Publisher to consume incoming data, and we also provide publishers for flush and close callbacks.

Subscriber
  Reactor module(s): reactor-core, reactor-bus, reactor-stream, reactor-net
  Implementation(s): reactor.core.processor.*, reactor.bus.EventBus.*, reactor.rx.action.*, reactor.io.net.impl.*
  Description: In Core, our processors implement Subscriber. In Bus, we expose bus capacities with unbounded Publisher/Subscriber. In Stream, actions are Subscribers computing specific callbacks. In Net, our IO layer implements Subscribers to handle writes, closes and flushes.

Subscription
  Reactor module(s): reactor-stream, reactor-net
  Implementation(s): reactor.rx.subscription.*, reactor.io.net.impl.*
  Description: In Stream, we offer optimized PushSubscriptions and the buffering-ready ReactiveSubscription. In Net, our async IO reader side uses custom Subscriptions to implement backpressure.

We have worked with the standard since the inception of Reactor 2 and tracked it all the way to its 1.0.0 release. It is now available on Maven Central and other popular mirrors, and you will also find it as a transitive dependency of reactor-core.

Reactive Extensions

Reactive Extensions, or more commonly Rx, are a set of well-defined Functional APIs extending the Observer pattern to an epic scale.

Rx patterns support implementing reactive data-sequence handling with a few design keys:
  • Abstract time/latency away with a callback chain: it is only called when data is available

  • Abstract the threading model away: synchronous or asynchronous, it is just an Observable/Stream we deal with

  • Control error-passing and termination: error and complete signals, in addition to the data payload signal, are passed down the chain

  • Solve multiple scatter-aggregation and other composition issues with various predefined APIs.

The standard implementation of Reactive Extensions on the JVM is RxJava. It provides a powerful functional API and ports almost all of the concepts over from the original Microsoft library.

Reactor 2 provides a specific module implementing a subset of the documented Reactive Extensions, on a very few occasions adapting the names to match our specific behaviors. This focused approach around data-centric issues (microbatching, composition…) depends on Reactor functional units, Dispatchers and the Reactive Streams contract. We encourage users who need the full flavor of Reactive Extensions to try out RxJava and bridge with us. In the end, the user can benefit from the powerful asynchronous and IO capabilities provided by Reactor while composing with the complete RxJava ecosystem.

Some operations, behaviors, and the immediate understanding of Reactive Streams are still unique to Reactor as of now, and we will try to flesh out these unique features in the appropriate sections.
Async IO capabilities also depend on Stream capacity for backpressure and auto-flush options.
Table 2. Misalignments between Rx and Reactor Streams

Observable
  reactor-stream: reactor.rx.Stream
  Comment: Reflects the implementation of the Reactive Streams Publisher.

Operator
  reactor-stream: reactor.rx.action.Action
  Comment: Reflects the implementation of the Reactive Streams Processor.

Observable with 1 data at most
  reactor-stream: reactor.rx.Promise
  Comment: Types a unique result; reflects the implementation of the Reactive Streams Processor and provides for optional asynchronous dispatching.

Factory API (just, from, merge…)
  reactor-stream: reactor.rx.Streams
  Comment: Aligned with a core data-focused subset; returns Stream.

Functional API (map, filter, take…)
  reactor-stream: reactor.rx.Stream
  Comment: Aligned with a core data-focused subset; returns Stream.

Schedulers
  reactor-stream: reactor.core.Dispatcher, org.reactivestreams.Processor
  Comment: Reactor Streams compute operations with unbounded shared Dispatchers or bounded Processors.

Observable.observeOn()
  reactor-stream: Stream.dispatchOn()
  Comment: Just an adapted name for the dispatcher argument.
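For instance, where an Rx user reaches for observeOn, the Reactor equivalent reads as follows; a minimal sketch, assuming an initialized Environment:

import reactor.Environment;
import reactor.rx.Streams;

Streams.just("hello")
  .dispatchOn(Environment.sharedDispatcher()) //where Rx would use observeOn
  .consume(System.out::println);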

reactor-core

You should never do your asynchronous work alone.
— Jon Brisbin
After writing Reactor 1
You should never do your asynchronous work alone.
— Stephane Maldini
After writing Reactor 2
Head first with a Groovy example of some Core work
//Initialize context and get default dispatcher
Environment.initialize()

//RingBufferDispatcher with 8192 slots by default
def dispatcher = Environment.sharedDispatcher()

//Create a callback
Consumer<Integer> c = { data ->
        println "some data arrived: $data"
}

//Create an error callback
Consumer<Throwable> errorHandler = { it.printStackTrace() }

//Dispatch data asynchronously
dispatcher.dispatch(1234, c, errorHandler)

Environment.terminate()
A second taster, the Reactive Streams way
//standalone async processor
def processor = RingBufferProcessor.<Integer>create()

//send data, will be kept safe until a subscriber attaches to the processor
processor.onNext(1234)
processor.onNext(5678)

//consume integer data
processor.subscribe(new Subscriber<Integer>(){

  void onSubscribe(Subscription s){
    //unbounded subscriber
    s.request Long.MAX_VALUE
  }

  void onNext(Integer data){
    println data
  }

  void onError(Throwable err){
    err.printStackTrace()
  }

  void onComplete(){
    println 'done!'
  }
})

//Shutdown internal thread and call complete
processor.onComplete()

Core Overview

Core Overview
Figure 5. How Doge can use Reactor-Core

Reactor Core has the following artefacts:

  • Common IO & functional types, some directly backported from Java 8 Functional Interfaces

    • Function, Supplier, Consumer, Predicate, BiConsumer, BiFunction

    • Tuples

    • Resource, Pausable, Timer

    • Buffer, Codec and a handful of predefined Codecs

  • Environment context

  • Dispatcher contract and a handful of predefined Dispatchers

  • Predefined Reactive Streams Processors

Alone, reactor-core can already be used as a drop-in replacement for another message-passing strategy, to schedule timed tasks, or to organize your code in small functional blocks implementing the Java 8 backport interfaces. Such a breakdown allows Reactor to play more nicely with other Reactive libraries, especially by removing the burden of understanding the RingBuffer for the impatient developer.

Reactor-Core implicitly shadows LMAX Disruptor, so it won't appear in, nor collide with, an existing Disruptor dependency.

Functional Artefacts

Functional reusable blocks are core, and mostly a required artefact to use as soon as you get into Reactor. [1] So what's cool about Functional Programming? One of the core ideas is to start treating executable code as data like any other. [2] To some extent it is akin to the concept of Closures or Anonymous Functions, where business logic can be decided by the original caller. It also avoids loads of imperative IF/SWITCH blocks and makes for a clear separation of concerns: each block achieves one purpose and doesn't need to share anything.

Organizing Functional Blocks

Every Functional component gives the explicit intent of its general mission:

  • Consumer: simple callback - fire-and-forget

  • BiConsumer: simple callback with two arguments (often used in sequence comparisons, e.g. previous and next arguments)

  • Function: transforming logic - request/reply

  • BiFunction: transforming with two arguments (often used in accumulators, comparing previous and next arguments then returning a new value)

  • Supplier: factory logic - polling

  • Predicate: testing logic - filtering

We consider the Publisher and Subscriber interfaces functional blocks as well, dare we say Reactive Functional Blocks. Nevertheless, they are the basic components used everywhere in Reactor and beyond. The Stream API will usually accept reactor.fn arguments to create the appropriate Subscribers on your behalf.
The good news about wrapping executable instructions within Functional artefacts is that you can reuse them like Lego Bricks.
Consumer<String> consumer = new Consumer<String>(){
        @Override
        public void accept(String value){
                System.out.println(value);
        }
};

//Now in Java 8 style for brevity
Function<Integer, String> transformation = integer -> ""+integer;

Supplier<Integer> supplier = () -> 123;

BiConsumer<Consumer<String>, String> biConsumer = (callback, value) -> {
        for(int i = 0; i < 10; i++){
                //lazy evaluate the final logic to run
                callback.accept(value);
        }
};

//note how the execution flows from supplier to biconsumer
biConsumer.accept(
        consumer,
        transformation.apply(
                supplier.get()
        )
);

It might not sound like a striking revolution at first; however, this basic mindset change will prove precious in our mission to make asynchronous code sane and composable. Dispatchers use Consumers for their typed data and error callbacks. The Reactor Streams module uses all these artifacts for the greater good as well.

A good practice when using an IoC container such as Spring is to leverage its Java Configuration feature to return stateless functional beans. Injecting the blocks into a Stream pipeline or dispatching their execution then becomes quite elegant; a possible configuration is sketched below.
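For example, such a configuration class could look like the following sketch; the bean names and their logic are illustrative only:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.fn.Function;
import reactor.fn.Predicate;

@Configuration
public class FunctionalBlocksConfiguration {

        @Bean
        public Function<Integer, String> toStringFunction() {
                return integer -> "" + integer; //stateless: safe to inject anywhere
        }

        @Bean
        public Predicate<String> notEmptyPredicate() {
                return s -> !s.trim().isEmpty();
        }
}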

Tuples

You might have noticed that these interfaces are strongly typed, with generics support and a small, fixed number of arguments. So how do you pass more than 1 or 2 arguments? The answer is in one class: Tuple. Tuples are like typed CSV lines in a single object instance; you want them in functional programming to keep both the type safety and a variable number of arguments.

Let’s take the previous example and try replacing the double-argument BiConsumer with a single-argument Consumer:

Consumer<Tuple2<Consumer<String>, String>> biConsumer = tuple -> {
        for(int i = 0; i < 10; i++){
                //Correct typing, compiler happy
                tuple.getT1().accept(tuple.getT2());
        }
};

biConsumer.accept(
        Tuple.of(
                consumer,
                transformation.apply(supplier.get())
        )
);
Tuples involve a bit more allocation, which is why the common use cases of comparison or keyed signals are handled with the Bi* artifacts directly.

Environment and Dispatchers

With the functional building blocks in place, we can start playing with them asynchronously. The first stop brings us to the Dispatcher section.

Before we can start any Dispatcher, we want to make sure we create them efficiently. Usually, Dispatchers are expensive to create because they pre-allocate a segment of memory to hold the dispatched signals: the famous runtime vs. startup trade-off introduced in the preface. A specific shared context named Environment has been introduced to manage these various Dispatchers, thus avoiding inappropriate creations.

Environment

Environments are created and terminated by the Reactor user (or by an extension library if available, e.g. Spring). They automatically read a configuration file located at META-INF/reactor/reactor-environment.properties.

The properties file can be tuned at runtime by providing a new properties configuration under the classpath location META-INF/reactor.
Switching from the default configuration at runtime is achieved by passing the following system property: reactor.profiles.active.
java -Dreactor.profiles.active=turbo -jar reactor-app.jar
Starting and Terminating the Environment
Environment env = Environment.initialize();

//The current registered environment is the same as the one initialized
Assert.isTrue(Environment.get() == env);

//Find a dispatcher named "shared"
Dispatcher d  = Environment.dispatcher("shared");

//get the Timer bound to this environment
Timer timer = Environment.timer();

//Shutdown registered Dispatchers and Timers that might run non-daemon threads
Environment.terminate();
//An option could be to register a shutdownHook to automatically invoke terminate.
It's best to maintain a single Environment alive for a given JVM application. Using Environment.initializeIfEmpty() will be preferred most of the time. One way to automate the cleanup is sketched below.
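As hinted in the comment above, a shutdownHook can invoke terminate automatically; a minimal sketch (kept Java 7 friendly):

//Terminate the registered Environment when the JVM exits
Environment.initializeIfEmpty();

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        public void run() {
                Environment.terminate();
        }
}));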

Dispatchers

Dispatchers have been around since Reactor 1. They abstract away the means of message-passing in a common contract similar to the Java Executor; in fact, they do extend Executor!

The Dispatcher contract offers a strongly typed way to pass a signal, with its data and error Consumers executed (a)synchronously. This fixes a first issue faced by classic Executors: error isolation. In effect, instead of interrupting the assigned resource, the error Consumer will be invoked. If none has been provided, the Dispatcher will try to find an existing Environment and use its assigned errorJournalConsumer.

A second unique feature offered by the asynchronous Dispatcher is allowing reentrant dispatching, using a tail-recurse strategy. Tail recursion applies when dispatch detects that the Dispatcher's classLoader has been assigned to the running thread; in that case, it enqueues the task to be executed when the current Consumer returns. A sketch of such a reentrant dispatch follows.
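A minimal sketch of the reentrant case, assuming an initialized Environment and its shared Dispatcher:

import reactor.Environment;
import reactor.core.Dispatcher;

Environment.initialize();
Dispatcher dispatcher = Environment.sharedDispatcher();

dispatcher.dispatch(1, data -> {
  //Dispatching from inside a running Consumer: the task is enqueued and
  //executed after this Consumer returns, instead of growing the call stack
  dispatcher.dispatch(data + 1, next -> System.out.println("ran after " + data), null);
}, null);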

Using a synchronous and a multi-threaded dispatcher like in this Groovy Spock test:
import reactor.core.dispatch.*

//...

given:
  def sameThread = new SynchronousDispatcher()
  def diffThread = new ThreadPoolExecutorDispatcher(1, 128)
  def currentThread = Thread.currentThread()
  Thread taskThread = null

  def consumer = { ev ->
    taskThread = Thread.currentThread()
  }

  def errorConsumer = { error ->
    error.printStackTrace()
  }

when: "a task is submitted"
  sameThread.dispatch('test', consumer, errorConsumer)

then: "the task thread should be the current thread"
  currentThread == taskThread

when: "a task is submitted to the thread pool dispatcher"
  def latch = new CountDownLatch(1)
  diffThread.dispatch('test', { ev -> consumer(ev); latch.countDown() }, errorConsumer)

  latch.await(5, TimeUnit.SECONDS) // Wait for task to execute

then: "the task thread should be different when the current thread"
  taskThread != currentThread
Like the Executor, Dispatchers lack a feature that we will add along the 2.x release line: the Reactive Streams protocol. They are among the few leftovers in Reactor that are not directly tied to the Reactive Streams standard. However, they can be combined with Reactor Streams to quickly fix that, as we will explore in the Stream section. Essentially, that means a user can dispatch to them directly until they eventually (and temporarily) block, since capacity is bounded in most Dispatcher implementations.
Table 3. An introduction to the Dispatcher family

RingBuffer
  From Environment: sharedDispatcher()
  Description: An LMAX Disruptor RingBuffer based Dispatcher.
  Strengths: Small latency peaks tolerated. Fastest async Dispatcher, 10-15M+ dispatch/sec on commodity hardware. Supports ordering.
  Weaknesses: 'Spin' loop when getting the next slot at full capacity. Single-threaded, no concurrent dispatch.

Mpsc
  From Environment: sharedDispatcher() if Unsafe is not available
  Description: Alternative optimized message-passing structure.
  Strengths: Latency peaks tolerated. 5-10M+ dispatch/sec on commodity hardware. Supports ordering.
  Weaknesses: Unbounded, possibly using as much heap memory as available. Single-threaded, no concurrent dispatch.

WorkQueue
  From Environment: workDispatcher()
  Description: An LMAX Disruptor RingBuffer based Dispatcher.
  Strengths: Latency peaks tolerated for a limited time. Fastest multi-threaded Dispatcher, 5-10M+ dispatch/sec on commodity hardware.
  Weaknesses: 'Spin' loop when getting the next slot at full capacity. Concurrent dispatch. Doesn't support ordering.

Synchronous
  From Environment: dispatcher("sync") or SynchronousDispatcher.INSTANCE
  Description: Runs on the current thread.
  Strengths: Upstream and Consumer executions are colocated. Useful for test support. Supports ordering if the reentrant dispatch is on the current thread.
  Weaknesses: No tail recursion support. Blocking.

TailRecurse
  From Environment: tailRecurse() or TailRecurseDispatcher.INSTANCE
  Description: Synchronous reentrant Dispatcher that enqueues dispatches when currently dispatching.
  Strengths: Upstream and Consumer executions are colocated. Reduces the execution stack, greatly expanded by functional call chains.
  Weaknesses: Unbounded tail-recurse depth. Blocking. Supports ordering (thread stealing).

ThreadPoolExecutor
  From Environment: newDispatcher(int, int, DispatcherType.THREAD_POOL_EXECUTOR)
  Description: Uses an underlying ThreadPoolExecutor for message-passing.
  Strengths: Multi-threaded. Blocking Consumers and permanent latency tolerated. 1-5M+ dispatch/sec on commodity hardware.
  Weaknesses: A given Consumer may run concurrently, executed twice or more at the same time. Unbounded by default. Doesn't support ordering.

Traceable Delegating
  From Environment: N/A
  Description: Decorates an existing Dispatcher with TRACE-level logs.
  Strengths: Dispatch tapping.
  Weaknesses: Runs slower than the delegated Dispatcher alone. Log overhead (runtime, disk).

Ring Buffer message passing
Figure 6. RingBufferDispatcher at a given time T

DispatcherSupplier

You may have noticed that some Dispatchers are single-threaded, especially the RingBufferDispatcher and the MpscDispatcher. Going further, referring to the Reactive Streams specification, Subscriber/Processor implementations must not allow concurrent notifications. This impacts Reactor Streams in particular, and trying to use Stream.dispatchOn(Dispatcher) with a Dispatcher that leaves the door open to concurrent signals will fail explicitly.

There is, however, a way to work around that limitation, by using pools of Dispatchers: the DispatcherSupplier. In effect, as a Supplier factory, the indirection offered by Supplier.get() to retrieve a Dispatcher allows for interesting pooling strategies: round-robin, least-used…

Environment offers static helpers to create, and optionally register against the current active Environment, pools of Dispatchers: groups of Dispatchers returned in round-robin order. Once ready, the suppliers provide a controlled number of Dispatchers.

As usual with Dispatchers, Environment is the one-stop shop to manage them:
Environment.initialize();
//....

//Create an anonymous pool of 2 dispatchers with automatic default settings (same type as the default dispatcher, default backlog size...)
DispatcherSupplier supplier = Environment.newCachedDispatchers(2);

Dispatcher d1 = supplier.get();
Dispatcher d2 = supplier.get();
Dispatcher d3 = supplier.get();
Dispatcher d4 = supplier.get();

Assert.isTrue( d1 == d3  && d2 == d4);
supplier.shutdown();

//Create and register a new pool of 3 dispatchers
DispatcherSupplier supplier1 = Environment.newCachedDispatchers(3, "myPool");
DispatcherSupplier supplier2 = Environment.cachedDispatchers("myPool");

Assert.isTrue( supplier1 == supplier2 );
supplier1.shutdown();

Timers

Dispatchers compute incoming tasks as soon as possible. Timers, however, come with a periodic and one-time scheduling API. Reactor Core offers a HashWheelTimer by default, and it is automatically bound to any new Environment. HashWheelTimers are perfect for dealing with massive numbers of concurrent, in-memory scheduled tasks; they are a powerful alternative to the Java TaskScheduler.

While it is suited for windowing (small task periods, under a minute in order of magnitude), it is not a resilient scheduler, since all tasks are lost when the application shuts down.
Timers will receive some attention over the next releases; e.g., we would love to add persisting/shared scheduling support with Redis. Please voice your opinion or propose any contribution here!
A simple timer creation as seen in one of our Groovy Spock tests:
import reactor.fn.timer.Timer

//...

given: "a new timer"
    Environment.initializeIfEmpty()
    Timer timer = Environment.timer()
    def latch = new CountDownLatch(10)

when: "a task is submitted"
    timer.schedule(
            { Long now -> latch.countDown() } as Consumer<Long>,
            period,
            TimeUnit.MILLISECONDS
    )

then: "the latch was counted down"
    latch.await(1, TimeUnit.SECONDS)
    timer.cancel()
    Environment.terminate()

Core Processors

Core Processors are here to do a more focused job than Dispatchers: computing asynchronous tasks with back-pressure support.

They also play nicely with other Reactive Streams vendors since they directly implement the org.reactivestreams.Processor interface. Remember that a Processor is both a Subscriber AND a Publisher, so you can insert it in a Reactive Streams chain where you wish (source, processing, sink).

The specification doesn't specifically recommend hitting Processor.onNext(d) directly. We do offer that support, but backpressure will of course not be propagated, except through eventual blocking. One can explicitly pass an anonymous Subscription to a Processor first, using Processor.onSubscribe, to get the backpressure feedback within the implemented request method, as sketched below.
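A sketch of that technique, observing through a custom request(long) the demand a RingBufferProcessor signals; the println bodies are illustrative only:

import org.reactivestreams.Processor;
import org.reactivestreams.Subscription;
import reactor.core.processor.RingBufferProcessor;

Processor<Integer, Integer> processor = RingBufferProcessor.create();

processor.onSubscribe(new Subscription() {
        public void request(long n) {
                //backpressure feedback: the processor signals how many elements it is ready to take
                System.out.println("demand: " + n);
        }

        public void cancel() {
        }
});

processor.onNext(1234); //emit within the signalled demand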
onNext must be serialized, i.e. coming from a single thread at a time (no concurrent onXXX signals are allowed). However, Reactor supports concurrent calls if the Processor is created using the dedicated share() factory method, e.g. RingBufferProcessor.share(). This decision must be taken at creation time, in order to use the right coordination logic within the implementation, so choose wisely: is this going to be a standard publishing sequence (no concurrency), or is this going to be hit by multiple threads?
Reactor makes a single exception to the standard when it comes to the specific *WorkProcessor artefacts:
  • Usually, a Reactive Streams Processor dispatches the same sequence of data asynchronously to all Subscribers subscribed at a given time T. It's akin to the Publish/Subscribe pattern.

  • WorkProcessors distribute the data at their convenience, making the most of each Subscriber. That means Subscribers at a given time T will always see distinct data. It's akin to the WorkQueue pattern.

We expect to increase our collection of Core Processors over the 2.x release line.

RingBuffer Processors

RingBuffer-based Reactive Streams Processors are good for a bunch of #awesome reasons:

  • Über high throughput

  • Replay from the latest non-consumed data

    • If no Subscriber is listening, data won’t be lost (unlike Broadcaster from Reactor-Stream).

    • If a Subscriber cancels during processing, the signal will be safely replayed; this actually works nicely with the RingBufferWorkProcessor

  • Smart back-pressure: the size is bounded at any time, and the Subscribers have the responsibility to consume and request more data

  • Propagated back-pressure: since it's a Processor, it can be subscribed to in order to pass the information along

  • Multi-threaded inbound/outbound Processor capabilities

Actually, RingBuffer*Processors are like typed MicroMessageBrokers!

Their only drawbacks are that they might be costly to create at runtime and that they can't be shared as easily as their cousin, the RingBufferDispatcher. That makes them suited for high-throughput, pre-defined data pipelines.

RingBufferProcessor

Reactor’s RingBufferProcessor component is essentially a Disruptor RingBuffer adapted to the Reactive Streams API. Its purpose is to provide as close to bare-metal efficiency as possible. It is intended for situations where you need to dispatch tasks onto another thread with extremely low overhead and extremely high throughput and manage backpressure in your workflow.

I use RingBufferProcessor to compute various output remote calls asynchronously: AMQP, SSD storage and an In-Memory store, the variable latency is totally eaten by the Processor and our Million-Message-Per-Sec Datasource never blocks !
— Happy Reactor user
Use Case for RingBufferProcessor
Ring Buffer message passing
Figure 7. RingBufferProcessor at a given time T, with 2 Subscribers, all consuming the same sequence. A delta in consuming rate is allowed until the ring buffer is full, which happens when the blue cube collides with its next clockwise yellow cube.

To create a RingBufferProcessor, you use static create helper methods.

Processor<Integer, Integer> p = RingBufferProcessor.create("test", 32); (1)
Stream<Integer> s = Streams.wrap(p); (2)

s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); (3)
s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); (4)
s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); (5)

input.subscribe(p); (6)
1 Create a Processor with an internal RingBuffer capacity of 32 slots.
2 Create a Reactor Stream from this Reactive Streams Processor.
3 Each call to consume creates a Disruptor EventProcessor on its own Thread.
4 Each call to consume creates a Disruptor EventProcessor on its own Thread.
5 Each call to consume creates a Disruptor EventProcessor on its own Thread.
6 Subscribe this Processor to a Reactive Streams Publisher.

Each element of data passed to the Processor's onNext(Integer) method will be "broadcast" to all consumers. There's no round-robin distribution with this Processor, because that's the job of the RingBufferWorkProcessor, discussed below. If you passed the integers 1, 2 and 3 into the Processor, you would see output in the console similar to this:

Thread[test-2,5,main] data=1
Thread[test-1,5,main] data=1
Thread[test-3,5,main] data=1
Thread[test-1,5,main] data=2
Thread[test-2,5,main] data=2
Thread[test-1,5,main] data=3
Thread[test-3,5,main] data=2
Thread[test-2,5,main] data=3
Thread[test-3,5,main] data=3

Each thread receives all values passed into the Processor, and each thread gets the values in order, since the Processor uses a RingBuffer internally to manage the slots available to publish values.

RingBufferProcessor can replay missed signals (emitted while there were 0 subscribers) to any future Subscriber. It will force the processor to wait on onNext() if a full buffer is not being drained by a subscriber. Signals from the last sequence acknowledged by a subscriber, up to the size of the configured ring buffer, are kept ready to be replayed to every new Subscriber, even if the event has already been sent (fan-out).

RingBufferWorkProcessor

Unlike the standard RingBufferProcessor, which broadcasts its values to all consumers, the RingBufferWorkProcessor partitions the incoming values based on the number of consumers. Values come into the Processor and are sent to the various threads (because each consumer has its own thread) in a round-robin fashion, while still using the internal RingBuffer to efficiently manage the publication of values by providing backpressure to the producer when appropriate.

We implemented a RingBufferWorkProcessor to scale-up and load-balance various HTTP microservices calls. I might be wrong but it looks like it's faster than light (!) and the GC pressure is totally under control.
— Happy Reactor user
Use Case for RingBufferWorkProcessor
Ring Buffer message passing
Figure 8. RingBufferWorkProcessor at a given time T, with 2 Subscribers, each consuming a unique sequence (FIFO by availability). A delta in consuming rate is allowed until the ring buffer is full, which happens when the blue cube collides with its next clockwise yellow cube.

To use the RingBufferWorkProcessor, the only thing you have to change from the above code sample is the reference to the static create method. You’ll use the one on the RingBufferWorkProcessor class itself instead. The rest of the code remains identical.

Processor<Integer, Integer> p = RingBufferWorkProcessor.create("test", 32); (1)
1 Create a Processor with an internal RingBuffer capacity of 32 slots.

Now when values are published to the Processor, they are not broadcast to every consumer but partitioned based on the number of consumers. When we run this sample, we see output like this:

Thread[test-2,5,main] data=3
Thread[test-3,5,main] data=2
Thread[test-1,5,main] data=1
RingBufferWorkProcessor can replay interrupted signals, detecting a CancelException from the terminating Subscriber. This is the only case where a signal will actually be played once more with another Subscriber. We guarantee at-least-once delivery for any event. If you are familiar with messaging semantics, you might now say, "Wait, does this RingBufferWorkProcessor work like a Message Broker?", and the answer is yes.

Codecs and Buffer

Byte manipulation is a core concern that applies to a lot of data pipeline configurations. It is used extensively by reactor-net to marshal and unmarshal bytes received or sent over IO.

The reactor.io.buffer.Buffer is a decorator for Java ByteBuffer handling, adding a series of operations. The goal is to minimize byte copying by playing with the ByteBuffer limits and reading or overwriting pre-allocated bytes. Tracking the ByteBuffer positions can quickly switch the developer brain into headache mode; at least it did that to us, so we decided to offer this simple tool to our users.

A simple Buffer manipulation as seen in one of our Groovy Spock tests:
import reactor.io.buffer.Buffer

//...

given: "an empty Buffer and a full Buffer"
                def buff = new Buffer()
                def fullBuff = Buffer.wrap("Hello World!")

when: "a Buffer is appended"
                buff.append(fullBuff)

then: "the Buffer was added"
                buff.position() == 12
                buff.flip().asString() == "Hello World!"

A useful application of Buffer is Buffer.View, which is returned by multiple operations such as split(). It simply provides a copy-free way to scan and introspect bytes from a ByteBuffer. Buffer.View is itself a kind of Buffer, so the same operations are exposed.

Reusing the same bytes for chunked read using a delimiter and Buffer.View:
byte delimiter = (byte) ';';
byte innerDelimiter = (byte) ',';

Buffer buffer = Buffer.wrap("a;b-1,b-2;c;d;");

List<Buffer.View> views = buffer.split(delimiter);

int viewCount = views.size();
Assert.isTrue(viewCount == 4);

for (Buffer.View view : views) {
    System.out.println(view.get().asString()); //prints "a" then "b-1,b-2", then "c" and finally "d"

    if(view.indexOf(innerDelimiter) != -1){
      for(Buffer.View innerView : view.get().split(innerDelimiter)){
        System.out.println(innerView.get().asString()); //prints "b-1" and "b-2"
      }
    }
}

Playing with Buffer can feel a bit low-level for the common marshalling/unmarshalling use cases, so Reactor comes with a series of pre-defined converters called Codecs. Some Codecs require an appropriate extra dependency on the classpath to work, like Jackson for JSON manipulation.

A Codec works in two ways. First, it implements Function, to encode anything directly and return the encoded data, usually in the form of a Buffer. This is great, but it only works with stateless Codecs; the alternative is to use the encoding function returned by Codec.encoder().

Codec.encoder() vs Codec.apply(Source)
  • Codec.encoder() returns a unique encoding function that should not be shared between different Threads.

  • Codec.apply() encodes directly (and saves an encoder allocation), but the Codec itself is what should be shared between Threads in that case.

In fact, Reactor Net handles that difference for you by calling Codec.encoder() for each new connection. A sketch of the two paths follows.
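Both encoding paths, using the predefined StringCodec as an example (assuming its reactor.io.codec package location):

import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.StringCodec;

StringCodec codec = new StringCodec();

//1. Direct encoding: the Codec is itself a Function and may be shared
Buffer direct = codec.apply("Hello World!");

//2. Dedicated encoder: not to be shared between different Threads
Function<String, Buffer> encoder = codec.encoder();
Buffer viaEncoder = encoder.apply("Hello World!");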

A Codec can also decode data from a source type, usually Buffer for most Codec implementations. To decode source data, we must retrieve a decoding function from Codec.decoder(). Unlike on the encoding side, there isn't any apply shortcut, as that method is already overloaded for the encoding purpose. Like the encoding side, the decoding function should not be shared between Threads.

There are two forms of Codec.decoder() functions: one simply returns the decoded data, whereas Codec.decoder(Consumer) calls the passed Consumer on each decoding event.

Codec.decoder() vs Codec.decoder(Consumer)
  • Codec.decoder() is a blocking decoding function; it directly returns the decoded data from the passed source.

  • Codec.decoder(Consumer) can be used for non-blocking decoding; it returns nothing (null) and only invokes the passed Consumer once data is decoded. It can be combined with any asynchronous facility.

Using one of the predefined Codecs as verified in this Groovy Spock test:
import reactor.io.json.JsonCodec

//...

given: 'A JSON codec'
given: 'A JSON codec'
    def codec = new JsonCodec<Map<String, Object>, Object>(Map)
    def latch = new CountDownLatch(1)

when: 'The decoder is passed some JSON'
    Map<String, Object> decoded
    def callbackDecoder = codec.decoder {
        decoded = it
        latch.countDown()
    }
    def blockingDecoder = codec.decoder()

    //yes this is a really simple async strategy, but that's not the point here :)
    Thread.start {
        callbackDecoder.apply(Buffer.wrap("{\"a\": \"alpha\"}"))
    }

    def decodedMap = blockingDecoder.apply(Buffer.wrap("{\"a\": \"beta\"}"))

then: 'The decoded maps have the expected entries'
    latch.await()
    decoded.size() == 1
    decoded['a'] == 'alpha'
    decodedMap['a'] == 'beta'
Table 4. Available Core Codecs:

ByteArrayCodec
  Description: Wrap/unwrap byte arrays from/to Buffer.
  Required dependency: N/A

DelimitedCodec
  Description: Split/aggregate Buffers, delegating to the passed Codec for unit marshalling.
  Required dependency: N/A

FrameCodec
  Description: Split/aggregate Buffers into Frame Buffers according to successive prefix lengths.
  Required dependency: N/A

JavaSerializationCodec
  Description: Deserialize/serialize Buffers using Java Serialization.
  Required dependency: N/A

PassThroughCodec
  Description: Leave the Buffers untouched.
  Required dependency: N/A

StringCodec
  Description: Convert Strings to/from Buffers.
  Required dependency: N/A

LengthFieldCodec
  Description: Find the length, then decode/encode the appropriate number of bytes into/from a Buffer.
  Required dependency: N/A

KryoCodec
  Description: Convert Buffers into Java objects using Kryo.
  Required dependency: com.esotericsoftware.kryo:kryo

JsonCodec, JacksonJsonCodec
  Description: Convert Buffers into Java objects using Jackson.
  Required dependency: com.fasterxml.jackson.core:jackson-databind

SnappyCodec
  Description: A compression Codec that applies a delegate Codec after unpacking/before packing a Buffer.
  Required dependency: org.xerial.snappy:snappy-java

GZipCodec
  Description: A compression Codec that applies a delegate Codec after unpacking/before packing a Buffer.
  Required dependency: N/A

reactor-stream

Nope, you shall not use Future.get(), ever.
— Stephane Maldini
with a Banking Sector Customer
Head first with a Java 8 example of some Stream work
import static reactor.Environment.*;
import reactor.rx.Streams;
import reactor.rx.BiStreams;

//...

Environment.initialize();

//find the top 10 words used in a list of Strings
Streams.from(aListOfString)
  .dispatchOn(sharedDispatcher())
  .flatMap(sentence ->
    Streams
      .from(sentence.split(" "))
      .dispatchOn(cachedDispatcher())
      .filter(word -> !word.trim().isEmpty())
      .observe(word -> doSomething(word))
  )
  .map(word -> Tuple.of(word, 1))
  .window(1, TimeUnit.SECONDS)
  .flatMap(words ->
    BiStreams.reduceByKey(words, (prev, next) -> prev + next)
      .sort((wordWithCountA, wordWithCountB) -> -wordWithCountA.t2.compareTo(wordWithCountB.t2))
      .take(10)
      .finallyDo(event -> LOG.info("---- window complete! ----"))
  )
  .consume(
    wordWithCount -> LOG.info(wordWithCount.t1 + ": " + wordWithCount.t2),
    error -> LOG.error("", error)
  );

Coordinating tasks with Streams and Promises

Stream Overview
Figure 9. How Doge can use Reactor-Stream

Reactor Streams has the following artifacts:

  • Stream and its direct implementations

    • Contains reactive extensions and other composition API

  • Promise with a specific A+ flavored API

    • Can be transformed back to Stream with Promise.stream()

  • Static Factories, the one-stop-shops to create related components

    • Streams for Stream creation from well-defined data sources (Iterable, nothing, Future, Publisher…​)

    • BiStreams for key-value Stream<Tuple2> processing (reduceByKey…​)

    • IOStreams for Persisting and Decoding Streams

    • Promises for single-data-only Promise

  • Action and its direct implementations of every operation provided by the Stream following the Reactive Streams Processor specification

    • Not created directly, but with the Stream API (Stream.map() → MapAction, Stream.filter() → FilterAction…​)

  • Broadcaster, a specific kind of Action exposing onXXXX interfaces for dynamic data dispatch

    • Unlike Core Processors, they will usually not bother with buffering data if there is no subscriber attached

    • However the BehaviorBroadcaster can replay the latest signal to new Subscribers

Do not confuse reactor.rx.Stream with the new JDK 8 java.util.stream.Stream. The latter offers neither a Reactive Streams based API nor Reactive Extensions. However, the JDK 8 Stream API is quite complete when used with primitive types and Collections; in fact, it's quite interesting for JDK 8 enabled applications to mix both JDK and Reactive Streams, as sketched below.
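A small sketch of such a mix: prepare data synchronously with the JDK 8 Stream API, then hand it to a Reactor Stream for further composition:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import reactor.rx.Streams;

//JDK 8 Stream: synchronous preparation of an in-memory collection
List<String> words = Arrays.asList("a", "bb", "ccc").stream()
  .filter(w -> w.length() > 1)
  .collect(Collectors.toList());

//Reactor Stream: Reactive Streams based composition over the result
Streams.from(words)
  .map(String::toUpperCase)
  .consume(System.out::println);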

Streams Basics

Reactor offers Stream or Promise based on the Reactive Streams standard to compose statically typed data pipelines.

It is an incredibly useful and flexible component. It's flexible enough to be used just to compose asynchronous actions together, like RxJava's Observable. But it's powerful enough to function as an asynchronous work queue that forks and joins arbitrary compositions, or to combine with other Reactive Streams components coming from any of the other implementors of the standard.[3]

There are basically two rough categories of Streams:
  • A hot Stream is unbounded and capable of accepting input data like a sink; see the Broadcaster sketch after this list.

    • Think UI events such as mouse clicks, or realtime feeds such as sensors, trade positions or Twitter.

    • Adapted backpressure strategies, mixed with the Reactive Streams protocol, will apply

  • A cold Stream is bounded and generally created from a fixed collection of data, like a List or another Iterable.

    • Think cursored reads such as IO reads or database queries.

    • Automatic Reactive Streams backpressure will apply
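A sketch of a hot Stream using a Broadcaster as the sink (assuming the reactor.rx.broadcast.Broadcaster location and its default synchronous dispatching):

import reactor.rx.broadcast.Broadcaster;

Broadcaster<String> sink = Broadcaster.create();

sink.map(String::toUpperCase)
  .consume(System.out::println);

//data pushed after the pipeline is wired, as a UI or sensor feed would be
sink.onNext("hello");
sink.onNext("world");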

As seen previously, Reactor uses an Environment to keep sets of Dispatcher instances around for shared use in a given JVM (and classloader). An Environment instance can be created and passed around in an application to avoid classloading segregation issues or the static helpers can be used. Throughout the examples on this site, we’ll use the static helpers and encourage you to do likewise. To do that, you’ll need to initialize the static Environment somewhere in your application.

static {
  Environment.initialize();
}

Creating Streams and Promises

This is where you start if you are the owner of the data source and want to just make it Reactive, with direct access to various Reactive Extensions and Reactive Streams capabilities.

Sometimes the goal is also to expand an existing Reactive Streams Publisher with the Stream API, and we fortunately offer one-shot static APIs to perform the conversion.

Extending an existing Reactor Stream, as we do with IterableStream, SingleValueStream, etc., is also an attractive option to create a Publisher-ready source (Stream implements it) injected with the Reactor API.

Streams and Promises are relatively inexpensive: our microbenchmark suite succeeds in creating more than 150M of them per second on commodity hardware. Most Streams stick to the share-nothing pattern, only creating new immutable objects when required.

Every operation will return a new instance:

Stream<A> stream = Streams.just(a);
Stream<B> transformedStream = stream.map(transformationToB);

Assert.isTrue(transformedStream != stream);
stream.subscribe(subscriber1); //subscriber1 will see the data A unaltered
transformedStream.subscribe(subscriber2); //subscriber2 will see the data B after transformation from A.

//Note that these two subscribers will materialize independent stream pipelines, a process we also call lifting

From Cold Data Sources

You can create a Stream from a variety of sources, including an Iterable of known values, a single value to use as the basis for a flow of tasks, or even blocking structures such as Future or Supplier.

Streams.just()
Stream<String> st = Streams.just("Hello ", "World", "!"); (1)

st.dispatchOn(Environment.cachedDispatcher()) (2)
  .map(String::toUpperCase) (3)
  .consume(s -> System.out.printf("%s greeting = %s%n", Thread.currentThread(), s)); (4)
1 Create a Stream from a known value but do not assign a default Dispatcher.
2 .dispatchOn(Dispatcher) tells the Stream which thread to execute tasks on. Use this to move execution from one thread to another.
3 Transform the input using a commonly-found convention: the map() method.
4 Produce demand on the pipeline, which means "start processing now". It's an optimized shortcut for subscribe(Subscriber), where the Subscriber just requests Long.MAX_VALUE by default.
Cold data sources will be replayed from the start for every fresh Subscriber passed to Stream.subscribe(Subscriber); duplicate consumption is therefore possible.
Table 5. Creating pre-determined Streams and Promises

Streams.<T>empty()
  Data type: T
  Role: Only emits onComplete() once requested by its Subscriber.

Streams.<T>never()
  Data type: T
  Role: Never emits anything. Useful for keep-alive behaviors.

Streams.<T, Throwable>fail(Throwable)
  Data type: T
  Role: Only emits onError(Throwable).

Streams.from(Future<T>)
  Data type: T
  Role: Blocks the Subscription.request(long) on the passed Future.get(), which might emit onNext(T) and onComplete(), otherwise onError(Throwable) for any exception.

Streams.from(T[])
  Data type: T
  Role: Emits N onNext(T) elements every time Subscription.request(N) is invoked. If N == Long.MAX_VALUE, emits everything. Once the whole array has been read, emits onComplete().

Streams.from(Iterable<T>)
  Data type: T
  Role: Emits N onNext(T) elements every time Subscription.request(N) is invoked. If N == Long.MAX_VALUE, emits everything. Once the whole Iterable has been read, emits onComplete().

Streams.range(long, long)
  Data type: Long
  Role: Emits a sequence of N onNext(Long) every time Subscription.request(N) is invoked. If N == Long.MAX_VALUE, emits everything. Once the inclusive upper bound has been read, emits onComplete().

Streams.just(T, T, T, T, T, T, T, T)
  Data type: T
  Role: An optimization over Streams.from(Iterable) that behaves similarly. Also useful to emit an Iterable, Array or Future without colliding with the Streams.from() signatures.

Streams.generate(Supplier<T>)
  Data type: T
  Role: Emits onNext(T) from the producing Supplier.get() factory every time Subscription.request(N) is called. The demand N is ignored, as only one element is emitted at a time. When a null value is returned, emits onComplete().

Promises.syncTask(Supplier<T>), Promises.task(…, Supplier<T>)
  Data type: T
  Role: Emits a single onNext(T) and onComplete() from the producing Supplier.get() on the first Subscription.request(N) received. The demand N is ignored.

Promises.success(T)
  Data type: T
  Role: Emits onNext(T) and onComplete() whenever a Subscriber is provided to Promise.subscribe(Subscriber).

Promises.<T>error(Throwable)
  Data type: T
  Role: Emits onError(Throwable) whenever a Subscriber is provided to Promise.subscribe(Subscriber).
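A couple of the factories above in action; a minimal sketch, assuming Promise's onSuccess callback:

import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Streams;

//bounded, cold sequence: emits 1, 2, 3 then completes
Streams.range(1, 3)
  .consume(n -> System.out.println("next: " + n));

//already-fulfilled promise: signals immediately on subscribe
Promise<String> promise = Promises.success("done");
promise.onSuccess(s -> System.out.println("promise: " + s));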

From Existing Reactive Publishers

Existing Reactive Streams Publishers may very well come from other implementations, including the user's own, or from Reactor itself.

The use cases include:

  • Combinatory API to coordinate various data sources.

  • Lazy resource access, reading a Data Source on subscribe or on request, e.g. Remote HTTP calls.

  • Data-oriented operations such as Key/Value Tuples Streams, Persistent Streams or Decoding.

  • Plain Publisher decoration with Stream API

Streams.concat() and Streams.wrap() in action
Processor<String,String> processor = RingBufferProcessor.create();

Stream<String> st1 = Streams.just("Hello "); (1)
Stream<String> st2 = Streams.just("World "); (1)
Stream<String> st3 = Streams.wrap(processor); (2)

Streams.concat(st1, st2, st3) (3)
  .reduce( (prev, next) -> prev + next ) (4)
  .consume(s -> System.out.printf("%s greeting = %s%n", Thread.currentThread(), s)); (5)

processor.onNext("!");
processor.onComplete();
1 Create a Stream from a known value.
2 Decorate the core processor with Stream API. Note that Streams.concat() would have accepted the processor directly as a valid Publisher argument.
3 Concat the 3 upstream sources (all st1, then all st2, then all st3).
4 Accumulate the input 2 by 2 and emit the result on upstream completion, after the last complete from st3.
5 Produce demand on the pipeline, which means "start processing now".
Table 6. Creating from available Reactive Streams Publishers

Factory method

Data Type

Role

Streams.create(Publisher<T>)

T

Only subscribe to the passed Publisher when the first Subscription.request(N) hits the returned Stream. Therefore it supports malformed Publishers that do not invoke Subscriber.onSubscribe(Subscription) as required per specification.

Streams.wrap(Publisher<T>)

T

A simple delegating Stream to the passed Publisher.subscribe(Subscriber<T>) argument. Only supports well formed Publishers correctly using the Reactive Streams protocol:

onSubscribe > onNext\* > (onError | onComplete)

Streams.defer(Supplier<Publisher<T>>)

T

A lazy Publisher access using the level of indirection provided by Supplier.get() everytime Stream.subscribe(Subscriber) is called.

Streams.createWith(BiConsumer<Long,SubscriberWithContext<T, C>, Function<Subscriber<T>,C>, Consumer<C>)

T

A Stream generator with explicit callbacks for each Subscriber request, start and stop events. similar to Streams.create(Publisher) minus the boilerplate for common use.

Streams.switchOnNext(Publisher<Publisher<T>>)

T

A Stream alterning in FIFO order between emitted onNext(Publisher<T>) from the passed Publisher. The signals will result in downstream Subscriber<T> receiving the next Publisher sequence of onNext(T). It might interrupt a current upstream emission when the onNext(Publisher<T>) signal is received.

Streams.concat(Publisher<T>, Publisher<T>*)

Streams.concat(Publisher<Publisher<T>>)

T

If a Publisher<T> is already emitting, wait for it to onComplete() before draining the next pending Publisher<T>. As the name suggests its useful to concat various datasources and keep ordering right.

Streams.merge(Publisher<T>, Publisher<T>, Publisher<T>*)

Streams.merge(Publisher<Publisher<T>>)

T

Accept multiple sources and interleave their respective sequence. Order won’t be preserved like with concat. Demand from a Subscriber will be split between various sources with a minimum of 1 to make sure everyone has a chance to send something.

Streams.combineLatest(Publisher<T1>, Publisher<T2>, Publisher<T3-N> x6, Function<Tuple2-N, C>)

C

Combine most recent emitted elements from the passed sources using the given aggregating Function.

Streams.zip(Publisher<T1>, Publisher<T2>, Publisher<T3-N> x6, Function<Tuple2-N, C>)

C

Combine most recent elements once, every time a source has emitted a signal, apply the given Function and clear the temporary aggregate. Effectively it’s a flexible join mechanism for multiple types of sources.

Streams.join(Publisher<T>, Publisher<T>, Publisher<T> x6)

List<T>

A shortcut for zip that aggregates each complete set of values into a List matching the order of the passed source arguments.

Streams.await(Publisher<?>, long, TimeUnit, boolean)

void

Block the calling thread until onComplete of the passed Publisher. Optional arguments can be passed to tune the timeout and whether data should be requested. It will throw an exception if the final state is onError.

IOStreams.<K,V>persistentMap(String, deleteOnExit)

V

A simple shortcut over ChronicleStream constructors, a disk-based log appender/tailer. The name argument must match an existing persistent queue under /tmp/persistent-queue[name].

IOStreams.<K,V>persistentMapReader(String)

V

A simple shortcut over ChronicleReaderStream constructors, a disk-based log tailer. The name argument must match an existing persistent queue under /tmp/persistent-queue[name].

IOStreams.decode(Codec<SRC, IN, ?>, Publisher<SRC>)

IN

Use Codec decoder to decode the passed source data type into IN type.

BiStreams.reduceByKey(Publisher<Tuple2<KEY,VALUE>>, Map<KEY,VALUE>, Publisher<MapStream.Signal<KEY, VALUE>>, BiFunction<VALUE, VALUE, VALUE>)

Tuple2<KEY,VALUE>

A key-value operation that accumulates computed results, passing each two sequential onNext(VALUE) for a given key to the BiFunction argument. The result will be released on onComplete() only. The options allow you to use an existing map store and listen for its events.

BiStreams.scanByKey(Publisher<Tuple2<KEY,VALUE>>, Map<KEY,VALUE>, Publisher<MapStream.Signal<KEY, VALUE>>, BiFunction<VALUE, VALUE, VALUE>)

Tuple2<KEY,VALUE>

A key-value operation that accumulates computed results, passing each two sequential onNext(VALUE) for a given key to the BiFunction argument. The result will be released every time, just after it has been stored. The options allow you to use an existing map store and listen for its events.

Promises.when(Promise<T1>, Promise<T2>, Promise<T3-N> x6)

TupleN<T1,T2,…>

Join all the results from the given Promises and fulfill the returned Promise with the aggregated Tuple.

Promises.any(Promise<T>, Promise<T>, Promise<T> x6)

T

Pick the first signal available among the passed promises and onNext(T) the returned Promise with this result.

Promises.multiWhen(Promise<T>, Promise<T>, Promise<T> x6)

List<T>

Join all the results from the given Promises and fulfill the returned Promise with the aggregated List. The difference with the when alternative is that the types of the promises must match.
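
To make the ordering difference between some of these factories concrete, here is a minimal sketch contrasting concat and merge; the values are illustrative only.

Streams.concat() vs Streams.merge()
Stream<String> odd  = Streams.just("1", "3");
Stream<String> even = Streams.just("2", "4");

// concat drains the sources one after the other: 1, 3, 2, 4
Streams.concat(odd, even).consume(System.out::println);

// merge interleaves; no ordering is guaranteed across sources
Streams.merge(odd, even).consume(System.out::println);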

From Custom Reactive Publishers

Over time, the Reactor user will become more familiar with Reactive Streams. That's the perfect moment to start creating custom reactive data sources! Usually the implementor has to respect the specification and verify their work with the reactive-streams-tck dependency. Respecting the contract requires a Subscription and a call to onSubscribe, plus a request(long), before sending any data.

However, Reactor allows some flexibility: you can deal only with the message-passing part, and a buffering Subscription will be provided transparently. The difference is demonstrated in the code sample below.

Streams.create and Streams.defer in action
final Stream<String> stream1 = Streams.create(new Publisher<String>() {
  @Override
  public void subscribe(Subscriber<? super String> sub) {
    sub.onSubscribe(new Subscription() { (1)
      @Override
      public void request(long demand) {
        if(demand == 2L){
          sub.onNext("1");
          sub.onNext("2");
          sub.onComplete();
        }
      }

      @Override
      public void cancel() {
        System.out.println("Cancelled!");
      }
    });
  }
});

final Stream<String> stream2 = Streams.create(sub -> {
  sub.onNext("3"); (2)
  sub.onNext("4");
  sub.onComplete();
});

final AtomicInteger counterSubscriber = new AtomicInteger();

Stream<String> deferred = Streams.defer(() -> {
  if (counterSubscriber.incrementAndGet() == 1) { (3)
    return stream1;
  }
  else {
     return stream2;
  }
});

deferred
  .consume(s -> System.out.printf("%s First subscription = %s%n", Thread.currentThread(), s));
deferred
  .consume(s -> System.out.printf("%s Second subscription = %s%n", Thread.currentThread(), s));
1 Create a Stream from a custom valid Publisher which first calls onSubscribe(Subscription).
2 Create a Stream from a custom malformed Publisher which skips onSubscribe(Subscription) and immediately calls onNext(T).
3 Create a DeferredStream that will alternate the source Publisher<T> on each Stream.subscribe call, evaluating the total number of Subscribers.

Where to go from here? There are plenty of use cases that can benefit from a custom Publisher:

  • Reactive Facade to convert any IO call with a matching demand and compose: HTTP calls (read N times), SQL queries (select max N), File reads (read N lines)…​

  • Async Facade to convert any hot data callback into a composable API: AMQP Consumer, Spring MessageChannel endpoint…​

Reactor offers some reusable components to avoid the boilerplate checking you would otherwise have to do, by extending the existing Stream or PushSubscription:

  • Extending PushSubscription instead of implementing Subscription directly to benefit from terminal state (PushSubscription.isComplete())

  • Using PublisherFactory.create(args) or Streams.createWith(args) to use Functional consumers for every lifecycle step (requested, stopped, started).

  • Extending Stream instead of implementing Publisher directly to benefit from composition API

Streams.createWith, an alternative to create() minus some boilerplate
final Stream<String> stream = Streams.createWith(
  (demand, sub) -> { (1)
      sub.context(); (2)
      if (demand >= 2L && !sub.isCancelled()) {
          sub.onNext("1");
          sub.onNext("2");
          sub.onComplete();
      }
  },
  sub -> 0, (3)
  sub -> System.out.println("Cancelled!") (4)
);

stream.consume(s -> System.out.printf("%s greeting = %s%n", Thread.currentThread(), s));
1 Attach a request consumer reacting on Subscriber requests and passing the demand and the requesting subscriber.
2 The sub argument is actually a SubscriberWithContext possibly assigned with some initial state shared by all request callbacks.
3 Executed once on start; this is also where we initialize the optional shared context. Every request callback will receive 0 from context().
4 Executed once on any terminal event: cancel(), onComplete() or onError(e).

A good place to start coding the Reactive Streams way is simply to look at a more elaborate, backpressure-ready File Stream.

From Hot Data Sources

If you are dealing with an unbounded stream of data like what would be common with a web application that accepts user input via a REST interface, you probably want to use the "hot" variety of Stream in Reactor, which we call a Broadcaster.

To use it, you simply declare a pipeline of composable, functional tasks on the Broadcaster and later call Broadcaster.onNext(T) to publish values into the pipeline.

Broadcaster is a valid Processor and Consumer. It's possible to subscribe a Broadcaster to an upstream Publisher, and it can also be used as a Consumer, delegating Consumer.accept(T) to Broadcaster.onNext(T).
Broadcaster.create()
Broadcaster<String> sink = Broadcaster.create(Environment.get()); (1)

sink.map(String::toUpperCase) (2)
    .consume(s -> System.out.printf("%s greeting = %s%n", Thread.currentThread(), s)); (3)

sink.onNext("Hello World!"); (4)
1 Create a Broadcaster using the default, shared RingBufferDispatcher as the Dispatcher.
2 Transform the input using a commonly-found convention: the map() method.
3 .consume() is a "terminal" operation, which means it produces demand in Reactive Streams parlance.
4 Publish a value into the pipeline, which will cause the tasks to be invoked.
Hot Data Sources will never be replayed. Subscribers will only see data from the moment they have been passed to Stream.subscribe(Subscriber). An exception applies for BehaviorBroadcaster (last emitted element is replayed); Streams.timer() and Streams.period() will also maintain unique timed cursors but will still ignore backpressure.
A Subscriber that subscribes to a Broadcaster at time T will only see the data flowing through it after T.
Table 7. Creating flexible Streams

Factory

Input

Output

Role

Streams.timer(delay, unit, timer)

N/A

Long

Start a Timer on the Stream.subscribe(Subscriber) call and emit a single onNext(0L) then onComplete() once the delay has elapsed. Be sure to pass the optional Timer argument if there is no currently active Environment. Subscription.request(long) will be ignored, as no backpressure can apply to a scheduled emission.

Streams.period(period, unit, timer)

N/A

Long

Start a Timer on the Stream.subscribe(Subscriber) call and, every period of time, emit onNext(N) where N is a counter incremented from 0. Be sure to pass the optional Timer argument if there is no currently active Environment. Subscription.request(long) will be ignored, as no backpressure can apply to a scheduled emission.

Streams.<T>switchOnNext()

Publisher<T>

T

An Action which for the record is also a Processor. The onNext(Publisher<T>) signals will result in downstream Subscriber<T> receiving the next Publisher sequence of onNext(T). It might interrupt a current upstream emission when the onNext(Publisher<T>) signal is received.

Broadcaster.<T>create(Environment, Dispatcher)

T

T

Create a hot bridge between any context allowed to call onSubscribe, onNext, onComplete or onError and a composable sequence of these signals under a Stream. If no subscribers are actively registered, next signals might trigger a CancelException. The optional Dispatcher and Environment arguments define where to emit each signal. Finally, a Broadcaster can be subscribed any time to a Publisher, like a Stream.

SerializedBroadcaster.create(Environment, Dispatcher)

T

T

Similar to Broadcaster.create() but adds support for concurrent onNext from parallel contexts possibly calling the same broadcaster onXXX methods.

BehaviorBroadcaster.create(Environment, Dispatcher)

T

T

Similar to Broadcaster.create() but always replays the last data signal (if any) and the last terminal signal (onComplete(), onError(Throwable)) to new Subscribers.

BehaviorBroadcaster.first(T, Environment, Dispatcher)

T

T

Similar to BehaviorBroadcaster but starts with a default value T.

Streams.wrap(Processor<I, O>)

I

O

A simple delegating Stream around the passed Publisher.subscribe(Subscriber<O>). Only supports well-formed Publishers correctly using the Reactive Streams protocol:

onSubscribe > onNext\* > (onError | onComplete)

Promises.<T>prepare(Environment, Dispatcher)

Promises.ready()

T

T

Prepare a Promise ready to be called exactly once by any external context through onNext. Since it’s a stateful container holding the result of the fulfilled promise, new subscribers will immediately run on the current thread.
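
As a minimal sketch of the prepared Promise described above (assuming the no-argument Promises.prepare() overload running on the current thread):

Promises.prepare() in action
Promise<String> promise = Promises.prepare();

promise.onSuccess(s -> System.out.printf("%s fulfilled with %s%n", Thread.currentThread(), s));

promise.onNext("done"); // fulfill exactly once; the stateful container replays "done" to late subscribers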

For Asynchronous broadcasting, always consider a Core Processor alternative to a Broadcaster (a short sketch follows this list):

  • A Broadcaster will trigger a CancelException if there are no subscribers. A Core RingBuffer*Processor will always deliver buffered data to the first subscriber.

  • Some Dispatcher types that can be assigned to a Broadcaster might not support concurrent onNext. Use RingBuffer*Processor.share() for an alternative, thread-safe, concurrent onNext.

  • RingBuffer*Processor supports replaying an event cancelled in-flight by a downstream subscriber if it’s still running on the processor thread. A Broadcaster won’t support replaying.

  • A RingBuffer*Processor is faster than the alternative of a Broadcaster with a RingBufferDispatcher.

  • RingBufferWorkProcessor supports scaling up with the number of attached subscribers.

  • Broadcaster might be promoted to a Processor in 2.1 anyway, achieving the same thing and removing the need for the Reactor user to struggle with picking between Processor and Broadcaster.
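
A minimal sketch of the Core Processor alternative; the share(name, bufferSize) arguments are assumptions based on the shared variant mentioned above.

RingBufferProcessor as an asynchronous broadcaster
Processor<String, String> processor = RingBufferProcessor.share("concurrent-sink", 32); // assumed overload

Streams.wrap(processor)
  .consume(s -> System.out.printf("%s consumed %s%n", Thread.currentThread(), s));

// the shared variant is intended to tolerate concurrent onNext calls from parallel contexts
processor.onNext("hello");
processor.onComplete();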

Wiring up a Stream

Stream operations, with a few exceptions such as terminal actions and broadcast(), will never directly subscribe. Instead they lazily prepare for subscription. This is usually called lift in functional programming.

That basically means the Reactor Stream user will explicitly call Stream.subscribe(Subscriber) or, alternatively, terminal actions such as Stream.consume(Consumer) to materialize all the registered operations. Before that, the Actions don't really exist. We use Stream.lift(Supplier) to defer the creation of these Actions until Stream.subscribe(Subscriber) is explicitly called.

Once everything is wired, each action maintains an upstream Subscription and a downstream Subscription and the Reactive Streams contract applies all along the pipeline.

Usually the terminal actions return a Control object instead of a Stream. This is a component you can use to request or cancel a pipeline without being inside a Subscriber context or implementing the full Subscriber contract.
Wiring up 2 pipelines
import static reactor.Environment.*;
import reactor.rx.Streams;
import reactor.rx.Stream;
//...

Stream<String> stream = Streams.just("a","b","c","d","e","f","g","h");

//prepare two unique pipelines
Stream<String> actionChain1 = stream.map(String::toUpperCase).filter(w -> w.equals("C"));
Stream<Long> actionChain2 = stream.dispatchOn(sharedDispatcher()).take(5).count();

actionChain1.consume(System.out::println); //start chain1
Control c = actionChain2.consume(System.out::println); //start chain2
//...
c.cancel(); //force this consumer to stop receiving data
The 2 Pipelines wired
Figure 10. After Wiring

Publish/Subscribe

For Fan-Out to subscribers from a unified pipeline, Stream.process(Processor), Stream.broadcast(), Stream.broadcastOn() and Stream.broadcastTo() can be used.

Sharing an upstream pipeline and wiring up 2 downstream pipelines
import static reactor.Environment.*;
import reactor.rx.Streams;
import reactor.rx.Stream;
//...

Stream<String> stream = Streams.just("a","b","c","d","e","f","g","h");

//prepare a shared pipeline
Stream<String> sharedStream = stream.observe(System.out::println).broadcast();

//prepare two unique pipelines
Stream<String> actionChain1 = sharedStream.map(String::toUpperCase).filter(w -> w.equals("C"));
Stream<Long> actionChain2 = sharedStream.take(5).count();

actionChain1.consume(System.out::println); //start chain1
actionChain2.consume(System.out::println); //start chain2
The 3 Pipelines wired
Figure 11. After Wiring a Shared Stream
Table 8. Operations considered terminal or explicitly subscribing

Stream<T> method

Return Type

Role

subscribe(Subscriber<T>)

subscribeOn

void

Subscribe the passed Subscriber<T> and materialize any pending upstream, wired up lazily (the implicit lift for non-terminal operations). Note that a Subscriber must request data if it expects some. The dispatchOn and subscribeOn alternatives provide for signalling onSubscribe using the passed Dispatcher.

consume(Consumer<T>,Consumer<T>,Consumer<T>)

consumeOn

Control

Call subscribe with a ConsumerAction which interacts with each passed Consumer each time the corresponding signal is detected. It will call request(Stream.capacity()) on the received Subscription, which is Long.MAX_VALUE by default, resulting in unbounded consuming. The subscribeOn and consumeOn alternatives provide for signalling onSubscribe using the passed Dispatcher. Returns a Control component to cancel the materialized Stream, if necessary. Note that ConsumerAction takes care of unbounded recursion if the onNext(T) signal triggers a blocking request.

consumeLater()

Control

Similar to consume but does not fire an initial Subscription.request(long). The returned Control can be used to request(long) anytime.

tap()

TapAndControls

Similar to consume but returns a TapAndControls that will be dynamically updated each time a new onNext(T) is signalled or cancelled.

batchConsume(Consumer<T>, Consumer<T>, Consumer<T>, Function<Long,Long>)

batchConsumeOn

Control

Similar to consume but will request the mapped Long demand given the previous demand and starting with the default Stream.capacity(). Useful for adapting the demand dynamically due to various factors.

adaptiveConsume(Consumer<T>, Consumer<T>, Consumer<T>, Function<Stream<Long>,Publisher<Long>>)

adaptiveConsumeOn

Control

Similar to batchConsume but will request the computed sequence of demand Long. It can be used to insert flow-control such as Streams.timer() to delay demand.

next()

Promise<T>

Return a Promise<T> that actively subscribes to the Stream, materializing it, and requests a single element before unregistering. The immediate next signal, onNext(T), onComplete() or onError(Throwable), will fulfill the promise.

toList()

Promise<List<T>>

Similar to next() but will wait until the entire sequence has been produced (onComplete()) and pass the accumulated onNext(T) in a single List<T> fulfilling the returned promise.

Stream.toBlockingQueue()

CompletableBlockingQueue<T>

Subscribe to the Stream and return an iterable blocking Queue<T> accumulating all onNext signals. CompletableBlockingQueue.isTerminated() can be used as a condition to exit a blocking poll() loop.

cache()

Stream<T>

Turn any Stream into a Cold Stream, able to replay all the sequence of signals individually for each Subscriber. Due to the unbounded nature of the action, you should probably use it only with small(ish) sequences.

broadcast()

broadcastOn(Environment, Dispatcher)

Stream<T>

Turn any Stream into a Hot Stream. This prevents pipeline duplication by immediately materializing the Stream, ready to publish signals to N downstream Subscribers. The demand will be aggregated from all child Subscribers.

broadcastTo(Subscriber<T>)

Subscriber<T>

An alternative to Stream.subscribe which allows method chaining since the returned instance is the same as the passed argument.

process(Processor<T, O>)

Stream<O>

Similar to broadcast() but accepts any given Processor<T, O>. A perfect place to introduce Core Processors!
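
As a minimal sketch of the terminal process() operation from the table above, delegating downstream signals to a Core Processor thread:

Stream.process(Processor) in action
Streams.just("a", "b", "c")
  .process(RingBufferProcessor.<String>create())
  .consume(s -> System.out.printf("%s consumed %s%n", Thread.currentThread(), s));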

Setting Capacity

The Reactive Streams standard encourages application developers to set reasonable limits on in-flight data. This prevents components from becoming inundated with more data than they can handle, which causes unpredictable problems throughout an application. One of the core concepts of Reactive Streams is that of "backpressure", or the ability of a pipeline to communicate to upstream components that it can only handle a fixed number of items at a time. A useful term to describe this process of queueing and requesting small chunks of a large volume of data is "microbatching".

Within a Reactor Stream, it's possible to microbatch items to limit the amount of data in-flight at any given time. This has distinct advantages, not the least of which is that it limits exposure to data loss by preventing the system from accepting more data than it can afford to lose if it were to crash.

To limit the amount of data in-flight in a Stream, use the .capacity(long) method.

Stream.capacity(long)
Stream<String> st;

st
  .dispatchOn(sharedDispatcher())
  .capacity(256) (1)
  .consume(s -> service.doWork(s)); (2)
1 Limit the amount of data in-flight to no more than 256 elements at a time.
2 Produce demand upstream by requesting the next 256 elements of data.
capacity will not affect consume actions if the current Stream dispatcher set with dispatchOn is a SynchronousDispatcher.INSTANCE (default if unset).
We leave as an exercise to the Reactor User to study the benefit of setting capacity vs computing dynamic demand with Stream.adaptiveConsume or a custom Subscriber.

Functional Composition

Similar to many other functional libraries, Reactor provides a number of useful methods for composing functions on a Stream. You can passively observe values, transform them from one kind to another, filter out values you don’t want, buffer values until a size or time trigger is tripped, and many other useful operations.

These operations are called Actions, and they will not wire up the Stream directly. They are available on any Stream instance, which means you should have one by this stage. A short sketch after the following list illustrates the ordering.
  • Actions are onSubscribe() in declarative order (left to right), so stream.actionA().actionB() will execute actionA first then actionB.

    • onSubscribe() runs on the parent Publisher thread context which can be altered by subscribeOn(Dispatcher) for instance.

  • Actions subscribe() in inverse declarative order (right to left). Whenever subscribe is explicitly called at the end of the pipeline, subscribe() propagates backward.

    • subscribe() propagates back synchronously, which might affect stack use. If that becomes an issue, use a delegate Processor that runs subscribe() on an Environment.tailRecurse() dispatcher, then process() it at any point of the chain.
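
A minimal sketch of that ordering: map is declared first, so it sees each onNext before filter, while the subscribe() wiring itself propagates from consume() back to the source.

Declarative ordering of Actions
Streams.just(1, 2, 3)
  .map(i -> i * 10)     // executes first for each onNext
  .filter(i -> i >= 20) // executes second
  .consume(System.out::println); // prints 20 then 30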

Observe

If you want to passively observe data as it passes through the pipeline, then use the .observe(Consumer) methods and other reactor.rx.action.passive actions. To observe values, use .observe(Consumer<? super T>). To observe errors without dealing with them definitively, use .observeError(Class<? extends Throwable>, BiConsumer<Object, ? extends Throwable>). To observe the Reactive Streams complete signal, use .observeComplete(Consumer<Void>). To observe the cancel signal, use .observeCancel(Consumer<Void>). To observe the Reactive Streams subscribe signal, use .observeSubscribe(Consumer<? super Subscription>).

observe(Consumer<T>)
Stream<String> st;

st.observe(s -> LOG.info("Got input [{}] on thread [{}]", s, Thread.currentThread())) (1)
  .observeComplete(v -> LOG.info("Stream is complete")) (2)
  .observeError(Throwable.class, (o, t) -> LOG.error("{} caused an error: {}", o, t)) (3)
  .consume(s -> service.doWork(s)); (4)
1 Passively observe values passing through without producing demand.
2 Run once all values have been processed and the Stream is marked complete.
3 Run any time an error is propagated.
4 Produce demand on the pipeline and consume any values.

Filter

It’s possible to filter items passing through a Stream so that downstream actions only see the data you want them to see. Filtering actions can be found under the reactor.rx.action.filter package. The most popular one is the .filter(Predicate<T>) method.

Unmatched data will trigger a Subscription.request(1), unless the stream is unbounded (a previous demand of Long.MAX_VALUE).
filter(Predicate<T>)
Stream<String> st;

st.filter(s -> s.startsWith("Hello")) (1)
  .consume(s -> service.doWork(s)); (2)
1 This will only allow values that start with the string 'Hello' to pass downstream.
2 Produce demand on the pipeline and consume any values.

Limits

A specific application of filters is for setting limits to a Stream. Limiting actions can be found under the reactor.rx.action.filter package. There are various ways to tell a Stream<T> its boundary in time, in size and/or on a specific condition. The most popular one is the .take(long) method.

Stream.take(long)
Streams
  .range(1, 100)
  .take(50) (1)
  .consume(
    System.out::println,
    Throwable::printStackTrace,
    avoid -> System.out.println("--complete--")
  );
1 Only take the first 50 elements, then cancel upstream and complete downstream.

Transformation

If you want to actively transform data as it passes through the pipeline, then use .map(Function) and other reactor.rx.action.transformation actions. The most popular transforming action is .map(Function<? super I, ? extends O>). A few other Actions depend on transforming data, especially Combinatory operations like flatMap or concatMap.

Stream.map(Function<T,V>)
Streams
  .range(1, 100)
  .map(number -> ""+number) (1)
  .consume(System.out::println);
1 Transform each Long into a String.

(A)Sync Transformation: FlatMap, ConcatMap, SwitchMap

If you want to execute a distinct pipeline Stream<V> or Publisher<V> given an actual input data, you can use combinatory actions such as .flatMap(Function) and other reactor.rx.action.combination actions.

To transform values into a distinct, possibly asynchronous Publisher<V>, use .flatMap(Function<? super I, ? extends Publisher<? extends O>>). The returned Publisher<V> will then be merged back into the main flow, signaling onNext(V). Merged Publishers are properly removed from the merging action when they complete. The difference between flatMap, concatMap and switchMap is the merging strategy: respectively interleaved, fully sequential and partially sequential (interrupted by onNext(Publisher<T>)).

The downstream request is split (a minimum of 1 per merged Publisher).
Stream.flatMap(Function)
Streams
  .range(1, 100)
  .flatMap(number -> Streams.range(1, number).subscribeOn(Environment.workDispatcher()) ) (1)
  .consume(
    System.out::println, (2)
    Throwable::printStackTrace,
    avoid -> System.out.println("--complete--")
  );
1 Transform each incoming number N into a range of 1..N, merged back and executed on the given Dispatcher.
2 Print each merged onNext(Long) signal.

Blocking and Promises

Blocking is considered an anti-pattern in Reactor. That said, we do offer an appropriate API (Ah AH!) for integration with legacy operations and for testing support.

The Promise API offers a range of stateful actions which inspect the current ready|error|complete state and, if fulfilled, immediately calls the wired action.

Stream.toList()
Promise<List<Long>> result = Streams
  .range(1, 100)
  .subscribeOn(Environment.workDispatcher())
  .toList(); (1)

System.out.println(result.await()); (2)
result.onSuccess(System.out::println); (3)
1 Consume the entire sequence on the dispatcher thread given in subscribeOn(Dispatcher) operation.
2 Block (default: 30 seconds) until onComplete() and print only the onNext(List<Long>); or, if onError(e), wrap it as a RuntimeException and re-raise.
3 Since the promise is already fulfilled, System.out.println() will run immediately on the current context.
Table 9. Waiting for a Stream or Promise

Functional API or Factory method

Role

Streams.await(Publisher<?>)

Block until the passed Publisher onComplete() or onError(e), bubbling up the eventual exception.

Stream.next()

with Promise.await(), Promise.get()…​

Capture only the immediate next signal in a Promise, with onComplete() if the signal was data. get() can be used to peek at the result without waiting for the promise to be fulfilled.

Stream.toList()

with Promise.await(), Promise.get()…​

Similar to next() but capture the full sequence in a List<T> to fulfill the Promise<List<T>> returned.

Stream.toBlockingQueue()

Subscribe to the Stream and return an iterable blocking Queue<T> accumulating all onNext signals. CompletableBlockingQueue.isTerminated() can be used as a condition to exit a blocking poll() loop.
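
A minimal sketch of Streams.await() in a test context; the enclosing method is assumed to declare the checked exceptions that blocking may raise.

Streams.await(Publisher<?>) in a test
public void awaitCompletion() throws Exception {
  Stream<Long> work = Streams
    .range(1, 10)
    .subscribeOn(Environment.workDispatcher());

  Streams.await(work); // blocks until onComplete(), rethrows if the final state is onError
}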

Wiring up Synchronous Streams

It's not specific to any API, but if the current Stream is dispatched on a SynchronousDispatcher, a starting terminal action such as consume() will actually block the calling thread.

Understanding the threading model

One common purpose of Reactive Streams and Reactive Extensions is to be unopinionated about threading behavior, thanks to the signal callbacks. Streams only promise that a signal will be executed at some point between now and some time T. Non-concurrent signals may also protect a Subscriber from concurrent access (share-nothing); however, signals and requests can run on two asymmetric threads.

By default the Stream is assigned with a SynchronousDispatcher and will inform its immediate child Actions via Stream.getDispatcher().

Various Stream factories, the Broadcaster, the Stream.dispatchOn and the terminal xxxOn methods might alter the default SynchronousDispatcher.
It is fundamental to understand the three major thread switches available in a Reactor Stream:
  • The Stream.dispatchOn action is the only one available under Stream that will be dispatching onError, onComplete and onNext signals on the given Dispatcher.

    • Since an action is a Processor, it doesn't support concurrent Dispatchers such as WorkQueueDispatcher.

    • request and cancel will also run on the dispatcher; if called from within its context already, they will execute after the current dispatch ends.

  • The Stream.subscribeOn action will be executing onSubscribe only on the passed dispatcher.

    • Since the only time the passed Dispatcher is called is onSubscribe, any dispatcher can be used including the concurrent ones such as WorkQueueDispatcher.

    • The first request might still execute in the onSubscribe thread, for instance with Stream.consume() actions.

  • Attaching a Processor via Stream.process for instance can affect the thread too. The Processor such as RingBufferProcessor will run the Subscribers on its managed threads.

    • request and cancel will run on the processor as well if in its context already.

    • RingBufferWorkProcessor will only dispatch onNext signals to one Subscriber at most unless it has cancelled in-flight (replay to a new Subscriber).

Since the common contract is to start requesting data on onSubscribe, subscribeOn is an efficient tool to scale up streams, particularly unbounded ones. If a Subscriber requests Long.MAX_VALUE in onSubscribe, that will be the only request executed, and it will run on the dispatcher assigned in subscribeOn. This is the default behaviour for unbounded Stream.consume actions.

Jumping between threads with an unbounded demand
Streams
  .range(1, 100)
  .dispatchOn(Environment.sharedDispatcher()) (2)
  .subscribeOn(Environment.workDispatcher()) (1)
  .consume(); (3)
1 Assign an onSubscribe work queue dispatcher.
2 Assign a signal onNext, onError, onComplete dispatcher.
3 Consume the Stream; on onSubscribe, Subscription.request(Long.MAX_VALUE) is issued.
Unbounded threading
Figure 12. subscribeOn and dispatchOn/process with an unbounded Subscriber

However, subscribeOn is less useful when more than one request will be involved, as in step-consuming with Stream.capacity(n). Only the first request can possibly run on the dispatcher assigned in subscribeOn.

Jumping between threads with a bounded demand of 1
Streams
  .range(1, 100)
  .process(RingBufferProcessor.create()) (2)
  .subscribeOn(Environment.workDispatcher()) (1)
  .capacity(1) (3)
  .consume(); (4)
1 Assign an onSubscribe work queue dispatcher. Note that it is placed after process: subscribeOn would otherwise run on the RingBuffer thread, and we want to shift it to the work dispatcher.
2 Assign an async onNext, onError, onComplete signal processor. Similar to the dispatchOn behavior.
3 Assign a Stream capacity of 1 so the downstream action adapts.
4 Consume the Stream; on onSubscribe, Subscription.request(1) is issued, then again after every onNext.
Bounded threading
Figure 13. subscribeOn and dispatchOn/process with a bounded (demand N < Long.MAX_VALUE) Subscriber

MicroBatching

Better trade your unused CPU and Memory for your overused Latency
— Klingon Proverb

After one or two reads of the 101 Stream crash intro, you, courageous hacker, are ready for some quick ROI. In effect, dispatching efficiently is far from the only item to check off on the road to millions of messages per second.

A common issue in Distributed Systems lies in the latency cost of individual vs buffered IO writes. When such a situation arises, MicroBatching, or small-chunk processing, is the action of grouping individual data operations. Behind the term Micro hides a more concrete behavior named In Memory: since the speed of light is still a limitation for today's systems, main memory remains cheaper to read than disk.

Latency Comparison Numbers

L1 cache reference                            0.5 ns
Branch mispredict                             5   ns
L2 cache reference                            7   ns             14x L1 cache
Mutex lock/unlock                            25   ns
Main memory reference                       100   ns             20x L2 cache, 200x L1 cache
Compress 1K bytes with Zippy              3,000   ns
Send 1K bytes over 1 Gbps network        10,000   ns    0.01 ms
Read 4K randomly from SSD*              150,000   ns    0.15 ms
Read 1 MB sequentially from memory      250,000   ns    0.25 ms
Round trip within same datacenter       500,000   ns    0.5  ms
Read 1 MB sequentially from SSD*      1,000,000   ns    1    ms  4X memory
Disk seek                            10,000,000   ns   10    ms  20x datacenter roundtrip
Read 1 MB sequentially from disk     20,000,000   ns   20    ms  80x memory, 20X SSD
Send packet CA->Netherlands->CA     150,000,000   ns  150    ms

Notes
-----
1 ns = 10^-9 seconds
1 ms = 10^-3 seconds
* Assuming ~1GB/sec SSD

Credit
------
By Jeff Dean:               http://research.google.com/people/jeff/
Originally by Peter Norvig: http://norvig.com/21-days.html#answers

Streams are sequences of data, so APIs for finding boundaries to cut aggregated buffers come out of the box.

There are two categories for delimitations:
  • Buffer : Concrete boundaries accumulating onNext(T) inside grouped List<T> passed to the child Subscriber.

    • Used best with external API requiring Iterable<T> input argument.

  • Window : Discrete boundaries forwarding onNext(T) into distinct Stream<T> passed to the child Subscriber.

    • Used best with accumulators such as reduce or any subscriber/action reacting to onComplete().

    • Can be combined with flatMap or concatMap, which merge the individual windows back into a common Stream<T>.

Into Buffers

Collecting grouped sequences of data T into lists List<T> serves two main purposes:

  • Expose a sequence matching the boundary conditions into an Iterable structure commonly used by JVM APIs

  • Reduce the volume of onNext(T) signals, e.g. buffer(5) will transform a sequence of 10 elements into a sequence of 2 lists (of 5 elements).

Collecting data incurs an overhead in memory and possibly CPU that should be sized appropriately. Small and timed boundaries are advised to avoid any long-lasting aggregates.
An Environment must be initialized if the timed buffer() signatures are used without providing the Timer argument.
long timeout = 100;
final int batchsize = 4;
CountDownLatch latch = new CountDownLatch(1);

final Broadcaster<Integer> streamBatcher = Broadcaster.<Integer>create(env);
streamBatcher
  .buffer(batchsize, timeout, TimeUnit.MILLISECONDS)
  .consume(i -> latch.countDown());


streamBatcher.onNext(12);
streamBatcher.onNext(123);
Thread.sleep(200);
streamBatcher.onNext(42);
streamBatcher.onNext(666);

latch.await(2, TimeUnit.SECONDS);
Table 10. Chunk processing with Stream buffers (returning Stream<List<T>>):

Stream<T> API

Role

buffer(int)

Aggregate until onComplete() or until the given int argument is reached, then start a new aggregation.

buffer(Publisher<?>, Supplier<? extends Publisher<?>>)

Aggregate until onComplete() or when the first Publisher<?> argument emits a signal. The optional Supplier<? extends Publisher<?>> supplies a sequence whose first signal will end the linked aggregation. That means overlapping (sliding) buffers and disjointed aggregations can be emitted to the child Subscriber<List<T>>.

buffer(Supplier<? extends Publisher<?>>)

Aggregate until onComplete() or in coordination with a provided Publisher<?>. The Supplier<? extends Publisher<?>> supplies a sequence whose first signal will end the linked aggregation and start a new one immediately.

buffer(int, int)

Aggregate until onComplete() or until the given skip (the second int argument) is reached, then start a new aggregation. The first int argument, size, will delimit the maximum number of aggregated elements per buffer. That means overlapping (sliding) buffers and disjointed aggregations can be emitted to the child Subscriber<List<T>>.

buffer(long, TimeUnit, Timer)

Aggregate until onComplete() or until the period (the long argument) has elapsed, then start a new aggregation.

buffer(long, long, TimeUnit, Timer)

Aggregate until onComplete() or until the given timeshift (the second long argument) is reached. The timespan (the first long argument) will delimit how long elements are aggregated into each buffer. That means overlapping (sliding) buffers and disjointed aggregations can be emitted to the child Subscriber<List<T>>.

buffer(int, long, TimeUnit, Timer)

A combination of the buffer(int) OR buffer(long, TimeUnit, Timer) conditions. It accumulates until the given size has been reached or the timespan has elapsed.
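
A minimal sketch of the overlapping variant from the table above, using a size of 3 and a skip of 1; the expected output follows from the size/skip semantics described there.

Sliding buffers with buffer(int, int)
Streams.range(1, 5)
  .buffer(3, 1) // a new buffer starts every element, each holding at most 3
  .consume(chunk -> System.out.println("chunk: " + chunk));
// expected: [1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5], [5]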

Into Windows

Forwarding grouped sequences of data T into a Stream<T> serves three main purposes:

  • Expose a sequence of data T to various limited grouped observations and accumulation: metrics, average, flexible aggregate (Map, Tuple…​).

  • Parallelizing grouped sequences combined with dispatchOn for each generated Stream<T> and merging their results back.

  • Repeat onComplete() for individual grouped sequences, e.g. in Async IO module to delimit a flush.

Stream<T> windows are slightly less optimized than the buffer API, but produce an equivalent aggregate when combined with the aggregate-all Stream.buffer() method:

stream.buffer(10, 1, TimeUnit.SECONDS);

//equivalent to
stream.window(10, 1, TimeUnit.SECONDS).flatMap( window -> window.buffer() )
An Environment must be initialized if the timed window() signatures are used without providing the Timer argument.
//create a list of 1000 numbers and prepare a Stream to read it
Stream<Integer> sensorDataStream = Streams.from(createTestDataset(1000));

//wait for all windows of 100 to finish
CountDownLatch endLatch = new CountDownLatch(1000 / 100);

Control controls = sensorDataStream
  .window(100)
  .consume(window -> {
    System.out.println("New window starting");
    window
      .reduce(Integer.MAX_VALUE, (acc, next) -> Math.min(acc, next))
      .finallyDo(o -> endLatch.countDown())
      .consume(i -> System.out.println("Minimum " + i));
  });

endLatch.await(10, TimeUnit.SECONDS);
System.out.println(controls.debug());

Assert.assertEquals(0, endLatch.getCount());
Table 11. Chunk processing with Stream (returning Stream<Stream<T>>):

Stream<T> API

Role

window(int)

Forward to a generated Stream<T> until onComplete() or until the given int argument is reached, then start a new Stream<T>.

window(Publisher<?>, Supplier<? extends Publisher<?>>)

Forward to a generated Stream<T> until onComplete() or when the first Publisher<?> argument emits a signal. The optional Supplier<? extends Publisher<?>> supplies a sequence whose first signal will end the linked aggregation. That means overlapping (sliding) windows and disjointed sequences can be emitted to the child Subscriber<Stream<T>>.

window(Supplier<? extends Publisher<?>>)

Forward to a generated Stream<T> until onComplete() or in coordination with a provided Publisher<?>. The Supplier<? extends Publisher<?>> supplies a sequence whose first signal will end the linked Stream<T> and start a new one immediately.

window(int, int)

Forward to a generated Stream<T> until onComplete() or until the given skip (the second int argument) is reached, then start a new Stream<T>. The size (the first int argument) will delimit the maximum number of aggregated elements per window. That means overlapping (sliding) windows and disjointed sequences can be emitted to the child Subscriber<Stream<T>>.

window(long, TimeUnit, Timer)

Forward to a generated Stream<T> until onComplete() or until the period (the long argument) has elapsed, then start a new Stream<T>.

window(long, long, TimeUnit, Timer)

Forward to a generated Stream<T> until onComplete() or until the given timeshift (the second long argument) is reached. The timespan (the first long argument) will delimit how long elements are forwarded into each window. That means overlapping (sliding) windows and disjointed sequences can be emitted to the child Subscriber<Stream<T>>.

window(int, long, TimeUnit, Timer)

A combination of the window(int) OR window(long, TimeUnit, Timer) conditions. It forwards to a generated Stream<T> until the given size has been reached or the timespan has elapsed.

Backpressure and Overflow

Backpressure is addressed automatically in many situations by the Reactive Streams contract. If a Subscriber doesn't request more than it can actually process (e.g. something other than Long.MAX_VALUE), the upstream source can avoid sending too much data. With a "cold" Publisher this only works when you can stop reading from the source at any time: how much to read from a socket, how many rows from a SQL query cursor, how many lines from a file, how many elements from an Iterable…

If the source is hot, such as a timer or UI events, or if the Subscriber might request Long.MAX_VALUE on a large dataset, a strategy must be explicitly picked by the developer to deal with backpressure.

Reactor provides a set of APIs to deal with Hot and Cold sequences:
  • Uncontrolled sequences (Hot) should be actively managed

    • By reducing the sequence volume, e.g. "sampling"

    • By ignoring data when the demand exceeds capacity

    • By buffering data when the demand exceeds capacity

  • Controlled sequences (Cold) should be passively managed

    • By lowering demand from the Subscriber or at any point of the Stream

    • By gapping demand with delayed requests

A common example used extensively in the Reactive Extensions documentation is the Marble Diagram. The dual timeline helps visualize when and what is observed in the Publisher or Stream and in a Subscriber (e.g. an Action). We will use these diagrams here to emphasize the demand flow, where usually such a diagram details the nature of the transformation like map or filter.

Marble Diagrams

Reactor will automatically provide for an in-memory overflow buffer when the dispatcher or the capacity differs from one action to another. This will not apply to Core Processors, which handle the overflow in their own way. Dispatchers can be re-used and Reactor must limit the number of dispatches where it can, hence the in-memory buffer added by Action when dispatchers differ.

Streams.just(1,2,3,4,5)
  .buffer(3) (1)
  //onOverflowBuffer()
  .capacity(2) (2)
  .consume()


Streams.just(1,2,3,4,5)
  .dispatchOn(dispatcher1) (3)
  //onOverflowBuffer()
  .dispatchOn(dispatcher2) (4)
  .consume()
1 The buffer operation sets capacity(3).
2 consume() or any downstream action is set with capacity(2); an implicit onOverflowBuffer() is added.
3 A first action running on dispatcher1.
4 A second action running on a different dispatcher2; an implicit onOverflowBuffer() is added.
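
The implicit buffering strategy can also be replaced explicitly; a minimal sketch using the onOverflowDrop() operation listed in the table below:

Dropping overflow instead of buffering it
Streams.just(1, 2, 3, 4, 5)
  .onOverflowDrop() // signals in excess of the downstream demand are discarded
  .capacity(2)
  .consume();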

Ultimately the Subscriber can request data one by one, limiting the in-flight data to one element all along the pipeline and requesting one more after each successful onNext(T). The same behavior can be obtained with capacity(1).consume(...).

Streams.range(1,1000000)
  .subscribe(new DefaultSubscriber<Long>(){ (1)
    Subscription sub;

    @Override
    public void onSubscribe(Subscription sub){
      this.sub = sub;
      sub.request(1); (2)
    }

    @Override
    public void onNext(Long n){
      httpClient.get("localhost/"+n).onSuccess(rep -> sub.request(1)); (3)
    }
  });
1 Use a DefaultSubscriber to avoid implementing all Subscriber methods.
2 Schedule a first demand request after keeping a reference to the subscription.
3 Use an async HTTP API to request more only on a successful GET. That will naturally propagate the latency information back to the RangeStream Publisher. One can then imagine measuring the time difference between two requests, which gives interesting insight into processing and IO latency.
Table 12. Controlling the volume of in-flight data

Stream<T>

Role

subscribe(Subscriber<T>)

A custom Subscriber<T> has the flexibility to request whenever it wishes. It's best to bound the size of these requests if the Subscriber uses blocking operations.

capacity(long)

Set the capacity to this Stream<T> and all downstream actions.

onOverflowBuffer(CompletableQueue)

Create or use the given CompletableQueue to store the overflow elements. Overflow occurs when a Publisher sends more data than a Subscriber has actually requested. Overflow will be drained over the next calls to request(long).

onOverflowBuffer()

onOverflowDrop()

Ignore the overflowed elements. Overflow occurs when a Publisher sends more data than a Subscriber has actually requested; the excess elements are simply dropped.

onOverflowDrop()

throttle(long)

Delay downstream request(long) and periodically decrement the accumulated demand one by one to request upstream.

throttle(delay)

requestWhen(Function<Stream<Long>, Publisher<Long>>)

Pass any downstream request(long) into a Stream<Long> sequence of requests that can be altered and returned using any form of Publisher<Long>. The RequestWhenAction will subscribe to the produced sequence and immediately forward each onNext(Long) to the upstream request(long). It behaves similarly to adaptiveConsume but can be inserted at any point in the Stream pipeline.

requestWhen(requestMapper)

batchConsume(Consumer<T>, Consumer<T>, Consumer<T>, Function<Long,Long>)

batchConsumeOn

Similar to consume but will request the mapped Long demand given the previous demand and starting with the default Stream.capacity(). Useful for adapting the demand from various factors.

adaptiveConsume(Consumer<T>, Consumer<T>, Consumer<T>, Function<Stream<Long>,Publisher<Long>>),

adaptiveConsumeOn

Similar to batchConsume but will request the computed sequence of demand Long. It can be used to insert flow-control such as Streams.timer() to delay demand. The AdaptiveConsumerAction will subscribe to the produced sequence and immediately forwards onNext(Long) to the upstream request(long).

process(Processor<T, ?>)

Any Processor can also take care of transforming the demand or buffer. It is worth checking into the behavior of the specific Processor implementation in use.

All filter(arguments), take(arguments), takeWhile(arguments)…​

All limit operations can be used to proactively limit the volume of a Stream.

buffer(arguments), reduce(arguments), count(arguments)…​

All aggregating and metrics operations can be used to proactively limit the volume of a Stream.

All sample(arguments), sampleFirst(arguments)

Reduce the volume of a Stream<T> by selecting the last (or the first) onNext(T) signals matching the given conditions. These conditions can be timed, sized, timed-or-sized, or interactive (event-driven).

zip(arguments), zipWith(arguments)

Reduce the volume of N Streams to that of the least-producing zipped Publisher. The aggregated signals from each Publisher can be used to produce a distinct value from the N most recent upstream onNext(T).
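
As a hedged sketch of requestWhen from the table above, delaying every downstream request before it reaches the upstream Publisher; this assumes an active Environment for the timer.

Delaying demand with requestWhen
Stream<String> stream = Streams.just("a", "b");

stream
  .requestWhen(requests -> requests
    .flatMap(demand -> Streams.timer(100, TimeUnit.MILLISECONDS)
      .map(tick -> demand))) // forward the original demand 100 ms later
  .consume(System.out::println);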

Combinatory Operations

Combining Publishers allows for coordination between multiple concurrent sequences of data. They also serve the purpose of asynchronous transformations, with the resulting sequences being merged.

Coordinating in a non-blocking way will free the developer from using Future.get() or Promise.await(), a perilous task when it comes to more than one signal. Being non-blocking means that distinct pipelines won’t wait on anything other than Subscriber demand. The Subscriber requests will be split, with a minimum request of one for each merged Publisher.

Merging actions are modeled in FanInAction and take care of concurrent signaling with a thread-stealing SerializedSubscriber proxy to the delegate Subscriber. For each signal it verifies whether the correct thread is already running the delegate Subscriber, rescheduling the signal if not. The signal will then be polled when the busy thread exits the Subscriber code, possibly running the signal on a different thread than the one it was originally produced on.

Reducing the demand volume before using flatMap might be a good or a bad idea. On one hand, there is no point in the merging action subscribing to more parallel Publishers than it can actually process. On the other hand, limiting the number of parallel Publishers might deny a faster Publisher, pending a request, the chance to deliver.
Stream.zipWith(Function)
Streams
  .range(1, 100)
  .zipWith( Streams.generate(System::currentTimeMillis), tuple -> tuple ) (1)
  .consume(
    tuple -> System.out.println("number: "+tuple.getT1()+" time: "+tuple.getT2()) , (2)
    Throwable::printStackTrace,
    avoid -> System.out.println("--complete--")
  );
1 "Zip" or aggregate the most recent signal from RangeStream and the passed SupplierStream providing current time
2 "Zip" produces tuples of data from each zipped Publisher in the declarative order (left to right, stream1.zipWith(stream2)).
Table 13. Combining Data Sources

Functional API or Factory method

Role

Stream.flatMap(Function<T, Publisher<V>>)

An Async transformation is a typed shortcut for map(Function<T, Publisher<V>>).merge().

The mapping part produces a Publisher<V>, eventually using the passed data T, a common pattern in MicroService architecture.

The merging part transforms the sequence of produced Publisher<V> into a sequence of V by safely subscribing to all of them in parallel. No ordering is guaranteed; it is an interleaved sequence of V. All merged Publisher<V> must complete before the Subscriber<V> can complete.

Streams.switchOnNext(Publisher<Publisher<T>>)

A Stream alternating in FIFO order between the onNext(Publisher<T>) emitted by the passed Publisher. The signals will result in the downstream Subscriber<T> receiving the next Publisher sequence of onNext(T). It might interrupt a current upstream emission when the onNext(Publisher<T>) signal is received. All merged Publisher<T> must complete before the Subscriber<T> can complete.

Streams.merge(Publisher<T>, Publisher<T> x7)

Streams.merge(Publisher<Publisher<T>>)

Stream.mergeWith(Publisher<T>)

Stream.merge()

Transform an upstream sequence of Publisher<T> into a sequence of T by safely subscribing to all of them in parallel. No ordering is guaranteed; it is an interleaved sequence of T. If the arguments are directly Publisher<T>, as in Stream.mergeWith(Publisher<T>) or Streams.merge(Publisher<T>, Publisher<T>), the MergeAction will subscribe to them directly and size more efficiently (known number of parallel upstreams). All merged Publisher<T> must complete before the Subscriber<T> can complete.

Streams.concat(Publisher<T>, Publisher<T> x7)

Streams.concat(Publisher<Publisher<T>>)

Stream.concatWith(Publisher)

Stream.startWith(Publisher)

Similar to the merge() actions, but if a Publisher<T> is already emitting, wait for it to onComplete() before draining the next pending Publisher<T>. The sequences will be subscribed in declarative order, from left to right, e.g. stream1.concatWith(stream2), or with the argument given in stream2.startWith(stream1).

Streams.combineLatest(Publisher<T>, Publisher<T> x7, Function<Tuple,V>)

Streams.combineLatest(Publisher<Publisher<T>>, Function<Tuple,V>)

Combine the most recent onNext(T) signal from each distinct Publisher<T>. Each signal combines until a future onNext(T) from its source Publisher<T> replaces it. After all Publisher<T> have emitted at least one signal, the given combinator function will accept all recent signals and produce the desired combined object. If any Publisher<T> completes, the downstream Subscriber<T> will complete.

Streams.zip(Publisher<T>, Publisher<T> x7, Function<Tuple,V>)

Streams.zip(Publisher<Publisher<T>>, Function<Tuple,V>)

Stream.zipWith(Publisher<T>, Function<Tuple2,V>)

Combine the most recent onNext(T) signal from each distinct Publisher<T>. Each signal combines only once. Every time all Publisher<T> have emitted one signal, the given zipper function will receive them and produce the desired zipped object. If any Publisher<T> completes, the downstream Subscriber<T> will complete.

Streams.join(Publisher<T>, Publisher<T> x7)

Streams.join(Publisher<Publisher<T>>)

Stream.joinWith(Publisher<T>)

A shortcut for zip with a predefined zipper function transforming each most recent Tuple into a List<?>.
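
A minimal sketch contrasting the combineLatest semantics above with two hot sources; the no-argument Broadcaster.create() overload is an assumption.

Streams.combineLatest() with two Broadcasters
Broadcaster<Integer> numbers = Broadcaster.create(); // assumed no-arg overload
Broadcaster<String> letters = Broadcaster.create();

Streams.combineLatest(numbers, letters, tuple -> "" + tuple.getT1() + tuple.getT2())
  .consume(System.out::println);

numbers.onNext(1);
letters.onNext("a"); // emits "1a": every source has now emitted at least once
numbers.onNext(2);   // emits "2a": the freshest number combined with the latest letter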

MicroServices

The notion of a MicroService has become an increasingly popular term over the last few years. Simply put, we code software components with a focused purpose, to encourage isolation, adapted scaling and reuse. In fact, we have been using them for over 30 years:

An example of microservices in Unix
history | grep password

Even within the boundaries of an application, we can find a similar concept of functional granularity:

An example of microservices in imperative Java code
User rick = userService.get("Rick");
User morty = userService.get("Morty");
List<Mission> assigned = missionService.findAllByUserAndUser(rick, morty);

Of course, the approach has become widely popular within distributed systems and cloud-ready architectures. When the function is isolated enough, it will depend on N other ones for data access, subroutine calls over the network, posting into a message bus, querying an HTTP REST endpoint, etc. This is where troubles begin: the execution flow is crossing multiple context boundaries. Latency and failure will start to scale up as the system grows in volume and access.

At this point we can decide to scale out; after all, platforms such as CloudFoundry allow for elastic scaling of JVM apps and beyond. But looking at our CPU and memory use, neither seemed particularly under pressure. Of course not: each remote call was just blocking the whole service and preventing concurrent user requests from kicking in, leaving them parked in some thread pool queue. In the meantime, the active request was happily sitting for a few milliseconds or more, waiting for a remote HTTP call socket to actually write, a delay we call latency here.

The same applies to errors: we can first make applications individually more resilient (fallbacks, timeouts, retries…) and not rely on scaling out. The classic hope is that a replicated microservice will pick up the requests when a load balancer detects the failure:

Load Balancer: "are you dead ?"
30 sec later
Load Balancer: "are you dead ?"
30 sec later
Load Balancer: "you're dead !"
MicroService "I'am alive !"

In a Distributed System, coordination pulls a very long string of issues you wish you had never faced.

A Publisher like a Stream or a Promise is ideal for confronting MicroService latency and errors. To improve the situation with better error isolation and non-blocking service calls, code has to be designed with these two constraints in mind. To give yourself the best chance of a successful migration to a Reactive Architecture, prefer working step by step: quick wins and a few adjustments, then test and iterate to the next step.

In this section we’re going to cover the basics to create a reactive facade gating each costly remote call, build functional services and make them latency-ready.

Becoming Reactive with Reactor in 3 steps:
  1. Transform target service calls into unbounded Stream or Promise return types

    • Asynchronous switch for Blocking → Non Blocking conversion

    • Error isolation

  2. Compose services with the Reactor Stream API

    • Blocking → Non Blocking coordination

    • Parallelize Blocking calls

  3. Evolve transformed services to backpressure ready Stream

    • Chunk processing/reading with bounded access

    • Optimize IO operations with Microbatching

Table 14. Common Actions at play when reading remote resources

Functional API or Factory method

Role

Streams.create(Publisher), Streams.defer(Supplier), Streams.wrap(Publisher), Streams.generate(Supplier)

Protecting resource access with a Publisher is encouraged. A few Stream factories will be particularly useful. The point of creating a Publisher is to only onNext(T) when the data is ready such as in an IO callback. The read should be triggered by a Subscriber request if possible to implement a form of backpressure.

Stream.timeout(arguments)

Accessing an external resource, especially remote, should always be limited in time to become more resilient to external conditions such as network partitions. Timeout operations can fallback to another Publisher for alternative service call or just onError(TimeoutException). The timer resets each time a fresh onNext(T) is observed.

Stream.take(arguments)

Similar to timeout(), a need to scope in size an external resource is a common one. It’s also useful to fully trigger a pipeline including onComplete() processing.

Stream.flatMap(Function<T, Publisher<V>>)

An Async transformation that produces a Publisher<V>, eventually using the passed data T; the ideal place to hook in a call to another service before resuming the current processing.

The sequence of produced Publisher<V> will flow in the Subscriber into a sequence of V by safely subscribing in parallel.

Stream.subscribeOn(Dispatcher), Stream.dispatchOn(Dispatcher), Core Processors

Threading control is strategic:

  • Slow MicroService, low volume or low throughput, e.g. HTTP GET → subscribeOn(workQueueDispatcher()) to scale-up concurrent service calls.

  • Fast MicroService, high volume or high throughput, e.g. Message Bus → dispatchOn(sharedDispatcher()) or RingBufferXXXProcessor.create() to scale up message-dispatching.

Creating Non-Blocking Services

The first step is to isolate the microservice access. Instead of returning a type T or Future<T>, we will now start using Publisher<T>, and specifically Stream<T> or Promise<T>. The immediate benefit is that we no longer need to worry about error handling and threading (yet): errors are propagated in onError calls (no bubbling up), and threading can be tuned later, for instance using dispatchOn. The additional bonus is that our code becomes more functional. It also works nicely with Java 8 Lambdas! The target is to reduce control-bracket noise (if, for, while…​) and further limit the need for shared context. Ultimately, our target design will encourage streaming over polling large datasets: functions will apply to a sequence, result by result, avoiding loop duplication.

We prefer to use the implementation artefacts rather than Publisher<T>, to get compile-time access to the full Reactor Stream API, unless we want to remain API agnostic (a possible case for library developers). Streams.wrap(Publisher<T>) will do the trick anyway, converting such a generic return type into a proper Stream<T>, as sketched below.
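For instance, a minimal sketch (someLibrary and its users() method are hypothetical stand-ins for any third-party Reactive Streams source):

Publisher<User> rawUsers = someLibrary.users(); // hypothetical external Publisher
Stream<User> users = Streams.wrap(rawUsers);    // the full Stream API is now available at compile time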
Table 15. Evolving to reactive microservices, part 1, error isolation and non-blocking in some UserService

The Not So Much Win

The Win

public User get(String name)
throws Exception {
  Result r = userDB.findByName(name);
  return convert(r);
}

public List<User> allFriends(User user)
throws Exception {
  ResultSet rs = userDB.findAllFriends(user);
  return convertToList(rs);
}

public Future<List<User>> filteredFind(String name)
throws Exception {
  User user = get(name);
  if(user == null || !user.isAdmin()){
    return CompletableFuture.completedFuture(null);
  } else {
    //could be in an async thread if wanted
    return CompletableFuture.completedFuture(allFriends(user));
  }
}
public Promise<User> get(final String name) {
  return Promises
    .task( () -> userDB.findByName(name))
    .timeout(3, TimeUnit.SECONDS)
    .map(this::convert)
    .subscribeOn(workDispatcher());
}

public Stream<User> allFriends(final User user)  {
  return Streams
    .defer(() ->
      Streams.just(userDB.findAllFriends(user)))
    .timeout(3, TimeUnit.SECONDS)
    .map(this::convertToList)
    .flatMap(Streams::from)
    .dispatchOn(cachedDispatcher())
    .subscribeOn(workDispatcher());
}

public Stream<User> filteredFind(String name){
    return get(name)
      .stream()
      .filter(User::isAdmin)
      .flatMap(this::allFriends);
}

The Result

  • In all query methods:

    • No more throws Exception, it’s all passed in the pipeline

    • No more control logic, we use predefined operators such as map or filter

    • Only return Publisher (Stream or Promise)

    • Limit blocking queries in time with timeout (can be used later for retrying, fallback etc)

    • Use a pooled workDispatcher thread on subscribe

  • In get(name):

    • Use of typed single data Publisher, or Promise.

    • On Subscribe, call the task callback

  • In allFriends(user):

    • Use defer() to invoke the DB query on the onSubscribe thread, lazily

    • No backpressure strategy yet: we read all the results in one blocking (but asynchronous) call

    • We convert the returned list into a data stream in flatMap

    • Dispatch each signal on an async dispatcher so downstream processing doesn’t negatively impact the read

  • In filteredFind(name):

    • We convert the Promise from the first get into a Stream with stream()

    • We only call allFriends() sub-stream if there is a valid user

    • The returned Stream<User> resumes on the first allFriends() signal

Composing Multiple Service Calls

In this second step, we will expand our thinking to the consuming side. During a transition phase, keep in mind that a Stream can still be consumed in a blocking fashion using operators, as the sketch below shows.
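A minimal sketch, assuming the UserService above and that Promise.await accepts a timeout (the 3-second value is arbitrary):

List<User> friends = userService
  .allFriends(user)
  .toList()                    // Promise<List<User>>
  .await(3, TimeUnit.SECONDS); // block the calling thread, to be removed once callers are reactive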

There are two issues to address in the target design: robustness (network partition tolerance, etc.) and avoiding waiting for one service before processing another:

Table 16. Evolving to reactive microservices, part 2, parallel requests and resiliency in some RickAndMortyService

The Not So Much Win

The Win

int tries = 0;
while(tries < 3){
  try{
    Future<List<User>> rickFriends =
      userService.filteredFind("Rick");

    Future<List<User>> mortyFriends =
      userService.filteredFind("Morty");

    List<User> allFriends = new ArrayList<>(
      rickFriends.get(3, TimeUnit.SECONDS));
    allFriends.addAll(
      mortyFriends.get(3, TimeUnit.SECONDS));
    System.out.println(allFriends);
    break;

  }catch(Exception e){
    if(++tries >= 3) throw e;
    Thread.sleep(tries*1000);
  }
}
return Streams.merge(
  userService.filteredFind("Rick"),
  userService.filteredFind("Morty")
)
.buffer()
.retryWhen( errors ->
  errors
  .zipWith(Streams.range(1,3), t -> t.getT2())
  .flatMap( tries -> Streams.timer(tries) )
)
.consume(System.out::println);

The Result

  • Streams.merge() is a non-blocking coordinating operation mixing the two queries in one

  • buffer() will aggregate all results until completion or error (which we timed previously)

  • retryWhen(Function<Stream<Throwable>, Publisher<?>>) will keep re-subscribing if an error is propagated

    • zipWith will combine errors with a number of tries, up to 3 times

    • zipWith only returns the number of tries from the tuple

    • flatMap + Streams.timer(long) converts each try into a delayed signal (in seconds by default)

    • Each time a signal is sent by this returned Publisher, cancel and re-subscribe, until an onComplete or onError is sent

    • flatMap only completes when both the internal timer AND the upstream have completed, so after the range of 3 is exhausted or after the errors sequence itself terminates

Supporting Reactive Backpressure

In this last step, we pay a visit to the UserService.allFriends query, which right now polls entire datasets from the database.

Table 17. Evolving to reactive microservices, part 3, backpressure in UserService.allFriends

The Win

The Epic Win

return Streams
  .defer(() ->
    Streams.just(userDB.findAllFriends(user)))
  .timeout(3, TimeUnit.SECONDS)
  .map(this::convertToList)
  .flatMap(Streams::from)
  .dispatchOn(cachedDispatcher())
  .subscribeOn(workDispatcher());

stream
  .buffer()
  .consume(System.out::println);
return Streams
  .createWith(
    (demand, sub) -> {
      ResultSet rs = sub.context();
      long cursor = 0l;

      while(rs.hasNext()
        && cursor++ < demand
        && !sub.isCancelled()){

        sub.onNext(rs.next());
      }

      if(!rs.hasNext()){
        sub.onComplete();
      }
    },
    sub -> userDB.findAllFriends(user),
    resultSet -> resultSet.close()
  )
  .timeout(3, TimeUnit.SECONDS)
  .map(this::convert)
  .dispatchOn(cachedDispatcher())
  .subscribeOn(workDispatcher());

stream
  .buffer(5, 200, TimeUnit.MILLISECONDS)
  .consume(System.out::println);

The Result

  • Yes it’s more verbose…​

  • …​But now we stream result by result from the query (could have used pagination with SQL limits as well).

  • Streams.createWith is a PublisherFactory which intercepts requests, start and stop.

    • The request consumer receives precisely how many elements a subscriber is ready to receive.

    • The request consumer receives a SubscriberWithContext delegating to the real Subscriber; it gives access to the shared context and the cancel status.

    • We send at most as many individual Results as demanded

    • We complete when the query read is fully processed

  • Since the data is now individual, convertToList is unnecessary and is replaced with convert

  • The consuming side can start using tools such as capacity(long) or buffer(int) to batch-consume the results 5 by 5.

    • As a result the flow will be perceived as faster, since we don’t wait for every row to be read before printing

    • We add a time limit to the batch since the remaining data might not match the batch size

It’s important to balance the use of stateful Iterable<T>, like List<T>, against streaming individual T. A List can incur more latency at some point, since we take more time to create it. It also does not play well with resiliency, since a whole batch can be lost if a fatal error occurs. Finally, streaming individual T data makes sizing demand more predictable, because we can count individual signals instead of batches of signals.

Error Handling

Since error isolation is an important part of the Reactive contract, the Stream API is equipped to build fault-tolerant pipelines and service calls.

Error isolation comes simply from preventing the onNext, onSubscribe and onComplete callbacks from bubbling up any exception. Instead, exceptions are passed to the onError callback and propagated downstream. A few Actions can react passively or actively to such a signal, e.g. when() will just observe errors while onErrorResumeNext() will switch to a fallback Publisher.

Inverting the propagation, toward the consuming side instead of bubbling up to the producing side, is the reactive pattern that isolates the data producer from pipeline errors and keeps producers alive and happy.

In the end, the last Subscriber in the chain is notified through the onError(t) callback method. If that Subscriber is a ConsumerAction for instance, Reactor will re-route the error if no errorConsumer callback has been assigned using Stream.consume(dataConsumer, errorConsumer). The re-routing targets the current Environment error journal if set, which by default uses SLF4J to log errors.
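A minimal sketch of passive observation combined with a dedicated error consumer (riskyConvert is a hypothetical transformation that may throw):

Broadcaster<String> sink = Broadcaster.create();

sink
  .map(s -> riskyConvert(s)) // hypothetical transformation that may throw
  .when(IllegalArgumentException.class,
      e -> System.err.println("observed: " + e)) // passive observation, the error still flows downstream
  .consume(
      System.out::println,       // data consumer
      Throwable::printStackTrace // error consumer, so no journal re-routing happens
  );

sink.onNext("some input");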

Reactor also distinguishes fatal exceptions from normal ones, especially during the onSubscribe process. The following exceptions will neither be isolated nor passed downstream to the subscriber(s):

  • CancelException

    • Happens if no subscriber is available during onNext propagation, e.g. when a subscriber asynchronously cancelled during onNext emission

    • Use the JVM property -Dreactor.trace.cancel=true to enable verbose CancelException logging in the Environment default journal. If not set, Environment will not report these exceptions and there won’t be any associated stacktrace either.

  • ReactorFatalException

    • Happens when Reactor encounters an unrecoverable situation, such as a scheduling on a Timer that does not match its resolution.

  • JVM unsafe exceptions:

    • StackOverflowError

    • VirtualMachineError

    • ThreadDeath

    • LinkageError

A good practice, as seen in various sections, is to set time limits explicitly, so timeout() + retry() will be your best mates, especially to protect against network partitioning. The more data that flows in the Stream, the better it should be able to auto-heal and keep up a good service availability.
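For instance, a hedged sketch reusing the UserService from earlier (the limits are arbitrary):

userService
  .allFriends(user)
  .timeout(3, TimeUnit.SECONDS) // bound each read in time
  .retry(3)                     // cancel/re-subscribe up to 3 times on any error
  .consume(System.out::println);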

In Reactive Streams, at most one error can traverse a pipeline, so in theory you can’t double onError(e) a Subscriber. In practice we implemented the Rx operators retry() and retryWhen(), which cancel/re-subscribe onError. That still respects the contract, as an entirely new pipeline is materialized transparently, with fresh action instances. It also means stateful Actions like buffer() should be used with caution in this scenario: since we just de-reference them, their state might be lost. We are working on alternatives, one of them involving external persistence for safe stateful Actions. A glimpse of that can be read in the related section.
Fallback cascade fun
Broadcaster<String> broadcaster = Broadcaster.create();

Promise<List<String>> promise =
    broadcaster
        .timeout(1, TimeUnit.SECONDS, Streams.fail(new Exception("another one!"))) (1)
        .onErrorResumeNext(Streams.just("Alternative Message")) (2)
        .toList();

broadcaster.onNext("test1");
broadcaster.onNext("test2");
Thread.sleep(1500);

try {
  broadcaster.onNext("test3");
} catch (CancelException ce) {
  //Broadcaster has no subscriber, timeout disconnected the pipeline
}

promise.await();

assertEquals(promise.get().get(0), "test1");
assertEquals(promise.get().get(1), "test2");
assertEquals(promise.get().get(2), "Alternative Message");
1 TimeoutAction can fall back when no data is emitted for the given time period, but in this case it will just emit another Exception…​
2 …​However, we are lucky to have onErrorResumeNext(Publisher) to catch this exception and actually deliver some String payload

Another classic example of a fault-tolerant pipeline can be found in the Recipes Section.

Table 18. Handling errors

Stream<T> API

Role

when(Class<Throwable>, Consumer<Throwable>)

Observe specific exception types (and their hierarchy) coming from onError(Throwable).

observeError(Class<Throwable>, BiConsumer<Object,Throwable>)

Similar to when, but also allows introspection of the failing onNext(Object), if any, at the time the exception originally arose.

onErrorReturn(Class<Throwable>, Function<Throwable,T>)

Provide a fallback signal T given an exception matching the passed type, or any exception otherwise. Commonly used in self-healing services.

onErrorResume(Class<Throwable>, Publisher<T>)

Provide a fallback sequence of signals T given an exception matching the passed type, or any exception otherwise. Commonly used in self-healing services.

materialize() dematerialize()

Transform upstream signals into Signal<T>, and treat them as onNext(Signal<T>) signals. The immediate effect: it swallows error and completion signals, so it’s an effective way to process errors. Once errors are processed we can still replay them, transforming each Signal<T> back into the right Reactive Streams callback via dematerialize().

retry(int, Predicate<Throwable>)

Cancel/Re-Subscribe the parent Stream up to the optional tries argument and matching the passed Predicate if provided.

retryWhen(Function<Stream<Throwable>,Publisher<?>>)

Cancel/Re-Subscribe the parent Stream when the Publisher returned from the passed Function emits onNext(Object). The function is called once on subscribe and the generated Publisher is subscribed. If that Publisher emits onError(e) or onComplete(), they are propagated downstream. The Function receives a single Stream of the errors which have occurred in any subscribed pipeline. Can be combined with counting and delaying actions to provide bounded and exponential retry strategies.

recover(Class<Throwable>, Subscriber<Object>)

A retryWhen() shortcut to re-subscribe parent Publisher if the onError(Throwable) matches the given type. On recovery success, the passed Subscriber argument will receive the onNext(Object) that was the root signal associated with the exception, if any.

ignoreError(Predicate<Throwable>)

Transform the matching onError(Throwable) signals into onComplete(). If no argument has been provided, just transform any error into completion.

throw CancelException

That might be the only time we will mention anything related to exceptions bubbling up. However, throwing CancelException.INSTANCE in any onNext(T) callback is a simple way to no-ack an incoming value and inform colocated (within the same thread stack) Publishers, such as a Core Processor, that they might have to re-schedule this data later (see the sketch below).
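A minimal sketch of the no-ack pattern (tryProcess is a hypothetical handler returning false on transient failure; whether the value is actually re-scheduled depends on the colocated Publisher):

Processor<String, String> processor = RingBufferProcessor.create();

Streams
  .wrap(processor)
  .consume(data -> {
    if (!tryProcess(data)) {          // hypothetical, possibly failing handler
      throw CancelException.INSTANCE; // no-ack the value, a colocated Core Processor may re-schedule it
    }
  });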

Persisting Stream Data

Not everything has to live in memory, and Reactor has started a story to integrate (as an optional dependency) with Java Chronicle.

Table 19. Persisting signals safely

Functional API or Factory method

Role

Stream.onOverflowBuffer(CompletableQueue)

IOStreams.persistentMapReader()

IOStreams.persistentMap()

Analytics

Metrics and other stateful operations are fully part of the Stream API. In fact, users familiar with Spark will recognize some method names. ScanAction also offers the popular accumulating functional contract behind reduce() and scan().
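To contrast the two contracts, a minimal sketch (assuming the scan/reduce arities listed in Table 20 below):

Streams.just(1, 2, 3)
    .scan((acc, next) -> acc + next)   // emits every intermediate running sum
    .consume(System.out::println);

Streams.just(1, 2, 3)
    .reduce((acc, next) -> acc + next) // emits only the final sum, after onComplete()
    .consume(System.out::println);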

Playing with metrics and key/value data
Broadcaster<Integer> source = Broadcaster.<Integer> create(Environment.get());
long avgTime = 50l;

Promise<Long> result = source
    .throttle(avgTime) (1)
    .elapsed() (2)
    .nest() (3)
    .flatMap(self ->
            BiStreams.reduceByKey(self, (prev, next) -> prev + 1) (4)
    )
    .sort((a,b) -> a.t1.compareTo(b.t1)) (5)
    .log("elapsed")
    .reduce(-1L, (acc, next) ->
            acc > 0l ? ((next.t1 + acc) / 2) : next.t1 (6)
    )
    .next(); (7)

for (int i = 0; i < 10; i++) {
  source.onNext(1);
}
source.onComplete();
1 Slow down the incoming Subscriber request to one every ~50 milliseconds, polling pending data one by one.
2 Produce a Tuple2 of the time delta and the payload, between two signals or between onSubscribe and the first signal.
3 Make the current Stream available as an onNext so we can compose it with flatMap.
4 Accumulate all data until onComplete() in an internal Map keyed by Tuple2.t1 and valued by default with Tuple2.t2. Subsequent matching keys provide the previous value and the incoming onNext to the accumulator BiFunction. In this case we only increment the initial payload 1 per key.
5 Accumulate all data until onComplete() in an internal PriorityQueue, sorting elapsed times t1 with the given comparator. After onComplete(), all data are emitted in order, then complete.
6 Accumulate, until onComplete(), a moving average of the elapsed times, defaulting to the first received time.
7 Take the next and only produced average.
Output
03:14:42.013 [main] INFO  elapsed - subscribe: ScanAction
03:14:42.021 [main] INFO  elapsed - onSubscribe: {push}
03:14:42.022 [main] INFO  elapsed - request: 9223372036854775807
03:14:42.517 [hash-wheel-timer-run-3] INFO  elapsed - onNext: 44,1
03:14:42.518 [hash-wheel-timer-run-3] INFO  elapsed - onNext: 48,1
03:14:42.518 [hash-wheel-timer-run-3] INFO  elapsed - onNext: 49,2
03:14:42.518 [hash-wheel-timer-run-3] INFO  elapsed - onNext: 50,3
03:14:42.518 [hash-wheel-timer-run-3] INFO  elapsed - onNext: 51,3
03:14:42.519 [hash-wheel-timer-run-3] INFO  elapsed - complete: SortAction
03:14:42.520 [hash-wheel-timer-run-3] INFO  elapsed - cancel: SortAction
Table 20. Operations useful for metrics and other stateful accumulation.

Stream<T> API or Factory method

Output Type

Role

count()

Long

Produce the total number of observed onNext(T) after observing onComplete(). Useful when combined with timed windows. Not so useful with sized windows, e.g. stream.window(5).flatMap(w -> w.count()) → produces 5, awesome.

scan(BiFunction<T,T,T>)

T

Accumulate the observed onNext(T) with the given function, emitting each intermediate accumulated value.

scan(A, BiFunction<A,T,A>)

A

Like scan(BiFunction), seeded with an initial accumulator A.

reduce(BiFunction<T,T,T>)

T

Accumulate the observed onNext(T) with the given function, emitting only the final accumulated value after onComplete().

reduce(A, BiFunction<A,T,A>)

A

Like reduce(BiFunction), seeded with an initial accumulator A.

BiStreams.reduceByKey()

Tuple2<K,V>

Accumulate keyed Tuple2 data in an internal Map, passing the previous and incoming values to the accumulator for matching keys, and emit every entry after onComplete().

BiStreams.scanByKey()

Tuple2<K,V>

Like reduceByKey, but emit each updated entry as it is accumulated, instead of waiting for onComplete().

timestamp()

Tuple2<Long,T>

Pair each onNext(T) with the current timestamp.

elapsed()

Tuple2<Long,T>

Pair each onNext(T) with the time elapsed since the previous signal, or since onSubscribe for the first one.

materialize() dematerialize()

Signal<T>

Transform upstream signals into Signal<T>, and treat them as onNext(Signal<T>) signals. The immediate effect: it swallows error and completion signals, so it’s an effective way to count errors and completions if the Stream is using the retry or repeat API. Once completion and errors are processed we can still replay them, transforming each Signal<T> back into the right Reactive Streams callback via dematerialize().

Partitioning

Partition a Stream for concurrent, parallel work.

An important aspect of the functional composition approach to reactive programming is that work can be broken up into discrete chunks and scheduled to run on arbitrary Dispatchers. This means you can easily compose a flow of work that starts with an input value, executes work on another thread, and then passes through subsequent transformation steps once the result is available. This is one of the more common usage patterns with Reactor.

DispatcherSupplier supplier1 = Environment.newCachedDispatchers(2, "groupByPool");
DispatcherSupplier supplier2 = Environment.newCachedDispatchers(5, "partitionPool");

Streams
    .range(1, 10)
    .groupBy(n -> n % 2 == 0) (1)
    .flatMap(stream -> stream
            .dispatchOn(supplier1.get()) (2)
            .log("groupBy")
    )
    .partition(5) (3)
    .flatMap(stream -> stream
            .dispatchOn(supplier2.get()) (4)
            .log("partition")
    )
    .dispatchOn(Environment.sharedDispatcher()) (5)
    .log("join")
    .consume();
1 Create at most two streams (odd/even), keyed by the boolean result of n % 2 == 0, and forward each onNext(T) to the matching one.
2 Assign one of the pooled dispatchers to each of the two Streams emitted by the previous GroupByAction. Effectively this scales the stream up using 2 partitions, each assigned its own dispatcher. flatMap will merge the results returned by both partitions, running on one of the two threads, but never concurrently.
3 Create 5 Streams and forward onNext(T) to them in a round robin fashion
4 Use the second dispatcher pool of 5 to assign to the newly generated streams. The returned sequences will be merged.
5 Dispatch data on Environment.sharedDispatcher(), so neither the first nor the second pool. The 5 threads will then be merged under the shared Dispatcher thread.
Output extract
03:53:42.060 [groupByPool-3] INFO  groupBy - onNext: 4
03:53:42.060 [partitionPool-8] INFO  partition - onNext: 9
03:53:42.061 [groupByPool-3] INFO  groupBy - onNext: 6
03:53:42.061 [partitionPool-8] INFO  partition - onNext: 4
03:53:42.061 [shared-1] INFO  join - onNext: 9
03:53:42.061 [groupByPool-3] INFO  groupBy - onNext: 8
03:53:42.061 [partitionPool-4] INFO  partition - onNext: 6
03:53:42.061 [shared-1] INFO  join - onNext: 4
03:53:42.061 [groupByPool-3] INFO  groupBy - onNext: 10
03:53:42.061 [shared-1] INFO  join - onNext: 6
03:53:42.061 [groupByPool-3] INFO  groupBy - complete: DispatcherAction
Table 21. Grouping operations

Stream<T> API

Output Type

Role

groupBy(Function<T,K>)

GroupedStream<K,T>

Route each onNext(T) to the GroupedStream matching the key computed by the function, creating new groups on demand.

partition(int)

GroupedStream<K,T>

Route onNext(T) round-robin to the given number of GroupedStream partitions.

All window(arguments)

Stream<T>

Windows are for cutting partitions over time, over size, or in coordination with external signals.

process(XXXWorkProcessor)

T

Since a RingBufferWorkProcessor distributes the signals among its subscribers, it is an efficient alternative to partition() when it’s just about scaling up, not routing (see the sketch below).
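A hedged sketch of that alternative, assuming process() attaches the processor upstream once and that each consume() adds a competing subscriber:

Stream<Integer> shared = Streams
    .range(1, 100)
    .process(RingBufferWorkProcessor.create()); // work-queue semantics: each signal goes to one subscriber

// two competing consumers, each served on a processor thread
shared.consume(n -> System.out.println("A got " + n));
shared.consume(n -> System.out.println("B got " + n));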

Other API beyond Rx

In addition to directly implementing Reactive Streams, the Stream API offers methods that differ from Reactive Extensions, or that Rx simply does not document.

Table 22. Other methods not covered in the previous use cases.

Stream<T> API

Input Type

Output Type

Role

after()

T

Void

Only consume onComplete() and onError(Throwable) signals.

log(String)

T

T

Use SLF4J and the given category to log each signal.

split

Iterable<T>

T

Blocking transformation from Iterable<T> to as many onNext(T) as available.

sort(int, Comparator<T>)

T

T

Accept up to the given size into an in-memory PriorityQueue, apply the Comparator<T> to sort, and then emit all pending onNext(T) signals.

combine()

I

O

Scan for the oldest parent Action, from right to left. The result is a new Processor whose input onXXX signals are dispatched to that oldest action, and whose output subscribe is delegated to the current action.

Example:

Action<Integer, String> processor = stream
  .filter( i -> i < 2 )
  .map(Object::toString)
  .combine();

processor.consume(System.out::println);
processor.onNext(1);
processor.onNext(3);

keepAlive()

T

T

Prevent any Subscription.cancel() coming from the Subscriber from propagating upstream.

reactor-bus

Routing data

Figure 14. How Doge can use Reactor-Bus

Publish/Subscribe

Using an EventBus to publish and respond to events using Publish/Subscribe.

Reactor’s EventBus allows you to register a Consumer to handle events when the notification key matches a certain condition. This assignment is achieved via a Selector. It’s similar to subscribing to a topic, though Reactor’s Selector implementations can match on a variety of criteria, from Class<?> type to regexes to JsonPath expressions. It is a very flexible and powerful abstraction that provides a wide range of possibilities.

You can register multiple Consumers using the same Selector and multiple Selectors can match a given key. This way it’s easy to do aggregation and broadcasting: you simply subscribe multiple Consumers to the same topic Selector.

If you’re upgrading from Reactor 1.1, you’ll see that the Reactor class no longer exists. It has been renamed to EventBus to more accurately reflect its role in the framework.

Handling events using a Selector
EventBus bus = EventBus.create(Environment.get()); (1)

bus.on($("topic"), (Event<String> ev) -> {
  String s = ev.getData();
  System.out.printf("Got %s on thread %s%n", s, Thread.currentThread());
}); (2)

bus.notify("topic", Event.wrap("Hello World!")); (3)
1 Create an EventBus using the default, shared RingBufferDispatcher from the static Environment.
2 Assign a Consumer to invoke when the EventBus is notified with a key that matches the Selector.
3 Publish an Event into the EventBus using the given topic.

The shorthand static method $ is just a convenience helper identical to Selectors.object(). Some people don’t like to use shorthand methods like $() for ObjectSelector, R() for RegexSelector, T() for ClassSelector, and so forth. The Selectors class has longer method name alternatives for these shorthand versions, which are simply aliases to reduce code noise and make reactive code a little more readable.
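For instance, a hedged sketch of the longer aliases next to their shorthand forms (assuming Selectors.regex and Selectors.type per the naming above):

EventBus bus = EventBus.create(Environment.get());

// regex-based selection, the long form of R()
bus.on(Selectors.regex("topic\\.[a-z]+"), (Event<String> ev) ->
    System.out.println("Got " + ev.getData()));

// type-based selection, the long form of T(): matches notifications whose key is an Integer
bus.on(Selectors.type(Integer.class), ev ->
    System.out.println("Got " + ev.getData()));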

Request/Reply

Using an EventBus to publish and respond to events using Request/Reply.

It’s often the case that you want to receive a reply from a task executed on an EventBus’s configured Dispatcher. Reactor’s EventBus provides for more event handling models beyond the simple publish/subscribe model. You can also register a Function rather than a Consumer and have the EventBus automatically notify a replyTo key with the return value of the Function. Rather than using the .on() and .notify() methods, you use the .receive() and .send() methods.

Request/Reply
EventBus bus;

bus.on($("reply.sink"), ev -> {
  System.out.printf("Got %s on thread %s%n", ev, Thread.currentThread())
}); (1)

bus.receive($("job.sink"), ev -> {
  return doWork(ev);
}); (2)

bus.send("job.sink", Event.wrap("Hello World!", "reply.sink")); (3)
1 Assign a Consumer to handle all replies indiscriminately.
2 Assign a Function to perform work in the Dispatcher thread and return a result.
3 Publish an Event into the bus using the given replyTo key.

If you don’t have a generic topic on which to publish replies, you can combine the request and reply operation into a single call using the .sendAndReceive(Object, Event<?>, Consumer<Event<?>>) method. This performs a .send() call and invokes the given replyTo callback on the Dispatcher thread when the Functions are invoked.

sendAndReceive()
EventBus bus;

bus.receive($("job.sink"), (Event<String> ev) -> {
  return ev.getData().toUpperCase();
}); (1)

bus.sendAndReceive(
    "job.sink",
   Event.wrap("Hello World!"),
   s -> System.out.printf("Got %s on thread %s%n", s, Thread.currentThread())
); (2)
1 Assign a Function to perform work in the Dispatcher thread and return a result.
2 Publish an Event into the bus and schedule the given replyTo Consumer on the Dispatcher, passing the receive Function’s result as input.

Cancelling a Task

Sometimes you want to cancel a task to cause it to stop responding to event notifications. The registration methods .on() and .receive() return a Registration object which, if a reference to it is held, can be used later to cancel a Consumer or Function for a given Selector.

EventBus bus;

Registration reg = bus.on($("topic"),
                          s -> System.out.printf("Got %s on thread %s%n", s, Thread.currentThread()));

bus.notify("topic", Event.wrap("Hello World!")); (1)

// ...some time later...
reg.cancel(); (2)

// ...some time later...
bus.notify("topic", Event.wrap("Hello World!")); (3)
1 Publish an event to the given topic. Should print Event.toString() in the console.
2 Cancel the Registration to prevent further events from reaching the Consumer.
3 Nothing should happen as a result of this notification.

Keep in mind that cancelling a Registration involves accessing the internal Registry in an atomic way. In a system in which a large number of events are flowing into Consumers, it’s likely that your Consumer or Function might see some values after you’ve invoked the .cancel() method, but before the Registry has had a chance to clear the caches and remove the Registration. The .cancel() method could be described as a "request to cancel as soon as possible".

You’ll notice this behavior right away in test classes where there’s no time delay between the .on(), .notify(), and .cancel() invocations.

Registry

Using a Registry to cache in-memory values.

reactor-net

Asynchronous TCP, UDP and HTTP

Nothing travels faster than the speed of light, with the possible exception of bad news, which obeys its own special laws.
— Douglas Noel Adams
Mostly Harmless (1992)
Head first with a Java 8 example of some Net work
import reactor.io.net.NetStreams;
import reactor.io.net.tcp.TcpServer;
import reactor.io.net.tcp.TcpClient;

//...

final int port = 3000; // any free local port
CountDownLatch latch = new CountDownLatch(10);

TcpServer<Buffer, Buffer> server = NetStreams.tcpServer(port);
TcpClient<Buffer, Buffer> client = NetStreams.tcpClient("localhost", port);

final JsonCodec<Pojo, Pojo> codec = new JsonCodec<Pojo, Pojo>(Pojo.class);

//the client/server are prepared
server.start( input ->

        //for each connection echo any incoming data

        //return the write confirm publisher from writeWith
        // >>> close when the write confirm completed

        input.writeWith(

                //read incoming data
                input
                        .decode(codec) //transform Buffer into Pojo
                        .log("serve")
                        .map(codec)    //transform Pojo into Buffer
                        .capacity(5l)  //auto-flush every 5 elements
        )
).await();

client.start( input -> {

        //read 10 replies and close
        input
                .take(10)
                .decode(codec)
                .log("receive")
                .consume( data -> latch.countDown() );

        //write data
        input.writeWith(
                Streams.range(1, 10)
                        .map( it -> new Pojo("test" + it) )
                        .log("send")
                        .map(codec)
        );

        //keep-alive, until 10 data have been read
        return Streams.never();

}).await();

latch.await(10, TimeUnit.SECONDS);

client.shutdown().await();
server.shutdown().await();

Overview

How does the Reactor Net module work ?

Figure 15. How Doge can use Reactor-Net

So why should you care about an asynchronous runtime for network operations ? As seen in the Microservice with Streams section, it is preferable not to block for a service reply. Non-blocking writes over the network are slightly more costly than blocking ones in terms of resources, but they can make the system feel more responsive to the producer. Responsiveness along the whole request flow impacts various systems and, eventually, the 1 or N users waiting their turn to push new requests.

Figure 16. Doge trades off CPU for latency, gaining responsiveness and leaving the service available to his friends

Blocking reads or writes become a nightmare for concurrent use of services over long-living connections such as TCP or WebSocket. Apart from a network routing component that might time out a connection held too long, little can be done with a blocking socket in the application once a thread is locked in a read or write IO method.

Of course there is always the option of a pool of threads, or an async facade such as a Core Processor, to mitigate the blocking read/write contention. The problem is that there won’t be many of these threads available in a Reactive world of non-blocking dispatching, so blocking behind 4/8/16 async facades is a limited option. And again, a thread pool with a large queue, or even many threads, won’t necessarily solve the situation either.

Instead, why not invoke callbacks on the different IO operations: connection, read, write, close…​ ?

Reactor Net aims to provide an asynchronous IO runtime that supports Reactive Streams backpressure, for client or server needs, over a range of protocols and drivers. Some drivers will not implement every protocol, but at least one, Netty, implements all current protocols. At the moment, Reactor Net supports Netty 4.x and ZeroMQ (through jeroMQ 0.3.+), and you must explicitly add one of them to the application classpath.

Reactor Net has the following artifacts:

  • ReactorChannel and its direct implementations ChannelStream and HttpChannel

    • Represents a direct connection between the application and the remote host

    • Contains non blocking IO write and read operations

    • Reactor drivers will directly expose ChannelStream to access the Stream functional API for read operations

  • ReactorPeer and ReactorChannelHandler for common network component (client/server) contract

    • Provides for start and shutdown operations

    • Binds a ReactorChannelHandler on start to listen to the requesting ChannelStream

    • ReactorChannelHandler is a function accepting ChannelStream requests and returning a Publisher for connection close management

  • ReactorClient for common client contract

    • Extends ReactorPeer to provide a reconnect friendly start operation

  • NetStreams and Spec to create any client or server

    • Looks like Streams, BiStreams and other Reactor Stream Factories

    • NetStreams factories accept a Function<Spec,Spec>, called once on creation, to customize the configuration of the network component (see the sketch after this list).

  • HTTP/WS/UDP/TCP protocol ReactorPeer implementations

    • HttpServer & HttpClient will provide routing extensions

    • DatagramServer will provide multicast extensions

    • TcpServer & TcpClient will provide additional TCP/IP context information

  • Netty and ZeroMQ drivers
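A minimal sketch of that Spec customization, mirroring the HTTP example at the end of this guide (codec and port choices are arbitrary):

TcpServer<String, String> server = NetStreams.<String, String>tcpServer(spec -> spec
        .codec(StandardCodecs.STRING_CODEC) // encode/decode the Buffer payloads as Strings
        .listen(3000)                       // bind to port 3000
);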

Reactor Net implements a model discussed under the Reactive IPC initiative. As we progress we will align further, and eventually depend on the specified artefacts, likely over 2016. We give you a chance to experiment, as of today, with some of the principles, and we do our best to prepare our users for this next-generation standard.

Channels

Channel Handlers

Specifications

Client Specification

Server Specification

Backpressure

Using Reactor and Reactive Stream standard for flow-control with TCP network peers.

TCP 101

Using Reactor’s TCP support to create high-performance TCP clients and servers.

Start and Stop

Writing Data

From a Server perspective

From a Client perspective

Flushing Strategies

Consuming Data

From a Server perspective

From a Client perspective

Backpressure Strategies

Closing the Channel

HTTP 101

Using Reactor’s HTTP support to create high-performance HTTP clients and servers.

Start and Stop

Routing HTTP

SSE

Routing WebSocket

Writing Data

Adding Headers and other Metadata

From a Server perspective

From a Client perspective

Flushing Strategies

Consuming Data

Reading Headers, URI Params and other Metadata

From a Server perspective

From a Client perspective

Backpressure Strategies

Closing the Channel

End To End

Combining Reactor Net, Stream and Core to create standalone Ingesters or Data (micro)Services

Extensions

Spring Support

Spring Ecosystem Support

Writing Reactor applications that are deployed inside a Spring ApplicationContext.

Spring Core Basics

Writing Reactor applications that are deployed inside a Spring ApplicationContext. TODO:

Reactor with Spring Boot

Reactor with Spring Messaging

Reactor is abstracted behind the Spring Messaging TCP support. Just make sure Reactor Net (io.projectreactor:reactor-net:2.0.8.BUILD-SNAPSHOT) is in the Spring application classpath.

Reactor with Spring Integration

Spring Integration Java DSL

Reactor with Spring XD

Groovy extensions

Groovy support

Grails support

Reactor is provided in core Grails 3.0 distribution as part of the Eventing support.

Clojure support

Meltdown is a Clojure interface to Reactor, an asynchronous programming, event passing and stream processing toolkit for the JVM. It follows the path of Romulan, an old ClojureWerkz project on top of LMAX Disruptor that’s been abandoned.
— README
project page

Cookbook

Building a simple File Stream

Let’s start with a pure Publisher implementation; we’ll use the Reactor API afterwards to simplify the example. As a Publisher implementor, you have to take care of a lot of small things that should be tested against the Reactive Streams TCK module. The purpose here is to understand what Reactor can do for you in such a situation, sparing you all this machinery.

In theory, Reactive Streams won’t buy you much in a scenario where a file is read and consumed, blocking, single-threaded, in a simple loop. If the sink endpoint blocks, you already have a form of backpressure, since the loop won’t read more than it sends. The point of such a Reactive File Stream is when one or more boundaries must be crossed between it and the consumer, a decoupling that can take the form of a queue or a ring buffer. You can then keep reading while the consumer is sending, so the next time it asks for data (after sending the previous chunk), the data is already in memory. A sort of prefetching, in other words.
Build a lazy file read Publisher matching the Subscriber request
  Publisher<String> fileStream = new Publisher<String>() { (1)
          @Override
          public void subscribe(final Subscriber<? super String> subscriber) {
                  final File file = new File("settings.gradle"); (2)
  
                  try {
                          final BufferedReader is = new BufferedReader(new FileReader(file)); (2)
  
                          subscriber.onSubscribe(new Subscription() {
  
                                  final AtomicBoolean terminated = new AtomicBoolean(false);
  
                                  @Override
                                  public void request(long n) {
                                          long requestCursor = 0l;
                                          try {
                                                  String line;
                                                  while ((requestCursor++ < n || n == Long.MAX_VALUE) (3)
                                                                  && !terminated.get()) { (4)
  
                                                          line = is.readLine();
                                                          if (line != null) {
                                                                  subscriber.onNext(line);
                                                          } else {
                                                                  if (terminate()) {
                                                                          subscriber.onComplete(); (5)
                                                                  }
                                                                  return;
                                                          }
                                                  }
                                          } catch (IOException e) {
                                                  if (terminate()) {
                                                          subscriber.onError(e); (6)
                                                  }
                                          }
                                  }
  
                                  @Override
                                  public void cancel() {
                                          terminate();
                                  }
  
                                  private boolean terminate() {
                                          if (terminated.compareAndSet(false, true)) {
                                                  try {
                                                          is.close(); (7)
                                                  } catch (Exception t) {
                                                          subscriber.onError(t);
                                                  }
                                                  return true;
                                          }
                                          return false;
                                  }
                          });
  
                  } catch (FileNotFoundException e) {
                          Streams.<String, FileNotFoundException> fail(e)
                                  .subscribe(subscriber); (8)
                  }
          }
  };
  
  Streams.wrap(fileStream)
          .capacity(4L) (9)
          .consumeOn( (10)
                  Environment.sharedDispatcher(),
                  System.out::println,
                  Throwable::printStackTrace,
                  nothing -> System.out.println("## EOF ##")
  );
1 Implement a Publisher. We’ll see in the next example how to be smart about it with core and stream
2 Open a File cursor and reader per Subscriber to allow for replayability: it’s a Cold Stream.
3 Match the number of read lines with the demand and ignore the demand if special Long.MAX_VALUE escaping number is passed.
4 Check before each possible onNext() if the Stream is not cancelled.
5 Call onComplete(), which sets the state of the Subscription to cancelled, ignoring any further terminal signal.
6 Call onError(e), which sets the state of the Subscription to cancelled, ignoring any further terminal signal.
7 Close the file once the subscriber is no longer interested in the content (error, completion, cancel).
8 Create a failed Stream that only calls onSubscribe() on the passed subscriber, then onError(e).
9 capacity will hint downstream operations (consumeOn here) to chunk requests 4 by 4.
10 consumeOn takes an extra argument to run the requests on a dispatcher in addition to the 3 possible Consumer reacting to each type of signal.
Build a lazy file read with the Core PublisherFactory (from 2.0.2+) and compose with the Stream API
final String filename = "settings.gradle";
Publisher<String> fileStream = PublisherFactory.create(
        (n, sub) -> { (1)
                String line;
                final BufferedReader inputStream = sub.context(); (2)
                long requestCursor = 0l;
                while ((requestCursor++ < n || n == Long.MAX_VALUE) && !sub.isCancelled()) { (3)

                        try {
                                line = inputStream.readLine();
                                if (line != null) {
                                        sub.onNext(line);
                                } else {
                                        sub.onComplete(); (4)
                                        return;
                                }
                        }
                        catch (IOException exc) {
                                sub.onError(exc);
                        }
                }
        },
        sub -> new BufferedReader(new FileReader(filename)), (5)
        inputStream -> inputStream.close() (6)
);

Streams
        .wrap(fileStream)
        .process(RingBufferProcessor.create())
        .capacity(4L)
        .consume(
                System.out::println,
                Throwable::printStackTrace,
                nothing -> System.out.println("## EOF ##")
);
1 Implement a BiConsumer to react on every Subscriber request Long n. Any unchecked exception will trigger the terminal callback and Subscriber.onError(e).
2 The Subscriber passed in the callback is a SubscriberWithContext decorator allowing access to context() populated on start
3 Match the number of read lines with the demand and ignore the demand if special Long.MAX_VALUE escaping number is passed. Also use SubscriberWithContext.isCancelled() to check asynchronous cancel from Subscribers before each read.
4 Call onComplete(), which sets the state of the SubscriberWithContext to cancelled, ignoring any further terminal signal.
5 Define a context once for a new Subscriber that will be available later for each request SubscriberWithContext.context()
6 Define a terminal callback once intercepting cancel(), onComplete() or onError(e).

We can use PublisherFactory, or Streams factories like Streams.createWith() to quickly achieve common use cases:

  • Open IO once

  • React on requests

  • Handle shutdown gracefully

Building a Quick Circuit Breaker

In this next exercise, we focus on the composition power the Reactor Stream module puts in your hands. A classic use case is to build self-healing data pipelines using the Circuit Breaker pattern (maybe soon available in the Stream API, maybe).

In this scenario, we want to keep a Stream alive even if errors fly in. When a certain number of errors is reached, we stop consuming from the main circuit, the actual Stream. For a short period we trip the circuit and use a fallback Publisher instead; it can be any sort of Publisher, and here it will just emit an alternative message. The point is to avoid new access to the failing Stream for a while, giving it a chance to recover.

Quick (and dirty) Circuit Breaker test
final Broadcaster<String> closeCircuit = Broadcaster.create(); (1)
final Stream<String> openCircuit = Streams.just("Alternative Message"); (2)

final Action<Publisher<? extends String>, String> circuitSwitcher = Streams.switchOnNext(); (3)

final AtomicInteger successes = new AtomicInteger(); (4)
final AtomicInteger failures = new AtomicInteger();

final int maxErrors = 3;

Promise<List<String>> promise = (5)
                circuitSwitcher (6)
                        .observe(d -> successes.incrementAndGet()) (7)
                        .when(Throwable.class, error -> failures.incrementAndGet())
                        .observeStart(s -> { (8)

                                System.out.println("failures: " + failures +
                                         " successes:" + successes);

                                if (failures.compareAndSet(maxErrors, 0)) {
                                        circuitSwitcher.onNext(openCircuit); (9)
                                        successes.set(0);

                                        Streams
                                                .timer(1)  (10)
                                                .consume(ignore -> circuitSwitcher.onNext(closeCircuit));
                                }
                        })
                        .retry() (11)
                        .toList(); (5)

circuitSwitcher.onNext(closeCircuit); (12)

closeCircuit.onNext("test1");
closeCircuit.onNext("test2");
closeCircuit.onNext("test3");
closeCircuit.onError(new Exception("test4"));
closeCircuit.onError(new Exception("test5"));
closeCircuit.onError(new Exception("test6"));
Thread.sleep(1500); (13)
closeCircuit.onNext("test7");
closeCircuit.onNext("test8");
closeCircuit.onComplete();  (14)
circuitSwitcher.onComplete();

System.out.println(promise.await());
Assert.assertEquals(promise.get().get(0), "test1");
Assert.assertEquals(promise.get().get(1), "test2");
Assert.assertEquals(promise.get().get(2), "test3");
Assert.assertEquals(promise.get().get(3), "Alternative Message");
Assert.assertEquals(promise.get().get(4), "test7");
Assert.assertEquals(promise.get().get(5), "test8");
1 Create the main hot Broadcaster stream to send data later on.
2 Create a simple fallback stream when sh*t hits the fan.
3 Create a SwitchAction which is a Processor accepting new Publisher to consume data from.
4 Prepare shared counters for successes and failures.
5 Return a Promise from Stream.toList() to convert the Stream into an eventual List
6 Consume data from the circuitSwitcher Processor proxy that will be updated depending on the failures number.
7 Count success on every valid onNext(String) and count errors on every Throwable exception
8 Monitor onSubscribe(Subscription) call which is called after successful stream start.
9 If the number of errors reaches maxErrors, trip the circuit by switching the current circuitSwitcher data source to the fallback one.
10 Re-consume from the main stream in 1 Second by signalling circuitSwitcher with it.
11 Keep retrying on any exception, which means cancel / re-subscribe. That’s why we use observeStart(): every re-subscription triggers it.
12 Start the circuitSwitcher with the main Stream
13 Artificial wait to leave a chance to the timer to close the circuit,
14 onComplete() both the current main stream and the circuitSwitcher itself (otherwise they will hang, waiting for the missing onComplete()).

Building Efficient Data Pipelines

Building Non-Blocking MicroServices

Building CQRS-like Applications

Other Example Applications

Quickstart

An example application showing several ways to use basic components in Reactor.

Samples

A simple sample application that demonstrates Reactor functionality in JUnit tests.

Non Blocking Konami Code

If you made it this far, here is a simple non-blocking stream to scale up your Konami codes over WebSocket. Tell me about a reward…​ Don’t forget to add Netty to your classpath along with reactor-net.

final Processor<Integer, Integer> keyboardStream = RingBufferProcessor.create();

NetStreams.<String, String>httpServer(spec ->
                spec
                        .codec(StandardCodecs.STRING_CODEC)
                        .listen(3000)
        )
        .ws("/", channel -> {
                System.out.println("Connected a websocket client: " + channel.remoteAddress());

                return Streams
                        .wrap(keyboardStream)
                        .skipWhile(key -> KeyEvent.VK_UP != key)
                        .buffer(10, 1) (1)
                        .map(keys -> keys.size() == 10 &&
                                keys.get(0) == KeyEvent.VK_UP &&
                                keys.get(1) == KeyEvent.VK_UP &&
                                keys.get(2) == KeyEvent.VK_DOWN &&
                                keys.get(3) == KeyEvent.VK_DOWN &&
                                keys.get(4) == KeyEvent.VK_LEFT &&
                                keys.get(5) == KeyEvent.VK_RIGHT &&
                                keys.get(6) == KeyEvent.VK_LEFT &&
                                keys.get(7) == KeyEvent.VK_RIGHT &&
                                keys.get(8) == KeyEvent.VK_B &&
                                keys.get(9) == KeyEvent.VK_A
                )
                .map(isKonami -> isKonami ? "Konami!" : "Nah")
                .nest()
                .flatMap(konamis ->
                        channel.writeWith(konamis)
                );
        })
        .start()
        .await();

keyboardStream.onNext(KeyEvent.VK_RIGHT);
keyboardStream.onNext(KeyEvent.VK_UP);
keyboardStream.onNext(KeyEvent.VK_UP);
keyboardStream.onNext(KeyEvent.VK_DOWN);
keyboardStream.onNext(KeyEvent.VK_DOWN);
keyboardStream.onNext(KeyEvent.VK_LEFT);
keyboardStream.onNext(KeyEvent.VK_RIGHT);
keyboardStream.onNext(KeyEvent.VK_LEFT);
keyboardStream.onNext(KeyEvent.VK_RIGHT);
keyboardStream.onNext(KeyEvent.VK_B);
keyboardStream.onNext(KeyEvent.VK_A);
keyboardStream.onNext(KeyEvent.VK_C);
keyboardStream.onComplete();

1 Note the buffer(size, skip) use: we want to evaluate the last 10 keys on every new key, a sliding window. That means a new list of the last 10 keys is created for each key received.

1. Unless you only want to use the Core Processors, which are mostly standalone at this stage. We plan to align Dispatchers with Core Processors over time.
2. Some will challenge that over-simplified vision but let’s stay pragmatic over here :)
3. including Akka Streams, Ratpack, and RxJava