T - the element type of this Reactive Streams Publisher
public abstract class Flux<T> extends Object implements CorePublisher<T>
A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).
The recommended way to learn about the Flux API and discover new operators is through the reference documentation, rather than through this javadoc (as opposed to learning more about individual operators). See the "which operator do I need?" appendix.
It is intended to be used in implementations and return types. Input parameters should keep using raw Publisher as much as possible.
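As a minimal sketch of that guideline, the hypothetical method below accepts a raw Publisher and returns a Flux. The class and method names (`UppercaseService`, `toUpperCase`) are illustrative assumptions, not part of the Reactor API; only `Flux.from`, `map`, `collectList` and `block` are taken from the library.

```java
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

public class UppercaseService {

    // Hypothetical method: the input stays a raw Publisher so that callers can
    // pass any Reactive Streams implementation; the return type is a Flux.
    public static Flux<String> toUpperCase(Publisher<String> input) {
        // Flux.from wraps the Publisher (or returns it as-is if it already is a Flux).
        return Flux.from(input).map(String::toUpperCase);
    }

    public static void main(String[] args) {
        // Blocking here is only for demonstration purposes.
        System.out.println(toUpperCase(Flux.just("a", "b")).collectList().block()); // prints [A, B]
    }
}
```

Keeping the parameter typed as Publisher means a caller holding a Mono or a Publisher from another library does not have to convert it first.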
If it is known that the underlying Publisher will emit 0 or 1 element, Mono should be used instead.
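A short sketch of this choice, assuming a hypothetical lookup that can yield at most one result: the method narrows a Flux to a Mono with `next()`. The names `FirstMatch` and `firstStartingWith` are made up for illustration.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FirstMatch {

    // Hypothetical lookup: at most one element can come out, so the
    // return type is Mono rather than Flux.
    public static Mono<String> firstStartingWith(Flux<String> names, String prefix) {
        // next() emits the first matching element, or completes empty if there is none.
        return names.filter(n -> n.startsWith(prefix)).next();
    }

    public static void main(String[] args) {
        Flux<String> names = Flux.just("alice", "bob", "bill");
        // block() returns null when the Mono completes empty.
        System.out.println(firstStartingWith(names, "b").block());
        System.out.println(firstStartingWith(names, "z").block());
    }
}
```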
Note that using state in the java.util.function / lambdas used within Flux operators should be avoided, as these may be shared between several Subscribers.
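The following sketch illustrates the problem under simple assumptions (a cold source and two sequential subscriptions). The class `LambdaState` and its methods are hypothetical. Wrapping the assembly in `Flux.defer` is one possible way to give each Subscriber its own state, not the only one.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class LambdaState {

    // Anti-pattern: the counter is created once, at assembly time, so the
    // lambda (and its state) is shared by every Subscriber of the returned Flux.
    public static Flux<String> sharedCounter(Flux<String> source) {
        AtomicInteger counter = new AtomicInteger();
        return source.map(s -> counter.incrementAndGet() + ":" + s);
    }

    // One alternative: defer assembly so that each subscription
    // creates its own counter.
    public static Flux<String> perSubscriberCounter(Flux<String> source) {
        return Flux.defer(() -> {
            AtomicInteger counter = new AtomicInteger();
            return source.map(s -> counter.incrementAndGet() + ":" + s);
        });
    }

    public static void main(String[] args) {
        Flux<String> shared = sharedCounter(Flux.just("a", "b"));
        System.out.println(shared.collectList().block()); // prints [1:a, 2:b]
        System.out.println(shared.collectList().block()); // prints [3:a, 4:b]

        Flux<String> isolated = perSubscriberCounter(Flux.just("a", "b"));
        System.out.println(isolated.collectList().block()); // prints [1:a, 2:b]
        System.out.println(isolated.collectList().block()); // prints [1:a, 2:b]
    }
}
```

For plain numbering, the `index()` operator listed in the method summary avoids hand-written state altogether.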
subscribe(CoreSubscriber) is an internal extension to subscribe(Subscriber) used internally for Context passing. A user-provided Subscriber may be passed to this "subscribe" extension but will lose the available per-subscribe Hooks.onLastOperator(java.util.function.Function<? super org.reactivestreams.Publisher<java.lang.Object>, ? extends org.reactivestreams.Publisher<java.lang.Object>>).
See Also: Mono
Constructor and Description |
---|
Flux() |
Modifier and Type | Method and Description |
---|---|
Mono<Boolean> |
all(Predicate<? super T> predicate)
Emit a single boolean true if all values of this sequence match
the
Predicate . |
Mono<Boolean> |
any(Predicate<? super T> predicate)
Emit a single boolean true if any of the values of this
Flux sequence match
the predicate. |
<P> P |
as(Function<? super Flux<T>,P> transformer)
Transform this
Flux into a target type. |
T |
blockFirst()
Subscribe to this
Flux and block indefinitely
until the upstream signals its first value or completes. |
T |
blockFirst(Duration timeout)
Subscribe to this
Flux and block until the upstream
signals its first value, completes or a timeout expires. |
T |
blockLast()
Subscribe to this
Flux and block indefinitely
until the upstream signals its last value or completes. |
T |
blockLast(Duration timeout)
Subscribe to this
Flux and block until the upstream
signals its last value, completes or a timeout expires. |
Flux<List<T>> |
buffer()
|
Flux<List<T>> |
buffer(Duration bufferingTimespan)
|
Flux<List<T>> |
buffer(Duration bufferingTimespan,
Duration openBufferEvery)
Collect incoming values into multiple
List buffers created at a given
openBufferEvery period. |
Flux<List<T>> |
buffer(Duration bufferingTimespan,
Duration openBufferEvery,
Scheduler timer)
|
Flux<List<T>> |
buffer(Duration bufferingTimespan,
Scheduler timer)
|
Flux<List<T>> |
buffer(int maxSize)
|
Flux<List<T>> |
buffer(int maxSize,
int skip)
|
<C extends Collection<? super T>> Flux<C> |
buffer(int maxSize,
int skip,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the given max size is reached
or once this Flux completes. |
<C extends Collection<? super T>> Flux<C> |
buffer(int maxSize,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the given max size is reached
or once this Flux completes. |
Flux<List<T>> |
buffer(Publisher<?> other)
|
<C extends Collection<? super T>> Flux<C> |
buffer(Publisher<?> other,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers, as
delimited by the signals of a companion Publisher this operator will
subscribe to. |
Flux<List<T>> |
bufferTimeout(int maxSize,
Duration maxTime)
|
Flux<List<T>> |
bufferTimeout(int maxSize,
Duration maxTime,
Scheduler timer)
|
<C extends Collection<? super T>> Flux<C> |
bufferTimeout(int maxSize,
Duration maxTime,
Scheduler timer,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the buffer reaches a maximum
size OR the maxTime Duration elapses, as measured on the provided Scheduler . |
<C extends Collection<? super T>> Flux<C> |
bufferTimeout(int maxSize,
Duration maxTime,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the buffer reaches a maximum
size OR the maxTime Duration elapses. |
Flux<List<T>> |
bufferUntil(Predicate<? super T> predicate)
|
Flux<List<T>> |
bufferUntil(Predicate<? super T> predicate,
boolean cutBefore)
|
<V> Flux<List<T>> |
bufferUntilChanged()
|
<V> Flux<List<T>> |
bufferUntilChanged(Function<? super T,? extends V> keySelector)
|
<V> Flux<List<T>> |
bufferUntilChanged(Function<? super T,? extends V> keySelector,
BiPredicate<? super V,? super V> keyComparator)
Collect subsequent repetitions of an element (that is, if they arrive right after
one another), as compared by a key extracted through the user provided
Function and compared using a supplied BiPredicate , into multiple
List buffers that will be emitted by the resulting Flux . |
<U,V> Flux<List<T>> |
bufferWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector)
|
<U,V,C extends Collection<? super T>> Flux<C> |
bufferWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers started each time an opening
companion Publisher emits. |
Flux<List<T>> |
bufferWhile(Predicate<? super T> predicate)
|
Flux<T> |
cache()
Turn this
Flux into a hot source and cache last emitted signals for further Subscriber . |
Flux<T> |
cache(Duration ttl)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscriber . |
Flux<T> |
cache(Duration ttl,
Scheduler timer)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscriber . |
Flux<T> |
cache(int history)
Turn this
Flux into a hot source and cache last emitted signals for further Subscriber . |
Flux<T> |
cache(int history,
Duration ttl)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscriber . |
Flux<T> |
cache(int history,
Duration ttl,
Scheduler timer)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscriber . |
Flux<T> |
cancelOn(Scheduler scheduler)
|
<E> Flux<E> |
cast(Class<E> clazz)
Cast the current
Flux produced type into a target produced type. |
Flux<T> |
checkpoint()
Activate traceback (full assembly tracing) for this particular
Flux , in case of an error
upstream of the checkpoint. |
Flux<T> |
checkpoint(String description)
Activate traceback (assembly marker) for this particular
Flux by giving it a description that
will be reflected in the assembly traceback in case of an error upstream of the
checkpoint. |
Flux<T> |
checkpoint(String description,
boolean forceStackTrace)
Activate traceback (full assembly tracing or the lighter assembly marking depending on the
forceStackTrace option). |
<R,A> Mono<R> |
collect(Collector<? super T,A,? extends R> collector)
|
<E> Mono<E> |
collect(Supplier<E> containerSupplier,
BiConsumer<E,? super T> collector)
Collect all elements emitted by this
Flux into a user-defined container,
by applying a collector BiConsumer taking the container and each element. |
Mono<List<T>> |
collectList()
|
<K> Mono<Map<K,T>> |
collectMap(Function<? super T,? extends K> keyExtractor)
|
<K,V> Mono<Map<K,V>> |
collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor)
|
<K,V> Mono<Map<K,V>> |
collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,V>> mapSupplier)
|
<K> Mono<Map<K,Collection<T>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor)
|
<K,V> Mono<Map<K,Collection<V>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor)
|
<K,V> Mono<Map<K,Collection<V>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,Collection<V>>> mapSupplier)
|
Mono<List<T>> |
collectSortedList()
|
Mono<List<T>> |
collectSortedList(Comparator<? super T> comparator)
Collect all elements emitted by this
Flux until this sequence completes,
and then sort them using a Comparator into a List that is emitted
by the resulting Mono . |
static <T,V> Flux<V> |
combineLatest(Function<Object[],V> combinator,
int prefetch,
Publisher<? extends T>... sources)
|
static <T,V> Flux<V> |
combineLatest(Function<Object[],V> combinator,
Publisher<? extends T>... sources)
|
static <T,V> Flux<V> |
combineLatest(Iterable<? extends Publisher<? extends T>> sources,
Function<Object[],V> combinator)
|
static <T,V> Flux<V> |
combineLatest(Iterable<? extends Publisher<? extends T>> sources,
int prefetch,
Function<Object[],V> combinator)
|
static <T1,T2,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
BiFunction<? super T1,? super T2,? extends V> combinator)
|
static <T1,T2,T3,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Function<Object[],V> combinator)
|
static <T1,T2,T3,T4,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Function<Object[],V> combinator)
|
static <T1,T2,T3,T4,T5,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Function<Object[],V> combinator)
|
static <T1,T2,T3,T4,T5,T6,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6,
Function<Object[],V> combinator)
|
static <T> Flux<T> |
concat(Iterable<? extends Publisher<? extends T>> sources)
Concatenate all sources provided in an
Iterable , forwarding elements
emitted by the sources downstream. |
static <T> Flux<T> |
concat(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concat(Publisher<? extends Publisher<? extends T>> sources,
int prefetch)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concat(Publisher<? extends T>... sources)
Concatenate all sources provided as a vararg, forwarding elements emitted by the
sources downstream.
|
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources,
boolean delayUntilEnd,
int prefetch)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources,
int prefetch)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends T>... sources)
Concatenate all sources provided as a vararg, forwarding elements emitted by the
sources downstream.
|
<V> Flux<V> |
concatMap(Function<? super T,? extends Publisher<? extends V>> mapper)
|
<V> Flux<V> |
concatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int prefetch)
|
<V> Flux<V> |
concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper)
|
<V> Flux<V> |
concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
boolean delayUntilEnd,
int prefetch)
|
<V> Flux<V> |
concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
int prefetch)
|
<R> Flux<R> |
concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
|
<R> Flux<R> |
concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper,
int prefetch)
|
Flux<T> |
concatWith(Publisher<? extends T> other)
|
Flux<T> |
concatWithValues(T... values)
Concatenates the values to the end of the
Flux |
Flux<T> |
contextWrite(ContextView contextToAppend)
Enrich the
Context visible from downstream for the benefit of upstream
operators, by making all values from the provided ContextView visible on top
of pairs from downstream. |
Flux<T> |
contextWrite(Function<Context,Context> contextModifier)
|
Mono<Long> |
count()
Counts the number of values in this
Flux . |
static <T> Flux<T> |
create(Consumer<? super FluxSink<T>> emitter)
|
static <T> Flux<T> |
create(Consumer<? super FluxSink<T>> emitter,
FluxSink.OverflowStrategy backpressure)
|
Flux<T> |
defaultIfEmpty(T defaultV)
Provide a default unique value if this sequence is completed without any data
|
static <T> Flux<T> |
defer(Supplier<? extends Publisher<T>> supplier)
Lazily supply a
Publisher every time a Subscription is made on the
resulting Flux , so the actual source instantiation is deferred until each
subscribe and the Supplier can create a subscriber-specific instance. |
static <T> Flux<T> |
deferContextual(Function<ContextView,? extends Publisher<T>> contextualPublisherFactory)
Lazily supply a
Publisher every time a Subscription is made on the
resulting Flux , so the actual source instantiation is deferred until each
subscribe and the Function can create a subscriber-specific instance. |
static <T> Flux<T> |
deferWithContext(Function<Context,? extends Publisher<T>> contextualPublisherFactory)
Deprecated.
|
Flux<T> |
delayElements(Duration delay)
|
Flux<T> |
delayElements(Duration delay,
Scheduler timer)
|
Flux<T> |
delaySequence(Duration delay)
|
Flux<T> |
delaySequence(Duration delay,
Scheduler timer)
|
Flux<T> |
delaySubscription(Duration delay)
Delay the
subscription to this Flux source until the given
period elapses. |
Flux<T> |
delaySubscription(Duration delay,
Scheduler timer)
Delay the
subscription to this Flux source until the given
period elapses, as measured on the user-provided Scheduler . |
<U> Flux<T> |
delaySubscription(Publisher<U> subscriptionDelay)
|
Flux<T> |
delayUntil(Function<? super T,? extends Publisher<?>> triggerProvider)
|
<X> Flux<X> |
dematerialize()
An operator working only if this
Flux emits onNext, onError or onComplete Signal
instances, transforming these materialized signals into
real signals on the Subscriber . |
Flux<T> |
distinct()
For each
Subscriber , track elements from this Flux that have been
seen and filter out duplicates. |
<V> Flux<T> |
distinct(Function<? super T,? extends V> keySelector)
For each
Subscriber , track elements from this Flux that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function . |
<V,C extends Collection<? super V>> Flux<T> |
distinct(Function<? super T,? extends V> keySelector,
Supplier<C> distinctCollectionSupplier)
For each
Subscriber , track elements from this Flux that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function and by the add method
of the Collection supplied (typically a Set ). |
<V,C> Flux<T> |
distinct(Function<? super T,? extends V> keySelector,
Supplier<C> distinctStoreSupplier,
BiPredicate<C,V> distinctPredicate,
Consumer<C> cleanup)
For each
Subscriber , track elements from this Flux that have been
seen and filter out duplicates, as compared by applying a BiPredicate on
an arbitrary user-supplied <C> store and a key extracted through the user
provided Function . |
Flux<T> |
distinctUntilChanged()
Filter out subsequent repetitions of an element (that is, if they arrive right after
one another).
|
<V> Flux<T> |
distinctUntilChanged(Function<? super T,? extends V> keySelector)
Filter out subsequent repetitions of an element (that is, if they arrive right after
one another), as compared by a key extracted through the user provided
Function
using equality. |
<V> Flux<T> |
distinctUntilChanged(Function<? super T,? extends V> keySelector,
BiPredicate<? super V,? super V> keyComparator)
Filter out subsequent repetitions of an element (that is, if they arrive right
after one another), as compared by a key extracted through the user provided
Function and then comparing keys with the supplied BiPredicate . |
Flux<T> |
doAfterTerminate(Runnable afterTerminate)
Add behavior (side-effect) triggered after the
Flux terminates, either by completing downstream successfully or with an error. |
Flux<T> |
doFinally(Consumer<SignalType> onFinally)
Add behavior (side-effect) triggered after the
Flux terminates for any reason,
including cancellation. |
Flux<T> |
doFirst(Runnable onFirst)
Add behavior (side-effect) triggered before the
Flux is
subscribed to, which should be the first event after assembly time. |
Flux<T> |
doOnCancel(Runnable onCancel)
Add behavior (side-effect) triggered when the
Flux is cancelled. |
Flux<T> |
doOnComplete(Runnable onComplete)
Add behavior (side-effect) triggered when the
Flux completes successfully. |
<R> Flux<T> |
doOnDiscard(Class<R> type,
Consumer<? super R> discardHook)
Potentially modify the behavior of the whole chain of operators upstream of this one to
conditionally clean up elements that get discarded by these operators.
|
Flux<T> |
doOnEach(Consumer<? super Signal<T>> signalConsumer)
Add behavior (side-effects) triggered when the
Flux emits an item, fails with an error
or completes successfully. |
<E extends Throwable> Flux<T> |
doOnError(Class<E> exceptionType,
Consumer<? super E> onError)
Add behavior (side-effect) triggered when the
Flux completes with an error matching the given exception type. |
Flux<T> |
doOnError(Consumer<? super Throwable> onError)
Add behavior (side-effect) triggered when the
Flux completes with an error. |
Flux<T> |
doOnError(Predicate<? super Throwable> predicate,
Consumer<? super Throwable> onError)
Add behavior (side-effect) triggered when the
Flux completes with an error matching the given exception. |
Flux<T> |
doOnNext(Consumer<? super T> onNext)
Add behavior (side-effect) triggered when the
Flux emits an item. |
Flux<T> |
doOnRequest(LongConsumer consumer)
Add behavior (side-effect) triggering a
LongConsumer when this Flux
receives any request. |
Flux<T> |
doOnSubscribe(Consumer<? super Subscription> onSubscribe)
Add behavior (side-effect) triggered when the
Flux is being subscribed,
that is to say when a Subscription has been produced by the Publisher
and is being passed to the Subscriber.onSubscribe(Subscription) . |
Flux<T> |
doOnTerminate(Runnable onTerminate)
Add behavior (side-effect) triggered when the
Flux terminates, either by
completing successfully or failing with an error. |
Flux<Tuple2<Long,T>> |
elapsed()
Map this
Flux into Tuple2<Long, T>
of timemillis and source data. |
Flux<Tuple2<Long,T>> |
elapsed(Scheduler scheduler)
Map this
Flux into Tuple2<Long, T>
of timemillis and source data. |
Mono<T> |
elementAt(int index)
Emit only the element at the given index position or
IndexOutOfBoundsException
if the sequence is shorter. |
Mono<T> |
elementAt(int index,
T defaultValue)
Emit only the element at the given index position or fall back to a
default value if the sequence is shorter.
|
static <T> Flux<T> |
empty()
Create a
Flux that completes without emitting any item. |
static <T> Flux<T> |
error(Supplier<? extends Throwable> errorSupplier)
Create a
Flux that terminates with an error immediately after being
subscribed to. |
static <T> Flux<T> |
error(Throwable error)
Create a
Flux that terminates with the specified error immediately after
being subscribed to. |
static <O> Flux<O> |
error(Throwable throwable,
boolean whenRequested)
Create a
Flux that terminates with the specified error, either immediately
after being subscribed to or after being first requested. |
Flux<T> |
expand(Function<? super T,? extends Publisher<? extends T>> expander)
Recursively expand elements into a graph and emit all the resulting elements using
a breadth-first traversal strategy.
|
Flux<T> |
expand(Function<? super T,? extends Publisher<? extends T>> expander,
int capacityHint)
Recursively expand elements into a graph and emit all the resulting elements using
a breadth-first traversal strategy.
|
Flux<T> |
expandDeep(Function<? super T,? extends Publisher<? extends T>> expander)
Recursively expand elements into a graph and emit all the resulting elements,
in a depth-first traversal order.
|
Flux<T> |
expandDeep(Function<? super T,? extends Publisher<? extends T>> expander,
int capacityHint)
Recursively expand elements into a graph and emit all the resulting elements,
in a depth-first traversal order.
|
Flux<T> |
filter(Predicate<? super T> p)
Evaluate each source value against the given
Predicate . |
Flux<T> |
filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate)
Test each value emitted by this
Flux asynchronously using a generated
Publisher<Boolean> test. |
Flux<T> |
filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate,
int bufferSize)
Test each value emitted by this
Flux asynchronously using a generated
Publisher<Boolean> test. |
static <I> Flux<I> |
first(Iterable<? extends Publisher<? extends I>> sources)
Deprecated.
use
firstWithSignal(Iterable) . To be removed in reactor 3.5. |
static <I> Flux<I> |
first(Publisher<? extends I>... sources)
Deprecated.
use
firstWithSignal(Publisher[]) . To be removed in reactor 3.5. |
static <I> Flux<I> |
firstWithSignal(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
firstWithSignal(Publisher<? extends I>... sources)
|
static <I> Flux<I> |
firstWithValue(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
firstWithValue(Publisher<? extends I> first,
Publisher<? extends I>... others)
|
<R> Flux<R> |
flatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
|
<R> Flux<R> |
flatMap(Function<? super T,? extends Publisher<? extends R>> mapperOnNext,
Function<? super Throwable,? extends Publisher<? extends R>> mapperOnError,
Supplier<? extends Publisher<? extends R>> mapperOnComplete)
|
<V> Flux<V> |
flatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency)
|
<V> Flux<V> |
flatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency,
int prefetch)
|
<V> Flux<V> |
flatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency,
int prefetch)
|
<R> Flux<R> |
flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
|
<R> Flux<R> |
flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper,
int prefetch)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency,
int prefetch)
|
<R> Flux<R> |
flatMapSequentialDelayError(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency,
int prefetch)
|
static <T> Flux<T> |
from(Publisher<? extends T> source)
|
static <T> Flux<T> |
fromArray(T[] array)
Create a
Flux that emits the items contained in the provided array. |
static <T> Flux<T> |
fromIterable(Iterable<? extends T> it)
|
static <T> Flux<T> |
fromStream(Stream<? extends T> s)
|
static <T> Flux<T> |
fromStream(Supplier<Stream<? extends T>> streamSupplier)
|
static <T,S> Flux<T> |
generate(Callable<S> stateSupplier,
BiFunction<S,SynchronousSink<T>,S> generator)
Programmatically create a
Flux by generating signals one-by-one via a
consumer callback and some state. |
static <T,S> Flux<T> |
generate(Callable<S> stateSupplier,
BiFunction<S,SynchronousSink<T>,S> generator,
Consumer<? super S> stateConsumer)
Programmatically create a
Flux by generating signals one-by-one via a
consumer callback and some state, with a final cleanup callback. |
static <T> Flux<T> |
generate(Consumer<SynchronousSink<T>> generator)
Programmatically create a
Flux by generating signals one-by-one via a
consumer callback. |
int |
getPrefetch()
The prefetch configuration of the
Flux |
<K> Flux<GroupedFlux<K,T>> |
groupBy(Function<? super T,? extends K> keyMapper)
|
<K,V> Flux<GroupedFlux<K,V>> |
groupBy(Function<? super T,? extends K> keyMapper,
Function<? super T,? extends V> valueMapper)
|
<K,V> Flux<GroupedFlux<K,V>> |
groupBy(Function<? super T,? extends K> keyMapper,
Function<? super T,? extends V> valueMapper,
int prefetch)
|
<K> Flux<GroupedFlux<K,T>> |
groupBy(Function<? super T,? extends K> keyMapper,
int prefetch)
|
<TRight,TLeftEnd,TRightEnd,R> Flux<R> |
groupJoin(Publisher<? extends TRight> other,
Function<? super T,? extends Publisher<TLeftEnd>> leftEnd,
Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd,
BiFunction<? super T,? super Flux<TRight>,? extends R> resultSelector)
Map values from two Publishers into time windows and emit combination of values
in case their windows overlap.
|
<R> Flux<R> |
handle(BiConsumer<? super T,SynchronousSink<R>> handler)
Handle the items emitted by this
Flux by calling a biconsumer with the
output sink for each onNext. |
Mono<Boolean> |
hasElement(T value)
Emit a single boolean true if any of the elements of this
Flux sequence is
equal to the provided value. |
Mono<Boolean> |
hasElements()
Emit a single boolean true if this
Flux sequence has at least one element. |
Flux<T> |
hide()
Hides the identities of this
Flux instance. |
Mono<T> |
ignoreElements()
Ignores onNext signals (dropping them) and only propagates termination events.
|
Flux<Tuple2<Long,T>> |
index()
Keep information about the order in which source values were received by
indexing them with a 0-based incrementing long, returning a
Flux
of Tuple2<(index, value)> . |
<I> Flux<I> |
index(BiFunction<? super Long,? super T,? extends I> indexMapper)
Keep information about the order in which source values were received by
indexing them internally with a 0-based incrementing long then combining this
information with the source value into a
I using the provided BiFunction ,
returning a Flux<I> . |
static Flux<Long> |
interval(Duration period)
Create a
Flux that emits long values starting with 0 and incrementing at
specified time intervals on the global timer. |
static Flux<Long> |
interval(Duration delay,
Duration period)
Create a
Flux that emits long values starting with 0 and incrementing at
specified time intervals, after an initial delay, on the global timer. |
static Flux<Long> |
interval(Duration delay,
Duration period,
Scheduler timer)
|
static Flux<Long> |
interval(Duration period,
Scheduler timer)
|
<TRight,TLeftEnd,TRightEnd,R> Flux<R> |
join(Publisher<? extends TRight> other,
Function<? super T,? extends Publisher<TLeftEnd>> leftEnd,
Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd,
BiFunction<? super T,? super TRight,? extends R> resultSelector)
Combine values from two Publishers in case their windows overlap.
|
static <T> Flux<T> |
just(T... data)
Create a
Flux that emits the provided elements and then completes. |
static <T> Flux<T> |
just(T data)
Create a new
Flux that will only emit a single element then onComplete. |
Mono<T> |
last()
Emit the last element observed before complete signal as a
Mono , or emit
NoSuchElementException error if the source was empty. |
Mono<T> |
last(T defaultValue)
Emit the last element observed before complete signal as a
Mono , or emit
the defaultValue if the source was empty. |
Flux<T> |
limitRate(int prefetchRate)
Ensure that backpressure signals from downstream subscribers are split into batches
capped at the provided
prefetchRate when propagated upstream, effectively
rate limiting the upstream Publisher . |
Flux<T> |
limitRate(int highTide,
int lowTide)
Ensure that backpressure signals from downstream subscribers are split into batches
capped at the provided
highTide first, then replenishing at the provided
lowTide , effectively rate limiting the upstream Publisher . |
Flux<T> |
limitRequest(long n)
Deprecated.
replace with
take(n, true) in 3.4.x, then take(long) in 3.5.0.
To be removed in 3.6.0 at the earliest. See https://github.com/reactor/reactor-core/issues/2339 |
Flux<T> |
log()
Observe all Reactive Streams signals and trace them using
Logger support. |
Flux<T> |
log(Logger logger)
Observe Reactive Streams signals matching the passed filter
options and
trace them using a specific user-provided Logger , at Level.INFO level. |
Flux<T> |
log(Logger logger,
Level level,
boolean showOperatorLine,
SignalType... options)
|
Flux<T> |
log(String category)
Observe all Reactive Streams signals and trace them using
Logger support. |
Flux<T> |
log(String category,
Level level,
boolean showOperatorLine,
SignalType... options)
Observe Reactive Streams signals matching the passed filter
options and
trace them using Logger support. |
Flux<T> |
log(String category,
Level level,
SignalType... options)
Observe Reactive Streams signals matching the passed filter
options and
trace them using Logger support. |
<V> Flux<V> |
map(Function<? super T,? extends V> mapper)
Transform the items emitted by this
Flux by applying a synchronous function
to each item. |
<V> Flux<V> |
mapNotNull(Function<? super T,? extends V> mapper)
Transform the items emitted by this
Flux by applying a synchronous function
to each item, which may produce null values. |
Flux<Signal<T>> |
materialize()
Transform incoming onNext, onError and onComplete signals into
Signal instances,
materializing these signals. |
static <I> Flux<I> |
merge(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. |
static <I> Flux<I> |
merge(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
merge(Publisher<? extends I>... sources)
Merge data from
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. |
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source)
|
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source,
int concurrency)
|
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source,
int concurrency,
int prefetch)
|
static <T> Flux<T> |
mergeComparing(Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator ). |
static <T> Flux<T> |
mergeComparing(int prefetch,
Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator ). |
static <I extends Comparable<? super I>> Flux<I> |
mergeComparing(Publisher<? extends I>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by their natural order). |
static <T> Flux<T> |
mergeComparingDelayError(int prefetch,
Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator ). |
Flux<T> |
mergeComparingWith(Publisher<? extends T> other,
Comparator<? super T> otherComparator)
Merge data from this
Flux and a Publisher into a reordered merge
sequence, by picking the smallest value from each sequence as defined by a provided
Comparator . |
static <I> Flux<I> |
mergeDelayError(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. |
static <T> Flux<T> |
mergeOrdered(Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Deprecated.
Use
mergeComparingDelayError(int, Comparator, Publisher[]) instead
(as mergeComparing(Publisher[]) doesn't have this operator's delayError behavior).
To be removed in 3.6.0 at the earliest. |
static <T> Flux<T> |
mergeOrdered(int prefetch,
Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Deprecated.
Use
mergeComparingDelayError(int, Comparator, Publisher[]) instead
(as mergeComparing(Publisher[]) doesn't have this operator's delayError behavior).
To be removed in 3.6.0 at the earliest. |
static <I extends Comparable<? super I>> Flux<I> |
mergeOrdered(Publisher<? extends I>... sources)
Deprecated.
Use
mergeComparingDelayError(int, Comparator, Publisher[]) instead
(as mergeComparing(Publisher[]) doesn't have this operator's delayError behavior).
To be removed in 3.6.0 at the earliest. |
Flux<T> |
mergeOrderedWith(Publisher<? extends T> other,
Comparator<? super T> otherComparator)
Deprecated.
Use
mergeComparingWith(Publisher, Comparator) instead
(with the caveat that it defaults to NOT delaying errors, unlike this operator).
To be removed in 3.6.0 at the earliest. |
static <T> Flux<T> |
mergePriority(Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator ) as they arrive. |
static <T> Flux<T> |
mergePriority(int prefetch,
Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator ) as they arrive. |
static <I extends Comparable<? super I>> Flux<I> |
mergePriority(Publisher<? extends I>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by their natural order) as they arrive. |
static <T> Flux<T> |
mergePriorityDelayError(int prefetch,
Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator ) as they arrive. |
static <I> Flux<I> |
mergeSequential(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences provided in an array/vararg
into an ordered merged sequence. |
static <I> Flux<I> |
mergeSequential(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
mergeSequential(Iterable<? extends Publisher<? extends I>> sources,
int maxConcurrency,
int prefetch)
|
static <I> Flux<I> |
mergeSequential(Publisher<? extends I>... sources)
Merge data from
Publisher sequences provided in an array/vararg
into an ordered merged sequence. |
static <T> Flux<T> |
mergeSequential(Publisher<? extends Publisher<? extends T>> sources)
|
static <T> Flux<T> |
mergeSequential(Publisher<? extends Publisher<? extends T>> sources,
int maxConcurrency,
int prefetch)
|
static <I> Flux<I> |
mergeSequentialDelayError(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences provided in an array/vararg
into an ordered merged sequence. |
static <I> Flux<I> |
mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources,
int maxConcurrency,
int prefetch)
|
static <T> Flux<T> |
mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources,
int maxConcurrency,
int prefetch)
|
Flux<T> |
mergeWith(Publisher<? extends T> other)
|
Flux<T> |
metrics()
Activate metrics for this sequence, provided there is an instrumentation facade
on the classpath (otherwise this method is a pure no-op).
|
Flux<T> |
name(String name)
Give a name to this sequence, which can be retrieved using
Scannable.name()
as long as this is the first reachable Scannable.parents() . |
static <T> Flux<T> |
never()
Create a
Flux that will never signal any data, error or completion signal. |
Mono<T> |
next()
|
<U> Flux<U> |
ofType(Class<U> clazz)
Evaluate each accepted value against the given
Class type. |
protected static <T> ConnectableFlux<T> |
onAssembly(ConnectableFlux<T> source)
To be used by custom operators: invokes assembly
Hooks pointcut given a
ConnectableFlux , potentially returning a new ConnectableFlux . |
protected static <T> Flux<T> |
onAssembly(Flux<T> source)
|
Flux<T> |
onBackpressureBuffer()
Request an unbounded demand and push to the returned
Flux , or park the
observed elements if not enough demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(Duration ttl,
int maxSize,
Consumer<? super T> onBufferEviction)
Request an unbounded demand and push to the returned
Flux , or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit and for a maximum Duration of ttl (as measured on the
parallel Scheduler ). |
Flux<T> |
onBackpressureBuffer(Duration ttl,
int maxSize,
Consumer<? super T> onBufferEviction,
Scheduler scheduler)
|
Flux<T> |
onBackpressureBuffer(int maxSize)
Request an unbounded demand and push to the returned
Flux , or park up to
maxSize elements when not enough demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(int maxSize,
BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push to the returned
Flux , or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. |
Flux<T> |
onBackpressureBuffer(int maxSize,
Consumer<? super T> onOverflow)
Request an unbounded demand and push to the returned
Flux , or park up to
maxSize elements when not enough demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(int maxSize,
Consumer<? super T> onBufferOverflow,
BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push to the returned
Flux , or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. |
Flux<T> |
onBackpressureDrop()
Request an unbounded demand and push to the returned
Flux , or drop
the observed elements if not enough demand is requested downstream. |
Flux<T> |
onBackpressureDrop(Consumer<? super T> onDropped)
|
Flux<T> |
onBackpressureError()
Request an unbounded demand and push to the returned
Flux , or emit onError
from Exceptions.failWithOverflow() if not enough demand is requested
downstream. |
Flux<T> |
onBackpressureLatest()
Request an unbounded demand and push to the returned
Flux , or only keep
the most recent observed item if not enough demand is requested downstream. |
Flux<T> |
onErrorComplete()
Simply complete the sequence by replacing an
onError signal
with an onComplete signal . |
Flux<T> |
onErrorComplete(Class<? extends Throwable> type)
Simply complete the sequence by replacing an
onError signal
with an onComplete signal if the error matches the given
Class . |
Flux<T> |
onErrorComplete(Predicate<? super Throwable> predicate)
Simply complete the sequence by replacing an
onError signal
with an onComplete signal if the error matches the given
Predicate . |
Flux<T> |
onErrorContinue(BiConsumer<Throwable,Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the
incriminating element from the sequence and continuing with subsequent elements.
|
<E extends Throwable> Flux<T> |
onErrorContinue(Class<E> type,
BiConsumer<Throwable,Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the
incriminating element from the sequence and continuing with subsequent elements.
|
<E extends Throwable> Flux<T> |
onErrorContinue(Predicate<E> errorPredicate,
BiConsumer<Throwable,Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the
incriminating element from the sequence and continuing with subsequent elements.
|
<E extends Throwable> Flux<T> |
onErrorMap(Class<E> type,
Function<? super E,? extends Throwable> mapper)
Transform an error emitted by this
Flux by synchronously applying a function
to it if the error matches the given type. |
Flux<T> |
onErrorMap(Function<? super Throwable,? extends Throwable> mapper)
Transform any error emitted by this
Flux by synchronously applying a function to it. |
Flux<T> |
onErrorMap(Predicate<? super Throwable> predicate,
Function<? super Throwable,? extends Throwable> mapper)
Transform an error emitted by this
Flux by synchronously applying a function
to it if the error matches the given predicate. |
<E extends Throwable> Flux<T> |
onErrorResume(Class<E> type,
Function<? super E,? extends Publisher<? extends T>> fallback)
Subscribe to a fallback publisher when an error matching the given type
occurs, using a function to choose the fallback depending on the error.
|
Flux<T> |
onErrorResume(Function<? super Throwable,? extends Publisher<? extends T>> fallback)
Subscribe to a returned fallback publisher when any error occurs, using a function to
choose the fallback depending on the error.
|
Flux<T> |
onErrorResume(Predicate<? super Throwable> predicate,
Function<? super Throwable,? extends Publisher<? extends T>> fallback)
Subscribe to a fallback publisher when an error matching a given predicate
occurs.
|
<E extends Throwable> Flux<T> |
onErrorReturn(Class<E> type,
T fallbackValue)
Simply emit a captured fallback value when an error of the specified type is
observed on this
Flux . |
Flux<T> |
onErrorReturn(Predicate<? super Throwable> predicate,
T fallbackValue)
Simply emit a captured fallback value when an error matching the given predicate is
observed on this
Flux . |
Flux<T> |
onErrorReturn(T fallbackValue)
Simply emit a captured fallback value when any error is observed on this
Flux . |
Flux<T> |
onErrorStop()
If an
onErrorContinue(BiConsumer) variant has been used downstream, reverts
to the default 'STOP' mode where errors are terminal events upstream. |
Flux<T> |
onTerminateDetach()
Detaches both the child
Subscriber and the Subscription on
termination or cancellation. |
Flux<T> |
or(Publisher<? extends T> other)
|
ParallelFlux<T> |
parallel()
Prepare this
Flux by dividing data on a number of 'rails' matching the
number of CPU cores, in a round-robin fashion. |
ParallelFlux<T> |
parallel(int parallelism)
Prepare this
Flux by dividing data on a number of 'rails' matching the
provided parallelism parameter, in a round-robin fashion. |
ParallelFlux<T> |
parallel(int parallelism,
int prefetch)
|
ConnectableFlux<T> |
publish()
Prepare a
ConnectableFlux which shares this Flux sequence and
dispatches values to subscribers in a backpressure-aware manner. |
<R> Flux<R> |
publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform)
Shares a sequence for the duration of a function that may transform it and
consume it as many times as necessary without causing multiple subscriptions
to the upstream.
|
<R> Flux<R> |
publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform,
int prefetch)
Shares a sequence for the duration of a function that may transform it and
consume it as many times as necessary without causing multiple subscriptions
to the upstream.
|
ConnectableFlux<T> |
publish(int prefetch)
Prepare a
ConnectableFlux which shares this Flux sequence and
dispatches values to subscribers in a backpressure-aware manner. |
Mono<T> |
publishNext()
Deprecated.
Use
shareNext() instead, or use `publish().next()` if you need
to `connect()`. To be removed in 3.5.0. |
Flux<T> |
publishOn(Scheduler scheduler)
|
Flux<T> |
publishOn(Scheduler scheduler,
boolean delayError,
int prefetch)
Run onNext, onComplete and onError on a supplied
Scheduler 's
Scheduler.Worker . |
Flux<T> |
publishOn(Scheduler scheduler,
int prefetch)
Run onNext, onComplete and onError on a supplied
Scheduler 's
Scheduler.Worker . |
static <T> Flux<T> |
push(Consumer<? super FluxSink<T>> emitter)
|
static <T> Flux<T> |
push(Consumer<? super FluxSink<T>> emitter,
FluxSink.OverflowStrategy backpressure)
|
static Flux<Integer> |
range(int start,
int count)
|
<A> Mono<A> |
reduce(A initial,
BiFunction<A,? super T,A> accumulator)
Reduce the values from this
Flux sequence into a single object matching the
type of a seed value. |
Mono<T> |
reduce(BiFunction<T,T,T> aggregator)
Reduce the values from this
Flux sequence into a single object of the same
type as the emitted items. |
<A> Mono<A> |
reduceWith(Supplier<A> initial,
BiFunction<A,? super T,A> accumulator)
Reduce the values from this
Flux sequence into a single object matching the
type of a lazily supplied seed value. |
Flux<T> |
repeat()
Repeatedly and indefinitely subscribe to the source upon completion of the
previous subscription.
|
Flux<T> |
repeat(BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.
|
Flux<T> |
repeat(long numRepeat)
Repeatedly subscribe to the source
numRepeat times. |
Flux<T> |
repeat(long numRepeat,
BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous
subscription.
|
Flux<T> |
repeatWhen(Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
Repeatedly subscribe to this
Flux when a companion sequence emits elements in
response to the flux completion signal. |
ConnectableFlux<T> |
replay()
Turn this
Flux into a hot source and cache last emitted signals for further Subscriber . |
ConnectableFlux<T> |
replay(Duration ttl)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
ConnectableFlux<T> |
replay(Duration ttl,
Scheduler timer)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
ConnectableFlux<T> |
replay(int history)
Turn this
Flux into a connectable hot source and cache last emitted
signals for further Subscriber . |
ConnectableFlux<T> |
replay(int history,
Duration ttl)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
ConnectableFlux<T> |
replay(int history,
Duration ttl,
Scheduler timer)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
Flux<T> |
retry()
Re-subscribes to this
Flux sequence if it signals any error, indefinitely. |
Flux<T> |
retry(long numRetries)
Re-subscribes to this
Flux sequence if it signals any error, for a fixed
number of times. |
Flux<T> |
retryWhen(Retry retrySpec)
|
Flux<T> |
sample(Duration timespan)
|
<U> Flux<T> |
sample(Publisher<U> sampler)
|
Flux<T> |
sampleFirst(Duration timespan)
Repeatedly take a value from this
Flux then skip the values that follow
within a given duration. |
<U> Flux<T> |
sampleFirst(Function<? super T,? extends Publisher<U>> samplerFactory)
|
<U> Flux<T> |
sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory)
|
<U> Flux<T> |
sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory,
int maxConcurrency)
|
<A> Flux<A> |
scan(A initial,
BiFunction<A,? super T,A> accumulator)
Reduce this
Flux values with an accumulator BiFunction and
also emit the intermediate results of this function. |
Flux<T> |
scan(BiFunction<T,T,T> accumulator)
Reduce this
Flux values with an accumulator BiFunction and
also emit the intermediate results of this function. |
<A> Flux<A> |
scanWith(Supplier<A> initial,
BiFunction<A,? super T,A> accumulator)
Reduce this
Flux values with the help of an accumulator BiFunction
and also emits the intermediate results. |
Flux<T> |
share()
|
Mono<T> |
shareNext()
|
Mono<T> |
single()
Expect and emit a single item from this
Flux source or signal
NoSuchElementException for an empty source, or
IndexOutOfBoundsException for a source with more than one element. |
Mono<T> |
single(T defaultValue)
Expect and emit a single item from this
Flux source and emit a default
value for an empty source, but signal an IndexOutOfBoundsException for a
source with more than one element. |
Mono<T> |
singleOrEmpty()
Expect and emit a single item from this
Flux source, and accept an empty
source but signal an IndexOutOfBoundsException for a source with more than
one element. |
Flux<T> |
skip(Duration timespan)
Skip elements from this
Flux emitted within the specified initial duration. |
Flux<T> |
skip(Duration timespan,
Scheduler timer)
|
Flux<T> |
skip(long skipped)
Skip the specified number of elements from the beginning of this
Flux then
emit the remaining elements. |
Flux<T> |
skipLast(int n)
Skip a specified number of elements at the end of this
Flux sequence. |
Flux<T> |
skipUntil(Predicate<? super T> untilPredicate)
|
Flux<T> |
skipUntilOther(Publisher<?> other)
|
Flux<T> |
skipWhile(Predicate<? super T> skipPredicate)
|
Flux<T> |
sort()
Sort elements from this
Flux by collecting and sorting them in the background
then emitting the sorted sequence once this sequence completes. |
Flux<T> |
sort(Comparator<? super T> sortFunction)
Sort elements from this
Flux using a Comparator function, by
collecting and sorting elements in the background then emitting the sorted sequence
once this sequence completes. |
Flux<T> |
startWith(Iterable<? extends T> iterable)
|
Flux<T> |
startWith(Publisher<? extends T> publisher)
|
Flux<T> |
startWith(T... values)
Prepend the given values before this
Flux sequence. |
Disposable |
subscribe()
Subscribe to this
Flux and request unbounded demand. |
Disposable |
subscribe(Consumer<? super T> consumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer)
Deprecated.
Because users tend to forget to
request the subscription. If
the behavior is really needed, consider using subscribeWith(Subscriber) . To be removed in 3.5. |
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Context initialContext)
|
abstract void |
subscribe(CoreSubscriber<? super T> actual)
An internal
Publisher.subscribe(Subscriber) that will bypass
Hooks.onLastOperator(Function) pointcut. |
void |
subscribe(Subscriber<? super T> actual) |
Flux<T> |
subscribeOn(Scheduler scheduler)
Run subscribe, onSubscribe and request on a specified
Scheduler 's Scheduler.Worker . |
Flux<T> |
subscribeOn(Scheduler scheduler,
boolean requestOnSeparateThread)
Run subscribe and onSubscribe on a specified
Scheduler 's Scheduler.Worker . |
Flux<T> |
subscriberContext(Context mergeContext)
Deprecated.
Use
contextWrite(ContextView) instead. To be removed in 3.5.0. |
Flux<T> |
subscriberContext(Function<Context,Context> doOnContext)
Deprecated.
Use
contextWrite(Function) instead. To be removed in 3.5.0. |
<E extends Subscriber<? super T>> E |
subscribeWith(E subscriber)
|
Flux<T> |
switchIfEmpty(Publisher<? extends T> alternate)
Switch to an alternative
Publisher if this sequence is completed without any data. |
<V> Flux<V> |
switchMap(Function<? super T,Publisher<? extends V>> fn)
|
<V> Flux<V> |
switchMap(Function<? super T,Publisher<? extends V>> fn,
int prefetch)
Deprecated.
to be removed in 3.6.0 at the earliest. In 3.5.0, you should replace
calls with prefetch=0 with calls to switchMap(fn), as the default behavior of the
single-parameter variant will then change to prefetch=0.
|
<V> Flux<V> |
switchOnFirst(BiFunction<Signal<? extends T>,Flux<T>,Publisher<? extends V>> transformer)
Transform the current
Flux once it emits its first element, making a
conditional transformation possible. |
<V> Flux<V> |
switchOnFirst(BiFunction<Signal<? extends T>,Flux<T>,Publisher<? extends V>> transformer,
boolean cancelSourceOnComplete)
Transform the current
Flux once it emits its first element, making a
conditional transformation possible. |
static <T> Flux<T> |
switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers)
|
static <T> Flux<T> |
switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers,
int prefetch)
Deprecated.
to be removed in 3.6.0 at the earliest. In 3.5.0, you should replace
calls with prefetch=0 with calls to switchOnNext(mergedPublishers), as the default
behavior of the single-parameter variant will then change to prefetch=0.
|
Flux<T> |
tag(String key,
String value)
Tag this flux with a key/value pair.
|
Flux<T> |
take(Duration timespan)
|
Flux<T> |
take(Duration timespan,
Scheduler timer)
|
Flux<T> |
take(long n)
Take only the first N values from this
Flux , if available. |
Flux<T> |
take(long n,
boolean limitRequest)
Take only the first N values from this
Flux , if available. |
Flux<T> |
takeLast(int n)
Emit the last N values this
Flux emitted before its completion. |
Flux<T> |
takeUntil(Predicate<? super T> predicate)
|
Flux<T> |
takeUntilOther(Publisher<?> other)
|
Flux<T> |
takeWhile(Predicate<? super T> continuePredicate)
Relay values from this
Flux while a predicate returns TRUE
for the values (checked before each value is delivered). |
Mono<Void> |
then()
Return a
Mono<Void> that completes when this Flux completes. |
<V> Mono<V> |
then(Mono<V> other)
|
Mono<Void> |
thenEmpty(Publisher<Void> other)
Return a
Mono<Void> that waits for this Flux to complete then
for a supplied Publisher<Void> to also complete. |
<V> Flux<V> |
thenMany(Publisher<V> other)
|
Flux<Timed<T>> |
timed()
Times
Subscriber.onNext(Object) events, encapsulated into a Timed object
that lets downstream consumers look at various time information gathered with nanosecond
resolution using the default clock (Schedulers.parallel() ):
Timed.elapsed() : the time in nanoseconds since last event, as a Duration . |
Flux<Timed<T>> |
timed(Scheduler clock)
Times
Subscriber.onNext(Object) events, encapsulated into a Timed object
that lets downstream consumers look at various time information gathered with nanosecond
resolution using the provided Scheduler as a clock:
Timed.elapsed() : the time in nanoseconds since last event, as a Duration . |
Flux<T> |
timeout(Duration timeout)
Propagate a
TimeoutException as soon as no item is emitted within the
given Duration from the previous emission (or the subscription for the first item). |
Flux<T> |
timeout(Duration timeout,
Publisher<? extends T> fallback)
|
Flux<T> |
timeout(Duration timeout,
Publisher<? extends T> fallback,
Scheduler timer)
|
Flux<T> |
timeout(Duration timeout,
Scheduler timer)
Propagate a
TimeoutException as soon as no item is emitted within the
given Duration from the previous emission (or the subscription for the first
item), as measured by the specified Scheduler . |
<U> Flux<T> |
timeout(Publisher<U> firstTimeout)
Signal a
TimeoutException in case the first item from this Flux has
not been emitted before the given Publisher emits. |
<U,V> Flux<T> |
timeout(Publisher<U> firstTimeout,
Function<? super T,? extends Publisher<V>> nextTimeoutFactory)
Signal a
TimeoutException in case the first item from this Flux has
not been emitted before the firstTimeout Publisher emits, and whenever
each subsequent element is not emitted before a Publisher generated from
the latest element signals. |
<U,V> Flux<T> |
timeout(Publisher<U> firstTimeout,
Function<? super T,? extends Publisher<V>> nextTimeoutFactory,
Publisher<? extends T> fallback)
|
Flux<Tuple2<Long,T>> |
timestamp()
|
Flux<Tuple2<Long,T>> |
timestamp(Scheduler scheduler)
|
Iterable<T> |
toIterable()
|
Iterable<T> |
toIterable(int batchSize)
|
Iterable<T> |
toIterable(int batchSize,
Supplier<Queue<T>> queueProvider)
|
Stream<T> |
toStream()
|
Stream<T> |
toStream(int batchSize)
|
String |
toString() |
<V> Flux<V> |
transform(Function<? super Flux<T>,? extends Publisher<V>> transformer)
|
<V> Flux<V> |
transformDeferred(Function<? super Flux<T>,? extends Publisher<V>> transformer)
|
<V> Flux<V> |
transformDeferredContextual(BiFunction<? super Flux<T>,? super ContextView,? extends Publisher<V>> transformer)
|
static <T,D> Flux<T> |
using(Callable<? extends D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a
Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or
the Subscriber cancels.
|
static <T,D> Flux<T> |
using(Callable<? extends D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup,
boolean eager)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a
Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or
the Subscriber cancels.
|
static <T,D> Flux<T> |
usingWhen(Publisher<D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> resourceClosure,
Function<? super D,? extends Publisher<?>> asyncCleanup)
Uses a resource, generated by a
Publisher for each individual Subscriber ,
while streaming the values from a Publisher derived from the same resource. |
static <T,D> Flux<T> |
usingWhen(Publisher<D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> resourceClosure,
Function<? super D,? extends Publisher<?>> asyncComplete,
BiFunction<? super D,? super Throwable,? extends Publisher<?>> asyncError,
Function<? super D,? extends Publisher<?>> asyncCancel)
Uses a resource, generated by a
Publisher for each individual Subscriber ,
while streaming the values from a Publisher derived from the same resource. |
Flux<Flux<T>> |
window(Duration windowingTimespan)
|
Flux<Flux<T>> |
window(Duration windowingTimespan,
Duration openWindowEvery)
|
Flux<Flux<T>> |
window(Duration windowingTimespan,
Duration openWindowEvery,
Scheduler timer)
|
Flux<Flux<T>> |
window(Duration windowingTimespan,
Scheduler timer)
|
Flux<Flux<T>> |
window(int maxSize)
|
Flux<Flux<T>> |
window(int maxSize,
int skip)
|
Flux<Flux<T>> |
window(Publisher<?> boundary)
|
Flux<Flux<T>> |
windowTimeout(int maxSize,
Duration maxTime)
|
Flux<Flux<T>> |
windowTimeout(int maxSize,
Duration maxTime,
boolean fairBackpressure)
|
Flux<Flux<T>> |
windowTimeout(int maxSize,
Duration maxTime,
Scheduler timer)
|
Flux<Flux<T>> |
windowTimeout(int maxSize,
Duration maxTime,
Scheduler timer,
boolean fairBackpressure)
|
Flux<Flux<T>> |
windowUntil(Predicate<T> boundaryTrigger)
|
Flux<Flux<T>> |
windowUntil(Predicate<T> boundaryTrigger,
boolean cutBefore)
|
Flux<Flux<T>> |
windowUntil(Predicate<T> boundaryTrigger,
boolean cutBefore,
int prefetch)
|
<V> Flux<Flux<T>> |
windowUntilChanged()
Collect subsequent repetitions of an element (that is, if they arrive right after
one another) into multiple
Flux windows. |
<V> Flux<Flux<T>> |
windowUntilChanged(Function<? super T,? extends V> keySelector,
BiPredicate<? super V,? super V> keyComparator)
Collect subsequent repetitions of an element (that is, if they arrive right after
one another), as compared by a key extracted through the user provided
Function and compared using a supplied BiPredicate , into multiple
Flux windows. |
<V> Flux<Flux<T>> |
windowUntilChanged(Function<? super T,? super V> keySelector)
|
<U,V> Flux<Flux<T>> |
windowWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector)
|
Flux<Flux<T>> |
windowWhile(Predicate<T> inclusionPredicate)
|
Flux<Flux<T>> |
windowWhile(Predicate<T> inclusionPredicate,
int prefetch)
|
<U,R> Flux<R> |
withLatestFrom(Publisher<? extends U> other,
BiFunction<? super T,? super U,? extends R> resultSelector)
Combine the most recently emitted values from both this
Flux and another
Publisher through a BiFunction and emits the result. |
static <I,O> Flux<O> |
zip(Function<? super Object[],? extends O> combinator,
int prefetch,
Publisher<? extends I>... sources)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <I,O> Flux<O> |
zip(Function<? super Object[],? extends O> combinator,
Publisher<? extends I>... sources)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <O> Flux<O> |
zip(Iterable<? extends Publisher<?>> sources,
Function<? super Object[],? extends O> combinator)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <O> Flux<O> |
zip(Iterable<? extends Publisher<?>> sources,
int prefetch,
Function<? super Object[],? extends O> combinator)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <TUPLE extends Tuple2,V> Flux<V> |
zip(Publisher<? extends Publisher<?>> sources,
Function<? super TUPLE,? extends V> combinator)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <T1,T2> Flux<Tuple2<T1,T2>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2)
Zip two sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple2 . |
static <T1,T2,O> Flux<O> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
BiFunction<? super T1,? super T2,? extends O> combinator)
Zip two sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <T1,T2,T3> Flux<Tuple3<T1,T2,T3>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3)
Zip three sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple3 . |
static <T1,T2,T3,T4> Flux<Tuple4<T1,T2,T3,T4>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4)
Zip four sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple4 . |
static <T1,T2,T3,T4,T5> Flux<Tuple5<T1,T2,T3,T4,T5>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5)
Zip five sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple5 . |
static <T1,T2,T3,T4,T5,T6> Flux<Tuple6<T1,T2,T3,T4,T5,T6>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6)
Zip six sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple6 . |
static <T1,T2,T3,T4,T5,T6,T7> Flux<Tuple7<T1,T2,T3,T4,T5,T6,T7>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6,
Publisher<? extends T7> source7)
Zip seven sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple7 . |
static <T1,T2,T3,T4,T5,T6,T7,T8> Flux<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6,
Publisher<? extends T7> source7,
Publisher<? extends T8> source8)
Zip eight sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple8 . |
<T2> Flux<Tuple2<T,T2>> |
zipWith(Publisher<? extends T2> source2)
|
<T2,V> Flux<V> |
zipWith(Publisher<? extends T2> source2,
BiFunction<? super T,? super T2,? extends V> combinator)
Zip this
Flux with another Publisher source, that is to say wait
for both to emit one element and combine these elements using a combinator
BiFunction.
The operator will continue doing so until any of the sources completes. |
<T2> Flux<Tuple2<T,T2>> |
zipWith(Publisher<? extends T2> source2,
int prefetch)
|
<T2,V> Flux<V> |
zipWith(Publisher<? extends T2> source2,
int prefetch,
BiFunction<? super T,? super T2,? extends V> combinator)
Zip this
Flux with another Publisher source, that is to say wait
for both to emit one element and combine these elements using a combinator
BiFunction.
The operator will continue doing so until any of the sources completes. |
<T2> Flux<Tuple2<T,T2>> |
zipWithIterable(Iterable<? extends T2> iterable)
|
<T2,V> Flux<V> |
zipWithIterable(Iterable<? extends T2> iterable,
BiFunction<? super T,? super T2,? extends V> zipper)
Zip elements from this
Flux with the content of an Iterable , that is
to say combine one element from each, pairwise, using the given zipper BiFunction . |
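As a rough illustration of the zip family summarized above, the following sketch pairs two finite sources. The class and method names are illustrative only (they are not part of Reactor), and the sketch assumes reactor-core is on the classpath. Because one source is shorter than the other, zipping stops once the shorter source completes.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Illustrative sketch only: ZipSketch and its methods are not part of Reactor.
class ZipSketch {

    // Static zip with a combinator: waits for one element from each source.
    static List<String> zipped() {
        Flux<String> letters = Flux.just("a", "b", "c");
        Flux<Integer> numbers = Flux.just(1, 2);
        return Flux.zip(letters, numbers, (letter, number) -> letter + number)
                   .collectList()
                   .block();
    }

    // Instance variant pairing this Flux with the content of an Iterable.
    static List<String> zippedWithIterable() {
        return Flux.just("x", "y")
                   .zipWithIterable(List.of(10, 20), (s, n) -> s + ":" + n)
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(zipped());             // [a1, b2]
        System.out.println(zippedWithIterable()); // [x:10, y:20]
    }
}
```

The third letter is never paired, since the numbers source has already completed.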
@SafeVarargs public static <T,V> Flux<V> combineLatest(Function<Object[],V> combinator, Publisher<? extends T>... sources)
Build a Flux whose data are generated by the combination of the
most recently published value from each of the Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
T - type of the value from sources
V - The produced output after transformation by the given combinator
sources - The Publisher sources to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
@SafeVarargs public static <T,V> Flux<V> combineLatest(Function<Object[],V> combinator, int prefetch, Publisher<? extends T>... sources)
Build a Flux whose data are generated by the combination of the
most recently published value from each of the Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
T - type of the value from sources
V - The produced output after transformation by the given combinator
sources - The Publisher sources to combine values from
prefetch - The demand sent to each combined source Publisher
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T1,T2,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends V> combinator)
Build a Flux whose data are generated by the combination of the
most recently published value from each of two Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
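To make the "most recently published value" behavior concrete, here is a small sketch that drives two sources by hand. It assumes Reactor 3.4 or later (for the Sinks API), and the class and variable names are illustrative. Note how the value "a" is passed to the combinator more than once, which is the reuse the Discard Support note warns about.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

// Illustrative sketch only: CombineLatestSketch is not part of Reactor.
class CombineLatestSketch {

    static List<String> demo() {
        // Two manually driven sources, so that the emission order is explicit.
        Sinks.Many<Integer> left = Sinks.many().unicast().onBackpressureBuffer();
        Sinks.Many<String> right = Sinks.many().unicast().onBackpressureBuffer();
        List<String> seen = new ArrayList<>();

        Flux.combineLatest(left.asFlux(), right.asFlux(), (n, s) -> n + s)
            .subscribe(seen::add);

        left.tryEmitNext(1);    // nothing emitted yet: right has no value
        right.tryEmitNext("a"); // combines 1 and "a"
        left.tryEmitNext(2);    // combines 2 and "a" ("a" is reused)
        right.tryEmitNext("b"); // combines 2 and "b"
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1a, 2a, 2b]
    }
}
```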
T1 - type of the value from source1
T2 - type of the value from source2
V - The produced output after transformation by the given combinator
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T1,T2,T3,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the
most recently published value from each of three Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
V - The produced output after transformation by the given combinator
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T1,T2,T3,T4,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the
most recently published value from each of four Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
V - The produced output after transformation by the given combinator
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
source4 - The fourth Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T1,T2,T3,T4,T5,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the
most recently published value from each of five Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
V - The produced output after transformation by the given combinator
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
source4 - The fourth Publisher source to combine values from
source5 - The fifth Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T1,T2,T3,T4,T5,T6,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the
most recently published value from each of six Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
V - The produced output after transformation by the given combinator
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
source4 - The fourth Publisher source to combine values from
source5 - The fifth Publisher source to combine values from
source6 - The sixth Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T,V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the
most recently published value from each
of the Publisher sources provided in an Iterable.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
T - The common base type of the values from sources
V - The produced output after transformation by the given combinator
sources - The list of Publisher sources to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T,V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, int prefetch, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the
most recently published value from each
of the Publisher sources provided in an Iterable.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
T - The common base type of the values from sources
V - The produced output after transformation by the given combinator
sources - The list of Publisher sources to combine values from
prefetch - The demand sent to each combined source Publisher
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T> Flux<T> concat(Iterable<? extends Publisher<? extends T>> sources)
Concatenate all sources provided in an Iterable, forwarding elements
emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
@SafeVarargs public final Flux<T> concatWithValues(T... values)
Concatenates the values to the end of the Flux.
values - The values to concatenate
Returns: a Flux concatenating all source sequences
public static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent Publisher,
forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
public static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources, int prefetch)
Concatenate all sources emitted as an onNext signal from a parent Publisher,
forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
@SafeVarargs public static <T> Flux<T> concat(Publisher<? extends T>... sources)
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
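The sequential subscription and the "any error interrupts the sequence" rule can be sketched as follows. The class name is illustrative, and onErrorReturn is used here only to make the point of interruption visible in the collected list.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Illustrative sketch only: ConcatSketch is not part of Reactor.
class ConcatSketch {

    // Sources are drained strictly one after the other.
    static List<Integer> inOrder() {
        return Flux.concat(Flux.just(1, 2), Flux.just(3), Flux.just(4, 5))
                   .collectList()
                   .block();
    }

    // The failing second source stops the sequence: the third source is never subscribed.
    static List<Integer> interruptedByError() {
        return Flux.concat(
                       Flux.just(1),
                       Flux.<Integer>error(new IllegalStateException("boom")),
                       Flux.just(2))
                   .onErrorReturn(-1) // marks where the error surfaced
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(inOrder());            // [1, 2, 3, 4, 5]
        System.out.println(interruptedByError()); // [1, -1]
    }
}
```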
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent Publisher,
forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, int prefetch)
Concatenate all sources emitted as an onNext signal from a parent Publisher,
forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
T - The type of values in both source and output sequences
sources - The Publisher of Publisher to concatenate
prefetch - number of elements to prefetch from the source, to be turned into inner Publishers
Returns: a Flux concatenating all inner source sequences until complete or error
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, boolean delayUntilEnd, int prefetch)
Concatenate all sources emitted as an onNext signal from a parent Publisher,
forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes.
Errors do not interrupt the main sequence but are propagated after the current
concat backlog if delayUntilEnd is false, or after all sources
have had a chance to be concatenated if delayUntilEnd is true.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
T - The type of values in both source and output sequences
sources - The Publisher of Publisher to concatenate
delayUntilEnd - delay error until all sources have been consumed instead of after the current source
prefetch - the number of Publishers to prefetch from the outer Publisher
Returns: a Flux concatenating all inner source sequences until complete or error
@SafeVarargs public static <T> Flux<T> concatDelayError(Publisher<? extends T>... sources)
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
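For contrast with concat, the sketch below shows the error being held back until the remaining sources have been concatenated. The class name is illustrative, and onErrorReturn is again only a way to make the position of the error visible.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Illustrative sketch only: ConcatDelayErrorSketch is not part of Reactor.
class ConcatDelayErrorSketch {

    // The error from the second source is propagated only after the third source completes.
    static List<Integer> errorDelayedToEnd() {
        return Flux.concatDelayError(
                       Flux.just(1),
                       Flux.<Integer>error(new IllegalStateException("boom")),
                       Flux.just(2))
                   .onErrorReturn(-1) // marks where the error surfaced
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(errorDelayedToEnd()); // [1, 2, -1]
    }
}
```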
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
Programmatically create a Flux with the capability of emitting multiple
elements in a synchronous or asynchronous manner through the FluxSink API.
This includes emitting elements from multiple threads.
This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
    Flux.<String>create(emitter -> {
        ActionListener al = e -> emitter.next(textField.getText());

        // without cleanup support, registering the listener is all that is needed:
        button.addActionListener(al);

        // with cleanup support, also remove the listener once the sink is disposed:
        emitter.onDispose(() -> button.removeActionListener(al));
    });
Discard Support: The FluxSink
exposed by this operator buffers in case of
overflow. The buffer is discarded when the main sequence is cancelled.
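A self-contained variant of the listener-bridging pattern is sketched below. The EventSource class is a hypothetical stand-in for any callback-based API (it is not part of Reactor); the point is only to show that the onDispose callback runs when the subscriber cancels.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

// Illustrative sketch only: CreateSketch and EventSource are hypothetical names.
class CreateSketch {

    // Hypothetical callback-style API standing in for a real event source.
    static class EventSource {
        private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
        void register(Consumer<String> listener) { listeners.add(listener); }
        void unregister(Consumer<String> listener) { listeners.remove(listener); }
        void fire(String event) { listeners.forEach(l -> l.accept(event)); }
        int listenerCount() { return listeners.size(); }
    }

    // Bridges the callback API to a Flux and unregisters the listener on disposal.
    static Flux<String> bridge(EventSource source) {
        return Flux.create(sink -> {
            Consumer<String> listener = sink::next;
            source.register(listener);
            sink.onDispose(() -> source.unregister(listener));
        });
    }

    static List<String> demo(EventSource source) {
        List<String> received = new ArrayList<>();
        Disposable subscription = bridge(source).subscribe(received::add);
        source.fire("a");
        source.fire("b");
        subscription.dispose(); // triggers onDispose, which removes the listener
        source.fire("c");       // no listener is left to receive this event
        return received;
    }

    public static void main(String[] args) {
        EventSource source = new EventSource();
        System.out.println(demo(source));           // [a, b]
        System.out.println(source.listenerCount()); // 0
    }
}
```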
T - The type of values in the sequence
emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
Returns: a Flux
See also: push(Consumer)
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
Programmatically create a Flux with the capability of emitting multiple
elements in a synchronous or asynchronous manner through the FluxSink API.
This includes emitting elements from multiple threads.
This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
    Flux.<String>create(emitter -> {
        ActionListener al = e -> emitter.next(textField.getText());

        // without cleanup support, registering the listener is all that is needed:
        button.addActionListener(al);

        // with cleanup support, also remove the listener once the sink is disposed:
        emitter.onDispose(() -> button.removeActionListener(al));
    }, FluxSink.OverflowStrategy.LATEST);
Discard Support: The FluxSink exposed by this operator discards elements
as relevant to the chosen FluxSink.OverflowStrategy. For example, FluxSink.OverflowStrategy.DROP
discards each item as it is dropped, while FluxSink.OverflowStrategy.BUFFER
discards the buffer upon cancellation.
T - The type of values in the sequence
backpressure - the backpressure mode, see FluxSink.OverflowStrategy for the available backpressure modes
emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
Returns: a Flux
See also: push(Consumer, reactor.core.publisher.FluxSink.OverflowStrategy)
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter)
Programmatically create a Flux with the capability of emitting multiple
elements from a single-threaded producer through the FluxSink API. For
a multi-threaded capable alternative, see create(Consumer).
This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
    Flux.<String>push(emitter -> {
        ActionListener al = e -> emitter.next(textField.getText());

        // without cleanup support, registering the listener is all that is needed:
        button.addActionListener(al);

        // with cleanup support, also remove the listener once the sink is disposed:
        emitter.onDispose(() -> button.removeActionListener(al));
    });
Discard Support: The FluxSink
exposed by this operator buffers in case of
overflow. The buffer is discarded when the main sequence is cancelled.
T - The type of values in the sequence
emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
Returns: a Flux
See also: create(Consumer)
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
Programmatically create a Flux with the capability of emitting multiple
elements from a single-threaded producer through the FluxSink API. For
a multi-threaded capable alternative, see create(Consumer, reactor.core.publisher.FluxSink.OverflowStrategy).
This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
    Flux.<String>push(emitter -> {
        ActionListener al = e -> emitter.next(textField.getText());

        // without cleanup support, registering the listener is all that is needed:
        button.addActionListener(al);

        // with cleanup support, also remove the listener once the sink is disposed:
        emitter.onDispose(() -> button.removeActionListener(al));
    }, FluxSink.OverflowStrategy.LATEST);
Discard Support: The FluxSink exposed by this operator discards elements
as relevant to the chosen FluxSink.OverflowStrategy. For example, FluxSink.OverflowStrategy.DROP
discards each item as it is dropped, while FluxSink.OverflowStrategy.BUFFER
discards the buffer upon cancellation.
T - The type of values in the sequence
backpressure - the backpressure mode, see FluxSink.OverflowStrategy for the available backpressure modes
emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
Returns: a Flux
See also: create(Consumer, reactor.core.publisher.FluxSink.OverflowStrategy)
public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier)
Lazily supply a Publisher every time a Subscription is made on the
resulting Flux, so the actual source instantiation is deferred until each
subscribe and the Supplier can create a subscriber-specific instance.
If the supplier doesn't generate a new instance however, this operator will
effectively behave like from(Publisher)
.
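The difference between assembling a source eagerly and deferring it can be sketched as follows. The class name and the counter are illustrative; the counter simply stands in for any per-subscriber state or side effect.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

// Illustrative sketch only: DeferSketch is not part of Reactor.
class DeferSketch {

    static List<Integer> demo() {
        AtomicInteger counter = new AtomicInteger();

        // Evaluated once, at assembly time: every subscriber sees the same value.
        Flux<Integer> eager = Flux.just(counter.incrementAndGet());

        // Evaluated on each subscription: every subscriber gets a fresh value.
        Flux<Integer> lazy = Flux.defer(() -> Flux.just(counter.incrementAndGet()));

        return List.of(
                eager.blockFirst(), eager.blockFirst(),
                lazy.blockFirst(), lazy.blockFirst());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1, 1, 2, 3]
    }
}
```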
T - the type of values passing through the Flux
supplier - the Publisher Supplier to call on subscribe
Returns: a Flux
See also: deferContextual(Function)
@Deprecated public static <T> Flux<T> deferWithContext(Function<Context,? extends Publisher<T>> contextualPublisherFactory)
Deprecated. Use deferContextual(Function) instead.
Lazily supply a Publisher every time a Subscription is made on the
resulting Flux, so the actual source instantiation is deferred until each
subscribe and the Function can create a subscriber-specific instance.
This operator behaves the same way as defer(Supplier),
but accepts a Function that will receive the current Context as an argument.
If the function doesn't generate a new instance however, this operator will
effectively behave like from(Publisher).
public static <T> Flux<T> deferContextual(Function<ContextView,? extends Publisher<T>> contextualPublisherFactory)
Lazily supply a Publisher every time a Subscription is made on the
resulting Flux, so the actual source instantiation is deferred until each
subscribe and the Function can create a subscriber-specific instance.
This operator behaves the same way as defer(Supplier),
but accepts a Function that will receive the current ContextView as an argument.
If the function doesn't generate a new instance however, this operator will
effectively behave like from(Publisher).
public static <T> Flux<T> empty()
Create a Flux that completes without emitting any item.
T - the reified type of the target Subscriber
Returns: a Flux
public static <T> Flux<T> error(Throwable error)
Create a Flux that terminates with the specified error immediately after
being subscribed to.
T - the reified type of the target Subscriber
error - the error to signal to each Subscriber
Returns: a Flux
public static <T> Flux<T> error(Supplier<? extends Throwable> errorSupplier)
Create a Flux that terminates with an error immediately after being
subscribed to. The Throwable is generated by a Supplier, invoked
each time there is a subscription and allowing for lazy instantiation.
T - the reified type of the target Subscriber
errorSupplier - the error signal Supplier to invoke for each Subscriber
Returns: a Flux
public static <O> Flux<O> error(Throwable throwable, boolean whenRequested)
Create a Flux that terminates with the specified error, either immediately
after being subscribed to or after being first requested.
O - the reified type of the target Subscriber
throwable - the error to signal to each Subscriber
whenRequested - if true, will onError on the first request instead of subscribe().
Returns: a Flux
@SafeVarargs @Deprecated public static <I> Flux<I> first(Publisher<? extends I>... sources)
Deprecated. Use firstWithSignal(Publisher[]). To be removed in reactor 3.5.
Pick the first Publisher to emit any signal (onNext/onError/onComplete) and
replay all signals from that Publisher, effectively behaving like the
fastest of these competing sources.
I - The type of values in both source and output sequences
sources - The competing source publishers
Returns: a Flux behaving like the fastest of its sources
@Deprecated public static <I> Flux<I> first(Iterable<? extends Publisher<? extends I>> sources)
Deprecated. Use firstWithSignal(Iterable). To be removed in reactor 3.5.
Pick the first Publisher to emit any signal (onNext/onError/onComplete) and
replay all signals from that Publisher, effectively behaving like the
fastest of these competing sources.
I - The type of values in both source and output sequences
sources - The competing source publishers
Returns: a Flux behaving like the fastest of its sources
@SafeVarargs public static <I> Flux<I> firstWithSignal(Publisher<? extends I>... sources)
Pick the first Publisher to emit any signal (onNext/onError/onComplete) and
replay all signals from that Publisher, effectively behaving like the
fastest of these competing sources.
I - The type of values in both source and output sequences
sources - The competing source publishers
Returns: a Flux behaving like the fastest of its sources
public static <I> Flux<I> firstWithSignal(Iterable<? extends Publisher<? extends I>> sources)
Pick the first Publisher to emit any signal (onNext/onError/onComplete) and
replay all signals from that Publisher, effectively behaving like the
fastest of these competing sources.
I - The type of values in both source and output sequences
sources - The competing source publishers
Returns: a Flux behaving like the fastest of its sources
public static <I> Flux<I> firstWithValue(Iterable<? extends Publisher<? extends I>> sources)
Pick the first Publisher to emit any value and replay all values
from that Publisher, effectively behaving like the source that first
emits an onNext.
Sources with values always "win" over empty sources (ones that only emit onComplete) or failing sources (ones that only emit onError).
When no source can provide a value, this operator fails with a NoSuchElementException
(provided there are at least two sources). This exception has a composite as its cause
that can be used to inspect what went wrong with each source
(so the composite has as many elements as there are sources).
Exceptions from failing sources are directly reflected in the composite at the index of the failing source.
For empty sources, a NoSuchElementException is added at their respective index.
One can use Exceptions.unwrapMultiple(topLevel.getCause()) to easily inspect these errors as a List.
Note that like in firstWithSignal(Iterable)
, an infinite source can be problematic
if no other source emits onNext.
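Both outcomes described above can be sketched with synchronous sources. The class and method names are illustrative; the second method follows the Exceptions.unwrapMultiple(topLevel.getCause()) suggestion to count the per-source failures.

```java
import java.util.List;
import java.util.NoSuchElementException;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;

// Illustrative sketch only: FirstWithValueSketch is not part of Reactor.
class FirstWithValueSketch {

    // The empty and failing sources lose to the only source that emits a value.
    static List<String> winner() {
        return Flux.firstWithValue(List.of(
                       Flux.<String>empty(),
                       Flux.<String>error(new IllegalStateException("boom")),
                       Flux.just("x", "y")))
                   .collectList()
                   .block();
    }

    // With no valued source, the composite cause holds one entry per source.
    static int failuresWhenNoValue() {
        try {
            Flux.firstWithValue(List.of(
                    Flux.<String>empty(),
                    Flux.<String>error(new IllegalStateException("boom"))))
                .blockLast();
            return 0;
        } catch (NoSuchElementException topLevel) {
            return Exceptions.unwrapMultiple(topLevel.getCause()).size();
        }
    }

    public static void main(String[] args) {
        System.out.println(winner());
        System.out.println(failuresWhenNoValue());
    }
}
```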
@SafeVarargs public static <I> Flux<I> firstWithValue(Publisher<? extends I> first, Publisher<? extends I>... others)
Pick the first Publisher to emit any value and replay all values
from that Publisher, effectively behaving like the source that first
emits an onNext.
Sources with values always "win" over empty sources (ones that only emit onComplete) or failing sources (ones that only emit onError).
When no source can provide a value, this operator fails with a NoSuchElementException
(provided there are at least two sources). This exception has a composite as its cause
that can be used to inspect what went wrong with each source
(so the composite has as many elements as there are sources).
Exceptions from failing sources are directly reflected in the composite at the index of the failing source.
For empty sources, a NoSuchElementException is added at their respective index.
One can use Exceptions.unwrapMultiple(topLevel.getCause()) to easily inspect these errors as a List.
Note that like in firstWithSignal(Publisher[]), an infinite source can be problematic
if no other source emits onNext.
In case the first source is already an array-based firstWithValue(Publisher, Publisher[])
instance, nesting is avoided: a single new array-based instance is created with all the
sources from first plus all the others sources at the same level.
I - The type of values in both source and output sequences
first - The first competing source publisher
others - The other competing source publishers
Returns: a Flux behaving like the fastest of its sources
public static <T> Flux<T> from(Publisher<? extends T> source)
Decorate the specified Publisher with the Flux API.
Hooks.onEachOperator(String, Function) and similar assembly hooks are applied
unless the source is already a Flux.
T - The type of values in both source and output sequences
source - the source to decorate
Returns: a Flux
public static <T> Flux<T> fromArray(T[] array)
Create a Flux that emits the items contained in the provided array.
T - The type of values in the source array and resulting Flux
array - the array to read data from
Returns: a Flux
public static <T> Flux<T> fromIterable(Iterable<? extends T> it)
Create a Flux that emits the items contained in the provided Iterable.
The Iterable.iterator() method will be invoked at least once and at most twice
for each subscriber.
This operator inspects the Iterable's Spliterator to assess if the iteration
can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)).
Since the default Spliterator wraps the Iterator we can have two Iterable.iterator()
calls. This second invocation is skipped on a Collection however, a type which is
assumed to be always finite.
Discard Support: Upon cancellation, this operator attempts to discard the remainder of the
Iterable if it can safely ensure the iterator is finite.
Note that this means the Iterable.iterator() method could be invoked twice.
public static <T> Flux<T> fromStream(Stream<? extends T> s)
Create a Flux that emits the items contained in the provided Stream.
Keep in mind that a Stream cannot be re-used, which can be problematic in
case of multiple subscriptions or re-subscription (like with repeat() or
retry()). The Stream is closed automatically
by the operator on cancellation, error or completion.
Discard Support: Upon cancellation, this operator attempts to discard remainder of the
Stream
through its open Spliterator
, if it can safely ensure it is finite
(see Operators.onDiscardMultiple(Iterator, boolean, Context)
).
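The re-use caveat can be sketched as follows. The class and method names are illustrative. The sketch assumes that the second subscription surfaces the JDK's own IllegalStateException for an already consumed Stream, and it contrasts this with the Supplier-based overload, which obtains a fresh Stream per subscription.

```java
import java.util.List;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;

// Illustrative sketch only: FromStreamSketch is not part of Reactor.
class FromStreamSketch {

    // A Flux wrapping a single Stream instance can only be consumed once.
    static boolean secondSubscriptionFails() {
        Flux<String> oneShot = Flux.fromStream(Stream.of("a", "b"));
        oneShot.blockLast(); // first subscription consumes and closes the Stream
        try {
            oneShot.blockLast(); // second subscription hits the used Stream
            return false;
        } catch (IllegalStateException alreadyConsumed) {
            return true;
        }
    }

    // The Supplier variant creates a new Stream for every subscription.
    static List<String> resubscribable() {
        Flux<String> perSubscription = Flux.fromStream(() -> Stream.of("a", "b"));
        perSubscription.blockLast();                 // first subscription
        return perSubscription.collectList().block(); // second subscription still works
    }

    public static void main(String[] args) {
        System.out.println(secondSubscriptionFails());
        System.out.println(resubscribable());
    }
}
```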
public static <T> Flux<T> fromStream(Supplier<Stream<? extends T>> streamSupplier)
Create a Flux that emits the items contained in a Stream created by
the provided Supplier for each subscription. The Stream is
closed automatically by the operator on cancellation, error
or completion.
Discard Support: Upon cancellation, this operator attempts to discard the remainder of the
Stream through its open Spliterator, if it can safely ensure it is finite
(see Operators.onDiscardMultiple(Iterator, boolean, Context)).
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
Programmatically create a Flux by generating signals one-by-one via a
consumer callback.
T - the value type emitted
generator - Consume the SynchronousSink provided per-subscriber by Reactor
to generate a single signal on each pass.
Returns: a Flux
public static <T,S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator)
Programmatically create a Flux by generating signals one-by-one via a
consumer callback and some state. The stateSupplier
may return null.
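A stateful generator can be sketched as below. The class name and the choice of emitting squares are illustrative. Each pass emits at most one value through the SynchronousSink and returns the state for the next pass.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Illustrative sketch only: GenerateSketch is not part of Reactor.
class GenerateSketch {

    // Emits the squares of 1..5, carrying the current number as per-subscriber state.
    static List<Integer> squares() {
        Flux<Integer> flux = Flux.generate(
                () -> 1,                          // initial state for each subscriber
                (state, sink) -> {
                    sink.next(state * state);     // exactly one value per pass
                    if (state == 5) {
                        sink.complete();          // terminate after the fifth value
                    }
                    return state + 1;             // state handed to the next pass
                });
        return flux.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(squares()); // [1, 4, 9, 16, 25]
    }
}
```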
T - the value type emitted
S - the per-subscriber custom state type
stateSupplier - called for each incoming Subscriber to provide the initial state for the generator bifunction
generator - Consume the SynchronousSink provided per-subscriber by Reactor
as well as the current state to generate a single signal on each pass
and return a (new) state.
Returns: a Flux
public static <T,S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator, Consumer<? super S> stateConsumer)
Flux
by generating signals one-by-one via a
consumer callback and some state, with a final cleanup callback. The
stateSupplier
may return null but your cleanup stateConsumer
will need to handle the null case.
T
- the value type emitted
S
- the per-subscriber custom state type
stateSupplier
- called for each incoming Subscriber to provide the initial state for the generator bifunction
generator
- Consume the SynchronousSink
provided per-subscriber by Reactor
as well as the current state to generate a single signal on each pass
and return a (new) state.
stateConsumer
- called after the generator has terminated or the downstream cancelled, receiving the last
state to be handled (i.e., release resources or do other cleanup).
Flux
public static Flux<Long> interval(Duration period)
Flux
that emits long values starting with 0 and incrementing at
specified time intervals on the global timer. The first element is emitted after
an initial delay equal to the period
. If demand is not produced in time,
an onError will be signalled with an overflow
IllegalStateException
detailing the tick that couldn't be emitted.
In normal conditions, the Flux
will never complete.
Runs on the Schedulers.parallel()
Scheduler.
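A brief sketch of how the ticks can be consumed, assuming a 20 ms period picked only for the example. Because the sequence does not complete on its own, it is bounded here with `take` before blocking:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class IntervalSketch {

    // Hypothetical helper: collects the first n ticks of a 20 ms interval.
    public static List<Long> firstTicks(int n) {
        return Flux.interval(Duration.ofMillis(20)) // first tick after one period
                .take(n)                            // bound the otherwise endless sequence
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(firstTicks(3)); // [0, 1, 2]
    }
}
```

Here `collectList` requests an unbounded amount, so demand is always available and the overflow error described above does not occur.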
public static Flux<Long> interval(Duration delay, Duration period)
Flux
that emits long values starting with 0 and incrementing at
specified time intervals, after an initial delay, on the global timer. If demand is
not produced in time, an onError will be signalled with an
overflow
IllegalStateException
detailing the tick that couldn't be emitted. In normal conditions, the Flux
will never complete.
Runs on the Schedulers.parallel()
Scheduler.
public static Flux<Long> interval(Duration period, Scheduler timer)
Flux
that emits long values starting with 0 and incrementing at
specified time intervals, on the specified Scheduler
. The first element is
emitted after an initial delay equal to the period
. If demand is not
produced in time, an onError will be signalled with an overflow
IllegalStateException
detailing the tick that couldn't be emitted.
In normal conditions, the Flux
will never complete.
public static Flux<Long> interval(Duration delay, Duration period, Scheduler timer)
Flux
that emits long values starting with 0 and incrementing at
specified time intervals, after an initial delay, on the specified Scheduler
.
If demand is not produced in time, an onError will be signalled with an
overflow
IllegalStateException
detailing the tick that couldn't be emitted. In normal conditions, the Flux
will never complete.
@SafeVarargs public static <T> Flux<T> just(T... data)
Flux
that emits the provided elements and then completes.
T
- the emitted data type
data
- the elements to emit, as a vararg
Flux
public static <T> Flux<T> just(T data)
Flux
that will only emit a single element then onComplete.
T
- the emitted data type
data
- the single element to emit
Flux
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source)
Publisher
sequences emitted by the passed Publisher
into an interleaved merged sequence. Unlike concat
, inner
sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency)
Publisher
sequences emitted by the passed Publisher
into an interleaved merged sequence. Unlike concat
, inner
sources are subscribed to eagerly (but at most concurrency
sources are
subscribed to at the same time).
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency, int prefetch)
Publisher
sequences emitted by the passed Publisher
into an interleaved merged sequence. Unlike concat
, inner
sources are subscribed to eagerly (but at most concurrency
sources are
subscribed to at the same time).
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
public static <I> Flux<I> merge(Iterable<? extends Publisher<? extends I>> sources)
Publisher
sequences contained in an Iterable
into an interleaved merged sequence. Unlike concat
, inner
sources are subscribed to eagerly.
A new Iterator
will be created for each subscriber.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <I> Flux<I> merge(Publisher<? extends I>... sources)
Publisher
sequences contained in an array / vararg
into an interleaved merged sequence. Unlike concat
,
sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
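To illustrate the difference with concat, here is a sketch using two hypothetical sources: one delayed with `delayElements` and one that emits immediately. The delay value and all names are assumptions made for the example:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class MergeVersusConcatSketch {

    // Hypothetical asynchronous source: each element is delayed by 50 ms.
    private static Flux<String> slow() {
        return Flux.just("a1", "a2").delayElements(Duration.ofMillis(50));
    }

    // Hypothetical synchronous source.
    private static Flux<String> fast() {
        return Flux.just("b1");
    }

    // merge subscribes to both sources eagerly, so values interleave as they arrive.
    public static List<String> merged() {
        return Flux.merge(slow(), fast()).collectList().block();
    }

    // concat subscribes to the second source only after the first one completes.
    public static List<String> concatenated() {
        return Flux.concat(slow(), fast()).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("merge:  " + merged());
        System.out.println("concat: " + concatenated());
    }
}
```

With these sources, the merged sequence would be expected to deliver `b1` ahead of the delayed elements, while the concatenated sequence keeps `a1, a2, b1`.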
@SafeVarargs public static <I> Flux<I> merge(int prefetch, Publisher<? extends I>... sources)
Publisher
sequences contained in an array / vararg
into an interleaved merged sequence. Unlike concat
,
sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <I> Flux<I> mergeDelayError(int prefetch, Publisher<? extends I>... sources)
Publisher
sequences contained in an array / vararg
into an interleaved merged sequence. Unlike concat
,
sources are subscribed to eagerly.
This variant will delay any error until after the rest of the merge backlog has been processed.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <I extends Comparable<? super I>> Flux<I> mergePriority(Publisher<? extends I>... sources)
Publisher
sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by their natural order) as they arrive.
This is not a sort()
, as it doesn't consider the whole of each sequence. Unlike mergeComparing,
this operator does not wait for a value from each source to arrive either.
While this operator does retrieve at most one value from each source, it only compares values when two or more sources emit at the same time. In that case it picks the smallest of these competing values and continues doing so as long as there is demand. It is therefore best suited for asynchronous sources where you do not want to wait for a value from each source before emitting a value downstream.
I
- a Comparable
merged type that has a natural order
sources
- Publisher
sources of Comparable
to merge
Flux
that compares the latest available value from each source, publishing the
smallest value and replenishing the source that produced it.
@SafeVarargs public static <T> Flux<T> mergePriority(Comparator<? super T> comparator, Publisher<? extends T>... sources)
Publisher
sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator
) as they arrive. This is not a sort(Comparator)
, as it doesn't consider
the whole of each sequence. Unlike mergeComparing, this operator does not wait for a value from each
source to arrive either.
While this operator does retrieve at most one value from each source, it only compares values when two or more sources emit at the same time. In that case it picks the smallest of these competing values and continues doing so as long as there is demand. It is therefore best suited for asynchronous sources where you do not want to wait for a value from each source before emitting a value downstream.
T
- the merged type
comparator
- the Comparator
to use to find the smallest value
sources
- Publisher
sources to merge
Flux
that compares the latest available value from each source, publishing the
smallest value and replenishing the source that produced it.
@SafeVarargs public static <T> Flux<T> mergePriority(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
Publisher
sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator
) as they arrive. This is not a sort(Comparator)
, as it doesn't consider
the whole of each sequence. Unlike mergeComparing, this operator does not wait for a value from each
source to arrive either.
While this operator does retrieve at most one value from each source, it only compares values when two or more sources emit at the same time. In that case it picks the smallest of these competing values and continues doing so as long as there is demand. It is therefore best suited for asynchronous sources where you do not want to wait for a value from each source before emitting a value downstream.
T
- the merged type
prefetch
- the number of elements to prefetch from each source (avoiding too
many small requests to the source when picking)
comparator
- the Comparator
to use to find the smallest value
sources
- Publisher
sources to merge
Flux
that compares the latest available value from each source, publishing the
smallest value and replenishing the source that produced it.
@SafeVarargs public static <T> Flux<T> mergePriorityDelayError(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
Publisher
sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator
) as they arrive. This is not a sort(Comparator)
, as it doesn't consider
the whole of each sequence. Unlike mergeComparing, this operator does not wait for a value from each
source to arrive either.
While this operator does retrieve at most one value from each source, it only compares values when two or more sources emit at the same time. In that case it picks the smallest of these competing values and continues doing so as long as there is demand. It is therefore best suited for asynchronous sources where you do not want to wait for a value from each source before emitting a value downstream.
Note that it is delaying errors until all data is consumed.
T
- the merged type
prefetch
- the number of elements to prefetch from each source (avoiding too
many small requests to the source when picking)
comparator
- the Comparator
to use to find the smallest value
sources
- Publisher
sources to merge
Flux
that compares the latest available value from each source, publishing the
smallest value and replenishing the source that produced it.
@SafeVarargs public static <I extends Comparable<? super I>> Flux<I> mergeComparing(Publisher<? extends I>... sources)
Publisher
sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by their natural order).
This is not a sort()
, as it doesn't consider the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
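A short sketch of this slot-based picking, assuming two already sorted integer sources (the values and class name are illustrative). When each source is itself sorted, the merged output is fully ordered; with unsorted sources it would not be, which is why this is not a sort:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class MergeComparingSketch {

    // Hypothetical helper: merges two sorted sources by natural order.
    public static List<Integer> mergedInOrder() {
        Flux<Integer> left = Flux.just(1, 4, 7);
        Flux<Integer> right = Flux.just(2, 3, 9);
        // One value per source is held; the smallest is emitted and its slot refilled.
        return Flux.mergeComparing(left, right).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(mergedInOrder()); // [1, 2, 3, 4, 7, 9]
    }
}
```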
I
- a Comparable
merged type that has a natural order
sources
- Publisher
sources of Comparable
to merge
Flux
that compares latest values from each source, using the
smallest value and replenishing the source that produced it
@SafeVarargs public static <T> Flux<T> mergeComparing(Comparator<? super T> comparator, Publisher<? extends T>... sources)
Publisher
sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator
). This is not a sort(Comparator)
, as it doesn't consider
the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
T
- the merged type
comparator
- the Comparator
to use to find the smallest value
sources
- Publisher
sources to merge
Flux
that compares latest values from each source, using the
smallest value and replenishing the source that produced it
@SafeVarargs public static <T> Flux<T> mergeComparing(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
Publisher
sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator
). This is not a sort(Comparator)
, as it doesn't consider
the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
T
- the merged type
prefetch
- the number of elements to prefetch from each source (avoiding too
many small requests to the source when picking)
comparator
- the Comparator
to use to find the smallest value
sources
- Publisher
sources to merge
Flux
that compares latest values from each source, using the
smallest value and replenishing the source that produced it
@SafeVarargs public static <T> Flux<T> mergeComparingDelayError(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
Publisher
sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator
). This is not a sort(Comparator)
, as it doesn't consider
the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
Note that it is delaying errors until all data is consumed.
T
- the merged type
prefetch
- the number of elements to prefetch from each source (avoiding too
many small requests to the source when picking)
comparator
- the Comparator
to use to find the smallest value
sources
- Publisher
sources to merge
Flux
that compares latest values from each source, using the
smallest value and replenishing the source that produced it
@SafeVarargs @Deprecated public static <I extends Comparable<? super I>> Flux<I> mergeOrdered(Publisher<? extends I>... sources)
Deprecated. Use mergeComparingDelayError(int, Comparator, Publisher[])
instead
(as mergeComparing(Publisher[])
doesn't have this operator's delayError behavior).
To be removed in 3.6.0 at the earliest.
Publisher
sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by their natural order).
This is not a sort()
, as it doesn't consider the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
Note that it is delaying errors until all data is consumed.
I
- a Comparable
merged type that has a natural order
sources
- Publisher
sources of Comparable
to merge
Flux
that compares latest values from each source, using the
smallest value and replenishing the source that produced it
@SafeVarargs @Deprecated public static <T> Flux<T> mergeOrdered(Comparator<? super T> comparator, Publisher<? extends T>... sources)
Deprecated. Use mergeComparingDelayError(int, Comparator, Publisher[])
instead
(as mergeComparing(Publisher[])
doesn't have this operator's delayError behavior).
To be removed in 3.6.0 at the earliest.
Publisher
sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator
). This is not a sort(Comparator)
, as it doesn't consider
the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
Note that it is delaying errors until all data is consumed.
T
- the merged type
comparator
- the Comparator
to use to find the smallest value
sources
- Publisher
sources to merge
Flux
that compares latest values from each source, using the
smallest value and replenishing the source that produced it
@SafeVarargs @Deprecated public static <T> Flux<T> mergeOrdered(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
Deprecated. Use mergeComparingDelayError(int, Comparator, Publisher[])
instead
(as mergeComparing(Publisher[])
doesn't have this operator's delayError behavior).
To be removed in 3.6.0 at the earliest.
Publisher
sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator
). This is not a sort(Comparator)
, as it doesn't consider
the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
Note that it is delaying errors until all data is consumed.
T
- the merged type
prefetch
- the number of elements to prefetch from each source (avoiding too
many small requests to the source when picking)
comparator
- the Comparator
to use to find the smallest value
sources
- Publisher
sources to merge
Flux
that compares latest values from each source, using the
smallest value and replenishing the source that produced it
public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources)
public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)
Publisher
sequences emitted by the passed Publisher
into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to
eagerly (but at most maxConcurrency
sources at a time). Unlike merge, their
emitted values are merged into the final sequence in subscription order.
T
- the merged type
sources
- a Publisher
of Publisher
sources to merge
prefetch
- the inner source request size
maxConcurrency
- the request produced to the main source thus limiting concurrent merge backlog
Flux
, subscribing early but keeping the original ordering
public static <T> Flux<T> mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)
Publisher
sequences emitted by the passed Publisher
into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to
eagerly (but at most maxConcurrency
sources at a time). Unlike merge, their
emitted values are merged into the final sequence in subscription order.
This variant will delay any error until after the rest of the mergeSequential backlog has been processed.
T
- the merged type
sources
- a Publisher
of Publisher
sources to merge
prefetch
- the inner source request size
maxConcurrency
- the request produced to the main source thus limiting concurrent merge backlog
Flux
, subscribing early but keeping the original ordering
@SafeVarargs public static <I> Flux<I> mergeSequential(Publisher<? extends I>... sources)
Publisher
sequences provided in an array/vararg
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
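As an illustration, the sketch below reuses the idea of one delayed and one immediate source (both hypothetical, as is the 50 ms delay). Although both sources are subscribed to eagerly, the output keeps the order in which the sources were passed:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class MergeSequentialSketch {

    // Hypothetical helper: a delayed first source and an immediate second source.
    public static List<String> inSubscriptionOrder() {
        Flux<String> slow = Flux.just("a1", "a2").delayElements(Duration.ofMillis(50));
        Flux<String> fast = Flux.just("b1");
        // "b1" is produced first but is held back until the first source completes.
        return Flux.mergeSequential(slow, fast).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(inSubscriptionOrder()); // [a1, a2, b1]
    }
}
```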
@SafeVarargs public static <I> Flux<I> mergeSequential(int prefetch, Publisher<? extends I>... sources)
Publisher
sequences provided in an array/vararg
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
@SafeVarargs public static <I> Flux<I> mergeSequentialDelayError(int prefetch, Publisher<? extends I>... sources)
Publisher
sequences provided in an array/vararg
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
This variant will delay any error until after the rest of the mergeSequential backlog
has been processed.
public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources)
public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch)
Publisher
sequences provided in an Iterable
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly (but at most maxConcurrency
sources at a time). Unlike merge, their
emitted values are merged into the final sequence in subscription order.
I
- the merged type
sources
- an Iterable
of Publisher
sequences to merge
maxConcurrency
- the request produced to the main source thus limiting concurrent merge backlog
prefetch
- the inner source request size
Flux
, subscribing early but keeping the original ordering
public static <I> Flux<I> mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch)
Publisher
sequences provided in an Iterable
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly (but at most maxConcurrency
sources at a time). Unlike merge, their
emitted values are merged into the final sequence in subscription order.
This variant will delay any error until after the rest of the mergeSequential backlog
has been processed.
I
- the merged type
sources
- an Iterable
of Publisher
sequences to merge
maxConcurrency
- the request produced to the main source thus limiting concurrent merge backlog
prefetch
- the inner source request size
Flux
, subscribing early but keeping the original ordering
public static <T> Flux<T> never()
Flux
that will never signal any data, error or completion signal.
T
- the Subscriber
type target
Flux
public static Flux<Integer> range(int start, int count)
Flux
that will only emit a sequence of count
incrementing integers,
starting from start
. That is, emit integers between start
(included)
and start + count
(excluded) then complete.
start
- the first integer to be emitted
count
- the total number of incrementing values to emit, including the first value
Flux
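A tiny sketch of the bounds, with arbitrary example values: the second argument is a count, not an end value, so `range(5, 3)` stops before `5 + 3`:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class RangeSketch {

    // Hypothetical helper: collects count integers starting at start.
    public static List<Integer> values(int start, int count) {
        return Flux.range(start, count).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(values(5, 3)); // [5, 6, 7]
    }
}
```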
public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers)
Flux
that mirrors the most recently emitted Publisher
,
forwarding its data until a new Publisher
comes in the source.
The resulting Flux
will complete once there are no new Publisher
in
the source (source has completed) and the last mirrored Publisher
has also
completed.
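The switching behavior can be sketched as follows, assuming an outer source that emits two inner publishers back to back (all names and the 50 ms period are illustrative). The first inner publisher is an endless interval that is abandoned as soon as the second one arrives:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class SwitchOnNextSketch {

    // Hypothetical helper: only the most recent inner publisher is mirrored.
    public static List<String> latestOnly() {
        Flux<String> first = Flux.interval(Duration.ofMillis(50)).map(i -> "first-" + i);
        Flux<String> second = Flux.just("second-a", "second-b");
        // The outer source emits both publishers immediately, then completes,
        // so the result completes once the second inner publisher completes.
        return Flux.switchOnNext(Flux.just(first, second)).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(latestOnly()); // [second-a, second-b]
    }
}
```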
T
- the produced type
mergedPublishers
- The Publisher
of Publisher
to switch on and mirror.
FluxProcessor
accepting publishers and producing T
@Deprecated public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers, int prefetch)
Flux
that mirrors the most recently emitted Publisher
,
forwarding its data until a new Publisher
comes in the source.
The resulting Flux
will complete once there are no new Publisher
in
the source (source has completed) and the last mirrored Publisher
has also
completed.
T
- the produced type
mergedPublishers
- The Publisher
of Publisher
to switch on and mirror.
prefetch
- the inner source request size
FluxProcessor
accepting publishers and producing T
public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)
Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer may override the terminal event.
For an asynchronous version of the cleanup, with distinct path for onComplete, onError
and cancel terminations, see usingWhen(Publisher, Function, Function, BiFunction, Function)
.
T
- emitted type
D
- resource type
resourceSupplier
- a Callable
that is called on subscribe to generate the resource
sourceSupplier
- a factory to derive a Publisher
from the supplied resource
resourceCleanup
- a resource cleanup callback invoked on completion
Flux
built around a disposable resource
usingWhen(Publisher, Function, Function, BiFunction, Function)
,
usingWhen(Publisher, Function, Function)
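A minimal sketch of the three-argument variant, assuming a plain String stands in for a real resource and a List records the lifecycle (both are illustrative choices, not part of the API):

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class UsingSketch {

    // Hypothetical helper: "opens" a resource per subscriber, streams from it,
    // and records the cleanup in the provided log.
    public static List<String> readAll(List<String> log) {
        Flux<String> flux = Flux.using(
                () -> { log.add("open"); return "resource"; },  // resourceSupplier
                res -> Flux.just(res + "-1", res + "-2"),       // sourceSupplier
                res -> log.add("close"));                       // resourceCleanup
        return flux.collectList().block();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println(readAll(log)); // [resource-1, resource-2]
        System.out.println(log);          // [open, close]
    }
}
```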
public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager)
For an asynchronous version of the cleanup, with distinct path for onComplete, onError
and cancel terminations, see usingWhen(Publisher, Function, Function, BiFunction, Function)
.
T
- emitted type
D
- resource type
resourceSupplier
- a Callable
that is called on subscribe to generate the resource
sourceSupplier
- a factory to derive a Publisher
from the supplied resource
resourceCleanup
- a resource cleanup callback invoked on completion
eager
- true to clean before terminating downstream subscribers
Flux
built around a disposable resource
usingWhen(Publisher, Function, Function, BiFunction, Function)
,
usingWhen(Publisher, Function, Function)
public static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> resourceClosure, Function<? super D,? extends Publisher<?>> asyncCleanup)
Publisher
for each individual Subscriber
,
while streaming the values from a Publisher
derived from the same resource.
Whenever the resulting sequence terminates, a provided Function
generates
a "cleanup" Publisher
that is invoked but doesn't change the content of the
main sequence. Instead it just defers the termination (unless it errors, in which case
the error suppresses the original termination signal).
Note that if the resource supplying Publisher
emits more than one resource, the
subsequent resources are dropped (Operators.onNextDropped(Object, Context)
). If
the publisher errors AFTER having emitted one resource, the error is also silently dropped
(Operators.onErrorDropped(Throwable, Context)
).
An empty completion or error without at least one onNext signal triggers a short-circuit
of the main sequence with the same terminal signal (no resource is established, no
cleanup is invoked).
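The asynchronous counterpart can be sketched in the same way, assuming a Mono that yields a String as the resource and a Mono-based cleanup (the names and the log are hypothetical, for illustration only):

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class UsingWhenSketch {

    // Hypothetical helper: the resource comes from a Publisher and the cleanup
    // is itself a Publisher that is subscribed to when the closure terminates.
    public static List<String> rows(List<String> log) {
        Flux<String> flux = Flux.usingWhen(
                Mono.fromCallable(() -> { log.add("acquire"); return "tx"; }),
                tx -> Flux.just(tx + ":row1", tx + ":row2"),
                tx -> Mono.fromRunnable(() -> log.add("release")));
        return flux.collectList().block();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println(rows(log)); // [tx:row1, tx:row2]
        System.out.println(log);       // [acquire, release]
    }
}
```

The cleanup Publisher does not contribute elements to the main sequence; it only defers the termination until it completes.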
T
- the type of elements emitted by the resource closure, and thus the main sequence
D
- the type of the resource object
resourceSupplier
- a Publisher
that "generates" the resource,
subscribed for each subscription to the main sequence
resourceClosure
- a factory to derive a Publisher
from the supplied resource
asyncCleanup
- an asynchronous resource cleanup invoked when the resource
closure terminates (with onComplete, onError or cancel)
Flux
built around a "transactional" resource, with asynchronous
cleanup on all terminations (onComplete, onError, cancel)
public static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> resourceClosure, Function<? super D,? extends Publisher<?>> asyncComplete, BiFunction<? super D,? super Throwable,? extends Publisher<?>> asyncError, Function<? super D,? extends Publisher<?>> asyncCancel)
Publisher
for each individual Subscriber
,
while streaming the values from a Publisher
derived from the same resource.
Note that all steps of the operator chain that would need the resource to be in an open
stable state need to be described inside the resourceClosure
Function
.
Whenever the resulting sequence terminates, the relevant Function
generates
a "cleanup" Publisher
that is invoked but doesn't change the content of the
main sequence. Instead it just defers the termination (unless it errors, in which case
the error suppresses the original termination signal).
Individual cleanups can also be associated with main sequence cancellation and error terminations:
Note that if the resource supplying Publisher
emits more than one resource, the
subsequent resources are dropped (Operators.onNextDropped(Object, Context)
). If
the publisher errors AFTER having emitted one resource, the error is also silently dropped
(Operators.onErrorDropped(Throwable, Context)
).
An empty completion or error without at least one onNext signal triggers a short-circuit
of the main sequence with the same terminal signal (no resource is established, no
cleanup is invoked).
Additionally, the terminal signal is replaced by any error that might have happened
in the terminating Publisher
:
Finally, early cancellations will cancel the resource supplying Publisher
:
T
- the type of elements emitted by the resource closure, and thus the main sequence
D
- the type of the resource object
resourceSupplier
- a Publisher
that "generates" the resource,
subscribed for each subscription to the main sequence
resourceClosure
- a factory to derive a Publisher
from the supplied resource
asyncComplete
- an asynchronous resource cleanup invoked if the resource closure terminates with onComplete
asyncError
- an asynchronous resource cleanup invoked if the resource closure terminates with onError.
The terminating error is provided to the BiFunction
asyncCancel
- an asynchronous resource cleanup invoked if the resource closure is cancelled.
When null
, the asyncComplete
path is used instead.
Flux
built around a "transactional" resource, with several
termination paths triggering asynchronous cleanup sequences
usingWhen(Publisher, Function, Function)
public static <T1,T2,O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends O> combinator)
T1
- type of the value from source1
T2
- type of the value from source2
O
- The produced output after transformation by the combinator
source1
- The first Publisher
source to zip.
source2
- The second Publisher
source to zip.
combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstream
Flux
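A brief sketch of the combinator variant, using two small in-memory sources chosen for the example. The combinator receives one value from each source per step, and the result ends with the shorter source:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class ZipCombinatorSketch {

    // Hypothetical helper: pairs letters with numbers through a BiFunction.
    public static List<String> combined() {
        Flux<String> letters = Flux.just("a", "b", "c");
        Flux<Integer> numbers = Flux.just(1, 2);
        Flux<String> zipped = Flux.zip(letters, numbers, (s, i) -> s + i);
        return zipped.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(combined()); // [a1, b2]
    }
}
```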
public static <T1,T2> Flux<Tuple2<T1,T2>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2)
Tuple2
.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
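Without a combinator, each step yields a Tuple2 whose parts are read with `getT1()` and `getT2()`. The sketch below assumes two illustrative sources of different lengths, so the zipped sequence completes when the shorter one does:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class ZipTupleSketch {

    // Hypothetical helper: renders each Tuple2 as "key=value".
    public static List<String> pairs() {
        return Flux.zip(Flux.just("a", "b"), Flux.range(1, 100))
                .map(tuple -> tuple.getT1() + "=" + tuple.getT2())
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(pairs()); // [a=1, b=2]
    }
}
```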
public static <T1,T2,T3> Flux<Tuple3<T1,T2,T3>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3)
Tuple3
.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1
- type of the value from source1
T2
- type of the value from source2
T3
- type of the value from source3
source1
- The first upstream Publisher
to subscribe to.
source2
- The second upstream Publisher
to subscribe to.
source3
- The third upstream Publisher
to subscribe to.
Flux
public static <T1,T2,T3,T4> Flux<Tuple4<T1,T2,T3,T4>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4)
Tuple4
.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1
- type of the value from source1
T2
- type of the value from source2
T3
- type of the value from source3
T4
- type of the value from source4
source1
- The first upstream Publisher
to subscribe to.
source2
- The second upstream Publisher
to subscribe to.
source3
- The third upstream Publisher
to subscribe to.
source4
- The fourth upstream Publisher
to subscribe to.
Flux
public static <T1,T2,T3,T4,T5> Flux<Tuple5<T1,T2,T3,T4,T5>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5)
Zip five sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple5.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
Returns: a zipped Flux
public static <T1,T2,T3,T4,T5,T6> Flux<Tuple6<T1,T2,T3,T4,T5,T6>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6)
Zip six sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple6.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
source6 - The sixth upstream Publisher to subscribe to.
Returns: a zipped Flux
public static <T1,T2,T3,T4,T5,T6,T7> Flux<Tuple7<T1,T2,T3,T4,T5,T6,T7>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7)
Zip seven sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple7.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
T7 - type of the value from source7
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
source6 - The sixth upstream Publisher to subscribe to.
source7 - The seventh upstream Publisher to subscribe to.
Returns: a zipped Flux
public static <T1,T2,T3,T4,T5,T6,T7,T8> Flux<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7, Publisher<? extends T8> source8)
Zip eight sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple8.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
T7 - type of the value from source7
T8 - type of the value from source8
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
source6 - The sixth upstream Publisher to subscribe to.
source7 - The seventh upstream Publisher to subscribe to.
source8 - The eighth upstream Publisher to subscribe to.
Returns: a zipped Flux
public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources, Function<? super Object[],? extends O> combinator)
The Iterable.iterator() will be called on each Publisher.subscribe(Subscriber).
public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources, int prefetch, Function<? super Object[],? extends O> combinator)
The Iterable.iterator() will be called on each Publisher.subscribe(Subscriber).
O - the combined produced type
sources - the Iterable providing sources to zip
prefetch - the inner source request size
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Returns: a zipped Flux
@SafeVarargs public static <I,O> Flux<O> zip(Function<? super Object[],? extends O> combinator, Publisher<? extends I>... sources)
I - the type of the input sources
O - the combined produced type
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
sources - the array providing sources to zip
Returns: a zipped Flux
@SafeVarargs public static <I,O> Flux<O> zip(Function<? super Object[],? extends O> combinator, int prefetch, Publisher<? extends I>... sources)
I - the type of the input sources
O - the combined produced type
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
prefetch - individual source request size
sources - the array providing sources to zip
Returns: a zipped Flux
public static <TUPLE extends Tuple2,V> Flux<V> zip(Publisher<? extends Publisher<?>> sources, Function<? super TUPLE,? extends V> combinator)
Note that the Publisher sources from the outer Publisher will accumulate into an exhaustive list before the zip operation starts.
TUPLE - the raw tuple type
V - The produced output after transformation by the given combinator
sources - The Publisher of Publisher sources to zip. A finite publisher is required.
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced value
public final Mono<Boolean> all(Predicate<? super T> predicate)
Emit a single boolean true if all values of this sequence match the Predicate.
The implementation uses short-circuit logic and completes with false if the predicate doesn't match a value.
public final Mono<Boolean> any(Predicate<? super T> predicate)
Emit a single boolean true if any of the values of this Flux sequence match the predicate.
The implementation uses short-circuit logic and completes with true if the predicate matches a value.
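The short-circuit logic described for all and any can be illustrated with a plain-Java model. This is a sketch under stated assumptions, not Reactor's implementation: `ShortCircuitModel` and its helpers are hypothetical names, and the source is an ordinary Iterable rather than a Flux. The counter shows that the predicate stops being evaluated once the answer is known.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical plain-Java model of short-circuit evaluation; it does not use Reactor.
final class ShortCircuitModel {

    // Mirrors all(): answers false at the first element that fails the predicate.
    static <T> boolean all(Iterable<T> source, Predicate<? super T> predicate) {
        for (T value : source) {
            if (!predicate.test(value)) {
                return false; // remaining elements are never tested
            }
        }
        return true;
    }

    // Mirrors any(): answers true at the first element that matches the predicate.
    static <T> boolean any(Iterable<T> source, Predicate<? super T> predicate) {
        for (T value : source) {
            if (predicate.test(value)) {
                return true; // remaining elements are never tested
            }
        }
        return false;
    }

    public static void main(String[] args) {
        AtomicInteger evaluations = new AtomicInteger();
        boolean allEven = all(List.of(2, 4, 5, 6), n -> {
            evaluations.incrementAndGet();
            return n % 2 == 0;
        });
        // 5 is the third element and the first odd one, so 6 is never tested.
        System.out.println(allEven + " after " + evaluations.get() + " evaluations"); // prints false after 3 evaluations
    }
}
```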
public final <P> P as(Function<? super Flux<T>,P> transformer)
Transform this Flux into a target type.
flux.as(Mono::from).subscribe()
P - the returned instance type
transformer - the Function to immediately map this Flux into a target type instance.
Returns: the Flux transformed to an instance of P
See Also: transformDeferred(Function) for a lazy transformation of Flux
@Nullable public final T blockFirst()
Subscribe to this Flux and block indefinitely until the upstream signals its first value or completes. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).
Note that each blockFirst() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
@Nullable public final T blockFirst(Duration timeout)
Subscribe to this Flux and block until the upstream signals its first value, completes or a timeout expires. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). If the provided timeout expires, a RuntimeException is thrown.
Note that each blockFirst() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
timeout - maximum time period to wait for before raising a RuntimeException
@Nullable public final T blockLast()
Subscribe to this Flux and block indefinitely until the upstream signals its last value or completes. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).
Note that each blockLast() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
@Nullable public final T blockLast(Duration timeout)
Subscribe to this Flux and block until the upstream signals its last value, completes or a timeout expires. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). If the provided timeout expires, a RuntimeException is thrown.
Note that each blockLast() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
timeout - maximum time period to wait for before raising a RuntimeException
public final Flux<List<T>> buffer()
Collect all incoming values into a single List buffer that will be emitted by the returned Flux once this Flux completes.
Discard Support: This operator discards the buffer upon cancellation or error triggered by a data signal.
Returns: a Flux of at most one List
See Also: collectList() for an alternative collecting algorithm returning Mono
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal, as well as the latest unbuffered element if the bufferSupplier fails.
C - the Collection buffer type
maxSize - the maximum collected size
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection
public final Flux<List<T>> buffer(int maxSize, int skip)
Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes. Buffers can be created with gaps, as a new buffer will be created every time skip values have been emitted by the source.
When maxSize < skip : dropping buffers
When maxSize > skip : overlapping buffers
When maxSize == skip : exact buffers
Discard Support: This operator discards elements in between buffers (in the case of dropping buffers). It also discards the currently open buffer upon cancellation or error triggered by a data signal. Note however that overlapping buffer variant DOES NOT discard, as this might result in an element being discarded from an early buffer while it is still valid in a more recent buffer.
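The three cases above (dropping, overlapping, exact) follow from one rule: a new buffer opens every skip elements and holds at most maxSize elements. The sketch below models that rule on a plain List. It is an illustration under stated assumptions, not Reactor's implementation: `BufferSkipModel` is a hypothetical name, the source is fully materialized, and the buffers are listed in the order they open.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of buffer(maxSize, skip); it does not use Reactor.
final class BufferSkipModel {

    static <T> List<List<T>> buffer(List<T> source, int maxSize, int skip) {
        if (maxSize <= 0 || skip <= 0) {
            throw new IllegalArgumentException("maxSize and skip must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        // A buffer opens at indices 0, skip, 2*skip, ... and takes up to maxSize elements.
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + maxSize, source.size());
            out.add(new ArrayList<>(source.subList(start, end)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println(buffer(source, 2, 3)); // dropping:    [[1, 2], [4, 5], [7]]
        System.out.println(buffer(source, 3, 2)); // overlapping: [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7]]
        System.out.println(buffer(source, 3, 3)); // exact:       [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

In the dropping case the elements 3 and 6 fall between buffers, which is where the discard behavior described above applies.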
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, int skip, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes. Buffers can be created with gaps, as a new buffer will be created every time skip values have been emitted by the source.
When maxSize < skip : dropping buffers
When maxSize > skip : overlapping buffers
When maxSize == skip : exact buffers
Discard Support: This operator discards elements in between buffers (in the case of dropping buffers). It also discards the currently open buffer upon cancellation or error triggered by a data signal. Note however that overlapping buffer variant DOES NOT discard, as this might result in an element being discarded from an early buffer while it is still valid in a more recent buffer.
C - the Collection buffer type
skip - the number of items to count before creating a new buffer
maxSize - the max collected size
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of possibly overlapped or gapped Collection
public final <C extends Collection<? super T>> Flux<C> buffer(Publisher<?> other, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers, as delimited by the signals of a companion Publisher this operator will subscribe to.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal, and the last received element when the bufferSupplier fails.
C - the Collection buffer type
other - the companion Publisher whose signals trigger new buffers
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection delimited by signals from a Publisher
public final Flux<List<T>> buffer(Duration bufferingTimespan, Duration openBufferEvery)
Collect incoming values into multiple List buffers created at a given openBufferEvery period. Each buffer will last until the bufferingTimespan has elapsed, thus emitting the bucket in the resulting Flux.
When bufferingTimespan < openBufferEvery : dropping buffers
When bufferingTimespan > openBufferEvery : overlapping buffers
When bufferingTimespan == openBufferEvery : exact buffers
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. It DOES NOT provide strong guarantees in the case of overlapping buffers, as elements might get discarded too early (from the first of two overlapping buffers for instance).
public final Flux<List<T>> buffer(Duration bufferingTimespan, Duration openBufferEvery, Scheduler timer)
Collect incoming values into multiple List buffers created at a given openBufferEvery period, as measured on the provided Scheduler. Each buffer will last until the bufferingTimespan has elapsed (also measured on the scheduler), thus emitting the bucket in the resulting Flux.
When bufferingTimespan < openBufferEvery : dropping buffers
When bufferingTimespan > openBufferEvery : overlapping buffers
When bufferingTimespan == openBufferEvery : exact buffers
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. It DOES NOT provide strong guarantees in the case of overlapping buffers, as elements might get discarded too early (from the first of two overlapping buffers for instance).
bufferingTimespan - the duration from buffer creation until a buffer is closed and emitted
openBufferEvery - the interval at which to create a new buffer
timer - a time-capable Scheduler instance to run on
Returns: a Flux of List delimited by the given period openBufferEvery and sized by bufferingTimespan
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
C - the Collection buffer type
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection delimited by given size or a given period timeout
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer)
Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
C - the Collection buffer type
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
timer - a time-capable Scheduler instance to run on
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection delimited by given size or a given period timeout
public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate)
Collect incoming values into multiple List buffers that will be emitted by the resulting Flux each time the given predicate returns true. Note that the element that triggers the predicate to return true (and thus closes a buffer) is included as the last element in the emitted buffer.
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate, boolean cutBefore)
Collect incoming values into multiple List buffers that will be emitted by the resulting Flux each time the given predicate returns true. The buffer that receives the boundary element (the one that makes the predicate return true and thus closes a buffer) depends on the cutBefore parameter: set it to true to include the boundary element in the newly opened buffer, false to include it in the closed buffer (as in bufferUntil(Predicate)).
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
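The effect of cutBefore is easiest to see side by side. The sketch below is a plain-Java model under stated assumptions, not Reactor's implementation: `BufferUntilModel` is a hypothetical name, the source is a materialized List, and the model assumes that an empty buffer is never emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical plain-Java model of bufferUntil(predicate, cutBefore); it does not use Reactor.
final class BufferUntilModel {

    static <T> List<List<T>> bufferUntil(List<T> source, Predicate<? super T> predicate,
                                         boolean cutBefore) {
        List<List<T>> out = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T value : source) {
            boolean boundary = predicate.test(value);
            if (boundary && cutBefore) {
                // Close the previous buffer first; the boundary element starts the new one.
                if (!current.isEmpty()) { // assumption: empty buffers are skipped
                    out.add(current);
                }
                current = new ArrayList<>();
                current.add(value);
            } else {
                current.add(value);
                if (boundary) {
                    // The boundary element is the last element of the closed buffer.
                    out.add(current);
                    current = new ArrayList<>();
                }
            }
        }
        if (!current.isEmpty()) {
            out.add(current); // partial buffer emitted on completion
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println(bufferUntil(source, n -> n % 3 == 0, false)); // [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(bufferUntil(source, n -> n % 3 == 0, true));  // [[1, 2], [3, 4, 5], [6, 7]]
    }
}
```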
public final <V> Flux<List<T>> bufferUntilChanged(Function<? super T,? extends V> keySelector)
public final <V> Flux<List<T>> bufferUntilChanged(Function<? super T,? extends V> keySelector, BiPredicate<? super V,? super V> keyComparator)
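Collect subsequent repetitions of an element (that is, if they arrive right after one another), as compared by a key extracted through the user provided Function and compared using a supplied BiPredicate, into multiple List buffers that will be emitted by the resulting Flux.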
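As an illustration of grouping by a changing key, the sketch below collects consecutive elements whose extracted keys compare equal into the same List. It is a plain-Java model under stated assumptions, not Reactor's implementation: `BufferUntilChangedModel` is a hypothetical name, the source is a materialized List, and each key is compared with the key of the element immediately before it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Function;

// Hypothetical plain-Java model of bufferUntilChanged(keySelector, keyComparator); it does not use Reactor.
final class BufferUntilChangedModel {

    static <T, K> List<List<T>> bufferUntilChanged(List<T> source,
            Function<? super T, ? extends K> keySelector,
            BiPredicate<? super K, ? super K> keyComparator) {
        List<List<T>> out = new ArrayList<>();
        List<T> current = new ArrayList<>();
        K previousKey = null;
        for (T value : source) {
            K key = keySelector.apply(value);
            // A key that differs from the previous one closes the current buffer.
            if (!current.isEmpty() && !keyComparator.test(previousKey, key)) {
                out.add(current);
                current = new ArrayList<>();
            }
            current.add(value);
            previousKey = key;
        }
        if (!current.isEmpty()) {
            out.add(current);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words = List.of("apple", "avocado", "banana", "blueberry", "apricot");
        List<List<String>> groups =
                bufferUntilChanged(words, w -> w.charAt(0), (a, b) -> a.equals(b));
        System.out.println(groups); // [[apple, avocado], [banana, blueberry], [apricot]]
    }
}
```

Note that "apricot" ends up in its own buffer: only repetitions that arrive right after one another are grouped.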
public final Flux<List<T>> bufferWhile(Predicate<? super T> predicate)
Collect incoming values into multiple List buffers that will be emitted by the resulting Flux. Each buffer continues aggregating values while the given predicate returns true, and a new buffer is created as soon as the predicate returns false. Note that the element that triggers the predicate to return false (and thus closes a buffer) is NOT included in any emitted buffer.
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal, as well as the buffer-triggering element.
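The difference from bufferUntil is that the boundary element is dropped rather than kept. The sketch below is a plain-Java model under stated assumptions, not Reactor's implementation: `BufferWhileModel` is a hypothetical name, the source is a materialized List, and the model assumes that an empty buffer is never emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical plain-Java model of bufferWhile(predicate); it does not use Reactor.
final class BufferWhileModel {

    static <T> List<List<T>> bufferWhile(List<T> source, Predicate<? super T> predicate) {
        List<List<T>> out = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T value : source) {
            if (predicate.test(value)) {
                current.add(value);
            } else {
                // The failing element closes the buffer and is not kept anywhere.
                if (!current.isEmpty()) { // assumption: empty buffers are skipped
                    out.add(current);
                }
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            out.add(current); // partial buffer emitted on completion
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, -1, 3, -2, -3, 4);
        System.out.println(bufferWhile(source, n -> n > 0)); // [[1, 2], [3], [4]]
    }
}
```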
public final <U,V> Flux<List<T>> bufferWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector)
Collect incoming values into multiple List buffers started each time an opening companion Publisher emits. Each buffer will last until the corresponding closing companion Publisher emits, thus releasing the buffer to the resulting Flux.
When Open signal is strictly not overlapping Close signal : dropping buffers (see green marbles in diagram below).
When Open signal is strictly more frequent than Close signal : overlapping buffers (see second and third buffers in diagram below).
When Open signal is exactly coordinated with Close signal : exact buffers
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. It DOES NOT provide strong guarantees in the case of overlapping buffers, as elements might get discarded too early (from the first of two overlapping buffers for instance).
U - the element type of the buffer-opening sequence
V - the element type of the buffer-closing sequence
bucketOpening - a companion Publisher to subscribe to for buffer creation signals.
closeSelector - a factory that, given a buffer opening signal, returns a companion Publisher to subscribe to for buffer closure and emission signals.
Returns: a Flux of List delimited by an opening Publisher and a relative closing Publisher
public final <U,V,C extends Collection<? super T>> Flux<C> bufferWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector, Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined Collection buffers started each time an opening companion Publisher emits. Each buffer will last until the corresponding closing companion Publisher emits, thus releasing the buffer to the resulting Flux.
When Open signal is strictly not overlapping Close signal : dropping buffers (see green marbles in diagram below).
When Open signal is strictly more frequent than Close signal : overlapping buffers (see second and third buffers in diagram below).
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. It DOES NOT provide strong guarantees in the case of overlapping buffers, as elements might get discarded too early (from the first of two overlapping buffers for instance).
U - the element type of the buffer-opening sequence
V - the element type of the buffer-closing sequence
C - the Collection buffer type
bucketOpening - a companion Publisher to subscribe to for buffer creation signals.
closeSelector - a factory that, given a buffer opening signal, returns a companion Publisher to subscribe to for buffer closure and emission signals.
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection delimited by an opening Publisher and a relative closing Publisher
public final Flux<T> cache()
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded volume of onNext signals. Completion and Error will also be replayed.
Returns: a replaying Flux
public final Flux<T> cache(int history)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain at most the given history size of onNext signals. Completion and Error will also be replayed.
Note that cache(0) will only cache the terminal signal without expiration.
history - number of elements retained in cache
Returns: a replaying Flux
public final Flux<T> cache(Duration ttl)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded history but apply a per-item expiry timeout.
Completion and Error will also be replayed until ttl triggers, in which case the next Subscriber will start over a new subscription.
ttl - Time-to-live for each cached item and post termination.
Returns: a replaying Flux
public final Flux<T> cache(Duration ttl, Scheduler timer)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded history but apply a per-item expiry timeout.
Completion and Error will also be replayed until ttl triggers, in which case the next Subscriber will start over a new subscription.
public final Flux<T> cache(int history, Duration ttl)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size and apply a per-item expiry timeout.
Completion and Error will also be replayed until ttl triggers, in which case the next Subscriber will start over a new subscription.
history - number of elements retained in cache
ttl - Time-to-live for each cached item and post termination.
Returns: a replaying Flux
public final Flux<T> cache(int history, Duration ttl, Scheduler timer)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size and apply a per-item expiry timeout.
Completion and Error will also be replayed until ttl triggers, in which case the next Subscriber will start over a new subscription.
public final <E> Flux<E> cast(Class<E> clazz)
Cast the current Flux produced type into a target produced type.
public final Flux<T> checkpoint()
Activate traceback (full assembly tracing) for this particular Flux, in case of an error upstream of the checkpoint. Tracing incurs the cost of an exception stack trace creation.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with the traceback.
The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
Returns: the assembly tracing Flux.
public final Flux<T> checkpoint(String description)
Activate traceback (assembly marker) for this particular Flux by giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(), this doesn't create a filled stack trace, avoiding the main cost of the operator. However, as a trade-off the description must be unique enough for the user to find out where this Flux was assembled. If you only want a generic description, and still rely on the stack trace to find the assembly site, use the checkpoint(String, boolean) variant.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
description - a unique enough description to include in the light assembly traceback.
Returns: the assembly marked Flux
public final Flux<T> checkpoint(@Nullable String description, boolean forceStackTrace)
Activate traceback (full assembly tracing or the lighter assembly marking depending on the forceStackTrace option).
By setting the forceStackTrace parameter to true, activate assembly tracing for this particular Flux and give it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(String), this will incur the cost of an exception stack trace creation. The description could for example be a meaningful name for the assembled flux or a wider correlation ID, since the stack trace will always provide enough information to locate where this Flux was assembled.
By setting forceStackTrace to false, behaves like checkpoint(String) and is subject to the same caveat in choosing the description.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly marker.
The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
description - a description (must be unique enough if forceStackTrace is set to false).
forceStackTrace - false to make a light checkpoint without a stacktrace, true to use a stack trace.
Returns: the assembly marked Flux.
public final <E> Mono<E> collect(Supplier<E> containerSupplier, BiConsumer<E,? super T> collector)
Collect all elements emitted by this Flux into a user-defined container, by applying a collector BiConsumer taking the container and each element. The collected result will be emitted when this sequence completes, emitting the empty container if the sequence was empty.
Discard Support: This operator discards the container upon cancellation or error triggered by a data signal. Either the container type is a Collection (in which case individual elements are discarded) or not (in which case the entire container is discarded). In case the collector BiConsumer fails to accumulate an element, the container is discarded as above and the triggering element is also discarded.
E - the container type
containerSupplier - the supplier of the container instance for each Subscriber
collector - a consumer of both the container instance and the value being currently collected
Returns: a Mono of the collected container on complete
public final <R,A> Mono<R> collect(Collector<? super T,A,? extends R> collector)
Collect all elements emitted by this Flux into a container, by applying a Java 8 Stream API Collector. The collected result will be emitted when this sequence completes, emitting the empty container if the sequence was empty.
Discard Support: This operator discards the intermediate container (see Collector.supplier()) upon cancellation, error or exception while applying the Collector.finisher(). Either the container type is a Collection (in which case individual elements are discarded) or not (in which case the entire container is discarded). In case the accumulator BiConsumer of the collector fails to accumulate an element into the intermediate container, the container is discarded as above and the triggering element is also discarded.
public final <K> Mono<Map<K,T>> collectMap(Function<? super T,? extends K> keyExtractor)
Collect all elements emitted by this Flux into a hashed Map that is emitted by the resulting Mono when this sequence completes, emitting the empty Map if the sequence was empty.
The key is extracted from each element by applying the keyExtractor Function. In case several elements map to the same key, the associated value will be the most recently emitted element.
Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the map.
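The key-conflict rule ("the most recently emitted element wins") matches what repeated Map.put calls do. The sketch below is a plain-Java model under stated assumptions, not Reactor's implementation: `CollectMapModel` is a hypothetical name and the source is a materialized List.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical plain-Java model of collectMap(keyExtractor); it does not use Reactor.
final class CollectMapModel {

    static <T, K> Map<K, T> collectMap(List<T> source,
                                       Function<? super T, ? extends K> keyExtractor) {
        Map<K, T> map = new HashMap<>();
        for (T value : source) {
            // put() overwrites any earlier element stored under the same key.
            map.put(keyExtractor.apply(value), value);
        }
        return map;
    }

    public static void main(String[] args) {
        Map<Character, String> byInitial =
                collectMap(List.of("apple", "avocado", "banana"), w -> w.charAt(0));
        // "avocado" replaced "apple" because both map to the key 'a'.
        System.out.println(byInitial.get('a')); // prints avocado
    }
}
```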
public final <K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor)
Collect all elements emitted by this Flux into a hashed Map that is emitted by the resulting Mono when this sequence completes, emitting the empty Map if the sequence was empty.
The key is extracted from each element by applying the keyExtractor Function, and the value is extracted by the valueExtractor Function. In case several elements map to the same key, the associated value will be derived from the most recently emitted element.
Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the map.
K - the type of the key extracted from each source element
V - the type of the value extracted from each source element
keyExtractor - a Function to map elements to a key for the Map
valueExtractor - a Function to map elements to a value for the Map
Returns: a Mono of a Map of key-element pairs (only including latest element's value in case of key conflicts)
public final <K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor, Supplier<Map<K,V>> mapSupplier)
Collect all elements emitted by this Flux into a user-defined Map that is emitted by the resulting Mono when this sequence completes, emitting the empty Map if the sequence was empty.
The key is extracted from each element by applying the keyExtractor Function, and the value is extracted by the valueExtractor Function. In case several elements map to the same key, the associated value will be derived from the most recently emitted element.
Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the map.
K - the type of the key extracted from each source element
V - the type of the value extracted from each source element
keyExtractor - a Function to map elements to a key for the Map
valueExtractor - a Function to map elements to a value for the Map
mapSupplier - a Map factory called for each Subscriber
Returns: a Mono of a Map of key-value pairs (only including latest element's value in case of key conflicts)
public final <K> Mono<Map<K,Collection<T>>> collectMultimap(Function<? super T,? extends K> keyExtractor)
Collect all elements emitted by this Flux into a multimap that is emitted by the resulting Mono when this sequence completes, emitting the empty multimap if the sequence was empty.
The key is extracted from each element by applying the keyExtractor Function, and every element mapping to the same key is stored in the List associated to said key.
Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the list values in the map.
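In contrast with collectMap, no element is overwritten here: every element is appended to the List stored under its key. The sketch below is a plain-Java model under stated assumptions, not Reactor's implementation: `CollectMultimapModel` is a hypothetical name, the source is a materialized List, and the choice of HashMap and ArrayList is an assumption made for the illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical plain-Java model of collectMultimap(keyExtractor); it does not use Reactor.
final class CollectMultimapModel {

    static <T, K> Map<K, Collection<T>> collectMultimap(List<T> source,
            Function<? super T, ? extends K> keyExtractor) {
        Map<K, Collection<T>> map = new HashMap<>();
        for (T value : source) {
            // Create the per-key list on first use, then append in arrival order.
            map.computeIfAbsent(keyExtractor.apply(value), k -> new ArrayList<>()).add(value);
        }
        return map;
    }

    public static void main(String[] args) {
        Map<Character, Collection<String>> byInitial =
                collectMultimap(List.of("apple", "avocado", "banana"), w -> w.charAt(0));
        System.out.println(byInitial.get('a')); // prints [apple, avocado]
    }
}
```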
public final <K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor)
Flux
into a multimap
that is
emitted by the resulting Mono
when this sequence completes, emitting the
empty multimap
if the sequence was empty.
The key is extracted from each element by applying the keyExtractor
Function
, and every element mapping to the same key is converted by the
valueExtractor
Function to a value stored in the List
associated to
said key.
Discard Support: This operator discards the whole Map
upon cancellation or error
triggered by a data signal, so discard handlers will have to unpack the list values in the map.
K
- the type of the key extracted from each source element
V
- the type of the value extracted from each source element
keyExtractor
- a Function to map elements to a key for the Map
valueExtractor
- a Function to map elements to a value for the Map
Mono of a Map of key-List(values) pairs
public final <K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor, Supplier<Map<K,Collection<V>>> mapSupplier)
Collect all elements emitted by this Flux into a user-defined multimap that is
emitted by the resulting Mono when this sequence completes, emitting the
empty multimap if the sequence was empty.
The key is extracted from each element by applying the keyExtractor
Function
, and every element mapping to the same key is converted by the
valueExtractor
Function to a value stored in the Collection
associated to
said key.
Discard Support: This operator discards the whole Map
upon cancellation or error
triggered by a data signal, so discard handlers will have to unpack the list values in the map.
K
- the type of the key extracted from each source element
V
- the type of the value extracted from each source element
keyExtractor
- a Function to map elements to a key for the Map
valueExtractor
- a Function to map elements to a value for the Map
mapSupplier
- a multimap (Map of Collection) factory called for each Subscriber
Mono of a Map of key-Collection(values) pairs
public final Mono<List<T>> collectSortedList()
Collect all elements emitted by this Flux until this sequence completes,
and then sort them in natural order into a List that is emitted by the
resulting Mono. If the sequence was empty, an empty List will be emitted.
Discard Support: This operator is based on collectList()
, and as such discards the
elements in the List
individually upon cancellation or error triggered by a data signal.
public final Mono<List<T>> collectSortedList(@Nullable Comparator<? super T> comparator)
Collect all elements emitted by this Flux until this sequence completes,
and then sort them using a Comparator into a List that is emitted
by the resulting Mono. If the sequence was empty, an empty List will be emitted.
Discard Support: This operator is based on collectList()
, and as such discards the
elements in the List
individually upon cancellation or error triggered by a data signal.
comparator
- a Comparator to sort the items of this sequence
Mono of a sorted List of all values from this Flux
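The two sorting variants can be compared side by side in a short sketch, assuming reactor-core is on the classpath (the class and method names are hypothetical):

```java
import java.util.Comparator;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical example class; assumes reactor-core is on the classpath.
final class CollectSortedListExample {

    // Natural ordering: the elements must be Comparable.
    static List<Integer> natural() {
        return Flux.just(3, 1, 2).collectSortedList().block();
    }

    // Explicit Comparator: here the reverse of the natural order.
    static List<Integer> reversed() {
        return Flux.just(3, 1, 2)
                .collectSortedList(Comparator.<Integer>reverseOrder())
                .block();
    }

    public static void main(String[] args) {
        System.out.println(natural());  // [1, 2, 3]
        System.out.println(reversed()); // [3, 2, 1]
    }
}
```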
public final <V> Flux<V> concatMap(Function<? super T,? extends Publisher<? extends V>> mapper)
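Transform the elements emitted by this Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap and flatMapSequential:
- Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
- Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
- Interleaving: this operator does not let values from different inners interleave (concatenation).
Errors will immediately short circuit current concat backlog.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
V
- the produced concatenated type
mapper
- the function to transform this sequence of T into concatenated sequences of V
Flux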
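The ordering guarantee can be illustrated with a small sketch, assuming reactor-core is on the classpath (class and method names are hypothetical). Each inner publisher is delayed by a different amount, yet the output follows the source order because each inner must complete before the next one is subscribed to:

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical example class; assumes reactor-core is on the classpath.
final class ConcatMapOrderExample {

    static List<Integer> run() {
        return Flux.just(3, 1, 2)
                // The inner for 3 is the slowest, but it is still fully
                // drained before the inner for 1 is even generated.
                .concatMap(n -> Flux.just(n * 10, n * 10 + 1)
                        .delayElements(Duration.ofMillis(n * 5L)))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [30, 31, 10, 11, 20, 21]
    }
}
```

With flatMap instead of concatMap, the faster inners would typically be allowed to emit first and interleave, so no particular order could be relied on.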
public final <V> Flux<V> concatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int prefetch)
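Transform the elements emitted by this Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap and flatMapSequential:
- Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
- Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
- Interleaving: this operator does not let values from different inners interleave (concatenation).
Errors will immediately short circuit current concat backlog. The prefetch argument sets an arbitrary prefetch size for the upstream source.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
V
- the produced concatenated type
mapper
- the function to transform this sequence of T into concatenated sequences of V
prefetch
- the number of values to prefetch from upstream source (set it to 0 if you don't want it to prefetch)
Flux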
public final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper)
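Transform the elements emitted by this Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap and flatMapSequential:
- Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
- Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
- Interleaving: this operator does not let values from different inners interleave (concatenation).
Errors in the individual publishers will be delayed at the end of the whole concat
sequence (possibly getting combined into a composite exception) if several sources error.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
V
- the produced concatenated type
mapper
- the function to transform this sequence of T into concatenated sequences of V
Flux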
public final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, int prefetch)
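Transform the elements emitted by this Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap and flatMapSequential:
- Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
- Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
- Interleaving: this operator does not let values from different inners interleave (concatenation).
Errors in the individual publishers will be delayed at the end of the whole concat
sequence (possibly getting combined into a composite exception) if several sources error.
The prefetch argument sets an arbitrary prefetch size for the upstream source.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
V
- the produced concatenated type
mapper
- the function to transform this sequence of T into concatenated sequences of V
prefetch
- the number of values to prefetch from upstream source
Flux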
public final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, boolean delayUntilEnd, int prefetch)
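Transform the elements emitted by this Flux asynchronously into Publishers,
then flatten these inner publishers into a single Flux, sequentially and
preserving order using concatenation.
There are three dimensions to this operator that can be compared with
flatMap and flatMapSequential:
- Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
- Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
- Interleaving: this operator does not let values from different inners interleave (concatenation).
Errors in the individual publishers will be delayed after the current concat backlog if delayUntilEnd is false, or after all sources if delayUntilEnd is true. The prefetch argument sets an arbitrary prefetch size for the upstream source.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
V
- the produced concatenated type
mapper
- the function to transform this sequence of T into concatenated sequences of V
delayUntilEnd
- delay error until all sources have been consumed instead of after the current source
prefetch
- the number of values to prefetch from upstream source
Flux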
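The error-delaying behavior can be sketched as follows, assuming reactor-core is on the classpath (the class, the `inner` helper and the error message are hypothetical). The inner publisher for element 2 fails, but the values of the inner for element 3 are still delivered before the error reaches downstream:

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical example class; assumes reactor-core is on the classpath.
final class ConcatMapDelayErrorExample {

    // Hypothetical helper: the inner publisher for element 2 fails.
    static Flux<Integer> inner(int n) {
        return n == 2
                ? Flux.error(new IllegalStateException("inner 2 failed"))
                : Flux.just(n * 10);
    }

    // Returns the values seen downstream; the delayed error message is
    // recorded in the supplied list once all inners have been consumed.
    static List<Integer> run(List<String> errors) {
        return Flux.just(1, 2, 3)
                .concatMapDelayError(ConcatMapDelayErrorExample::inner)
                .onErrorResume(e -> {
                    errors.add(e.getMessage());
                    return Flux.empty();
                })
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        List<String> errors = new ArrayList<>();
        System.out.println(run(errors)); // [10, 30]
        System.out.println(errors);      // [inner 2 failed]
    }
}
```

With plain concatMap the sequence would instead stop at the failing inner, and 30 would never be emitted.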
public final <R> Flux<R> concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
Transform the items emitted by this Flux into Iterable, then flatten the elements from those by
concatenating them into a single Flux. For each iterable, Iterable.iterator() will be called
at least once and at most twice.
This operator inspects each Iterable
's Spliterator
to assess if the iteration
can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)
).
Since the default Spliterator wraps the Iterator we can have two Iterable.iterator()
calls per iterable. This second invocation is skipped on a Collection
however, a type which is
assumed to be always finite.
Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is
no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially.
Thus flatMapIterable and concatMapIterable are equivalent; both are offered as a discoverability
improvement for users that explore the API with the concat vs flatMap expectation.
Discard Support: Upon cancellation, this operator discards T
elements it prefetched and, in
some cases, attempts to discard remainder of the currently processed Iterable
(if it can
safely ensure the iterator is finite). Note that this means each Iterable
's Iterable.iterator()
method could be invoked twice.
Error Mode Support: This operator supports resuming on errors
(including when fusion is enabled). Exceptions thrown by the consumer are passed to
the onErrorContinue(BiConsumer)
error consumer (the value consumer
is not invoked, as the source element will be part of the sequence). The onNext
signal is then propagated as normal.
public final <R> Flux<R> concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper, int prefetch)
Transform the items emitted by this Flux into Iterable, then flatten the emissions from those by
concatenating them into a single Flux.
The prefetch argument sets an arbitrary prefetch size for the upstream source.
For each iterable, Iterable.iterator() will be called at least once and at most twice.
This operator inspects each Iterable
's Spliterator
to assess if the iteration
can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)
).
Since the default Spliterator wraps the Iterator we can have two Iterable.iterator()
calls per iterable. This second invocation is skipped on a Collection
however, a type which is
assumed to be always finite.
Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is
no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially.
Thus flatMapIterable and concatMapIterable are equivalent; both are offered as a discoverability
improvement for users that explore the API with the concat vs flatMap expectation.
Discard Support: Upon cancellation, this operator discards T
elements it prefetched and, in
some cases, attempts to discard remainder of the currently processed Iterable
(if it can
safely ensure the iterator is finite). Note that this means each Iterable
's Iterable.iterator()
method could be invoked twice.
Error Mode Support: This operator supports resuming on errors
(including when fusion is enabled). Exceptions thrown by the consumer are passed to
the onErrorContinue(BiConsumer)
error consumer (the value consumer
is not invoked, as the source element will be part of the sequence). The onNext
signal is then propagated as normal.
R
- the merged output sequence type
mapper
- the Function to transform input sequence into N Iterable
prefetch
- the number of values to request from the source upon subscription, to be transformed to Iterable
Flux
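A minimal sketch of the flattening, assuming reactor-core is on the classpath (class and method names are hypothetical): each comma-separated string is mapped to a List, and the contents of those lists are played sequentially.

```java
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical example class; assumes reactor-core is on the classpath.
final class ConcatMapIterableExample {

    static List<String> run() {
        return Flux.just("a,b", "c", "d,e")
                // Each source element becomes a List (a finite Collection),
                // whose items are emitted in order.
                .concatMapIterable(s -> Arrays.asList(s.split(",")))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a, b, c, d, e]
    }
}
```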
public final Flux<T> contextWrite(ContextView contextToAppend)
Enrich the Context visible from downstream for the benefit of upstream
operators, by making all values from the provided ContextView visible on top
of pairs from downstream.
A Context (and its ContextView) is tied to a given subscription
and is read by querying the downstream Subscriber. Subscribers that
don't enrich the context instead access their own downstream's context. As a result,
this operator conceptually enriches a Context coming from under it in the chain
(downstream, by default an empty one) and makes the new enriched Context
visible to operators above it in the chain.
contextToAppend
- the ContextView to merge with the downstream Context,
resulting in a new more complete Context that will be visible from upstream.
Flux
ContextView
public final Flux<T> contextWrite(Function<Context,Context> contextModifier)
Enrich the Context visible from downstream for the benefit of upstream
operators, by applying a Function to the downstream Context.
The Function takes a Context for convenience, making it easy to
call write APIs to return a new Context.
A Context (and its ContextView) is tied to a given subscription
and is read by querying the downstream Subscriber. Subscribers that
don't enrich the context instead access their own downstream's context. As a result,
this operator conceptually enriches a Context coming from under it in the chain
(downstream, by default an empty one) and makes the new enriched Context
visible to operators above it in the chain.
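To make the "visible to operators above it" direction concrete, here is a small sketch, assuming reactor-core 3.4 or later is on the classpath (the class name, the "user" key and its values are hypothetical). The reader is placed upstream of contextWrite and therefore sees the enriched Context:

```java
import reactor.core.publisher.Flux;
import reactor.util.context.Context;

// Hypothetical example class; assumes reactor-core 3.4+ is on the classpath.
final class ContextWriteExample {

    // Reads the (hypothetical) "user" key from the Context at subscription time.
    static Flux<String> greeting() {
        return Flux.deferContextual(ctx ->
                Flux.just("hello " + ctx.getOrDefault("user", "anonymous")));
    }

    // ContextView variant: Context.of(...) is merged on top of the downstream Context.
    static String withView() {
        return greeting().contextWrite(Context.of("user", "alice")).blockFirst();
    }

    // Function variant: the downstream Context is modified through put(...).
    static String withFunction() {
        return greeting().contextWrite(ctx -> ctx.put("user", "bob")).blockFirst();
    }

    // No contextWrite below the reader: the default (empty) Context is seen.
    static String withoutWrite() {
        return greeting().blockFirst();
    }

    public static void main(String[] args) {
        System.out.println(withView());     // hello alice
        System.out.println(withFunction()); // hello bob
        System.out.println(withoutWrite()); // hello anonymous
    }
}
```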
public final Mono<Long> count()
Counts the number of values in this Flux.
The count will be emitted when onComplete is observed.
public final Flux<T> defaultIfEmpty(T defaultV)
Provide a default unique value if this sequence is completed without any data.
defaultV
- the alternate value if this sequence is empty
Flux
public final Flux<T> delayElements(Duration delay)
Delay each of this Flux's elements (Subscriber.onNext(T) signals)
by a given Duration. Signals are delayed and continue on the
parallel default Scheduler, but empty sequences or
immediate error signals are not delayed.
delay
- duration by which to delay each Subscriber.onNext(T) signal
Flux
delaySubscription to introduce a delay at the beginning of the sequence only
public final Flux<T> delayElements(Duration delay, Scheduler timer)
Delay each of this Flux's elements (Subscriber.onNext(T) signals)
by a given Duration. Signals are delayed and continue on a user-specified
Scheduler, but empty sequences or immediate error signals are not delayed.
delay
- period to delay each Subscriber.onNext(T) signal
timer
- a time-capable Scheduler instance to delay each signal on
Flux
public final Flux<T> delaySequence(Duration delay)
Shift this Flux forward in time by a given Duration.
Unlike with delayElements(Duration)
, elements are shifted forward in time
as they are emitted, always resulting in the delay between two elements being
the same as in the source (only the first element is visibly delayed from the
previous event, that is the subscription).
Signals are delayed and continue on the parallel
Scheduler
, but empty sequences or immediate error signals are not delayed.
With this operator, a source emitting at 10Hz with a delaySequence Duration
of 1s will still emit at 10Hz, with an initial "hiccup" of 1s.
On the other hand, delayElements(Duration)
would end up emitting
at 1Hz.
This is closer to delaySubscription(Duration)
, except the source
is subscribed to immediately.
Discard Support: This operator discards elements currently being delayed if the sequence is cancelled during the delay.
public final Flux<T> delaySequence(Duration delay, Scheduler timer)
Shift this Flux forward in time by a given Duration.
Unlike with delayElements(Duration, Scheduler)
, elements are shifted forward in time
as they are emitted, always resulting in the delay between two elements being
the same as in the source (only the first element is visibly delayed from the
previous event, that is the subscription).
Signals are delayed and continue on a user-specified Scheduler
, but empty
sequences or immediate error signals are not delayed.
With this operator, a source emitting at 10Hz with a delaySequence Duration
of 1s will still emit at 10Hz, with an initial "hiccup" of 1s.
On the other hand, delayElements(Duration, Scheduler)
would end up emitting
at 1Hz.
This is closer to delaySubscription(Duration, Scheduler)
, except the source
is subscribed to immediately.
Discard Support: This operator discards elements currently being delayed if the sequence is cancelled during the delay.
public final Flux<T> delayUntil(Function<? super T,? extends Publisher<?>> triggerProvider)
Subscribe to this Flux and generate a Publisher from each of this
Flux's elements, each acting as a trigger for relaying said element.
That is to say, the resulting Flux delays each of its emissions until the
associated trigger Publisher terminates.
In case of an error either in the source or in a trigger, that error is propagated
immediately downstream.
Note that unlike with the Mono variant
there is
no fusion of subsequent calls.
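The trigger-then-relay sequencing can be observed with a short sketch, assuming reactor-core is on the classpath (class name and log messages are hypothetical). Each trigger is a Mono that records a log entry and then completes, and the element is relayed only after that:

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical example class; assumes reactor-core is on the classpath.
final class DelayUntilExample {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flux.just("a", "b")
                // The trigger for each element runs to completion first...
                .delayUntil(v -> Mono.fromRunnable(() -> log.add("trigger " + v)))
                // ...and only then is the element relayed downstream.
                .doOnNext(v -> log.add("relayed " + v))
                .blockLast();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
        // [trigger a, relayed a, trigger b, relayed b]
    }
}
```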
public final Flux<T> delaySubscription(Duration delay)
Delay the subscription to this Flux source until the given
period elapses. The delay is introduced through the parallel default Scheduler.
public final Flux<T> delaySubscription(Duration delay, Scheduler timer)
Delay the subscription to this Flux source until the given
period elapses, as measured on the user-provided Scheduler.
public final <U> Flux<T> delaySubscription(Publisher<U> subscriptionDelay)
Delay the subscription to this Flux source until another Publisher signals a value or completes.
U
- the other source type
subscriptionDelay
- a companion Publisher whose onNext/onComplete signal will trigger the subscription
Flux
public final <X> Flux<X> dematerialize()
An operator working only if this Flux emits onNext, onError or onComplete Signal
instances, transforming these materialized signals into
real signals on the Subscriber.
The error Signal will trigger onError and complete Signal will trigger
onComplete.
X
- the dematerialized type
Flux
materialize()
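As a minimal sketch, assuming reactor-core is on the classpath (class and method names are hypothetical), a sequence of explicit Signal instances can be turned back into real signals:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;

// Hypothetical example class; assumes reactor-core is on the classpath.
final class DematerializeExample {

    static List<Integer> run() {
        List<Signal<Integer>> signals = List.of(
                Signal.next(1),
                Signal.next(2),
                Signal.<Integer>complete());

        return Flux.fromIterable(signals)
                // onNext Signals become real onNext calls; the complete
                // Signal becomes a real onComplete.
                .<Integer>dematerialize()
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2]
    }
}
```

The explicit `<Integer>` type witness is needed because the dematerialized type X cannot be inferred from the source type alone.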
public final Flux<T> distinct()
For each Subscriber, track elements from this Flux that have been
seen and filter out duplicates.
The values themselves are recorded into a HashSet for distinct detection.
Use distinct(Object::hashCode) if you want a more lightweight approach that
doesn't retain all the objects, but is more susceptible to falsely considering two
different elements as duplicates due to a hashCode collision.
Discard Support: This operator discards elements that don't match the distinct predicate,
but you should use the version with a cleanup if you need discarding of keys
categorized by the operator as "seen". See distinct(Function, Supplier, BiPredicate, Consumer)
.
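A short sketch of the no-argument variant and the key-selector variant, assuming reactor-core is on the classpath (class and method names are hypothetical):

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical example class; assumes reactor-core is on the classpath.
final class DistinctExample {

    // Duplicates are detected on the values themselves.
    static List<Integer> byValue() {
        return Flux.just(1, 2, 1, 3, 2).distinct().collectList().block();
    }

    // Duplicates are detected on an extracted key, here the first letter,
    // so "avocado" is dropped because "apple" was already seen for 'a'.
    static List<String> byFirstLetter() {
        return Flux.just("apple", "avocado", "banana")
                .distinct(s -> s.charAt(0))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(byValue());       // [1, 2, 3]
        System.out.println(byFirstLetter()); // [apple, banana]
    }
}
```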
Flux only emitting distinct values
public final <V> Flux<T> distinct(Function<? super T,? extends V> keySelector)
For each Subscriber, track elements from this Flux that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function.
Discard Support: This operator discards elements that don't match the distinct predicate,
but you should use the version with a cleanup if you need discarding of keys
categorized by the operator as "seen". See distinct(Function, Supplier, BiPredicate, Consumer)
.
V
- the type of the key extracted from each value in this sequence
keySelector
- function to compute comparison key for each element
Flux only emitting values with distinct keys
public final <V,C extends Collection<? super V>> Flux<T> distinct(Function<? super T,? extends V> keySelector, Supplier<C> distinctCollectionSupplier)
For each Subscriber, track elements from this Flux that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function and by the add method of the Collection supplied (typically a Set).
Discard Support: This operator discards elements that don't match the distinct predicate,
but you should use the version with a cleanup if you need discarding of keys
categorized by the operator as "seen". See distinct(Function, Supplier, BiPredicate, Consumer)
.
V
- the type of the key extracted from each value in this sequence
C
- the type of Collection used for distinct checking of keys
keySelector
- function to compute comparison key for each element
distinctCollectionSupplier
- supplier of the Collection used for distinct check through add of the key.
Flux only emitting values with distinct keys
public final <V,C> Flux<T> distinct(Function<? super T,? extends V> keySelector, Supplier<C> distinctStoreSupplier, BiPredicate<C,V> distinctPredicate, Consumer<C> cleanup)
For each Subscriber, track elements from this Flux that have been
seen and filter out duplicates, as compared by applying a BiPredicate on
an arbitrary user-supplied <C> store and a key extracted through the user
provided Function. The BiPredicate should typically add the key to the
arbitrary store for further comparison. A cleanup callback is also invoked on the
store upon termination of the sequence.
Discard Support: This operator discards elements that don't match the distinct predicate,
but you should use the cleanup
as well if you need discarding of keys
categorized by the operator as "seen".
V
- the type of the key extracted from each value in this sequence
C
- the type of store backing the BiPredicate
keySelector
- function to compute comparison key for each element
distinctStoreSupplier
- supplier of the arbitrary store object used in distinct checks along the extracted key.
distinctPredicate
- the BiPredicate to apply to the arbitrary store + extracted key to perform a distinct check.
Since nothing is assumed of the store, this predicate should also add the key to the store as necessary.
cleanup
- the cleanup callback to invoke on the store upon termination.
Flux only emitting values with distinct keys
public final Flux<T> distinctUntilChanged()
Filter out subsequent repetitions of an element (that is, if they arrive right after one another).
The last distinct value seen is retained for further comparison, which is done
on the values themselves using the equals method.
Use distinctUntilChanged(Object::hashCode) if you want a more lightweight approach that
doesn't retain all the objects, but is more susceptible to falsely considering two
different elements as duplicates due to a hashCode collision.
Discard Support: Although this operator discards elements that are considered as "already seen", it is not recommended for cases where discarding is needed as the operator doesn't discard the "key" (in this context, the distinct instance that was last seen).
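The difference from distinct() is that only consecutive repetitions are filtered. A minimal sketch, assuming reactor-core is on the classpath (class and method names are hypothetical):

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical example class; assumes reactor-core is on the classpath.
final class DistinctUntilChangedExample {

    static List<Integer> run() {
        return Flux.just(1, 1, 2, 2, 1, 3, 3)
                // Only the last emitted value is remembered, so the second
                // run of 1 passes because it follows a 2.
                .distinctUntilChanged()
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 1, 3]
    }
}
```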
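Flux with only one occurrence in a row of each element
(yet elements can repeat in the overall sequence)
public final <V> Flux<T> distinctUntilChanged(Function<? super T,? extends V> keySelector)
Filter out subsequent repetitions of an element (that is, if they arrive right after
one another), as compared by a key extracted through the user provided Function using equality.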
Discard Support: This operator discards elements that are considered as "already seen". The keys themselves are not discarded.
V
- the type of the key extracted from each value in this sequence
keySelector
- function to compute comparison key for each element
Flux with only one occurrence in a row of each element of
the same key (yet element keys can repeat in the overall sequence)
public final <V> Flux<T> distinctUntilChanged(Function<? super T,? extends V> keySelector, BiPredicate<? super V,? super V> keyComparator)
Filter out subsequent repetitions of an element (that is, if they arrive right after
one another), as compared by a key extracted through the user provided Function
and then comparing keys with the supplied BiPredicate.
Discard Support: This operator discards elements that are considered as "already seen"
(for which the keyComparator
returns true). The keys themselves
are not discarded.
V
- the type of the key extracted from each value in this sequence
keySelector
- function to compute comparison key for each element
keyComparator
- predicate used to compare keys.
Flux with only one occurrence in a row of each element
of the same key for which the predicate returns true (yet element keys can repeat
in the overall sequence)
public final Flux<T> doAfterTerminate(Runnable afterTerminate)
Add behavior (side-effect) triggered after the Flux
terminates, either by completing downstream successfully or with an error.
The relevant signal is propagated downstream, then the Runnable is executed.
afterTerminate
- the callback to call after Subscriber.onComplete()
or Subscriber.onError(java.lang.Throwable)
Flux
public final Flux<T> doOnCancel(Runnable onCancel)
Add behavior (side-effect) triggered when the Flux is cancelled.
The handler is executed first, then the cancel signal is propagated upstream to the source.
onCancel
- the callback to call on Subscription.cancel()
Flux
public final Flux<T> doOnComplete(Runnable onComplete)
Add behavior (side-effect) triggered when the Flux completes successfully.
The Runnable
is executed first, then the onComplete signal is propagated
downstream.
onComplete
- the callback to call on Subscriber.onComplete()
Flux
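The relative ordering of the doOnComplete and doAfterTerminate callbacks around the downstream onComplete can be observed with a small sketch, assuming reactor-core is on the classpath (class name and log messages are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical example class; assumes reactor-core is on the classpath.
final class TerminationOrderExample {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flux.just(1)
                // Runs before onComplete is propagated downstream.
                .doOnComplete(() -> log.add("doOnComplete"))
                // Runs after the terminal signal has been propagated downstream.
                .doAfterTerminate(() -> log.add("doAfterTerminate"))
                .subscribe(
                        value -> { },
                        error -> { },
                        () -> log.add("subscriber onComplete"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
        // [doOnComplete, subscriber onComplete, doAfterTerminate]
    }
}
```

The source here is synchronous, so the whole sequence has already run when subscribe returns.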
public final <R> Flux<T> doOnDiscard(Class<R> type, Consumer<? super R> discardHook)
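Potentially modify the behavior of the whole chain of operators upstream of this one to
conditionally clean up elements that get discarded by these operators.
The discardHook MUST be idempotent and safe to use on any instance of the desired type.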
Calls to this method are additive, and the order of invocation of the discardHook
is the same as the order of declaration (calling .filter(...).doOnDiscard(first).doOnDiscard(second)
will let the filter invoke first
then second
handlers).
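The additive behavior described above can be sketched with a filter, which discards the elements it rejects. This assumes reactor-core is on the classpath; the class name and the "first"/"second" labels are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical example class; assumes reactor-core is on the classpath.
final class DoOnDiscardExample {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flux.just(1, 2, 3, 4)
                // Odd numbers are rejected by the filter and handed to the hooks.
                .filter(n -> n % 2 == 0)
                // Hooks are invoked in declaration order for each discarded element.
                .doOnDiscard(Integer.class, n -> log.add("first:" + n))
                .doOnDiscard(Integer.class, n -> log.add("second:" + n))
                .blockLast();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
        // [first:1, second:1, first:3, second:3]
    }
}
```

Both hooks only record a log entry, so running them repeatedly on the same instance is harmless, in line with the idempotency requirement.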
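Two main categories of discarding operators exist:
- filtering operators, dropping some source elements as part of their designed behavior
- operators that prefetch a few elements and keep them around pending a request, but get cancelled/in error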
type
- the Class of elements in the upstream chain of operators that
this cleanup hook should take into account.
discardHook
- a Consumer of elements in the upstream chain of operators
that performs the cleanup.
Flux that cleans up matching elements that get discarded upstream of it.
public final Flux<T> doOnEach(Consumer<? super Signal<T>> signalConsumer)
Add behavior (side-effects) triggered when the Flux emits an item, fails with an error
or completes successfully. All these events are represented as a Signal
that is passed to the side-effect callback. Note that this is an advanced operator,
typically used for monitoring of a Flux. These Signals have a Context
associated to them.
The Consumer
is executed first, then the relevant signal is propagated
downstream.
signalConsumer
- the mandatory callback to call on
Subscriber.onNext(Object)
, Subscriber.onError(Throwable)
and
Subscriber.onComplete()
Flux
doOnNext(Consumer)
,
doOnError(Consumer)
,
doOnComplete(Runnable)
,
materialize()
,
Signal
public final Flux<T> doOnError(Consumer<? super Throwable> onError)
Add behavior (side-effect) triggered when the Flux completes with an error.
The Consumer
is executed first, then the onError signal is propagated
downstream.
onError
- the callback to call on Subscriber.onError(java.lang.Throwable)
Flux
public final <E extends Throwable> Flux<T> doOnError(Class<E> exceptionType, Consumer<? super E> onError)
Add behavior (side-effect) triggered when the Flux completes with an error matching the given exception type.
The Consumer
is executed first, then the onError signal is propagated
downstream.
E
- type of the error to handle
exceptionType
- the type of exceptions to handle
onError
- the error handler for each error
Flux
public final Flux<T> doOnError(Predicate<? super Throwable> predicate, Consumer<? super Throwable> onError)
Add behavior (side-effect) triggered when the Flux completes with an error matching the given predicate.
The Consumer
is executed first, then the onError signal is propagated
downstream.
predicate
- the matcher for exceptions to handle
onError
- the error handler for each error
Flux
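The type-based and predicate-based doOnError variants can be contrasted in a short sketch, assuming reactor-core is on the classpath (class name, exception message and log messages are hypothetical). Only handlers whose filter matches the error are invoked, and the error still propagates downstream afterwards:

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical example class; assumes reactor-core is on the classpath.
final class DoOnErrorExample {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flux.<Integer>error(new IllegalStateException("boom"))
                // Not invoked: the error is not an IllegalArgumentException.
                .doOnError(IllegalArgumentException.class, e -> log.add("IAE handler"))
                // Invoked: the error type matches.
                .doOnError(IllegalStateException.class, e -> log.add("ISE handler: " + e.getMessage()))
                // Invoked: the predicate accepts the error.
                .doOnError(e -> "boom".equals(e.getMessage()), e -> log.add("predicate handler"))
                // The error keeps propagating; swallow it here so the example terminates cleanly.
                .onErrorResume(e -> Flux.empty())
                .blockLast();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
        // [ISE handler: boom, predicate handler]
    }
}
```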