Simple Ways to Create a Flux or Mono and Subscribe to It
The easiest way to get started with Flux and Mono is to use one of the numerous
factory methods found in their respective classes.
For instance, to create a sequence of String, you can either enumerate them or put them
in a collection and create the Flux from it, as follows:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
Other examples of factory methods include the following:
Mono<String> noData = Mono.empty(); (1)
Mono<String> data = Mono.just("foo");
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); (2)
1 | Notice the factory method honors the generic type even though it has no value. |
2 | The first parameter is the start of the range, while the second parameter is the number of items to produce. |
When it comes to subscribing, Flux and Mono make use of Java 8 lambdas. You
have a wide choice of .subscribe() variants that take lambdas for different
combinations of callbacks, as shown in the following method signatures:
Flux
subscribe(); (1)
subscribe(Consumer<? super T> consumer); (2)
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); (3)
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); (4)
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); (5)
1 | Subscribe and trigger the sequence. |
2 | Do something with each produced value. |
3 | Deal with values but also react to an error. |
4 | Deal with values and errors but also run some code when the sequence successfully completes. |
5 | Deal with values and errors and successful completion but also do something with the
Subscription produced by this subscribe call. |
These variants return a reference to the subscription that you can use to cancel the
subscription when no more data is needed. Upon cancellation, the source should stop
producing values and clean up any resources it created. This cancel-and-clean-up behavior
is represented in Reactor by the general-purpose Disposable interface.
1. subscribe Method Examples
This section contains minimal examples of the first four signatures of the subscribe
method. The following code shows an example of the basic method with no arguments:
Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(); (2)
1 | Set up a Flux that produces three values when a subscriber attaches. |
2 | Subscribe in the simplest way. |
The preceding code produces no visible output, but it does work. The Flux produces
three values. If we provide a lambda, we can make the values visible. The next example
for the subscribe method shows one way to make the values appear:
Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(i -> System.out.println(i)); (2)
1 | Set up a Flux that produces three values when a subscriber attaches. |
2 | Subscribe with a subscriber that will print the values. |
The preceding code produces the following output:
1
2
3
To demonstrate the next signature, we intentionally introduce an error, as shown in the following example:
Flux<Integer> ints = Flux.range(1, 4) (1)
      .map(i -> { (2)
        if (i <= 3) return i; (3)
        throw new RuntimeException("Got to 4"); (4)
      });
ints.subscribe(i -> System.out.println(i), (5)
      error -> System.err.println("Error: " + error));
1 | Set up a Flux that produces four values when a subscriber attaches. |
2 | We need a map so that we can handle some values differently. |
3 | For most values, return the value. |
4 | For one value, force an error. |
5 | Subscribe with a subscriber that includes an error handler. |
We now have two lambda expressions: one for the content we expect and one for errors. The preceding code produces the following output:
1
2
3
Error: java.lang.RuntimeException: Got to 4
The next signature of the subscribe method includes both an error handler and
a handler for completion events, as shown in the following example:
Flux<Integer> ints = Flux.range(1, 4); (1)
ints.subscribe(i -> System.out.println(i),
      error -> System.err.println("Error " + error),
      () -> System.out.println("Done")); (2)
1 | Set up a Flux that produces four values when a subscriber attaches. |
2 | Subscribe with a Subscriber that includes a handler for completion events. |
Error signals and completion signals are both terminal events and are exclusive of one another (you never get both). To make the completion consumer work, we must take care not to trigger an error.
The completion callback has no input, as represented by an empty pair of
parentheses: It matches the run method in the Runnable interface. The preceding code
produces the following output:
1
2
3
4
Done
2. Cancelling a subscribe() with Its Disposable
All these lambda-based variants of subscribe() have a Disposable return type.
In this case, the Disposable interface represents the fact that the subscription
can be cancelled, by calling its dispose() method.
For a Flux or Mono, cancellation is a signal that the source should stop
producing elements. However, it is NOT guaranteed to be immediate: Some sources
might produce elements so fast that they could complete even before receiving the
cancel instruction.
Some utilities around Disposable are available in the Disposables class.
Among these, Disposables.swap() creates a Disposable wrapper that lets
you atomically cancel and replace a concrete Disposable. This can be useful,
for instance, in a UI scenario where you want to cancel a request and replace it
with a new one whenever the user clicks on a button. Disposing the wrapper itself
closes it. Doing so disposes the current concrete value and all future attempted replacements.
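The swap contract described above can be modelled in plain Java, without Reactor on the classpath. The following sketch is not Reactor's implementation: SwapSlot and its update and dispose methods are hypothetical names, and a Runnable stands in for a concrete Disposable. It only illustrates the two rules stated above: replacing a value disposes the previous one, and once the wrapper is closed, every later replacement is disposed on arrival.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of a swap container; a Runnable stands in for a Disposable. */
final class SwapSlot {
    // Sentinel that marks the slot as closed.
    private static final Runnable CLOSED = () -> { };

    private final AtomicReference<Runnable> current = new AtomicReference<>();

    /** Installs next and disposes the value it replaces; returns false if the slot is closed. */
    boolean update(Runnable next) {
        for (;;) {
            Runnable previous = current.get();
            if (previous == CLOSED) {
                next.run();          // closed: dispose the newcomer immediately
                return false;
            }
            if (current.compareAndSet(previous, next)) {
                if (previous != null) {
                    previous.run();  // dispose the value that was replaced
                }
                return true;
            }
        }
    }

    /** Closes the slot and disposes whatever it currently holds. */
    void dispose() {
        Runnable previous = current.getAndSet(CLOSED);
        if (previous != null && previous != CLOSED) {
            previous.run();
        }
    }
}
```

In the button-click scenario, each click would call update with the handle of the new request, which cancels the request that was still in flight.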
Another interesting utility is Disposables.composite(…). This composite
lets you collect several Disposable instances (for instance, multiple in-flight requests
associated with a service call) and dispose all of them at once later on.
Once the composite’s dispose() method has been called, any attempt to add
another Disposable immediately disposes it.
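As a rough illustration of the composite contract, here is a plain-Java model that runs without Reactor. It is a sketch under stated assumptions, not Reactor's code: DisposableGroup, add, and dispose are hypothetical names, and a Runnable again stands in for a Disposable.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a composite container; a Runnable stands in for a Disposable. */
final class DisposableGroup {
    private final List<Runnable> members = new ArrayList<>();
    private boolean disposed;

    /** Collects a member, or disposes it right away if the group is already disposed. */
    boolean add(Runnable member) {
        synchronized (this) {
            if (!disposed) {
                members.add(member);
                return true;
            }
        }
        member.run();   // late arrival: disposed immediately
        return false;
    }

    /** Disposes every collected member; later calls do nothing. */
    void dispose() {
        List<Runnable> toDispose;
        synchronized (this) {
            if (disposed) {
                return;
            }
            disposed = true;
            toDispose = new ArrayList<>(members);
            members.clear();
        }
        toDispose.forEach(Runnable::run);
    }
}
```

Running the members outside the synchronized block is a design choice of this sketch: it avoids holding the lock while arbitrary clean-up code executes.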
3. An Alternative to Lambdas: BaseSubscriber
There is an additional subscribe method that is more generic and takes a full-blown
Subscriber rather than composing one out of lambdas. In order to help with writing
such a Subscriber, we provide an extendable class called BaseSubscriber.
Instances of BaseSubscriber (or subclasses of it) are single-use,
meaning that a BaseSubscriber cancels its subscription to the first Publisher if it
is subscribed to a second Publisher.
That is because using an instance twice would violate the Reactive Streams rule that
the onNext method of a Subscriber must not be called in parallel.
As a result, anonymous implementations are fine only if they are declared directly within
the call to Publisher#subscribe(Subscriber).
Now we can implement one of these. We call it a SampleSubscriber. The following
example shows how it would be attached to a Flux:
SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(ss);
The following example shows what SampleSubscriber could look like, as a minimalistic
implementation of a BaseSubscriber:
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

public class SampleSubscriber<T> extends BaseSubscriber<T> {
    @Override
    public void hookOnSubscribe(Subscription subscription) {
        System.out.println("Subscribed");
        request(1); // initial demand: ask for the first element only
    }
    @Override
    public void hookOnNext(T value) {
        System.out.println(value);
        request(1); // ask for the next element once this one is handled
    }
}
The SampleSubscriber class extends BaseSubscriber, which is the recommended abstract
class for user-defined Subscribers in Reactor. The class offers hooks that can be
overridden to tune the subscriber’s behavior. By default, it triggers an unbounded
request and behaves exactly as subscribe(). However, extending BaseSubscriber is
much more useful when you want a custom request amount.
For a custom request amount, the bare minimum is to implement
hookOnSubscribe(Subscription subscription) and hookOnNext(T value), as we did.
In our case, the hookOnSubscribe method prints a statement to standard out and makes
the first request. Then the hookOnNext method prints a statement and performs
additional requests, one request at a time.
The SampleSubscriber class produces the following output:
Subscribed
1
2
3
4
BaseSubscriber also offers a requestUnbounded() method to switch to unbounded mode
(equivalent to request(Long.MAX_VALUE)), as well as a cancel() method.
It also has additional hooks: hookOnComplete, hookOnError, hookOnCancel, and hookFinally
(which is always called when the sequence terminates, with the type of termination passed
in as a SignalType parameter).
You almost certainly want to implement the hookOnError, hookOnCancel, and
hookOnComplete methods. You may also want to implement the hookFinally method.
SampleSubscriber is the absolute minimum implementation of a Subscriber that performs
bounded requests.
4. On Backpressure and Ways to Reshape Requests
Reactor implements backpressure by propagating consumer pressure back to the source: the
consumer sends a request to the upstream operator.
The sum of current requests is sometimes referred to as the current “demand” or “pending request”.
Demand is capped at Long.MAX_VALUE, representing an unbounded request (meaning “produce as
fast as you can”, which basically disables backpressure).
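The capped accumulation of demand can be sketched in a few lines of plain Java. This is a minimal model under the assumption that request amounts are non-negative; Demand and addCapped are hypothetical names and not part of Reactor's API.

```java
/** Hypothetical helper modelling how pending demand accumulates with a cap. */
final class Demand {
    private Demand() { }

    /** Adds two non-negative request amounts, saturating at Long.MAX_VALUE. */
    static long addCapped(long current, long requested) {
        long sum = current + requested;
        // Two non-negative longs can only yield a negative sum when the addition overflows.
        return sum < 0 ? Long.MAX_VALUE : sum;
    }
}
```

With this rule, a pending demand of 3 followed by a request of 4 gives 7, while any request added to an already unbounded demand leaves it at Long.MAX_VALUE.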
The first request comes from the final subscriber at subscription time, yet the most direct
ways of subscribing all immediately trigger an unbounded request of Long.MAX_VALUE:
- subscribe() and most of its lambda-based variants (with the exception of the one that has a Consumer<Subscription>)
- block(), blockFirst(), and blockLast()
- iterating over a toIterable() or toStream()
The simplest way of customizing the original request is to subscribe with a BaseSubscriber
with the hookOnSubscribe method overridden, as the following example shows:
Flux.range(1, 10)
    .doOnRequest(r -> System.out.println("request of " + r))
    .subscribe(new BaseSubscriber<Integer>() {
      @Override
      public void hookOnSubscribe(Subscription subscription) {
        request(1);
      }
      @Override
      public void hookOnNext(Integer integer) {
        System.out.println("Cancelling after having received " + integer);
        cancel();
      }
    });
The preceding snippet prints out the following:
request of 1
Cancelling after having received 1
When manipulating a request, you must be careful to produce enough demand for
the sequence to advance, or your Flux can get “stuck”. That is why BaseSubscriber
defaults to an unbounded request in hookOnSubscribe. When overriding this hook, you should usually
call request at least once.
4.1. Operators that Change the Demand from Downstream
One thing to keep in mind is that demand expressed at the subscribe level can be reshaped by each operator in the upstream chain.
A textbook case is the buffer(N) operator: If it receives a request(2), it is interpreted
as a demand for two full buffers.
As a consequence, since buffers need N elements to be considered full, the buffer
operator reshapes the request to 2 x N.
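The reshaping arithmetic can be written down as a small plain-Java function. This is only a sketch of the calculation: BufferDemand and upstreamRequest are hypothetical names, and saturating the product at Long.MAX_VALUE is an assumption made here to stay consistent with the demand cap described earlier.

```java
/** Hypothetical helper modelling how a buffering stage reshapes demand. */
final class BufferDemand {
    private BufferDemand() { }

    /** Converts a downstream demand for full buffers into an upstream demand for elements. */
    static long upstreamRequest(long requestedBuffers, int bufferSize) {
        try {
            return Math.multiplyExact(requestedBuffers, (long) bufferSize);
        } catch (ArithmeticException overflow) {
            return Long.MAX_VALUE; // assumed: saturate instead of overflowing
        }
    }
}
```

For example, upstreamRequest(2, 5) yields 10: two buffers of five elements each.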
You might also have noticed that some operators have variants that take an int
input parameter called prefetch.
This is another category of operators that modify the downstream request.
These are usually operators that deal with inner sequences, deriving a Publisher
from each incoming element (like flatMap).
Prefetch is a way to tune the initial request made on these inner sequences.
If unspecified, most of these operators start with a demand of 32.
These operators usually also implement a replenishing optimization: Once the operator has seen 75% of the prefetch request fulfilled, it re-requests 75% from upstream. This is a heuristic optimization made so that these operators proactively anticipate the upcoming requests.
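To make the replenishing heuristic concrete, the following plain-Java simulation lists the upstream requests an operator would issue while its elements are consumed. It is a simplified model rather than Reactor's code: PrefetchModel and upstreamRequests are hypothetical names, and the 75% threshold is computed with integer arithmetic as prefetch minus a quarter of prefetch, which is an assumption about rounding.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of prefetch with a 75% replenishing threshold. */
final class PrefetchModel {
    private PrefetchModel() { }

    /** Returns the upstream requests issued while the given number of elements is consumed. */
    static List<Integer> upstreamRequests(int prefetch, int consumedElements) {
        int limit = prefetch - (prefetch >> 2);  // roughly 75% of prefetch, 24 for 32
        List<Integer> requests = new ArrayList<>();
        requests.add(prefetch);                  // initial request at subscription time
        int sinceLastRequest = 0;
        for (int i = 0; i < consumedElements; i++) {
            if (++sinceLastRequest == limit) {
                requests.add(limit);             // replenish once 75% has been fulfilled
                sinceLastRequest = 0;
            }
        }
        return requests;
    }
}
```

With the default prefetch of 32, consuming 100 elements in this model produces one request of 32 followed by four requests of 24.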
Finally, a couple of operators let you directly tune the request: limitRate and limitRequest.
limitRate(N) splits the downstream requests so that they are propagated upstream in smaller batches.
For instance, a request of 100 made to limitRate(10) would result in, at most, 10 requests of 10
being propagated to the upstream.
Note that, in this form, limitRate actually implements the replenishing optimization discussed earlier.
The operator has a variant that also lets you tune the replenishing amount (referred to as the
lowTide in the variant): limitRate(highTide, lowTide).
Choosing a lowTide of 0 results in strict batches of highTide requests, instead of batches
further reworked by the replenishing strategy.
limitRequest(N), on the other hand, caps the downstream request to a maximum total demand.
It adds up requests up to N. If a single request does not make the total demand overflow over N,
that particular request is wholly propagated upstream.
After that amount has been emitted by the source, limitRequest considers the sequence complete,
sends an onComplete signal downstream, and cancels the source.
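The capping rule can be modelled with a small plain-Java class. This is a sketch, not Reactor's implementation: RequestCap and propagate are hypothetical names, and the behavior for a request that would exceed the cap (only the remainder is propagated) is an assumption that follows from the idea of a maximum total demand.

```java
/** Hypothetical model of a total-demand cap. */
final class RequestCap {
    private long remaining;

    RequestCap(long cap) {
        this.remaining = cap;
    }

    /** Returns the part of a downstream request that may be propagated upstream. */
    long propagate(long requested) {
        long allowed = Math.min(requested, remaining);
        remaining -= allowed;
        return allowed;
    }
}
```

With a cap of 10, three successive requests of 4 would be propagated as 4, 4, and 2, and any further request would propagate nothing.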