Type Parameters: T - the element type of this Reactive Streams Publisher

public abstract class Flux<T> extends Object implements Publisher<T>
A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes
(successfully or with an error).
It is intended to be used in implementations and return types. Input parameters should keep using raw
Publisher as much as possible.
If it is known that the underlying Publisher will emit 0 or 1 element, Mono should be used
instead.
Note that using state in the java.util.function / lambdas used within Flux operators
should be avoided, as these may be shared between several Subscribers.
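As an illustration of that pitfall, here is a minimal sketch (the class name FluxStatePitfall is hypothetical; it assumes reactor-core on the classpath): the AtomicInteger is captured once when the pipeline is assembled, so every Subscriber shares the same counter.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class FluxStatePitfall {
    public static void main(String[] args) {
        // Captured once at assembly time, hence shared by every Subscriber of `numbered`.
        AtomicInteger counter = new AtomicInteger();
        Flux<String> numbered = Flux.just("a", "b", "c")
                .map(v -> counter.incrementAndGet() + ":" + v);

        numbered.subscribe(System.out::println); // 1:a, 2:b, 3:c
        numbered.subscribe(System.out::println); // 4:a, 5:b, 6:c -- state leaked across subscriptions
    }
}
```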
subscribe(CoreSubscriber) is an internal extension to
subscribe(Subscriber) used internally for Context passing. A user-provided
Subscriber may be passed to this "subscribe" extension but will lose the available
per-subscribe Hooks.onLastOperator.
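As a quick orientation, a minimal usage sketch (the class name FluxBasics is illustrative; it assumes reactor-core on the classpath): create a Flux, apply operators, then subscribe.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxBasics {
    public static void main(String[] args) {
        // A Flux emits 0..N elements and then completes (or errors).
        Flux<String> words = Flux.just("reactive", "streams", "publisher")
                .map(String::toUpperCase)       // synchronous 1-to-1 transformation
                .filter(w -> w.length() > 7);   // drop short words

        words.subscribe(
                System.out::println,                        // onNext
                err -> System.err.println("error: " + err), // onError
                () -> System.out.println("done"));          // onComplete

        // When at most one element is expected, prefer Mono.
        Mono<Long> count = words.count();
        count.subscribe(c -> System.out.println("count=" + c));
    }
}
```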
| Constructor and Description |
|---|
Flux() |
| Modifier and Type | Method and Description |
|---|---|
Mono<Boolean> |
all(Predicate<? super T> predicate)
Emit a single boolean true if all values of this sequence match
the
Predicate. |
Mono<Boolean> |
any(Predicate<? super T> predicate)
Emit a single boolean true if any of the values of this
Flux sequence match
the predicate. |
<P> P |
as(Function<? super Flux<T>,P> transformer)
Transform this
Flux into a target type. |
T |
blockFirst()
Subscribe to this
Flux and block indefinitely
until the upstream signals its first value or completes. |
T |
blockFirst(Duration timeout)
Subscribe to this
Flux and block until the upstream
signals its first value, completes or a timeout expires. |
T |
blockLast()
Subscribe to this
Flux and block indefinitely
until the upstream signals its last value or completes. |
T |
blockLast(Duration timeout)
Subscribe to this
Flux and block until the upstream
signals its last value, completes or a timeout expires. |
Flux<List<T>> |
buffer()
|
Flux<List<T>> |
buffer(Duration timespan)
|
Flux<List<T>> |
buffer(Duration timespan,
Duration timeshift)
Collect incoming values into multiple
List buffers created at a given
timeshift period. |
Flux<List<T>> |
buffer(Duration timespan,
Duration timeshift,
Scheduler timer)
|
Flux<List<T>> |
buffer(Duration timespan,
Scheduler timer)
|
Flux<List<T>> |
buffer(int maxSize)
|
Flux<List<T>> |
buffer(int maxSize,
int skip)
|
<C extends Collection<? super T>> Flux<C> |
buffer(int maxSize,
int skip,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the given max size is reached
or once this Flux completes. |
<C extends Collection<? super T>> Flux<C> |
buffer(int maxSize,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the given max size is reached
or once this Flux completes. |
Flux<List<T>> |
buffer(Publisher<?> other)
|
<C extends Collection<? super T>> Flux<C> |
buffer(Publisher<?> other,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers, as
delimited by the signals of a companion Publisher this operator will
subscribe to. |
Flux<List<T>> |
bufferTimeout(int maxSize,
Duration timespan)
|
Flux<List<T>> |
bufferTimeout(int maxSize,
Duration timespan,
Scheduler timer)
|
<C extends Collection<? super T>> Flux<C> |
bufferTimeout(int maxSize,
Duration timespan,
Scheduler timer,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the buffer reaches a maximum
size OR the timespan Duration elapses, as measured on the provided Scheduler. |
<C extends Collection<? super T>> Flux<C> |
bufferTimeout(int maxSize,
Duration timespan,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the buffer reaches a maximum
size OR the timespan Duration elapses. |
Flux<List<T>> |
bufferUntil(Predicate<? super T> predicate)
|
Flux<List<T>> |
bufferUntil(Predicate<? super T> predicate,
boolean cutBefore)
|
<U,V> Flux<List<T>> |
bufferWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector)
|
<U,V,C extends Collection<? super T>> Flux<C> |
bufferWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers started each time an opening
companion Publisher emits. |
Flux<List<T>> |
bufferWhile(Predicate<? super T> predicate)
|
Flux<T> |
cache()
Turn this
Flux into a hot source and cache last emitted signals for further Subscriber. |
Flux<T> |
cache(Duration ttl)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscriber. |
Flux<T> |
cache(int history)
Turn this
Flux into a hot source and cache last emitted signals for further Subscriber. |
Flux<T> |
cache(int history,
Duration ttl)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscriber. |
Flux<T> |
cancelOn(Scheduler scheduler)
|
<E> Flux<E> |
cast(Class<E> clazz)
Cast the current
Flux produced type into a target produced type. |
Flux<T> |
checkpoint()
Activate assembly tracing for this particular
Flux, in case of an error
upstream of the checkpoint. |
Flux<T> |
checkpoint(String description)
Activate assembly marker for this particular
Flux by giving it a description that
will be reflected in the assembly traceback in case of an error upstream of the
checkpoint. |
Flux<T> |
checkpoint(String description,
boolean forceStackTrace)
Activate assembly tracing or the lighter assembly marking depending on the
forceStackTrace option. |
<R,A> Mono<R> |
collect(Collector<? super T,A,? extends R> collector)
|
<E> Mono<E> |
collect(Supplier<E> containerSupplier,
BiConsumer<E,? super T> collector)
Collect all elements emitted by this
Flux into a user-defined container,
by applying a collector BiConsumer taking the container and each element. |
Mono<List<T>> |
collectList()
|
<K> Mono<Map<K,T>> |
collectMap(Function<? super T,? extends K> keyExtractor)
|
<K,V> Mono<Map<K,V>> |
collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor)
|
<K,V> Mono<Map<K,V>> |
collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,V>> mapSupplier)
|
<K> Mono<Map<K,Collection<T>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor)
|
<K,V> Mono<Map<K,Collection<V>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor)
|
<K,V> Mono<Map<K,Collection<V>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,Collection<V>>> mapSupplier)
|
Mono<List<T>> |
collectSortedList()
|
Mono<List<T>> |
collectSortedList(Comparator<? super T> comparator)
Collect all elements emitted by this
Flux until this sequence completes,
and then sort them using a Comparator into a List that is emitted
by the resulting Mono. |
static <T,V> Flux<V> |
combineLatest(Function<Object[],V> combinator,
int prefetch,
Publisher<? extends T>... sources)
|
static <T,V> Flux<V> |
combineLatest(Function<Object[],V> combinator,
Publisher<? extends T>... sources)
|
static <T,V> Flux<V> |
combineLatest(Iterable<? extends Publisher<? extends T>> sources,
Function<Object[],V> combinator)
|
static <T,V> Flux<V> |
combineLatest(Iterable<? extends Publisher<? extends T>> sources,
int prefetch,
Function<Object[],V> combinator)
|
static <T1,T2,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
BiFunction<? super T1,? super T2,? extends V> combinator)
|
static <T1,T2,T3,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Function<Object[],V> combinator)
|
static <T1,T2,T3,T4,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Function<Object[],V> combinator)
|
static <T1,T2,T3,T4,T5,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Function<Object[],V> combinator)
|
static <T1,T2,T3,T4,T5,T6,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6,
Function<Object[],V> combinator)
|
<V> Flux<V> |
compose(Function<? super Flux<T>,? extends Publisher<V>> transformer)
|
static <T> Flux<T> |
concat(Iterable<? extends Publisher<? extends T>> sources)
Concatenate all sources provided in an
Iterable, forwarding elements
emitted by the sources downstream. |
static <T> Flux<T> |
concat(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent
Publisher,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concat(Publisher<? extends Publisher<? extends T>> sources,
int prefetch)
Concatenate all sources emitted as an onNext signal from a parent
Publisher,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concat(Publisher<? extends T>... sources)
Concatenate all sources provided as a vararg, forwarding elements emitted by the
sources downstream.
|
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent
Publisher,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources,
boolean delayUntilEnd,
int prefetch)
Concatenate all sources emitted as an onNext signal from a parent
Publisher,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources,
int prefetch)
Concatenate all sources emitted as an onNext signal from a parent
Publisher,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends T>... sources)
Concatenate all sources provided as a vararg, forwarding elements emitted by the
sources downstream.
|
<V> Flux<V> |
concatMap(Function<? super T,? extends Publisher<? extends V>> mapper)
|
<V> Flux<V> |
concatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int prefetch)
|
<V> Flux<V> |
concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
boolean delayUntilEnd,
int prefetch)
|
<V> Flux<V> |
concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
int prefetch)
|
<V> Flux<V> |
concatMapDelayError(Function<? super T,Publisher<? extends V>> mapper)
|
<R> Flux<R> |
concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
|
<R> Flux<R> |
concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper,
int prefetch)
|
Flux<T> |
concatWith(Publisher<? extends T> other)
|
Flux<T> |
concatWithValues(T... values)
Concatenates the values to the end of the
Flux |
Mono<Long> |
count()
Counts the number of values in this
Flux. |
static <T> Flux<T> |
create(Consumer<? super FluxSink<T>> emitter)
|
static <T> Flux<T> |
create(Consumer<? super FluxSink<T>> emitter,
FluxSink.OverflowStrategy backpressure)
|
Flux<T> |
defaultIfEmpty(T defaultV)
Provide a default unique value if this sequence is completed without any data
|
static <T> Flux<T> |
defer(Supplier<? extends Publisher<T>> supplier)
Lazily supply a
Publisher every time a Subscription is made on the
resulting Flux, so the actual source instantiation is deferred until each
subscribe and the Supplier can create a subscriber-specific instance. |
Flux<T> |
delayElements(Duration delay)
|
Flux<T> |
delayElements(Duration delay,
Scheduler timer)
|
Flux<T> |
delaySequence(Duration delay)
|
Flux<T> |
delaySequence(Duration delay,
Scheduler timer)
|
Flux<T> |
delaySubscription(Duration delay)
Delay the
subscription to this Flux source until the given
period elapses. |
Flux<T> |
delaySubscription(Duration delay,
Scheduler timer)
Delay the
subscription to this Flux source until the given
period elapses, as measured on the user-provided Scheduler. |
<U> Flux<T> |
delaySubscription(Publisher<U> subscriptionDelay)
|
Flux<T> |
delayUntil(Function<? super T,? extends Publisher<?>> triggerProvider)
|
<X> Flux<X> |
dematerialize()
An operator working only if this
Flux emits onNext, onError or onComplete Signal
instances, transforming these materialized signals into
real signals on the Subscriber. |
Flux<T> |
distinct()
For each
Subscriber, track elements from this Flux that have been
seen and filter out duplicates. |
<V> Flux<T> |
distinct(Function<? super T,? extends V> keySelector)
For each
Subscriber, track elements from this Flux that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function. |
<V,C extends Collection<? super V>> Flux<T> |
distinct(Function<? super T,? extends V> keySelector,
Supplier<C> distinctCollectionSupplier)
For each
Subscriber, track elements from this Flux that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function and by the add method
of the Collection supplied (typically a Set). |
<V,C> Flux<T> |
distinct(Function<? super T,? extends V> keySelector,
Supplier<C> distinctStoreSupplier,
BiPredicate<C,V> distinctPredicate,
Consumer<C> cleanup)
For each
Subscriber, track elements from this Flux that have been
seen and filter out duplicates, as compared by applying a BiPredicate on
an arbitrary user-supplied <C> store and a key extracted through the user
provided Function. |
Flux<T> |
distinctUntilChanged()
Filter out subsequent repetitions of an element (that is, if they arrive right after
one another).
|
<V> Flux<T> |
distinctUntilChanged(Function<? super T,? extends V> keySelector)
Filter out subsequent repetitions of an element (that is, if they arrive right after
one another), as compared by a key extracted through the user provided
Function
using equality. |
<V> Flux<T> |
distinctUntilChanged(Function<? super T,? extends V> keySelector,
BiPredicate<? super V,? super V> keyComparator)
Filter out subsequent repetitions of an element (that is, if they arrive right
after one another), as compared by a key extracted through the user provided
Function and then comparing keys with the supplied BiPredicate. |
Flux<T> |
doAfterTerminate(Runnable afterTerminate)
Add behavior (side-effect) triggered after the
Flux terminates, either by completing downstream successfully or with an error. |
Flux<T> |
doFinally(Consumer<SignalType> onFinally)
Add behavior (side-effect) triggered after the
Flux terminates for any reason,
including cancellation. |
Flux<T> |
doOnCancel(Runnable onCancel)
Add behavior (side-effect) triggered when the
Flux is cancelled. |
Flux<T> |
doOnComplete(Runnable onComplete)
Add behavior (side-effect) triggered when the
Flux completes successfully. |
Flux<T> |
doOnEach(Consumer<? super Signal<T>> signalConsumer)
Add behavior (side-effects) triggered when the
Flux emits an item, fails with an error
or completes successfully. |
<E extends Throwable> Flux<T> |
doOnError(Class<E> exceptionType,
Consumer<? super E> onError)
Add behavior (side-effect) triggered when the
Flux completes with an error matching the given exception type. |
Flux<T> |
doOnError(Consumer<? super Throwable> onError)
Add behavior (side-effect) triggered when the
Flux completes with an error. |
Flux<T> |
doOnError(Predicate<? super Throwable> predicate,
Consumer<? super Throwable> onError)
Add behavior (side-effect) triggered when the
Flux completes with an error matching the given exception. |
Flux<T> |
doOnNext(Consumer<? super T> onNext)
Add behavior (side-effect) triggered when the
Flux emits an item. |
Flux<T> |
doOnRequest(LongConsumer consumer)
Add behavior (side-effect) triggering a
LongConsumer when this Flux
receives any request. |
Flux<T> |
doOnSubscribe(Consumer<? super Subscription> onSubscribe)
Add behavior (side-effect) triggered when the
Flux is subscribed. |
Flux<T> |
doOnTerminate(Runnable onTerminate)
Add behavior (side-effect) triggered when the
Flux terminates, either by
completing successfully or with an error. |
Flux<Tuple2<Long,T>> |
elapsed()
Map this
Flux into Tuple2<Long, T>
of timemillis and source data. |
Flux<Tuple2<Long,T>> |
elapsed(Scheduler scheduler)
Map this
Flux into Tuple2<Long, T>
of timemillis and source data. |
Mono<T> |
elementAt(int index)
Emit only the element at the given index position or
IndexOutOfBoundsException
if the sequence is shorter. |
Mono<T> |
elementAt(int index,
T defaultValue)
Emit only the element at the given index position or fall back to a
default value if the sequence is shorter.
|
static <T> Flux<T> |
empty()
Create a
Flux that completes without emitting any item. |
static <T> Flux<T> |
error(Throwable error)
Create a
Flux that terminates with the specified error immediately after
being subscribed to. |
static <O> Flux<O> |
error(Throwable throwable,
boolean whenRequested)
Create a
Flux that terminates with the specified error, either immediately
after being subscribed to or after being first requested. |
Flux<T> |
expand(Function<? super T,? extends Publisher<? extends T>> expander)
Recursively expand elements into a graph and emit all the resulting elements using
a breadth-first traversal strategy.
|
Flux<T> |
expand(Function<? super T,? extends Publisher<? extends T>> expander,
int capacityHint)
Recursively expand elements into a graph and emit all the resulting elements using
a breadth-first traversal strategy.
|
Flux<T> |
expandDeep(Function<? super T,? extends Publisher<? extends T>> expander)
Recursively expand elements into a graph and emit all the resulting elements,
in a depth-first traversal order.
|
Flux<T> |
expandDeep(Function<? super T,? extends Publisher<? extends T>> expander,
int capacityHint)
Recursively expand elements into a graph and emit all the resulting elements,
in a depth-first traversal order.
|
Flux<T> |
filter(Predicate<? super T> p)
Evaluate each source value against the given
Predicate. |
Flux<T> |
filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate)
Test each value emitted by this
Flux asynchronously using a generated
Publisher<Boolean> test. |
Flux<T> |
filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate,
int bufferSize)
Test each value emitted by this
Flux asynchronously using a generated
Publisher<Boolean> test. |
static <I> Flux<I> |
first(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
first(Publisher<? extends I>... sources)
|
<R> Flux<R> |
flatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
|
<R> Flux<R> |
flatMap(Function<? super T,? extends Publisher<? extends R>> mapperOnNext,
Function<? super Throwable,? extends Publisher<? extends R>> mapperOnError,
Supplier<? extends Publisher<? extends R>> mapperOnComplete)
|
<V> Flux<V> |
flatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency)
|
<V> Flux<V> |
flatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency,
int prefetch)
|
<V> Flux<V> |
flatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency,
int prefetch)
|
<R> Flux<R> |
flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
|
<R> Flux<R> |
flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper,
int prefetch)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency,
int prefetch)
|
<R> Flux<R> |
flatMapSequentialDelayError(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency,
int prefetch)
|
static <T> Flux<T> |
from(Publisher<? extends T> source)
|
static <T> Flux<T> |
fromArray(T[] array)
Create a
Flux that emits the items contained in the provided array. |
static <T> Flux<T> |
fromIterable(Iterable<? extends T> it)
|
static <T> Flux<T> |
fromStream(Stream<? extends T> s)
|
static <T> Flux<T> |
fromStream(Supplier<Stream<? extends T>> streamSupplier)
|
static <T,S> Flux<T> |
generate(Callable<S> stateSupplier,
BiFunction<S,SynchronousSink<T>,S> generator)
Programmatically create a
Flux by generating signals one-by-one via a
consumer callback and some state. |
static <T,S> Flux<T> |
generate(Callable<S> stateSupplier,
BiFunction<S,SynchronousSink<T>,S> generator,
Consumer<? super S> stateConsumer)
Programmatically create a
Flux by generating signals one-by-one via a
consumer callback and some state, with a final cleanup callback. |
static <T> Flux<T> |
generate(Consumer<SynchronousSink<T>> generator)
Programmatically create a
Flux by generating signals one-by-one via a
consumer callback. |
int |
getPrefetch()
The prefetch configuration of the
Flux |
<K> Flux<GroupedFlux<K,T>> |
groupBy(Function<? super T,? extends K> keyMapper)
|
<K,V> Flux<GroupedFlux<K,V>> |
groupBy(Function<? super T,? extends K> keyMapper,
Function<? super T,? extends V> valueMapper)
|
<K,V> Flux<GroupedFlux<K,V>> |
groupBy(Function<? super T,? extends K> keyMapper,
Function<? super T,? extends V> valueMapper,
int prefetch)
|
<K> Flux<GroupedFlux<K,T>> |
groupBy(Function<? super T,? extends K> keyMapper,
int prefetch)
|
<TRight,TLeftEnd,TRightEnd,R> Flux<R> |
groupJoin(Publisher<? extends TRight> other,
Function<? super T,? extends Publisher<TLeftEnd>> leftEnd,
Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd,
BiFunction<? super T,? super Flux<TRight>,? extends R> resultSelector)
Map values from two Publishers into time windows and emit combination of values
in case their windows overlap.
|
<R> Flux<R> |
handle(BiConsumer<? super T,SynchronousSink<R>> handler)
Handle the items emitted by this
Flux by calling a biconsumer with the
output sink for each onNext. |
Mono<Boolean> |
hasElement(T value)
Emit a single boolean true if any of the elements of this
Flux sequence is
equal to the provided value. |
Mono<Boolean> |
hasElements()
Emit a single boolean true if this
Flux sequence has at least one element. |
Flux<T> |
hide()
Hides the identities of this
Flux instance. |
Mono<T> |
ignoreElements()
Ignores onNext signals (dropping them) and only propagates termination events.
|
Flux<Tuple2<Long,T>> |
index()
Keep information about the order in which source values were received by
indexing them with a 0-based incrementing long, returning a
Flux
of Tuple2<(index, value)>. |
<I> Flux<I> |
index(BiFunction<? super Long,? super T,? extends I> indexMapper)
Keep information about the order in which source values were received by
indexing them internally with a 0-based incrementing long then combining this
information with the source value into a
I using the provided BiFunction,
returning a Flux<I>. |
static Flux<Long> |
interval(Duration period)
Create a
Flux that emits long values starting with 0 and incrementing at
specified time intervals on the global timer. |
static Flux<Long> |
interval(Duration delay,
Duration period)
Create a
Flux that emits long values starting with 0 and incrementing at
specified time intervals, after an initial delay, on the global timer. |
static Flux<Long> |
interval(Duration delay,
Duration period,
Scheduler timer)
|
static Flux<Long> |
interval(Duration period,
Scheduler timer)
|
<TRight,TLeftEnd,TRightEnd,R> Flux<R> |
join(Publisher<? extends TRight> other,
Function<? super T,? extends Publisher<TLeftEnd>> leftEnd,
Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd,
BiFunction<? super T,? super TRight,? extends R> resultSelector)
Map values from two Publishers into time windows and emit combination of values
in case their windows overlap.
|
static <T> Flux<T> |
just(T... data)
Create a
Flux that emits the provided elements and then completes. |
static <T> Flux<T> |
just(T data)
Create a new
Flux that will only emit a single element then onComplete. |
Mono<T> |
last()
Emit the last element observed before complete signal as a
Mono, or emit
NoSuchElementException error if the source was empty. |
Mono<T> |
last(T defaultValue)
Emit the last element observed before complete signal as a
Mono, or emit
the defaultValue if the source was empty. |
Flux<T> |
limitRate(int prefetchRate)
Ensure that backpressure signals from downstream subscribers are split into batches
capped at the provided
prefetchRate when propagated upstream, effectively
rate limiting the upstream Publisher. |
Flux<T> |
limitRate(int highTide,
int lowTide)
Ensure that backpressure signals from downstream subscribers are split into batches
capped at the provided
highTide first, then replenishing at the provided
lowTide, effectively rate limiting the upstream Publisher. |
Flux<T> |
limitRequest(long requestCap)
Ensure that the total amount requested upstream is capped at
cap. |
Flux<T> |
log()
Observe all Reactive Streams signals and trace them using
Logger support. |
Flux<T> |
log(Logger logger)
Observe Reactive Streams signals matching the passed filter
options and
trace them using a specific user-provided Logger, at Level.INFO level. |
Flux<T> |
log(Logger logger,
Level level,
boolean showOperatorLine,
SignalType... options)
|
Flux<T> |
log(String category)
Observe all Reactive Streams signals and trace them using
Logger support. |
Flux<T> |
log(String category,
Level level,
boolean showOperatorLine,
SignalType... options)
Observe Reactive Streams signals matching the passed filter
options and
trace them using Logger support. |
Flux<T> |
log(String category,
Level level,
SignalType... options)
Observe Reactive Streams signals matching the passed filter
options and
trace them using Logger support. |
<V> Flux<V> |
map(Function<? super T,? extends V> mapper)
Transform the items emitted by this
Flux by applying a synchronous function
to each item. |
Flux<Signal<T>> |
materialize()
Transform incoming onNext, onError and onComplete signals into
Signal instances,
materializing these signals. |
static <I> Flux<I> |
merge(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. |
static <I> Flux<I> |
merge(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
merge(Publisher<? extends I>... sources)
Merge data from
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. |
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source)
|
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source,
int concurrency)
|
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source,
int concurrency,
int prefetch)
|
static <I> Flux<I> |
mergeDelayError(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. |
static <T> Flux<T> |
mergeOrdered(Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator). |
static <T> Flux<T> |
mergeOrdered(int prefetch,
Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator). |
static <I extends Comparable<? super I>> Flux<I> |
mergeOrdered(Publisher<? extends I>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by their natural order). |
Flux<T> |
mergeOrderedWith(Publisher<? extends T> other,
Comparator<? super T> otherComparator)
Merge data from this
Flux and a Publisher into a reordered merge
sequence, by picking the smallest value from each sequence as defined by a provided
Comparator. |
static <I> Flux<I> |
mergeSequential(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences provided in an array/vararg
into an ordered merged sequence. |
static <I> Flux<I> |
mergeSequential(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
mergeSequential(Iterable<? extends Publisher<? extends I>> sources,
int maxConcurrency,
int prefetch)
|
static <I> Flux<I> |
mergeSequential(Publisher<? extends I>... sources)
Merge data from
Publisher sequences provided in an array/vararg
into an ordered merged sequence. |
static <T> Flux<T> |
mergeSequential(Publisher<? extends Publisher<? extends T>> sources)
|
static <T> Flux<T> |
mergeSequential(Publisher<? extends Publisher<? extends T>> sources,
int maxConcurrency,
int prefetch)
|
static <I> Flux<I> |
mergeSequentialDelayError(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences provided in an array/vararg
into an ordered merged sequence. |
static <I> Flux<I> |
mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources,
int maxConcurrency,
int prefetch)
|
static <T> Flux<T> |
mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources,
int maxConcurrency,
int prefetch)
|
Flux<T> |
mergeWith(Publisher<? extends T> other)
|
Flux<T> |
name(String name)
Give a name to this sequence, which can be retrieved using
Scannable.name()
as long as this is the first reachable Scannable.parents(). |
static <T> Flux<T> |
never()
Create a
Flux that will never signal any data, error or completion signal. |
Mono<T> |
next()
|
<U> Flux<U> |
ofType(Class<U> clazz)
Evaluate each accepted value against the given
Class type. |
protected static <T> ConnectableFlux<T> |
onAssembly(ConnectableFlux<T> source)
To be used by custom operators: invokes assembly
Hooks pointcut given a
ConnectableFlux, potentially returning a new ConnectableFlux. |
protected static <T> Flux<T> |
onAssembly(Flux<T> source)
|
Flux<T> |
onBackpressureBuffer()
Request an unbounded demand and push to the returned
Flux, or park the
observed elements if not enough demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(Duration ttl,
int maxSize,
Consumer<? super T> onBufferEviction)
Request an unbounded demand and push to the returned
Flux, or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit and for a maximum Duration of ttl (as measured on the
elastic Scheduler). |
Flux<T> |
onBackpressureBuffer(Duration ttl,
int maxSize,
Consumer<? super T> onBufferEviction,
Scheduler scheduler)
|
Flux<T> |
onBackpressureBuffer(int maxSize)
Request an unbounded demand and push to the returned
Flux, or park the
observed elements if not enough demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(int maxSize,
BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push to the returned
Flux, or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. |
Flux<T> |
onBackpressureBuffer(int maxSize,
Consumer<? super T> onOverflow)
Request an unbounded demand and push to the returned
Flux, or park the
observed elements if not enough demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(int maxSize,
Consumer<? super T> onBufferOverflow,
BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push to the returned
Flux, or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. |
Flux<T> |
onBackpressureDrop()
Request an unbounded demand and push to the returned
Flux, or drop
the observed elements if not enough demand is requested downstream. |
Flux<T> |
onBackpressureDrop(Consumer<? super T> onDropped)
|
Flux<T> |
onBackpressureError()
Request an unbounded demand and push to the returned
Flux, or emit onError
from Exceptions.failWithOverflow() if not enough demand is requested
downstream. |
Flux<T> |
onBackpressureLatest()
Request an unbounded demand and push to the returned
Flux, or only keep
the most recent observed item if not enough demand is requested downstream. |
<E extends Throwable> Flux<T> |
onErrorMap(Class<E> type,
Function<? super E,? extends Throwable> mapper)
Transform an error emitted by this
Flux by synchronously applying a function
to it if the error matches the given type. |
Flux<T> |
onErrorMap(Function<? super Throwable,? extends Throwable> mapper)
Transform any error emitted by this
Flux by synchronously applying a function to it. |
Flux<T> |
onErrorMap(Predicate<? super Throwable> predicate,
Function<? super Throwable,? extends Throwable> mapper)
Transform an error emitted by this
Flux by synchronously applying a function
to it if the error matches the given predicate. |
<E extends Throwable> Flux<T> |
onErrorResume(Class<E> type,
Function<? super E,? extends Publisher<? extends T>> fallback)
Subscribe to a fallback publisher when an error matching the given type
occurs, using a function to choose the fallback depending on the error.
|
Flux<T> |
onErrorResume(Function<? super Throwable,? extends Publisher<? extends T>> fallback)
Subscribe to a returned fallback publisher when any error occurs, using a function to
choose the fallback depending on the error.
|
Flux<T> |
onErrorResume(Predicate<? super Throwable> predicate,
Function<? super Throwable,? extends Publisher<? extends T>> fallback)
Subscribe to a fallback publisher when an error matching a given predicate
occurs.
|
<E extends Throwable> Flux<T> |
onErrorReturn(Class<E> type,
T fallbackValue)
Simply emit a captured fallback value when an error of the specified type is
observed on this
Flux. |
Flux<T> |
onErrorReturn(Predicate<? super Throwable> predicate,
T fallbackValue)
Simply emit a captured fallback value when an error matching the given predicate is
observed on this
Flux. |
Flux<T> |
onErrorReturn(T fallbackValue)
Simply emit a captured fallback value when any error is observed on this
Flux. |
protected static <T> Flux<T> |
onLastAssembly(Flux<T> source)
|
Flux<T> |
onTerminateDetach()
Detaches both the child
Subscriber and the Subscription on
termination or cancellation. |
Flux<T> |
or(Publisher<? extends T> other)
|
ParallelFlux<T> |
parallel()
Prepare this
Flux by dividing data on a number of 'rails' matching the
number of CPU cores, in a round-robin fashion. |
ParallelFlux<T> |
parallel(int parallelism)
Prepare this
Flux by dividing data on a number of 'rails' matching the
provided parallelism parameter, in a round-robin fashion. |
ParallelFlux<T> |
parallel(int parallelism,
int prefetch)
|
ConnectableFlux<T> |
publish()
Prepare a
ConnectableFlux which shares this Flux sequence and
dispatches values to subscribers in a backpressure-aware manner. |
<R> Flux<R> |
publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform)
Shares a sequence for the duration of a function that may transform it and
consume it as many times as necessary without causing multiple subscriptions
to the upstream.
|
<R> Flux<R> |
publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform,
int prefetch)
Shares a sequence for the duration of a function that may transform it and
consume it as many times as necessary without causing multiple subscriptions
to the upstream.
|
ConnectableFlux<T> |
publish(int prefetch)
Prepare a
ConnectableFlux which shares this Flux sequence and
dispatches values to subscribers in a backpressure-aware manner. |
Mono<T> |
publishNext()
|
Flux<T> |
publishOn(Scheduler scheduler)
|
Flux<T> |
publishOn(Scheduler scheduler,
boolean delayError,
int prefetch)
Run onNext, onComplete and onError on a supplied Scheduler's
Scheduler.Worker. |
Flux<T> |
publishOn(Scheduler scheduler,
int prefetch)
Run onNext, onComplete and onError on a supplied Scheduler's
Scheduler.Worker. |
static <T> Flux<T> |
push(Consumer<? super FluxSink<T>> emitter)
|
static <T> Flux<T> |
push(Consumer<? super FluxSink<T>> emitter,
FluxSink.OverflowStrategy backpressure)
|
static Flux<Integer> |
range(int start,
int count)
|
<A> Mono<A> |
reduce(A initial,
BiFunction<A,? super T,A> accumulator)
Reduce the values from this
Flux sequence into a single object matching the
type of a seed value. |
Mono<T> |
reduce(BiFunction<T,T,T> aggregator)
Reduce the values from this
Flux sequence into a single object of the same
type as the emitted items. |
<A> Mono<A> |
reduceWith(Supplier<A> initial,
BiFunction<A,? super T,A> accumulator)
Reduce the values from this
Flux sequence into a single object matching the
type of a lazily supplied seed value. |
Flux<T> |
repeat()
Repeatedly and indefinitely subscribe to the source upon completion of the
previous subscription.
|
Flux<T> |
repeat(BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.
|
Flux<T> |
repeat(long numRepeat)
Repeatedly subscribe to the source numRepeat times.
|
Flux<T> |
repeat(long numRepeat,
BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous
subscription.
|
Flux<T> |
repeatWhen(Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
Repeatedly subscribe to this
Flux when a companion sequence emits elements in
response to the flux completion signal. |
ConnectableFlux<T> |
replay()
Turn this
Flux into a hot source and cache last emitted signals for further Subscriber. |
ConnectableFlux<T> |
replay(Duration ttl)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber. |
ConnectableFlux<T> |
replay(Duration ttl,
Scheduler timer)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber. |
ConnectableFlux<T> |
replay(int history)
Turn this
Flux into a connectable hot source and cache last emitted
signals for further Subscriber. |
ConnectableFlux<T> |
replay(int history,
Duration ttl)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber. |
ConnectableFlux<T> |
replay(int history,
Duration ttl,
Scheduler timer)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber. |
Flux<T> |
retry()
Re-subscribes to this
Flux sequence if it signals any error, indefinitely. |
Flux<T> |
retry(long numRetries)
Re-subscribes to this
Flux sequence if it signals any error, for a fixed
number of times. |
Flux<T> |
retry(long numRetries,
Predicate<? super Throwable> retryMatcher)
|
Flux<T> |
retry(Predicate<? super Throwable> retryMatcher)
|
Flux<T> |
retryWhen(Function<Flux<Throwable>,? extends Publisher<?>> whenFactory)
|
Flux<T> |
sample(Duration timespan)
|
<U> Flux<T> |
sample(Publisher<U> sampler)
|
Flux<T> |
sampleFirst(Duration timespan)
Repeatedly take a value from this
Flux then skip the values that follow
within a given duration. |
<U> Flux<T> |
sampleFirst(Function<? super T,? extends Publisher<U>> samplerFactory)
|
<U> Flux<T> |
sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory)
|
<U> Flux<T> |
sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory,
int maxConcurrency)
|
<A> Flux<A> |
scan(A initial,
BiFunction<A,? super T,A> accumulator)
Reduce this
Flux values with an accumulator BiFunction and
also emit the intermediate results of this function. |
Flux<T> |
scan(BiFunction<T,T,T> accumulator)
Reduce this
Flux values with an accumulator BiFunction and
also emit the intermediate results of this function. |
<A> Flux<A> |
scanWith(Supplier<A> initial,
BiFunction<A,? super T,A> accumulator)
Reduce this
Flux values with the help of an accumulator BiFunction
and also emit the intermediate results. |
Flux<T> |
share()
|
Mono<T> |
single()
Expect and emit a single item from this
Flux source or signal
NoSuchElementException for an empty source, or
IndexOutOfBoundsException for a source with more than one element. |
Mono<T> |
single(T defaultValue)
Expect and emit a single item from this
Flux source and emit a default
value for an empty source, but signal an IndexOutOfBoundsException for a
source with more than one element. |
Mono<T> |
singleOrEmpty()
Expect and emit a single item from this
Flux source, and accept an empty
source but signal an IndexOutOfBoundsException for a source with more than
one element. |
Flux<T> |
skip(Duration timespan)
Skip elements from this
Flux emitted within the specified initial duration. |
Flux<T> |
skip(Duration timespan,
Scheduler timer)
|
Flux<T> |
skip(long skipped)
Skip the specified number of elements from the beginning of this
Flux then
emit the remaining elements. |
Flux<T> |
skipLast(int n)
Skip a specified number of elements at the end of this
Flux sequence. |
Flux<T> |
skipUntil(Predicate<? super T> untilPredicate)
|
Flux<T> |
skipUntilOther(Publisher<?> other)
|
Flux<T> |
skipWhile(Predicate<? super T> skipPredicate)
|
Flux<T> |
sort()
Sort elements from this
Flux by collecting and sorting them in the background
then emitting the sorted sequence once this sequence completes. |
Flux<T> |
sort(Comparator<? super T> sortFunction)
Sort elements from this
Flux using a Comparator function, by
collecting and sorting elements in the background then emitting the sorted sequence
once this sequence completes. |
Flux<T> |
startWith(Iterable<? extends T> iterable)
|
Flux<T> |
startWith(Publisher<? extends T> publisher)
|
Flux<T> |
startWith(T... values)
Prepend the given values before this
Flux sequence. |
Disposable |
subscribe()
Subscribe to this
Flux and request unbounded demand. |
Disposable |
subscribe(Consumer<? super T> consumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer)
|
abstract void |
subscribe(CoreSubscriber<? super T> actual)
An internal
Publisher.subscribe(Subscriber) that will bypass
Hooks.onLastOperator(Function) pointcut. |
void |
subscribe(Subscriber<? super T> actual) |
Flux<T> |
subscribeOn(Scheduler scheduler)
Run subscribe, onSubscribe and request on a specified
Scheduler's Scheduler.Worker. |
Flux<T> |
subscribeOn(Scheduler scheduler,
boolean requestOnSeparateThread)
Run subscribe and onSubscribe on a specified
Scheduler's Scheduler.Worker. |
Flux<T> |
subscriberContext(Context mergeContext)
|
Flux<T> |
subscriberContext(Function<Context,Context> doOnContext)
|
<E extends Subscriber<? super T>> E |
subscribeWith(E subscriber)
|
Flux<T> |
switchIfEmpty(Publisher<? extends T> alternate)
Switch to an alternative
Publisher if this sequence is completed without any data. |
<V> Flux<V> |
switchMap(Function<? super T,Publisher<? extends V>> fn)
|
<V> Flux<V> |
switchMap(Function<? super T,Publisher<? extends V>> fn,
int prefetch)
|
static <T> Flux<T> |
switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers)
|
static <T> Flux<T> |
switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers,
int prefetch)
|
Flux<T> |
tag(String key,
String value)
Tag this flux with a key/value pair.
|
Flux<T> |
take(Duration timespan)
|
Flux<T> |
take(Duration timespan,
Scheduler timer)
|
Flux<T> |
take(long n)
Take only the first N values from this
Flux, if available. |
Flux<T> |
takeLast(int n)
Emit the last N values this
Flux emitted before its completion. |
Flux<T> |
takeUntil(Predicate<? super T> predicate)
|
Flux<T> |
takeUntilOther(Publisher<?> other)
|
Flux<T> |
takeWhile(Predicate<? super T> continuePredicate)
Relay values from this
Flux while a predicate returns TRUE
for the values (checked before each value is delivered). |
Mono<Void> |
then()
Return a
Mono<Void> that completes when this Flux completes. |
<V> Mono<V> |
then(Mono<V> other)
|
Mono<Void> |
thenEmpty(Publisher<Void> other)
Return a
Mono<Void> that waits for this Flux to complete then
for a supplied Publisher<Void> to also complete. |
<V> Flux<V> |
thenMany(Publisher<V> other)
|
Flux<T> |
timeout(Duration timeout)
Propagate a
TimeoutException as soon as no item is emitted within the
given Duration from the previous emission (or the subscription for the first item). |
Flux<T> |
timeout(Duration timeout,
Publisher<? extends T> fallback)
|
Flux<T> |
timeout(Duration timeout,
Publisher<? extends T> fallback,
Scheduler timer)
|
Flux<T> |
timeout(Duration timeout,
Scheduler timer)
Propagate a
TimeoutException as soon as no item is emitted within the
given Duration from the previous emission (or the subscription for the first
item), as measured by the specified Scheduler. |
<U> Flux<T> |
timeout(Publisher<U> firstTimeout)
Signal a
TimeoutException in case the first item from this Flux has
not been emitted before the given Publisher emits. |
<U,V> Flux<T> |
timeout(Publisher<U> firstTimeout,
Function<? super T,? extends Publisher<V>> nextTimeoutFactory)
Signal a
TimeoutException in case the first item from this Flux has
not been emitted before the firstTimeout Publisher emits, and whenever
each subsequent element is not emitted before a Publisher generated from
the latest element signals. |
<U,V> Flux<T> |
timeout(Publisher<U> firstTimeout,
Function<? super T,? extends Publisher<V>> nextTimeoutFactory,
Publisher<? extends T> fallback)
|
Flux<Tuple2<Long,T>> |
timestamp()
|
Flux<Tuple2<Long,T>> |
timestamp(Scheduler scheduler)
|
Iterable<T> |
toIterable()
|
Iterable<T> |
toIterable(int batchSize)
|
Iterable<T> |
toIterable(int batchSize,
Supplier<Queue<T>> queueProvider)
|
Stream<T> |
toStream()
|
Stream<T> |
toStream(int batchSize)
|
String |
toString() |
<V> Flux<V> |
transform(Function<? super Flux<T>,? extends Publisher<V>> transformer)
|
static <T,D> Flux<T> |
using(Callable<? extends D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a
Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or
the Subscriber cancels.
|
static <T,D> Flux<T> |
using(Callable<? extends D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup,
boolean eager)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a
Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or
the Subscriber cancels.
|
Flux<Flux<T>> |
window(Duration timespan)
|
Flux<Flux<T>> |
window(Duration timespan,
Duration timeshift)
|
Flux<Flux<T>> |
window(Duration timespan,
Duration timeshift,
Scheduler timer)
|
Flux<Flux<T>> |
window(Duration timespan,
Scheduler timer)
|
Flux<Flux<T>> |
window(int maxSize)
|
Flux<Flux<T>> |
window(int maxSize,
int skip)
|
Flux<Flux<T>> |
window(Publisher<?> boundary)
|
Flux<Flux<T>> |
windowTimeout(int maxSize,
Duration timespan)
|
Flux<Flux<T>> |
windowTimeout(int maxSize,
Duration timespan,
Scheduler timer)
|
Flux<Flux<T>> |
windowUntil(Predicate<T> boundaryTrigger)
|
Flux<Flux<T>> |
windowUntil(Predicate<T> boundaryTrigger,
boolean cutBefore)
|
Flux<Flux<T>> |
windowUntil(Predicate<T> boundaryTrigger,
boolean cutBefore,
int prefetch)
|
<U,V> Flux<Flux<T>> |
windowWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector)
|
Flux<Flux<T>> |
windowWhile(Predicate<T> inclusionPredicate)
|
Flux<Flux<T>> |
windowWhile(Predicate<T> inclusionPredicate,
int prefetch)
|
<U,R> Flux<R> |
withLatestFrom(Publisher<? extends U> other,
BiFunction<? super T,? super U,? extends R> resultSelector)
Combine the most recently emitted values from both this
Flux and another
Publisher through a BiFunction and emit the result. |
static <I,O> Flux<O> |
zip(Function<? super Object[],? extends O> combinator,
int prefetch,
Publisher<? extends I>... sources)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <I,O> Flux<O> |
zip(Function<? super Object[],? extends O> combinator,
Publisher<? extends I>... sources)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <O> Flux<O> |
zip(Iterable<? extends Publisher<?>> sources,
Function<? super Object[],? extends O> combinator)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <O> Flux<O> |
zip(Iterable<? extends Publisher<?>> sources,
int prefetch,
Function<? super Object[],? extends O> combinator)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <TUPLE extends Tuple2,V> Flux<V> |
zip(Publisher<? extends Publisher<?>> sources,
Function<? super TUPLE,? extends V> combinator)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <T1,T2> Flux<Tuple2<T1,T2>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2)
Zip two sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple2. |
static <T1,T2,O> Flux<O> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
BiFunction<? super T1,? super T2,? extends O> combinator)
Zip two sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <T1,T2,T3> Flux<Tuple3<T1,T2,T3>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3)
Zip three sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple3. |
static <T1,T2,T3,T4> Flux<Tuple4<T1,T2,T3,T4>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4)
Zip four sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple4. |
static <T1,T2,T3,T4,T5> Flux<Tuple5<T1,T2,T3,T4,T5>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5)
Zip five sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple5. |
static <T1,T2,T3,T4,T5,T6> Flux<Tuple6<T1,T2,T3,T4,T5,T6>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6)
Zip six sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple6. |
<T2> Flux<Tuple2<T,T2>> |
zipWith(Publisher<? extends T2> source2)
|
<T2,V> Flux<V> |
zipWith(Publisher<? extends T2> source2,
BiFunction<? super T,? super T2,? extends V> combinator)
Zip this
Flux with another Publisher source, that is to say wait
for both to emit one element and combine these elements using a combinator
BiFunction.
The operator will continue doing so until any of the sources completes. |
<T2> Flux<Tuple2<T,T2>> |
zipWith(Publisher<? extends T2> source2,
int prefetch)
|
<T2,V> Flux<V> |
zipWith(Publisher<? extends T2> source2,
int prefetch,
BiFunction<? super T,? super T2,? extends V> combinator)
Zip this
Flux with another Publisher source, that is to say wait
for both to emit one element and combine these elements using a combinator
BiFunction.
The operator will continue doing so until any of the sources completes. |
<T2> Flux<Tuple2<T,T2>> |
zipWithIterable(Iterable<? extends T2> iterable)
|
<T2,V> Flux<V> |
zipWithIterable(Iterable<? extends T2> iterable,
BiFunction<? super T,? super T2,? extends V> zipper)
Zip elements from this
Flux with the content of an Iterable, that is
to say combine one element from each, pairwise, using the given zipper BiFunction. |
@SafeVarargs public static <T,V> Flux<V> combineLatest(Function<Object[],V> combinator, Publisher<? extends T>... sources)
Flux whose data are generated by the combination of the most recently published value from each
of the Publisher sources.

T - type of the value from sourcesV - The produced output after transformation by the given combinatorsources - The Publisher sources to combine values fromcombinator - The aggregate function that will receive the latest value from each upstream and return the value
to signal downstreamFlux based on the produced combinations@SafeVarargs public static <T,V> Flux<V> combineLatest(Function<Object[],V> combinator, int prefetch, Publisher<? extends T>... sources)
Flux whose data are generated by the combination of the most recently published value from each
of the Publisher sources.

T - type of the value from sourcesV - The produced output after transformation by the given combinatorsources - The Publisher sources to combine values fromprefetch - The demand sent to each combined source Publishercombinator - The aggregate function that will receive the latest value from each upstream and return the value
to signal downstreamFlux based on the produced combinationspublic static <T1,T2,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends V> combinator)
Flux whose data are generated by the combination of the most recently published value from each
of two Publisher sources.

T1 - type of the value from source1T2 - type of the value from source2V - The produced output after transformation by the given combinatorsource1 - The first Publisher source to combine values fromsource2 - The second Publisher source to combine values fromcombinator - The aggregate function that will receive the latest value from each upstream and return the value
to signal downstreamFlux based on the produced combinationspublic static <T1,T2,T3,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Function<Object[],V> combinator)
Flux whose data are generated by the combination of the most recently published value from each
of three Publisher sources.

T1 - type of the value from source1T2 - type of the value from source2T3 - type of the value from source3V - The produced output after transformation by the given combinatorsource1 - The first Publisher source to combine values fromsource2 - The second Publisher source to combine values fromsource3 - The third Publisher source to combine values fromcombinator - The aggregate function that will receive the latest value from each upstream and return the value
to signal downstreamFlux based on the produced combinationspublic static <T1,T2,T3,T4,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Function<Object[],V> combinator)
Flux whose data are generated by the combination of the most recently published value from each
of four Publisher sources.

T1 - type of the value from source1T2 - type of the value from source2T3 - type of the value from source3T4 - type of the value from source4V - The produced output after transformation by the given combinatorsource1 - The first Publisher source to combine values fromsource2 - The second Publisher source to combine values fromsource3 - The third Publisher source to combine values fromsource4 - The fourth Publisher source to combine values fromcombinator - The aggregate function that will receive the latest value from each upstream and return the value
to signal downstreamFlux based on the produced combinationspublic static <T1,T2,T3,T4,T5,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Function<Object[],V> combinator)
Flux whose data are generated by the combination of the most recently published value from each
of five Publisher sources.

T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
V - The produced output after transformation by the given combinator
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
source4 - The fourth Publisher source to combine values from
source5 - The fifth Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T1,T2,T3,T4,T5,T6,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Function<Object[],V> combinator)
Flux whose data are generated by the combination of the most recently published value from each
of six Publisher sources.

T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
V - The produced output after transformation by the given combinator
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
source4 - The fourth Publisher source to combine values from
source5 - The fifth Publisher source to combine values from
source6 - The sixth Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T,V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<Object[],V> combinator)
Flux whose data are generated by the combination of the most recently published value from each
of the Publisher sources provided in an Iterable.

T - The common base type of the values from sources
V - The produced output after transformation by the given combinator
sources - The list of Publisher sources to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T,V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, int prefetch, Function<Object[],V> combinator)
Flux whose data are generated by the combination of the most recently published value from each
of the Publisher sources provided in an Iterable.

T - The common base type of the values from sources
V - The produced output after transformation by the given combinator
sources - The list of Publisher sources to combine values from
prefetch - demand produced to each combined source Publisher
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T> Flux<T> concat(Iterable<? extends Publisher<? extends T>> sources)
Concatenate all sources provided in an Iterable, forwarding elements
emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.

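For example, a minimal sketch: concatenating two finite sources preserves source order, emitting 1, 2, 3, 4:
Flux<Integer> first = Flux.just(1, 2);
Flux<Integer> second = Flux.just(3, 4);
Flux.concat(java.util.Arrays.asList(first, second)) // second is only subscribed once first completes
    .subscribe(System.out::println);                // 1, 2, 3, 4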
@SafeVarargs public final Flux<T> concatWithValues(T... values)
Concatenate the given values at the end of this Flux.
values - The values to concatenate
Returns: a Flux concatenating all source sequences
public static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent Publisher,
forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
public static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources, int prefetch)
Concatenate all sources emitted as an onNext signal from a parent Publisher,
forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
@SafeVarargs public static <T> Flux<T> concat(Publisher<? extends T>... sources)
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent Publisher,
forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, int prefetch)
Concatenate all sources emitted as an onNext signal from a parent Publisher,
forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, boolean delayUntilEnd, int prefetch)
Concatenate all sources emitted as an onNext signal from a parent Publisher,
forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes.
Errors do not interrupt the main sequence but are propagated after the current
concat backlog if delayUntilEnd is false or after all sources
have had a chance to be concatenated if delayUntilEnd is true.
T - The type of values in both source and output sequences
sources - The Publisher of Publisher to concatenate
delayUntilEnd - delay error until all sources have been consumed instead of after the current source
prefetch - the inner source request size
Returns: a Flux concatenating all inner sources sequences until complete or error
@SafeVarargs public static <T> Flux<T> concatDelayError(Publisher<? extends T>... sources)
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
Flux with the capability of emitting multiple
elements in a synchronous or asynchronous manner through the FluxSink API.
This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
Flux.<String>create(emitter -> {
ActionListener al = e -> {
emitter.next(textField.getText());
};
// without cleanup support:
button.addActionListener(al);
// with cleanup support:
button.addActionListener(al);
emitter.onDispose(() -> {
button.removeListener(al);
});
});
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
Flux with the capability of emitting multiple
elements in a synchronous or asynchronous manner through the FluxSink API.
This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
Flux.<String>create(emitter -> {
ActionListener al = e -> {
emitter.next(textField.getText());
};
// without cleanup support:
button.addActionListener(al);
// with cleanup support:
button.addActionListener(al);
emitter.onDispose(() -> {
button.removeListener(al);
});
}, FluxSink.OverflowStrategy.LATEST);
T - The type of values in the sequence
backpressure - the backpressure mode, see FluxSink.OverflowStrategy for the available backpressure modes
emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
Returns: a Flux
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter)
Flux with the capability of emitting multiple
elements from a single-threaded producer through the FluxSink API.
This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
Flux.<String>push(emitter -> {
ActionListener al = e -> {
emitter.next(textField.getText());
};
// without cleanup support:
button.addActionListener(al);
// with cleanup support:
button.addActionListener(al);
emitter.onDispose(() -> {
button.removeListener(al);
});
});
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
Flux with the capability of emitting multiple
elements from a single-threaded producer through the FluxSink API.
This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
Flux.<String>push(emitter -> {
ActionListener al = e -> {
emitter.next(textField.getText());
};
// without cleanup support:
button.addActionListener(al);
// with cleanup support:
button.addActionListener(al);
emitter.onDispose(() -> {
button.removeListener(al);
});
}, FluxSink.OverflowStrategy.LATEST);
T - The type of values in the sequence
backpressure - the backpressure mode, see FluxSink.OverflowStrategy for the available backpressure modes
emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
Returns: a Flux
public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier)
Lazily supply a Publisher every time a Subscription is made on the
resulting Flux, so the actual source instantiation is deferred until each
subscribe and the Supplier can create a subscriber-specific instance.
If the supplier doesn't generate a new instance however, this operator will
effectively behave like from(Publisher).

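For example, an illustrative sketch in which each subscriber observes state captured at its own subscription time rather than at assembly time:
java.util.concurrent.atomic.AtomicInteger counter = new java.util.concurrent.atomic.AtomicInteger();
Flux<Integer> deferred = Flux.defer(() -> Flux.just(counter.incrementAndGet()));
deferred.subscribe(v -> System.out.println("first subscriber: " + v));  // first subscriber: 1
deferred.subscribe(v -> System.out.println("second subscriber: " + v)); // second subscriber: 2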
public static <T> Flux<T> empty()
Flux that completes without emitting any item.
T - the reified type of the target Subscriber
Returns: a Flux
public static <T> Flux<T> error(Throwable error)
Flux that terminates with the specified error immediately after
being subscribed to.
T - the reified type of the target Subscriber
error - the error to signal to each Subscriber
Returns: a Flux
public static <O> Flux<O> error(Throwable throwable, boolean whenRequested)
Flux that terminates with the specified error, either immediately
after being subscribed to or after being first requested.

O - the reified type of the target Subscriber
throwable - the error to signal to each Subscriber
whenRequested - if true, will onError on the first request instead of subscribe().
Returns: a Flux
@SafeVarargs public static <I> Flux<I> first(Publisher<? extends I>... sources)
Pick the first Publisher to emit any signal (onNext/onError/onComplete) and
replay all signals from that Publisher, effectively behaving like the
fastest of these competing sources.
I - The type of values in both source and output sequences
sources - The competing source publishers
Returns: a Flux behaving like the fastest of its sources
public static <I> Flux<I> first(Iterable<? extends Publisher<? extends I>> sources)
Pick the first Publisher to emit any signal (onNext/onError/onComplete) and
replay all signals from that Publisher, effectively behaving like the
fastest of these competing sources.
I - The type of values in both source and output sequences
sources - The competing source publishers
Returns: a Flux behaving like the fastest of its sources
public static <T> Flux<T> from(Publisher<? extends T> source)
Decorate the specified Publisher with the Flux API.
T - The type of values in both source and output sequences
source - the source to decorate
Returns: a Flux
public static <T> Flux<T> fromArray(T[] array)
Flux that emits the items contained in the provided array.
T - The type of values in the source array and resulting Flux
array - the array to read data from
Returns: a Flux
public static <T> Flux<T> fromStream(Stream<? extends T> s)
Flux that emits the items contained in the provided Stream.
Keep in mind that a Stream cannot be re-used, which can be problematic in
case of multiple subscriptions or re-subscription (like with repeat() or
retry()). The Stream is closed automatically
by the operator on cancellation, error or completion.
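For example, a sketch that sidesteps the re-use limitation by combining fromStream with defer, so a fresh Stream is supplied to every subscriber:
Flux<String> letters = Flux.defer(() ->
        Flux.fromStream(java.util.stream.Stream.of("a", "b", "c")));
letters.subscribe(System.out::println); // a, b, c
letters.subscribe(System.out::println); // safe to re-subscribe: a new Stream is created each time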
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
Flux by generating signals one-by-one via a
consumer callback.
T - the value type emitted
generator - Consume the SynchronousSink provided per-subscriber by Reactor to generate a single signal on each pass.
Returns: a Flux
public static <T,S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator)
Flux by generating signals one-by-one via a
consumer callback and some state. The stateSupplier may return null.
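For example, an illustrative sketch that uses an integer state to emit a bounded counter, one signal per pass:
Flux<Integer> counter = Flux.generate(
        () -> 0,                      // stateSupplier: initial state
        (state, sink) -> {
            sink.next(state);         // emit exactly one signal per invocation
            if (state == 4) {
                sink.complete();
            }
            return state + 1;         // state passed to the next invocation
        });
counter.subscribe(System.out::println); // 0, 1, 2, 3, 4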
T - the value type emitted
S - the per-subscriber custom state type
stateSupplier - called for each incoming Subscriber to provide the initial state for the generator bifunction
generator - Consume the SynchronousSink provided per-subscriber by Reactor as well as the current state to generate a single signal on each pass and return a (new) state.
Returns: a Flux
public static <T,S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator, Consumer<? super S> stateConsumer)
Flux by generating signals one-by-one via a
consumer callback and some state, with a final cleanup callback. The
stateSupplier may return null but your cleanup stateConsumer
will need to handle the null case.
T - the value type emitted
S - the per-subscriber custom state type
stateSupplier - called for each incoming Subscriber to provide the initial state for the generator bifunction
generator - Consume the SynchronousSink provided per-subscriber by Reactor as well as the current state to generate a single signal on each pass and return a (new) state.
stateConsumer - called after the generator has terminated or the downstream cancelled, receiving the last state to be handled (i.e., release resources or do other cleanup).
Returns: a Flux
public static Flux<Long> interval(Duration period)
Flux that emits long values starting with 0 and incrementing at
specified time intervals on the global timer. If demand is not produced in time,
an onError will be signalled with an overflow
IllegalStateException detailing the tick that couldn't be emitted.
In normal conditions, the Flux will never complete.
Runs on the Schedulers.parallel() Scheduler.
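For example, a sketch emitting a tick every 100 milliseconds; interval is asynchronous, so the subscribing thread is not blocked:
Flux<Long> ticks = Flux.interval(java.time.Duration.ofMillis(100));
ticks.take(5) // bound the otherwise infinite sequence
     .subscribe(tick -> System.out.println("tick " + tick));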
public static Flux<Long> interval(Duration delay, Duration period)
Flux that emits long values starting with 0 and incrementing at
specified time intervals, after an initial delay, on the global timer. If demand is
not produced in time, an onError will be signalled with an
overflow IllegalStateException
detailing the tick that couldn't be emitted. In normal conditions, the Flux
will never complete.
Runs on the Schedulers.parallel() Scheduler.

public static Flux<Long> interval(Duration period, Scheduler timer)
Flux that emits long values starting with 0 and incrementing at
specified time intervals, on the specified Scheduler. If demand is not
produced in time, an onError will be signalled with an overflow
IllegalStateException detailing the tick that couldn't be emitted.
In normal conditions, the Flux will never complete.
public static Flux<Long> interval(Duration delay, Duration period, Scheduler timer)
Flux that emits long values starting with 0 and incrementing at
specified time intervals, after an initial delay, on the specified Scheduler.
If demand is not produced in time, an onError will be signalled with an
overflow IllegalStateException
detailing the tick that couldn't be emitted. In normal conditions, the Flux
will never complete.

@SafeVarargs public static <T> Flux<T> just(T... data)
Flux that emits the provided elements and then completes.
T - the emitted data type
data - the elements to emit, as a vararg
Returns: a Flux
public static <T> Flux<T> just(T data)
Flux that will only emit a single element then onComplete.
T - the emitted data type
data - the single element to emit
Returns: a Flux
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source)
Publisher sequences emitted by the passed Publisher
into an interleaved merged sequence. Unlike concat, inner
sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
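For example, an illustrative sketch (delays are arbitrary) in which values from two asynchronous sources interleave by arrival time:
Flux<String> fast = Flux.just("f1", "f2").delayElements(java.time.Duration.ofMillis(30));
Flux<String> slow = Flux.just("s1", "s2").delayElements(java.time.Duration.ofMillis(50));
Flux.merge(fast, slow)
    .subscribe(System.out::println); // values interleave in arrival order, e.g. f1, s1, f2, s2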
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency)
Publisher sequences emitted by the passed Publisher
into an interleaved merged sequence. Unlike concat, inner
sources are subscribed to eagerly (but at most concurrency sources are
subscribed to at the same time).
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency, int prefetch)
Publisher sequences emitted by the passed Publisher
into an interleaved merged sequence. Unlike concat, inner
sources are subscribed to eagerly (but at most concurrency sources are
subscribed to at the same time).
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
public static <I> Flux<I> merge(Iterable<? extends Publisher<? extends I>> sources)
Publisher sequences contained in an Iterable
into an interleaved merged sequence. Unlike concat, inner
sources are subscribed to eagerly.
A new Iterator will be created for each subscriber.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <I> Flux<I> merge(Publisher<? extends I>... sources)
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. Unlike concat,
sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <I> Flux<I> merge(int prefetch, Publisher<? extends I>... sources)
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. Unlike concat,
sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <I> Flux<I> mergeDelayError(int prefetch, Publisher<? extends I>... sources)
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. Unlike concat,
sources are subscribed to eagerly.
This variant will delay any error until after the rest of the merge backlog has been processed.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <I extends Comparable<? super I>> Flux<I> mergeOrdered(Publisher<? extends I>... sources)
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by their natural order).
This is not a sort(), as it doesn't consider the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
I - a Comparable merged type that has a natural order
sources - Publisher sources of Comparable to merge
Returns: a merged Flux that subscribes early but keeps the original ordering
@SafeVarargs public static <T> Flux<T> mergeOrdered(Comparator<? super T> comparator, Publisher<? extends T>... sources)
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator). This is not a sort(Comparator), as it doesn't consider
the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
T - the merged type
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
Returns: a merged Flux that subscribes early but keeps the original ordering
@SafeVarargs public static <T> Flux<T> mergeOrdered(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator). This is not a sort(Comparator), as it doesn't consider
the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
T - the merged type
prefetch - the number of elements to prefetch from each source (avoiding too many small requests to the source when picking)
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
Returns: a merged Flux that subscribes early but keeps the original ordering
public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources)
public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)
Publisher sequences emitted by the passed Publisher
into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to
eagerly (but at most maxConcurrency sources at a time). Unlike merge, their
emitted values are merged into the final sequence in subscription order.
T - the merged type
sources - a Publisher of Publisher sources to merge
prefetch - the inner source request size
maxConcurrency - the request produced to the main source thus limiting concurrent merge backlog
Returns: a merged Flux, subscribing early but keeping the original ordering
public static <T> Flux<T> mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)
Publisher sequences emitted by the passed Publisher
into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to
eagerly (but at most maxConcurrency sources at a time). Unlike merge, their
emitted values are merged into the final sequence in subscription order.
This variant will delay any error until after the rest of the mergeSequential backlog has been processed.
T - the merged type
sources - a Publisher of Publisher sources to merge
prefetch - the inner source request size
maxConcurrency - the request produced to the main source thus limiting concurrent merge backlog
Returns: a merged Flux, subscribing early but keeping the original ordering
@SafeVarargs public static <I> Flux<I> mergeSequential(Publisher<? extends I>... sources)
Publisher sequences provided in an array/vararg
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
@SafeVarargs public static <I> Flux<I> mergeSequential(int prefetch, Publisher<? extends I>... sources)
Publisher sequences provided in an array/vararg
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
@SafeVarargs public static <I> Flux<I> mergeSequentialDelayError(int prefetch, Publisher<? extends I>... sources)
Publisher sequences provided in an array/vararg
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
This variant will delay any error until after the rest of the mergeSequential backlog
has been processed.
public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources)
public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch)
Publisher sequences provided in an Iterable
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly (but at most maxConcurrency sources at a time). Unlike merge, their
emitted values are merged into the final sequence in subscription order.
I - the merged type
sources - an Iterable of Publisher sequences to merge
maxConcurrency - the request produced to the main source thus limiting concurrent merge backlog
prefetch - the inner source request size
Returns: a merged Flux, subscribing early but keeping the original ordering
public static <I> Flux<I> mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch)
Publisher sequences provided in an Iterable
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly (but at most maxConcurrency sources at a time). Unlike merge, their
emitted values are merged into the final sequence in subscription order.
This variant will delay any error until after the rest of the mergeSequential backlog
has been processed.
I - the merged type
sources - an Iterable of Publisher sequences to merge
maxConcurrency - the request produced to the main source thus limiting concurrent merge backlog
prefetch - the inner source request size
Returns: a merged Flux, subscribing early but keeping the original ordering
public static <T> Flux<T> never()
Flux that will never signal any data, error or completion signal.
T - the Subscriber type target
Returns: a Flux
public static Flux<Integer> range(int start, int count)
Flux that will only emit a sequence of count incrementing integers,
starting from start. That is, emit integers between start (included)
and start + count (excluded) then complete.

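For example, a minimal sketch: Flux.range(5, 3) emits 5, 6 and 7, then completes:
Flux.range(5, 3).subscribe(System.out::println); // 5, 6, 7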
start - the first integer to be emitted
count - the total number of incrementing values to emit, including the first value
Returns: a Flux
public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers)
Flux that mirrors the most recently emitted Publisher,
forwarding its data until a new Publisher comes in from the source.
The resulting Flux will complete once there are no new Publisher in
the source (source has completed) and the last mirrored Publisher has also
completed.

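For example, an illustrative sketch (the inner sources are hypothetical); with asynchronous inners, values from a previous inner are dropped as soon as a new inner arrives:
Flux<Flux<String>> outer = Flux.just(
        Flux.just("a1", "a2"),
        Flux.just("b1", "b2"));
Flux.switchOnNext(outer)
    .subscribe(System.out::println); // mirrors the latest inner Publisher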
T - the produced type
mergedPublishers - The Publisher of Publisher to switch on and mirror.
Returns: a FluxProcessor accepting publishers and producing T
public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers, int prefetch)
Flux that mirrors the most recently emitted Publisher,
forwarding its data until a new Publisher comes in from the source.
The resulting Flux will complete once there are no new Publisher in
the source (source has completed) and the last mirrored Publisher has also
completed.

T - the produced type
mergedPublishers - The Publisher of Publisher to switch on and mirror.
prefetch - the inner source request size
Returns: a FluxProcessor accepting publishers and producing T
public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)
Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer may override the terminal event.

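For example, a sketch (the file name is hypothetical) that ties a java.util.stream.Stream of lines to the subscription and closes it on termination:
Flux<String> lines = Flux.using(
        () -> java.nio.file.Files.lines(java.nio.file.Paths.get("data.txt")), // resourceSupplier
        stream -> Flux.fromStream(stream),                                     // derive the Publisher from the resource
        java.util.stream.Stream::close);                                       // resourceCleanup on complete, error or cancel
lines.subscribe(System.out::println);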
T - emitted type
D - resource type
resourceSupplier - a Callable that is called on subscribe to generate the resource
sourceSupplier - a factory to derive a Publisher from the supplied resource
resourceCleanup - a resource cleanup callback invoked on completion
Returns: a Flux built around a disposable resource
public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager)

T - emitted type
D - resource type
resourceSupplier - a Callable that is called on subscribe to generate the resource
sourceSupplier - a factory to derive a Publisher from the supplied resource
resourceCleanup - a resource cleanup callback invoked on completion
eager - true to clean before terminating downstream subscribers
Returns: a Flux built around a disposable resource
public static <T1,T2,O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends O> combinator)
T1 - type of the value from source1
T2 - type of the value from source2
O - The produced output after transformation by the combinator
source1 - The first Publisher source to zip.
source2 - The second Publisher source to zip.
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Returns: a Flux
public static <T1,T2> Flux<Tuple2<T1,T2>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2)
Tuple2.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
public static <T1,T2,T3> Flux<Tuple3<T1,T2,T3>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3)
Tuple3.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1 - type of the value from source1T2 - type of the value from source2T3 - type of the value from source3source1 - The first upstream Publisher to subscribe to.source2 - The second upstream Publisher to subscribe to.source3 - The third upstream Publisher to subscribe to.Fluxpublic static <T1,T2,T3,T4> Flux<Tuple4<T1,T2,T3,T4>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4)
Tuple4.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1 - type of the value from source1T2 - type of the value from source2T3 - type of the value from source3T4 - type of the value from source4source1 - The first upstream Publisher to subscribe to.source2 - The second upstream Publisher to subscribe to.source3 - The third upstream Publisher to subscribe to.source4 - The fourth upstream Publisher to subscribe to.Fluxpublic static <T1,T2,T3,T4,T5> Flux<Tuple5<T1,T2,T3,T4,T5>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5)
Tuple5.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1 - type of the value from source1T2 - type of the value from source2T3 - type of the value from source3T4 - type of the value from source4T5 - type of the value from source5source1 - The first upstream Publisher to subscribe to.source2 - The second upstream Publisher to subscribe to.source3 - The third upstream Publisher to subscribe to.source4 - The fourth upstream Publisher to subscribe to.source5 - The fifth upstream Publisher to subscribe to.Fluxpublic static <T1,T2,T3,T4,T5,T6> Flux<Tuple6<T1,T2,T3,T4,T5,T6>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6)
Tuple6.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1 - type of the value from source1T2 - type of the value from source2T3 - type of the value from source3T4 - type of the value from source4T5 - type of the value from source5T6 - type of the value from source6source1 - The first upstream Publisher to subscribe to.source2 - The second upstream Publisher to subscribe to.source3 - The third upstream Publisher to subscribe to.source4 - The fourth upstream Publisher to subscribe to.source5 - The fifth upstream Publisher to subscribe to.source6 - The sixth upstream Publisher to subscribe to.Fluxpublic static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources, Function<? super Object[],? extends O> combinator)
Iterable.iterator() will be called on each Publisher.subscribe(Subscriber).

public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources, int prefetch, Function<? super Object[],? extends O> combinator)
Iterable.iterator() will be called on each Publisher.subscribe(Subscriber).

O - the combined produced typesources - the Iterable providing sources to zipprefetch - the inner source request sizecombinator - The aggregate function that will receive a unique value from each upstream and return the value
to signal downstreamFlux@SafeVarargs public static <I,O> Flux<O> zip(Function<? super Object[],? extends O> combinator, Publisher<? extends I>... sources)
I - the type of the input sourcesO - the combined produced typecombinator - The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamsources - the array providing sources to zipFlux@SafeVarargs public static <I,O> Flux<O> zip(Function<? super Object[],? extends O> combinator, int prefetch, Publisher<? extends I>... sources)
I - the type of the input sourcesO - the combined produced typecombinator - The aggregate function that will receive a unique value from each upstream and return the
value to signal downstreamprefetch - individual source request sizesources - the array providing sources to zipFluxpublic static <TUPLE extends Tuple2,V> Flux<V> zip(Publisher<? extends Publisher<?>> sources, Function<? super TUPLE,? extends V> combinator)
Note that the Publisher sources from the outer Publisher will
accumulate into an exhaustive list before starting zip operation.

TUPLE - the raw tuple typeV - The produced output after transformation by the given combinatorsources - The Publisher of Publisher sources to zip. A finite publisher is required.combinator - The aggregate function that will receive a unique value from each upstream and return the value
to signal downstreamFlux based on the produced valuepublic final Mono<Boolean> all(Predicate<? super T> predicate)
Predicate.
The implementation uses short-circuit logic and completes with false if the predicate doesn't match a value.

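For example, a minimal sketch: all short-circuits with false on the first non-matching value:
Flux.just(2, 4, 5, 6)
    .all(i -> i % 2 == 0)            // is every value even?
    .subscribe(System.out::println); // false -- short-circuits at 5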
public final Mono<Boolean> any(Predicate<? super T> predicate)
Flux sequence match
the predicate.
The implementation uses short-circuit logic and completes with true as soon as a value matches the predicate.

public final <P> P as(Function<? super Flux<T>,P> transformer)
Flux into a target type.
flux.as(Mono::from).subscribe()
P - the returned instance type
transformer - the Function to immediately map this Flux into a target type instance.
Returns: this Flux transformed to an instance of P
See also: for a bounded conversion to Publisher
@Nullable public final T blockFirst()
Flux and block indefinitely
until the upstream signals its first value or completes. Returns that value,
or null if the Flux completes empty. In case the Flux errors, the original
exception is thrown (wrapped in a RuntimeException if it was a checked
exception).
Note that each blockFirst() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
@Nullable public final T blockFirst(Duration timeout)
Flux and block until the upstream
signals its first value, completes or a timeout expires. Returns that value,
or null if the Flux completes empty. In case the Flux errors, the original
exception is thrown (wrapped in a RuntimeException if it was a checked
exception). If the provided timeout expires, a RuntimeException is thrown.
Note that each blockFirst() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
timeout - maximum time period to wait for before raising a RuntimeException
@Nullable public final T blockLast()
Flux and block indefinitely
until the upstream signals its last value or completes. Returns that value,
or null if the Flux completes empty. In case the Flux errors, the original
exception is thrown (wrapped in a RuntimeException if it was a checked
exception).
Note that each blockLast() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
@Nullable public final T blockLast(Duration timeout)
Flux and block until the upstream
signals its last value, completes or a timeout expires. Returns that value,
or null if the Flux completes empty. In case the Flux errors, the original
exception is thrown (wrapped in a RuntimeException if it was a checked
exception). If the provided timeout expires, a RuntimeException is thrown.
Note that each blockLast() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
timeout - maximum time period to wait for before raising a RuntimeException
public final Flux<List<T>> buffer()
List buffer that will be emitted
by the returned Flux once this Flux completes.

Returns: a Flux of at most one List
See also: for an alternative collecting algorithm returning a Mono
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, Supplier<C> bufferSupplier)
Collection buffers that
will be emitted by the returned Flux each time the given max size is reached
or once this Flux completes.

C - the Collection buffer type
maxSize - the maximum collected size
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection
public final Flux<List<T>> buffer(int maxSize, int skip)
List buffers that will be emitted
by the returned Flux each time the given max size is reached or once this
Flux completes. Buffers can be created with gaps, as a new buffer will be created
every time skip values have been emitted by the source.
When maxSize < skip : dropping buffers
When maxSize > skip : overlapping buffers
When maxSize == skip : exact buffers

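For example, an illustrative sketch with maxSize = 2 and skip = 3, producing dropping buffers: a buffer of at most two elements opens on every third element, so the elements in between are discarded:
Flux.range(1, 7)
    .buffer(2, 3)
    .subscribe(System.out::println); // [1, 2], [4, 5], [7] -- elements 3 and 6 fall in the gaps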
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, int skip, Supplier<C> bufferSupplier)
Collection buffers that
will be emitted by the returned Flux each time the given max size is reached
or once this Flux completes. Buffers can be created with gaps, as a new buffer will
be created every time skip values have been emitted by the source
When maxSize < skip : dropping buffers
When maxSize > skip : overlapping buffers
When maxSize == skip : exact buffers

C - the Collection buffer type
skip - the number of items to count before creating a new buffer
maxSize - the max collected size
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of possibly overlapped or gapped Collection
public final <C extends Collection<? super T>> Flux<C> buffer(Publisher<?> other, Supplier<C> bufferSupplier)
Collection buffers, as
delimited by the signals of a companion Publisher this operator will
subscribe to.

C - the Collection buffer type
other - the companion Publisher whose signals trigger new buffers
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection delimited by signals from a Publisher
public final Flux<List<T>> buffer(Duration timespan, Duration timeshift)
List buffers created at a given
timeshift period. Each buffer will last until the timespan has elapsed,
thus emitting the bucket in the resulting Flux.
When timespan < timeshift : dropping buffers
When timespan > timeshift : overlapping buffers
When timespan == timeshift : exact buffers

public final Flux<List<T>> buffer(Duration timespan, Duration timeshift, Scheduler timer)
List buffers created at a given
timeshift period, as measured on the provided Scheduler. Each
buffer will last until the timespan has elapsed (also measured on the scheduler),
thus emitting the bucket in the resulting Flux.
When timespan < timeshift : dropping buffers
When timespan > timeshift : overlapping buffers
When timespan == timeshift : exact buffers

timespan - the duration from buffer creation until a buffer is closed and emitted
timeshift - the interval at which to create a new buffer
timer - a time-capable Scheduler instance to run on
Returns: a Flux of List delimited by the given period timeshift and sized by timespan
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration timespan, Supplier<C> bufferSupplier)
Collection buffers that
will be emitted by the returned Flux each time the buffer reaches a maximum
size OR the timespan Duration elapses.

C - the Collection buffer type
maxSize - the max collected size
timespan - the timeout enforcing the release of a partial buffer
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection delimited by given size or a given period timeout
public final Flux<List<T>> bufferTimeout(int maxSize, Duration timespan, Scheduler timer)
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration timespan, Scheduler timer, Supplier<C> bufferSupplier)
Collection buffers that
will be emitted by the returned Flux each time the buffer reaches a maximum
size OR the timespan Duration elapses, as measured on the provided Scheduler.

C - the Collection buffer type
maxSize - the max collected size
timespan - the timeout enforcing the release of a partial buffer
timer - a time-capable Scheduler instance to run on
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection delimited by given size or a given period timeout
public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate)
List buffers that will be emitted by
the resulting Flux each time the given predicate returns true. Note that
the element that triggers the predicate to return true (and thus closes a buffer)
is included as last element in the emitted buffer.
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
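For example, a minimal sketch closing a buffer on every value divisible by 3; the boundary element stays in the buffer it closes and the trailing partial buffer is emitted on completion:
Flux.range(1, 7)
    .bufferUntil(i -> i % 3 == 0)
    .subscribe(System.out::println); // [1, 2, 3], [4, 5, 6], [7]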
public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate, boolean cutBefore)
List buffers that will be emitted by
the resulting Flux each time the given predicate returns true. Note that
the buffer into which the element that triggers the predicate to return true
(and thus closes a buffer) is included depends on the cutBefore parameter:
set it to true to include the boundary element in the newly opened buffer, false to
include it in the closed buffer (as in bufferUntil(Predicate)).
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
public final Flux<List<T>> bufferWhile(Predicate<? super T> predicate)
List buffers that will be emitted by
the resulting Flux. Each buffer continues aggregating values while the
given predicate returns true, and a new buffer is created as soon as the
predicate returns false. Note that the element that triggers the predicate
to return false (and thus closes a buffer) is NOT included in any emitted buffer.
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
public final <U,V> Flux<List<T>> bufferWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector)
List buffers started each time an opening
companion Publisher emits. Each buffer will last until the corresponding
closing companion Publisher emits, thus releasing the buffer to the resulting Flux.
When Open signal is strictly not overlapping Close signal : dropping buffers
When Open signal is strictly more frequent than Close signal : overlapping buffers
When Open signal is exactly coordinated with Close signal : exact buffers

U - the element type of the buffer-opening sequenceV - the element type of the buffer-closing sequencebucketOpening - a companion Publisher to subscribe for buffer creation signals.closeSelector - a factory that, given a buffer opening signal, returns a companion
Publisher to subscribe to for buffer closure and emission signals.Flux of List delimited by an opening Publisher and a relative
closing Publisherpublic final <U,V,C extends Collection<? super T>> Flux<C> bufferWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector, Supplier<C> bufferSupplier)
Collection buffers started each time an opening
companion Publisher emits. Each buffer will last until the corresponding
closing companion Publisher emits, thus releasing the buffer to the resulting Flux.
When Open signal is strictly not overlapping Close signal : dropping buffers
When Open signal is strictly more frequent than Close signal : overlapping buffers
When Open signal is exactly coordinated with Close signal : exact buffers

U - the element type of the buffer-opening sequenceV - the element type of the buffer-closing sequenceC - the Collection buffer typebucketOpening - a companion Publisher to subscribe for buffer creation signals.closeSelector - a factory that, given a buffer opening signal, returns a companion
Publisher to subscribe to for buffer closure and emission signals.bufferSupplier - a Supplier of the concrete Collection to use for each bufferFlux of Collection delimited by an opening Publisher and a relative
closing Publisherpublic final Flux<T> cache()
Flux into a hot source and cache last emitted signals for further Subscriber. Will
retain an unbounded volume of onNext signals. Completion and Error will also be
replayed.

Returns: a Flux
public final Flux<T> cache(int history)
Flux into a hot source and cache last emitted signals for further Subscriber.
Will retain up to the given history size onNext signals. Completion and Error will also be
replayed.
Note that cache(0) will only cache the terminal signal without
expiration.

history - number of elements retained in cache
Returns: a Flux
public final Flux<T> cache(Duration ttl)
Flux into a hot source and cache last emitted signals for further
Subscriber. Will retain an unbounded history but apply a per-item expiry timeout.
Completion and Error will also be replayed until ttl triggers in which case
the next Subscriber will start over a new subscription.

ttl - Time-to-live for each cached item and post termination.
Returns: a Flux
public final Flux<T> cache(int history, Duration ttl)
Flux into a hot source and cache last emitted signals for further
Subscriber. Will retain up to the given history size and apply a per-item expiry
timeout.
Completion and Error will also be replayed until ttl triggers in which case
the next Subscriber will start over a new subscription.

history - number of elements retained in cache
ttl - Time-to-live for each cached item and post termination.
Returns: a Flux
public final <E> Flux<E> cast(Class<E> clazz)
Flux produced type into a target produced type.

public final Flux<T> checkpoint()
Flux, in case of an error
upstream of the checkpoint. Tracing incurs the cost of an exception stack trace
creation.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
Returns: a Flux.
public final Flux<T> checkpoint(String description)
Flux by giving it a description that
will be reflected in the assembly traceback in case of an error upstream of the
checkpoint. Note that unlike checkpoint(), this doesn't create a
filled stack trace, avoiding the main cost of the operator.
However, as a trade-off the description must be unique enough for the user to find
out where this Flux was assembled. If you only want a generic description, and
still rely on the stack trace to find the assembly site, use the
checkpoint(String, boolean) variant.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
description - a unique enough description to include in the light assembly traceback.
Returns: a Flux
public final Flux<T> checkpoint(@Nullable String description, boolean forceStackTrace)
forceStackTrace option.
By setting the forceStackTrace parameter to true, activate assembly
tracing for this particular Flux and give it a description that
will be reflected in the assembly traceback in case of an error upstream of the
checkpoint. Note that unlike checkpoint(String), this will incur
the cost of an exception stack trace creation. The description could for
example be a meaningful name for the assembled flux or a wider correlation ID,
since the stack trace will always provide enough information to locate where this
Flux was assembled.
By setting forceStackTrace to false, behaves like
checkpoint(String) and is subject to the same caveat in choosing the
description.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly marker.
description - a description (must be unique enough if forceStackTrace is set to false).
forceStackTrace - false to make a light checkpoint without a stacktrace, true to use a stack trace.
Returns: a Flux.
public final <E> Mono<E> collect(Supplier<E> containerSupplier, BiConsumer<E,? super T> collector)
Flux into a user-defined container,
by applying a collector BiConsumer taking the container and each element.
The collected result will be emitted when this sequence completes.

E - the container type
containerSupplier - the supplier of the container instance for each Subscriber
collector - a consumer of both the container instance and the value being currently collected
Returns: a Mono of the collected container on complete
public final <K> Mono<Map<K,T>> collectMap(Function<? super T,? extends K> keyExtractor)
Flux into a hashed Map that is
emitted by the resulting Mono when this sequence completes.
The key is extracted from each element by applying the keyExtractor
Function. In case several elements map to the same key, the associated value
will be the most recently emitted element.

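For example, an illustrative sketch keying words by their first letter; only the most recently emitted word survives per key:
Flux.just("apple", "avocado", "banana")
    .collectMap(word -> word.charAt(0))
    .subscribe(System.out::println); // {a=avocado, b=banana} -- "apple" was overwritten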
public final <K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor)
Flux into a hashed Map that is
emitted by the resulting Mono when this sequence completes.
The key is extracted from each element by applying the keyExtractor
Function, and the value is extracted by the valueExtractor Function.
In case several elements map to the same key, the associated value will be derived
from the most recently emitted element.

K - the type of the key extracted from each source elementV - the type of the value extracted from each source elementkeyExtractor - a Function to map elements to a key for the MapvalueExtractor - a Function to map elements to a value for the MapMono of a Map of key-element pairs (only including latest
element's value in case of key conflicts)public final <K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor, Supplier<Map<K,V>> mapSupplier)
Flux into a user-defined Map that is
emitted by the resulting Mono when this sequence completes.
The key is extracted from each element by applying the keyExtractor
Function, and the value is extracted by the valueExtractor Function.
In case several elements map to the same key, the associated value will be derived
from the most recently emitted element.

K - the type of the key extracted from each source elementV - the type of the value extracted from each source elementkeyExtractor - a Function to map elements to a key for the MapvalueExtractor - a Function to map elements to a value for the MapmapSupplier - a Map factory called for each SubscriberMono of a Map of key-value pairs (only including latest
element's value in case of key conflicts)public final <K> Mono<Map<K,Collection<T>>> collectMultimap(Function<? super T,? extends K> keyExtractor)
public final <K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor)
Flux into a multimap that is
emitted by the resulting Mono when this sequence completes.
The key is extracted from each element by applying the keyExtractor
Function, and every element mapping to the same key is converted by the
valueExtractor Function to a value stored in the List associated to
said key.

K - the type of the key extracted from each source elementV - the type of the value extracted from each source elementkeyExtractor - a Function to map elements to a key for the MapvalueExtractor - a Function to map elements to a value for the MapMono of a Map of key-List(values) pairspublic final <K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor, Supplier<Map<K,Collection<V>>> mapSupplier)
Flux into a user-defined multimap that is
emitted by the resulting Mono when this sequence completes.
The key is extracted from each element by applying the keyExtractor
Function, and every element mapping to the same key is converted by the
valueExtractor Function to a value stored in the Collection associated to
said key.

K - the type of the key extracted from each source elementV - the type of the value extracted from each source elementkeyExtractor - a Function to map elements to a key for the MapvalueExtractor - a Function to map elements to a value for the MapmapSupplier - a multimap (Map of Collection) factory called
for each SubscriberMono of a Map of key-Collection(values) pairspublic final Mono<List<T>> collectSortedList(@Nullable Comparator<? super T> comparator)
Flux until this sequence completes,
and then sort them using a Comparator into a List that is emitted
by the resulting Mono.

comparator - a Comparator to sort the items of this sequence
Returns: a Mono of a sorted List of all values from this Flux
public final <V> Flux<V> compose(Function<? super Flux<T>,? extends Publisher<V>> transformer)
Flux in order to generate a target Flux type.
A transformation will occur for each Subscriber. For instance:
flux.compose(Mono::from).subscribe()
V - the item type in the returned Publisher
transformer - the Function to lazily map this Flux into a target Publisher instance for each new subscriber
Returns: a Flux
See also: transform() for immediate transformation of Flux, as() for a loose conversion to an arbitrary type
public final <V> Flux<V> concatMap(Function<? super T,? extends Publisher<? extends V>> mapper)
Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap and flatMapSequential:
Errors will immediately short circuit current concat backlog.
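A minimal sketch of the order-preserving behaviour (arbitrary values and delays; assumes java.time.Duration is imported and a caller that can block):
Flux.just(1, 2, 3)
    .concatMap(i -> Flux.just(i * 10, i * 10 + 1).delayElements(Duration.ofMillis(10))) // each inner is drained fully before the next is subscribed
    .doOnNext(System.out::println) // always 10, 11, 20, 21, 30, 31
    .blockLast();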

V - the produced concatenated type
mapper - the function to transform this sequence of T into concatenated sequences of V
Returns a Flux.
public final <V> Flux<V> concatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int prefetch)
Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap and flatMapSequential:
Errors will immediately short circuit current concat backlog. The prefetch argument
allows to give an arbitrary prefetch size to the inner Publisher.

V - the produced concatenated typemapper - the function to transform this sequence of T into concatenated sequences of Vprefetch - the inner source produced demandFluxpublic final <V> Flux<V> concatMapDelayError(Function<? super T,Publisher<? extends V>> mapper)
Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap and flatMapSequential:
Errors in the individual publishers will be delayed after the current concat backlog, usually stopping the sequence at the source that triggered the error.

V - the produced concatenated typemapper - the function to transform this sequence of T into concatenated sequences of VFluxpublic final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, int prefetch)
Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap and flatMapSequential:
Errors in the individual publishers will be delayed after the current concat backlog,
usually stopping the sequence at the source that triggered the error.
The prefetch argument allows to give an arbitrary prefetch size to the inner Publisher.

V - the produced concatenated typemapper - the function to transform this sequence of T into concatenated sequences of Vprefetch - the inner source produced demandFluxpublic final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, boolean delayUntilEnd, int prefetch)
Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap and flatMapSequential:
Errors in the individual publishers will be delayed after the current concat
backlog if delayUntilEnd is false or after all sources if delayUntilEnd is true.
The prefetch argument allows to give an arbitrary prefetch size to the inner Publisher.

V - the produced concatenated typemapper - the function to transform this sequence of T into concatenated sequences of VdelayUntilEnd - delay error until all sources have been consumed instead of
after the current sourceprefetch - the inner source produced demandFluxpublic final <R> Flux<R> concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
Flux into Iterable, then flatten the elements from those by
concatenating them into a single Flux.
Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is
no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially.
Thus flatMapIterable and concatMapIterable are equivalent, offered as a discoverability
improvement for users who explore the API with the concat vs flatMap expectation.
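A minimal sketch (arbitrary values; assumes java.util.Arrays is imported):
Flux.just("a b", "c d")
    .concatMapIterable(line -> Arrays.asList(line.split(" "))) // each line becomes an Iterable of words
    .subscribe(System.out::println); // a, b, c, d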
public final <R> Flux<R> concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper, int prefetch)
Flux into Iterable, then flatten the emissions from those by
concatenating them into a single Flux. The prefetch argument allows to give an arbitrary prefetch size to the merged Iterable.
Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is
no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially.
Thus flatMapIterable and concatMapIterable are equivalent, offered as a discoverability
improvement for users who explore the API with the concat vs flatMap expectation.
R - the merged output sequence type
mapper - the Function to transform input sequence into N Iterable
prefetch - the maximum in-flight elements from each inner Iterable sequence
Returns a Flux.
public final Mono<Long> count()
Flux.
The count will be emitted when onComplete is observed.

public final Flux<T> defaultIfEmpty(T defaultV)
defaultV - the alternate value if this sequence is emptyFluxpublic final Flux<T> delayElements(Duration delay)
Flux elements (Subscriber.onNext(T) signals)
by a given Duration. Signals are delayed and continue on the
parallel default Scheduler, but empty sequences or
immediate error signals are not delayed.
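A minimal sketch (arbitrary delay; assumes java.time.Duration is imported and a caller that can block):
Flux.just("tick", "tock")
    .delayElements(Duration.ofSeconds(1)) // each onNext is pushed roughly one second apart, on the parallel Scheduler
    .doOnNext(System.out::println)
    .blockLast();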

delay - duration by which to delay each Subscriber.onNext(T) signal
Returns a Flux.
See also delaySubscription to introduce a delay at the beginning of the sequence only.
public final Flux<T> delayElements(Duration delay, Scheduler timer)
Flux elements (Subscriber.onNext(T) signals)
by a given Duration. Signals are delayed and continue on a user-specified
Scheduler, but empty sequences or immediate error signals are not delayed.

delay - period to delay each Subscriber.onNext(T) signaltimer - a time-capable Scheduler instance to delay each signal onFluxpublic final Flux<T> delaySequence(Duration delay)
Flux forward in time by a given Duration.
Unlike with delayElements(Duration), elements are shifted forward in time
as they are emitted, always resulting in the delay between two elements being
the same as in the source (only the first element is visibly delayed from the
previous event, that is the subscription).
Signals are delayed and continue on the parallel
Scheduler, but empty sequences or immediate error signals are not delayed.
With this operator, a source emitting at 10Hz with a delaySequence Duration
of 1s will still emit at 10Hz, with an initial "hiccup" of 1s.
On the other hand, delayElements(Duration) would end up emitting
at 1Hz.
This is closer to delaySubscription(Duration), except the source
is subscribed to immediately.
public final Flux<T> delaySequence(Duration delay, Scheduler timer)
Flux forward in time by a given Duration.
Unlike with delayElements(Duration, Scheduler), elements are shifted forward in time
as they are emitted, always resulting in the delay between two elements being
the same as in the source (only the first element is visibly delayed from the
previous event, that is the subscription).
Signals are delayed and continue on a user-specified Scheduler, but empty
sequences or immediate error signals are not delayed.
With this operator, a source emitting at 10Hz with a delaySequence Duration
of 1s will still emit at 10Hz, with an initial "hiccup" of 1s.
On the other hand, delayElements(Duration, Scheduler) would end up emitting
at 1Hz.
This is closer to delaySubscription(Duration, Scheduler), except the source
is subscribed to immediately.
public final Flux<T> delayUntil(Function<? super T,? extends Publisher<?>> triggerProvider)
Flux and generate a Publisher from each of this
Flux elements, each acting as a trigger for relaying said element.
That is to say, the resulting Flux delays each of its emission until the
associated trigger Publisher terminates.
In case of an error either in the source or in a trigger, that error is propagated
immediately downstream.
Note that unlike with the Mono variant there is
no fusion of subsequent calls.
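A minimal sketch where each element is held back until its trigger Publisher terminates (the trigger here is an arbitrary Mono.delay; in practice it would typically wrap some asynchronous per-element operation):
Flux.just("a", "b")
    .delayUntil(value -> Mono.delay(Duration.ofMillis(100))) // relay each element only once its trigger terminates
    .doOnNext(System.out::println)
    .blockLast();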

public final Flux<T> delaySubscription(Duration delay)
subscription to this Flux source until the given
period elapses. The delay is introduced through the parallel default Scheduler.

public final Flux<T> delaySubscription(Duration delay, Scheduler timer)
subscription to this Flux source until the given
period elapses, as measured on the user-provided Scheduler.

public final <U> Flux<T> delaySubscription(Publisher<U> subscriptionDelay)
U - the other source typesubscriptionDelay - a companion Publisher whose onNext/onComplete signal will trigger the subscriptionFluxpublic final <X> Flux<X> dematerialize()
Flux emits onNext, onError or onComplete Signal
instances, transforming these materialized signals into
real signals on the Subscriber.
The error Signal will trigger onError and complete Signal will trigger
onComplete.

X - the dematerialized typeFluxmaterialize()public final Flux<T> distinct()
Subscriber, track elements from this Flux that have been
seen and filter out duplicates.
The values themselves are recorded into a HashSet for distinct detection.
Use distinct(Object::hashCode) if you want a more lightweight approach that
doesn't retain all the objects, but is more susceptible to falsely considering two
elements as distinct due to a hashcode collision.
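A minimal sketch:
Flux.just(1, 1, 2, 2, 1)
    .distinct()
    .subscribe(System.out::println); // 1, 2 -- the trailing 1 is also filtered out, since all seen values are remembered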

Returns a Flux only emitting distinct values.
public final <V> Flux<T> distinct(Function<? super T,? extends V> keySelector)
Subscriber, track elements from this Flux that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function.

V - the type of the key extracted from each value in this sequencekeySelector - function to compute comparison key for each elementFlux only emitting values with distinct keyspublic final <V,C extends Collection<? super V>> Flux<T> distinct(Function<? super T,? extends V> keySelector, Supplier<C> distinctCollectionSupplier)
Subscriber, track elements from this Flux that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function and by the add method
of the Collection supplied (typically a Set).

V - the type of the key extracted from each value in this sequenceC - the type of Collection used for distinct checking of keyskeySelector - function to compute comparison key for each elementdistinctCollectionSupplier - supplier of the Collection used for distinct
check through add of the key.Flux only emitting values with distinct keyspublic final <V,C> Flux<T> distinct(Function<? super T,? extends V> keySelector, Supplier<C> distinctStoreSupplier, BiPredicate<C,V> distinctPredicate, Consumer<C> cleanup)
Subscriber, track elements from this Flux that have been
seen and filter out duplicates, as compared by applying a BiPredicate on
an arbitrary user-supplied <C> store and a key extracted through the user
provided Function. The BiPredicate should typically add the key to the
arbitrary store for further comparison. A cleanup callback is also invoked on the
store upon termination of the sequence.

V - the type of the key extracted from each value in this sequenceC - the type of store backing the BiPredicatekeySelector - function to compute comparison key for each elementdistinctStoreSupplier - supplier of the arbitrary store object used in distinct
checks along the extracted key.distinctPredicate - the BiPredicate to apply to the arbitrary store +
extracted key to perform a distinct check. Since nothing is assumed of the store,
this predicate should also add the key to the store as necessary.cleanup - the cleanup callback to invoke on the store upon termination.Flux only emitting values with distinct keyspublic final Flux<T> distinctUntilChanged()
The values themselves are recorded into a HashSet for distinct detection.
Use distinctUntilChanged(Object::hashCode) if you want a more lightweight approach that
doesn't retain all the objects, but is more susceptible to falsely considering two
elements as distinct due to a hashcode collision.
Flux with only one occurrence in a row of each element
(yet elements can repeat in the overall sequence)public final <V> Flux<T> distinctUntilChanged(Function<? super T,? extends V> keySelector)
Function
using equality.

V - the type of the key extracted from each value in this sequencekeySelector - function to compute comparison key for each elementFlux with only one occurrence in a row of each element of
the same key (yet element keys can repeat in the overall sequence)public final <V> Flux<T> distinctUntilChanged(Function<? super T,? extends V> keySelector, BiPredicate<? super V,? super V> keyComparator)
Function and then comparing keys with the supplied BiPredicate.

V - the type of the key extracted from each value in this sequencekeySelector - function to compute comparison key for each elementkeyComparator - predicate used to compare keys.Flux with only one occurrence in a row of each element
of the same key for which the predicate returns true (yet element keys can repeat
in the overall sequence)public final Flux<T> doAfterTerminate(Runnable afterTerminate)
Flux terminates, either by completing downstream successfully or with an error.
afterTerminate - the callback to call after Subscriber.onComplete() or Subscriber.onError(java.lang.Throwable)Fluxpublic final Flux<T> doOnCancel(Runnable onCancel)
Flux is cancelled.
onCancel - the callback to call on Subscription.cancel()Fluxpublic final Flux<T> doOnComplete(Runnable onComplete)
Flux completes successfully.
onComplete - the callback to call on Subscriber.onComplete()Fluxpublic final Flux<T> doOnEach(Consumer<? super Signal<T>> signalConsumer)
Flux emits an item, fails with an error
or completes successfully. All these events are represented as a Signal
that is passed to the side-effect callback. Note that this is an advanced operator,
typically used for monitoring of a Flux. These Signal have a Context
associated to them.signalConsumer - the mandatory callback to call on
Subscriber.onNext(Object), Subscriber.onError(Throwable) and
Subscriber.onComplete()FluxdoOnNext(Consumer),
doOnError(Consumer),
doOnComplete(Runnable),
materialize(),
Signalpublic final Flux<T> doOnError(Consumer<? super Throwable> onError)
Flux completes with an error.
onError - the callback to call on Subscriber.onError(java.lang.Throwable)Fluxpublic final <E extends Throwable> Flux<T> doOnError(Class<E> exceptionType, Consumer<? super E> onError)
Flux completes with an error matching the given exception type.

E - type of the error to handleexceptionType - the type of exceptions to handleonError - the error handler for each errorFluxpublic final Flux<T> doOnError(Predicate<? super Throwable> predicate, Consumer<? super Throwable> onError)
Flux completes with an error matching the given exception.

predicate - the matcher for exceptions to handleonError - the error handler for each errorFluxpublic final Flux<T> doOnNext(Consumer<? super T> onNext)
Flux emits an item.
onNext - the callback to call on Subscriber.onNext(T)Fluxpublic final Flux<T> doOnRequest(LongConsumer consumer)
LongConsumer when this Flux
receives any request.
Note that non fatal error raised in the callback will not be propagated and
will simply trigger Operators.onOperatorError(Throwable, Context).

consumer - the consumer to invoke on each requestFluxpublic final Flux<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe)
Flux is subscribed.
This method is not intended for capturing the subscription and calling its methods,
but for side effects like monitoring. For instance, the correct way to cancel a subscription is
to call Disposable.dispose() on the Disposable returned by subscribe().
onSubscribe - the callback to call on Subscriber.onSubscribe(org.reactivestreams.Subscription)Fluxpublic final Flux<T> doOnTerminate(Runnable onTerminate)
Flux terminates, either by
completing successfully or with an error.
onTerminate - the callback to call on Subscriber.onComplete() or Subscriber.onError(java.lang.Throwable)Fluxpublic final Flux<T> doFinally(Consumer<SignalType> onFinally)
Flux terminates for any reason,
including cancellation. The terminating event (SignalType.ON_COMPLETE,
SignalType.ON_ERROR and SignalType.CANCEL) is passed to the consumer,
which is executed after the signal has been passed downstream.
Note that the fact that the signal is propagated downstream before the callback is
executed means that several doFinally in a row will be executed in
reverse order. If you want to assert the execution of the callback
please keep in mind that the Flux will complete before it is executed, so its
effect might not be visible immediately after eg. a blockLast().
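A minimal sketch (output wording depends on SignalType's toString):
Flux.just(1, 2, 3)
    .doFinally(signalType -> System.out.println("terminated with: " + signalType)) // runs on complete, error or cancel
    .blockLast(); // prints something like "terminated with: onComplete" after the last element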
onFinally - the callback to execute after a terminal signal (complete, error or cancel)
Returns a Flux.
public final Flux<Tuple2<Long,T>> elapsed()
Flux into Tuple2<Long, T>
of timemillis and source data. The timemillis corresponds to the elapsed time
between each signal as measured by the parallel scheduler.
First duration is measured between the subscription and the first element.

Flux that emits a tuple of time elapsed in milliseconds and matching datapublic final Flux<Tuple2<Long,T>> elapsed(Scheduler scheduler)
Flux into Tuple2<Long, T>
of timemillis and source data. The timemillis corresponds to the elapsed time
between each signal as measured by the provided Scheduler.
First duration is measured between the subscription and the first element.

scheduler - a Scheduler instance to read time fromFlux that emits tuples of time elapsed in milliseconds and matching datapublic final Mono<T> elementAt(int index)
IndexOutOfBoundsException
if the sequence is shorter.

index - zero-based index of the only item to emitMono of the item at the specified zero-based indexpublic final Mono<T> elementAt(int index, T defaultValue)

index - zero-based index of the only item to emitdefaultValue - a default value to emit if the sequence is shorterMono of the item at the specified zero-based index or a default valuepublic final Flux<T> expandDeep(Function<? super T,? extends Publisher<? extends T>> expander, int capacityHint)
That is: emit one value from this Flux, expand it and emit the first value
at this first level of recursion, and so on... When no more recursion is possible,
backtrack to the previous level and re-apply the strategy.
For example, given the hierarchical structure
A
- AA
- aa1
B
- BB
- bb1
Expands Flux.just(A, B) into
A AA aa1 B BB bb1
public final Flux<T> expandDeep(Function<? super T,? extends Publisher<? extends T>> expander)
That is: emit one value from this Flux, expand it and emit the first value
at this first level of recursion, and so on... When no more recursion is possible,
backtrack to the previous level and re-apply the strategy.
For example, given the hierarchical structure
A
- AA
- aa1
B
- BB
- bb1
Expands Flux.just(A, B) into
A AA aa1 B BB bb1
public final Flux<T> expand(Function<? super T,? extends Publisher<? extends T>> expander, int capacityHint)
That is: emit the values from this Flux first, then expand each at a first level of
recursion and emit all of the resulting values, then expand all of these at a second
level and so on..
For example, given the hierarchical structure
A
- AA
- aa1
B
- BB
- bb1
Expands Flux.just(A, B) into
A B AA BB aa1 bb1
public final Flux<T> expand(Function<? super T,? extends Publisher<? extends T>> expander)
That is: emit the values from this Flux first, then expand each at a first level of
recursion and emit all of the resulting values, then expand all of these at a second
level and so on..
For example, given the hierarchical structure
A
- AA
- aa1
B
- BB
- bb1
Expands Flux.just(A, B) into
A B AA BB aa1 bb1
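A minimal sketch of the breadth-first expansion above, using a hypothetical inline children lookup for the A/B hierarchy:
Flux.just("A", "B")
    .expand(node -> {
        // hypothetical lookup of the direct children of a node
        switch (node) {
            case "A":  return Flux.just("AA");
            case "B":  return Flux.just("BB");
            case "AA": return Flux.just("aa1");
            case "BB": return Flux.just("bb1");
            default:   return Flux.empty();
        }
    })
    .subscribe(System.out::println); // breadth-first: A, B, AA, BB, aa1, bb1 (expandDeep would give A, AA, aa1, B, BB, bb1)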
public final Flux<T> filter(Predicate<? super T> p)
Predicate. If the predicate test succeeds, the value is
emitted. If the predicate test fails, the value is ignored and a request of 1 is made upstream.

public final Flux<T> filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate)
Flux asynchronously using a generated
Publisher<Boolean> test. A value is replayed if the first item emitted
by its corresponding test is true. It is dropped if its test is either
empty or its first emitted value is false.
Note that only the first value of the test publisher is considered, and unless it
is a Mono, test will be cancelled after receiving that first value. Test
publishers are generated and subscribed to in sequence.
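A minimal sketch where the asynchronous test happens to be an immediate Mono (in practice it would usually wrap a remote call or lookup):
Flux.just(1, 2, 3, 4)
    .filterWhen(i -> Mono.just(i % 2 == 0)) // a Publisher<Boolean> decides whether each value is replayed
    .subscribe(System.out::println); // 2, 4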
public final Flux<T> filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate, int bufferSize)
Flux asynchronously using a generated
Publisher<Boolean> test. A value is replayed if the first item emitted
by its corresponding test is true. It is dropped if its test is either
empty or its first emitted value is false.
Note that only the first value of the test publisher is considered, and unless it
is a Mono, test will be cancelled after receiving that first value. Test
publishers are generated and subscribed to in sequence.
asyncPredicate - the function generating a Publisher of Boolean for each value, to filter the Flux with
bufferSize - the maximum expected number of values to hold pending a result of their respective asynchronous predicates, rounded to the next power of two. This is capped depending on the size of the heap and the JVM limits, so be careful with large values (although eg. 65536 should still be fine). Also serves as the initial request size for the source.
Returns a Flux.
public final <R> Flux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux through merging,
which allows them to interleave.
There are three dimensions to this operator that can be compared with
flatMapSequential and concatMap:
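A minimal sketch contrasting with concatMap (arbitrary values and delays; assumes java.time.Duration is imported and a caller that can block):
Flux.just(1, 2, 3)
    .flatMap(i -> Flux.just(i * 10, i * 10 + 1).delayElements(Duration.ofMillis(10))) // inners are subscribed eagerly
    .doOnNext(System.out::println) // values from different inners may interleave, e.g. 10, 20, 30, 11, 21, 31
    .blockLast();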
public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency)
Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux through merging,
which allows them to interleave.
There are three dimensions to this operator that can be compared with
flatMapSequential and concatMap:
Publisher can be
subscribed to and merged in parallel.

public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)
Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux through merging,
which allows them to interleave.
There are three dimensions to this operator that can be compared with
flatMapSequential and concatMap:
Publisher can be
subscribed to and merged in parallel. The prefetch argument allows to give an
arbitrary prefetch size to the merged Publisher.

V - the merged output sequence typemapper - the Function to transform input sequence into N sequences Publisherconcurrency - the maximum number of in-flight inner sequencesprefetch - the maximum in-flight elements from each inner Publisher sequenceFluxpublic final <V> Flux<V> flatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)
Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux through merging,
which allows them to interleave.
There are three dimensions to this operator that can be compared with
flatMapSequential and concatMap:
Publisher can be
subscribed to and merged in parallel. The prefetch argument allows to give an
arbitrary prefetch size to the merged Publisher. This variant will delay
any error until after the rest of the flatMap backlog has been processed.

V - the merged output sequence typemapper - the Function to transform input sequence into N sequences Publisherconcurrency - the maximum number of in-flight inner sequencesprefetch - the maximum in-flight elements from each inner Publisher sequenceFluxpublic final <R> Flux<R> flatMap(@Nullable Function<? super T,? extends Publisher<? extends R>> mapperOnNext, @Nullable Function<? super Throwable,? extends Publisher<? extends R>> mapperOnError, @Nullable Supplier<? extends Publisher<? extends R>> mapperOnComplete)
Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux through merging,
which allows them to interleave. Note that at least one of the signal mappers must
be provided, and all provided mappers must produce a publisher.
There are three dimensions to this operator that can be compared with
flatMapSequential and concatMap:
OnError will be transformed into completion signal after its mapping callback has been applied.
R - the output Publisher type targetmapperOnNext - the Function to call on next data and returning a sequence to merge.
Use null to ignore (provided at least one other mapper is specified).mapperOnError - the Function to call on error signal and returning a sequence to merge.
Use null to ignore (provided at least one other mapper is specified).mapperOnComplete - the Function to call on complete signal and returning a sequence to merge.
Use null to ignore (provided at least one other mapper is specified).Fluxpublic final <R> Flux<R> flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
Flux into Iterable, then flatten the elements from those by
merging them into a single Flux.
Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is
no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially.
Thus flatMapIterable and concatMapIterable are equivalent, offered as a discoverability
improvement for users who explore the API with the concat vs flatMap expectation.
public final <R> Flux<R> flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper, int prefetch)
Flux into Iterable, then flatten the emissions from those by
merging them into a single Flux. The prefetch argument allows to give an
arbitrary prefetch size to the merged Iterable.
Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is
no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially.
Thus flatMapIterable and concatMapIterable are equivalent, offered as a discoverability
improvement for users who explore the API with the concat vs flatMap expectation.
R - the merged output sequence typemapper - the Function to transform input sequence into N Iterableprefetch - the maximum in-flight elements from each inner Iterable sequenceFluxpublic final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper)
Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, but merge them in
the order of their source element.
There are three dimensions to this operator that can be compared with
flatMap and concatMap:
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher. However, if such an early inner takes more time to complete than
subsequent faster inners, the data from these faster inners will be queued until
the earlier inner completes, so as to maintain source ordering.

public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency)
Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, but merge them in
the order of their source element.
There are three dimensions to this operator that can be compared with
flatMap and concatMap:
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher. However, if such an early inner takes more time to complete than
subsequent faster inners, the data from these faster inners will be queued until
the earlier inner completes, so as to maintain source ordering.
The concurrency argument allows to control how many merged Publisher can happen in parallel.

public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch)
Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, but merge them in
the order of their source element.
There are three dimensions to this operator that can be compared with
flatMap and concatMap:
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher. However, if such an early inner takes more time to complete than
subsequent faster inners, the data from these faster inners will be queued until
the earlier inner completes, so as to maintain source ordering.
The concurrency argument allows to control how many merged Publisher
can happen in parallel. The prefetch argument allows to give an arbitrary prefetch
size to the merged Publisher.

R - the merged output sequence typemapper - the Function to transform input sequence into N sequences PublishermaxConcurrency - the maximum number of in-flight inner sequencesprefetch - the maximum in-flight elements from each inner Publisher sequenceFlux, subscribing early but keeping the original orderingpublic final <R> Flux<R> flatMapSequentialDelayError(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch)
Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, but merge them in
the order of their source element.
There are three dimensions to this operator that can be compared with
flatMap and concatMap:
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher. However, if such an early inner takes more time to complete than
subsequent faster inners, the data from these faster inners will be queued until
the earlier inner completes, so as to maintain source ordering.
The concurrency argument allows to control how many merged Publisher
can happen in parallel. The prefetch argument allows to give an arbitrary prefetch
size to the merged Publisher. This variant will delay any error until after the
rest of the flatMap backlog has been processed.

R - the merged output sequence type
mapper - the Function to transform input sequence into N sequences Publisher
maxConcurrency - the maximum number of in-flight inner sequences
prefetch - the maximum in-flight elements from each inner Publisher sequence
Returns a Flux, subscribing early but keeping the original ordering.
public int getPrefetch()
Returns the prefetch configuration of this Flux, or -1 if unspecified.
public final <K> Flux<GroupedFlux<K,T>> groupBy(Function<? super T,? extends K> keyMapper)
Flux (or groups) for each
unique key, as produced by the provided keyMapper Function. Note that
groupBy works best with a low cardinality of groups, so choose your keyMapper
function accordingly.
The groups need to be drained and consumed downstream for groupBy to work correctly.
Notably when the criteria produces a large amount of groups, it can lead to hanging
if the groups are not suitably consumed downstream (eg. due to a flatMap
with a maxConcurrency parameter that is set too low).
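A minimal sketch where the groups are drained with a flatMap, as recommended above:
Flux.range(1, 6)
    .groupBy(i -> i % 2 == 0 ? "even" : "odd") // low-cardinality key
    .flatMap(group -> group.collectList().map(list -> group.key() + ": " + list)) // drain each GroupedFlux
    .subscribe(System.out::println); // odd: [1, 3, 5] and even: [2, 4, 6], in some order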
K - the key type extracted from each value of this sequence
keyMapper - the key mapping Function that evaluates incoming data and returns a key.
Returns a Flux of GroupedFlux grouped sequences.
public final <K> Flux<GroupedFlux<K,T>> groupBy(Function<? super T,? extends K> keyMapper, int prefetch)
Flux (or groups) for each
unique key, as produced by the provided keyMapper Function. Note that
groupBy works best with a low cardinality of groups, so choose your keyMapper
function accordingly.
The groups need to be drained and consumed downstream for groupBy to work correctly.
Notably when the criteria produces a large amount of groups, it can lead to hanging
if the groups are not suitably consumed downstream (eg. due to a flatMap
with a maxConcurrency parameter that is set too low).
K - the key type extracted from each value of this sequencekeyMapper - the key mapping Function that evaluates an incoming data and returns a key.prefetch - the number of values to prefetch from the sourceFlux of GroupedFlux grouped sequencespublic final <K,V> Flux<GroupedFlux<K,V>> groupBy(Function<? super T,? extends K> keyMapper, Function<? super T,? extends V> valueMapper)
Flux (or groups) for each
unique key, as produced by the provided keyMapper Function. Source elements
are also mapped to a different value using the valueMapper. Note that
groupBy works best with a low cardinality of groups, so choose your keyMapper
function accordingly.
The groups need to be drained and consumed downstream for groupBy to work correctly.
Notably when the criteria produces a large amount of groups, it can lead to hanging
if the groups are not suitably consumed downstream (eg. due to a flatMap
with a maxConcurrency parameter that is set too low).
K - the key type extracted from each value of this sequenceV - the value type extracted from each value of this sequencekeyMapper - the key mapping function that evaluates an incoming data and returns a key.valueMapper - the value mapping function that evaluates which data to extract for re-routing.Flux of GroupedFlux grouped sequencespublic final <K,V> Flux<GroupedFlux<K,V>> groupBy(Function<? super T,? extends K> keyMapper, Function<? super T,? extends V> valueMapper, int prefetch)
Flux (or groups) for each
unique key, as produced by the provided keyMapper Function. Source elements
are also mapped to a different value using the valueMapper. Note that
groupBy works best with a low cardinality of groups, so choose your keyMapper
function accordingly.
The groups need to be drained and consumed downstream for groupBy to work correctly.
Notably when the criteria produces a large amount of groups, it can lead to hanging
if the groups are not suitably consumed downstream (eg. due to a flatMap
with a maxConcurrency parameter that is set too low).
K - the key type extracted from each value of this sequenceV - the value type extracted from each value of this sequencekeyMapper - the key mapping function that evaluates an incoming data and returns a key.valueMapper - the value mapping function that evaluates which data to extract for re-routing.prefetch - the number of values to prefetch from the sourceFlux of GroupedFlux grouped sequencespublic final <TRight,TLeftEnd,TRightEnd,R> Flux<R> groupJoin(Publisher<? extends TRight> other, Function<? super T,? extends Publisher<TLeftEnd>> leftEnd, Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd, BiFunction<? super T,? super Flux<TRight>,? extends R> resultSelector)
Flux and a Flux emitting the value from the other
Publisher to a BiFunction.
There are no guarantees in what order the items get combined when multiple items from one or both source Publishers overlap.
Unlike join(org.reactivestreams.Publisher<? extends TRight>, java.util.function.Function<? super T, ? extends org.reactivestreams.Publisher<TLeftEnd>>, java.util.function.Function<? super TRight, ? extends org.reactivestreams.Publisher<TRightEnd>>, java.util.function.BiFunction<? super T, ? super TRight, ? extends R>), items from the second Publisher will be provided
as a Flux to the resultSelector.

TRight - the type of the elements from the right Publisher
TLeftEnd - the type for this Flux window signals
TRightEnd - the type for the right Publisher window signals
R - the combined result type
other - the other Publisher to correlate items with
leftEnd - a function that returns a Publisher whose emissions indicate the time window for the source value to be considered
rightEnd - a function that returns a Publisher whose emissions indicate the time window for the right Publisher value to be considered
resultSelector - a function that takes an item emitted by this Flux and a Flux representation of the overlapping item from the other Publisher and returns the value to be emitted by the resulting Flux
Returns a Flux.
See also join(Publisher, Function, Function, BiFunction).
public final <R> Flux<R> handle(BiConsumer<? super T,SynchronousSink<R>> handler)
Flux by calling a biconsumer with the
output sink for each onNext. At most one SynchronousSink.next(Object)
call must be performed and/or 0 or 1 SynchronousSink.error(Throwable) or
SynchronousSink.complete().R - the transformed typehandler - the handling BiConsumerFluxpublic final Mono<Boolean> hasElement(T value)
Flux sequence is
equal to the provided value.
The implementation uses short-circuit logic and completes with true if an element matches the value.

value - constant compared to incoming signalsFlux with true if any element is equal to a given value and false
otherwisepublic final Mono<Boolean> hasElements()
Flux sequence has at least one element.
The implementation uses short-circuit logic and completes with true on onNext.

Mono with true if any value is emitted and false
otherwisepublic final Flux<T> hide()
Flux instance.
The main purpose of this operator is to prevent certain identity-based optimizations from happening, mostly for diagnostic purposes.
Flux preventing Publisher / Subscription based Reactor optimizationspublic final Flux<Tuple2<Long,T>> index()
Flux
of Tuple2<(index, value)>.Flux with each source value combined with its 0-based index.public final <I> Flux<I> index(BiFunction<? super Long,? super T,? extends I> indexMapper)
I using the provided BiFunction,
returning a Flux<I>.
Typical usage would be to produce a Tuple2 similar to index(), but
1-based instead of 0-based:
index((i, v) -> Tuples.of(i+1, v))
indexMapper - the BiFunction to use to combine elements and their index.Flux with each source value combined with its computed index.public final Mono<T> ignoreElements()
public final <TRight,TLeftEnd,TRightEnd,R> Flux<R> join(Publisher<? extends TRight> other, Function<? super T,? extends Publisher<TLeftEnd>> leftEnd, Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd, BiFunction<? super T,? super TRight,? extends R> resultSelector)
Flux and the other Publisher to a BiFunction.
There are no guarantees in what order the items get combined when multiple items from one or both source Publishers overlap.

TRight - the type of the elements from the right PublisherTLeftEnd - the type for this Flux window signalsTRightEnd - the type for the right Publisher window signalsR - the combined result typeother - the other Publisher to correlate items withleftEnd - a function that returns a Publisher whose emissions indicate the
time window for the source value to be consideredrightEnd - a function that returns a Publisher whose emissions indicate the
time window for the right Publisher value to be consideredresultSelector - a function that takes an item emitted by each Publisher and returns the
value to be emitted by the resulting FluxFluxgroupJoin(Publisher, Function, Function, BiFunction)public final Mono<T> last()
Mono, or emit
NoSuchElementException error if the source was empty.
For a passive version use takeLast(int)

public final Mono<T> last(T defaultValue)
Mono, or emit
the defaultValue if the source was empty.
For a passive version use takeLast(int)

public final Flux<T> limitRate(int prefetchRate)
prefetchRate when propagated upstream, effectively
rate limiting the upstream Publisher.
Typically used for scenarios where consumer(s) request a large amount of data
(eg. Long.MAX_VALUE) but the data source behaves better or can be optimized
with smaller requests (eg. database paging, etc...). All data is still processed,
unlike with limitRequest(long) which will cap the grand total request
amount.
Equivalent to flux.publishOn(Schedulers.immediate(), prefetchRate).subscribe()
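A minimal sketch making the capped upstream requests visible (exact replenishment amounts depend on the default low tide):
Flux.range(1, 100)
    .doOnRequest(n -> System.out.println("upstream requested " + n)) // observe requests reaching the source
    .limitRate(10)
    .subscribe(); // unbounded downstream demand is translated into an initial request of 10, then smaller replenishing requests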
prefetchRate - the limit to apply to downstream's backpressure
Returns a Flux limiting downstream's backpressure.
See also publishOn(Scheduler, int), limitRequest(long).
public final Flux<T> limitRate(int highTide, int lowTide)
highTide first, then replenishing at the provided
lowTide, effectively rate limiting the upstream Publisher.
Typically used for scenarios where consumer(s) request a large amount of data
(eg. Long.MAX_VALUE) but the data source behaves better or can be optimized
with smaller requests (eg. database paging, etc...). All data is still processed,
unlike with limitRequest(long) which will cap the grand total request
amount.
Similar to flux.publishOn(Schedulers.immediate(), prefetchRate).subscribe() ,
except with a customized "low tide" instead of the default 75%.
Note that the smaller the lowTide is, the higher the potential for concurrency
between request and data production, and thus the more extraneous replenishment
requests this operator could make. For example, for a global downstream
request of 14, with a highTide of 10 and a lowTide of 2, the operator would perform
7 low tide requests, whereas with the default lowTide of 8 it would only perform one.
highTide - the initial request amountlowTide - the subsequent (or replenishing) request amountFlux limiting downstream's backpressure and customizing the
replenishment request amountpublishOn(Scheduler, int),
limitRequest(long)public final Flux<T> limitRequest(long requestCap)
cap.
Backpressure signals from downstream subscribers that are smaller than the cap are
propagated as is, but if they would cause the total requested amount to go over the
cap, they are reduced to the minimum value that doesn't go over.
As a result, this operator never lets the upstream produce more elements than the
cap, and it can be used as a stricter form of take(long). Typically useful
for cases where a race between request and cancellation can lead the upstream to
producing a lot of extraneous data, and such a production is undesirable (e.g.
a source that would send the extraneous data over the network).
requestCap - the global backpressure limit to apply to the sum of downstream's requestsFlux that requests AT MOST cap from upstream in total.limitRate(int),
take(long)public final Flux<T> log()
Logger support.
Default will use Level.INFO and java.util.logging.
If SLF4J is available, it will be used instead.
The default log category will be "reactor.Flux.", followed by a suffix generated from the source operator, e.g. "reactor.Flux.Map".
Flux that logs signalspublic final Flux<T> log(String category)
Logger support.
Default will use Level.INFO and java.util.logging.
If SLF4J is available, it will be used instead.
category - to be mapped into logger configuration (e.g. org.springframework
.reactor). If category ends with "." like "reactor.", a generated operator
suffix will be added, e.g. "reactor.Flux.Map".Flux that logs signalspublic final Flux<T> log(@Nullable String category, Level level, SignalType... options)
options and
trace them using Logger support. Default will use Level.INFO and
java.util.logging. If SLF4J is available, it will be used instead.
Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
flux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
category - to be mapped into logger configuration (e.g. org.springframework
.reactor). If category ends with "." like "reactor.", a generated operator
suffix will be added, e.g. "reactor.Flux.Map".level - the Level to enforce for this tracing Flux (only FINEST, FINE,
INFO, WARNING and SEVERE are taken into account)options - a vararg SignalType option to filter log messagesFlux that logs signalspublic final Flux<T> log(@Nullable String category, Level level, boolean showOperatorLine, SignalType... options)
options and
trace them using Logger support. Default will use Level.INFO and
java.util.logging. If SLF4J is available, it will be used instead.
Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
flux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)

category - to be mapped into logger configuration (e.g. org.springframework
.reactor). If category ends with "." like "reactor.", a generated operator
suffix will be added, e.g. "reactor.Flux.Map".level - the Level to enforce for this tracing Flux (only FINEST, FINE,
INFO, WARNING and SEVERE are taken into account)showOperatorLine - capture the current stack to display operator class/line number.options - a vararg SignalType option to filter log messagesFlux that logs signalspublic final Flux<T> log(Logger logger)
options and
trace them using a specific user-provided Logger, at Level.INFO level.

public final Flux<T> log(Logger logger, Level level, boolean showOperatorLine, SignalType... options)
options and
trace them using a specific user-provided Logger, at the given Level.
Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
flux.log(myCustomLogger, Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)

logger - the Logger to use, instead of resolving one through a category.level - the Level to enforce for this tracing Flux (only FINEST, FINE,
INFO, WARNING and SEVERE are taken into account)showOperatorLine - capture the current stack to display operator class/line number (default in overload is false).options - a vararg SignalType option to filter log messagesFlux that logs signalspublic final <V> Flux<V> map(Function<? super T,? extends V> mapper)
Flux by applying a synchronous function
to each item.
public final Flux<Signal<T>> materialize()
Signal instances,
materializing these signals.
Since the error is materialized as a Signal, the propagation will be stopped and onComplete will be
emitted. Complete signal will first emit a Signal.complete() and then effectively complete the flux.
All these Signal have a Context associated to them.

Flux of materialized Signaldematerialize()public final Flux<T> mergeOrderedWith(Publisher<? extends T> other, Comparator<? super T> otherComparator)
Flux and a Publisher into a reordered merge
sequence, by picking the smallest value from each sequence as defined by a provided
Comparator. Note that subsequent calls are combined, and their comparators are
in lexicographic order as defined by Comparator.thenComparing(Comparator).
The combination step is avoided if the two Comparators are
equal (which can easily be achieved by using the
same reference, and is also always true of Comparator.naturalOrder()).
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
other - the Publisher to merge withotherComparator - the Comparator to use for mergingFluxpublic final Flux<T> mergeWith(Publisher<? extends T> other)
Flux and a Publisher into an interleaved merged
sequence. Unlike concat, inner sources are subscribed
to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
public final Flux<T> name(String name)
Scannable.name()
as long as this is the first reachable Scannable.parents().name - a name for the sequencepublic final Flux<T> onBackpressureBuffer()
Flux, or park the
observed elements if not enough demand is requested downstream. Errors will be
delayed until the buffer gets consumed.

Flux that buffers with unbounded capacitypublic final Flux<T> onBackpressureBuffer(int maxSize)
Flux, or park the
observed elements if not enough demand is requested downstream. Errors will be
immediately emitted on overflow regardless of the pending buffer.

maxSize - maximum buffer backlog size before immediate errorFlux that buffers with bounded capacitypublic final Flux<T> onBackpressureBuffer(int maxSize, Consumer<? super T> onOverflow)
Flux, or park the
observed elements if not enough demand is requested downstream. Overflow error
will be delayed after the current backlog is consumed. However the
Consumer will be immediately invoked.

maxSize - maximum buffer backlog size before overflow callback is calledonOverflow - callback to invoke on overflowFlux that buffers with a bounded capacitypublic final Flux<T> onBackpressureBuffer(int maxSize, BufferOverflowStrategy bufferOverflowStrategy)
Flux, or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. Over that limit, the overflow strategy is applied (see BufferOverflowStrategy).
Note that for the ERROR strategy, the overflow
error will be delayed after the current backlog is consumed.

maxSize - maximum buffer backlog size before overflow strategy is appliedbufferOverflowStrategy - strategy to apply to overflowing elementsFlux that buffers up to a capacity then applies an
overflow strategypublic final Flux<T> onBackpressureBuffer(int maxSize, Consumer<? super T> onBufferOverflow, BufferOverflowStrategy bufferOverflowStrategy)
Flux, or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. Over that limit, the overflow strategy is applied (see BufferOverflowStrategy).
A Consumer is immediately invoked when there is an overflow, receiving the
value that was discarded because of the overflow (which can be different from the
latest element emitted by the source in case of a
DROP_LATEST strategy).
Note that for the ERROR strategy, the overflow
error will be delayed after the current backlog is consumed. The consumer is still
invoked immediately.

maxSize - maximum buffer backlog size before overflow callback is calledonBufferOverflow - callback to invoke on overflowbufferOverflowStrategy - strategy to apply to overflowing elementsFlux that buffers up to a capacity then applies an
overflow strategypublic final Flux<T> onBackpressureBuffer(Duration ttl, int maxSize, Consumer<? super T> onBufferEviction)
Flux, or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit and for a maximum Duration of ttl (as measured on the
elastic Scheduler). Over that limit, oldest
elements from the source are dropped.
Elements evicted based on the TTL are passed to a cleanup Consumer, which
is also immediately invoked when there is an overflow.

ttl - maximum Duration for which an element is kept in the backlogmaxSize - maximum buffer backlog size before overflow callback is calledonBufferEviction - callback to invoke once TTL is reached or on overflowFlux that buffers with a TTL and up to a capacity then applies an
overflow strategypublic final Flux<T> onBackpressureBuffer(Duration ttl, int maxSize, Consumer<? super T> onBufferEviction, Scheduler scheduler)
Flux, or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit and for a maximum Duration of ttl (as measured on the provided
Scheduler). Over that limit, oldest elements from the source are dropped.
Elements evicted based on the TTL are passed to a cleanup Consumer, which
is also immediately invoked when there is an overflow.

ttl - maximum Duration for which an element is kept in the backlogmaxSize - maximum buffer backlog size before overflow callback is calledonBufferEviction - callback to invoke once TTL is reached or on overflowscheduler - the scheduler on which to run the timeout checkFlux that buffers with a TTL and up to a capacity then applies an
overflow strategypublic final Flux<T> onBackpressureDrop()
Flux, or drop
the observed elements if not enough demand is requested downstream.

Flux that drops overflowing elementspublic final Flux<T> onBackpressureDrop(Consumer<? super T> onDropped)
Flux, or drop and
notify dropping Consumer with the observed elements if not enough demand
is requested downstream.

onDropped - the Consumer called when a value gets dropped due to lack of downstream requests
Returns a Flux that drops overflowing elements.
public final Flux<T> onBackpressureError()
Flux, or emit onError
from Exceptions.failWithOverflow() if not enough demand is requested
downstream.

Flux that errors on overflowing elementspublic final Flux<T> onBackpressureLatest()
Flux, or only keep
the most recent observed item if not enough demand is requested downstream.

Flux that will only keep a reference to the last observed itempublic final Flux<T> onErrorMap(Function<? super Throwable,? extends Throwable> mapper)
Flux by synchronously applying a function to it.
public final <E extends Throwable> Flux<T> onErrorMap(Class<E> type, Function<? super E,? extends Throwable> mapper)
Flux by synchronously applying a function
to it if the error matches the given type. Otherwise let the error pass through.
public final Flux<T> onErrorMap(Predicate<? super Throwable> predicate, Function<? super Throwable,? extends Throwable> mapper)
Flux by synchronously applying a function
to it if the error matches the given predicate. Otherwise let the error pass through.

public final Flux<T> onErrorResume(Function<? super Throwable,? extends Publisher<? extends T>> fallback)
public final <E extends Throwable> Flux<T> onErrorResume(Class<E> type, Function<? super E,? extends Publisher<? extends T>> fallback)

public final Flux<T> onErrorResume(Predicate<? super Throwable> predicate, Function<? super Throwable,? extends Publisher<? extends T>> fallback)

public final Flux<T> onErrorReturn(T fallbackValue)
Flux.
fallbackValue - the value to emit if an error occursFluxpublic final <E extends Throwable> Flux<T> onErrorReturn(Class<E> type, T fallbackValue)
Flux.

E - the error typetype - the error type to matchfallbackValue - the value to emit if an error occurs that matches the typeFluxpublic final Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)
Flux.

predicate - the error predicate to matchfallbackValue - the value to emit if an error occurs that matches the predicateFluxpublic final Flux<T> onTerminateDetach()
Subscriber and the Subscription on
termination or cancellation.
This is an advanced interoperability operator that should help with odd
retention scenarios when running with non-reactor Subscriber.
Fluxpublic final Flux<T> or(Publisher<? extends T> other)
Publisher between this Flux and another publisher
to emit any signal (onNext/onError/onComplete) and replay all signals from that
Publisher, effectively behaving like the fastest of these competing sources.
other - the Publisher to race withfirst(org.reactivestreams.Publisher<? extends I>...)public final ParallelFlux<T> parallel()
Flux by dividing data on a number of 'rails' matching the
number of CPU cores, in a round-robin fashion. Note that to actually perform the
work in parallel, you should call ParallelFlux.runOn(Scheduler) afterward.

ParallelFlux instancepublic final ParallelFlux<T> parallel(int parallelism)
Flux by dividing data on a number of 'rails' matching the
provided parallelism parameter, in a round-robin fashion. Note that to
actually perform the work in parallel, you should call ParallelFlux.runOn(Scheduler)
afterward.

parallelism - the number of parallel railsParallelFlux instancepublic final ParallelFlux<T> parallel(int parallelism, int prefetch)
Flux by dividing data on a number of 'rails' matching the
provided parallelism parameter, in a round-robin fashion and using a
custom prefetch amount and queue for dealing with the source Flux's values.
Note that to actually perform the work in parallel, you should call
ParallelFlux.runOn(Scheduler) afterward.

parallelism - the number of parallel railsprefetch - the number of values to prefetch from the sourceParallelFlux instancepublic final ConnectableFlux<T> publish()
ConnectableFlux which shares this Flux sequence and
dispatches values to subscribers in a backpressure-aware manner. Prefetch will
default to Queues.SMALL_BUFFER_SIZE. This will effectively turn
any type of sequence into a hot sequence.
Backpressure will be coordinated on Subscription.request(long) and if any
Subscriber is missing demand (requested = 0), multicast will pause
pushing/pulling.

ConnectableFluxpublic final ConnectableFlux<T> publish(int prefetch)
ConnectableFlux which shares this Flux sequence and
dispatches values to subscribers in a backpressure-aware manner. This will
effectively turn any type of sequence into a hot sequence.
Backpressure will be coordinated on Subscription.request(long) and if any
Subscriber is missing demand (requested = 0), multicast will pause
pushing/pulling.

prefetch - bounded requested demandConnectableFluxpublic final <R> Flux<R> publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform)
R - the output value typetransform - the transformation functionFluxpublic final <R> Flux<R> publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform, int prefetch)
R - the output value typetransform - the transformation functionprefetch - the request sizeFluxpublic final Mono<T> publishNext()
Mono which shares this Flux sequence and dispatches the
first observed item to subscribers in a backpressure-aware manner.
This will effectively turn any type of sequence into a hot sequence when the first
Subscriber subscribes.

Monopublic final Flux<T> publishOn(Scheduler scheduler)
Scheduler
Worker.
This operator influences the threading context where the rest of the operators in
the chain below it will execute, up to a new occurrence of publishOn.
Typically used for fast publisher, slow consumer(s) scenarios.
flux.publishOn(Schedulers.single()).subscribe()
scheduler - a Scheduler providing the Scheduler.Worker where to publish
Returns: Flux producing asynchronously on a given Scheduler
public final Flux<T> publishOn(Scheduler scheduler, int prefetch)
Run onNext, onComplete and onError on a supplied Scheduler.Worker.
This operator influences the threading context where the rest of the operators in
the chain below it will execute, up to a new occurrence of publishOn.
Typically used for fast publisher, slow consumer(s) scenarios.
flux.publishOn(Schedulers.single()).subscribe()
scheduler - a Scheduler providing the Scheduler.Worker where to publish
prefetch - the asynchronous boundary capacity
Returns: Flux producing asynchronously
public final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch)
Run onNext, onComplete and onError on a supplied Scheduler.Worker.
This operator influences the threading context where the rest of the operators in
the chain below it will execute, up to a new occurrence of publishOn.
Typically used for fast publisher, slow consumer(s) scenarios.
flux.publishOn(Schedulers.single()).subscribe()
scheduler - a Scheduler providing the Scheduler.Worker where to publish
delayError - should the buffer be consumed before forwarding any error
prefetch - the asynchronous boundary capacity
Returns: Flux producing asynchronously
public final Mono<T> reduce(BiFunction<T,T,T> aggregator)
Reduce the values from this Flux sequence into a single object of the same
type as the emitted items. Reduction is performed using a BiFunction that
takes the intermediate result of the reduction and the current value and returns
the next intermediate value of the reduction. The aggregator is not invoked for
sequences with 0 or 1 elements.
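A minimal sketch (values are placeholders) of reducing a sequence of the same type:
Flux.just(1, 2, 3, 4)
    .reduce((a, b) -> a + b)                               // ((1 + 2) + 3) + 4
    .subscribe(sum -> System.out.println("sum = " + sum)); // prints "sum = 10"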

aggregator - the reducing BiFunction
Returns: Flux
public final <A> Mono<A> reduce(A initial, BiFunction<A,? super T,A> accumulator)
Reduce the values from this Flux sequence into a single object matching the
type of a seed value. Reduction is performed using a BiFunction that
takes the intermediate result of the reduction and the current value and returns
the next intermediate value of the reduction. First element is paired with the seed
value, initial.

A - the type of the seed and the reduced object
accumulator - the reducing BiFunction
initial - the seed, the initial leftmost argument to pass to the reducing BiFunction
Returns: Flux
public final <A> Mono<A> reduceWith(Supplier<A> initial, BiFunction<A,? super T,A> accumulator)
Reduce the values from this Flux sequence into a single object matching the
type of a lazily supplied seed value. Reduction is performed using a
BiFunction that takes the intermediate result of the reduction and the
current value and returns the next intermediate value of the reduction. First
element is paired with the seed value, supplied via initial.

A - the type of the seed and the reduced object
accumulator - the reducing BiFunction
initial - a Supplier of the seed, called on subscription and passed to the reducing BiFunction
Returns: Flux
public final Flux<T> repeat()

Flux on onCompletepublic final Flux<T> repeat(BooleanSupplier predicate)

predicate - the boolean to evaluate on onComplete.Flux that repeats on onComplete while the predicate matchespublic final Flux<T> repeat(long numRepeat)

numRepeat - the number of times to re-subscribe on onCompleteFlux that repeats on onComplete, up to the specified number of repetitionspublic final Flux<T> repeat(long numRepeat, BooleanSupplier predicate)

numRepeat - the number of times to re-subscribe on completepredicate - the boolean to evaluate on onCompleteFlux that repeats on onComplete while the predicate matches,
up to the specified number of repetitionspublic final Flux<T> repeatWhen(Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
Flux when a companion sequence emits elements in
response to the flux completion signal. Any terminal signal from the companion
sequence will terminate the resulting Flux with the same signal immediately.
If the companion sequence signals when this Flux is active, the repeat
attempt is suppressed.
Note that if the companion Publisher created by the repeatFactory
emits Context as trigger objects, these Context will REPLACE the
operator's own Context. Please be careful there: replacing the
Context means that some keys you don't own could be removed, breaking libraries
that depend on them. As a result, the recommended approach is to always create such
a Context trigger by starting from the original Context (ensuring the trigger
contains all the keys from the original, unless you absolutely know you want to
remove one of these keys):
.repeatWhen(emittedEachAttempt -> emittedEachAttempt
.flatMap(e -> Mono.subscriberContext().map(ctx -> Tuples.of(e, ctx)))
.flatMap(t2 -> {
long lastEmitted = t2.getT1();
Context ctx = t2.getT2();
int rl = ctx.getOrDefault("repeatsLeft", 0);
if (rl > 0) {
// /!\ THE ctx.put HERE IS THE ESSENTIAL PART /!\
return Mono.just(ctx.put("repeatsLeft", rl - 1)
.put("emitted", lastEmitted));
} else {
return Mono.error(new IllegalStateException("repeats exhausted"));
}
})
)
repeatFactory - the Function that returns the associated Publisher
companion, given a Flux that signals each onComplete as a Long
representing the number of source elements emitted in the latest attempt.
Returns: Flux that repeats on onComplete when the companion Publisher produces an
onNext signal
public final ConnectableFlux<T> replay()
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will
retain an unbounded amount of onNext signals. Completion and Error will also be
replayed.

Returns: ConnectableFlux
public final ConnectableFlux<T> replay(int history)
Turn this Flux into a connectable hot source and cache last emitted
signals for further Subscriber.
Will retain up to the given history size onNext signals. Completion and Error will also be
replayed.
Note that cache(0) will only cache the terminal signal without
expiration.

history - number of events retained in history excluding complete and
errorConnectableFluxpublic final ConnectableFlux<T> replay(Duration ttl)
Flux into a connectable hot source and cache last emitted signals
for further Subscriber. Will retain each onNext up to the given per-item
expiry timeout.
Completion and Error will also be replayed until ttl triggers in which case
the next Subscriber will start over a new subscription

ttl - Per-item and post termination timeout durationConnectableFluxpublic final ConnectableFlux<T> replay(int history, Duration ttl)
Flux into a connectable hot source and cache last emitted signals
for further Subscriber. Will retain up to the given history size onNext
signals with a per-item ttl.
Completion and Error will also be replayed until ttl triggers in which case
the next Subscriber will start over a new subscription

history - number of events retained in history excluding complete and errorttl - Per-item and post termination timeout durationConnectableFluxpublic final ConnectableFlux<T> replay(Duration ttl, Scheduler timer)
Flux into a connectable hot source and cache last emitted signals
for further Subscriber. Will retain onNext signal for up to the given
Duration with a per-item ttl.
Completion and Error will also be replayed until ttl triggers in which case
the next Subscriber will start over a new subscription

ttl - Per-item and post termination timeout durationtimer - a time-capable Scheduler instance to read current time fromConnectableFluxpublic final ConnectableFlux<T> replay(int history, Duration ttl, Scheduler timer)
Flux into a connectable hot source and cache last emitted signals
for further Subscriber. Will retain up to the given history size onNext
signals with a per-item ttl.
Completion and Error will also be replayed until ttl triggers in which case
the next Subscriber will start over a new subscription

history - number of events retained in history excluding complete and error
ttl - Per-item and post termination timeout duration
timer - a Scheduler instance to read current time from
Returns: ConnectableFlux
public final Flux<T> retry()
Re-subscribes to this Flux sequence if it signals any error, indefinitely.

Flux that retries on onErrorpublic final Flux<T> retry(long numRetries)
Flux sequence if it signals any error, for a fixed
number of times.
Note that passing Long.MAX_VALUE is treated as infinite retry.

numRetries - the number of times to tolerate an errorFlux that retries on onError up to the specified number of retry attempts.public final Flux<T> retry(Predicate<? super Throwable> retryMatcher)
Flux sequence if it signals any error
that matches the given Predicate, otherwise push the error downstream.

retryMatcher - the predicate to evaluate if retry should occur based on a given error signalFlux that retries on onError if the predicates matches.public final Flux<T> retry(long numRetries, Predicate<? super Throwable> retryMatcher)
Flux sequence up to the specified number of retries if it signals any
error that match the given Predicate, otherwise push the error downstream.

numRetries - the number of times to tolerate an errorretryMatcher - the predicate to evaluate if retry should occur based on a given error signalFlux that retries on onError up to the specified number of retry
attempts, only if the predicate matches.public final Flux<T> retryWhen(Function<Flux<Throwable>,? extends Publisher<?>> whenFactory)
Retries this Flux when a companion sequence signals
an item in response to this Flux error signal
If the companion sequence signals when the Flux is active, the retry
attempt is suppressed and any terminal signal will terminate the Flux source with the same signal
immediately.
Note that if the companion Publisher created by the whenFactory
emits Context as trigger objects, these Context will REPLACE the
operator's own Context. Please be careful there: replacing the
Context means that some keys you don't own could be removed, breaking libraries
that depend on them. As a result, the recommended approach is to always create such
a Context trigger by starting from the original Context (ensuring the trigger
contains all the keys from the original, unless you absolutely know you want to
remove one of these keys):
.retryWhen(errorCurrentAttempt -> errorCurrentAttempt
.flatMap(e -> Mono.subscriberContext().map(ctx -> Tuples.of(e, ctx)))
.flatMap(t2 -> {
Throwable lastError = t2.getT1();
Context ctx = t2.getT2();
int rl = ctx.getOrDefault("retriesLeft", 0);
if (rl > 0) {
// /!\ THE ctx.put HERE IS THE ESSENTIAL PART /!\
return Mono.just(ctx.put("retriesLeft", rl - 1)
.put("lastError", lastError));
} else {
return Mono.error(new IllegalStateException("retries exhausted", lastError));
}
})
)
public final Flux<T> sample(Duration timespan)
Sample this Flux by periodically emitting an item corresponding to that
Flux latest emitted value within the periodical time window.
Note that if some elements are emitted quicker than the timespan just before source
completion, the last of these elements will be emitted along with the onComplete
signal.
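For illustration only (the timings are arbitrary), sampling a fast ticker keeps just the latest value observed in each window:
Flux.interval(Duration.ofMillis(10))     // fast source on the parallel Scheduler
    .sample(Duration.ofMillis(100))      // latest value seen per 100ms window
    .take(5)
    .doOnNext(System.out::println)
    .blockLast();                        // block only so this demo can print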

timespan - the duration of the window after which to emit the latest observed item
Returns: Flux sampled to the last item seen over each periodic window
public final <U> Flux<T> sample(Publisher<U> sampler)
Sample this Flux by emitting an item corresponding to that Flux
latest emitted value whenever a companion sampler Publisher signals a value.
Termination of either Publisher will result in termination for the Subscriber
as well.
Note that if some elements are emitted just before source completion and before a
last sampler can trigger, the last of these elements will be emitted along with the
onComplete signal.
Both Publisher will run in unbounded mode because the backpressure
would interfere with the sampling precision.

public final Flux<T> sampleFirst(Duration timespan)
Take a value from this Flux then skip the values that follow
within a given duration.

timespan - the duration during which to skip values after each sample
Returns: Flux sampled to the first item of each duration-based window
public final <U> Flux<T> sampleFirst(Function<? super T,? extends Publisher<U>> samplerFactory)
public final <U> Flux<T> sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory)
Emit the last value from this Flux only if there were no new values emitted
during the window defined by a companion Publisher derived from that particular
value.
Note that this means that the last value in the sequence is always emitted.

U - the companion reified typethrottlerFactory - supply a companion sampler Publisher which signals
the end of the window during which no new emission should occur. If it is the case,
the original value triggering the window is emitted.Flux sampled to items not followed by any other item within a window
defined by a companion Publisherpublic final <U> Flux<T> sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory, int maxConcurrency)
Flux only if there were no new values emitted
during the window defined by a companion Publisher derived from that particular
value.
The provided maxConcurrency will keep a bounded maximum of concurrent timeouts and drop any new items until at least one timeout terminates.
Note that this means that the last value in the sequence is always emitted.

U - the throttling typethrottlerFactory - supply a companion sampler Publisher which signals
the end of the window during which no new emission should occur. If it is the case,
the original value triggering the window is emitted.
maxConcurrency - the maximum number of concurrent timeouts
Returns: Flux sampled to items not followed by any other item within a window
defined by a companion Publisher
public final Flux<T> scan(BiFunction<T,T,T> accumulator)
Reduce this Flux values with an accumulator BiFunction and
also emit the intermediate results of this function.
Unlike scan(Object, BiFunction), this operator doesn't take an initial value
but treats the first Flux value as initial value.
The accumulation works as follows:
result[0] = source[0]
result[1] = accumulator(result[0], source[1])
result[2] = accumulator(result[1], source[2])
result[3] = accumulator(result[2], source[3])
...
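A small sketch (values are placeholders): a running sum emits every intermediate result, unlike reduce which emits only the final one:
Flux.just(1, 2, 3, 4)
    .scan((acc, next) -> acc + next)
    .subscribe(System.out::println);     // prints 1, 3, 6, 10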

accumulator - the accumulating BiFunction
Returns: Flux
public final <A> Flux<A> scan(A initial, BiFunction<A,? super T,A> accumulator)
Reduce this Flux values with an accumulator BiFunction and
also emit the intermediate results of this function.
The accumulation works as follows:
result[0] = initialValue;
result[1] = accumulator(result[0], source[0])
result[2] = accumulator(result[1], source[1])
result[3] = accumulator(result[2], source[2])
...

A - the accumulated typeinitial - the initial leftmost argument to pass to the reduce functionaccumulator - the accumulating BiFunctionFlux starting with initial statepublic final <A> Flux<A> scanWith(Supplier<A> initial, BiFunction<A,? super T,A> accumulator)
Flux values with the help of an accumulator BiFunction
and also emits the intermediate results. A seed value is lazily provided by a
Supplier invoked for each Subscriber.
The accumulation works as follows:
result[0] = initialValue;
result[1] = accumulator(result[0], source[0])
result[2] = accumulator(result[1], source[1])
result[3] = accumulator(result[2], source[2])
...

A - the accumulated type
initial - the supplier providing the seed, the leftmost parameter initially
passed to the reduce function
accumulator - the accumulating BiFunction
Returns: Flux starting with initial state
public final Flux<T> share()
Returns a new Flux that multicasts (shares) the original Flux.
As long as there is at least one Subscriber this Flux will be subscribed and
emitting data.
When all subscribers have cancelled it will cancel the source
Flux.
This is an alias for publish().refCount().
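An illustrative sketch (interval and timings are arbitrary) of two subscribers sharing one upstream subscription:
Flux<Long> shared = Flux.interval(Duration.ofMillis(100)).share();
shared.subscribe(v -> System.out.println("first  : " + v));  // triggers the upstream subscription
shared.subscribe(v -> System.out.println("second : " + v));  // joins the same live sequence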
public final Mono<T> single()
Expect and emit a single item from this Flux source or signal
NoSuchElementException for an empty source, or
IndexOutOfBoundsException for a source with more than one element.

Mono with the single item or an error signalpublic final Mono<T> single(T defaultValue)
Flux source and emit a default
value for an empty source, but signal an IndexOutOfBoundsException for a
source with more than one element.

public final Mono<T> singleOrEmpty()
Flux source, and accept an empty
source but signal an IndexOutOfBoundsException for a source with more than
one element.

Mono with the expected single item, no item or an errorpublic final Flux<T> skip(long skipped)
Flux then
emit the remaining elements.

skipped - the number of elements to dropFlux with the specified number of elements skipped at
the beginningpublic final Flux<T> skip(Duration timespan)
Flux emitted within the specified initial duration.

timespan - the initial time window during which to drop elementsFlux dropping at the beginning until the end of the given durationpublic final Flux<T> skipLast(int n)
Flux sequence.

n - the number of elements to drop before completionFlux dropping the specified number of elements at the end of the
sequencepublic final Flux<T> sort()
Flux by collecting and sorting them in the background
then emitting the sorted sequence once this sequence completes.
Each item emitted by the Flux must implement Comparable with
respect to all other items in the sequence.
Note that calling sort with long, non-terminating or infinite sources
might cause OutOfMemoryError. Use sequence splitting like window(int) to sort batches in that case.
FluxClassCastException - if any item emitted by the Flux does not implement
Comparable with respect to all other items emitted by the Fluxpublic final Flux<T> sort(Comparator<? super T> sortFunction)
Flux using a Comparator function, by
collecting and sorting elements in the background then emitting the sorted sequence
once this sequence completes.
Note that calling sort with long, non-terminating or infinite sources
might cause OutOfMemoryError
@SafeVarargs public final Flux<T> startWith(T... values)
Flux sequence.

public final Disposable subscribe()
Subscribe to this Flux and request unbounded demand.
This version doesn't specify any consumption behavior for the events from the chain, especially no error handling, so other variants should usually be preferred.
Returns: Disposable that can be used to cancel the underlying Subscription
public final Disposable subscribe(Consumer<? super T> consumer)
Subscribe a Consumer to this Flux that will consume all the
elements in the sequence. It will request an unbounded demand (Long.MAX_VALUE).
For a passive version that observe and forward incoming data see doOnNext(java.util.function.Consumer).
For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber) with a BaseSubscriber.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
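A minimal sketch (values are placeholders) using the lambda-based overloads described in this group of methods:
Disposable d = Flux.just("a", "b", "c")
    .subscribe(
        value -> System.out.println("next: " + value),    // onNext
        error -> System.err.println("error: " + error),   // onError
        () -> System.out.println("done"));                // onComplete
// d.dispose() would cancel the underlying Subscription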

consumer - the consumer to invoke on each value (onNext signal)
Returns: Disposable that can be used to cancel the underlying Subscription
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer)
Subscribe to this Flux with a Consumer that will consume all the
elements in the sequence, as well as a Consumer that will handle errors.
The subscription will request an unbounded demand (Long.MAX_VALUE).
For a passive version that observe and forward incoming data see
doOnNext(java.util.function.Consumer) and doOnError(java.util.function.Consumer).
For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber) with a BaseSubscriber.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumers are not invoked when executing in a main thread or a unit test for instance.

consumer - the consumer to invoke on each next signalerrorConsumer - the consumer to invoke on error signalDisposable that can be used to cancel the underlying Subscriptionpublic final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer)
Consumer to this Flux that will respectively consume all the
elements in the sequence, handle errors and react to completion. The subscription
will request unbounded demand (Long.MAX_VALUE).
For a passive version that observe and forward incoming data see doOnNext(java.util.function.Consumer),
doOnError(java.util.function.Consumer) and doOnComplete(Runnable).
For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber) with a BaseSubscriber.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.

consumer - the consumer to invoke on each valueerrorConsumer - the consumer to invoke on error signalcompleteConsumer - the consumer to invoke on complete signalDisposable that can be used to cancel the underlying Subscriptionpublic final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer)
Consumer to this Flux that will respectively consume all the
elements in the sequence, handle errors, react to completion, and request upon subscription.
It will let the provided subscriptionConsumer
request the adequate amount of data, or request unbounded demand
Long.MAX_VALUE if no such consumer is provided.
For a passive version that observe and forward incoming data see doOnNext(java.util.function.Consumer),
doOnError(java.util.function.Consumer), doOnComplete(Runnable)
and doOnSubscribe(Consumer).
For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber) with a BaseSubscriber.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.

consumer - the consumer to invoke on each valueerrorConsumer - the consumer to invoke on error signalcompleteConsumer - the consumer to invoke on complete signalsubscriptionConsumer - the consumer to invoke on subscribe signal, to be used
for the initial request, or null for max requestDisposable that can be used to cancel the underlying Subscriptionpublic final void subscribe(Subscriber<? super T> actual)
public abstract void subscribe(CoreSubscriber<? super T> actual)
Publisher.subscribe(Subscriber) that will bypass
Hooks.onLastOperator(Function) pointcut.
In addition to behave as expected by Publisher.subscribe(Subscriber)
in a controlled manner, it supports direct subscribe-time Context passing.
actual - the Subscriber interested into the published sequencesubscribe(Subscriber)public final Flux<T> subscriberContext(Context mergeContext)
Context by adding all values
from the given Context, producing a new Context that is propagated
upstream.
The Context propagation happens once per subscription (not on each onNext):
it is done during the subscribe(Subscriber) phase, which runs from
the last operator of a chain towards the first.
So this operator enriches a Context coming from under it in the chain
(downstream, by default an empty one) and passes the new enriched Context
to operators above it in the chain (upstream, by way of them using
Flux#subscribe(Subscriber,Context)).
public final Flux<T> subscriberContext(Function<Context,Context> doOnContext)
Context by applying a Function
to it, producing a new Context that is propagated upstream.
The Context propagation happens once per subscription (not on each onNext):
it is done during the subscribe(Subscriber) phase, which runs from
the last operator of a chain towards the first.
So this operator enriches a Context coming from under it in the chain
(downstream, by default an empty one) and passes the new enriched Context
to operators above it in the chain (upstream, by way of them using
Flux#subscribe(Subscriber,Context)).
public final Flux<T> subscribeOn(Scheduler scheduler)
Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker.
As such, placing this operator anywhere in the chain will also impact the execution
context of onNext/onError/onComplete signals from the beginning of the chain up to
the next occurrence of a publishOn.
Note that if you are using an eager or blocking
create(Consumer, FluxSink.OverflowStrategy)
as the source, it can lead to deadlocks due to requests piling up behind the emitter.
In such case, you should call subscribeOn(scheduler, false)
instead.
Typically used for slow publisher e.g., blocking IO, fast consumer(s) scenarios.
flux.subscribeOn(Schedulers.single()).subscribe()
Note that Scheduler.Worker.schedule(Runnable) raising
RejectedExecutionException on late
Subscription.request(long) will be propagated to the request caller.
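An illustrative sketch of moving a slow subscription off the caller thread; loadFromDisk() is a hypothetical blocking call, not part of this API:
Flux.defer(() -> Flux.just(loadFromDisk()))   // hypothetical blocking read, deferred until subscription
    .subscribeOn(Schedulers.elastic())        // the subscription, and thus the read, runs on an elastic worker
    .subscribe(System.out::println);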
scheduler - a Scheduler providing the Scheduler.Worker where to subscribe
Returns: Flux requesting asynchronously
See Also: publishOn(Scheduler), subscribeOn(Scheduler, boolean)
public final Flux<T> subscribeOn(Scheduler scheduler, boolean requestOnSeparateThread)
Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker.
Request will be run on that worker too depending on the requestOnSeparateThread
parameter (which defaults to true in the subscribeOn(Scheduler) version).
As such, placing this operator anywhere in the chain will also impact the execution
context of onNext/onError/onComplete signals from the beginning of the chain up to
the next occurrence of a publishOn.
Note that if you are using an eager or blocking
create(Consumer, FluxSink.OverflowStrategy)
as the source, it can lead to deadlocks due to requests piling up behind the emitter.
Thus this operator has a requestOnSeparateThread parameter, which should be
set to false in this case.
Typically used for slow publisher e.g., blocking IO, fast consumer(s) scenarios.
flux.subscribeOn(Schedulers.single()).subscribe()
Note that Scheduler.Worker.schedule(Runnable) raising
RejectedExecutionException on late
Subscription.request(long) will be propagated to the request caller.
scheduler - a Scheduler providing the Scheduler.Worker where to subscribe
requestOnSeparateThread - whether or not to also perform requests on the worker.
true to behave like subscribeOn(Scheduler)
Returns: Flux requesting asynchronously
See Also: publishOn(Scheduler), subscribeOn(Scheduler)
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber)
Subscribe a Subscriber to this Flux and return said
Subscriber (eg. a FluxProcessor).
If you need more control over backpressure and the request, use a
BaseSubscriber.
flux.subscribeWith(WorkQueueProcessor.create()).subscribe()
E - the reified type from the input/output subscriber
subscriber - the Subscriber to subscribe with and return
Returns: Subscriber
public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate)
Switch to an alternative Publisher if this sequence is completed without any data.
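A minimal sketch (values are placeholders) of falling back when the source completes empty:
Flux.<String>empty()
    .switchIfEmpty(Flux.just("fallback"))   // used only because the source completed without data
    .subscribe(System.out::println);        // prints "fallback"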
public final <V> Flux<V> switchMap(Function<? super T,Publisher<? extends V>> fn, int prefetch)
public final Flux<T> tag(String key, String value)
Tag this flux with a key/value pair. These can be retrieved as a Set of
all tags throughout the publisher chain by using Scannable.tags() (as
traversed by Scannable.parents()).
key - a tag key
value - a tag value
public final Flux<T> take(long n)
Take only the first N values from this Flux, if available.
If N is zero, the resulting Flux completes as soon as this Flux
signals its first value (which is not relayed, though).
Note that this operator doesn't manipulate the backpressure requested amount.
Rather, it merely lets requests from downstream propagate as is and cancels once
N elements have been emitted. As a result, the source could produce a lot of
extraneous elements in the meantime. If that behavior is undesirable and you do
not own the request from downstream (e.g. prefetching operators), consider
using limitRequest(long) instead.
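For example (values are placeholders):
Flux.range(1, 1000)
    .take(3)                             // cancels the source once 3 elements have been emitted
    .subscribe(System.out::println);     // prints 1, 2, 3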

n - the number of items to emit from this Flux
Returns: Flux limited to size N
See Also: limitRequest(long)
public final Flux<T> takeLast(int n)
Emit the last N values this Flux emitted before its completion.

public final Flux<T> takeUntil(Predicate<? super T> predicate)
Flux until the given Predicate matches.
This includes the matching data (unlike takeWhile(java.util.function.Predicate<? super T>)).

public final Flux<T> takeWhile(Predicate<? super T> continuePredicate)
Flux while a predicate returns TRUE
for the values (checked before each value is delivered).
This only includes the matching data (unlike takeUntil(java.util.function.Predicate<? super T>)).

public final Mono<Void> then()
Mono<Void> that completes when this Flux completes.
This will actively ignore the sequence and only replay completion or error signals.
public final Mono<Void> thenEmpty(Publisher<Void> other)
Mono<Void> that waits for this Flux to complete then
for a supplied Publisher<Void> to also complete. The
second completion signal is replayed, or any error signal that occurs instead.

public final Flux<T> timeout(Duration timeout)
TimeoutException as soon as no item is emitted within the
given Duration from the previous emission (or the subscription for the first item).

public final Flux<T> timeout(Duration timeout, @Nullable Publisher<? extends T> fallback)
Flux as soon as no item is emitted within the
given Duration from the previous emission (or the subscription for the first item).
If the given Publisher is null, signal a TimeoutException instead.

public final Flux<T> timeout(Duration timeout, Scheduler timer)
TimeoutException as soon as no item is emitted within the
given Duration from the previous emission (or the subscription for the first
item), as measured by the specified Scheduler.

public final Flux<T> timeout(Duration timeout, @Nullable Publisher<? extends T> fallback, Scheduler timer)
Flux as soon as no item is emitted within the
given Duration from the previous emission (or the subscription for the
first item), as measured on the specified Scheduler.
If the given Publisher is null, signal a TimeoutException instead.

public final <U> Flux<T> timeout(Publisher<U> firstTimeout)
TimeoutException in case the first item from this Flux has
not been emitted before the given Publisher emits.

public final <U,V> Flux<T> timeout(Publisher<U> firstTimeout, Function<? super T,? extends Publisher<V>> nextTimeoutFactory)
TimeoutException in case the first item from this Flux has
not been emitted before the firstTimeout Publisher emits, and whenever
each subsequent elements is not emitted before a Publisher generated from
the latest element signals.

U - the type of the elements of the first timeout PublisherV - the type of the elements of the subsequent timeout PublishersfirstTimeout - the timeout Publisher that must not emit before the first signal from this FluxnextTimeoutFactory - the timeout Publisher factory for each next itemFlux that can time out if each element does not come before
a signal from a per-item companion Publisherpublic final <U,V> Flux<T> timeout(Publisher<U> firstTimeout, Function<? super T,? extends Publisher<V>> nextTimeoutFactory, Publisher<? extends T> fallback)
Publisher in case the first item from this Flux has
not been emitted before the firstTimeout Publisher emits, and whenever
each subsequent elements is not emitted before a Publisher generated from
the latest element signals.

U - the type of the elements of the first timeout PublisherV - the type of the elements of the subsequent timeout PublishersfirstTimeout - the timeout Publisher that must not emit before the first signal from this FluxnextTimeoutFactory - the timeout Publisher factory for each next itemfallback - the fallback Publisher to subscribe when a timeout occursFlux that can time out if each element does not come before
a signal from a per-item companion Publisherpublic final Flux<Tuple2<Long,T>> timestamp()
Tuple2 pair of T1 the current clock time in
millis (as a Long measured by the parallel
Scheduler) and T2 the emitted data (as a T), for each item from this Flux.

Fluxpublic final Iterable<T> toIterable(int batchSize, @Nullable Supplier<Queue<T>> queueProvider)
public final Stream<T> toStream()
Stream of unknown size with onClose attached to Subscription.cancel()public final Stream<T> toStream(int batchSize)
batchSize - the bounded capacity to prefetch from this Flux or
Integer.MAX_VALUE for unbounded demand

Returns: Stream of unknown size with onClose attached to Subscription.cancel()
public final <V> Flux<V> transform(Function<? super Flux<T>,? extends Publisher<V>> transformer)
Transform this Flux in order to generate a target Flux. Unlike compose(Function), the
provided function is executed as part of assembly.
Function<Flux, Flux> applySchedulers = flux -> flux.subscribeOn(Schedulers.elastic())
                                                   .publishOn(Schedulers.parallel());
flux.transform(applySchedulers).map(v -> v * v).subscribe();
V - the item type in the returned Flux
transformer - the Function to immediately map this Flux into a target Flux
instance.
Returns: Flux
See Also: compose(Function) for deferred composition of Flux for each Subscriber,
as(Function) for a loose conversion to an arbitrary type
public final Flux<Flux<T>> window(Duration timespan, Duration timeshift)
Split this Flux sequence into multiple Flux windows that open
for a given timespan Duration, after which it closes with onComplete.
Each window is opened at a regular timeShift interval, starting from the
first item.
Both durations are measured on the parallel Scheduler.
When timespan < timeshift : dropping windows
When timespan > timeshift : overlapping windows
When timespan == timeshift : exact windows
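An illustrative sketch (timings are arbitrary) of the overlapping case, where timespan is larger than timeshift:
Flux.interval(Duration.ofMillis(50))
    .window(Duration.ofMillis(300), Duration.ofMillis(200))  // 300ms windows opened every 200ms
    .take(3)
    .flatMap(Flux::collectList)          // gather each window into a List so it can be printed
    .doOnNext(System.out::println)
    .blockLast();                        // block only so this demo can print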

public final Flux<Flux<T>> window(Duration timespan, Duration timeshift, Scheduler timer)
Flux sequence into multiple Flux windows that open
for a given timespan Duration, after which it closes with onComplete.
Each window is opened at a regular timeShift interval, starting from the
first item.
Both durations are measured on the provided Scheduler.
When timespan < timeshift : dropping windows
When timespan > timeshift : overlapping windows
When timespan == timeshift : exact windows

public final Flux<Flux<T>> windowTimeout(int maxSize, Duration timespan, Scheduler timer)
public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore)
Flux sequence into multiple Flux windows delimited by the
given predicate. A new window is opened each time the predicate returns true.
If cutBefore is true, the old window will onComplete and the triggering
element will be emitted in the new window. Note it can mean that an empty window is
sometimes emitted, eg. if the first element in the sequence immediately matches the
predicate.
Otherwise, the triggering element will be emitted in the old window before it does
onComplete, similar to windowUntil(Predicate).

public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore, int prefetch)
Flux sequence into multiple Flux windows delimited by the given
predicate and using a prefetch. A new window is opened each time the predicate
returns true.
If cutBefore is true, the old window will onComplete and the triggering
element will be emitted in the new window. Note it can mean that an empty window is
sometimes emitted, eg. if the first element in the sequence immediately matches the
predicate.
Otherwise, the triggering element will be emitted in the old window before it does
onComplete, similar to windowUntil(Predicate).

boundaryTrigger - a predicate that triggers the next window when it becomes true.
cutBefore - push to true to include the triggering element in the new window rather than the old.
prefetch - the request size to use for this Flux.
Returns: a Flux of Flux windows, bounded depending on the predicate.
public final Flux<Flux<T>> windowWhile(Predicate<T> inclusionPredicate)
Split this Flux sequence into multiple Flux windows that stay open
while a given predicate matches the source elements. Once the predicate returns
false, the window closes with an onComplete and the triggering element is discarded.
Note that for a sequence starting with a separator, or having several subsequent separators anywhere in the sequence, each occurrence will lead to an empty window.

public final Flux<Flux<T>> windowWhile(Predicate<T> inclusionPredicate, int prefetch)
Flux sequence into multiple Flux windows that stay open
while a given predicate matches the source elements. Once the predicate returns
false, the window closes with an onComplete and the triggering element is discarded.
Note that for a sequence starting with a separator, or having several subsequent separators anywhere in the sequence, each occurrence will lead to an empty window.

public final <U,V> Flux<Flux<T>> windowWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector)
Flux sequence into potentially overlapping windows controlled by items of a
start Publisher and end Publisher derived from the start values.
When Open signal is strictly not overlapping Close signal : dropping windows
When Open signal is strictly more frequent than Close signal : overlapping windows
When Open signal is exactly coordinated with Close signal : exact windows

U - the type of the sequence opening windows
V - the type of the sequence closing windows opened by the bucketOpening Publisher's elements
bucketOpening - a Publisher that opens a new window when it emits any item
closeSelector - a Function given an opening signal and returning a Publisher that
will close the window when emitting
Returns: Flux of Flux windows opened by signals from a first
Publisher and lasting until a selected second Publisher emits
public final <U,R> Flux<R> withLatestFrom(Publisher<? extends U> other, BiFunction<? super T,? super U,? extends R> resultSelector)
Combine values from this Flux and another
Publisher through a BiFunction and emits the result.
The operator will drop values from this Flux until the other
Publisher produces any value.
If the other Publisher completes without any value, the sequence is completed.
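An illustrative sketch (intervals are arbitrary): each fast element is paired with the latest value seen from the slower companion:
Flux<Long> fast = Flux.interval(Duration.ofMillis(100));
Flux<Long> slow = Flux.interval(Duration.ofMillis(250));
fast.withLatestFrom(slow, (f, s) -> "fast=" + f + " latest slow=" + s)
    .take(5)
    .doOnNext(System.out::println)
    .blockLast();                        // block only so this demo can print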

U - the other Publisher sequence type
R - the result type
other - the Publisher to combine with
resultSelector - the bi-function called with each pair of source and other
elements that should return a single value to be emitted
Returns: Flux gated by another Publisher
public final <T2> Flux<Tuple2<T,T2>> zipWith(Publisher<? extends T2> source2)
Zip this Flux with another Publisher source, that is to say wait
for both to emit one element and combine these elements once into a Tuple2.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
public final <T2,V> Flux<V> zipWith(Publisher<? extends T2> source2, BiFunction<? super T,? super T2,? extends V> combinator)
Zip this Flux with another Publisher source, that is to say wait
for both to emit one element and combine these elements using a combinator
BiFunction
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T2 - type of the value from source2V - The produced output after transformation by the combinatorsource2 - The second source Publisher to zip with this Flux.combinator - The aggregate function that will receive a unique value from each
source and return the value to signal downstreamFluxpublic final <T2,V> Flux<V> zipWith(Publisher<? extends T2> source2, int prefetch, BiFunction<? super T,? super T2,? extends V> combinator)
Flux with another Publisher source, that is to say wait
for both to emit one element and combine these elements using a combinator
BiFunction
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T2 - type of the value from source2V - The produced output after transformation by the combinatorsource2 - The second source Publisher to zip with this Flux.prefetch - the request size to use for this Flux and the other Publishercombinator - The aggregate function that will receive a unique value from each
source and return the value to signal downstreamFluxpublic final <T2> Flux<Tuple2<T,T2>> zipWith(Publisher<? extends T2> source2, int prefetch)
Flux with another Publisher source, that is to say wait
for both to emit one element and combine these elements once into a Tuple2.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
public final <T2> Flux<Tuple2<T,T2>> zipWithIterable(Iterable<? extends T2> iterable)
public final <T2,V> Flux<V> zipWithIterable(Iterable<? extends T2> iterable, BiFunction<? super T,? super T2,? extends V> zipper)
Flux with the content of an Iterable, that is
to say combine one element from each, pairwise, using the given zipper BiFunction.

T2 - the value type of the other iterable sequenceV - the result typeiterable - the Iterable to zip withzipper - the BiFunction pair combinatorFluxprotected static <T> Flux<T> onAssembly(Flux<T> source)
Hooks pointcut given a
Flux, potentially returning a new Flux. This is for example useful
to activate cross-cutting concerns at assembly time, eg. a generalized
checkpoint().T - the value typesource - the source to apply assembly hooks ontoprotected static <T> Flux<T> onLastAssembly(Flux<T> source)
Hooks pointcut given a
Flux, potentially returning a new Flux. This is for example useful
to activate cross-cutting concerns at assembly time, eg. a generalized
checkpoint().T - the value typesource - the source to apply assembly hooks ontoprotected static <T> ConnectableFlux<T> onAssembly(ConnectableFlux<T> source)
Hooks pointcut given a
ConnectableFlux, potentially returning a new ConnectableFlux. This
is for example useful to activate cross-cutting concerns at assembly time, eg. a
generalized checkpoint().T - the value typesource - the source to apply assembly hooks onto