T - the element type of this Reactive Streams Publisher
public abstract class Flux<T> extends Object implements Publisher<T>
A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes
(successfully or with an error).
The recommended way to learn about the Flux API and discover new operators is
through the reference documentation rather than this javadoc, which is better suited
to learning more about individual operators. See the
"which operator do I need?" appendix.
Flux is intended to be used in implementations and return types. Input parameters should keep using raw
Publisher as much as possible.
If it is known that the underlying Publisher will emit 0 or 1 element, Mono should be used
instead.
Note that state should be avoided in the java.util.function objects and lambdas used within
Flux operators, as these may be shared between several Subscribers.
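The warning about stateful lambdas can be illustrated without Reactor itself. The following is a minimal sketch using only the JDK: each call to the hypothetical helper `simulateSubscription` stands in for one Subscriber replaying the same source through the same function instance. The class and method names are illustrative assumptions, not part of the Flux API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Illustration only: plain JDK code that mimics two subscriptions sharing one lambda.
final class SharedLambdaState {

    // A stateful mapper: the counter is captured once and lives as long as the lambda does.
    public static Function<String, String> statefulIndexer() {
        int[] counter = {0};
        return s -> (counter[0]++) + ":" + s;
    }

    // Hypothetical stand-in for one subscription: pushes every source element through the mapper.
    public static List<String> simulateSubscription(List<String> source,
                                                    Function<String, String> mapper) {
        List<String> received = new ArrayList<>();
        for (String element : source) {
            received.add(mapper.apply(element));
        }
        return received;
    }

    // Per-subscription variant: a fresh mapper (and fresh state) is created for each subscription.
    public static List<String> simulateDeferredSubscription(List<String> source,
                                                            Supplier<Function<String, String>> mapperFactory) {
        return simulateSubscription(source, mapperFactory.get());
    }

    public static void main(String[] args) {
        List<String> source = List.of("a", "b");

        Function<String, String> shared = statefulIndexer();
        System.out.println(simulateSubscription(source, shared)); // [0:a, 1:b]
        System.out.println(simulateSubscription(source, shared)); // [2:a, 3:b] (state leaked)

        System.out.println(simulateDeferredSubscription(source, SharedLambdaState::statefulIndexer)); // [0:a, 1:b]
        System.out.println(simulateDeferredSubscription(source, SharedLambdaState::statefulIndexer)); // [0:a, 1:b]
    }
}
```

The second "subscriber" sees indices that continue from the first because both share the captured counter. Creating the state once per subscription avoids this. In Flux terms, the `defer` entry in the method summary below describes a Supplier that can create a subscriber-specific instance, which plays the role of `mapperFactory` here.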
subscribe(CoreSubscriber) is an internal extension to subscribe(Subscriber), used internally for
Context passing. A user-provided Subscriber may be passed to this "subscribe" extension, but it
will lose the available per-subscribe Hooks.onLastOperator.
See Also: Mono
Constructor and Description |
---|
Flux() |
Modifier and Type | Method and Description |
---|---|
Mono<Boolean> |
all(Predicate<? super T> predicate)
Emit a single boolean true if all values of this sequence match
the Predicate. |
Mono<Boolean> |
any(Predicate<? super T> predicate)
Emit a single boolean true if any of the values of this
Flux sequence match
the predicate. |
<P> P |
as(Function<? super Flux<T>,P> transformer)
Transform this
Flux into a target type. |
T |
blockFirst()
Subscribe to this
Flux and block indefinitely
until the upstream signals its first value or completes. |
T |
blockFirst(Duration timeout)
Subscribe to this
Flux and block until the upstream
signals its first value, completes or a timeout expires. |
T |
blockLast()
Subscribe to this
Flux and block indefinitely
until the upstream signals its last value or completes. |
T |
blockLast(Duration timeout)
Subscribe to this
Flux and block until the upstream
signals its last value, completes or a timeout expires. |
Flux<List<T>> |
buffer()
|
Flux<List<T>> |
buffer(Duration bufferingTimespan)
|
Flux<List<T>> |
buffer(Duration bufferingTimespan,
Duration openBufferEvery)
Collect incoming values into multiple
List buffers created at a given
openBufferEvery period. |
Flux<List<T>> |
buffer(Duration bufferingTimespan,
Duration openBufferEvery,
Scheduler timer)
|
Flux<List<T>> |
buffer(Duration bufferingTimespan,
Scheduler timer)
|
Flux<List<T>> |
buffer(int maxSize)
|
Flux<List<T>> |
buffer(int maxSize,
int skip)
|
<C extends Collection<? super T>> Flux<C> |
buffer(int maxSize,
int skip,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the given max size is reached
or once this Flux completes. |
<C extends Collection<? super T>> Flux<C> |
buffer(int maxSize,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the given max size is reached
or once this Flux completes. |
Flux<List<T>> |
buffer(Publisher<?> other)
|
<C extends Collection<? super T>> Flux<C> |
buffer(Publisher<?> other,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers, as
delimited by the signals of a companion Publisher this operator will
subscribe to. |
Flux<List<T>> |
bufferTimeout(int maxSize,
Duration maxTime)
|
Flux<List<T>> |
bufferTimeout(int maxSize,
Duration maxTime,
Scheduler timer)
|
<C extends Collection<? super T>> Flux<C> |
bufferTimeout(int maxSize,
Duration maxTime,
Scheduler timer,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the buffer reaches a maximum
size OR the maxTime Duration elapses, as measured on the provided Scheduler. |
<C extends Collection<? super T>> Flux<C> |
bufferTimeout(int maxSize,
Duration maxTime,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the buffer reaches a maximum
size OR the maxTime Duration elapses. |
Flux<List<T>> |
bufferUntil(Predicate<? super T> predicate)
|
Flux<List<T>> |
bufferUntil(Predicate<? super T> predicate,
boolean cutBefore)
|
<U,V> Flux<List<T>> |
bufferWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector)
|
<U,V,C extends Collection<? super T>> Flux<C> |
bufferWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers started each time an opening
companion Publisher emits. |
Flux<List<T>> |
bufferWhile(Predicate<? super T> predicate)
|
Flux<T> |
cache()
Turn this
Flux into a hot source and cache last emitted signals for further Subscriber . |
Flux<T> |
cache(Duration ttl)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscriber . |
Flux<T> |
cache(Duration ttl,
Scheduler timer)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscriber . |
Flux<T> |
cache(int history)
Turn this
Flux into a hot source and cache last emitted signals for further Subscriber . |
Flux<T> |
cache(int history,
Duration ttl)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscriber . |
Flux<T> |
cache(int history,
Duration ttl,
Scheduler timer)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscriber . |
Flux<T> |
cancelOn(Scheduler scheduler)
|
<E> Flux<E> |
cast(Class<E> clazz)
Cast the current
Flux produced type into a target produced type. |
Flux<T> |
checkpoint()
Activate assembly tracing for this particular
Flux , in case of an error
upstream of the checkpoint. |
Flux<T> |
checkpoint(String description)
Activate assembly marker for this particular
Flux by giving it a description that
will be reflected in the assembly traceback in case of an error upstream of the
checkpoint. |
Flux<T> |
checkpoint(String description,
boolean forceStackTrace)
Activate assembly tracing or the lighter assembly marking depending on the
forceStackTrace option. |
<R,A> Mono<R> |
collect(Collector<? super T,A,? extends R> collector)
|
<E> Mono<E> |
collect(Supplier<E> containerSupplier,
BiConsumer<E,? super T> collector)
Collect all elements emitted by this
Flux into a user-defined container,
by applying a collector BiConsumer taking the container and each element. |
Mono<List<T>> |
collectList()
|
<K> Mono<Map<K,T>> |
collectMap(Function<? super T,? extends K> keyExtractor)
|
<K,V> Mono<Map<K,V>> |
collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor)
|
<K,V> Mono<Map<K,V>> |
collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,V>> mapSupplier)
|
<K> Mono<Map<K,Collection<T>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor)
|
<K,V> Mono<Map<K,Collection<V>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor)
|
<K,V> Mono<Map<K,Collection<V>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,Collection<V>>> mapSupplier)
|
Mono<List<T>> |
collectSortedList()
|
Mono<List<T>> |
collectSortedList(Comparator<? super T> comparator)
Collect all elements emitted by this
Flux until this sequence completes,
and then sort them using a Comparator into a List that is emitted
by the resulting Mono . |
static <T,V> Flux<V> |
combineLatest(Function<Object[],V> combinator,
int prefetch,
Publisher<? extends T>... sources)
|
static <T,V> Flux<V> |
combineLatest(Function<Object[],V> combinator,
Publisher<? extends T>... sources)
|
static <T,V> Flux<V> |
combineLatest(Iterable<? extends Publisher<? extends T>> sources,
Function<Object[],V> combinator)
|
static <T,V> Flux<V> |
combineLatest(Iterable<? extends Publisher<? extends T>> sources,
int prefetch,
Function<Object[],V> combinator)
|
static <T1,T2,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
BiFunction<? super T1,? super T2,? extends V> combinator)
|
static <T1,T2,T3,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Function<Object[],V> combinator)
|
static <T1,T2,T3,T4,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Function<Object[],V> combinator)
|
static <T1,T2,T3,T4,T5,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Function<Object[],V> combinator)
|
static <T1,T2,T3,T4,T5,T6,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6,
Function<Object[],V> combinator)
|
<V> Flux<V> |
compose(Function<? super Flux<T>,? extends Publisher<V>> transformer)
|
static <T> Flux<T> |
concat(Iterable<? extends Publisher<? extends T>> sources)
Concatenate all sources provided in an
Iterable , forwarding elements
emitted by the sources downstream. |
static <T> Flux<T> |
concat(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concat(Publisher<? extends Publisher<? extends T>> sources,
int prefetch)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concat(Publisher<? extends T>... sources)
Concatenate all sources provided as a vararg, forwarding elements emitted by the
sources downstream.
|
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources,
boolean delayUntilEnd,
int prefetch)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources,
int prefetch)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends T>... sources)
Concatenate all sources provided as a vararg, forwarding elements emitted by the
sources downstream.
|
<V> Flux<V> |
concatMap(Function<? super T,? extends Publisher<? extends V>> mapper)
|
<V> Flux<V> |
concatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int prefetch)
|
<V> Flux<V> |
concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper)
|
<V> Flux<V> |
concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
boolean delayUntilEnd,
int prefetch)
|
<V> Flux<V> |
concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
int prefetch)
|
<R> Flux<R> |
concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
|
<R> Flux<R> |
concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper,
int prefetch)
|
Flux<T> |
concatWith(Publisher<? extends T> other)
|
Flux<T> |
concatWithValues(T... values)
Concatenates the values to the end of the
Flux. |
Mono<Long> |
count()
Counts the number of values in this
Flux . |
static <T> Flux<T> |
create(Consumer<? super FluxSink<T>> emitter)
|
static <T> Flux<T> |
create(Consumer<? super FluxSink<T>> emitter,
FluxSink.OverflowStrategy backpressure)
|
Flux<T> |
defaultIfEmpty(T defaultV)
Provide a default unique value if this sequence is completed without any data
|
static <T> Flux<T> |
defer(Supplier<? extends Publisher<T>> supplier)
Lazily supply a
Publisher every time a Subscription is made on the
resulting Flux , so the actual source instantiation is deferred until each
subscribe and the Supplier can create a subscriber-specific instance. |
Flux<T> |
delayElements(Duration delay)
|
Flux<T> |
delayElements(Duration delay,
Scheduler timer)
|
Flux<T> |
delaySequence(Duration delay)
|
Flux<T> |
delaySequence(Duration delay,
Scheduler timer)
|
Flux<T> |
delaySubscription(Duration delay)
Delay the
subscription to this Flux source until the given
period elapses. |
Flux<T> |
delaySubscription(Duration delay,
Scheduler timer)
Delay the
subscription to this Flux source until the given
period elapses, as measured on the user-provided Scheduler . |
<U> Flux<T> |
delaySubscription(Publisher<U> subscriptionDelay)
|
Flux<T> |
delayUntil(Function<? super T,? extends Publisher<?>> triggerProvider)
|
<X> Flux<X> |
dematerialize()
An operator working only if this
Flux emits onNext, onError or onComplete Signal
instances, transforming these materialized signals into
real signals on the Subscriber . |
Flux<T> |
distinct()
For each
Subscriber , track elements from this Flux that have been
seen and filter out duplicates. |
<V> Flux<T> |
distinct(Function<? super T,? extends V> keySelector)
For each
Subscriber , track elements from this Flux that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function . |
<V,C extends Collection<? super V>> Flux<T> |
distinct(Function<? super T,? extends V> keySelector,
Supplier<C> distinctCollectionSupplier)
For each
Subscriber , track elements from this Flux that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function and by the add method
of the Collection supplied (typically a Set ). |
<V,C> Flux<T> |
distinct(Function<? super T,? extends V> keySelector,
Supplier<C> distinctStoreSupplier,
BiPredicate<C,V> distinctPredicate,
Consumer<C> cleanup)
For each
Subscriber , track elements from this Flux that have been
seen and filter out duplicates, as compared by applying a BiPredicate on
an arbitrary user-supplied <C> store and a key extracted through the user
provided Function . |
Flux<T> |
distinctUntilChanged()
Filter out subsequent repetitions of an element (that is, if they arrive right after
one another).
|
<V> Flux<T> |
distinctUntilChanged(Function<? super T,? extends V> keySelector)
Filter out subsequent repetitions of an element (that is, if they arrive right after
one another), as compared by a key extracted through the user provided
Function
using equality. |
<V> Flux<T> |
distinctUntilChanged(Function<? super T,? extends V> keySelector,
BiPredicate<? super V,? super V> keyComparator)
Filter out subsequent repetitions of an element (that is, if they arrive right
after one another), as compared by a key extracted through the user provided
Function and then comparing keys with the supplied BiPredicate . |
Flux<T> |
doAfterTerminate(Runnable afterTerminate)
Add behavior (side-effect) triggered after the
Flux terminates, either by completing downstream successfully or with an error. |
Flux<T> |
doFinally(Consumer<SignalType> onFinally)
Add behavior (side-effect) triggered after the
Flux terminates for any reason,
including cancellation. |
Flux<T> |
doOnCancel(Runnable onCancel)
Add behavior (side-effect) triggered when the
Flux is cancelled. |
Flux<T> |
doOnComplete(Runnable onComplete)
Add behavior (side-effect) triggered when the
Flux completes successfully. |
<R> Flux<T> |
doOnDiscard(Class<R> type,
Consumer<? super R> discardHook)
Modify the behavior of the whole chain of operators upstream of this one to
conditionally clean up elements that get discarded by these operators.
|
Flux<T> |
doOnEach(Consumer<? super Signal<T>> signalConsumer)
Add behavior (side-effects) triggered when the
Flux emits an item, fails with an error
or completes successfully. |
<E extends Throwable> Flux<T> |
doOnError(Class<E> exceptionType,
Consumer<? super E> onError)
Add behavior (side-effect) triggered when the
Flux completes with an error matching the given exception type. |
Flux<T> |
doOnError(Consumer<? super Throwable> onError)
Add behavior (side-effect) triggered when the
Flux completes with an error. |
Flux<T> |
doOnError(Predicate<? super Throwable> predicate,
Consumer<? super Throwable> onError)
Add behavior (side-effect) triggered when the
Flux completes with an error matching the given exception. |
Flux<T> |
doOnNext(Consumer<? super T> onNext)
Add behavior (side-effect) triggered when the
Flux emits an item. |
Flux<T> |
doOnRequest(LongConsumer consumer)
Add behavior (side-effect) triggering a
LongConsumer when this Flux
receives any request. |
Flux<T> |
doOnSubscribe(Consumer<? super Subscription> onSubscribe)
Add behavior (side-effect) triggered when the
Flux is subscribed. |
Flux<T> |
doOnTerminate(Runnable onTerminate)
Add behavior (side-effect) triggered when the
Flux terminates, either by
completing successfully or with an error. |
Flux<Tuple2<Long,T>> |
elapsed()
Map this
Flux into Tuple2<Long, T>
of elapsed time in milliseconds and source data. |
Flux<Tuple2<Long,T>> |
elapsed(Scheduler scheduler)
Map this
Flux into Tuple2<Long, T>
of elapsed time in milliseconds and source data. |
Mono<T> |
elementAt(int index)
Emit only the element at the given index position or
IndexOutOfBoundsException
if the sequence is shorter. |
Mono<T> |
elementAt(int index,
T defaultValue)
Emit only the element at the given index position or fall back to a
default value if the sequence is shorter.
|
static <T> Flux<T> |
empty()
Create a
Flux that completes without emitting any item. |
static <T> Flux<T> |
error(Supplier<Throwable> errorSupplier)
Create a
Flux that terminates with an error immediately after being
subscribed to. |
static <T> Flux<T> |
error(Throwable error)
Create a
Flux that terminates with the specified error immediately after
being subscribed to. |
static <O> Flux<O> |
error(Throwable throwable,
boolean whenRequested)
Create a
Flux that terminates with the specified error, either immediately
after being subscribed to or after being first requested. |
Flux<T> |
expand(Function<? super T,? extends Publisher<? extends T>> expander)
Recursively expand elements into a graph and emit all the resulting elements using
a breadth-first traversal strategy.
|
Flux<T> |
expand(Function<? super T,? extends Publisher<? extends T>> expander,
int capacityHint)
Recursively expand elements into a graph and emit all the resulting elements using
a breadth-first traversal strategy.
|
Flux<T> |
expandDeep(Function<? super T,? extends Publisher<? extends T>> expander)
Recursively expand elements into a graph and emit all the resulting elements,
in a depth-first traversal order.
|
Flux<T> |
expandDeep(Function<? super T,? extends Publisher<? extends T>> expander,
int capacityHint)
Recursively expand elements into a graph and emit all the resulting elements,
in a depth-first traversal order.
|
Flux<T> |
filter(Predicate<? super T> p)
Evaluate each source value against the given
Predicate . |
Flux<T> |
filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate)
Test each value emitted by this
Flux asynchronously using a generated
Publisher<Boolean> test. |
Flux<T> |
filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate,
int bufferSize)
Test each value emitted by this
Flux asynchronously using a generated
Publisher<Boolean> test. |
static <I> Flux<I> |
first(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
first(Publisher<? extends I>... sources)
|
<R> Flux<R> |
flatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
|
<R> Flux<R> |
flatMap(Function<? super T,? extends Publisher<? extends R>> mapperOnNext,
Function<? super Throwable,? extends Publisher<? extends R>> mapperOnError,
Supplier<? extends Publisher<? extends R>> mapperOnComplete)
|
<V> Flux<V> |
flatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency)
|
<V> Flux<V> |
flatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency,
int prefetch)
|
<V> Flux<V> |
flatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency,
int prefetch)
|
<R> Flux<R> |
flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
|
<R> Flux<R> |
flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper,
int prefetch)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency,
int prefetch)
|
<R> Flux<R> |
flatMapSequentialDelayError(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency,
int prefetch)
|
static <T> Flux<T> |
from(Publisher<? extends T> source)
|
static <T> Flux<T> |
fromArray(T[] array)
Create a
Flux that emits the items contained in the provided array. |
static <T> Flux<T> |
fromIterable(Iterable<? extends T> it)
|
static <T> Flux<T> |
fromStream(Stream<? extends T> s)
|
static <T> Flux<T> |
fromStream(Supplier<Stream<? extends T>> streamSupplier)
|
static <T,S> Flux<T> |
generate(Callable<S> stateSupplier,
BiFunction<S,SynchronousSink<T>,S> generator)
Programmatically create a
Flux by generating signals one-by-one via a
consumer callback and some state. |
static <T,S> Flux<T> |
generate(Callable<S> stateSupplier,
BiFunction<S,SynchronousSink<T>,S> generator,
Consumer<? super S> stateConsumer)
Programmatically create a
Flux by generating signals one-by-one via a
consumer callback and some state, with a final cleanup callback. |
static <T> Flux<T> |
generate(Consumer<SynchronousSink<T>> generator)
Programmatically create a
Flux by generating signals one-by-one via a
consumer callback. |
int |
getPrefetch()
The prefetch configuration of the
Flux |
<K> Flux<GroupedFlux<K,T>> |
groupBy(Function<? super T,? extends K> keyMapper)
|
<K,V> Flux<GroupedFlux<K,V>> |
groupBy(Function<? super T,? extends K> keyMapper,
Function<? super T,? extends V> valueMapper)
|
<K,V> Flux<GroupedFlux<K,V>> |
groupBy(Function<? super T,? extends K> keyMapper,
Function<? super T,? extends V> valueMapper,
int prefetch)
|
<K> Flux<GroupedFlux<K,T>> |
groupBy(Function<? super T,? extends K> keyMapper,
int prefetch)
|
<TRight,TLeftEnd,TRightEnd,R> Flux<R> |
groupJoin(Publisher<? extends TRight> other,
Function<? super T,? extends Publisher<TLeftEnd>> leftEnd,
Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd,
BiFunction<? super T,? super Flux<TRight>,? extends R> resultSelector)
Map values from two Publishers into time windows and emit combinations of values
when their windows overlap.
|
<R> Flux<R> |
handle(BiConsumer<? super T,SynchronousSink<R>> handler)
Handle the items emitted by this
Flux by calling a biconsumer with the
output sink for each onNext. |
Mono<Boolean> |
hasElement(T value)
Emit a single boolean true if any of the elements of this
Flux sequence is
equal to the provided value. |
Mono<Boolean> |
hasElements()
Emit a single boolean true if this
Flux sequence has at least one element. |
Flux<T> |
hide()
Hides the identities of this
Flux instance. |
Mono<T> |
ignoreElements()
Ignores onNext signals (dropping them) and only propagates termination events.
|
Flux<Tuple2<Long,T>> |
index()
Keep information about the order in which source values were received by
indexing them with a 0-based incrementing long, returning a
Flux
of Tuple2<(index, value)> . |
<I> Flux<I> |
index(BiFunction<? super Long,? super T,? extends I> indexMapper)
Keep information about the order in which source values were received by
indexing them internally with a 0-based incrementing long then combining this
information with the source value into a
I using the provided BiFunction ,
returning a Flux<I> . |
static Flux<Long> |
interval(Duration period)
Create a
Flux that emits long values starting with 0 and incrementing at
specified time intervals on the global timer. |
static Flux<Long> |
interval(Duration delay,
Duration period)
Create a
Flux that emits long values starting with 0 and incrementing at
specified time intervals, after an initial delay, on the global timer. |
static Flux<Long> |
interval(Duration delay,
Duration period,
Scheduler timer)
|
static Flux<Long> |
interval(Duration period,
Scheduler timer)
|
<TRight,TLeftEnd,TRightEnd,R> Flux<R> |
join(Publisher<? extends TRight> other,
Function<? super T,? extends Publisher<TLeftEnd>> leftEnd,
Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd,
BiFunction<? super T,? super TRight,? extends R> resultSelector)
Map values from two Publishers into time windows and emit combinations of values
when their windows overlap.
|
static <T> Flux<T> |
just(T... data)
Create a
Flux that emits the provided elements and then completes. |
static <T> Flux<T> |
just(T data)
Create a new
Flux that will only emit a single element then onComplete. |
Mono<T> |
last()
Emit the last element observed before complete signal as a
Mono , or emit
NoSuchElementException error if the source was empty. |
Mono<T> |
last(T defaultValue)
Emit the last element observed before complete signal as a
Mono , or emit
the defaultValue if the source was empty. |
Flux<T> |
limitRate(int prefetchRate)
Ensure that backpressure signals from downstream subscribers are split into batches
capped at the provided
prefetchRate when propagated upstream, effectively
rate limiting the upstream Publisher . |
Flux<T> |
limitRate(int highTide,
int lowTide)
Ensure that backpressure signals from downstream subscribers are split into batches
capped at the provided
highTide first, then replenishing at the provided
lowTide , effectively rate limiting the upstream Publisher . |
Flux<T> |
limitRequest(long requestCap)
Ensure that the total amount requested upstream is capped at
requestCap. |
Flux<T> |
log()
Observe all Reactive Streams signals and trace them using
Logger support. |
Flux<T> |
log(Logger logger)
Observe Reactive Streams signals matching the passed filter
options and
trace them using a specific user-provided Logger , at Level.INFO level. |
Flux<T> |
log(Logger logger,
Level level,
boolean showOperatorLine,
SignalType... options)
|
Flux<T> |
log(String category)
Observe all Reactive Streams signals and trace them using
Logger support. |
Flux<T> |
log(String category,
Level level,
boolean showOperatorLine,
SignalType... options)
Observe Reactive Streams signals matching the passed filter
options and
trace them using Logger support. |
Flux<T> |
log(String category,
Level level,
SignalType... options)
Observe Reactive Streams signals matching the passed filter
options and
trace them using Logger support. |
<V> Flux<V> |
map(Function<? super T,? extends V> mapper)
Transform the items emitted by this
Flux by applying a synchronous function
to each item. |
Flux<Signal<T>> |
materialize()
Transform incoming onNext, onError and onComplete signals into
Signal instances,
materializing these signals. |
static <I> Flux<I> |
merge(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. |
static <I> Flux<I> |
merge(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
merge(Publisher<? extends I>... sources)
Merge data from
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. |
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source)
|
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source,
int concurrency)
|
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source,
int concurrency,
int prefetch)
|
static <I> Flux<I> |
mergeDelayError(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. |
static <T> Flux<T> |
mergeOrdered(Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator ). |
static <T> Flux<T> |
mergeOrdered(int prefetch,
Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator ). |
static <I extends Comparable<? super I>> Flux<I> |
mergeOrdered(Publisher<? extends I>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by their natural order). |
Flux<T> |
mergeOrderedWith(Publisher<? extends T> other,
Comparator<? super T> otherComparator)
Merge data from this
Flux and a Publisher into a reordered merge
sequence, by picking the smallest value from each sequence as defined by a provided
Comparator . |
static <I> Flux<I> |
mergeSequential(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences provided in an array/vararg
into an ordered merged sequence. |
static <I> Flux<I> |
mergeSequential(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
mergeSequential(Iterable<? extends Publisher<? extends I>> sources,
int maxConcurrency,
int prefetch)
|
static <I> Flux<I> |
mergeSequential(Publisher<? extends I>... sources)
Merge data from
Publisher sequences provided in an array/vararg
into an ordered merged sequence. |
static <T> Flux<T> |
mergeSequential(Publisher<? extends Publisher<? extends T>> sources)
|
static <T> Flux<T> |
mergeSequential(Publisher<? extends Publisher<? extends T>> sources,
int maxConcurrency,
int prefetch)
|
static <I> Flux<I> |
mergeSequentialDelayError(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences provided in an array/vararg
into an ordered merged sequence. |
static <I> Flux<I> |
mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources,
int maxConcurrency,
int prefetch)
|
static <T> Flux<T> |
mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources,
int maxConcurrency,
int prefetch)
|
Flux<T> |
mergeWith(Publisher<? extends T> other)
|
Flux<T> |
metrics()
Activate metrics for this sequence, provided there is an instrumentation facade
on the classpath (otherwise this method is a pure no-op).
|
Flux<T> |
name(String name)
Give a name to this sequence, which can be retrieved using
Scannable.name()
as long as this is the first reachable Scannable.parents() . |
static <T> Flux<T> |
never()
Create a
Flux that will never signal any data, error or completion signal. |
Mono<T> |
next()
|
<U> Flux<U> |
ofType(Class<U> clazz)
Evaluate each accepted value against the given
Class type. |
protected static <T> ConnectableFlux<T> |
onAssembly(ConnectableFlux<T> source)
To be used by custom operators: invokes assembly
Hooks pointcut given a
ConnectableFlux , potentially returning a new ConnectableFlux . |
protected static <T> Flux<T> |
onAssembly(Flux<T> source)
|
Flux<T> |
onBackpressureBuffer()
Request an unbounded demand and push to the returned
Flux , or park the
observed elements if not enough demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(Duration ttl,
int maxSize,
Consumer<? super T> onBufferEviction)
Request an unbounded demand and push to the returned
Flux , or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit and for a maximum Duration of ttl (as measured on the
elastic Scheduler ). |
Flux<T> |
onBackpressureBuffer(Duration ttl,
int maxSize,
Consumer<? super T> onBufferEviction,
Scheduler scheduler)
|
Flux<T> |
onBackpressureBuffer(int maxSize)
Request an unbounded demand and push to the returned
Flux , or park the
observed elements if not enough demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(int maxSize,
BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push to the returned
Flux , or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. |
Flux<T> |
onBackpressureBuffer(int maxSize,
Consumer<? super T> onOverflow)
Request an unbounded demand and push to the returned
Flux , or park the
observed elements if not enough demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(int maxSize,
Consumer<? super T> onBufferOverflow,
BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push to the returned
Flux , or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. |
Flux<T> |
onBackpressureDrop()
Request an unbounded demand and push to the returned
Flux , or drop
the observed elements if not enough demand is requested downstream. |
Flux<T> |
onBackpressureDrop(Consumer<? super T> onDropped)
|
Flux<T> |
onBackpressureError()
Request an unbounded demand and push to the returned
Flux , or emit onError
from Exceptions.failWithOverflow() if not enough demand is requested
downstream. |
Flux<T> |
onBackpressureLatest()
Request an unbounded demand and push to the returned
Flux , or only keep
the most recent observed item if not enough demand is requested downstream. |
Flux<T> |
onErrorContinue(BiConsumer<Throwable,Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the
incriminating element from the sequence and continuing with subsequent elements.
|
<E extends Throwable> Flux<T> |
onErrorContinue(Class<E> type,
BiConsumer<Throwable,Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the
incriminating element from the sequence and continuing with subsequent elements.
|
<E extends Throwable> Flux<T> |
onErrorContinue(Predicate<E> errorPredicate,
BiConsumer<Throwable,Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the
incriminating element from the sequence and continuing with subsequent elements.
|
<E extends Throwable> Flux<T> |
onErrorMap(Class<E> type,
Function<? super E,? extends Throwable> mapper)
Transform an error emitted by this
Flux by synchronously applying a function
to it if the error matches the given type. |
Flux<T> |
onErrorMap(Function<? super Throwable,? extends Throwable> mapper)
Transform any error emitted by this
Flux by synchronously applying a function to it. |
Flux<T> |
onErrorMap(Predicate<? super Throwable> predicate,
Function<? super Throwable,? extends Throwable> mapper)
Transform an error emitted by this
Flux by synchronously applying a function
to it if the error matches the given predicate. |
<E extends Throwable> Flux<T> |
onErrorResume(Class<E> type,
Function<? super E,? extends Publisher<? extends T>> fallback)
Subscribe to a fallback publisher when an error matching the given type
occurs, using a function to choose the fallback depending on the error.
|
Flux<T> |
onErrorResume(Function<? super Throwable,? extends Publisher<? extends T>> fallback)
Subscribe to a returned fallback publisher when any error occurs, using a function to
choose the fallback depending on the error.
|
Flux<T> |
onErrorResume(Predicate<? super Throwable> predicate,
Function<? super Throwable,? extends Publisher<? extends T>> fallback)
Subscribe to a fallback publisher when an error matching a given predicate
occurs.
|
<E extends Throwable> Flux<T> |
onErrorReturn(Class<E> type,
T fallbackValue)
Simply emit a captured fallback value when an error of the specified type is
observed on this
Flux . |
Flux<T> |
onErrorReturn(Predicate<? super Throwable> predicate,
T fallbackValue)
Simply emit a captured fallback value when an error matching the given predicate is
observed on this
Flux . |
Flux<T> |
onErrorReturn(T fallbackValue)
Simply emit a captured fallback value when any error is observed on this
Flux . |
Flux<T> |
onErrorStop()
If an
onErrorContinue(BiConsumer) variant has been used before, reverts to the default
'STOP' mode where errors are terminal events. |
protected static <T> Flux<T> |
onLastAssembly(Flux<T> source)
|
Flux<T> |
onTerminateDetach()
Detaches both the child
Subscriber and the Subscription on
termination or cancellation. |
Flux<T> |
or(Publisher<? extends T> other)
|
ParallelFlux<T> |
parallel()
Prepare this
Flux by dividing data on a number of 'rails' matching the
number of CPU cores, in a round-robin fashion. |
ParallelFlux<T> |
parallel(int parallelism)
Prepare this
Flux by dividing data on a number of 'rails' matching the
provided parallelism parameter, in a round-robin fashion. |
ParallelFlux<T> |
parallel(int parallelism,
int prefetch)
|
ConnectableFlux<T> |
publish()
Prepare a
ConnectableFlux which shares this Flux sequence and
dispatches values to subscribers in a backpressure-aware manner. |
<R> Flux<R> |
publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform)
Shares a sequence for the duration of a function that may transform it and
consume it as many times as necessary without causing multiple subscriptions
to the upstream.
|
<R> Flux<R> |
publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform,
int prefetch)
Shares a sequence for the duration of a function that may transform it and
consume it as many times as necessary without causing multiple subscriptions
to the upstream.
|
ConnectableFlux<T> |
publish(int prefetch)
Prepare a
ConnectableFlux which shares this Flux sequence and
dispatches values to subscribers in a backpressure-aware manner. |
Mono<T> |
publishNext()
|
Flux<T> |
publishOn(Scheduler scheduler)
|
Flux<T> |
publishOn(Scheduler scheduler,
boolean delayError,
int prefetch)
Run onNext, onComplete and onError on a supplied
Scheduler's Scheduler.Worker. |
Flux<T> |
publishOn(Scheduler scheduler,
int prefetch)
Run onNext, onComplete and onError on a supplied
Scheduler's Scheduler.Worker. |
static <T> Flux<T> |
push(Consumer<? super FluxSink<T>> emitter)
|
static <T> Flux<T> |
push(Consumer<? super FluxSink<T>> emitter,
FluxSink.OverflowStrategy backpressure)
|
static Flux<Integer> |
range(int start,
int count)
|
<A> Mono<A> |
reduce(A initial,
BiFunction<A,? super T,A> accumulator)
Reduce the values from this
Flux sequence into a single object matching the
type of a seed value. |
Mono<T> |
reduce(BiFunction<T,T,T> aggregator)
Reduce the values from this
Flux sequence into a single object of the same
type as the emitted items. |
<A> Mono<A> |
reduceWith(Supplier<A> initial,
BiFunction<A,? super T,A> accumulator)
Reduce the values from this
Flux sequence into a single object matching the
type of a lazily supplied seed value. |
Flux<T> |
repeat()
Repeatedly and indefinitely subscribe to the source upon completion of the
previous subscription.
|
Flux<T> |
repeat(BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.
|
Flux<T> |
repeat(long numRepeat)
Repeatedly subscribe to the source
numRepeat times. |
Flux<T> |
repeat(long numRepeat,
BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous
subscription.
|
Flux<T> |
repeatWhen(Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
Repeatedly subscribe to this
Flux when a companion sequence emits elements in
response to the flux completion signal. |
ConnectableFlux<T> |
replay()
Turn this
Flux into a hot source and cache last emitted signals for further Subscriber . |
ConnectableFlux<T> |
replay(Duration ttl)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
ConnectableFlux<T> |
replay(Duration ttl,
Scheduler timer)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
ConnectableFlux<T> |
replay(int history)
Turn this
Flux into a connectable hot source and cache last emitted
signals for further Subscriber . |
ConnectableFlux<T> |
replay(int history,
Duration ttl)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
ConnectableFlux<T> |
replay(int history,
Duration ttl,
Scheduler timer)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
Flux<T> |
retry()
Re-subscribes to this
Flux sequence if it signals any error, indefinitely. |
Flux<T> |
retry(long numRetries)
Re-subscribes to this
Flux sequence if it signals any error, for a fixed
number of times. |
Flux<T> |
retry(long numRetries,
Predicate<? super Throwable> retryMatcher)
|
Flux<T> |
retry(Predicate<? super Throwable> retryMatcher)
|
Flux<T> |
retryBackoff(long numRetries,
Duration firstBackoff)
In case of error, retry this
Flux up to numRetries times using a
randomized exponential backoff strategy (jitter). |
Flux<T> |
retryBackoff(long numRetries,
Duration firstBackoff,
Duration maxBackoff)
In case of error, retry this
Flux up to numRetries times using a
randomized exponential backoff strategy. |
Flux<T> |
retryBackoff(long numRetries,
Duration firstBackoff,
Duration maxBackoff,
double jitterFactor)
In case of error, retry this
Flux up to numRetries times using a
randomized exponential backoff strategy, randomized with a user-provided jitter
factor between 0.0 (no jitter) and 1.0 (default is 0.5). |
Flux<T> |
retryWhen(Function<Flux<Throwable>,? extends Publisher<?>> whenFactory)
|
Flux<T> |
sample(Duration timespan)
|
<U> Flux<T> |
sample(Publisher<U> sampler)
|
Flux<T> |
sampleFirst(Duration timespan)
Repeatedly take a value from this
Flux then skip the values that follow
within a given duration. |
<U> Flux<T> |
sampleFirst(Function<? super T,? extends Publisher<U>> samplerFactory)
|
<U> Flux<T> |
sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory)
|
<U> Flux<T> |
sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory,
int maxConcurrency)
|
<A> Flux<A> |
scan(A initial,
BiFunction<A,? super T,A> accumulator)
Reduce this
Flux values with an accumulator BiFunction and
also emit the intermediate results of this function. |
Flux<T> |
scan(BiFunction<T,T,T> accumulator)
Reduce this
Flux values with an accumulator BiFunction and
also emit the intermediate results of this function. |
<A> Flux<A> |
scanWith(Supplier<A> initial,
BiFunction<A,? super T,A> accumulator)
Reduce this
Flux values with the help of an accumulator BiFunction
and also emit the intermediate results. |
Flux<T> |
share()
|
Mono<T> |
single()
Expect and emit a single item from this
Flux source or signal
NoSuchElementException for an empty source, or
IndexOutOfBoundsException for a source with more than one element. |
Mono<T> |
single(T defaultValue)
Expect and emit a single item from this
Flux source and emit a default
value for an empty source, but signal an IndexOutOfBoundsException for a
source with more than one element. |
Mono<T> |
singleOrEmpty()
Expect and emit a single item from this
Flux source, and accept an empty
source but signal an IndexOutOfBoundsException for a source with more than
one element. |
Flux<T> |
skip(Duration timespan)
Skip elements from this
Flux emitted within the specified initial duration. |
Flux<T> |
skip(Duration timespan,
Scheduler timer)
|
Flux<T> |
skip(long skipped)
Skip the specified number of elements from the beginning of this
Flux then
emit the remaining elements. |
Flux<T> |
skipLast(int n)
Skip a specified number of elements at the end of this
Flux sequence. |
Flux<T> |
skipUntil(Predicate<? super T> untilPredicate)
|
Flux<T> |
skipUntilOther(Publisher<?> other)
|
Flux<T> |
skipWhile(Predicate<? super T> skipPredicate)
|
Flux<T> |
sort()
Sort elements from this
Flux by collecting and sorting them in the background
then emitting the sorted sequence once this sequence completes. |
Flux<T> |
sort(Comparator<? super T> sortFunction)
Sort elements from this
Flux using a Comparator function, by
collecting and sorting elements in the background then emitting the sorted sequence
once this sequence completes. |
Flux<T> |
startWith(Iterable<? extends T> iterable)
|
Flux<T> |
startWith(Publisher<? extends T> publisher)
|
Flux<T> |
startWith(T... values)
Prepend the given values before this
Flux sequence. |
Disposable |
subscribe()
Subscribe to this
Flux and request unbounded demand. |
Disposable |
subscribe(Consumer<? super T> consumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer)
|
abstract void |
subscribe(CoreSubscriber<? super T> actual)
An internal
Publisher.subscribe(Subscriber) that will bypass
Hooks.onLastOperator(Function) pointcut. |
void |
subscribe(Subscriber<? super T> actual) |
Flux<T> |
subscribeOn(Scheduler scheduler)
Run subscribe, onSubscribe and request on a specified
Scheduler 's Scheduler.Worker . |
Flux<T> |
subscribeOn(Scheduler scheduler,
boolean requestOnSeparateThread)
Run subscribe and onSubscribe on a specified
Scheduler 's Scheduler.Worker . |
Flux<T> |
subscriberContext(Context mergeContext)
|
Flux<T> |
subscriberContext(Function<Context,Context> doOnContext)
|
<E extends Subscriber<? super T>> E |
subscribeWith(E subscriber)
|
Flux<T> |
switchIfEmpty(Publisher<? extends T> alternate)
Switch to an alternative
Publisher if this sequence is completed without any data. |
<V> Flux<V> |
switchMap(Function<? super T,Publisher<? extends V>> fn)
|
<V> Flux<V> |
switchMap(Function<? super T,Publisher<? extends V>> fn,
int prefetch)
|
static <T> Flux<T> |
switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers)
|
static <T> Flux<T> |
switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers,
int prefetch)
|
Flux<T> |
tag(String key,
String value)
Tag this flux with a key/value pair.
|
Flux<T> |
take(Duration timespan)
|
Flux<T> |
take(Duration timespan,
Scheduler timer)
|
Flux<T> |
take(long n)
Take only the first N values from this
Flux , if available. |
Flux<T> |
takeLast(int n)
Emit the last N values this
Flux emitted before its completion. |
Flux<T> |
takeUntil(Predicate<? super T> predicate)
|
Flux<T> |
takeUntilOther(Publisher<?> other)
|
Flux<T> |
takeWhile(Predicate<? super T> continuePredicate)
Relay values from this
Flux while a predicate returns TRUE
for the values (checked before each value is delivered). |
Mono<Void> |
then()
Return a
Mono<Void> that completes when this Flux completes. |
<V> Mono<V> |
then(Mono<V> other)
|
Mono<Void> |
thenEmpty(Publisher<Void> other)
Return a
Mono<Void> that waits for this Flux to complete then
for a supplied Publisher<Void> to also complete. |
<V> Flux<V> |
thenMany(Publisher<V> other)
|
Flux<T> |
timeout(Duration timeout)
Propagate a
TimeoutException as soon as no item is emitted within the
given Duration from the previous emission (or the subscription for the first item). |
Flux<T> |
timeout(Duration timeout,
Publisher<? extends T> fallback)
|
Flux<T> |
timeout(Duration timeout,
Publisher<? extends T> fallback,
Scheduler timer)
|
Flux<T> |
timeout(Duration timeout,
Scheduler timer)
Propagate a
TimeoutException as soon as no item is emitted within the
given Duration from the previous emission (or the subscription for the first
item), as measured by the specified Scheduler . |
<U> Flux<T> |
timeout(Publisher<U> firstTimeout)
Signal a
TimeoutException in case the first item from this Flux has
not been emitted before the given Publisher emits. |
<U,V> Flux<T> |
timeout(Publisher<U> firstTimeout,
Function<? super T,? extends Publisher<V>> nextTimeoutFactory)
Signal a
TimeoutException in case the first item from this Flux has
not been emitted before the firstTimeout Publisher emits, and whenever
each subsequent element is not emitted before a Publisher generated from
the latest element signals. |
<U,V> Flux<T> |
timeout(Publisher<U> firstTimeout,
Function<? super T,? extends Publisher<V>> nextTimeoutFactory,
Publisher<? extends T> fallback)
|
Flux<Tuple2<Long,T>> |
timestamp()
|
Flux<Tuple2<Long,T>> |
timestamp(Scheduler scheduler)
|
Iterable<T> |
toIterable()
|
Iterable<T> |
toIterable(int batchSize)
|
Iterable<T> |
toIterable(int batchSize,
Supplier<Queue<T>> queueProvider)
|
Stream<T> |
toStream()
|
Stream<T> |
toStream(int batchSize)
|
String |
toString() |
<V> Flux<V> |
transform(Function<? super Flux<T>,? extends Publisher<V>> transformer)
|
static <T,D> Flux<T> |
using(Callable<? extends D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a
Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or
the Subscriber cancels.
|
static <T,D> Flux<T> |
using(Callable<? extends D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup,
boolean eager)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a
Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or
the Subscriber cancels.
|
static <T,D> Flux<T> |
usingWhen(Publisher<D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> resourceClosure,
Function<? super D,? extends Publisher<?>> asyncCleanup)
Uses a resource, generated by a
Publisher for each individual Subscriber ,
while streaming the values from a Publisher derived from the same resource. |
static <T,D> Flux<T> |
usingWhen(Publisher<D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> resourceClosure,
Function<? super D,? extends Publisher<?>> asyncComplete,
Function<? super D,? extends Publisher<?>> asyncError)
Uses a resource, generated by a
Publisher for each individual Subscriber ,
while streaming the values from a Publisher derived from the same resource. |
static <T,D> Flux<T> |
usingWhen(Publisher<D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> resourceClosure,
Function<? super D,? extends Publisher<?>> asyncComplete,
Function<? super D,? extends Publisher<?>> asyncError,
Function<? super D,? extends Publisher<?>> asyncCancel)
Uses a resource, generated by a
Publisher for each individual Subscriber ,
while streaming the values from a Publisher derived from the same resource. |
Flux<Flux<T>> |
window(Duration windowingTimespan)
|
Flux<Flux<T>> |
window(Duration windowingTimespan,
Duration openWindowEvery)
|
Flux<Flux<T>> |
window(Duration windowingTimespan,
Duration openWindowEvery,
Scheduler timer)
|
Flux<Flux<T>> |
window(Duration windowingTimespan,
Scheduler timer)
|
Flux<Flux<T>> |
window(int maxSize)
|
Flux<Flux<T>> |
window(int maxSize,
int skip)
|
Flux<Flux<T>> |
window(Publisher<?> boundary)
|
Flux<Flux<T>> |
windowTimeout(int maxSize,
Duration maxTime)
|
Flux<Flux<T>> |
windowTimeout(int maxSize,
Duration maxTime,
Scheduler timer)
|
Flux<Flux<T>> |
windowUntil(Predicate<T> boundaryTrigger)
|
Flux<Flux<T>> |
windowUntil(Predicate<T> boundaryTrigger,
boolean cutBefore)
|
Flux<Flux<T>> |
windowUntil(Predicate<T> boundaryTrigger,
boolean cutBefore,
int prefetch)
|
<U,V> Flux<Flux<T>> |
windowWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector)
|
Flux<Flux<T>> |
windowWhile(Predicate<T> inclusionPredicate)
|
Flux<Flux<T>> |
windowWhile(Predicate<T> inclusionPredicate,
int prefetch)
|
<U,R> Flux<R> |
withLatestFrom(Publisher<? extends U> other,
BiFunction<? super T,? super U,? extends R> resultSelector)
Combine the most recently emitted values from both this
Flux and another
Publisher through a BiFunction and emits the result. |
static <I,O> Flux<O> |
zip(Function<? super Object[],? extends O> combinator,
int prefetch,
Publisher<? extends I>... sources)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <I,O> Flux<O> |
zip(Function<? super Object[],? extends O> combinator,
Publisher<? extends I>... sources)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <O> Flux<O> |
zip(Iterable<? extends Publisher<?>> sources,
Function<? super Object[],? extends O> combinator)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <O> Flux<O> |
zip(Iterable<? extends Publisher<?>> sources,
int prefetch,
Function<? super Object[],? extends O> combinator)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <TUPLE extends Tuple2,V> Flux<V> |
zip(Publisher<? extends Publisher<?>> sources,
Function<? super TUPLE,? extends V> combinator)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <T1,T2> Flux<Tuple2<T1,T2>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2)
Zip two sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple2 . |
static <T1,T2,O> Flux<O> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
BiFunction<? super T1,? super T2,? extends O> combinator)
Zip two sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <T1,T2,T3> Flux<Tuple3<T1,T2,T3>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3)
Zip three sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple3 . |
static <T1,T2,T3,T4> Flux<Tuple4<T1,T2,T3,T4>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4)
Zip four sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple4 . |
static <T1,T2,T3,T4,T5> Flux<Tuple5<T1,T2,T3,T4,T5>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5)
Zip five sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple5 . |
static <T1,T2,T3,T4,T5,T6> Flux<Tuple6<T1,T2,T3,T4,T5,T6>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6)
Zip six sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple6 . |
static <T1,T2,T3,T4,T5,T6,T7> Flux<Tuple7<T1,T2,T3,T4,T5,T6,T7>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6,
Publisher<? extends T7> source7)
Zip seven sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple7 . |
static <T1,T2,T3,T4,T5,T6,T7,T8> Flux<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6,
Publisher<? extends T7> source7,
Publisher<? extends T8> source8)
Zip eight sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple8 . |
<T2> Flux<Tuple2<T,T2>> |
zipWith(Publisher<? extends T2> source2)
|
<T2,V> Flux<V> |
zipWith(Publisher<? extends T2> source2,
BiFunction<? super T,? super T2,? extends V> combinator)
Zip this
Flux with another Publisher source, that is to say wait
for both to emit one element and combine these elements using a combinator
BiFunction.
The operator will continue doing so until any of the sources completes. |
<T2> Flux<Tuple2<T,T2>> |
zipWith(Publisher<? extends T2> source2,
int prefetch)
|
<T2,V> Flux<V> |
zipWith(Publisher<? extends T2> source2,
int prefetch,
BiFunction<? super T,? super T2,? extends V> combinator)
Zip this
Flux with another Publisher source, that is to say wait
for both to emit one element and combine these elements using a combinator
BiFunction.
The operator will continue doing so until any of the sources completes. |
<T2> Flux<Tuple2<T,T2>> |
zipWithIterable(Iterable<? extends T2> iterable)
|
<T2,V> Flux<V> |
zipWithIterable(Iterable<? extends T2> iterable,
BiFunction<? super T,? super T2,? extends V> zipper)
Zip elements from this
Flux with the content of an Iterable , that is
to say combine one element from each, pairwise, using the given zipper BiFunction . |
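The zip and zipWith rows above all describe the same pairing rule: take one element from each source, combine them, and stop once any source completes. The sketch below models that rule with plain JDK collections only. It is not Reactor code, and `ZipSketch` and its `zip` helper are hypothetical names introduced for illustration; it ignores asynchrony, demand and prefetch entirely.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

/** Plain-Java model of pairwise zipping; an illustration, not Reactor code. */
final class ZipSketch {

    /** Combines elements pairwise and stops as soon as either input is exhausted. */
    static <A, B, R> List<R> zip(Iterable<A> first, Iterable<B> second,
                                 BiFunction<? super A, ? super B, ? extends R> combinator) {
        List<R> out = new ArrayList<>();
        Iterator<A> a = first.iterator();
        Iterator<B> b = second.iterator();
        // One element is taken from each side per step; the shorter input ends the result.
        while (a.hasNext() && b.hasNext()) {
            out.add(combinator.apply(a.next(), b.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        // The third element of the first input has no partner, so it is never combined.
        System.out.println(zip(List.of(1, 2, 3), List.of("a", "b"), (n, s) -> n + s));
    }
}
```

In this model the unmatched trailing element is simply left unread, which mirrors the "until any of the sources completes" wording in the zipWith rows.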
@SafeVarargs public static <T,V> Flux<V> combineLatest(Function<Object[],V> combinator, Publisher<? extends T>... sources)
Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources.
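To make "combination of the most recently published value" concrete, here is a small model written with JDK types only. It is a sketch, not Reactor's implementation: `CombineLatestSketch` and `Event` are hypothetical names, emissions are replayed from a pre-recorded timeline instead of arriving asynchronously, and it assumes the usual combine-latest convention that nothing is emitted until every source has produced at least one value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of combine-latest semantics; an illustration, not Reactor code. */
final class CombineLatestSketch {

    /** One emission on a merged timeline: which source emitted, and what value. */
    static final class Event {
        final int source;
        final Object value;

        Event(int source, Object value) {
            this.source = source;
            this.value = value;
        }
    }

    static <V> List<V> combineLatest(int sourceCount, List<Event> timeline,
                                     Function<Object[], V> combinator) {
        Object[] latest = new Object[sourceCount];
        boolean[] seen = new boolean[sourceCount];
        int seenCount = 0;
        List<V> out = new ArrayList<>();
        for (Event event : timeline) {
            // Remember the newest value of the source that just emitted.
            latest[event.source] = event.value;
            if (!seen[event.source]) {
                seen[event.source] = true;
                seenCount++;
            }
            // Assumption: output starts only once every source has emitted at least once.
            if (seenCount == sourceCount) {
                out.add(combinator.apply(latest.clone()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> timeline = List.of(
                new Event(0, "a"), new Event(1, 1), new Event(0, "b"), new Event(1, 2));
        System.out.println(combineLatest(2, timeline, latest -> "" + latest[0] + latest[1]));
    }
}
```

The combinator receives an `Object[]` here for the same reason the `Function<Object[],V>` overloads do: the sources may carry unrelated element types.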
Type Parameters:
T - type of the value from sources
V - The produced output after transformation by the given combinator
Parameters:
sources - The Publisher sources to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations

@SafeVarargs public static <T,V> Flux<V> combineLatest(Function<Object[],V> combinator, int prefetch, Publisher<? extends T>... sources)
Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources.
Type Parameters:
T - type of the value from sources
V - The produced output after transformation by the given combinator
Parameters:
sources - The Publisher sources to combine values from
prefetch - The demand sent to each combined source Publisher
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations

public static <T1,T2,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of two Publisher sources.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
V - The produced output after transformation by the given combinator
Parameters:
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations

public static <T1,T2,T3,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of three Publisher sources.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
V - The produced output after transformation by the given combinator
Parameters:
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations

public static <T1,T2,T3,T4,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of four Publisher sources.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
V - The produced output after transformation by the given combinator
Parameters:
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
source4 - The fourth Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations

public static <T1,T2,T3,T4,T5,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of five Publisher sources.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
V - The produced output after transformation by the given combinator
Parameters:
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
source4 - The fourth Publisher source to combine values from
source5 - The fifth Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations

public static <T1,T2,T3,T4,T5,T6,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of six Publisher sources.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
V - The produced output after transformation by the given combinator
Parameters:
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
source4 - The fourth Publisher source to combine values from
source5 - The fifth Publisher source to combine values from
source6 - The sixth Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations

public static <T,V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources provided in an Iterable.
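As a rough illustration of the combinator contract described above, the sketch below sums the latest values of two sources. It assumes reactor-core is on the classpath; the class and method names are made up for this example. Note the cast inside the combinator, needed because the latest values arrive as an untyped Object[].

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

public class CombineLatestSketch {

    // Sums the most recent value of every source each time a combination is produced.
    static List<Integer> latestSums() {
        List<Flux<Integer>> sources = Arrays.asList(
                Flux.just(1, 2),
                Flux.just(10, 20));

        return Flux.combineLatest(sources, (Object[] latest) -> {
                    int sum = 0;
                    for (Object value : latest) {
                        sum += (Integer) value; // values are untyped, so a cast is required
                    }
                    return sum;
                })
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(latestSums());
    }
}
```

No combination is emitted until every source has produced at least one value, so the number of results depends on how the sources interleave.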
T
- The common base type of the values from sources
V
- The produced output after transformation by the given combinator
sources
- The list of Publisher sources to combine values from
combinator
- The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Flux based on the produced combinations
public static <T,V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, int prefetch, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources provided in an Iterable.
T
- The common base type of the values from sources
V
- The produced output after transformation by the given combinator
sources
- The list of Publisher sources to combine values from
prefetch
- demand produced to each combined source Publisher
combinator
- The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Flux based on the produced combinations
public static <T> Flux<T> concat(Iterable<? extends Publisher<? extends T>> sources)
Concatenate all sources provided in an Iterable, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
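The sequential subscription described above can be sketched as follows, assuming reactor-core is on the classpath (the class and method names are illustrative only). Each source is fully drained before the next one is subscribed to, so the output order follows the order of the Iterable.

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

public class ConcatSketch {

    // Concatenates four sources, one of them empty, in Iterable order.
    static List<String> concatenated() {
        List<Flux<String>> sources = Arrays.asList(
                Flux.just("a", "b"),
                Flux.just("c"),
                Flux.<String>empty(),
                Flux.just("d"));

        return Flux.concat(sources)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(concatenated());
    }
}
```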
@SafeVarargs public final Flux<T> concatWithValues(T... values)
Concatenates the values to the end of the Flux.
values
- The values to concatenate
Flux concatenating all source sequences
public static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
T
- The type of values in both source and output sequences
sources
- The Publisher of Publisher to concatenate
Flux concatenating all inner sources sequences
public static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources, int prefetch)
Publisher
,
forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
T
- The type of values in both source and output sequences
sources
- The Publisher of Publisher to concatenate
prefetch
- the inner source request size
Flux concatenating all inner sources sequences
@SafeVarargs public static <T> Flux<T> concat(Publisher<? extends T>... sources)
Concatenate all sources provided as a vararg, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
T
- The type of values in both source and output sequencessources
- The Publisher
of Publisher
to concatenateFlux
concatenating all inner sources sequences, delaying errorspublic static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, int prefetch)
Publisher
,
forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
T
- The type of values in both source and output sequencessources
- The Publisher
of Publisher
to concatenateprefetch
- the inner source request sizeFlux
concatenating all inner sources sequences until complete or errorpublic static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, boolean delayUntilEnd, int prefetch)
Publisher
,
forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes.
Errors do not interrupt the main sequence but are propagated after the current concat backlog if delayUntilEnd is false, or after all sources have had a chance to be concatenated if delayUntilEnd is true.
T
- The type of values in both source and output sequences
sources
- The Publisher of Publisher to concatenate
delayUntilEnd
- delay error until all sources have been consumed instead of after the current source
prefetch
- the inner source request size
Flux concatenating all inner sources sequences until complete or error
@SafeVarargs public static <T> Flux<T> concatDelayError(Publisher<? extends T>... sources)
Concatenate all sources provided as a vararg, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
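To make the delayed-error behaviour concrete, here is a minimal sketch in which the failing source sits between two healthy ones. It assumes reactor-core is on the classpath, and the class and method names are invented for the example. The values of the later source are still delivered, and the error is only signalled at the end.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class ConcatDelayErrorSketch {

    // Records every signal seen by the subscriber, in arrival order.
    static List<String> signals() {
        List<String> seen = new ArrayList<>();

        Flux.concatDelayError(
                    Flux.just("a"),
                    Flux.<String>error(new IllegalStateException("boom")),
                    Flux.just("b"))
            .subscribe(
                    seen::add,
                    error -> seen.add("error: " + error));

        // All sources are synchronous, so the list is fully populated here.
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(signals());
    }
}
```

With plain concat, the same sources would stop at the error and "b" would never be emitted.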
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
Programmatically create a Flux with the capability of emitting multiple elements in a synchronous or asynchronous manner through the FluxSink API. This includes emitting elements from multiple threads.
This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
Flux.<String>create(emitter -> {
    ActionListener al = e -> {
        emitter.next(textField.getText());
    };
    // without cleanup support, registering the listener is all that is needed:
    // button.addActionListener(al);

    // with cleanup support, also unregister the listener when the sink is disposed:
    button.addActionListener(al);
    emitter.onDispose(() -> {
        button.removeActionListener(al);
    });
});
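The snippet above depends on Swing components, so a self-contained variant may be easier to experiment with. The sketch below assumes reactor-core is on the classpath and replaces the button with a hypothetical EventSource class written only for this example. It shows the listener being registered on subscribe and removed again through onDispose when the subscriber cancels.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class CreateSketch {

    // Hypothetical callback-based API standing in for the button and its listeners.
    static class EventSource {
        private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

        void register(Consumer<String> listener) { listeners.add(listener); }
        void unregister(Consumer<String> listener) { listeners.remove(listener); }
        void fire(String event) { listeners.forEach(l -> l.accept(event)); }
        int listenerCount() { return listeners.size(); }
    }

    // Bridges the callback API to a Flux, with cleanup on cancellation or termination.
    static Flux<String> events(EventSource source) {
        return Flux.<String>create(sink -> {
            Consumer<String> listener = sink::next;
            source.register(listener);
            sink.onDispose(() -> source.unregister(listener));
        });
    }

    // Fires two events while subscribed and one after cancelling.
    static List<String> demo(EventSource source) {
        List<String> received = new ArrayList<>();
        Disposable subscription = events(source).subscribe(received::add);
        source.fire("a");
        source.fire("b");
        subscription.dispose(); // triggers the onDispose callback
        source.fire("c");       // no listener is left to receive this one
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo(new EventSource()));
    }
}
```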
T
- The type of values in the sequence
emitter
- Consume the FluxSink provided per-subscriber by Reactor to generate signals.
Flux
push(Consumer)
The FluxSink exposed by this operator buffers in case of overflow. The buffer is discarded when the main sequence is cancelled.
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
Programmatically create a Flux with the capability of emitting multiple elements in a synchronous or asynchronous manner through the FluxSink API. This includes emitting elements from multiple threads.
This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
Flux.<String>create(emitter -> {
    ActionListener al = e -> {
        emitter.next(textField.getText());
    };
    // without cleanup support, registering the listener is all that is needed:
    // button.addActionListener(al);

    // with cleanup support, also unregister the listener when the sink is disposed:
    button.addActionListener(al);
    emitter.onDispose(() -> {
        button.removeActionListener(al);
    });
}, FluxSink.OverflowStrategy.LATEST);
T
- The type of values in the sequence
backpressure
- the backpressure mode, see FluxSink.OverflowStrategy for the available backpressure modes
emitter
- Consume the FluxSink provided per-subscriber by Reactor to generate signals.
Flux
push(Consumer, FluxSink.OverflowStrategy)
The FluxSink exposed by this operator discards elements as relevant to the chosen FluxSink.OverflowStrategy. For example, FluxSink.OverflowStrategy.DROP discards each item as it is dropped, while FluxSink.OverflowStrategy.BUFFER discards the buffer upon cancellation.
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter)
Programmatically create a Flux with the capability of emitting multiple elements from a single-threaded producer through the FluxSink API. For a multi-threaded capable alternative, see create(Consumer).
This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
Flux.<String>push(emitter -> {
    ActionListener al = e -> {
        emitter.next(textField.getText());
    };
    // without cleanup support, registering the listener is all that is needed:
    // button.addActionListener(al);

    // with cleanup support, also unregister the listener when the sink is disposed:
    button.addActionListener(al);
    emitter.onDispose(() -> {
        button.removeActionListener(al);
    });
});
T
- The type of values in the sequence
emitter
- Consume the FluxSink provided per-subscriber by Reactor to generate signals.
Flux
create(Consumer)
The FluxSink exposed by this operator buffers in case of overflow. The buffer is discarded when the main sequence is cancelled.
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
Programmatically create a Flux with the capability of emitting multiple elements from a single-threaded producer through the FluxSink API. For a multi-threaded capable alternative, see create(Consumer, FluxSink.OverflowStrategy).
This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
Flux.<String>push(emitter -> {
    ActionListener al = e -> {
        emitter.next(textField.getText());
    };
    // without cleanup support, registering the listener is all that is needed:
    // button.addActionListener(al);

    // with cleanup support, also unregister the listener when the sink is disposed:
    button.addActionListener(al);
    emitter.onDispose(() -> {
        button.removeActionListener(al);
    });
}, FluxSink.OverflowStrategy.LATEST);
T
- The type of values in the sequence
backpressure
- the backpressure mode, see FluxSink.OverflowStrategy for the available backpressure modes
emitter
- Consume the FluxSink provided per-subscriber by Reactor to generate signals.
Flux
create(Consumer, FluxSink.OverflowStrategy)
The FluxSink exposed by this operator discards elements as relevant to the chosen FluxSink.OverflowStrategy. For example, FluxSink.OverflowStrategy.DROP discards each item as it is dropped, while FluxSink.OverflowStrategy.BUFFER discards the buffer upon cancellation.
public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier)
Lazily supply a Publisher every time a Subscription is made on the resulting Flux, so the actual source instantiation is deferred until each subscribe and the Supplier can create a subscriber-specific instance. If the supplier doesn't generate a new instance, however, this operator will effectively behave like from(Publisher).
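A small sketch of the per-subscription behaviour, assuming reactor-core is on the classpath (the class and method names are illustrative). The Supplier runs once per subscription, so each subscriber observes a freshly built source.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class DeferSketch {

    // Subscribes twice to the same deferred Flux and returns what each subscription saw.
    static List<Integer> subscribeTwice() {
        AtomicInteger counter = new AtomicInteger();

        // The lambda is not invoked here, only when someone subscribes.
        Flux<Integer> deferred = Flux.defer(() -> Flux.just(counter.incrementAndGet()));

        Integer first = deferred.blockFirst();
        Integer second = deferred.blockFirst();
        return Arrays.asList(first, second);
    }

    public static void main(String[] args) {
        System.out.println(subscribeTwice());
    }
}
```

Had the counter been read eagerly, as in Flux.just(counter.incrementAndGet()) without defer, both subscriptions would have observed the same value.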
public static <T> Flux<T> empty()
Create a Flux that completes without emitting any item.
T
- the reified type of the target Subscriber
Flux
public static <T> Flux<T> error(Throwable error)
Create a Flux that terminates with the specified error immediately after being subscribed to.
T
- the reified type of the target Subscriber
error
- the error to signal to each Subscriber
Flux
public static <T> Flux<T> error(Supplier<Throwable> errorSupplier)
Flux
that terminates with an error immediately after being
subscribed to. The Throwable
is generated by a Supplier
, invoked
each time there is a subscription and allowing for lazy instantiation.
T
- the reified type of the target Subscriber
errorSupplier
- the error signal Supplier
to invoke for each Subscriber
Flux
public static <O> Flux<O> error(Throwable throwable, boolean whenRequested)
Flux
that terminates with the specified error, either immediately
after being subscribed to or after being first requested.
O
- the reified type of the target Subscriber
throwable
- the error to signal to each Subscriber
whenRequested
- if true, will onError on the first request instead of subscribe().Flux
@SafeVarargs public static <I> Flux<I> first(Publisher<? extends I>... sources)
Pick the first Publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.
I
- The type of values in both source and output sequencessources
- The competing source publishersFlux
behaving like the fastest of its sourcespublic static <I> Flux<I> first(Iterable<? extends Publisher<? extends I>> sources)
Publisher
to emit any signal (onNext/onError/onComplete) and
replay all signals from that Publisher
, effectively behaving like the
fastest of these competing sources.
I
- The type of values in both source and output sequences
sources
- The competing source publishers
Flux behaving like the fastest of its sources
public static <T> Flux<T> from(Publisher<? extends T> source)
Decorate the specified Publisher with the Flux API.
T
- The type of values in both source and output sequencessource
- the source to decorateFlux
public static <T> Flux<T> fromArray(T[] array)
Create a Flux that emits the items contained in the provided array.
T
- The type of values in the source array and resulting Fluxarray
- the array to read data fromFlux
public static <T> Flux<T> fromStream(Stream<? extends T> s)
Create a Flux that emits the items contained in the provided Stream.
Keep in mind that a Stream cannot be re-used, which can be problematic in case of multiple subscriptions or re-subscription (like with repeat() or retry()). The Stream is closed automatically by the operator on cancellation, error or completion.
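The re-use caveat can be observed with the sketch below, which assumes reactor-core is on the classpath and uses invented class and method names. It also shows one possible workaround, offered as a suggestion rather than something this page prescribes: wrapping the call in defer so that every subscription gets its own Stream.

```java
import java.util.stream.Stream;
import reactor.core.publisher.Flux;

public class FromStreamSketch {

    // Backed by a single Stream instance: usable by one subscription only.
    static Flux<Integer> oneShot() {
        return Flux.fromStream(Stream.of(1, 2, 3));
    }

    // Builds a new Stream for every subscription.
    static Flux<Integer> resubscribable() {
        return Flux.defer(() -> Flux.fromStream(Stream.of(1, 2, 3)));
    }

    // Subscribes twice and reports whether the second subscription ended in an error.
    static boolean secondSubscriptionFails(Flux<Integer> flux) {
        flux.collectList().block(); // first subscription consumes the Stream
        try {
            flux.collectList().block();
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondSubscriptionFails(oneShot()));
        System.out.println(secondSubscriptionFails(resubscribable()));
    }
}
```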
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
Programmatically create a Flux by generating signals one-by-one via a consumer callback.
T
- the value type emittedgenerator
- Consume the SynchronousSink
provided per-subscriber by Reactor
to generate a single signal on each pass.Flux
public static <T,S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator)
Flux
by generating signals one-by-one via a
consumer callback and some state. The stateSupplier
may return null.
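A minimal sketch of the stateful variant, assuming reactor-core is on the classpath (the class and method names are illustrative). The state is a simple counter: each pass emits at most one value, optionally completes, and returns the state for the next pass.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class GenerateSketch {

    // Emits the squares of 0 through 4, one signal per generator pass.
    static List<Integer> squares() {
        return Flux.<Integer, Integer>generate(
                    () -> 0,                      // initial state, created per subscriber
                    (state, sink) -> {
                        sink.next(state * state); // at most one onNext per pass
                        if (state == 4) {
                            sink.complete();
                        }
                        return state + 1;         // state handed to the next pass
                    })
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(squares());
    }
}
```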
T
- the value type emittedS
- the per-subscriber custom state typestateSupplier
- called for each incoming Subscriber to provide the initial state for the generator bifunctiongenerator
- Consume the SynchronousSink
provided per-subscriber by Reactor
as well as the current state to generate a single signal on each pass
and return a (new) state.Flux
public static <T,S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator, Consumer<? super S> stateConsumer)
Flux
by generating signals one-by-one via a
consumer callback and some state, with a final cleanup callback. The
stateSupplier
may return null but your cleanup stateConsumer
will need to handle the null case.
T
- the value type emittedS
- the per-subscriber custom state typestateSupplier
- called for each incoming Subscriber to provide the initial state for the generator bifunctiongenerator
- Consume the SynchronousSink
provided per-subscriber by Reactor
as well as the current state to generate a single signal on each pass
and return a (new) state.stateConsumer
- called after the generator has terminated or the downstream cancelled, receiving the last
state to be handled (i.e., release resources or do other cleanup).Flux
public static Flux<Long> interval(Duration period)
Create a Flux that emits long values starting with 0 and incrementing at specified time intervals on the global timer. If demand is not produced in time, an onError will be signalled with an overflow IllegalStateException detailing the tick that couldn't be emitted. In normal conditions, the Flux will never complete.
Runs on the Schedulers.parallel() Scheduler.
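Since the sequence never completes on its own, a bounded example needs an operator such as take. The sketch below assumes reactor-core is on the classpath; the class name and the 10 ms period are arbitrary choices for illustration.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class IntervalSketch {

    // Collects the first three ticks, then cancels the interval.
    static List<Long> firstTicks() {
        return Flux.interval(Duration.ofMillis(10))
                .take(3)
                .collectList()
                .block(Duration.ofSeconds(5)); // safety timeout for the blocking call
    }

    public static void main(String[] args) {
        System.out.println(firstTicks());
    }
}
```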
public static Flux<Long> interval(Duration delay, Duration period)
Flux
that emits long values starting with 0 and incrementing at
specified time intervals, after an initial delay, on the global timer. If demand is
not produced in time, an onError will be signalled with an
overflow
IllegalStateException
detailing the tick that couldn't be emitted. In normal conditions, the Flux
will never complete.
Runs on the Schedulers.parallel()
Scheduler.
public static Flux<Long> interval(Duration period, Scheduler timer)
Flux
that emits long values starting with 0 and incrementing at
specified time intervals, on the specified Scheduler
. If demand is not
produced in time, an onError will be signalled with an overflow
IllegalStateException
detailing the tick that couldn't be emitted.
In normal conditions, the Flux
will never complete.
public static Flux<Long> interval(Duration delay, Duration period, Scheduler timer)
Flux
that emits long values starting with 0 and incrementing at
specified time intervals, after an initial delay, on the specified Scheduler
.
If demand is not produced in time, an onError will be signalled with an
overflow
IllegalStateException
detailing the tick that couldn't be emitted. In normal conditions, the Flux
will never complete.
@SafeVarargs public static <T> Flux<T> just(T... data)
Flux
that emits the provided elements and then completes.
T
- the emitted data typedata
- the elements to emit, as a varargFlux
public static <T> Flux<T> just(T data)
Flux
that will only emit a single element then onComplete.
T
- the emitted data typedata
- the single element to emitFlux
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source)
Merge data from Publisher sequences emitted by the passed Publisher into an interleaved merged sequence. Unlike concat, inner sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
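The following sketch merges two asynchronous inner sources, assuming reactor-core is on the classpath (the names and delays are illustrative). Both are subscribed to eagerly, so their elements interleave according to timing. The exact order is therefore not guaranteed, and the example makes no claim about it.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class MergeSketch {

    // Merges two delayed sources delivered through an outer Flux of publishers.
    static List<Integer> merged() {
        Flux<Integer> odd = Flux.just(1, 3).delayElements(Duration.ofMillis(20));
        Flux<Integer> even = Flux.just(2, 4).delayElements(Duration.ofMillis(30));

        return Flux.merge(Flux.just(odd, even))
                .collectList()
                .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println(merged());
    }
}
```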
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency)
Publisher
sequences emitted by the passed Publisher
into an interleaved merged sequence. Unlike concat
, inner
sources are subscribed to eagerly (but at most concurrency
sources are
subscribed to at the same time).
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency, int prefetch)
Publisher
sequences emitted by the passed Publisher
into an interleaved merged sequence. Unlike concat
, inner
sources are subscribed to eagerly (but at most concurrency
sources are
subscribed to at the same time).
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
public static <I> Flux<I> merge(Iterable<? extends Publisher<? extends I>> sources)
Publisher
sequences contained in an Iterable
into an interleaved merged sequence. Unlike concat
, inner
sources are subscribed to eagerly.
A new Iterator
will be created for each subscriber.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <I> Flux<I> merge(Publisher<? extends I>... sources)
Publisher
sequences contained in an array / vararg
into an interleaved merged sequence. Unlike concat
,
sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <I> Flux<I> merge(int prefetch, Publisher<? extends I>... sources)
Publisher
sequences contained in an array / vararg
into an interleaved merged sequence. Unlike concat
,
sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <I> Flux<I> mergeDelayError(int prefetch, Publisher<? extends I>... sources)
Publisher
sequences contained in an array / vararg
into an interleaved merged sequence. Unlike concat
,
sources are subscribed to eagerly.
This variant will delay any error until after the rest of the merge backlog has been processed.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <I extends Comparable<? super I>> Flux<I> mergeOrdered(Publisher<? extends I>... sources)
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by their natural order). This is not a sort(), as it doesn't consider the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
I
- a Comparable merged type that has a natural order
sources
- Publisher sources of Comparable to merge
Flux that compares the latest values from each source, using the smallest value and replenishing the source that produced it
@SafeVarargs public static <T> Flux<T> mergeOrdered(Comparator<? super T> comparator, Publisher<? extends T>... sources)
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a sort(Comparator), as it doesn't consider the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
T
- the merged type
comparator
- the Comparator to use to find the smallest value
sources
- Publisher sources to merge
Flux that compares the latest values from each source, using the smallest value and replenishing the source that produced it
@SafeVarargs public static <T> Flux<T> mergeOrdered(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a sort(Comparator), as it doesn't consider the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
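T
- the merged type
prefetch
- the number of elements to prefetch from each source (avoiding too many small requests to the source when picking)
comparator
- the Comparator to use to find the smallest value
sources
- Publisher sources to merge
Flux that compares the latest values from each source, using the smallest value and replenishing the source that produced it
public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources)
Merge data from Publisher sequences emitted by the passed Publisher into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.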
public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)
Merge data from Publisher sequences emitted by the passed Publisher into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to eagerly (but at most maxConcurrency sources at a time). Unlike merge, their emitted values are merged into the final sequence in subscription order.
T
- the merged typesources
- a Publisher
of Publisher
sources to mergeprefetch
- the inner source request sizemaxConcurrency
- the request produced to the main source thus limiting concurrent merge backlogFlux
, subscribing early but keeping the original orderingpublic static <T> Flux<T> mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)
Publisher
sequences emitted by the passed Publisher
into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to
eagerly (but at most maxConcurrency
sources at a time). Unlike merge, their
emitted values are merged into the final sequence in subscription order.
This variant will delay any error until after the rest of the mergeSequential backlog has been processed.
T
- the merged typesources
- a Publisher
of Publisher
sources to mergeprefetch
- the inner source request sizemaxConcurrency
- the request produced to the main source thus limiting concurrent merge backlogFlux
, subscribing early but keeping the original ordering@SafeVarargs public static <I> Flux<I> mergeSequential(Publisher<? extends I>... sources)
Merge data from Publisher sequences provided in an array/vararg into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
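The difference from merge can be sketched with one slow and one fast source, assuming reactor-core is on the classpath (the names and the delay are illustrative). Both sources are subscribed to right away, yet the fast source's values are held back until the slow one, which was subscribed to first, has completed.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class MergeSequentialSketch {

    // The output keeps subscription order even though "fast" is ready first.
    static List<String> ordered() {
        Flux<String> slow = Flux.just("slow-1", "slow-2").delayElements(Duration.ofMillis(50));
        Flux<String> fast = Flux.just("fast-1", "fast-2");

        return Flux.mergeSequential(slow, fast)
                .collectList()
                .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println(ordered());
    }
}
```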
@SafeVarargs public static <I> Flux<I> mergeSequential(int prefetch, Publisher<? extends I>... sources)
Publisher
sequences provided in an array/vararg
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
@SafeVarargs public static <I> Flux<I> mergeSequentialDelayError(int prefetch, Publisher<? extends I>... sources)
Publisher
sequences provided in an array/vararg
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
This variant will delay any error until after the rest of the mergeSequential backlog
has been processed.
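public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources)
Merge data from Publisher sequences provided in an Iterable into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.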
public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch)
Publisher
sequences provided in an Iterable
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly (but at most maxConcurrency
sources at a time). Unlike merge, their
emitted values are merged into the final sequence in subscription order.
I
- the merged typesources
- an Iterable
of Publisher
sequences to mergemaxConcurrency
- the request produced to the main source thus limiting concurrent merge backlogprefetch
- the inner source request sizeFlux
, subscribing early but keeping the original orderingpublic static <I> Flux<I> mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch)
Publisher
sequences provided in an Iterable
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly (but at most maxConcurrency
sources at a time). Unlike merge, their
emitted values are merged into the final sequence in subscription order.
This variant will delay any error until after the rest of the mergeSequential backlog
has been processed.
I
- the merged typesources
- an Iterable
of Publisher
sequences to mergemaxConcurrency
- the request produced to the main source thus limiting concurrent merge backlogprefetch
- the inner source request sizeFlux
, subscribing early but keeping the original orderingpublic static <T> Flux<T> never()
Flux
that will never signal any data, error or completion signal.
T
- the Subscriber
type targetFlux
public static Flux<Integer> range(int start, int count)
Build a Flux that will only emit a sequence of count incrementing integers, starting from start. That is, emit integers between start (included) and start + count (excluded) then complete.
start
- the first integer to be emitted
count
- the total number of incrementing values to emit, including the first value
Flux
public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers)
Creates a Flux that mirrors the most recently emitted Publisher, forwarding its data until a new Publisher arrives from the source.
The resulting Flux will complete once there are no new Publishers in the source (source has completed) and the last mirrored Publisher has also completed.
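To illustrate the switching behaviour without relying on timing, the sketch below uses a first inner source that emits one value and then never completes. It assumes reactor-core is on the classpath, and the names are illustrative. When the second inner Publisher arrives, the first one is abandoned and only the newcomer is mirrored.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class SwitchOnNextSketch {

    // The first inner source emits 1 and then stays silent until it is replaced.
    static List<Integer> switched() {
        Flux<Integer> neverCompletes = Flux.concat(Flux.just(1), Flux.<Integer>never());
        Flux<Integer> replacement = Flux.just(2, 3);

        return Flux.switchOnNext(Flux.just(neverCompletes, replacement))
                .collectList()
                .block(Duration.ofSeconds(5)); // safety timeout for the blocking call
    }

    public static void main(String[] args) {
        System.out.println(switched());
    }
}
```

The result completes even though the first inner source never does, because only the outer source and the last mirrored Publisher need to complete.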
T
- the produced type
mergedPublishers
- The Publisher of Publisher to switch on and mirror.
Flux mirroring the most recently emitted inner Publisher
public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers, int prefetch)
Creates a Flux that mirrors the most recently emitted Publisher, forwarding its data until a new Publisher arrives from the source.
The resulting Flux will complete once there are no new Publishers in the source (source has completed) and the last mirrored Publisher has also completed.
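T
- the produced type
mergedPublishers
- The Publisher of Publisher to switch on and mirror.
prefetch
- the inner source request size
Flux mirroring the most recently emitted inner Publisher
public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource, and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
Eager resource cleanup happens just before the source termination, and exceptions raised by the cleanup Consumer may override the terminal event.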
For an asynchronous version of the cleanup, with distinct paths for onComplete, onError and cancel terminations, see usingWhen(Publisher, Function, Function, Function, Function).
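The resource lifecycle can be traced with a small sketch, assuming reactor-core is on the classpath. The class name and the String "resource" are placeholders for something like a file handle or connection. The recorded events show the cleanup callback running before the completion signal reaches the subscriber, which matches the eager cleanup described above.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class UsingSketch {

    // Records the order of resource creation, emitted values, cleanup and completion.
    static List<String> lifecycle() {
        List<String> events = new ArrayList<>();

        Flux.<String, String>using(
                    () -> { events.add("open"); return "resource"; },        // called on subscribe
                    resource -> Flux.just(resource + "-1", resource + "-2"), // derive the source
                    resource -> events.add("close"))                         // cleanup callback
            .subscribe(
                    events::add,
                    error -> events.add("error"),
                    () -> events.add("complete"));

        // Everything is synchronous here, so the list is fully populated.
        return events;
    }

    public static void main(String[] args) {
        System.out.println(lifecycle());
    }
}
```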
T
- emitted type
D
- resource type
resourceSupplier
- a Callable that is called on subscribe to generate the resource
sourceSupplier
- a factory to derive a Publisher from the supplied resource
resourceCleanup
- a resource cleanup callback invoked on completion
Flux built around a disposable resource
usingWhen(Publisher, Function, Function, Function, Function), usingWhen(Publisher, Function, Function)
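public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource, and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
For an asynchronous version of the cleanup, with distinct paths for onComplete, onError and cancel terminations, see usingWhen(Publisher, Function, Function, Function, Function).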
T
- emitted typeD
- resource typeresourceSupplier
- a Callable
that is called on subscribe to generate the resourcesourceSupplier
- a factory to derive a Publisher
from the supplied resourceresourceCleanup
- a resource cleanup callback invoked on completioneager
- true to clean before terminating downstream subscribersFlux
built around a disposable resourceusingWhen(Publisher, Function, Function, Function, Function)
,
usingWhen(Publisher, Function, Function)
public static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> resourceClosure, Function<? super D,? extends Publisher<?>> asyncComplete, Function<? super D,? extends Publisher<?>> asyncError)
Uses a resource, generated by a Publisher for each individual Subscriber, while streaming the values from a Publisher derived from the same resource.
Whenever the resulting sequence terminates, the relevant Function
generates
a "cleanup" Publisher
that is invoked but doesn't change the content of the
main sequence. Instead it just defers the termination (unless it errors, in which case
the error suppresses the original termination signal).
Note that if the resource supplying Publisher
emits more than one resource, the
subsequent resources are dropped (Operators.onNextDropped(Object, Context)
). If
the publisher errors AFTER having emitted one resource, the error is also silently dropped
(Operators.onErrorDropped(Throwable, Context)
).
An empty completion or error without at least one onNext signal triggers a short-circuit
of the main sequence with the same terminal signal (no resource is established, no
cleanup is invoked).
T
- the type of elements emitted by the resource closure, and thus the main sequenceD
- the type of the resource objectresourceSupplier
- a Publisher
that "generates" the resource,
subscribed for each subscription to the main sequenceresourceClosure
- a factory to derive a Publisher
from the supplied resourceasyncComplete
- an asynchronous resource cleanup invoked if the resource closure terminates with onComplete or is cancelledasyncError
- an asynchronous resource cleanup invoked if the resource closure terminates with onErrorFlux
built around a "transactional" resource, with several
termination path triggering asynchronous cleanup sequencesusingWhen(Publisher, Function, Function, Function, Function)
public static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> resourceClosure, Function<? super D,? extends Publisher<?>> asyncComplete, Function<? super D,? extends Publisher<?>> asyncError, Function<? super D,? extends Publisher<?>> asyncCancel)
Uses a resource, generated by a Publisher for each individual Subscriber, while streaming the values from a Publisher derived from the same resource. Whenever the resulting sequence terminates, the relevant Function generates a "cleanup" Publisher that is invoked but doesn't change the content of the main sequence. Instead it just defers the termination (unless it errors, in which case the error suppresses the original termination signal).
Note that if the resource supplying Publisher emits more than one resource, the subsequent resources are dropped (Operators.onNextDropped(Object, Context)). If the publisher errors AFTER having emitted one resource, the error is also silently dropped (Operators.onErrorDropped(Throwable, Context)).
An empty completion or error without at least one onNext signal triggers a short-circuit of the main sequence with the same terminal signal (no resource is established, no cleanup is invoked).
Type Parameters:
T - the type of elements emitted by the resource closure, and thus the main sequence
D - the type of the resource object
Parameters:
resourceSupplier - a Publisher that "generates" the resource, subscribed for each subscription to the main sequence
resourceClosure - a factory to derive a Publisher from the supplied resource
asyncComplete - an asynchronous resource cleanup invoked if the resource closure terminates with onComplete
asyncError - an asynchronous resource cleanup invoked if the resource closure terminates with onError
asyncCancel - an asynchronous resource cleanup invoked if the resource closure is cancelled. When null, the asyncComplete path is used instead.
Returns:
a Flux built around a "transactional" resource, with several termination paths triggering asynchronous cleanup sequences
See Also:
usingWhen(Publisher, Function, Function, Function)
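The choice of cleanup path described for the three termination kinds can be sketched in plain Java. This is only a model of the documented rule, not Reactor code: the class `CleanupSelectionSketch`, the `Termination` enum and the `selectCleanup` helper are hypothetical names introduced for illustration.

```java
import java.util.function.Function;

// Hypothetical model of how a cleanup function is chosen per termination kind.
final class CleanupSelectionSketch {

    enum Termination { COMPLETE, ERROR, CANCEL }

    // Picks the cleanup factory for a termination kind. A null asyncCancel
    // falls back to asyncComplete, as the asyncCancel parameter notes say.
    static <D, R> Function<D, R> selectCleanup(Termination termination,
                                               Function<D, R> asyncComplete,
                                               Function<D, R> asyncError,
                                               Function<D, R> asyncCancel) {
        switch (termination) {
            case ERROR:
                return asyncError;
            case CANCEL:
                return asyncCancel != null ? asyncCancel : asyncComplete;
            default:
                return asyncComplete;
        }
    }

    public static void main(String[] args) {
        Function<String, String> commit = tx -> "commit " + tx;
        Function<String, String> rollback = tx -> "rollback " + tx;

        // No dedicated cancel handler, so cancellation reuses the complete path.
        System.out.println(selectCleanup(Termination.CANCEL, commit, rollback, null).apply("tx-1"));
        // prints "commit tx-1"
        System.out.println(selectCleanup(Termination.ERROR, commit, rollback, null).apply("tx-1"));
        // prints "rollback tx-1"
    }
}
```

In the real operator each function returns a Publisher that is subscribed to before the termination is propagated; the strings here only make the selected path visible.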
public static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> resourceClosure, Function<? super D,? extends Publisher<?>> asyncCleanup)
Uses a resource, generated by a Publisher for each individual Subscriber, while streaming the values from a Publisher derived from the same resource. Whenever the resulting sequence terminates, a provided Function generates a "cleanup" Publisher that is invoked but doesn't change the content of the main sequence. Instead it just defers the termination (unless it errors, in which case the error suppresses the original termination signal).
Note that if the resource supplying Publisher emits more than one resource, the subsequent resources are dropped (Operators.onNextDropped(Object, Context)). If the publisher errors AFTER having emitted one resource, the error is also silently dropped (Operators.onErrorDropped(Throwable, Context)).
An empty completion or error without at least one onNext signal triggers a short-circuit of the main sequence with the same terminal signal (no resource is established, no cleanup is invoked).
Type Parameters:
T - the type of elements emitted by the resource closure, and thus the main sequence
D - the type of the resource object
Parameters:
resourceSupplier - a Publisher that "generates" the resource, subscribed for each subscription to the main sequence
resourceClosure - a factory to derive a Publisher from the supplied resource
asyncCleanup - an asynchronous resource cleanup invoked when the resource closure terminates (with onComplete, onError or cancel)
Returns:
a Flux built around a "transactional" resource, with asynchronous cleanup on all terminations (onComplete, onError, cancel)
See Also:
usingWhen(Publisher, Function, Function, Function, Function)
public static <T1,T2,O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends O> combinator)
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
O - The produced output after transformation by the combinator
Parameters:
source1 - The first Publisher source to zip.
source2 - The second Publisher source to zip.
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Returns:
a zipped Flux
public static <T1,T2> Flux<Tuple2<T1,T2>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2)
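Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2.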
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
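The pairing rule above (combine one element from each source, stop as soon as any source completes) can be sketched in plain Java over finite lists. This is a model of the documented behavior rather than Reactor code: `ZipSketch` and its `zip` helper are hypothetical names, and lists stand in for publishers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical model of the zip pairing rule; lists stand in for publishers.
final class ZipSketch {

    // Combines elements index by index and stops at the shorter input,
    // mirroring "until any of the sources completes".
    static <A, B, O> List<O> zip(List<A> source1, List<B> source2,
                                 BiFunction<? super A, ? super B, ? extends O> combinator) {
        int pairs = Math.min(source1.size(), source2.size());
        List<O> out = new ArrayList<>(pairs);
        for (int i = 0; i < pairs; i++) {
            out.add(combinator.apply(source1.get(i), source2.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        // The third number has no partner, so it is never combined.
        System.out.println(zip(List.of(1, 2, 3), List.of("a", "b"), (n, s) -> n + s));
        // prints [1a, 2b]
    }
}
```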
public static <T1,T2,T3> Flux<Tuple3<T1,T2,T3>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3)
Zip three sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple3.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
Returns:
a zipped Flux
public static <T1,T2,T3,T4> Flux<Tuple4<T1,T2,T3,T4>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4)
Zip four sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple4.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
Returns:
a zipped Flux
public static <T1,T2,T3,T4,T5> Flux<Tuple5<T1,T2,T3,T4,T5>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5)
Zip five sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple5.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
Returns:
a zipped Flux
public static <T1,T2,T3,T4,T5,T6> Flux<Tuple6<T1,T2,T3,T4,T5,T6>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6)
Zip six sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple6.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
source6 - The sixth upstream Publisher to subscribe to.
Returns:
a zipped Flux
public static <T1,T2,T3,T4,T5,T6,T7> Flux<Tuple7<T1,T2,T3,T4,T5,T6,T7>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7)
Zip seven sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple7.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
T7 - type of the value from source7
Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
source6 - The sixth upstream Publisher to subscribe to.
source7 - The seventh upstream Publisher to subscribe to.
Returns:
a zipped Flux
public static <T1,T2,T3,T4,T5,T6,T7,T8> Flux<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7, Publisher<? extends T8> source8)
Zip eight sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple8.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
T7 - type of the value from source7
T8 - type of the value from source8
Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
source6 - The sixth upstream Publisher to subscribe to.
source7 - The seventh upstream Publisher to subscribe to.
source8 - The eighth upstream Publisher to subscribe to.
Returns:
a zipped Flux
public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources, Function<? super Object[],? extends O> combinator)
The Iterable.iterator() will be called on each Publisher.subscribe(Subscriber).
public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources, int prefetch, Function<? super Object[],? extends O> combinator)
The Iterable.iterator() will be called on each Publisher.subscribe(Subscriber).
Type Parameters:
O - the combined produced type
Parameters:
sources - the Iterable providing sources to zip
prefetch - the inner source request size
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Returns:
a zipped Flux
@SafeVarargs public static <I,O> Flux<O> zip(Function<? super Object[],? extends O> combinator, Publisher<? extends I>... sources)
Type Parameters:
I - the type of the input sources
O - the combined produced type
Parameters:
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
sources - the array providing sources to zip
Returns:
a zipped Flux
@SafeVarargs public static <I,O> Flux<O> zip(Function<? super Object[],? extends O> combinator, int prefetch, Publisher<? extends I>... sources)
Type Parameters:
I - the type of the input sources
O - the combined produced type
Parameters:
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
prefetch - individual source request size
sources - the array providing sources to zip
Returns:
a zipped Flux
public static <TUPLE extends Tuple2,V> Flux<V> zip(Publisher<? extends Publisher<?>> sources, Function<? super TUPLE,? extends V> combinator)
Note that the Publisher sources from the outer Publisher will accumulate into an exhaustive list before the zip operation starts.
Type Parameters:
TUPLE - the raw tuple type
V - The produced output after transformation by the given combinator
Parameters:
sources - The Publisher of Publisher sources to zip. A finite publisher is required.
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced value
public final Mono<Boolean> all(Predicate<? super T> predicate)
Emit a single boolean true if all values of this sequence match the Predicate.
The implementation uses short-circuit logic and completes with false if the predicate doesn't match a value.
public final Mono<Boolean> any(Predicate<? super T> predicate)
Emit a single boolean true if any of the values of this Flux sequence match the predicate.
The implementation uses short-circuit logic and completes with true as soon as a value matches the predicate.
public final <P> P as(Function<? super Flux<T>,P> transformer)
Transform this Flux into a target type.
flux.as(Mono::from).subscribe()
Type Parameters:
P - the returned instance type
Parameters:
transformer - the Function to immediately map this Flux into a target type instance.
Returns:
the Flux transformed to an instance of P
for a bounded conversion to Publisher
@Nullable public final T blockFirst()
Subscribe to this Flux and block indefinitely until the upstream signals its first value or completes. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).
Note that each blockFirst() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
@Nullable public final T blockFirst(Duration timeout)
Subscribe to this Flux and block until the upstream signals its first value, completes or a timeout expires. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). If the provided timeout expires, a RuntimeException is thrown.
Note that each blockFirst() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
Parameters:
timeout - maximum time period to wait for before raising a RuntimeException
@Nullable public final T blockLast()
Subscribe to this Flux and block indefinitely until the upstream signals its last value or completes. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).
Note that each blockLast() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
@Nullable public final T blockLast(Duration timeout)
Subscribe to this Flux and block until the upstream signals its last value, completes or a timeout expires. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). If the provided timeout expires, a RuntimeException is thrown.
Note that each blockLast() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
Parameters:
timeout - maximum time period to wait for before raising a RuntimeException
public final Flux<List<T>> buffer()
Collect all incoming values into a single List buffer that will be emitted by the returned Flux once this Flux completes.
Returns:
a Flux of at most one List
See Also:
collectList() for an alternative collecting algorithm returning Mono
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes.
Type Parameters:
C - the Collection buffer type
Parameters:
maxSize - the maximum collected size
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns:
a Flux of Collection
public final Flux<List<T>> buffer(int maxSize, int skip)
Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes. Buffers can be created with gaps, as a new buffer will be created every time skip values have been emitted by the source.
When maxSize < skip : dropping buffers
When maxSize > skip : overlapping buffers
When maxSize == skip : exact buffers
Parameters:
skip - the number of items to count before creating a new buffer
maxSize - the max collected size
Returns:
a Flux of possibly overlapped or gapped List
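The three maxSize/skip regimes are easier to see on concrete data. The sketch below models the documented rule in plain Java over a finite list; it is not Reactor's implementation, and `BufferSkipSketch` and its `buffer` helper are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of buffer(maxSize, skip); a list stands in for the source.
final class BufferSkipSketch {

    static <T> List<List<T>> buffer(List<T> source, int maxSize, int skip) {
        List<List<T>> emitted = new ArrayList<>();
        List<List<T>> open = new ArrayList<>(); // buffers still collecting, oldest first
        for (int i = 0; i < source.size(); i++) {
            if (i % skip == 0) {
                open.add(new ArrayList<>()); // a new buffer opens every `skip` elements
            }
            for (List<T> buffer : open) {
                buffer.add(source.get(i));
            }
            // Only the oldest open buffer can have reached maxSize.
            if (!open.isEmpty() && open.get(0).size() == maxSize) {
                emitted.add(open.remove(0));
            }
        }
        emitted.addAll(open); // completion flushes partially filled buffers
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println(buffer(source, 2, 3)); // dropping:    [[1, 2], [4, 5], [7]]
        System.out.println(buffer(source, 3, 2)); // overlapping: [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7]]
        System.out.println(buffer(source, 2, 2)); // exact:       [[1, 2], [3, 4], [5, 6], [7]]
    }
}
```

With maxSize < skip the elements 3 and 6 fall into a gap and appear in no buffer; with maxSize > skip the elements 3, 5 and 7 appear in two buffers each.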
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, int skip, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes. Buffers can be created with gaps, as a new buffer will be created every time skip values have been emitted by the source.
When maxSize < skip : dropping buffers
When maxSize > skip : overlapping buffers
When maxSize == skip : exact buffers
Type Parameters:
C - the Collection buffer type
Parameters:
skip - the number of items to count before creating a new buffer
maxSize - the max collected size
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns:
a Flux of possibly overlapped or gapped Collection
public final <C extends Collection<? super T>> Flux<C> buffer(Publisher<?> other, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers, as delimited by the signals of a companion Publisher this operator will subscribe to.
Type Parameters:
C - the Collection buffer type
Parameters:
other - the companion Publisher whose signals trigger new buffers
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns:
a Flux of Collection delimited by signals from a Publisher
public final Flux<List<T>> buffer(Duration bufferingTimespan, Duration openBufferEvery)
Collect incoming values into multiple List buffers created at a given openBufferEvery period. Each buffer will last until the bufferingTimespan has elapsed, thus emitting the bucket in the resulting Flux.
When bufferingTimespan < openBufferEvery : dropping buffers
When bufferingTimespan > openBufferEvery : overlapping buffers
When bufferingTimespan == openBufferEvery : exact buffers
Parameters:
bufferingTimespan - the duration from buffer creation until a buffer is closed and emitted
openBufferEvery - the interval at which to create a new buffer
Returns:
a Flux of List delimited by the given period openBufferEvery and sized by bufferingTimespan
public final Flux<List<T>> buffer(Duration bufferingTimespan, Scheduler timer)
Collect incoming values into multiple List buffers that will be emitted by the returned Flux every bufferingTimespan, as measured on the provided Scheduler.
Parameters:
bufferingTimespan - the duration from buffer creation until a buffer is closed and emitted
timer - a time-capable Scheduler instance to run on
Returns:
a Flux of List delimited by the given period
public final Flux<List<T>> buffer(Duration bufferingTimespan, Duration openBufferEvery, Scheduler timer)
Collect incoming values into multiple List buffers created at a given openBufferEvery period, as measured on the provided Scheduler. Each buffer will last until the bufferingTimespan has elapsed (also measured on the scheduler), thus emitting the bucket in the resulting Flux.
When bufferingTimespan < openBufferEvery : dropping buffers
When bufferingTimespan > openBufferEvery : overlapping buffers
When bufferingTimespan == openBufferEvery : exact buffers
Parameters:
bufferingTimespan - the duration from buffer creation until a buffer is closed and emitted
openBufferEvery - the interval at which to create a new buffer
timer - a time-capable Scheduler instance to run on
Returns:
a Flux of List delimited by the given period openBufferEvery and sized by bufferingTimespan
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime)
Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses.
Parameters:
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
Returns:
a Flux of List delimited by the given size or the given timeout period
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses.
Type Parameters:
C - the Collection buffer type
Parameters:
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns:
a Flux of Collection delimited by the given size or the given timeout period
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer)
Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler.
Parameters:
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
timer - a time-capable Scheduler instance to run on
Returns:
a Flux of List delimited by the given size or the given timeout period
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler.
Type Parameters:
C - the Collection buffer type
Parameters:
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
timer - a time-capable Scheduler instance to run on
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns:
a Flux of Collection delimited by the given size or the given timeout period
public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate)
Collect incoming values into multiple List buffers that will be emitted by the resulting Flux each time the given predicate returns true. Note that the element that triggers the predicate to return true (and thus closes a buffer) is included as last element in the emitted buffer.
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate, boolean cutBefore)
Collect incoming values into multiple List buffers that will be emitted by the resulting Flux each time the given predicate returns true. Note that the buffer into which the element that triggers the predicate to return true (and thus closes a buffer) is included depends on the cutBefore parameter: set it to true to include the boundary element in the newly opened buffer, false to include it in the closed buffer (as in bufferUntil(Predicate)).
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
Parameters:
predicate - a predicate that triggers the next buffer when it becomes true.
cutBefore - set to true to include the triggering element in the new buffer rather than the old.
Returns:
a Flux of List
public final Flux<List<T>> bufferWhile(Predicate<? super T> predicate)
Collect incoming values into multiple List buffers that will be emitted by the resulting Flux. Each buffer continues aggregating values while the given predicate returns true, and a new buffer is created as soon as the predicate returns false. Note that the element that triggers the predicate to return false (and thus closes a buffer) is NOT included in any emitted buffer.
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
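Where the boundary element ends up differs between bufferUntil(Predicate), bufferUntil(Predicate, boolean) with cutBefore set to true, and bufferWhile(Predicate). The plain-Java sketch below models those documented rules over a finite list. It is not Reactor code, `BufferBoundarySketch` and its helpers are hypothetical names, and it assumes that empty buffers are never emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of predicate-delimited buffering; a list stands in for the source.
final class BufferBoundarySketch {

    static <T> List<List<T>> bufferUntil(List<T> source, Predicate<? super T> predicate, boolean cutBefore) {
        List<List<T>> emitted = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T value : source) {
            boolean boundary = predicate.test(value);
            if (boundary && cutBefore) {
                // The boundary element opens the next buffer.
                if (!current.isEmpty()) {
                    emitted.add(current);
                }
                current = new ArrayList<>();
                current.add(value);
            } else {
                current.add(value);
                if (boundary) {
                    // The boundary element closes the current buffer.
                    emitted.add(current);
                    current = new ArrayList<>();
                }
            }
        }
        if (!current.isEmpty()) {
            emitted.add(current); // partial buffer emitted on completion
        }
        return emitted;
    }

    static <T> List<List<T>> bufferWhile(List<T> source, Predicate<? super T> predicate) {
        List<List<T>> emitted = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T value : source) {
            if (predicate.test(value)) {
                current.add(value);
            } else if (!current.isEmpty()) {
                // The non-matching element is discarded and closes the buffer.
                emitted.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println(bufferUntil(source, x -> x % 3 == 0, false)); // [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(bufferUntil(source, x -> x % 3 == 0, true));  // [[1, 2], [3, 4, 5], [6, 7]]
        System.out.println(bufferWhile(source, x -> x % 3 != 0));        // [[1, 2], [4, 5], [7]]
    }
}
```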
public final <U,V> Flux<List<T>> bufferWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector)
Collect incoming values into multiple List buffers started each time an opening companion Publisher emits. Each buffer will last until the corresponding closing companion Publisher emits, thus releasing the buffer to the resulting Flux.
When Open signal is strictly not overlapping Close signal : dropping buffers
When Open signal is strictly more frequent than Close signal : overlapping buffers
When Open signal is exactly coordinated with Close signal : exact buffers
Type Parameters:
U - the element type of the buffer-opening sequence
V - the element type of the buffer-closing sequence
Parameters:
bucketOpening - a companion Publisher to subscribe to for buffer creation signals.
closeSelector - a factory that, given a buffer opening signal, returns a companion Publisher to subscribe to for buffer closure and emission signals.
Returns:
a Flux of List delimited by an opening Publisher and a relative closing Publisher
public final <U,V,C extends Collection<? super T>> Flux<C> bufferWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers started each time an opening companion Publisher emits. Each buffer will last until the corresponding closing companion Publisher emits, thus releasing the buffer to the resulting Flux.
When Open signal is strictly not overlapping Close signal : dropping buffers
When Open signal is strictly more frequent than Close signal : overlapping buffers
When Open signal is exactly coordinated with Close signal : exact buffers
Type Parameters:
U - the element type of the buffer-opening sequence
V - the element type of the buffer-closing sequence
C - the Collection buffer type
Parameters:
bucketOpening - a companion Publisher to subscribe to for buffer creation signals.
closeSelector - a factory that, given a buffer opening signal, returns a companion Publisher to subscribe to for buffer closure and emission signals.
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns:
a Flux of Collection delimited by an opening Publisher and a relative closing Publisher
public final Flux<T> cache()
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded volume of onNext signals. Completion and Error will also be replayed.
Returns:
a replaying Flux
public final Flux<T> cache(int history)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size onNext signals. Completion and Error will also be replayed.
Note that cache(0) will only cache the terminal signal without expiration.
Parameters:
history - number of elements retained in cache
Returns:
a replaying Flux
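The effect of the history bound on what a late Subscriber is replayed can be modelled in plain Java. This is a sketch of the documented retention rule only, not of Reactor's replay machinery: `ReplayHistorySketch` and `replayable` are hypothetical names, and terminal signals are left out of the model.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a bounded replay cache; not Reactor's implementation.
final class ReplayHistorySketch {

    // Returns the onNext values a late subscriber would be replayed,
    // given everything emitted so far and the configured history size.
    static <T> List<T> replayable(List<T> emittedSoFar, int history) {
        Deque<T> cache = new ArrayDeque<>();
        for (T value : emittedSoFar) {
            if (history == 0) {
                continue; // history of 0: no values are retained
            }
            if (cache.size() == history) {
                cache.removeFirst(); // evict the oldest retained value
            }
            cache.addLast(value);
        }
        return new ArrayList<>(cache);
    }

    public static void main(String[] args) {
        System.out.println(replayable(List.of(1, 2, 3, 4, 5), 2)); // prints [4, 5]
        System.out.println(replayable(List.of(1, 2, 3), 0));       // prints []
    }
}
```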
public final Flux<T> cache(Duration ttl)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded history but apply a per-item expiry timeout.
Completion and Error will also be replayed until ttl triggers, in which case the next Subscriber will start over a new subscription.
Parameters:
ttl - Time-to-live for each cached item and post termination.
Returns:
a replaying Flux
public final Flux<T> cache(Duration ttl, Scheduler timer)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded history but apply a per-item expiry timeout.
Completion and Error will also be replayed until ttl triggers, in which case the next Subscriber will start over a new subscription.
public final Flux<T> cache(int history, Duration ttl)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size and apply a per-item expiry timeout.
Completion and Error will also be replayed until ttl triggers, in which case the next Subscriber will start over a new subscription.
Parameters:
history - number of elements retained in cache
ttl - Time-to-live for each cached item and post termination.
Returns:
a replaying Flux
public final Flux<T> cache(int history, Duration ttl, Scheduler timer)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size and apply a per-item expiry timeout.
Completion and Error will also be replayed until ttl triggers, in which case the next Subscriber will start over a new subscription.
public final <E> Flux<E> cast(Class<E> clazz)
Cast the current Flux produced type into a target produced type.
public final Flux<T> checkpoint()
Activate assembly tracing for this particular Flux, in case of an error upstream of the checkpoint. Tracing incurs the cost of an exception stack trace creation.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
Returns:
the assembly tracing Flux.
public final Flux<T> checkpoint(String description)
Activate assembly marker for this particular Flux by giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(), this doesn't create a filled stack trace, avoiding the main cost of the operator. However, as a trade-off the description must be unique enough for the user to find out where this Flux was assembled. If you only want a generic description, and still rely on the stack trace to find the assembly site, use the checkpoint(String, boolean) variant.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
Parameters:
description - a unique enough description to include in the light assembly traceback.
Returns:
the assembly marked Flux
public final Flux<T> checkpoint(@Nullable String description, boolean forceStackTrace)
Activate assembly tracing or the lighter assembly marking depending on the forceStackTrace option.
By setting the forceStackTrace parameter to true, activate assembly tracing for this particular Flux and give it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(String), this will incur the cost of an exception stack trace creation. The description could for example be a meaningful name for the assembled flux or a wider correlation ID, since the stack trace will always provide enough information to locate where this Flux was assembled.
By setting forceStackTrace to false, behaves like checkpoint(String) and is subject to the same caveat in choosing the description.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly marker.
Parameters:
description - a description (must be unique enough if forceStackTrace is set to false).
forceStackTrace - false to make a light checkpoint without a stacktrace, true to use a stack trace.
Returns:
the assembly marked Flux.
public final <E> Mono<E> collect(Supplier<E> containerSupplier, BiConsumer<E,? super T> collector)
Collect all elements emitted by this Flux into a user-defined container, by applying a collector BiConsumer taking the container and each element. The collected result will be emitted when this sequence completes.
Type Parameters:
E - the container type
Parameters:
containerSupplier - the supplier of the container instance for each Subscriber
collector - a consumer of both the container instance and the value being currently collected
Returns:
a Mono of the collected container on complete
public final <K> Mono<Map<K,T>> collectMap(Function<? super T,? extends K> keyExtractor)
Collect all elements emitted by this Flux into a hashed Map that is emitted by the resulting Mono when this sequence completes. The key is extracted from each element by applying the keyExtractor Function. In case several elements map to the same key, the associated value will be the most recently emitted element.
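The "most recently emitted element wins" rule for key conflicts can be illustrated in plain Java. The sketch below only models that rule over a finite list; `CollectMapSketch` and its `collectMap` helper are hypothetical names and no Reactor types are involved.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of the key-conflict rule; a list stands in for the sequence.
final class CollectMapSketch {

    static <T, K> Map<K, T> collectMap(List<T> source, Function<? super T, ? extends K> keyExtractor) {
        Map<K, T> map = new HashMap<>();
        for (T element : source) {
            // put() overwrites, so a later element replaces an earlier one with the same key.
            map.put(keyExtractor.apply(element), element);
        }
        return map;
    }

    public static void main(String[] args) {
        Map<Character, String> byInitial =
                collectMap(List.of("apple", "avocado", "banana"), s -> s.charAt(0));
        System.out.println(byInitial.get('a')); // prints avocado
        System.out.println(byInitial.get('b')); // prints banana
    }
}
```

When every element for a key must be kept, the collectMultimap variants group them into a Collection per key instead.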
public final <K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor)
Flux
into a hashed Map
that is
emitted by the resulting Mono
when this sequence completes.
The key is extracted from each element by applying the keyExtractor
Function
, and the value is extracted by the valueExtractor
Function.
In case several elements map to the same key, the associated value will be derived
from the most recently emitted element.
K
- the type of the key extracted from each source element
V
- the type of the value extracted from each source element
keyExtractor
- a Function
to map elements to a key for the Map
valueExtractor
- a Function
to map elements to a value for the Map
Mono
of a Map
of key-value pairs (only including the latest
element's value in case of key conflicts)
public final <K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor, Supplier<Map<K,V>> mapSupplier)
Flux
into a user-defined Map
that is
emitted by the resulting Mono
when this sequence completes.
The key is extracted from each element by applying the keyExtractor
Function
, and the value is extracted by the valueExtractor
Function.
In case several elements map to the same key, the associated value will be derived
from the most recently emitted element.
K
- the type of the key extracted from each source element
V
- the type of the value extracted from each source element
keyExtractor
- a Function
to map elements to a key for the Map
valueExtractor
- a Function
to map elements to a value for the Map
mapSupplier
- a Map
factory called for each Subscriber
Mono
of a Map
of key-value pairs (only including latest
element's value in case of key conflicts)
public final <K> Mono<Map<K,Collection<T>>> collectMultimap(Function<? super T,? extends K> keyExtractor)
public final <K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor)
Flux
into a multimap
that is
emitted by the resulting Mono
when this sequence completes.
The key is extracted from each element by applying the keyExtractor
Function
, and every element mapping to the same key is converted by the
valueExtractor
Function to a value stored in the List
associated to
said key.
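A minimal sketch of the multimap variant with both extractors follows. It assumes reactor-core is on the classpath; the class and method names are hypothetical.

```java
import java.util.Collection;
import java.util.Map;
import reactor.core.publisher.Flux;

// Hypothetical example class; not part of Reactor.
class CollectMultimapExample {

    // Groups words by first letter and stores the upper-cased word as the value.
    // Elements sharing a key are accumulated instead of overwriting each other.
    static Map<Character, Collection<String>> upperByInitial() {
        return Flux.just("apple", "avocado", "banana")
                   .collectMultimap(word -> word.charAt(0), word -> word.toUpperCase())
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(upperByInitial());
    }
}
```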
K
- the type of the key extracted from each source element
V
- the type of the value extracted from each source element
keyExtractor
- a Function
to map elements to a key for the Map
valueExtractor
- a Function
to map elements to a value for the Map
Mono
of a Map
of key-List(values) pairs
public final <K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor, Supplier<Map<K,Collection<V>>> mapSupplier)
Flux
into a user-defined multimap
that is
emitted by the resulting Mono
when this sequence completes.
The key is extracted from each element by applying the keyExtractor
Function
, and every element mapping to the same key is converted by the
valueExtractor
Function to a value stored in the Collection
associated to
said key.
K
- the type of the key extracted from each source element
V
- the type of the value extracted from each source element
keyExtractor
- a Function
to map elements to a key for the Map
valueExtractor
- a Function
to map elements to a value for the Map
mapSupplier
- a multimap (Map
of Collection
) factory called
for each Subscriber
Mono
of a Map
of key-Collection(values) pairs
public final Mono<List<T>> collectSortedList(@Nullable Comparator<? super T> comparator)
Flux
until this sequence completes,
and then sort them using a Comparator
into a List
that is emitted
by the resulting Mono
.
comparator
- a Comparator
to sort the items of this sequence
Mono
of a sorted List
of all values from this Flux
public final <V> Flux<V> compose(Function<? super Flux<T>,? extends Publisher<V>> transformer)
Flux
in order to generate a target Flux
type.
A transformation will occur for each Subscriber
. For instance:
flux.compose(Mono::from).subscribe()
V
- the item type in the returned Publisher
transformer
- the Function
to lazily map this Flux
into a target Publisher
instance for each new subscriber
Flux
transform() for immediate transformation of Flux
,
as() for a loose conversion to an arbitrary type
public final <V> Flux<V> concatMap(Function<? super T,? extends Publisher<? extends V>> mapper)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap
and flatMapSequential
:
Errors will immediately short-circuit the current concat backlog.
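To make the order-preserving behaviour concrete, here is a small sketch in which earlier source elements map to slower inner publishers. It assumes reactor-core is available; ConcatMapOrderExample and ordered are illustrative names.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical example class; not part of Reactor.
class ConcatMapOrderExample {

    // Element 1 is delayed the longest and element 3 the shortest, yet the
    // result follows source order because each inner publisher is subscribed
    // to only after the previous one has completed.
    static List<Integer> ordered() {
        return Flux.just(1, 2, 3)
                   .concatMap(i -> Mono.just(i).delayElement(Duration.ofMillis(40 - 10L * i)))
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(ordered());
    }
}
```

With an unbounded flatMap in place of concatMap, the same delays would typically let the faster inner publishers emit first.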
V
- the produced concatenated type
mapper
- the function to transform this sequence of T into concatenated sequences of V
Flux
public final <V> Flux<V> concatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int prefetch)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap
and flatMapSequential
:
Errors will immediately short-circuit the current concat backlog. The prefetch argument
allows giving an arbitrary prefetch size to the inner Publisher
.
V
- the produced concatenated type
mapper
- the function to transform this sequence of T into concatenated sequences of V
prefetch
- the inner source produced demand
Flux
public final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap
and flatMapSequential
:
Errors in the individual publishers will be delayed until the end of the whole concat
sequence (possibly getting combined into a composite exception
if several sources error).
V
- the produced concatenated type
mapper
- the function to transform this sequence of T into concatenated sequences of V
Flux
public final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, int prefetch)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap
and flatMapSequential
:
Errors in the individual publishers will be delayed until the end of the whole concat
sequence (possibly getting combined into a composite exception
if several sources error).
The prefetch argument allows giving an arbitrary prefetch size to the inner Publisher
.
V
- the produced concatenated type
mapper
- the function to transform this sequence of T into concatenated sequences of V
prefetch
- the inner source produced demand
Flux
public final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, boolean delayUntilEnd, int prefetch)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap
and flatMapSequential
:
Errors in the individual publishers will be delayed after the current concat
backlog if delayUntilEnd is false or after all sources if delayUntilEnd is true.
The prefetch argument allows giving an arbitrary prefetch size to the inner Publisher
.
V
- the produced concatenated type
mapper
- the function to transform this sequence of T into concatenated sequences of V
delayUntilEnd
- delay error until all sources have been consumed instead of
after the current source
prefetch
- the inner source produced demand
Flux
public final <R> Flux<R> concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
Flux
into Iterable
, then flatten the elements from those by
concatenating them into a single Flux
.
Note that unlike flatMap(Function)
and concatMap(Function)
, with Iterable there is
no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially.
Thus flatMapIterable
and concatMapIterable
are equivalent, and both are offered as a discoverability
improvement for users that explore the API with the concat vs flatMap expectation.
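A short sketch of flattening Iterables in source order is shown below. It assumes reactor-core is on the classpath; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class; not part of Reactor.
class ConcatMapIterableExample {

    // Each comma-separated line becomes a List, and the lists are played
    // back one after the other.
    static List<String> tokens() {
        return Flux.just("a,b", "c")
                   .<String>concatMapIterable(line -> Arrays.asList(line.split(",")))
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(tokens());
    }
}
```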
R
- the merged output sequence type
mapper
- the Function
to transform input sequence into N Iterable
Flux
public final <R> Flux<R> concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper, int prefetch)
Flux
into Iterable
, then flatten the emissions from those by
concatenating them into a single Flux
. The prefetch argument allows giving an arbitrary prefetch size to the merged Iterable
.
Note that unlike flatMap(Function)
and concatMap(Function)
, with Iterable there is
no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially.
Thus flatMapIterable
and concatMapIterable
are equivalent, and both are offered as a discoverability
improvement for users that explore the API with the concat vs flatMap expectation.
R
- the merged output sequence type
mapper
- the Function
to transform input sequence into N Iterable
prefetch
- the maximum in-flight elements from each inner Iterable
sequence
Flux
public final Mono<Long> count()
Flux
.
The count will be emitted when onComplete is observed.
public final Flux<T> defaultIfEmpty(T defaultV)
defaultV
- the alternate value if this sequence is empty
Flux
public final Flux<T> delayElements(Duration delay)
Flux
elements (Subscriber.onNext(T)
signals)
by a given Duration
. Signals are delayed and continue on the
parallel
default Scheduler, but empty sequences or
immediate error signals are not delayed.
delay
- duration by which to delay each Subscriber.onNext(T)
signal
Flux
delaySubscription to introduce a delay at the beginning of the sequence only
public final Flux<T> delayElements(Duration delay, Scheduler timer)
Flux
elements (Subscriber.onNext(T)
signals)
by a given Duration
. Signals are delayed and continue on a user-specified
Scheduler
, but empty sequences or immediate error signals are not delayed.
delay
- period to delay each Subscriber.onNext(T)
signal
timer
- a time-capable Scheduler
instance to delay each signal on
Flux
public final Flux<T> delaySequence(Duration delay)
Flux
forward in time by a given Duration
.
Unlike with delayElements(Duration)
, elements are shifted forward in time
as they are emitted, always resulting in the delay between two elements being
the same as in the source (only the first element is visibly delayed from the
previous event, that is the subscription).
Signals are delayed and continue on the parallel
Scheduler
, but empty sequences or immediate error signals are not delayed.
With this operator, a source emitting at 10Hz with a delaySequence Duration
of 1s will still emit at 10Hz, with an initial "hiccup" of 1s.
On the other hand, delayElements(Duration)
would end up emitting
at 1Hz.
This is closer to delaySubscription(Duration)
, except the source
is subscribed to immediately.
public final Flux<T> delaySequence(Duration delay, Scheduler timer)
Flux
forward in time by a given Duration
.
Unlike with delayElements(Duration, Scheduler)
, elements are shifted forward in time
as they are emitted, always resulting in the delay between two elements being
the same as in the source (only the first element is visibly delayed from the
previous event, that is the subscription).
Signals are delayed and continue on a user-specified Scheduler
, but empty
sequences or immediate error signals are not delayed.
With this operator, a source emitting at 10Hz with a delaySequence Duration
of 1s will still emit at 10Hz, with an initial "hiccup" of 1s.
On the other hand, delayElements(Duration, Scheduler)
would end up emitting
at 1Hz.
This is closer to delaySubscription(Duration, Scheduler)
, except the source
is subscribed to immediately.
public final Flux<T> delayUntil(Function<? super T,? extends Publisher<?>> triggerProvider)
Flux
and generate a Publisher
from each of this
Flux elements, each acting as a trigger for relaying said element.
That is to say, the resulting Flux
delays each of its emissions until the
associated trigger Publisher terminates.
In case of an error either in the source or in a trigger, that error is propagated
immediately downstream.
Note that unlike with the Mono variant
there is
no fusion of subsequent calls.
public final Flux<T> delaySubscription(Duration delay)
subscription
to this Flux
source until the given
period elapses. The delay is introduced through the parallel
default Scheduler.
public final Flux<T> delaySubscription(Duration delay, Scheduler timer)
subscription
to this Flux
source until the given
period elapses, as measured on the user-provided Scheduler
.
public final <U> Flux<T> delaySubscription(Publisher<U> subscriptionDelay)
U
- the other source type
subscriptionDelay
- a companion Publisher
whose onNext/onComplete signal will trigger the subscription
Flux
public final <X> Flux<X> dematerialize()
Flux
emits onNext, onError or onComplete Signal
instances, transforming these materialized
signals into
real signals on the Subscriber
.
The error Signal
will trigger onError and complete Signal
will trigger
onComplete.
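The following sketch replays a few hand-built Signal instances as real signals. It assumes reactor-core is available; DematerializeExample and replay are illustrative names.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;

// Hypothetical example class; not part of Reactor.
class DematerializeExample {

    // Two onNext Signals followed by a complete Signal are turned back into
    // two elements and a real completion.
    static List<Integer> replay() {
        Flux<Signal<Integer>> signals =
                Flux.just(Signal.next(1), Signal.next(2), Signal.<Integer>complete());
        return signals.<Integer>dematerialize()
                      .collectList()
                      .block();
    }

    public static void main(String[] args) {
        System.out.println(replay());
    }
}
```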
X
- the dematerialized type
Flux
materialize()
public final Flux<T> distinct()
Subscriber
, track elements from this Flux
that have been
seen and filter out duplicates.
The values themselves are recorded into a HashSet
for distinct detection.
Use distinct(Object::hashCode)
if you want a more lightweight approach that
doesn't retain all the objects, but is more susceptible to falsely considering two
distinct elements as duplicates due to a hashcode collision.
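As a quick illustration, the sketch below filters duplicates by value and then by a derived key. It assumes reactor-core is on the classpath; the class and method names are hypothetical.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class; not part of Reactor.
class DistinctExample {

    // Duplicates are dropped wherever they occur in the sequence.
    static List<Integer> distinctValues() {
        return Flux.just(1, 2, 1, 3, 2)
                   .distinct()
                   .collectList()
                   .block();
    }

    // With a key selector, elements are compared through the extracted key,
    // so "A" is treated as a duplicate of "a".
    static List<String> distinctIgnoringCase() {
        return Flux.just("a", "A", "b")
                   .distinct(s -> s.toLowerCase())
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(distinctValues());
        System.out.println(distinctIgnoringCase());
    }
}
```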
Flux
only emitting distinct values
distinct(Function, Supplier, BiPredicate, Consumer)
.
public final <V> Flux<T> distinct(Function<? super T,? extends V> keySelector)
Subscriber
, track elements from this Flux
that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function
.
V
- the type of the key extracted from each value in this sequence
keySelector
- function to compute comparison key for each element
Flux
only emitting values with distinct keys
distinct(Function, Supplier, BiPredicate, Consumer)
.
public final <V,C extends Collection<? super V>> Flux<T> distinct(Function<? super T,? extends V> keySelector, Supplier<C> distinctCollectionSupplier)
Subscriber
, track elements from this Flux
that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function
and by the add method
of the Collection
supplied (typically a Set
).
V
- the type of the key extracted from each value in this sequence
C
- the type of Collection used for distinct checking of keys
keySelector
- function to compute comparison key for each element
distinctCollectionSupplier
- supplier of the Collection
used for distinct
check through add
of the key.
Flux
only emitting values with distinct keys
distinct(Function, Supplier, BiPredicate, Consumer)
.
public final <V,C> Flux<T> distinct(Function<? super T,? extends V> keySelector, Supplier<C> distinctStoreSupplier, BiPredicate<C,V> distinctPredicate, Consumer<C> cleanup)
Subscriber
, track elements from this Flux
that have been
seen and filter out duplicates, as compared by applying a BiPredicate
on
an arbitrary user-supplied <C>
store and a key extracted through the user
provided Function
. The BiPredicate should typically add the key to the
arbitrary store for further comparison. A cleanup callback is also invoked on the
store upon termination of the sequence.
V
- the type of the key extracted from each value in this sequence
C
- the type of store backing the BiPredicate
keySelector
- function to compute comparison key for each element
distinctStoreSupplier
- supplier of the arbitrary store object used in distinct
checks along the extracted key.
distinctPredicate
- the BiPredicate
to apply to the arbitrary store +
extracted key to perform a distinct check. Since nothing is assumed of the store,
this predicate should also add the key to the store as necessary.
cleanup
- the cleanup callback to invoke on the store upon termination.
Flux
only emitting values with distinct keys
cleanup
as well if you need discarding of keys
categorized by the operator as "seen".
public final Flux<T> distinctUntilChanged()
The last distinct value seen is retained for further comparison, which is done
on the values themselves using the equals method
.
Use distinctUntilChanged(Object::hashCode)
if you want a more lightweight approach that
doesn't retain all the objects, but is more susceptible to falsely considering two
distinct elements as duplicates due to a hashcode collision.
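The difference from distinct() is that only consecutive repeats are dropped, as this small sketch suggests. It assumes reactor-core is available; the names are illustrative.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class; not part of Reactor.
class DistinctUntilChangedExample {

    // Only runs of equal elements collapse; the value 1 appears again later
    // in the output because it is separated from the first run by 2.
    static List<Integer> collapsed() {
        return Flux.just(1, 1, 2, 2, 1, 3, 3)
                   .distinctUntilChanged()
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(collapsed());
    }
}
```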
Flux
with only one occurrence in a row of each element
(yet elements can repeat in the overall sequence)
public final <V> Flux<T> distinctUntilChanged(Function<? super T,? extends V> keySelector)
Function
using equality.
V
- the type of the key extracted from each value in this sequence
keySelector
- function to compute comparison key for each element
Flux
with only one occurrence in a row of each element of
the same key (yet element keys can repeat in the overall sequence)
public final <V> Flux<T> distinctUntilChanged(Function<? super T,? extends V> keySelector, BiPredicate<? super V,? super V> keyComparator)
Function
and then comparing keys with the supplied BiPredicate
.
V
- the type of the key extracted from each value in this sequence
keySelector
- function to compute comparison key for each element
keyComparator
- predicate used to compare keys.
Flux
with only one occurrence in a row of each element
of the same key for which the predicate returns true (yet element keys can repeat
in the overall sequence)
keyComparator
returns true). The keys themselves
are not discarded.
public final Flux<T> doAfterTerminate(Runnable afterTerminate)
Flux
terminates, either by completing downstream successfully or with an error.
afterTerminate
- the callback to call after Subscriber.onComplete()
or Subscriber.onError(java.lang.Throwable)
Flux
public final Flux<T> doOnCancel(Runnable onCancel)
Flux
is cancelled.
onCancel
- the callback to call on Subscription.cancel()
Flux
public final Flux<T> doOnComplete(Runnable onComplete)
Flux
completes successfully.
onComplete
- the callback to call on Subscriber.onComplete()
Flux
public final <R> Flux<T> doOnDiscard(Class<R> type, Consumer<? super R> discardHook)
The discardHook
must be idempotent and safe to use on any instance of the desired
type.
Calls to this method are additive, and the order of invocation of the discardHook
is the same as the order of declaration (calling .filter(...).doOnDiscard(first).doOnDiscard(second)
will let the filter invoke first
then second
handlers).
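A minimal sketch of a discard hook observing the elements rejected by an upstream filter follows. It assumes reactor-core is on the classpath and that filter reports rejected elements as discarded, as the filter example in the text above implies. DoOnDiscardExample and discardedByFilter are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;

// Hypothetical example class; not part of Reactor.
class DoOnDiscardExample {

    // The hook is declared downstream of filter and receives the Integer
    // elements that the filter rejects (the odd numbers here).
    static List<Integer> discardedByFilter() {
        List<Integer> discarded = new CopyOnWriteArrayList<>();
        Flux.range(1, 4)
            .filter(i -> i % 2 == 0)
            .doOnDiscard(Integer.class, value -> discarded.add(value))
            .blockLast();
        return discarded;
    }

    public static void main(String[] args) {
        System.out.println(discardedByFilter());
    }
}
```

Adding to a list is used here only to make the effect observable; a real hook would usually release a resource held by the element.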
Two main categories of discarding operators exist:
onDiscard Support
section.
type
- the Class
of elements in the upstream chain of operators that
this cleanup hook should take into account.
discardHook
- a Consumer
of elements in the upstream chain of operators
that performs the cleanup.
Flux
that cleans up matching elements that get discarded upstream of it.
public final Flux<T> doOnEach(Consumer<? super Signal<T>> signalConsumer)
Flux
emits an item, fails with an error
or completes successfully. All these events are represented as a Signal
that is passed to the side-effect callback. Note that this is an advanced operator,
typically used for monitoring of a Flux. These Signal
have a Context
associated to them.
signalConsumer
- the mandatory callback to call on
Subscriber.onNext(Object)
, Subscriber.onError(Throwable)
and
Subscriber.onComplete()
Flux
doOnNext(Consumer)
,
doOnError(Consumer)
,
doOnComplete(Runnable)
,
materialize()
,
Signal
public final Flux<T> doOnError(Consumer<? super Throwable> onError)
Flux
completes with an error.
onError
- the callback to call on Subscriber.onError(java.lang.Throwable)
Flux
public final <E extends Throwable> Flux<T> doOnError(Class<E> exceptionType, Consumer<? super E> onError)
Flux
completes with an error matching the given exception type.
E
- type of the error to handle
exceptionType
- the type of exceptions to handle
onError
- the error handler for each error
Flux
public final Flux<T> doOnError(Predicate<? super Throwable> predicate, Consumer<? super Throwable> onError)
Flux
completes with an error matching the given exception.
predicate
- the matcher for exceptions to handle
onError
- the error handler for each error
Flux
public final Flux<T> doOnNext(Consumer<? super T> onNext)
Flux
emits an item.
onNext
- the callback to call on Subscriber.onNext(T)
Flux
resuming on errors
(including when fusion is enabled). Exceptions thrown by the consumer are passed to
the onErrorContinue(BiConsumer)
error consumer (the value consumer
is not invoked, as the source element will be part of the sequence). The onNext
signal is then propagated as normal.
public final Flux<T> doOnRequest(LongConsumer consumer)
LongConsumer
when this Flux
receives any request.
Note that non-fatal errors raised in the callback will not be propagated and
will simply trigger Operators.onOperatorError(Throwable, Context)
.
consumer
- the consumer to invoke on each request
Flux
public final Flux<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe)
Flux
is subscribed.
This method is not intended for capturing the subscription and calling its methods,
but for side effects like monitoring. For instance, the correct way to cancel a subscription is
to call Disposable.dispose()
on the Disposable returned by subscribe()
.
onSubscribe
- the callback to call on Subscriber.onSubscribe(org.reactivestreams.Subscription)
Flux
public final Flux<T> doOnTerminate(Runnable onTerminate)
Flux
terminates, either by
completing successfully or with an error.
onTerminate
- the callback to call on Subscriber.onComplete()
or Subscriber.onError(java.lang.Throwable)
Flux
public final Flux<T> doFinally(Consumer<SignalType> onFinally)
Flux
terminates for any reason,
including cancellation. The terminating event (SignalType.ON_COMPLETE
,
SignalType.ON_ERROR
and SignalType.CANCEL
) is passed to the consumer,
which is executed after the signal has been passed downstream.
Note that the fact that the signal is propagated downstream before the callback is
executed means that several doFinally in a row will be executed in
reverse order. If you want to assert the execution of the callback
please keep in mind that the Flux will complete before it is executed, so its
effect might not be visible immediately after e.g. a blockLast()
.
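The reverse execution order of chained doFinally callbacks can be observed with a small sketch. It assumes reactor-core is available and uses a synchronous source, so that both callbacks have already run on the calling thread when blockLast() returns. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class; not part of Reactor.
class DoFinallyOrderExample {

    // The terminal signal travels downstream first, so the callback declared
    // last (closest to the subscriber) runs before the one declared first.
    static List<String> order() {
        List<String> log = new ArrayList<>();
        Flux.just(1)
            .doFinally(type -> log.add("first"))
            .doFinally(type -> log.add("second"))
            .blockLast();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(order());
    }
}
```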
onFinally
- the callback to execute after a terminal signal (complete, error
or cancel)
Flux
public final Flux<Tuple2<Long,T>> elapsed()
Flux
into Tuple2<Long, T>
of timemillis and source data. The timemillis corresponds to the elapsed time
between each signal as measured by the parallel
scheduler.
First duration is measured between the subscription and the first element.
Flux
that emits a tuple of time elapsed in milliseconds and matching data
public final Flux<Tuple2<Long,T>> elapsed(Scheduler scheduler)
Flux
into Tuple2<Long, T>
of timemillis and source data. The timemillis corresponds to the elapsed time
between each signal as measured by the provided Scheduler
.
First duration is measured between the subscription and the first element.
scheduler
- a Scheduler
instance to read time from
Flux
that emits tuples of time elapsed in milliseconds and matching data
public final Mono<T> elementAt(int index)
IndexOutOfBoundsException
if the sequence is shorter.
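For illustration, the sketch below picks an element by index and falls back to a default when the sequence is too short. It assumes reactor-core is on the classpath; the names are made up for this example.

```java
import reactor.core.publisher.Flux;

// Hypothetical example class; not part of Reactor.
class ElementAtExample {

    // Index 1 is the second element because indexes are zero-based.
    static String second() {
        return Flux.just("a", "b", "c")
                   .elementAt(1)
                   .block();
    }

    // The two-argument variant emits the default instead of failing when the
    // sequence has fewer than index + 1 elements.
    static String fallback() {
        return Flux.just("a", "b", "c")
                   .elementAt(5, "z")
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(second());
        System.out.println(fallback());
    }
}
```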
index
- zero-based index of the only item to emit
Mono
of the item at the specified zero-based index
public final Mono<T> elementAt(int index, T defaultValue)
index
- zero-based index of the only item to emit
defaultValue
- a default value to emit if the sequence is shorter
Mono
of the item at the specified zero-based index or a default value
public final Flux<T> expandDeep(Function<? super T,? extends Publisher<? extends T>> expander, int capacityHint)
That is: emit one value from this Flux
, expand it and emit the first value
at this first level of recursion, and so on... When no more recursion is possible,
backtrack to the previous level and re-apply the strategy.
For example, given the hierarchical structure
A
 - AA
   - aa1
B
 - BB
   - bb1
Expands
Flux.just(A, B)
into
A
AA
aa1
B
BB
bb1
public final Flux<T> expandDeep(Function<? super T,? extends Publisher<? extends T>> expander)
That is: emit one value from this Flux
, expand it and emit the first value
at this first level of recursion, and so on... When no more recursion is possible,
backtrack to the previous level and re-apply the strategy.
For example, given the hierarchical structure
A
 - AA
   - aa1
B
 - BB
   - bb1
Expands
Flux.just(A, B)
into
A
AA
aa1
B
BB
bb1
public final Flux<T> expand(Function<? super T,? extends Publisher<? extends T>> expander, int capacityHint)
That is: emit the values from this Flux
first, then expand each at a first level of
recursion and emit all of the resulting values, then expand all of these at a second
level and so on...
For example, given the hierarchical structure
A
 - AA
   - aa1
B
 - BB
   - bb1
Expands
Flux.just(A, B)
into
A
B
AA
BB
aa1
bb1
public final Flux<T> expand(Function<? super T,? extends Publisher<? extends T>> expander)
That is: emit the values from this Flux
first, then expand each at a first level of
recursion and emit all of the resulting values, then expand all of these at a second
level and so on...
For example, given the hierarchical structure
A
 - AA
   - aa1
B
 - BB
   - bb1
Expands
Flux.just(A, B)
into
A
B
AA
BB
aa1
bb1
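The two traversal strategies can be compared on the same hierarchy with the sketch below. The hierarchy is held in a plain map and childrenOf is a hypothetical helper standing in for whatever lookup produces child elements. It assumes reactor-core is on the classpath.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;

// Hypothetical example class; not part of Reactor.
class ExpandExample {

    // Parent -> children, mirroring the A/AA/aa1 and B/BB/bb1 structure.
    static final Map<String, List<String>> CHILDREN = new HashMap<>();
    static {
        CHILDREN.put("A", Arrays.asList("AA"));
        CHILDREN.put("AA", Arrays.asList("aa1"));
        CHILDREN.put("B", Arrays.asList("BB"));
        CHILDREN.put("BB", Arrays.asList("bb1"));
    }

    // Illustrative expander: leaves expand to an empty Flux, which ends the recursion.
    static Flux<String> childrenOf(String node) {
        return Flux.fromIterable(CHILDREN.getOrDefault(node, Collections.emptyList()));
    }

    // Level by level: A B AA BB aa1 bb1
    static List<String> breadthFirst() {
        return Flux.just("A", "B").expand(ExpandExample::childrenOf).collectList().block();
    }

    // Branch by branch: A AA aa1 B BB bb1
    static List<String> depthFirst() {
        return Flux.just("A", "B").expandDeep(ExpandExample::childrenOf).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(breadthFirst());
        System.out.println(depthFirst());
    }
}
```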
public final Flux<T> filter(Predicate<? super T> p)
Predicate
. If the predicate test succeeds, the value is
emitted. If the predicate test fails, the value is ignored and a request of 1 is made upstream.
p
- the Predicate
to test values against
Flux
containing only values that pass the predicate test
resuming on errors
(including when fusion is enabled). Exceptions thrown by the predicate are
considered as if the predicate returned false: they cause the source value to be
dropped and a new element (request(1)
) being requested from upstream.
public final Flux<T> filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate)
Flux
asynchronously using a generated
Publisher<Boolean>
test. A value is replayed if the first item emitted
by its corresponding test is true. It is dropped if its test is either
empty or its first emitted value is false.
Note that only the first value of the test publisher is considered, and unless it
is a Mono
, test will be cancelled after receiving that first value. Test
publishers are generated and subscribed to in sequence.
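A minimal sketch of an asynchronous predicate follows. Here the test publisher is just a Mono.just, standing in for something like a remote lookup. It assumes reactor-core is available; the names are illustrative.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical example class; not part of Reactor.
class FilterWhenExample {

    // Each value is kept only if its test publisher emits true first.
    static List<Integer> evens() {
        return Flux.range(1, 6)
                   .filterWhen(i -> Mono.just(i % 2 == 0))
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(evens());
    }
}
```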
asyncPredicate
- the function generating a Publisher
of Boolean
for each value, to filter the Flux with
Flux
public final Flux<T> filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate, int bufferSize)
Flux
asynchronously using a generated
Publisher<Boolean>
test. A value is replayed if the first item emitted
by its corresponding test is true. It is dropped if its test is either
empty or its first emitted value is false.
Note that only the first value of the test publisher is considered, and unless it
is a Mono
, test will be cancelled after receiving that first value. Test
publishers are generated and subscribed to in sequence.
asyncPredicate
- the function generating a Publisher
of Boolean
for each value, to filter the Flux with
bufferSize
- the maximum expected number of values to hold pending a result of
their respective asynchronous predicates, rounded to the next power of two. This is
capped depending on the size of the heap and the JVM limits, so be careful with
large values (although e.g. 65536 should still be fine). Also serves as
the initial request size for the source.
Flux
public final <R> Flux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
through merging,
which allows them to interleave.
There are three dimensions to this operator that can be compared with
flatMapSequential
and concatMap
:
R
- the merged output sequence type
mapper
- the Function
to transform input sequence into N sequences Publisher
Flux
resuming on errors
in the mapper Function
. Exceptions thrown by the mapper then behave as if
it had mapped the value to an empty publisher. If the mapper does map to a scalar
publisher (an optimization in which the value can be resolved immediately without
subscribing to the publisher, e.g. a Mono.fromCallable(Callable)
) but said
publisher throws, this can be resumed from in the same manner.
public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
through merging,
which allows them to interleave.
There are three dimensions to this operator that can be compared with
flatMapSequential
and concatMap
:
Publisher
can be
subscribed to and merged in parallel. In turn, that argument shows the size of
the first Subscription.request(long)
to the upstream.
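To show the effect of the concurrency bound, the sketch below limits flatMap to a single in-flight inner publisher. It assumes reactor-core is on the classpath; FlatMapConcurrencyExample and oneAtATime are hypothetical names.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical example class; not part of Reactor.
class FlatMapConcurrencyExample {

    // With concurrency 1, the next inner publisher is only subscribed to once
    // the current one completes, so the slowest element (3) still comes first.
    static List<Integer> oneAtATime() {
        return Flux.just(3, 2, 1)
                   .flatMap(i -> Mono.just(i).delayElement(Duration.ofMillis(20L * i)), 1)
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(oneAtATime());
    }
}
```

Raising the concurrency lets the inner publishers run side by side, at which point the faster ones would normally emit first.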
V
- the merged output sequence type
mapper
- the Function
to transform input sequence into N sequences Publisher
concurrency
- the maximum number of in-flight inner sequences
Flux
resuming on errors
in the mapper Function
. Exceptions thrown by the mapper then behave as if
it had mapped the value to an empty publisher. If the mapper does map to a scalar
publisher (an optimization in which the value can be resolved immediately without
subscribing to the publisher, e.g. a Mono.fromCallable(Callable)
) but said
publisher throws, this can be resumed from in the same manner.
public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
through merging,
which allows them to interleave.
There are three dimensions to this operator that can be compared with
flatMapSequential
and concatMap
:
Publisher
can be
subscribed to and merged in parallel. In turn, that argument shows the size of
the first Subscription.request(long)
to the upstream.
The prefetch argument allows giving an arbitrary prefetch size to the merged
Publisher
(in other words prefetch size means the size of the first
Subscription.request(long)
to the merged Publisher
).
V
- the merged output sequence type
mapper
- the Function
to transform input sequence into N sequences Publisher
concurrency
- the maximum number of in-flight inner sequences
prefetch
- the maximum in-flight elements from each inner Publisher
sequence
Flux
resuming on errors
in the mapper Function
. Exceptions thrown by the mapper then behave as if
it had mapped the value to an empty publisher. If the mapper does map to a scalar
publisher (an optimization in which the value can be resolved immediately without
subscribing to the publisher, e.g. a Mono.fromCallable(Callable)
) but said
publisher throws, this can be resumed from in the same manner.
public final <V> Flux<V> flatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
through merging,
which allows them to interleave.
There are three dimensions to this operator that can be compared with
flatMapSequential
and concatMap
:
Publisher
can be
subscribed to and merged in parallel. The prefetch argument allows giving an
arbitrary prefetch size to the merged Publisher
. This variant will delay
any error until after the rest of the flatMap backlog has been processed.
V
- the merged output sequence type
mapper
- the Function
to transform input sequence into N sequences Publisher
concurrency
- the maximum number of in-flight inner sequences
prefetch
- the maximum in-flight elements from each inner Publisher
sequence
Flux
resuming on errors
in the mapper Function
. Exceptions thrown by the mapper then behave as if
it had mapped the value to an empty publisher. If the mapper does map to a scalar
publisher (an optimization in which the value can be resolved immediately without
subscribing to the publisher, e.g. a Mono.fromCallable(Callable)
) but said
publisher throws, this can be resumed from in the same manner.public final <R> Flux<R> flatMap(@Nullable Function<? super T,? extends Publisher<? extends R>> mapperOnNext, @Nullable Function<? super Throwable,? extends Publisher<? extends R>> mapperOnError, @Nullable Supplier<? extends Publisher<? extends R>> mapperOnComplete)
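Transform the signals emitted by this Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux through merging,
which allows them to interleave. Note that at least one of the signal mappers must
be provided, and all provided mappers must produce a publisher.
There are three dimensions to this operator that can be compared with
flatMapSequential and concatMap:
- Generation of inners and subscription: this operator eagerly subscribes to its inners.
- Ordering of the flattened values: this operator does not necessarily preserve the original ordering, as inner elements are flattened as they arrive.
- Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).
An onError signal is transformed into a completion signal after its mapping callback has been applied.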
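As an illustration of the signal mappers, the sketch below (assuming reactor-core on the classpath; the class name is hypothetical) maps each onNext and the terminal onError to a small Publisher. Because the source ends with an error, the completion mapper is not invoked, and the resulting sequence completes normally.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class FlatMapSignalsExample {

    static List<String> run() {
        return Flux.just(1, 2)
                   .concatWith(Flux.error(new IllegalStateException("boom")))
                   .<String>flatMap(
                       i -> Flux.just("next:" + i),               // mapperOnNext
                       e -> Flux.just("error:" + e.getMessage()), // mapperOnError
                       () -> Flux.just("complete"))               // mapperOnComplete
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```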
R
- the output Publisher type target
mapperOnNext
- the Function to call on next data and returning a sequence to merge.
Use null to ignore (provided at least one other mapper is specified).
mapperOnError
- the Function to call on error signal and returning a sequence to merge.
Use null to ignore (provided at least one other mapper is specified).
mapperOnComplete
- the Function to call on complete signal and returning a sequence to merge.
Use null to ignore (provided at least one other mapper is specified).
Returns: a new Flux
public final <R> Flux<R> flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
Transform the items emitted by this Flux into Iterable, then flatten the elements
from those by merging them into a single Flux.
Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is
no notion of eager vs lazy inner subscription. The contents of the Iterables are all
played sequentially. Thus flatMapIterable and concatMapIterable are equivalent, and
both are offered as a discoverability improvement for users who explore the API with
the concat vs flatMap expectation.
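A short sketch of the sequential playback, assuming reactor-core on the classpath (the class name and the comma-separated input are hypothetical illustration data):

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

public class FlatMapIterableExample {

    static List<String> run() {
        return Flux.just("a,b", "c", "d,e")
                   // Each source element becomes an Iterable whose content is replayed in order.
                   .flatMapIterable(s -> Arrays.asList(s.split(",")))
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```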
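R
- the merged output sequence type
mapper
- the Function that transforms each source element into an Iterable
Returns: a Flux concatenating the values of the Iterables obtained from each element of this Flux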
public final <R> Flux<R> flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper, int prefetch)
Transform the items emitted by this Flux into Iterable, then flatten the emissions
from those by merging them into a single Flux. The prefetch argument sets an
arbitrary prefetch size for the merged Iterable.
Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is
no notion of eager vs lazy inner subscription. The contents of the Iterables are all
played sequentially. Thus flatMapIterable and concatMapIterable are equivalent, and
both are offered as a discoverability improvement for users who explore the API with
the concat vs flatMap expectation.
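R
- the merged output sequence type
mapper
- the Function that transforms each source element into an Iterable
prefetch
- the maximum in-flight elements from each inner Iterable sequence
Returns: a Flux concatenating the values of the Iterables obtained from each element of this Flux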
public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper)
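Transform the elements emitted by this Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, but merge them in
the order of their source element.
There are three dimensions to this operator that can be compared with
flatMap and concatMap:
- Generation of inners and subscription: this operator eagerly subscribes to its inners (like flatMap).
- Ordering of the flattened values: this operator queues elements from late inners until all elements from earlier inners have been emitted, thus emitting inner sequences as a whole, in an order that matches their source's order.
- Interleaving: this operator does not let values from different inners interleave (similar looking result to concatMap, but due to queueing of values that would have been interleaved otherwise).
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher. However, if such an early inner takes more time to complete than
subsequent faster inners, the data from these faster inners will be queued until
the earlier inner completes, so as to maintain source ordering.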
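The queueing described above can be sketched as follows, assuming reactor-core on the classpath. The class name, the inner helper and the delays are hypothetical: the first inner is deliberately slower than the second, yet its values still come out first.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class FlatMapSequentialExample {

    // Hypothetical mapper: the "slow" inner emits every 100 ms, any other inner every 10 ms.
    static Flux<String> inner(String name) {
        long delayMs = "slow".equals(name) ? 100 : 10;
        return Flux.just(name + "-1", name + "-2")
                   .delayElements(Duration.ofMillis(delayMs));
    }

    static List<String> run() {
        return Flux.just("slow", "fast")
                   // Both inners are subscribed eagerly; values of "fast" are queued
                   // until "slow" has completed.
                   .flatMapSequential(FlatMapSequentialExample::inner)
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Replacing flatMapSequential with flatMap in this sketch would let the faster inner's values overtake the slower one's.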
public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency)
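Transform the elements emitted by this Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, but merge them in
the order of their source element.
There are three dimensions to this operator that can be compared with
flatMap and concatMap:
- Generation of inners and subscription: this operator eagerly subscribes to its inners (like flatMap).
- Ordering of the flattened values: this operator queues elements from late inners until all elements from earlier inners have been emitted, thus emitting inner sequences as a whole, in an order that matches their source's order.
- Interleaving: this operator does not let values from different inners interleave (similar looking result to concatMap, but due to queueing of values that would have been interleaved otherwise).
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher. However, if such an early inner takes more time to complete than
subsequent faster inners, the data from these faster inners will be queued until
the earlier inner completes, so as to maintain source ordering.
The maxConcurrency argument controls how many inner Publishers can be subscribed to
and merged in parallel.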
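One way to see the concurrency cap is to count how many inners are subscribed but not yet terminated at any time. This is only a sketch, assuming reactor-core on the classpath; the class name, the counters and the 20 ms delay are hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class MaxConcurrencyExample {

    static int maxObservedConcurrency() {
        AtomicInteger active = new AtomicInteger(); // inners currently in flight
        AtomicInteger max = new AtomicInteger();    // highest value of 'active' seen so far

        Flux.range(1, 6)
            .flatMapSequential(i -> Flux.just(i)
                    .delayElements(Duration.ofMillis(20))
                    .doOnSubscribe(s -> max.accumulateAndGet(active.incrementAndGet(), Math::max))
                    // doOnTerminate runs before the terminal signal reaches the operator.
                    .doOnTerminate(active::decrementAndGet),
                2) // maxConcurrency
            .blockLast();

        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("max in-flight inners: " + maxObservedConcurrency());
    }
}
```

The observed maximum should not exceed the maxConcurrency value passed to the operator, here 2.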
public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch)
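Transform the elements emitted by this Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, but merge them in
the order of their source element.
There are three dimensions to this operator that can be compared with
flatMap and concatMap:
- Generation of inners and subscription: this operator eagerly subscribes to its inners (like flatMap).
- Ordering of the flattened values: this operator queues elements from late inners until all elements from earlier inners have been emitted, thus emitting inner sequences as a whole, in an order that matches their source's order.
- Interleaving: this operator does not let values from different inners interleave (similar looking result to concatMap, but due to queueing of values that would have been interleaved otherwise).
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher. However, if such an early inner takes more time to complete than
subsequent faster inners, the data from these faster inners will be queued until
the earlier inner completes, so as to maintain source ordering.
The maxConcurrency argument controls how many inner Publishers can be subscribed to
and merged in parallel. The prefetch argument sets the prefetch size for each merged
Publisher.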
R
- the merged output sequence type
mapper
- the Function that transforms each source element into an inner Publisher
maxConcurrency
- the maximum number of in-flight inner sequences
prefetch
- the maximum in-flight elements from each inner Publisher sequence
Returns: a merged Flux, subscribing early but keeping the original ordering
public final <R> Flux<R> flatMapSequentialDelayError(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch)
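Transform the elements emitted by this Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, but merge them in
the order of their source element.
There are three dimensions to this operator that can be compared with
flatMap and concatMap:
- Generation of inners and subscription: this operator eagerly subscribes to its inners (like flatMap).
- Ordering of the flattened values: this operator queues elements from late inners until all elements from earlier inners have been emitted, thus emitting inner sequences as a whole, in an order that matches their source's order.
- Interleaving: this operator does not let values from different inners interleave (similar looking result to concatMap, but due to queueing of values that would have been interleaved otherwise).
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher. However, if such an early inner takes more time to complete than
subsequent faster inners, the data from these faster inners will be queued until
the earlier inner completes, so as to maintain source ordering.
The maxConcurrency argument controls how many inner Publishers can be subscribed to
and merged in parallel. The prefetch argument sets the prefetch size for each merged
Publisher. This variant delays any error until after the
rest of the flatMap backlog has been processed.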
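Combining both behaviors, the sketch below (assuming reactor-core on the classpath; class name, helper and delays are hypothetical) has a failing inner in the middle. The values of every inner are still emitted in source order, and the error only surfaces at the end, where onErrorResume makes it visible in the collected list.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class FlatMapSequentialDelayErrorExample {

    // Hypothetical mapper: "slow" is delayed longer, "bad" fails after its two values.
    static Flux<String> inner(String name) {
        long delayMs = "slow".equals(name) ? 100 : 10;
        Flux<String> values = Flux.just(name + "-1", name + "-2")
                                  .delayElements(Duration.ofMillis(delayMs));
        return "bad".equals(name)
                ? values.concatWith(Flux.error(new IllegalStateException(name)))
                : values;
    }

    static List<String> run() {
        return Flux.just("slow", "bad", "fast")
                   .flatMapSequentialDelayError(FlatMapSequentialDelayErrorExample::inner, 4, 8)
                   // Turn the delayed error into a last element so it can be collected.
                   .onErrorResume(e -> Flux.just("error: " + e.getMessage()))
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```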
R
- the merged output sequence type
mapper
- the Function that transforms each source element into an inner Publisher
maxConcurrency
- the maximum number of in-flight inner sequences
prefetch
- the maximum in-flight elements from each inner Publisher sequence
Returns: a merged Flux, subscribing early but keeping the original ordering
public int getPrefetch()
The prefetch configuration of the Flux
Returns: the prefetch configuration of the Flux, -1 if unspecified
public final <K> Flux<GroupedFlux<K,T>> groupBy(Function<? super T,? extends K> keyMapper)
Divide this sequence into dynamically created Flux (or groups) for each
unique key, as produced by the provided keyMapper Function. Note that
groupBy works best with a low cardinality of groups, so choose your keyMapper
function accordingly.
The groups need to be drained and consumed downstream for groupBy to work correctly.
Notably, when the criteria produce a large number of groups, it can lead to hanging
if the groups are not suitably consumed downstream (e.g. due to a flatMap
with a maxConcurrency parameter that is set too low).
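A minimal sketch of grouping with a low-cardinality key, assuming reactor-core on the classpath and Java 9 or later (for Map.entry); the class name and the "odd"/"even" key are hypothetical. Each group is consumed inside flatMap, which is what keeps groupBy from stalling.

```java
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;

public class GroupByExample {

    static Map<String, List<Integer>> run() {
        return Flux.range(1, 6)
                   .groupBy(i -> i % 2 == 0 ? "even" : "odd")
                   // Every GroupedFlux is drained here; only two groups exist, so the
                   // default flatMap concurrency is more than sufficient.
                   .flatMap(group -> group.collectList()
                                          .map(values -> Map.entry(group.key(), values)))
                   .collectMap(Map.Entry::getKey, Map.Entry::getValue)
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```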
K
- the key type extracted from each value of this sequence
keyMapper
- the key mapping Function that evaluates an incoming data and returns a key.
Returns: a Flux of GroupedFlux grouped sequences
public final <K> Flux<GroupedFlux<K,T>> groupBy(Function<? super T,? extends K> keyMapper, int prefetch)
Divide this sequence into dynamically created Flux (or groups) for each
unique key, as produced by the provided keyMapper Function. Note that
groupBy works best with a low cardinality of groups, so choose your keyMapper
function accordingly.
The groups need to be drained and consumed downstream for groupBy to work correctly.
Notably, when the criteria produce a large number of groups, it can lead to hanging
if the groups are not suitably consumed downstream (e.g. due to a flatMap
with a maxConcurrency parameter that is set too low).
K
- the key type extracted from each value of this sequence
keyMapper
- the key mapping Function that evaluates an incoming data and returns a key.
prefetch
- the number of values to prefetch from the source
Returns: a Flux of GroupedFlux grouped sequences
public final <K,V> Flux<GroupedFlux<K,V>> groupBy(Function<? super T,? extends K> keyMapper, Function<? super T,? extends V> valueMapper)
Divide this sequence into dynamically created Flux (or groups) for each
unique key, as produced by the provided keyMapper Function. Source elements
are also mapped to a different value using the valueMapper. Note that
groupBy works best with a low cardinality of groups, so choose your keyMapper
function accordingly.
The groups need to be drained and consumed downstream for groupBy to work correctly.
Notably, when the criteria produce a large number of groups, it can lead to hanging
if the groups are not suitably consumed downstream (e.g. due to a flatMap
with a maxConcurrency parameter that is set too low).
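The valueMapper can be sketched like this, assuming reactor-core on the classpath; the class name and the word list are hypothetical. Words are keyed by their first letter, and each word is replaced by its length before it enters its group.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class GroupByValueMapperExample {

    static List<String> run() {
        return Flux.just("apple", "avocado", "banana", "blueberry", "cherry")
                   .groupBy(word -> word.substring(0, 1), // keyMapper: first letter
                            String::length)               // valueMapper: word length
                   // Sum the lengths within each group and label the total with the key.
                   .flatMap(group -> group.reduce(0, Integer::sum)
                                          .map(total -> group.key() + "=" + total))
                   .collectSortedList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```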
K
- the key type extracted from each value of this sequence
V
- the value type extracted from each value of this sequence
keyMapper
- the key mapping function that evaluates an incoming data and returns a key.
valueMapper
- the value mapping function that evaluates which data to extract for re-routing.
Returns: a Flux of GroupedFlux grouped sequences
public final <K,V> Flux<GroupedFlux<K,V>> groupBy(Function<? super T,? extends K> keyMapper, Function<? super T,? extends V> valueMapper, int prefetch)
Divide this sequence into dynamically created Flux (or groups) for each
unique key, as produced by the provided keyMapper Function. Source elements
are also mapped to a different value using the valueMapper. Note that
groupBy works best with a low cardinality of groups, so choose your keyMapper
function accordingly.
The groups need to be drained and consumed downstream for groupBy to work correctly.
Notably, when the criteria produce a large number of groups, it can lead to hanging
if the groups are not suitably consumed downstream (e.g. due to a flatMap
with a maxConcurrency parameter that is set too low).
K
- the key type extracted from each value of this sequence
V
- the value type extracted from each value of this sequence
keyMapper
- the key mapping function that evaluates an incoming data and returns a key.
valueMapper
- the value mapping function that evaluates which data to extract for re-routing.
prefetch
- the number of values to prefetch from the source
Returns: a Flux of GroupedFlux grouped sequences
public final <TRight,TLeftEnd,TRightEnd,R> Flux<R> groupJoin(Publisher<? extends TRight> other, Function<? super T,? extends Publisher<TLeftEnd>> leftEnd, Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd, BiFunction<? super T,? super Flux<TRight>,? extends R> resultSelector)
Map values from two Publishers into time windows and emit combinations of values
when their windows overlap. The emitted elements are obtained by passing the value
from this Flux and a Flux emitting the value from the other Publisher to a BiFunction.
There are no guarantees in what order the items get combined when multiple items from one or both source Publishers overlap.
Unlike join(Publisher, Function, Function, BiFunction), items from the second
Publisher will be provided as a Flux to the resultSelector.
TRight
- the type of the elements from the right Publisher
TLeftEnd
- the type for this Flux window signals
TRightEnd
- the type for the right Publisher window signals
R
- the combined result type
other
- the other Publisher to correlate items with
leftEnd
- a function that returns a Publisher whose emissions indicate the
time window for the source value to be considered
rightEnd
- a function that returns a Publisher whose emissions indicate the
time window for the right Publisher value to be considered
resultSelector
- a function that takes an item emitted by this Flux and a Flux representation
of the overlapping item from the other Publisher and returns the value to be
emitted by the resulting Flux
Returns: a joining Flux
See also: join(Publisher, Function, Function, BiFunction)
public final <R> Flux<R> handle(BiConsumer<? super T,SynchronousSink<R>> handler)
Handle the items emitted by this Flux by calling a BiConsumer with the
output sink for each onNext. At most one SynchronousSink.next(Object)
call must be performed, and/or 0 or 1 SynchronousSink.error(Throwable) or
SynchronousSink.complete().
R
- the transformed type
handler
- the handling BiConsumer
Returns: a transformed Flux
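Because the handler may call SynchronousSink.next(Object) zero or one time per element, handle can act as a combined map and filter. A minimal sketch, assuming reactor-core on the classpath (the class and method names are hypothetical):

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class HandleExample {

    // Parses the entries that are valid integers and silently skips the others.
    static List<Integer> parseAll(List<String> raw) {
        return Flux.fromIterable(raw)
                   .<Integer>handle((text, sink) -> {
                       try {
                           sink.next(Integer.parseInt(text.trim())); // at most one next() per element
                       }
                       catch (NumberFormatException e) {
                           // Not calling the sink at all drops this element.
                       }
                   })
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(parseAll(List.of("1", "x", " 3")));
    }
}
```

Calling sink.error(e) in the catch block instead would terminate the sequence on the first invalid entry.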
This operator supports resuming on errors (including when fusion is enabled) when
the BiConsumer throws an exception or if an error is signaled explicitly via
SynchronousSink.error(Throwable).
public final Mono<Boolean> hasElement(T value)
Emit a single boolean true if any of the elements of this Flux sequence is
equal to the provided value.
The implementation uses short-circuit logic and completes with true as soon as an element matches the value.
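The short-circuit can be made visible by counting how many elements the source emits before the result is known. This is a sketch only, assuming reactor-core on the classpath; the class name and the counter are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class HasElementExample {

    // Returns {1 if the value was found else 0, number of elements emitted by the source}.
    static int[] run() {
        AtomicInteger emitted = new AtomicInteger();
        Boolean found = Flux.range(1, 1000)
                            .doOnNext(i -> emitted.incrementAndGet())
                            .hasElement(3) // the source is cancelled on the first match
                            .block();
        return new int[] { Boolean.TRUE.equals(found) ? 1 : 0, emitted.get() };
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("found=" + (result[0] == 1) + ", emitted=" + result[1]);
    }
}
```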
value
- constant compared to incoming signals
Returns: a new Mono with true if any element is equal to a given value and false otherwise
public final Mono<Boolean> hasElements()
Emit a single boolean true if this Flux sequence has at least one element.
The implementation uses short-circuit logic and completes with true on onNext.
Returns: a new Mono with true if any value is emitted and false otherwise
public final Flux<T> hide()
Hides the identities of this Flux instance.
The main purpose of this operator is to prevent certain identity-based optimizations from happening, mostly for diagnostic purposes.
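One such identity is the reactor.core.Fuseable marker interface. The sketch below assumes reactor-core on the classpath and also assumes, as an implementation detail that may vary between versions, that the Flux returned by Flux.range implements Fuseable; the class and method names are hypothetical.

```java
import java.util.List;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;

public class HideExample {

    // Returns {is the source Fuseable, is the hidden source Fuseable}.
    static boolean[] fuseable() {
        Flux<Integer> source = Flux.range(1, 3);
        Flux<Integer> hidden = source.hide();
        return new boolean[] { source instanceof Fuseable, hidden instanceof Fuseable };
    }

    // hide() does not change the emitted values.
    static List<Integer> hiddenValues() {
        return Flux.range(1, 3).hide().collectList().block();
    }

    public static void main(String[] args) {
        boolean[] flags = fuseable();
        System.out.println("source fuseable=" + flags[0] + ", hidden fuseable=" + flags[1]);
        System.out.println(hiddenValues());
    }
}
```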
Returns: a new Flux preventing Publisher / Subscription based Reactor optimizations