1. Which operator do I need?

In this section, if an operator is specific to Flux or Mono, it is prefixed and linked accordingly, like this: Flux#fromArray. Common operators have no prefix, and links to both implementations are provided, for example: just (Flux|Mono). When a specific use case is covered by a combination of operators, it is presented as a method call, with a leading dot and parameters in parentheses, as follows: .methodCall(parameter).

I want to deal with:

1.1. Creating a New Sequence…​

1.2. Transforming an Existing Sequence

  • I want to transform existing data:

    • on a 1-to-1 basis (e.g. strings to their length): map (Flux|Mono)

      • …​by just casting it: cast (Flux|Mono)

      • …​in order to materialize each source value’s index: Flux#index

    • on a 1-to-n basis (e.g. strings to their characters): flatMap (Flux|Mono) + use a factory method

    • on a 1-to-n basis with programmatic behavior for each source element and/or state: handle (Flux|Mono)

    • running an asynchronous task for each source item (e.g. URLs to HTTP requests): flatMap (Flux|Mono) + an async Publisher-returning method

      • …​ignoring some data: conditionally return a Mono.empty() in the flatMap lambda

      • …​retaining the original sequence order: Flux#flatMapSequential (this triggers the async processes immediately but reorders the results)

      • …​where the async task can return multiple values, from a Mono source: Mono#flatMapMany

  • I want to add pre-set elements to an existing sequence:

  • I want to aggregate a Flux: (the Flux# prefix is assumed below)

  • I want to combine publishers…​

    • in sequential order: Flux#concat or .concatWith(other) (Flux|Mono)

    • in emission order (combined items emitted as they come): Flux#merge / .mergeWith(other) (Flux|Mono)

    • by pairing values:

    • by coordinating their termination:

      • from 1 Mono and any source into a Mono<Void>: Mono#and

      • from n sources when they have all completed: Mono#when

      • into an arbitrary container type:

        • each time all sides have emitted: Flux#zip (up to the smallest cardinality)

        • each time a new value arrives at either side: Flux#combineLatest

    • selecting the first publisher which…​

      • produces a value (onNext): firstWithValue (Flux|Mono)

      • produces any signal: firstWithSignal (Flux|Mono)

    • triggered by the elements in a source sequence: switchMap (each source element is mapped to a Publisher)

    • triggered by the start of the next publisher in a sequence of publishers: switchOnNext

  • I want to repeat an existing sequence: repeat (Flux|Mono)

    • …​but at time intervals: Flux.interval(duration).flatMap(tick -> myExistingPublisher)

  • I have an empty sequence but…​

    • I want a value instead: defaultIfEmpty (Flux|Mono)

    • I want another sequence instead: switchIfEmpty (Flux|Mono)

  • I have a sequence but I am not interested in values: ignoreElements (Flux.ignoreElements()|Mono.ignoreElement())

    • …​and I want the completion represented as a Mono<Void>: then (Flux|Mono)

    • …​and I want to wait for another task to complete at the end: thenEmpty (Flux|Mono)

    • …​and I want to switch to another Mono at the end: Mono#then(mono)

    • …​and I want to emit a single value at the end: Mono#thenReturn(T)

    • …​and I want to switch to a Flux at the end: thenMany (Flux|Mono)

  • I have a Mono for which I want to defer completion…​

  • I want to expand elements recursively into a graph of sequences and emit the combination…​

    • …​expanding the graph breadth first: expand(Function) (Flux|Mono)

    • …​expanding the graph depth first: expandDeep(Function) (Flux|Mono)
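
As an illustration of a few of the transformation and combination choices above, here is a minimal sketch, assuming reactor-core is on the classpath. The class name, method names and sample values are hypothetical and only serve the example; block() is used solely to collect results for display.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class; all names and sample values are illustrative.
class TransformSketch {

    // 1-to-1 transformation: each string becomes its length.
    static List<Integer> lengths() {
        return Flux.just("foo", "quux")
                .map(String::length)
                .collectList()
                .block();
    }

    // Async task per element, retaining source order: all delays start
    // eagerly, but results are emitted in the order of the source elements.
    static List<Integer> orderedAsync() {
        return Flux.just(3, 1, 2)
                .flatMapSequential(i -> Mono.delay(Duration.ofMillis(i * 20L)).thenReturn(i))
                .collectList()
                .block();
    }

    // Pairing values: zip stops at the smallest cardinality (two pairs here).
    static List<String> pairs() {
        return Flux.zip(Flux.just(1, 2, 3), Flux.just("a", "b"), (n, s) -> n + s)
                .collectList()
                .block();
    }

    // An empty sequence replaced by a default value.
    static String orDefault() {
        return Mono.<String>empty()
                .defaultIfEmpty("none")
                .block();
    }

    public static void main(String[] args) {
        System.out.println(lengths());
        System.out.println(orderedAsync());
        System.out.println(pairs());
        System.out.println(orDefault());
    }
}
```

Swapping flatMapSequential for flatMap in orderedAsync() would typically let the shortest delay finish first, which is the difference the two entries above are meant to capture.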

1.3. Peeking into a Sequence

  • Without modifying the final sequence, I want to:

  • I want to know of all events:

    • each represented as a Signal object:

      • in a callback outside the sequence: doOnEach (Flux|Mono)

      • instead of the original onNext emissions: materialize (Flux|Mono)

        • …​and get back to the onNexts: dematerialize (Flux|Mono)

    • as a line in a log: log (Flux|Mono)
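
The difference between observing signals in a side callback and turning them into the sequence's own elements can be sketched as follows. This is an illustrative example, assuming reactor-core is on the classpath; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class; names and sample values are illustrative.
class PeekSketch {

    // doOnEach: signals are observed in a callback, the sequence itself
    // still carries the original values.
    static List<String> sideChannel() {
        List<String> seen = new ArrayList<>();
        Flux.just("a", "b")
                .doOnEach(signal -> seen.add(signal.getType().name()))
                .blockLast();
        return seen;
    }

    // materialize: the Signal objects replace the original onNext emissions,
    // including one Signal for the completion.
    static List<String> materialized() {
        return Flux.just(1, 2)
                .materialize()
                .map(signal -> signal.getType().name())
                .collectList()
                .block();
    }

    // dematerialize: turn the Signal objects back into plain signals.
    static List<Integer> roundTrip() {
        return Flux.just(1, 2)
                .materialize()
                .<Integer>dematerialize()
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(sideChannel());
        System.out.println(materialized());
        System.out.println(roundTrip());
    }
}
```

The explicit type witness on dematerialize() is there because the compiler cannot infer the element type from the Signal wrapper in this chain.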

1.4. Filtering a Sequence

1.5. Handling Errors

  • I want to create an erroring sequence: error (Flux|Mono)…​

    • …​to replace the completion of a successful Flux: .concatWith(Flux.error(e))

    • …​to replace the emission of a successful Mono: .then(Mono.error(e))

    • …​if too much time elapses between onNexts: timeout (Flux|Mono)

    • …​lazily: error(Supplier<Throwable>) (Flux|Mono)

  • I want the try/catch equivalent of:

    • throwing: error (Flux|Mono)

    • catching an exception:

      • and falling back to a default value: onErrorReturn (Flux|Mono)

      • and swallowing the error (i.e. complete): onErrorComplete (Flux|Mono)

      • and falling back to another Flux or Mono: onErrorResume (Flux|Mono)

      • and wrapping and re-throwing: .onErrorMap(t -> new RuntimeException(t)) (Flux|Mono)

    • the finally block: doFinally (Flux|Mono)

    • the try-with-resources pattern from Java 7: using (Flux|Mono) factory method

  • I want to recover from errors…​

    • by falling back:

    • by retrying…​

      • …​with a simple policy (max number of attempts): retry() (Flux|Mono), retry(long) (Flux|Mono)

      • …​triggered by a companion control Flux: retryWhen (Flux|Mono)

      • …​using a standard backoff strategy (exponential backoff with jitter): retryWhen(Retry.backoff(…​)) (Flux|Mono) (see also other factory methods in Retry)

  • I want to deal with backpressure "errors" (request max from upstream and apply the strategy when downstream does not request enough)…​
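
The try/catch equivalents and the simple retry policy listed above can be sketched as follows. This is a minimal example, assuming reactor-core is on the classpath; the class name, method names, exception messages and the three-attempt scenario are hypothetical.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class; names and sample values are illustrative.
class ErrorSketch {

    // Catch and fall back to a default value.
    static String fallbackValue() {
        return Mono.<String>error(new IllegalStateException("boom"))
                .onErrorReturn("fallback")
                .block();
    }

    // Catch and fall back to another sequence; values emitted before the
    // error are kept.
    static List<Integer> fallbackSequence() {
        return Flux.just(1, 2)
                .concatWith(Flux.error(new IllegalStateException("boom")))
                .onErrorResume(e -> Flux.just(-1))
                .collectList()
                .block();
    }

    // Wrap and re-throw: returns the cause of the exception seen by the caller.
    static Throwable wrappedCause() {
        try {
            Mono.<String>error(new IOException("disk"))
                    .onErrorMap(t -> new RuntimeException(t))
                    .block();
            return null;
        } catch (RuntimeException e) {
            return e.getCause();
        }
    }

    // Simple retry policy: the source fails twice, then succeeds on the
    // third subscription; retry(2) allows exactly those two re-subscriptions.
    static String retried() {
        AtomicInteger attempts = new AtomicInteger();
        return Mono.fromCallable(() -> {
                    if (attempts.incrementAndGet() < 3) {
                        throw new IllegalStateException("transient");
                    }
                    return "ok after " + attempts.get() + " attempts";
                })
                .retry(2)
                .block();
    }

    public static void main(String[] args) {
        System.out.println(fallbackValue());
        System.out.println(fallbackSequence());
        System.out.println(wrappedCause());
        System.out.println(retried());
    }
}
```

With retry(1) instead of retry(2), the second failure in retried() would propagate to the caller, since only one re-subscription would be allowed.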

1.6. Working with Time

1.7. Splitting a Flux

1.8. Going Back to the Synchronous World

Note: all of these methods except Mono#toFuture throw an IllegalStateException if called from within a Scheduler marked as "non-blocking only" (by default parallel() and single()).
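
The restriction in the note can be observed with a small experiment. The sketch below assumes reactor-core is on the classpath and that, as in the Reactor versions this was written against, the rejection surfaces as an IllegalStateException. The class and method names are hypothetical.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class; names and sample values are illustrative.
class BlockingSketch {

    // Blocking for the first element of a Flux from an ordinary thread.
    static int firstElement() {
        return Flux.range(1, 3).blockFirst();
    }

    // Mono#toFuture hands the result over as a CompletableFuture.
    static String viaFuture() {
        return Mono.just("done").toFuture().join();
    }

    // Attempts to block from inside a parallel() worker thread, which is
    // marked as non-blocking; the attempt is expected to be rejected.
    static String blockOnParallel() {
        return Mono.fromCallable(() -> {
                    try {
                        Mono.delay(Duration.ofMillis(10)).block();
                        return "allowed";
                    } catch (IllegalStateException e) {
                        // Assumption: the rejection is an IllegalStateException.
                        return "rejected";
                    }
                })
                .subscribeOn(Schedulers.parallel())
                .block(); // the outer block() runs on the calling thread
    }

    public static void main(String[] args) {
        System.out.println(firstElement());
        System.out.println(viaFuture());
        System.out.println(blockOnParallel());
    }
}
```

Moving the inner work to Schedulers.boundedElastic() is the usual way to allow blocking calls, since that scheduler is not marked as non-blocking.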

1.9. Multicasting a Flux to several Subscribers