1. Which operator do I need?
In this section, if an operator is specific to Flux or Mono, it is prefixed and linked accordingly, like this: Flux#fromArray. Common operators have no prefix, and links to both implementations are provided, for example: just (Flux|Mono). When a specific use case is covered by a combination of operators, it is presented as a method call, with a leading dot and parameters in parentheses, as follows: .methodCall(parameter).
 | 
I want to deal with:
1.1. Creating a New Sequence…
- 
that emits a
T, and I already have:just(Flux|Mono)- 
…from an Optional<T>: Mono#justOrEmpty(Optional<T>)
 - 
…from a potentially
nullT: Mono#justOrEmpty(T) 
 - 
 - 
that emits a
Treturned by a method:just(Flux|Mono) as well- 
…but lazily captured: use Mono#fromSupplier or wrap
just(Flux|Mono) insidedefer(Flux|Mono) 
 - 
 - 
that emits several
TI can explicitly enumerate: Flux#just(T…) - 
that iterates over:
- 
an array: Flux#fromArray
 - 
a collection or iterable: Flux#fromIterable
 - 
a range of integers: Flux#range
 - 
a Stream supplied for each Subscription: Flux#fromStream(Supplier<Stream>)
 
 - 
 - 
that emits from various single-valued sources such as:
 - 
that generates events programmatically (can use state):
- 
synchronously and one-by-one: Flux#generate
 - 
asynchronously (can also be sync), multiple emissions possible in one pass: Flux#create (Mono#create as well, without the multiple emission aspect)
 
 - 
 
1.2. Transforming an Existing Sequence
- 
I want to transform existing data:
- 
on a 1-to-1 basis (eg. strings to their length):
map(Flux|Mono)- 
…in order to materialize each source value’s index: Flux#index
 
 - 
on a 1-to-n basis (eg. strings to their characters):
flatMap(Flux|Mono) + use a factory method - 
on a 1-to-n basis with programmatic behavior for each source element and/or state:
handle(Flux|Mono) - 
running an asynchronous task for each source item (eg. urls to http request):
flatMap(Flux|Mono) + an async Publisher-returning method- 
…ignoring some data: conditionally return a Mono.empty() in the flatMap lambda
 - 
…retaining the original sequence order: Flux#flatMapSequential (this triggers the async processes immediately but reorders the results)
 - 
…where the async task can return multiple values, from a Mono source: Mono#flatMapMany
 
 - 
 
 - 
 - 
I want to add pre-set elements to an existing sequence:
- 
at the start: Flux#startWith(T…)
 - 
at the end: Flux#concatWithValues(T…)
 
 - 
 - 
I want to aggregate a Flux: (the
Flux#prefix is assumed below)- 
into a List: collectList, collectSortedList
 - 
into a Map: collectMap, collectMultiMap
 - 
into an arbitrary container: collect
 - 
into the size of the sequence: count
 - 
by applying a function between each element (eg. running sum): reduce
- 
…but emitting each intermediary value: scan
 
 - 
 - 
into a boolean value from a predicate:
- 
applied to all values (AND): all
 - 
applied to at least one value (OR): any
 - 
testing the presence of any value: hasElements (there is a Mono equivalent in hasElement)
 - 
testing the presence of a specific value: hasElement(T)
 
 - 
 
 - 
 - 
I want to combine publishers…
- 
in sequential order: Flux#concat or
.concatWith(other)(Flux|Mono)- 
…but delaying any error until remaining publishers have been emitted: Flux#concatDelayError
 - 
…but eagerly subscribing to subsequent publishers: Flux#mergeSequential
 
 - 
 - 
in emission order (combined items emitted as they come): Flux#merge /
.mergeWith(other)(Flux|Mono)- 
…with different types (transforming merge): Flux#zip / Flux#zipWith
 
 - 
 - 
by pairing values:
- 
from 2 Monos into a Tuple2: Mono#zipWith
 - 
from n Monos when they all completed: Mono#zip
 
 - 
 - 
by coordinating their termination:
- 
from 1 Mono and any source into a Mono<Void>: Mono#and
 - 
from n sources when they all completed: Mono#when
 - 
into an arbitrary container type:
- 
each time all sides have emitted: Flux#zip (up to the smallest cardinality)
 - 
each time a new value arrives at either side: Flux#combineLatest
 
 - 
 
 - 
 - 
selecting the first publisher which…
 - 
triggered by the elements in a source sequence: switchMap (each source element is mapped to a Publisher)
 - 
triggered by the start of the next publisher in a sequence of publishers: switchOnNext
 
 - 
 - 
I want to repeat an existing sequence:
repeat(Flux|Mono)- 
…but at time intervals:
Flux.interval(duration).flatMap(tick → myExistingPublisher) 
 - 
 - 
I have an empty sequence but…
 - 
I have a sequence but I am not interested in values:
ignoreElements(Flux.ignoreElements()|Mono.ignoreElement())- 
…and I want the completion represented as a Mono<Void>:
then(Flux|Mono) - 
…and I want to wait for another task to complete at the end:
thenEmpty(Flux|Mono) - 
…and I want to switch to another Mono at the end: Mono#then(mono)
 - 
…and I want to emit a single value at the end: Mono#thenReturn(T)
 - 
…and I want to switch to a Flux at the end:
thenMany(Flux|Mono) 
 - 
 - 
I have a Mono for which I want to defer completion…
- 
…until another publisher, which is derived from this value, has completed: Mono#delayUntil(Function)
 
 - 
 - 
I want to expand elements recursively into a graph of sequences and emit the combination…
 
1.3. Peeking into a Sequence
- 
Without modifying the final sequence, I want to:
- 
get notified of / execute additional behavior (sometimes referred to as "side-effects") on:
- 
completion: Flux#doOnComplete, Mono#doOnSuccess (includes the result, if any)
 - 
"start" of the sequence:
doFirst(Flux|Mono)- 
this is tied to Publisher#subscribe(Subscriber)
 
 - 
 - 
post-subscription :
doOnSubscribe(Flux|Mono)- 
Subscriptionacknowledgment aftersubscribe - 
this is tied to Subscriber#onSubscribe(Subscription)
 
 - 
 - 
any type of signal, represented as a Signal:
doOnEach(Flux|Mono) - 
any terminating condition (complete, error, cancel):
doFinally(Flux|Mono) 
 
 - 
 - 
I want to know of all events:
 
1.4. Filtering a Sequence
- 
I want to filter a sequence:
- 
restricting on the type of the emitted objects:
ofType(Flux|Mono) - 
by ignoring the values altogether:
ignoreElements(Flux.ignoreElements()|Mono.ignoreElement()) - 
by ignoring duplicates:
- 
in the whole sequence (logical set): Flux#distinct
 - 
between subsequently emitted items (deduplication): Flux#distinctUntilChanged
 
 - 
 
 - 
I want to keep only a subset of the sequence:
- 
by taking N elements:
- 
at the beginning of the sequence: Flux#take(long)
- 
…requesting an unbounded amount from upstream: Flux#take(long, false)
 - 
…based on a duration: Flux#take(Duration)
 - 
…only the first element, as a Mono: Flux#next()
 
 - 
 - 
at the end of the sequence: Flux#takeLast
 - 
until a criteria is met (inclusive): Flux#takeUntil (predicate-based), Flux#takeUntilOther (companion publisher-based)
 - 
while a criteria is met (exclusive): Flux#takeWhile
 
 - 
 - 
by taking at most 1 element:
- 
at a specific position: Flux#elementAt
 - 
at the end: .takeLast(1)
- 
…and emit an error if empty: Flux#last()
 - 
…and emit a default value if empty: Flux#last(T)
 
 - 
 
 - 
 - 
by skipping elements:
- 
at the beginning of the sequence: Flux#skip(long)
- 
…based on a duration: Flux#skip(Duration)
 
 - 
 - 
at the end of the sequence: Flux#skipLast
 - 
until a criteria is met (inclusive): Flux#skipUntil (predicate-based), Flux#skipUntilOther (companion publisher-based)
 - 
while a criteria is met (exclusive): Flux#skipWhile
 
 - 
 - 
by sampling items:
- 
by duration: Flux#sample(Duration)
- 
but keeping the first element in the sampling window instead of the last: sampleFirst
 
 - 
 - 
by a publisher-based window: Flux#sample(Publisher)
 - 
based on a publisher "timing out": Flux#sampleTimeout (each element triggers a publisher, and is emitted if that publisher does not overlap with the next)
 
 - 
 
 - 
 - 
I expect at most 1 element (error if more than one)…
- 
and I want an error if the sequence is empty: Flux#single()
 - 
and I want a default value if the sequence is empty: Flux#single(T)
 - 
and I accept an empty sequence as well: Flux#singleOrEmpty
 
 - 
 
1.5. Handling Errors
- 
I want the try/catch equivalent of:
 - 
I want to recover from errors…
- 
by falling back:
- 
to a completion ("swallowing" the error):
onErrorComplete(Flux|Mono) - 
to a Publisher or Mono, possibly different ones depending on the error: Flux#onErrorResume and Mono#onErrorResume
 
 - 
by retrying…
- 
…with a simple policy (max number of attempts):
retry()(Flux|Mono),retry(long)(Flux|Mono) - 
…triggered by a companion control Flux:
retryWhen(Flux|Mono) - 
…using a standard backoff strategy (exponential backoff with jitter):
retryWhen(Retry.backoff(…))(Flux|Mono) (see also other factory methods in Retry) 
 - 
 
 - 
 - 
I want to deal with backpressure "errors" (request max from upstream and apply the strategy when downstream does not produce enough request)…
- 
by throwing a special IllegalStateException: Flux#onBackpressureError
 - 
by dropping excess values: Flux#onBackpressureDrop
- 
…except the last one seen: Flux#onBackpressureLatest
 
 - 
 - 
by buffering excess values (bounded or unbounded): Flux#onBackpressureBuffer
- 
…and applying a strategy when bounded buffer also overflows: Flux#onBackpressureBuffer with a BufferOverflowStrategy
 
 - 
 
 - 
 
1.6. Working with Time
- 
I want to associate emissions with a timing measured…
- 
…with best available precision and versatility of provided data:
timed(Flux|Mono)- 
Timed<T>#elapsed() for Duration since last
onNext - 
Timed<T>#timestamp() for Instant representation of the epoch timestamp (milliseconds resolution)
 - 
Timed<T>#elapsedSinceSubcription() for Duration since subscription (rather than last onNext)
 - 
can have nanoseconds resolution for elapsed Durations
 
 - 
 - 
…as a (legacy) Tuple2<Long, T>…
 
 - 
 - 
I want my sequence to be interrupted if there is too much delay between emissions:
timeout(Flux|Mono) - 
I want to get ticks from a clock, regular time intervals: Flux#interval
 - 
I want to emit a single
0after an initial delay: static Mono.delay. - 
I want to introduce a delay:
- 
between each onNext signal: Mono#delayElement, Flux#delayElements
 - 
before the subscription happens:
delaySubscription(Flux|Mono) 
 - 
 
1.7. Splitting a Flux
- 
I want to split a Flux<T> into a
Flux<Flux<T>>, by a boundary criteria:- 
of size: window(int)
- 
…with overlapping or dropping windows: window(int, int)
 
 - 
 - 
of time window(Duration)
- 
…with overlapping or dropping windows: window(Duration, Duration)
 
 - 
 - 
of size OR time (window closes when count is reached or timeout elapsed): windowTimeout(int, Duration)
 - 
based on a predicate on elements: windowUntil
- 
……emitting the element that triggered the boundary in the next window (
cutBeforevariant): .windowUntil(predicate, true) - 
…keeping the window open while elements match a predicate: windowWhile (non-matching elements are not emitted)
 
 - 
 - 
driven by an arbitrary boundary represented by onNexts in a control Publisher: window(Publisher), windowWhen
 
 - 
 - 
I want to split a Flux<T> and buffer elements within boundaries together…
- 
into List:
- 
by a size boundary: buffer(int)
- 
…with overlapping or dropping buffers: buffer(int, int)
 
 - 
 - 
by a duration boundary: buffer(Duration)
- 
…with overlapping or dropping buffers: buffer(Duration, Duration)
 
 - 
 - 
by a size OR duration boundary: bufferTimeout(int, Duration)
 - 
by an arbitrary criteria boundary: bufferUntil(Predicate)
- 
…putting the element that triggered the boundary in the next buffer: .bufferUntil(predicate, true)
 - 
…buffering while predicate matches and dropping the element that triggered the boundary: bufferWhile(Predicate)
 
 - 
 - 
driven by an arbitrary boundary represented by onNexts in a control Publisher: buffer(Publisher), bufferWhen
 
 - 
 - 
into an arbitrary "collection" type
C: use variants like buffer(int, Supplier<C>) 
 - 
 - 
I want to split a Flux<T> so that element that share a characteristic end up in the same sub-flux: groupBy(Function<T,K>) TIP: Note that this returns a
Flux<GroupedFlux<K, T>>, each inner GroupedFlux shares the sameKkey accessible through key(). 
1.8. Going Back to the Synchronous World
Note: all of these methods except Mono#toFuture will throw an UnsupportedOperatorException if called from within a Scheduler marked as "non-blocking only" (by default parallel() and single()).
- 
I have a Flux<T> and I want to:
- 
block until I can get the first element: Flux#blockFirst
- 
…with a timeout: Flux#blockFirst(Duration)
 
 - 
 - 
block until I can get the last element (or null if empty): Flux#blockLast
- 
…with a timeout: Flux#blockLast(Duration)
 
 - 
 - 
synchronously switch to an Iterable<T>: Flux#toIterable
 - 
synchronously switch to a Java 8 Stream<T>: Flux#toStream
 
 - 
 - 
I have a Mono<T> and I want:
- 
to block until I can get the value: Mono#block
- 
…with a timeout: Mono#block(Duration)
 
 - 
 
 - 
 
1.9. Multicasting a Flux to several Subscribers
- 
I want to connect multiple Subscriber to a Flux:
- 
and decide when to trigger the source with connect(): publish() (returns a ConnectableFlux)
 - 
and trigger the source immediately (late subscribers see later data):
share()(Flux|Mono) - 
and permanently connect the source when enough subscribers have registered: .publish().autoConnect(n)
 - 
and automatically connect and cancel the source when subscribers go above/below the threshold: .publish().refCount(n)
- 
…but giving a chance for new subscribers to come in before cancelling: .publish().refCount(n, Duration)
 
 - 
 
 - 
 - 
I want to cache data from a Publisher and replay it to later subscribers:
- 
up to
nelements: cache(int) - 
caching latest elements seen within a Duration (Time-To-Live):
cache(Duration)(Flux|Mono)- 
…but retain no more than
nelements: cache(int, Duration) 
 - 
 - 
but without immediately triggering the source: Flux#replay (returns a ConnectableFlux)
 
 -