1. About the Documentation
This section provides a brief overview of the Reactor reference documentation. You can read this reference guide in a linear fashion, or you can skip sections if something doesn’t interest you.
1.1. Latest version & Copyright Notice
The Reactor reference guide is available as HTML documents. The latest copy is available at http://projectreactor.io/docs/core/release/reference/docs/index.html
Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.
1.2. Getting help
There are several ways to reach out for help with Reactor.
- Get in touch with the community on Gitter.
- Ask a question on stackoverflow.com, using the project-reactor tag.
- Report bugs (or ask questions) in GitHub issues. The most closely monitored repositories are reactor-core and reactor-addons (which covers reactor-test and adapters issues).
All of Reactor is open source, including this documentation! If you find problems with the docs or if you just want to improve them, please get involved.
1.3. Where to go from here
- Head to Getting started if you feel like jumping straight into the code.
- If you’re new to Reactive Programming though, you’d probably better start with the Introduction to Reactive Programming.
- To dig deeper into the core features of Reactor, head to Reactor Core Features:
  - If you’re looking for the right tool for the job but cannot think of a relevant operator, the Which operator do I need? section may help.
  - Learn more about Reactor’s reactive types in the "Flux, an asynchronous sequence of 0-n items" and "Mono, an asynchronous 0-1 result" sections.
  - Switch threading contexts using a Scheduler.
  - Learn how to handle errors in the Handling Errors section.
- Unit testing? Yes, it is possible with the reactor-test project! See Testing.
- Programmatically creating a sequence is possible for more advanced creation of reactive sources.
2. Getting started
2.1. Introducing Reactor
Reactor is a fully non-blocking reactive programming foundation for the JVM, with efficient demand management (backpressure). It integrates directly with Java 8 functional APIs, notably CompletableFuture, Stream and Duration.
It offers composable asynchronous sequence APIs Flux ([N] elements) and Mono ([0|1] elements), extensively implementing the Reactive Extensions specification.
Reactor also supports non-blocking IPC with the reactor-ipc components.
Suited for Microservices Architecture, Reactor IPC offers backpressure-ready network engines for HTTP (including Websockets), TCP and UDP. Reactive Encoding/Decoding is fully supported.
2.2. The BOM
Reactor 3 uses a BOM[1] model since reactor-core 3.0.4, with the Aluminium release train.
This lets you group artifacts that are meant to work well together without having to wonder about the sometimes divergent versioning schemes of these artifacts.
The BOM is like a curated list of versions. It is itself versioned, using a release train scheme with a codename followed by a qualifier:
Aluminium-RELEASE
Carbon-BUILD-SNAPSHOT
Aluminium-SR1
Carbon-SR32[2]
The codenames represent what would traditionally be the MAJOR.MINOR number. They come from the Periodic Table of Elements (mostly), in growing alphabetical order.
The qualifiers are (in chronological order):
- BUILD-SNAPSHOT
- M1..N: Milestones or developer previews
- RELEASE: The first GA release in a codename series
- SR1..N: The subsequent GA releases in a codename series (equivalent to PATCH number; SR stands for "Service Release").
2.3. Getting Reactor
As mentioned above, the easiest way to use Reactor in your code is to use the BOM and add the relevant dependencies to your project. Note that when adding such a dependency, you omit the version so that it gets picked up from the BOM.
However, if you want to force the use of a specific artifact’s version, you can specify it when adding your dependency, as you usually would. You can also of course forgo the BOM entirely and always specify dependencies with their artifact versions.
2.3.1. Maven installation
The BOM concept is natively supported by Maven. First, you’ll need to import the BOM by adding the following to your pom.xml:[3]
<dependencyManagement> (1)
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>Aluminium-SR1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
1 | Notice the dependencyManagement tag. It is in addition to the regular dependencies section. |
Next, add your dependencies to the relevant reactor projects as usual, except without a <version>:
<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId> (1)
        (2)
    </dependency>
    <dependency>
        <groupId>io.projectreactor.addons</groupId>
        <artifactId>reactor-test</artifactId> (3)
        <scope>test</scope>
    </dependency>
</dependencies>
1 | dependency on the core library |
2 | no version tag here |
3 | reactor-test provides facilities to unit test reactive streams |
2.3.2. Gradle installation
Gradle has no core support for Maven BOMs, but you can use Spring’s gradle-dependency-management plugin.
First, apply the plugin from Gradle Plugin Portal:
plugins {
    id "io.spring.dependency-management" version "1.0.1.RELEASE" (1)
}
1 | As of this writing, 1.0.1.RELEASE is the latest version of the plugin. Check for updates. |
Then use it to import the BOM:
dependencyManagement {
    imports {
        mavenBom "io.projectreactor:reactor-bom:Aluminium-SR1"
    }
}
Finally, add a dependency to your project without a version number:
dependencies {
    compile 'io.projectreactor:reactor-core' (1)
}
1 | There is no third ":"-separated section for the version. It is taken from the BOM. |
3. Introduction to Reactive Programming
Reactor is an implementation of the Reactive Programming paradigm, which can be summed up as:
Reactive programming is oriented around data flows and the propagation of change. This means that the underlying execution model will automatically propagate changes through the data flow.
In this particular instance, pioneered by the Reactive Extensions (Rx) library in the .NET ecosystem, and also implemented by RxJava on the JVM, the reactive aspect is translated in our object-oriented languages to a kind of extension of the Observer design pattern.
Over time, a standardization emerged through the Reactive Streams effort, a specification that defines a set of interfaces and interaction rules for reactive libraries on the JVM. It will be integrated into Java 9 (with the Flow class).
One can also compare the main reactive streams pattern with the familiar Iterator design pattern, as there is a duality to the Iterable-Iterator pair in all these libraries. One major difference is that while an Iterator is pull-based, reactive streams are push-based.
Using an iterator is quite imperative, even though the method of accessing values is solely the responsibility of the Iterable. Indeed, it is up to the developer to choose when to access the next() item in the sequence. In reactive streams, the equivalent of the above pair is Publisher-Subscriber. But it is the Publisher that notifies the Subscriber of newly available values as they come, and this push aspect is key to being reactive. In addition, operations applied to pushed values are expressed declaratively rather than imperatively.
In addition to pushing values, the error handling and completion aspects are also covered in a well-defined manner. A Publisher can push new values to its Subscriber (calling onNext), but can also signal an error (calling onError and terminating the sequence) or completion (calling onComplete and terminating the sequence).
onNext x 0..N [onError | onComplete]
This approach is very flexible, as the pattern applies indifferently to use cases where there is at most one value, n values or even an infinite sequence of values (for instance the ticks of a clock).
But let’s step back a bit and reflect on why we would need such an asynchronous reactive library in the first place.
3.1. Blocking can be wasteful
Modern applications nowadays can reach huge scales of users, and even though the capabilities of modern hardware have continued to improve, performance of the modern software is still a key concern.
There are broadly two ways one can improve a program’s performance:
- parallelize: use more threads and more hardware resources, and/or
- seek more efficiency in how current resources are used.
Usually, Java developers naturally write programs using blocking code. This is all well until there is a performance bottleneck, at which point the time comes to introduce additional threads running similar blocking code. But this scaling in resource utilization can quickly introduce contention and concurrency problems.
Worse! If you look closely, as soon as a program involves some latency (notably I/O, like a database request or a network call), there is a waste of resources in the sense that the thread now sits idle, waiting for some data.
So the parallelization approach is not a silver bullet: although it is necessary in order to access the full power of the hardware, it is also complex to reason about and susceptible to resource wasting…
3.2. Asynchronicity to the rescue?
The second approach described above, seeking more efficiency, can be a solution to that last problem. By writing asynchronous non-blocking code, you allow for the execution to switch to another active task using the same underlying resources, and to later come back to the current "train of thought" when the asynchronous processing has completed.
But how can you produce asynchronous code on the JVM?
Java offers mainly two models of asynchronous programming:
- Callbacks: asynchronous methods don’t have a return value but take an extra callback parameter (a lambda or simple anonymous class) that gets called when the result is available. The best-known example is Swing’s EventListener hierarchy.
- Futures: asynchronous methods return a Future<T> immediately. The asynchronous process computes a T value, but the future wraps access to it. The value isn’t immediately available, and the future can be polled until it is. An ExecutorService running Callable<T> tasks uses Futures, for instance.
So is it good enough? Well, not for every use case, and both approaches have limitations.
Callbacks are very hard to compose together, quickly leading to code that is difficult to read and maintain ("Callback Hell").
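To make the nesting problem concrete, here is a minimal sketch using only the JDK. The Callback interface and the three lookup methods are hypothetical, invented for this illustration, and they call back synchronously only to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback-style API, invented for this illustration only.
class CallbackNestingDemo {

    interface Callback<T> {
        void onSuccess(T value);
        void onError(Throwable error);
    }

    // Each "service" calls back synchronously to keep the sketch deterministic;
    // a real API would typically invoke the callback later, from another thread.
    static void findUserId(String name, Callback<Integer> cb) {
        cb.onSuccess(name.length());
    }

    static void findFavorites(int userId, Callback<List<String>> cb) {
        List<String> favorites = new ArrayList<>();
        favorites.add("fav-" + userId);
        cb.onSuccess(favorites);
    }

    static void findDetails(String favorite, Callback<String> cb) {
        cb.onSuccess(favorite.toUpperCase());
    }

    // Three dependent steps already need three levels of nesting,
    // and the error handling is repeated at every level.
    static List<String> run(String name) {
        List<String> log = new ArrayList<>();
        findUserId(name, new Callback<Integer>() {
            public void onSuccess(Integer id) {
                findFavorites(id, new Callback<List<String>>() {
                    public void onSuccess(List<String> favorites) {
                        for (String favorite : favorites) {
                            findDetails(favorite, new Callback<String>() {
                                public void onSuccess(String details) { log.add(details); }
                                public void onError(Throwable e) { log.add("error: " + e); }
                            });
                        }
                    }
                    public void onError(Throwable e) { log.add("error: " + e); }
                });
            }
            public void onError(Throwable e) { log.add("error: " + e); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("alice")); // prints [FAV-5]
    }
}
```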
Futures are a bit better, but they are still not so good at composition, despite the improvements brought in Java 8 by CompletableFuture. Orchestrating multiple futures together is doable, but not that easy. It is also very (too?) easy to stay in familiar territory and block on a Future by calling its get() method. Lastly, futures lack support for multiple values and advanced error handling.
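As an illustration of what orchestrating futures looks like, the following sketch chains and combines three CompletableFuture instances using only the JDK. The findUserId, findNickname and findScore methods are hypothetical stand-ins for asynchronous services. Note how the final join() blocks the calling thread, and how each future can only ever carry a single value.

```java
import java.util.concurrent.CompletableFuture;

class FutureCompositionDemo {

    // Hypothetical asynchronous lookups, invented for this illustration.
    static CompletableFuture<Integer> findUserId(String name) {
        return CompletableFuture.supplyAsync(() -> name.length());
    }

    static CompletableFuture<String> findNickname(int userId) {
        return CompletableFuture.supplyAsync(() -> "user-" + userId);
    }

    static CompletableFuture<Integer> findScore(int userId) {
        return CompletableFuture.supplyAsync(() -> userId * 10);
    }

    static String describe(String name) {
        CompletableFuture<String> result = findUserId(name)
            // thenCompose chains a dependent asynchronous step
            .thenCompose(id ->
                // thenCombine joins two independent asynchronous results
                findNickname(id).thenCombine(findScore(id),
                    (nickname, score) -> nickname + " scored " + score))
            // exceptionally provides one fallback value if any step failed
            .exceptionally(error -> "unknown");
        // join() blocks the calling thread until the single result is available
        return result.join();
    }

    public static void main(String[] args) {
        System.out.println(describe("alice")); // prints user-5 scored 50
    }
}
```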
These limitations might seem familiar: isn’t that what Reactive Programming directly tries to address with the Publisher-Subscriber pair?
3.3. From Imperative to Reactive Programming
Indeed, reactive libraries like Reactor aim at addressing these drawbacks of "classic" asynchronous approaches on the JVM, while also focusing on a few additional aspects. To sum it up:
- Composability and readability
- Data as a flow manipulated using a rich vocabulary of operators
- Nothing happens until you subscribe
- Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high for it to keep up
- High level but high value abstraction that is concurrency-agnostic
3.4. Composability and readability
By composability, we mean the ability to orchestrate multiple asynchronous tasks together, using results from previous tasks to feed input to subsequent ones, or executing several tasks in a fork-join style, as well as reusing asynchronous tasks as discrete components in a higher-level system.
This is tightly coupled to readability and maintainability of one’s code, as these layers of asynchronous processes get more and more complex. As we saw, the callback model is simple, but one of its main drawbacks is that for complex processes you need to have a callback executed from a callback, itself nested inside another callback, and so on…
That is what is referred to as Callback Hell. And as you can guess (or know from experience), such code is pretty hard to go back to and reason about.
Reactor on the other hand offers rich composition options where code mirrors the organization of the abstract process, and everything is kept at the same level (no nesting if it is not necessary).
3.5. The assembly line analogy
You can think of data processed by a reactive application as moving through an assembly line. Reactor is the conveyor belt and working stations. So the raw material pours from a source (the original Publisher) and ends up as a finished product ready to be pushed to the consumer (or Subscriber).
It can go through various transformations and other intermediary steps, or be part of a larger assembly line that aggregates intermediate pieces together.
Finally, if there is a glitch or a clogging at one point (for example boxing the products takes a disproportionately long time), the workstation can signal that upstream and limit the flow of raw material.
3.6. Operators
In Reactor, operators are what we represented in the above analogy as the assembly line’s workstations. Each operator adds behavior to a Publisher, and it actually wraps the previous step’s Publisher into a new instance.
The whole chain is thus layered, like an onion, where data originates from the first Publisher in the center and moves outward, transformed by each layer.
Understanding this can help you avoid a common mistake that would lead you to believe that an operator you used in your chain is not being applied. See this item in the FAQ.
While the Reactive Streams specification doesn’t specify operators at all, one of the high added values of derived reactive libraries like Reactor is the rich vocabulary of operators that they bring along. These cover a lot of ground, from simple transformation and filtering to complex orchestration and error handling.
3.7. Nothing happens until you subscribe()
In Reactor, when you write a Publisher chain, data doesn’t start pumping into it by default. Instead, what you have is an abstract description of your asynchronous process (which can help with reusability and composition, by the way).
By the act of subscribing, you tie the Publisher to a Subscriber, which triggers the flow of data in the whole chain. This is achieved internally by a single request signal from the Subscriber that is propagated upstream, right back to the source Publisher.
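The following sketch illustrates this laziness. It assumes reactor-core is on the classpath, and the log list is only there to record the order in which things happen.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class LazyUntilSubscribeDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Declaring the chain only describes the processing:
        // the map function is not invoked at this point.
        Flux<String> upper = Flux.just("foo", "bar")
            .map(s -> {
                log.add("mapping " + s);
                return s.toUpperCase();
            });
        log.add("assembled");

        // Subscribing sends the request upstream and data starts flowing.
        upper.subscribe(s -> log.add("received " + s));
        return log;
    }

    public static void main(String[] args) {
        // prints [assembled, mapping foo, received FOO, mapping bar, received BAR]
        System.out.println(run());
    }
}
```

Since Flux.just emits synchronously on the subscribing thread, the "assembled" entry always comes first, which shows that nothing ran before subscribe was called.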
3.8. Backpressure
The same mechanism is in fact used to implement backpressure, which we described in the assembly line analogy as a feedback signal sent up the line when a working station is slower to process than the upstream.
The real mechanism defined by the Reactive Streams specification is pretty close to the analogy: a subscriber can work in unbounded mode and let the source push all the data at its fastest achievable rate, but can also use the request mechanism to signal the source that it is ready to process at most n elements.
Intermediate operators can also change the request in-flight. Imagine a buffer operator that groups elements in batches of 10. If the subscriber requests 1 buffer, then it is acceptable for the source to produce 10 elements. Prefetching strategies can also be applied if producing the elements before they are requested is not too costly.
This transforms the push model into a push-pull hybrid where the downstream can pull n elements from upstream if they are readily available, but if they’re not then they will get pushed by the upstream whenever they are produced.
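The request mechanism can be sketched with the JDK 9+ java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones. The range publisher below is a simplified, synchronous source written for this illustration. It is not a fully specification-compliant Publisher (for instance, it does not reject non-positive requests), and the DemandDemo name and its methods are assumptions of this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class DemandDemo {

    // Simplified synchronous source of the integers 1..count.
    // It only emits as many elements as were requested.
    static Flow.Publisher<Integer> range(int count, List<String> log) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private long demand = 0;
            private boolean emitting = false;
            private boolean done = false;

            @Override
            public void request(long n) {
                log.add("request(" + n + ")");
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (emitting || done) {
                    return; // re-entrant call from onNext: the running loop sees the new demand
                }
                emitting = true;
                while (demand > 0 && next <= count && !done) {
                    demand--;
                    subscriber.onNext(Integer.valueOf(next++));
                }
                emitting = false;
                if (next > count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscriber that asks for elements in batches of batchSize.
    static List<String> consumeInBatches(int count, int batchSize) {
        List<String> log = new ArrayList<>();
        range(count, log).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int receivedInBatch = 0;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(batchSize); // initial demand
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
                if (++receivedInBatch == batchSize) {
                    receivedInBatch = 0;
                    subscription.request(batchSize); // ready for the next batch
                }
            }

            @Override
            public void onError(Throwable t) { log.add("onError"); }

            @Override
            public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        // prints [request(2), onNext(1), onNext(2), request(2), onNext(3), onComplete]
        System.out.println(consumeInBatches(3, 2));
    }
}
```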
3.9. Hot vs Cold
In the Rx family of reactive libraries, one can distinguish two broad categories of reactive sequences: hot and cold. This distinction mainly has to do with how the reactive stream reacts to subscribers:
- A cold sequence starts anew for each Subscriber, including at the source of data. If the source wraps an HTTP call, a new HTTP request is made for each subscription.
- A hot sequence does not start from scratch for each Subscriber. Rather, late subscribers receive signals emitted after they subscribed. Note, however, that some hot reactive streams can cache or replay the history of emissions totally or partially. From a general perspective, a hot sequence emits whether or not there are subscribers listening.
For more information on hot vs cold in the context of Reactor, see this reactor-specific section.
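The cold behavior can be observed with a small sketch, assuming reactor-core is on the classpath. The counter stands in for a hypothetical remote call, and Mono.fromCallable defers that call until subscription time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

class ColdSequenceDemo {

    static List<String> run() {
        AtomicInteger calls = new AtomicInteger();
        List<String> received = new ArrayList<>();

        // The callable plays the role of a (hypothetical) HTTP call.
        Mono<String> cold = Mono.fromCallable(() -> "response #" + calls.incrementAndGet());

        // Each subscription starts the source anew, so the callable runs twice.
        cold.subscribe(received::add);
        cold.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [response #1, response #2]
    }
}
```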
4. Reactor Core Features
reactor-core is the main artifact of the project, a reactive library that focuses on the Reactive Streams specification and targets Java 8.
Reactor introduces composable reactive types that implement Publisher but also provide a rich vocabulary of operators, Flux and Mono. The former represents a reactive sequence of 0..N items, while the latter represents a single-valued-or-empty result.
This distinction carries a bit of semantic information in the type, indicating the rough cardinality of the asynchronous processing. For instance, an HTTP request only produces one response, so there wouldn’t be much sense in doing a count operation. Expressing the result of such an HTTP call as a Mono<HttpResponse> thus makes more sense than as a Flux<HttpResponse>, as it offers only operators that are relevant to a "zero or one item" context.
In parallel, operators that change the maximum cardinality of the processing also switch to the relevant type. For instance, the count operator exists in Flux, but returns a Mono<Long>.
4.1. Flux, an asynchronous sequence of 0-n items
A Flux<T> is a standard Publisher<T> representing an asynchronous sequence of 0 to N emitted items, optionally terminated by either a success signal or an error.
As in the RS spec, these 3 types of signal translate to calls to downstream’s onNext, onComplete or onError methods.
With this large scope of possible signals, Flux is the general-purpose reactive type. Note that all events, even terminating ones, are optional: no onNext event but an onComplete event represents an empty finite sequence, but remove the onComplete and you have an infinite empty sequence. Similarly, infinite sequences are not necessarily empty: Flux.interval(Duration) produces a Flux<Long> that is infinite and emits regular ticks from a clock.
4.2. Mono, an asynchronous 0-1 result
A Mono<T> is a specialized Publisher<T> that emits at most one item then optionally terminates with an onComplete signal or an onError.
As such, it offers only a relevant subset of operators. For instance, combination operators can either ignore the right-hand side emissions and return another Mono, or emit values from both sides, in which case they’ll switch to a Flux.
Note that a Mono can be used to represent no-value asynchronous processes that only have the concept of completion (think Runnable): just use an empty Mono<Void>.
4.3. Simple ways to create a Flux/Mono and to subscribe to it
The easiest way to get started with Flux and Mono is to use one of the numerous factory methods found in their respective classes.
For instance, to create a simple sequence of String, you can either enumerate them or put them in a collection and create the Flux from it:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
Other examples of factory methods include:
Mono<String> noData = Mono.empty(); (1)
Mono<String> data = Mono.just("foo");
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); (2)
1 | notice the factory method honors the generic type even though there will be no value |
2 | the subtlety is that the first parameter is the start of the range, while the second parameter is the number of items to produce. |
When it comes to subscribing, Flux and Mono make use of Java 8 lambdas. You have a wide choice of .subscribe() variants that take lambdas for different combinations of callbacks:
subscribe(); (1)
subscribe(Consumer<? super T> consumer); (2)
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); (3)
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); (4)
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); (5)
1 | Just subscribe and trigger the sequence. |
2 | Do something with each produced value. |
3 | Deal with values but also react to an error. |
4 | Deal with values, errors but also execute some code when the sequence successfully completes. |
5 | Deal with values, errors, successful completion but also do something with the Subscription produced by this subscribe call. |
These variants return a reference to the subscription that you can use to cancel said subscription when no more data is needed. Upon cancellation, the source should stop producing values and clean up any resources it created. This cancel and clean-up behavior is represented in Reactor by the general-purpose Disposable interface.
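As a hedged illustration of the lambda-based subscribe variants, the sketch below (assuming reactor-core is on the classpath) uses the three-callback form on a sequence that fails on its fourth element, then cancels a never-ending subscription through the returned Disposable.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

class SubscribeVariantsDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // A sequence that emits 1, 2, 3 and then fails on the fourth element.
        Flux<Integer> numbers = Flux.range(1, 4)
            .map(i -> {
                if (i == 4) {
                    throw new IllegalStateException("boom");
                }
                return i;
            });

        numbers.subscribe(
            i -> log.add("value " + i),             // called for each onNext
            e -> log.add("error " + e.getMessage()), // called on onError
            () -> log.add("done"));                  // called on onComplete only

        // The returned Disposable lets the caller cancel a subscription
        // that would otherwise stay open.
        Disposable pending = Flux.<Integer>never().subscribe(i -> log.add("never called"));
        pending.dispose();

        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [value 1, value 2, value 3, error boom]
    }
}
```

Because the sequence terminates with an error, the completion callback is not invoked: onError and onComplete are mutually exclusive terminal signals.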
The lambda-based variants are convenience methods over the subscribe method defined by Reactive Streams:
subscribe(Subscriber<? super T> subscriber);
That last variant is useful if you already have a Subscriber handy, but more often you’ll need it because you want to do something subscription-related in the other callbacks. Most probably, that’d be dealing with backpressure and triggering the requests yourself.
In that case, you can ease things up by using the BaseSubscriber abstract class, which offers convenience methods for that:
Using BaseSubscriber to fine-tune backpressure
Flux<String> source = someStringSource();
source.map(String::toUpperCase)
      .subscribe(new BaseSubscriber<String>() { (1)
          @Override
          protected void hookOnSubscribe(Subscription subscription) {
              (2)
              request(1); (3)
          }
          @Override
          protected void hookOnNext(String value) {
              request(1); (4)
          }
          (5)
      });
1 | The BaseSubscriber is an abstract class, so we create an anonymous implementation and specify the generic type. |
2 | BaseSubscriber defines hooks for the various signal handling you can implement in a Subscriber. It also deals with the boilerplate of capturing the Subscription object so you can manipulate it in other hooks. |
3 | request(n) is such a method: it propagates a backpressure request to the captured subscription from any of the hooks. Here we start the stream by requesting 1 element from the source. |
4 | Upon receiving a new value, we continue requesting new items from the source one by one. |
5 | Other hooks are hookOnComplete, hookOnError, hookOnCancel and hookFinally (which is always called when the sequence terminates, with the type of termination passed in as a SignalType parameter). |
When manipulating request like that, you must be careful to produce enough demand for the sequence to advance or your Flux will get "stuck". That is the reason why BaseSubscriber forces you to implement the subscription and onNext hooks, where you should usually call request at least once.
BaseSubscriber also offers a requestUnbounded() method to switch to unbounded mode (equivalent to request(Long.MAX_VALUE)).
4.4. Programmatically creating a sequence
In this section, we’ll introduce means of creating a Flux (or Mono) by programmatically defining its associated events (onNext, onError, onComplete). All these methods share the fact that they expose an API to trigger the events that we call a sink. There are actually a few sink variants, as you will discover below.
4.4.1. Generate
The simplest form of programmatic creation of a Flux is through the generate method, which takes a generator function.
This is for synchronous and one-by-one emissions, meaning that the sink is a SynchronousSink and that its next() method can be called at most once per callback invocation. You can then additionally call error(Throwable) or complete().
The most useful variant is probably the one that also allows you to keep a state that you can refer to in your sink usage to decide what to emit next. The generator function then becomes a BiFunction<S, SynchronousSink<T>, S>, with <S> the type of the state object. You have to provide a Supplier<S> for the initial state, and your generator function now returns a new state on each round.
For instance, you could simply use an int as the state:
Flux<String> flux = Flux.generate(
    () -> 0, (1)
    (state, sink) -> {
      sink.next("3 x " + state + " = " + 3*state); (2)
      if (state == 10) sink.complete(); (3)
      return state + 1; (4)
    });
1 | we supply the initial state value of 0 |
2 | we use the state to choose what to emit (a row in the multiplication table of 3) |
3 | we also use it to choose when to stop (multiplication tables traditionally stop at times ten) |
4 | we return a new state that will be used in next invocation (unless the sequence terminated in this one) |
The code above generates the table of 3, as the following sequence:
3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30
You can also use a mutable <S>. The example above could for instance be rewritten using a single AtomicLong as the state, mutating it on each round:
Flux<String> flux = Flux.generate(
    AtomicLong::new, (1)
    (state, sink) -> {
      long i = state.getAndIncrement(); (2)
      sink.next("3 x " + i + " = " + 3*i);
      if (i == 10) sink.complete();
      return state; (3)
    });
1 | this time we generate a mutable object as the state |
2 | we mutate the state here |
3 | we return the same instance as the new state |
If your state object needs to clean up some resources, use the generate(Supplier<S>, BiFunction, Consumer<S>) variant to clean up the last state instance.
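A minimal sketch of the three-argument generate variant follows, assuming reactor-core is on the classpath. The state consumer here only records the last state in a list. In a real application it would typically release a resource held by the state object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;

class GenerateCleanupDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        Flux<String> flux = Flux.generate(
            () -> new AtomicLong(),                    // initial state
            (state, sink) -> {
                long i = state.getAndIncrement();
                sink.next("3 x " + i + " = " + 3 * i);
                if (i == 2) {
                    sink.complete();                   // stop after three rows
                }
                return state;
            },
            // state consumer: receives the last state once the sequence
            // terminates or is cancelled
            state -> log.add("cleanup with state " + state.get()));

        flux.subscribe(log::add);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With this synchronous subscription, the three rows are logged first and the cleanup entry last, with the counter at 3 after the final increment.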
4.4.2. Create
The more advanced form of programmatic creation of a Flux, create can work either asynchronously or synchronously and is suitable for multiple emissions per round.
It exposes a FluxSink, with its next/error/complete methods. Contrary to generate, it doesn’t have a state-based variant, but on the other hand it can trigger multiple events in the callback (and even from any thread at a later point in time).
create can be very useful to bridge an existing API with the reactive world, for instance an asynchronous API based on listeners.
Imagine that you use an API that is listener-based. It processes data by chunks and has two events: (1) a chunk of data is ready and (2) the processing is complete (terminal event), as represented in the MyEventListener interface:
interface MyEventListener<T> {
    void onDataChunk(List<T> chunk);
    void processComplete();
}
You can use create to bridge this into a Flux<T>:
Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register( (4)
      new MyEventListener<String>() { (1)
        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); (2)
          }
        }
        public void processComplete() {
            sink.complete(); (3)
        }
    });
});
1 | we bridge to the MyEventListener API |
2 | each element in a chunk becomes an element in the Flux. |
3 | the processComplete event is translated to an onComplete |
4 | all of this is done asynchronously whenever the myEventProcessor executes |
Additionally, since create can be asynchronous and manages backpressure, you can refine how to behave backpressure-wise, by indicating an OverflowStrategy:
- IGNORE to completely ignore downstream backpressure requests. This may yield IllegalStateException when queues get full downstream.
- ERROR to signal an IllegalStateException when the downstream can’t keep up.
- DROP to drop the incoming signal if the downstream is not ready to receive it.
- LATEST to let downstream only get the latest signals from upstream.
- BUFFER (the default) to buffer all signals if the downstream can’t keep up. (This does unbounded buffering and may lead to OutOfMemoryError.)
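To illustrate one of these strategies, the sketch below (assuming reactor-core is on the classpath) creates a Flux with OverflowStrategy.DROP. The emitter pushes five values right away, but the subscriber only ever requests two, so the remaining values are dropped.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

class OverflowStrategyDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // The emitter pushes 5 values immediately, regardless of demand.
        Flux<Integer> flux = Flux.<Integer>create(sink -> {
            for (int i = 1; i <= 5; i++) {
                sink.next(i);
            }
            sink.complete();
        }, FluxSink.OverflowStrategy.DROP);

        flux.subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(2); // only two elements are ever requested
            }

            @Override
            protected void hookOnNext(Integer value) {
                log.add("value " + value);
            }

            @Override
            protected void hookOnComplete() {
                log.add("complete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [value 1, value 2, complete]
    }
}
```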
Mono also has a create generator. As you should expect, the MonoSink of Mono’s create doesn’t allow several emissions. It will drop all signals subsequent to the first one.
Push model
A variant of create is push, which is suitable for processing events from a single producer. Similar to create, push can also be asynchronous and can manage backpressure using any of the overflow strategies supported by create. But only one producing thread may invoke next, complete or error at a time.
Flux<String> bridge = Flux.push(sink -> {
    myEventProcessor.register(
      new SingleThreadEventListener<String>() { (1)
        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); (2)
          }
        }
        public void processComplete() {
            sink.complete(); (3)
        }
        public void processError(Throwable e) {
            sink.error(e); (4)
        }
    });
});
1 | we bridge to the SingleThreadEventListener API |
2 | events are pushed to sink using next from a single listener thread |
3 | complete event generated from the same listener thread |
4 | error event also generated from the same listener thread |
Hybrid push/pull model
Unlike push, create may be used in push or pull mode, making it suitable for bridging with listener-based APIs where data may be delivered asynchronously at any time. An onRequest callback can be registered on FluxSink to track requests. The callback may be used to request more data from the source if required and to manage backpressure by delivering data to the sink only when requests are pending. This enables a hybrid push/pull model where downstream can pull data that is already available from upstream and upstream can push data to downstream when data becomes available at a later time.
Flux<String> bridge = Flux.create(sink -> {
    myMessageProcessor.register(
      new MyMessageListener<String>() {
        public void onMessage(List<String> messages) {
          for(String s : messages) {
            sink.next(s); (3)
          }
        }
    });
    sink.onRequest(n -> {
        List<String> messages = myMessageProcessor.request(n); (1)
        for(String s : messages) {
           sink.next(s); (2)
        }
    });
});
1 | Poll for messages when requests are made |
2 | If messages are available immediately, push them to sink |
3 | Remaining messages that arrive asynchronously later are also delivered |
Cleaning up
Two callbacks, onDispose and onCancel, are provided to perform any cleanup on cancellation or termination. onDispose can be used to perform cleanup when the Flux completes, errors out or is cancelled. onCancel can be used to perform any action specific to cancellation prior to cleanup using onDispose.
Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel()) (1)
        .onDispose(() -> channel.close()); (2)
});
1 | onCancel is invoked for cancel signal |
2 | onDispose is invoked for complete/error/cancel |
4.4.3. Handle
Present in both Mono and Flux, handle is a tiny bit different. It is an instance method, meaning that it is chained on an existing source like common operators.
It is close to generate, in the sense that it uses a SynchronousSink and only allows one-by-one emissions.
But handle can be used to generate an arbitrary value out of each source element, possibly skipping some elements. In that sense, it can serve as a combination of map and filter.
As such, the signature of handle is handle(BiConsumer<T, SynchronousSink<R>>).
Let’s take an example: the reactive streams specification disallows null values in a sequence. What if you want to perform a map but you want to use a preexisting method as the map function, and said method sometimes returns null?
For instance, the following method:
public String alphabet(int letterNumber) {
    if (letterNumber < 1 || letterNumber > 26) {
        return null;
    }
    int letterIndexAscii = 'A' + letterNumber - 1;
    return "" + (char) letterIndexAscii;
}
It can be applied safely to a source of integers:
Using handle for a "map and eliminate nulls" scenario
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
.handle((i, sink) -> {
String letter = alphabet(i); (1)
if (letter != null) (2)
sink.next(letter); (3)
});
alphabet.subscribe(System.out::println);
1 | map to letters |
2 | but if the "map function" returns null… |
3 | …filter it out by not calling sink.next |
Which will print out:
M
I
T
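For comparison, the same "map and eliminate nulls" logic can be sketched without Reactor, using a plain Java Stream. This is only an analogy under stated assumptions: the AlphabetSketch class and its letters helper are hypothetical names, and a Stream is synchronous and pull-based, unlike a Flux.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical holder class, not part of Reactor.
final class AlphabetSketch {

    // Same mapping method as above: returns null outside of 1..26.
    static String alphabet(int letterNumber) {
        if (letterNumber < 1 || letterNumber > 26) {
            return null;
        }
        int letterIndexAscii = 'A' + letterNumber - 1;
        return "" + (char) letterIndexAscii;
    }

    // Maps each number to a letter, then drops the nulls: the map + filter
    // combination that handle expresses in a single step.
    static List<String> letters(List<Integer> numbers) {
        return numbers.stream()
                .map(AlphabetSketch::alphabet)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }
}
```

Calling AlphabetSketch.letters(Arrays.asList(-1, 30, 13, 9, 20)) returns the list [M, I, T], matching the output of the handle version.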
4.5. Schedulers
Reactor, like RxJava, can be considered concurrency agnostic. It doesn’t enforce a concurrency model but rather leaves you, the developer, in command.
But that doesn’t prevent the library from helping you with concurrency…
In Reactor, the execution model and where the execution happens are determined by the Scheduler that is used. A Scheduler is an interface that can abstract a wide range of implementations. The Schedulers class has static methods that give access to the following execution contexts:
- the current thread (Schedulers.immediate())
- a single, reusable thread (Schedulers.single()). Note that this method reuses the same thread for all callers, until the Scheduler is disposed. If you want a per-call dedicated thread, use Schedulers.newSingle() instead.
- an elastic thread pool (Schedulers.elastic()). It will create new worker pools as needed, and reuse idle ones unless they stay idle for too long (default is 60s), in which case the workers are disposed. This is a good choice for blocking I/O work, for instance.
- a fixed pool of workers that is tuned for parallel work (Schedulers.parallel()). It will create as many workers as you have CPU cores.
- a time-aware scheduler capable of scheduling tasks in the future, including recurring tasks (Schedulers.timer()).
Additionally, you can create a Scheduler out of any pre-existing ExecutorService [4] using Schedulers.fromExecutorService(ExecutorService), and also create new instances of the various scheduler types using newXXX methods.
Operators are implemented using non-blocking algorithms that are tuned to facilitate the work-stealing that can happen in some Schedulers. |
Some operators use a specific Scheduler from Schedulers by default (and will usually give you the option of providing a different one). For instance, calling the factory method Flux.interval(Duration.ofMillis(300)) produces a Flux<Long> that ticks every 300ms. This is enabled by Schedulers.timer() by default.
Reactor offers two means of switching execution context (or Scheduler) in a reactive chain: publishOn and subscribeOn. Both take a Scheduler and let you switch the execution context to that scheduler. But the placement of publishOn in the chain matters, while the placement of subscribeOn doesn’t. To understand that difference, you first have to remember that nothing happens until you subscribe().
In Reactor, when you chain operators, you wrap as many Flux (or Mono) implementations inside one another as you need. As soon as you subscribe, a chain of Subscriber objects is created backward, up the chain to the first publisher. This is effectively hidden from you: all you can see is the outer layer of Flux (or Mono) and Subscription, but these intermediate operator-specific subscribers are where the real work happens.
With that knowledge, let’s have a closer look at the two operators:
- publishOn applies like any other operator, in the middle of that subscriber chain. As such, it takes signals from upstream and replays them downstream, executing the callback on a worker from the associated Scheduler. So it affects where the subsequent operators execute (until another publishOn is chained in).
- subscribeOn rather applies to the subscription process, when that backward chain is constructed. As a consequence, no matter where you place the subscribeOn in the chain, it is always the context of the source emission that is affected. However, this doesn’t affect the behavior of subsequent calls to publishOn: they still switch the execution context for the part of the chain after them. Also, only the earliest subscribeOn call in the chain is actually taken into account.
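The difference between the two operators can be loosely pictured with the JDK's CompletableFuture. This is a rough analogy only, not how Reactor is implemented: the executor passed to supplyAsync decides where the source value is produced (similar in spirit to subscribeOn), while thenApplyAsync moves the following stage to another executor (similar in spirit to publishOn). The class name ContextSwitchSketch and the thread names are assumptions made for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: a CompletableFuture analogy, not Reactor's implementation.
final class ContextSwitchSketch {

    static String run() {
        // Two single-threaded executors with recognizable thread names.
        ExecutorService source = Executors.newSingleThreadExecutor(r -> new Thread(r, "source"));
        ExecutorService downstream = Executors.newSingleThreadExecutor(r -> new Thread(r, "downstream"));
        try {
            return CompletableFuture
                    // Roughly like subscribeOn: decides where the source value is produced.
                    .supplyAsync(() -> "emitted on " + Thread.currentThread().getName(), source)
                    // Roughly like publishOn: the stage after this point runs on another executor.
                    .thenApplyAsync(s -> s + ", mapped on " + Thread.currentThread().getName(), downstream)
                    .join();
        } finally {
            // Let both worker threads exit once the pipeline is done.
            source.shutdown();
            downstream.shutdown();
        }
    }
}
```

ContextSwitchSketch.run() returns "emitted on source, mapped on downstream": the production of the value and the later transformation run on different threads, each chosen by a different call in the chain.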
4.6. Handling Errors
For a quick look at the available operators for error handling, see the relevant operator decision tree. |
In Reactive Streams, errors are terminal events. As soon as an error occurs, it stops the sequence and gets propagated down the chain of operators to the last step, the Subscriber you defined and its onError method.
Such errors should still be dealt with at the application level, for instance by displaying an error notification in a UI, or sending a meaningful error payload in a REST endpoint, so the subscriber’s onError method should always be defined.
If not defined, onError will throw an UnsupportedOperationException. You can further detect and triage it with the Exceptions.isErrorCallbackNotImplemented method.
|
But Reactor also offers alternative means of dealing with errors in the middle of the chain, as error-handling operators.
Before you learn about error-handling operators, you must keep in
mind that any error in a reactive sequence is a terminal event. Even if an
error-handling operator is used, it doesn’t allow the original sequence to
continue, but rather converts the onError signal into the start of a new
sequence (the fallback one). As such it replaces the terminated sequence
upstream.
|
Let’s go through each means of error handling one by one. When relevant, we’ll draw a parallel with the try patterns of the imperative world.
4.6.1. Error handling operators
The onError at the end of the chain is akin to a try/catch block. There, execution skips to the catch in case an Exception is thrown:
Flux<String> s = Flux.range(1, 10)
.map(v -> doSomethingDangerous(v)) (1)
.map(v -> doSecondTransform(v)); (2)
s.subscribe(value -> System.out.println("RECEIVED " + value), (3)
error -> System.err.println("CAUGHT " + error) (4)
);
1 | a transformation is performed that can throw an exception. |
2 | if everything went well, a second transformation is performed. |
3 | each successfully transformed value is printed out. |
4 | in case of an error, the sequence terminates and an error message is displayed. |
This is conceptually similar to the following try/catch block:
try {
for (int i = 1; i < 11; i++) {
String v1 = doSomethingDangerous(i); (1)
String v2 = doSecondTransform(v1); (2)
System.out.println("RECEIVED " + v2);
}
} catch (Throwable t) {
System.err.println("CAUGHT " + t); (3)
}
1 | if an exception is thrown here… |
2 | …the rest of the loop is skipped… |
3 | …and the execution goes straight to here. |
Now that we’ve established a parallel, you may be familiar with several ways of dealing with exceptions in a try/catch block. Most notably:
1. catch and return a default value
2. catch and execute an alternative path (fallback method)
3. catch, wrap to a BusinessException and re-throw
4. catch, log an error-specific message and re-throw
5. use the finally block to clean up resources, or Java 7’s "try-with-resources" construct
All of these have equivalents in Reactor, in the form of error handling operators.
Default value
The equivalent of (1) is onErrorReturn:
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn("RECOVERED");
You also have the option of filtering when to recover with a default value vs letting the error propagate, depending on the exception that occurred:
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");
Fallback method
If you want more than a single default value and you have an alternative, safer way of processing your data, you can use onErrorResume. This would be the equivalent of (2).
For example, if your nominal process is fetching data from an external unreliable service, but you also keep a local cache of the same data that can be a bit more out of date but is more reliable, you could do the following:
Flux.just("key1", "key2")
.flatMap(k ->
callExternalService(k) (1)
.onErrorResume(e -> getFromCache(k)) (2)
);
1 | for each key, we asynchronously call the external service. |
2 | if the external service call fails, we fall back to the cache for that key. Note we always apply the same fallback, whatever the source error e is. |
Like onErrorReturn, onErrorResume has variants that let you filter which exceptions to fall back on, based either on the exception’s class or a Predicate. The fact that it takes a Function also allows you to choose a different fallback sequence to switch to, depending on the error encountered:
Flux.just("timeout1", "unknown", "key2")
.flatMap(k ->
callExternalService(k)
.onErrorResume(error -> { (1)
if (error instanceof TimeoutException) (2)
return getFromCache(k);
else if (error instanceof UnknownKeyException) (3)
return registerNewEntry(k, "DEFAULT");
else
return Flux.error(error); (4)
})
);
1 | The function allows you to dynamically choose how to continue. |
2 | If the source times out, let’s hit the local cache. |
3 | If the source says the key is unknown, let’s create a new entry. |
4 | In all other cases, "re-throw". |
Catch and rethrow
That last line inside the previous flatMap gives us a hint as to how item (3) (catch, wrap and re-throw) could be achieved:
Flux.just("timeout1")
.flatMap(k -> callExternalService(k)
.onErrorResume(original -> Flux.error(
new BusinessException("oops, SLA exceeded", original))
)
);
But actually, there is a more straightforward way of achieving the same with onErrorMap:
Flux.just("timeout1")
.flatMap(k -> callExternalService(k)
.onErrorMap(original -> new BusinessException("oops, SLA exceeded", original))
);
Log or react on the side
For cases where you want the error to continue propagating, but you still want to react to it without modifying the sequence (for instance logging it like in item (4)), there is the doOnError operator. This operator, like all doOn-prefixed operators, is sometimes referred to as a "side-effect" operator. That is because they let you peek inside the sequence’s events without modifying them.
The example below makes use of that to ensure that when we fall back to the cache, we at least log that the external service had a failure. We could also imagine we have statistic counters to increment as an error side-effect…
LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
.flatMap(k -> callExternalService(k) (1)
.doOnError(e -> {
failureStat.increment();
log("uh oh, falling back, service failed for key " + k); (2)
})
.onErrorResume(e -> getFromCache(k)) (3)
);
1 | the external service call that can fail… |
2 | is decorated with a logging side-effect… |
3 | and then protected with the cache fallback. |
Using resources and the finally block
The last parallel to draw with the imperative world is the cleaning up that can be done either via a Java 7 "try-with-resources" construct or the use of the finally block (5). Both have their Reactor equivalents, using and doFinally respectively:
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
@Override
public void dispose() {
isDisposed.set(true); (4)
}
@Override
public String toString() {
return "DISPOSABLE";
}
};
Flux<String> flux =
Flux.using(
() -> disposableInstance, (1)
disposable -> Flux.just(disposable.toString()), (2)
Disposable::dispose (3)
);
1 | The first lambda generates the resource. Here we return our mock Disposable . |
2 | The second lambda processes the resource, returning a Flux<T> . |
3 | The third lambda is called when the flux from 2) terminates or is cancelled, to clean up resources. |
4 | After subscription and execution of the sequence, the isDisposed atomic boolean would become true . |
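For comparison, here is a minimal sketch of the imperative construct that Flux.using parallels: a Java 7 try-with-resources block. The TryWithResourcesSketch class and its Resource type are hypothetical names introduced only for this illustration; close() plays the role of the third (cleanup) lambda.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class showing the imperative counterpart of Flux.using.
final class TryWithResourcesSketch {

    static final AtomicBoolean isDisposed = new AtomicBoolean();

    // Stand-in resource; close() plays the role of the cleanup lambda.
    static final class Resource implements AutoCloseable {
        @Override
        public void close() {
            isDisposed.set(true);
        }

        @Override
        public String toString() {
            return "DISPOSABLE";
        }
    }

    static String useResource() {
        try (Resource resource = new Resource()) { // acquire the resource
            return resource.toString();            // use it
        }                                          // close() runs here, even on exceptions
    }
}
```

After TryWithResourcesSketch.useResource() returns "DISPOSABLE", the isDisposed flag is true, just as in the Flux.using example once the sequence has terminated.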
On the other hand, doFinally is about side-effects that you want to be executed whenever the sequence terminates, either with onComplete, onError or a cancel. It gives you a hint as to what kind of termination triggered the side-effect:
LongAdder statsCancel = new LongAdder(); (1)
Flux<String> flux =
Flux.just("foo", "bar")
.doFinally(type -> {
if (type == SignalType.CANCEL) (2)
statsCancel.increment(); (3)
})
.take(1); (4)
1 | We assume we want to gather statistics, here we use a LongAdder . |
2 | doFinally consumes a SignalType for the type of termination. |
3 | Here we increment statistics in case of cancellation only. |
4 | take(1) will cancel after 1 item is emitted. |
Demonstrating the terminal aspect of onError
In order to demonstrate that all these operators cause the upstream original sequence to terminate when the error happens, let’s take a more visual example with a Flux.interval. The interval operator ticks every x units of time with an increasing Long:
Flux<String> flux =
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.onErrorReturn("Uh oh");
flux.subscribe(System.out::println);
Thread.sleep(2100); (1)
1 | Note that interval executes on the timer Scheduler by default.
Assuming we’d want to run that example in a main class, we add a sleep here so
that the application doesn’t exit immediately without any value being produced. |
This prints out, one line every 250ms:
tick 0
tick 1
tick 2
Uh oh
Even with one extra second of runtime, no more ticks come in from the interval. The sequence was indeed terminated by the error.
Retrying
There is another operator of interest with regards to error handling, and you might be tempted to use it in the case above. retry, as its name indicates, lets you retry an erroring sequence.
But the caveat is that it works by re-subscribing to the upstream Flux. So this is still in effect a different sequence, and the original one is still terminated. To verify that, we can re-use the previous example and append a retry(1) to retry once instead of using onErrorReturn:
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.elapsed() (1)
.retry(1)
.subscribe(System.out::println,
System.err::println); (2)
Thread.sleep(2100); (3)
1 | elapsed will associate each value with the duration since previous value
was emitted. |
2 | We also want to see when there is an onError |
3 | We have enough time for our 4x2 ticks |
This prints out:
259,tick 0
249,tick 1
251,tick 2
506,tick 0 (1)
248,tick 1
253,tick 2
java.lang.RuntimeException: boom
1 | Here a new interval started, from tick 0. The additional 250ms duration is
coming from the 4th tick, the one that causes the exception and subsequent retry |
As you can see above, retry(1) merely re-subscribed to the original interval once, restarting the tick from 0. The second time around, since the exception still occurs, it gives up and propagates the error downstream.
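The restart-from-scratch behavior can be pictured with a plain Java loop. This is an imperative analogy under stated assumptions, not Reactor code: runOnce is a hypothetical stand-in for one subscription to the failing source, and runWithRetry re-runs it from the beginning a limited number of times before reporting the last error.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: an imperative analogy of retry(n), not Reactor code.
final class RetrySketch {

    // Stand-in for one subscription to the source: emits ticks 0..2, then fails.
    static void runOnce(List<String> out) {
        for (long tick = 0; ; tick++) {
            if (tick < 3) {
                out.add("tick " + tick);
            } else {
                throw new RuntimeException("boom");
            }
        }
    }

    // Runs the source and, on failure, restarts it from scratch up to maxRetries times.
    static List<String> runWithRetry(int maxRetries) {
        List<String> out = new ArrayList<>();
        int retries = 0;
        while (true) {
            try {
                runOnce(out);
                return out;
            } catch (RuntimeException e) {
                if (retries++ >= maxRetries) {
                    // Out of retries: report the last error, like the final onError.
                    out.add("error: " + e.getMessage());
                    return out;
                }
                // Otherwise loop again: the source starts over from tick 0.
            }
        }
    }
}
```

RetrySketch.runWithRetry(1) yields tick 0, tick 1, tick 2, then tick 0, tick 1, tick 2 again, followed by "error: boom". Each attempt is a fresh run of the source rather than a continuation of the failed one.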
There is a more advanced version of retry that uses a "companion" flux to tell whether or not a particular failure should retry: retryWhen. This companion flux is created by the operator but decorated by the user, in order to customize the retry condition.
The companion flux is a Flux<Throwable> that gets passed to a Function, the sole parameter of retryWhen. As the user, you define that function and make it return a new Publisher<?>. Retry cycles go like this:
- Each time an error happens (potential for a retry), the error is emitted into the companion flux. That flux has been originally decorated by your function.
- If the companion flux emits something, a retry happens.
- If the companion flux completes, the retry cycle stops and the original sequence completes too.
- If the companion flux errors, the retry cycle stops and the original sequence errors too.
The distinction between the last two cases is important. Simply completing the companion would effectively swallow an error. Consider the following attempt at emulating retry(3) using retryWhen:
Flux<String> flux =
Flux.<String>error(new IllegalArgumentException()) (1)
.doOnError(System.out::println) (2)
.retryWhen(companion -> companion.take(3)); (3)
1 | This continuously errors, calling for retry attempts |
2 | doOnError before the retry will let us see all failures |
3 | Here we just consider the first 3 errors as retry-able (take(3) ), then give up. |
In effect, this results in an empty flux, but one that completes successfully. Since retry(3) on the same flux would have terminated with the latest error, this is not entirely the same…
Getting to the same behavior involves a few additional tricks:
Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
.retryWhen(companion -> companion
.zipWith(Flux.range(1, 4), (1)
(error, index) -> { (2)
if (index < 4) return index; (3)
else throw Exceptions.propagate(error); (4)
})
);
1 | Trick one: use zip and a range of "number of acceptable retries + 1"… |
2 | The zip function will allow to count the retries while keeping track of the original error. |
3 | To allow for 3 retries, indexes before 4 return a value to emit… |
4 | …but in order to terminate the sequence in error, we throw the original exception after these 3 retries. |
Similar code can be used to implement an exponential backoff and retry pattern, as shown in the FAQ. |
4.6.2. How are exceptions in operators or functions handled?
In general, all operators can themselves contain code that potentially triggers an exception, or call a user-defined callback that similarly can fail, so they all contain some form of error handling.
As a rule of thumb, an unchecked exception will always be propagated through onError. For instance, throwing a RuntimeException inside a map function will translate to an onError event:
Flux.just("foo")
.map(s -> { throw new IllegalArgumentException(s); })
.subscribe(v -> System.out.println("GOT VALUE"),
e -> System.out.println("ERROR: " + e));
This would print out:
ERROR: java.lang.IllegalArgumentException: foo
Reactor, however, defines a set of exceptions that are always deemed fatal [5], meaning that Reactor cannot keep operating. These are thrown rather than propagated.
Internally, there are also cases where an unchecked exception still cannot be propagated, most notably during the subscribe and request phases, due to concurrency races that could lead to double onError/onComplete. When these races happen, the error that cannot be propagated is "dropped". These cases can still be managed to some extent, as the error goes through the Hooks.onErrorDropped customizable hook.
|
You may wonder, what about checked exceptions?
If, say, you need to call some method that declares it throws exceptions, you will still have to deal with said exceptions in a try/catch block. You have several options, though:
- catch the exception and recover from it; the sequence continues normally.
- catch the exception and wrap it into an unchecked one, then throw it (interrupting the sequence). The Exceptions utility class can help you with that (see below).
- if you’re expected to return a Flux (e.g. you’re in a flatMap), just wrap the exception into an erroring flux: return Flux.error(checkedException). (The sequence also terminates.)
Reactor has an Exceptions utility class that you can use, notably to ensure that exceptions are wrapped only if they are checked exceptions:
- use the Exceptions.propagate method to wrap exceptions if necessary. It will also call throwIfFatal first, and won’t wrap RuntimeException.
- use the Exceptions.unwrap method to get the original unwrapped exception (going back to the root cause of a hierarchy of reactor-specific exceptions).
Let’s take the example of a map that uses a conversion method that can throw an IOException:
public String convert(int i) throws IOException {
if (i > 3) {
throw new IOException("boom " + i);
}
return "OK " + i;
}
Now imagine you want to use that method in a map. You now have to explicitly catch the exception, and your map function cannot re-throw it. So you can propagate it to map’s onError as a RuntimeException:
Flux<String> converted = Flux
.range(1, 10)
.map(i -> {
try { return convert(i); }
catch (IOException e) { throw Exceptions.propagate(e); }
});
Later on, when subscribing to the above flux and reacting to errors, e.g. in the UI, you could revert to the original exception in case you want to do something special for IOExceptions:
converted.subscribe(
v -> System.out.println("RECEIVED: " + v),
e -> {
if (Exceptions.unwrap(e) instanceof IOException) {
System.out.println("Something bad happened with I/O");
} else {
System.out.println("Something bad happened");
}
}
);
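The wrap-then-unwrap round trip can also be sketched in plain Java, without Reactor. The propagate and unwrap helpers below are hypothetical stand-ins written for this illustration; they only mimic the behavior described above (wrap checked exceptions only, then recover the original) and are not Reactor's actual implementation.

```java
import java.io.IOException;

// Hypothetical demo class: stand-ins that mimic the described behavior, not Reactor's code.
final class WrapUnwrapSketch {

    // Marker wrapper type, so that unwrap can recognize what propagate produced.
    static final class Wrapped extends RuntimeException {
        Wrapped(Throwable cause) {
            super(cause);
        }
    }

    // Wraps checked exceptions only; unchecked ones are returned as-is.
    static RuntimeException propagate(Throwable t) {
        return (t instanceof RuntimeException) ? (RuntimeException) t : new Wrapped(t);
    }

    // Returns the original exception if it was wrapped by propagate.
    static Throwable unwrap(Throwable t) {
        return (t instanceof Wrapped) ? t.getCause() : t;
    }

    // Same conversion method as above.
    static String convert(int i) throws IOException {
        if (i > 3) {
            throw new IOException("boom " + i);
        }
        return "OK " + i;
    }

    static String describe(int i) {
        try {
            try {
                return convert(i);
            } catch (IOException e) {
                throw propagate(e); // checked -> unchecked, as in the map lambda
            }
        } catch (RuntimeException e) {
            Throwable original = unwrap(e); // as in the error callback
            return (original instanceof IOException)
                    ? "I/O failure: " + original.getMessage()
                    : "other failure: " + original;
        }
    }
}
```

WrapUnwrapSketch.describe(2) returns "OK 2", while WrapUnwrapSketch.describe(4) returns "I/O failure: boom 4": the handler sees the original IOException again even though it travelled as an unchecked exception.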
4.7. Processor
Processors are a special kind of Publisher that are also a Subscriber. That means that you can subscribe to a Processor (generally, they implement Flux), but also call methods to manually inject data into the sequence or terminate it…
There are several kinds of Processors, each with a few particular semantics, but before you start looking into these, you need to ask yourself the following question:
4.7.1. Do I need a Processor?
Most of the time, you should try to avoid using a Processor. They are harder to use correctly and prone to some corner cases.
So if you think a Processor could be a good match for your use-case, ask yourself if you have tried these two alternatives before:
- could a classic operator or combination of operators fit the bill? (see Which operator do I need?)
- could a generator operator work instead? (generally these operators are made to bridge APIs that are not reactive, providing a "sink" that is very similar in concept to a Processor in the sense that it allows you to populate the sequence with data, or terminate it).
If after exploring the above alternatives you still think you need a Processor, head to the Choosing the right Processor appendix to learn about the different implementations.
5. Which operator do I need?
In this section, if an operator is specific to Flux or Mono, it is prefixed accordingly; common operators have no prefix. When a specific use case is covered by a combination of operators, it is presented as a method call, with a leading dot and parameters in parentheses, like .methodCall().
|
I want to deal with: Creating a new sequence…, An existing sequence, Peeking into a sequence, Filtering a sequence, Errors, Time, Splitting a Flux or Going back to the Synchronous world.
5.1. Creating a new sequence…
- that emits a T I already have: just
  - …from an Optional<T>: Mono#justOrEmpty(Optional<T>)
  - …from a potentially null T: Mono#justOrEmpty(T)
- that emits a T returned by a method: just as well
  - …but lazily captured: use Mono#fromSupplier or wrap just inside defer
- that emits several T I can explicitly enumerate: Flux#just(T...)
- that iterates over…
  - an array: Flux#fromArray
  - a collection / iterable: Flux#fromIterable
  - a range of integers: Flux#range
- that emits from various single-valued sources like…
  - a Supplier<T>: Mono#fromSupplier
  - a task: Mono#fromCallable, Mono#fromRunnable
  - a CompletableFuture<T>: Mono#fromFuture
- that completes: empty
- that errors immediately: error
- that never does anything: never
- that is decided at subscription: defer
- that depends on a disposable resource: using
- that generates events programmatically (can use state)…
  - synchronously and one-by-one: Flux#generate
  - asynchronously (can also be sync), multiple emissions possible in one pass: Flux#create (Mono#create as well, without the multiple emission aspect)
5.2. An existing sequence
- I want to transform existing data…
  - on a 1-to-1 basis (e.g. strings to their length): map
    - …by just casting it: cast
  - on a 1-to-n basis (e.g. strings to their characters): flatMap + use a factory method
  - on a 1-to-n basis with programmatic behavior for each source element and/or state: handle
  - running an asynchronous task for each source item (e.g. urls to http request): flatMap + an async Publisher-returning method
    - …ignoring some data: conditionally return a Mono.empty() in the flatMap lambda
    - …retaining the original sequence order: Flux#flatMapSequential (this triggers the async processes immediately but reorders the results)
- I want to aggregate a Flux… (the Flux# prefix is assumed below)
  - into a List: collectList, collectSortedList
  - into a Map: collectMap, collectMultiMap
  - into an arbitrary container: collect
  - into the size of the sequence: count
  - by applying a function between each element (e.g. running sum): reduce
    - …but emitting each intermediary value: scan
  - into a boolean value from a predicate…
    - applied to all values (AND): all
    - applied to at least one value (OR): any
    - testing the presence of any value: hasElements
    - testing the presence of a specific value: hasElement
- I want to combine publishers…
  - in sequential order: Flux#concat / .concatWith(other)
    - …but delaying any error until remaining publishers have been emitted: Flux#concatDelayError
    - …but eagerly subscribing to subsequent publishers: Flux#mergeSequential
  - in emission order (combined items emitted as they come): Flux#merge / .mergeWith(other)
    - …with different types (transforming merge): Flux#zip / Flux#zipWith
  - by pairing values…
    - from 2 Monos into a Tuple2: Mono#and
    - from n Monos when they all completed: Mono#when
    - into an arbitrary container type…
      - each time all sides have emitted: Flux#zip (up to the smallest cardinality)
      - each time a new value arrives at either side: Flux#combineLatest
  - only considering the sequence that emits first: Flux#firstEmitting, Mono#first, mono.or(otherMono).or(thirdMono)
  - triggered by the elements in a source sequence: switchMap (each source element is mapped to a Publisher)
  - triggered by the start of the next publisher in a sequence of publishers: switchOnNext
- I want to repeat an existing sequence: repeat
  - …but at time intervals: Flux.interval(duration).flatMap(tick -> myExistingPublisher)
- I have an empty sequence but…
  - I want a value instead: defaultIfEmpty
  - I want another sequence instead: switchIfEmpty
- I have a sequence but I’m not interested in values: ignoreElements
  - …and I want the completion represented as a Mono: then
  - …and I want to wait for another task to complete at the end: thenEmpty
  - …and I want to switch to another Mono at the end: Mono#then(mono)
  - …and I want to switch to a Flux at the end: thenMany
- I have a Mono for which I want to defer completion…
  - …only when 1-N other publishers have all emitted (or completed): Mono#untilOther
5.3. Peeking into a sequence
- Without modifying the final sequence, I want to…
  - get notified of / execute additional behavior [6] on…
    - emissions: doOnNext
    - completion: Flux#doOnComplete, Mono#doOnSuccess (includes the result if any)
    - error termination: doOnError
    - cancellation: doOnCancel
    - subscription: doOnSubscribe
    - request: doOnRequest
    - completion or error: doOnTerminate (Mono version includes the result if any)
      - but after it has been propagated downstream: doAfterTerminate
    - any type of signal, represented as a Signal: Flux#doOnEach
    - any terminating condition (complete, error, cancel): doFinally
  - log what happens internally: log
- I want to know of all events…
  - each represented as Signal object…
    - in a callback outside the sequence: doOnEach
    - instead of the original onNext emissions: materialize
      - …and get back to the onNexts: dematerialize
  - as a line in a log: log
5.4. Filtering a sequence
- I want to filter a sequence…
  - based on an arbitrary criterion: filter
    - …that is asynchronously computed: filterWhen
  - restricting on the type of the emitted objects: ofType
  - by ignoring the values altogether: ignoreElements
  - by ignoring duplicates…
    - in the whole sequence (logical set): Flux#distinct
    - between subsequently emitted items (deduplication): Flux#distinctUntilChanged
- I want to keep only a subset of the sequence…
  - by taking elements…
    - at the beginning of the sequence: Flux#take(int)
      - …based on a duration: Flux#take(Duration)
      - …only the first element, as a Mono: Flux#next()
    - at the end of the sequence: Flux#takeLast
    - until a criterion is met (inclusive): Flux#takeUntil (predicate-based), Flux#takeUntilOther (companion publisher-based)
    - while a criterion is met (exclusive): Flux#takeWhile
  - by taking at most 1 element…
    - at a specific position: Flux#elementAt
    - at the end: .takeLast(1)
      - …and emit an error if empty: Flux#last()
      - …and emit a default value if empty: Flux#last(T)
  - by skipping elements…
    - at the beginning of the sequence: Flux#skip(int)
      - …based on a duration: Flux#skip(Duration)
    - at the end of the sequence: Flux#skipLast
    - until a criterion is met (inclusive): Flux#skipUntil (predicate-based), Flux#skipUntilOther (companion publisher-based)
    - while a criterion is met (exclusive): Flux#skipWhile
  - by sampling items…
    - by duration: Flux#sample(Duration)
      - but keeping the first element in the sampling window instead of the last: sampleFirst
    - by a publisher-based window: Flux#sample(Publisher)
    - based on a publisher "timing out": Flux#sampleTimeout (each element triggers a publisher, and is emitted if that publisher doesn’t overlap with the next)
- I expect at most 1 element (error if more than one)…
  - and I want an error if the sequence is empty: Flux#single()
  - and I want a default value if the sequence is empty: Flux#single(T)
  - and I accept an empty sequence as well: Flux#singleOrEmpty
5.5. Errors
- I want to create an erroring sequence: error
  - …to replace the completion of a successful Flux: .concat(Flux.error(e))
  - …to replace the emission of a successful Mono: .then(Mono.error(e))
  - …if too much time elapses between onNexts: timeout
- I want the try/catch equivalent of…
  - throwing: error
  - catching an exception…
    - and falling back to a default value: onErrorReturn
    - and falling back to another Flux or Mono: onErrorResume
    - and wrapping and re-throwing: .onErrorMap(t -> new RuntimeException(t))
  - the finally block: doFinally
  - the using pattern from Java 7: using factory method
- I want to recover from errors…
  - by falling back…
    - to a value: onErrorReturn
    - to a Publisher or Mono, possibly different ones depending on the error: Flux#onErrorResume and Mono#onErrorResume
  - by retrying: retry
    - …triggered by a companion control Flux: retryWhen
- I want to deal with backpressure "errors" [7]…
  - by throwing a special IllegalStateException: Flux#onBackpressureError
  - by dropping excess values: Flux#onBackpressureDrop
    - …except the last one seen: Flux#onBackpressureLatest
  - by buffering excess values (unbounded or bounded): Flux#onBackpressureBuffer
    - …and applying a strategy when bounded buffer also overflows: Flux#onBackpressureBuffer with a BufferOverflowStrategy
5.6. Time
- I want to associate emissions with a timing (Tuple2<Long, T>) measured…
  - since the previous emission (or since subscription, for the first one): elapsed
  - since the dawn of time (well, computer time): timestamp
- I want my sequence to be interrupted if there’s too much delay between emissions: timeout
- I want to get ticks from a clock, regular time intervals: Flux#interval
- I want to introduce a delay…
  - between each onNext signal: delay
  - before the subscription happens: delaySubscription
5.7. Splitting a Flux
-
I want to split a
Flux<T>
into aFlux<Flux<T>>
, by a boundary criteria…-
of size:
window(int)
-
…with overlapping or dropping windows:
window(int, int)
-
-
of time
window(Duration)
-
…with overlapping or dropping windows:
window(Duration, Duration)
-
-
of size OR time (window closes when count is reached or timeout elapsed):
windowTimeout(int, Duration)
-
based on a predicate on elements:
windowUntil
-
……emitting the element that triggered the boundary in the next window (
cutBefore
variant):.windowUntil(predicate, true)
-
…keeping the window open while elements match a predicate:
windowWhile
(non-matching elements are not emitted)
-
-
driven by an arbitrary boundary represented by onNexts in a control Publisher:
window(Publisher)
,windowWhen
-
-
I want to split a
Flux<T>
and buffer elements within boundaries together…-
into
List
…-
by a size boundary:
buffer(int)
-
…with overlapping or dropping buffers:
buffer(int, int)
-
-
by a duration boundary:
buffer(Duration)
-
…with overlapping or dropping buffers:
buffer(Duration, Duration)
-
-
by a size OR duration boundary:
bufferTimeout(int, Duration)
-
by an arbitrary criteria boundary:
bufferUntil(Predicate)
-
…putting the element that triggered the boundary in the next buffer:
.bufferUntil(predicate, true)
-
…buffering while predicate matches and dropping the element that triggered the boundary:
bufferWhile(Predicate)
-
-
driven by an arbitrary boundary represented by onNexts in a control Publisher:
buffer(Publisher)
, bufferWhen
-
-
into an arbitrary "collection" type
C
: use variants like buffer(int, Supplier<C>)
-
-
I want to split a
Flux<T>
so that elements that share a characteristic end up in the same sub-flux: groupBy(Function<T,K>)
TIP: Note that this returns a Flux<GroupedFlux<K, T>>
. All elements of an inner GroupedFlux
share the same K
key, accessible through key()
.
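To make the differences between these splitting operators more concrete, here is a small sketch (assuming reactor-core on the classpath) that applies several of them to the same `Flux.range(1, 7)`. The class and method names are hypothetical. The results are collected with `block()` only to keep the example short.

```java
import java.util.List;

import reactor.core.publisher.Flux;

class SplitSketch {

    static Flux<Integer> source() {
        return Flux.range(1, 7);
    }

    // size boundary: [[1, 2, 3], [4, 5, 6], [7]]
    static List<List<Integer>> bySize() {
        return source().buffer(3).collectList().block();
    }

    // maxSize < skip, so some elements are dropped: [[1, 2], [4, 5], [7]]
    static List<List<Integer>> dropping() {
        return source().buffer(2, 3).collectList().block();
    }

    // maxSize > skip, so buffers overlap: [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7]]
    static List<List<Integer>> overlapping() {
        return source().buffer(3, 2).collectList().block();
    }

    // cutBefore variant, the triggering element opens the next buffer: [[1, 2], [3, 4, 5], [6, 7]]
    static List<List<Integer>> untilCutBefore() {
        return source().bufferUntil(i -> i % 3 == 0, true).collectList().block();
    }

    // the triggering element is dropped: [[1, 2], [4, 5], [7]]
    static List<List<Integer>> whileMatching() {
        return source().bufferWhile(i -> i % 3 != 0).collectList().block();
    }

    // same boundaries as bySize(), but each window is itself a Flux
    static List<List<Integer>> windows() {
        return source().window(3)
                       .concatMap(window -> window.collectList())
                       .collectList()
                       .block();
    }

    // one GroupedFlux per key; sorted here because group completion order is not relied upon
    static List<String> groups() {
        return Flux.range(1, 6)
                   .groupBy(i -> i % 2 == 0 ? "even" : "odd")
                   .flatMap(group -> group.collectList().map(list -> group.key() + "=" + list))
                   .collectSortedList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(bySize());
        System.out.println(dropping());
        System.out.println(overlapping());
        System.out.println(untilCutBefore());
        System.out.println(whileMatching());
        System.out.println(windows());
        System.out.println(groups()); // [even=[2, 4, 6], odd=[1, 3, 5]]
    }
}
```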
5.8. Going back to the Synchronous world
-
I have a
Flux<T>
and I want to…
-
block until I can get the first element:
Flux#blockFirst
-
…with a timeout:
Flux#blockFirst(Duration)
-
-
block until I can get the last element (or null if empty):
Flux#blockLast
-
…with a timeout:
Flux#blockLast(Duration)
-
-
synchronously switch to an
Iterable<T>
:Flux#toIterable
-
synchronously switch to a Java 8
Stream<T>
:Flux#toStream
-
-
I have a
Mono<T>
and I want…
-
to block until I can get the value:
Mono#block
-
…with a timeout:
Mono#block(Duration)
-
-
a
CompletableFuture<T>
:Mono#toFuture
-
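The blocking bridges listed above can be sketched as follows, assuming reactor-core on the classpath. The class name `BlockingSketch` and the sample values are hypothetical. Keep in mind that these calls block the calling thread, so they are best kept at the edges of an application (tests, `main` methods, legacy integration points).

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class BlockingSketch {

    static Flux<String> letters() {
        return Flux.just("a", "b", "c");
    }

    static String first() {
        return letters().blockFirst();                       // "a"
    }

    static String last() {
        return letters().blockLast(Duration.ofSeconds(1));   // "c", or an error after 1s
    }

    static String lastOfEmpty() {
        return Flux.<String>empty().blockLast();             // null for an empty sequence
    }

    static String viaIterable() {
        StringBuilder joined = new StringBuilder();
        for (String letter : letters().toIterable()) {       // iteration blocks as needed
            joined.append(letter);
        }
        return joined.toString();                            // "abc"
    }

    static List<String> viaStream() {
        return letters().toStream().collect(Collectors.toList());
    }

    static Integer monoValue() {
        return Mono.just(42).block();
    }

    static Integer viaFuture() {
        CompletableFuture<Integer> future = Mono.just(42).toFuture();
        return future.join();
    }

    public static void main(String[] args) {
        System.out.println(first() + " " + last() + " " + lastOfEmpty());
        System.out.println(viaIterable() + " " + viaStream());
        System.out.println(monoValue() + " " + viaFuture());
    }
}
```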
6. Testing
Whether you have written a simple chain of Reactor operators or your very own operator, automated testing is always a good idea.
Reactor comes with a few elements dedicated to testing, gathered into their own
artifact: reactor-test
. You can find that project on Github
inside of the reactor-addons repository.
To use it in your tests, add it as a test dependency:
<dependencies>
    <dependency>
        <groupId>io.projectreactor.addons</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope> (1)
    </dependency>
</dependencies>
1 | If you use the BOM, no need to specify a <version> … |
In Gradle, in the dependencies block:
dependencies {
    testCompile 'io.projectreactor.addons:reactor-test'
}
The two main uses of reactor-test
are:
-
test that a sequence follows a given scenario, step-by-step, with
StepVerifier
-
produce data in order to test behavior of operators downstream (eg. your own operator) with
TestPublisher
6.1. Testing a scenario with StepVerifier
The most common case for testing a Reactor sequence is to have a Flux
or Mono
defined in your code (eg. returned by a method), and wanting to test how it
behaves when subscribed to.
This translates well to defining a "test scenario", where you define your
expectations in terms of events, step-by-step: what is the next expected event?
Do you expect the Flux to emit a particular value? Or maybe to do nothing for
the next 300ms? All of that can be expressed through the StepVerifier
API.
For instance, you could have the following utility method in your codebase that
decorates a Flux
:
public <T> Flux<T> appendBoomError(Flux<T> source) {
return source.concatWith(Mono.error(new IllegalArgumentException("boom")));
}
So in order to test it, you’d want to verify the following scenario:
I expect this
Flux
to first emit"foo"
, then"bar"
, then to error with the message"boom"
. Subscribe and verify these expectations.
In the StepVerifier
API, this translates to:
@Test
public void testAppendBoomError() {
Flux<String> source = Flux.just("foo", "bar"); (1)
StepVerifier.create( (2)
appendBoomError(source)) (3)
.expectNext("foo") (4)
.expectNext("bar")
.expectErrorMessage("boom") (5)
.verify(); (6)
}
1 | Since our method needs a source Flux , we’ll define a simple one for
testing purposes. |
2 | Create a StepVerifier builder that will wrap and verify a Flux /Mono … |
3 | Here we pass the flux to be tested (the result of calling our utility method) |
4 | The first signal we expect to happen upon subscription is an onNext , with
the value "foo" . |
5 | The last signal we expect to happen is a termination of the sequence with an
onError . The exception should have "boom" as a message. |
6 | It is important to trigger the test by calling verify() . |
The API is a builder. You start by creating a StepVerifier
and passing the
sequence to be tested. This offers a choice of methods that allow you to:
-
express expectations about the next signals to occur: if any other signal is received (or the content of the signal doesn’t match the expectation), the whole test will fail with a meaningful
AssertionError
. For example expectNext(T...)
, expectNextCount(long)
.
-
consume the next signal. This is used when you want to skip part of the sequence OR when you want to apply a custom
assertion
on the content of the signal (eg. check there is an onNext
and assert the emitted item is a list of size 5). For example consumeNextWith(Consumer<T>)
.
-
miscellaneous actions like pausing, running arbitrary code (eg. if you want to manipulate a test specific state/context). For example
thenAwait(Duration)
, then(Runnable)
.
For terminal events, the corresponding expectation methods (expectComplete()
,
expectError()
and all its variants) will switch to an API where you cannot
express expectations anymore. In that last step, all you can do is perform some
additional configuration on the StepVerifier
then trigger the verification.
What happens at this point is that the StepVerifier subscribes to the tested
flux/mono and plays the sequence, comparing each new signal with the next step
in the scenario. As long as these match, the test is considered a success. As
soon as there is a discrepancy, an AssertionError
is thrown.
Don’t forget the verify() step, which triggers the verification.
In order to help, a few shortcut methods were added to the API that combine the
terminal expectations with a call to verify() : verifyComplete() ,
verifyError() , verifyErrorMessage(String) , etc.
|
Note that if one of the lambda-based expectations throws an AssertionError
, it
will be reported as is, failing the test. This is useful for custom assertions.
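As an illustration of mixing these kinds of steps, the sketch below (assuming reactor-core and reactor-test on the classpath) verifies a hypothetical `Flux` of batches. It applies a custom assertion with `consumeNextWith`, skips one signal with `expectNextCount`, then checks the last batch and the completion with the `verifyComplete()` shortcut. The class name and the data are made up for the example.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class StepVerifierSketch {

    static Duration verifyBatches() {
        // hypothetical sequence under test: 1..12 grouped in batches of 5
        Flux<List<Integer>> batches = Flux.range(1, 12).buffer(5);

        return StepVerifier.create(batches)
                // custom assertion on the content of the first onNext
                .consumeNextWith(batch -> {
                    if (batch.size() != 5) {
                        throw new AssertionError("expected a batch of 5 but got " + batch);
                    }
                })
                // skip the second batch without looking at its content
                .expectNextCount(1)
                // the last batch only holds the remaining two elements
                .expectNext(Arrays.asList(11, 12))
                // terminal expectation combined with verify()
                .verifyComplete();
    }

    public static void main(String[] args) {
        System.out.println("verified in " + verifyBatches());
    }
}
```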
6.2. Manipulating Time
Another very interesting capability of StepVerifier
is the way it can be used
with time-based operators in order to avoid long run times for corresponding
tests. This is done through the StepVerifier.withVirtualTime
builder.
It looks like this:
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
//... continue expectations here
The way this virtual time feature works is that it plugs in a custom Scheduler
in Reactor’s Schedulers
factory. Since these timed operators usually use the
default Schedulers.timer()
scheduler, replacing it with a VirtualTimeScheduler
does the trick. However, an important pre-requisite is that the operator be
instantiated after the virtual time scheduler has been activated.
In order to increase the chances this happens correctly, the StepVerifier
won’t take a simple Flux
as input. withVirtualTime
takes a Supplier
, which
allows the instance of the tested flux to be created lazily, AFTER the
scheduler has been set up.
Take extra care of ensuring the Supplier<Publisher<T>> can be used
in a lazy fashion, otherwise virtual time is not guaranteed. Especially avoid
instantiating the flux earlier in the test code and having the Supplier just
return that variable, but rather always instantiate the flux inside the lambda.
|
There are a couple of expectation methods that deal with time, and they are both valid with or without virtual time:
-
thenAwait(Duration)
pauses the evaluation of steps (allowing a few signals to occur, or delays to run out) -
expectNoEvent(Duration)
also lets the sequence play out for a given duration, but fails the test if any signal occurs during that time.
Both methods will pause the thread for the given duration in classic mode, and advance the virtual clock instead in virtual mode.
expectNoEvent also considers the subscription as an event. If you use
it as a first step, it will usually fail because the subscription signal will be
detected. Use expectSubscription().expectNoEvent(duration) instead.
|
So in order to quickly evaluate the behavior of our Mono.delay
above, we can
finish writing up our code like this:
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
.expectSubscription() (1)
.expectNoEvent(Duration.ofDays(1)) (2)
.expectNext(0L) (3)
.verifyComplete(); (4)
1 | See the tip above |
2 | Expect nothing happens during a full day… |
3 | …then expect delay emits 0 … |
4 | …then expect completion (and trigger the verification). |
We could have used thenAwait(Duration.ofDays(1))
above, but expectNoEvent
has the benefit of guaranteeing that nothing happened earlier than it should
have.
Note also that verify()
returns a Duration
value. This is the real time
duration of the entire test.
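The following sketch combines both time-related steps on a hypothetical hourly `Flux.interval`, and captures the `Duration` returned by the verification. It assumes reactor-core and reactor-test on the classpath. The class name is illustrative only.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class VirtualTimeSketch {

    static Duration verifyHourlyTicks() {
        // the flux is instantiated inside the Supplier, after virtual time is set up
        return StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofHours(1)).take(2))
                .expectSubscription()
                // nothing may be emitted during the first virtual hour
                .expectNoEvent(Duration.ofHours(1))
                .expectNext(0L)
                // simply advance the virtual clock by another hour
                .thenAwait(Duration.ofHours(1))
                .expectNext(1L)
                .verifyComplete();
    }

    public static void main(String[] args) {
        // real elapsed time, expected to be far below the two virtual hours
        System.out.println("verified in " + verifyHourlyTicks());
    }
}
```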
6.3. Performing post-execution assertions with StepVerifier
After having described the final expectation of your scenario, you can switch to
a complementary assertion API instead of plainly triggering the verify()
:
use verifyThenAssertThat()
instead.
This returns a StepVerifier.Assertions
object which you can use to assert a few
elements of state once the whole scenario has played out successfully (since it
does also call verify()
). Typical (albeit advanced) usage is to capture
elements that have been dropped by some operator and assert them (see the
section on Hooks).
6.4. Manually emitting with TestPublisher
For more advanced test cases, it might be useful to have complete mastery over the source of data, in order to trigger finely chosen signals that closely match the particular situation you want to test.
Another situation is when you have implemented your own operator and you want to verify how it behaves with regards to the Reactive Streams specification, especially if its source is not well behaved.
For both cases, reactor-test offers the TestPublisher
. This is a Publisher<T>
that lets you programmatically trigger various signals:
-
next(T)
and next(T, T...)
will trigger 1-n onNext
signals
-
emit(T...)
will do the same AND complete()
-
complete()
will terminate with an onComplete
signal
-
error(Throwable)
will terminate with an onError
signal
A well-behaved TestPublisher
can be obtained through the create
factory
method. Additionally, misbehaving TestPublisher
can be created using the
createNonCompliant
factory method. The latter takes a number of Violation
enums that will define which parts of the specification the publisher can
overlook. For instance:
-
REQUEST_OVERFLOW
: Allows next
calls to be made despite insufficient request, without triggering an IllegalStateException
.
-
ALLOW_NULL
: Allows next
calls to be made with a null
value without triggering a NullPointerException
.
-
CLEANUP_ON_TERMINATE
: Allows termination signals to be sent several times in a row. This includes complete()
, error()
and emit()
.
Finally, the TestPublisher
keeps track of internal state after subscription,
which can be asserted through its various assertXXX
methods.
It can be used as a Flux
or Mono
by using the conversion methods flux()
and mono()
.
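A well-behaved TestPublisher pairs naturally with StepVerifier. The sketch below assumes reactor-core and reactor-test on the classpath. It drives a hypothetical upper-casing chain by emitting signals from `then(Runnable)` steps, and finishes with one of the `assertXXX` methods. The class name and the operator under test are made up for the example.

```java
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

class TestPublisherSketch {

    static void verifyUpperCasing() {
        TestPublisher<String> source = TestPublisher.create();

        StepVerifier.create(source.flux().map(String::toUpperCase))
                // signals are triggered manually, once the verifier has subscribed
                .then(() -> source.next("foo", "bar"))
                .expectNext("FOO", "BAR")
                .then(() -> source.complete())
                .verifyComplete();

        // the publisher tracked the demand it saw: no value was pushed without request
        source.assertNoRequestOverflow();
    }

    public static void main(String[] args) {
        verifyUpperCasing();
        System.out.println("scenario verified");
    }
}
```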
7. Debugging Reactor
Switching from an imperative and synchronous programming paradigm to a reactive and asynchronous one can sometimes be daunting. One of the steepest steps in the learning curve is how to analyze and debug when something goes wrong.
In the imperative world, this is usually pretty straightforward nowadays: just
read the stacktrace and you’ll spot where the problem originated, and more: was
it entirely a failure of your code? Did the failure occur in some library code?
If so, what part of your code called the library, potentially passing in
improper parameters that ultimately caused the failure? (I’m looking at you,
null
!)
7.1. The typical Reactor stack trace
But as soon as you shift to asynchronous code, things can get much more complicated…
Consider the following stacktrace:
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:120)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.emitScalar(FluxFlatMap.java:380)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:349)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:119)
at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:144)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:99)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:172)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:316)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:94)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:67)
at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:98)
at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:2668)
at reactor.core.publisher.Mono.subscribe(Mono.java:2629)
at reactor.core.publisher.Mono.subscribe(Mono.java:2604)
at reactor.core.publisher.Mono.subscribe(Mono.java:2582)
at reactor.guide.GuideTests.debuggingCommonStacktrace(GuideTests.java:722)
There is a lot going on there! We get an IndexOutOfBoundsException
which tells
us that a "source emitted more than one item".
We can probably quickly come to assume that this source is a Flux/Mono, as
confirmed by the line below that mentions MonoSingle
. So it appears to be some
sort of complaint from a single
operator.
Referring to the javadoc for Mono#single
operator, we indeed remember that
single
has a contract: the source must emit exactly one element. It appears
we had a source that emitted more than one and thus violated that contract.
Can we dig deeper and identify that source? The following rows don’t seem very
helpful. They take us on a journey through the internals of what seems to be a
reactive chain, through subscribes
and requests
…
By skimming over these rows, we can at least start to form a picture of the kind
of chain that went wrong: it seems to involve a MonoSingle
, a FluxFlatMap
and a FluxRange
(each get several rows in the trace, but overall these 3
classes are involved). So a range().flatMap().single()
chain maybe?
But what if we use that pattern a lot in our application? This still doesn’t
tell us much, and simply searching for single
isn’t going to cut it. Then the
last line refers to some of our code. Finally!
Hold on… When we go to the source file, all we see is that a pre-existing
Flux
is subscribed to:
toDebug.subscribe(System.out::println, Throwable::printStackTrace);
All of this happened at subscription time, but the Flux
itself wasn’t
declared there. Worse, when we go to where the variable is declared, we see:
public Mono<String> toDebug; //please overlook the public class attribute :p
The variable isn’t even instantiated where it is declared. Let’s assume a worst case scenario where we find out there could be a few different code paths that set it in the application… So we’re still unsure of which one caused the issue.
This is kind of the Reactor equivalent of a runtime error, as opposed to a compilation error. |
What we want to find out more easily is where the operator was added into the
chain, where the Flux
was declared. We usually refer to that as the assembly
of the Flux.
7.2. Activating debug mode
Even though the stacktrace was still able to convey some information for someone with a bit of experience, we can see that it is not ideal by itself in more advanced cases.
Fortunately, Reactor comes with a debugging-oriented capability of assembly-time instrumentation.
This is done by customizing the Hooks.onOperator
hook at application start
(or at least before the incriminated flux or mono can be instantiated), like so:
Hooks.onOperator(providedHook -> providedHook.operatorStacktrace());
The idea is that this will start instrumenting the calls to Flux
(and
Mono
)'s operator methods (where they are assembled into the chain) by wrapping
the construction of the operator and capturing a stacktrace there. Since this is
done when the operator chain is declared, the hook should be activated before
that, so the safest way is to activate it right at the start of your
application.
Later on, if an exception occurs, the failing operator will be able to refer to that capture and append it to the stacktrace.
In the next section, we’ll see how the stacktrace differs and how to interpret that new information.
7.3. Reading a stack trace in debug mode
Reusing our initial example but activating the operatorStacktrace
debug
feature, here is the stack we now get:
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:120)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:314) (1)
...
(2)
...
at reactor.core.publisher.Mono.subscribeWith(Mono.java:2668)
at reactor.core.publisher.Mono.subscribe(Mono.java:2629)
at reactor.core.publisher.Mono.subscribe(Mono.java:2604)
at reactor.core.publisher.Mono.subscribe(Mono.java:2582)
at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:727)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: (3)
Assembly trace from producer [reactor.core.publisher.MonoSingle] : (4)
reactor.core.publisher.Flux.single(Flux.java:5335)
reactor.guide.GuideTests.scatterAndGather(GuideTests.java:689)
reactor.guide.GuideTests.populateDebug(GuideTests.java:702)
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
org.junit.rules.RunRules.evaluate(RunRules.java:20)
Error has been observed by the following operator(s): (5)
|_ Flux.single(TestWatcher.java:55) (6)
1 | This is new: what we see here is the wrapper operator that captures the stack. |
2 | Apart from that, the first section of the stacktrace is still the same for the most part, showing a bit of the operators' internals (so we removed a bit of the snippet here) |
3 | This is where the new stuff from debugging mode starts appearing. |
4 | First we get some details on where the operator was assembled, hurray! |
5 | We also get a traceback of the error as it propagated through the operator chain, from first to last (error site to subscribe site). |
6 | Each operator that saw the error is mentioned along with the class and line where it originated. If an operator is assembled from within Reactor code, the latter would be omitted. |
As you can see, the captured stacktrace is appended to the original error as a
suppressed OnAssemblyException
. There are two parts to it, but the first
section is the most interesting. It shows the path of construction for the
operator that triggered the exception. Here it shows that the single
that
caused our issue was created in the scatterAndGather
method, itself called
from a populateDebug
method that got executed through JUnit.
We are now armed with enough information to find the culprit, let’s have a look
at that scatterAndGather
method:
private Mono<String> scatterAndGather(Flux<String> urls) {
return urls.flatMap(url -> doRequest(url))
.single(); (1)
}
1 | Sure enough, here is our single . |
Now we can see what the root cause of the error was: a flatMap
that performs
several HTTP calls to a few urls is chained with single
, which seems a bit too
restrictive. After a short git blame
and a quick discussion with the author of
that line, we find out he meant to use the less restrictive take(1)
instead…
Congratulations, we solved our problem!
Error has been observed by the following operator(s):
That second part of the debug stacktrace was not necessarily very interesting in
this particular example, because the error was actually happening in the last
operator in the chain (the one closest to subscribe
). Taking another example
might make it clearer:
FakeRepository.findAllUserByName(Flux.just("pedro", "simon", "stephane"))
.transform(FakeUtils1.applyFilters)
.transform(FakeUtils2.enrichUser)
.blockLast();
Now imagine that inside findAllUserByName
there is a map
that fails. Here we
would see the following final traceback:
Error has been observed by the following operator(s):
|_ Flux.map(FakeRepository.java:27)
|_ Flux.map(FakeRepository.java:28)
|_ Flux.filter(FakeUtils1.java:29)
|_ Flux.transform(GuideDebuggingExtraTests.java:41)
|_ Flux.elapsed(FakeUtils2.java:30)
|_ Flux.transform(GuideDebuggingExtraTests.java:42)
This corresponds to a flattened-out version of the chain of operators, or rather of the section of the chain that gets notified of the error:
-
the exception originates in the first
map
-
it is seen by a second
map
(both in fact correspond to the findAllUserByName
method)
-
then it is seen by a
filter
and a transform
, which indicates that part of the chain is constructed via a reusable transformation function (here, the applyFilters
utility method).
-
finally it is seen by an
elapsed
and a transform
. Once again, elapsed is what is applied by the transformation function of that second transform.
7.3.1. Cost of debug mode
We are dealing with a form of instrumentation here, and creating a stacktrace is costly. That is why this debugging feature should only be activated in a controlled manner, as a last resort. There are ways of limiting the impact of that feature by restricting the hook to the type of operator that is causing an issue.
The filter to use is best determined by looking at the class in the stack trace,
after removing any Parallel
, Flux
and Mono
prefixes and the Fuseable
suffix. For instance in our case:
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:120)
We have MonoSingle.java:120
, so MonoSingle
operator implementation and
single
as the filtering keyword.
So we could only instrument uses of the incriminating operator by doing:
Hooks.onOperator(providedHook -> providedHook.ifName("single") (1)
.operatorStacktrace());
1 | Only activate for operator classes named "single", ignoring case and the "Parallel", "Flux" or "Mono" prefixes, as well as "Fuseable" suffix (as seen in stacktrace) |
7.3.2. The checkpoint()
alternative
The debug mode is global and affects every single operator assembled into a
Flux
or Mono
inside the application. This has the benefit of allowing
after the fact debugging: whatever the error, we will obtain additional info
to debug it.
As we saw in the "Cost of debug mode" above, this is at the cost of an impact on performance (due to the number of populated stacktraces). That cost can be reduced if we have an idea of likely problematic operators. But usually this isn’t known unless we observed an error in the wild, saw we were missing assembly information and then modified the code to activate assembly tracking, hoping we can observe the same error again…
In that scenario, we have to switch into debugging gear and make preparations in order to better observe a second occurrence of the error, this time capturing all the additional information.
If you can identify reactive chains that you assemble in your application for
which serviceability is critical, a mix of both worlds can be achieved with the
checkpoint()
operator.
You can chain this operator towards their end. The checkpoint
operator will
work like the hook version, but only for its link of that particular chain.
Additionally, there is a checkpoint(String)
variant that allows you to add a
description to the assembly traceback. It could for example be a static
identifier or user-readable description, or a wider correlation ID coming from
a header in the case of an HTTP request for instance…
That information appears in the first line of the traceback:
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.ParallelSource], described as [fooCorrelation1234] : (1)
reactor.core.publisher.ParallelFlux.checkpoint(ParallelFlux.java:174)
reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescription(FluxOnAssemblyTest.java:159)
Error has been observed by the following operator(s):
|_ ParallelFlux.checkpointnull
1 | fooCorrelation1234 is the description provided in checkpoint |
When both global debugging and local checkpoint()
are enabled, checkpointed snapshot
stacks will be appended as suppressed errors after the observing operator graph and
following the same declarative order.
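As a rough sketch of the checkpoint(String) variant, the code below (assuming reactor-core on the classpath) reproduces the earlier single() failure, adds a checkpoint after the failing operator, and collects the messages of the suppressed exceptions. The class name and the description are hypothetical. The exact wording of the traceback is an implementation detail that may differ from the sample above depending on the Reactor version, so the sketch only relies on the description appearing somewhere in it.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class CheckpointSketch {

    static final String DESCRIPTION = "singleAfterRange";

    static String describeFailure() {
        Mono<Integer> failing = Flux.range(1, 3)
                .single()                  // fails: the source emits more than one item
                .checkpoint(DESCRIPTION);  // placed downstream, so it observes the error

        try {
            failing.block();
            return "no error";
        }
        catch (IndexOutOfBoundsException e) {
            StringBuilder report = new StringBuilder(String.valueOf(e.getMessage()));
            // the checkpoint information travels as a suppressed exception
            for (Throwable suppressed : e.getSuppressed()) {
                report.append('\n').append(suppressed.getMessage());
            }
            return report.toString();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure());
    }
}
```

Note that the checkpoint only sees errors that flow through it, which is why it is chained after `single()` rather than before it.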
7.4. Logging a stream
In addition to stacktrace debugging and analysis, another powerful tool to have in your toolbelt is the capability to trace and log events in an asynchronous sequence.
The log()
operator can do just that. Chained inside a sequence, it will peek
at every event of the Flux/Mono upstream of it (including onNext
, onError
and onComplete
of course, but also subscriptions, cancellation and
requests).
The operator picks up common logging frameworks like Log4J and Logback through SLF4J, and will default to the JDK Logger in case none can be found.
For instance, supposing we have logback activated and configured, and a chain
like range(1,10).take(3)
. By placing a log()
just before the take, we can
get some insight as to how it works and what kind of events it propagates
upstream to the range:
Flux<Integer> flux = Flux.range(1, 10)
.log()
.take(3);
flux.subscribe();
This prints out (through the logger’s console appender):
10:45:20.200 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) (1)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | request(unbounded) (2)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(1) (3)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(2)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(3)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | cancel() (4)
Here, additionally to the logger’s own formatter (time, thread, level, message),
the log()
operator outputs a few things in its own format:
-
reactor.Flux.Range.1
is an automatic category for the log, in case you use the operator several times in a chain. It allows you to distinguish which operator’s events are being logged (here, the range
). This can be overwritten with your own custom category using the log(String)
signature.
-
After the few separating characters, the actual event gets printed: here we get
onSubscribe
, request
, 3 onNext
and a cancel
…
-
For the first line,
onSubscribe
, we get the implementation of the Subscription
, which usually corresponds to the operator-specific implementation. Between square brackets, we get additional information if the operator can be automatically optimized via synchronous or asynchronous fusion (see the appendix on Micro-fusion).
-
On the second line (2) we can see that an unbounded request was propagated up from downstream.
-
Then the range sends three values in a row (3)…
-
On the last line we see a cancel.
The last line (4) is the most interesting: we can see the take
in action
there: it operates by cutting the sequence short after it has seen enough
elements emitted. In a word, take
simply calls cancel()
on the source once it has
emitted the user-requested amount!
8. Advanced features and concepts
8.1. Mutualizing operator usage
From a clean code perspective, code reuse is generally a good thing. Reactor offers a few patterns that will help you reuse and mutualize code, notably for operators or combination of operators that you might want to apply regularly in your codebase.
8.1.1. transform
The transform
operator lets you encapsulate a piece of an operator chain into
a function. That function will be applied to an original operator chain at
assembly time to augment it with the encapsulated operators. So this applies the
same to all the subscribers of a sequence, and is basically equivalent to
chaining the operators directly.
Function<Flux<String>, Flux<String>> filterAndMap =
f -> f.filter(color -> !color.equals("orange"))
.map(String::toUpperCase);
Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
.doOnNext(System.out::println)
.transform(filterAndMap)
.subscribe(d -> System.out.println("Subscriber to Transformed MapAndFilter: "+d));
This outputs:
blue
Subscriber to Transformed MapAndFilter: BLUE
green
Subscriber to Transformed MapAndFilter: GREEN
orange
purple
Subscriber to Transformed MapAndFilter: PURPLE
8.1.2. compose
The compose
operator is very similar to transform
and also lets you
encapsulate operators in a function. The major difference is that this function
is applied to the original sequence on a per-subscriber basis. It means that
the function can actually produce a different operator chain for each
subscription (eg. by maintaining some state).
AtomicInteger ai = new AtomicInteger();
Function<Flux<String>, Flux<String>> filterAndMap = f -> {
if (ai.incrementAndGet() == 1) {
return f.filter(color -> !color.equals("orange"))
.map(String::toUpperCase);
}
return f.filter(color -> !color.equals("purple"))
.map(String::toUpperCase);
};
Flux<String> composedFlux =
Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
.doOnNext(System.out::println)
.compose(filterAndMap);
composedFlux.subscribe(d -> System.out.println("Subscriber 1 to Composed MapAndFilter :"+d));
composedFlux.subscribe(d -> System.out.println("Subscriber 2 to Composed MapAndFilter: "+d));
This outputs:
blue
Subscriber 1 to Composed MapAndFilter :BLUE
green
Subscriber 1 to Composed MapAndFilter :GREEN
orange
purple
Subscriber 1 to Composed MapAndFilter :PURPLE
blue
Subscriber 2 to Composed MapAndFilter: BLUE
green
Subscriber 2 to Composed MapAndFilter: GREEN
orange
Subscriber 2 to Composed MapAndFilter: ORANGE
purple
8.2. Hot vs Cold
So far we have considered that all Flux
(and Mono
) are the same: they all
represent an asynchronous sequence of data, and nothing happens before you
subscribe.
There are however in reality two broad families of publishers: cold ones and hot ones.
The description above applies to the cold family of publishers. They generate data anew for each subscription, and if no subscription is made then no data is ever generated.
Think HTTP request: each new subscriber will trigger an HTTP call, but no call is made if no one is interested in the result.
Hot publishers on the other hand don’t really depend on any number of
subscribers. They might start publishing data right away, and would continue
doing so whenever a new Subscriber
comes in (in which case said subscriber
would only see new elements emitted after it subscribed). So for such hot
publishers, something indeed happens before you subscribe.
One example of the few hot operators in Reactor is just
: it directly captures
the value at assembly time, and will replay it to anybody subscribing to it
later on. To re-use the HTTP call analogy, if the captured data is the result
of an HTTP call then only one network call is made, when instantiating just.
To transform just
into a cold publisher, you can use defer
. This will
defer the HTTP request in our example to subscription time (and would result in
a separate network call for each new subscription).
Most other hot publishers in Reactor are Processor implementations.
|
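The difference between just and defer can be sketched with a counter standing in for the HTTP call. This assumes reactor-core on the classpath. `fakeHttpCall()` and the class name are hypothetical, and each invocation simply returns how many calls have been made so far.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

class JustVersusDeferSketch {

    static final AtomicInteger calls = new AtomicInteger();

    // hypothetical stand-in for an HTTP call: returns the number of calls made so far
    static int fakeHttpCall() {
        return calls.incrementAndGet();
    }

    static int[] run() {
        calls.set(0);

        // the call happens right here, at assembly time, and its result is captured
        Mono<Integer> hot = Mono.just(fakeHttpCall());
        // the call is deferred to subscription time, once per subscriber
        Mono<Integer> cold = Mono.defer(() -> Mono.just(fakeHttpCall()));

        int hotFirst = hot.block();    // replays the captured value
        int hotSecond = hot.block();   // same value, no new call
        int coldFirst = cold.block();  // triggers a second call
        int coldSecond = cold.block(); // triggers a third call

        return new int[] { hotFirst, hotSecond, coldFirst, coldSecond };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // [1, 1, 2, 3]
    }
}
```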
Contrast these two other examples:
Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
.doOnNext(System.out::println)
.filter(s -> s.startsWith("o"))
.map(String::toUpperCase);
source.subscribe(d -> System.out.println("Subscriber 1: "+d));
source.subscribe(d -> System.out.println("Subscriber 2: "+d));
Which outputs:
blue
green
orange
Subscriber 1: ORANGE
purple
blue
green
orange
Subscriber 2: ORANGE
purple
Compared to:
UnicastProcessor<String> hotSource = UnicastProcessor.create();
Flux<String> hotFlux = hotSource.publish()
.autoConnect()
.map(String::toUpperCase);
hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));
hotSource.onNext("blue");
hotSource.onNext("green");
hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));
hotSource.onNext("orange");
hotSource.onNext("purple");
hotSource.onComplete();
Which outputs:
Subscriber 1 to Hot Source: BLUE
Subscriber 1 to Hot Source: GREEN
Subscriber 1 to Hot Source: ORANGE
Subscriber 2 to Hot Source: ORANGE
Subscriber 1 to Hot Source: PURPLE
Subscriber 2 to Hot Source: PURPLE
8.3. Broadcast to multiple subscribers with ConnectableFlux
Sometimes, you want not only to defer some processing to the subscription time of one subscriber, but also to have several subscribers rendezvous and only then trigger the subscription and data generation.
This is what ConnectableFlux is made for. Two main patterns are covered in the Flux API that return a ConnectableFlux: publish and replay.
-
publish dynamically tries to respect the demand from its various subscribers, in terms of backpressure, by forwarding these requests to the source. Most notably, if any subscriber has a pending demand of 0, publish pauses its requesting to the source.
-
replay buffers data seen through the first subscription, up to configurable limits (in time and buffer size). It replays the data to subsequent subscribers.
A ConnectableFlux offers additional methods to manage subscriptions downstream versus subscriptions to the original source. For instance:
-
connect can be called manually once you’ve reached enough subscriptions to the flux. That triggers the subscription to the upstream source.
-
autoConnect(n) can do the same job automatically once n subscriptions have been made.
-
refCount(n) not only automatically tracks incoming subscriptions but also detects when these subscriptions are cancelled. If not enough subscribers are tracked, the source is "disconnected", causing a new subscription to the source later on if additional subscribers come back in.
Flux<Integer> source = Flux.range(1, 3)
    .doOnSubscribe(s -> System.out.println("subscribed to source"));
ConnectableFlux<Integer> co = source.publish();
co.subscribe(System.out::println, e -> {}, () -> {});
co.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("done subscribing");
Thread.sleep(500);
System.out.println("will now connect");
co.connect();
This outputs:
done subscribing
will now connect
subscribed to source
1
1
2
2
3
3
With autoConnect:
Flux<Integer> source = Flux.range(1, 3)
    .doOnSubscribe(s -> System.out.println("subscribed to source"));
Flux<Integer> autoCo = source.publish().autoConnect(2);
autoCo.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("subscribed first");
Thread.sleep(500);
System.out.println("subscribing second");
autoCo.subscribe(System.out::println, e -> {}, () -> {});
Which outputs:
subscribed first
subscribing second
subscribed to source
1
1
2
2
3
3
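The refCount(n) variant described earlier has no example above, so here is a minimal sketch of it. It assumes reactor-core 3.1 or later (where subscribe returns a Disposable) and uses a source that never emits, so that only subscription and cancellation events are recorded. The class name and event labels are purely illustrative.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;

class RefCountSketch {

    static List<String> run() {
        List<String> events = new ArrayList<>();

        // A source that never emits: only subscribe/cancel signals are observable.
        Flux<Integer> source = Flux.<Integer>never()
            .doOnSubscribe(s -> events.add("subscribed to source"))
            .doOnCancel(() -> events.add("source cancelled"));

        // Connect once two subscribers are present.
        Flux<Integer> shared = source.publish().refCount(2);

        Disposable first = shared.subscribe();
        events.add("first subscriber in");
        Disposable second = shared.subscribe();
        events.add("second subscriber in");

        // Cancelling the subscribers disconnects from the source.
        first.dispose();
        events.add("first subscriber out");
        second.dispose();
        events.add("second subscriber out");

        // New subscribers coming back cause a fresh subscription to the source.
        shared.subscribe();
        shared.subscribe();
        events.add("two new subscribers in");

        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the source should be subscribed to only when the second subscriber arrives, cancelled once the last tracked subscriber has gone, and subscribed to again when two new subscribers come in.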
8.4. Parallelize work with ParallelFlux
With multi-core architectures being a commodity nowadays, being able to easily parallelize work is very important. Reactor helps with that by providing a special type, ParallelFlux, that exposes operators that are optimized for parallelized work.
To obtain a ParallelFlux, you can use the parallel() operator on any Flux. By itself, this does not parallelize the work. Rather, it divides the workload into "rails" (by default, as many rails as there are CPU cores).
In order to tell the resulting ParallelFlux where to execute each rail (and, by extension, to execute rails in parallel), you have to use runOn(Scheduler). Note that there is a recommended dedicated Scheduler for parallel work: Schedulers.parallel().
Contrast:
Flux.range(1, 10)
    .parallel(2) (1)
    .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
1 | Here we force a number of rails instead of relying on the number of CPU cores. |
with:
Flux.range(1, 10)
    .parallel(2)
    .runOn(Schedulers.parallel())
    .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
The first outputs:
main -> 1
main -> 2
main -> 3
main -> 4
main -> 5
main -> 6
main -> 7
main -> 8
main -> 9
main -> 10
While the second correctly parallelizes on two threads:
parallel-1 -> 1
parallel-2 -> 2
parallel-1 -> 3
parallel-2 -> 4
parallel-1 -> 5
parallel-2 -> 6
parallel-1 -> 7
parallel-1 -> 9
parallel-2 -> 8
parallel-2 -> 10
If, once you have processed your sequence in parallel, you want to revert back to a "normal" flux and apply the rest of the operator chain in a sequential manner, you can use the sequential() method on ParallelFlux.
Note that sequential() is implicitly applied if you subscribe to the ParallelFlux with a single provided Subscriber, but not when using the lambda-based variants of subscribe.
You can also access individual rails or "groups" as a Flux<GroupedFlux<T>> via the groups() method and apply additional operators to them via the composeGroup() method.
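As a sketch of going parallel and then back to a sequential flux, the following processes a range on two rails and merges the rails again with sequential(). It assumes reactor-core is on the classpath; the class name and the THREADS set are illustrative only. Because the order in which rails deliver values is not guaranteed, the result is sorted before it is inspected.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class ParallelThenSequential {

    // Names of the threads that executed the map step (illustrative bookkeeping).
    static final Set<String> THREADS = ConcurrentHashMap.newKeySet();

    static List<Integer> squares() {
        return Flux.range(1, 10)
            .parallel(2)                    // split the work into two rails
            .runOn(Schedulers.parallel())   // run each rail on a parallel worker
            .map(i -> {
                THREADS.add(Thread.currentThread().getName());
                return i * i;
            })
            .sequential()                   // merge the rails back into a Flux
            .collectSortedList()            // sort, since arrival order may vary
            .block();                       // block only for the sake of the demo
    }

    public static void main(String[] args) {
        System.out.println(squares());
        System.out.println("map ran on: " + THREADS);
    }
}
```

Everything after sequential() is an ordinary Flux operator chain again, which is why collectSortedList() is available at that point.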
9. FAQ, best practices and other "How do I…?"
9.1. I just used an operator on my Flux but it doesn’t seem to apply… What gives?
Check that you have assigned the result of the operator to the variable you .subscribe() to.
Reactor operators are decorators: they return a different instance that wraps the source sequence and adds behavior. That is why the preferred way of using operators is to chain the calls.
Compare the following:
Flux<String> flux = Flux.just("foo", "chain");
flux.map(secret -> secret.replaceAll(".", "*")); (1)
flux.subscribe(next -> System.out.println("Received: " + next));
1 | The mistake is here: the result isn’t reassigned to the flux variable. |
With:
Flux<String> flux = Flux.just("foo", "chain");
flux = flux.map(secret -> secret.replaceAll(".", "*"));
flux.subscribe(next -> System.out.println("Received: " + next));
And even better:
Flux.just("foo", "chain")
    .map(secret -> secret.replaceAll(".", "*"))
    .subscribe(next -> System.out.println("Received: " + next));
The first version will output:
Received: foo
Received: chain
Whereas the two other versions will output the expected:
Received: ***
Received: *****
9.2. My Mono continuation (then or and) is never called
myMethod.process("a") // this method returns Mono<Void>
.then(aVoid -> myMethod.process("b")) //this is never called
.subscribe();
If the source Mono is either empty or a Mono<Void> (a Mono<Void> is empty for all intents and purposes), some combinations will never be called. You should expect this whenever there is a callback and the combination depends on the value.
This includes the Function-based variant of then (then(v -> Mono.just(1))) and all versions of and.
In order to avoid this problem and still invoke the continuation lazily when the source Mono terminates, use the Supplier-based version:
myMethod.process("a")
.then(() -> myMethod.process("b")) //executed lazily
.subscribe();
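The same pitfall can be reproduced with operators other than the ones shown above. The following is only a sketch, and it assumes reactor-core 3.1 or later: it uses flatMap as the value-based continuation and then(Mono) combined with Mono.defer as the lazy, completion-based one, which are not the exact overloads used in the snippets above. The process method is a hypothetical stand-in for myMethod.process.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Mono;

class EmptyMonoContinuation {

    // Hypothetical processing step: completes without ever emitting a value.
    static Mono<Void> process(String name, List<String> log) {
        return Mono.fromRunnable(() -> log.add("processed " + name));
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Value-based continuation: the lambda needs an element,
        // but a Mono<Void> never emits one, so "b" is never processed.
        process("a", log)
            .flatMap(ignored -> process("b", log))
            .subscribe();

        log.add("---");

        // Completion-based continuation: runs once the source completes.
        // Mono.defer postpones the call to process("c", ...) until then.
        process("a", log)
            .then(Mono.defer(() -> process("c", log)))
            .subscribe();

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Only the second chain should reach its continuation, because it reacts to completion rather than to a value.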
9.3. How to use retryWhen to emulate retry(3)?
The retryWhen operator can be quite complex. Hopefully, this snippet of code can help you understand how it works by attempting to emulate a simpler retry(3):
Flux<String> flux =
    Flux.<String>error(new IllegalArgumentException())
        .retryWhen(companion -> companion
            .zipWith(Flux.range(1, 4), (1)
                (error, index) -> { (2)
                    if (index < 4) return index; (3)
                    else throw Exceptions.propagate(error); (4)
                })
        );
1 | Trick one: use zip and a range of "number of acceptable retries + 1"… |
2 | The zip function lets you count the retries while keeping track of the original error. |
3 | To allow for 3 retries, indexes before 4 return a value to emit… |
4 | …but in order to terminate the sequence in error, we throw the original exception after these 3 retries. |
9.4. How to use retryWhen for exponential backoff?
Exponential backoff produces retry attempts with a growing delay between each of the attempts, so as not to overload the source systems and risk an all-out crash. The rationale is that if the source produces an error, it is already in an unstable state and is not likely to immediately recover from it. So blindly retrying immediately is likely to produce yet another error and add to the instability.
Here is how to implement a backoff that delays retries and increases the delay between each attempt. In this simple example, delay == attempt number * 100 milliseconds, so the delay grows by a constant step rather than doubling:
Flux<String> flux =
    Flux.<String>error(new IllegalArgumentException())
        .retryWhen(companion -> companion
            .doOnNext(s -> System.out.println(s + " at " + LocalTime.now())) (1)
            .zipWith(Flux.range(1, 4), (error, index) -> { (2)
                if (index < 4) return index;
                else throw Exceptions.propagate(error);
            })
            .flatMap(index -> Mono.delay(Duration.ofMillis(index * 100))) (3)
            .doOnNext(s -> System.out.println("retried at " + LocalTime.now())) (4)
        );
1 | We log the time of errors |
2 | We use the retryWhen + zipWith trick to propagate the error after 3 retries. |
3 | Through flatMap , we cause a delay that depends on the attempt’s index. |
4 | We also log the time at which the retry effectively occurs. |
When subscribed to, this fails and terminates after printing out:
java.lang.IllegalArgumentException at 18:02:29.338
retried at 18:02:29.459 (1)
java.lang.IllegalArgumentException at 18:02:29.460
retried at 18:02:29.663 (2)
java.lang.IllegalArgumentException at 18:02:29.663
retried at 18:02:29.964 (3)
java.lang.IllegalArgumentException at 18:02:29.964
1 | first retry after about 100ms |
2 | second retry after about 200ms |
3 | third retry after about 300ms |
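The delays observed above (about 100ms, 200ms and 300ms) follow from the attempt number * 100 formula, which grows by a constant step. As a sketch of the arithmetic only, using plain JDK types, the following compares that formula with a doubling schedule. The doublingDelay method and its 100ms base are hypothetical choices for illustration and do not appear in the snippet above.

```java
import java.time.Duration;

class BackoffDelays {

    // Delay used by the snippet above: attempt number * 100 ms.
    static Duration linearDelay(int attempt) {
        return Duration.ofMillis(attempt * 100L);
    }

    // Hypothetical alternative: start at 100 ms and double on every attempt.
    static Duration doublingDelay(int attempt) {
        return Duration.ofMillis(100L << (attempt - 1));
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 3; attempt++) {
            System.out.println("attempt " + attempt
                + ": linear=" + linearDelay(attempt).toMillis() + "ms"
                + ", doubling=" + doublingDelay(attempt).toMillis() + "ms");
        }
    }
}
```

Either method could feed the flatMap step, for instance Mono.delay(doublingDelay(index)), if a steeper growth between attempts is wanted.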