1. Which operator do I need?
In this section, if an operator is specific to Flux or Mono, it is prefixed and linked accordingly, like this: Flux#fromArray. Common operators have no prefix, and links to both implementations are provided, for example: just (Flux|Mono). When a specific use case is covered by a combination of operators, it is presented as a method call, with a leading dot and parameters in parentheses, as follows: .methodCall(parameter) .
|
I want to deal with:
1.1. Creating a New Sequence…
-
that emits a
T
, and I already have:just
(Flux|Mono)-
…from an Optional<T>: Mono#justOrEmpty(Optional<T>)
-
…from a potentially
null
T: Mono#justOrEmpty(T)
-
-
that emits a
T
returned by a method:just
(Flux|Mono) as well-
…but lazily captured: use Mono#fromSupplier or wrap
just
(Flux|Mono) insidedefer
(Flux|Mono)
-
-
that emits several
T
I can explicitly enumerate: Flux#just(T…) -
that iterates over:
-
an array: Flux#fromArray
-
a collection or iterable: Flux#fromIterable
-
a range of integers: Flux#range
-
a Stream supplied for each Subscription: Flux#fromStream(Supplier<Stream>)
-
-
that emits from various single-valued sources such as:
-
that generates events programmatically (can use state):
-
synchronously and one-by-one: Flux#generate
-
asynchronously (can also be sync), multiple emissions possible in one pass: Flux#create (Mono#create as well, without the multiple emission aspect)
-
1.2. Transforming an Existing Sequence
-
I want to transform existing data:
-
on a 1-to-1 basis (eg. strings to their length):
map
(Flux|Mono)-
…in order to materialize each source value’s index: Flux#index
-
on a 1-to-n basis (eg. strings to their characters):
flatMap
(Flux|Mono) + use a factory method -
on a 1-to-n basis with programmatic behavior for each source element and/or state:
handle
(Flux|Mono) -
running an asynchronous task for each source item (eg. urls to http request):
flatMap
(Flux|Mono) + an async Publisher-returning method-
…ignoring some data: conditionally return a Mono.empty() in the flatMap lambda
-
…retaining the original sequence order: Flux#flatMapSequential (this triggers the async processes immediately but reorders the results)
-
…where the async task can return multiple values, from a Mono source: Mono#flatMapMany
-
-
-
I want to add pre-set elements to an existing sequence:
-
at the start: Flux#startWith(T…)
-
at the end: Flux#concatWithValues(T…)
-
-
I want to aggregate a Flux: (the
Flux#
prefix is assumed below)-
into a List: collectList, collectSortedList
-
into a Map: collectMap, collectMultiMap
-
into an arbitrary container: collect
-
into the size of the sequence: count
-
by applying a function between each element (eg. running sum): reduce
-
…but emitting each intermediary value: scan
-
-
into a boolean value from a predicate:
-
applied to all values (AND): all
-
applied to at least one value (OR): any
-
testing the presence of any value: hasElements (there is a Mono equivalent in hasElement)
-
testing the presence of a specific value: hasElement(T)
-
-
-
I want to combine publishers…
-
in sequential order: Flux#concat or
.concatWith(other)
(Flux|Mono)-
…but delaying any error until remaining publishers have been emitted: Flux#concatDelayError
-
…but eagerly subscribing to subsequent publishers: Flux#mergeSequential
-
-
in emission order (combined items emitted as they come): Flux#merge /
.mergeWith(other)
(Flux|Mono)-
…with different types (transforming merge): Flux#zip / Flux#zipWith
-
-
by pairing values:
-
from 2 Monos into a Tuple2: Mono#zipWith
-
from n Monos when they all completed: Mono#zip
-
-
by coordinating their termination:
-
from 1 Mono and any source into a Mono<Void>: Mono#and
-
from n sources when they all completed: Mono#when
-
into an arbitrary container type:
-
each time all sides have emitted: Flux#zip (up to the smallest cardinality)
-
each time a new value arrives at either side: Flux#combineLatest
-
-
-
selecting the first publisher which…
-
triggered by the elements in a source sequence: switchMap (each source element is mapped to a Publisher)
-
triggered by the start of the next publisher in a sequence of publishers: switchOnNext
-
-
I want to repeat an existing sequence:
repeat
(Flux|Mono)-
…but at time intervals:
Flux.interval(duration).flatMap(tick → myExistingPublisher)
-
-
I have an empty sequence but…
-
I have a sequence but I am not interested in values:
ignoreElements
(Flux.ignoreElements()|Mono.ignoreElement())-
…and I want the completion represented as a Mono<Void>:
then
(Flux|Mono) -
…and I want to wait for another task to complete at the end:
thenEmpty
(Flux|Mono) -
…and I want to switch to another Mono at the end: Mono#then(mono)
-
…and I want to emit a single value at the end: Mono#thenReturn(T)
-
…and I want to switch to a Flux at the end:
thenMany
(Flux|Mono)
-
-
I have a Mono for which I want to defer completion…
-
…until another publisher, which is derived from this value, has completed: Mono#delayUntil(Function)
-
-
I want to expand elements recursively into a graph of sequences and emit the combination…
1.3. Peeking into a Sequence
-
Without modifying the final sequence, I want to:
-
get notified of / execute additional behavior (sometimes referred to as "side-effects") on:
-
completion: Flux#doOnComplete, Mono#doOnSuccess (includes the result, if any)
-
"start" of the sequence:
doFirst
(Flux|Mono)-
this is tied to Publisher#subscribe(Subscriber)
-
-
post-subscription :
doOnSubscribe
(Flux|Mono)-
Subscription
acknowledgment aftersubscribe
-
this is tied to Subscriber#onSubscribe(Subscription)
-
-
any type of signal, represented as a Signal:
doOnEach
(Flux|Mono) -
any terminating condition (complete, error, cancel):
doFinally
(Flux|Mono)
-
-
I want to know of all events:
1.4. Filtering a Sequence
-
I want to filter a sequence:
-
restricting on the type of the emitted objects:
ofType
(Flux|Mono) -
by ignoring the values altogether:
ignoreElements
(Flux.ignoreElements()|Mono.ignoreElement()) -
by ignoring duplicates:
-
in the whole sequence (logical set): Flux#distinct
-
between subsequently emitted items (deduplication): Flux#distinctUntilChanged
-
-
I want to keep only a subset of the sequence:
-
by taking N elements:
-
at the beginning of the sequence: Flux#take(long)
-
…requesting an unbounded amount from upstream: Flux#take(long, false)
-
…based on a duration: Flux#take(Duration)
-
…only the first element, as a Mono: Flux#next()
-
-
at the end of the sequence: Flux#takeLast
-
until a criteria is met (inclusive): Flux#takeUntil (predicate-based), Flux#takeUntilOther (companion publisher-based)
-
while a criteria is met (exclusive): Flux#takeWhile
-
-
by taking at most 1 element:
-
at a specific position: Flux#elementAt
-
at the end: .takeLast(1)
-
…and emit an error if empty: Flux#last()
-
…and emit a default value if empty: Flux#last(T)
-
-
-
by skipping elements:
-
at the beginning of the sequence: Flux#skip(long)
-
…based on a duration: Flux#skip(Duration)
-
-
at the end of the sequence: Flux#skipLast
-
until a criteria is met (inclusive): Flux#skipUntil (predicate-based), Flux#skipUntilOther (companion publisher-based)
-
while a criteria is met (exclusive): Flux#skipWhile
-
-
by sampling items:
-
by duration: Flux#sample(Duration)
-
but keeping the first element in the sampling window instead of the last: sampleFirst
-
-
by a publisher-based window: Flux#sample(Publisher)
-
based on a publisher "timing out": Flux#sampleTimeout (each element triggers a publisher, and is emitted if that publisher does not overlap with the next)
-
-
-
I expect at most 1 element (error if more than one)…
-
and I want an error if the sequence is empty: Flux#single()
-
and I want a default value if the sequence is empty: Flux#single(T)
-
and I accept an empty sequence as well: Flux#singleOrEmpty
-
1.5. Handling Errors
-
I want the try/catch equivalent of:
-
I want to recover from errors…
-
by falling back:
-
to a completion ("swallowing" the error):
onErrorComplete
(Flux|Mono) -
to a Publisher or Mono, possibly different ones depending on the error: Flux#onErrorResume and Mono#onErrorResume
-
by retrying…
-
…with a simple policy (max number of attempts):
retry()
(Flux|Mono),retry(long)
(Flux|Mono) -
…triggered by a companion control Flux:
retryWhen
(Flux|Mono) -
…using a standard backoff strategy (exponential backoff with jitter):
retryWhen(Retry.backoff(…))
(Flux|Mono) (see also other factory methods in Retry)
-
-
-
I want to deal with backpressure "errors" (request max from upstream and apply the strategy when downstream does not produce enough request)…
-
by throwing a special IllegalStateException: Flux#onBackpressureError
-
by dropping excess values: Flux#onBackpressureDrop
-
…except the last one seen: Flux#onBackpressureLatest
-
-
by buffering excess values (bounded or unbounded): Flux#onBackpressureBuffer
-
…and applying a strategy when bounded buffer also overflows: Flux#onBackpressureBuffer with a BufferOverflowStrategy
-
-
1.6. Working with Time
-
I want to associate emissions with a timing measured…
-
…with best available precision and versatility of provided data:
timed
(Flux|Mono)-
Timed<T>#elapsed() for Duration since last
onNext
-
Timed<T>#timestamp() for Instant representation of the epoch timestamp (milliseconds resolution)
-
Timed<T>#elapsedSinceSubcription() for Duration since subscription (rather than last onNext)
-
can have nanoseconds resolution for elapsed Durations
-
-
…as a (legacy) Tuple2<Long, T>…
-
-
I want my sequence to be interrupted if there is too much delay between emissions:
timeout
(Flux|Mono) -
I want to get ticks from a clock, regular time intervals: Flux#interval
-
I want to emit a single
0
after an initial delay: static Mono.delay. -
I want to introduce a delay:
-
between each onNext signal: Mono#delayElement, Flux#delayElements
-
before the subscription happens:
delaySubscription
(Flux|Mono)
-
1.7. Splitting a Flux
-
I want to split a Flux<T> into a
Flux<Flux<T>>
, by a boundary criteria:-
of size: window(int)
-
…with overlapping or dropping windows: window(int, int)
-
-
of time window(Duration)
-
…with overlapping or dropping windows: window(Duration, Duration)
-
-
of size OR time (window closes when count is reached or timeout elapsed): windowTimeout(int, Duration)
-
based on a predicate on elements: windowUntil
-
……emitting the element that triggered the boundary in the next window (
cutBefore
variant): .windowUntil(predicate, true) -
…keeping the window open while elements match a predicate: windowWhile (non-matching elements are not emitted)
-
-
driven by an arbitrary boundary represented by onNexts in a control Publisher: window(Publisher), windowWhen
-
-
I want to split a Flux<T> and buffer elements within boundaries together…
-
into List:
-
by a size boundary: buffer(int)
-
…with overlapping or dropping buffers: buffer(int, int)
-
-
by a duration boundary: buffer(Duration)
-
…with overlapping or dropping buffers: buffer(Duration, Duration)
-
-
by a size OR duration boundary: bufferTimeout(int, Duration)
-
by an arbitrary criteria boundary: bufferUntil(Predicate)
-
…putting the element that triggered the boundary in the next buffer: .bufferUntil(predicate, true)
-
…buffering while predicate matches and dropping the element that triggered the boundary: bufferWhile(Predicate)
-
-
driven by an arbitrary boundary represented by onNexts in a control Publisher: buffer(Publisher), bufferWhen
-
-
into an arbitrary "collection" type
C
: use variants like buffer(int, Supplier<C>)
-
-
I want to split a Flux<T> so that element that share a characteristic end up in the same sub-flux: groupBy(Function<T,K>) TIP: Note that this returns a
Flux<GroupedFlux<K, T>>
, each inner GroupedFlux shares the sameK
key accessible through key().
1.8. Going Back to the Synchronous World
Note: all of these methods except Mono#toFuture will throw an UnsupportedOperatorException if called from within a Scheduler marked as "non-blocking only" (by default parallel() and single()).
-
I have a Flux<T> and I want to:
-
block until I can get the first element: Flux#blockFirst
-
…with a timeout: Flux#blockFirst(Duration)
-
-
block until I can get the last element (or null if empty): Flux#blockLast
-
…with a timeout: Flux#blockLast(Duration)
-
-
synchronously switch to an Iterable<T>: Flux#toIterable
-
synchronously switch to a Java 8 Stream<T>: Flux#toStream
-
-
I have a Mono<T> and I want:
-
to block until I can get the value: Mono#block
-
…with a timeout: Mono#block(Duration)
-
-
1.9. Multicasting a Flux to several Subscribers
-
I want to connect multiple Subscriber to a Flux:
-
and decide when to trigger the source with connect(): publish() (returns a ConnectableFlux)
-
and trigger the source immediately (late subscribers see later data):
share()
(Flux|Mono) -
and permanently connect the source when enough subscribers have registered: .publish().autoConnect(n)
-
and automatically connect and cancel the source when subscribers go above/below the threshold: .publish().refCount(n)
-
…but giving a chance for new subscribers to come in before cancelling: .publish().refCount(n, Duration)
-
-
-
I want to cache data from a Publisher and replay it to later subscribers:
-
up to
n
elements: cache(int) -
caching latest elements seen within a Duration (Time-To-Live):
cache(Duration)
(Flux|Mono)-
…but retain no more than
n
elements: cache(int, Duration)
-
-
but without immediately triggering the source: Flux#replay (returns a ConnectableFlux)
-