T - the element type of this Reactive Streams Publisher
public abstract class Flux<T> extends Object implements CorePublisher<T>
A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).
The recommended way to learn about the Flux API and discover new operators is through the reference documentation, rather than through this javadoc (which is better suited to learning more about individual operators). See the "which operator do I need?" appendix.
Flux is intended to be used in implementations and return types. Input parameters should keep using raw Publisher as much as possible.
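As a minimal sketch of that guideline, assuming reactor-core is on the classpath: the hypothetical method `upperCase` below (the class and method names are illustrative, not part of the API) accepts a raw Publisher, adapts it with `Flux.from`, and exposes a Flux as its return type.

```java
import java.util.List;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PublisherInputSketch {

    // Hypothetical helper: the parameter is a raw Publisher, so any
    // Reactive Streams source is accepted; the return type is a Flux,
    // so callers get the full operator set.
    public static Flux<String> upperCase(Publisher<String> input) {
        return Flux.from(input).map(String::toUpperCase);
    }

    public static void main(String[] args) {
        // Both a Flux and a Mono can be passed, since both are Publishers.
        List<String> fromFlux = upperCase(Flux.just("a", "b")).collectList().block();
        List<String> fromMono = upperCase(Mono.just("c")).collectList().block();
        System.out.println(fromFlux + " " + fromMono); // prints "[A, B] [C]"
    }
}
```

Keeping the parameter as Publisher means callers using another Reactive Streams implementation do not have to convert their source first.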
If it is known that the underlying Publisher will emit 0 or 1 element, Mono should be used instead.
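One way to read this in practice, sketched under the assumption that reactor-core is on the classpath: a lookup that can produce at most one result declares Mono as its return type. The class and method names here are hypothetical; `filter` and `next` are existing Flux operators.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoForSingleValueSketch {

    // Hypothetical lookup: at most one element can come out, so the
    // return type is Mono rather than Flux. next() emits the first
    // element of the Flux, or completes empty if there is none.
    public static Mono<String> firstStartingWith(Flux<String> names, String prefix) {
        return names.filter(name -> name.startsWith(prefix)).next();
    }

    public static void main(String[] args) {
        Flux<String> names = Flux.just("alice", "bob", "bella");
        System.out.println(firstStartingWith(names, "b").block()); // prints "bob"
        System.out.println(firstStartingWith(names, "z").block()); // prints "null" (empty Mono)
    }
}
```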
Note that state should be avoided in the java.util.function instances and lambdas used within Flux operators, as these may be shared between several Subscribers.
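The following sketch illustrates why shared state is a problem, assuming reactor-core is on the classpath. The class and method names are hypothetical. In `sharedCounter` the lambda captures a single counter at assembly time, so every Subscriber mutates it; `perSubscriberCounter` shows one possible alternative, using `Flux.defer` so that each subscription gets its own counter.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

public class SharedLambdaStateSketch {

    // Anti-pattern: the counter is created once, when the Flux is
    // assembled, and is then shared by all Subscribers.
    public static Flux<String> sharedCounter() {
        AtomicInteger counter = new AtomicInteger();
        return Flux.just("a", "b").map(v -> v + counter.getAndIncrement());
    }

    // One alternative: defer assembly so the counter is created anew
    // for each subscription.
    public static Flux<String> perSubscriberCounter() {
        return Flux.defer(() -> {
            AtomicInteger counter = new AtomicInteger();
            return Flux.just("a", "b").map(v -> v + counter.getAndIncrement());
        });
    }

    public static void main(String[] args) {
        Flux<String> shared = sharedCounter();
        System.out.println(shared.collectList().block()); // prints "[a0, b1]"
        System.out.println(shared.collectList().block()); // prints "[a2, b3]"

        Flux<String> isolated = perSubscriberCounter();
        System.out.println(isolated.collectList().block()); // prints "[a0, b1]"
        System.out.println(isolated.collectList().block()); // prints "[a0, b1]"
    }
}
```

For this particular case the built-in `index()` operator would also work, since it numbers elements separately for each subscription.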
subscribe(CoreSubscriber) is an internal extension to subscribe(Subscriber), used for Context passing. A user-provided Subscriber may be passed to this "subscribe" extension, but it will lose the available per-subscribe Hooks.onLastOperator(Function).
See Also: Mono
Constructor and Description |
---|
Flux() |
Modifier and Type | Method and Description |
---|---|
Mono<Boolean> |
all(Predicate<? super T> predicate)
Emit a single boolean true if all values of this sequence match
the
Predicate . |
Mono<Boolean> |
any(Predicate<? super T> predicate)
Emit a single boolean true if any of the values of this
Flux sequence match
the predicate. |
<P> P |
as(Function<? super Flux<T>,P> transformer)
Transform this
Flux into a target type. |
T |
blockFirst()
Subscribe to this
Flux and block indefinitely
until the upstream signals its first value or completes. |
T |
blockFirst(Duration timeout)
Subscribe to this
Flux and block until the upstream
signals its first value, completes or a timeout expires. |
T |
blockLast()
Subscribe to this
Flux and block indefinitely
until the upstream signals its last value or completes. |
T |
blockLast(Duration timeout)
Subscribe to this
Flux and block until the upstream
signals its last value, completes or a timeout expires. |
Flux<List<T>> |
buffer()
|
Flux<List<T>> |
buffer(Duration bufferingTimespan)
|
Flux<List<T>> |
buffer(Duration bufferingTimespan,
Duration openBufferEvery)
Collect incoming values into multiple
List buffers created at a given
openBufferEvery period. |
Flux<List<T>> |
buffer(Duration bufferingTimespan,
Duration openBufferEvery,
Scheduler timer)
|
Flux<List<T>> |
buffer(Duration bufferingTimespan,
Scheduler timer)
|
Flux<List<T>> |
buffer(int maxSize)
|
Flux<List<T>> |
buffer(int maxSize,
int skip)
|
<C extends Collection<? super T>> Flux<C> |
buffer(int maxSize,
int skip,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the given max size is reached
or once this Flux completes. |
<C extends Collection<? super T>> Flux<C> |
buffer(int maxSize,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the given max size is reached
or once this Flux completes. |
Flux<List<T>> |
buffer(Publisher<?> other)
|
<C extends Collection<? super T>> Flux<C> |
buffer(Publisher<?> other,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers, as
delimited by the signals of a companion Publisher this operator will
subscribe to. |
Flux<List<T>> |
bufferTimeout(int maxSize,
Duration maxTime)
|
Flux<List<T>> |
bufferTimeout(int maxSize,
Duration maxTime,
boolean fairBackpressure)
|
Flux<List<T>> |
bufferTimeout(int maxSize,
Duration maxTime,
Scheduler timer)
|
Flux<List<T>> |
bufferTimeout(int maxSize,
Duration maxTime,
Scheduler timer,
boolean fairBackpressure)
|
<C extends Collection<? super T>> Flux<C> |
bufferTimeout(int maxSize,
Duration maxTime,
Scheduler timer,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the buffer reaches a maximum
size OR the maxTime Duration elapses, as measured on the provided Scheduler . |
<C extends Collection<? super T>> Flux<C> |
bufferTimeout(int maxSize,
Duration maxTime,
Scheduler timer,
Supplier<C> bufferSupplier,
boolean fairBackpressure)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the buffer reaches a maximum
size OR the maxTime Duration elapses, as measured on the provided Scheduler . |
<C extends Collection<? super T>> Flux<C> |
bufferTimeout(int maxSize,
Duration maxTime,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the buffer reaches a maximum
size OR the maxTime Duration elapses. |
<C extends Collection<? super T>> Flux<C> |
bufferTimeout(int maxSize,
Duration maxTime,
Supplier<C> bufferSupplier,
boolean fairBackpressure)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the buffer reaches a maximum
size OR the maxTime Duration elapses. |
Flux<List<T>> |
bufferUntil(Predicate<? super T> predicate)
|
Flux<List<T>> |
bufferUntil(Predicate<? super T> predicate,
boolean cutBefore)
|
Flux<List<T>> |
bufferUntilChanged()
|
<V> Flux<List<T>> |
bufferUntilChanged(Function<? super T,? extends V> keySelector)
|
<V> Flux<List<T>> |
bufferUntilChanged(Function<? super T,? extends V> keySelector,
BiPredicate<? super V,? super V> keyComparator)
Collect subsequent repetitions of an element (that is, if they arrive right after
one another), as compared by a key extracted through the user provided
Function and compared using a supplied BiPredicate , into multiple
List buffers that will be emitted by the resulting Flux . |
<U,V> Flux<List<T>> |
bufferWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector)
|
<U,V,C extends Collection<? super T>> Flux<C> |
bufferWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers started each time an opening
companion Publisher emits. |
Flux<List<T>> |
bufferWhile(Predicate<? super T> predicate)
|
Flux<T> |
cache()
Turn this
Flux into a hot source and cache last emitted signals for further Subscriber . |
Flux<T> |
cache(Duration ttl)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscriber . |
Flux<T> |
cache(Duration ttl,
Scheduler timer)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscriber . |
Flux<T> |
cache(int history)
Turn this
Flux into a hot source and cache last emitted signals for further Subscriber . |
Flux<T> |
cache(int history,
Duration ttl)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscriber . |
Flux<T> |
cache(int history,
Duration ttl,
Scheduler timer)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscriber . |
Flux<T> |
cancelOn(Scheduler scheduler)
|
<E> Flux<E> |
cast(Class<E> clazz)
Cast the current
Flux produced type into a target produced type. |
Flux<T> |
checkpoint()
Activate traceback (full assembly tracing) for this particular
Flux , in case of an error
upstream of the checkpoint. |
Flux<T> |
checkpoint(String description)
Activate traceback (assembly marker) for this particular
Flux by giving it a description that
will be reflected in the assembly traceback in case of an error upstream of the
checkpoint. |
Flux<T> |
checkpoint(String description,
boolean forceStackTrace)
Activate traceback (full assembly tracing or the lighter assembly marking depending on the
forceStackTrace option). |
<R,A> Mono<R> |
collect(Collector<? super T,A,? extends R> collector)
|
<E> Mono<E> |
collect(Supplier<E> containerSupplier,
BiConsumer<E,? super T> collector)
Collect all elements emitted by this
Flux into a user-defined container,
by applying a collector BiConsumer taking the container and each element. |
Mono<List<T>> |
collectList()
|
<K> Mono<Map<K,T>> |
collectMap(Function<? super T,? extends K> keyExtractor)
|
<K,V> Mono<Map<K,V>> |
collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor)
|
<K,V> Mono<Map<K,V>> |
collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,V>> mapSupplier)
|
<K> Mono<Map<K,Collection<T>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor)
|
<K,V> Mono<Map<K,Collection<V>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor)
|
<K,V> Mono<Map<K,Collection<V>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,Collection<V>>> mapSupplier)
|
Mono<List<T>> |
collectSortedList()
|
Mono<List<T>> |
collectSortedList(Comparator<? super T> comparator)
Collect all elements emitted by this
Flux until this sequence completes,
and then sort them using a Comparator into a List that is emitted
by the resulting Mono . |
static <T,V> Flux<V> |
combineLatest(Function<Object[],V> combinator,
int prefetch,
Publisher<? extends T>... sources)
|
static <T,V> Flux<V> |
combineLatest(Function<Object[],V> combinator,
Publisher<? extends T>... sources)
|
static <T,V> Flux<V> |
combineLatest(Iterable<? extends Publisher<? extends T>> sources,
Function<Object[],V> combinator)
|
static <T,V> Flux<V> |
combineLatest(Iterable<? extends Publisher<? extends T>> sources,
int prefetch,
Function<Object[],V> combinator)
|
static <T1,T2,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
BiFunction<? super T1,? super T2,? extends V> combinator)
|
static <T1,T2,T3,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Function<Object[],V> combinator)
|
static <T1,T2,T3,T4,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Function<Object[],V> combinator)
|
static <T1,T2,T3,T4,T5,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Function<Object[],V> combinator)
|
static <T1,T2,T3,T4,T5,T6,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6,
Function<Object[],V> combinator)
|
static <T> Flux<T> |
concat(Iterable<? extends Publisher<? extends T>> sources)
Concatenate all sources provided in an
Iterable , forwarding elements
emitted by the sources downstream. |
static <T> Flux<T> |
concat(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concat(Publisher<? extends Publisher<? extends T>> sources,
int prefetch)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concat(Publisher<? extends T>... sources)
Concatenate all sources provided as a vararg, forwarding elements emitted by the
sources downstream.
|
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources,
boolean delayUntilEnd,
int prefetch)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources,
int prefetch)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends T>... sources)
Concatenate all sources provided as a vararg, forwarding elements emitted by the
sources downstream.
|
<V> Flux<V> |
concatMap(Function<? super T,? extends Publisher<? extends V>> mapper)
|
<V> Flux<V> |
concatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int prefetch)
|
<V> Flux<V> |
concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper)
|
<V> Flux<V> |
concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
boolean delayUntilEnd,
int prefetch)
|
<V> Flux<V> |
concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
int prefetch)
|
<R> Flux<R> |
concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
|
<R> Flux<R> |
concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper,
int prefetch)
|
Flux<T> |
concatWith(Publisher<? extends T> other)
|
Flux<T> |
concatWithValues(T... values)
Concatenates the values to the end of the
Flux |
Flux<T> |
contextCapture()
If context-propagation library
is on the classpath, this is a convenience shortcut to capture thread local values during the
subscription phase and put them in the
Context that is visible upstream of this operator. |
Flux<T> |
contextWrite(ContextView contextToAppend)
Enrich the
Context visible from downstream for the benefit of upstream
operators, by making all values from the provided ContextView visible on top
of pairs from downstream. |
Flux<T> |
contextWrite(Function<Context,Context> contextModifier)
|
Mono<Long> |
count()
Counts the number of values in this
Flux . |
static <T> Flux<T> |
create(Consumer<? super FluxSink<T>> emitter)
|
static <T> Flux<T> |
create(Consumer<? super FluxSink<T>> emitter,
FluxSink.OverflowStrategy backpressure)
|
Flux<T> |
defaultIfEmpty(T defaultV)
Provide a default unique value if this sequence is completed without any data
|
static <T> Flux<T> |
defer(Supplier<? extends Publisher<T>> supplier)
Lazily supply a
Publisher every time a Subscription is made on the
resulting Flux , so the actual source instantiation is deferred until each
subscribe and the Supplier can create a subscriber-specific instance. |
static <T> Flux<T> |
deferContextual(Function<ContextView,? extends Publisher<T>> contextualPublisherFactory)
Lazily supply a
Publisher every time a Subscription is made on the
resulting Flux , so the actual source instantiation is deferred until each
subscribe and the Function can create a subscriber-specific instance. |
Flux<T> |
delayElements(Duration delay)
|
Flux<T> |
delayElements(Duration delay,
Scheduler timer)
|
Flux<T> |
delaySequence(Duration delay)
|
Flux<T> |
delaySequence(Duration delay,
Scheduler timer)
|
Flux<T> |
delaySubscription(Duration delay)
Delay the
subscription to this Flux source until the given
period elapses. |
Flux<T> |
delaySubscription(Duration delay,
Scheduler timer)
Delay the
subscription to this Flux source until the given
period elapses, as measured on the user-provided Scheduler . |
<U> Flux<T> |
delaySubscription(Publisher<U> subscriptionDelay)
|
Flux<T> |
delayUntil(Function<? super T,? extends Publisher<?>> triggerProvider)
|
<X> Flux<X> |
dematerialize()
An operator working only if this
Flux emits onNext, onError or onComplete Signal
instances, transforming these materialized signals into
real signals on the Subscriber . |
Flux<T> |
distinct()
For each
Subscriber , track elements from this Flux that have been
seen and filter out duplicates. |
<V> Flux<T> |
distinct(Function<? super T,? extends V> keySelector)
For each
Subscriber , track elements from this Flux that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function . |
<V,C extends Collection<? super V>> Flux<T> |
distinct(Function<? super T,? extends V> keySelector,
Supplier<C> distinctCollectionSupplier)
For each
Subscriber , track elements from this Flux that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function and by the add method
of the Collection supplied (typically a Set ). |
<V,C> Flux<T> |
distinct(Function<? super T,? extends V> keySelector,
Supplier<C> distinctStoreSupplier,
BiPredicate<C,V> distinctPredicate,
Consumer<C> cleanup)
For each
Subscriber , track elements from this Flux that have been
seen and filter out duplicates, as compared by applying a BiPredicate on
an arbitrary user-supplied <C> store and a key extracted through the user
provided Function . |
Flux<T> |
distinctUntilChanged()
Filter out subsequent repetitions of an element (that is, if they arrive right after
one another).
|
<V> Flux<T> |
distinctUntilChanged(Function<? super T,? extends V> keySelector)
Filter out subsequent repetitions of an element (that is, if they arrive right after
one another), as compared by a key extracted through the user provided
Function
using equality. |
<V> Flux<T> |
distinctUntilChanged(Function<? super T,? extends V> keySelector,
BiPredicate<? super V,? super V> keyComparator)
Filter out subsequent repetitions of an element (that is, if they arrive right
after one another), as compared by a key extracted through the user provided
Function and then comparing keys with the supplied BiPredicate . |
Flux<T> |
doAfterTerminate(Runnable afterTerminate)
Add behavior (side-effect) triggered after the
Flux terminates, either by completing downstream successfully or with an error. |
Flux<T> |
doFinally(Consumer<SignalType> onFinally)
Add behavior (side-effect) triggered after the
Flux terminates for any reason,
including cancellation. |
Flux<T> |
doFirst(Runnable onFirst)
Add behavior (side-effect) triggered before the
Flux is
subscribed to, which should be the first event after assembly time. |
Flux<T> |
doOnCancel(Runnable onCancel)
Add behavior (side-effect) triggered when the
Flux is cancelled. |
Flux<T> |
doOnComplete(Runnable onComplete)
Add behavior (side-effect) triggered when the
Flux completes successfully. |
<R> Flux<T> |
doOnDiscard(Class<R> type,
Consumer<? super R> discardHook)
Potentially modify the behavior of the whole chain of operators upstream of this one to
conditionally clean up elements that get discarded by these operators.
|
Flux<T> |
doOnEach(Consumer<? super Signal<T>> signalConsumer)
Add behavior (side-effects) triggered when the
Flux emits an item, fails with an error
or completes successfully. |
<E extends Throwable> Flux<T> |
doOnError(Class<E> exceptionType,
Consumer<? super E> onError)
Add behavior (side-effect) triggered when the
Flux completes with an error matching the given exception type. |
Flux<T> |
doOnError(Consumer<? super Throwable> onError)
Add behavior (side-effect) triggered when the
Flux completes with an error. |
Flux<T> |
doOnError(Predicate<? super Throwable> predicate,
Consumer<? super Throwable> onError)
Add behavior (side-effect) triggered when the
Flux completes with an error matching the given exception. |
Flux<T> |
doOnNext(Consumer<? super T> onNext)
Add behavior (side-effect) triggered when the
Flux emits an item. |
Flux<T> |
doOnRequest(LongConsumer consumer)
Add behavior (side-effect) triggering a
LongConsumer when this Flux
receives any request. |
Flux<T> |
doOnSubscribe(Consumer<? super Subscription> onSubscribe)
Add behavior (side-effect) triggered when the
Flux is being subscribed,
that is to say when a Subscription has been produced by the Publisher
and is being passed to the Subscriber.onSubscribe(Subscription) . |
Flux<T> |
doOnTerminate(Runnable onTerminate)
Add behavior (side-effect) triggered when the
Flux terminates, either by
completing successfully or failing with an error. |
Flux<Tuple2<Long,T>> |
elapsed()
Map this
Flux into Tuple2<Long, T>
of timemillis and source data. |
Flux<Tuple2<Long,T>> |
elapsed(Scheduler scheduler)
Map this
Flux into Tuple2<Long, T>
of timemillis and source data. |
Mono<T> |
elementAt(int index)
Emit only the element at the given index position or
IndexOutOfBoundsException
if the sequence is shorter. |
Mono<T> |
elementAt(int index,
T defaultValue)
Emit only the element at the given index position or fall back to a
default value if the sequence is shorter.
|
static <T> Flux<T> |
empty()
Create a
Flux that completes without emitting any item. |
static <T> Flux<T> |
error(Supplier<? extends Throwable> errorSupplier)
Create a
Flux that terminates with an error immediately after being
subscribed to. |
static <T> Flux<T> |
error(Throwable error)
Create a
Flux that terminates with the specified error immediately after
being subscribed to. |
static <O> Flux<O> |
error(Throwable throwable,
boolean whenRequested)
Create a
Flux that terminates with the specified error, either immediately
after being subscribed to or after being first requested. |
Flux<T> |
expand(Function<? super T,? extends Publisher<? extends T>> expander)
Recursively expand elements into a graph and emit all the resulting elements using
a breadth-first traversal strategy.
|
Flux<T> |
expand(Function<? super T,? extends Publisher<? extends T>> expander,
int capacityHint)
Recursively expand elements into a graph and emit all the resulting elements using
a breadth-first traversal strategy.
|
Flux<T> |
expandDeep(Function<? super T,? extends Publisher<? extends T>> expander)
Recursively expand elements into a graph and emit all the resulting elements,
in a depth-first traversal order.
|
Flux<T> |
expandDeep(Function<? super T,? extends Publisher<? extends T>> expander,
int capacityHint)
Recursively expand elements into a graph and emit all the resulting elements,
in a depth-first traversal order.
|
Flux<T> |
filter(Predicate<? super T> p)
Evaluate each source value against the given
Predicate . |
Flux<T> |
filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate)
Test each value emitted by this
Flux asynchronously using a generated
Publisher<Boolean> test. |
Flux<T> |
filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate,
int bufferSize)
Test each value emitted by this
Flux asynchronously using a generated
Publisher<Boolean> test. |
static <I> Flux<I> |
first(Iterable<? extends Publisher<? extends I>> sources)
Deprecated.
use
firstWithSignal(Iterable) . To be removed in reactor 3.5. |
static <I> Flux<I> |
first(Publisher<? extends I>... sources)
Deprecated.
use
firstWithSignal(Publisher[]) . To be removed in reactor 3.5. |
static <I> Flux<I> |
firstWithSignal(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
firstWithSignal(Publisher<? extends I>... sources)
|
static <I> Flux<I> |
firstWithValue(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
firstWithValue(Publisher<? extends I> first,
Publisher<? extends I>... others)
|
<R> Flux<R> |
flatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
|
<R> Flux<R> |
flatMap(Function<? super T,? extends Publisher<? extends R>> mapperOnNext,
Function<? super Throwable,? extends Publisher<? extends R>> mapperOnError,
Supplier<? extends Publisher<? extends R>> mapperOnComplete)
|
<V> Flux<V> |
flatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency)
|
<V> Flux<V> |
flatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency,
int prefetch)
|
<V> Flux<V> |
flatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency,
int prefetch)
|
<R> Flux<R> |
flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
|
<R> Flux<R> |
flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper,
int prefetch)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency,
int prefetch)
|
<R> Flux<R> |
flatMapSequentialDelayError(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency,
int prefetch)
|
static <T> Flux<T> |
from(Publisher<? extends T> source)
|
static <T> Flux<T> |
fromArray(T[] array)
Create a
Flux that emits the items contained in the provided array. |
static <T> Flux<T> |
fromIterable(Iterable<? extends T> it)
|
static <T> Flux<T> |
fromStream(Stream<? extends T> s)
|
static <T> Flux<T> |
fromStream(Supplier<Stream<? extends T>> streamSupplier)
|
static <T,S> Flux<T> |
generate(Callable<S> stateSupplier,
BiFunction<S,SynchronousSink<T>,S> generator)
Programmatically create a
Flux by generating signals one-by-one via a
consumer callback and some state. |
static <T,S> Flux<T> |
generate(Callable<S> stateSupplier,
BiFunction<S,SynchronousSink<T>,S> generator,
Consumer<? super S> stateConsumer)
Programmatically create a
Flux by generating signals one-by-one via a
consumer callback and some state, with a final cleanup callback. |
static <T> Flux<T> |
generate(Consumer<SynchronousSink<T>> generator)
Programmatically create a
Flux by generating signals one-by-one via a
consumer callback. |
int |
getPrefetch()
The prefetch configuration of the
Flux |
<K> Flux<GroupedFlux<K,T>> |
groupBy(Function<? super T,? extends K> keyMapper)
|
<K,V> Flux<GroupedFlux<K,V>> |
groupBy(Function<? super T,? extends K> keyMapper,
Function<? super T,? extends V> valueMapper)
|
<K,V> Flux<GroupedFlux<K,V>> |
groupBy(Function<? super T,? extends K> keyMapper,
Function<? super T,? extends V> valueMapper,
int prefetch)
|
<K> Flux<GroupedFlux<K,T>> |
groupBy(Function<? super T,? extends K> keyMapper,
int prefetch)
|
<TRight,TLeftEnd,TRightEnd,R> Flux<R> |
groupJoin(Publisher<? extends TRight> other,
Function<? super T,? extends Publisher<TLeftEnd>> leftEnd,
Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd,
BiFunction<? super T,? super Flux<TRight>,? extends R> resultSelector)
Map values from two Publishers into time windows and emit combination of values
in case their windows overlap.
|
<R> Flux<R> |
handle(BiConsumer<? super T,SynchronousSink<R>> handler)
Handle the items emitted by this
Flux by calling a biconsumer with the
output sink for each onNext. |
Mono<Boolean> |
hasElement(T value)
Emit a single boolean true if any of the elements of this
Flux sequence is
equal to the provided value. |
Mono<Boolean> |
hasElements()
Emit a single boolean true if this
Flux sequence has at least one element. |
Flux<T> |
hide()
Hides the identities of this
Flux instance. |
Mono<T> |
ignoreElements()
Ignores onNext signals (dropping them) and only propagate termination events.
|
Flux<Tuple2<Long,T>> |
index()
Keep information about the order in which source values were received by
indexing them with a 0-based incrementing long, returning a
Flux
of Tuple2<(index, value)> . |
<I> Flux<I> |
index(BiFunction<? super Long,? super T,? extends I> indexMapper)
Keep information about the order in which source values were received by
indexing them internally with a 0-based incrementing long then combining this
information with the source value into a
I using the provided BiFunction ,
returning a Flux<I> . |
static Flux<Long> |
interval(Duration period)
Create a
Flux that emits long values starting with 0 and incrementing at
specified time intervals on the global timer. |
static Flux<Long> |
interval(Duration delay,
Duration period)
Create a
Flux that emits long values starting with 0 and incrementing at
specified time intervals, after an initial delay, on the global timer. |
static Flux<Long> |
interval(Duration delay,
Duration period,
Scheduler timer)
|
static Flux<Long> |
interval(Duration period,
Scheduler timer)
|
<TRight,TLeftEnd,TRightEnd,R> Flux<R> |
join(Publisher<? extends TRight> other,
Function<? super T,? extends Publisher<TLeftEnd>> leftEnd,
Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd,
BiFunction<? super T,? super TRight,? extends R> resultSelector)
Combine values from two Publishers in case their windows overlap.
|
static <T> Flux<T> |
just(T... data)
Create a
Flux that emits the provided elements and then completes. |
static <T> Flux<T> |
just(T data)
Create a new
Flux that will only emit a single element then onComplete. |
Mono<T> |
last()
Emit the last element observed before complete signal as a
Mono , or emit
NoSuchElementException error if the source was empty. |
Mono<T> |
last(T defaultValue)
Emit the last element observed before complete signal as a
Mono , or emit
the defaultValue if the source was empty. |
Flux<T> |
limitRate(int prefetchRate)
Ensure that backpressure signals from downstream subscribers are split into batches
capped at the provided
prefetchRate when propagated upstream, effectively
rate limiting the upstream Publisher . |
Flux<T> |
limitRate(int highTide,
int lowTide)
Ensure that backpressure signals from downstream subscribers are split into batches
capped at the provided
highTide first, then replenishing at the provided
lowTide , effectively rate limiting the upstream Publisher . |
Flux<T> |
limitRequest(long n)
Deprecated.
replace with
take(n, true) in 3.4.x, then take(long) in 3.5.0.
To be removed in 3.6.0 at the earliest. See https://github.com/reactor/reactor-core/issues/2339 |
Flux<T> |
log()
Observe all Reactive Streams signals and trace them using
Logger support. |
Flux<T> |
log(Logger logger)
Observe Reactive Streams signals matching the passed filter
options and
trace them using a specific user-provided Logger , at Level.INFO level. |
Flux<T> |
log(Logger logger,
Level level,
boolean showOperatorLine,
SignalType... options)
|
Flux<T> |
log(String category)
Observe all Reactive Streams signals and trace them using
Logger support. |
Flux<T> |
log(String category,
Level level,
boolean showOperatorLine,
SignalType... options)
Observe Reactive Streams signals matching the passed filter
options and
trace them using Logger support. |
Flux<T> |
log(String category,
Level level,
SignalType... options)
Observe Reactive Streams signals matching the passed filter
options and
trace them using Logger support. |
<V> Flux<V> |
map(Function<? super T,? extends V> mapper)
Transform the items emitted by this
Flux by applying a synchronous function
to each item. |
<V> Flux<V> |
mapNotNull(Function<? super T,? extends V> mapper)
Transform the items emitted by this
Flux by applying a synchronous function
to each item, which may produce null values. |
Flux<Signal<T>> |
materialize()
Transform incoming onNext, onError and onComplete signals into
Signal instances,
materializing these signals. |
static <I> Flux<I> |
merge(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. |
static <I> Flux<I> |
merge(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
merge(Publisher<? extends I>... sources)
Merge data from
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. |
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source)
|
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source,
int concurrency)
|
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source,
int concurrency,
int prefetch)
|
static <T> Flux<T> |
mergeComparing(Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator ). |
static <T> Flux<T> |
mergeComparing(int prefetch,
Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator ). |
static <I extends Comparable<? super I>> Flux<I> |
mergeComparing(Publisher<? extends I>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by their natural order). |
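As an illustration of the ordered merge described above, the following is a minimal sketch that assumes reactor-core is on the classpath. The class and method names (`MergeComparingExample`, `mergeSorted`) are hypothetical and exist only for this example. Both sources are already sorted, so the merged output is sorted as well.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class, not part of the Reactor API.
class MergeComparingExample {

    // Merges two already-sorted sources using the natural order of the elements.
    static List<Integer> mergeSorted() {
        Flux<Integer> odds = Flux.just(1, 3, 5);
        Flux<Integer> evens = Flux.just(2, 4, 6);
        return Flux.mergeComparing(odds, evens)
                   .collectList()
                   .block(); // blocking is acceptable in a demo, not in reactive code
    }

    public static void main(String[] args) {
        System.out.println(mergeSorted()); // [1, 2, 3, 4, 5, 6]
    }
}
```

If the sources were not individually sorted, the operator would still pick the smallest of the values currently available, so the result would not necessarily be globally sorted.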
static <T> Flux<T> |
mergeComparingDelayError(int prefetch,
Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator ). |
Flux<T> |
mergeComparingWith(Publisher<? extends T> other,
Comparator<? super T> otherComparator)
Merge data from this
Flux and a Publisher into a reordered merge
sequence, by picking the smallest value from each sequence as defined by a provided
Comparator . |
static <I> Flux<I> |
mergeDelayError(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. |
static <T> Flux<T> |
mergeOrdered(Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Deprecated.
Use
mergeComparingDelayError(int, Comparator, Publisher[]) instead
(as mergeComparing(Publisher[]) does not have this operator's delayError behavior).
To be removed in 3.6.0 at the earliest. |
static <T> Flux<T> |
mergeOrdered(int prefetch,
Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Deprecated.
Use
mergeComparingDelayError(int, Comparator, Publisher[]) instead
(as mergeComparing(Publisher[]) does not have this operator's delayError behavior).
To be removed in 3.6.0 at the earliest. |
static <I extends Comparable<? super I>> Flux<I> |
mergeOrdered(Publisher<? extends I>... sources)
Deprecated.
Use
mergeComparingDelayError(int, Comparator, Publisher[]) instead
(as mergeComparing(Publisher[]) does not have this operator's delayError behavior).
To be removed in 3.6.0 at the earliest. |
Flux<T> |
mergeOrderedWith(Publisher<? extends T> other,
Comparator<? super T> otherComparator)
Deprecated.
Use
mergeComparingWith(Publisher, Comparator) instead
(with the caveat that it defaults to NOT delaying errors, unlike this operator).
To be removed in 3.6.0 at the earliest. |
static <T> Flux<T> |
mergePriority(Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator ) as they arrive. |
static <T> Flux<T> |
mergePriority(int prefetch,
Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator ) as they arrive. |
static <I extends Comparable<? super I>> Flux<I> |
mergePriority(Publisher<? extends I>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by their natural order) as they arrive. |
static <T> Flux<T> |
mergePriorityDelayError(int prefetch,
Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator ) as they arrive. |
static <I> Flux<I> |
mergeSequential(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences provided in an array/vararg
into an ordered merged sequence. |
static <I> Flux<I> |
mergeSequential(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
mergeSequential(Iterable<? extends Publisher<? extends I>> sources,
int maxConcurrency,
int prefetch)
|
static <I> Flux<I> |
mergeSequential(Publisher<? extends I>... sources)
Merge data from
Publisher sequences provided in an array/vararg
into an ordered merged sequence. |
static <T> Flux<T> |
mergeSequential(Publisher<? extends Publisher<? extends T>> sources)
|
static <T> Flux<T> |
mergeSequential(Publisher<? extends Publisher<? extends T>> sources,
int maxConcurrency,
int prefetch)
|
static <I> Flux<I> |
mergeSequentialDelayError(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences provided in an array/vararg
into an ordered merged sequence. |
static <I> Flux<I> |
mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources,
int maxConcurrency,
int prefetch)
|
static <T> Flux<T> |
mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources,
int maxConcurrency,
int prefetch)
|
Flux<T> |
mergeWith(Publisher<? extends T> other)
|
Flux<T> |
metrics()
Deprecated.
Prefer using the
tap(SignalListenerFactory) with the SignalListenerFactory provided by
the new reactor-core-micrometer module. To be removed in 3.6.0 at the earliest. |
Flux<T> |
name(String name)
Give a name to this sequence, which can be retrieved using
Scannable.name()
as long as this is the first reachable Scannable.parents() . |
static <T> Flux<T> |
never()
Create a
Flux that will never signal any data, error or completion signal. |
Mono<T> |
next()
|
<U> Flux<U> |
ofType(Class<U> clazz)
Evaluate each accepted value against the given
Class type. |
protected static <T> ConnectableFlux<T> |
onAssembly(ConnectableFlux<T> source)
To be used by custom operators: invokes assembly
Hooks pointcut given a
ConnectableFlux , potentially returning a new ConnectableFlux . |
protected static <T> Flux<T> |
onAssembly(Flux<T> source)
|
Flux<T> |
onBackpressureBuffer()
Request an unbounded demand and push to the returned
Flux , or park the
observed elements if not enough demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(Duration ttl,
int maxSize,
Consumer<? super T> onBufferEviction)
Request an unbounded demand and push to the returned
Flux , or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit and for a maximum Duration of ttl (as measured on the
parallel Scheduler ). |
Flux<T> |
onBackpressureBuffer(Duration ttl,
int maxSize,
Consumer<? super T> onBufferEviction,
Scheduler scheduler)
|
Flux<T> |
onBackpressureBuffer(int maxSize)
Request an unbounded demand and push to the returned
Flux , or park up to
maxSize elements when not enough demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(int maxSize,
BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push to the returned
Flux , or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. |
Flux<T> |
onBackpressureBuffer(int maxSize,
Consumer<? super T> onOverflow)
Request an unbounded demand and push to the returned
Flux , or park up to
maxSize elements when not enough demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(int maxSize,
Consumer<? super T> onBufferOverflow,
BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push to the returned
Flux , or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. |
Flux<T> |
onBackpressureDrop()
Request an unbounded demand and push to the returned
Flux , or drop
the observed elements if not enough demand is requested downstream. |
Flux<T> |
onBackpressureDrop(Consumer<? super T> onDropped)
|
Flux<T> |
onBackpressureError()
Request an unbounded demand and push to the returned
Flux , or emit onError
from Exceptions.failWithOverflow() if not enough demand is requested
downstream. |
Flux<T> |
onBackpressureLatest()
Request an unbounded demand and push to the returned
Flux , or only keep
the most recent observed item if not enough demand is requested downstream. |
Flux<T> |
onErrorComplete()
Simply complete the sequence by replacing an
onError signal
with an onComplete signal . |
Flux<T> |
onErrorComplete(Class<? extends Throwable> type)
Simply complete the sequence by replacing an
onError signal
with an onComplete signal if the error matches the given
Class . |
Flux<T> |
onErrorComplete(Predicate<? super Throwable> predicate)
Simply complete the sequence by replacing an
onError signal
with an onComplete signal if the error matches the given
Predicate . |
Flux<T> |
onErrorContinue(BiConsumer<Throwable,Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the
incriminating element from the sequence and continuing with subsequent elements.
|
<E extends Throwable> Flux<T> |
onErrorContinue(Class<E> type,
BiConsumer<Throwable,Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the
incriminating element from the sequence and continuing with subsequent elements.
|
<E extends Throwable> Flux<T> |
onErrorContinue(Predicate<E> errorPredicate,
BiConsumer<Throwable,Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the
incriminating element from the sequence and continuing with subsequent elements.
|
<E extends Throwable> Flux<T> |
onErrorMap(Class<E> type,
Function<? super E,? extends Throwable> mapper)
Transform an error emitted by this
Flux by synchronously applying a function
to it if the error matches the given type. |
Flux<T> |
onErrorMap(Function<? super Throwable,? extends Throwable> mapper)
Transform any error emitted by this
Flux by synchronously applying a function to it. |
Flux<T> |
onErrorMap(Predicate<? super Throwable> predicate,
Function<? super Throwable,? extends Throwable> mapper)
Transform an error emitted by this
Flux by synchronously applying a function
to it if the error matches the given predicate. |
<E extends Throwable> Flux<T> |
onErrorResume(Class<E> type,
Function<? super E,? extends Publisher<? extends T>> fallback)
Subscribe to a fallback publisher when an error matching the given type
occurs, using a function to choose the fallback depending on the error.
|
Flux<T> |
onErrorResume(Function<? super Throwable,? extends Publisher<? extends T>> fallback)
Subscribe to a returned fallback publisher when any error occurs, using a function to
choose the fallback depending on the error.
|
Flux<T> |
onErrorResume(Predicate<? super Throwable> predicate,
Function<? super Throwable,? extends Publisher<? extends T>> fallback)
Subscribe to a fallback publisher when an error matching a given predicate
occurs.
|
<E extends Throwable> Flux<T> |
onErrorReturn(Class<E> type,
T fallbackValue)
Simply emit a captured fallback value when an error of the specified type is
observed on this
Flux . |
Flux<T> |
onErrorReturn(Predicate<? super Throwable> predicate,
T fallbackValue)
Simply emit a captured fallback value when an error matching the given predicate is
observed on this
Flux . |
Flux<T> |
onErrorReturn(T fallbackValue)
Simply emit a captured fallback value when any error is observed on this
Flux . |
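The difference between a fallback value and a fallback publisher can be sketched as follows. This is a minimal example assuming reactor-core is on the classpath. The class name `ErrorFallbackExample` and its methods are hypothetical, and the division by zero is just a convenient way to trigger an error inside `map`.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class, not part of the Reactor API.
class ErrorFallbackExample {

    // onErrorReturn: the error is replaced by a single fallback value, then the sequence completes.
    static List<Integer> withFallbackValue() {
        return Flux.just(1, 2, 0, 4)
                   .map(i -> 10 / i) // throws ArithmeticException when i == 0
                   .onErrorReturn(-1)
                   .collectList()
                   .block();
    }

    // onErrorResume: the error is replaced by a whole fallback sequence, chosen here by error type.
    static List<Integer> withFallbackPublisher() {
        return Flux.just(1, 2, 0, 4)
                   .map(i -> 10 / i)
                   .onErrorResume(ArithmeticException.class, e -> Flux.just(-1, -2))
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(withFallbackValue());     // [10, 5, -1]
        System.out.println(withFallbackPublisher()); // [10, 5, -1, -2]
    }
}
```

In both cases the element `4` is never processed: the error terminates the original sequence, and the fallback only replaces the terminal signal.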
Flux<T> |
onErrorStop()
If an
onErrorContinue(BiConsumer) variant has been used downstream, reverts
to the default 'STOP' mode where errors are terminal events upstream. |
Flux<T> |
onTerminateDetach()
Detaches both the child
Subscriber and the Subscription on
termination or cancellation. |
Flux<T> |
or(Publisher<? extends T> other)
|
ParallelFlux<T> |
parallel()
Prepare this
Flux by dividing data on a number of 'rails' matching the
number of CPU cores, in a round-robin fashion. |
ParallelFlux<T> |
parallel(int parallelism)
Prepare this
Flux by dividing data on a number of 'rails' matching the
provided parallelism parameter, in a round-robin fashion. |
ParallelFlux<T> |
parallel(int parallelism,
int prefetch)
|
ConnectableFlux<T> |
publish()
Prepare a
ConnectableFlux which shares this Flux sequence and
dispatches values to subscribers in a backpressure-aware manner. |
<R> Flux<R> |
publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform)
Shares a sequence for the duration of a function that may transform it and
consume it as many times as necessary without causing multiple subscriptions
to the upstream.
|
<R> Flux<R> |
publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform,
int prefetch)
Shares a sequence for the duration of a function that may transform it and
consume it as many times as necessary without causing multiple subscriptions
to the upstream.
|
ConnectableFlux<T> |
publish(int prefetch)
Prepare a
ConnectableFlux which shares this Flux sequence and
dispatches values to subscribers in a backpressure-aware manner. |
Mono<T> |
publishNext()
Deprecated.
Use
shareNext() instead, or use `publish().next()` if you need
to `connect()`. To be removed in 3.5.0. |
Flux<T> |
publishOn(Scheduler scheduler)
|
Flux<T> |
publishOn(Scheduler scheduler,
boolean delayError,
int prefetch)
Run onNext, onComplete and onError on a supplied
Scheduler's Scheduler.Worker. |
Flux<T> |
publishOn(Scheduler scheduler,
int prefetch)
Run onNext, onComplete and onError on a supplied
Scheduler's Scheduler.Worker. |
static <T> Flux<T> |
push(Consumer<? super FluxSink<T>> emitter)
|
static <T> Flux<T> |
push(Consumer<? super FluxSink<T>> emitter,
FluxSink.OverflowStrategy backpressure)
|
static Flux<Integer> |
range(int start,
int count)
|
<A> Mono<A> |
reduce(A initial,
BiFunction<A,? super T,A> accumulator)
Reduce the values from this
Flux sequence into a single object matching the
type of a seed value. |
Mono<T> |
reduce(BiFunction<T,T,T> aggregator)
Reduce the values from this
Flux sequence into a single object of the same
type as the emitted items. |
<A> Mono<A> |
reduceWith(Supplier<A> initial,
BiFunction<A,? super T,A> accumulator)
Reduce the values from this
Flux sequence into a single object matching the
type of a lazily supplied seed value. |
Flux<T> |
repeat()
Repeatedly and indefinitely subscribe to the source upon completion of the
previous subscription.
|
Flux<T> |
repeat(BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.
|
Flux<T> |
repeat(long numRepeat)
Repeatedly subscribe to the source
numRepeat times. |
Flux<T> |
repeat(long numRepeat,
BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous
subscription.
|
Flux<T> |
repeatWhen(Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
Repeatedly subscribe to this
Flux when a companion sequence emits elements in
response to the flux completion signal. |
ConnectableFlux<T> |
replay()
Turn this
Flux into a hot source and cache last emitted signals for further Subscriber . |
ConnectableFlux<T> |
replay(Duration ttl)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
ConnectableFlux<T> |
replay(Duration ttl,
Scheduler timer)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
ConnectableFlux<T> |
replay(int history)
Turn this
Flux into a connectable hot source and cache last emitted
signals for further Subscriber . |
ConnectableFlux<T> |
replay(int history,
Duration ttl)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
ConnectableFlux<T> |
replay(int history,
Duration ttl,
Scheduler timer)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
Flux<T> |
retry()
Re-subscribes to this
Flux sequence if it signals any error, indefinitely. |
Flux<T> |
retry(long numRetries)
Re-subscribes to this
Flux sequence if it signals any error, for a fixed
number of times. |
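A bounded retry can be sketched as below, assuming reactor-core is on the classpath. The class `RetryExample` and its `attempts` counter are hypothetical. The counter is shared state used only to make the source fail on its first two subscriptions, which is something to avoid in production operators.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

// Hypothetical example class, not part of the Reactor API.
class RetryExample {

    // Counts subscriptions to the flaky source (demo-only shared state).
    static final AtomicInteger attempts = new AtomicInteger();

    static List<String> run() {
        attempts.set(0);
        // defer() re-evaluates the supplier on every (re)subscription.
        Flux<String> flaky = Flux.defer(() -> {
            if (attempts.incrementAndGet() < 3) {
                return Flux.<String>error(new IllegalStateException("transient failure"));
            }
            return Flux.just("ok");
        });
        // retry(2) allows the initial subscription plus up to two re-subscriptions.
        return flaky.retry(2).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(run());          // [ok]
        System.out.println(attempts.get()); // 3
    }
}
```

With `retry(1)` the same source would exhaust its retries and the `IllegalStateException` would propagate to the subscriber.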
Flux<T> |
retryWhen(Retry retrySpec)
|
Flux<T> |
sample(Duration timespan)
|
<U> Flux<T> |
sample(Publisher<U> sampler)
|
Flux<T> |
sampleFirst(Duration timespan)
Repeatedly take a value from this
Flux then skip the values that follow
within a given duration. |
<U> Flux<T> |
sampleFirst(Function<? super T,? extends Publisher<U>> samplerFactory)
|
<U> Flux<T> |
sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory)
|
<U> Flux<T> |
sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory,
int maxConcurrency)
|
<A> Flux<A> |
scan(A initial,
BiFunction<A,? super T,A> accumulator)
Reduce this
Flux values with an accumulator BiFunction and
also emit the intermediate results of this function. |
Flux<T> |
scan(BiFunction<T,T,T> accumulator)
Reduce this
Flux values with an accumulator BiFunction and
also emit the intermediate results of this function. |
<A> Flux<A> |
scanWith(Supplier<A> initial,
BiFunction<A,? super T,A> accumulator)
Reduce this
Flux values with the help of an accumulator BiFunction
and also emits the intermediate results. |
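The contrast between `scan` and `reduce` can be sketched with a running sum. This example assumes reactor-core is on the classpath, and the class name `ScanReduceExample` is hypothetical.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class, not part of the Reactor API.
class ScanReduceExample {

    // scan without a seed: the first element is emitted as-is, then each intermediate sum.
    static List<Integer> runningTotals() {
        return Flux.range(1, 4).scan(Integer::sum).collectList().block();
    }

    // scan with a seed: the seed itself is emitted first.
    static List<Integer> runningTotalsWithSeed() {
        return Flux.range(1, 4).scan(0, Integer::sum).collectList().block();
    }

    // reduce: only the final accumulated value is emitted, as a Mono.
    static Integer total() {
        return Flux.range(1, 4).reduce(Integer::sum).block();
    }

    public static void main(String[] args) {
        System.out.println(runningTotals());         // [1, 3, 6, 10]
        System.out.println(runningTotalsWithSeed()); // [0, 1, 3, 6, 10]
        System.out.println(total());                 // 10
    }
}
```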
Flux<T> |
share()
|
Mono<T> |
shareNext()
|
Mono<T> |
single()
Expect and emit a single item from this
Flux source or signal
NoSuchElementException for an empty source, or
IndexOutOfBoundsException for a source with more than one element. |
Mono<T> |
single(T defaultValue)
Expect and emit a single item from this
Flux source and emit a default
value for an empty source, but signal an IndexOutOfBoundsException for a
source with more than one element. |
Mono<T> |
singleOrEmpty()
Expect and emit a single item from this
Flux source, and accept an empty
source but signal an IndexOutOfBoundsException for a source with more than
one element. |
Flux<T> |
skip(Duration timespan)
Skip elements from this
Flux emitted within the specified initial duration. |
Flux<T> |
skip(Duration timespan,
Scheduler timer)
|
Flux<T> |
skip(long skipped)
Skip the specified number of elements from the beginning of this
Flux then
emit the remaining elements. |
Flux<T> |
skipLast(int n)
Skip a specified number of elements at the end of this
Flux sequence. |
Flux<T> |
skipUntil(Predicate<? super T> untilPredicate)
|
Flux<T> |
skipUntilOther(Publisher<?> other)
|
Flux<T> |
skipWhile(Predicate<? super T> skipPredicate)
|
Flux<T> |
sort()
Sort elements from this
Flux by collecting and sorting them in the background
then emitting the sorted sequence once this sequence completes. |
Flux<T> |
sort(Comparator<? super T> sortFunction)
Sort elements from this
Flux using a Comparator function, by
collecting and sorting elements in the background then emitting the sorted sequence
once this sequence completes. |
Flux<T> |
startWith(Iterable<? extends T> iterable)
|
Flux<T> |
startWith(Publisher<? extends T> publisher)
|
Flux<T> |
startWith(T... values)
Prepend the given values before this
Flux sequence. |
Disposable |
subscribe()
Subscribe to this
Flux and request unbounded demand. |
Disposable |
subscribe(Consumer<? super T> consumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer)
Deprecated.
Because users tend to forget to
request the subscription. If
the behavior is really needed, consider using subscribeWith(Subscriber) . To be removed in 3.5. |
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Context initialContext)
|
abstract void |
subscribe(CoreSubscriber<? super T> actual)
An internal
Publisher.subscribe(Subscriber) that will bypass
Hooks.onLastOperator(Function) pointcut. |
void |
subscribe(Subscriber<? super T> actual) |
Flux<T> |
subscribeOn(Scheduler scheduler)
Run subscribe, onSubscribe and request on a specified
Scheduler 's Scheduler.Worker . |
Flux<T> |
subscribeOn(Scheduler scheduler,
boolean requestOnSeparateThread)
Run subscribe and onSubscribe on a specified
Scheduler 's Scheduler.Worker . |
<E extends Subscriber<? super T>> E |
subscribeWith(E subscriber)
Subscribe a provided instance of a subclass of
Subscriber to this Flux
and return said instance for further chaining calls. |
Flux<T> |
switchIfEmpty(Publisher<? extends T> alternate)
Switch to an alternative
Publisher if this sequence is completed without any data. |
<V> Flux<V> |
switchMap(Function<? super T,Publisher<? extends V>> fn)
|
<V> Flux<V> |
switchMap(Function<? super T,Publisher<? extends V>> fn,
int prefetch)
Deprecated.
to be removed in 3.6.0 at the earliest. In 3.5.0, you should replace
calls with prefetch=0 with calls to switchMap(fn), as the default behavior of the
single-parameter variant will then change to prefetch=0.
|
<V> Flux<V> |
switchOnFirst(BiFunction<Signal<? extends T>,Flux<T>,Publisher<? extends V>> transformer)
Transform the current
Flux once it emits its first element, making a
conditional transformation possible. |
<V> Flux<V> |
switchOnFirst(BiFunction<Signal<? extends T>,Flux<T>,Publisher<? extends V>> transformer,
boolean cancelSourceOnComplete)
Transform the current
Flux once it emits its first element, making a
conditional transformation possible. |
static <T> Flux<T> |
switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers)
|
static <T> Flux<T> |
switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers,
int prefetch)
Deprecated.
to be removed in 3.6.0 at the earliest. In 3.5.0, you should replace
calls with prefetch=0 with calls to switchOnNext(mergedPublishers), as the default
behavior of the single-parameter variant will then change to prefetch=0.
|
Flux<T> |
tag(String key,
String value)
Tag this flux with a key/value pair.
|
Flux<T> |
take(Duration timespan)
|
Flux<T> |
take(Duration timespan,
Scheduler timer)
|
Flux<T> |
take(long n)
Take only the first N values from this
Flux , if available. |
Flux<T> |
take(long n,
boolean limitRequest)
Take only the first N values from this
Flux , if available. |
Flux<T> |
takeLast(int n)
Emit the last N values this
Flux emitted before its completion. |
Flux<T> |
takeUntil(Predicate<? super T> predicate)
|
Flux<T> |
takeUntilOther(Publisher<?> other)
|
Flux<T> |
takeWhile(Predicate<? super T> continuePredicate)
Relay values from this
Flux while a predicate returns TRUE
for the values (checked before each value is delivered). |
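The `take` family can be compared side by side in a short sketch, assuming reactor-core is on the classpath. The class name `TakeExample` is hypothetical. Note how `takeUntil` relays the element that matches its predicate, whereas `takeWhile` stops before the first element that fails its predicate.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class, not part of the Reactor API.
class TakeExample {

    // take(n): relay at most the first n elements.
    static List<Integer> firstThree() {
        return Flux.range(1, 10).take(3).collectList().block();
    }

    // takeWhile: relay while the predicate holds; the first failing element is not relayed.
    static List<Integer> whileBelowFour() {
        return Flux.range(1, 10).takeWhile(i -> i < 4).collectList().block();
    }

    // takeUntil: relay until the predicate matches; the matching element is relayed too.
    static List<Integer> untilFour() {
        return Flux.range(1, 10).takeUntil(i -> i >= 4).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(firstThree());     // [1, 2, 3]
        System.out.println(whileBelowFour()); // [1, 2, 3]
        System.out.println(untilFour());      // [1, 2, 3, 4]
    }
}
```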
Flux<T> |
tap(Function<ContextView,SignalListener<T>> listenerGenerator)
Tap into Reactive Streams signals emitted or received by this
Flux and notify a stateful per-Subscriber
SignalListener . |
Flux<T> |
tap(SignalListenerFactory<T,?> listenerFactory)
Tap into Reactive Streams signals emitted or received by this
Flux and notify a stateful per-Subscriber
SignalListener created by the provided SignalListenerFactory . |
Flux<T> |
tap(Supplier<SignalListener<T>> simpleListenerGenerator)
Tap into Reactive Streams signals emitted or received by this
Flux and notify a stateful per-Subscriber
SignalListener . |
Mono<Void> |
then()
Return a
Mono<Void> that completes when this Flux completes. |
<V> Mono<V> |
then(Mono<V> other)
|
Mono<Void> |
thenEmpty(Publisher<Void> other)
Return a
Mono<Void> that waits for this Flux to complete then
for a supplied Publisher<Void> to also complete. |
<V> Flux<V> |
thenMany(Publisher<V> other)
|
Flux<Timed<T>> |
timed()
Times
Subscriber.onNext(Object) events, encapsulated into a Timed object
that lets downstream consumer look at various time information gathered with nanosecond
resolution using the default clock (Schedulers.parallel() ):
Timed.elapsed() : the time in nanoseconds since last event, as a Duration . |
Flux<Timed<T>> |
timed(Scheduler clock)
Times
Subscriber.onNext(Object) events, encapsulated into a Timed object
that lets downstream consumer look at various time information gathered with nanosecond
resolution using the provided Scheduler as a clock:
Timed.elapsed() : the time in nanoseconds since last event, as a Duration . |
Flux<T> |
timeout(Duration timeout)
Propagate a
TimeoutException as soon as no item is emitted within the
given Duration from the previous emission (or the subscription for the first item). |
Flux<T> |
timeout(Duration timeout,
Publisher<? extends T> fallback)
|
Flux<T> |
timeout(Duration timeout,
Publisher<? extends T> fallback,
Scheduler timer)
|
Flux<T> |
timeout(Duration timeout,
Scheduler timer)
Propagate a
TimeoutException as soon as no item is emitted within the
given Duration from the previous emission (or the subscription for the first
item), as measured by the specified Scheduler . |
<U> Flux<T> |
timeout(Publisher<U> firstTimeout)
Signal a
TimeoutException in case the first item from this Flux has
not been emitted before the given Publisher emits. |
<U,V> Flux<T> |
timeout(Publisher<U> firstTimeout,
Function<? super T,? extends Publisher<V>> nextTimeoutFactory)
Signal a
TimeoutException in case the first item from this Flux has
not been emitted before the firstTimeout Publisher emits, and whenever
each subsequent element is not emitted before a Publisher generated from
the latest element signals. |
<U,V> Flux<T> |
timeout(Publisher<U> firstTimeout,
Function<? super T,? extends Publisher<V>> nextTimeoutFactory,
Publisher<? extends T> fallback)
|
Flux<Tuple2<Long,T>> |
timestamp()
|
Flux<Tuple2<Long,T>> |
timestamp(Scheduler scheduler)
|
Iterable<T> |
toIterable()
|
Iterable<T> |
toIterable(int batchSize)
|
Iterable<T> |
toIterable(int batchSize,
Supplier<Queue<T>> queueProvider)
|
Stream<T> |
toStream()
|
Stream<T> |
toStream(int batchSize)
|
String |
toString() |
<V> Flux<V> |
transform(Function<? super Flux<T>,? extends Publisher<V>> transformer)
|
<V> Flux<V> |
transformDeferred(Function<? super Flux<T>,? extends Publisher<V>> transformer)
|
<V> Flux<V> |
transformDeferredContextual(BiFunction<? super Flux<T>,? super ContextView,? extends Publisher<V>> transformer)
|
static <T,D> Flux<T> |
using(Callable<? extends D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a
Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or
the Subscriber cancels.
|
static <T,D> Flux<T> |
using(Callable<? extends D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup,
boolean eager)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a
Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or
the Subscriber cancels.
|
static <T,D> Flux<T> |
usingWhen(Publisher<D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> resourceClosure,
Function<? super D,? extends Publisher<?>> asyncCleanup)
Uses a resource, generated by a
Publisher for each individual Subscriber ,
while streaming the values from a Publisher derived from the same resource. |
static <T,D> Flux<T> |
usingWhen(Publisher<D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> resourceClosure,
Function<? super D,? extends Publisher<?>> asyncComplete,
BiFunction<? super D,? super Throwable,? extends Publisher<?>> asyncError,
Function<? super D,? extends Publisher<?>> asyncCancel)
Uses a resource, generated by a
Publisher for each individual Subscriber ,
while streaming the values from a Publisher derived from the same resource. |
Flux<Flux<T>> |
window(Duration windowingTimespan)
|
Flux<Flux<T>> |
window(Duration windowingTimespan,
Duration openWindowEvery)
|
Flux<Flux<T>> |
window(Duration windowingTimespan,
Duration openWindowEvery,
Scheduler timer)
|
Flux<Flux<T>> |
window(Duration windowingTimespan,
Scheduler timer)
|
Flux<Flux<T>> |
window(int maxSize)
|
Flux<Flux<T>> |
window(int maxSize,
int skip)
|
Flux<Flux<T>> |
window(Publisher<?> boundary)
|
Flux<Flux<T>> |
windowTimeout(int maxSize,
Duration maxTime)
|
Flux<Flux<T>> |
windowTimeout(int maxSize,
Duration maxTime,
boolean fairBackpressure)
|
Flux<Flux<T>> |
windowTimeout(int maxSize,
Duration maxTime,
Scheduler timer)
|
Flux<Flux<T>> |
windowTimeout(int maxSize,
Duration maxTime,
Scheduler timer,
boolean fairBackpressure)
|
Flux<Flux<T>> |
windowUntil(Predicate<T> boundaryTrigger)
|
Flux<Flux<T>> |
windowUntil(Predicate<T> boundaryTrigger,
boolean cutBefore)
|
Flux<Flux<T>> |
windowUntil(Predicate<T> boundaryTrigger,
boolean cutBefore,
int prefetch)
|
Flux<Flux<T>> |
windowUntilChanged()
Collect subsequent repetitions of an element (that is, if they arrive right after
one another) into multiple
Flux windows. |
<V> Flux<Flux<T>> |
windowUntilChanged(Function<? super T,? extends V> keySelector,
BiPredicate<? super V,? super V> keyComparator)
Collect subsequent repetitions of an element (that is, if they arrive right after
one another), as compared by a key extracted through the user provided
Function and compared using a supplied BiPredicate , into multiple
Flux windows. |
<V> Flux<Flux<T>> |
windowUntilChanged(Function<? super T,? super V> keySelector)
|
<U,V> Flux<Flux<T>> |
windowWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector)
|
Flux<Flux<T>> |
windowWhile(Predicate<T> inclusionPredicate)
|
Flux<Flux<T>> |
windowWhile(Predicate<T> inclusionPredicate,
int prefetch)
|
<U,R> Flux<R> |
withLatestFrom(Publisher<? extends U> other,
BiFunction<? super T,? super U,? extends R> resultSelector)
Combine the most recently emitted values from both this
Flux and another
Publisher through a BiFunction and emits the result. |
static <I,O> Flux<O> |
zip(Function<? super Object[],? extends O> combinator,
int prefetch,
Publisher<? extends I>... sources)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <I,O> Flux<O> |
zip(Function<? super Object[],? extends O> combinator,
Publisher<? extends I>... sources)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <O> Flux<O> |
zip(Iterable<? extends Publisher<?>> sources,
Function<? super Object[],? extends O> combinator)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <O> Flux<O> |
zip(Iterable<? extends Publisher<?>> sources,
int prefetch,
Function<? super Object[],? extends O> combinator)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <TUPLE extends Tuple2,V> Flux<V> |
zip(Publisher<? extends Publisher<?>> sources,
Function<? super TUPLE,? extends V> combinator)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <T1,T2> Flux<Tuple2<T1,T2>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2)
Zip two sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple2 . |
static <T1,T2,O> Flux<O> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
BiFunction<? super T1,? super T2,? extends O> combinator)
Zip two sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
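Zipping two sources with a combinator can be sketched as follows, assuming reactor-core is on the classpath. The class name `ZipExample` is hypothetical. The sources deliberately have different lengths to show that the zipped sequence ends as soon as one source completes.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class, not part of the Reactor API.
class ZipExample {

    // Pairs elements positionally and combines each pair into a String.
    static List<String> pairUp() {
        Flux<String> letters = Flux.just("a", "b", "c");
        Flux<Integer> numbers = Flux.just(1, 2);
        return Flux.zip(letters, numbers, (letter, number) -> letter + number)
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(pairUp()); // [a1, b2]
    }
}
```

The element `"c"` has no partner and is therefore never combined.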
static <T1,T2,T3> Flux<Tuple3<T1,T2,T3>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3)
Zip three sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a Tuple3. |
static <T1,T2,T3,T4> Flux<Tuple4<T1,T2,T3,T4>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4)
Zip four sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a Tuple4. |
static <T1,T2,T3,T4,T5> Flux<Tuple5<T1,T2,T3,T4,T5>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5)
Zip five sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a Tuple5. |
static <T1,T2,T3,T4,T5,T6> Flux<Tuple6<T1,T2,T3,T4,T5,T6>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6)
Zip six sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a Tuple6. |
static <T1,T2,T3,T4,T5,T6,T7> Flux<Tuple7<T1,T2,T3,T4,T5,T6,T7>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6,
Publisher<? extends T7> source7)
Zip seven sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a Tuple7. |
static <T1,T2,T3,T4,T5,T6,T7,T8> Flux<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6,
Publisher<? extends T7> source7,
Publisher<? extends T8> source8)
Zip eight sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a Tuple8. |
<T2> Flux<Tuple2<T,T2>> |
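zipWith(Publisher<? extends T2> source2)
Zip this Flux with another Publisher source, that is to say wait for both to emit one
element and combine these elements once into a Tuple2. |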
<T2,V> Flux<V> |
zipWith(Publisher<? extends T2> source2,
BiFunction<? super T,? super T2,? extends V> combinator)
Zip this Flux with another Publisher source, that is to say wait for both to emit one
element and combine these elements using a combinator BiFunction. The operator will
continue doing so until any of the sources completes. |
<T2> Flux<Tuple2<T,T2>> |
zipWith(Publisher<? extends T2> source2,
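int prefetch)
Zip this Flux with another Publisher source, that is to say wait for both to emit one
element and combine these elements once into a Tuple2. |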
<T2,V> Flux<V> |
zipWith(Publisher<? extends T2> source2,
int prefetch,
BiFunction<? super T,? super T2,? extends V> combinator)
Zip this Flux with another Publisher source, that is to say wait for both to emit one
element and combine these elements using a combinator BiFunction. The operator will
continue doing so until any of the sources completes. |
<T2> Flux<Tuple2<T,T2>> |
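zipWithIterable(Iterable<? extends T2> iterable)
Zip elements from this Flux with the content of an Iterable, that is to say combine one
element from each, pairwise, into a Tuple2. |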
<T2,V> Flux<V> |
zipWithIterable(Iterable<? extends T2> iterable,
BiFunction<? super T,? super T2,? extends V> zipper)
Zip elements from this Flux with the content of an Iterable, that is to say combine one
element from each, pairwise, using the given zipper BiFunction. |
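The zip rows above pair elements by position and stop as soon as any source completes. A minimal sketch of that behaviour, assuming reactor-core is on the classpath; the class and method names (`ZipSketch`, `zipWithCombinator`, `zipWithTuple`) are hypothetical and `block()` is used only to keep the demonstration short:

```java
import java.util.List;
import reactor.core.publisher.Flux;

class ZipSketch {
    // Static zip with a combinator: "c" is never paired because numbers completes first.
    static List<String> zipWithCombinator() {
        Flux<String> letters = Flux.just("a", "b", "c");
        Flux<Integer> numbers = Flux.just(1, 2);
        return Flux.zip(letters, numbers, (l, n) -> l + n)
                   .collectList()
                   .block();
    }

    // Instance zipWith without a combinator: each pair arrives as a Tuple2.
    static List<String> zipWithTuple() {
        return Flux.just("x", "y")
                   .zipWith(Flux.just(10, 20))
                   .map(t -> t.getT1() + "=" + t.getT2())
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(zipWithCombinator());
        System.out.println(zipWithTuple());
    }
}
```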
@SafeVarargs public static <T,V> Flux<V> combineLatest(Function<Object[],V> combinator, Publisher<? extends T>... sources)
Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
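The note that elements are passed to the combinator multiple times can be seen in a small sketch. It assumes reactor-core 3.4+ (for the `Sinks` API) and uses hypothetical names (`CombineLatestSketch`, `run`); the two-source overload is used for brevity, but the varargs overload combines values the same way:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

class CombineLatestSketch {
    static List<String> run() {
        // Manually driven sources so the emission order is explicit.
        Sinks.Many<Integer> numbers = Sinks.many().unicast().onBackpressureBuffer();
        Sinks.Many<String> letters = Sinks.many().unicast().onBackpressureBuffer();
        List<String> seen = new ArrayList<>();

        Flux.combineLatest(numbers.asFlux(), letters.asFlux(), (n, l) -> n + l)
            .subscribe(seen::add);

        numbers.tryEmitNext(1);    // no output yet: letters has not emitted
        letters.tryEmitNext("x");  // combines 1 and x
        numbers.tryEmitNext(2);    // x is reused with the new number
        letters.tryEmitNext("y");  // 2 is reused with the new letter
        numbers.tryEmitComplete();
        letters.tryEmitComplete();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Here `x` and `2` each reach the combinator twice, which is why the operator cannot track which elements are "used" for discard purposes.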
Type Parameters:
T - type of the value from sources
V - The produced output after transformation by the given combinator
Parameters:
sources - The Publisher sources to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations
@SafeVarargs public static <T,V> Flux<V> combineLatest(Function<Object[],V> combinator, int prefetch, Publisher<? extends T>... sources)
Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
Type Parameters:
T - type of the value from sources
V - The produced output after transformation by the given combinator
Parameters:
sources - The Publisher sources to combine values from
prefetch - The demand sent to each combined source Publisher
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations
public static <T1,T2,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of two Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
V - The produced output after transformation by the given combinator
Parameters:
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations
public static <T1,T2,T3,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of three Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
V - The produced output after transformation by the given combinator
Parameters:
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations
public static <T1,T2,T3,T4,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of four Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
V - The produced output after transformation by the given combinator
Parameters:
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
source4 - The fourth Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations
public static <T1,T2,T3,T4,T5,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of five Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
V - The produced output after transformation by the given combinator
Parameters:
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
source4 - The fourth Publisher source to combine values from
source5 - The fifth Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations
public static <T1,T2,T3,T4,T5,T6,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of six Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
V - The produced output after transformation by the given combinator
Parameters:
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
source4 - The fourth Publisher source to combine values from
source5 - The fifth Publisher source to combine values from
source6 - The sixth Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations
public static <T,V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources provided in an Iterable.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
Type Parameters:
T - The common base type of the values from sources
V - The produced output after transformation by the given combinator
Parameters:
sources - The list of Publisher sources to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations
public static <T,V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, int prefetch, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources provided in an Iterable.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
Type Parameters:
T - The common base type of the values from sources
V - The produced output after transformation by the given combinator
Parameters:
sources - The list of Publisher sources to combine values from
prefetch - demand produced to each combined source Publisher
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations
public static <T> Flux<T> concat(Iterable<? extends Publisher<? extends T>> sources)
Concatenate all sources provided in an Iterable, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
@SafeVarargs public final Flux<T> concatWithValues(T... values)
Concatenates the values to the end of the Flux.
Parameters:
values - The values to concatenate
Returns:
a Flux concatenating all source sequences
public static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
public static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources, int prefetch)
Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
@SafeVarargs public static <T> Flux<T> concat(Publisher<? extends T>... sources)
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, int prefetch)
Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
Type Parameters:
T - The type of values in both source and output sequences
Parameters:
sources - The Publisher of Publisher to concatenate
prefetch - number of elements to prefetch from the source, to be turned into inner Publishers
Returns:
a Flux concatenating all inner sources sequences until complete or error
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, boolean delayUntilEnd, int prefetch)
Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes.
Errors do not interrupt the main sequence but are propagated after the current concat backlog if delayUntilEnd is false, or after all sources have had a chance to be concatenated if delayUntilEnd is true.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
Type Parameters:
T - The type of values in both source and output sequences
Parameters:
sources - The Publisher of Publisher to concatenate
delayUntilEnd - delay error until all sources have been consumed instead of after the current source
prefetch - the number of Publishers to prefetch from the outer Publisher
Returns:
a Flux concatenating all inner sources sequences until complete or error
@SafeVarargs public static <T> Flux<T> concatDelayError(Publisher<? extends T>... sources)
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
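The difference between concat and concatDelayError is easiest to see with a failing first source. The following is a minimal sketch, assuming reactor-core on the classpath; `ConcatSketch` and its method names are hypothetical, and `onErrorReturn(-1)` is used only to make the position of the error visible in the collected list:

```java
import java.util.List;
import reactor.core.publisher.Flux;

class ConcatSketch {
    // First source emits 1, 2 and then fails; second source is healthy.
    static Flux<Integer> failingFirst() {
        return Flux.just(1, 2).concatWith(Flux.error(new IllegalStateException("boom")));
    }

    // concat: the error interrupts the sequence, the second source is never subscribed.
    static List<Integer> eagerError() {
        return Flux.concat(failingFirst(), Flux.just(3, 4))
                   .onErrorReturn(-1)
                   .collectList()
                   .block();
    }

    // concatDelayError: the error is held back until the remaining sources have been concatenated.
    static List<Integer> delayedError() {
        return Flux.concatDelayError(failingFirst(), Flux.just(3, 4))
                   .onErrorReturn(-1)
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(eagerError());
        System.out.println(delayedError());
    }
}
```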
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
Programmatically create a Flux with the capability of emitting multiple elements in a synchronous or asynchronous manner through the FluxSink API. This includes emitting elements from multiple threads.
This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
Flux.<String>create(emitter -> {
    ActionListener al = e -> {
        emitter.next(textField.getText());
    };
    // without cleanup support:
    button.addActionListener(al);
    // with cleanup support (alternative to the line above):
    button.addActionListener(al);
    emitter.onDispose(() -> {
        button.removeActionListener(al);
    });
});
Discard Support: The FluxSink exposed by this operator buffers in case of overflow. The buffer is discarded when the main sequence is cancelled.
Type Parameters:
T - The type of values in the sequence
Parameters:
emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
Returns:
a Flux
See Also:
push(Consumer)
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
Programmatically create a Flux with the capability of emitting multiple elements in a synchronous or asynchronous manner through the FluxSink API. This includes emitting elements from multiple threads.
This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
Flux.<String>create(emitter -> {
    ActionListener al = e -> {
        emitter.next(textField.getText());
    };
    // without cleanup support:
    button.addActionListener(al);
    // with cleanup support (alternative to the line above):
    button.addActionListener(al);
    emitter.onDispose(() -> {
        button.removeActionListener(al);
    });
}, FluxSink.OverflowStrategy.LATEST);
Discard Support: The FluxSink exposed by this operator discards elements as relevant to the chosen FluxSink.OverflowStrategy. For example, FluxSink.OverflowStrategy.DROP discards each item as it is dropped, while FluxSink.OverflowStrategy.BUFFER will discard the buffer upon cancellation.
Type Parameters:
T - The type of values in the sequence
Parameters:
backpressure - the backpressure mode, see FluxSink.OverflowStrategy for the available backpressure modes
emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
Returns:
a Flux
See Also:
push(Consumer, reactor.core.publisher.FluxSink.OverflowStrategy)
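To make the effect of the backpressure argument concrete, here is a small sketch comparing three strategies when the emitter produces ten values but the subscriber initially requests only three. It assumes reactor-core on the classpath; `CreateOverflowSketch` and `receivedWith` are hypothetical names, and the synchronous emitter is chosen only to keep the outcome easy to follow:

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

class CreateOverflowSketch {
    static List<Integer> receivedWith(FluxSink.OverflowStrategy strategy) {
        List<Integer> received = new ArrayList<>();
        BaseSubscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(3); // only three elements are requested up front
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
            }
        };

        Flux.<Integer>create(sink -> {
            for (int i = 0; i < 10; i++) {
                sink.next(i); // 0..2 are delivered, 3..9 overflow the demand
            }
            sink.complete();
        }, strategy).subscribe(subscriber);

        subscriber.request(10); // late demand: shows what the strategy retained
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receivedWith(FluxSink.OverflowStrategy.DROP));
        System.out.println(receivedWith(FluxSink.OverflowStrategy.LATEST));
        System.out.println(receivedWith(FluxSink.OverflowStrategy.BUFFER));
    }
}
```

With DROP the overflowing values are lost, with LATEST only the most recent one survives until demand arrives, and with BUFFER all of them are kept.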
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter)
Programmatically create a Flux with the capability of emitting multiple elements from a single-threaded producer through the FluxSink API. For a multi-threaded capable alternative, see create(Consumer).
This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
Flux.<String>push(emitter -> {
    ActionListener al = e -> {
        emitter.next(textField.getText());
    };
    // without cleanup support:
    button.addActionListener(al);
    // with cleanup support (alternative to the line above):
    button.addActionListener(al);
    emitter.onDispose(() -> {
        button.removeActionListener(al);
    });
});
Discard Support: The FluxSink exposed by this operator buffers in case of overflow. The buffer is discarded when the main sequence is cancelled.
Type Parameters:
T - The type of values in the sequence
Parameters:
emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
Returns:
a Flux
See Also:
create(Consumer)
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
Programmatically create a Flux with the capability of emitting multiple elements from a single-threaded producer through the FluxSink API. For a multi-threaded capable alternative, see create(Consumer, reactor.core.publisher.FluxSink.OverflowStrategy).
This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
Flux.<String>push(emitter -> {
    ActionListener al = e -> {
        emitter.next(textField.getText());
    };
    // without cleanup support:
    button.addActionListener(al);
    // with cleanup support (alternative to the line above):
    button.addActionListener(al);
    emitter.onDispose(() -> {
        button.removeActionListener(al);
    });
}, FluxSink.OverflowStrategy.LATEST);
Discard Support: The FluxSink exposed by this operator discards elements as relevant to the chosen FluxSink.OverflowStrategy. For example, FluxSink.OverflowStrategy.DROP discards each item as it is dropped, while FluxSink.OverflowStrategy.BUFFER will discard the buffer upon cancellation.
Type Parameters:
T - The type of values in the sequence
Parameters:
backpressure - the backpressure mode, see FluxSink.OverflowStrategy for the available backpressure modes
emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
Returns:
a Flux
See Also:
create(Consumer, reactor.core.publisher.FluxSink.OverflowStrategy)
public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier)
Lazily supply a Publisher every time a Subscription is made on the resulting Flux, so the actual source instantiation is deferred until each subscribe and the Supplier can create a subscriber-specific instance. If the supplier doesn't generate a new instance however, this operator will effectively behave like from(Publisher).
Type Parameters:
T - the type of values passing through the Flux
Parameters:
supplier - the Publisher Supplier to call on subscribe
Returns:
a Flux
See Also:
deferContextual(Function)
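A short sketch of what "deferred until each subscribe" means in practice, assuming reactor-core on the classpath. `DeferSketch` is a hypothetical name, and the shared counter exists only to make the evaluation time observable:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

class DeferSketch {
    static List<Integer> run() {
        AtomicInteger counter = new AtomicInteger();

        // Evaluated once, at assembly time: every subscriber sees the same value.
        Flux<Integer> eager = Flux.just(counter.incrementAndGet());

        // The supplier runs again for each subscription.
        Flux<Integer> lazy = Flux.defer(() -> Flux.just(counter.incrementAndGet()));

        return List.of(eager.blockFirst(), eager.blockFirst(),
                       lazy.blockFirst(), lazy.blockFirst());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```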
public static <T> Flux<T> deferContextual(Function<ContextView,? extends Publisher<T>> contextualPublisherFactory)
Lazily supply a Publisher every time a Subscription is made on the resulting Flux, so the actual source instantiation is deferred until each subscribe and the Function can create a subscriber-specific instance. This operator behaves the same way as defer(Supplier), but accepts a Function that will receive the current ContextView as an argument. If the function doesn't generate a new instance however, this operator will effectively behave like from(Publisher).
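As an illustration of the ContextView argument, the sketch below reads a value that a downstream `contextWrite` attached to the subscription. It assumes reactor-core 3.4+; the class name `DeferContextualSketch`, the method `greet` and the `"user"` key are hypothetical:

```java
import reactor.core.publisher.Flux;
import reactor.util.context.Context;

class DeferContextualSketch {
    static String greet(String user) {
        return Flux.<String>deferContextual(ctx -> Flux.just("Hello " + ctx.get("user")))
                   // The context is written downstream and is visible to the factory at subscription time.
                   .contextWrite(Context.of("user", user))
                   .blockFirst();
    }

    public static void main(String[] args) {
        System.out.println(greet("ada"));
    }
}
```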
public static <T> Flux<T> empty()
Create a Flux that completes without emitting any item.
Type Parameters:
T - the reified type of the target Subscriber
Returns:
a Flux
public static <T> Flux<T> error(Throwable error)
Create a Flux that terminates with the specified error immediately after being subscribed to.
Type Parameters:
T - the reified type of the target Subscriber
Parameters:
error - the error to signal to each Subscriber
Returns:
a Flux
public static <T> Flux<T> error(Supplier<? extends Throwable> errorSupplier)
Create a Flux that terminates with an error immediately after being subscribed to. The Throwable is generated by a Supplier, invoked each time there is a subscription and allowing for lazy instantiation.
Type Parameters:
T - the reified type of the target Subscriber
Parameters:
errorSupplier - the error signal Supplier to invoke for each Subscriber
Returns:
a Flux
public static <O> Flux<O> error(Throwable throwable, boolean whenRequested)
Create a Flux that terminates with the specified error, either immediately after being subscribed to or after being first requested.
Type Parameters:
O - the reified type of the target Subscriber
Parameters:
throwable - the error to signal to each Subscriber
whenRequested - if true, will onError on the first request instead of subscribe().
Returns:
a Flux
@SafeVarargs @Deprecated public static <I> Flux<I> first(Publisher<? extends I>... sources)
Deprecated. Use firstWithSignal(Publisher[]). To be removed in reactor 3.5.
Pick the first Publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.
Type Parameters:
I - The type of values in both source and output sequences
Parameters:
sources - The competing source publishers
Returns:
a Flux behaving like the fastest of its sources
@Deprecated public static <I> Flux<I> first(Iterable<? extends Publisher<? extends I>> sources)
Deprecated. Use firstWithSignal(Iterable). To be removed in reactor 3.5.
Pick the first Publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.
Type Parameters:
I - The type of values in both source and output sequences
Parameters:
sources - The competing source publishers
Returns:
a Flux behaving like the fastest of its sources
@SafeVarargs public static <I> Flux<I> firstWithSignal(Publisher<? extends I>... sources)
Pick the first Publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.
Type Parameters:
I - The type of values in both source and output sequences
Parameters:
sources - The competing source publishers
Returns:
a Flux behaving like the fastest of its sources
public static <I> Flux<I> firstWithSignal(Iterable<? extends Publisher<? extends I>> sources)
Pick the first Publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.
Type Parameters:
I - The type of values in both source and output sequences
Parameters:
sources - The competing source publishers
Returns:
a Flux behaving like the fastest of its sources
public static <I> Flux<I> firstWithValue(Iterable<? extends Publisher<? extends I>> sources)
Pick the first Publisher to emit any value and replay all values from that Publisher, effectively behaving like the source that first emits an onNext.
Sources with values always "win" over empty sources (ones that only emit onComplete) or failing sources (ones that only emit onError).
When no source can provide a value, this operator fails with a NoSuchElementException (provided there are at least two sources). This exception has a composite as its cause that can be used to inspect what went wrong with each source (so the composite has as many elements as there are sources).
Exceptions from failing sources are directly reflected in the composite at the index of the failing source. For empty sources, a NoSuchElementException is added at their respective index. One can use Exceptions.unwrapMultiple(topLevel.getCause()) to easily inspect these errors as a List.
Note that like in firstWithSignal(Iterable), an infinite source can be problematic if no other source emits onNext.
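Both outcomes described above (a valued source winning over an empty one, and the composite failure when no source has a value) can be sketched as follows. This assumes reactor-core 3.4+; `FirstWithValueSketch` and its method names are hypothetical, and blocking is used only for brevity:

```java
import java.util.List;
import java.util.NoSuchElementException;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;

class FirstWithValueSketch {
    // The empty source completes first but cannot win; the valued source is replayed.
    static List<Integer> winner() {
        return Flux.firstWithValue(List.of(Flux.<Integer>empty(), Flux.just(1, 2)))
                   .collectList()
                   .block();
    }

    // No source has a value: inspect the composite cause, one entry per source.
    static int failureCount() {
        try {
            Flux.firstWithValue(List.of(
                        Flux.<Integer>empty(),
                        Flux.<Integer>error(new IllegalStateException("boom"))))
                .blockLast();
            return 0;
        } catch (NoSuchElementException topLevel) {
            return Exceptions.unwrapMultiple(topLevel.getCause()).size();
        }
    }

    public static void main(String[] args) {
        System.out.println(winner());
        System.out.println(failureCount());
    }
}
```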
@SafeVarargs public static <I> Flux<I> firstWithValue(Publisher<? extends I> first, Publisher<? extends I>... others)
Pick the first Publisher to emit any value and replay all values from that Publisher, effectively behaving like the source that first emits an onNext.
Sources with values always "win" over empty sources (ones that only emit onComplete) or failing sources (ones that only emit onError).
When no source can provide a value, this operator fails with a NoSuchElementException (provided there are at least two sources). This exception has a composite as its cause that can be used to inspect what went wrong with each source (so the composite has as many elements as there are sources).
Exceptions from failing sources are directly reflected in the composite at the index of the failing source. For empty sources, a NoSuchElementException is added at their respective index. One can use Exceptions.unwrapMultiple(topLevel.getCause()) to easily inspect these errors as a List.
Note that like in firstWithSignal(Publisher[]), an infinite source can be problematic if no other source emits onNext.
In case the first source is already an array-based firstWithValue(Publisher, Publisher[]) instance, nesting is avoided: a single new array-based instance is created with all the sources from first plus all the others sources at the same level.
Type Parameters:
I - The type of values in both source and output sequences
Parameters:
first - The first competing source publisher
others - The other competing source publishers
Returns:
a Flux behaving like the fastest of its sources
public static <T> Flux<T> from(Publisher<? extends T> source)
Decorate the specified Publisher with the Flux API.
Hooks.onEachOperator(String, Function) and similar assembly hooks are applied unless the source is already a Flux.
Type Parameters:
T - The type of values in both source and output sequences
Parameters:
source - the source to decorate
Returns:
a Flux
public static <T> Flux<T> fromArray(T[] array)
Create a Flux that emits the items contained in the provided array.
Type Parameters:
T - The type of values in the source array and resulting Flux
Parameters:
array - the array to read data from
Returns:
a Flux
public static <T> Flux<T> fromIterable(Iterable<? extends T> it)
Create a Flux that emits the items contained in the provided Iterable. The Iterable.iterator() method will be invoked at least once and at most twice for each subscriber.
This operator inspects the Iterable's Spliterator to assess if the iteration can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)). Since the default Spliterator wraps the Iterator, there can be two Iterable.iterator() calls. This second invocation is skipped on a Collection however, a type which is assumed to be always finite.
Discard Support: Upon cancellation, this operator attempts to discard the remainder of the Iterable if it can safely ensure the iterator is finite. Note that this means the Iterable.iterator() method could be invoked twice.
public static <T> Flux<T> fromStream(Stream<? extends T> s)
Create a Flux that emits the items contained in the provided Stream. Keep in mind that a Stream cannot be re-used, which can be problematic in case of multiple subscriptions or re-subscription (like with repeat() or retry()). The Stream is closed automatically by the operator on cancellation, error or completion.
Discard Support: Upon cancellation, this operator attempts to discard the remainder of the Stream through its open Spliterator, if it can safely ensure it is finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)).
public static <T> Flux<T> fromStream(Supplier<Stream<? extends T>> streamSupplier)
Create a Flux that emits the items contained in a Stream created by the provided Supplier for each subscription. The Stream is closed automatically by the operator on cancellation, error or completion.
Discard Support: Upon cancellation, this operator attempts to discard the remainder of the Stream through its open Spliterator, if it can safely ensure it is finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)).
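Because a Stream can only be consumed once, the Supplier-based variant is the one that tolerates several subscriptions. A minimal sketch, assuming reactor-core on the classpath; `FromStreamSketch` and `subscribeTwice` are hypothetical names:

```java
import java.util.List;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;

class FromStreamSketch {
    static List<List<String>> subscribeTwice() {
        // A fresh Stream is created for every subscription, so re-subscribing is safe.
        Flux<String> flux = Flux.fromStream(() -> Stream.of("a", "b"));
        return List.of(flux.collectList().block(), flux.collectList().block());
    }

    public static void main(String[] args) {
        System.out.println(subscribeTwice());
    }
}
```

Passing a single Stream instance to fromStream(Stream) instead would leave nothing usable for the second subscription.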
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
Programmatically create a Flux by generating signals one-by-one via a consumer callback.
Type Parameters:
T - the value type emitted
Parameters:
generator - Consume the SynchronousSink provided per-subscriber by Reactor to generate a single signal on each pass.
Returns:
a Flux
public static <T,S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator)
Programmatically create a Flux by generating signals one-by-one via a consumer callback and some state. The stateSupplier may return null.
T - the value type emitted
S - the per-subscriber custom state type
stateSupplier - called for each incoming Subscriber to provide the initial state for the generator bifunction
generator - Consume the SynchronousSink provided per-subscriber by Reactor as well as the current state to generate a single signal on each pass and return a (new) state.
Returns: a Flux
public static <T,S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator, Consumer<? super S> stateConsumer)
Programmatically create a Flux by generating signals one-by-one via a consumer callback and some state, with a final cleanup callback. The stateSupplier may return null, but your cleanup stateConsumer will then need to handle the null case.
T - the value type emitted
S - the per-subscriber custom state type
stateSupplier - called for each incoming Subscriber to provide the initial state for the generator bifunction
generator - Consume the SynchronousSink provided per-subscriber by Reactor as well as the current state to generate a single signal on each pass and return a (new) state.
stateConsumer - called after the generator has terminated or the downstream cancelled, receiving the last state to be handled (i.e., release resources or do other cleanup).
Returns: a Flux
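The interplay of stateSupplier, generator and stateConsumer can be pictured with a small JDK-only model. This is a sketch under stated assumptions, not Reactor's implementation: MiniSink is a hypothetical stand-in for SynchronousSink, run(...) is a hypothetical synchronous loop, and the demand parameter simply caps the number of passes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.BiFunction;
import java.util.function.Consumer;

public class GenerateLoopSketch {

    // Hypothetical stand-in for SynchronousSink: at most one value per pass.
    static final class MiniSink<T> {
        T value;
        boolean hasValue;
        boolean done;

        void next(T t) {
            if (hasValue) {
                throw new IllegalStateException("more than one signal in a single pass");
            }
            value = t;
            hasValue = true;
        }

        void complete() {
            done = true;
        }
    }

    // Model of one subscription: fetch the initial state, call the generator once
    // per requested element, then hand the last state to the cleanup callback.
    static <T, S> List<T> run(Callable<S> stateSupplier,
                              BiFunction<S, MiniSink<T>, S> generator,
                              Consumer<? super S> stateConsumer,
                              int demand) {
        S state;
        try {
            state = stateSupplier.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        List<T> out = new ArrayList<>();
        try {
            MiniSink<T> sink = new MiniSink<>();
            for (int i = 0; i < demand && !sink.done; i++) {
                sink.hasValue = false;
                state = generator.apply(state, sink);
                if (sink.hasValue) {
                    out.add(sink.value);
                }
            }
        } finally {
            stateConsumer.accept(state); // receives the last state, whatever happened
        }
        return out;
    }

    static String demo() {
        List<Integer> cleaned = new ArrayList<>();
        List<Integer> emitted = GenerateLoopSketch.<Integer, Integer>run(
                () -> 1,
                (s, sink) -> {
                    sink.next(s * s);
                    if (s == 4) {
                        sink.complete();
                    }
                    return s + 1;
                },
                cleaned::add,
                10);
        return emitted + " last=" + cleaned;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 4, 9, 16] last=[5]
    }
}
```

Note how the cleanup callback sees the state returned by the final pass (5), not the state that produced the last value (4).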
public static Flux<Long> interval(Duration period)
Create a Flux that emits long values starting with 0 and incrementing at specified time intervals on the global timer. The first element is emitted after an initial delay equal to the period. If demand is not produced in time, an onError will be signalled with an overflow IllegalStateException detailing the tick that couldn't be emitted. In normal conditions, the Flux will never complete.
Runs on the Schedulers.parallel() Scheduler.
public static Flux<Long> interval(Duration delay, Duration period)
Create a Flux that emits long values starting with 0 and incrementing at specified time intervals, after an initial delay, on the global timer. If demand is not produced in time, an onError will be signalled with an overflow IllegalStateException detailing the tick that couldn't be emitted. In normal conditions, the Flux will never complete.
Runs on the Schedulers.parallel() Scheduler.
public static Flux<Long> interval(Duration period, Scheduler timer)
Create a Flux that emits long values starting with 0 and incrementing at specified time intervals, on the specified Scheduler. The first element is emitted after an initial delay equal to the period. If demand is not produced in time, an onError will be signalled with an overflow IllegalStateException detailing the tick that couldn't be emitted. In normal conditions, the Flux will never complete.
public static Flux<Long> interval(Duration delay, Duration period, Scheduler timer)
Create a Flux that emits long values starting with 0 and incrementing at specified time intervals, after an initial delay, on the specified Scheduler. If demand is not produced in time, an onError will be signalled with an overflow IllegalStateException detailing the tick that couldn't be emitted. In normal conditions, the Flux will never complete.
@SafeVarargs public static <T> Flux<T> just(T... data)
Create a Flux that emits the provided elements and then completes.
T - the emitted data type
data - the elements to emit, as a vararg
Returns: a Flux
public static <T> Flux<T> just(T data)
Create a Flux that will only emit a single element then onComplete.
T - the emitted data type
data - the single element to emit
Returns: a Flux
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source)
Merge data from Publisher sequences emitted by the passed Publisher into an interleaved merged sequence. Unlike concat, inner sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency)
Merge data from Publisher sequences emitted by the passed Publisher into an interleaved merged sequence. Unlike concat, inner sources are subscribed to eagerly (but at most concurrency sources are subscribed to at the same time).
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency, int prefetch)
Merge data from Publisher sequences emitted by the passed Publisher into an interleaved merged sequence. Unlike concat, inner sources are subscribed to eagerly (but at most concurrency sources are subscribed to at the same time).
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
public static <I> Flux<I> merge(Iterable<? extends Publisher<? extends I>> sources)
Merge data from Publisher sequences contained in an Iterable into an interleaved merged sequence. Unlike concat, inner sources are subscribed to eagerly. A new Iterator will be created for each subscriber.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <I> Flux<I> merge(Publisher<? extends I>... sources)
Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike concat, sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <I> Flux<I> merge(int prefetch, Publisher<? extends I>... sources)
Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike concat, sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <I> Flux<I> mergeDelayError(int prefetch, Publisher<? extends I>... sources)
Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike concat, sources are subscribed to eagerly.
This variant will delay any error until after the rest of the merge backlog has been processed.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <I extends Comparable<? super I>> Flux<I> mergePriority(Publisher<? extends I>... sources)
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by their natural order) as they arrive. This is not a sort(), as it doesn't consider the whole of each sequence. Unlike mergeComparing, this operator does not wait for a value from each source to arrive either.
While this operator does retrieve at most one value from each source, it only compares values when two or more sources emit at the same time. In that case it picks the smallest of these competing values and continues doing so as long as there is demand. It is therefore best suited for asynchronous sources where you do not want to wait for a value from each source before emitting a value downstream.
I - a Comparable merged type that has a natural order
sources - Publisher sources of Comparable to merge
Returns: a Flux that compares the latest available value from each source, publishing the smallest value and replenishing the source that produced it.
@SafeVarargs public static <T> Flux<T> mergePriority(Comparator<? super T> comparator, Publisher<? extends T>... sources)
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator) as they arrive. This is not a sort(Comparator), as it doesn't consider the whole of each sequence. Unlike mergeComparing, this operator does not wait for a value from each source to arrive either.
While this operator does retrieve at most one value from each source, it only compares values when two or more sources emit at the same time. In that case it picks the smallest of these competing values and continues doing so as long as there is demand. It is therefore best suited for asynchronous sources where you do not want to wait for a value from each source before emitting a value downstream.
T - the merged type
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
Returns: a Flux that compares the latest available value from each source, publishing the smallest value and replenishing the source that produced it.
@SafeVarargs public static <T> Flux<T> mergePriority(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator) as they arrive. This is not a sort(Comparator), as it doesn't consider the whole of each sequence. Unlike mergeComparing, this operator does not wait for a value from each source to arrive either.
While this operator does retrieve at most one value from each source, it only compares values when two or more sources emit at the same time. In that case it picks the smallest of these competing values and continues doing so as long as there is demand. It is therefore best suited for asynchronous sources where you do not want to wait for a value from each source before emitting a value downstream.
T - the merged type
prefetch - the number of elements to prefetch from each source (avoiding too many small requests to the source when picking)
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
Returns: a Flux that compares the latest available value from each source, publishing the smallest value and replenishing the source that produced it.
@SafeVarargs public static <T> Flux<T> mergePriorityDelayError(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator) as they arrive. This is not a sort(Comparator), as it doesn't consider the whole of each sequence. Unlike mergeComparing, this operator does not wait for a value from each source to arrive either.
While this operator does retrieve at most one value from each source, it only compares values when two or more sources emit at the same time. In that case it picks the smallest of these competing values and continues doing so as long as there is demand. It is therefore best suited for asynchronous sources where you do not want to wait for a value from each source before emitting a value downstream.
Note that it is delaying errors until all data is consumed.
T - the merged type
prefetch - the number of elements to prefetch from each source (avoiding too many small requests to the source when picking)
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
Returns: a Flux that compares the latest available value from each source, publishing the smallest value and replenishing the source that produced it.
@SafeVarargs public static <I extends Comparable<? super I>> Flux<I> mergeComparing(Publisher<? extends I>... sources)
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by their natural order). This is not a sort(), as it doesn't consider the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
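The picking rule described above can be modelled synchronously with plain JDK collections. The sketch below is an illustration under stated assumptions and not Reactor's implementation: each List stands in for a source Publisher, and the class and method names are hypothetical. One pending value is held per source, the smallest pending value is emitted, and only the slot it came from is replenished.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

public class MergeComparingSketch {

    @SafeVarargs
    static <T> List<T> mergeComparing(Comparator<? super T> comparator,
                                      List<? extends T>... sources) {
        int n = sources.length;
        List<Iterator<? extends T>> iterators = new ArrayList<>();
        List<T> slots = new ArrayList<>();   // one pending value per source
        boolean[] filled = new boolean[n];   // whether the slot currently holds a value

        // Fill one slot per source before picking anything.
        for (int i = 0; i < n; i++) {
            Iterator<? extends T> it = sources[i].iterator();
            iterators.add(it);
            if (it.hasNext()) {
                slots.add(it.next());
                filled[i] = true;
            } else {
                slots.add(null);
            }
        }

        List<T> out = new ArrayList<>();
        while (true) {
            // Pick the smallest among the pending values.
            int best = -1;
            for (int i = 0; i < n; i++) {
                if (filled[i] && (best < 0
                        || comparator.compare(slots.get(i), slots.get(best)) < 0)) {
                    best = i;
                }
            }
            if (best < 0) {
                return out; // every source is exhausted
            }
            out.add(slots.get(best));

            // Replenish only the slot that was just emitted.
            Iterator<? extends T> it = iterators.get(best);
            if (it.hasNext()) {
                slots.set(best, it.next());
            } else {
                filled[best] = false;
            }
        }
    }

    static String demo() {
        return mergeComparing(Comparator.<Integer>naturalOrder(),
                Arrays.asList(1, 5, 2), Arrays.asList(3, 4, 9)).toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 3, 4, 5, 2, 9]
    }
}
```

The 2 appearing after the 5 in the result shows why this is not a sort: the output is only fully ordered when each individual source is already ordered.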
I - a Comparable merged type that has a natural order
sources - Publisher sources of Comparable to merge
Returns: a Flux, subscribing early but keeping the original ordering
@SafeVarargs public static <T> Flux<T> mergeComparing(Comparator<? super T> comparator, Publisher<? extends T>... sources)
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a sort(Comparator), as it doesn't consider the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
T - the merged type
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
Returns: a Flux that compares latest values from each source, using the smallest value and replenishing the source that produced it
@SafeVarargs public static <T> Flux<T> mergeComparing(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a sort(Comparator), as it doesn't consider the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
T - the merged type
prefetch - the number of elements to prefetch from each source (avoiding too many small requests to the source when picking)
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
Returns: a Flux that compares latest values from each source, using the smallest value and replenishing the source that produced it
@SafeVarargs public static <T> Flux<T> mergeComparingDelayError(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a sort(Comparator), as it doesn't consider the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
Note that it is delaying errors until all data is consumed.
T - the merged type
prefetch - the number of elements to prefetch from each source (avoiding too many small requests to the source when picking)
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
Returns: a Flux that compares latest values from each source, using the smallest value and replenishing the source that produced it
@SafeVarargs @Deprecated public static <I extends Comparable<? super I>> Flux<I> mergeOrdered(Publisher<? extends I>... sources)
Deprecated. Use mergeComparingDelayError(int, Comparator, Publisher[]) instead (as mergeComparing(Publisher[]) doesn't have this operator's delayError behavior). To be removed in 3.6.0 at the earliest.
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by their natural order). This is not a sort(), as it doesn't consider the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
Note that it is delaying errors until all data is consumed.
I - a Comparable merged type that has a natural order
sources - Publisher sources of Comparable to merge
Returns: a Flux that compares latest values from each source, using the smallest value and replenishing the source that produced it
@SafeVarargs @Deprecated public static <T> Flux<T> mergeOrdered(Comparator<? super T> comparator, Publisher<? extends T>... sources)
Deprecated. Use mergeComparingDelayError(int, Comparator, Publisher[]) instead (as mergeComparing(Publisher[]) doesn't have this operator's delayError behavior). To be removed in 3.6.0 at the earliest.
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a sort(Comparator), as it doesn't consider the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
Note that it is delaying errors until all data is consumed.
T - the merged type
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
Returns: a Flux that compares latest values from each source, using the smallest value and replenishing the source that produced it
@SafeVarargs @Deprecated public static <T> Flux<T> mergeOrdered(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
Deprecated. Use mergeComparingDelayError(int, Comparator, Publisher[]) instead (as mergeComparing(Publisher[]) doesn't have this operator's delayError behavior). To be removed in 3.6.0 at the earliest.
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a sort(Comparator), as it doesn't consider the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
Note that it is delaying errors until all data is consumed.
T - the merged type
prefetch - the number of elements to prefetch from each source (avoiding too many small requests to the source when picking)
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
Returns: a Flux that compares latest values from each source, using the smallest value and replenishing the source that produced it
public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources)
public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)
Merge data from Publisher sequences emitted by the passed Publisher into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to eagerly (but at most maxConcurrency sources at a time). Unlike merge, their emitted values are merged into the final sequence in subscription order.
T - the merged type
sources - a Publisher of Publisher sources to merge
prefetch - the inner source request size
maxConcurrency - the request produced to the main source thus limiting concurrent merge backlog
Returns: a Flux, subscribing early but keeping the original ordering
public static <T> Flux<T> mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)
Merge data from Publisher sequences emitted by the passed Publisher into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to eagerly (but at most maxConcurrency sources at a time). Unlike merge, their emitted values are merged into the final sequence in subscription order.
This variant will delay any error until after the rest of the mergeSequential backlog has been processed.
T - the merged type
sources - a Publisher of Publisher sources to merge
prefetch - the inner source request size
maxConcurrency - the request produced to the main source thus limiting concurrent merge backlog
Returns: a Flux, subscribing early but keeping the original ordering
@SafeVarargs public static <I> Flux<I> mergeSequential(Publisher<? extends I>... sources)
Merge data from Publisher sequences provided in an array/vararg into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
@SafeVarargs public static <I> Flux<I> mergeSequential(int prefetch, Publisher<? extends I>... sources)
Merge data from Publisher sequences provided in an array/vararg into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
@SafeVarargs public static <I> Flux<I> mergeSequentialDelayError(int prefetch, Publisher<? extends I>... sources)
Merge data from Publisher sequences provided in an array/vararg into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
This variant will delay any error until after the rest of the mergeSequential backlog has been processed.
public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources)
public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch)
Merge data from Publisher sequences provided in an Iterable into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly (but at most maxConcurrency sources at a time). Unlike merge, their emitted values are merged into the final sequence in subscription order.
I - the merged type
sources - an Iterable of Publisher sequences to merge
maxConcurrency - the request produced to the main source thus limiting concurrent merge backlog
prefetch - the inner source request size
Returns: a Flux, subscribing early but keeping the original ordering
public static <I> Flux<I> mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch)
Merge data from Publisher sequences provided in an Iterable into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly (but at most maxConcurrency sources at a time). Unlike merge, their emitted values are merged into the final sequence in subscription order.
This variant will delay any error until after the rest of the mergeSequential backlog has been processed.
I - the merged type
sources - an Iterable of Publisher sequences to merge
maxConcurrency - the request produced to the main source thus limiting concurrent merge backlog
prefetch - the inner source request size
Returns: a Flux, subscribing early but keeping the original ordering
public static <T> Flux<T> never()
Create a Flux that will never signal any data, error or completion signal.
T - the Subscriber type target
Returns: a Flux
public static Flux<Integer> range(int start, int count)
Build a Flux that will only emit a sequence of count incrementing integers, starting from start. That is, emit integers between start (included) and start + count (excluded) then complete.
start - the first integer to be emitted
count - the total number of incrementing values to emit, including the first value
Returns: a Flux
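Because the second argument is a count rather than an end bound, it is easy to confuse this method with java.util.stream.IntStream.range, which takes an exclusive end. The JDK-only sketch below (hypothetical class and method names, no Reactor involved) computes the values that the description above implies for a given start and count.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RangeSketch {

    // Values from start (included) to start + count (excluded), as described above.
    static List<Integer> valuesFor(int start, int count) {
        return IntStream.range(start, start + count)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(valuesFor(5, 3)); // prints [5, 6, 7]
    }
}
```

So a start of 5 with a count of 3 yields 5, 6 and 7, not the integers from 5 up to 3.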
public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers)
Creates a Flux that mirrors the most recently emitted Publisher, forwarding its data until a new Publisher comes in the source.
The resulting Flux will complete once there are no new Publishers in the source (source has completed) and the last mirrored Publisher has also completed.
This operator requests the mergedPublishers source for an unbounded amount of inner publishers, but doesn't request each inner Publisher unless the downstream has made a corresponding request (no prefetch on publishers emitted by mergedPublishers).
@Deprecated public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers, int prefetch)
public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)
Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer may override the terminal event.
For an asynchronous version of the cleanup, with distinct path for onComplete, onError
and cancel terminations, see usingWhen(Publisher, Function, Function, BiFunction, Function)
.
T - emitted type
D - resource type
resourceSupplier - a Callable that is called on subscribe to generate the resource
sourceSupplier - a factory to derive a Publisher from the supplied resource
resourceCleanup - a resource cleanup callback invoked on completion
Returns: a Flux built around a disposable resource
See Also: usingWhen(Publisher, Function, Function, BiFunction, Function), usingWhen(Publisher, Function, Function)
public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager)
For an asynchronous version of the cleanup, with distinct path for onComplete, onError
and cancel terminations, see usingWhen(Publisher, Function, Function, BiFunction, Function)
.
T - emitted type
D - resource type
resourceSupplier - a Callable that is called on subscribe to generate the resource
sourceSupplier - a factory to derive a Publisher from the supplied resource
resourceCleanup - a resource cleanup callback invoked on completion
eager - true to clean before terminating downstream subscribers
Returns: a Flux built around a disposable resource
See Also: usingWhen(Publisher, Function, Function, BiFunction, Function), usingWhen(Publisher, Function, Function)
public static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> resourceClosure, Function<? super D,? extends Publisher<?>> asyncCleanup)
Uses a resource, generated by a Publisher for each individual Subscriber, while streaming the values from a Publisher derived from the same resource.
Whenever the resulting sequence terminates, a provided Function generates a "cleanup" Publisher that is invoked but doesn't change the content of the main sequence. Instead it just defers the termination (unless it errors, in which case the error suppresses the original termination signal).
Note that if the resource supplying Publisher emits more than one resource, the subsequent resources are dropped (Operators.onNextDropped(Object, Context)). If the publisher errors AFTER having emitted one resource, the error is also silently dropped (Operators.onErrorDropped(Throwable, Context)). An empty completion or error without at least one onNext signal triggers a short-circuit of the main sequence with the same terminal signal (no resource is established, no cleanup is invoked).
T - the type of elements emitted by the resource closure, and thus the main sequence
D - the type of the resource object
resourceSupplier - a Publisher that "generates" the resource, subscribed for each subscription to the main sequence
resourceClosure - a factory to derive a Publisher from the supplied resource
asyncCleanup - an asynchronous resource cleanup invoked when the resource closure terminates (with onComplete, onError or cancel)
Returns: a Flux built around a "transactional" resource, with asynchronous cleanup on all terminations (onComplete, onError, cancel)
public static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> resourceClosure, Function<? super D,? extends Publisher<?>> asyncComplete, BiFunction<? super D,? super Throwable,? extends Publisher<?>> asyncError, Function<? super D,? extends Publisher<?>> asyncCancel)
Uses a resource, generated by a Publisher for each individual Subscriber, while streaming the values from a Publisher derived from the same resource. Note that all steps of the operator chain that would need the resource to be in an open stable state need to be described inside the resourceClosure Function.
Whenever the resulting sequence terminates, the relevant Function generates a "cleanup" Publisher that is invoked but doesn't change the content of the main sequence. Instead it just defers the termination (unless it errors, in which case the error suppresses the original termination signal).
Individual cleanups can also be associated with main sequence cancellation and error terminations.
Note that if the resource supplying Publisher emits more than one resource, the subsequent resources are dropped (Operators.onNextDropped(Object, Context)). If the publisher errors AFTER having emitted one resource, the error is also silently dropped (Operators.onErrorDropped(Throwable, Context)). An empty completion or error without at least one onNext signal triggers a short-circuit of the main sequence with the same terminal signal (no resource is established, no cleanup is invoked).
Additionally, the terminal signal is replaced by any error that might have happened in the terminating Publisher.
Finally, early cancellations will cancel the resource supplying Publisher.
T - the type of elements emitted by the resource closure, and thus the main sequence
D - the type of the resource object
resourceSupplier - a Publisher that "generates" the resource, subscribed for each subscription to the main sequence
resourceClosure - a factory to derive a Publisher from the supplied resource
asyncComplete - an asynchronous resource cleanup invoked if the resource closure terminates with onComplete
asyncError - an asynchronous resource cleanup invoked if the resource closure terminates with onError. The terminating error is provided to the BiFunction
asyncCancel - an asynchronous resource cleanup invoked if the resource closure is cancelled. When null, the asyncComplete path is used instead.
Returns: a Flux built around a "transactional" resource, with several termination paths triggering asynchronous cleanup sequences
See Also: usingWhen(Publisher, Function, Function)
public static <T1,T2,O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends O> combinator)
T1 - type of the value from source1
T2 - type of the value from source2
O - The produced output after transformation by the combinator
source1 - The first Publisher source to zip.
source2 - The second Publisher source to zip.
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Returns: a Flux
public static <T1,T2> Flux<Tuple2<T1,T2>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2)
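Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.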
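The pairing rule, and in particular the fact that zipping stops as soon as any source completes, can be modelled with plain JDK collections. This is a synchronous sketch under stated assumptions rather than Reactor code: Iterables stand in for the two Publishers, Map.Entry stands in for Tuple2, and the class and method names are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class ZipSketch {

    // Take one element from each source per step; stop when either source runs out.
    static <A, B> List<Map.Entry<A, B>> zip(Iterable<A> first, Iterable<B> second) {
        Iterator<A> a = first.iterator();
        Iterator<B> b = second.iterator();
        List<Map.Entry<A, B>> pairs = new ArrayList<>();
        while (a.hasNext() && b.hasNext()) {
            pairs.add(new AbstractMap.SimpleImmutableEntry<>(a.next(), b.next()));
        }
        return pairs;
    }

    static String demo() {
        return zip(Arrays.asList("a", "b", "c"), Arrays.asList(1, 2)).toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a=1, b=2]
    }
}
```

The third element of the longer source is never paired, which mirrors the statement that the operator continues only until any of the sources completes.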
public static <T1,T2,T3> Flux<Tuple3<T1,T2,T3>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3)
Zip three sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple3. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
Returns: a Flux
public static <T1,T2,T3,T4> Flux<Tuple4<T1,T2,T3,T4>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4)
Zip four sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple4. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
Returns: a Flux
public static <T1,T2,T3,T4,T5> Flux<Tuple5<T1,T2,T3,T4,T5>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5)
Zip five sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple5.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
Returns: a zipped Flux
public static <T1,T2,T3,T4,T5,T6> Flux<Tuple6<T1,T2,T3,T4,T5,T6>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6)
Zip six sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple6.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
source6 - The sixth upstream Publisher to subscribe to.
Returns: a zipped Flux
public static <T1,T2,T3,T4,T5,T6,T7> Flux<Tuple7<T1,T2,T3,T4,T5,T6,T7>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7)
Zip seven sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple7.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
T7 - type of the value from source7
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
source6 - The sixth upstream Publisher to subscribe to.
source7 - The seventh upstream Publisher to subscribe to.
Returns: a zipped Flux
public static <T1,T2,T3,T4,T5,T6,T7,T8> Flux<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7, Publisher<? extends T8> source8)
Zip eight sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple8.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
T7 - type of the value from source7
T8 - type of the value from source8
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
source6 - The sixth upstream Publisher to subscribe to.
source7 - The seventh upstream Publisher to subscribe to.
source8 - The eighth upstream Publisher to subscribe to.
Returns: a zipped Flux
public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources, Function<? super Object[],? extends O> combinator)
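Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator).
The Iterable.iterator() will be called on each Publisher.subscribe(Subscriber).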
public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources, int prefetch, Function<? super Object[],? extends O> combinator)
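Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator).
The Iterable.iterator() will be called on each Publisher.subscribe(Subscriber).
O - the combined produced type
sources - the Iterable providing sources to zip
prefetch - the inner source request size
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Returns: a zipped Flux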
@SafeVarargs public static <I,O> Flux<O> zip(Function<? super Object[],? extends O> combinator, Publisher<? extends I>... sources)
I - the type of the input sources
O - the combined produced type
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
sources - the array providing sources to zip
Returns: a zipped Flux
@SafeVarargs public static <I,O> Flux<O> zip(Function<? super Object[],? extends O> combinator, int prefetch, Publisher<? extends I>... sources)
I - the type of the input sources
O - the combined produced type
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
prefetch - individual source request size
sources - the array providing sources to zip
Returns: a zipped Flux
public static <TUPLE extends Tuple2,V> Flux<V> zip(Publisher<? extends Publisher<?>> sources, Function<? super TUPLE,? extends V> combinator)
Note that the Publisher sources from the outer Publisher will accumulate into an exhaustive list before starting zip operation.
TUPLE - the raw tuple type
V - The produced output after transformation by the given combinator
sources - The Publisher of Publisher sources to zip. A finite publisher is required.
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced value
public final Mono<Boolean> all(Predicate<? super T> predicate)
Emit a single boolean true if all values of this sequence match the Predicate.
The implementation uses short-circuit logic and completes with false if the predicate doesn't match a value.
public final Mono<Boolean> any(Predicate<? super T> predicate)
Emit a single boolean true if any of the values of this Flux sequence match the predicate.
The implementation uses short-circuit logic and completes with true if the predicate matches a value.
public final <P> P as(Function<? super Flux<T>,P> transformer)
Transform this Flux into a target type.
flux.as(Mono::from).subscribe()
P - the returned instance type
transformer - the Function to immediately map this Flux into a target type instance.
Returns: the Flux transformed to an instance of P
See Also: transformDeferred(Function) for a lazy transformation of Flux
@Nullable public final T blockFirst()
Subscribe to this Flux and block indefinitely until the upstream signals its first value or completes. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).
Note that each blockFirst() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
@Nullable public final T blockFirst(Duration timeout)
Subscribe to this Flux and block until the upstream signals its first value, completes or a timeout expires. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). If the provided timeout expires, a RuntimeException is thrown.
Note that each blockFirst() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
timeout - maximum time period to wait for before raising a RuntimeException
@Nullable public final T blockLast()
Subscribe to this Flux and block indefinitely until the upstream signals its last value or completes. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).
Note that each blockLast() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
@Nullable public final T blockLast(Duration timeout)
Subscribe to this Flux and block until the upstream signals its last value, completes or a timeout expires. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). If the provided timeout expires, a RuntimeException is thrown.
Note that each blockLast() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
timeout - maximum time period to wait for before raising a RuntimeException
public final Flux<List<T>> buffer()
Collect all incoming values into a single List buffer that will be emitted by the returned Flux once this Flux completes.
Discard Support: This operator discards the buffer upon cancellation or error triggered by a data signal.
Returns: a Flux of at most one List
See Also: collectList() for an alternative collecting algorithm returning Mono
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal, as well as the latest unbuffered element if the bufferSupplier fails.
C - the Collection buffer type
maxSize - the maximum collected size
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection
public final Flux<List<T>> buffer(int maxSize, int skip)
Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes. Buffers can be created with gaps, as a new buffer will be created every time skip values have been emitted by the source.
When maxSize < skip : dropping buffers
When maxSize > skip : overlapping buffers
When maxSize == skip : exact buffers
Discard Support: This operator discards elements in between buffers (in the case of dropping buffers). It also discards the currently open buffer upon cancellation or error triggered by a data signal. Note however that the overlapping buffer variant DOES NOT discard, as this might result in an element being discarded from an early buffer while it is still valid in a more recent buffer.
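The three maxSize/skip cases listed above can be reproduced with a plain-Java model over an in-memory list. This is a sketch under stated assumptions: `BufferSkipModel` and its `buffer` helper are hypothetical names, a new buffer is assumed to open at every source index that is a multiple of skip, and buffers still open at the end are assumed to be emitted as partial buffers. It does not use Reactor and ignores discard handling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java model; not part of Reactor.
final class BufferSkipModel {

    // Opens a buffer every `skip` elements and fills each with up to `maxSize` elements.
    static <T> List<List<T>> buffer(List<T> source, int maxSize, int skip) {
        if (maxSize <= 0 || skip <= 0) {
            throw new IllegalArgumentException("maxSize and skip must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + maxSize, source.size());
            out.add(new ArrayList<>(source.subList(start, end)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        System.out.println(buffer(source, 2, 3)); // dropping:    [[1, 2], [4, 5], [7]]
        System.out.println(buffer(source, 3, 2)); // overlapping: [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7]]
        System.out.println(buffer(source, 3, 3)); // exact:       [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

In the dropping case the elements 3 and 6 fall between buffers, which is where the discard behaviour described above would apply.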
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, int skip, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes. Buffers can be created with gaps, as a new buffer will be created every time skip values have been emitted by the source.
When maxSize < skip : dropping buffers
When maxSize > skip : overlapping buffers
When maxSize == skip : exact buffers
Discard Support: This operator discards elements in between buffers (in the case of dropping buffers). It also discards the currently open buffer upon cancellation or error triggered by a data signal. Note however that the overlapping buffer variant DOES NOT discard, as this might result in an element being discarded from an early buffer while it is still valid in a more recent buffer.
C - the Collection buffer type
skip - the number of items to count before creating a new buffer
maxSize - the max collected size
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of possibly overlapped or gapped Collection
public final <C extends Collection<? super T>> Flux<C> buffer(Publisher<?> other, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers, as delimited by the signals of a companion Publisher this operator will subscribe to.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal, and the last received element when the bufferSupplier fails.
C - the Collection buffer type
other - the companion Publisher whose signals trigger new buffers
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection delimited by signals from a Publisher
public final Flux<List<T>> buffer(Duration bufferingTimespan, Duration openBufferEvery)
Collect incoming values into multiple List buffers created at a given openBufferEvery period. Each buffer will last until the bufferingTimespan has elapsed, thus emitting the bucket in the resulting Flux.
When bufferingTimespan < openBufferEvery : dropping buffers
When bufferingTimespan > openBufferEvery : overlapping buffers
When bufferingTimespan == openBufferEvery : exact buffers
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. It DOES NOT provide strong guarantees in the case of overlapping buffers, as elements might get discarded too early (from the first of two overlapping buffers for instance).
public final Flux<List<T>> buffer(Duration bufferingTimespan, Duration openBufferEvery, Scheduler timer)
Collect incoming values into multiple List buffers created at a given openBufferEvery period, as measured on the provided Scheduler. Each buffer will last until the bufferingTimespan has elapsed (also measured on the scheduler), thus emitting the bucket in the resulting Flux.
When bufferingTimespan < openBufferEvery : dropping buffers
When bufferingTimespan > openBufferEvery : overlapping buffers
When bufferingTimespan == openBufferEvery : exact buffers
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. It DOES NOT provide strong guarantees in the case of overlapping buffers, as elements might get discarded too early (from the first of two overlapping buffers for instance).
bufferingTimespan - the duration from buffer creation until a buffer is closed and emitted
openBufferEvery - the interval at which to create a new buffer
timer - a time-capable Scheduler instance to run on
Returns: a Flux of List delimited by the given period openBufferEvery and sized by bufferingTimespan
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
C - the Collection buffer type
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection delimited by given size or a given period timeout
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer)
Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
C - the Collection buffer type
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
timer - a time-capable Scheduler instance to run on
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection delimited by given size or a given period timeout
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, boolean fairBackpressure)
Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
fairBackpressure - If true, prefetches maxSize * 4 from upstream and replenishes the buffer when the downstream demand is satisfactory. When false, no prefetching takes place and a single buffer is always ready to be pushed downstream.
Returns: a Flux of List delimited by given size or a given period timeout
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer, boolean fairBackpressure)
Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
timer - a time-capable Scheduler instance to run on
fairBackpressure - If true, prefetches maxSize * 4 from upstream and replenishes the buffer when the downstream demand is satisfactory. When false, no prefetching takes place and a single buffer is always ready to be pushed downstream.
Returns: a Flux of List delimited by given size or a given period timeout
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Supplier<C> bufferSupplier, boolean fairBackpressure)
Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
C - the Collection buffer type
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
fairBackpressure - If true, prefetches maxSize * 4 from upstream and replenishes the buffer when the downstream demand is satisfactory. When false, no prefetching takes place and a single buffer is always ready to be pushed downstream.
Returns: a Flux of Collection delimited by given size or a given period timeout
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer, Supplier<C> bufferSupplier, boolean fairBackpressure)
Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
C - the Collection buffer type
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
timer - a time-capable Scheduler instance to run on
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
fairBackpressure - If true, prefetches maxSize * 4 from upstream and replenishes the buffer when the downstream demand is satisfactory. When false, no prefetching takes place and a single buffer is always ready to be pushed downstream.
Returns: a Flux of Collection delimited by given size or a given period timeout
public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate)
Collect incoming values into multiple List buffers that will be emitted by the resulting Flux each time the given predicate returns true. Note that the element that triggers the predicate to return true (and thus closes a buffer) is included as last element in the emitted buffer.
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate, boolean cutBefore)
Collect incoming values into multiple List buffers that will be emitted by the resulting Flux each time the given predicate returns true. Note that the buffer into which the element that triggers the predicate to return true (and thus closes a buffer) is included depends on the cutBefore parameter: set it to true to include the boundary element in the newly opened buffer, false to include it in the closed buffer (as in bufferUntil(Predicate)).
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
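The effect of cutBefore on where the boundary element lands can be shown with a plain-Java model. This is a sketch, not Reactor code: `BufferUntilModel` and its `bufferUntil` helper are hypothetical names, the input is an in-memory list, and the model assumes that an empty buffer is never emitted (for example when the very first element matches with cutBefore set to true).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical plain-Java model; not part of Reactor.
final class BufferUntilModel {

    static <T> List<List<T>> bufferUntil(List<T> source, Predicate<? super T> predicate, boolean cutBefore) {
        List<List<T>> out = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : source) {
            if (!predicate.test(item)) {
                current.add(item);
            } else if (cutBefore) {
                // Boundary element starts the newly opened buffer.
                if (!current.isEmpty()) {
                    out.add(current);
                }
                current = new ArrayList<>();
                current.add(item);
            } else {
                // Boundary element is the last element of the closed buffer.
                current.add(item);
                out.add(current);
                current = new ArrayList<>();
            }
        }
        // On normal completion, a non-empty partial buffer is emitted.
        if (!current.isEmpty()) {
            out.add(current);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        Predicate<Integer> multipleOfThree = n -> n % 3 == 0;
        System.out.println(bufferUntil(source, multipleOfThree, false)); // [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(bufferUntil(source, multipleOfThree, true));  // [[1, 2], [3, 4, 5], [6, 7]]
    }
}
```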
public final <V> Flux<List<T>> bufferUntilChanged(Function<? super T,? extends V> keySelector)
public final <V> Flux<List<T>> bufferUntilChanged(Function<? super T,? extends V> keySelector, BiPredicate<? super V,? super V> keyComparator)
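Collect subsequent repetitions of an element (that is, if they arrive right after one another), as compared by a key extracted through the user provided Function and compared using a supplied BiPredicate, into multiple List buffers that will be emitted by the resulting Flux.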
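Grouping consecutive elements by a key that is extracted by a Function and compared with a BiPredicate can be sketched in plain Java as follows. `BufferUntilChangedModel` and its helper are hypothetical names; the model works on an in-memory list and only illustrates the grouping rule, not Reactor's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Function;

// Hypothetical plain-Java model; not part of Reactor.
final class BufferUntilChangedModel {

    // Starts a new buffer whenever the key of the current element does not
    // match (per keyComparator) the key of the previous element.
    static <T, V> List<List<T>> bufferUntilChanged(List<T> source,
                                                   Function<? super T, ? extends V> keySelector,
                                                   BiPredicate<? super V, ? super V> keyComparator) {
        List<List<T>> out = new ArrayList<>();
        List<T> current = new ArrayList<>();
        V lastKey = null;
        for (T item : source) {
            V key = keySelector.apply(item);
            if (!current.isEmpty() && !keyComparator.test(lastKey, key)) {
                out.add(current);
                current = new ArrayList<>();
            }
            current.add(item);
            lastKey = key;
        }
        if (!current.isEmpty()) {
            out.add(current);
        }
        return out;
    }

    public static void main(String[] args) {
        Function<String, Character> firstChar = s -> s.charAt(0);
        BiPredicate<Character, Character> sameKey = (x, y) -> x.equals(y);
        List<String> source = Arrays.asList("a1", "a2", "b1", "a3");
        System.out.println(bufferUntilChanged(source, firstChar, sameKey)); // [[a1, a2], [b1], [a3]]
    }
}
```

Only adjacent repetitions are grouped, so "a3" ends up in its own buffer even though its key was seen earlier.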
public final Flux<List<T>> bufferWhile(Predicate<? super T> predicate)
Collect incoming values into multiple List buffers that will be emitted by the resulting Flux. Each buffer continues aggregating values while the given predicate returns true, and a new buffer is created as soon as the predicate returns false. Note that the element that triggers the predicate to return false (and thus closes a buffer) is NOT included in any emitted buffer.
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal, as well as the buffer-triggering element.
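A plain-Java model makes the "closing element is not included" rule concrete. This is a sketch only: `BufferWhileModel` and its `bufferWhile` helper are hypothetical names, the input is an in-memory list, and the model assumes empty buffers are not emitted when several elements in a row fail the predicate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical plain-Java model; not part of Reactor.
final class BufferWhileModel {

    static <T> List<List<T>> bufferWhile(List<T> source, Predicate<? super T> predicate) {
        List<List<T>> out = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : source) {
            if (predicate.test(item)) {
                current.add(item);
            } else if (!current.isEmpty()) {
                // The failing element closes the buffer and is itself dropped.
                out.add(current);
                current = new ArrayList<>();
            }
        }
        // On normal completion, a non-empty partial buffer is emitted.
        if (!current.isEmpty()) {
            out.add(current);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 0, 3, 0, 0, 4);
        System.out.println(bufferWhile(source, n -> n > 0)); // [[1, 2], [3], [4]]
    }
}
```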
public final <U,V> Flux<List<T>> bufferWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector)
Collect incoming values into multiple List buffers started each time an opening companion Publisher emits. Each buffer will last until the corresponding closing companion Publisher emits, thus releasing the buffer to the resulting Flux.
When Open signal is strictly not overlapping Close signal : dropping buffers (see green marbles in diagram below).
When Open signal is strictly more frequent than Close signal : overlapping buffers (see second and third buffers in diagram below).
When Open signal is exactly coordinated with Close signal : exact buffers
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. It DOES NOT provide strong guarantees in the case of overlapping buffers, as elements might get discarded too early (from the first of two overlapping buffers for instance).
U - the element type of the buffer-opening sequence
V - the element type of the buffer-closing sequence
bucketOpening - a companion Publisher to subscribe to for buffer creation signals.
closeSelector - a factory that, given a buffer opening signal, returns a companion Publisher to subscribe to for buffer closure and emission signals.
Returns: a Flux of List delimited by an opening Publisher and a relative closing Publisher
public final <U,V,C extends Collection<? super T>> Flux<C> bufferWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers started each time an opening companion Publisher emits. Each buffer will last until the corresponding closing companion Publisher emits, thus releasing the buffer to the resulting Flux.
When Open signal is strictly not overlapping Close signal : dropping buffers (see green marbles in diagram below).
When Open signal is strictly more frequent than Close signal : overlapping buffers (see second and third buffers in diagram below).
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. It DOES NOT provide strong guarantees in the case of overlapping buffers, as elements might get discarded too early (from the first of two overlapping buffers for instance).
U - the element type of the buffer-opening sequence
V - the element type of the buffer-closing sequence
C - the Collection buffer type
bucketOpening - a companion Publisher to subscribe to for buffer creation signals.
closeSelector - a factory that, given a buffer opening signal, returns a companion Publisher to subscribe to for buffer closure and emission signals.
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection delimited by an opening Publisher and a relative closing Publisher
public final Flux<T> cache()
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded volume of onNext signals. Completion and Error will also be replayed.
Returns: a replaying Flux
public final Flux<T> cache(int history)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size onNext signals. Completion and Error will also be replayed.
Note that cache(0) will only cache the terminal signal without expiration.
history - number of elements retained in cache
Returns: a replaying Flux
public final Flux<T> cache(Duration ttl)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded history but apply a per-item expiry timeout.
Completion and Error will also be replayed until ttl triggers, in which case the next Subscriber will start over a new subscription.
ttl - Time-to-live for each cached item and post termination.
Returns: a replaying Flux
public final Flux<T> cache(Duration ttl, Scheduler timer)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded history but apply a per-item expiry timeout.
Completion and Error will also be replayed until ttl triggers, in which case the next Subscriber will start over a new subscription.
public final Flux<T> cache(int history, Duration ttl)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size and apply a per-item expiry timeout.
Completion and Error will also be replayed until ttl triggers, in which case the next Subscriber will start over a new subscription.
history - number of elements retained in cache
ttl - Time-to-live for each cached item and post termination.
Returns: a replaying Flux
public final Flux<T> cache(int history, Duration ttl, Scheduler timer)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size and apply a per-item expiry timeout.
Completion and Error will also be replayed until ttl triggers, in which case the next Subscriber will start over a new subscription.
public final <E> Flux<E> cast(Class<E> clazz)
Cast the current Flux produced type into a target produced type.
public final Flux<T> checkpoint()
Activate traceback (full assembly tracing) for this particular Flux, in case of an error upstream of the checkpoint. Tracing incurs the cost of an exception stack trace creation.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with the traceback.
The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
Returns: the assembly tracing Flux.
public final Flux<T> checkpoint(String description)
Activate traceback (assembly marker) for this particular Flux by giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(), this doesn't create a filled stack trace, avoiding the main cost of the operator. However, as a trade-off the description must be unique enough for the user to find out where this Flux was assembled. If you only want a generic description, and still rely on the stack trace to find the assembly site, use the checkpoint(String, boolean) variant.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
description - a unique enough description to include in the light assembly traceback.
Returns: the assembly marked Flux
public final Flux<T> checkpoint(@Nullable String description, boolean forceStackTrace)
Activate traceback (full assembly tracing or the lighter assembly marking depending on the forceStackTrace option).
By setting the forceStackTrace parameter to true, activate assembly tracing for this particular Flux and give it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(String), this will incur the cost of an exception stack trace creation. The description could for example be a meaningful name for the assembled flux or a wider correlation ID, since the stack trace will always provide enough information to locate where this Flux was assembled.
By setting forceStackTrace to false, behaves like checkpoint(String) and is subject to the same caveat in choosing the description.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly marker.
The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
description - a description (must be unique enough if forceStackTrace is set to false).
forceStackTrace - false to make a light checkpoint without a stacktrace, true to use a stack trace.
Returns: the assembly marked Flux.
public final <E> Mono<E> collect(Supplier<E> containerSupplier, BiConsumer<E,? super T> collector)
Collect all elements emitted by this Flux into a user-defined container, by applying a collector BiConsumer taking the container and each element. The collected result will be emitted when this sequence completes, emitting the empty container if the sequence was empty.
Discard Support: This operator discards the container upon cancellation or error triggered by a data signal. Either the container type is a Collection (in which case individual elements are discarded) or not (in which case the entire container is discarded). In case the collector BiConsumer fails to accumulate an element, the container is discarded as above and the triggering element is also discarded.
E - the container type
containerSupplier - the supplier of the container instance for each Subscriber
collector - a consumer of both the container instance and the value being currently collected
Returns: a Mono of the collected container on complete
public final <R,A> Mono<R> collect(Collector<? super T,A,? extends R> collector)
Flux
into a container,
by applying a Java 8 Stream API Collector
The collected result will be emitted when this sequence completes, emitting
the empty container if the sequence was empty.
Discard Support: This operator discards the intermediate container (see Collector.supplier()
) upon
cancellation, error or exception while applying the Collector.finisher()
. Either the container type
is a Collection
(in which case individual elements are discarded) or not (in which case the entire
container is discarded). In case the accumulator BiConsumer
of the collector fails to accumulate
an element into the intermediate container, the container is discarded as above and the triggering element
is also discarded.
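As a rough, plain-JDK illustration of the contract described above (a sketch of the semantics, not Reactor's implementation; the class and method names are hypothetical), a Collector can be driven by hand in three steps: obtain the intermediate container from supplier(), feed each element to accumulator(), and produce the result with finisher() once the sequence is complete.

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical helper: models how a Collector is driven over a finite sequence.
final class CollectorSketch {

    static <T, A, R> R applyCollector(Iterable<T> elements,
                                      Collector<? super T, A, ? extends R> collector) {
        A container = collector.supplier().get();                // one intermediate container
        for (T element : elements) {
            collector.accumulator().accept(container, element);  // once per element
        }
        return collector.finisher().apply(container);            // once, on completion
    }

    public static void main(String[] args) {
        String joined = applyCollector(List.of("a", "b", "c"), Collectors.joining(","));
        System.out.println(joined); // a,b,c
    }
}
```

With an empty input the finisher is applied to the untouched container, which matches the "empty container" behavior mentioned above.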
public final <K> Mono<Map<K,T>> collectMap(Function<? super T,? extends K> keyExtractor)
Collect all elements emitted by this Flux
into a hashed Map
that is
emitted by the resulting Mono
when this sequence completes, emitting the
empty Map
if the sequence was empty.
The key is extracted from each element by applying the keyExtractor
Function
. In case several elements map to the same key, the associated value
will be the most recently emitted element.
Discard Support: This operator discards the whole Map
upon cancellation or error
triggered by a data signal, so discard handlers will have to unpack the map.
public final <K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor)
Collect all elements emitted by this Flux
into a hashed Map
that is
emitted by the resulting Mono
when this sequence completes, emitting the
empty Map
if the sequence was empty.
The key is extracted from each element by applying the keyExtractor
Function
, and the value is extracted by the valueExtractor
Function.
In case several elements map to the same key, the associated value will be derived
from the most recently emitted element.
Discard Support: This operator discards the whole Map
upon cancellation or error
triggered by a data signal, so discard handlers will have to unpack the map.
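The key-conflict rule above can be pictured with a plain HashMap. This is only a JDK-level sketch of the semantics, under the assumption that the sequence is finite and already available as an Iterable; the class and method names are hypothetical and not part of Reactor.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: models the "latest element wins" rule with a plain HashMap.
final class CollectMapSketch {

    static <T, K, V> Map<K, V> toMap(Iterable<T> elements,
                                     Function<? super T, ? extends K> keyExtractor,
                                     Function<? super T, ? extends V> valueExtractor) {
        Map<K, V> map = new HashMap<>();
        for (T element : elements) {
            // put() overwrites any earlier entry for the same key
            map.put(keyExtractor.apply(element), valueExtractor.apply(element));
        }
        return map;
    }

    public static void main(String[] args) {
        // "a" and "c" share the key 1, so only the value derived from "c" remains
        Map<Integer, String> byLength =
                toMap(List.of("a", "bb", "c"), String::length, String::toUpperCase);
        System.out.println(byLength);
    }
}
```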
K
- the type of the key extracted from each source element
V
- the type of the value extracted from each source element
keyExtractor
- a Function
to map elements to a key for the Map
valueExtractor
- a Function
to map elements to a value for the Map
Mono
of a Map
of key-value pairs (only including latest
element's value in case of key conflicts)
public final <K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor, Supplier<Map<K,V>> mapSupplier)
Collect all elements emitted by this Flux
into a user-defined Map
that is
emitted by the resulting Mono
when this sequence completes, emitting the
empty Map
if the sequence was empty.
The key is extracted from each element by applying the keyExtractor
Function
, and the value is extracted by the valueExtractor
Function.
In case several elements map to the same key, the associated value will be derived
from the most recently emitted element.
Discard Support: This operator discards the whole Map
upon cancellation or error
triggered by a data signal, so discard handlers will have to unpack the map.
K
- the type of the key extracted from each source element
V
- the type of the value extracted from each source element
keyExtractor
- a Function
to map elements to a key for the Map
valueExtractor
- a Function
to map elements to a value for the Map
mapSupplier
- a Map
factory called for each Subscriber
Mono
of a Map
of key-value pairs (only including latest
element's value in case of key conflicts)
public final <K> Mono<Map<K,Collection<T>>> collectMultimap(Function<? super T,? extends K> keyExtractor)
Collect all elements emitted by this Flux
into a multimap
that is
emitted by the resulting Mono
when this sequence completes, emitting the
empty multimap
if the sequence was empty.
The key is extracted from each element by applying the keyExtractor
Function
, and every element mapping to the same key is stored in the List
associated to said key.
Discard Support: This operator discards the whole Map
upon cancellation or error
triggered by a data signal, so discard handlers will have to unpack the list values in the map.
public final <K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor)
Collect all elements emitted by this Flux
into a multimap
that is
emitted by the resulting Mono
when this sequence completes, emitting the
empty multimap
if the sequence was empty.
The key is extracted from each element by applying the keyExtractor
Function
, and every element mapping to the same key is converted by the
valueExtractor
Function to a value stored in the List
associated to
said key.
Discard Support: This operator discards the whole Map
upon cancellation or error
triggered by a data signal, so discard handlers will have to unpack the list values in the map.
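In contrast to collectMap, every value mapping to the same key is kept. A minimal JDK-only sketch of that grouping, assuming a finite input and an ArrayList per key (the class and method names are hypothetical, and this is not Reactor's implementation):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: models the multimap grouping with a HashMap of lists.
final class CollectMultimapSketch {

    static <T, K, V> Map<K, Collection<V>> toMultimap(Iterable<T> elements,
                                                      Function<? super T, ? extends K> keyExtractor,
                                                      Function<? super T, ? extends V> valueExtractor) {
        Map<K, Collection<V>> multimap = new HashMap<>();
        for (T element : elements) {
            // create the per-key list on first use, then append in arrival order
            multimap.computeIfAbsent(keyExtractor.apply(element), k -> new ArrayList<>())
                    .add(valueExtractor.apply(element));
        }
        return multimap;
    }

    public static void main(String[] args) {
        System.out.println(toMultimap(List.of("a", "bb", "c"), String::length, String::toUpperCase));
    }
}
```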
K
- the type of the key extracted from each source element
V
- the type of the value extracted from each source element
keyExtractor
- a Function
to map elements to a key for the Map
valueExtractor
- a Function
to map elements to a value for the Map
Mono
of a Map
of key-List(values) pairs
public final <K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor, Supplier<Map<K,Collection<V>>> mapSupplier)
Collect all elements emitted by this Flux
into a user-defined multimap
that is
emitted by the resulting Mono
when this sequence completes, emitting the
empty multimap
if the sequence was empty.
The key is extracted from each element by applying the keyExtractor
Function
, and every element mapping to the same key is converted by the
valueExtractor
Function to a value stored in the Collection
associated to
said key.
Discard Support: This operator discards the whole Map
upon cancellation or error
triggered by a data signal, so discard handlers will have to unpack the list values in the map.
K
- the type of the key extracted from each source element
V
- the type of the value extracted from each source element
keyExtractor
- a Function
to map elements to a key for the Map
valueExtractor
- a Function
to map elements to a value for the Map
mapSupplier
- a multimap (Map
of Collection
) factory called
for each Subscriber
Mono
of a Map
of key-Collection(values) pairs
public final Mono<List<T>> collectSortedList()
Collect all elements emitted by this Flux
until this sequence completes,
and then sort them in natural order into a List
that is emitted by the
resulting Mono
. If the sequence was empty, an empty List
will be emitted.
Discard Support: This operator is based on collectList()
, and as such discards the
elements in the List
individually upon cancellation or error triggered by a data signal.
public final Mono<List<T>> collectSortedList(@Nullable Comparator<? super T> comparator)
Collect all elements emitted by this Flux
until this sequence completes,
and then sort them using a Comparator
into a List
that is emitted
by the resulting Mono
. If the sequence was empty, an empty List
will be emitted.
Discard Support: This operator is based on collectList()
, and as such discards the
elements in the List
individually upon cancellation or error triggered by a data signal.
comparator
- a Comparator
to sort the items of this sequence
Mono
of a sorted List
of all values from this Flux
public final <V> Flux<V> concatMap(Function<? super T,? extends Publisher<? extends V>> mapper)
Transform the elements emitted by this Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap
and flatMapSequential
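:
- Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
- Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
- Interleaving: this operator does not let values from different inners interleave (concatenation).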
Errors will immediately short-circuit the current concat backlog. Note that no prefetching is done on the source, which gets requested only if there is downstream demand.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
V
- the produced concatenated type
mapper
- the function to transform this sequence of T into concatenated sequences of V
Flux
public final <V> Flux<V> concatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int prefetch)
Transform the elements emitted by this Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap
and flatMapSequential
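:
- Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
- Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
- Interleaving: this operator does not let values from different inners interleave (concatenation).
Errors will immediately short-circuit the current concat backlog. The prefetch argument
sets an arbitrary prefetch size for the upstream source, or disables
prefetching with 0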
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
V
- the produced concatenated type
mapper
- the function to transform this sequence of T into concatenated sequences of V
prefetch
- the number of values to prefetch from upstream source, or 0
to disable prefetching
Flux
public final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper)
Transform the elements emitted by this Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap
and flatMapSequential
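:
- Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
- Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
- Interleaving: this operator does not let values from different inners interleave (concatenation).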
Errors in the individual publishers will be delayed at the end of the whole concat
sequence (possibly getting combined into a composite
)
if several sources error.
Note that no prefetching is done on the source, which gets requested only if there
is downstream demand.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
V
- the produced concatenated type
mapper
- the function to transform this sequence of T into concatenated sequences of V
Flux
public final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, int prefetch)
Transform the elements emitted by this Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap
and flatMapSequential
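:
- Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
- Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
- Interleaving: this operator does not let values from different inners interleave (concatenation).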
Errors in the individual publishers will be delayed at the end of the whole concat
sequence (possibly getting combined into a composite
)
if several sources error.
The prefetch argument sets an arbitrary prefetch size for the upstream source,
or disables prefetching with 0
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
V
- the produced concatenated type
mapper
- the function to transform this sequence of T into concatenated sequences of V
prefetch
- the number of values to prefetch from upstream source, or 0
to disable prefetching
Flux
public final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, boolean delayUntilEnd, int prefetch)
Transform the elements emitted by this Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap
and flatMapSequential
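:
- Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
- Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
- Interleaving: this operator does not let values from different inners interleave (concatenation).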
Errors in the individual publishers will be delayed after the current concat
backlog if delayUntilEnd is false or after all sources if delayUntilEnd is true.
The prefetch argument sets an arbitrary prefetch size for the upstream source,
or disables prefetching with 0
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
V
- the produced concatenated type
mapper
- the function to transform this sequence of T into concatenated sequences of V
delayUntilEnd
- delay error until all sources have been consumed instead of
after the current source
prefetch
- the number of values to prefetch from upstream source, or 0
to disable prefetching
Flux
public final <R> Flux<R> concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
Transform the items emitted by this Flux
into Iterable
, then flatten the elements from those by
concatenating them into a single Flux
. For each iterable, Iterable.iterator()
will be called
at least once and at most twice.
This operator inspects each Iterable
's Spliterator
to assess if the iteration
can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)
).
Since the default Spliterator wraps the Iterator we can have two Iterable.iterator()
calls per iterable. This second invocation is skipped on a Collection
however, a type which is
assumed to be always finite.
Note that unlike flatMap(Function)
and concatMap(Function)
, with Iterable there is
no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially.
Thus flatMapIterable
and concatMapIterable
are equivalent, and both are offered as a discoverability
improvement for users that explore the API with the concat vs flatMap expectation.
Discard Support: Upon cancellation, this operator discards T
elements it prefetched and, in
some cases, attempts to discard remainder of the currently processed Iterable
(if it can
safely ensure the iterator is finite). Note that this means each Iterable
's Iterable.iterator()
method could be invoked twice.
Error Mode Support: This operator supports resuming on errors
(including when fusion is enabled). Exceptions thrown by the consumer are passed to
the onErrorContinue(BiConsumer)
error consumer (the value consumer
is not invoked, as the source element will be part of the sequence). The onNext
signal is then propagated as normal.
public final <R> Flux<R> concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper, int prefetch)
Transform the items emitted by this Flux
into Iterable
, then flatten the emissions from those by
concatenating them into a single Flux
.
The prefetch argument sets an arbitrary prefetch size for the upstream source.
For each iterable, Iterable.iterator()
will be called at least once and at most twice.
This operator inspects each Iterable
's Spliterator
to assess if the iteration
can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)
).
Since the default Spliterator wraps the Iterator we can have two Iterable.iterator()
calls per iterable. This second invocation is skipped on a Collection
however, a type which is
assumed to be always finite.
Note that unlike flatMap(Function)
and concatMap(Function)
, with Iterable there is
no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially.
Thus flatMapIterable
and concatMapIterable
are equivalent, and both are offered as a discoverability
improvement for users that explore the API with the concat vs flatMap expectation.
Discard Support: Upon cancellation, this operator discards T
elements it prefetched and, in
some cases, attempts to discard remainder of the currently processed Iterable
(if it can
safely ensure the iterator is finite). Note that this means each Iterable
's Iterable.iterator()
method could be invoked twice.
Error Mode Support: This operator supports resuming on errors
(including when fusion is enabled). Exceptions thrown by the consumer are passed to
the onErrorContinue(BiConsumer)
error consumer (the value consumer
is not invoked, as the source element will be part of the sequence). The onNext
signal is then propagated as normal.
R
- the merged output sequence type
mapper
- the Function
to transform input sequence into N Iterable
prefetch
- the number of values to request from the source upon subscription, to be transformed to Iterable
Flux
public final Flux<T> contextCapture()
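If the context-propagation library is on the classpath, this is a convenience shortcut to capture
thread local values during the subscription phase and put them in the Context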
that is visible upstream of this operator.
As a result this operator should generally be used as close as possible to the end of the chain / subscription point.
If the ContextView
visible upstream is not empty, a small subset of operators will automatically
restore the context snapshot (handle
, tap
).
If context-propagation is not available at runtime, this operator simply returns the current Flux
instance.
Flux
where context-propagation API has been used to capture entries and
inject them into the Context
handle(BiConsumer)
,
tap(SignalListenerFactory)
public final Flux<T> contextWrite(ContextView contextToAppend)
Enrich the Context
visible from downstream for the benefit of upstream
operators, by making all values from the provided ContextView
visible on top
of pairs from downstream.
A Context
(and its ContextView
) is tied to a given subscription
and is read by querying the downstream Subscriber
. Subscriber
that
don't enrich the context instead access their own downstream's context. As a result,
this operator conceptually enriches a Context
coming from under it in the chain
(downstream, by default an empty one) and makes the new enriched Context
visible to operators above it in the chain.
contextToAppend
- the ContextView
to merge with the downstream Context
,
resulting in a new more complete Context
that will be visible from upstream.
Flux
ContextView
public final Flux<T> contextWrite(Function<Context,Context> contextModifier)
Enrich the Context
visible from downstream for the benefit of upstream
operators, by applying a Function
to the downstream Context
.
The Function
takes a Context
for convenience, making it easy to
call write APIs
to return a new Context
.
A Context
(and its ContextView
) is tied to a given subscription
and is read by querying the downstream Subscriber
. Subscriber
that
don't enrich the context instead access their own downstream's context. As a result,
this operator conceptually enriches a Context
coming from under it in the chain
(downstream, by default an empty one) and makes the new enriched Context
visible to operators above it in the chain.
public final Mono<Long> count()
Counts the number of values in this Flux
.
The count will be emitted when onComplete is observed.
public final Flux<T> defaultIfEmpty(T defaultV)
Provide a default unique value if this sequence is completed without any data.
defaultV
- the alternate value if this sequence is empty
Flux
public final Flux<T> delayElements(Duration delay)
Delay each of this Flux
elements (Subscriber.onNext(T)
signals)
by a given Duration
. Signals are delayed and continue on the
parallel
default Scheduler, but empty sequences or
immediate error signals are not delayed.
delay
- duration by which to delay each Subscriber.onNext(T)
signal
Flux
delaySubscription to introduce a delay at the beginning of the sequence only
public final Flux<T> delayElements(Duration delay, Scheduler timer)
Delay each of this Flux
elements (Subscriber.onNext(T)
signals)
by a given Duration
. Signals are delayed and continue on a user-specified
Scheduler
, but empty sequences or immediate error signals are not delayed.
delay
- period to delay each Subscriber.onNext(T)
signal
timer
- a time-capable Scheduler
instance to delay each signal on
Flux
public final Flux<T> delaySequence(Duration delay)
Shift this Flux
forward in time by a given Duration
.
Unlike with delayElements(Duration)
, elements are shifted forward in time
as they are emitted, always resulting in the delay between two elements being
the same as in the source (only the first element is visibly delayed from the
previous event, that is the subscription).
Signals are delayed and continue on the parallel
Scheduler
, but empty sequences or immediate error signals are not delayed.
With this operator, a source emitting at 10Hz with a delaySequence Duration
of 1s will still emit at 10Hz, with an initial "hiccup" of 1s.
On the other hand, delayElements(Duration)
would end up emitting
at 1Hz.
This is closer to delaySubscription(Duration)
, except the source
is subscribed to immediately.
Discard Support: This operator discards elements currently being delayed if the sequence is cancelled during the delay.
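The 10Hz example above can be checked with a little arithmetic. The sketch below is an idealized timing model, not Reactor code: it assumes exact timestamps in milliseconds, ignores scheduling jitter, and models delayElements as starting each element's delay only once the previous element has been emitted. The class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical model: computes emission timestamps only, no Reactor types involved.
final class DelayTimingSketch {

    // delaySequence: every element is shifted by the same fixed amount.
    static long[] delaySequence(long[] sourceTimesMs, long delayMs) {
        long[] out = new long[sourceTimesMs.length];
        for (int i = 0; i < out.length; i++) {
            out[i] = sourceTimesMs[i] + delayMs;
        }
        return out;
    }

    // delayElements (idealized): an element's delay starts only once it has arrived
    // and the previous element has been emitted.
    static long[] delayElements(long[] sourceTimesMs, long delayMs) {
        long[] out = new long[sourceTimesMs.length];
        long previousEmission = 0L; // assumes non-negative timestamps
        for (int i = 0; i < out.length; i++) {
            out[i] = Math.max(sourceTimesMs[i], previousEmission) + delayMs;
            previousEmission = out[i];
        }
        return out;
    }

    public static void main(String[] args) {
        long[] source = {0, 100, 200, 300}; // 10Hz source
        System.out.println(Arrays.toString(delaySequence(source, 1000))); // [1000, 1100, 1200, 1300]
        System.out.println(Arrays.toString(delayElements(source, 1000))); // [1000, 2000, 3000, 4000]
    }
}
```

In the first result the 100ms spacing of the source is preserved after the initial 1s hiccup; in the second the spacing becomes 1s, that is 1Hz.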
public final Flux<T> delaySequence(Duration delay, Scheduler timer)
Shift this Flux
forward in time by a given Duration
.
Unlike with delayElements(Duration, Scheduler)
, elements are shifted forward in time
as they are emitted, always resulting in the delay between two elements being
the same as in the source (only the first element is visibly delayed from the
previous event, that is the subscription).
Signals are delayed and continue on a user-specified Scheduler
, but empty
sequences or immediate error signals are not delayed.
With this operator, a source emitting at 10Hz with a delaySequence Duration
of 1s will still emit at 10Hz, with an initial "hiccup" of 1s.
On the other hand, delayElements(Duration, Scheduler)
would end up emitting
at 1Hz.
This is closer to delaySubscription(Duration, Scheduler)
, except the source
is subscribed to immediately.
Discard Support: This operator discards elements currently being delayed if the sequence is cancelled during the delay.
public final Flux<T> delayUntil(Function<? super T,? extends Publisher<?>> triggerProvider)
Subscribe to this Flux
and generate a Publisher
from each of this
Flux elements, each acting as a trigger for relaying said element.
That is to say, the resulting Flux
delays each of its emissions until the
associated trigger Publisher terminates.
In case of an error either in the source or in a trigger, that error is propagated
immediately downstream.
Note that unlike with the Mono variant
there is
no fusion of subsequent calls.
public final Flux<T> delaySubscription(Duration delay)
Delay the subscription
to this Flux
source until the given
period elapses. The delay is introduced through the parallel
default Scheduler.
public final Flux<T> delaySubscription(Duration delay, Scheduler timer)
Delay the subscription
to this Flux
source until the given
period elapses, as measured on the user-provided Scheduler
.
public final <U> Flux<T> delaySubscription(Publisher<U> subscriptionDelay)
Delay the subscription to the main source until another Publisher signals a value or completes.
U
- the other source type
subscriptionDelay
- a companion Publisher
whose onNext/onComplete signal will trigger the subscription
Flux
public final <X> Flux<X> dematerialize()
An operator working only if this Flux
emits onNext, onError or onComplete Signal
instances, transforming these materialized
signals into
real signals on the Subscriber
.
The error Signal
will trigger onError and complete Signal
will trigger
onComplete.
X
- the dematerialized type
Flux
materialize()
public final Flux<T> distinct()
For each Subscriber
, track elements from this Flux
that have been
seen and filter out duplicates.
The values themselves are recorded into a HashSet
for distinct detection.
Use distinct(Object::hashCode)
if you want a more lightweight approach that
doesn't retain all the objects, but is more susceptible to falsely considering two
different elements as duplicates due to a hash code collision.
Discard Support: This operator discards elements that don't match the distinct predicate,
but you should use the version with a cleanup if you need discarding of keys
categorized by the operator as "seen". See distinct(Function, Supplier, BiPredicate, Consumer)
.
Flux
only emitting distinct values
public final <V> Flux<T> distinct(Function<? super T,? extends V> keySelector)
For each Subscriber
, track elements from this Flux
that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function
.
Discard Support: This operator discards elements that don't match the distinct predicate,
but you should use the version with a cleanup if you need discarding of keys
categorized by the operator as "seen". See distinct(Function, Supplier, BiPredicate, Consumer)
.
V
- the type of the key extracted from each value in this sequence
keySelector
- function to compute comparison key for each element
Flux
only emitting values with distinct keys
public final <V,C extends Collection<? super V>> Flux<T> distinct(Function<? super T,? extends V> keySelector, Supplier<C> distinctCollectionSupplier)
For each Subscriber
, track elements from this Flux
that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function
and by the add method
of the Collection
supplied (typically a Set
).
Discard Support: This operator discards elements that don't match the distinct predicate,
but you should use the version with a cleanup if you need discarding of keys
categorized by the operator as "seen". See distinct(Function, Supplier, BiPredicate, Consumer)
.
V
- the type of the key extracted from each value in this sequence
C
- the type of Collection used for distinct checking of keys
keySelector
- function to compute comparison key for each element
distinctCollectionSupplier
- supplier of the Collection
used for distinct
check through add
of the key.
Flux
only emitting values with distinct keys
public final <V,C> Flux<T> distinct(Function<? super T,? extends V> keySelector, Supplier<C> distinctStoreSupplier, BiPredicate<C,V> distinctPredicate, Consumer<C> cleanup)
For each Subscriber
, track elements from this Flux
that have been
seen and filter out duplicates, as compared by applying a BiPredicate
on
an arbitrary user-supplied <C>
store and a key extracted through the user
provided Function
. The BiPredicate should typically add the key to the
arbitrary store for further comparison. A cleanup callback is also invoked on the
store upon termination of the sequence.
Discard Support: This operator discards elements that don't match the distinct predicate,
but you should use the cleanup
as well if you need discarding of keys
categorized by the operator as "seen".
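To make the roles of the four arguments concrete, here is a plain-JDK sketch of the filtering logic, assuming a finite input. It is a model of the semantics rather than Reactor's implementation, and the class, method, and variable names are hypothetical. The store is a HashSet, the predicate is Set::add (which returns true only for a key not seen before, and records it), and the cleanup is Set::clear.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper: models distinct filtering backed by an arbitrary store.
final class DistinctStoreSketch {

    static <T, V, C> List<T> distinct(Iterable<T> elements,
                                      Function<? super T, ? extends V> keySelector,
                                      Supplier<C> distinctStoreSupplier,
                                      BiPredicate<C, V> distinctPredicate,
                                      Consumer<C> cleanup) {
        C store = distinctStoreSupplier.get();   // one store per run
        List<T> kept = new ArrayList<>();
        for (T element : elements) {
            // the predicate both checks the key and records it in the store
            if (distinctPredicate.test(store, keySelector.apply(element))) {
                kept.add(element);
            }
        }
        cleanup.accept(store);                   // invoked once, on termination
        return kept;
    }

    static List<String> demo() {
        Function<String, Character> firstLetter = word -> word.charAt(0);
        Supplier<Set<Character>> store = HashSet::new;
        BiPredicate<Set<Character>, Character> firstTimeSeen = Set::add;
        Consumer<Set<Character>> cleanup = Set::clear;
        return distinct(List.of("apple", "avocado", "banana", "blueberry", "cherry"),
                firstLetter, store, firstTimeSeen, cleanup);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [apple, banana, cherry]
    }
}
```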
V
- the type of the key extracted from each value in this sequence
C
- the type of store backing the BiPredicate
keySelector
- function to compute comparison key for each element
distinctStoreSupplier
- supplier of the arbitrary store object used in distinct
checks along the extracted key.
distinctPredicate
- the BiPredicate
to apply to the arbitrary store +
extracted key to perform a distinct check. Since nothing is assumed of the store,
this predicate should also add the key to the store as necessary.
cleanup
- the cleanup callback to invoke on the store upon termination.
Flux
only emitting values with distinct keys
public final Flux<T> distinctUntilChanged()
Filter out subsequent repetitions of an element (that is, if they arrive right after one another).
The last distinct value seen is retained for further comparison, which is done
on the values themselves using the equals method
.
Use distinctUntilChanged(Object::hashCode)
if you want a more lightweight approach that
doesn't retain all the objects, but is more susceptible to falsely considering two
different elements as duplicates due to a hash code collision.
Discard Support: Although this operator discards elements that are considered as "already seen", it is not recommended for cases where discarding is needed as the operator doesn't discard the "key" (in this context, the distinct instance that was last seen).
Flux
with only one occurrence in a row of each element
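(yet elements can repeat in the overall sequence)
public final <V> Flux<T> distinctUntilChanged(Function<? super T,? extends V> keySelector)
Filter out subsequent repetitions of an element (that is, if they arrive right after one another),
as compared by a key extracted through the user provided Function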
using equality.
Discard Support: This operator discards elements that are considered as "already seen". The keys themselves are not discarded.
V
- the type of the key extracted from each value in this sequence
keySelector
- function to compute comparison key for each element
Flux
with only one occurrence in a row of each element of
the same key (yet element keys can repeat in the overall sequence)
public final <V> Flux<T> distinctUntilChanged(Function<? super T,? extends V> keySelector, BiPredicate<? super V,? super V> keyComparator)
Filter out subsequent repetitions of an element (that is, if they arrive right after one another),
as compared by a key extracted through the user provided Function
and then comparing keys with the supplied BiPredicate
.
Discard Support: This operator discards elements that are considered as "already seen"
(for which the keyComparator
returns true). The keys themselves
are not discarded.
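The following JDK-only sketch models this comparison under the assumption that the retained key is the key of the last element that was let through. It is not Reactor's implementation, and the class and method names are hypothetical. The example compares strings ignoring case, so "A" right after "a" is dropped while a later "a" is kept.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Function;

// Hypothetical helper: models "drop consecutive elements whose keys compare as equal".
final class DistinctUntilChangedSketch {

    static <T, V> List<T> distinctUntilChanged(Iterable<T> elements,
                                               Function<? super T, ? extends V> keySelector,
                                               BiPredicate<? super V, ? super V> keyComparator) {
        List<T> kept = new ArrayList<>();
        V lastKey = null;
        boolean first = true;
        for (T element : elements) {
            V key = keySelector.apply(element);
            // keep the element when there is no previous key or the comparator reports a change
            if (first || !keyComparator.test(lastKey, key)) {
                kept.add(element);
                lastKey = key;
                first = false;
            }
        }
        return kept;
    }

    static List<String> demo() {
        Function<String, String> identity = s -> s;
        BiPredicate<String, String> sameIgnoringCase = String::equalsIgnoreCase;
        return distinctUntilChanged(List.of("a", "A", "b", "B", "a"), identity, sameIgnoringCase);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a, b, a]
    }
}
```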
V
- the type of the key extracted from each value in this sequence
keySelector
- function to compute comparison key for each element
keyComparator
- predicate used to compare keys.
Flux
with only one occurrence in a row of each element
of the same key for which the predicate returns true (yet element keys can repeat
in the overall sequence)
public final Flux<T> doAfterTerminate(Runnable afterTerminate)
Add behavior (side-effect) triggered after the Flux
terminates, either by completing downstream successfully or with an error.
The relevant signal is propagated downstream, then the Runnable
is executed.
afterTerminate
- the callback to call after Subscriber.onComplete()
or Subscriber.onError(java.lang.Throwable)
Flux
public final Flux<T> doOnCancel(Runnable onCancel)
Add behavior (side-effect) triggered when the Flux
is cancelled.
The handler is executed first, then the cancel signal is propagated upstream to the source.
onCancel
- the callback to call on Subscription.cancel()
Flux
public final Flux<T> doOnComplete(Runnable onComplete)
Add behavior (side-effect) triggered when the Flux
completes successfully.
The Runnable
is executed first, then the onComplete signal is propagated
downstream.
onComplete
- the callback to call on Subscriber.onComplete()
Flux
public final <R> Flux<T> doOnDiscard(Class<R> type, Consumer<? super R> discardHook)
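Potentially modify the behavior of the whole chain of operators upstream of this one to
conditionally clean up elements that get discarded by these operators.
The discardHook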
MUST be idempotent and safe to use on any instance of the desired
type.
Calls to this method are additive, and the order of invocation of the discardHook
is the same as the order of declaration (calling .filter(...).doOnDiscard(first).doOnDiscard(second)
will let the filter invoke first
then second
handlers).
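Two main categories of discarding operators exist:
- filtering operators, dropping some source elements as part of their designed behavior
- operators that prefetch a few elements and keep them around pending a request, but get cancelled/in error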
type
- the Class
of elements in the upstream chain of operators that
this cleanup hook should take into account.
discardHook
- a Consumer
of elements in the upstream chain of operators
that performs the cleanup.
Flux
that cleans up matching elements that get discarded upstream of it.
public final Flux<T> doOnEach(Consumer<? super Signal<T>> signalConsumer)
Add behavior (side-effects) triggered when the Flux
emits an item, fails with an error
or completes successfully. All these events are represented as a Signal
that is passed to the side-effect callback. Note that this is an advanced operator,
typically used for monitoring of a Flux. These Signal
instances have a Context
associated with them.
The Consumer
is executed first, then the relevant signal is propagated
downstream.
signalConsumer
- the mandatory callback to call on
Subscriber.onNext(Object)
, Subscriber.onError(Throwable)
and
Subscriber.onComplete()
Flux
doOnNext(Consumer)
,
doOnError(Consumer)
,
doOnComplete(Runnable)
,
materialize()
,
Signal
public final Flux<T> doOnError(Consumer<? super Throwable> onError)
Add behavior (side-effect) triggered when the Flux
completes with an error.
The Consumer
is executed first, then the onError signal is propagated
downstream.
onError
- the callback to call on Subscriber.onError(java.lang.Throwable)
Flux
public final <E extends Throwable> Flux<T> doOnError(Class<E> exceptionType, Consumer<? super E> onError)
Add behavior (side-effect) triggered when the Flux
completes with an error matching the given exception type.
The Consumer
is executed first, then the onError signal is propagated
downstream.
E
- type of the error to handle
exceptionType
- the type of exceptions to handle
onError
- the error handler for each error
Flux
public final Flux<T> doOnError(Predicate<? super Throwable> predicate, Consumer<? super Throwable> onError)
Add behavior (side-effect) triggered when the Flux
completes with an error matching the given predicate.
The Consumer
is executed first, then the onError signal is propagated
downstream.
predicate
- the matcher for exceptions to handle
onError
- the error handler for each error
Flux
public final Flux<T> doOnNext(Consumer<? super T> onNext)
Add behavior (side-effect) triggered when the Flux
emits an item.
The Consumer
is executed first, then the onNext signal is propagated
downstream.
Error Mode Support: This operator supports resuming on errors
(including when fusion is enabled). Exceptions thrown by the consumer are passed to
the onErrorContinue(BiConsumer)
error consumer (the value consumer
is not invoked, as the source element will be part of the sequence). The onNext
signal is then propagated as normal.
onNext
- the callback to call on Subscriber.onNext(T)
Flux
public final Flux<T> doOnRequest(LongConsumer consumer)
Add behavior (side-effect) triggering a LongConsumer
when this Flux
receives any request.
Note that non-fatal errors raised in the callback will not be propagated and
will simply trigger Operators.onOperatorError(Throwable, Context)
.
The LongConsumer
is executed first, then the request signal is propagated
upstream to the parent.
consumer
- the consumer to invoke on each request
Flux
public final Flux<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe)
Add behavior (side-effect) triggered when the Flux
is being subscribed,
that is to say when a Subscription
has been produced by the Publisher
and is being passed to the Subscriber.onSubscribe(Subscription)
.
This method is not intended for capturing the subscription and calling its methods,
but for side effects like monitoring. For instance, the correct way to cancel a subscription is
to call Disposable.dispose()
on the Disposable returned by subscribe()
.
The Consumer
is executed first, then the Subscription
is propagated
downstream to the next subscriber in the chain that is being established.
onSubscribe
- the callback to call on Subscriber.onSubscribe(org.reactivestreams.Subscription)
Flux
doFirst(Runnable)
public final Flux<T> doOnTerminate(Runnable onTerminate)
Add behavior (side-effect) triggered when the Flux
terminates, either by
completing successfully or failing with an error.
The Runnable
is executed first, then the onComplete/onError signal is propagated
downstream.
onTerminate
- the callback to call on Subscriber.onComplete()
or Subscriber.onError(java.lang.Throwable)
Flux
public final Flux<T> doFirst(Runnable onFirst)
Add behavior (side-effect) triggered before the Flux
is
subscribed to, which should be the first event after assembly time.
Note that when several doFirst(Runnable)
operators are used anywhere in a
chain of operators, their order of execution is reversed compared to the declaration
order (as subscribe signal flows backward, from the ultimate subscriber to the source
publisher):
Flux.just(1, 2)
.doFirst(() -> System.out.println("three"))
.doFirst(() -> System.out.println("two"))
.doFirst(() -> System.out.println("one"));
// once subscribed, prints "one", "two", "three" in that order
In case the Runnable
throws an exception, said exception will be directly
propagated to the subscribing Subscriber
along with a no-op Subscription
,
similarly to what error(Throwable)
does. Otherwise, after the handler has
executed, the Subscriber
is directly subscribed to the original source
Flux
(this
).
This side-effect method provides stronger first guarantees compared to
doOnSubscribe(Consumer)
, which is triggered once the Subscription
has been set up and passed to the Subscriber
.
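The reversed execution order can be pictured with a small plain-Java model that does not use Reactor. Each decorator runs its hook and only then subscribes to the source it wraps, so the decorator declared last (the outermost one) runs first. The `Source` interface and the `doFirst` helper below are hypothetical stand-ins for illustration, not Reactor types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of subscribe-time hooks; not Reactor code.
class DoFirstOrderSketch {

    /** Minimal stand-in for a publisher: subscribing appends events to a log. */
    interface Source {
        void subscribe(List<String> log);
    }

    /** Wraps upstream so that the hook label is logged before upstream is subscribed. */
    static Source doFirst(Source upstream, String label) {
        return log -> {
            log.add(label);          // the hook runs first
            upstream.subscribe(log); // then the subscription flows up to the wrapped source
        };
    }

    /** Builds the same chain as the snippet above and subscribes to it once. */
    static List<String> run() {
        Source source = log -> log.add("subscribed to source");
        Source chain = doFirst(doFirst(doFirst(source, "three"), "two"), "one");
        List<String> log = new ArrayList<>();
        chain.subscribe(log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [one, two, three, subscribed to source]
    }
}
```

In this model the source is reached only after every hook has run, which mirrors the "first event after assembly time" wording above.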
public final Flux<T> doFinally(Consumer<SignalType> onFinally)
Add behavior (side-effect) triggered after the Flux
terminates for any reason,
including cancellation. The terminating event (SignalType.ON_COMPLETE
,
SignalType.ON_ERROR
and SignalType.CANCEL
) is passed to the consumer,
which is executed after the signal has been passed downstream.
Note that the fact that the signal is propagated downstream before the callback is
executed means that several doFinally in a row will be executed in
reverse order. If you want to assert the execution of the callback
please keep in mind that the Flux will complete before it is executed, so its
effect might not be visible immediately after eg. a blockLast()
.
onFinally
- the callback to execute after a terminal signal (complete, error
or cancel)
Flux
public final Flux<Tuple2<Long,T>> elapsed()
Map this Flux
into Tuple2<Long, T>
of timemillis and source data. The timemillis corresponds to the elapsed time
between each signal as measured by the parallel
scheduler.
First duration is measured between the subscription and the first element.
Flux
that emits a tuple of time elapsed in milliseconds and matching data
timed()
,
Timed.elapsed()
public final Flux<Tuple2<Long,T>> elapsed(Scheduler scheduler)
Map this Flux
into Tuple2<Long, T>
of timemillis and source data. The timemillis corresponds to the elapsed time
between each signal as measured by the provided Scheduler
.
First duration is measured between the subscription and the first element.
scheduler
- a Scheduler
instance to read time from
Flux
that emits tuples of time elapsed in milliseconds and matching data
timed(Scheduler)
,
Timed.elapsed()
public final Mono<T> elementAt(int index)
Emit only the element at the given index position or IndexOutOfBoundsException
if the sequence is shorter.
Discard Support: This operator discards elements that appear before the requested index.
index
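- zero-based index of the only item to emit
Mono
of the item at the specified zero-based index
public final Mono<T> elementAt(int index, T defaultValue)
Emit only the element at the given index position or fall back to a
default value if the sequence is shorter.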
Discard Support: This operator discards elements that appear before the requested index.
index
- zero-based index of the only item to emit
defaultValue
- a default value to emit if the sequence is shorter
Mono
of the item at the specified zero-based index or a default value
public final Flux<T> expandDeep(Function<? super T,? extends Publisher<? extends T>> expander, int capacityHint)
Recursively expand elements into a graph and emit all the resulting elements,
in a depth-first traversal order.
That is: emit one value from this Flux
, expand it and emit the first value
at this first level of recursion, and so on... When no more recursion is possible,
backtrack to the previous level and re-apply the strategy.
For example, given the hierarchical structure
A
  - AA
    - aa1
B
  - BB
    - bb1
Expands
Flux.just(A, B)
into
A AA aa1 B BB bb1
public final Flux<T> expandDeep(Function<? super T,? extends Publisher<? extends T>> expander)
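Recursively expand elements into a graph and emit all the resulting elements,
in a depth-first traversal order.
That is: emit one value from this Flux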
, expand it and emit the first value
at this first level of recursion, and so on... When no more recursion is possible,
backtrack to the previous level and re-apply the strategy.
For example, given the hierarchical structure
A
  - AA
    - aa1
B
  - BB
    - bb1
Expands
Flux.just(A, B)
into
A AA aa1 B BB bb1
public final Flux<T> expand(Function<? super T,? extends Publisher<? extends T>> expander, int capacityHint)
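Recursively expand elements into a graph and emit all the resulting elements using
a breadth-first traversal strategy.
That is: emit the values from this Flux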
first, then expand each at a first level of
recursion and emit all of the resulting values, then expand all of these at a second
level and so on.
For example, given the hierarchical structure
A
  - AA
    - aa1
B
  - BB
    - bb1
Expands
Flux.just(A, B)
into
A B AA BB aa1 bb1
public final Flux<T> expand(Function<? super T,? extends Publisher<? extends T>> expander)
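Recursively expand elements into a graph and emit all the resulting elements using
a breadth-first traversal strategy.
That is: emit the values from this Flux
first, then expand each at a first level of
recursion and emit all of the resulting values, then expand all of these at a second
level and so on.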
For example, given the hierarchical structure
A
  - AA
    - aa1
B
  - BB
    - bb1
Expands
Flux.just(A, B)
into
A B AA BB aa1 bb1
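The difference between the two emission orders can be reproduced with a plain-Java traversal over the same example hierarchy. This is only a model of the ordering, not Reactor's implementation; the `CHILDREN` map and the `expander` helper are hypothetical stand-ins for the expander function.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the two traversal orders; not Reactor's implementation.
class ExpandOrderSketch {

    // Hypothetical expander data: maps a node to its children, as in the example hierarchy.
    static final Map<String, List<String>> CHILDREN = new HashMap<>();
    static {
        CHILDREN.put("A", Arrays.asList("AA"));
        CHILDREN.put("AA", Arrays.asList("aa1"));
        CHILDREN.put("B", Arrays.asList("BB"));
        CHILDREN.put("BB", Arrays.asList("bb1"));
    }

    static List<String> expander(String node) {
        return CHILDREN.getOrDefault(node, Collections.emptyList());
    }

    /** Breadth-first (expand): emit a whole level before moving to the next one. */
    static List<String> breadthFirst(List<String> roots) {
        List<String> out = new ArrayList<>();
        Deque<String> queue = new ArrayDeque<>(roots);
        while (!queue.isEmpty()) {
            String node = queue.removeFirst();
            out.add(node);
            queue.addAll(expander(node)); // children wait behind the current level
        }
        return out;
    }

    /** Depth-first (expandDeep): expand each value fully before backtracking. */
    static List<String> depthFirst(List<String> roots) {
        List<String> out = new ArrayList<>();
        for (String root : roots) {
            visit(root, out);
        }
        return out;
    }

    private static void visit(String node, List<String> out) {
        out.add(node);
        for (String child : expander(node)) {
            visit(child, out);
        }
    }

    public static void main(String[] args) {
        List<String> roots = Arrays.asList("A", "B");
        System.out.println(breadthFirst(roots)); // [A, B, AA, BB, aa1, bb1]
        System.out.println(depthFirst(roots));   // [A, AA, aa1, B, BB, bb1]
    }
}
```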
public final Flux<T> filter(Predicate<? super T> p)
Evaluate each source value against the given Predicate
. If the predicate test succeeds, the value is
emitted. If the predicate test fails, the value is ignored and a request of 1 is made upstream.
Discard Support: This operator discards elements that do not match the filter. It also discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.
Error Mode Support: This operator supports resuming on errors
(including when fusion is enabled). Exceptions thrown by the predicate are
considered as if the predicate returned false: they cause the source value to be
dropped and a new element (request(1)
) to be requested from upstream.
public final Flux<T> filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate)
Test each value emitted by this Flux
asynchronously using a generated
Publisher<Boolean>
test. A value is replayed if the first item emitted
by its corresponding test is true. It is dropped if its test is either
empty or its first emitted value is false.
Note that only the first value of the test publisher is considered, and unless it
is a Mono
, test will be cancelled after receiving that first value. Test
publishers are generated and subscribed to in sequence.
Discard Support: This operator discards elements that do not match the filter. It also discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.
public final Flux<T> filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate, int bufferSize)
Test each value emitted by this Flux
asynchronously using a generated
Publisher<Boolean>
test. A value is replayed if the first item emitted
by its corresponding test is true. It is dropped if its test is either
empty or its first emitted value is false.
Note that only the first value of the test publisher is considered, and unless it
is a Mono
, test will be cancelled after receiving that first value. Test
publishers are generated and subscribed to in sequence.
Discard Support: This operator discards elements that do not match the filter. It also discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.
asyncPredicate
- the function generating a Publisher
of Boolean
for each value, to filter the Flux with
bufferSize
- the maximum expected number of values to hold pending a result of
their respective asynchronous predicates, rounded to the next power of two. This is
capped depending on the size of the heap and the JVM limits, so be careful with
large values (although eg. 65536 should still be fine). Also serves as
the initial request size for the source.
Flux
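As a rough illustration of "rounded to the next power of two", the sketch below computes the smallest power of two that is greater than or equal to a requested size. The helper name is hypothetical, and treating an exact power of two (such as 65536) as unchanged is an assumption of this sketch rather than something the description above states.

```java
// Sketch of rounding a requested buffer size up to a power of two.
// The helper name is hypothetical; it is not part of the Flux API.
class BufferSizeSketch {

    /** Smallest power of two that is >= requested, for 1 <= requested <= 2^30. */
    static int nextPowerOfTwo(int requested) {
        if (requested < 1 || requested > (1 << 30)) {
            throw new IllegalArgumentException("unsupported size: " + requested);
        }
        // The bit length of (requested - 1) gives the exponent of the rounded size.
        return 1 << (32 - Integer.numberOfLeadingZeros(requested - 1));
    }

    public static void main(String[] args) {
        System.out.println(nextPowerOfTwo(1000));  // 1024
        System.out.println(nextPowerOfTwo(65536)); // 65536
    }
}
```

Under this model, a bufferSize of 1000 would behave like 1024, which is worth keeping in mind when sizing memory for pending values.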
public final <R> Flux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
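Transform the elements emitted by this Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
through merging,
which allows them to interleave.
There are three dimensions to this operator that can be compared with
flatMapSequential
and concatMap
:
Generation of inners and subscription: this operator is eagerly subscribing to its inners.
Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner elements are flattened as they arrive.
Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).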
Discard Support: This operator discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.
Error Mode Support: This operator supports resuming on errors
in the mapper Function
. Exceptions thrown by the mapper then behave as if
it had mapped the value to an empty publisher. If the mapper does map to a scalar
publisher (an optimization in which the value can be resolved immediately without
subscribing to the publisher, e.g. a Mono.fromCallable(Callable)
) but said
publisher throws, this can be resumed from in the same manner.
public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency)
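Transform the elements emitted by this Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
through merging,
which allows them to interleave.
There are three dimensions to this operator that can be compared with
flatMapSequential
and concatMap
:
Generation of inners and subscription: this operator is eagerly subscribing to its inners.
Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner elements are flattened as they arrive.
Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).
The concurrency argument controls how many Publisher
can be
subscribed to and merged in parallel. In turn, that argument shows the size of
the first Subscription.request(long)
to the upstream.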
Discard Support: This operator discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.
Error Mode Support: This operator supports resuming on errors
in the mapper Function
. Exceptions thrown by the mapper then behave as if
it had mapped the value to an empty publisher. If the mapper does map to a scalar
publisher (an optimization in which the value can be resolved immediately without
subscribing to the publisher, e.g. a Mono.fromCallable(Callable)
) but said
publisher throws, this can be resumed from in the same manner.
public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)
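Transform the elements emitted by this Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
through merging,
which allows them to interleave.
There are three dimensions to this operator that can be compared with
flatMapSequential
and concatMap
:
Generation of inners and subscription: this operator is eagerly subscribing to its inners.
Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner elements are flattened as they arrive.
Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).
The concurrency argument controls how many Publisher
can be
subscribed to and merged in parallel. In turn, that argument shows the size of
the first Subscription.request(long)
to the upstream.
The prefetch argument sets an arbitrary prefetch size for the merged
Publisher
(in other words, the prefetch size is the size of the first
Subscription.request(long)
to the merged Publisher
).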
Discard Support: This operator discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.
Error Mode Support: This operator supports resuming on errors
in the mapper Function
. Exceptions thrown by the mapper then behave as if
it had mapped the value to an empty publisher. If the mapper does map to a scalar
publisher (an optimization in which the value can be resolved immediately without
subscribing to the publisher, e.g. a Mono.fromCallable(Callable)
) but said
publisher throws, this can be resumed from in the same manner.
V
- the merged output sequence type
mapper
- the Function
to transform input sequence into N sequences Publisher
concurrency
- the maximum number of in-flight inner sequences
prefetch
- the maximum in-flight elements from each inner Publisher
sequence
Flux
public final <V> Flux<V> flatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)
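Transform the elements emitted by this Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
through merging,
which allows them to interleave.
There are three dimensions to this operator that can be compared with
flatMapSequential
and concatMap
:
Generation of inners and subscription: this operator is eagerly subscribing to its inners.
Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner elements are flattened as they arrive.
Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).
The concurrency argument controls how many Publisher
can be
subscribed to and merged in parallel. The prefetch argument sets an
arbitrary prefetch size for the merged Publisher
. This variant will delay
any error until after the rest of the flatMap backlog has been processed.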
Discard Support: This operator discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.
Error Mode Support: This operator supports resuming on errors
in the mapper Function
. Exceptions thrown by the mapper then behave as if
it had mapped the value to an empty publisher. If the mapper does map to a scalar
publisher (an optimization in which the value can be resolved immediately without
subscribing to the publisher, e.g. a Mono.fromCallable(Callable)
) but said
publisher throws, this can be resumed from in the same manner.
V
- the merged output sequence type
mapper
- the Function
to transform input sequence into N sequences Publisher
concurrency
- the maximum number of in-flight inner sequences
prefetch
- the maximum in-flight elements from each inner Publisher
sequence
Flux
public final <R> Flux<R> flatMap(@Nullable Function<? super T,? extends Publisher<? extends R>> mapperOnNext, @Nullable Function<? super Throwable,? extends Publisher<? extends R>> mapperOnError, @Nullable Supplier<? extends Publisher<? extends R>> mapperOnComplete)
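Transform the signals emitted by this Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
through merging,
which allows them to interleave. Note that at least one of the signal mappers must
be provided, and all provided mappers must produce a publisher.
There are three dimensions to this operator that can be compared with
flatMapSequential
and concatMap
:
Generation of inners and subscription: this operator is eagerly subscribing to its inners.
Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner elements are flattened as they arrive.
Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).
OnError will be transformed into a completion signal after its mapping callback has been applied.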
R
- the output Publisher
type target
mapperOnNext
- the Function
to call on next data and returning a sequence to merge.
Use null to ignore (provided at least one other mapper is specified).
mapperOnError
- the Function
to call on error signal and returning a sequence to merge.
Use null to ignore (provided at least one other mapper is specified).
mapperOnComplete
- the Function
to call on complete signal and returning a sequence to merge.
Use null to ignore (provided at least one other mapper is specified).
Flux
public final <R> Flux<R> flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
Transform the items emitted by this Flux
into Iterable
, then flatten the elements from those by
merging them into a single Flux
. For each iterable, Iterable.iterator()
will be called at least
once and at most twice.
This operator inspects each Iterable
's Spliterator
to assess if the iteration
can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)
).
Since the default Spliterator wraps the Iterator we can have two Iterable.iterator()
calls per iterable. This second invocation is skipped on a Collection
however, a type which is
assumed to be always finite.
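The double invocation can be observed with the JDK alone. The sketch below uses a hypothetical `CountingIterable` that does not override `spliterator()`, so the default implementation obtains an `Iterator` as soon as the `Spliterator` is created. Using the `SIZED` characteristic as the finiteness signal is an assumption of this sketch, not a statement about Reactor's exact check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;

// JDK-only illustration; class and method names are hypothetical.
class IteratorCallCountSketch {

    /** Iterable that counts how often iterator() is invoked. */
    static final class CountingIterable implements Iterable<Integer> {
        private final List<Integer> data = Arrays.asList(1, 2, 3);
        int iteratorCalls = 0;

        @Override
        public Iterator<Integer> iterator() {
            iteratorCalls++;
            return data.iterator();
        }
        // spliterator() is deliberately not overridden: the default wraps iterator().
    }

    /** Assumption for this sketch: SIZED is taken as the signal that iteration is finite. */
    static boolean knownFinite(Iterable<?> iterable) {
        return iterable.spliterator().hasCharacteristics(Spliterator.SIZED);
    }

    public static void main(String[] args) {
        CountingIterable custom = new CountingIterable();
        boolean finite = knownFinite(custom); // first iterator() call, via the default Spliterator
        for (Integer ignored : custom) { }    // second iterator() call, for the actual iteration
        System.out.println(finite + " " + custom.iteratorCalls); // false 2

        // A Collection such as ArrayList reports SIZED without creating an Iterator.
        System.out.println(knownFinite(new ArrayList<>(custom.data))); // true
    }
}
```

An Iterable whose `iterator()` has side effects or is expensive may therefore behave differently from a Collection holding the same elements.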
Note that unlike flatMap(Function)
and concatMap(Function)
, with Iterable there is
no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially.
Thus flatMapIterable
and concatMapIterable
are equivalent, offered as a discoverability
improvement for users that explore the API with the concat vs flatMap expectation.
Discard Support: Upon cancellation, this operator discards T
elements it prefetched and, in
some cases, attempts to discard remainder of the currently processed Iterable
(if it can
safely ensure the iterator is finite). Note that this means each Iterable
's Iterable.iterator()
method could be invoked twice.
Error Mode Support: This operator supports resuming on errors
(including when fusion is enabled). Exceptions thrown by the consumer are passed to
the onErrorContinue(BiConsumer)
error consumer (the value consumer
is not invoked, as the source element will be part of the sequence). The onNext
signal is then propagated as normal.
public final <R> Flux<R> flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper, int prefetch)
Transform the items emitted by this Flux
into Iterable
, then flatten the emissions from those by
merging them into a single Flux
. The prefetch argument sets an
arbitrary prefetch size for the upstream source.
For each iterable, Iterable.iterator()
will be called at least once and at most twice.
This operator inspects each Iterable
's Spliterator
to assess if the iteration
can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)
).
Since the default Spliterator wraps the Iterator we can have two Iterable.iterator()
calls per iterable. This second invocation is skipped on a Collection
however, a type which is
assumed to be always finite.
Note that unlike flatMap(Function)
and concatMap(Function)
, with Iterable there is
no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially.
Thus flatMapIterable
and concatMapIterable
are equivalent, offered as a discoverability
improvement for users that explore the API with the concat vs flatMap expectation.
Discard Support: Upon cancellation, this operator discards T
elements it prefetched and, in
some cases, attempts to discard remainder of the currently processed Iterable
(if it can
safely ensure the iterator is finite).
Note that this means each Iterable
's Iterable.iterator()
method could be invoked twice.
Error Mode Support: This operator supports resuming on errors
(including when fusion is enabled). Exceptions thrown by the consumer are passed to
the onErrorContinue(BiConsumer)
error consumer (the value consumer
is not invoked, as the source element will be part of the sequence). The onNext
signal is then propagated as normal.
R
- the merged output sequence type
mapper
- the Function
to transform input sequence into N Iterable
prefetch
- the number of values to request from the source upon subscription, to be transformed to Iterable
Flux
public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper)
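Transform the elements emitted by this Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, but merge them in
the order of their source element.
There are three dimensions to this operator that can be compared with
flatMap
and concatMap
:
Generation of inners and subscription: this operator is eagerly subscribing to its inners (like flatMap).
Ordering of the flattened values: this operator queues elements from late inners until all elements from earlier inners have been emitted, thus emitting inner sequences as a whole, in an order that matches their source's order.
Interleaving: this operator does not let values from different inners interleave (similar looking result to concatMap, but due to queueing of values that would have been interleaved otherwise).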
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher
. However, if such an early inner takes more time to complete than
subsequent faster inners, the data from these faster inners will be queued until
the earlier inner completes, so as to maintain source ordering.
public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency)
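Transform the elements emitted by this Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, but merge them in
the order of their source element.
There are three dimensions to this operator that can be compared with
flatMap
and concatMap
:
Generation of inners and subscription: this operator is eagerly subscribing to its inners (like flatMap).
Ordering of the flattened values: this operator queues elements from late inners until all elements from earlier inners have been emitted, thus emitting inner sequences as a whole, in an order that matches their source's order.
Interleaving: this operator does not let values from different inners interleave (similar looking result to concatMap, but due to queueing of values that would have been interleaved otherwise).
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher
. However, if such an early inner takes more time to complete than
subsequent faster inners, the data from these faster inners will be queued until
the earlier inner completes, so as to maintain source ordering.
The maxConcurrency argument controls how many merged Publisher
can be subscribed to in parallel.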
public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch)
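Transform the elements emitted by this Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, but merge them in
the order of their source element.
There are three dimensions to this operator that can be compared with
flatMap
and concatMap
:
Generation of inners and subscription: this operator is eagerly subscribing to its inners (like flatMap).
Ordering of the flattened values: this operator queues elements from late inners until all elements from earlier inners have been emitted, thus emitting inner sequences as a whole, in an order that matches their source's order.
Interleaving: this operator does not let values from different inners interleave (similar looking result to concatMap, but due to queueing of values that would have been interleaved otherwise).
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher
. However, if such an early inner takes more time to complete than
subsequent faster inners, the data from these faster inners will be queued until
the earlier inner completes, so as to maintain source ordering.
The maxConcurrency argument controls how many merged Publisher
can be subscribed to in parallel. The prefetch argument sets an arbitrary prefetch
size for the merged Publisher
.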
R
- the merged output sequence type
mapper
- the Function
to transform input sequence into N sequences Publisher
maxConcurrency
- the maximum number of in-flight inner sequences
prefetch
- the maximum in-flight elements from each inner Publisher
sequence
Flux
, subscribing early but keeping the original ordering
public final <R> Flux<R> flatMapSequentialDelayError(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch)
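Transform the elements emitted by this Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, but merge them in
the order of their source element.
There are three dimensions to this operator that can be compared with
flatMap
and concatMap
:
Generation of inners and subscription: this operator is eagerly subscribing to its inners (like flatMap).
Ordering of the flattened values: this operator queues elements from late inners until all elements from earlier inners have been emitted, thus emitting inner sequences as a whole, in an order that matches their source's order.
Interleaving: this operator does not let values from different inners interleave (similar looking result to concatMap, but due to queueing of values that would have been interleaved otherwise).
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher
. However, if such an early inner takes more time to complete than
subsequent faster inners, the data from these faster inners will be queued until
the earlier inner completes, so as to maintain source ordering.
The maxConcurrency argument controls how many merged Publisher
can be subscribed to in parallel. The prefetch argument sets an arbitrary prefetch
size for the merged Publisher
. This variant will delay any error until after the
rest of the flatMap backlog has been processed.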
R
- the merged output sequence type
mapper
- the Function
to transform input sequence into N sequences Publisher
maxConcurrency
- the maximum number of in-flight inner sequences
prefetch
- the maximum in-flight elements from each inner Publisher
sequence
Flux
, subscribing early but keeping the original ordering
public int getPrefetch()
The prefetch configuration of the Flux
the prefetch configuration of the Flux
, -1 if unspecified
public final <K> Flux<GroupedFlux<K,T>> groupBy(Function<? super T,? extends K> keyMapper)
Divide this sequence into dynamically created Flux
(or groups) for each
unique key, as produced by the provided keyMapper Function
. Note that
groupBy works best with a low cardinality of groups, so choose your keyMapper
function accordingly.
The groups need to be drained and consumed downstream for groupBy to work correctly.
Notably, when the criteria produce a large number of groups, it can lead to hanging
if the groups are not suitably consumed downstream (e.g. due to a flatMap
with a maxConcurrency
parameter that is set too low).
Note that groups are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a specific group more than once: groups are unicast.
This is most noticeable when trying to retry()
or repeat()
a group,
as these operators are based on re-subscription.
K
- the key type extracted from each value of this sequence
keyMapper
- the key mapping Function
that evaluates an incoming data and returns a key.
Flux
of GroupedFlux
grouped sequences
public final <K> Flux<GroupedFlux<K,T>> groupBy(Function<? super T,? extends K> keyMapper, int prefetch)
Divide this sequence into dynamically created Flux
(or groups) for each
unique key, as produced by the provided keyMapper Function
. Note that
groupBy works best with a low cardinality of groups, so choose your keyMapper
function accordingly.
The groups need to be drained and consumed downstream for groupBy to work correctly.
Notably, when the criteria produce a large number of groups, it can lead to hanging
if the groups are not suitably consumed downstream (e.g. due to a flatMap
with a maxConcurrency
parameter that is set too low).
Note that groups are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a specific group more than once: groups are unicast.
This is most noticeable when trying to retry()
or repeat()
a group,
as these operators are based on re-subscription.
K
- the key type extracted from each value of this sequence
keyMapper
- the key mapping Function
that evaluates an incoming data and returns a key.
prefetch
- the number of values to prefetch from the source
Flux
of GroupedFlux
grouped sequences
public final <K,V> Flux<GroupedFlux<K,V>> groupBy(Function<? super T,? extends K> keyMapper, Function<? super T,? extends V> valueMapper)
Divide this sequence into dynamically created Flux
(or groups) for each
unique key, as produced by the provided keyMapper Function
. Source elements
are also mapped to a different value using the valueMapper
. Note that
groupBy works best with a low cardinality of groups, so choose your keyMapper
function accordingly.
The groups need to be drained and consumed downstream for groupBy to work correctly.
Notably, when the criteria produce a large number of groups, it can lead to hanging
if the groups are not suitably consumed downstream (e.g. due to a flatMap
with a maxConcurrency
parameter that is set too low).
Note that groups are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a specific group more than once: groups are unicast.
This is most noticeable when trying to retry()
or repeat()
a group,
as these operators are based on re-subscription.
K
- the key type extracted from each value of this sequence
V
- the value type extracted from each value of this sequence
keyMapper
- the key mapping function that evaluates an incoming data and returns a key.
valueMapper
- the value mapping function that evaluates which data to extract for re-routing.
Flux
of GroupedFlux
grouped sequences
public final <K,V> Flux<GroupedFlux<K,V>> groupBy(Function<? super T,? extends K> keyMapper, Function<? super T,? extends V> valueMapper, int prefetch)
Divide this sequence into dynamically created Flux
(or groups) for each
unique key, as produced by the provided keyMapper Function
. Source elements
are also mapped to a different value using the valueMapper
. Note that
groupBy works best with a low cardinality of groups, so choose your keyMapper
function accordingly.
The groups need to be drained and consumed downstream for groupBy to work correctly.
Notably, when the criteria produce a large number of groups, it can lead to hanging
if the groups are not suitably consumed downstream (e.g. due to a flatMap
with a maxConcurrency
parameter that is set too low).
Note that groups are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a specific group more than once: groups are unicast.
This is most noticeable when trying to retry()
or repeat()
a group,
as these operators are based on re-subscription.
K
- the key type extracted from each value of this sequence
V
- the value type extracted from each value of this sequence
keyMapper
- the key mapping function that evaluates an incoming data and returns a key.
valueMapper
- the value mapping function that evaluates which data to extract for re-routing.
prefetch
- the number of values to prefetch from the source
Flux
of GroupedFlux
grouped sequences
public final <TRight,TLeftEnd,TRightEnd,R> Flux<R> groupJoin(Publisher<? extends TRight> other, Function<? super T,? extends Publisher<TLeftEnd>> leftEnd, Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd, BiFunction<? super T,? super Flux<TRight>,? extends R> resultSelector)
Map values from two Publishers into time windows and emit combinations of values
in case their windows overlap. The emitted elements are obtained by passing the
value from this Flux
and a Flux
emitting the value from the other
Publisher
to a BiFunction
.
There are no guarantees in what order the items get combined when multiple items from one or both source Publishers overlap.
Unlike join(Publisher, Function, Function, BiFunction)
, items from the second Publisher
will be provided
as a Flux
to the resultSelector
.
TRight
- the type of the elements from the right Publisher
TLeftEnd
- the type for this Flux
window signals
TRightEnd
- the type for the right Publisher
window signals
R
- the combined result type
other
- the other Publisher
to correlate items with
leftEnd
- a function that returns a Publisher whose emissions indicate the
time window for the source value to be considered
rightEnd
- a function that returns a Publisher whose emissions indicate the
time window for the right
Publisher value to be considered
resultSelector
- a function that takes an item emitted by this Flux
and
a Flux
representation of the overlapping item from the other Publisher
and returns the value to be emitted by the resulting Flux
Flux
join(Publisher, Function, Function, BiFunction)
public final <R> Flux<R> handle(BiConsumer<? super T,SynchronousSink<R>> handler)
Handle the items emitted by this Flux
by calling a biconsumer with the
output sink for each onNext. At most one SynchronousSink.next(Object)
call must be performed and/or 0 or 1 SynchronousSink.error(Throwable)
or
SynchronousSink.complete()
.
Error Mode Support: This operator supports resuming on errors
(including when
fusion is enabled) when the BiConsumer
throws an exception or if an error is signaled explicitly via
SynchronousSink.error(Throwable)
.
When the context-propagation library
is available at runtime and the downstream ContextView
is not empty, this operator implicitly uses the
library to restore thread locals around the handler BiConsumer
. Typically, this would be done in conjunction
with the use of contextCapture()
operator down the chain.
R
- the transformed type
handler
- the handling BiConsumer
Flux
public final Mono<Boolean> hasElement(T value)
Emit a single boolean true if any of the elements of this Flux
sequence is
equal to the provided value.
The implementation uses short-circuit logic and completes with true if an element matches the value.
value
- constant compared to incoming signals
Flux
with true
if any element is equal to a given value and false
otherwise
public final Mono<Boolean> hasElements()
Emit a single boolean true if this Flux
sequence has at least one element.
The implementation uses short-circuit logic and completes with true on onNext.
Mono
with true
if any value is emitted and false
otherwise
public Flux<T> hide()
Hides the identities of this Flux
instance.
The main purpose of this operator is to prevent certain identity-based optimizations from happening, mostly for diagnostic purposes.
Flux
preventing Publisher
/ Subscription
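based Reactor optimizations
public final Flux<Tuple2<Long,T>> index()
Keep information about the order in which source values were received by
indexing them with a 0-based incrementing long, returning a Flux
of Tuple2<(index, value)>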
.
Flux
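with each source value combined with its 0-based index.
public final <I> Flux<I> index(BiFunction<? super Long,? super T,? extends I> indexMapper)
Keep information about the order in which source values were received by
indexing them internally with a 0-based incrementing long, then combining this
information with the source value into an I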
using the provided BiFunction
,
returning a Flux<I>
.
Typical usage would be to produce a Tuple2
similar to index()
, but
1-based instead of 0-based:
index((i, v) -> Tuples.of(i+1, v))
indexMapper
- the BiFunction
to use to combine elements and their index.
Flux
with each source value combined with its computed index.
public final Mono<T> ignoreElements()
Ignores onNext signals (dropping them) and only propagates termination events.
Discard Support: This operator discards the upstream's elements.
public final <TRight,TLeftEnd,TRightEnd,R> Flux<R> join(Publisher<? extends TRight> other, Function<? super T,? extends Publisher<TLeftEnd>> leftEnd, Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd, BiFunction<? super T,? super TRight,? extends R> resultSelector)
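Combine values from two Publishers in case their windows overlap. Each incoming
value triggers the creation of a new Publisher via the given Function
. If the
Publisher signals its first value or completes, the time window for the original
element is immediately closed. The emitted elements are obtained by passing the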
values from this Flux
and the other Publisher
to a BiFunction
.
There are no guarantees in what order the items get combined when multiple items from one or both source Publishers overlap.
TRight
- the type of the elements from the right Publisher
TLeftEnd
- the type for this Flux
window signals
TRightEnd
- the type for the right Publisher
window signals
R
- the combined result type
other
- the other Publisher
to correlate items with
leftEnd
- a function that returns a Publisher whose emissions indicate the
time window for the source value to be considered
rightEnd
- a function that returns a Publisher whose emissions indicate the
time window for the right
Publisher value to be considered
resultSelector
- a function that takes an item emitted by each Publisher and returns the
value to be emitted by the resulting Flux
Flux
groupJoin(Publisher, Function, Function, BiFunction)
public final Mono<T> last()
Emit the last element observed before the complete signal as a Mono, or emit a NoSuchElementException error if the source was empty.
For a passive version use takeLast(int)
Discard Support: This operator discards elements before the last.
public final Mono<T> last(T defaultValue)
Emit the last element observed before the complete signal as a Mono, or emit the defaultValue if the source was empty.
For a passive version use takeLast(int)
Discard Support: This operator discards elements before the last.
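The difference between the two last variants only shows on an empty source. The sketch below (class and method names are illustrative) blocks on each variant so the outcome can be inspected directly; blocking is used here purely for demonstration.

```java
import java.util.NoSuchElementException;

import reactor.core.publisher.Flux;

final class LastExample {

    // last(): the final element of a non-empty source.
    static Integer lastOfThree() {
        return Flux.just(1, 2, 3).last().block();
    }

    // last(defaultValue): an empty source yields the fallback instead of an error.
    static Integer lastOfEmptyWithDefault() {
        return Flux.<Integer>empty().last(-1).block();
    }

    // last() on an empty source signals NoSuchElementException, rethrown by block().
    static boolean lastOfEmptyFails() {
        try {
            Flux.<Integer>empty().last().block();
            return false;
        }
        catch (NoSuchElementException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(lastOfThree());
        System.out.println(lastOfEmptyWithDefault());
        System.out.println(lastOfEmptyFails());
    }
}
```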
public final Flux<T> limitRate(int prefetchRate)
Ensure that backpressure signals from downstream subscribers are split into batches capped at the provided prefetchRate when propagated upstream, effectively rate limiting the upstream Publisher.
Note that this is an upper bound, and that this operator uses a prefetch-and-replenish strategy, requesting a replenishing amount when 75% of the prefetch amount has been emitted.
Typically used for scenarios where consumer(s) request a large amount of data (e.g. Long.MAX_VALUE) but the data source behaves better or can be optimized with smaller requests (e.g. database paging). All data is still processed, unlike with take(long) which will cap the grand total request amount.
Equivalent to flux.publishOn(Schedulers.immediate(), prefetchRate).subscribe().
prefetchRate - the limit to apply to downstream's backpressure
Returns: a Flux limiting downstream's backpressure
See Also: publishOn(Scheduler, int), take(long)
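One way to observe the prefetch-and-replenish behavior is to record the request amounts that reach the source. The sketch below is illustrative only: the class and method names are made up, and hide() is added on the assumption that it prevents operator fusion from bypassing the request signals. With a prefetchRate of 10, the description above suggests an initial request of 10 followed by smaller replenishing requests, even though the downstream demand is unbounded.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

final class LimitRateExample {

    // Returns the request(n) amounts seen by the source while 20 elements
    // flow through limitRate(10) towards an unbounded consumer.
    static List<Long> upstreamRequests() {
        List<Long> requests = new ArrayList<>();
        Flux.range(1, 20)
            .hide()                      // assumption: keeps requests observable by disabling fusion
            .doOnRequest(requests::add)  // records every request reaching the source
            .limitRate(10)
            .blockLast();                // blockLast() requests an unbounded amount
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(upstreamRequests());
    }
}
```

No recorded request exceeds the configured rate, which is the property a paged data source would rely on.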
public final Flux<T> limitRate(int highTide, int lowTide)
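Ensure that backpressure signals from downstream subscribers are split into batches capped at the provided highTide first, then replenishing at the provided lowTide, effectively rate limiting the upstream Publisher.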
Note that this is an upper bound, and that this operator uses a prefetch-and-replenish strategy, requesting a replenishing amount when 75% of the prefetch amount has been emitted.
Typically used for scenarios where consumer(s) request a large amount of data (e.g. Long.MAX_VALUE) but the data source behaves better or can be optimized with smaller requests (e.g. database paging). All data is still processed, unlike with take(long) which will cap the grand total request amount.
Similar to flux.publishOn(Schedulers.immediate(), highTide).subscribe(), except with a customized "low tide" instead of the default 75%.
Note that the smaller the lowTide is, the higher the potential for concurrency between request and data production, and thus the more extraneous replenishment requests this operator could make. For example, for a global downstream request of 14, with a highTide of 10 and a lowTide of 2, the operator would perform low tide requests (request(2)) seven times in a row, whereas with the default lowTide of 8 it would only perform one low tide request (request(8)).
Using a lowTide equal to highTide reverts to the default 75% strategy, while using a lowTide of 0 disables the lowTide, resulting in all requests strictly adhering to the highTide.
highTide - the initial request amount
lowTide - the subsequent (or replenishing) request amount, 0 to disable early replenishing, highTide to revert to a 75% replenish strategy.
Returns: a Flux limiting downstream's backpressure and customizing the replenishment request amount
See Also: publishOn(Scheduler, int), take(long)
@Deprecated public final Flux<T> limitRequest(long n)
Deprecated. Replace with take(n, true) in 3.4.x, then take(long) in 3.5.0. To be removed in 3.6.0 at the earliest. See https://github.com/reactor/reactor-core/issues/2339
Take only the first N values from this Flux, if available. Furthermore, ensure that the total amount requested upstream is capped at n. If n is zero, the source isn't even subscribed to and the operator completes immediately upon subscription.
Backpressure signals from downstream subscribers that are smaller than the cap are propagated as is, but if they would cause the total requested amount to go over the cap, they are reduced to the minimum value that doesn't go over.
As a result, this operator never lets the upstream produce more elements than the cap. Typically useful for cases where a race between request and cancellation can lead the upstream to produce a lot of extraneous data, and such a production is undesirable (e.g. a source that would send the extraneous data over the network).
n - the number of elements to emit from this flux, which is also the backpressure cap for all of downstream's request
Returns: a Flux of n elements from the source, that requests AT MOST n from upstream in total.
See Also: take(long), take(long, boolean)
public final Flux<T> log()
Observe all Reactive Streams signals and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.
The default log category will be "reactor.Flux.", followed by a suffix generated from the source operator, e.g. "reactor.Flux.Map".
Returns: a new Flux that logs signals
public final Flux<T> log(String category)
Observe all Reactive Streams signals and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will be added, e.g. "reactor.Flux.Map".
Returns: a new Flux that logs signals
public final Flux<T> log(@Nullable String category, Level level, SignalType... options)
Observe Reactive Streams signals matching the passed filter options and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.
Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
flux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will be added, e.g. "reactor.Flux.Map".
level - the Level to enforce for this tracing Flux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
options - a vararg SignalType option to filter log messages
Returns: a new Flux that logs signals
public final Flux<T> log(@Nullable String category, Level level, boolean showOperatorLine, SignalType... options)
Observe Reactive Streams signals matching the passed filter options and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.
Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
flux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will be added, e.g. "reactor.Flux.Map".
level - the Level to enforce for this tracing Flux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
showOperatorLine - capture the current stack to display operator class/line number.
options - a vararg SignalType option to filter log messages
Returns: a new Flux that logs signals
public final Flux<T> log(Logger logger)
Observe Reactive Streams signals matching the passed filter options and trace them using a specific user-provided Logger, at Level.INFO level.
public final Flux<T> log(Logger logger, Level level, boolean showOperatorLine, SignalType... options)
Observe Reactive Streams signals matching the passed filter options and trace them using a specific user-provided Logger, at the given Level.
Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
flux.log(myCustomLogger, Level.INFO, false, SignalType.ON_NEXT, SignalType.ON_ERROR)
logger - the Logger to use, instead of resolving one through a category.
level - the Level to enforce for this tracing Flux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
showOperatorLine - capture the current stack to display operator class/line number (default in overload is false).
options - a vararg SignalType option to filter log messages
Returns: a new Flux that logs signals
public final <V> Flux<V> map(Function<? super T,? extends V> mapper)
Transform the items emitted by this Flux by applying a synchronous function to each item.
Error Mode Support: This operator supports resuming on errors (including when fusion is enabled). Exceptions thrown by the mapper then cause the source value to be dropped and a new element to be requested from upstream (request(1)).
public final <V> Flux<V> mapNotNull(Function<? super T,? extends V> mapper)
Transform the items emitted by this Flux by applying a synchronous function to each item, which may produce null values. In that case, no value is emitted.
This operator effectively behaves like map(Function) followed by filter(Predicate), although null is not a supported value, so it can't be filtered out.
Error Mode Support: This operator supports resuming on errors (including when fusion is enabled). Exceptions thrown by the mapper then cause the source value to be dropped and a new element to be requested from upstream (request(1)).
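A common use of mapNotNull is a lookup that may have no answer. In the sketch below (the class name, the ages map and the names are illustrative), names missing from the map make the mapper return null, and those elements are simply skipped rather than causing an error as they would with map:

```java
import java.util.List;
import java.util.Map;

import reactor.core.publisher.Flux;

final class MapNotNullExample {

    // Map.get returns null for unknown keys; mapNotNull drops those elements.
    static List<Integer> knownAges() {
        Map<String, Integer> ages = Map.of("ann", 31, "bob", 27);
        return Flux.just("ann", "zed", "bob")
                   .mapNotNull(ages::get)
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(knownAges());
    }
}
```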
public final Flux<Signal<T>> materialize()
Transform incoming onNext, onError and onComplete signals into Signal instances, materializing these signals.
Since the error is materialized as a Signal, the propagation will be stopped and onComplete will be emitted. A complete signal will first emit a Signal.complete() and then effectively complete the flux.
All these Signal have a Context associated to them.
Returns: a Flux of materialized Signal
See Also: dematerialize()
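To make the two termination cases concrete, the sketch below (class and method names are illustrative) collects the type name of each materialized Signal for a source that completes normally and for one that fails. In both cases the materialized sequence itself completes, which is why it can be collected with collectList():

```java
import java.util.List;

import reactor.core.publisher.Flux;

final class MaterializeExample {

    // Collects the SignalType name of every materialized signal.
    static <T> List<String> signalTypes(Flux<T> source) {
        return source.materialize()
                     .map(signal -> signal.getType().name())
                     .collectList()
                     .block();
    }

    static List<String> completing() {
        return signalTypes(Flux.just(1, 2));
    }

    static List<String> failing() {
        return signalTypes(Flux.just(1).concatWith(Flux.error(new IllegalStateException("boom"))));
    }

    public static void main(String[] args) {
        System.out.println(completing());
        System.out.println(failing());
    }
}
```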
@Deprecated public final Flux<T> mergeOrderedWith(Publisher<? extends T> other, Comparator<? super T> otherComparator)
Deprecated. Use mergeComparingWith(Publisher, Comparator) instead (with the caveat that it defaults to NOT delaying errors, unlike this operator). To be removed in 3.6.0 at the earliest.
Merge data from this Flux and a Publisher into a reordered merge sequence, by picking the smallest value from each sequence as defined by a provided Comparator. Note that subsequent calls are combined, and their comparators are in lexicographic order as defined by Comparator.thenComparing(Comparator).
The combination step is avoided if the two Comparators are equal (which can easily be achieved by using the same reference, and is also always true of Comparator.naturalOrder()).
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
Note that it is delaying errors until all data is consumed.
other - the Publisher to merge with
otherComparator - the Comparator to use for merging
Returns: a new Flux that compares latest values from the given publisher and this flux, using the smallest value and replenishing the source that produced it
public final Flux<T> mergeComparingWith(Publisher<? extends T> other, Comparator<? super T> otherComparator)
Merge data from this Flux and a Publisher into a reordered merge sequence, by picking the smallest value from each sequence as defined by a provided Comparator. Note that subsequent calls are combined, and their comparators are in lexicographic order as defined by Comparator.thenComparing(Comparator).
The combination step is avoided if the two Comparators are equal (which can easily be achieved by using the same reference, and is also always true of Comparator.naturalOrder()).
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
mergeComparingWith doesn't delay errors by default, but it will inherit the delayError behavior of a mergeComparingDelayError directly above it.
other - the Publisher to merge with
otherComparator - the Comparator to use for merging
Returns: a new Flux that compares latest values from the given publisher and this flux, using the smallest value and replenishing the source that produced it
public final Flux<T> mergeWith(Publisher<? extends T> other)
Merge data from this Flux and a Publisher into an interleaved merged sequence. Unlike concat, inner sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
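The sketch below contrasts the ordered merge with the plain one on two already-sorted sources (class name, method names and values are illustrative). With mergeComparingWith and natural ordering, the smallest pending value is always emitted next; with mergeWith, all elements are delivered but no ordering across the two sources should be assumed.

```java
import java.util.Comparator;
import java.util.List;

import reactor.core.publisher.Flux;

final class MergeExample {

    // Ordered merge of two sorted sources using natural ordering.
    static List<Integer> mergedComparing() {
        return Flux.just(1, 4, 7)
                   .mergeComparingWith(Flux.just(2, 3, 8), Comparator.naturalOrder())
                   .collectList()
                   .block();
    }

    // Plain merge: same elements, delivered in arrival order.
    static List<Integer> mergedPlain() {
        return Flux.just(1, 4, 7)
                   .mergeWith(Flux.just(2, 3, 8))
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(mergedComparing());
        System.out.println(mergedPlain());
    }
}
```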
@Deprecated public final Flux<T> metrics()
Deprecated. Prefer using the tap(SignalListenerFactory) with the SignalListenerFactory provided by the new reactor-core-micrometer module. To be removed in 3.6.0 at the earliest.
Metrics are gathered on Subscriber events, and it is recommended to also name (and optionally tag) the sequence.
The name serves as a prefix in the reported metrics names. In case no name has been provided, the default name "reactor" will be applied.
The MeterRegistry used by reactor can be configured via Metrics.MicrometerConfiguration.useRegistry(MeterRegistry) prior to using this operator, the default being Metrics.globalRegistry.
Returns: an instrumented Flux
See Also: name(String), tag(String, String)
public final Flux<T> name(String name)
Give a name to this sequence, which can be retrieved using Scannable.name() as long as this is the first reachable Scannable.parents().
The name is typically visible at assembly time by the tap(SignalListenerFactory) operator, which could for example be configured with a metrics listener using the name as a prefix for meters' id.
name - a name for the sequence
See Also: metrics(), tag(String, String)
public final Flux<T> onBackpressureBuffer()
Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream. Errors will be delayed until the buffer gets consumed.
Discard Support: This operator discards the buffered overflow elements upon cancellation or error triggered by a data signal.
Returns: a backpressured Flux that buffers with unbounded capacity
public final Flux<T> onBackpressureBuffer(int maxSize)
Request an unbounded demand and push to the returned Flux, or park up to maxSize elements when not enough demand is requested downstream. The first element past this buffer to arrive out of sync with the downstream subscriber's demand (the "overflowing" element) immediately triggers an overflow error and cancels the source. The Flux is going to terminate with an overflow error, but this error is delayed, which lets the subscriber make more requests for the content of the buffer.
Discard Support: This operator discards the buffered overflow elements upon cancellation or error triggered by a data signal, as well as elements that are rejected by the buffer due to maxSize.
maxSize - maximum number of elements overflowing request before the source is cancelled
Returns: a backpressured Flux that buffers with bounded capacity
public final Flux<T> onBackpressureBuffer(int maxSize, Consumer<? super T> onOverflow)
Request an unbounded demand and push to the returned Flux, or park up to maxSize elements when not enough demand is requested downstream. The first element past this buffer to arrive out of sync with the downstream subscriber's demand (the "overflowing" element) is immediately passed to a Consumer and the source is cancelled. The Flux is going to terminate with an overflow error, but this error is delayed, which lets the subscriber make more requests for the content of the buffer.
Note that should the cancelled source produce further overflowing elements, these would be passed to the onNextDropped hook.
Discard Support: This operator discards the buffered overflow elements upon cancellation or error triggered by a data signal, as well as elements that are rejected by the buffer due to maxSize (even though they are passed to the onOverflow Consumer first).
maxSize - maximum number of elements overflowing request before callback is called and source is cancelled
onOverflow - callback to invoke on overflow
Returns: a backpressured Flux that buffers with a bounded capacity
public final Flux<T> onBackpressureBuffer(int maxSize, BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream, within a maxSize limit. Over that limit, the overflow strategy is applied (see BufferOverflowStrategy).
Note that for the ERROR strategy, the overflow error will be delayed after the current backlog is consumed.
Discard Support: This operator discards the buffered overflow elements upon cancellation or error triggered by a data signal, as well as elements that are rejected by the buffer due to maxSize (even though they are passed to the bufferOverflowStrategy first).
maxSize - maximum buffer backlog size before overflow strategy is applied
bufferOverflowStrategy - strategy to apply to overflowing elements
Returns: a backpressured Flux that buffers up to a capacity then applies an overflow strategy
public final Flux<T> onBackpressureBuffer(int maxSize, Consumer<? super T> onBufferOverflow, BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream, within a maxSize limit. Over that limit, the overflow strategy is applied (see BufferOverflowStrategy).
A Consumer is immediately invoked when there is an overflow, receiving the value that was discarded because of the overflow (which can be different from the latest element emitted by the source in case of a DROP_LATEST strategy).
Note that for the ERROR strategy, the overflow error will be delayed after the current backlog is consumed. The consumer is still invoked immediately.
Discard Support: This operator discards the buffered overflow elements upon cancellation or error triggered by a data signal, as well as elements that are rejected by the buffer due to maxSize (even though they are passed to the onBufferOverflow Consumer AND the bufferOverflowStrategy first).
maxSize - maximum buffer backlog size before overflow callback is called
onBufferOverflow - callback to invoke on overflow
bufferOverflowStrategy - strategy to apply to overflowing elements
Returns: a backpressured Flux that buffers up to a capacity then applies an overflow strategy
public final Flux<T> onBackpressureBuffer(Duration ttl, int maxSize, Consumer<? super T> onBufferEviction)
Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream, within a maxSize limit and for a maximum Duration of ttl (as measured on the parallel Scheduler). Over that limit, oldest elements from the source are dropped.
Elements evicted based on the TTL are passed to a cleanup Consumer, which is also immediately invoked when there is an overflow.
Discard Support: This operator discards its internal buffer of elements that overflow, after having applied the onBufferEviction handler.
ttl - maximum Duration for which an element is kept in the backlog
maxSize - maximum buffer backlog size before overflow callback is called
onBufferEviction - callback to invoke once TTL is reached or on overflow
Returns: a backpressured Flux that buffers with a TTL and up to a capacity then applies an overflow strategy
public final Flux<T> onBackpressureBuffer(Duration ttl, int maxSize, Consumer<? super T> onBufferEviction, Scheduler scheduler)
Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream, within a maxSize limit and for a maximum Duration of ttl (as measured on the provided Scheduler). Over that limit, oldest elements from the source are dropped.
Elements evicted based on the TTL are passed to a cleanup Consumer, which is also immediately invoked when there is an overflow.
Discard Support: This operator discards its internal buffer of elements that overflow, after having applied the onBufferEviction handler.
ttl - maximum Duration for which an element is kept in the backlog
maxSize - maximum buffer backlog size before overflow callback is called
onBufferEviction - callback to invoke once TTL is reached or on overflow
scheduler - the scheduler on which to run the timeout check
Returns: a backpressured Flux that buffers with a TTL and up to a capacity then applies an overflow strategy
public final Flux<T> onBackpressureDrop()
Request an unbounded demand and push to the returned Flux, or drop the observed elements if not enough demand is requested downstream.
Discard Support: This operator discards elements that it drops.
Returns: a backpressured Flux that drops overflowing elements
public final Flux<T> onBackpressureDrop(Consumer<? super T> onDropped)
Request an unbounded demand and push to the returned Flux, or drop and notify the dropping Consumer with the observed elements if not enough demand is requested downstream.
Discard Support: This operator discards elements that it drops after having passed them to the provided onDropped handler.
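The following sketch shows the dropping behavior with a subscriber that only ever asks for three elements. The class name, method name and sample range are illustrative; BaseSubscriber is used here simply as a convenient way to control demand. The source emits ten elements eagerly, so everything past the requested three reaches the onDropped handler instead of the subscriber.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

final class BackpressureDropExample {

    // Returns two lists: the elements delivered downstream, then the dropped ones.
    static List<List<Integer>> receivedAndDropped() {
        List<Integer> received = new ArrayList<>();
        List<Integer> dropped = new ArrayList<>();

        Flux.range(1, 10)
            .onBackpressureDrop(dropped::add)
            .subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(3); // demand is limited to three elements and never renewed
                }

                @Override
                protected void hookOnNext(Integer value) {
                    received.add(value);
                }
            });

        return List.of(received, dropped);
    }

    public static void main(String[] args) {
        System.out.println(receivedAndDropped());
    }
}
```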
onDropped - the Consumer called when a value gets dropped due to lack of downstream requests
Returns: a backpressured Flux that drops overflowing elements
public final Flux<T> onBackpressureError()
Request an unbounded demand and push to the returned Flux, or emit onError from Exceptions.failWithOverflow() if not enough demand is requested downstream.
Discard Support: This operator discards elements that it drops, after having propagated the error.
Returns: a backpressured Flux that errors on overflowing elements
public final Flux<T> onBackpressureLatest()
Request an unbounded demand and push to the returned Flux, or only keep the most recent observed item if not enough demand is requested downstream.
Discard Support: Each time a new element comes in (the new "latest"), this operator discards the previously retained element.
Returns: a backpressured Flux that will only keep a reference to the last observed item
public final Flux<T> onErrorComplete()
Simply complete the sequence by replacing an onError signal with an onComplete signal. All other signals are propagated as-is.
Returns: a new Flux falling back on completion when an onError occurs
See Also: onErrorReturn(Object)
public final Flux<T> onErrorComplete(Class<? extends Throwable> type)
Simply complete the sequence by replacing an onError signal with an onComplete signal if the error matches the given Class. All other signals, including non-matching onError, are propagated as-is.
Returns: a new Flux falling back on completion when a matching error occurs
See Also: onErrorReturn(Class, Object)
public final Flux<T> onErrorComplete(Predicate<? super Throwable> predicate)
Simply complete the sequence by replacing an onError signal with an onComplete signal if the error matches the given Predicate. All other signals, including non-matching onError, are propagated as-is.
Returns: a new Flux falling back on completion when a matching error occurs
See Also: onErrorReturn(Predicate, Object)
public final Flux<T> onErrorContinue(BiConsumer<Throwable,Object> errorConsumer)
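Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. The recovered error and associated value are notified via the provided BiConsumer.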
Alternatively, throwing from that biconsumer will propagate the thrown exception downstream
in place of the original error, which is added as a suppressed exception to the new one.
Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream operators, it requires specific operator support to work, and the scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour.)
In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use doOnError to log the error, and onErrorResume(e -> Mono.empty()) to drop erroneous elements:
.flatMap(id -> repository.retrieveById(id)
                         .doOnError(System.err::println)
                         .onErrorResume(e -> Mono.empty()))
This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.
errorConsumer - a BiConsumer fed with errors matching the predicate and the value that triggered the error.
Returns: a Flux that attempts to continue processing on errors.
public final <E extends Throwable> Flux<T> onErrorContinue(Class<E> type, BiConsumer<Throwable,Object> errorConsumer)
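Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. Only errors matching the specified type are recovered from. The recovered error and associated value are notified via the provided BiConsumer.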
Alternatively, throwing from that biconsumer will propagate the thrown exception downstream
in place of the original error, which is added as a suppressed exception to the new one.
Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream operators, it requires specific operator support to work, and the scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour.)
In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use doOnError to log the error, and onErrorResume(e -> Mono.empty()) to drop erroneous elements:
.flatMap(id -> repository.retrieveById(id)
                         .doOnError(MyException.class, System.err::println)
                         .onErrorResume(MyException.class, e -> Mono.empty()))
This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.
type - the Class of Exception that are resumed from.
errorConsumer - a BiConsumer fed with errors matching the Class and the value that triggered the error.
Returns: a Flux that attempts to continue processing on some errors.
public final <E extends Throwable> Flux<T> onErrorContinue(Predicate<E> errorPredicate, BiConsumer<Throwable,Object> errorConsumer)
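Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. Only errors matching the Predicate are recovered from (note that this predicate can be applied several times and thus must be idempotent). The recovered error and associated value are notified via the provided BiConsumer.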
Alternatively, throwing from that biconsumer will propagate the thrown exception downstream
in place of the original error, which is added as a suppressed exception to the new one.
Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream operators, it requires specific operator support to work, and the scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour.)
In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use doOnError to log the error, and onErrorResume(e -> Mono.empty()) to drop erroneous elements:
.flatMap(id -> repository.retrieveById(id)
                         .doOnError(errorPredicate, System.err::println)
                         .onErrorResume(errorPredicate, e -> Mono.empty()))
This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.
errorPredicate - a Predicate used to filter which errors should be resumed from. This MUST be idempotent, as it can be used several times.
errorConsumer - a BiConsumer fed with errors matching the predicate and the value that triggered the error.
Returns: a Flux that attempts to continue processing on some errors.
public final Flux<T> onErrorStop()
If an onErrorContinue(BiConsumer) variant has been used downstream, reverts to the default 'STOP' mode where errors are terminal events upstream. It can be used for easier scoping of the on next failure strategy or to override the inherited strategy in a sub-stream (for example in a flatMap). It has no effect if onErrorContinue(BiConsumer) has not been used downstream.
Returns: a Flux that terminates on errors, even if onErrorContinue(BiConsumer) was used downstream
public final Flux<T> onErrorMap(Function<? super Throwable,? extends Throwable> mapper)
Transform any error emitted by this Flux by synchronously applying a function to it.
public final <E extends Throwable> Flux<T> onErrorMap(Class<E> type, Function<? super E,? extends Throwable> mapper)
Transform an error emitted by this Flux by synchronously applying a function to it if the error matches the given type. Otherwise let the error pass through.
public final Flux<T> onErrorMap(Predicate<? super Throwable> predicate, Function<? super Throwable,? extends Throwable> mapper)
Transform an error emitted by this Flux by synchronously applying a function to it if the error matches the given predicate. Otherwise let the error pass through.
public final Flux<T> onErrorResume(Function<? super Throwable,? extends Publisher<? extends T>> fallback)
public final <E extends Throwable> Flux<T> onErrorResume(Class<E> type, Function<? super E,? extends Publisher<? extends T>> fallback)
public final Flux<T> onErrorResume(Predicate<? super Throwable> predicate, Function<? super Throwable,? extends Publisher<? extends T>> fallback)
public final Flux<T> onErrorReturn(T fallbackValue)
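Simply emit a captured fallback value when any error is observed on this Flux.
fallbackValue - the value to emit if an error occurs
Returns: a new falling back Flux
See Also: onErrorComplete()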
public final <E extends Throwable> Flux<T> onErrorReturn(Class<E> type, T fallbackValue)
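Simply emit a captured fallback value when an error of the specified type is observed on this Flux.
E - the error type
type - the error type to match
fallbackValue - the value to emit if an error occurs that matches the type
Returns: a new falling back Flux
See Also: onErrorComplete(Class)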
public final Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)
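Simply emit a captured fallback value when an error matching the given Predicate is observed on this Flux.
predicate - the error predicate to match
fallbackValue - the value to emit if an error occurs that matches the predicate
Returns: a new falling back Flux
See Also: onErrorComplete(Predicate)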
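The error-handling operators above differ mainly in what replaces the error. As a rough comparison, the sketch below applies onErrorReturn, onErrorResume and onErrorMap to the same failing source (the class name, method names, the failing() helper and its values are illustrative):

```java
import java.util.List;

import reactor.core.publisher.Flux;

final class ErrorFallbackExample {

    // Hypothetical source: two values, then an IllegalStateException.
    static Flux<Integer> failing() {
        return Flux.just(1, 2).concatWith(Flux.error(new IllegalStateException("boom")));
    }

    // onErrorReturn: the error is replaced by a single fallback value.
    static List<Integer> withReturn() {
        return failing().onErrorReturn(-1).collectList().block();
    }

    // onErrorResume: the error is replaced by a whole fallback sequence.
    static List<Integer> withResume() {
        return failing().onErrorResume(IllegalStateException.class, e -> Flux.just(10, 20))
                        .collectList()
                        .block();
    }

    // onErrorMap: the sequence still fails, but with a translated exception.
    static RuntimeException withMap() {
        try {
            failing().onErrorMap(e -> new IllegalArgumentException("wrapped", e)).blockLast();
            return null;
        }
        catch (RuntimeException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println(withReturn());
        System.out.println(withResume());
        System.out.println(withMap());
    }
}
```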
public final Flux<T> onTerminateDetach()
Detaches both the child Subscriber and the Subscription on termination or cancellation.
This is an advanced interoperability operator that should help with odd retention scenarios when running with non-reactor Subscriber.
Returns: a detachable Flux
public final Flux<T> or(Publisher<? extends T> other)
Pick the first Publisher between this Flux and another publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.
other - the Publisher to race with
See Also: firstWithSignal(org.reactivestreams.Publisher<? extends I>...)
public final ParallelFlux<T> parallel()
Prepare this Flux by dividing data on a number of 'rails' matching the number of CPU cores, in a round-robin fashion. Note that to actually perform the work in parallel, you should call ParallelFlux.runOn(Scheduler) afterward.
Returns: a new ParallelFlux instance
public final ParallelFlux<T> parallel(int parallelism)
Prepare this Flux by dividing data on a number of 'rails' matching the provided parallelism parameter, in a round-robin fashion. Note that to actually perform the work in parallel, you should call ParallelFlux.runOn(Scheduler) afterward.
parallelism - the number of parallel rails
Returns: a new ParallelFlux instance
public final ParallelFlux<T> parallel(int parallelism, int prefetch)
Prepare this Flux by dividing data on a number of 'rails' matching the provided parallelism parameter, in a round-robin fashion and using a custom prefetch amount and queue for dealing with the source Flux's values. Note that to actually perform the work in parallel, you should call ParallelFlux.runOn(Scheduler) afterward.
parallelism - the number of parallel rails
prefetch - the number of values to prefetch from the source
Returns: a new ParallelFlux instance
public final ConnectableFlux<T> publish()
Prepare a ConnectableFlux which shares this Flux sequence and dispatches values to subscribers in a backpressure-aware manner. Prefetch will default to Queues.SMALL_BUFFER_SIZE. This will effectively turn any type of sequence into a hot sequence.
Backpressure will be coordinated on Subscription.request(long) and if any Subscriber is missing demand (requested = 0), multicast will pause pushing/pulling.
Returns: a new ConnectableFlux
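A minimal sketch of the connect step (class and method names are illustrative): two subscribers attach to the ConnectableFlux first, and the shared source is only subscribed to once connect() is called, at which point both receive the same values.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

final class PublishExample {

    // Returns what each of the two subscribers received after connect().
    static List<List<Integer>> shareWithTwoSubscribers() {
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();

        ConnectableFlux<Integer> shared = Flux.range(1, 3).publish();
        shared.subscribe(first::add);
        shared.subscribe(second::add);

        // Nothing has been emitted so far: the source is subscribed to on connect().
        shared.connect();

        return List.of(first, second);
    }

    public static void main(String[] args) {
        System.out.println(shareWithTwoSubscribers());
    }
}
```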
public final ConnectableFlux<T> publish(int prefetch)
Prepare a ConnectableFlux which shares this Flux sequence and dispatches values to subscribers in a backpressure-aware manner. This will effectively turn any type of sequence into a hot sequence.
Backpressure will be coordinated on Subscription.request(long) and if any Subscriber is missing demand (requested = 0), multicast will pause pushing/pulling.
prefetch - bounded requested demand
Returns: a new ConnectableFlux
public final <R> Flux<R> publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform)
R - the output value type
transform - the transformation function
Returns: a new Flux
public final <R> Flux<R> publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform, int prefetch)
R - the output value type
transform - the transformation function
prefetch - the request size
Returns: a new Flux
@Deprecated public final Mono<T> publishNext()
Deprecated. Use shareNext() instead, or use `publish().next()` if you need to `connect()`. To be removed in 3.5.0
Prepare a Mono which shares this Flux sequence and dispatches the first observed item to subscribers in a backpressure-aware manner. This will effectively turn any type of sequence into a hot sequence when the first Subscriber subscribes.
Returns: a new Mono
public final Flux<T> publishOn(Scheduler scheduler)
Run onNext, onComplete and onError on a supplied Scheduler Worker.
This operator influences the threading context where the rest of the operators in the chain below it will execute, up to a new occurrence of publishOn.
Typically used for fast publisher, slow consumer(s) scenarios.
flux.publishOn(Schedulers.single()).subscribe()
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
scheduler - a Scheduler providing the Scheduler.Worker where to publish
Returns: a Flux producing asynchronously on a given Scheduler
public final Flux<T> publishOn(Scheduler scheduler, int prefetch)
Run onNext, onComplete and onError on a supplied Scheduler's Scheduler.Worker.
This operator influences the threading context where the rest of the operators in
the chain below it will execute, up to a new occurrence of publishOn.
Typically used for fast publisher, slow consumer(s) scenarios.
flux.publishOn(Schedulers.single()).subscribe()
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
scheduler - a Scheduler providing the Scheduler.Worker where to publish
prefetch - the asynchronous boundary capacity
Returns: a Flux producing asynchronously
public final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch)
Run onNext, onComplete and onError on a supplied Scheduler's Scheduler.Worker.
This operator influences the threading context where the rest of the operators in
the chain below it will execute, up to a new occurrence of publishOn.
Typically used for fast publisher, slow consumer(s) scenarios.
flux.publishOn(Schedulers.single()).subscribe()
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
scheduler - a Scheduler providing the Scheduler.Worker where to publish
delayError - should the buffer be consumed before forwarding any error
prefetch - the asynchronous boundary capacity
Returns: a Flux producing asynchronously
public final Mono<T> reduce(BiFunction<T,T,T> aggregator)
Reduce the values from this Flux sequence into a single object of the same
type as the emitted items. Reduction is performed using a BiFunction that
takes the intermediate result of the reduction and the current value and returns
the next intermediate value of the reduction. Note that the BiFunction will not
be invoked for a sequence with 0 or 1 elements. For a sequence of exactly one
element, that element is sent directly to the subscriber.
Discard Support: This operator discards the internally accumulated value upon cancellation or error.
aggregator - the reducing BiFunction
Returns: a reduced Flux
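The seedless reduction rule can be modeled in plain Java. This is only a sketch under stated assumptions: it does not use Reactor, a List stands in for the sequence, an empty Optional stands in for a Mono that completes empty, and the class name ReduceModel is hypothetical.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BinaryOperator;

// Plain-Java model of a seedless reduce; not Reactor's implementation.
final class ReduceModel {

    // Folds left to right. The aggregator only runs from the second element on.
    static <T> Optional<T> reduce(List<T> source, BinaryOperator<T> aggregator) {
        T acc = null;
        boolean first = true;
        for (T item : source) {
            if (first) {
                acc = item;      // the first element becomes the initial intermediate result
                first = false;
            } else {
                acc = aggregator.apply(acc, item);
            }
        }
        return first ? Optional.empty() : Optional.of(acc);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        BinaryOperator<Integer> sum = (a, b) -> {
            calls.incrementAndGet();
            return a + b;
        };
        System.out.println(reduce(List.<Integer>of(), sum)); // Optional.empty
        System.out.println(reduce(List.of(7), sum));         // Optional[7]
        System.out.println(calls.get());                     // 0
        System.out.println(reduce(List.of(1, 2, 3), sum));   // Optional[6]
        System.out.println(calls.get());                     // 2
    }
}
```

The invocation counter shows that the aggregator is never called for the empty and single-element inputs, matching the note above.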
public final <A> Mono<A> reduce(A initial, BiFunction<A,? super T,A> accumulator)
Reduce the values from this Flux sequence into a single object matching the
type of a seed value. Reduction is performed using a BiFunction that
takes the intermediate result of the reduction and the current value and returns
the next intermediate value of the reduction. The first element is paired with the seed
value, initial.
Discard Support: This operator discards the internally accumulated value upon cancellation or error.
A - the type of the seed and the reduced object
accumulator - the reducing BiFunction
initial - the seed, the initial leftmost argument to pass to the reducing BiFunction
Returns: a reduced Flux
public final <A> Mono<A> reduceWith(Supplier<A> initial, BiFunction<A,? super T,A> accumulator)
Reduce the values from this Flux sequence into a single object matching the
type of a lazily supplied seed value. Reduction is performed using a
BiFunction that takes the intermediate result of the reduction and the
current value and returns the next intermediate value of the reduction. The first
element is paired with the seed value, supplied via initial.
Discard Support: This operator discards the internally accumulated value upon cancellation or error.
A - the type of the seed and the reduced object
accumulator - the reducing BiFunction
initial - a Supplier of the seed, called on subscription and passed to the reducing BiFunction
Returns: a reduced Flux
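Why a lazily supplied seed matters can be sketched in plain Java. The model below is an assumption-laden illustration, not Reactor code: each call to the hypothetical reduceWith helper stands in for one subscription, and the seed is a mutable list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Plain-Java model of a lazily seeded reduction; not Reactor's implementation.
final class LazySeedModel {

    // One call models one subscription: the seed is obtained when the fold starts.
    static <T, A> A reduceWith(List<T> source, Supplier<A> initial,
                               BiFunction<A, ? super T, A> accumulator) {
        A acc = initial.get();
        for (T item : source) {
            acc = accumulator.apply(acc, item);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3);
        BiFunction<List<Integer>, Integer, List<Integer>> add = (list, x) -> {
            list.add(x);
            return list;
        };

        // Fresh seed per run: the two results are independent.
        System.out.println(reduceWith(source, ArrayList::new, add)); // [1, 2, 3]
        System.out.println(reduceWith(source, ArrayList::new, add)); // [1, 2, 3]

        // One shared mutable seed: the second run also sees the first run's elements.
        List<Integer> shared = new ArrayList<>();
        reduceWith(source, () -> shared, add);
        System.out.println(reduceWith(source, () -> shared, add));   // [1, 2, 3, 1, 2, 3]
    }
}
```

The last case models what can go wrong when a single mutable seed instance is reused across subscriptions, which is the situation a Supplier avoids.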
public final Flux<T> repeat()
Returns: an indefinitely repeated Flux on onComplete
public final Flux<T> repeat(BooleanSupplier predicate)
predicate - the boolean to evaluate on onComplete.
Returns: a Flux that repeats on onComplete while the predicate matches
public final Flux<T> repeat(long numRepeat)
Repeatedly subscribe to the source numRepeat times. This results in
numRepeat + 1 total subscriptions to the original source. As a consequence,
using 0 plays the original sequence once.
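The subscription count can be checked with a small plain-Java model. It is a sketch only and does not use Reactor: each call to the Supplier stands in for one subscription to the source, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model of repeat(numRepeat); not Reactor's implementation.
final class RepeatModel {

    // Each source.get() stands in for one subscription to the original source.
    static <T> List<T> repeat(Supplier<List<T>> source, long numRepeat) {
        List<T> out = new ArrayList<>();
        for (long i = 0; i <= numRepeat; i++) { // numRepeat + 1 subscriptions in total
            out.addAll(source.get());
        }
        return out;
    }

    public static void main(String[] args) {
        int[] subscriptions = {0};
        Supplier<List<String>> source = () -> {
            subscriptions[0]++;
            return List.of("a", "b");
        };
        System.out.println(repeat(source, 2)); // [a, b, a, b, a, b]
        System.out.println(subscriptions[0]);  // 3
        System.out.println(repeat(source, 0)); // [a, b]
    }
}
```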
numRepeat - the number of times to re-subscribe on onComplete (positive, or 0 for original sequence only)
Returns: a Flux that repeats on onComplete, up to the specified number of repetitions
public final Flux<T> repeat(long numRepeat, BooleanSupplier predicate)
numRepeat - the number of times to re-subscribe on complete (positive, or 0 for original sequence only)
predicate - the boolean to evaluate on onComplete
Returns: a Flux that repeats on onComplete while the predicate matches,
up to the specified number of repetitions
public final Flux<T> repeatWhen(Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
Repeatedly subscribe to this Flux when a companion sequence emits elements in
response to the flux completion signal. Any terminal signal from the companion
sequence will terminate the resulting Flux with the same signal immediately.
If the companion sequence signals when this Flux is active, the repeat
attempt is suppressed.
Note that if the companion Publisher created by the repeatFactory
emits Context as trigger objects, these Context will be merged with
the previous Context:
.repeatWhen(emittedEachAttempt -> emittedEachAttempt.handle((lastEmitted, sink) -> {
    Context ctx = sink.currentContext();
    int rl = ctx.getOrDefault("repeatsLeft", 0);
    if (rl > 0) {
        // emit an updated Context as the trigger for one more repeat
        sink.next(Context.of(
            "repeatsLeft", rl - 1,
            "emitted", lastEmitted
        ));
    } else {
        sink.error(new IllegalStateException("repeats exhausted"));
    }
}))
repeatFactory - the Function that returns the associated Publisher
companion, given a Flux that signals each onComplete as a Long
representing the number of source elements emitted in the latest attempt.
Returns: a Flux that repeats on onComplete when the companion Publisher produces an
onNext signal
public final ConnectableFlux<T> replay()
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will
retain an unbounded amount of onNext signals. Completion and Error will also be
replayed.
Returns: a replaying ConnectableFlux
public final ConnectableFlux<T> replay(int history)
Turn this Flux into a connectable hot source and cache last emitted
signals for further Subscriber.
Will retain up to the given history size onNext signals. Completion and Error will also be
replayed.
Note that replay(0) will only cache the terminal signal without
expiration.
Re-connects are not supported.
history - number of events retained in history excluding complete and error
Returns: a replaying ConnectableFlux
public final ConnectableFlux<T> replay(Duration ttl)
Turn this Flux into a connectable hot source and cache last emitted signals
for further Subscriber. Will retain each onNext up to the given per-item
expiry timeout.
Completion and Error will also be replayed until ttl triggers, in which case
the next Subscriber will start over a new subscription.
ttl - Per-item and post termination timeout duration
Returns: a replaying ConnectableFlux
public final ConnectableFlux<T> replay(int history, Duration ttl)
Turn this Flux into a connectable hot source and cache last emitted signals
for further Subscriber. Will retain up to the given history size onNext
signals with a per-item ttl.
Completion and Error will also be replayed until ttl triggers, in which case
the next Subscriber will start over a new subscription.
history - number of events retained in history excluding complete and error
ttl - Per-item and post termination timeout duration
Returns: a replaying ConnectableFlux
public final ConnectableFlux<T> replay(Duration ttl, Scheduler timer)
Turn this Flux into a connectable hot source and cache last emitted signals
for further Subscriber. Will retain onNext signal for up to the given
Duration with a per-item ttl.
Completion and Error will also be replayed until ttl triggers, in which case
the next Subscriber will start over a new subscription.
ttl - Per-item and post termination timeout duration
timer - a time-capable Scheduler instance to read current time from
Returns: a replaying ConnectableFlux
public final ConnectableFlux<T> replay(int history, Duration ttl, Scheduler timer)
Turn this Flux into a connectable hot source and cache last emitted signals
for further Subscriber. Will retain up to the given history size onNext
signals with a per-item ttl.
Completion and Error will also be replayed until ttl triggers, in which case
the next Subscriber will start over a new subscription.
history - number of events retained in history excluding complete and error
ttl - Per-item and post termination timeout duration
timer - a Scheduler instance to read current time from
Returns: a replaying ConnectableFlux
public final Flux<T> retry()
Re-subscribes to this Flux sequence if it signals any error, indefinitely.
Returns: a Flux that retries on onError
public final Flux<T> retry(long numRetries)
Re-subscribes to this Flux sequence if it signals any error, for a fixed
number of times.
Note that passing Long.MAX_VALUE is treated as infinite retry.
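The bounded retry rule can be modeled as a plain-Java loop. This is a sketch and not Reactor code: a Supplier call stands in for one subscription, a thrown RuntimeException stands in for an onError signal, and the names RetryModel and flaky are hypothetical.

```java
import java.util.function.Supplier;

// Plain-Java model of retry(numRetries); not Reactor's implementation.
final class RetryModel {

    // Runs the attempt, re-running it after a failure until numRetries errors have been tolerated.
    static <T> T retry(Supplier<T> attempt, long numRetries) {
        long tolerated = 0;
        while (true) {
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                if (tolerated >= numRetries) {
                    throw e; // no retries left: propagate the latest error
                }
                tolerated++;
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Hypothetical flaky source: fails on the first two attempts, then succeeds.
        Supplier<String> flaky = () -> {
            if (++attempts[0] < 3) {
                throw new IllegalStateException("attempt " + attempts[0] + " failed");
            }
            return "succeeded on attempt " + attempts[0];
        };
        System.out.println(retry(flaky, 2)); // succeeded on attempt 3
    }
}
```

In this model a budget of numRetries allows at most numRetries + 1 attempts in total, and a budget of Long.MAX_VALUE is never exhausted in practice.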
numRetries - the number of times to tolerate an error
Returns: a Flux that retries on onError up to the specified number of retry attempts.
public final Flux<T> retryWhen(Retry retrySpec)
Retries this Flux in response to signals emitted by a companion Publisher.
The companion is generated by the provided Retry instance, see Retry.max(long),
Retry.maxInARow(long) and Retry.backoff(long, Duration)
for readily available strategy builders.
The operator generates a base for the companion, a Flux of Retry.RetrySignal
which each give metadata about each retryable failure whenever this Flux
signals an error. The final companion
should be derived from that base companion and emit data in response to incoming onNext (although it can emit fewer
elements, or delay the emissions).
Terminal signals in the companion terminate the sequence with the same signal, so emitting a Subscriber.onError(Throwable)
will fail the resulting Flux with that same error.
Note that the Retry.RetrySignal state can be
transient and change between each source onError or onNext. If processed with a delay,
this could lead to the represented state being out of sync with the state at which the retry
was evaluated. Map it to Retry.RetrySignal.copy() right away to mitigate this.
Note that if the companion Publisher created by the retrySpec
emits Context as trigger objects, these Context will be merged with
the previous Context:
Retry customStrategy = Retry.from(companion -> companion.handle((retrySignal, sink) -> {
    ContextView ctx = sink.contextView();
    int rl = ctx.getOrDefault("retriesLeft", 0);
    if (rl > 0) {
        sink.next(Context.of(
            "retriesLeft", rl - 1,
            "lastError", retrySignal.failure()
        ));
    } else {
        sink.error(Exceptions.retryExhausted("retries exhausted", retrySignal.failure()));
    }
}));
Flux<T> retried = originalFlux.retryWhen(customStrategy);
retrySpec - the Retry strategy that will generate the companion Publisher,
given a Flux that signals each onError as a Retry.RetrySignal.
Returns: a Flux that retries on onError when a companion Publisher produces an onNext signal
See Also: Retry.max(long), Retry.maxInARow(long), Retry.backoff(long, Duration)
public final Flux<T> sample(Duration timespan)
Sample this Flux by periodically emitting an item corresponding to that
Flux's latest emitted value within the periodical time window.
Note that if some elements are emitted quicker than the timespan just before source
completion, the last of these elements will be emitted along with the onComplete
signal.
Discard Support: This operator discards elements that are not part of the sampling.
timespan - the duration of the window after which to emit the latest observed item
Returns: a Flux sampled to the last item seen over each periodic window
public final <U> Flux<T> sample(Publisher<U> sampler)
Sample this Flux by emitting an item corresponding to that Flux's
latest emitted value whenever a companion sampler Publisher
signals a value.
Termination of either Publisher will result in termination for the Subscriber
as well.
Note that if some elements are emitted just before source completion and before a
last sampler can trigger, the last of these elements will be emitted along with the
onComplete signal.
Both Publishers will run in unbounded mode because the backpressure
would interfere with the sampling precision.
Discard Support: This operator discards elements that are not part of the sampling.
public final Flux<T> sampleFirst(Duration timespan)
Repeatedly take a value from this Flux then skip the values that follow
within a given duration.
Discard Support: This operator discards elements that are not part of the sampling.
timespan - the duration during which to skip values after each sample
Returns: a Flux sampled to the first item of each duration-based window
public final <U> Flux<T> sampleFirst(Function<? super T,? extends Publisher<U>> samplerFactory)
public final <U> Flux<T> sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory)
Emit the latest value from this Flux only if there were no new values emitted
during the window defined by a companion Publisher derived from that particular
value.
Note that this means that the last value in the sequence is always emitted.
Discard Support: This operator discards elements that are not part of the sampling.
U - the companion reified type
throttlerFactory - supply a companion sampler Publisher which signals
the end of the window during which no new emission should occur. If it is the case,
the original value triggering the window is emitted.
Returns: a Flux sampled to items not followed by any other item within a window
defined by a companion Publisher
public final <U> Flux<T> sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory, int maxConcurrency)
Emit the latest value from this Flux only if there were no new values emitted
during the window defined by a companion Publisher derived from that particular
value.
The provided maxConcurrency will keep a bounded maximum of concurrent timeouts and drop any new items until at least one timeout terminates.
Note that this means that the last value in the sequence is always emitted.
Discard Support: This operator discards elements that are not part of the sampling.
U - the throttling type
throttlerFactory - supply a companion sampler Publisher which signals
the end of the window during which no new emission should occur. If it is the case,
the original value triggering the window is emitted.
maxConcurrency - the maximum number of concurrent timeouts
Returns: a Flux sampled to items not followed by any other item within a window
defined by a companion Publisher
public final Flux<T> scan(BiFunction<T,T,T> accumulator)
Reduce this Flux's values with an accumulator BiFunction and
also emit the intermediate results of this function.
Unlike scan(Object, BiFunction), this operator doesn't take an initial value
but treats the first Flux value as initial value.
The accumulation works as follows:
result[0] = source[0]
result[1] = accumulator(result[0], source[1])
result[2] = accumulator(result[1], source[2])
result[3] = accumulator(result[2], source[3])
...
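The accumulation rule can be modeled in plain Java. The sketch below does not use Reactor and is not its implementation: a List stands in for the sequence, the class name ScanModel is hypothetical, and a second overload models the variant that takes an initial value, where the seed is emitted first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Plain-Java model of the scan accumulation rules; not Reactor's implementation.
final class ScanModel {

    // No seed: result[0] = source[0], then result[i] = accumulator(result[i-1], source[i]).
    static <T> List<T> scan(List<T> source, BinaryOperator<T> accumulator) {
        List<T> result = new ArrayList<>();
        T state = null;
        for (T item : source) {
            state = result.isEmpty() ? item : accumulator.apply(state, item);
            result.add(state);
        }
        return result;
    }

    // With a seed: result[0] = initial, then result[i+1] = accumulator(result[i], source[i]).
    static <T, A> List<A> scan(List<T> source, A initial,
                               BiFunction<A, ? super T, A> accumulator) {
        List<A> result = new ArrayList<>();
        A state = initial;
        result.add(state);
        for (T item : source) {
            state = accumulator.apply(state, item);
            result.add(state);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(scan(List.of(1, 2, 3, 4), Integer::sum));  // [1, 3, 6, 10]
        System.out.println(scan(List.of(1, 2, 3), 10, Integer::sum)); // [10, 11, 13, 16]
    }
}
```

Note that the seeded form produces one more element than the source, because the seed itself is the first result.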
accumulator - the accumulating BiFunction
Returns: an accumulating Flux
public final <A> Flux<A> scan(A initial, BiFunction<A,? super T,A> accumulator)
Reduce this Flux's values with an accumulator BiFunction and
also emit the intermediate results of this function.
The accumulation works as follows:
result[0] = initialValue;
result[1] = accumulator(result[0], source[0])
result[2] = accumulator(result[1], source[1])
result[3] = accumulator(result[2], source[2])
...
A - the accumulated type
initial - the initial leftmost argument to pass to the reduce function
accumulator - the accumulating BiFunction
Returns: an accumulating Flux starting with initial state
public final <A> Flux<A> scanWith(Supplier<A> initial, BiFunction<A,? super T,A> accumulator)
Reduce this Flux's values with the help of an accumulator BiFunction
and also emit the intermediate results. A seed value is lazily provided by a
Supplier invoked for each Subscriber.
The accumulation works as follows:
result[0] = initialValue;
result[1] = accumulator(result[0], source[0])
result[2] = accumulator(result[1], source[1])
result[3] = accumulator(result[2], source[2])
...
A - the accumulated type
initial - the supplier providing the seed, the leftmost parameter initially
passed to the reduce function
accumulator - the accumulating BiFunction
Returns: an accumulating Flux starting with initial state
public final Flux<T> share()
Returns a new Flux that multicasts (shares) the original Flux.
As long as there is at least one Subscriber this Flux will be subscribed and
emitting data.
When all subscribers have cancelled it will cancel the source Flux.
This is an alias for publish().refCount() (see ConnectableFlux.refCount()).
public final Mono<T> shareNext()
Prepare a Mono which shares this Flux sequence and dispatches the
first observed item to subscribers.
This will effectively turn any type of sequence into a hot sequence when the first
Subscriber subscribes.
Returns: a new Mono
public final Mono<T> single()
Expect and emit a single item from this Flux source or signal
NoSuchElementException for an empty source, or
IndexOutOfBoundsException for a source with more than one element.
Returns: a Mono with the single item or an error signal
public final Mono<T> single(T defaultValue)
Expect and emit a single item from this Flux source and emit a default
value for an empty source, but signal an IndexOutOfBoundsException for a
source with more than one element.
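The three "single item" contracts (single(), single(T) and singleOrEmpty()) differ only in how they treat an empty source. A plain-Java model, assuming a List stands in for the sequence and a thrown exception stands in for an error signal; it is a sketch, not Reactor code, and the class name SingleModel is hypothetical.

```java
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

// Plain-Java model of single / single(default) / singleOrEmpty; not Reactor's implementation.
final class SingleModel {

    // Empty source is an error; more than one element is an error.
    static <T> T single(List<T> source) {
        if (source.isEmpty()) {
            throw new NoSuchElementException("source was empty");
        }
        return exactlyOne(source);
    }

    // Empty source yields the default; more than one element is still an error.
    static <T> T single(List<T> source, T defaultValue) {
        return source.isEmpty() ? defaultValue : exactlyOne(source);
    }

    // Empty source is accepted; more than one element is still an error.
    static <T> Optional<T> singleOrEmpty(List<T> source) {
        return source.isEmpty() ? Optional.empty() : Optional.of(exactlyOne(source));
    }

    private static <T> T exactlyOne(List<T> source) {
        if (source.size() > 1) {
            throw new IndexOutOfBoundsException("source had more than one element");
        }
        return source.get(0);
    }

    public static void main(String[] args) {
        System.out.println(single(List.of("x")));                  // x
        System.out.println(single(List.<String>of(), "fallback")); // fallback
        System.out.println(singleOrEmpty(List.<String>of()));      // Optional.empty
    }
}
```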
public final Mono<T> singleOrEmpty()
Expect and emit a single item from this Flux source, and accept an empty
source but signal an IndexOutOfBoundsException for a source with more than
one element.
Returns: a Mono with the expected single item, no item or an error
public final Flux<T> skip(long skipped)
Skip the specified number of elements from the beginning of this Flux then
emit the remaining elements.
Discard Support: This operator discards elements that are skipped.
skipped - the number of elements to drop
Returns: a dropping Flux with the specified number of elements skipped at
the beginning
public final Flux<T> skip(Duration timespan)
Skip elements from this Flux emitted within the specified initial duration.
Discard Support: This operator discards elements that are skipped.
timespan - the initial time window during which to drop elements
Returns: a Flux dropping at the beginning until the end of the given duration
public final Flux<T> skipLast(int n)
Skip a specified number of elements at the end of this Flux sequence.
Discard Support: This operator discards elements that are skipped.
n - the number of elements to drop before completion
Returns: a Flux dropping the specified number of elements at the end of the
sequence
public final Flux<T> sort()
Sort elements from this Flux by collecting and sorting them in the background
then emitting the sorted sequence once this sequence completes.
Each item emitted by the Flux must implement Comparable with
respect to all other items in the sequence.
Note that calling sort with long, non-terminating or infinite sources
might cause OutOfMemoryError. Use sequence splitting like window(int)
to sort batches in that case.
Returns: a sorted Flux
Throws: ClassCastException - if any item emitted by the Flux does not implement
Comparable with respect to all other items emitted by the Flux
public final Flux<T> sort(Comparator<? super T> sortFunction)
Sort elements from this Flux using a Comparator function, by
collecting and sorting elements in the background then emitting the sorted sequence
once this sequence completes.
Note that calling sort with long, non-terminating or infinite sources
might cause OutOfMemoryError.
@SafeVarargs public final Flux<T> startWith(T... values)
Prepend the given values before this Flux sequence.
public final Disposable subscribe()
Subscribe to this Flux and request unbounded demand.
This version doesn't specify any consumption behavior for the events from the chain, especially no error handling, so other variants should usually be preferred.
Returns: a new Disposable that can be used to cancel the underlying Subscription
public final Disposable subscribe(Consumer<? super T> consumer)
Subscribe a Consumer to this Flux that will consume all the
elements in the sequence. It will request an unbounded demand (Long.MAX_VALUE).
For a passive version that observes and forwards incoming data, see doOnNext(java.util.function.Consumer).
For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber) with a BaseSubscriber.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
consumer - the consumer to invoke on each value (onNext signal)
Returns: a new Disposable that can be used to cancel the underlying Subscription
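The "control returns immediately" pitfall can be illustrated without Reactor. In the plain-Java sketch below, the hypothetical subscribeAsync helper stands in for subscribing to an asynchronous sequence: it hands delivery to another thread and returns at once, so the caller must wait (here with a CountDownLatch) before inspecting what the consumer saw. All names are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-Java model of an asynchronous subscription; not Reactor's implementation.
final class AsyncSubscribeModel {

    // Hypothetical stand-in for an asynchronous subscribe: returns immediately,
    // delivers items and then the completion callback on the executor's thread.
    static void subscribeAsync(List<String> items, Consumer<String> consumer,
                               Runnable onComplete, ExecutorService executor) {
        executor.execute(() -> {
            items.forEach(consumer);
            onComplete.run();
        });
    }

    // Subscribes, then blocks the calling thread until completion has been observed.
    static List<String> collectAndWait(List<String> items) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            subscribeAsync(items, seen::add, done::countDown, executor);
            // Without this wait, the caller could return before any item was consumed.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(collectAndWait(List.of("a", "b", "c"))); // [a, b, c]
    }
}
```

In a unit test or a main method, the same idea applies: keep the calling thread alive until a completion callback has fired, otherwise the consumer may appear never to run.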
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer)
Subscribe to this Flux with a Consumer that will consume all the
elements in the sequence, as well as a Consumer that will handle errors.
The subscription will request an unbounded demand (Long.MAX_VALUE).
For a passive version that observes and forwards incoming data, see
doOnNext(java.util.function.Consumer) and doOnError(java.util.function.Consumer).
For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber) with a BaseSubscriber.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumers are not invoked when executing in a main thread or a unit test for instance.
consumer - the consumer to invoke on each next signal
errorConsumer - the consumer to invoke on error signal
Returns: a new Disposable that can be used to cancel the underlying Subscription
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer)
Subscribe Consumers to this Flux that will respectively consume all the
elements in the sequence, handle errors and react to completion. The subscription
will request unbounded demand (Long.MAX_VALUE).
For a passive version that observes and forwards incoming data, see doOnNext(java.util.function.Consumer),
doOnError(java.util.function.Consumer) and doOnComplete(Runnable).
For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber) with a BaseSubscriber.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
consumer - the consumer to invoke on each value
errorConsumer - the consumer to invoke on error signal
completeConsumer - the consumer to invoke on complete signal
Returns: a new Disposable that can be used to cancel the underlying Subscription
@Deprecated public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer)
Deprecated. Because users tend to forget to request the subscription. If
the behavior is really needed, consider using subscribeWith(Subscriber). To be removed in 3.5.
Subscribe Consumers to this Flux that will respectively consume all the
elements in the sequence, handle errors, react to completion, and request upon subscription.
It will let the provided subscriptionConsumer
request the adequate amount of data, or request unbounded demand
Long.MAX_VALUE if no such consumer is provided.
For a passive version that observes and forwards incoming data, see doOnNext(java.util.function.Consumer),
doOnError(java.util.function.Consumer), doOnComplete(Runnable)
and doOnSubscribe(Consumer).
For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber) with a BaseSubscriber.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
consumer - the consumer to invoke on each value
errorConsumer - the consumer to invoke on error signal
completeConsumer - the consumer to invoke on complete signal
subscriptionConsumer - the consumer to invoke on subscribe signal, to be used
for the initial request, or null for max request
Returns: a new Disposable that can be used to cancel the underlying Subscription
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Context initialContext)
Subscribe Consumers to this Flux that will respectively consume all the
elements in the sequence, handle errors and react to completion. Additionally, a Context
is tied to the subscription. At subscription, an unbounded request is implicitly made.
For a passive version that observes and forwards incoming data, see doOnNext(java.util.function.Consumer),
doOnError(java.util.function.Consumer), doOnComplete(Runnable)
and doOnSubscribe(Consumer).
For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber) with a BaseSubscriber.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
consumer - the consumer to invoke on each value
errorConsumer - the consumer to invoke on error signal
completeConsumer - the consumer to invoke on complete signal
initialContext - the base Context tied to the subscription that will
be visible to operators upstream
Returns: a new Disposable that can be used to cancel the underlying Subscription
public final void subscribe(Subscriber<? super T> actual)
public abstract void subscribe(CoreSubscriber<? super T> actual)
An internal Publisher.subscribe(Subscriber) that will bypass the
Hooks.onLastOperator(Function) pointcut.
In addition to behaving as expected by Publisher.subscribe(Subscriber)
in a controlled manner, it supports direct subscribe-time Context passing.
Specified by: subscribe in interface CorePublisher<T>
actual - the Subscriber interested into the published sequence
See Also: subscribe(Subscriber)
public final Flux<T> subscribeOn(Scheduler scheduler)
Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker.
As such, placing this operator anywhere in the chain will also impact the execution
context of onNext/onError/onComplete signals from the beginning of the chain up to
the next occurrence of a publishOn.
Note that if you are using an eager or blocking
create(Consumer, FluxSink.OverflowStrategy)
as the source, it can lead to deadlocks due to requests piling up behind the emitter.
In such case, you should call subscribeOn(scheduler, false) instead.
Typically used for slow publisher e.g., blocking IO, fast consumer(s) scenarios.
flux.subscribeOn(Schedulers.single()).subscribe()
Note that Scheduler.Worker.schedule(Runnable) raising
RejectedExecutionException on late
Subscription.request(long) will be propagated to the request caller.
scheduler - a Scheduler providing the Scheduler.Worker where to subscribe
Returns: a Flux requesting asynchronously
See Also: publishOn(Scheduler), subscribeOn(Scheduler, boolean)
public final Flux<T> subscribeOn(Scheduler scheduler, boolean requestOnSeparateThread)
Run subscribe and onSubscribe on a specified Scheduler's Scheduler.Worker.
Request will be run on that worker too depending on the requestOnSeparateThread
parameter (which defaults to true in the subscribeOn(Scheduler) version).
As such, placing this operator anywhere in the chain will also impact the execution
context of onNext/onError/onComplete signals from the beginning of the chain up to
the next occurrence of a publishOn.
Note that if you are using an eager or blocking
create(Consumer, FluxSink.OverflowStrategy)
as the source, it can lead to deadlocks due to requests piling up behind the emitter.
Thus this operator has a requestOnSeparateThread parameter, which should be
set to false in this case.
Typically used for slow publisher e.g., blocking IO, fast consumer(s) scenarios.
flux.subscribeOn(Schedulers.single()).subscribe()
Note that Scheduler.Worker.schedule(Runnable) raising
RejectedExecutionException on late
Subscription.request(long) will be propagated to the request caller.
scheduler - a Scheduler providing the Scheduler.Worker where to subscribe
requestOnSeparateThread - whether or not to also perform requests on the worker.
true to behave like subscribeOn(Scheduler)
Returns: a Flux requesting asynchronously
See Also: publishOn(Scheduler), subscribeOn(Scheduler)
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber)
Subscribe the given Subscriber to this Flux and return said instance for further chaining calls. This is similar to as(Function),
except a subscription is explicitly performed by this method.
If you need more control over backpressure and the request, use a BaseSubscriber.
E - the reified type from the input/output subscriber
subscriber - the Subscriber to subscribe with and return
Returns: the passed Subscriber
public final <V> Flux<V> switchOnFirst(BiFunction<Signal<? extends T>,Flux<T>,Publisher<? extends V>> transformer)
Transform the current Flux once it emits its first element, making a
conditional transformation possible. This operator first requests one element
from the source then applies a transformation derived from the first Signal
and the source. The whole source (including the first signal) is passed as second
argument to the BiFunction and it is very strongly advised to always build
upon it with operators (see below).
Note that the source might complete or error immediately instead of emitting,
in which case the Signal would be onComplete or onError. It is NOT
necessarily an onNext Signal, and must be checked accordingly.
For example, this operator could be used to define a dynamic transformation that depends on the first element (which could contain routing metadata for instance):
fluxOfIntegers.switchOnFirst((signal, flux) -> {
    if (signal.hasValue()) {
        ColoredShape firstColor = signal.get();
        return flux.filter(v -> !v.hasSameColorAs(firstColor));
    }
    return flux; //either early complete or error, this forwards the termination in any case
    //`return flux.onErrorResume(t -> Mono.empty());` instead would suppress an early error
    //`return Flux.just(1,2,3);` instead would suppress an early error and return 1, 2, 3.
    //It would also only cancel the original `flux` at the completion of `just`.
})
It is advised to return a Publisher derived from the original Flux
in all cases, as not doing so would keep the original Publisher open and
hanging with a single request until the inner Publisher terminates or
the whole Flux is cancelled.
V - the item type in the returned Flux
transformer - A BiFunction executed once the first signal is
available and used to transform the source conditionally. The whole source (including
first signal) is passed as second argument to the BiFunction.
Returns: a new Flux that transforms the upstream once a signal is available
public final <V> Flux<V> switchOnFirst(BiFunction<Signal<? extends T>,Flux<T>,Publisher<? extends V>> transformer, boolean cancelSourceOnComplete)
Transform the current Flux once it emits its first element, making a
conditional transformation possible. This operator first requests one element
from the source then applies a transformation derived from the first Signal
and the source. The whole source (including the first signal) is passed as second
argument to the BiFunction and it is very strongly advised to always build
upon it with operators (see below).
Note that the source might complete or error immediately instead of emitting,
in which case the Signal would be onComplete or onError. It is NOT
necessarily an onNext Signal, and must be checked accordingly.
For example, this operator could be used to define a dynamic transformation that depends on the first element (which could contain routing metadata for instance):
fluxOfIntegers.switchOnFirst((signal, flux) -> {
    if (signal.hasValue()) {
        ColoredShape firstColor = signal.get();
        return flux.filter(v -> !v.hasSameColorAs(firstColor));
    }
    return flux; //either early complete or error, this forwards the termination in any case
    //`return flux.onErrorResume(t -> Mono.empty());` instead would suppress an early error
    //`return Flux.just(1,2,3);` instead would suppress an early error and return 1, 2, 3.
    //It would also only cancel the original `flux` at the completion of `just`.
})
It is advised to return a Publisher derived from the original Flux
in all cases, as not doing so would keep the original Publisher open and
hanging with a single request. If the cancelSourceOnComplete
parameter is true, the original publisher stays open only until the inner Publisher terminates or
the whole Flux is cancelled. Otherwise the original publisher will hang forever.
V - the item type in the returned Flux
transformer - A BiFunction executed once the first signal is
available and used to transform the source conditionally. The whole source (including
first signal) is passed as second argument to the BiFunction.
cancelSourceOnComplete - specify whether original publisher should be cancelled on onComplete
from the derived one
Returns: a new Flux that transforms the upstream once a signal is available
public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate)
Publisher
if this sequence is completed without any data.
public final <V> Flux<V> switchMap(Function<? super T,Publisher<? extends V>> fn)
Switch to a new Publisher generated via a Function whenever this Flux produces an item. As such, the elements from each generated Publisher are emitted in the resulting Flux.
This operator requests the source for an unbounded amount, but doesn't
request each generated Publisher
unless the downstream has made
a corresponding request (no prefetch of inner publishers).
@Deprecated public final <V> Flux<V> switchMap(Function<? super T,Publisher<? extends V>> fn, int prefetch)
public final Flux<T> tag(String key, String value)
Tag this Flux with a key/value pair. These can be retrieved as a Set of all tags throughout the publisher chain by using Scannable.tags() (as traversed by Scannable.parents()).
The tags are typically visible at assembly time by the tap(SignalListenerFactory) operator, which could for example be configured with a metrics listener applying the tag(s) to its meters.
key - a tag key
value - a tag value
name(String), metrics()
public final Flux<T> take(long n)
Take only the first N values from this Flux, if available.
If n is zero, the source isn't even subscribed to and the operator completes immediately upon subscription.
This ensures that the total amount requested upstream is capped at n
, although smaller
requests can be made if the downstream makes requests < n. In any case, this operator never lets
the upstream produce more elements than the cap, and it can be used to more strictly adhere to backpressure.
This mode is typically useful for cases where a race between request and cancellation can lead
the upstream to producing a lot of extraneous data, and such a production is undesirable (e.g.
a source that would send the extraneous data over the network).
It is equivalent to take(long, boolean) with limitRequest == true. If there is a requirement for unbounded upstream request (e.g. for performance reasons), use take(long, boolean) with limitRequest == false instead.
n - the maximum number of items to request from upstream and emit from this Flux
Flux limited to size N
take(long, boolean)
public final Flux<T> take(long n, boolean limitRequest)
Take only the first N values from this Flux, if available.
If limitRequest == true, ensure that the total amount requested upstream is capped at n. In that configuration, this operator never lets the upstream produce more elements than the cap, and it can be used to more strictly adhere to backpressure.
If n is zero, the source isn't even subscribed to and the operator completes immediately
upon subscription (the behavior inherited from take(long)
).
This mode is typically useful for cases where a race between request and cancellation can lead the upstream to producing a lot of extraneous data, and such a production is undesirable (e.g. a source that would send the extraneous data over the network).
If limitRequest == false
this operator doesn't propagate the backpressure requested amount.
Rather, it makes an unbounded request and cancels once N elements have been emitted.
If n is zero, the source is subscribed to but immediately cancelled, then the operator completes.
In this mode, the source could produce a lot of extraneous elements despite cancellation.
If that behavior is undesirable and you do not own the request from downstream
(e.g. prefetching operators), consider using limitRequest = true
instead.
public final Flux<T> takeLast(int n)
Emit the last N values this Flux emitted before its completion.
public final Flux<T> takeUntil(Predicate<? super T> predicate)
Relay values from this Flux until the given Predicate matches. This includes the matching data (unlike takeWhile(java.util.function.Predicate<? super T>)).
public final Flux<T> takeWhile(Predicate<? super T> continuePredicate)
Relay values from this Flux while a predicate returns TRUE for the values (checked before each value is delivered). This only includes the matching data (unlike takeUntil(java.util.function.Predicate<? super T>)).
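The inclusion rules of takeUntil and takeWhile are easiest to compare on a small finite sequence. The sketch below is a plain-Java model over a List, not Reactor code: the class TakeModel and its two methods are hypothetical names that only mirror the rules described above (takeUntil relays the matching element, takeWhile stops before the first non-matching one).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical list-based model of the documented takeUntil / takeWhile rules.
final class TakeModel {

    // Relays elements until the predicate matches; the matching element is included.
    static <T> List<T> takeUntil(List<T> source, Predicate<? super T> stop) {
        List<T> out = new ArrayList<>();
        for (T value : source) {
            out.add(value);
            if (stop.test(value)) {
                break;
            }
        }
        return out;
    }

    // Relays elements while the predicate holds; it is checked before each element is delivered.
    static <T> List<T> takeWhile(List<T> source, Predicate<? super T> keepGoing) {
        List<T> out = new ArrayList<>();
        for (T value : source) {
            if (!keepGoing.test(value)) {
                break;
            }
            out.add(value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5);
        System.out.println(takeUntil(source, v -> v >= 3)); // [1, 2, 3]
        System.out.println(takeWhile(source, v -> v < 3));  // [1, 2]
    }
}
```

With complementary predicates, as here, the two results differ only by the boundary element 3.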
public final Flux<T> tap(Supplier<SignalListener<T>> simpleListenerGenerator)
Tap into Reactive Streams signals emitted or received by this Flux and notify a stateful per-Subscriber SignalListener.
Any exception thrown by the SignalListener
methods causes the subscription to be cancelled
and the subscriber to be terminated with an onError signal
of that
exception. Note that SignalListener.doFinally(SignalType)
, SignalListener.doAfterComplete()
and
SignalListener.doAfterError(Throwable)
instead just drop
the exception.
This simplified variant assumes the state is purely initialized within the Supplier
,
as it is called for each incoming Subscriber
without additional context.
When the context-propagation library
is available at runtime and the downstream ContextView
is not empty, this operator implicitly uses the library
to restore thread locals around all invocations of SignalListener
methods. Typically, this would be done
in conjunction with the use of contextCapture()
operator down the chain.
simpleListenerGenerator - the Supplier to create a new SignalListener on each subscription
Flux with side effects defined by generated SignalListener
tap(Function), tap(SignalListenerFactory)
public final Flux<T> tap(Function<ContextView,SignalListener<T>> listenerGenerator)
Tap into Reactive Streams signals emitted or received by this Flux and notify a stateful per-Subscriber SignalListener.
Any exception thrown by the SignalListener
methods causes the subscription to be cancelled
and the subscriber to be terminated with an onError signal
of that
exception. Note that SignalListener.doFinally(SignalType)
, SignalListener.doAfterComplete()
and
SignalListener.doAfterError(Throwable)
instead just drop
the exception.
This simplified variant allows the SignalListener
to be constructed for each subscription
with access to the incoming Subscriber
's ContextView
.
When the context-propagation library
is available at runtime and the ContextView
is not empty, this operator implicitly uses the library
to restore thread locals around all invocations of SignalListener
methods. Typically, this would be done
in conjunction with the use of contextCapture()
operator down the chain.
listenerGenerator - the Function to create a new SignalListener on each subscription
Flux with side effects defined by generated SignalListener
tap(Supplier), tap(SignalListenerFactory)
public final Flux<T> tap(SignalListenerFactory<T,?> listenerFactory)
Tap into Reactive Streams signals emitted or received by this Flux and notify a stateful per-Subscriber SignalListener created by the provided SignalListenerFactory.
The factory will initialize a state object
for
each Flux
or Mono
instance it is used with, and that state will be cached and exposed for each
incoming Subscriber
in order to generate the associated listener
.
Any exception thrown by the SignalListener
methods causes the subscription to be cancelled
and the subscriber to be terminated with an onError signal
of that
exception. Note that SignalListener.doFinally(SignalType)
, SignalListener.doAfterComplete()
and
SignalListener.doAfterError(Throwable)
instead just drop
the exception.
When the context-propagation library
is available at runtime and the downstream ContextView
is not empty, this operator implicitly uses the library
to restore thread locals around all invocations of SignalListener
methods. Typically, this would be done
in conjunction with the use of contextCapture()
operator down the chain.
listenerFactory - the SignalListenerFactory to create a new SignalListener on each subscription
Flux with side effects defined by generated SignalListener
tap(Supplier), tap(Function)
public final Mono<Void> then()
Return a Mono<Void> that completes when this Flux completes.
This will actively ignore the sequence and only replay completion or error signals.
Discard Support: This operator discards elements from the source.
public final <V> Mono<V> then(Mono<V> other)
Let this Flux complete then play signals from a provided Mono.
In other words, ignore elements from this Flux and transform its completion signal into the emission and completion signal of a provided Mono<V>. An error signal is replayed in the resulting Mono<V>.
Discard Support: This operator discards elements from the source.
public final Mono<Void> thenEmpty(Publisher<Void> other)
Return a Mono<Void> that waits for this Flux to complete then for a supplied Publisher<Void> to also complete. The second completion signal is replayed, or any error signal that occurs instead.
Discard Support: This operator discards elements from the source.
public final Flux<Timed<T>> timed()
Times Subscriber.onNext(Object) events, encapsulated into a Timed object that lets downstream consumers look at various time information gathered with nanosecond resolution using the default clock (Schedulers.parallel()):
Timed.elapsed(): the time in nanoseconds since last event, as a Duration. For the first onNext, "last event" is the subscription. Otherwise it is the previous onNext. This is functionally equivalent to elapsed(), with a more expressive and precise representation than a Tuple2 with a long.
Timed.timestamp(): the timestamp of this onNext, as an Instant (with nanoseconds part). This is functionally equivalent to timestamp(), with a more expressive and precise representation than a Tuple2 with a long.
Timed.elapsedSinceSubscription(): the time in nanoseconds since subscription, as a Duration.
The Timed
object instances are safe to store and use later, as they are created as an
immutable wrapper around the <T>
value and immediately passed downstream.
Flux
elapsed()
,
timestamp()
public final Flux<Timed<T>> timed(Scheduler clock)
Times Subscriber.onNext(Object) events, encapsulated into a Timed object that lets downstream consumers look at various time information gathered with nanosecond resolution using the provided Scheduler as a clock:
Timed.elapsed(): the time in nanoseconds since last event, as a Duration. For the first onNext, "last event" is the subscription. Otherwise it is the previous onNext. This is functionally equivalent to elapsed(), with a more expressive and precise representation than a Tuple2 with a long.
Timed.timestamp(): the timestamp of this onNext, as an Instant (with nanoseconds part). This is functionally equivalent to timestamp(), with a more expressive and precise representation than a Tuple2 with a long.
Timed.elapsedSinceSubscription(): the time in nanoseconds since subscription, as a Duration.
The Timed
object instances are safe to store and use later, as they are created as an
immutable wrapper around the <T>
value and immediately passed downstream.
Flux
elapsed(Scheduler)
,
timestamp(Scheduler)
public final Flux<T> timeout(Duration timeout)
Propagate a TimeoutException as soon as no item is emitted within the given Duration from the previous emission (or the subscription for the first item).
public final Flux<T> timeout(Duration timeout, @Nullable Publisher<? extends T> fallback)
Switch to a fallback Flux as soon as no item is emitted within the given Duration from the previous emission (or the subscription for the first item).
If the given Publisher is null, signal a TimeoutException instead.
public final Flux<T> timeout(Duration timeout, Scheduler timer)
Propagate a TimeoutException as soon as no item is emitted within the given Duration from the previous emission (or the subscription for the first item), as measured by the specified Scheduler.
public final Flux<T> timeout(Duration timeout, @Nullable Publisher<? extends T> fallback, Scheduler timer)
Switch to a fallback Flux as soon as no item is emitted within the given Duration from the previous emission (or the subscription for the first item), as measured on the specified Scheduler.
If the given Publisher is null, signal a TimeoutException instead.
public final <U> Flux<T> timeout(Publisher<U> firstTimeout)
Signal a TimeoutException in case the first item from this Flux has not been emitted before the given Publisher emits.
public final <U,V> Flux<T> timeout(Publisher<U> firstTimeout, Function<? super T,? extends Publisher<V>> nextTimeoutFactory)
Signal a TimeoutException in case the first item from this Flux has not been emitted before the firstTimeout Publisher emits, and whenever each subsequent element is not emitted before a Publisher generated from the latest element signals.
Discard Support: This operator discards an element if it comes right after the timeout.
U - the type of the elements of the first timeout Publisher
V - the type of the elements of the subsequent timeout Publishers
firstTimeout - the timeout Publisher that must not emit before the first signal from this Flux
nextTimeoutFactory - the timeout Publisher factory for each next item
Flux that can time out if each element does not come before a signal from a per-item companion Publisher
public final <U,V> Flux<T> timeout(Publisher<U> firstTimeout, Function<? super T,? extends Publisher<V>> nextTimeoutFactory, Publisher<? extends T> fallback)
Switch to a fallback Publisher in case the first item from this Flux has not been emitted before the firstTimeout Publisher emits, and whenever each subsequent element is not emitted before a Publisher generated from the latest element signals.
U - the type of the elements of the first timeout Publisher
V - the type of the elements of the subsequent timeout Publishers
firstTimeout - the timeout Publisher that must not emit before the first signal from this Flux
nextTimeoutFactory - the timeout Publisher factory for each next item
fallback - the fallback Publisher to subscribe when a timeout occurs
Flux that can time out if each element does not come before a signal from a per-item companion Publisher
public final Flux<Tuple2<Long,T>> timestamp()
Emit a Tuple2 pair of T1 the current clock time in millis (as a Long measured by the parallel Scheduler) and T2 the emitted data (as a T), for each item from this Flux.
Flux
timed()
,
Timed.timestamp()
public final Flux<Tuple2<Long,T>> timestamp(Scheduler scheduler)
Emit a Tuple2 pair of T1 the current clock time in millis (as a Long measured by the provided Scheduler) and T2 the emitted data (as a T), for each item from this Flux.
The provided Scheduler will be asked to provide time with a granularity of TimeUnit.MILLISECONDS. In order for this operator to work as advertised, the provided Scheduler should thus return results that can be interpreted as unix timestamps.
scheduler - the Scheduler to read time from
Flux
Scheduler.now(TimeUnit), timed(Scheduler), Timed.timestamp()
public final Iterable<T> toIterable()
Transform this Flux into a lazy Iterable blocking on Iterator.next() calls.
Note that iterating from within threads marked as "non-blocking only" is illegal and will
cause an IllegalStateException
to be thrown, but obtaining the Iterable
itself within these threads is ok.
Iterable
public final Iterable<T> toIterable(int batchSize)
Transform this Flux into a lazy Iterable blocking on Iterator.next() calls.
Note that iterating from within threads marked as "non-blocking only" is illegal and will
cause an IllegalStateException
to be thrown, but obtaining the Iterable
itself within these threads is ok.
public final Iterable<T> toIterable(int batchSize, @Nullable Supplier<Queue<T>> queueProvider)
Transform this Flux into a lazy Iterable blocking on Iterator.next() calls.
Note that iterating from within threads marked as "non-blocking only" is illegal and will
cause an IllegalStateException
to be thrown, but obtaining the Iterable
itself within these threads is ok.
public final Stream<T> toStream()
Transform this Flux into a lazy Stream blocking for each source onNext call.
Note that iterating from within threads marked as "non-blocking only" is illegal and will
cause an IllegalStateException
to be thrown, but obtaining the Stream
itself or applying lazy intermediate operation on the stream within these threads is ok.
Stream
of unknown size with onClose attached to Subscription.cancel()
public final Stream<T> toStream(int batchSize)
Transform this Flux into a lazy Stream blocking for each source onNext call.
Note that iterating from within threads marked as "non-blocking only" is illegal and will
cause an IllegalStateException
to be thrown, but obtaining the Stream
itself or applying lazy intermediate operation on the stream within these threads is ok.
batchSize - the bounded capacity to prefetch from this Flux or Integer.MAX_VALUE for unbounded demand
Stream of unknown size with onClose attached to Subscription.cancel()
public final <V> Flux<V> transform(Function<? super Flux<T>,? extends Publisher<V>> transformer)
Transform this Flux in order to generate a target Flux. Unlike transformDeferred(Function), the provided function is executed as part of assembly.
    Function<Flux<Integer>, Flux<Integer>> applySchedulers = f -> f.subscribeOn(Schedulers.boundedElastic())
                                                                   .publishOn(Schedulers.parallel());
    flux.transform(applySchedulers).map(v -> v * v).subscribe();
V - the item type in the returned Flux
transformer - the Function to immediately map this Flux into a target Flux instance.
Flux
transformDeferred(Function) for deferred composition of Flux for each Subscriber, as(Function) for a loose conversion to an arbitrary type
public final <V> Flux<V> transformDeferred(Function<? super Flux<T>,? extends Publisher<V>> transformer)
Transform this Flux in order to generate a target Flux type. A transformation will occur for each Subscriber. For instance:
    flux.transformDeferred(original -> original.log());
V - the item type in the returned Publisher
transformer - the Function to lazily map this Flux into a target Publisher instance for each new subscriber
Flux
transform(Function) for immediate transformation of Flux, transformDeferredContextual(BiFunction) for a similarly deferred transformation of Flux reading the ContextView, as(Function) for a loose conversion to an arbitrary type
public final <V> Flux<V> transformDeferredContextual(BiFunction<? super Flux<T>,? super ContextView,? extends Publisher<V>> transformer)
Transform this Flux in order to generate a target Flux type. A transformation will occur for each Subscriber. In addition, the transforming BiFunction exposes the ContextView of each Subscriber. For instance:
    Flux<T> fluxLogged = flux.transformDeferredContextual((original, ctx) -> original.log("for RequestID" + ctx.get("RequestID")));
    //...later subscribe. Each subscriber has its Context with a RequestID entry
    fluxLogged.contextWrite(Context.of("RequestID", "requestA")).subscribe();
    fluxLogged.contextWrite(Context.of("RequestID", "requestB")).subscribe();
V - the item type in the returned Publisher
transformer - the BiFunction to lazily map this Flux into a target Flux instance upon subscription, with access to ContextView
Flux
transform(Function) for immediate transformation of Flux, transformDeferred(Function) for a similarly deferred transformation of Flux without the ContextView, as(Function) for a loose conversion to an arbitrary type
public final Flux<Flux<T>> window(int maxSize)
Split this Flux sequence into multiple Flux windows containing maxSize elements (or less for the final window) and starting from the first item. Each Flux window will onComplete after maxSize items have been routed.
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
public final Flux<Flux<T>> window(int maxSize, int skip)
Split this Flux sequence into multiple Flux windows of size maxSize, that each open every skip elements in the source.
When maxSize < skip : dropping windows
When maxSize > skip : overlapping windows
When maxSize == skip : exact windows
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: The overlapping variant DOES NOT discard elements, as they might be part of another still valid window. The exact window and dropping window variants both discard elements they internally queued for backpressure upon cancellation or error triggered by a data signal. The dropping window variant also discards elements in between windows.
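The three cases for maxSize versus skip (dropping, overlapping, exact) can be checked on a finite list. The sketch below is a plain-Java model, not Reactor code: WindowModel and its window method are hypothetical names, and the model assumes a new window opens at every index that is a multiple of skip and holds up to maxSize elements. It ignores everything specific to live windows (unicast subscription, backpressure, discard hooks).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical list-based model of the documented window(maxSize, skip) rules.
final class WindowModel {

    // A window opens every `skip` elements and collects at most `maxSize` elements.
    static <T> List<List<T>> window(List<T> source, int maxSize, int skip) {
        if (maxSize <= 0 || skip <= 0) {
            throw new IllegalArgumentException("maxSize and skip must be > 0");
        }
        List<List<T>> windows = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + maxSize, source.size());
            windows.add(new ArrayList<>(source.subList(start, end)));
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5, 6);
        System.out.println(window(source, 2, 3)); // dropping:    [[1, 2], [4, 5]]
        System.out.println(window(source, 3, 2)); // overlapping: [[1, 2, 3], [3, 4, 5], [5, 6]]
        System.out.println(window(source, 2, 2)); // exact:       [[1, 2], [3, 4], [5, 6]]
    }
}
```

In the dropping case the elements 3 and 6 fall between windows, which corresponds to the elements the dropping variant is documented to discard.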
public final Flux<Flux<T>> window(Publisher<?> boundary)
Split this Flux sequence into continuous, non-overlapping windows where the window boundary is signalled by another Publisher.
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors and those emitted by the boundary
delivered to the window
Flux
are wrapped in Exceptions.SourceException
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
public final Flux<Flux<T>> window(Duration windowingTimespan)
Split this Flux sequence into continuous, non-overlapping windows that open for a windowingTimespan Duration (as measured on the parallel Scheduler).
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
public final Flux<Flux<T>> window(Duration windowingTimespan, Duration openWindowEvery)
Split this Flux sequence into multiple Flux windows that open for a given windowingTimespan Duration, after which each window closes with onComplete. Each window is opened at a regular openWindowEvery interval, starting from the first item. Both durations are measured on the parallel Scheduler.
When windowingTimespan < openWindowEvery : dropping windows
When windowingTimespan > openWindowEvery : overlapping windows
When windowingTimespan == openWindowEvery : exact windows
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: The overlapping variant DOES NOT discard elements, as they might be part of another still valid window. The exact window and dropping window variants both discard elements they internally queued for backpressure upon cancellation or error triggered by a data signal. The dropping window variant also discards elements in between windows.
public final Flux<Flux<T>> window(Duration windowingTimespan, Scheduler timer)
Split this Flux sequence into continuous, non-overlapping windows that open for a windowingTimespan Duration (as measured on the provided Scheduler).
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
public final Flux<Flux<T>> window(Duration windowingTimespan, Duration openWindowEvery, Scheduler timer)
Split this Flux sequence into multiple Flux windows that open for a given windowingTimespan Duration, after which each window closes with onComplete. Each window is opened at a regular openWindowEvery interval, starting from the first item. Both durations are measured on the provided Scheduler.
When windowingTimespan < openWindowEvery : dropping windows
When windowingTimespan > openWindowEvery : overlapping windows
When windowingTimespan == openWindowEvery : exact windows
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: The overlapping variant DOES NOT discard elements, as they might be part of another still valid window. The exact window and dropping window variants both discard elements they internally queued for backpressure upon cancellation or error triggered by a data signal. The dropping window variant also discards elements in between windows.
public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime)
Split this Flux sequence into multiple Flux windows containing maxSize elements (or less for the final window) and starting from the first item. Each Flux window will onComplete once it contains maxSize elements OR it has been open for the given Duration (as measured on the parallel Scheduler).
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime, boolean fairBackpressure)
Split this Flux sequence into multiple Flux windows containing maxSize elements (or less for the final window) and starting from the first item. Each Flux window will onComplete once it contains maxSize elements OR it has been open for the given Duration (as measured on the parallel Scheduler).
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
maxSize - the maximum number of items to emit in the window before closing it
maxTime - the maximum Duration since the window was opened before closing it
fairBackpressure - define whether the operator requests unbounded demand or prefetches by maxSize
Flux of Flux windows based on element count and duration
public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime, Scheduler timer)
Split this Flux sequence into multiple Flux windows containing maxSize elements (or less for the final window) and starting from the first item. Each Flux window will onComplete once it contains maxSize elements OR it has been open for the given Duration (as measured on the provided Scheduler).
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime, Scheduler timer, boolean fairBackpressure)
Split this Flux sequence into multiple Flux windows containing maxSize elements (or less for the final window) and starting from the first item. Each Flux window will onComplete once it contains maxSize elements OR it has been open for the given Duration (as measured on the provided Scheduler).
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
maxSize - the maximum number of items to emit in the window before closing it
maxTime - the maximum Duration since the window was opened before closing it
timer - a time-capable Scheduler instance to run on
fairBackpressure - define whether the operator requests unbounded demand or prefetches by maxSize
Flux of Flux windows based on element count and duration
public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger)
Split this Flux sequence into multiple Flux windows delimited by the given predicate. A new window is opened each time the predicate returns true, at which point the previous window will receive the triggering element then onComplete.
Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window errors). This variant shouldn't expose empty windows, as the separators are emitted into the windows they close.
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.
public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore)
Split this Flux
sequence into multiple Flux
windows delimited by the
given predicate. A new window is opened each time the predicate returns true.
Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window completes or errors).
If cutBefore
is true, the old window will onComplete and the triggering
element will be emitted in the new window, which becomes immediately available.
This variant can emit an empty window if the sequence starts with a separator.
Otherwise, the triggering element will be emitted in the old window before it does
onComplete, similar to windowUntil(Predicate)
. This variant shouldn't
expose empty windows, as the separators are emitted into the windows they close.
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.
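The effect of the cutBefore flag can be sketched as follows (assuming reactor-core is on the classpath; `CutBeforeSketch` is a hypothetical name). The same source and predicate are used for both settings so that only the placement of the triggering element differs:

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class CutBeforeSketch {

    // Windows the range 1..7 on multiples of 3, with the chosen cutBefore setting.
    static List<List<Integer>> windows(boolean cutBefore) {
        return Flux.range(1, 7)
                .windowUntil(i -> i % 3 == 0, cutBefore)
                .concatMap(Flux::collectList)  // each window is consumed once, in order
                .collectList()
                .block();                      // blocking is for illustration only
    }

    public static void main(String[] args) {
        // cutBefore = true: the triggering element starts the new window
        System.out.println(windows(true));  // [[1, 2], [3, 4, 5], [6, 7]]
        // cutBefore = false: the triggering element ends the old window
        System.out.println(windows(false)); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```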
public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore, int prefetch)
Split this Flux
sequence into multiple Flux
windows delimited by the given
predicate and using a prefetch. A new window is opened each time the predicate
returns true.
Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window completes or errors).
If cutBefore
is true, the old window will onComplete and the triggering
element will be emitted in the new window. This variant can emit an empty window
if the sequence starts with a separator.
Otherwise, the triggering element will be emitted in the old window before it does
onComplete, similar to windowUntil(Predicate)
. This variant shouldn't
expose empty windows, as the separators are emitted into the windows they close.
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.
boundaryTrigger
- a predicate that triggers the next window when it becomes true.
cutBefore
- set to true to include the triggering element in the new window rather than the old.
prefetch
- the request size to use for this Flux
.
a Flux
of Flux
windows, bounded depending
on the predicate.
public final Flux<Flux<T>> windowUntilChanged()
Flux
windows.
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.
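A small sketch of windowUntilChanged() on a sequence with consecutive repetitions (assuming reactor-core is on the classpath; `WindowUntilChangedSketch` is a hypothetical name). Elements are compared with their predecessor, so a value that reappears later starts a window of its own:

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class WindowUntilChangedSketch {

    // Groups runs of equal, adjacent elements into separate windows.
    static List<List<Integer>> windows() {
        return Flux.just(1, 1, 2, 2, 2, 3, 1)
                .windowUntilChanged()
                .concatMap(Flux::collectList)  // each window is consumed once, in order
                .collectList()
                .block();                      // blocking is for illustration only
    }

    public static void main(String[] args) {
        System.out.println(windows()); // [[1, 1], [2, 2, 2], [3], [1]]
    }
}
```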
public final <V> Flux<Flux<T>> windowUntilChanged(Function<? super T,? super V> keySelector)
Function
, into multiple Flux
windows.
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.
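With a key selector, the comparison is made on the extracted key rather than on the element itself. The sketch below (assuming reactor-core is on the classpath; `KeyedWindowSketch` and the sample words are illustrative only) windows words by their first letter:

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class KeyedWindowSketch {

    // Opens a new window whenever the first letter differs from the previous word's.
    static List<List<String>> windows() {
        return Flux.just("apple", "avocado", "banana", "blueberry", "cherry")
                .windowUntilChanged(word -> word.substring(0, 1)) // key = first letter
                .concatMap(Flux::collectList)
                .collectList()
                .block();                                         // illustration only
    }

    public static void main(String[] args) {
        System.out.println(windows()); // [[apple, avocado], [banana, blueberry], [cherry]]
    }
}
```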
public final <V> Flux<Flux<T>> windowUntilChanged(Function<? super T,? extends V> keySelector, BiPredicate<? super V,? super V> keyComparator)
Function
and compared using a supplied BiPredicate
, into multiple
Flux
windows.
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.
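The keyComparator variant can be sketched with a case-insensitive comparison (assuming reactor-core is on the classpath, and assuming the comparator returns true when two keys should be treated as equal; `ComparatorWindowSketch` is a hypothetical name):

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class ComparatorWindowSketch {

    // Keys are the words themselves; keys that differ only by case stay in the same window.
    static List<List<String>> windows() {
        return Flux.just("a", "A", "b", "B", "b", "c")
                .windowUntilChanged(
                        word -> word,
                        (previous, current) -> previous.equalsIgnoreCase(current))
                .concatMap(Flux::collectList)
                .collectList()
                .block(); // blocking is for illustration only
    }

    public static void main(String[] args) {
        System.out.println(windows()); // [[a, A], [b, B, b], [c]]
    }
}
```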
public final Flux<Flux<T>> windowWhile(Predicate<T> inclusionPredicate)
Split this Flux
sequence into multiple Flux
windows that stay open
while a given predicate matches the source elements. Once the predicate returns
false, the window closes with an onComplete and the triggering element is discarded.
Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window completes or errors). Empty windows can happen when a sequence starts with a separator or contains multiple separators, but a sequence that finishes with a separator won't cause a remainder empty window to be emitted.
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal, as well as the triggering element(s) (those that don't match the predicate). Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.
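The empty-window behavior of windowWhile is easier to see on a concrete sequence. The following sketch (assuming reactor-core is on the classpath; `WindowWhileSketch` is a hypothetical name) keeps windows open while elements are even, so each odd element acts as a discarded separator:

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class WindowWhileSketch {

    // Windows stay open while elements are even; odd elements close the window and are dropped.
    static List<List<Integer>> windows() {
        return Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
                .windowWhile(i -> i % 2 == 0)
                .concatMap(Flux::collectList)  // empty windows become empty lists
                .collectList()
                .block();                      // blocking is for illustration only
    }

    public static void main(String[] args) {
        // 1, 3 and 5 each close an empty window; 11 closes [2, 4, 6]; 13 closes [12].
        // No trailing empty window is emitted after the final separator.
        System.out.println(windows()); // [[], [], [], [2, 4, 6], [12]]
    }
}
```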
public final Flux<Flux<T>> windowWhile(Predicate<T> inclusionPredicate, int prefetch)
Split this Flux
sequence into multiple Flux
windows that stay open
while a given predicate matches the source elements. Once the predicate returns
false, the window closes with an onComplete and the triggering element is discarded.
Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window completes or errors). Empty windows can happen when a sequence starts with a separator or contains multiple separators, but a sequence that finishes with a separator won't cause a remainder empty window to be emitted.
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal, as well as the triggering element(s) (those that don't match the predicate). Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.
public final <U,V> Flux<Flux<T>> windowWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector)
Split this Flux
sequence into potentially overlapping windows controlled by items of a
start Publisher
and end Publisher
derived from the start values.
When the Open signal strictly does not overlap the Close signal: dropping windows
When the Open signal is strictly more frequent than the Close signal: overlapping windows
When the Open signal is exactly coordinated with the Close signal: exact windows
Note that windows are a live view of part of the underlying source publisher,
and as such their lifecycle is tied to that source. As a result, it is not possible
to subscribe to a window more than once: they are unicast.
This is most noticeable when trying to retry()
or repeat()
a window,
as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source
sequence errors delivered to the window Flux
are wrapped in
Exceptions.SourceException
.
Discard Support: This operator DOES NOT discard elements.
U
- the type of the sequence opening windows
V
- the type of the sequence closing windows opened by the bucketOpening Publisher's elements
bucketOpening
- a Publisher
that opens a new window when it emits any item
closeSelector
- a Function
given an opening signal and returning a Publisher
that
will close the window when emitting
a Flux
of Flux
windows opened by signals from a first
Publisher
and lasting until a selected second Publisher
emits
public final <U,R> Flux<R> withLatestFrom(Publisher<? extends U> other, BiFunction<? super T,? super U,? extends R> resultSelector)
Flux
and another
Publisher
through a BiFunction
and emits the result.
The operator will drop values from this Flux
until the other
Publisher
produces any value.
If the other Publisher
completes without any value, the sequence is completed.
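A minimal sketch of withLatestFrom (assuming reactor-core is on the classpath; `WithLatestFromSketch` is a hypothetical name). The other Publisher here is a synchronous single-value source, which in this sketch has already produced its value by the time the main sequence emits, so no source values are dropped; with asynchronous sources the set of dropped values depends on timing:

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class WithLatestFromSketch {

    // Adds the latest value seen from the other Publisher to each source element.
    static List<Integer> combined() {
        return Flux.range(1, 3)
                .withLatestFrom(Flux.just(10), (value, latest) -> value + latest)
                .collectList()
                .block(); // blocking is for illustration only
    }

    public static void main(String[] args) {
        System.out.println(combined()); // [11, 12, 13]
    }
}
```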
U
- the other Publisher
sequence type
R
- the result type
other
- the Publisher
to combine with
resultSelector
- the bi-function called with each pair of source and other
elements that should return a single value to be emitted
a Flux
gated by another Publisher
public final <T2> Flux<Tuple2<T,T2>> zipWith(Publisher<? extends T2> source2)
Zip this Flux
with another Publisher
source, that is to say wait
for both to emit one element and combine these elements once into a Tuple2
.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
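The pairwise combination into a Tuple2 can be sketched as follows (assuming reactor-core is on the classpath; `ZipWithTupleSketch` is a hypothetical name). The shorter source determines how many pairs are produced:

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class ZipWithTupleSketch {

    // Zips letters with numbers, then renders each Tuple2 as a short string.
    static List<String> pairs() {
        return Flux.just("a", "b", "c")
                .zipWith(Flux.just(1, 2))                   // stops when the shorter source completes
                .map(tuple -> tuple.getT1() + tuple.getT2())
                .collectList()
                .block();                                   // blocking is for illustration only
    }

    public static void main(String[] args) {
        System.out.println(pairs()); // [a1, b2]
    }
}
```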
public final <T2,V> Flux<V> zipWith(Publisher<? extends T2> source2, BiFunction<? super T,? super T2,? extends V> combinator)
Zip this Flux
with another Publisher
source, that is to say wait
for both to emit one element and combine these elements using a combinator
BiFunction.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
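When a Tuple2 is not needed, the combinator overload produces the combined value directly. A short sketch (assuming reactor-core is on the classpath; `ZipWithCombinatorSketch` is a hypothetical name):

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class ZipWithCombinatorSketch {

    // Sums elements pairwise; the extra element 40 is never paired.
    static List<Integer> sums() {
        return Flux.just(1, 2, 3)
                .zipWith(Flux.just(10, 20, 30, 40), (left, right) -> left + right)
                .collectList()
                .block(); // blocking is for illustration only
    }

    public static void main(String[] args) {
        System.out.println(sums()); // [11, 22, 33]
    }
}
```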
T2
- type of the value from source2
V
- The produced output after transformation by the combinator
source2
- The second source Publisher
to zip with this Flux
.
combinator
- The aggregate function that will receive a unique value from each
source and return the value to signal downstream
a zipped Flux
public final <T2,V> Flux<V> zipWith(Publisher<? extends T2> source2, int prefetch, BiFunction<? super T,? super T2,? extends V> combinator)
Zip this Flux
with another Publisher
source, that is to say wait
for both to emit one element and combine these elements using a combinator
BiFunction.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T2
- type of the value from source2
V
- The produced output after transformation by the combinator
source2
- The second source Publisher
to zip with this Flux
.
prefetch
- the request size to use for this Flux
and the other Publisher
combinator
- The aggregate function that will receive a unique value from each
source and return the value to signal downstream
a zipped Flux
public final <T2> Flux<Tuple2<T,T2>> zipWith(Publisher<? extends T2> source2, int prefetch)
Zip this Flux
with another Publisher
source, that is to say wait
for both to emit one element and combine these elements once into a Tuple2
.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
public final <T2> Flux<Tuple2<T,T2>> zipWithIterable(Iterable<? extends T2> iterable)
public final <T2,V> Flux<V> zipWithIterable(Iterable<? extends T2> iterable, BiFunction<? super T,? super T2,? extends V> zipper)
Zip this Flux
with the content of an Iterable
, that is
to say combine one element from each, pairwise, using the given zipper BiFunction
.
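Zipping against an in-memory collection can be sketched like this (assuming reactor-core is on the classpath; `ZipWithIterableSketch` is a hypothetical name). As with the other zip operators, pairing stops once either side runs out of elements:

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class ZipWithIterableSketch {

    // Pairs each letter with the number at the same position in the Iterable.
    static List<String> labelled() {
        return Flux.just("a", "b", "c")
                .zipWithIterable(List.of(1, 2), (letter, number) -> letter + number)
                .collectList()
                .block(); // blocking is for illustration only
    }

    public static void main(String[] args) {
        System.out.println(labelled()); // [a1, b2]
    }
}
```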
T2
- the value type of the other iterable sequence
V
- the result type
iterable
- the Iterable
to zip with
zipper
- the BiFunction
pair combinator
a zipped Flux
protected static <T> Flux<T> onAssembly(Flux<T> source)
Hooks
pointcut given a
Flux
, potentially returning a new Flux
. This is for example useful
to activate cross-cutting concerns at assembly time, e.g. a generalized
checkpoint()
.
T
- the value type
source
- the source to apply assembly hooks onto
protected static <T> ConnectableFlux<T> onAssembly(ConnectableFlux<T> source)
Hooks
pointcut given a
ConnectableFlux
, potentially returning a new ConnectableFlux
. This
is for example useful to activate cross-cutting concerns at assembly time, e.g. a
generalized checkpoint()
.
T
- the value type
source
- the source to apply assembly hooks onto