Simple Ways to Create a Flux or Mono and Subscribe to It
The easiest way to get started with Flux and Mono is to use one of the numerous
factory methods found in their respective classes.
For instance, to create a sequence of String, you can either enumerate them or put them
in a collection and create the Flux from it, as follows:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
Other examples of factory methods include the following:
Mono<String> noData = Mono.empty(); (1)
Mono<String> data = Mono.just("foo");
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); (2)
| 1 | Notice the factory method honors the generic type even though it has no value. |
| 2 | The first parameter is the start of the range, while the second parameter is the number of items to produce. | 
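As a quick check of these factory methods, the following minimal sketch materializes their content. It assumes reactor-core is on the classpath. The class name FactoryMethodsSketch and its helper methods are made up for illustration, and collectList() and block() are used only as a convenient way to look at the values, not as a recommended subscription style.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical helper class, used only to illustrate the factory methods above.
class FactoryMethodsSketch {

    // Collects the elements of Flux.range(start, count) into a List.
    static List<Integer> rangeAsList(int start, int count) {
        return Flux.range(start, count).collectList().block();
    }

    // An empty Mono completes without a value, so block() returns null.
    static String blockOnEmpty() {
        return Mono.<String>empty().block();
    }

    public static void main(String[] args) {
        System.out.println(rangeAsList(5, 3)); // prints [5, 6, 7]
        System.out.println(blockOnEmpty());    // prints null
    }
}
```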
When it comes to subscribing, Flux and Mono make use of Java 8 lambdas. You
have a wide choice of .subscribe() variants that take lambdas for different
combinations of callbacks, as shown in the following method signatures:
subscribe(); (1)
subscribe(Consumer<? super T> consumer); (2)
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); (3)
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); (4)
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); (5)
| 1 | Subscribe and trigger the sequence. |
| 2 | Do something with each produced value. | 
| 3 | Deal with values but also react to an error. | 
| 4 | Deal with values and errors but also run some code when the sequence successfully completes. | 
| 5 | Deal with values and errors and successful completion but also do something with the Subscription produced by this subscribe call. |
| These variants return a reference to the subscription that you can use to cancel the
subscription when no more data is needed. Upon cancellation, the source should stop
producing values and clean up any resources it created. This cancel-and-clean-up behavior
is represented in Reactor by the general-purpose Disposable interface. |
1. subscribe Method Examples
This section contains minimal examples of each of the five signatures for the subscribe
method. The following code shows an example of the basic method with no arguments:
Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(); (2)
| 1 | Set up a Flux that produces three values when a subscriber attaches. |
| 2 | Subscribe in the simplest way. | 
The preceding code produces no visible output, but it does work. The Flux produces
three values. If we provide a lambda, we can make the values visible. The next example
for the subscribe method shows one way to make the values appear:
Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(i -> System.out.println(i)); (2)
| 1 | Set up a Flux that produces three values when a subscriber attaches. |
| 2 | Subscribe with a subscriber that will print the values. | 
The preceding code produces the following output:
1
2
3
To demonstrate the next signature, we intentionally introduce an error, as shown in the following example:
Flux<Integer> ints = Flux.range(1, 4) (1)
      .map(i -> { (2)
        if (i <= 3) return i; (3)
        throw new RuntimeException("Got to 4"); (4)
      });
ints.subscribe(i -> System.out.println(i), (5)
      error -> System.err.println("Error: " + error));
| 1 | Set up a Flux that produces four values when a subscriber attaches. |
| 2 | We need a map so that we can handle some values differently. | 
| 3 | For most values, return the value. | 
| 4 | For one value, force an error. | 
| 5 | Subscribe with a subscriber that includes an error handler. | 
We now have two lambda expressions: one for the content we expect and one for errors. The preceding code produces the following output:
1
2
3
Error: java.lang.RuntimeException: Got to 4
The next signature of the subscribe method includes both an error handler and
a handler for completion events, as shown in the following example:
Flux<Integer> ints = Flux.range(1, 4); (1)
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> System.out.println("Done")); (2)
| 1 | Set up a Flux that produces four values when a subscriber attaches. |
| 2 | Subscribe with a Subscriber that includes a handler for completion events. | 
Error signals and completion signals are both terminal events and are exclusive of one another (you never get both). To make the completion consumer work, we must take care not to trigger an error.
The completion callback has no input, as represented by an empty pair of
parentheses: It matches the run method in the Runnable interface. The preceding code
produces the following output:
1
2
3
4
Done
2. Cancelling a subscribe() with Its Disposable
All these lambda-based variants of subscribe() have a Disposable return type.
In this case, the Disposable interface represents the fact that the subscription
can be cancelled, by calling its dispose() method.
For a Flux or Mono, cancellation is a signal that the source should stop
producing elements. However, it is NOT guaranteed to be immediate: Some sources
might produce elements so fast that they could complete even before receiving the
cancel instruction.
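The following minimal sketch illustrates cancellation through the returned Disposable. It assumes reactor-core is on the classpath. The class name DisposeSketch is made up for illustration, and Flux.never() is chosen only because it never completes on its own, so the effect of dispose() is deterministic here.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;

// Hypothetical example class illustrating dispose() on a lambda-based subscription.
class DisposeSketch {

    // Returns { disposed before dispose(), disposed after dispose(), source saw the cancel signal }.
    static boolean[] run() {
        AtomicBoolean cancelSeen = new AtomicBoolean(false);

        // A source that never emits and never terminates, so only cancellation can end it.
        Disposable subscription = Flux.<Integer>never()
                .doOnCancel(() -> cancelSeen.set(true))
                .subscribe(i -> System.out.println(i));

        boolean before = subscription.isDisposed();
        subscription.dispose(); // propagates a cancel signal to the source
        boolean after = subscription.isDisposed();

        return new boolean[] { before, after, cancelSeen.get() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [false, true, true]
    }
}
```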
Some utilities around Disposable are available in the Disposables class.
Among these, Disposables.swap() creates a Disposable wrapper that lets
you atomically cancel and replace a concrete Disposable. This can be useful,
for instance, in a UI scenario where you want to cancel a request and replace it
with a new one whenever the user clicks on a button. Disposing the wrapper itself
closes it. Doing so disposes the current concrete value and all future attempted replacements.
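A minimal sketch of the swap behavior described above might look as follows. It assumes reactor-core is on the classpath. The class SwapSketch and the fakeRequest() helper are hypothetical: a subscription to Flux.never() stands in for an in-flight request.

```java
import java.util.Arrays;

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;

// Hypothetical example class illustrating Disposables.swap().
class SwapSketch {

    // Stand-in for an in-flight request: a subscription that stays active until disposed.
    static Disposable fakeRequest() {
        return Flux.never().subscribe();
    }

    static boolean[] run() {
        Disposable.Swap current = Disposables.swap();

        Disposable first = fakeRequest();
        Disposable second = fakeRequest();

        current.update(first);
        current.update(second); // disposes first and keeps second as the current value
        boolean firstDisposed = first.isDisposed();
        boolean secondActive = !second.isDisposed();

        current.dispose(); // closes the wrapper and disposes second

        Disposable late = fakeRequest();
        boolean accepted = current.update(late); // rejected: the wrapper is closed, late is disposed

        return new boolean[] {
                firstDisposed, secondActive, second.isDisposed(), accepted, late.isDisposed() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [true, true, true, false, true]
    }
}
```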
Another interesting utility is Disposables.composite(…). This composite
lets you collect several Disposable instances (for instance, multiple in-flight requests
associated with a service call) and dispose all of them at once later on.
Once the composite’s dispose() method has been called, any attempt to add
another Disposable immediately disposes it.
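The composite can be sketched in the same spirit. This is a minimal illustration that assumes reactor-core is on the classpath. The class CompositeSketch and the fakeRequest() helper are hypothetical names, with a subscription to Flux.never() again standing in for an in-flight request.

```java
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;

// Hypothetical example class illustrating Disposables.composite().
class CompositeSketch {

    // Stand-in for an in-flight request: a subscription that stays active until disposed.
    static Disposable fakeRequest() {
        return Flux.never().subscribe();
    }

    // Returns true if every step behaved as the text above describes.
    static boolean run() {
        Disposable.Composite inFlight = Disposables.composite();

        Disposable a = fakeRequest();
        Disposable b = fakeRequest();
        inFlight.add(a);
        inFlight.add(b);
        boolean twoTracked = inFlight.size() == 2;

        inFlight.dispose(); // disposes a and b at once
        boolean bothDisposed = a.isDisposed() && b.isDisposed();

        Disposable late = fakeRequest();
        boolean added = inFlight.add(late); // the composite is disposed, so late is disposed right away

        return twoTracked && bothDisposed && !added && late.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints true
    }
}
```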
3. An Alternative to Lambdas: BaseSubscriber
There is an additional subscribe method that is more generic and takes a full-blown
Subscriber rather than composing one out of lambdas. In order to help with writing
such a Subscriber, we provide an extendable class called BaseSubscriber.
| Instances of BaseSubscriber (or subclasses of it) are single-use,
meaning that a BaseSubscriber cancels its subscription to the first Publisher if it
is subscribed to a second Publisher.
That is because using an instance twice would violate the Reactive Streams rule that
the onNext method of a Subscriber must not be called in parallel.
As a result, anonymous implementations are fine only if they are declared directly within
the call to Publisher#subscribe(Subscriber). |
Now we can implement one of these. We call it a SampleSubscriber. The following
example shows how it would be attached to a Flux:
SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(ss);
The following example shows what SampleSubscriber could look like, as a minimalistic
implementation of a BaseSubscriber:
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
public class SampleSubscriber<T> extends BaseSubscriber<T> {
	@Override
	public void hookOnSubscribe(Subscription subscription) {
		System.out.println("Subscribed");
		request(1);
	}
	@Override
	public void hookOnNext(T value) {
		System.out.println(value);
		request(1);
	}
}
The SampleSubscriber class extends BaseSubscriber, which is the recommended abstract
class for user-defined Subscribers in Reactor. The class offers hooks that can be
overridden to tune the subscriber’s behavior. By default, it triggers an unbounded
request and behaves exactly as subscribe(). However, extending BaseSubscriber is
much more useful when you want a custom request amount.
For a custom request amount, the bare minimum is to implement hookOnSubscribe(Subscription subscription)
and hookOnNext(T value), as we did. In our case, the hookOnSubscribe method
prints a statement to standard out and makes the first request. Then the hookOnNext
method prints a statement and performs additional requests, one request
at a time.
The SampleSubscriber class produces the following output:
Subscribed
1
2
3
4
BaseSubscriber also offers a requestUnbounded() method to switch to unbounded mode
(equivalent to request(Long.MAX_VALUE)), as well as a cancel() method.
It also has additional hooks: hookOnComplete, hookOnError, hookOnCancel, and hookFinally
(which is always called when the sequence terminates, with the type of termination passed
in as a SignalType parameter).
| You almost certainly want to implement the hookOnError, hookOnCancel, and hookOnComplete methods. You may also want to implement the hookFinally method. SampleSubscriber is the absolute minimum implementation of a Subscriber that performs
bounded requests. |
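To illustrate the additional hooks, here is a minimal sketch of a subscriber that records which hooks are invoked and in what order. It assumes reactor-core is on the classpath. RecordingSubscriber, its events list, and the record(...) helper are hypothetical names introduced for this illustration and are not part of Reactor.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

// Hypothetical subscriber that records every hook invocation.
class RecordingSubscriber<T> extends BaseSubscriber<T> {

    final List<String> events = new ArrayList<>();

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        events.add("subscribed");
        request(1); // bounded demand: one element at a time
    }

    @Override
    protected void hookOnNext(T value) {
        events.add("next " + value);
        request(1);
    }

    @Override
    protected void hookOnComplete() {
        events.add("complete");
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        events.add("error " + throwable.getMessage());
    }

    @Override
    protected void hookOnCancel() {
        events.add("cancel");
    }

    @Override
    protected void hookFinally(SignalType type) {
        events.add("finally " + type.name());
    }

    // Subscribes a fresh instance to the source and returns the recorded events.
    static List<String> record(Flux<Integer> source) {
        RecordingSubscriber<Integer> subscriber = new RecordingSubscriber<>();
        source.subscribe(subscriber);
        return subscriber.events;
    }

    public static void main(String[] args) {
        // prints [subscribed, next 1, next 2, complete, finally ON_COMPLETE]
        System.out.println(record(Flux.range(1, 2)));
        // prints [subscribed, error boom, finally ON_ERROR]
        System.out.println(record(Flux.error(new IllegalStateException("boom"))));
    }
}
```

A fresh instance is created for each source because instances of BaseSubscriber are single-use.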
4. On Backpressure and Ways to Reshape Requests
When implementing backpressure in Reactor, the way consumer pressure is propagated back to the source is by sending a request to the upstream operator.
The sum of current requests is sometimes referred to as the current “demand”, or “pending request”.
Demand is capped at Long.MAX_VALUE, representing an unbounded request (meaning “produce as fast as you can” — basically disabling backpressure).
The first request comes from the final subscriber at subscription time, yet the most direct ways of subscribing all immediately trigger an unbounded request of Long.MAX_VALUE:
- subscribe() and most of its lambda-based variants (with the exception of the one that has a Consumer<Subscription>)
- block(), blockFirst() and blockLast()
- iterating over a toIterable() or toStream()
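The unbounded request can be observed with doOnRequest. The following minimal sketch assumes reactor-core is on the classpath. The class UnboundedRequestSketch and its method names are made up for illustration, and only two of the subscription styles listed above are shown.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical example class that records the demand seen by the source.
class UnboundedRequestSketch {

    // Demand seen by the source when subscribing with the no-argument subscribe().
    static List<Long> viaSubscribe() {
        List<Long> requests = new ArrayList<>();
        Flux.range(1, 3)
            .doOnRequest(r -> requests.add(r))
            .subscribe();
        return requests;
    }

    // Demand seen by the source when blocking until the last element.
    static List<Long> viaBlockLast() {
        List<Long> requests = new ArrayList<>();
        Flux.range(1, 3)
            .doOnRequest(r -> requests.add(r))
            .blockLast();
        return requests;
    }

    public static void main(String[] args) {
        // Both lists hold a single request equal to Long.MAX_VALUE.
        System.out.println(viaSubscribe().equals(List.of(Long.MAX_VALUE))); // prints true
        System.out.println(viaBlockLast().equals(List.of(Long.MAX_VALUE))); // prints true
    }
}
```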
The simplest way of customizing the original request is to subscribe with a BaseSubscriber with the hookOnSubscribe method overridden, as the following example shows:
Flux.range(1, 10)
    .doOnRequest(r -> System.out.println("request of " + r))
    .subscribe(new BaseSubscriber<Integer>() {
      @Override
      public void hookOnSubscribe(Subscription subscription) {
        request(1);
      }
      @Override
      public void hookOnNext(Integer integer) {
        System.out.println("Cancelling after having received " + integer);
        cancel();
      }
    });
The preceding snippet prints out the following:
request of 1
Cancelling after having received 1
| When manipulating a request, you must be careful to produce enough demand for
the sequence to advance, or your Flux can get “stuck”. That is why BaseSubscriber defaults to an unbounded request in hookOnSubscribe. When overriding this hook, you should usually
call request at least once. |
4.1. Operators that Change the Demand from Downstream
One thing to keep in mind is that demand expressed at the subscribe level can be reshaped by each operator in the upstream chain.
A textbook case is the buffer(N) operator: If it receives a request(2), it is interpreted as a demand for two full buffers.
As a consequence, since buffers need N elements to be considered full, the buffer operator reshapes the request to 2 x N.
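The reshaping done by buffer can be sketched as follows, with N = 3 and a downstream request of 2. The sketch assumes reactor-core is on the classpath. The class BufferDemandSketch and its method name are hypothetical, and doOnRequest is placed upstream of buffer only to record what the source sees.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical example class showing how buffer(N) reshapes downstream demand.
class BufferDemandSketch {

    // Returns the requests observed upstream of buffer(3) when downstream asks for 2 buffers.
    static List<Long> upstreamRequests() {
        List<Long> requests = new ArrayList<>();
        Flux.range(1, 10)
            .doOnRequest(r -> requests.add(r)) // sees the reshaped demand
            .buffer(3)
            .subscribe(new BaseSubscriber<List<Integer>>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(2); // ask for two full buffers and nothing more
                }

                @Override
                protected void hookOnNext(List<Integer> buffer) {
                    System.out.println(buffer); // prints [1, 2, 3] then [4, 5, 6]
                }
            });
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(upstreamRequests()); // prints [6], that is 2 x 3
    }
}
```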
You might also have noticed that some operators have variants that take an int input parameter called prefetch.
This is another category of operators that modify the downstream request.
These are usually operators that deal with inner sequences, deriving a Publisher from each incoming element (like flatMap).
Prefetch is a way to tune the initial request made on these inner sequences.
If unspecified, most of these operators start with a demand of 32.
These operators usually also implement a replenishing optimization: Once the operator has seen 75% of the prefetch request fulfilled, it re-requests 75% from upstream. This is a heuristic optimization made so that these operators proactively anticipate the upcoming requests.
Finally, a couple of operators let you directly tune the request: limitRate and limitRequest.
limitRate(N) splits the downstream requests so that they are propagated upstream in smaller batches.
For instance, a request of 100 made to limitRate(10) would result in, at most, 10 requests of 10 being propagated to the upstream.
Note that, in this form, limitRate actually implements the replenishing optimization discussed earlier.
The operator has a variant that also lets you tune the replenishing amount (referred to as the lowTide in the variant): limitRate(highTide, lowTide).
Choosing a lowTide of 0 results in strict batches of highTide requests, instead of batches further reworked by the replenishing strategy.
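As a rough illustration of limitRate, the following sketch records the requests that reach the source when an unbounded downstream request goes through limitRate(10). It assumes reactor-core is on the classpath. The class LimitRateSketch and its method name are hypothetical. The sizes of the replenishing requests are left unstated because they depend on the replenishing strategy. The sketch only relies on the first request being the batch size and on no request exceeding it.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical example class that records the batches produced by limitRate.
class LimitRateSketch {

    // Returns the requests seen by the source when demand passes through limitRate(highTide).
    static List<Long> upstreamRequests(int highTide) {
        List<Long> requests = new ArrayList<>();
        Flux.range(1, 100)
            .doOnRequest(r -> requests.add(r)) // sees the batched demand
            .limitRate(highTide)
            .blockLast();                      // downstream demand is unbounded
        return requests;
    }

    public static void main(String[] args) {
        List<Long> requests = upstreamRequests(10);
        // The first entry is 10; later entries are replenishing requests of at most 10.
        System.out.println(requests);
    }
}
```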
limitRequest(N), on the other hand, caps the downstream request to a maximum total demand.
It adds up requests up to N. If a single request does not push the total demand past N, that particular request is wholly propagated upstream.
After that amount has been emitted by the source, limitRequest considers the sequence complete, sends an onComplete signal downstream, and cancels the source.