Programmatically creating a sequence
In this section, we introduce the creation of a Flux
or a Mono
by
programmatically defining its associated events (onNext
, onError
, and
onComplete
). All these methods share the fact that they expose an API to
trigger the events that we call a sink. There are actually a few sink
variants, which we’ll get to shortly.
1. Synchronous generate
The simplest form of programmatic creation of a Flux
is through the generate
method, which takes a generator function.
This is for synchronous and one-by-one emissions, meaning that
the sink is a SynchronousSink
and that its next()
method can only be called
at most once per callback invocation. You can then additionally call error(Throwable)
or complete()
, but this is optional.
The most useful variant is probably the one that also lets you keep a state
that you can refer to in your sink usage to decide what to emit next. The generator
function then becomes a BiFunction<S, SynchronousSink<T>, S>
, with <S>
the
type of the state object. You have to provide a Supplier<S>
for the initial
state, and your generator function now returns a new state on each round.
For instance, you could use an int
as the state:
generate
Flux<String> flux = Flux.generate(
() -> 0, (1)
(state, sink) -> {
sink.next("3 x " + state + " = " + 3*state); (2)
if (state == 10) sink.complete(); (3)
return state + 1; (4)
});
1 | We supply the initial state value of 0. |
2 | We use the state to choose what to emit (a row in the multiplication table of 3). |
3 | We also use it to choose when to stop. |
4 | We return a new state that we use in the next invocation (unless the sequence terminated in this one). |
The preceding code generates the table of 3, as the following sequence:
3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30
You can also use a mutable <S>
. The example above could for instance be
rewritten using a single AtomicLong
as the state, mutating it on each round:
Flux<String> flux = Flux.generate(
AtomicLong::new, (1)
(state, sink) -> {
long i = state.getAndIncrement(); (2)
sink.next("3 x " + i + " = " + 3*i);
if (i == 10) sink.complete();
return state; (3)
});
1 | This time, we generate a mutable object as the state. |
2 | We mutate the state here. |
3 | We return the same instance as the new state. |
If your state object needs to clean up some resources, use the
generate(Supplier<S>, BiFunction, Consumer<S>) variant to clean up the last
state instance.
|
The following example uses the generate
method that includes a Consumer
:
Flux<String> flux = Flux.generate(
AtomicLong::new,
(state, sink) -> { (1)
long i = state.getAndIncrement(); (2)
sink.next("3 x " + i + " = " + 3*i);
if (i == 10) sink.complete();
return state; (3)
}, (state) -> System.out.println("state: " + state)); (4)
1 | Again, we generate a mutable object as the state. |
2 | We mutate the state here. |
3 | We return the same instance as the new state. |
4 | We see the last state value (11) as the output of this Consumer lambda. |
In the case of the state containing a database connection or other resource
that needs to be handled at the end of the process, the Consumer
lambda could
close the connection or otherwise handle any tasks that should be done at the
end of the process.
2. Asynchronous and Multi-threaded: create
create
is a more advanced form of programmatic creation of a Flux
which is
suitable for multiple emissions per round, even from multiple threads.
It exposes a FluxSink
, with its next
, error
, and complete
methods.
Contrary to generate
, it doesn’t have a state-based variant. On the other
hand, it can trigger multi-threaded events in the callback.
create can be very useful to bridge an existing API with the reactive
world - such as an asynchronous API based on listeners.
|
create doesn’t parallelize your code nor does it make it asynchronous, even
though it can be used with asynchronous APIs. If you block within the create lambda,
you expose yourself to deadlocks and similar side effects. Even with the use of subscribeOn ,
there’s the caveat that a long-blocking create lambda (such as an infinite loop calling
sink.next(t) ) can lock the pipeline: the requests would never be performed due to the
loop starving the same thread they are supposed to run from. Use the subscribeOn(Scheduler, false)
variant: requestOnSeparateThread = false will use the Scheduler thread for the create
and still let data flow by performing request in the original thread.
|
Imagine that you use a listener-based API. It processes data by chunks
and has two events: (1) a chunk of data is ready and (2) the processing is
complete (terminal event), as represented in the MyEventListener
interface:
interface MyEventListener<T> {
void onDataChunk(List<T> chunk);
void processComplete();
}
You can use create
to bridge this into a Flux<T>
:
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register( (4)
new MyEventListener<String>() { (1)
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s); (2)
}
}
public void processComplete() {
sink.complete(); (3)
}
});
});
1 | Bridge to the MyEventListener API |
2 | Each element in a chunk becomes an element in the Flux . |
3 | The processComplete event is translated to onComplete . |
4 | All of this is done asynchronously whenever the myEventProcessor executes. |
Additionally, since create
can bridge asynchronous APIs and manages backpressure, you
can refine how to behave backpressure-wise, by indicating an OverflowStrategy
:
-
IGNORE
to Completely ignore downstream backpressure requests. This may yieldIllegalStateException
when queues get full downstream. -
ERROR
to signal anIllegalStateException
when the downstream can’t keep up. -
DROP
to drop the incoming signal if the downstream is not ready to receive it. -
LATEST
to let downstream only get the latest signals from upstream. -
BUFFER
(the default) to buffer all signals if the downstream can’t keep up. (this does unbounded buffering and may lead toOutOfMemoryError
).
Mono also has a create generator. The MonoSink of Mono’s create
doesn’t allow several emissions. It will drop all signals after the first one.
|
3. Asynchronous but single-threaded: push
push
is a middle ground between generate
and create
which is suitable for
processing events from a single producer. It is similar to create
in the sense
that it can also be asynchronous and can manage backpressure using any of the
overflow strategies supported by create
. However, only one producing thread
may invoke next
, complete
or error
at a time.
Flux<String> bridge = Flux.push(sink -> {
myEventProcessor.register(
new SingleThreadEventListener<String>() { (1)
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s); (2)
}
}
public void processComplete() {
sink.complete(); (3)
}
public void processError(Throwable e) {
sink.error(e); (4)
}
});
});
1 | Bridge to the SingleThreadEventListener API. |
2 | Events are pushed to the sink using next from a single listener thread. |
3 | complete event generated from the same listener thread. |
4 | error event also generated from the same listener thread. |
3.1. A hybrid push/pull model
Most Reactor operators, like create
, follow a hybrid push/pull model.
What we mean by that is that despite most of the processing being asynchronous
(suggesting a push approach), there is a small pull component to it: the
request.
The consumer pulls data from the source in the sense that it won’t emit anything until first requested. The source pushes data to the consumer whenever it becomes available, but within the bounds of its requested amount.
Note that push()
and create()
both allow to set up an onRequest
consumer
in order to manage the request amount and to ensure that data is pushed through
the sink only when there is pending request.
Flux<String> bridge = Flux.create(sink -> {
myMessageProcessor.register(
new MyMessageListener<String>() {
public void onMessage(List<String> messages) {
for(String s : messages) {
sink.next(s); (3)
}
}
});
sink.onRequest(n -> {
List<String> messages = myMessageProcessor.getHistory(n); (1)
for(String s : messages) {
sink.next(s); (2)
}
});
});
1 | Poll for messages when requests are made. |
2 | If messages are available immediately, push them to the sink. |
3 | The remaining messages that arrive asynchronously later are also delivered. |
3.2. Cleaning up after push()
or create()
Two callbacks, onDispose
and onCancel
, perform any cleanup on cancellation
or termination. onDispose
can be used to perform cleanup when the Flux
completes, errors out, or is cancelled. onCancel
can be used to perform any
action specific to cancellation prior to cleanup with onDispose
.
Flux<String> bridge = Flux.create(sink -> {
sink.onRequest(n -> channel.poll(n))
.onCancel(() -> channel.cancel()) (1)
.onDispose(() -> channel.close()) (2)
});
1 | onCancel is invoked first, for cancel signal only. |
2 | onDispose is invoked for complete, error, or cancel signals. |
4. Handle
The handle
method is a bit different: it is an instance method, meaning that
it is chained on an existing source (as are the common operators). It is present
in both Mono
and Flux
.
It is close to generate
, in the sense that it uses a SynchronousSink
and
only allows one-by-one emissions. However, handle
can be used to generate an
arbitrary value out of each source element, possibly skipping some elements. In
this way, it can serve as a combination of map
and filter
. The signature of
handle is as follows:
Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);
Let’s consider an example. The reactive streams specification disallows null
values in a sequence. What if you want to perform a map
but you want to use
a preexisting method as the map function, and that method sometimes returns null?
For instance, the following method can be applied safely to a source of integers:
public String alphabet(int letterNumber) {
if (letterNumber < 1 || letterNumber > 26) {
return null;
}
int letterIndexAscii = 'A' + letterNumber - 1;
return "" + (char) letterIndexAscii;
}
We can then use handle
to remove any nulls:
handle
for a "map and eliminate nulls" scenarioFlux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
.handle((i, sink) -> {
String letter = alphabet(i); (1)
if (letter != null) (2)
sink.next(letter); (3)
});
alphabet.subscribe(System.out::println);
1 | Map to letters. |
2 | If the "map function" returns null…. |
3 | Filter it out by not calling sink.next . |
Which will print out:
M
I
T