Simple Ways to Create a Flux or Mono and Subscribe to It

The easiest way to get started with Flux and Mono is to use one of the numerous factory methods found in their respective classes.

For instance, to create a sequence of String, you can either enumerate them or put them in a collection and create the Flux from it, as follows:

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);

Other examples of factory methods include the following:

Mono<String> noData = Mono.empty(); (1)

Mono<String> data = Mono.just("foo");

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); (2)
1 Notice the factory method honors the generic type even though it has no value.
2 The first parameter is the start of the range, while the second parameter is the number of items to produce.

When it comes to subscribing, Flux and Mono make use of Java 8 lambdas. You have a wide choice of .subscribe() variants that take lambdas for different combinations of callbacks, as shown in the following method signatures:

Lambda-based subscribe variants for Flux
subscribe(); (1)

subscribe(Consumer<? super T> consumer); (2)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); (3)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); (4)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); (5)
1 Subscribe and trigger the sequence.
2 Do something with each produced value.
3 Deal with values but also react to an error.
4 Deal with values and errors but also run some code when the sequence successfully completes.
5 Deal with values and errors and successful completion but also do something with the Subscription produced by this subscribe call.

These variants return a reference to the subscription that you can use to cancel the subscription when no more data is needed. Upon cancellation, the source should stop producing values and clean up any resources it created. This cancel-and-clean-up behavior is represented in Reactor by the general-purpose Disposable interface.

1. subscribe Method Examples

This section contains minimal examples of the first four of the five signatures for the subscribe method. The following code shows an example of the basic method with no arguments:

Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(); (2)
1 Set up a Flux that produces three values when a subscriber attaches.
2 Subscribe in the simplest way.

The preceding code produces no visible output, but it does work. The Flux produces three values. If we provide a lambda, we can make the values visible. The next example for the subscribe method shows one way to make the values appear:

Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(i -> System.out.println(i)); (2)
1 Set up a Flux that produces three values when a subscriber attaches.
2 Subscribe with a subscriber that will print the values.

The preceding code produces the following output:

1
2
3

To demonstrate the next signature, we intentionally introduce an error, as shown in the following example:

Flux<Integer> ints = Flux.range(1, 4) (1)
      .map(i -> { (2)
        if (i <= 3) return i; (3)
        throw new RuntimeException("Got to 4"); (4)
      });
ints.subscribe(i -> System.out.println(i), (5)
      error -> System.err.println("Error: " + error));
1 Set up a Flux that produces four values when a subscriber attaches.
2 We need a map so that we can handle some values differently.
3 For most values, return the value.
4 For one value, force an error.
5 Subscribe with a subscriber that includes an error handler.

We now have two lambda expressions: one for the content we expect and one for errors. The preceding code produces the following output:

1
2
3
Error: java.lang.RuntimeException: Got to 4

The next signature of the subscribe method includes both an error handler and a handler for completion events, as shown in the following example:

Flux<Integer> ints = Flux.range(1, 4); (1)
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> System.out.println("Done")); (2)
1 Set up a Flux that produces four values when a subscriber attaches.
2 Subscribe with a Subscriber that includes a handler for completion events.

Error signals and completion signals are both terminal events and are exclusive of one another (you never get both). To make the completion consumer work, we must take care not to trigger an error.

The completion callback has no input, as represented by an empty pair of parentheses: It matches the run method in the Runnable interface. The preceding code produces the following output:

1
2
3
4
Done

2. Cancelling a subscribe() with Its Disposable

All these lambda-based variants of subscribe() have a Disposable return type. In this case, the Disposable interface represents the fact that the subscription can be cancelled, by calling its dispose() method.

For a Flux or Mono, cancellation is a signal that the source should stop producing elements. However, it is NOT guaranteed to be immediate: Some sources might produce elements so fast that they could complete even before receiving the cancel instruction.
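As a minimal sketch of cancelling through the returned Disposable, the following example subscribes to a source that never emits and then disposes the subscription. It assumes reactor-core is on the classpath. The class name DisposeExample and its method are hypothetical and exist only for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;

// Hypothetical illustration class, not part of Reactor.
final class DisposeExample {

    // Subscribes to a source that never emits, then cancels it.
    // Returns true if the source saw the cancel signal and the
    // Disposable reports itself as disposed.
    static boolean cancelNeverEndingSource() {
        AtomicBoolean cancelled = new AtomicBoolean(false);

        Disposable disposable = Flux.<Integer>never()
                .doOnCancel(() -> cancelled.set(true)) // invoked when cancel travels upstream
                .subscribe(i -> System.out.println(i));

        disposable.dispose(); // cancels the underlying Subscription

        return cancelled.get() && disposable.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println("cancelled: " + cancelNeverEndingSource());
    }
}
```

A never-emitting source is used here so that the outcome does not depend on timing. With a fast source, the sequence could complete before dispose() is called.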

Some utilities around Disposable are available in the Disposables class. Among these, Disposables.swap() creates a Disposable wrapper that lets you atomically cancel and replace a concrete Disposable. This can be useful, for instance, in a UI scenario where you want to cancel a request and replace it with a new one whenever the user clicks on a button. Disposing the wrapper itself closes it. Doing so disposes the current concrete value and all future attempted replacements.
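The swap behavior described above can be sketched as follows. The example uses Disposables.single() to create plain Disposable instances that stand in for real requests. The class name SwapExample is hypothetical, and reactor-core is assumed to be on the classpath.

```java
import reactor.core.Disposable;
import reactor.core.Disposables;

// Hypothetical illustration class, not part of Reactor.
final class SwapExample {

    // Returns, in order:
    // [0] whether "first" is disposed after being replaced through update()
    // [1] whether "second" is disposed while it is still the current value
    // [2] whether "second" is disposed after the wrapper itself is disposed
    // [3] whether "third" is disposed after being offered to the closed wrapper
    static boolean[] run() {
        Disposable.Swap current = Disposables.swap();

        Disposable first = Disposables.single();
        Disposable second = Disposables.single();
        Disposable third = Disposables.single();

        current.update(first);  // the wrapper now holds "first"
        current.update(second); // disposes "first" and holds "second"
        boolean firstDisposed = first.isDisposed();
        boolean secondDisposedEarly = second.isDisposed();

        current.dispose();      // closes the wrapper and disposes "second"
        boolean secondDisposedLate = second.isDisposed();

        current.update(third);  // the wrapper is closed, so "third" is disposed
        boolean thirdDisposed = third.isDisposed();

        return new boolean[] {firstDisposed, secondDisposedEarly, secondDisposedLate, thirdDisposed};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```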

Another interesting utility is Disposables.composite(…). This composite lets you collect several Disposable instances (for instance, multiple in-flight requests associated with a service call) and dispose all of them at once later on. Once the composite’s dispose() method has been called, any attempt to add another Disposable immediately disposes it.
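A composite can be sketched in the same way. In this example, two placeholder Disposable instances stand in for in-flight requests, and a third is added after the composite has been disposed. The class name CompositeExample and the variable names are hypothetical, and reactor-core is assumed to be on the classpath.

```java
import reactor.core.Disposable;
import reactor.core.Disposables;

// Hypothetical illustration class, not part of Reactor.
final class CompositeExample {

    // Returns, in order:
    // [0] whether requestA is disposed after the composite is disposed
    // [1] whether requestB is disposed after the composite is disposed
    // [2] whether the composite accepted a Disposable added after disposal
    // [3] whether that late Disposable was disposed
    static boolean[] run() {
        Disposable.Composite inFlight = Disposables.composite();

        Disposable requestA = Disposables.single();
        Disposable requestB = Disposables.single();
        inFlight.add(requestA);
        inFlight.add(requestB);

        inFlight.dispose(); // disposes every Disposable collected so far

        Disposable late = Disposables.single();
        boolean accepted = inFlight.add(late); // too late: "late" is disposed instead

        return new boolean[] {requestA.isDisposed(), requestB.isDisposed(), accepted, late.isDisposed()};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```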

3. An Alternative to Lambdas: BaseSubscriber

There is an additional subscribe method that is more generic and takes a full-blown Subscriber rather than composing one out of lambdas. In order to help with writing such a Subscriber, we provide an extendable class called BaseSubscriber.

Instances of BaseSubscriber (or subclasses of it) are single-use, meaning that a BaseSubscriber cancels its subscription to the first Publisher if it is subscribed to a second Publisher. That is because using an instance twice would violate the Reactive Streams rule that the onNext method of a Subscriber must not be called in parallel. As a result, anonymous implementations are fine only if they are declared directly within the call to Publisher#subscribe(Subscriber).

Now we can implement one of these. We call it a SampleSubscriber. The following example shows how it would be attached to a Flux:

SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(ss);

The following example shows what SampleSubscriber could look like, as a minimalistic implementation of a BaseSubscriber:

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;

public class SampleSubscriber<T> extends BaseSubscriber<T> {

	@Override
	public void hookOnSubscribe(Subscription subscription) {
		System.out.println("Subscribed");
		request(1);
	}

	@Override
	public void hookOnNext(T value) {
		System.out.println(value);
		request(1);
	}
}

The SampleSubscriber class extends BaseSubscriber, which is the recommended abstract class for user-defined Subscribers in Reactor. The class offers hooks that can be overridden to tune the subscriber’s behavior. By default, it triggers an unbounded request and behaves exactly as subscribe(). However, extending BaseSubscriber is much more useful when you want a custom request amount.

For a custom request amount, the bare minimum is to implement hookOnSubscribe(Subscription subscription) and hookOnNext(T value), as we did. In our case, the hookOnSubscribe method prints a statement to standard out and makes the first request. Then the hookOnNext method prints a statement and performs additional requests, one request at a time.

The SampleSubscriber class produces the following output:

Subscribed
1
2
3
4

BaseSubscriber also offers a requestUnbounded() method to switch to unbounded mode (equivalent to request(Long.MAX_VALUE)), as well as a cancel() method.

It also has additional hooks: hookOnComplete, hookOnError, hookOnCancel, and hookFinally (which is always called when the sequence terminates, with the type of termination passed in as a SignalType parameter).

You almost certainly want to implement the hookOnError, hookOnCancel, and hookOnComplete methods. You may also want to implement the hookFinally method. SampleSubscriber is the absolute minimum implementation of a Subscriber that performs bounded requests.
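As one possible sketch of a more complete subscriber, the following class overrides every hook mentioned above and records what it sees in a list. The class name RecordingSubscriber and the recorded strings are hypothetical. The example assumes reactor-core and reactive-streams are on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

// Hypothetical illustration class, not part of Reactor.
final class RecordingSubscriber<T> extends BaseSubscriber<T> {

    final List<String> events = new ArrayList<>();

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        events.add("subscribed");
        request(1); // bounded demand: ask for the first element only
    }

    @Override
    protected void hookOnNext(T value) {
        events.add("next " + value);
        request(1); // ask for one more element
    }

    @Override
    protected void hookOnComplete() {
        events.add("complete");
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        events.add("error " + throwable.getMessage());
    }

    @Override
    protected void hookOnCancel() {
        events.add("cancel");
    }

    @Override
    protected void hookFinally(SignalType type) {
        // name() is used so that the recorded text is the enum constant name
        events.add("finally " + type.name());
    }

    // Subscribes a fresh instance to a two-element Flux and returns what it recorded.
    static List<String> run() {
        RecordingSubscriber<Integer> subscriber = new RecordingSubscriber<>();
        Flux.range(1, 2).subscribe(subscriber);
        return subscriber.events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Recording into a list rather than printing directly is only a convenience for inspecting the order of the hooks. A real subscriber would typically release resources or log in these methods.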

4. On Backpressure and Ways to Reshape Requests

In Reactor, consumer pressure is propagated back to the source by sending a request to the upstream operator. The sum of current requests is sometimes referred to as the current “demand”, or “pending request”. Demand is capped at Long.MAX_VALUE, representing an unbounded request (meaning “produce as fast as you can”, which basically disables backpressure).

The first request comes from the final subscriber at subscription time, yet the most direct ways of subscribing all immediately trigger an unbounded request of Long.MAX_VALUE:

  • subscribe() and most of its lambda-based variants (with the exception of the one that has a Consumer<Subscription>)

  • block(), blockFirst() and blockLast()

  • iterating over a toIterable() or toStream()
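The unbounded initial request can be observed with doOnRequest, as in the following minimal sketch. The class name UnboundedRequestExample is hypothetical, and reactor-core is assumed to be on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical illustration class, not part of Reactor.
final class UnboundedRequestExample {

    // Records every request amount that reaches the source
    // when subscribing with the no-argument subscribe().
    static List<Long> observedRequests() {
        List<Long> requests = new ArrayList<>();

        Flux.range(1, 3)
            .doOnRequest(n -> requests.add(n)) // called with each requested amount
            .subscribe();                      // no lambdas: triggers an unbounded request

        return requests;
    }

    public static void main(String[] args) {
        List<Long> requests = observedRequests();
        System.out.println("unbounded: " + requests.contains(Long.MAX_VALUE));
    }
}
```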

The simplest way of customizing the original request is to subscribe with a BaseSubscriber with the hookOnSubscribe method overridden, as the following example shows:

Flux.range(1, 10)
    .doOnRequest(r -> System.out.println("request of " + r))
    .subscribe(new BaseSubscriber<Integer>() {

      @Override
      public void hookOnSubscribe(Subscription subscription) {
        request(1);
      }

      @Override
      public void hookOnNext(Integer integer) {
        System.out.println("Cancelling after having received " + integer);
        cancel();
      }
    });

The preceding snippet prints out the following:

request of 1
Cancelling after having received 1

When manipulating a request, you must be careful to produce enough demand for the sequence to advance, or your Flux can get “stuck”. That is why BaseSubscriber defaults to an unbounded request in hookOnSubscribe. When overriding this hook, you should usually call request at least once.

4.1. Operators that Change the Demand from Downstream

One thing to keep in mind is that demand expressed at the subscribe level can be reshaped by each operator in the upstream chain. A textbook case is the buffer(N) operator: If it receives a request(2), it is interpreted as a demand for two full buffers. As a consequence, since buffers need N elements to be considered full, the buffer operator reshapes the request to 2 x N.
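The reshaping performed by buffer(N) can be sketched as follows, with N set to 3 and a downstream request of 2. The class name BufferDemandExample is hypothetical, and reactor-core and reactive-streams are assumed to be on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical illustration class, not part of Reactor.
final class BufferDemandExample {

    // Requests two buffers of three elements and reports both the demand
    // seen above buffer(3) and the buffers delivered downstream.
    static String run() {
        List<Long> upstreamRequests = new ArrayList<>();
        List<List<Integer>> buffers = new ArrayList<>();

        Flux.range(1, 10)
            .doOnRequest(n -> upstreamRequests.add(n)) // demand as seen upstream of buffer
            .buffer(3)
            .subscribe(new BaseSubscriber<List<Integer>>() {

                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(2); // ask for two full buffers, then stop requesting
                }

                @Override
                protected void hookOnNext(List<Integer> buffer) {
                    buffers.add(buffer);
                }
            });

        return "upstream=" + upstreamRequests + " buffers=" + buffers;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the subscriber never requests again, the remaining elements of the range are not emitted, which also illustrates how a sequence can get stuck when demand runs out.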

You might also have noticed that some operators have variants that take an int input parameter called prefetch. This is another category of operators that modify the downstream request. These are usually operators that deal with inner sequences, deriving a Publisher from each incoming element (like flatMap).

Prefetch is a way to tune the initial request made on these inner sequences. If unspecified, most of these operators start with a demand of 32.

These operators usually also implement a replenishing optimization: Once the operator has seen 75% of the prefetch request fulfilled, it re-requests 75% of the prefetch amount from upstream. This is a heuristic optimization made so that these operators proactively anticipate the upcoming requests.
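As a rough sketch of how prefetch shows up in practice, the following example uses the flatMap variant that takes a concurrency and a prefetch argument, and records the requests made on the inner sequences. The class name PrefetchExample and the chosen values (concurrency of 1, prefetch of 4) are assumptions made for illustration. The example assumes reactor-core is on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical illustration class, not part of Reactor.
final class PrefetchExample {

    // Records the request amounts that flatMap sends to its inner sequences
    // when configured with a prefetch of 4.
    static List<Long> innerRequests() {
        List<Long> requests = new ArrayList<>();

        Flux.range(1, 2)
            .flatMap(i -> Flux.range(1, 20)
                              .hide() // keeps the inner sequence as a plain, non-fused source
                              .doOnRequest(n -> requests.add(n)),
                     1,  // concurrency: one inner sequence at a time
                     4)  // prefetch: initial request made on each inner sequence
            .subscribe();

        return requests;
    }

    public static void main(String[] args) {
        System.out.println("inner requests: " + innerRequests());
    }
}
```

The first recorded amount corresponds to the prefetch. The later, smaller amounts come from the replenishing strategy, whose exact batch size is an implementation detail of the operator.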

Finally, a couple of operators let you directly tune the request: limitRate and limitRequest.

limitRate(N) splits the downstream requests so that they are propagated upstream in smaller batches. For instance, a request of 100 made to limitRate(10) would result in, at most, 10 requests of 10 being propagated to the upstream. Note that, in this form, limitRate actually implements the replenishing optimization discussed earlier.

The operator has a variant that also lets you tune the replenishing amount (referred to as the lowTide in the variant): limitRate(highTide, lowTide). Choosing a lowTide of 0 results in strict batches of highTide requests, instead of batches further reworked by the replenishing strategy.
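Both forms of limitRate can be compared by recording the requests that reach the source, as in the following sketch. The class name LimitRateExample and the boolean flag are hypothetical, and reactor-core is assumed to be on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical illustration class, not part of Reactor.
final class LimitRateExample {

    // Records the request amounts that reach the source when an unbounded
    // subscriber sits behind limitRate. With strictBatches set to true,
    // the two-argument variant is used with a lowTide of 0.
    static List<Long> requestsSeenBySource(boolean strictBatches) {
        List<Long> requests = new ArrayList<>();

        Flux<Integer> source = Flux.range(1, 100)
                                   .doOnRequest(n -> requests.add(n));

        Flux<Integer> limited = strictBatches
                ? source.limitRate(10, 0) // strict batches of highTide
                : source.limitRate(10);   // batches reworked by the replenishing strategy

        limited.subscribe(); // downstream demand is unbounded

        return requests;
    }

    public static void main(String[] args) {
        System.out.println("limitRate(10):    " + requestsSeenBySource(false));
        System.out.println("limitRate(10, 0): " + requestsSeenBySource(true));
    }
}
```

In both cases the source never sees the unbounded downstream request directly. It only sees batches of at most 10.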

limitRequest(N), on the other hand, caps the downstream request to a maximum total demand. It adds up requests up to N. If a single request does not push the total demand past N, that request is propagated upstream in full. Once the source has emitted N elements, limitRequest considers the sequence complete, sends an onComplete signal downstream, and cancels the source.