Programmatically creating a sequence

In this section, we introduce the creation of a Flux or a Mono by programmatically defining its associated events (onNext, onError, and onComplete). All these methods expose an API for triggering those events, which we call a sink. There are actually a few sink variants, which we’ll get to shortly.

1. Synchronous generate

The simplest form of programmatic creation of a Flux is through the generate method, which takes a generator function.

This is for synchronous and one-by-one emissions, meaning that the sink is a SynchronousSink and that its next() method can only be called at most once per callback invocation. You can then additionally call error(Throwable) or complete(), but this is optional.
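
As a minimal sketch of the stateless form described above (assuming reactor-core is on the classpath; the class and method names are hypothetical), the generator below calls next() exactly once per invocation, and the downstream take(3) ends the otherwise endless sequence:

```java
import java.util.List;
import reactor.core.publisher.Flux;

class StatelessGenerateSketch {

    // Endless sequence: the generator callback runs once per requested element.
    static Flux<String> ticks() {
        // Exactly one next() call per callback invocation.
        return Flux.generate(sink -> sink.next("tick"));
    }

    public static void main(String[] args) {
        // take(3) cancels after three elements, which stops the generation.
        List<String> firstThree = ticks().take(3).collectList().block();
        System.out.println(firstThree);
    }
}
```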

The most useful variant is probably the one that also lets you keep a state that you can refer to in your sink usage to decide what to emit next. The generator function then becomes a BiFunction<S, SynchronousSink<T>, S>, with <S> the type of the state object. You have to provide the initial state through a state supplier (a Callable<S> in the Flux.generate signature), and your generator function now returns a new state on each round.

For instance, you could use an int as the state:

Example of state-based generate
Flux<String> flux = Flux.generate(
    () -> 0, (1)
    (state, sink) -> {
      sink.next("3 x " + state + " = " + 3*state); (2)
      if (state == 10) sink.complete(); (3)
      return state + 1; (4)
    });
1 We supply the initial state value of 0.
2 We use the state to choose what to emit (a row in the multiplication table of 3).
3 We also use it to choose when to stop.
4 We return a new state that we use in the next invocation (unless the sequence terminated in this one).

The preceding code generates the table of 3, as the following sequence:

3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30

You can also use a mutable <S>. The example above could for instance be rewritten using a single AtomicLong as the state, mutating it on each round:

Mutable state variant
Flux<String> flux = Flux.generate(
    AtomicLong::new, (1)
    (state, sink) -> {
      long i = state.getAndIncrement(); (2)
      sink.next("3 x " + i + " = " + 3*i);
      if (i == 10) sink.complete();
      return state; (3)
    });
1 This time, we generate a mutable object as the state.
2 We mutate the state here.
3 We return the same instance as the new state.
If your state object needs to clean up some resources, use the generate(Callable<S>, BiFunction, Consumer<S>) variant to clean up the last state instance.

The following example uses the generate method that includes a Consumer:

Flux<String> flux = Flux.generate(
    AtomicLong::new, (1)
    (state, sink) -> {
      long i = state.getAndIncrement(); (2)
      sink.next("3 x " + i + " = " + 3*i);
      if (i == 10) sink.complete();
      return state; (3)
    }, (state) -> System.out.println("state: " + state)); (4)
1 Again, we generate a mutable object as the state.
2 We mutate the state here.
3 We return the same instance as the new state.
4 We see the last state value (11) as the output of this Consumer lambda.

If the state holds a database connection or another resource that must be released at the end of the process, the Consumer lambda can close the connection or perform any other final tasks.
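
The following is a sketch of that resource-cleanup idea, not a definitive implementation. It assumes reactor-core on the classpath and uses an in-memory BufferedReader as a stand-in for a real resource; the class name, the method name, and the closed flag are hypothetical and exist only to make the cleanup observable:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Flux;

class GenerateCleanupSketch {

    // Emits the lines of 'text' one by one; 'closed' is set once the state consumer has run.
    static Flux<String> lines(String text, AtomicBoolean closed) {
        return Flux.generate(
            () -> new BufferedReader(new StringReader(text)), // initial state: the open resource
            (reader, sink) -> {
                try {
                    String line = reader.readLine();
                    if (line == null) {
                        sink.complete();      // end of input: terminate the sequence
                    } else {
                        sink.next(line);      // at most one next() per round
                    }
                } catch (IOException e) {
                    sink.error(e);
                }
                return reader;                // same (mutable) state for the next round
            },
            reader -> {                       // invoked with the last state on termination or cancel
                try {
                    reader.close();
                } catch (IOException ignored) {
                    // nothing sensible to do in this sketch
                }
                closed.set(true);
            });
    }

    public static void main(String[] args) {
        AtomicBoolean closed = new AtomicBoolean();
        List<String> result = lines("a\nb", closed).collectList().block();
        System.out.println(result + " closed=" + closed.get());
    }
}
```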

2. Asynchronous and Multi-threaded: create

create is a more advanced form of programmatic creation of a Flux which is suitable for multiple emissions per round, even from multiple threads.

It exposes a FluxSink, with its next, error, and complete methods. Contrary to generate, it doesn’t have a state-based variant. On the other hand, it can trigger multi-threaded events in the callback.

create can be very useful to bridge an existing API with the reactive world - such as an asynchronous API based on listeners.
create doesn’t parallelize your code nor does it make it asynchronous, even though it can be used with asynchronous APIs. If you block within the create lambda, you expose yourself to deadlocks and similar side effects. Even with the use of subscribeOn, there’s the caveat that a long-blocking create lambda (such as an infinite loop calling sink.next(t)) can lock the pipeline: the requests would never be performed due to the loop starving the same thread they are supposed to run from. Use the subscribeOn(Scheduler, false) variant: requestOnSeparateThread = false will use the Scheduler thread for the create and still let data flow by performing request in the original thread.

Imagine that you use a listener-based API. It processes data by chunks and has two events: (1) a chunk of data is ready and (2) the processing is complete (terminal event), as represented in the MyEventListener interface:

interface MyEventListener<T> {
    void onDataChunk(List<T> chunk);
    void processComplete();
}

You can use create to bridge this into a Flux<T>:

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register( (4)
      new MyEventListener<String>() { (1)

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); (2)
          }
        }

        public void processComplete() {
            sink.complete(); (3)
        }
    });
});
1 Bridge to the MyEventListener API
2 Each element in a chunk becomes an element in the Flux.
3 The processComplete event is translated to onComplete.
4 All of this is done asynchronously whenever the myEventProcessor executes.
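
To try the bridge above end to end, a self-contained sketch might look like the following. The EventProcessor class is a hypothetical in-memory stand-in for myEventProcessor (its register, dataChunk, and processComplete methods are assumptions made for illustration), and reactor-core is assumed to be on the classpath:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

class CreateBridgeSketch {

    interface MyEventListener<T> {
        void onDataChunk(List<T> chunk);
        void processComplete();
    }

    // Hypothetical event source: forwards calls to the single registered listener.
    static class EventProcessor {
        private MyEventListener<String> listener;

        void register(MyEventListener<String> listener) {
            this.listener = listener;
        }

        void dataChunk(String... values) {
            if (listener != null) listener.onDataChunk(Arrays.asList(values));
        }

        void processComplete() {
            if (listener != null) listener.processComplete();
        }
    }

    static List<String> run() {
        EventProcessor processor = new EventProcessor();
        Flux<String> bridge = Flux.create(sink ->
            processor.register(new MyEventListener<String>() {
                @Override
                public void onDataChunk(List<String> chunk) {
                    for (String s : chunk) {
                        sink.next(s);      // each chunk element becomes a Flux element
                    }
                }

                @Override
                public void processComplete() {
                    sink.complete();       // terminal event becomes onComplete
                }
            }));

        List<String> received = new ArrayList<>();
        bridge.subscribe(received::add);   // the listener is registered on subscription
        processor.dataChunk("foo", "bar");
        processor.dataChunk("baz");
        processor.processComplete();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```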

Additionally, since create can bridge asynchronous APIs and manages backpressure, you can refine its backpressure behavior by specifying an OverflowStrategy:

  • IGNORE to completely ignore downstream backpressure requests. This may yield IllegalStateException when queues get full downstream.

  • ERROR to signal an IllegalStateException when the downstream can’t keep up.

  • DROP to drop the incoming signal if the downstream is not ready to receive it.

  • LATEST to let downstream only get the latest signals from upstream.

  • BUFFER (the default) to buffer all signals if the downstream can’t keep up. This buffering is unbounded and may lead to OutOfMemoryError.
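
As an illustration of one of the strategies listed above, the sketch below passes OverflowStrategy.DROP as the second argument of create. It assumes reactor-core on the classpath; the class and method names are hypothetical. The source emits ten values synchronously while the subscriber only ever requests three, so the values emitted without outstanding demand are dropped:

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

class OverflowStrategySketch {

    static List<Integer> receivedWithDrop() {
        Flux<Integer> source = Flux.create(sink -> {
            for (int i = 1; i <= 10; i++) {
                sink.next(i);              // emitted regardless of downstream demand
            }
            sink.complete();
        }, FluxSink.OverflowStrategy.DROP);

        List<Integer> received = new ArrayList<>();
        source.subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(3);                // bounded demand: only three elements
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receivedWithDrop());
    }
}
```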

Mono also has a create generator. The MonoSink of Mono’s create doesn’t allow several emissions. It will drop all signals after the first one.
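
A minimal sketch of Mono.create bridging a single-result callback could look like this. The lookup method and its BiConsumer callback are hypothetical stand-ins for a real callback-based API, and reactor-core is assumed to be on the classpath:

```java
import java.util.function.BiConsumer;
import reactor.core.publisher.Mono;

class MonoCreateSketch {

    // Hypothetical callback-style API: reports either a value or an error, exactly once.
    static void lookup(String key, BiConsumer<String, Throwable> callback) {
        callback.accept(key.toUpperCase(), null);
    }

    static Mono<String> lookupMono(String key) {
        return Mono.create(sink -> lookup(key, (value, error) -> {
            if (error != null) {
                sink.error(error);     // failure becomes onError
            } else {
                sink.success(value);   // the single value, followed by completion
            }
        }));
    }

    public static void main(String[] args) {
        System.out.println(lookupMono("abc").block());
    }
}
```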

3. Asynchronous but single-threaded: push

push is a middle ground between generate and create which is suitable for processing events from a single producer. It is similar to create in the sense that it can also be asynchronous and can manage backpressure using any of the overflow strategies supported by create. However, only one producing thread may invoke next, complete or error at a time.

Flux<String> bridge = Flux.push(sink -> {
    myEventProcessor.register(
      new SingleThreadEventListener<String>() { (1)

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); (2)
          }
        }

        public void processComplete() {
            sink.complete(); (3)
        }

        public void processError(Throwable e) {
            sink.error(e); (4)
        }
    });
});
1 Bridge to the SingleThreadEventListener API.
2 Events are pushed to the sink using next from a single listener thread.
3 complete event generated from the same listener thread.
4 error event also generated from the same listener thread.
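
For a runnable variant of the same idea, the sketch below replaces the listener with a single dedicated producer thread, which is the situation push is intended for. It assumes reactor-core on the classpath; the class name, method name, and thread name are hypothetical:

```java
import java.util.List;
import reactor.core.publisher.Flux;

class PushSketch {

    static Flux<Integer> numbers() {
        return Flux.push(sink -> {
            // All signals come from this one thread, as push requires.
            Thread producer = new Thread(() -> {
                for (int i = 1; i <= 3; i++) {
                    sink.next(i);
                }
                sink.complete();
            }, "single-producer");
            producer.start();
        });
    }

    public static void main(String[] args) {
        // block() waits until the producer thread has completed the sequence.
        List<Integer> values = numbers().collectList().block();
        System.out.println(values);
    }
}
```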

3.1. A hybrid push/pull model

Most Reactor operators, like create, follow a hybrid push/pull model. What we mean by that is that despite most of the processing being asynchronous (suggesting a push approach), there is a small pull component to it: the request.

The consumer pulls data from the source in the sense that the source won’t emit anything until first requested. The source then pushes data to the consumer whenever it becomes available, but within the bounds of the requested amount.

Note that push() and create() both let you set up an onRequest consumer to manage the request amount and to ensure that data is pushed through the sink only when there is a pending request.

Flux<String> bridge = Flux.create(sink -> {
    myMessageProcessor.register(
      new MyMessageListener<String>() {

        public void onMessage(List<String> messages) {
          for(String s : messages) {
            sink.next(s); (3)
          }
        }
    });
    sink.onRequest(n -> {
        List<String> messages = myMessageProcessor.getHistory(n); (1)
        for(String s : messages) {
           sink.next(s); (2)
        }
    });
});
1 Poll for messages when requests are made.
2 If messages are available immediately, push them to the sink.
3 The remaining messages that arrive asynchronously later are also delivered.
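
The pull side of this model can be observed in isolation with a sketch like the one below. It assumes reactor-core on the classpath; the backlog queue is a hypothetical stand-in for myMessageProcessor.getHistory(n), and the subscriber deliberately issues no initial request so that each request(n) call visibly triggers the onRequest consumer:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

class OnRequestSketch {

    static List<String> pull() {
        // Hypothetical message history that is only read on demand.
        Deque<String> backlog = new ArrayDeque<>(Arrays.asList("m1", "m2", "m3", "m4", "m5"));

        Flux<String> flux = Flux.create(sink -> sink.onRequest(n -> {
            // Push at most n messages for a request of n.
            for (long i = 0; i < n && !backlog.isEmpty(); i++) {
                sink.next(backlog.poll());
            }
        }));

        List<String> received = new ArrayList<>();
        BaseSubscriber<String> subscriber = new BaseSubscriber<String>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // No initial request: demand is signalled explicitly below.
            }

            @Override
            protected void hookOnNext(String value) {
                received.add(value);
            }
        };

        flux.subscribe(subscriber);
        subscriber.request(2);   // pulls two messages from the backlog
        subscriber.request(1);   // pulls one more
        subscriber.cancel();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(pull());
    }
}
```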

3.2. Cleaning up after push() or create()

Two callbacks, onDispose and onCancel, perform any cleanup on cancellation or termination. onDispose can be used to perform cleanup when the Flux completes, errors out, or is cancelled. onCancel can be used to perform any action specific to cancellation prior to cleanup with onDispose.

Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel()) (1)
        .onDispose(() -> channel.close()); (2)
});
1 onCancel is invoked first, for cancel signal only.
2 onDispose is invoked for complete, error, or cancel signals.
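
To see which callback fires in which situation, the following sketch records the callbacks in a list instead of touching a real channel. It assumes reactor-core on the classpath; the class name, method name, and the complete flag are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

class SinkCleanupSketch {

    // Returns the cleanup callbacks that ran, in order.
    static List<String> events(boolean complete) {
        List<String> events = new ArrayList<>();
        Flux<String> flux = Flux.create(sink -> {
            sink.onCancel(() -> events.add("cancel"))
                .onDispose(() -> events.add("dispose"));
            if (complete) {
                sink.complete();       // normal termination
            }
        });

        Disposable subscription = flux.subscribe();
        subscription.dispose();        // cancels only if the sequence has not terminated yet
        return events;
    }

    public static void main(String[] args) {
        System.out.println("cancelled: " + events(false));
        System.out.println("completed: " + events(true));
    }
}
```

In the cancelled case both callbacks run, onCancel first; in the completed case only onDispose runs.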

4. Handle

The handle method is a bit different: it is an instance method, meaning that it is chained on an existing source (as are the common operators). It is present in both Mono and Flux.

It is close to generate, in the sense that it uses a SynchronousSink and only allows one-by-one emissions. However, handle can be used to generate an arbitrary value out of each source element, possibly skipping some elements. In this way, it can serve as a combination of map and filter. The signature of handle is as follows:

<R> Flux<R> handle(BiConsumer<? super T, SynchronousSink<R>> handler);

Let’s consider an example. The reactive streams specification disallows null values in a sequence. What if you want to perform a map but you want to use a preexisting method as the map function, and that method sometimes returns null?

For instance, the following method can be applied safely to a source of integers:

public String alphabet(int letterNumber) {
	if (letterNumber < 1 || letterNumber > 26) {
		return null;
	}
	int letterIndexAscii = 'A' + letterNumber - 1;
	return "" + (char) letterIndexAscii;
}

We can then use handle to remove any nulls:

Using handle for a "map and eliminate nulls" scenario
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
    .handle((i, sink) -> {
        String letter = alphabet(i); (1)
        if (letter != null) (2)
            sink.next(letter); (3)
    });

alphabet.subscribe(System.out::println);
1 Map to letters.
2 If the "map function" returns null…
3 Filter it out by not calling sink.next.

This prints:

M
I
T
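
As a further sketch of handle acting as a combined map and filter, the example below converts strings to integers and skips the ones that cannot be parsed. It assumes reactor-core on the classpath; the class and method names are hypothetical:

```java
import java.util.List;
import reactor.core.publisher.Flux;

class HandleSketch {

    // Maps each string to an Integer and silently skips unparseable input.
    static Flux<Integer> parseValid(Flux<String> source) {
        return source.handle((text, sink) -> {
            try {
                sink.next(Integer.parseInt(text));   // "map": at most one next() per element
            } catch (NumberFormatException e) {
                // "filter": emit nothing for this element
            }
        });
    }

    public static void main(String[] args) {
        List<Integer> parsed = parseValid(Flux.just("1", "x", "3")).collectList().block();
        System.out.println(parsed);
    }
}
```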