Which operator do I need?
In this section, if an operator is specific to Flux or Mono, it is prefixed and linked accordingly, like this: Flux#fromArray. Common operators have no prefix, and links to both implementations are provided, for example: just(Flux|Mono). When a specific use case is covered by a combination of operators, it is presented as a method call, with a leading dot and parameters in parentheses, as follows: .methodCall(parameter).
I want to deal with:
1. Creating a New Sequence…
- 
that emits a T, and I already have: just(Flux|Mono) - 
…from an Optional<T>: Mono#justOrEmpty(Optional<T>) 
- 
…from a potentially null T: Mono#justOrEmpty(T)
 
- 
- 
that emits a T returned by a method: just(Flux|Mono) as well - 
…but lazily captured: use Mono#fromSupplier or wrap just(Flux|Mono) inside defer(Flux|Mono)
 
- 
- 
that emits several T I can explicitly enumerate: Flux#just(T…)
- 
that iterates over: - 
an array: Flux#fromArray 
- 
a collection or iterable: Flux#fromIterable 
- 
a range of integers: Flux#range 
- 
a Stream supplied for each Subscription: Flux#fromStream(Supplier<Stream>) 
 
- 
- 
that emits from various single-valued sources such as: 
- 
that generates events programmatically (can use state): - 
synchronously and one-by-one: Flux#generate 
- 
asynchronously (can also be sync), multiple emissions possible in one pass: Flux#create (Mono#create as well, without the multiple emission aspect) 
 
- 
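The creation entries above can be sketched in a few lines. This is a minimal illustration, assuming reactor-core is on the classpath; the class name `CreationSketch` and its method names are hypothetical, and `block()` is used only to make the results observable.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical holder class; only the Flux/Mono calls come from Reactor.
final class CreationSketch {

    // Flux#just: several values enumerated explicitly.
    static List<String> enumerated() {
        return Flux.just("a", "b", "c").collectList().block();
    }

    // Mono#justOrEmpty: an empty Optional yields an empty Mono, so block() returns null.
    static String fromOptional(Optional<String> maybe) {
        return Mono.justOrEmpty(maybe).block();
    }

    // Flux#range(start, count): emits `count` integers beginning at `start`.
    static List<Integer> range() {
        return Flux.range(5, 3).collectList().block();
    }

    // Mono#fromSupplier: the supplier is not invoked until something subscribes.
    // Returns the call count before and after subscribing.
    static int[] lazyCapture() {
        AtomicInteger calls = new AtomicInteger();
        Mono<Integer> lazy = Mono.fromSupplier(calls::incrementAndGet);
        int beforeSubscribe = calls.get();
        lazy.block();
        return new int[] { beforeSubscribe, calls.get() };
    }

    // Flux#generate: synchronous, one emission per round, with an explicit state.
    // Here the state is a counter and each round emits its square.
    static List<Integer> generated() {
        return Flux.<Integer, Integer>generate(
                () -> 1,
                (state, sink) -> {
                    sink.next(state * state);
                    if (state == 4) {
                        sink.complete();
                    }
                    return state + 1;
                })
            .collectList()
            .block();
    }
}
```

In `lazyCapture`, wrapping an eager call in `Mono.just(...)` would instead run it at assembly time, which is the difference the "lazily captured" entry points at.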
2. Transforming an Existing Sequence
- 
I want to transform existing data: - 
on a 1-to-1 basis (e.g. strings to their length): map(Flux|Mono)
- 
…in order to materialize each source value’s index: Flux#index 
 
- 
on a 1-to-n basis (e.g. strings to their characters): flatMap(Flux|Mono) + use a factory method
- 
on a 1-to-n basis with programmatic behavior for each source element and/or state: handle(Flux|Mono)
- 
running an asynchronous task for each source item (e.g. URLs to HTTP requests): flatMap(Flux|Mono) + an async Publisher-returning method - 
…ignoring some data: conditionally return a Mono.empty() in the flatMap lambda 
- 
…retaining the original sequence order: Flux#flatMapSequential (this triggers the async processes immediately but reorders the results to match the source order) 
- 
…where the async task can return multiple values, from a Mono source: Mono#flatMapMany 
 
- 
 
- 
- 
I want to add pre-set elements to an existing sequence: - 
at the start: Flux#startWith(T…) 
- 
at the end: Flux#concatWithValues(T…) 
 
- 
- 
I want to aggregate a Flux (the Flux# prefix is assumed below): - 
into a List: collectList, collectSortedList 
- 
into a Map: collectMap, collectMultiMap 
- 
into an arbitrary container: collect 
- 
into the size of the sequence: count 
- 
by applying a function between each element (e.g. running sum): reduce - 
…but emitting each intermediary value: scan 
 
- 
- 
into a boolean value from a predicate: - 
applied to all values (AND): all 
- 
applied to at least one value (OR): any 
- 
testing the presence of any value: hasElements (there is a Mono equivalent in hasElement) 
- 
testing the presence of a specific value: hasElement(T) 
 
- 
 
- 
- 
I want to combine publishers… - 
in sequential order: Flux#concat or .concatWith(other)(Flux|Mono) - 
…but delaying any error until remaining publishers have been emitted: Flux#concatDelayError 
- 
…but eagerly subscribing to subsequent publishers: Flux#mergeSequential 
 
- 
- 
in emission order (combined items emitted as they come): Flux#merge / .mergeWith(other)(Flux|Mono) - 
…with different types (transforming merge): Flux#zip / Flux#zipWith 
 
- 
- 
by pairing values: - 
from 2 Monos into a Tuple2: Mono#zipWith 
- 
from n Monos when they all completed: Mono#zip 
 
- 
- 
by coordinating their termination: - 
from 1 Mono and any source into a Mono<Void>: Mono#and 
- 
from n sources when they all completed: Mono#when 
- 
into an arbitrary container type: - 
each time all sides have emitted: Flux#zip (up to the smallest cardinality) 
- 
each time a new value arrives at either side: Flux#combineLatest 
 
- 
 
- 
- 
selecting the first publisher which… 
- 
triggered by the elements in a source sequence: switchMap (each source element is mapped to a Publisher) 
- 
triggered by the start of the next publisher in a sequence of publishers: switchOnNext 
 
- 
- 
I want to repeat an existing sequence: repeat(Flux|Mono) - 
…but at time intervals: Flux.interval(duration).flatMap(tick -> myExistingPublisher)
 
- 
- 
I have an empty sequence but… 
- 
I have a sequence but I am not interested in values: ignoreElements(Flux.ignoreElements()|Mono.ignoreElement()) - 
…and I want the completion represented as a Mono<Void>: then(Flux|Mono)
- 
…and I want to wait for another task to complete at the end: thenEmpty(Flux|Mono)
- 
…and I want to switch to another Mono at the end: Mono#then(mono) 
- 
…and I want to emit a single value at the end: Mono#thenReturn(T) 
- 
…and I want to switch to a Flux at the end: thenMany(Flux|Mono)
 
- 
- 
I have a Mono for which I want to defer completion… - 
…until another publisher, which is derived from this value, has completed: Mono#delayUntil(Function) 
 
- 
- 
I want to expand elements recursively into a graph of sequences and emit the combination… 
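A few of the transformation entries above, sketched side by side. This is an illustrative sample only, assuming reactor-core is available; `TransformSketch` and its method names are hypothetical, and the millisecond delays stand in for real asynchronous work.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical holder class for the transformation examples.
final class TransformSketch {

    // map: 1-to-1, strings to their length.
    static List<Integer> lengths() {
        return Flux.just("flux", "mono", "reactor").map(String::length).collectList().block();
    }

    // flatMap + a factory method: 1-to-n, strings to their characters.
    static List<String> characters() {
        return Flux.just("ab", "cd")
            .flatMap(s -> Flux.fromArray(s.split("")))
            .collectList()
            .block();
    }

    // flatMapSequential: all inner tasks start eagerly, results follow source order
    // even though the 10 ms task finishes before the 30 ms one.
    static List<String> inSourceOrder() {
        return Flux.just(30, 10, 20)
            .flatMapSequential(ms -> Mono.delay(Duration.ofMillis(ms)).thenReturn("after " + ms + "ms"))
            .collectList()
            .block();
    }

    // reduce emits only the final aggregate; scan emits every intermediate value.
    static int sum() {
        return Flux.range(1, 4).reduce(0, Integer::sum).block();
    }

    static List<Integer> runningSums() {
        return Flux.range(1, 4).scan(Integer::sum).collectList().block();
    }

    // startWith / concatWithValues: pre-set elements at the start and at the end.
    static List<Integer> padded() {
        return Flux.just(2, 3).startWith(1).concatWithValues(4, 5).collectList().block();
    }

    // zip: pairs values and stops at the smallest cardinality.
    static List<String> zipped() {
        return Flux.zip(Flux.just("a", "b", "c"), Flux.just(1, 2), (s, i) -> s + i)
            .collectList()
            .block();
    }

    // handle: programmatic per-element logic that may skip an element entirely.
    static List<Integer> evensDoubled() {
        return Flux.range(1, 6)
            .<Integer>handle((i, sink) -> {
                if (i % 2 == 0) {
                    sink.next(i * 2);
                }
            })
            .collectList()
            .block();
    }
}
```

With plain `flatMap` the output order depends on when the inner publishers emit, which is why the sketch uses `flatMapSequential` for the delayed case.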
3. Peeking into a Sequence
- 
Without modifying the final sequence, I want to: - 
get notified of / execute additional behavior (sometimes referred to as "side-effects") on: 
- 
completion: Flux#doOnComplete, Mono#doOnSuccess (includes the result, if any) 
- 
"start" of the sequence: doFirst(Flux|Mono)- 
this is tied to Publisher#subscribe(Subscriber) 
 
- 
- 
post-subscription: doOnSubscribe(Flux|Mono) - 
Subscription acknowledgment after subscribe
- 
this is tied to Subscriber#onSubscribe(Subscription) 
 
- 
- 
any type of signal, represented as a Signal: doOnEach(Flux|Mono)
- 
any terminating condition (complete, error, cancel): doFinally(Flux|Mono)
 
 
- 
- 
I want to know of all events: 
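To make the ordering of these side-effect hooks concrete, here is a small sketch that records each callback in a list. It assumes reactor-core is available; `PeekSketch` is a hypothetical name, and `doOnNext` is used as the per-element hook. Everything runs synchronously on the calling thread, so a plain `ArrayList` is sufficient here.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical holder class; records the order in which the hooks fire.
final class PeekSketch {

    static List<String> events() {
        List<String> log = new ArrayList<>();
        Flux.just(1, 2)
            // runs before the source is subscribed to
            .doFirst(() -> log.add("first"))
            // runs when the Subscription is handed downstream
            .doOnSubscribe(subscription -> log.add("subscribe"))
            .doOnNext(value -> log.add("next " + value))
            .doOnComplete(() -> log.add("complete"))
            // runs on any termination: complete, error, or cancel
            .doFinally(signalType -> log.add("finally " + signalType.name()))
            .blockLast();
        return log;
    }
}
```

None of these operators change the values seen by the final subscriber; they only observe the signals passing through.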
4. Filtering a Sequence
- 
I want to filter a sequence: 
- 
restricting on the type of the emitted objects: ofType(Flux|Mono)
- 
by ignoring the values altogether: ignoreElements(Flux.ignoreElements()|Mono.ignoreElement())
- 
by ignoring duplicates: - 
in the whole sequence (logical set): Flux#distinct 
- 
between subsequently emitted items (deduplication): Flux#distinctUntilChanged 
 
- 
 
- 
I want to keep only a subset of the sequence: - 
by taking N elements: - 
at the beginning of the sequence: Flux#take(long) - 
…requesting an unbounded amount from upstream: Flux#take(long, false) 
- 
…based on a duration: Flux#take(Duration) 
- 
…only the first element, as a Mono: Flux#next() 
 
- 
- 
at the end of the sequence: Flux#takeLast 
- 
until a criterion is met (inclusive): Flux#takeUntil (predicate-based), Flux#takeUntilOther (companion publisher-based) 
- 
while a criterion is met (exclusive): Flux#takeWhile 
 
- 
- 
by taking at most 1 element: - 
at a specific position: Flux#elementAt 
- 
at the end: .takeLast(1) - 
…and emit an error if empty: Flux#last() 
- 
…and emit a default value if empty: Flux#last(T) 
 
- 
 
- 
- 
by skipping elements: - 
at the beginning of the sequence: Flux#skip(long) - 
…based on a duration: Flux#skip(Duration) 
 
- 
- 
at the end of the sequence: Flux#skipLast 
- 
until a criterion is met (inclusive): Flux#skipUntil (predicate-based), Flux#skipUntilOther (companion publisher-based) 
- 
while a criterion is met (exclusive): Flux#skipWhile 
 
- 
- 
by sampling items: - 
by duration: Flux#sample(Duration) - 
but keeping the first element in the sampling window instead of the last: sampleFirst 
 
- 
- 
by a publisher-based window: Flux#sample(Publisher) 
- 
based on a publisher "timing out": Flux#sampleTimeout (each element triggers a publisher, and is emitted if that publisher does not overlap with the next) 
 
- 
 
- 
- 
I expect at most 1 element (error if more than one)… - 
and I want an error if the sequence is empty: Flux#single() 
- 
and I want a default value if the sequence is empty: Flux#single(T) 
- 
and I accept an empty sequence as well: Flux#singleOrEmpty 
 
- 
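The inclusive/exclusive distinctions in the filtering entries are easiest to see on small inputs. The sketch below assumes reactor-core is available; `FilterSketch` and its method names are hypothetical.

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical holder class for the filtering examples.
final class FilterSketch {

    // distinct: removes duplicates across the whole sequence.
    static List<Integer> distinct() {
        return Flux.just(1, 1, 2, 1, 3, 3).distinct().collectList().block();
    }

    // distinctUntilChanged: removes only consecutive duplicates.
    static List<Integer> distinctUntilChanged() {
        return Flux.just(1, 1, 2, 1, 3, 3).distinctUntilChanged().collectList().block();
    }

    // takeUntil is inclusive: the element that matches the predicate is still emitted.
    static List<Integer> takeUntil() {
        return Flux.range(1, 10).takeUntil(i -> i == 3).collectList().block();
    }

    // takeWhile is exclusive: the first element that fails the predicate is dropped.
    static List<Integer> takeWhile() {
        return Flux.range(1, 10).takeWhile(i -> i < 3).collectList().block();
    }

    // skip(long) and skipLast(int) trim both ends of the sequence.
    static List<Integer> trimmed() {
        return Flux.range(1, 5).skip(1).skipLast(2).collectList().block();
    }

    // last(T): falls back to a default value when the sequence is empty.
    static Integer lastOrDefault() {
        return Flux.<Integer>empty().last(-1).block();
    }
}
```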
5. Handling Errors
- 
I want the try/catch equivalent of: 
- 
I want to recover from errors… - 
by falling back: 
- 
to a completion ("swallowing" the error): onErrorComplete(Flux|Mono)
- 
to a Publisher or Mono, possibly different ones depending on the error: Flux#onErrorResume and Mono#onErrorResume 
 
- 
by retrying… - 
…with a simple policy (max number of attempts): retry()(Flux|Mono), retry(long)(Flux|Mono)
- 
…triggered by a companion control Flux: retryWhen(Flux|Mono)
- 
…using a standard backoff strategy (exponential backoff with jitter): retryWhen(Retry.backoff(…))(Flux|Mono) (see also other factory methods in Retry)
 
- 
 
- 
- 
I want to deal with backpressure "errors" (request the maximum from upstream and apply the strategy when downstream does not request enough)… - 
by throwing a special IllegalStateException: Flux#onBackpressureError 
- 
by dropping excess values: Flux#onBackpressureDrop - 
…except the last one seen: Flux#onBackpressureLatest 
 
- 
- 
by buffering excess values (bounded or unbounded): Flux#onBackpressureBuffer - 
…and applying a strategy when bounded buffer also overflows: Flux#onBackpressureBuffer with a BufferOverflowStrategy 
 
- 
 
- 
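The fallback and retry entries can be sketched as follows. This is only an illustration under stated assumptions: reactor-core 3.3.4 or later (for `Retry.backoff`), a hypothetical `ErrorSketch` class, and a hypothetical `flaky` helper that fails twice before succeeding.

```java
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

// Hypothetical holder class for the error-handling examples.
final class ErrorSketch {

    // Hypothetical flaky task: fails on the first two calls, succeeds on the third.
    static String flaky(AtomicInteger attempts) throws IOException {
        if (attempts.incrementAndGet() < 3) {
            throw new IOException("transient failure");
        }
        return "ok after " + attempts.get() + " attempts";
    }

    // onErrorResume: switch to a fallback publisher chosen from the error.
    static String fallback() {
        return Mono.<String>error(new IllegalStateException("boom"))
            .onErrorResume(IllegalStateException.class,
                e -> Mono.just("recovered from " + e.getMessage()))
            .block();
    }

    // retry(long): resubscribe to the source up to the given number of times.
    static String simpleRetry() {
        AtomicInteger attempts = new AtomicInteger();
        return Mono.fromCallable(() -> flaky(attempts)).retry(5).block();
    }

    // retryWhen(Retry.backoff(...)): exponential backoff with jitter between attempts.
    static String withBackoff() {
        AtomicInteger attempts = new AtomicInteger();
        return Mono.fromCallable(() -> flaky(attempts))
            .retryWhen(Retry.backoff(3, Duration.ofMillis(10)))
            .block();
    }
}
```

Because `Mono.fromCallable` defers the call, each retry re-invokes `flaky`; a value captured eagerly with `Mono.just` would simply be replayed.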
6. Working with Time
- 
I want to associate emissions with a timing measured… - 
…with best available precision and versatility of provided data: timed(Flux|Mono) - 
Timed<T>#elapsed() for Duration since last onNext
- 
Timed<T>#timestamp() for Instant representation of the epoch timestamp (milliseconds resolution) 
- 
Timed<T>#elapsedSinceSubscription() for Duration since subscription (rather than last onNext) 
- 
can have nanoseconds resolution for elapsed Durations 
 
- 
- 
…as a (legacy) Tuple2<Long, T>… 
 
- 
- 
I want my sequence to be interrupted if there is too much delay between emissions: timeout(Flux|Mono)
- 
I want to get ticks from a clock, regular time intervals: Flux#interval 
- 
I want to emit a single 0 after an initial delay: static Mono.delay.
- 
I want to introduce a delay: - 
between each onNext signal: Mono#delayElement, Flux#delayElements 
- 
before the subscription happens: delaySubscription(Flux|Mono)
 
- 
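A short sketch of the time-related entries, assuming reactor-core 3.4 or later (for `timed()`); `TimeSketch` and its method names are hypothetical, and the durations are arbitrary small values chosen to keep the example fast.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Timed;

// Hypothetical holder class for the time-related examples.
final class TimeSketch {

    // Mono.delay: emits a single 0L once the delay has elapsed.
    static Long delayed() {
        return Mono.delay(Duration.ofMillis(50)).block();
    }

    // Flux.interval: emits increasing Long ticks, starting at 0, at a fixed period.
    static List<Long> ticks() {
        return Flux.interval(Duration.ofMillis(10)).take(3).collectList().block();
    }

    // timeout with a fallback: switch publishers when the source is too slow.
    static String timeoutFallback() {
        return Mono.delay(Duration.ofSeconds(5))
            .map(tick -> "slow")
            .timeout(Duration.ofMillis(50), Mono.just("fallback"))
            .block();
    }

    // timed: wraps each value in a Timed<T> carrying elapsed durations and a timestamp.
    static boolean timedCarriesValueAndDuration() {
        Timed<String> timed = Mono.just("a")
            .delayElement(Duration.ofMillis(20))
            .timed()
            .block();
        return "a".equals(timed.get()) && !timed.elapsedSinceSubscription().isNegative();
    }
}
```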
7. Splitting a Flux
- 
I want to split a Flux<T> into a Flux<Flux<T>>, by a boundary criterion: - 
of size: window(int) - 
…with overlapping or dropping windows: window(int, int) 
 
- 
- 
of time: window(Duration) - 
…with overlapping or dropping windows: window(Duration, Duration) 
 
- 
- 
of size OR time (window closes when count is reached or timeout elapsed): windowTimeout(int, Duration) 
- 
based on a predicate on elements: windowUntil - 
…emitting the element that triggered the boundary in the next window (cutBefore variant): .windowUntil(predicate, true)
- 
…keeping the window open while elements match a predicate: windowWhile (non-matching elements are not emitted) 
 
- 
- 
driven by an arbitrary boundary represented by onNexts in a control Publisher: window(Publisher), windowWhen 
 
- 
- 
I want to split a Flux<T> and buffer elements within boundaries together… - 
into List: - 
by a size boundary: buffer(int) - 
…with overlapping or dropping buffers: buffer(int, int) 
 
- 
- 
by a duration boundary: buffer(Duration) - 
…with overlapping or dropping buffers: buffer(Duration, Duration) 
 
- 
- 
by a size OR duration boundary: bufferTimeout(int, Duration) 
- 
by an arbitrary criterion boundary: bufferUntil(Predicate) - 
…putting the element that triggered the boundary in the next buffer: .bufferUntil(predicate, true) 
- 
…buffering while predicate matches and dropping the element that triggered the boundary: bufferWhile(Predicate) 
 
- 
- 
driven by an arbitrary boundary represented by onNexts in a control Publisher: buffer(Publisher), bufferWhen 
 
- 
- 
into an arbitrary "collection" type C: use variants like buffer(int, Supplier<C>)
 
- 
- 
I want to split a Flux<T> so that elements that share a characteristic end up in the same sub-flux: groupBy(Function<T,K>)
TIP: Note that this returns a Flux<GroupedFlux<K, T>>; each inner GroupedFlux shares the same K key, accessible through key().
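The buffer, window, and groupBy entries differ mainly in what they hand downstream (lists, inner Flux instances, or keyed groups). The sketch below illustrates that on small ranges, assuming reactor-core is available; `SplitSketch` and its method names are hypothetical.

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical holder class for the splitting examples.
final class SplitSketch {

    // buffer(int): fixed-size lists, with the last one possibly shorter.
    static List<List<Integer>> bySize() {
        return Flux.range(1, 5).buffer(2).collectList().block();
    }

    // bufferUntil(predicate): the triggering element closes the current buffer.
    static List<List<Integer>> untilMultipleOfThree() {
        return Flux.range(1, 7).bufferUntil(i -> i % 3 == 0).collectList().block();
    }

    // bufferUntil(predicate, true): the triggering element opens the next buffer instead.
    static List<List<Integer>> cutBefore() {
        return Flux.range(1, 7).bufferUntil(i -> i % 3 == 0, true).collectList().block();
    }

    // window(int): same boundaries as buffer(int), but each window is itself a Flux,
    // here reduced to its sum; concatMap keeps the windows in order.
    static List<Integer> windowSums() {
        return Flux.range(1, 5)
            .window(2)
            .concatMap(window -> window.reduce(0, Integer::sum))
            .collectList()
            .block();
    }

    // groupBy: one GroupedFlux per key; key() exposes the shared key.
    static List<String> grouped() {
        return Flux.range(1, 6)
            .groupBy(i -> i % 2 == 0 ? "even" : "odd")
            .flatMap(group -> group.collectList().map(list -> group.key() + "=" + list))
            .collectList()
            .block();
    }
}
```

The order of the two strings returned by `grouped` is not asserted here, since it depends on the order in which the groups complete.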
8. Going Back to the Synchronous World
Note: all of these methods except Mono#toFuture will throw an IllegalStateException if called from within a Scheduler marked as "non-blocking only" (by default parallel() and single()).
- 
I have a Flux<T> and I want to: - 
block until I can get the first element: Flux#blockFirst - 
…with a timeout: Flux#blockFirst(Duration) 
 
- 
- 
block until I can get the last element (or null if empty): Flux#blockLast - 
…with a timeout: Flux#blockLast(Duration) 
 
- 
- 
synchronously switch to an Iterable<T>: Flux#toIterable 
- 
synchronously switch to a Java 8 Stream<T>: Flux#toStream 
 
- 
- 
I have a Mono<T> and I want: - 
to block until I can get the value: Mono#block - 
…with a timeout: Mono#block(Duration) 
 
- 
 
- 
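A minimal sketch of the blocking bridges, assuming reactor-core is available; `BlockingSketch` and its method names are hypothetical. The last method illustrates the note at the top of this section by attempting a blocking call on a `parallel()` thread; it deliberately catches any `RuntimeException` rather than assuming a specific exception type.

```java
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical holder class for the blocking examples.
final class BlockingSketch {

    // blockFirst: waits for the first element.
    static Integer first() {
        return Flux.range(1, 3).blockFirst();
    }

    // blockLast(Duration): waits for the last element, up to the timeout.
    static Integer last() {
        return Flux.range(1, 3).blockLast(Duration.ofSeconds(1));
    }

    // blockLast on an empty sequence returns null.
    static Integer lastOfEmpty() {
        return Flux.<Integer>empty().blockLast();
    }

    // toStream: bridges to a java.util.stream.Stream consumed on the calling thread.
    static List<Integer> viaStream() {
        return Flux.range(1, 3).toStream().collect(Collectors.toList());
    }

    // Blocking inside a non-blocking Scheduler thread is rejected; the failure
    // travels down as an error signal and is rethrown by the outer block().
    static boolean blockingRejectedOnParallel() {
        try {
            Mono.just(1)
                .publishOn(Schedulers.parallel())
                .map(i -> Mono.delay(Duration.ofMillis(1)).block())
                .block(Duration.ofSeconds(5));
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }
}
```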
9. Multicasting a Flux to several Subscribers
- 
I want to connect multiple Subscribers to a Flux: - 
and decide when to trigger the source with connect(): publish() (returns a ConnectableFlux) 
- 
and trigger the source immediately (late subscribers see later data): share()(Flux|Mono)
- 
and permanently connect the source when enough subscribers have registered: .publish().autoConnect(n) 
- 
and automatically connect and cancel the source when subscribers go above/below the threshold: .publish().refCount(n) - 
…but giving a chance for new subscribers to come in before cancelling: .publish().refCount(n, Duration) 
 
- 
 
- 
- 
I want to cache data from a Publisher and replay it to later subscribers: - 
up to n elements: cache(int)
- 
caching latest elements seen within a Duration (Time-To-Live): cache(Duration)(Flux|Mono) - 
…but retain no more than n elements: cache(int, Duration)
 
- 
- 
but without immediately triggering the source: Flux#replay (returns a ConnectableFlux) 
 
-
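Two of the multicasting entries, sketched with a synchronous source so the effect is visible without timing concerns. This assumes reactor-core is available; `MulticastSketch` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

// Hypothetical holder class for the multicasting examples.
final class MulticastSketch {

    // publish() + connect(): both subscribers register first, then the source
    // is triggered once and its values are dispatched to each of them.
    static List<List<Integer>> publishThenConnect() {
        ConnectableFlux<Integer> hot = Flux.range(1, 3).publish();
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        hot.subscribe(first::add);
        hot.subscribe(second::add);
        hot.connect();
        return Arrays.asList(first, second);
    }

    // cache(): the source is subscribed to once; later subscribers get a replay.
    // Returns how many times the source was actually subscribed to.
    static int sourceSubscriptionsWithCache() {
        AtomicInteger subscriptions = new AtomicInteger();
        Flux<Integer> cached = Flux.range(1, 3)
            .doOnSubscribe(subscription -> subscriptions.incrementAndGet())
            .cache();
        cached.blockLast();
        cached.blockLast();
        return subscriptions.get();
    }
}
```

With an asynchronous source, the lists in `publishThenConnect` would fill in over time rather than being complete when `connect()` returns, so real code would typically wait on a completion signal instead.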