T - the element type of this Reactive Streams Publisher
public abstract class Flux<T> extends Object implements Publisher<T>
A Reactive Streams Publisher with rx operators that emits 0 to N elements and then completes (successfully or with an error).
The recommended way to learn about the Flux API and discover new operators is through the reference documentation rather than this javadoc, which is better suited to learning more about individual operators. See the "which operator do I need?" appendix.
Flux is intended to be used in implementations and return types. Input parameters should keep using the raw Publisher type as much as possible.
If it is known that the underlying Publisher will emit 0 or 1 element, Mono should be used instead.
Note that state should be avoided in the java.util.function implementations and lambdas used within Flux operators, as these may be shared between several Subscribers.
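
The hazard described above is not specific to Reactor, so it can be sketched with plain JDK types. The following is a minimal illustration under stated assumptions, not Reactor code: `source()` is a hypothetical stand-in for a cold sequence (each call replays the same data, loosely like each new subscription), and the class and method names are invented for this example. It contrasts a mapper whose counter is captured once and shared by every consumer with a mapper created per consumer.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-JDK stand-in for the hazard; no Reactor types are used here.
class SharedLambdaState {

    // Hypothetical cold source: each call yields a fresh sequence, loosely
    // mirroring how each subscription to a cold sequence replays its data.
    static Stream<String> source() {
        return Stream.of("a", "b", "c");
    }

    // Anti-pattern: two independent consumers run through the SAME mapper
    // instance, so the captured counter keeps counting across both of them.
    static List<List<String>> sharedMapper() {
        AtomicInteger counter = new AtomicInteger(); // state captured once
        Function<String, String> mapper = s -> counter.getAndIncrement() + ":" + s;
        List<String> first = source().map(mapper).collect(Collectors.toList());
        List<String> second = source().map(mapper).collect(Collectors.toList());
        return List.of(first, second);
    }

    // Alternative: a factory builds a new mapper (and a new counter) for
    // each consumer, so no state leaks from one consumer to the next.
    static List<List<String>> perConsumerMapper() {
        Supplier<Function<String, String>> mapperFactory = () -> {
            AtomicInteger counter = new AtomicInteger(); // state created per consumer
            return s -> counter.getAndIncrement() + ":" + s;
        };
        List<String> first = source().map(mapperFactory.get()).collect(Collectors.toList());
        List<String> second = source().map(mapperFactory.get()).collect(Collectors.toList());
        return List.of(first, second);
    }

    public static void main(String[] args) {
        System.out.println(sharedMapper());      // [[0:a, 1:b, 2:c], [3:a, 4:b, 5:c]]
        System.out.println(perConsumerMapper()); // [[0:a, 1:b, 2:c], [0:a, 1:b, 2:c]]
    }
}
```

With the shared mapper, the second consumer sees indices that continue from the first. In a Flux pipeline the analogous fix is usually to keep lambdas stateless or to scope state to each subscription, for instance with `defer`, which the method summary describes as letting the Supplier create a subscriber-specific instance.
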
subscribe(CoreSubscriber) is an internal extension to subscribe(Subscriber) used for Context passing. A user-provided Subscriber may be passed to this "subscribe" extension, but it will lose the available per-subscribe Hooks.onLastOperator hook.
See also: Mono
Constructor and Description |
---|
Flux() |
Modifier and Type | Method and Description |
---|---|
Mono<Boolean> |
all(Predicate<? super T> predicate)
Emit a single boolean true if all values of this sequence match
the
Predicate . |
Mono<Boolean> |
any(Predicate<? super T> predicate)
Emit a single boolean true if any of the values of this
Flux sequence match
the predicate. |
<P> P |
as(Function<? super Flux<T>,P> transformer)
Transform this
Flux into a target type. |
T |
blockFirst()
Subscribe to this
Flux and block indefinitely
until the upstream signals its first value or completes. |
T |
blockFirst(Duration timeout)
Subscribe to this
Flux and block until the upstream
signals its first value, completes or a timeout expires. |
T |
blockLast()
Subscribe to this
Flux and block indefinitely
until the upstream signals its last value or completes. |
T |
blockLast(Duration timeout)
Subscribe to this
Flux and block until the upstream
signals its last value, completes or a timeout expires. |
Flux<List<T>> |
buffer()
|
Flux<List<T>> |
buffer(Duration bufferingTimespan)
|
Flux<List<T>> |
buffer(Duration bufferingTimespan,
Duration openBufferEvery)
Collect incoming values into multiple
List buffers created at a given
openBufferEvery period. |
Flux<List<T>> |
buffer(Duration bufferingTimespan,
Duration openBufferEvery,
Scheduler timer)
|
Flux<List<T>> |
buffer(Duration bufferingTimespan,
Scheduler timer)
|
Flux<List<T>> |
buffer(int maxSize)
|
Flux<List<T>> |
buffer(int maxSize,
int skip)
|
<C extends Collection<? super T>> Flux<C> |
buffer(int maxSize,
int skip,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the given max size is reached
or once this Flux completes. |
<C extends Collection<? super T>> Flux<C> |
buffer(int maxSize,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the given max size is reached
or once this Flux completes. |
Flux<List<T>> |
buffer(Publisher<?> other)
|
<C extends Collection<? super T>> Flux<C> |
buffer(Publisher<?> other,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers, as
delimited by the signals of a companion Publisher this operator will
subscribe to. |
Flux<List<T>> |
bufferTimeout(int maxSize,
Duration maxTime)
|
Flux<List<T>> |
bufferTimeout(int maxSize,
Duration maxTime,
Scheduler timer)
|
<C extends Collection<? super T>> Flux<C> |
bufferTimeout(int maxSize,
Duration maxTime,
Scheduler timer,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the buffer reaches a maximum
size OR the maxTime Duration elapses, as measured on the provided Scheduler . |
<C extends Collection<? super T>> Flux<C> |
bufferTimeout(int maxSize,
Duration maxTime,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers that
will be emitted by the returned Flux each time the buffer reaches a maximum
size OR the maxTime Duration elapses. |
Flux<List<T>> |
bufferUntil(Predicate<? super T> predicate)
|
Flux<List<T>> |
bufferUntil(Predicate<? super T> predicate,
boolean cutBefore)
|
<U,V> Flux<List<T>> |
bufferWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector)
|
<U,V,C extends Collection<? super T>> Flux<C> |
bufferWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers started each time an opening
companion Publisher emits. |
Flux<List<T>> |
bufferWhile(Predicate<? super T> predicate)
|
Flux<T> |
cache()
Turn this
Flux into a hot source and cache last emitted signals for further Subscribers. |
Flux<T> |
cache(Duration ttl)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscribers. |
Flux<T> |
cache(Duration ttl,
Scheduler timer)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscribers. |
Flux<T> |
cache(int history)
Turn this
Flux into a hot source and cache last emitted signals for further Subscribers. |
Flux<T> |
cache(int history,
Duration ttl)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscribers. |
Flux<T> |
cache(int history,
Duration ttl,
Scheduler timer)
Turn this
Flux into a hot source and cache last emitted signals for further
Subscribers. |
Flux<T> |
cancelOn(Scheduler scheduler)
|
<E> Flux<E> |
cast(Class<E> clazz)
Cast the current
Flux produced type into a target produced type. |
Flux<T> |
checkpoint()
Activate traceback (full assembly tracing) for this particular
Flux , in case of an error
upstream of the checkpoint. |
Flux<T> |
checkpoint(String description)
Activate traceback (assembly marker) for this particular
Flux by giving it a description that
will be reflected in the assembly traceback in case of an error upstream of the
checkpoint. |
Flux<T> |
checkpoint(String description,
boolean forceStackTrace)
Activate traceback (full assembly tracing or the lighter assembly marking depending on the
forceStackTrace option). |
<R,A> Mono<R> |
collect(Collector<? super T,A,? extends R> collector)
|
<E> Mono<E> |
collect(Supplier<E> containerSupplier,
BiConsumer<E,? super T> collector)
Collect all elements emitted by this
Flux into a user-defined container,
by applying a collector BiConsumer taking the container and each element. |
Mono<List<T>> |
collectList()
|
<K> Mono<Map<K,T>> |
collectMap(Function<? super T,? extends K> keyExtractor)
|
<K,V> Mono<Map<K,V>> |
collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor)
|
<K,V> Mono<Map<K,V>> |
collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,V>> mapSupplier)
|
<K> Mono<Map<K,Collection<T>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor)
|
<K,V> Mono<Map<K,Collection<V>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor)
|
<K,V> Mono<Map<K,Collection<V>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,Collection<V>>> mapSupplier)
|
Mono<List<T>> |
collectSortedList()
|
Mono<List<T>> |
collectSortedList(Comparator<? super T> comparator)
Collect all elements emitted by this
Flux until this sequence completes,
and then sort them using a Comparator into a List that is emitted
by the resulting Mono . |
static <T,V> Flux<V> |
combineLatest(Function<Object[],V> combinator,
int prefetch,
Publisher<? extends T>... sources)
|
static <T,V> Flux<V> |
combineLatest(Function<Object[],V> combinator,
Publisher<? extends T>... sources)
|
static <T,V> Flux<V> |
combineLatest(Iterable<? extends Publisher<? extends T>> sources,
Function<Object[],V> combinator)
|
static <T,V> Flux<V> |
combineLatest(Iterable<? extends Publisher<? extends T>> sources,
int prefetch,
Function<Object[],V> combinator)
|
static <T1,T2,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
BiFunction<? super T1,? super T2,? extends V> combinator)
|
static <T1,T2,T3,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Function<Object[],V> combinator)
|
static <T1,T2,T3,T4,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Function<Object[],V> combinator)
|
static <T1,T2,T3,T4,T5,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Function<Object[],V> combinator)
|
static <T1,T2,T3,T4,T5,T6,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6,
Function<Object[],V> combinator)
|
<V> Flux<V> |
compose(Function<? super Flux<T>,? extends Publisher<V>> transformer)
|
static <T> Flux<T> |
concat(Iterable<? extends Publisher<? extends T>> sources)
Concatenate all sources provided in an
Iterable , forwarding elements
emitted by the sources downstream. |
static <T> Flux<T> |
concat(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concat(Publisher<? extends Publisher<? extends T>> sources,
int prefetch)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concat(Publisher<? extends T>... sources)
Concatenate all sources provided as a vararg, forwarding elements emitted by the
sources downstream.
|
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources,
boolean delayUntilEnd,
int prefetch)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources,
int prefetch)
Concatenate all sources emitted as an onNext signal from a parent
Publisher ,
forwarding elements emitted by the sources downstream. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends T>... sources)
Concatenate all sources provided as a vararg, forwarding elements emitted by the
sources downstream.
|
<V> Flux<V> |
concatMap(Function<? super T,? extends Publisher<? extends V>> mapper)
|
<V> Flux<V> |
concatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int prefetch)
|
<V> Flux<V> |
concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper)
|
<V> Flux<V> |
concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
boolean delayUntilEnd,
int prefetch)
|
<V> Flux<V> |
concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
int prefetch)
|
<R> Flux<R> |
concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
|
<R> Flux<R> |
concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper,
int prefetch)
|
Flux<T> |
concatWith(Publisher<? extends T> other)
|
Flux<T> |
concatWithValues(T... values)
Concatenates the values to the end of the
Flux. |
Mono<Long> |
count()
Counts the number of values in this
Flux . |
static <T> Flux<T> |
create(Consumer<? super FluxSink<T>> emitter)
|
static <T> Flux<T> |
create(Consumer<? super FluxSink<T>> emitter,
FluxSink.OverflowStrategy backpressure)
|
Flux<T> |
defaultIfEmpty(T defaultV)
Provide a default unique value if this sequence is completed without any data.
|
static <T> Flux<T> |
defer(Supplier<? extends Publisher<T>> supplier)
Lazily supply a
Publisher every time a Subscription is made on the
resulting Flux , so the actual source instantiation is deferred until each
subscribe and the Supplier can create a subscriber-specific instance. |
Flux<T> |
delayElements(Duration delay)
|
Flux<T> |
delayElements(Duration delay,
Scheduler timer)
|
Flux<T> |
delaySequence(Duration delay)
|
Flux<T> |
delaySequence(Duration delay,
Scheduler timer)
|
Flux<T> |
delaySubscription(Duration delay)
Delay the
subscription to this Flux source until the given
period elapses. |
Flux<T> |
delaySubscription(Duration delay,
Scheduler timer)
Delay the
subscription to this Flux source until the given
period elapses, as measured on the user-provided Scheduler . |
<U> Flux<T> |
delaySubscription(Publisher<U> subscriptionDelay)
|
Flux<T> |
delayUntil(Function<? super T,? extends Publisher<?>> triggerProvider)
|
<X> Flux<X> |
dematerialize()
An operator working only if this
Flux emits onNext, onError or onComplete Signal
instances, transforming these materialized signals into
real signals on the Subscriber . |
Flux<T> |
distinct()
For each
Subscriber , track elements from this Flux that have been
seen and filter out duplicates. |
<V> Flux<T> |
distinct(Function<? super T,? extends V> keySelector)
For each
Subscriber , track elements from this Flux that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function . |
<V,C extends Collection<? super V>> Flux<T> |
distinct(Function<? super T,? extends V> keySelector,
Supplier<C> distinctCollectionSupplier)
For each
Subscriber , track elements from this Flux that have been
seen and filter out duplicates, as compared by a key extracted through the user
provided Function and by the add method
of the Collection supplied (typically a Set ). |
<V,C> Flux<T> |
distinct(Function<? super T,? extends V> keySelector,
Supplier<C> distinctStoreSupplier,
BiPredicate<C,V> distinctPredicate,
Consumer<C> cleanup)
For each
Subscriber , track elements from this Flux that have been
seen and filter out duplicates, as compared by applying a BiPredicate on
an arbitrary user-supplied <C> store and a key extracted through the user
provided Function . |
Flux<T> |
distinctUntilChanged()
Filter out subsequent repetitions of an element (that is, if they arrive right after
one another).
|
<V> Flux<T> |
distinctUntilChanged(Function<? super T,? extends V> keySelector)
Filter out subsequent repetitions of an element (that is, if they arrive right after
one another), as compared by a key extracted through the user provided
Function
using equality. |
<V> Flux<T> |
distinctUntilChanged(Function<? super T,? extends V> keySelector,
BiPredicate<? super V,? super V> keyComparator)
Filter out subsequent repetitions of an element (that is, if they arrive right
after one another), as compared by a key extracted through the user provided
Function and then comparing keys with the supplied BiPredicate . |
Flux<T> |
doAfterTerminate(Runnable afterTerminate)
Add behavior (side-effect) triggered after the
Flux terminates, either by completing downstream successfully or with an error. |
Flux<T> |
doFinally(Consumer<SignalType> onFinally)
Add behavior (side-effect) triggered after the
Flux terminates for any reason,
including cancellation. |
Flux<T> |
doFirst(Runnable onFirst)
Add behavior (side-effect) triggered before the
Flux is
subscribed to, which should be the first event after assembly time. |
Flux<T> |
doOnCancel(Runnable onCancel)
Add behavior (side-effect) triggered when the
Flux is cancelled. |
Flux<T> |
doOnComplete(Runnable onComplete)
Add behavior (side-effect) triggered when the
Flux completes successfully. |
<R> Flux<T> |
doOnDiscard(Class<R> type,
Consumer<? super R> discardHook)
Modify the behavior of the whole chain of operators upstream of this one to
conditionally clean up elements that get discarded by these operators.
|
Flux<T> |
doOnEach(Consumer<? super Signal<T>> signalConsumer)
Add behavior (side-effects) triggered when the
Flux emits an item, fails with an error
or completes successfully. |
<E extends Throwable> Flux<T> |
doOnError(Class<E> exceptionType,
Consumer<? super E> onError)
Add behavior (side-effect) triggered when the
Flux completes with an error matching the given exception type. |
Flux<T> |
doOnError(Consumer<? super Throwable> onError)
Add behavior (side-effect) triggered when the
Flux completes with an error. |
Flux<T> |
doOnError(Predicate<? super Throwable> predicate,
Consumer<? super Throwable> onError)
Add behavior (side-effect) triggered when the
Flux completes with an error matching the given exception. |
Flux<T> |
doOnNext(Consumer<? super T> onNext)
Add behavior (side-effect) triggered when the
Flux emits an item. |
Flux<T> |
doOnRequest(LongConsumer consumer)
Add behavior (side-effect) triggering a
LongConsumer when this Flux
receives any request. |
Flux<T> |
doOnSubscribe(Consumer<? super Subscription> onSubscribe)
Add behavior (side-effect) triggered when the
Flux is done being subscribed,
that is to say when a Subscription has been produced by the Publisher
and passed to the Subscriber.onSubscribe(Subscription) . |
Flux<T> |
doOnTerminate(Runnable onTerminate)
Add behavior (side-effect) triggered when the
Flux terminates, either by
completing successfully or with an error. |
Flux<Tuple2<Long,T>> |
elapsed()
Map this
Flux into Tuple2<Long, T>
of elapsed time in milliseconds and source data. |
Flux<Tuple2<Long,T>> |
elapsed(Scheduler scheduler)
Map this
Flux into Tuple2<Long, T>
of elapsed time in milliseconds and source data. |
Mono<T> |
elementAt(int index)
Emit only the element at the given index position or
IndexOutOfBoundsException
if the sequence is shorter. |
Mono<T> |
elementAt(int index,
T defaultValue)
Emit only the element at the given index position or fall back to a
default value if the sequence is shorter.
|
static <T> Flux<T> |
empty()
Create a
Flux that completes without emitting any item. |
static <T> Flux<T> |
error(Supplier<Throwable> errorSupplier)
Create a
Flux that terminates with an error immediately after being
subscribed to. |
static <T> Flux<T> |
error(Throwable error)
Create a
Flux that terminates with the specified error immediately after
being subscribed to. |
static <O> Flux<O> |
error(Throwable throwable,
boolean whenRequested)
Create a
Flux that terminates with the specified error, either immediately
after being subscribed to or after being first requested. |
Flux<T> |
expand(Function<? super T,? extends Publisher<? extends T>> expander)
Recursively expand elements into a graph and emit all the resulting elements using
a breadth-first traversal strategy.
|
Flux<T> |
expand(Function<? super T,? extends Publisher<? extends T>> expander,
int capacityHint)
Recursively expand elements into a graph and emit all the resulting elements using
a breadth-first traversal strategy.
|
Flux<T> |
expandDeep(Function<? super T,? extends Publisher<? extends T>> expander)
Recursively expand elements into a graph and emit all the resulting elements,
in a depth-first traversal order.
|
Flux<T> |
expandDeep(Function<? super T,? extends Publisher<? extends T>> expander,
int capacityHint)
Recursively expand elements into a graph and emit all the resulting elements,
in a depth-first traversal order.
|
Flux<T> |
filter(Predicate<? super T> p)
Evaluate each source value against the given
Predicate . |
Flux<T> |
filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate)
Test each value emitted by this
Flux asynchronously using a generated
Publisher<Boolean> test. |
Flux<T> |
filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate,
int bufferSize)
Test each value emitted by this
Flux asynchronously using a generated
Publisher<Boolean> test. |
static <I> Flux<I> |
first(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
first(Publisher<? extends I>... sources)
|
<R> Flux<R> |
flatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
|
<R> Flux<R> |
flatMap(Function<? super T,? extends Publisher<? extends R>> mapperOnNext,
Function<? super Throwable,? extends Publisher<? extends R>> mapperOnError,
Supplier<? extends Publisher<? extends R>> mapperOnComplete)
|
<V> Flux<V> |
flatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency)
|
<V> Flux<V> |
flatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency,
int prefetch)
|
<V> Flux<V> |
flatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency,
int prefetch)
|
<R> Flux<R> |
flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
|
<R> Flux<R> |
flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper,
int prefetch)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency,
int prefetch)
|
<R> Flux<R> |
flatMapSequentialDelayError(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency,
int prefetch)
|
static <T> Flux<T> |
from(Publisher<? extends T> source)
|
static <T> Flux<T> |
fromArray(T[] array)
Create a
Flux that emits the items contained in the provided array. |
static <T> Flux<T> |
fromIterable(Iterable<? extends T> it)
|
static <T> Flux<T> |
fromStream(Stream<? extends T> s)
|
static <T> Flux<T> |
fromStream(Supplier<Stream<? extends T>> streamSupplier)
|
static <T,S> Flux<T> |
generate(Callable<S> stateSupplier,
BiFunction<S,SynchronousSink<T>,S> generator)
Programmatically create a
Flux by generating signals one-by-one via a
consumer callback and some state. |
static <T,S> Flux<T> |
generate(Callable<S> stateSupplier,
BiFunction<S,SynchronousSink<T>,S> generator,
Consumer<? super S> stateConsumer)
Programmatically create a
Flux by generating signals one-by-one via a
consumer callback and some state, with a final cleanup callback. |
static <T> Flux<T> |
generate(Consumer<SynchronousSink<T>> generator)
Programmatically create a
Flux by generating signals one-by-one via a
consumer callback. |
int |
getPrefetch()
The prefetch configuration of the
Flux. |
<K> Flux<GroupedFlux<K,T>> |
groupBy(Function<? super T,? extends K> keyMapper)
|
<K,V> Flux<GroupedFlux<K,V>> |
groupBy(Function<? super T,? extends K> keyMapper,
Function<? super T,? extends V> valueMapper)
|
<K,V> Flux<GroupedFlux<K,V>> |
groupBy(Function<? super T,? extends K> keyMapper,
Function<? super T,? extends V> valueMapper,
int prefetch)
|
<K> Flux<GroupedFlux<K,T>> |
groupBy(Function<? super T,? extends K> keyMapper,
int prefetch)
|
<TRight,TLeftEnd,TRightEnd,R> Flux<R> |
groupJoin(Publisher<? extends TRight> other,
Function<? super T,? extends Publisher<TLeftEnd>> leftEnd,
Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd,
BiFunction<? super T,? super Flux<TRight>,? extends R> resultSelector)
Map values from two Publishers into time windows and emit a combination of values
when their windows overlap.
|
<R> Flux<R> |
handle(BiConsumer<? super T,SynchronousSink<R>> handler)
Handle the items emitted by this
Flux by calling a biconsumer with the
output sink for each onNext. |
Mono<Boolean> |
hasElement(T value)
Emit a single boolean true if any of the elements of this
Flux sequence is
equal to the provided value. |
Mono<Boolean> |
hasElements()
Emit a single boolean true if this
Flux sequence has at least one element. |
Flux<T> |
hide()
Hides the identities of this
Flux instance. |
Mono<T> |
ignoreElements()
Ignores onNext signals (dropping them) and only propagates termination events.
|
Flux<Tuple2<Long,T>> |
index()
Keep information about the order in which source values were received by
indexing them with a 0-based incrementing long, returning a
Flux
of Tuple2<(index, value)> . |
<I> Flux<I> |
index(BiFunction<? super Long,? super T,? extends I> indexMapper)
Keep information about the order in which source values were received by
indexing them internally with a 0-based incrementing long then combining this
information with the source value into an
I using the provided BiFunction ,
returning a Flux<I> . |
static Flux<Long> |
interval(Duration period)
Create a
Flux that emits long values starting with 0 and incrementing at
specified time intervals on the global timer. |
static Flux<Long> |
interval(Duration delay,
Duration period)
Create a
Flux that emits long values starting with 0 and incrementing at
specified time intervals, after an initial delay, on the global timer. |
static Flux<Long> |
interval(Duration delay,
Duration period,
Scheduler timer)
|
static Flux<Long> |
interval(Duration period,
Scheduler timer)
|
<TRight,TLeftEnd,TRightEnd,R> Flux<R> |
join(Publisher<? extends TRight> other,
Function<? super T,? extends Publisher<TLeftEnd>> leftEnd,
Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd,
BiFunction<? super T,? super TRight,? extends R> resultSelector)
Map values from two Publishers into time windows and emit a combination of values
when their windows overlap.
|
static <T> Flux<T> |
just(T... data)
Create a
Flux that emits the provided elements and then completes. |
static <T> Flux<T> |
just(T data)
Create a new
Flux that will only emit a single element then onComplete. |
Mono<T> |
last()
Emit the last element observed before complete signal as a
Mono , or emit
NoSuchElementException error if the source was empty. |
Mono<T> |
last(T defaultValue)
Emit the last element observed before complete signal as a
Mono , or emit
the defaultValue if the source was empty. |
Flux<T> |
limitRate(int prefetchRate)
Ensure that backpressure signals from downstream subscribers are split into batches
capped at the provided
prefetchRate when propagated upstream, effectively
rate limiting the upstream Publisher . |
Flux<T> |
limitRate(int highTide,
int lowTide)
Ensure that backpressure signals from downstream subscribers are split into batches
capped at the provided
highTide first, then replenishing at the provided
lowTide , effectively rate limiting the upstream Publisher . |
Flux<T> |
limitRequest(long requestCap)
Ensure that the total amount requested upstream is capped at
requestCap. |
Flux<T> |
log()
Observe all Reactive Streams signals and trace them using
Logger support. |
Flux<T> |
log(Logger logger)
Observe Reactive Streams signals matching the passed filter
options and
trace them using a specific user-provided Logger , at Level.INFO level. |
Flux<T> |
log(Logger logger,
Level level,
boolean showOperatorLine,
SignalType... options)
|
Flux<T> |
log(String category)
Observe all Reactive Streams signals and trace them using
Logger support. |
Flux<T> |
log(String category,
Level level,
boolean showOperatorLine,
SignalType... options)
Observe Reactive Streams signals matching the passed filter
options and
trace them using Logger support. |
Flux<T> |
log(String category,
Level level,
SignalType... options)
Observe Reactive Streams signals matching the passed filter
options and
trace them using Logger support. |
<V> Flux<V> |
map(Function<? super T,? extends V> mapper)
Transform the items emitted by this
Flux by applying a synchronous function
to each item. |
Flux<Signal<T>> |
materialize()
Transform incoming onNext, onError and onComplete signals into
Signal instances,
materializing these signals. |
static <I> Flux<I> |
merge(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. |
static <I> Flux<I> |
merge(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
merge(Publisher<? extends I>... sources)
Merge data from
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. |
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source)
|
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source,
int concurrency)
|
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source,
int concurrency,
int prefetch)
|
static <I> Flux<I> |
mergeDelayError(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences contained in an array / vararg
into an interleaved merged sequence. |
static <T> Flux<T> |
mergeOrdered(Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator ). |
static <T> Flux<T> |
mergeOrdered(int prefetch,
Comparator<? super T> comparator,
Publisher<? extends T>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator ). |
static <I extends Comparable<? super I>> Flux<I> |
mergeOrdered(Publisher<? extends I>... sources)
Merge data from provided
Publisher sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by their natural order). |
Flux<T> |
mergeOrderedWith(Publisher<? extends T> other,
Comparator<? super T> otherComparator)
Merge data from this
Flux and a Publisher into a reordered merge
sequence, by picking the smallest value from each sequence as defined by a provided
Comparator . |
static <I> Flux<I> |
mergeSequential(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences provided in an array/vararg
into an ordered merged sequence. |
static <I> Flux<I> |
mergeSequential(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
mergeSequential(Iterable<? extends Publisher<? extends I>> sources,
int maxConcurrency,
int prefetch)
|
static <I> Flux<I> |
mergeSequential(Publisher<? extends I>... sources)
Merge data from
Publisher sequences provided in an array/vararg
into an ordered merged sequence. |
static <T> Flux<T> |
mergeSequential(Publisher<? extends Publisher<? extends T>> sources)
|
static <T> Flux<T> |
mergeSequential(Publisher<? extends Publisher<? extends T>> sources,
int maxConcurrency,
int prefetch)
|
static <I> Flux<I> |
mergeSequentialDelayError(int prefetch,
Publisher<? extends I>... sources)
Merge data from
Publisher sequences provided in an array/vararg
into an ordered merged sequence. |
static <I> Flux<I> |
mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources,
int maxConcurrency,
int prefetch)
|
static <T> Flux<T> |
mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources,
int maxConcurrency,
int prefetch)
|
Flux<T> |
mergeWith(Publisher<? extends T> other)
|
Flux<T> |
metrics()
Activate metrics for this sequence, provided there is an instrumentation facade
on the classpath (otherwise this method is a pure no-op).
|
Flux<T> |
name(String name)
Give a name to this sequence, which can be retrieved using
Scannable.name()
as long as this is the first reachable Scannable.parents() . |
static <T> Flux<T> |
never()
Create a
Flux that will never signal any data, error or completion signal. |
Mono<T> |
next()
|
<U> Flux<U> |
ofType(Class<U> clazz)
Evaluate each accepted value against the given
Class type. |
protected static <T> ConnectableFlux<T> |
onAssembly(ConnectableFlux<T> source)
To be used by custom operators: invokes assembly
Hooks pointcut given a
ConnectableFlux , potentially returning a new ConnectableFlux . |
protected static <T> Flux<T> |
onAssembly(Flux<T> source)
|
Flux<T> |
onBackpressureBuffer()
Request an unbounded demand and push to the returned
Flux , or park the
observed elements if not enough demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(Duration ttl,
int maxSize,
Consumer<? super T> onBufferEviction)
Request an unbounded demand and push to the returned
Flux , or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit and for a maximum Duration of ttl (as measured on the
elastic Scheduler ). |
Flux<T> |
onBackpressureBuffer(Duration ttl,
int maxSize,
Consumer<? super T> onBufferEviction,
Scheduler scheduler)
|
Flux<T> |
onBackpressureBuffer(int maxSize)
Request an unbounded demand and push to the returned
Flux , or park up to
maxSize elements when not enough demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(int maxSize,
BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push to the returned
Flux , or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. |
Flux<T> |
onBackpressureBuffer(int maxSize,
Consumer<? super T> onOverflow)
Request an unbounded demand and push to the returned
Flux , or park up to
maxSize elements when not enough demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(int maxSize,
Consumer<? super T> onBufferOverflow,
BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push to the returned
Flux , or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. |
Flux<T> |
onBackpressureDrop()
Request an unbounded demand and push to the returned
Flux , or drop
the observed elements if not enough demand is requested downstream. |
Flux<T> |
onBackpressureDrop(Consumer<? super T> onDropped)
|
Flux<T> |
onBackpressureError()
Request an unbounded demand and push to the returned
Flux , or emit onError
from Exceptions.failWithOverflow() if not enough demand is requested
downstream. |
Flux<T> |
onBackpressureLatest()
Request an unbounded demand and push to the returned
Flux , or only keep
the most recent observed item if not enough demand is requested downstream. |
Flux<T> |
onErrorContinue(BiConsumer<Throwable,Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the
incriminating element from the sequence and continuing with subsequent elements.
|
<E extends Throwable> Flux<T> |
onErrorContinue(Class<E> type,
BiConsumer<Throwable,Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the
incriminating element from the sequence and continuing with subsequent elements.
|
<E extends Throwable> Flux<T> |
onErrorContinue(Predicate<E> errorPredicate,
BiConsumer<Throwable,Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the
incriminating element from the sequence and continuing with subsequent elements.
|
<E extends Throwable> Flux<T> |
onErrorMap(Class<E> type,
Function<? super E,? extends Throwable> mapper)
Transform an error emitted by this
Flux by synchronously applying a function
to it if the error matches the given type. |
Flux<T> |
onErrorMap(Function<? super Throwable,? extends Throwable> mapper)
Transform any error emitted by this
Flux by synchronously applying a function to it. |
Flux<T> |
onErrorMap(Predicate<? super Throwable> predicate,
Function<? super Throwable,? extends Throwable> mapper)
Transform an error emitted by this
Flux by synchronously applying a function
to it if the error matches the given predicate. |
<E extends Throwable> Flux<T> |
onErrorResume(Class<E> type,
Function<? super E,? extends Publisher<? extends T>> fallback)
Subscribe to a fallback publisher when an error matching the given type
occurs, using a function to choose the fallback depending on the error.
|
Flux<T> |
onErrorResume(Function<? super Throwable,? extends Publisher<? extends T>> fallback)
Subscribe to a returned fallback publisher when any error occurs, using a function to
choose the fallback depending on the error.
|
Flux<T> |
onErrorResume(Predicate<? super Throwable> predicate,
Function<? super Throwable,? extends Publisher<? extends T>> fallback)
Subscribe to a fallback publisher when an error matching a given predicate
occurs.
|
<E extends Throwable> Flux<T> |
onErrorReturn(Class<E> type,
T fallbackValue)
Simply emit a captured fallback value when an error of the specified type is
observed on this
Flux . |
Flux<T> |
onErrorReturn(Predicate<? super Throwable> predicate,
T fallbackValue)
Simply emit a captured fallback value when an error matching the given predicate is
observed on this
Flux . |
Flux<T> |
onErrorReturn(T fallbackValue)
Simply emit a captured fallback value when any error is observed on this
Flux . |
Flux<T> |
onErrorStop()
If an
onErrorContinue(BiConsumer) variant has been used downstream, reverts
to the default 'STOP' mode where errors are terminal events upstream. |
protected static <T> Flux<T> |
onLastAssembly(Flux<T> source)
|
Flux<T> |
onTerminateDetach()
Detaches both the child
Subscriber and the Subscription on
termination or cancellation. |
Flux<T> |
or(Publisher<? extends T> other)
|
ParallelFlux<T> |
parallel()
Prepare this
Flux by dividing data on a number of 'rails' matching the
number of CPU cores, in a round-robin fashion. |
ParallelFlux<T> |
parallel(int parallelism)
Prepare this
Flux by dividing data on a number of 'rails' matching the
provided parallelism parameter, in a round-robin fashion. |
ParallelFlux<T> |
parallel(int parallelism,
int prefetch)
|
ConnectableFlux<T> |
publish()
Prepare a
ConnectableFlux which shares this Flux sequence and
dispatches values to subscribers in a backpressure-aware manner. |
<R> Flux<R> |
publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform)
Shares a sequence for the duration of a function that may transform it and
consume it as many times as necessary without causing multiple subscriptions
to the upstream.
|
<R> Flux<R> |
publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform,
int prefetch)
Shares a sequence for the duration of a function that may transform it and
consume it as many times as necessary without causing multiple subscriptions
to the upstream.
|
ConnectableFlux<T> |
publish(int prefetch)
Prepare a
ConnectableFlux which shares this Flux sequence and
dispatches values to subscribers in a backpressure-aware manner. |
Mono<T> |
publishNext()
|
Flux<T> |
publishOn(Scheduler scheduler)
|
Flux<T> |
publishOn(Scheduler scheduler,
boolean delayError,
int prefetch)
Run onNext, onComplete and onError on a supplied
Scheduler 's Scheduler.Worker . |
Flux<T> |
publishOn(Scheduler scheduler,
int prefetch)
Run onNext, onComplete and onError on a supplied
Scheduler 's Scheduler.Worker . |
static <T> Flux<T> |
push(Consumer<? super FluxSink<T>> emitter)
|
static <T> Flux<T> |
push(Consumer<? super FluxSink<T>> emitter,
FluxSink.OverflowStrategy backpressure)
|
static Flux<Integer> |
range(int start,
int count)
|
<A> Mono<A> |
reduce(A initial,
BiFunction<A,? super T,A> accumulator)
Reduce the values from this
Flux sequence into a single object matching the
type of a seed value. |
Mono<T> |
reduce(BiFunction<T,T,T> aggregator)
Reduce the values from this
Flux sequence into a single object of the same
type as the emitted items. |
<A> Mono<A> |
reduceWith(Supplier<A> initial,
BiFunction<A,? super T,A> accumulator)
Reduce the values from this
Flux sequence into a single object matching the
type of a lazily supplied seed value. |
Flux<T> |
repeat()
Repeatedly and indefinitely subscribe to the source upon completion of the
previous subscription.
|
Flux<T> |
repeat(BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.
|
Flux<T> |
repeat(long numRepeat)
Repeatedly subscribe to the source
numRepeat times. |
Flux<T> |
repeat(long numRepeat,
BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous
subscription.
|
Flux<T> |
repeatWhen(Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
Repeatedly subscribe to this
Flux when a companion sequence emits elements in
response to the flux completion signal. |
ConnectableFlux<T> |
replay()
Turn this
Flux into a hot source and cache last emitted signals for further Subscriber . |
ConnectableFlux<T> |
replay(Duration ttl)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
ConnectableFlux<T> |
replay(Duration ttl,
Scheduler timer)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
ConnectableFlux<T> |
replay(int history)
Turn this
Flux into a connectable hot source and cache last emitted
signals for further Subscriber . |
ConnectableFlux<T> |
replay(int history,
Duration ttl)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
ConnectableFlux<T> |
replay(int history,
Duration ttl,
Scheduler timer)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
Flux<T> |
retry()
Re-subscribes to this
Flux sequence if it signals any error, indefinitely. |
Flux<T> |
retry(long numRetries)
Re-subscribes to this
Flux sequence if it signals any error, for a fixed
number of times. |
Flux<T> |
retry(long numRetries,
Predicate<? super Throwable> retryMatcher)
|
Flux<T> |
retry(Predicate<? super Throwable> retryMatcher)
|
Flux<T> |
retryBackoff(long numRetries,
Duration firstBackoff)
In case of error, retry this
Flux up to numRetries times using a
randomized exponential backoff strategy (jitter). |
Flux<T> |
retryBackoff(long numRetries,
Duration firstBackoff,
Duration maxBackoff)
In case of error, retry this
Flux up to numRetries times using a
randomized exponential backoff strategy. |
Flux<T> |
retryBackoff(long numRetries,
Duration firstBackoff,
Duration maxBackoff,
double jitterFactor)
In case of error, retry this
Flux up to numRetries times using a
randomized exponential backoff strategy, randomized with a user-provided jitter
factor between 0.d (no jitter) and 1.0 (default is 0.5 ). |
Flux<T> |
retryBackoff(long numRetries,
Duration firstBackoff,
Duration maxBackoff,
double jitterFactor,
Scheduler backoffScheduler)
In case of error, retry this
Flux up to numRetries times using a
randomized exponential backoff strategy, randomized with a user-provided jitter
factor between 0.d (no jitter) and 1.0 (default is 0.5 ). |
Flux<T> |
retryBackoff(long numRetries,
Duration firstBackoff,
Duration maxBackoff,
Scheduler backoffScheduler)
In case of error, retry this
Flux up to numRetries times using a
randomized exponential backoff strategy. |
Flux<T> |
retryWhen(Function<Flux<Throwable>,? extends Publisher<?>> whenFactory)
|
Flux<T> |
sample(Duration timespan)
|
<U> Flux<T> |
sample(Publisher<U> sampler)
|
Flux<T> |
sampleFirst(Duration timespan)
Repeatedly take a value from this
Flux then skip the values that follow
within a given duration. |
<U> Flux<T> |
sampleFirst(Function<? super T,? extends Publisher<U>> samplerFactory)
|
<U> Flux<T> |
sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory)
|
<U> Flux<T> |
sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory,
int maxConcurrency)
|
<A> Flux<A> |
scan(A initial,
BiFunction<A,? super T,A> accumulator)
Reduce this
Flux values with an accumulator BiFunction and
also emit the intermediate results of this function. |
Flux<T> |
scan(BiFunction<T,T,T> accumulator)
Reduce this
Flux values with an accumulator BiFunction and
also emit the intermediate results of this function. |
<A> Flux<A> |
scanWith(Supplier<A> initial,
BiFunction<A,? super T,A> accumulator)
Reduce this
Flux values with the help of an accumulator BiFunction
and also emit the intermediate results. |
Flux<T> |
share()
|
Mono<T> |
single()
Expect and emit a single item from this
Flux source or signal
NoSuchElementException for an empty source, or
IndexOutOfBoundsException for a source with more than one element. |
Mono<T> |
single(T defaultValue)
Expect and emit a single item from this
Flux source and emit a default
value for an empty source, but signal an IndexOutOfBoundsException for a
source with more than one element. |
Mono<T> |
singleOrEmpty()
Expect and emit a single item from this
Flux source, and accept an empty
source but signal an IndexOutOfBoundsException for a source with more than
one element. |
Flux<T> |
skip(Duration timespan)
Skip elements from this
Flux emitted within the specified initial duration. |
Flux<T> |
skip(Duration timespan,
Scheduler timer)
|
Flux<T> |
skip(long skipped)
Skip the specified number of elements from the beginning of this
Flux then
emit the remaining elements. |
Flux<T> |
skipLast(int n)
Skip a specified number of elements at the end of this
Flux sequence. |
Flux<T> |
skipUntil(Predicate<? super T> untilPredicate)
|
Flux<T> |
skipUntilOther(Publisher<?> other)
|
Flux<T> |
skipWhile(Predicate<? super T> skipPredicate)
|
Flux<T> |
sort()
Sort elements from this
Flux by collecting and sorting them in the background
then emitting the sorted sequence once this sequence completes. |
Flux<T> |
sort(Comparator<? super T> sortFunction)
Sort elements from this
Flux using a Comparator function, by
collecting and sorting elements in the background then emitting the sorted sequence
once this sequence completes. |
Flux<T> |
startWith(Iterable<? extends T> iterable)
|
Flux<T> |
startWith(Publisher<? extends T> publisher)
|
Flux<T> |
startWith(T... values)
Prepend the given values before this
Flux sequence. |
Disposable |
subscribe()
Subscribe to this
Flux and request unbounded demand. |
Disposable |
subscribe(Consumer<? super T> consumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer)
|
abstract void |
subscribe(CoreSubscriber<? super T> actual)
An internal
Publisher.subscribe(Subscriber) that will bypass
Hooks.onLastOperator(Function) pointcut. |
void |
subscribe(Subscriber<? super T> actual) |
Flux<T> |
subscribeOn(Scheduler scheduler)
Run subscribe, onSubscribe and request on a specified
Scheduler 's Scheduler.Worker . |
Flux<T> |
subscribeOn(Scheduler scheduler,
boolean requestOnSeparateThread)
Run subscribe and onSubscribe on a specified
Scheduler 's Scheduler.Worker . |
Flux<T> |
subscriberContext(Context mergeContext)
|
Flux<T> |
subscriberContext(Function<Context,Context> doOnContext)
|
<E extends Subscriber<? super T>> E |
subscribeWith(E subscriber)
|
Flux<T> |
switchIfEmpty(Publisher<? extends T> alternate)
Switch to an alternative
Publisher if this sequence is completed without any data. |
<V> Flux<V> |
switchMap(Function<? super T,Publisher<? extends V>> fn)
|
<V> Flux<V> |
switchMap(Function<? super T,Publisher<? extends V>> fn,
int prefetch)
|
<V> Flux<V> |
switchOnFirst(BiFunction<Signal<? extends T>,Flux<T>,Publisher<? extends V>> transformer)
Transform the current
Flux once it emits its first element, making a
conditional transformation possible. |
static <T> Flux<T> |
switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers)
|
static <T> Flux<T> |
switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers,
int prefetch)
|
Flux<T> |
tag(String key,
String value)
Tag this flux with a key/value pair.
|
Flux<T> |
take(Duration timespan)
|
Flux<T> |
take(Duration timespan,
Scheduler timer)
|
Flux<T> |
take(long n)
Take only the first N values from this
Flux , if available. |
Flux<T> |
takeLast(int n)
Emit the last N values this
Flux emitted before its completion. |
Flux<T> |
takeUntil(Predicate<? super T> predicate)
|
Flux<T> |
takeUntilOther(Publisher<?> other)
|
Flux<T> |
takeWhile(Predicate<? super T> continuePredicate)
Relay values from this
Flux while a predicate returns TRUE
for the values (checked before each value is delivered). |
Mono<Void> |
then()
Return a
Mono<Void> that completes when this Flux completes. |
<V> Mono<V> |
then(Mono<V> other)
|
Mono<Void> |
thenEmpty(Publisher<Void> other)
Return a
Mono<Void> that waits for this Flux to complete then
for a supplied Publisher<Void> to also complete. |
<V> Flux<V> |
thenMany(Publisher<V> other)
|
Flux<T> |
timeout(Duration timeout)
Propagate a
TimeoutException as soon as no item is emitted within the
given Duration from the previous emission (or the subscription for the first item). |
Flux<T> |
timeout(Duration timeout,
Publisher<? extends T> fallback)
|
Flux<T> |
timeout(Duration timeout,
Publisher<? extends T> fallback,
Scheduler timer)
|
Flux<T> |
timeout(Duration timeout,
Scheduler timer)
Propagate a
TimeoutException as soon as no item is emitted within the
given Duration from the previous emission (or the subscription for the first
item), as measured by the specified Scheduler . |
<U> Flux<T> |
timeout(Publisher<U> firstTimeout)
Signal a
TimeoutException in case the first item from this Flux has
not been emitted before the given Publisher emits. |
<U,V> Flux<T> |
timeout(Publisher<U> firstTimeout,
Function<? super T,? extends Publisher<V>> nextTimeoutFactory)
Signal a
TimeoutException in case the first item from this Flux has
not been emitted before the firstTimeout Publisher emits, and whenever
each subsequent element is not emitted before a Publisher generated from
the latest element signals. |
<U,V> Flux<T> |
timeout(Publisher<U> firstTimeout,
Function<? super T,? extends Publisher<V>> nextTimeoutFactory,
Publisher<? extends T> fallback)
|
Flux<Tuple2<Long,T>> |
timestamp()
|
Flux<Tuple2<Long,T>> |
timestamp(Scheduler scheduler)
|
Iterable<T> |
toIterable()
|
Iterable<T> |
toIterable(int batchSize)
|
Iterable<T> |
toIterable(int batchSize,
Supplier<Queue<T>> queueProvider)
|
Stream<T> |
toStream()
|
Stream<T> |
toStream(int batchSize)
|
String |
toString() |
<V> Flux<V> |
transform(Function<? super Flux<T>,? extends Publisher<V>> transformer)
|
static <T,D> Flux<T> |
using(Callable<? extends D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a
Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or
the Subscriber cancels.
|
static <T,D> Flux<T> |
using(Callable<? extends D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup,
boolean eager)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a
Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or
the Subscriber cancels.
|
static <T,D> Flux<T> |
usingWhen(Publisher<D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> resourceClosure,
Function<? super D,? extends Publisher<?>> asyncCleanup)
Uses a resource, generated by a
Publisher for each individual Subscriber ,
while streaming the values from a Publisher derived from the same resource. |
static <T,D> Flux<T> |
usingWhen(Publisher<D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> resourceClosure,
Function<? super D,? extends Publisher<?>> asyncComplete,
Function<? super D,? extends Publisher<?>> asyncError)
Uses a resource, generated by a
Publisher for each individual Subscriber ,
while streaming the values from a Publisher derived from the same resource. |
static <T,D> Flux<T> |
usingWhen(Publisher<D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> resourceClosure,
Function<? super D,? extends Publisher<?>> asyncComplete,
Function<? super D,? extends Publisher<?>> asyncError,
Function<? super D,? extends Publisher<?>> asyncCancel)
Uses a resource, generated by a
Publisher for each individual Subscriber ,
while streaming the values from a Publisher derived from the same resource. |
Flux<Flux<T>> |
window(Duration windowingTimespan)
|
Flux<Flux<T>> |
window(Duration windowingTimespan,
Duration openWindowEvery)
|
Flux<Flux<T>> |
window(Duration windowingTimespan,
Duration openWindowEvery,
Scheduler timer)
|
Flux<Flux<T>> |
window(Duration windowingTimespan,
Scheduler timer)
|
Flux<Flux<T>> |
window(int maxSize)
|
Flux<Flux<T>> |
window(int maxSize,
int skip)
|
Flux<Flux<T>> |
window(Publisher<?> boundary)
|
Flux<Flux<T>> |
windowTimeout(int maxSize,
Duration maxTime)
|
Flux<Flux<T>> |
windowTimeout(int maxSize,
Duration maxTime,
Scheduler timer)
|
Flux<Flux<T>> |
windowUntil(Predicate<T> boundaryTrigger)
|
Flux<Flux<T>> |
windowUntil(Predicate<T> boundaryTrigger,
boolean cutBefore)
|
Flux<Flux<T>> |
windowUntil(Predicate<T> boundaryTrigger,
boolean cutBefore,
int prefetch)
|
<U,V> Flux<Flux<T>> |
windowWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector)
|
Flux<Flux<T>> |
windowWhile(Predicate<T> inclusionPredicate)
|
Flux<Flux<T>> |
windowWhile(Predicate<T> inclusionPredicate,
int prefetch)
|
<U,R> Flux<R> |
withLatestFrom(Publisher<? extends U> other,
BiFunction<? super T,? super U,? extends R> resultSelector)
Combine the most recently emitted values from both this
Flux and another
Publisher through a BiFunction and emit the result. |
static <I,O> Flux<O> |
zip(Function<? super Object[],? extends O> combinator,
int prefetch,
Publisher<? extends I>... sources)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <I,O> Flux<O> |
zip(Function<? super Object[],? extends O> combinator,
Publisher<? extends I>... sources)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <O> Flux<O> |
zip(Iterable<? extends Publisher<?>> sources,
Function<? super Object[],? extends O> combinator)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <O> Flux<O> |
zip(Iterable<? extends Publisher<?>> sources,
int prefetch,
Function<? super Object[],? extends O> combinator)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <TUPLE extends Tuple2,V> Flux<V> |
zip(Publisher<? extends Publisher<?>> sources,
Function<? super TUPLE,? extends V> combinator)
Zip multiple sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <T1,T2> Flux<Tuple2<T1,T2>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2)
Zip two sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple2 . |
static <T1,T2,O> Flux<O> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
BiFunction<? super T1,? super T2,? extends O> combinator)
Zip two sources together, that is to say wait for all the sources to emit one
element and combine these elements once into an output value (constructed by the provided
combinator).
|
static <T1,T2,T3> Flux<Tuple3<T1,T2,T3>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3)
Zip three sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple3 . |
static <T1,T2,T3,T4> Flux<Tuple4<T1,T2,T3,T4>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4)
Zip four sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple4 . |
static <T1,T2,T3,T4,T5> Flux<Tuple5<T1,T2,T3,T4,T5>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5)
Zip five sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple5 . |
static <T1,T2,T3,T4,T5,T6> Flux<Tuple6<T1,T2,T3,T4,T5,T6>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6)
Zip six sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple6 . |
static <T1,T2,T3,T4,T5,T6,T7> Flux<Tuple7<T1,T2,T3,T4,T5,T6,T7>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6,
Publisher<? extends T7> source7)
Zip seven sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple7 . |
static <T1,T2,T3,T4,T5,T6,T7,T8> Flux<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6,
Publisher<? extends T7> source7,
Publisher<? extends T8> source8)
Zip eight sources together, that is to say wait for all the sources to emit one
element and combine these elements once into a
Tuple8 . |
<T2> Flux<Tuple2<T,T2>> |
zipWith(Publisher<? extends T2> source2)
|
<T2,V> Flux<V> |
zipWith(Publisher<? extends T2> source2,
BiFunction<? super T,? super T2,? extends V> combinator)
Zip this
Flux with another Publisher source, that is to say wait
for both to emit one element and combine these elements using a combinator
BiFunction .
The operator will continue doing so until any of the sources completes. |
<T2> Flux<Tuple2<T,T2>> |
zipWith(Publisher<? extends T2> source2,
int prefetch)
|
<T2,V> Flux<V> |
zipWith(Publisher<? extends T2> source2,
int prefetch,
BiFunction<? super T,? super T2,? extends V> combinator)
Zip this
Flux with another Publisher source, that is to say wait
for both to emit one element and combine these elements using a combinator
BiFunction .
The operator will continue doing so until any of the sources completes. |
<T2> Flux<Tuple2<T,T2>> |
zipWithIterable(Iterable<? extends T2> iterable)
|
<T2,V> Flux<V> |
zipWithIterable(Iterable<? extends T2> iterable,
BiFunction<? super T,? super T2,? extends V> zipper)
Zip elements from this
Flux with the content of an Iterable , that is
to say combine one element from each, pairwise, using the given zipper BiFunction . |
public final Mono<Boolean> all(Predicate<? super T> predicate)
Emit a single boolean true if all values of this sequence match the Predicate.
The implementation uses short-circuit logic and completes with false if the predicate doesn't match a value.
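The short-circuit behaviour described above can be sketched as follows. This is a minimal illustration, assuming reactor-core is on the classpath; the class and method names are hypothetical, and block() is used only because this is demo code.

```java
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class AllExample {

    // Every element satisfies the predicate, so the Mono emits true.
    static Boolean allPositive() {
        return Flux.range(1, 5).all(i -> i > 0).block();
    }

    // 5 fails the predicate, so the Mono emits false without needing 6.
    static Boolean allEven() {
        return Flux.just(2, 4, 5, 6).all(i -> i % 2 == 0).block();
    }

    public static void main(String[] args) {
        System.out.println(allPositive()); // true
        System.out.println(allEven());     // false
    }
}
```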
public final Mono<Boolean> any(Predicate<? super T> predicate)
Emit a single boolean true if any of the values of this Flux sequence match the predicate.
The implementation uses short-circuit logic and completes with true as soon as any value matches the predicate.
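One way to observe the short-circuit is to apply any(...) to an unbounded source: the result can only arrive because the operator stops at the first match. The sketch below assumes reactor-core is on the classpath; the class, the naturals() helper and the other method names are hypothetical.

```java
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class AnyExample {

    // An unbounded counter 1, 2, 3, ... built with Flux.generate.
    static Flux<Integer> naturals() {
        return Flux.<Integer, Integer>generate(() -> 1, (state, sink) -> {
            sink.next(state);
            return state + 1;
        });
    }

    // Completes with true once 4 is seen, even though the source never ends.
    static Boolean anyAboveThree() {
        return naturals().any(i -> i > 3).block();
    }

    // No element matches, so false is emitted when the source completes.
    static Boolean anyNegative() {
        return Flux.just(1, 2, 3).any(i -> i < 0).block();
    }

    public static void main(String[] args) {
        System.out.println(anyAboveThree()); // true
        System.out.println(anyNegative());   // false
    }
}
```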
public final <P> P as(Function<? super Flux<T>,P> transformer)
Transform this Flux into a target type.
flux.as(Mono::from).subscribe()
P
- the returned instance type
transformer
- the Function to immediately map this Flux into a target type instance.
Flux transformed to an instance of P
for a bounded conversion to Publisher
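A minimal sketch of as(...), assuming reactor-core is on the classpath. The first method mirrors the flux.as(Mono::from) snippet above; the sum helper and the class name are hypothetical and only show that any function of the Flux can be plugged in.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of Reactor.
final class AsExample {

    // Mono.from keeps only the first element of the Flux.
    static Integer firstElement() {
        Mono<Integer> first = Flux.range(1, 3).as(Mono::from);
        return first.block();
    }

    // Hypothetical helper: converts a Flux of numbers into a Mono of their sum.
    static Mono<Integer> sum(Flux<Integer> numbers) {
        return numbers.reduce(0, Integer::sum);
    }

    // as(...) applies the helper immediately and returns whatever it returns.
    static Integer total() {
        Mono<Integer> total = Flux.range(1, 3).as(AsExample::sum);
        return total.block();
    }

    public static void main(String[] args) {
        System.out.println(firstElement()); // 1
        System.out.println(total());        // 6
    }
}
```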
@Nullable public final T blockFirst()
Subscribe to this Flux and block indefinitely until the upstream signals its first value or completes.
Returns that value, or null if the Flux completes empty. In case the Flux errors, the original
exception is thrown (wrapped in a RuntimeException if it was a checked exception).
Note that each blockFirst() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
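The three outcomes described above (a value, null for an empty sequence, a wrapped checked exception) can be sketched as follows. This assumes reactor-core is on the classpath; the class and method names are hypothetical.

```java
import java.io.IOException;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class BlockFirstExample {

    // The first value is returned as soon as it is signalled.
    static Integer firstOfThree() {
        return Flux.just(10, 20, 30).blockFirst();
    }

    // An empty Flux completes without a value, so null is returned.
    static Integer firstOfEmpty() {
        return Flux.<Integer>empty().blockFirst();
    }

    // A checked exception surfaces wrapped in a RuntimeException;
    // this method returns the cause of that wrapper.
    static Throwable causeOfCheckedError() {
        try {
            Flux.<Integer>error(new IOException("boom")).blockFirst();
            return null;
        } catch (RuntimeException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstOfThree());        // 10
        System.out.println(firstOfEmpty());        // null
        System.out.println(causeOfCheckedError()); // java.io.IOException: boom
    }
}
```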
@Nullable public final T blockFirst(Duration timeout)
Subscribe to this Flux and block until the upstream signals its first value, completes or a timeout expires.
Returns that value, or null if the Flux completes empty. In case the Flux errors, the original
exception is thrown (wrapped in a RuntimeException if it was a checked exception).
If the provided timeout expires, a RuntimeException is thrown.
Note that each blockFirst() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
timeout
- maximum time period to wait for before raising a RuntimeException
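A small sketch of the timeout variant, assuming reactor-core is on the classpath. Flux.never() is used so that the wait can only end by timing out; the class and method names, and the chosen durations, are illustrative only.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class BlockFirstTimeoutExample {

    // Flux.never() emits nothing, so the call ends with a RuntimeException.
    static boolean timesOut() {
        try {
            Flux.never().blockFirst(Duration.ofMillis(100));
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    // A value that is already available is returned well within the budget.
    static Integer withinBudget() {
        return Flux.just(1, 2).blockFirst(Duration.ofSeconds(1));
    }

    public static void main(String[] args) {
        System.out.println(timesOut());     // true
        System.out.println(withinBudget()); // 1
    }
}
```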
@Nullable public final T blockLast()
Subscribe to this Flux and block indefinitely until the upstream signals its last value or completes.
Returns that value, or null if the Flux completes empty. In case the Flux errors, the original
exception is thrown (wrapped in a RuntimeException if it was a checked exception).
Note that each blockLast() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
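The note about new subscriptions can be made visible by counting them. The sketch below assumes reactor-core is on the classpath and wraps the source in Flux.defer so that each subscription increments a counter; the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class BlockLastExample {

    // The last value of 1, 2, 3 is returned once the Flux completes.
    static Integer lastOfRange() {
        return Flux.range(1, 3).blockLast();
    }

    // Calls blockLast() twice on the same Flux and reports how many
    // subscriptions the source has seen.
    static int subscriptionsAfterTwoCalls() {
        AtomicInteger subscriptions = new AtomicInteger();
        Flux<Integer> source = Flux.defer(() -> {
            subscriptions.incrementAndGet();
            return Flux.range(1, 3);
        });
        source.blockLast();
        source.blockLast();
        return subscriptions.get();
    }

    public static void main(String[] args) {
        System.out.println(lastOfRange());                // 3
        System.out.println(subscriptionsAfterTwoCalls()); // 2
    }
}
```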
@Nullable public final T blockLast(Duration timeout)
Subscribe to this Flux and block until the upstream signals its last value, completes or a timeout expires.
Returns that value, or null if the Flux completes empty. In case the Flux errors, the original
exception is thrown (wrapped in a RuntimeException if it was a checked exception).
If the provided timeout expires, a RuntimeException is thrown.
Note that each blockLast() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
timeout
- maximum time period to wait for before raising a RuntimeException
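As a hedged illustration with a timed source, the sketch below waits for the last of three interval ticks under a generous timeout. It assumes reactor-core is on the classpath; the class and method names and the durations are illustrative only.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class BlockLastTimeoutExample {

    // interval emits 0, 1, 2, ... every 10 ms; take(3) completes after the
    // third tick, so the last value arrives long before the 5 s timeout.
    static Long lastTick() {
        return Flux.interval(Duration.ofMillis(10))
                .take(3)
                .blockLast(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println(lastTick()); // 2
    }
}
```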
public final Flux<List<T>> buffer()
Collect all incoming values into a single List buffer that will be emitted by the returned Flux once this Flux completes.
Flux of at most one List
for an alternative collecting algorithm returning Mono
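The "at most one List" wording matters for empty sources. The sketch below, assuming reactor-core is on the classpath, contrasts buffer() with collectList() as one example of a Mono-returning alternative; the class and method names are hypothetical.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
final class BufferAllExample {

    // A non-empty source yields exactly one List holding every element.
    static List<Integer> bufferNonEmpty() {
        return Flux.range(1, 3).buffer().blockFirst();
    }

    // An empty source yields no List at all, so blockFirst() returns null.
    static List<Integer> bufferEmpty() {
        return Flux.<Integer>empty().buffer().blockFirst();
    }

    // collectList() always emits a List, which is empty for an empty source.
    static List<Integer> collectListEmpty() {
        return Flux.<Integer>empty().collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(bufferNonEmpty());   // [1, 2, 3]
        System.out.println(bufferEmpty());      // null
        System.out.println(collectListEmpty()); // []
    }
}
```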
public final Flux<List<T>> buffer(Duration bufferingTimespan, Duration openBufferEvery)
Collect incoming values into multiple List buffers created at a given openBufferEvery period.
Each buffer will last until the bufferingTimespan has elapsed, thus emitting the bucket in the resulting Flux.
When bufferingTimespan < openBufferEvery : dropping buffers
When bufferingTimespan > openBufferEvery : overlapping buffers
When bufferingTimespan == openBufferEvery : exact buffers
bufferingTimespan
- the duration from buffer creation until a buffer is closed and emitted
openBufferEvery
- the interval at which to create a new buffer
Flux of List delimited by the given period openBufferEvery and sized by bufferingTimespan
public final Flux<List<T>> buffer(Duration bufferingTimespan, Duration openBufferEvery, Scheduler timer)
Collect incoming values into multiple List buffers created at a given openBufferEvery period, as measured on the provided Scheduler.
Each buffer will last until the bufferingTimespan has elapsed (also measured on the scheduler),
thus emitting the bucket in the resulting Flux.
When bufferingTimespan < openBufferEvery : dropping buffers
When bufferingTimespan > openBufferEvery : overlapping buffers
When bufferingTimespan == openBufferEvery : exact buffers
bufferingTimespan
- the duration from buffer creation until a buffer is closed and emitted
openBufferEvery
- the interval at which to create a new buffer
timer
- a time-capable Scheduler instance to run on
Flux of List delimited by the given period openBufferEvery and sized by bufferingTimespan
public final Flux<List<T>> buffer(Duration bufferingTimespan, Scheduler timer)
Collect incoming values into multiple List buffers that will be emitted by the returned Flux
every bufferingTimespan, as measured on the provided Scheduler.
bufferingTimespan
- the duration from buffer creation until a buffer is closed and emitted
timer
- a time-capable Scheduler instance to run on
Flux of List delimited by the given period
public final Flux<List<T>> buffer(int maxSize, int skip)
Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes. Buffers can be created with gaps, as a new buffer will be created every time skip values have been emitted by the source.
When maxSize < skip : dropping buffers
When maxSize > skip : overlapping buffers
When maxSize == skip : exact buffers
skip - the number of items to count before creating a new buffer
maxSize - the max collected size
Returns: a Flux of possibly overlapped or gapped List
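The three maxSize/skip cases can be illustrated without Reactor on the classpath. The sketch below is a plain-Java model over a finite List, not Reactor's implementation. The names BufferSkipSketch and buffer are hypothetical, and the model only reproduces the grouping described above: a buffer opens every skip elements and holds at most maxSize of them.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the maxSize/skip grouping rule (hypothetical helper, not Reactor code).
final class BufferSkipSketch {

    static <T> List<List<T>> buffer(List<T> source, int maxSize, int skip) {
        if (maxSize <= 0 || skip <= 0) {
            throw new IllegalArgumentException("maxSize and skip must be positive");
        }
        List<List<T>> buffers = new ArrayList<>();
        // A new buffer opens at positions 0, skip, 2 * skip, ...
        for (int start = 0; start < source.size(); start += skip) {
            // Each buffer holds at most maxSize elements; trailing buffers may be partial,
            // which models the "or once this Flux completes" clause.
            int end = Math.min(start + maxSize, source.size());
            buffers.add(new ArrayList<>(source.subList(start, end)));
        }
        return buffers;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println(buffer(source, 2, 3)); // maxSize < skip: 3 and 6 fall in the gaps
        System.out.println(buffer(source, 3, 2)); // maxSize > skip: neighbouring buffers overlap
        System.out.println(buffer(source, 3, 3)); // maxSize == skip: exact partition
    }
}
```

With maxSize smaller than skip, the elements that fall between two buffers appear in no buffer at all, which is what "dropping buffers" refers to.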
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, int skip, Supplier<C> bufferSupplier)
Collect incoming values into multiple Collection buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes. Buffers can be created with gaps, as a new buffer will be created every time skip values have been emitted by the source.
When maxSize < skip : dropping buffers
When maxSize > skip : overlapping buffers
When maxSize == skip : exact buffers
C - the Collection buffer type
skip - the number of items to count before creating a new buffer
maxSize - the max collected size
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of possibly overlapped or gapped Collection
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, Supplier<C> bufferSupplier)
Collect incoming values into multiple Collection buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes.
C - the Collection buffer type
maxSize - the maximum collected size
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection
public final <C extends Collection<? super T>> Flux<C> buffer(Publisher<?> other, Supplier<C> bufferSupplier)
Collect incoming values into multiple Collection buffers, as delimited by the signals of a companion Publisher this operator will subscribe to.
C - the Collection buffer type
other - the companion Publisher whose signals trigger new buffers
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection delimited by signals from a Publisher
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime)
Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses.
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
Returns: a Flux of List delimited by given size or a given period timeout
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer)
Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler.
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
timer - a time-capable Scheduler instance to run on
Returns: a Flux of List delimited by given size or a given period timeout
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer, Supplier<C> bufferSupplier)
Collect incoming values into multiple Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler.
C - the Collection buffer type
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
timer - a time-capable Scheduler instance to run on
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection delimited by given size or a given period timeout
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Supplier<C> bufferSupplier)
Collect incoming values into multiple Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses.
C - the Collection buffer type
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection delimited by given size or a given period timeout
public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate)
Collect incoming values into multiple List buffers that will be emitted by the resulting Flux each time the given predicate returns true. Note that the element that triggers the predicate to return true (and thus closes a buffer) is included as the last element in the emitted buffer.
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate, boolean cutBefore)
Collect incoming values into multiple List buffers that will be emitted by the resulting Flux each time the given predicate returns true. Note that the buffer into which the element that triggers the predicate to return true (and thus closes a buffer) is included depends on the cutBefore parameter: set it to true to include the boundary element in the newly opened buffer, false to include it in the closed buffer (as in bufferUntil(Predicate)).
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
predicate - a predicate that triggers the next buffer when it becomes true.
cutBefore - set to true to include the triggering element in the new buffer rather than the old.
Returns: a Flux of List
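The effect of cutBefore is easiest to see side by side. The following is a plain-Java model over a finite List, not Reactor code. The names BufferUntilSketch and bufferUntil are hypothetical, and the sketch assumes that an empty buffer is never emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of the cutBefore rule (hypothetical helper, not Reactor code).
final class BufferUntilSketch {

    static <T> List<List<T>> bufferUntil(List<T> source, Predicate<? super T> predicate, boolean cutBefore) {
        List<List<T>> buffers = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : source) {
            boolean boundary = predicate.test(item);
            if (boundary && cutBefore) {
                // The boundary element starts the newly opened buffer.
                if (!current.isEmpty()) {
                    buffers.add(current); // assumption: empty buffers are skipped
                }
                current = new ArrayList<>();
                current.add(item);
            } else if (boundary) {
                // The boundary element is the last element of the closed buffer.
                current.add(item);
                buffers.add(current);
                current = new ArrayList<>();
            } else {
                current.add(item);
            }
        }
        // On completion, a non-empty partial buffer is still emitted.
        if (!current.isEmpty()) {
            buffers.add(current);
        }
        return buffers;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println(bufferUntil(source, n -> n % 3 == 0, false)); // [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(bufferUntil(source, n -> n % 3 == 0, true));  // [[1, 2], [3, 4, 5], [6, 7]]
    }
}
```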
public final <U,V> Flux<List<T>> bufferWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector)
Collect incoming values into multiple List buffers started each time an opening companion Publisher emits. Each buffer will last until the corresponding closing companion Publisher emits, thus releasing the buffer to the resulting Flux.
When Open signal is strictly not overlapping Close signal : dropping buffers (see green marbles in diagram below).
When Open signal is strictly more frequent than Close signal : overlapping buffers (see second and third buffers in diagram below).
When Open signal is exactly coordinated with Close signal : exact buffers
U - the element type of the buffer-opening sequence
V - the element type of the buffer-closing sequence
bucketOpening - a companion Publisher to subscribe for buffer creation signals.
closeSelector - a factory that, given a buffer opening signal, returns a companion Publisher to subscribe to for buffer closure and emission signals.
Returns: a Flux of List delimited by an opening Publisher and a relative closing Publisher
public final <U,V,C extends Collection<? super T>> Flux<C> bufferWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector, Supplier<C> bufferSupplier)
Collect incoming values into multiple Collection buffers started each time an opening companion Publisher emits. Each buffer will last until the corresponding closing companion Publisher emits, thus releasing the buffer to the resulting Flux.
When Open signal is strictly not overlapping Close signal : dropping buffers (see green marbles in diagram below).
When Open signal is strictly more frequent than Close signal : overlapping buffers (see second and third buffers in diagram below).
U - the element type of the buffer-opening sequence
V - the element type of the buffer-closing sequence
C - the Collection buffer type
bucketOpening - a companion Publisher to subscribe for buffer creation signals.
closeSelector - a factory that, given a buffer opening signal, returns a companion Publisher to subscribe to for buffer closure and emission signals.
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns: a Flux of Collection delimited by an opening Publisher and a relative closing Publisher
public final Flux<List<T>> bufferWhile(Predicate<? super T> predicate)
Collect incoming values into multiple List buffers that will be emitted by the resulting Flux. Each buffer continues aggregating values while the given predicate returns true, and a new buffer is created as soon as the predicate returns false. Note that the element that triggers the predicate to return false (and thus closes a buffer) is NOT included in any emitted buffer.
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
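A short plain-Java model may help contrast this with bufferUntil: here the element that fails the predicate is discarded rather than kept. This is a sketch over a finite List, not Reactor code. The names BufferWhileSketch and bufferWhile are hypothetical, and it assumes that empty buffers are not emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of bufferWhile grouping (hypothetical helper, not Reactor code).
final class BufferWhileSketch {

    static <T> List<List<T>> bufferWhile(List<T> source, Predicate<? super T> predicate) {
        List<List<T>> buffers = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : source) {
            if (predicate.test(item)) {
                current.add(item);
            } else if (!current.isEmpty()) {
                // The failing element closes the buffer and is itself dropped.
                buffers.add(current);
                current = new ArrayList<>();
            }
        }
        // On completion, a non-empty partial buffer is still emitted.
        if (!current.isEmpty()) {
            buffers.add(current);
        }
        return buffers;
    }

    public static void main(String[] args) {
        // Zeroes act as separators and appear in no buffer.
        System.out.println(bufferWhile(List.of(1, 2, 0, 3, 0, 0, 4), n -> n != 0)); // [[1, 2], [3], [4]]
    }
}
```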
public final Flux<T> cache()
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded volume of onNext signals. Completion and Error will also be replayed.
Returns: a replaying Flux
public final Flux<T> cache(Duration ttl)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded history but apply a per-item expiry timeout.
Completion and Error will also be replayed until ttl triggers, in which case the next Subscriber will start over a new subscription.
ttl - Time-to-live for each cached item and post termination.
Returns: a replaying Flux
public final Flux<T> cache(Duration ttl, Scheduler timer)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded history but apply a per-item expiry timeout.
Completion and Error will also be replayed until ttl triggers, in which case the next Subscriber will start over a new subscription.
public final Flux<T> cache(int history)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size onNext signals. Completion and Error will also be replayed.
Note that cache(0) will only cache the terminal signal without expiration.
history - number of elements retained in cache
Returns: a replaying Flux
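The bounded-history rule can be modelled with a small ring-style buffer. This is a plain-Java sketch of the retention policy only, not Reactor's replay implementation. The class ReplayHistorySketch and its methods are hypothetical, and terminal signals and expiry are deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model of "retain up to history onNext signals" (hypothetical helper, not Reactor code).
final class ReplayHistorySketch<T> {

    private final int history;
    private final Deque<T> retained = new ArrayDeque<>();

    ReplayHistorySketch(int history) {
        if (history < 0) {
            throw new IllegalArgumentException("history must be >= 0");
        }
        this.history = history;
    }

    // Called for every value the source emits.
    void onNext(T value) {
        if (history == 0) {
            return; // nothing is retained, as in the cache(0) note above
        }
        if (retained.size() == history) {
            retained.removeFirst(); // evict the oldest value
        }
        retained.addLast(value);
    }

    // What a Subscriber arriving late would be replayed, oldest first.
    List<T> replayForLateSubscriber() {
        return new ArrayList<>(retained);
    }

    public static void main(String[] args) {
        ReplayHistorySketch<String> cache = new ReplayHistorySketch<>(2);
        cache.onNext("a");
        cache.onNext("b");
        cache.onNext("c");
        System.out.println(cache.replayForLateSubscriber()); // [b, c]
    }
}
```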
public final Flux<T> cache(int history, Duration ttl)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size and apply a per-item expiry timeout.
Completion and Error will also be replayed until ttl triggers, in which case the next Subscriber will start over a new subscription.
history - number of elements retained in cache
ttl - Time-to-live for each cached item and post termination.
Returns: a replaying Flux
public final Flux<T> cache(int history, Duration ttl, Scheduler timer)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size and apply a per-item expiry timeout.
Completion and Error will also be replayed until ttl triggers, in which case the next Subscriber will start over a new subscription.
public final <E> Flux<E> cast(Class<E> clazz)
Cast the current Flux produced type into a target produced type.
public final Flux<T> checkpoint()
Activate traceback (full assembly tracing) for this particular Flux, in case of an error upstream of the checkpoint. Tracing incurs the cost of an exception stack trace creation.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with the backtrace.
The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
Returns: the assembly tracing Flux.
public final Flux<T> checkpoint(String description)
Activate traceback (assembly marker) for this particular Flux by giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(), this doesn't create a filled stack trace, avoiding the main cost of the operator. However, as a trade-off the description must be unique enough for the user to find out where this Flux was assembled. If you only want a generic description, and still rely on the stack trace to find the assembly site, use the checkpoint(String, boolean) variant.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
description - a unique enough description to include in the light assembly traceback.
Returns: the assembly marked Flux
public final Flux<T> checkpoint(@Nullable String description, boolean forceStackTrace)
Activate traceback (full assembly tracing or the lighter assembly marking depending on the forceStackTrace option).
By setting the forceStackTrace parameter to true, activate assembly tracing for this particular Flux and give it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(String), this will incur the cost of an exception stack trace creation. The description could for example be a meaningful name for the assembled flux or a wider correlation ID, since the stack trace will always provide enough information to locate where this Flux was assembled.
By setting forceStackTrace to false, behaves like checkpoint(String) and is subject to the same caveat in choosing the description.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly marker.
The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
description - a description (must be unique enough if forceStackTrace is set to false).
forceStackTrace - false to make a light checkpoint without a stacktrace, true to use a stack trace.
Returns: the assembly marked Flux.
public final <R,A> Mono<R> collect(Collector<? super T,A,? extends R> collector)
Collect all elements emitted by this Flux into a container, by applying a Java 8 Stream API Collector. The collected result will be emitted when this sequence completes.
A - The mutable accumulation type
R - the container type
collector - the Collector
Returns: a Mono of the collected container on complete
Discard Support: This operator discards the intermediate container (see Collector.supplier()) upon cancellation, error or exception while applying the Collector.finisher(). Either the container type is a Collection (in which case individual elements are discarded) or not (in which case the entire container is discarded). In case the accumulator BiConsumer of the collector fails to accumulate an element into the intermediate container, the container is discarded as above and the triggering element is also discarded.
public final <E> Mono<E> collect(Supplier<E> containerSupplier, BiConsumer<E,? super T> collector)
Collect all elements emitted by this Flux into a user-defined container, by applying a collector BiConsumer taking the container and each element. The collected result will be emitted when this sequence completes.
E - the container type
containerSupplier - the supplier of the container instance for each Subscriber
collector - a consumer of both the container instance and the value being currently collected
Returns: a Mono of the collected container on complete
Discard Support: This operator discards the container upon cancellation or error triggered by a data signal. Either the container type is a Collection (in which case individual elements are discarded) or not (in which case the entire container is discarded). In case the collector BiConsumer fails to accumulate an element, the container is discarded as above and the triggering element is also discarded.
public final <K> Mono<Map<K,T>> collectMap(Function<? super T,? extends K> keyExtractor)
Collect all elements emitted by this Flux into a hashed Map that is emitted by the resulting Mono when this sequence completes. The key is extracted from each element by applying the keyExtractor Function. In case several elements map to the same key, the associated value will be the most recently emitted element.
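The "most recently emitted element wins" rule is the same one a plain Map.put gives. The sketch below is a plain-Java model over a finite List, not Reactor code; CollectMapSketch and collectMap are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the key-conflict rule (hypothetical helper, not Reactor code).
final class CollectMapSketch {

    static <T, K> Map<K, T> collectMap(List<T> source, Function<? super T, ? extends K> keyExtractor) {
        Map<K, T> map = new HashMap<>();
        for (T item : source) {
            // put() overwrites any earlier element stored under the same key.
            map.put(keyExtractor.apply(item), item);
        }
        return map;
    }

    public static void main(String[] args) {
        // "ab" and "cd" share the key 2, so only the later one, "cd", is kept.
        System.out.println(collectMap(List.of("ab", "cd", "efg"), String::length));
    }
}
```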
K - the type of the key extracted from each source element
keyExtractor - a Function to map elements to a key for the Map
Returns: a Mono of a Map of key-element pairs (only including latest element in case of key conflicts)
Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the map.
public final <K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor)
Collect all elements emitted by this Flux into a hashed Map that is emitted by the resulting Mono when this sequence completes. The key is extracted from each element by applying the keyExtractor Function, and the value is extracted by the valueExtractor Function. In case several elements map to the same key, the associated value will be derived from the most recently emitted element.
K - the type of the key extracted from each source element
V - the type of the value extracted from each source element
keyExtractor - a Function to map elements to a key for the Map
valueExtractor - a Function to map elements to a value for the Map
Returns: a Mono of a Map of key-element pairs (only including latest element's value in case of key conflicts)
Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the map.
public final <K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor, Supplier<Map<K,V>> mapSupplier)
Collect all elements emitted by this Flux into a user-defined Map that is emitted by the resulting Mono when this sequence completes. The key is extracted from each element by applying the keyExtractor Function, and the value is extracted by the valueExtractor Function. In case several elements map to the same key, the associated value will be derived from the most recently emitted element.
K - the type of the key extracted from each source element
V - the type of the value extracted from each source element
keyExtractor - a Function to map elements to a key for the Map
valueExtractor - a Function to map elements to a value for the Map
mapSupplier - a Map factory called for each Subscriber
Returns: a Mono of a Map of key-value pairs (only including latest element's value in case of key conflicts)
Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the map.
public final <K> Mono<Map<K,Collection<T>>> collectMultimap(Function<? super T,? extends K> keyExtractor)
Collect all elements emitted by this Flux into a multimap that is emitted by the resulting Mono when this sequence completes. The key is extracted from each element by applying the keyExtractor Function, and every element mapping to the same key is stored in the List associated to said key.
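In contrast with collectMap, nothing is overwritten here: elements sharing a key accumulate. A plain-Java model over a finite List, not Reactor code, with the hypothetical names CollectMultimapSketch and collectMultimap:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of multimap collection (hypothetical helper, not Reactor code).
final class CollectMultimapSketch {

    static <T, K> Map<K, Collection<T>> collectMultimap(List<T> source,
                                                        Function<? super T, ? extends K> keyExtractor) {
        Map<K, Collection<T>> multimap = new HashMap<>();
        for (T item : source) {
            // Create the per-key list on first use, then append in emission order.
            multimap.computeIfAbsent(keyExtractor.apply(item), key -> new ArrayList<>()).add(item);
        }
        return multimap;
    }

    public static void main(String[] args) {
        // "ab" and "cd" share the key 2 and are both kept, in emission order.
        System.out.println(collectMultimap(List.of("ab", "cd", "efg"), String::length));
    }
}
```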
K - the type of the key extracted from each source element
keyExtractor - a Function to map elements to a key for the Map
Returns: a Mono of a Map of key-List(elements) pairs
Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the list values in the map.
public final <K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor)
Collect all elements emitted by this Flux into a multimap that is emitted by the resulting Mono when this sequence completes. The key is extracted from each element by applying the keyExtractor Function, and every element mapping to the same key is converted by the valueExtractor Function to a value stored in the List associated to said key.
K - the type of the key extracted from each source element
V - the type of the value extracted from each source element
keyExtractor - a Function to map elements to a key for the Map
valueExtractor - a Function to map elements to a value for the Map
Returns: a Mono of a Map of key-List(values) pairs
Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the list values in the map.
public final <K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor, Supplier<Map<K,Collection<V>>> mapSupplier)
Collect all elements emitted by this Flux into a user-defined multimap that is emitted by the resulting Mono when this sequence completes. The key is extracted from each element by applying the keyExtractor Function, and every element mapping to the same key is converted by the valueExtractor Function to a value stored in the Collection associated to said key.
K - the type of the key extracted from each source element
V - the type of the value extracted from each source element
keyExtractor - a Function to map elements to a key for the Map
valueExtractor - a Function to map elements to a value for the Map
mapSupplier - a multimap (Map of Collection) factory called for each Subscriber
Returns: a Mono of a Map of key-Collection(values) pairs
Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the list values in the map.
public final Mono<List<T>> collectSortedList()
Collect all elements emitted by this Flux until this sequence completes, and then sort them in natural order into a List that is emitted by the resulting Mono.
Returns: a Mono of a sorted List of all values from this Flux, in natural order
Discard Support: This operator is based on collectList(), and as such discards the elements in the List individually upon cancellation or error triggered by a data signal.
public final Mono<List<T>> collectSortedList(@Nullable Comparator<? super T> comparator)
Collect all elements emitted by this Flux until this sequence completes, and then sort them using a Comparator into a List that is emitted by the resulting Mono.
comparator - a Comparator to sort the items of this sequence
Returns: a Mono of a sorted List of all values from this Flux
Discard Support: This operator is based on collectList(), and as such discards the elements in the List individually upon cancellation or error triggered by a data signal.
@SafeVarargs public static <T,V> Flux<V> combineLatest(Function<Object[],V> combinator, int prefetch, Publisher<? extends T>... sources)
Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources.
T - type of the value from sources
V - The produced output after transformation by the given combinator
sources - The Publisher sources to combine values from
prefetch - The demand sent to each combined source Publisher
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
@SafeVarargs public static <T,V> Flux<V> combineLatest(Function<Object[],V> combinator, Publisher<? extends T>... sources)
Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources.
T - type of the value from sources
V - The produced output after transformation by the given combinator
sources - The Publisher sources to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T,V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources provided in an Iterable.
T - The common base type of the values from sources
V - The produced output after transformation by the given combinator
sources - The list of Publisher sources to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T,V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, int prefetch, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources provided in an Iterable.
T - The common base type of the values from sources
V - The produced output after transformation by the given combinator
sources - The list of Publisher sources to combine values from
prefetch - demand produced to each combined source Publisher
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T1,T2,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of two Publisher sources.
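To make "most recently published value from each source" concrete, the sketch below replays a fixed timeline of emissions from two sources. It is a plain-Java model, not Reactor code. The names CombineLatestSketch, Event and combineLatest are hypothetical, and it assumes that nothing is emitted until both sources have produced at least one value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model of two-source combineLatest (hypothetical helper, not Reactor code).
final class CombineLatestSketch {

    // One emission on the shared timeline: which source (1 or 2) produced which value.
    static final class Event {
        final int source;
        final String value;

        Event(int source, String value) {
            this.source = source;
            this.value = value;
        }
    }

    static List<String> combineLatest(List<Event> timeline, BiFunction<String, String, String> combinator) {
        List<String> output = new ArrayList<>();
        String latest1 = null;
        String latest2 = null;
        for (Event event : timeline) {
            if (event.source == 1) {
                latest1 = event.value;
            } else {
                latest2 = event.value;
            }
            // Assumption: a combination is produced only once both sources have a latest value.
            if (latest1 != null && latest2 != null) {
                output.add(combinator.apply(latest1, latest2));
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Event> timeline = List.of(
                new Event(1, "a"), new Event(2, "x"), new Event(1, "b"), new Event(2, "y"));
        System.out.println(combineLatest(timeline, (left, right) -> left + right)); // [ax, bx, by]
    }
}
```

Each new value from either side is paired with the other side's current latest value, so a value can take part in several combinations.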
T1 - type of the value from source1
T2 - type of the value from source2
V - The produced output after transformation by the given combinator
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T1,T2,T3,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of three Publisher sources.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
V - The produced output after transformation by the given combinator
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T1,T2,T3,T4,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of four Publisher sources.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
V - The produced output after transformation by the given combinator
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
source4 - The fourth Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T1,T2,T3,T4,T5,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of five Publisher sources.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
V - The produced output after transformation by the given combinator
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
source4 - The fourth Publisher source to combine values from
source5 - The fifth Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public static <T1,T2,T3,T4,T5,T6,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of six Publisher sources.
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
V - The produced output after transformation by the given combinator
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
source4 - The fourth Publisher source to combine values from
source5 - The fifth Publisher source to combine values from
source6 - The sixth Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
Returns: a Flux based on the produced combinations
public final <V> Flux<V> compose(Function<? super Flux<T>,? extends Publisher<V>> transformer)
Defer the transformation of this Flux in order to generate a target Flux type. A transformation will occur for each Subscriber. For instance:
flux.compose(original -> original.log());
V - the item type in the returned Publisher
transformer - the Function to lazily map this Flux into a target Publisher instance for each new subscriber
Returns: a new Flux
See also: transform() for immediate transformation of Flux, as() for a loose conversion to an arbitrary type
public static <T> Flux<T> concat(Iterable<? extends Publisher<? extends T>> sources)
Concatenate all sources provided in an Iterable, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
public static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
T - The type of values in both source and output sequences
sources - The Publisher of Publisher to concatenate
Returns: a Flux concatenating all inner sources sequences
public static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources, int prefetch)
Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
T - The type of values in both source and output sequences
sources - The Publisher of Publisher to concatenate
prefetch - the inner source request size
Returns: a Flux concatenating all inner sources sequences
@SafeVarargs public static <T> Flux<T> concat(Publisher<? extends T>... sources)
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources)
Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
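The difference between interrupting on error (concat) and delaying the error (concatDelayError) can be sketched with finite in-memory sources. This is a plain-Java model, not Reactor code. ConcatSketch, Source and the string encoding of signals are hypothetical, and joining several delayed errors into one terminal signal is an assumption made only for this model.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of concat versus concatDelayError (hypothetical helper, not Reactor code).
final class ConcatSketch {

    // A finite source: its values, plus an error raised after them (null means normal completion).
    static final class Source {
        final List<String> values;
        final String error;

        Source(List<String> values, String error) {
            this.values = values;
            this.error = error;
        }
    }

    static List<String> concat(List<Source> sources, boolean delayError) {
        List<String> signals = new ArrayList<>();
        List<String> errors = new ArrayList<>();
        for (Source source : sources) {
            // Sources are drained one after the other, never interleaved.
            signals.addAll(source.values);
            if (source.error != null) {
                errors.add(source.error);
                if (!delayError) {
                    break; // concat: the error interrupts the sequence immediately
                }
            }
        }
        // The terminal signal comes last: either completion or the (possibly delayed) error.
        signals.add(errors.isEmpty() ? "onComplete" : "onError(" + String.join(", ", errors) + ")");
        return signals;
    }

    public static void main(String[] args) {
        List<Source> sources = List.of(
                new Source(List.of("a", "b"), null),
                new Source(List.of("c"), "boom"),
                new Source(List.of("d"), null));
        System.out.println(concat(sources, false)); // [a, b, c, onError(boom)]
        System.out.println(concat(sources, true));  // [a, b, c, d, onError(boom)]
    }
}
```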
T - The type of values in both source and output sequences
sources - The Publisher of Publisher to concatenate
Returns: a Flux concatenating all inner sources sequences, delaying errors
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, boolean delayUntilEnd, int prefetch)
Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes.
Errors do not interrupt the main sequence but are propagated after the current concat backlog if delayUntilEnd is false or after all sources have had a chance to be concatenated if delayUntilEnd is true.
T - The type of values in both source and output sequences
sources - The Publisher of Publisher to concatenate
delayUntilEnd - delay error until all sources have been consumed instead of after the current source
prefetch - the inner source request size
Returns: a Flux concatenating all inner sources sequences until complete or error
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, int prefetch)
Concatenate all sources emitted by a parent Publisher, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
T - The type of values in both source and output sequences
sources - The Publisher of Publisher to concatenate
prefetch - the inner source request size
Flux concatenating all inner source sequences until complete or error
@SafeVarargs public static <T> Flux<T> concatDelayError(Publisher<? extends T>... sources)
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
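The difference between the interrupting and the error-delaying variants can be sketched as follows. This is a minimal illustration, assuming reactor-core is on the classpath; the class name ConcatErrorSketch and its helper method are hypothetical and only serve the example.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class ConcatErrorSketch {

    // Returns the values observed before the error reaches the subscriber.
    static List<Integer> valuesBeforeError(boolean delayError) {
        Flux<Integer> first = Flux.just(1, 2);
        Flux<Integer> failing = Flux.error(new IllegalStateException("boom"));
        Flux<Integer> last = Flux.just(3, 4);

        Flux<Integer> combined = delayError
                ? Flux.concatDelayError(first, failing, last)
                : Flux.concat(first, failing, last);

        List<Integer> seen = new ArrayList<>();
        combined.doOnNext(seen::add)
                .onErrorResume(e -> Flux.empty()) // swallow the error so blockLast() returns normally
                .blockLast();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(valuesBeforeError(false)); // [1, 2]
        System.out.println(valuesBeforeError(true));  // [1, 2, 3, 4]
    }
}
```

With concat the failing source cuts the sequence short, so the last source is never subscribed to. With concatDelayError the last source still gets its turn and the error arrives afterwards.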
public final <V> Flux<V> concatMap(Function<? super T,? extends Publisher<? extends V>> mapper)
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.
There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:
Errors immediately short-circuit the current concat backlog.
V - the produced concatenated type
mapper - the function to transform this sequence of T into concatenated sequences of V
Flux
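As a rough illustration of the ordering guarantee, the sketch below maps each element to a delayed inner Flux. It assumes reactor-core is available; the class name ConcatMapSketch and the millisecond values are arbitrary choices for the example.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

class ConcatMapSketch {

    // Each source element becomes an inner Flux of two values, delayed by the element's own value in ms.
    // The first inner publisher is the slowest, yet its values still come first in the output.
    static List<Integer> concatenated() {
        return Flux.just(30, 10, 20)
                .concatMap(ms -> Flux.just(ms, ms + 1).delayElements(Duration.ofMillis(ms)))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(concatenated()); // [30, 31, 10, 11, 20, 21]
    }
}
```

Because each inner publisher is only subscribed to once the previous one has completed, the output follows the source order regardless of how long each inner publisher takes.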
public final <V> Flux<V> concatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int prefetch)
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.
There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:
Errors immediately short-circuit the current concat backlog. The prefetch argument sets an arbitrary prefetch size for the inner Publisher.
V - the produced concatenated type
mapper - the function to transform this sequence of T into concatenated sequences of V
prefetch - the inner source produced demand
Flux
public final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper)
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.
There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:
Errors in the individual publishers will be delayed until the end of the whole concat sequence (possibly getting combined into a composite if several sources error).
V - the produced concatenated type
mapper - the function to transform this sequence of T into concatenated sequences of V
Flux
public final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, boolean delayUntilEnd, int prefetch)
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.
There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:
Errors in the individual publishers will be delayed after the current concat backlog if delayUntilEnd is false or after all sources if delayUntilEnd is true.
The prefetch argument sets an arbitrary prefetch size for the inner Publisher.
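A small sketch of the delayUntilEnd = true case, assuming reactor-core is on the classpath. The class ConcatMapDelayErrorSketch, its inner helper and the prefetch value of 32 are illustrative assumptions, not part of the documented API.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class ConcatMapDelayErrorSketch {

    // Hypothetical mapper: the inner publisher for element 2 fails, the others emit their element.
    static Flux<Integer> inner(int i) {
        if (i == 2) {
            return Flux.error(new IllegalStateException("boom on " + i));
        }
        return Flux.just(i);
    }

    // Collects the values that were emitted before the delayed error was finally signalled.
    static List<Integer> valuesBeforeError() {
        List<Integer> seen = new ArrayList<>();
        Flux.just(1, 2, 3)
                .concatMapDelayError(ConcatMapDelayErrorSketch::inner, true, 32)
                .doOnNext(seen::add)
                .onErrorResume(e -> Flux.empty()) // swallow the delayed error for the demo
                .blockLast();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(valuesBeforeError()); // [1, 3]
    }
}
```

Element 3 is still processed even though the inner publisher for element 2 failed, because the error is held back until all sources have been consumed.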
V - the produced concatenated type
mapper - the function to transform this sequence of T into concatenated sequences of V
delayUntilEnd - delay error until all sources have been consumed instead of after the current source
prefetch - the inner source produced demand
Flux
public final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, int prefetch)
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.
There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:
Errors in the individual publishers will be delayed until the end of the whole concat sequence (possibly getting combined into a composite if several sources error).
The prefetch argument sets an arbitrary prefetch size for the inner Publisher.
V - the produced concatenated type
mapper - the function to transform this sequence of T into concatenated sequences of V
prefetch - the inner source produced demand
Flux
public final <R> Flux<R> concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
Transform the items emitted by this Flux into Iterable, then flatten the elements from those by concatenating them into a single Flux.
Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially. Thus flatMapIterable and concatMapIterable are equivalent, both offered as a discoverability improvement for users that explore the API with the concat vs flatMap expectation.
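The equivalence can be checked with a short sketch, assuming reactor-core is available. The class name ConcatMapIterableSketch and the comma-splitting mapper are made up for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

class ConcatMapIterableSketch {

    // Splits each source string on commas and concatenates the resulting lists.
    static List<String> viaConcatMapIterable() {
        return Flux.just("a,b", "c", "d,e")
                .concatMapIterable(s -> Arrays.asList(s.split(",")))
                .collectList()
                .block();
    }

    // Same pipeline using flatMapIterable; the Iterables are still played sequentially.
    static List<String> viaFlatMapIterable() {
        return Flux.just("a,b", "c", "d,e")
                .flatMapIterable(s -> Arrays.asList(s.split(",")))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(viaConcatMapIterable()); // [a, b, c, d, e]
        System.out.println(viaFlatMapIterable());   // [a, b, c, d, e]
    }
}
```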
R - the merged output sequence type
mapper - the Function to transform input sequence into N Iterable
Flux
T elements it prefetched and, in some cases, attempts to discard remainder of the currently processed Iterable (if it can safely assume the iterator is not infinite, see Spliterator.getExactSizeIfKnown()).
public final <R> Flux<R> concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper, int prefetch)
Transform the items emitted by this Flux into Iterable, then flatten the emissions from those by concatenating them into a single Flux. The prefetch argument sets an arbitrary prefetch size for the merged Iterable.
Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially. Thus flatMapIterable and concatMapIterable are equivalent, both offered as a discoverability improvement for users that explore the API with the concat vs flatMap expectation.
R - the merged output sequence type
mapper - the Function to transform input sequence into N Iterable
prefetch - the maximum in-flight elements from each inner Iterable sequence
Flux
T elements it prefetched and, in some cases, attempts to discard remainder of the currently processed Iterable (if it can safely assume the iterator is not infinite, see Spliterator.getExactSizeIfKnown()).
@SafeVarargs public final Flux<T> concatWithValues(T... values)
Concatenate the given values to the end of this Flux.
values - The values to concatenate
Flux concatenating all source sequences
public final Mono<Long> count()
Counts the number of values in this Flux. The count will be emitted when onComplete is observed.
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
Programmatically create a Flux with the capability of emitting multiple elements in a synchronous or asynchronous manner through the FluxSink API. This includes emitting elements from multiple threads.
This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
Flux.<String>create(emitter -> {
    ActionListener al = e -> {
        emitter.next(textField.getText());
    };
    // without cleanup support:
    button.addActionListener(al);

    // with cleanup support:
    button.addActionListener(al);
    emitter.onDispose(() -> {
        button.removeActionListener(al);
    });
});
T - The type of values in the sequence
emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
Flux
push(Consumer)
FluxSink exposed by this operator buffers in case of overflow. The buffer is discarded when the main sequence is cancelled.
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
Programmatically create a Flux with the capability of emitting multiple elements in a synchronous or asynchronous manner through the FluxSink API. This includes emitting elements from multiple threads.
This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
Flux.<String>create(emitter -> {
    ActionListener al = e -> {
        emitter.next(textField.getText());
    };
    // without cleanup support:
    button.addActionListener(al);

    // with cleanup support:
    button.addActionListener(al);
    emitter.onDispose(() -> {
        button.removeActionListener(al);
    });
}, FluxSink.OverflowStrategy.LATEST);
T - The type of values in the sequence
backpressure - the backpressure mode, see FluxSink.OverflowStrategy for the available backpressure modes
emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
Flux
push(Consumer, reactor.core.publisher.FluxSink.OverflowStrategy)
FluxSink exposed by this operator discards elements as relevant to the chosen FluxSink.OverflowStrategy. For example, FluxSink.OverflowStrategy.DROP discards each item as it is dropped, while FluxSink.OverflowStrategy.BUFFER will discard the buffer upon cancellation.
public final Flux<T> defaultIfEmpty(T defaultV)
defaultV - the alternate value if this sequence is empty
Flux
public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier)
Lazily supply a Publisher every time a Subscription is made on the resulting Flux, so the actual source instantiation is deferred until each subscribe and the Supplier can create a subscriber-specific instance. If the supplier doesn't generate a new instance however, this operator will effectively behave like from(Publisher).
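The per-subscription behavior can be sketched as below, assuming reactor-core is on the classpath. The class name DeferSketch and the counter are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

class DeferSketch {

    // Subscribes twice to the same deferred Flux; the Supplier runs once per subscription.
    static List<Integer> subscribeTwice() {
        AtomicInteger created = new AtomicInteger();
        Flux<Integer> deferred = Flux.defer(() -> Flux.just(created.incrementAndGet()));

        List<Integer> results = new ArrayList<>();
        results.add(deferred.blockFirst()); // first subscription, supplier invoked
        results.add(deferred.blockFirst()); // second subscription, supplier invoked again
        return results;
    }

    public static void main(String[] args) {
        System.out.println(subscribeTwice()); // [1, 2]
    }
}
```

Had the Flux been built directly with Flux.just(created.incrementAndGet()), the counter would have been read once at assembly time and both subscriptions would have seen the same value.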
public final Flux<T> delayElements(Duration delay)
Delay each of this Flux elements (Subscriber.onNext(T) signals) by a given Duration. Signals are delayed and continue on the parallel default Scheduler, but empty sequences or immediate error signals are not delayed.
delay - duration by which to delay each Subscriber.onNext(T) signal
Flux
delaySubscription to introduce a delay at the beginning of the sequence only
public final Flux<T> delayElements(Duration delay, Scheduler timer)
Delay each of this Flux elements (Subscriber.onNext(T) signals) by a given Duration. Signals are delayed and continue on a user-specified Scheduler, but empty sequences or immediate error signals are not delayed.
delay - period to delay each Subscriber.onNext(T) signal
timer - a time-capable Scheduler instance to delay each signal on
Flux
public final Flux<T> delaySequence(Duration delay)
Shift this Flux forward in time by a given Duration.
Unlike with delayElements(Duration), elements are shifted forward in time as they are emitted, always resulting in the delay between two elements being the same as in the source (only the first element is visibly delayed from the previous event, that is the subscription).
Signals are delayed and continue on the parallel Scheduler, but empty sequences or immediate error signals are not delayed.
With this operator, a source emitting at 10Hz with a delaySequence Duration of 1s will still emit at 10Hz, with an initial "hiccup" of 1s.
On the other hand, delayElements(Duration) would end up emitting at 1Hz.
This is closer to delaySubscription(Duration), except the source is subscribed to immediately.
public final Flux<T> delaySequence(Duration delay, Scheduler timer)
Shift this Flux forward in time by a given Duration.
Unlike with delayElements(Duration, Scheduler), elements are shifted forward in time as they are emitted, always resulting in the delay between two elements being the same as in the source (only the first element is visibly delayed from the previous event, that is the subscription).
Signals are delayed and continue on a user-specified Scheduler, but empty sequences or immediate error signals are not delayed.
With this operator, a source emitting at 10Hz with a delaySequence Duration of 1s will still emit at 10Hz, with an initial "hiccup" of 1s.
On the other hand, delayElements(Duration, Scheduler) would end up emitting at 1Hz.
This is closer to delaySubscription(Duration, Scheduler), except the source is subscribed to immediately.
delay - Duration to shift the sequence by
timer - a time-capable Scheduler instance to delay signals on
Flux emitting at the same frequency as the source
public final Flux<T> delaySubscription(Duration delay)
Delay the subscription to this Flux source until the given period elapses. The delay is introduced through the parallel default Scheduler.
public final Flux<T> delaySubscription(Duration delay, Scheduler timer)
Delay the subscription to this Flux source until the given period elapses, as measured on the user-provided Scheduler.
public final <U> Flux<T> delaySubscription(Publisher<U> subscriptionDelay)
U - the other source type
subscriptionDelay - a companion Publisher whose onNext/onComplete signal will trigger the subscription
Flux
public final Flux<T> delayUntil(Function<? super T,? extends Publisher<?>> triggerProvider)
Subscribe to this Flux and generate a Publisher from each of this Flux elements, each acting as a trigger for relaying said element.
That is to say, the resulting Flux delays each of its emissions until the associated trigger Publisher terminates.
In case of an error either in the source or in a trigger, that error is propagated immediately downstream. Note that unlike with the Mono variant there is no fusion of subsequent calls.
public final <X> Flux<X> dematerialize()
An operator working only if this Flux emits onNext, onError or onComplete Signal instances, transforming these materialized signals into real signals on the Subscriber. The error Signal will trigger onError and the complete Signal will trigger onComplete.
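A round trip through materialize() and dematerialize() can be sketched like this, assuming reactor-core is available. The class name DematerializeSketch is hypothetical.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;

class DematerializeSketch {

    // Two onNext signals plus one onComplete signal are emitted as Signal objects.
    static long materializedSignalCount() {
        return Flux.just(1, 2).materialize().count().block();
    }

    // Turning the Signal objects back into real signals restores the original sequence.
    static List<Integer> roundTrip() {
        Flux<Signal<Integer>> signals = Flux.just(1, 2).materialize();
        Flux<Integer> restored = signals.dematerialize();
        return restored.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(materializedSignalCount()); // 3
        System.out.println(roundTrip());               // [1, 2]
    }
}
```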
X - the dematerialized type
Flux
materialize()
public final Flux<T> distinct()
For each Subscriber, track elements from this Flux that have been seen and filter out duplicates.
The values themselves are recorded into a HashSet for distinct detection. Use distinct(Object::hashCode) if you want a more lightweight approach that doesn't retain all the objects, but is more susceptible to falsely considering two different elements as duplicates due to a hashCode collision.
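A brief sketch of distinct with and without a key selector, assuming reactor-core is on the classpath. The class name DistinctSketch and the sample data are illustrative.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class DistinctSketch {

    // Duplicates are detected on the values themselves, anywhere in the sequence.
    static List<String> byValue() {
        return Flux.just("a", "b", "a", "c", "b")
                .distinct()
                .collectList()
                .block();
    }

    // Duplicates are detected on an extracted key, here the string length.
    static List<String> byLength() {
        return Flux.just("apple", "pear", "plum", "fig", "kiwi")
                .distinct(String::length)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(byValue());  // [a, b, c]
        System.out.println(byLength()); // [apple, pear, fig]
    }
}
```

In the keyed variant "plum" and "kiwi" are dropped because a four-letter key ("pear") has already been seen, which mirrors how a hashCode collision would drop an element that is actually different.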
Flux only emitting distinct values
See distinct(Function, Supplier, BiPredicate, Consumer).
public final <V> Flux<T> distinct(Function<? super T,? extends V> keySelector)
For each Subscriber, track elements from this Flux that have been seen and filter out duplicates, as compared by a key extracted through the user provided Function.
V - the type of the key extracted from each value in this sequence
keySelector - function to compute comparison key for each element
Flux only emitting values with distinct keys
See distinct(Function, Supplier, BiPredicate, Consumer).
public final <V,C extends Collection<? super V>> Flux<T> distinct(Function<? super T,? extends V> keySelector, Supplier<C> distinctCollectionSupplier)
For each Subscriber, track elements from this Flux that have been seen and filter out duplicates, as compared by a key extracted through the user provided Function and by the add method of the Collection supplied (typically a Set).
V - the type of the key extracted from each value in this sequence
C - the type of Collection used for distinct checking of keys
keySelector - function to compute comparison key for each element
distinctCollectionSupplier - supplier of the Collection used for distinct check through add of the key.
Flux only emitting values with distinct keys
See distinct(Function, Supplier, BiPredicate, Consumer).
public final <V,C> Flux<T> distinct(Function<? super T,? extends V> keySelector, Supplier<C> distinctStoreSupplier, BiPredicate<C,V> distinctPredicate, Consumer<C> cleanup)
For each Subscriber, track elements from this Flux that have been seen and filter out duplicates, as compared by applying a BiPredicate on an arbitrary user-supplied <C> store and a key extracted through the user provided Function. The BiPredicate should typically add the key to the arbitrary store for further comparison. A cleanup callback is also invoked on the store upon termination of the sequence.
V - the type of the key extracted from each value in this sequence
C - the type of store backing the BiPredicate
keySelector - function to compute comparison key for each element
distinctStoreSupplier - supplier of the arbitrary store object used in distinct checks along the extracted key.
distinctPredicate - the BiPredicate to apply to the arbitrary store + extracted key to perform a distinct check. Since nothing is assumed of the store, this predicate should also add the key to the store as necessary.
cleanup - the cleanup callback to invoke on the store upon termination.
Flux only emitting values with distinct keys
cleanup as well if you need discarding of keys categorized by the operator as "seen".
public final Flux<T> distinctUntilChanged()
The last distinct value seen is retained for further comparison, which is done on the values themselves using the equals method.
Use distinctUntilChanged(Object::hashCode) if you want a more lightweight approach that doesn't retain all the objects, but is more susceptible to falsely considering two different elements as equal due to a hashCode collision.
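The "in a row" semantics can be sketched as follows, assuming reactor-core is available. The class name DistinctUntilChangedSketch and the inputs are only for illustration.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class DistinctUntilChangedSketch {

    // Only consecutive repetitions are dropped; 1 reappears later and is emitted again.
    static List<Integer> byValue() {
        return Flux.just(1, 1, 2, 2, 1, 3, 3)
                .distinctUntilChanged()
                .collectList()
                .block();
    }

    // Consecutive elements are compared through a key, here the lower-cased string.
    static List<String> byLowerCaseKey() {
        return Flux.just("a", "A", "b", "B", "a")
                .distinctUntilChanged(String::toLowerCase)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(byValue());        // [1, 2, 1, 3]
        System.out.println(byLowerCaseKey()); // [a, b, a]
    }
}
```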
Flux with only one occurrence in a row of each element (yet elements can repeat in the overall sequence)
public final <V> Flux<T> distinctUntilChanged(Function<? super T,? extends V> keySelector)
Function using equality.
V - the type of the key extracted from each value in this sequence
keySelector - function to compute comparison key for each element
Flux with only one occurrence in a row of each element of the same key (yet element keys can repeat in the overall sequence)
public final <V> Flux<T> distinctUntilChanged(Function<? super T,? extends V> keySelector, BiPredicate<? super V,? super V> keyComparator)
Function and then comparing keys with the supplied BiPredicate.
V - the type of the key extracted from each value in this sequence
keySelector - function to compute comparison key for each element
keyComparator - predicate used to compare keys.
Flux with only one occurrence in a row of each element of the same key for which the predicate returns true (yet element keys can repeat in the overall sequence)
keyComparator returns true). The keys themselves are not discarded.
public final Flux<T> doAfterTerminate(Runnable afterTerminate)
Add behavior (side-effect) triggered after the Flux terminates, either by completing downstream successfully or with an error.
afterTerminate - the callback to call after Subscriber.onComplete() or Subscriber.onError(java.lang.Throwable)
Flux
public final Flux<T> doFinally(Consumer<SignalType> onFinally)
Add behavior (side-effect) triggered after the Flux terminates for any reason, including cancellation. The terminating event (SignalType.ON_COMPLETE, SignalType.ON_ERROR and SignalType.CANCEL) is passed to the consumer, which is executed after the signal has been passed downstream.
Note that the fact that the signal is propagated downstream before the callback is executed means that several doFinally in a row will be executed in reverse order. If you want to assert the execution of the callback, please keep in mind that the Flux will complete before it is executed, so its effect might not be visible immediately after e.g. a blockLast().
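The reverse ordering can be observed with a fully synchronous source, as in this sketch. It assumes reactor-core is on the classpath; the class name DoFinallySketch and the labels are hypothetical. With asynchronous sources the callbacks may run after blockLast() has returned, so this is not a general testing recipe.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class DoFinallySketch {

    // Records the order in which two chained doFinally callbacks run.
    static List<String> callbackOrder() {
        List<String> order = new ArrayList<>();
        Flux.just(1, 2)
                .doFinally(signal -> order.add("declared first"))
                .doFinally(signal -> order.add("declared second"))
                .blockLast();
        return order;
    }

    public static void main(String[] args) {
        System.out.println(callbackOrder()); // [declared second, declared first]
    }
}
```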
onFinally - the callback to execute after a terminal signal (complete, error or cancel)
Flux
public final Flux<T> doFirst(Runnable onFirst)
Add behavior (side-effect) triggered before the Flux is subscribed to, which should be the first event after assembly time.
Note that when several doFirst(Runnable) operators are used anywhere in a chain of operators, their order of execution is reversed compared to the declaration order (as the subscribe signal flows backward, from the ultimate subscriber to the source publisher):
Flux.just(1, 2)
    .doFirst(() -> System.out.println("three"))
    .doFirst(() -> System.out.println("two"))
    .doFirst(() -> System.out.println("one"));
// would print one two three
In case the Runnable throws an exception, said exception will be directly propagated to the subscribing Subscriber along with a no-op Subscription, similarly to what error(Throwable) does. Otherwise, after the handler has executed, the Subscriber is directly subscribed to the original source Flux (this).
This side-effect method provides stronger first guarantees compared to doOnSubscribe(Consumer), which is triggered once the Subscription has been set up and passed to the Subscriber.
public final Flux<T> doOnCancel(Runnable onCancel)
Add behavior (side-effect) triggered when this Flux is cancelled.
onCancel - the callback to call on Subscription.cancel()
Flux
public final Flux<T> doOnComplete(Runnable onComplete)
Add behavior (side-effect) triggered when this Flux completes successfully.
onComplete - the callback to call on Subscriber.onComplete()
Flux
public final <R> Flux<T> doOnDiscard(Class<R> type, Consumer<? super R> discardHook)
The discardHook must be idempotent and safe to use on any instance of the desired type. Calls to this method are additive, and the order of invocation of the discardHook is the same as the order of declaration (calling .filter(...).doOnDiscard(first).doOnDiscard(second) will let the filter invoke first then second handlers).
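As a hedged sketch of the hook in action, the example below lets a filter discard odd numbers and records them through doOnDiscard. It assumes reactor-core is available; the class name DoOnDiscardSketch is illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class DoOnDiscardSketch {

    // Collects the Integer elements that the upstream filter rejected.
    static List<Integer> discardedByFilter() {
        List<Integer> discarded = new ArrayList<>();
        Flux.range(1, 6)
                .filter(i -> i % 2 == 0)                    // upstream operator that discards odd values
                .doOnDiscard(Integer.class, discarded::add) // hook applies to operators declared above it
                .blockLast();
        return discarded;
    }

    public static void main(String[] args) {
        System.out.println(discardedByFilter()); // [1, 3, 5]
    }
}
```

The hook only sees elements discarded by operators upstream of it, which is why it is declared after the filter.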
Two main categories of discarding operators exist:
onDiscard Support section.
type - the Class of elements in the upstream chain of operators that this cleanup hook should take into account.
discardHook - a Consumer of elements in the upstream chain of operators that performs the cleanup.
Flux that cleans up matching elements that get discarded upstream of it.
public final Flux<T> doOnEach(Consumer<? super Signal<T>> signalConsumer)
Add behavior (side-effects) triggered when the Flux emits an item, fails with an error or completes successfully. All these events are represented as a Signal that is passed to the side-effect callback. Note that this is an advanced operator, typically used for monitoring of a Flux. These Signals have a Context associated to them.
signalConsumer - the mandatory callback to call on Subscriber.onNext(Object), Subscriber.onError(Throwable) and Subscriber.onComplete()
Flux
doOnNext(Consumer), doOnError(Consumer), doOnComplete(Runnable), materialize(), Signal
public final <E extends Throwable> Flux<T> doOnError(Class<E> exceptionType, Consumer<? super E> onError)
Add behavior (side-effect) triggered when the Flux completes with an error matching the given exception type.
E - type of the error to handle
exceptionType - the type of exceptions to handle
onError - the error handler for each error
Flux
public final Flux<T> doOnError(Consumer<? super Throwable> onError)
Add behavior (side-effect) triggered when the Flux completes with an error.
onError - the callback to call on Subscriber.onError(java.lang.Throwable)
Flux
public final Flux<T> doOnError(Predicate<? super Throwable> predicate, Consumer<? super Throwable> onError)
Add behavior (side-effect) triggered when the Flux completes with an error matching the given exception.
predicate - the matcher for exceptions to handle
onError - the error handler for each error
Flux
public final Flux<T> doOnNext(Consumer<? super T> onNext)
Add behavior (side-effect) triggered when the Flux emits an item.
onNext - the callback to call on Subscriber.onNext(T)
Flux
resuming on errors (including when fusion is enabled). Exceptions thrown by the consumer are passed to the onErrorContinue(BiConsumer) error consumer (the value consumer is not invoked, as the source element will be part of the sequence). The onNext signal is then propagated as normal.
public final Flux<T> doOnRequest(LongConsumer consumer)
Add behavior (side-effect) triggering a LongConsumer when this Flux receives any request.
Note that a non-fatal error raised in the callback will not be propagated and will simply trigger Operators.onOperatorError(Throwable, Context).
consumer - the consumer to invoke on each request
Flux
public final Flux<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe)
Add behavior (side-effect) triggered when the Flux is done being subscribed, that is to say when a Subscription has been produced by the Publisher and passed to the Subscriber.onSubscribe(Subscription).
This method is not intended for capturing the subscription and calling its methods, but for side effects like monitoring. For instance, the correct way to cancel a subscription is to call Disposable.dispose() on the Disposable returned by subscribe().
onSubscribe - the callback to call on Subscriber.onSubscribe(org.reactivestreams.Subscription)
Flux
doFirst(Runnable)
public final Flux<T> doOnTerminate(Runnable onTerminate)
Add behavior (side-effect) triggered when the Flux terminates, either by completing successfully or with an error.
onTerminate - the callback to call on Subscriber.onComplete() or Subscriber.onError(java.lang.Throwable)
Flux
public final Flux<Tuple2<Long,T>> elapsed()
Map this Flux into Tuple2<Long, T> of timemillis and source data. The timemillis corresponds to the elapsed time between each signal as measured by the parallel scheduler. First duration is measured between the subscription and the first element.
Flux that emits a tuple of time elapsed in milliseconds and matching data
public final Flux<Tuple2<Long,T>> elapsed(Scheduler scheduler)
Map this Flux into Tuple2<Long, T> of timemillis and source data. The timemillis corresponds to the elapsed time between each signal as measured by the provided Scheduler. First duration is measured between the subscription and the first element.
scheduler - a Scheduler instance to read time from
Flux that emits tuples of time elapsed in milliseconds and matching data
public final Mono<T> elementAt(int index)
Emit only the element at the given index position, or IndexOutOfBoundsException if the sequence is shorter.
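Both elementAt variants can be sketched as follows, assuming reactor-core is on the classpath. The class name ElementAtSketch and the fallback value "none" are illustrative.

```java
import reactor.core.publisher.Flux;

class ElementAtSketch {

    // Index 1 exists, so the second element is emitted.
    static String second() {
        return Flux.just("a", "b", "c").elementAt(1).block();
    }

    // Index 5 is out of range, so the supplied default is emitted instead.
    static String withDefault() {
        return Flux.just("a", "b", "c").elementAt(5, "none").block();
    }

    // Without a default, an out-of-range index surfaces as an IndexOutOfBoundsException.
    static boolean failsWhenTooShort() {
        try {
            Flux.just("a", "b", "c").elementAt(5).block();
            return false;
        } catch (IndexOutOfBoundsException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(second());            // b
        System.out.println(withDefault());       // none
        System.out.println(failsWhenTooShort()); // true
    }
}
```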
index - zero-based index of the only item to emit
Mono of the item at the specified zero-based index
public final Mono<T> elementAt(int index, T defaultValue)
index - zero-based index of the only item to emit
defaultValue - a default value to emit if the sequence is shorter
Mono of the item at the specified zero-based index or a default value
public static <T> Flux<T> empty()
Create a Flux that completes without emitting any item.
T - the reified type of the target Subscriber
Flux
public static <T> Flux<T> error(Supplier<Throwable> errorSupplier)
Create a Flux that terminates with an error immediately after being subscribed to. The Throwable is generated by a Supplier, invoked each time there is a subscription and allowing for lazy instantiation.
T - the reified type of the target Subscriber
errorSupplier - the error signal Supplier to invoke for each Subscriber
Flux
public static <T> Flux<T> error(Throwable error)
Create a Flux that terminates with the specified error immediately after being subscribed to.
T - the reified type of the target Subscriber
error - the error to signal to each Subscriber
Flux
public static <O> Flux<O> error(Throwable throwable, boolean whenRequested)
Create a Flux that terminates with the specified error, either immediately after being subscribed to or after being first requested.
O - the reified type of the target Subscriber
throwable - the error to signal to each Subscriber
whenRequested - if true, will onError on the first request instead of subscribe().
Flux
public final Flux<T> expand(Function<? super T,? extends Publisher<? extends T>> expander)
That is: emit the values from this Flux first, then expand each at a first level of recursion and emit all of the resulting values, then expand all of these at a second level and so on.
For example, given the hierarchical structure
A
 - AA
   - aa1
B
 - BB
   - bb1
Expands Flux.just(A, B) into
A B AA BB aa1 bb1
public final Flux<T> expand(Function<? super T,? extends Publisher<? extends T>> expander, int capacityHint)
That is: emit the values from this Flux first, then expand each at a first level of recursion and emit all of the resulting values, then expand all of these at a second level and so on.
For example, given the hierarchical structure
A
 - AA
   - aa1
B
 - BB
   - bb1
Expands Flux.just(A, B) into
A B AA BB aa1 bb1
public final Flux<T> expandDeep(Function<? super T,? extends Publisher<? extends T>> expander)
That is: emit one value from this Flux, expand it and emit the first value at this first level of recursion, and so on. When no more recursion is possible, backtrack to the previous level and re-apply the strategy.
For example, given the hierarchical structure
A
 - AA
   - aa1
B
 - BB
   - bb1
Expands Flux.just(A, B) into
A AA aa1 B BB bb1
public final Flux<T> expandDeep(Function<? super T,? extends Publisher<? extends T>> expander, int capacityHint)
That is: emit one value from this Flux, expand it and emit the first value at this first level of recursion, and so on. When no more recursion is possible, backtrack to the previous level and re-apply the strategy.
For example, given the hierarchical structure
A
 - AA
   - aa1
B
 - BB
   - bb1
Expands Flux.just(A, B) into
A AA aa1 B BB bb1
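The two traversal orders can be reproduced with a small in-memory tree, as sketched below. This assumes reactor-core is available; the class ExpandSketch, its CHILDREN map and the children helper are hypothetical stand-ins for the hierarchical structure described above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;

class ExpandSketch {

    // Parent-to-children lookup mirroring the A/AA/aa1 and B/BB/bb1 hierarchy.
    static final Map<String, List<String>> CHILDREN = new HashMap<>();
    static {
        CHILDREN.put("A", Arrays.asList("AA"));
        CHILDREN.put("AA", Arrays.asList("aa1"));
        CHILDREN.put("B", Arrays.asList("BB"));
        CHILDREN.put("BB", Arrays.asList("bb1"));
    }

    // Expander function: leaves expand to an empty Flux, which stops the recursion.
    static Flux<String> children(String node) {
        return Flux.fromIterable(CHILDREN.getOrDefault(node, Collections.emptyList()));
    }

    static List<String> breadthFirst() {
        return Flux.just("A", "B").expand(ExpandSketch::children).collectList().block();
    }

    static List<String> depthFirst() {
        return Flux.just("A", "B").expandDeep(ExpandSketch::children).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(breadthFirst()); // [A, B, AA, BB, aa1, bb1]
        System.out.println(depthFirst());   // [A, AA, aa1, B, BB, bb1]
    }
}
```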
public final Flux<T> filter(Predicate<? super T> p)
Evaluate each source value against the given Predicate. If the predicate test succeeds, the value is emitted. If the predicate test fails, the value is ignored and a request of 1 is made upstream.
p - the Predicate to test values against
Flux containing only values that pass the predicate test
resuming on errors (including when fusion is enabled). Exceptions thrown by the predicate are considered as if the predicate returned false: they cause the source value to be dropped and a new element (request(1)) to be requested from upstream.
public final Flux<T> filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate)
Test each value emitted by this Flux asynchronously using a generated Publisher<Boolean> test. A value is replayed if the first item emitted by its corresponding test is true. It is dropped if its test is either empty or its first emitted value is false.
Note that only the first value of the test publisher is considered, and unless it is a Mono, the test will be cancelled after receiving that first value. Test publishers are generated and subscribed to in sequence.
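The three possible outcomes of a test publisher (true, false, empty) can be sketched as follows, assuming reactor-core is on the classpath. The class FilterWhenSketch and its isAllowed helper are hypothetical.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class FilterWhenSketch {

    // Hypothetical asynchronous test: empty for 3, otherwise true only for even values.
    static Mono<Boolean> isAllowed(int value) {
        if (value == 3) {
            return Mono.empty(); // an empty test drops the value
        }
        return Mono.just(value % 2 == 0);
    }

    static List<Integer> allowed() {
        return Flux.range(1, 6)
                .filterWhen(FilterWhenSketch::isAllowed)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(allowed()); // [2, 4, 6]
    }
}
```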
asyncPredicate - the function generating a Publisher of Boolean for each value, to filter the Flux with
Flux
public final Flux<T> filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate, int bufferSize)
Test each value emitted by this Flux asynchronously using a generated Publisher<Boolean> test. A value is replayed if the first item emitted by its corresponding test is true. It is dropped if its test is either empty or its first emitted value is false.
Note that only the first value of the test publisher is considered, and unless it is a Mono, the test will be cancelled after receiving that first value. Test publishers are generated and subscribed to in sequence.
asyncPredicate - the function generating a Publisher of Boolean for each value, to filter the Flux with
bufferSize - the maximum expected number of values to hold pending a result of their respective asynchronous predicates, rounded to the next power of two. This is capped depending on the size of the heap and the JVM limits, so be careful with large values (although e.g. 65536 should still be fine). Also serves as the initial request size for the source.
Flux
public static <I> Flux<I> first(Iterable<? extends Publisher<? extends I>> sources)
Pick the first Publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.
I - The type of values in both source and output sequences
sources - The competing source publishers
Flux behaving like the fastest of its sources
@SafeVarargs public static <I> Flux<I> first(Publisher<? extends I>... sources)
Pick the first Publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.
I - The type of values in both source and output sequences
sources - The competing source publishers
Flux behaving like the fastest of its sources
public final <R> Flux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
through merging,
which allow them to interleave.
There are three dimensions to this operator that can be compared with
flatMapSequential
and concatMap
:
R
- the merged output sequence typemapper
- the Function
to transform input sequence into N sequences Publisher
Flux
resuming on errors
in the mapper Function
. Exceptions thrown by the mapper then behave as if
it had mapped the value to an empty publisher. If the mapper does map to a scalar
publisher (an optimization in which the value can be resolved immediately without
subscribing to the publisher, e.g. a Mono.fromCallable(Callable)
) but said
publisher throws, this can be resumed from in the same manner.
public final <R> Flux<R> flatMap(@Nullable Function<? super T,? extends Publisher<? extends R>> mapperOnNext, @Nullable Function<? super Throwable,? extends Publisher<? extends R>> mapperOnError, @Nullable Supplier<? extends Publisher<? extends R>> mapperOnComplete)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
through merging,
which allows them to interleave. Note that at least one of the signal mappers must
be provided, and all provided mappers must produce a publisher.
There are three dimensions to this operator that can be compared with
flatMapSequential
and concatMap
:
An onError signal is transformed into a completion signal after its mapping callback has been applied.
R
- the output Publisher
type target
mapperOnNext
- the Function
to call on next data and returning a sequence to merge.
Use null to ignore (provided at least one other mapper is specified).
mapperOnError
- the Function
to call on error signal and returning a sequence to merge.
Use null to ignore (provided at least one other mapper is specified).
mapperOnComplete
- the Function
to call on complete signal and returning a sequence to merge.
Use null to ignore (provided at least one other mapper is specified).
Flux
public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
through merging,
which allows them to interleave.
There are three dimensions to this operator that can be compared with
flatMapSequential
and concatMap
:
Publisher
can be
subscribed to and merged in parallel. In turn, that argument shows the size of
the first Subscription.request(long)
to the upstream.
V
- the merged output sequence type
mapper
- the Function
to transform input sequence into N sequences Publisher
concurrency
- the maximum number of in-flight inner sequences
Flux
resuming on errors
in the mapper Function
. Exceptions thrown by the mapper then behave as if
it had mapped the value to an empty publisher. If the mapper does map to a scalar
publisher (an optimization in which the value can be resolved immediately without
subscribing to the publisher, e.g. a Mono.fromCallable(Callable)
) but said
publisher throws, this can be resumed from in the same manner.
public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
through merging,
which allows them to interleave.
There are three dimensions to this operator that can be compared with
flatMapSequential
and concatMap
:
Publisher
can be
subscribed to and merged in parallel. In turn, that argument shows the size of
the first Subscription.request(long)
to the upstream.
The prefetch argument sets an arbitrary prefetch size for the merged
Publisher
(in other words prefetch size means the size of the first
Subscription.request(long)
to the merged Publisher
).
V
- the merged output sequence type
mapper
- the Function
to transform input sequence into N sequences Publisher
concurrency
- the maximum number of in-flight inner sequences
prefetch
- the maximum in-flight elements from each inner Publisher
sequence
Flux
resuming on errors
in the mapper Function
. Exceptions thrown by the mapper then behave as if
it had mapped the value to an empty publisher. If the mapper does map to a scalar
publisher (an optimization in which the value can be resolved immediately without
subscribing to the publisher, e.g. a Mono.fromCallable(Callable)
) but said
publisher throws, this can be resumed from in the same manner.
public final <V> Flux<V> flatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
through merging,
which allows them to interleave.
There are three dimensions to this operator that can be compared with
flatMapSequential
and concatMap
:
Publisher
can be
subscribed to and merged in parallel. The prefetch argument sets an
arbitrary prefetch size for the merged Publisher
. This variant will delay
any error until after the rest of the flatMap backlog has been processed.
V
- the merged output sequence type
mapper
- the Function
to transform input sequence into N sequences Publisher
concurrency
- the maximum number of in-flight inner sequences
prefetch
- the maximum in-flight elements from each inner Publisher
sequence
Flux
resuming on errors
in the mapper Function
. Exceptions thrown by the mapper then behave as if
it had mapped the value to an empty publisher. If the mapper does map to a scalar
publisher (an optimization in which the value can be resolved immediately without
subscribing to the publisher, e.g. a Mono.fromCallable(Callable)
) but said
publisher throws, this can be resumed from in the same manner.
public final <R> Flux<R> flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
Flux
into Iterable
, then flatten the elements from those by
merging them into a single Flux
.
Note that unlike flatMap(Function)
and concatMap(Function)
, with Iterable there is
no notion of eager vs lazy inner subscription. The content of the Iterables is played sequentially.
Thus flatMapIterable
and concatMapIterable
are equivalent, offered as a discoverability
improvement for users that explore the API with the concat vs flatMap expectation.
R
- the merged output sequence type
mapper
- the Function
to transform input sequence into N Iterable
Flux
T
elements it prefetched and, in
some cases, attempts to discard remainder of the currently processed Iterable
(if it can
safely assume iterator is not infinite, see Spliterator.getExactSizeIfKnown()
).
public final <R> Flux<R> flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper, int prefetch)
Flux
into Iterable
, then flatten the emissions from those by
merging them into a single Flux
. The prefetch argument sets an
arbitrary prefetch size for the merged Iterable
.
Note that unlike flatMap(Function)
and concatMap(Function)
, with Iterable there is
no notion of eager vs lazy inner subscription. The content of the Iterables is played sequentially.
Thus flatMapIterable
and concatMapIterable
are equivalent, offered as a discoverability
improvement for users that explore the API with the concat vs flatMap expectation.
R
- the merged output sequence type
mapper
- the Function
to transform input sequence into N Iterable
prefetch
- the maximum in-flight elements from each inner Iterable
sequence
Flux
T
elements it prefetched and, in
some cases, attempts to discard remainder of the currently processed Iterable
(if it can
safely assume the iterator is not infinite, see Spliterator.getExactSizeIfKnown()
).
public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, but merge them in
the order of their source element.
There are three dimensions to this operator that can be compared with
flatMap
and concatMap
:
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher
. However, if such an early inner takes more time to complete than
subsequent faster inners, the data from these faster inners will be queued until
the earlier inner completes, so as to maintain source ordering.
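The ordering behaviour described above can be sketched as follows, assuming reactor-core is on the classpath. The class name FlatMapSequentialSketch, the helper delayed and the delay values are illustrative assumptions, not part of the Reactor API. The inner for 3 is the slowest, yet its value is still emitted first.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class FlatMapSequentialSketch {

    // Hypothetical helper: emits n after n * 100 ms, so larger values are slower.
    static Mono<Integer> delayed(int n) {
        return Mono.just(n).delayElement(Duration.ofMillis(n * 100L));
    }

    // Inners are subscribed eagerly, but results are replayed in source order.
    static List<Integer> sequential() {
        return Flux.just(3, 1, 2)
                .flatMapSequential(FlatMapSequentialSketch::delayed)
                .collectList()
                .block();
    }

    // With plain flatMap, results are emitted as each inner produces them.
    static List<Integer> merged() {
        return Flux.just(3, 1, 2)
                .flatMap(FlatMapSequentialSketch::delayed)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(sequential()); // [3, 1, 2]
        System.out.println(merged());     // timing dependent, typically [1, 2, 3]
    }
}
```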
public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, but merge them in
the order of their source element.
There are three dimensions to this operator that can be compared with
flatMap
and concatMap
:
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher
. However, if such an early inner takes more time to complete than
subsequent faster inners, the data from these faster inners will be queued until
the earlier inner completes, so as to maintain source ordering.
The concurrency argument controls how many inner Publisher
can be subscribed to and merged in parallel.
public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, but merge them in
the order of their source element.
There are three dimensions to this operator that can be compared with
flatMap
and concatMap
:
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher
. However, if such an early inner takes more time to complete than
subsequent faster inners, the data from these faster inners will be queued until
the earlier inner completes, so as to maintain source ordering.
The concurrency argument controls how many inner Publisher
can be subscribed to and merged in parallel. The prefetch argument sets an arbitrary prefetch
size for the merged Publisher
.
R
- the merged output sequence type
mapper
- the Function
to transform input sequence into N sequences Publisher
maxConcurrency
- the maximum number of in-flight inner sequences
prefetch
- the maximum in-flight elements from each inner Publisher
sequence
Flux
, subscribing early but keeping the original ordering
public final <R> Flux<R> flatMapSequentialDelayError(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch)
Flux
asynchronously into Publishers,
then flatten these inner publishers into a single Flux
, but merge them in
the order of their source element.
There are three dimensions to this operator that can be compared with
flatMap
and concatMap
:
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher
. However, if such an early inner takes more time to complete than
subsequent faster inners, the data from these faster inners will be queued until
the earlier inner completes, so as to maintain source ordering.
The concurrency argument controls how many inner Publisher
can be subscribed to and merged in parallel. The prefetch argument sets an arbitrary prefetch
size for the merged Publisher
. This variant will delay any error until after the
rest of the flatMap backlog has been processed.
R
- the merged output sequence type
mapper
- the Function
to transform input sequence into N sequences Publisher
maxConcurrency
- the maximum number of in-flight inner sequences
prefetch
- the maximum in-flight elements from each inner Publisher
sequence
Flux
, subscribing early but keeping the original ordering
public static <T> Flux<T> from(Publisher<? extends T> source)
Publisher
with the Flux
API.
Hooks.onEachOperator(String, Function)
and similar assembly hooks are applied
unless the source is already a Flux
.
T
- The type of values in both source and output sequences
source
- the source to decorate
Flux
public static <T> Flux<T> fromArray(T[] array)
Flux
that emits the items contained in the provided array.
T
- The type of values in the source array and resulting Flux
array
- the array to read data from
Flux
public static <T> Flux<T> fromStream(Stream<? extends T> s)
Flux
that emits the items contained in the provided Stream
.
Keep in mind that a Stream
cannot be re-used, which can be problematic in
case of multiple subscriptions or re-subscription (like with repeat()
or
retry()
). The Stream
is closed
automatically
by the operator on cancellation, error or completion.
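One way to work around the single-use nature of a Stream is to create a new Stream per subscription. The sketch below assumes reactor-core is on the classpath and wraps fromStream in Flux.defer. The class and method names are illustrative only.

```java
import java.util.List;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;

final class FromStreamSketch {

    // Flux.defer runs the supplier for every Subscriber, so each subscription
    // gets its own fresh Stream instead of re-using a consumed one.
    static Flux<String> letters() {
        return Flux.defer(() -> Flux.fromStream(Stream.of("a", "b", "c")));
    }

    // Subscribes twice to the same Flux instance and returns both results.
    static List<List<String>> collectTwice() {
        Flux<String> flux = letters();
        List<String> first = flux.collectList().block();
        List<String> second = flux.collectList().block();
        return List.of(first, second);
    }

    public static void main(String[] args) {
        System.out.println(collectTwice()); // [[a, b, c], [a, b, c]]
    }
}
```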
public static <T,S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator)
Flux
by generating signals one-by-one via a
consumer callback and some state. The stateSupplier
may return null.
T
- the value type emitted
S
- the per-subscriber custom state type
stateSupplier
- called for each incoming Subscriber to provide the initial state for the generator bifunction
generator
- Consume the SynchronousSink
provided per-subscriber by Reactor
as well as the current state to generate a single signal on each pass
and return a (new) state.
Flux
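A minimal sketch of stateful generation, assuming reactor-core is on the classpath. The class GenerateSketch and the method squares are hypothetical names chosen for illustration. The state is a counter, and each pass emits exactly one signal: either the next square or completion.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class GenerateSketch {

    // Emits 0*0, 1*1, ... for the first `count` integers, then completes.
    static Flux<Integer> squares(int count) {
        return Flux.generate(
                () -> 0,                      // initial state, created per Subscriber
                (state, sink) -> {
                    if (state == count) {
                        sink.complete();      // one signal per pass: completion
                    } else {
                        sink.next(state * state); // one signal per pass: a value
                    }
                    return state + 1;         // new state for the next pass
                });
    }

    public static void main(String[] args) {
        List<Integer> values = squares(5).collectList().block();
        System.out.println(values); // [0, 1, 4, 9, 16]
    }
}
```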
public static <T,S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator, Consumer<? super S> stateConsumer)
Flux
by generating signals one-by-one via a
consumer callback and some state, with a final cleanup callback. The
stateSupplier
may return null but your cleanup stateConsumer
will need to handle the null case.
T
- the value type emitted
S
- the per-subscriber custom state type
stateSupplier
- called for each incoming Subscriber to provide the initial state for the generator bifunction
generator
- Consume the SynchronousSink
provided per-subscriber by Reactor
as well as the current state to generate a single signal on each pass
and return a (new) state.
stateConsumer
- called after the generator has terminated or the downstream cancelled, receiving the last
state to be handled (i.e., release resources or do other cleanup).
Flux
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
Flux
by generating signals one-by-one via a
consumer callback.
T
- the value type emitted
generator
- Consume the SynchronousSink
provided per-subscriber by Reactor
to generate a single signal on each pass.
Flux
public int getPrefetch()
Flux
Flux
, -1 if unspecified
public final <K> Flux<GroupedFlux<K,T>> groupBy(Function<? super T,? extends K> keyMapper)
Flux
(or groups) for each
unique key, as produced by the provided keyMapper Function
. Note that
groupBy works best with a low cardinality of groups, so choose your keyMapper
function accordingly.
The groups need to be drained and consumed downstream for groupBy to work correctly.
Notably, when the criteria produce a large number of groups, it can lead to hanging
if the groups are not suitably consumed downstream (e.g. due to a flatMap
with a maxConcurrency
parameter that is set too low).
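As an illustration of draining every group downstream, the following sketch (assuming reactor-core on the classpath, with hypothetical class and method names) groups integers by parity and collects each group with flatMap. Only two groups exist here, so the default flatMap concurrency is more than enough to consume all of them.

```java
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;

final class GroupBySketch {

    // Groups 1..6 into "odd" and "even", then drains each group into a list.
    static Map<String, List<Integer>> byParity() {
        return Flux.range(1, 6)
                .groupBy(i -> i % 2 == 0 ? "even" : "odd")
                // every GroupedFlux must be consumed, here via collectList()
                .flatMap(group -> group.collectList()
                        .map(values -> Map.entry(group.key(), values)))
                .collectMap(Map.Entry::getKey, Map.Entry::getValue)
                .block();
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> result = byParity();
        System.out.println(result.get("odd"));  // [1, 3, 5]
        System.out.println(result.get("even")); // [2, 4, 6]
    }
}
```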
K
- the key type extracted from each value of this sequence
keyMapper
- the key mapping Function
that evaluates incoming data and returns a key.
Flux
of GroupedFlux
grouped sequences
public final <K,V> Flux<GroupedFlux<K,V>> groupBy(Function<? super T,? extends K> keyMapper, Function<? super T,? extends V> valueMapper)
Flux
(or groups) for each
unique key, as produced by the provided keyMapper Function
. Source elements
are also mapped to a different value using the valueMapper
. Note that
groupBy works best with a low cardinality of groups, so choose your keyMapper
function accordingly.
The groups need to be drained and consumed downstream for groupBy to work correctly.
Notably, when the criteria produce a large number of groups, it can lead to hanging
if the groups are not suitably consumed downstream (e.g. due to a flatMap
with a maxConcurrency
parameter that is set too low).
K
- the key type extracted from each value of this sequence
V
- the value type extracted from each value of this sequence
keyMapper
- the key mapping function that evaluates incoming data and returns a key.
valueMapper
- the value mapping function that evaluates which data to extract for re-routing.
Flux
of GroupedFlux
grouped sequences
public final <K,V> Flux<GroupedFlux<K,V>> groupBy(Function<? super T,? extends K> keyMapper, Function<? super T,? extends V> valueMapper, int prefetch)
Flux
(or groups) for each
unique key, as produced by the provided keyMapper Function
. Source elements
are also mapped to a different value using the valueMapper
. Note that
groupBy works best with a low cardinality of groups, so choose your keyMapper
function accordingly.
The groups need to be drained and consumed downstream for groupBy to work correctly.
Notably, when the criteria produce a large number of groups, it can lead to hanging
if the groups are not suitably consumed downstream (e.g. due to a flatMap
with a maxConcurrency
parameter that is set too low).
K
- the key type extracted from each value of this sequence
V
- the value type extracted from each value of this sequence
keyMapper
- the key mapping function that evaluates incoming data and returns a key.
valueMapper
- the value mapping function that evaluates which data to extract for re-routing.
prefetch
- the number of values to prefetch from the source
Flux
of GroupedFlux
grouped sequences
public final <K> Flux<GroupedFlux<K,T>> groupBy(Function<? super T,? extends K> keyMapper, int prefetch)
Flux
(or groups) for each
unique key, as produced by the provided keyMapper Function
. Note that
groupBy works best with a low cardinality of groups, so choose your keyMapper
function accordingly.
The groups need to be drained and consumed downstream for groupBy to work correctly.
Notably, when the criteria produce a large number of groups, it can lead to hanging
if the groups are not suitably consumed downstream (e.g. due to a flatMap
with a maxConcurrency
parameter that is set too low).
K
- the key type extracted from each value of this sequence
keyMapper
- the key mapping Function
that evaluates incoming data and returns a key.
prefetch
- the number of values to prefetch from the source
Flux
of GroupedFlux
grouped sequences
public final <TRight,TLeftEnd,TRightEnd,R> Flux<R> groupJoin(Publisher<? extends TRight> other, Function<? super T,? extends Publisher<TLeftEnd>> leftEnd, Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd, BiFunction<? super T,? super Flux<TRight>,? extends R> resultSelector)
Flux
and a Flux
emitting the value from the other
Publisher
to a BiFunction
.
There are no guarantees in what order the items get combined when multiple items from one or both source Publishers overlap.
Unlike join(Publisher, Function, Function, BiFunction)
, items from the second Publisher
will be provided
as a Flux
to the resultSelector
.
TRight
- the type of the elements from the right Publisher
TLeftEnd
- the type for this Flux
window signals
TRightEnd
- the type for the right Publisher
window signals
R
- the combined result type
other
- the other Publisher
to correlate items with
leftEnd
- a function that returns a Publisher whose emissions indicate the
time window for the source value to be considered
rightEnd
- a function that returns a Publisher whose emissions indicate the
time window for the right
Publisher value to be considered
resultSelector
- a function that takes an item emitted by this Flux
and
a Flux
representation of the overlapping item from the other Publisher
and returns the value to be emitted by the resulting Flux
Flux
join(Publisher, Function, Function, BiFunction)
public final <R> Flux<R> handle(BiConsumer<? super T,SynchronousSink<R>> handler)
Flux
by calling a biconsumer with the
output sink for each onNext. At most one SynchronousSink.next(Object)
call must be performed and/or 0 or 1 SynchronousSink.error(Throwable)
or
SynchronousSink.complete()
.
R
- the transformed type
handler
- the handling BiConsumer
Flux
resuming on errors
(including when
fusion is enabled) when the BiConsumer
throws an exception or if an error is signaled explicitly via
SynchronousSink.error(Throwable)
.
public final Mono<Boolean> hasElement(T value)
Flux
sequence is
equal to the provided value.
The implementation uses short-circuit logic and completes with true if an element matches the value.
value
- constant compared to incoming signals
Mono
with true
if any element is equal to a given value and false
otherwise
public final Mono<Boolean> hasElements()
Flux
sequence has at least one element.
The implementation uses short-circuit logic and completes with true on onNext.
Mono
with true
if any value is emitted and false
otherwise
public final Flux<T> hide()
Flux
instance.
The main purpose of this operator is to prevent certain identity-based optimizations from happening, mostly for diagnostic purposes.
Flux
preventing Publisher
/ Subscription
based Reactor optimizations
public final Mono<T> ignoreElements()
public final Flux<Tuple2<Long,T>> index()
Flux
of Tuple2<(index, value)>
.
Flux
with each source value combined with its 0-based index.
public final <I> Flux<I> index(BiFunction<? super Long,? super T,? extends I> indexMapper)
I
using the provided BiFunction
,
returning a Flux<I>
.
Typical usage would be to produce a Tuple2
similar to index()
, but
1-based instead of 0-based:
index((i, v) -> Tuples.of(i+1, v))
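A short sketch contrasting the two index variants, assuming reactor-core is on the classpath. The class and method names are hypothetical, and the values are rendered as strings only to keep the output easy to read.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class IndexSketch {

    // index() pairs each value with a 0-based Long index in a Tuple2.
    static List<String> zeroBased() {
        return Flux.just("a", "b", "c")
                .index()
                .map(tuple -> tuple.getT1() + ":" + tuple.getT2())
                .collectList()
                .block();
    }

    // index(BiFunction) lets the caller shift the index, here to 1-based.
    static List<String> oneBased() {
        return Flux.just("a", "b", "c")
                .index((i, v) -> (i + 1) + ":" + v)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(zeroBased()); // [0:a, 1:b, 2:c]
        System.out.println(oneBased());  // [1:a, 2:b, 3:c]
    }
}
```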
indexMapper
- the BiFunction
to use to combine elements and their index.
Flux
with each source value combined with its computed index.
public static Flux<Long> interval(Duration period)
Flux
that emits long values starting with 0 and incrementing at
specified time intervals on the global timer. The first element is emitted after
an initial delay equal to the period
. If demand is not produced in time,
an onError will be signalled with an overflow
IllegalStateException
detailing the tick that couldn't be emitted.
In normal conditions, the Flux
will never complete.
Runs on the Schedulers.parallel()
Scheduler.
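Because an interval never completes on its own, a bounded example needs an operator such as take. The sketch below assumes reactor-core is on the classpath. The 20 ms period and the class name are arbitrary choices for illustration.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

final class IntervalSketch {

    // Takes the first three ticks; the first one arrives after one period.
    static List<Long> firstTicks() {
        return Flux.interval(Duration.ofMillis(20))
                .take(3)          // bound the otherwise infinite sequence
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(firstTicks()); // [0, 1, 2]
    }
}
```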
public static Flux<Long> interval(Duration delay, Duration period)
Flux
that emits long values starting with 0 and incrementing at
specified time intervals, after an initial delay, on the global timer. If demand is
not produced in time, an onError will be signalled with an
overflow
IllegalStateException
detailing the tick that couldn't be emitted. In normal conditions, the Flux
will never complete.
Runs on the Schedulers.parallel()
Scheduler.
public static Flux<Long> interval(Duration delay, Duration period, Scheduler timer)
Flux
that emits long values starting with 0 and incrementing at
specified time intervals, after an initial delay, on the specified Scheduler
.
If demand is not produced in time, an onError will be signalled with an
overflow
IllegalStateException
detailing the tick that couldn't be emitted. In normal conditions, the Flux
will never complete.
public static Flux<Long> interval(Duration period, Scheduler timer)
Flux
that emits long values starting with 0 and incrementing at
specified time intervals, on the specified Scheduler
. The first element is
emitted after an initial delay equal to the period
. If demand is not
produced in time, an onError will be signalled with an overflow
IllegalStateException
detailing the tick that couldn't be emitted.
In normal conditions, the Flux
will never complete.
public final <TRight,TLeftEnd,TRightEnd,R> Flux<R> join(Publisher<? extends TRight> other, Function<? super T,? extends Publisher<TLeftEnd>> leftEnd, Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd, BiFunction<? super T,? super TRight,? extends R> resultSelector)
Flux
and the other Publisher
to a BiFunction
.
There are no guarantees in what order the items get combined when multiple items from one or both source Publishers overlap.
TRight
- the type of the elements from the right Publisher
TLeftEnd
- the type for this Flux
window signals
TRightEnd
- the type for the right Publisher
window signals
R
- the combined result type
other
- the other Publisher
to correlate items with
leftEnd
- a function that returns a Publisher whose emissions indicate the
time window for the source value to be considered
rightEnd
- a function that returns a Publisher whose emissions indicate the
time window for the right
Publisher value to be considered
resultSelector
- a function that takes an item emitted by each Publisher and returns the
value to be emitted by the resulting Flux
Flux
groupJoin(Publisher, Function, Function, BiFunction)
@SafeVarargs public static <T> Flux<T> just(T... data)
Flux
that emits the provided elements and then completes.
T
- the emitted data type
data
- the elements to emit, as a vararg
Flux
public static <T> Flux<T> just(T data)
Flux
that will only emit a single element then onComplete.
T
- the emitted data type
data
- the single element to emit
Flux
public final Mono<T> last()
Mono
, or emit
NoSuchElementException
error if the source was empty.
For a passive version use takeLast(int)
public final Mono<T> last(T defaultValue)
Mono
, or emit
the defaultValue
if the source was empty.
For a passive version use takeLast(int)
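The difference between the two last variants on an empty source can be sketched as follows, assuming reactor-core is on the classpath. The class and method names are illustrative only.

```java
import java.util.NoSuchElementException;
import reactor.core.publisher.Flux;

final class LastSketch {

    // Non-empty source: the final element is emitted.
    static Integer lastOfNonEmpty() {
        return Flux.just(1, 2, 3).last().block();
    }

    // Empty source with a default: the default value is emitted instead.
    static Integer lastOfEmptyWithDefault() {
        return Flux.<Integer>empty().last(-1).block();
    }

    // Empty source without a default: the Mono errors with NoSuchElementException.
    static boolean lastOfEmptyFails() {
        try {
            Flux.<Integer>empty().last().block();
            return false;
        } catch (NoSuchElementException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(lastOfNonEmpty());         // 3
        System.out.println(lastOfEmptyWithDefault()); // -1
        System.out.println(lastOfEmptyFails());       // true
    }
}
```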
public final Flux<T> limitRate(int prefetchRate)
prefetchRate
when propagated upstream, effectively
rate limiting the upstream Publisher
.
Note that this is an upper bound, and that this operator uses a prefetch-and-replenish strategy, requesting a replenishing amount when 75% of the prefetch amount has been emitted.
Typically used for scenarios where consumer(s) request a large amount of data
(e.g. Long.MAX_VALUE
) but the data source behaves better or can be optimized
with smaller requests (e.g. database paging). All data is still processed,
unlike with limitRequest(long)
which will cap the grand total request
amount.
Equivalent to flux.publishOn(Schedulers.immediate(), prefetchRate).subscribe()
.
prefetchRate
- the limit to apply to downstream's backpressure
Flux
limiting downstream's backpressure
publishOn(Scheduler, int)
,
limitRequest(long)
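The prefetch-and-replenish strategy can be observed by recording the request amounts that reach the upstream. This is a sketch under assumptions: reactor-core is on the classpath, the class name is hypothetical, and hide() is used only to keep the example independent of operator-fusion optimizations. With limitRate(10), the first upstream request is expected to be 10 and the replenishing requests 8 (75% of 10).

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

final class LimitRateSketch {

    // Returns the request amounts seen by the source, in order.
    static List<Long> upstreamRequests() {
        List<Long> requests = new ArrayList<>();
        Flux.range(1, 30)
                .hide()                              // avoid fusion so requests are visible
                .doOnRequest(n -> requests.add(n))   // record what the upstream is asked for
                .limitRate(10)                       // downstream asks for Long.MAX_VALUE
                .blockLast();
        return requests;
    }

    public static void main(String[] args) {
        // Expected to start with 10, followed by replenishing requests of 8.
        System.out.println(upstreamRequests());
    }
}
```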
public final Flux<T> limitRate(int highTide, int lowTide)
highTide
first, then replenishing at the provided
lowTide
, effectively rate limiting the upstream Publisher
.
Note that this is an upper bound, and that this operator uses a prefetch-and-replenish strategy, requesting a replenishing amount when 75% of the prefetch amount has been emitted.
Typically used for scenarios where consumer(s) request a large amount of data
(e.g. Long.MAX_VALUE
) but the data source behaves better or can be optimized
with smaller requests (e.g. database paging). All data is still processed,
unlike with limitRequest(long)
which will cap the grand total request
amount.
Similar to flux.publishOn(Schedulers.immediate(), prefetchRate).subscribe()
,
except with a customized "low tide" instead of the default 75%.
Note that the smaller the lowTide is, the higher the potential for concurrency
between request and data production, and thus the more extraneous replenishment
requests this operator could make. For example, for a global downstream
request of 14, with a highTide of 10 and a lowTide of 2, the operator would perform
7 low tide requests, whereas with the default lowTide of 8 it would only perform one.
Using a lowTide
equal to highTide
reverts to the default 75% strategy,
while using a lowTide
of 0 disables the lowTide, resulting in
all requests strictly adhering to the highTide.
highTide
- the initial request amount
lowTide
- the subsequent (or replenishing) request amount, 0 to
disable early replenishing, highTide to revert to a 75% replenish strategy.
Flux
limiting downstream's backpressure and customizing the
replenishment request amount
publishOn(Scheduler, int)
,
limitRequest(long)
public final Flux<T> limitRequest(long requestCap)
cap
.
Backpressure signals from downstream subscribers that are smaller than the cap are
propagated as is, but if they would cause the total requested amount to go over the
cap, they are reduced to the minimum value that doesn't go over.
As a result, this operator never lets the upstream produce more elements than the
cap, and it can be used as a stricter form of take(long)
. Typically useful
for cases where a race between request and cancellation can lead the upstream to
producing a lot of extraneous data, and such a production is undesirable (e.g.
a source that would send the extraneous data over the network).
requestCap
- the global backpressure limit to apply to the sum of downstream's requests
Flux
that requests AT MOST cap
from upstream in total.
limitRate(int)
,
take(long)
public final Flux<T> log()
Logger
support.
Default will use Level.INFO
and java.util.logging
.
If SLF4J is available, it will be used instead.
The default log category will be "reactor.Flux.", followed by a suffix generated from the source operator, e.g. "reactor.Flux.Map".
Flux
that logs signals
public final Flux<T> log(Logger logger)
options
and
trace them using a specific user-provided Logger
, at Level.INFO
level.
public final Flux<T> log(Logger logger, Level level, boolean showOperatorLine, SignalType... options)
options
and
trace them using a specific user-provided Logger
, at the given Level
.
Options allow fine-grained filtering of the traced signal, for instance to only capture onNext and onError:
flux.log(myCustomLogger, Level.INFO, false, SignalType.ON_NEXT, SignalType.ON_ERROR)
logger
- the Logger
to use, instead of resolving one through a category.
level
- the Level
to enforce for this tracing Flux (only FINEST, FINE,
INFO, WARNING and SEVERE are taken into account)
showOperatorLine
- capture the current stack to display operator class/line number (default in overload is false).
options
- a vararg SignalType
option to filter log messages
Flux
that logs signals
public final Flux<T> log(String category)
Logger
support.
Default will use Level.INFO
and java.util.logging
.
If SLF4J is available, it will be used instead.
category
- to be mapped into logger configuration (e.g. org.springframework.reactor).
If category ends with "." like "reactor.", a generated operator
suffix will be added, e.g. "reactor.Flux.Map".
Flux
that logs signals
public final Flux<T> log(@Nullable String category, Level level, boolean showOperatorLine, SignalType... options)
options
and
trace them using Logger
support. Default will use Level.INFO
and
java.util.logging
. If SLF4J is available, it will be used instead.
Options allow fine-grained filtering of the traced signal, for instance to only capture onNext and onError:
flux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
category
- to be mapped into logger configuration (e.g. org.springframework.reactor).
If category ends with "." like "reactor.", a generated operator
suffix will be added, e.g. "reactor.Flux.Map".
level
- the Level
to enforce for this tracing Flux (only FINEST, FINE,
INFO, WARNING and SEVERE are taken into account)
showOperatorLine
- capture the current stack to display operator class/line number.
options
- a vararg SignalType
option to filter log messages
Flux
that logs signals
public final Flux<T> log(@Nullable String category, Level level, SignalType... options)
options
and
trace them using Logger
support. Default will use Level.INFO
and
java.util.logging
. If SLF4J is available, it will be used instead.
Options allow fine-grained filtering of the traced signal, for instance to only capture onNext and onError:
flux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
category
- to be mapped into logger configuration (e.g. org.springframework.reactor).
If category ends with "." like "reactor.", a generated operator
suffix will be added, e.g. "reactor.Flux.Map".
level
- the Level
to enforce for this tracing Flux (only FINEST, FINE,
INFO, WARNING and SEVERE are taken into account)
options
- a vararg SignalType
option to filter log messages
Flux
that logs signals
public final <V> Flux<V> map(Function<? super T,? extends V> mapper)
Flux
by applying a synchronous function
to each item.
V
- the transformed type
mapper
- the synchronous transforming Function
Flux
resuming on errors
(including when fusion is enabled). Exceptions thrown by the mapper then cause the
source value to be dropped and a new element (request(1)
) to be requested
from upstream.
public final Flux<Signal<T>> materialize()
Signal
instances,
materializing these signals.
Since the error is materialized as a Signal
, the propagation will be stopped and onComplete will be
emitted. Complete signal will first emit a Signal.complete()
and then effectively complete the flux.
All these Signal
have a Context
associated to them.
Flux
of materialized Signal
dematerialize()
@SafeVarargs public static <I> Flux<I> merge(int prefetch, Publisher<? extends I>... sources)
Merge data from Publisher
sequences contained in an array / vararg
into an interleaved merged sequence. Unlike concat
,
sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
public static <I> Flux<I> merge(Iterable<? extends Publisher<? extends I>> sources)
Merge data from Publisher
sequences contained in an Iterable
into an interleaved merged sequence. Unlike concat
, inner
sources are subscribed to eagerly.
A new Iterator
will be created for each subscriber.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <I> Flux<I> merge(Publisher<? extends I>... sources)
Merge data from Publisher
sequences contained in an array / vararg
into an interleaved merged sequence. Unlike concat
,
sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source)
Merge data from Publisher
sequences emitted by the passed Publisher
into an interleaved merged sequence. Unlike concat
, inner
sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency)
Merge data from Publisher
sequences emitted by the passed Publisher
into an interleaved merged sequence. Unlike concat
, inner
sources are subscribed to eagerly (but at most concurrency
sources are
subscribed to at the same time).
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency, int prefetch)
Merge data from Publisher
sequences emitted by the passed Publisher
into an interleaved merged sequence. Unlike concat
, inner
sources are subscribed to eagerly (but at most concurrency
sources are
subscribed to at the same time).
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <I> Flux<I> mergeDelayError(int prefetch, Publisher<? extends I>... sources)
Merge data from Publisher
sequences contained in an array / vararg
into an interleaved merged sequence. Unlike concat
,
sources are subscribed to eagerly.
This variant will delay any error until after the rest of the merge backlog has been processed.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
@SafeVarargs public static <T> Flux<T> mergeOrdered(Comparator<? super T> comparator, Publisher<? extends T>... sources)
Merge data from provided Publisher
sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator
). This is not a sort(Comparator)
, as it doesn't consider
the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
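The picking rule above can be modelled without Reactor. The following is a minimal sketch on in-memory lists, not the operator's implementation: the class `MergeOrderedSketch`, the `Slot` type and the list-based `mergeOrdered` helper are hypothetical names introduced for illustration. Each source holds one value in a slot, the smallest slot is emitted, and only that slot is refilled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class MergeOrderedSketch {

    // One slot per source: the value currently held for that source,
    // plus the remainder of that source.
    private static final class Slot<T> {
        final T head;
        final Iterator<? extends T> rest;

        Slot(T head, Iterator<? extends T> rest) {
            this.head = head;
            this.rest = rest;
        }
    }

    // Hypothetical helper: applies the "pick the smallest head, then
    // replenish that slot" rule to plain lists instead of Publishers.
    static <T> List<T> mergeOrdered(Comparator<? super T> comparator,
                                    List<? extends List<? extends T>> sources) {
        PriorityQueue<Slot<T>> slots =
                new PriorityQueue<>((a, b) -> comparator.compare(a.head, b.head));
        for (List<? extends T> source : sources) {
            Iterator<? extends T> it = source.iterator();
            if (it.hasNext()) {
                slots.add(new Slot<>(it.next(), it));
            }
        }
        List<T> merged = new ArrayList<>();
        while (!slots.isEmpty()) {
            Slot<T> smallest = slots.poll();
            merged.add(smallest.head);
            // Replenish only the slot of the source that was just picked.
            if (smallest.rest.hasNext()) {
                slots.add(new Slot<>(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // The first source is not sorted, so neither is the result.
        List<Integer> out = mergeOrdered(Comparator.<Integer>naturalOrder(),
                List.of(List.of(1, 4, 2), List.of(3, 5)));
        System.out.println(out); // prints [1, 3, 4, 2, 5]
    }
}
```

The unsorted first source shows why this is not a sort: 2 is emitted after 4 because only the current head of each source is ever compared.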
T
- the merged type
comparator
- the Comparator
to use to find the smallest value
sources
- Publisher
sources to merge
Flux
that merges the sources, subscribing early but keeping the original ordering
@SafeVarargs public static <T> Flux<T> mergeOrdered(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
Merge data from provided Publisher
sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by the provided
Comparator
). This is not a sort(Comparator)
, as it doesn't consider
the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
T
- the merged type
prefetch
- the number of elements to prefetch from each source (avoiding too
many small requests to the source when picking)
comparator
- the Comparator
to use to find the smallest value
sources
- Publisher
sources to merge
Flux
that merges the sources, subscribing early but keeping the original ordering
@SafeVarargs public static <I extends Comparable<? super I>> Flux<I> mergeOrdered(Publisher<? extends I>... sources)
Merge data from provided Publisher
sequences into an ordered merged sequence,
by picking the smallest values from each source (as defined by their natural order).
This is not a sort()
, as it doesn't consider the whole of each sequence.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
I
- a Comparable
merged type that has a natural order
sources
- Publisher
sources of Comparable
to merge
Flux
that merges the sources, subscribing early but keeping the original ordering
public final Flux<T> mergeOrderedWith(Publisher<? extends T> other, Comparator<? super T> otherComparator)
Merge data from this Flux
and a Publisher
into a reordered merge
sequence, by picking the smallest value from each sequence as defined by a provided
Comparator
. Note that subsequent calls are combined, and their comparators are
in lexicographic order as defined by Comparator.thenComparing(Comparator)
.
The combination step is avoided if the two Comparators
are
equal
(which can easily be achieved by using the
same reference, and is also always true of Comparator.naturalOrder()
).
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
other
- the Publisher
to merge with
otherComparator
- the Comparator
to use for merging
Flux
@SafeVarargs public static <I> Flux<I> mergeSequential(int prefetch, Publisher<? extends I>... sources)
Merge data from Publisher
sequences provided in an array/vararg
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources)
public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch)
Merge data from Publisher
sequences provided in an Iterable
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly (but at most maxConcurrency
sources at a time). Unlike merge, their
emitted values are merged into the final sequence in subscription order.
I
- the merged type
sources
- an Iterable
of Publisher
sequences to merge
maxConcurrency
- the request produced to the main source thus limiting concurrent merge backlog
prefetch
- the inner source request size
Flux
, subscribing early but keeping the original ordering
@SafeVarargs public static <I> Flux<I> mergeSequential(Publisher<? extends I>... sources)
Merge data from Publisher
sequences provided in an array/vararg
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources)
public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)
Merge data from Publisher
sequences emitted by the passed Publisher
into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to
eagerly (but at most maxConcurrency
sources at a time). Unlike merge, their
emitted values are merged into the final sequence in subscription order.
T
- the merged type
sources
- a Publisher
of Publisher
sources to merge
prefetch
- the inner source request size
maxConcurrency
- the request produced to the main source thus limiting concurrent merge backlog
Flux
, subscribing early but keeping the original ordering
@SafeVarargs public static <I> Flux<I> mergeSequentialDelayError(int prefetch, Publisher<? extends I>... sources)
Merge data from Publisher
sequences provided in an array/vararg
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
This variant will delay any error until after the rest of the mergeSequential backlog
has been processed.
public static <I> Flux<I> mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch)
Merge data from Publisher
sequences provided in an Iterable
into an ordered merged sequence. Unlike concat, sources are subscribed to
eagerly (but at most maxConcurrency
sources at a time). Unlike merge, their
emitted values are merged into the final sequence in subscription order.
This variant will delay any error until after the rest of the mergeSequential backlog
has been processed.
I
- the merged type
sources
- an Iterable
of Publisher
sequences to merge
maxConcurrency
- the request produced to the main source thus limiting concurrent merge backlog
prefetch
- the inner source request size
Flux
, subscribing early but keeping the original ordering
public static <T> Flux<T> mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)
Merge data from Publisher
sequences emitted by the passed Publisher
into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to
eagerly (but at most maxConcurrency
sources at a time). Unlike merge, their
emitted values are merged into the final sequence in subscription order.
This variant will delay any error until after the rest of the mergeSequential backlog has been processed.
T
- the merged type
sources
- a Publisher
of Publisher
sources to merge
prefetch
- the inner source request size
maxConcurrency
- the request produced to the main source thus limiting concurrent merge backlog
Flux
, subscribing early but keeping the original ordering
public final Flux<T> mergeWith(Publisher<? extends T> other)
Merge data from this Flux
and a Publisher
into an interleaved merged
sequence. Unlike concat
, inner sources are subscribed
to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
public final Flux<T> metrics()
Metrics are gathered on Subscriber
events, and it is recommended to also
name
(and optionally tag
) the
sequence.
Flux
public final Flux<T> name(String name)
Give a name to this sequence, which can be retrieved using Scannable.name()
as long as this is the first reachable Scannable.parents()
.
name
- a name for the sequence
public static <T> Flux<T> never()
Create a Flux
that will never signal any data, error or completion signal.
T
- the Subscriber
type target
Flux
protected static <T> ConnectableFlux<T> onAssembly(ConnectableFlux<T> source)
To be used by custom operators: invokes assembly Hooks
pointcut given a
ConnectableFlux
, potentially returning a new ConnectableFlux
. This
is for example useful to activate cross-cutting concerns at assembly time, e.g. a
generalized checkpoint()
.
T
- the value type
source
- the source to apply assembly hooks onto
protected static <T> Flux<T> onAssembly(Flux<T> source)
To be used by custom operators: invokes assembly Hooks
pointcut given a
Flux
, potentially returning a new Flux
. This is for example useful
to activate cross-cutting concerns at assembly time, e.g. a generalized
checkpoint()
.
T
- the value type
source
- the source to apply assembly hooks onto
public final Flux<T> onBackpressureBuffer()
Request an unbounded demand and push to the returned Flux
, or park the
observed elements if not enough demand is requested downstream. Errors will be
delayed until the buffer gets consumed.
Flux
that buffers with unbounded capacity
public final Flux<T> onBackpressureBuffer(Duration ttl, int maxSize, Consumer<? super T> onBufferEviction)
Request an unbounded demand and push to the returned Flux
, or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit and for a maximum Duration
of ttl
(as measured on the
elastic Scheduler
). Over that limit, oldest
elements from the source are dropped.
Elements evicted based on the TTL are passed to a cleanup Consumer
, which
is also immediately invoked when there is an overflow.
ttl
- maximum Duration
for which an element is kept in the backlog
maxSize
- maximum buffer backlog size before overflow callback is called
onBufferEviction
- callback to invoke once TTL is reached or on overflow
Flux
that buffers with a TTL and up to a capacity then applies an
overflow strategy
Discard Support: This operator discards its internal buffer of elements that overflow,
after having applied the onBufferEviction
handler.
public final Flux<T> onBackpressureBuffer(Duration ttl, int maxSize, Consumer<? super T> onBufferEviction, Scheduler scheduler)
Request an unbounded demand and push to the returned Flux
, or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit and for a maximum Duration
of ttl
(as measured on the provided
Scheduler
). Over that limit, oldest elements from the source are dropped.
Elements evicted based on the TTL are passed to a cleanup Consumer
, which
is also immediately invoked when there is an overflow.
ttl
- maximum Duration
for which an element is kept in the backlog
maxSize
- maximum buffer backlog size before overflow callback is called
onBufferEviction
- callback to invoke once TTL is reached or on overflow
scheduler
- the scheduler on which to run the timeout check
Flux
that buffers with a TTL and up to a capacity then applies an
overflow strategy
Discard Support: This operator discards its internal buffer of elements that overflow,
after having applied the onBufferEviction
handler.
public final Flux<T> onBackpressureBuffer(int maxSize)
Request an unbounded demand and push to the returned Flux
, or park up to
maxSize
elements when not enough demand is requested downstream.
The first element past this buffer to arrive out of sync with the downstream
subscriber's demand (the "overflowing" element) immediately triggers an overflow
error and cancels the source.
maxSize
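- maximum buffer backlog size before immediate error
Flux
that buffers with bounded capacity
Discard Support: This operator discards the buffered overflow elements upon cancellation or error
triggered by a data signal, as well as elements that are rejected by the buffer due to maxSize
.
public final Flux<T> onBackpressureBuffer(int maxSize, BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push to the returned Flux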
, or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. Over that limit, the overflow strategy is applied (see BufferOverflowStrategy
).
Note that for the ERROR
strategy, the overflow
error will be delayed after the current backlog is consumed.
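To make the overflow strategies concrete, here is a minimal sketch that models a bounded buffer while the downstream requests nothing. It is plain Java, not Reactor code: `BoundedBufferSketch`, its `Strategy` enum (named after the BufferOverflowStrategy constants) and the `park` helper are hypothetical. It assumes `maxSize` is positive and that ERROR stops accepting elements while the parked backlog stays available.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BoundedBufferSketch {

    // Illustrative enum whose names mirror the BufferOverflowStrategy constants.
    enum Strategy { ERROR, DROP_LATEST, DROP_OLDEST }

    // Hypothetical helper: offers every incoming element to a buffer of
    // maxSize (assumed > 0) with no downstream demand, and returns the
    // backlog that would later be delivered.
    static List<Integer> park(List<Integer> incoming, int maxSize, Strategy strategy) {
        Deque<Integer> buffer = new ArrayDeque<>();
        for (Integer value : incoming) {
            if (buffer.size() < maxSize) {
                buffer.addLast(value);
            } else if (strategy == Strategy.DROP_OLDEST) {
                // Make room by evicting the oldest parked element.
                buffer.pollFirst();
                buffer.addLast(value);
            } else if (strategy == Strategy.ERROR) {
                // Stop accepting elements; the backlog is still consumable
                // before the (delayed) overflow error.
                break;
            }
            // DROP_LATEST: the overflowing element is discarded, buffer unchanged.
        }
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        List<Integer> incoming = List.of(1, 2, 3, 4, 5);
        System.out.println(park(incoming, 3, Strategy.DROP_LATEST)); // prints [1, 2, 3]
        System.out.println(park(incoming, 3, Strategy.DROP_OLDEST)); // prints [3, 4, 5]
        System.out.println(park(incoming, 3, Strategy.ERROR));       // prints [1, 2, 3]
    }
}
```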
maxSize
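- maximum buffer backlog size before overflow strategy is applied
bufferOverflowStrategy
- strategy to apply to overflowing elements
Flux
that buffers up to a capacity then applies an
overflow strategy
Discard Support: This operator discards the buffered overflow elements upon cancellation or error
triggered by a data signal, as well as elements that are rejected by the buffer due to maxSize
(even though
they are passed to the bufferOverflowStrategy
first).
public final Flux<T> onBackpressureBuffer(int maxSize, Consumer<? super T> onOverflow)
Request an unbounded demand and push to the returned Flux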
, or park up to
maxSize
elements when not enough demand is requested downstream.
The first element past this buffer to arrive out of sync with the downstream
subscriber's demand (the "overflowing" element) is immediately passed to a
Consumer
and the source is cancelled.
The Flux
is going to terminate with an overflow error, but this error is
delayed, which lets the subscriber make more requests for the content of the buffer.
Note that should the cancelled source produce further overflowing elements, these
would be passed to the onNextDropped hook
.
maxSize
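- maximum buffer backlog size before overflow callback is called and source is cancelled
onOverflow
- callback to invoke on overflow
Flux
that buffers with a bounded capacity
Discard Support: This operator discards the buffered overflow elements upon cancellation or error
triggered by a data signal, as well as elements that are rejected by the buffer due to maxSize
(even though
they are passed to the onOverflow
Consumer
first).
public final Flux<T> onBackpressureBuffer(int maxSize, Consumer<? super T> onBufferOverflow, BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push to the returned Flux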
, or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. Over that limit, the overflow strategy is applied (see BufferOverflowStrategy
).
A Consumer
is immediately invoked when there is an overflow, receiving the
value that was discarded because of the overflow (which can be different from the
latest element emitted by the source in case of a
DROP_LATEST
strategy).
Note that for the ERROR
strategy, the overflow
error will be delayed after the current backlog is consumed. The consumer is still
invoked immediately.
maxSize
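- maximum buffer backlog size before overflow callback is called
onBufferOverflow
- callback to invoke on overflow
bufferOverflowStrategy
- strategy to apply to overflowing elements
Flux
that buffers up to a capacity then applies an
overflow strategy
Discard Support: This operator discards the buffered overflow elements upon cancellation or error
triggered by a data signal, as well as elements that are rejected by the buffer due to maxSize
(even though
they are passed to the onBufferOverflow
Consumer
AND the bufferOverflowStrategy
first).
public final Flux<T> onBackpressureDrop()
Request an unbounded demand and push to the returned Flux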
, or drop
the observed elements if not enough demand is requested downstream.
Flux
that drops overflowing elements
public final Flux<T> onBackpressureDrop(Consumer<? super T> onDropped)
Request an unbounded demand and push to the returned Flux
, or drop and
notify dropping Consumer
with the observed elements if not enough demand
is requested downstream.
onDropped
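- the Consumer called when a value gets dropped due to lack of downstream requests
Flux
that drops overflowing elements
Discard Support: This operator discards elements that it drops after having passed
them to the provided onDropped
handler.
public final Flux<T> onBackpressureError()
Request an unbounded demand and push to the returned Flux
, or emit onError
from Exceptions.failWithOverflow()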
if not enough demand is requested
downstream.
Flux
that errors on overflowing elements
public final Flux<T> onBackpressureLatest()
Request an unbounded demand and push to the returned Flux
, or only keep
the most recent observed item if not enough demand is requested downstream.
Flux
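that will only keep a reference to the last observed item
public final Flux<T> onErrorContinue(BiConsumer<Throwable,Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the incriminating
element from the sequence and continuing with subsequent elements.
The recovered error and associated value are notified via the provided BiConsumer
.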
Alternatively, throwing from that biconsumer will propagate the thrown exception downstream
in place of the original error, which is added as a suppressed exception to the new one.
Note that this error handling mode is not necessarily implemented by all operators
(look for the Error Mode Support
javadoc section to find operators that
support it).
errorConsumer
- a BiConsumer
fed with errors matching the predicate and the value
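that triggered the error.
Flux
that attempts to continue processing on errors.
public final <E extends Throwable> Flux<T> onErrorContinue(Class<E> type, BiConsumer<Throwable,Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the incriminating
element from the sequence and continuing with subsequent elements.
Only errors matching the specified type
are recovered from.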
The recovered error and associated value are notified via the provided BiConsumer
.
Alternatively, throwing from that biconsumer will propagate the thrown exception downstream
in place of the original error, which is added as a suppressed exception to the new one.
Note that this error handling mode is not necessarily implemented by all operators
(look for the Error Mode Support
javadoc section to find operators that
support it).
type
- the Class
of Exception
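that are resumed from.
errorConsumer
- a BiConsumer
fed with errors matching the Class
and the value that triggered the error.
Flux
that attempts to continue processing on some errors.
public final <E extends Throwable> Flux<T> onErrorContinue(Predicate<E> errorPredicate, BiConsumer<Throwable,Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the incriminating
element from the sequence and continuing with subsequent elements.
Only errors matching the Predicate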
are recovered from (note that this
predicate can be applied several times and thus must be idempotent).
The recovered error and associated value are notified via the provided BiConsumer
.
Alternatively, throwing from that biconsumer will propagate the thrown exception downstream
in place of the original error, which is added as a suppressed exception to the new one.
Note that this error handling mode is not necessarily implemented by all operators
(look for the Error Mode Support
javadoc section to find operators that
support it).
errorPredicate
- a Predicate
used to filter which errors should be resumed from.
This MUST be idempotent, as it can be used several times.
errorConsumer
- a BiConsumer
fed with errors matching the predicate and the value
that triggered the error.
Flux
that attempts to continue processing on some errors.
public final <E extends Throwable> Flux<T> onErrorMap(Class<E> type, Function<? super E,? extends Throwable> mapper)
Transform an error emitted by this Flux
by synchronously applying a function
to it if the error matches the given type. Otherwise let the error pass through.
public final Flux<T> onErrorMap(Function<? super Throwable,? extends Throwable> mapper)
Transform any error emitted by this Flux
by synchronously applying a function to it.
public final Flux<T> onErrorMap(Predicate<? super Throwable> predicate, Function<? super Throwable,? extends Throwable> mapper)
Transform an error emitted by this Flux
by synchronously applying a function
to it if the error matches the given predicate. Otherwise let the error pass through.
public final <E extends Throwable> Flux<T> onErrorResume(Class<E> type, Function<? super E,? extends Publisher<? extends T>> fallback)
public final Flux<T> onErrorResume(Function<? super Throwable,? extends Publisher<? extends T>> fallback)
public final Flux<T> onErrorResume(Predicate<? super Throwable> predicate, Function<? super Throwable,? extends Publisher<? extends T>> fallback)
public final <E extends Throwable> Flux<T> onErrorReturn(Class<E> type, T fallbackValue)
Simply emit a captured fallback value when an error of the specified type is observed on this Flux
.
E
- the error type
type
- the error type to match
fallbackValue
- the value to emit if an error occurs that matches the type
Flux
public final Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)
Simply emit a captured fallback value when an error matching the given predicate is observed on this Flux
.
predicate
- the error predicate to match
fallbackValue
- the value to emit if an error occurs that matches the predicate
Flux
public final Flux<T> onErrorReturn(T fallbackValue)
Simply emit a captured fallback value when any error is observed on this Flux
.
fallbackValue
- the value to emit if an error occurs
Flux
public final Flux<T> onErrorStop()
If an onErrorContinue(BiConsumer)
variant has been used downstream, reverts
to the default 'STOP' mode where errors are terminal events upstream. It can be
used for easier scoping of the on next failure strategy or to override the
inherited strategy in a sub-stream (for example in a flatMap). It has no effect if
onErrorContinue(BiConsumer)
has not been used downstream.
Flux
that terminates on errors, even if onErrorContinue(BiConsumer)
was used downstream
protected static <T> Flux<T> onLastAssembly(Flux<T> source)
To be used by custom operators: invokes assembly Hooks
pointcut given a
Flux
, potentially returning a new Flux
. This is for example useful
to activate cross-cutting concerns at assembly time, e.g. a generalized
checkpoint()
.
T
- the value type
source
- the source to apply assembly hooks onto
public final Flux<T> onTerminateDetach()
Detaches both the child Subscriber
and the Subscription
on
termination or cancellation.
This is an advanced interoperability operator that should help with odd
retention scenarios when running with non-reactor Subscriber
.
Flux
public final Flux<T> or(Publisher<? extends T> other)
Pick the first Publisher
between this Flux
and another publisher
to emit any signal (onNext/onError/onComplete) and replay all signals from that
Publisher
, effectively behaving like the fastest of these competing sources.
other
- the Publisher
to race with
first(org.reactivestreams.Publisher<? extends I>...)
public final ParallelFlux<T> parallel()
Prepare this Flux
by dividing data on a number of 'rails' matching the
number of CPU cores, in a round-robin fashion. Note that to actually perform the
work in parallel, you should call ParallelFlux.runOn(Scheduler)
afterward.
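The round-robin division into rails can be pictured with plain lists. This is only a sketch of the dispatch rule, assuming every rail always has demand; `RoundRobinRailsSketch` and `splitIntoRails` are hypothetical names, and no work is actually run in parallel here.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinRailsSketch {

    // Hypothetical helper: element i goes to rail (i % parallelism),
    // which is what round-robin dispatch amounts to when every rail
    // is always ready to receive.
    static <T> List<List<T>> splitIntoRails(List<T> items, int parallelism) {
        List<List<T>> rails = new ArrayList<>();
        for (int rail = 0; rail < parallelism; rail++) {
            rails.add(new ArrayList<>());
        }
        for (int i = 0; i < items.size(); i++) {
            rails.get(i % parallelism).add(items.get(i));
        }
        return rails;
    }

    public static void main(String[] args) {
        System.out.println(splitIntoRails(List.of(1, 2, 3, 4, 5, 6), 2));
        // prints [[1, 3, 5], [2, 4, 6]]
    }
}
```

Splitting alone only decides which rail receives each element; as noted above, the rails still execute on the calling thread until ParallelFlux.runOn(Scheduler) is applied.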
ParallelFlux
instance
public final ParallelFlux<T> parallel(int parallelism)
Prepare this Flux
by dividing data on a number of 'rails' matching the
provided parallelism
parameter, in a round-robin fashion. Note that to
actually perform the work in parallel, you should call ParallelFlux.runOn(Scheduler)
afterward.
parallelism
- the number of parallel rails
ParallelFlux
instance
public final ParallelFlux<T> parallel(int parallelism, int prefetch)
Prepare this Flux
by dividing data on a number of 'rails' matching the
provided parallelism
parameter, in a round-robin fashion and using a
custom prefetch amount and queue for dealing with the source Flux
's values.
Note that to actually perform the work in parallel, you should call
ParallelFlux.runOn(Scheduler)
afterward.
parallelism
- the number of parallel rails
prefetch
- the number of values to prefetch from the source
ParallelFlux
instance
public final ConnectableFlux<T> publish()
Prepare a ConnectableFlux
which shares this Flux
sequence and
dispatches values to subscribers in a backpressure-aware manner. Prefetch will
default to Queues.SMALL_BUFFER_SIZE
. This will effectively turn
any type of sequence into a hot sequence.
Backpressure will be coordinated on Subscription.request(long)
and if any
Subscriber
is missing demand (requested = 0), multicast will pause
pushing/pulling.
ConnectableFlux
public final <R> Flux<R> publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform)
R
- the output value type
transform
- the transformation function
Flux
public final <R> Flux<R> publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform, int prefetch)
R
- the output value type
transform
- the transformation function
prefetch
- the request size
Flux
public final ConnectableFlux<T> publish(int prefetch)
Prepare a ConnectableFlux
which shares this Flux
sequence and
dispatches values to subscribers in a backpressure-aware manner. This will
effectively turn any type of sequence into a hot sequence.
Backpressure will be coordinated on Subscription.request(long)
and if any
Subscriber
is missing demand (requested = 0), multicast will pause
pushing/pulling.
prefetch
- bounded requested demand
ConnectableFlux
public final Mono<T> publishNext()
Prepare a Mono
which shares this Flux
sequence and dispatches the
first observed item to subscribers in a backpressure-aware manner.
This will effectively turn any type of sequence into a hot sequence when the first
Subscriber
subscribes.
Mono
public final Flux<T> publishOn(Scheduler scheduler)
Run onNext, onComplete and onError on a supplied Scheduler
Worker
.
This operator influences the threading context where the rest of the operators in
the chain below it will execute, up to a new occurrence of publishOn
.
Typically used for fast publisher, slow consumer(s) scenarios.
flux.publishOn(Schedulers.single()).subscribe()
scheduler
- a Scheduler
providing the Scheduler.Worker
where to publish
Flux
producing asynchronously on a given Scheduler
public final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch)
Run onNext, onComplete and onError on a supplied Scheduler
Scheduler.Worker
.
This operator influences the threading context where the rest of the operators in
the chain below it will execute, up to a new occurrence of publishOn
.
Typically used for fast publisher, slow consumer(s) scenarios.
flux.publishOn(Schedulers.single()).subscribe()
scheduler
- a Scheduler
providing the Scheduler.Worker
where to publish
delayError
- should the buffer be consumed before forwarding any error
prefetch
- the asynchronous boundary capacity
Flux
producing asynchronously
public final Flux<T> publishOn(Scheduler scheduler, int prefetch)
Run onNext, onComplete and onError on a supplied Scheduler
Scheduler.Worker
.
This operator influences the threading context where the rest of the operators in
the chain below it will execute, up to a new occurrence of publishOn
.
Typically used for fast publisher, slow consumer(s) scenarios.
flux.publishOn(Schedulers.single()).subscribe()
scheduler
- a Scheduler
providing the Scheduler.Worker
where to publish
prefetch
- the asynchronous boundary capacity
Flux
producing asynchronously
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter)
Programmatically create a Flux
with the capability of emitting multiple
elements from a single-threaded producer through the FluxSink
API. For
a multi-threaded capable alternative, see create(Consumer)
.
This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
Flux.<String>push(emitter -> {
    ActionListener al = e -> {
        emitter.next(textField.getText());
    };
    // without cleanup support:
    button.addActionListener(al);
    // with cleanup support (an alternative to the line above):
    button.addActionListener(al);
    emitter.onDispose(() -> {
        button.removeActionListener(al);
    });
});
T
- The type of values in the sequence
emitter
- Consume the FluxSink
provided per-subscriber by Reactor to generate signals.
Flux
create(Consumer)
Discard Support: The FluxSink
exposed by this operator buffers in case of
overflow. The buffer is discarded when the main sequence is cancelled.
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
Programmatically create a Flux
with the capability of emitting multiple
elements from a single-threaded producer through the FluxSink
API. For
a multi-threaded capable alternative, see create(Consumer, reactor.core.publisher.FluxSink.OverflowStrategy)
.
This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
Flux.<String>push(emitter -> {
    ActionListener al = e -> {
        emitter.next(textField.getText());
    };
    // without cleanup support:
    button.addActionListener(al);
    // with cleanup support (an alternative to the line above):
    button.addActionListener(al);
    emitter.onDispose(() -> {
        button.removeActionListener(al);
    });
}, FluxSink.OverflowStrategy.LATEST);
T
- The type of values in the sequence
backpressure
- the backpressure mode, see FluxSink.OverflowStrategy
for the
available backpressure modes
emitter
- Consume the FluxSink
provided per-subscriber by Reactor to generate signals.
Flux
create(Consumer, reactor.core.publisher.FluxSink.OverflowStrategy)
Discard Support: The FluxSink
exposed by this operator discards elements
as relevant to the chosen FluxSink.OverflowStrategy
. For example, the FluxSink.OverflowStrategy.DROP
discards each item as it is being dropped, while FluxSink.OverflowStrategy.BUFFER
will discard the buffer upon cancellation.
public static Flux<Integer> range(int start, int count)
Build a Flux
that will only emit a sequence of count
incrementing integers,
starting from start
. That is, emit integers between start
(included)
and start + count
(excluded) then complete.
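The bounds are easy to misread as "start to count", so here is a small stand-in using java.util.stream rather than Reactor. `RangeSketch.range` is a hypothetical helper that collects the same integers into a list; it ignores integer overflow of `start + count`.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RangeSketch {

    // Hypothetical helper: the integers from start (included) to
    // start + count (excluded), i.e. exactly count values.
    static List<Integer> range(int start, int count) {
        return IntStream.range(start, start + count)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(range(5, 3)); // prints [5, 6, 7]
        System.out.println(range(5, 0)); // prints []
    }
}
```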
start
- the first integer to be emitted
count
- the total number of incrementing values to emit, including the first value
Flux
public final <A> Mono<A> reduce(A initial, BiFunction<A,? super T,A> accumulator)
Reduce the values from this Flux
sequence into a single object matching the
type of a seed value. Reduction is performed using a BiFunction
that
takes the intermediate result of the reduction and the current value and returns
the next intermediate value of the reduction. First element is paired with the seed
value, initial.
A
- the type of the seed and the reduced object
accumulator
- the reducing BiFunction
initial
- the seed, the initial leftmost argument to pass to the reducing BiFunction
Flux
public final Mono<T> reduce(BiFunction<T,T,T> aggregator)
Flux
sequence into a single object of the same
type as the emitted items. Reduction is performed using a BiFunction
that
takes the intermediate result of the reduction and the current value and returns
the next intermediate value of the reduction. Note that the BiFunction
will not
be invoked for a sequence with 0 or 1 elements. For a single-element
sequence, that element is sent directly to the subscriber.
aggregator
- the reducing BiFunction
Flux
public final <A> Mono<A> reduceWith(Supplier<A> initial, BiFunction<A,? super T,A> accumulator)
Flux
sequence into a single object matching the
type of a lazily supplied seed value. Reduction is performed using a
BiFunction
that takes the intermediate result of the reduction and the
current value and returns the next intermediate value of the reduction. First
element is paired with the seed value, supplied via initial.
A
- the type of the seed and the reduced object
accumulator
- the reducing BiFunction
initial
- a Supplier of the seed, called on subscription and passed to the reducing BiFunction
Flux
public final Flux<T> repeat()
Flux
on onComplete
public final Flux<T> repeat(BooleanSupplier predicate)
predicate
- the boolean to evaluate on onComplete.
Flux
that repeats on onComplete while the predicate matches
public final Flux<T> repeat(long numRepeat)
numRepeat
times. This results in
numRepeat + 1
total subscriptions to the original source. As a consequence,
using 0 plays the original sequence once.
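The relation between numRepeat and the number of subscriptions can be pictured with a plain-JDK sketch in which each call to a Supplier stands in for one subscription to the source. RepeatSketch is a hypothetical name, and the sketch only models sources that complete normally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class RepeatSketch {
    // Hypothetical helper: each source.get() plays the role of one subscription.
    static <T> List<T> repeat(Supplier<List<T>> source, long numRepeat) {
        List<T> out = new ArrayList<>();
        // one original run plus numRepeat re-subscriptions: numRepeat + 1 in total
        for (long i = 0; i <= numRepeat; i++) {
            out.addAll(source.get());
        }
        return out;
    }
}
```

Calling the sketch with numRepeat set to 2 invokes the supplier three times, and with 0 it invokes it once, which matches the "original sequence only" case.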
numRepeat
- the number of times to re-subscribe on onComplete (positive, or 0 for original sequence only)
Flux
that repeats on onComplete, up to the specified number of repetitions
public final Flux<T> repeat(long numRepeat, BooleanSupplier predicate)
numRepeat
- the number of times to re-subscribe on onComplete (positive, or 0 for original sequence only)
predicate
- the boolean to evaluate on onComplete
Flux
that repeats on onComplete while the predicate matches,
up to the specified number of repetitions
public final Flux<T> repeatWhen(Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
Flux
when a companion sequence emits elements in
response to the flux completion signal. Any terminal signal from the companion
sequence will terminate the resulting Flux
with the same signal immediately.
If the companion sequence signals when this Flux
is active, the repeat
attempt is suppressed.
Note that if the companion Publisher
created by the repeatFactory
emits Context
as trigger objects, these Context
will REPLACE the
operator's own Context. Please be careful there: replacing the
Context means that some keys you don't own could be removed, breaking libraries
that depend on them. As a result, the recommended approach is to always create such
a Context
trigger by starting from the original Context (ensuring the trigger
contains all the keys from the original, unless you absolutely know you want to
remove one of these keys):
.repeatWhen(emittedEachAttempt -> emittedEachAttempt
    .flatMap(e -> Mono.subscriberContext().map(ctx -> Tuples.of(e, ctx)))
    .flatMap(t2 -> {
        long lastEmitted = t2.getT1();
        Context ctx = t2.getT2();
        int rl = ctx.getOrDefault("repeatsLeft", 0);
        if (rl > 0) {
            // /!\ THE ctx.put HERE IS THE ESSENTIAL PART /!\
            return Mono.just(ctx.put("repeatsLeft", rl - 1)
                                .put("emitted", lastEmitted));
        } else {
            return Mono.error(new IllegalStateException("repeats exhausted"));
        }
    })
)
repeatFactory
- the Function that returns the associated Publisher companion, given a Flux that signals each onComplete as a Long representing the number of source elements emitted in the latest attempt.
Flux
that repeats on onComplete when the companion Publisher produces an onNext signal
public final ConnectableFlux<T> replay()
Flux
into a hot source and cache last emitted signals for further Subscriber
. Will
retain an unbounded amount of onNext signals. Completion and Error will also be
replayed.
ConnectableFlux
public final ConnectableFlux<T> replay(Duration ttl)
Flux
into a connectable hot source and cache last emitted signals
for further Subscriber
. Will retain each onNext up to the given per-item
expiry timeout.
Completion and Error will also be replayed until ttl
triggers in which case
the next Subscriber
will start over a new subscription
ttl
- Per-item and post termination timeout duration
ConnectableFlux
public final ConnectableFlux<T> replay(Duration ttl, Scheduler timer)
Flux
into a connectable hot source and cache last emitted signals
for further Subscriber
. Will retain onNext signal for up to the given
Duration
with a per-item ttl.
Completion and Error will also be replayed until ttl
triggers in which case
the next Subscriber
will start over a new subscription
ttl
- Per-item and post termination timeout duration
timer
- a time-capable Scheduler instance to read current time from
ConnectableFlux
public final ConnectableFlux<T> replay(int history)
Flux
into a connectable hot source and cache last emitted
signals for further Subscriber
.
Will retain up to the given history size onNext signals. Completion and Error will also be
replayed.
Note that cache(0)
will only cache the terminal signal without
expiration.
history
- number of events retained in history excluding complete and error
ConnectableFlux
public final ConnectableFlux<T> replay(int history, Duration ttl)
Flux
into a connectable hot source and cache last emitted signals
for further Subscriber
. Will retain up to the given history size onNext
signals with a per-item ttl.
Completion and Error will also be replayed until ttl
triggers in which case
the next Subscriber
will start over a new subscription
history
- number of events retained in history excluding complete and error
ttl
- Per-item and post termination timeout duration
ConnectableFlux
public final ConnectableFlux<T> replay(int history, Duration ttl, Scheduler timer)
Flux
into a connectable hot source and cache last emitted signals
for further Subscriber
. Will retain up to the given history size onNext
signals with a per-item ttl.
Completion and Error will also be replayed until ttl
triggers in which case
the next Subscriber
will start over a new subscription
history
- number of events retained in history excluding complete and error
ttl
- Per-item and post termination timeout duration
timer
- a Scheduler instance to read current time from
ConnectableFlux
public final Flux<T> retry()
Flux
sequence if it signals any error, indefinitely.
Flux
that retries on onError
public final Flux<T> retry(long numRetries)
Flux
sequence if it signals any error, for a fixed
number of times.
Note that passing Long.MAX_VALUE is treated as infinite retry.
numRetries
- the number of times to tolerate an error
Flux
that retries on onError up to the specified number of retry attempts.
public final Flux<T> retry(long numRetries, Predicate<? super Throwable> retryMatcher)
Flux
sequence up to the specified number of retries if it signals any
error that matches the given Predicate, otherwise push the error downstream.
numRetries
- the number of times to tolerate an error
retryMatcher
- the predicate to evaluate if retry should occur based on a given error signal
Flux
that retries on onError up to the specified number of retry
attempts, only if the predicate matches.
public final Flux<T> retry(Predicate<? super Throwable> retryMatcher)
Flux
sequence if it signals any error
that matches the given Predicate
, otherwise push the error downstream.
retryMatcher
- the predicate to evaluate if retry should occur based on a given error signal
Flux
that retries on onError if the predicate matches.
public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff)
Flux
up to numRetries
times using a
randomized exponential backoff strategy (jitter). The jitter factor is 50%
but the effective backoff delay cannot be less than firstBackoff
.
The randomized exponential backoff is good at preventing two typical issues with other simpler backoff strategies, namely:
numRetries
- the maximum number of attempts before an IllegalStateException is raised (having the original retry-triggering exception as cause).
firstBackoff
- the first backoff delay to apply, which then grows exponentially. Also the minimum delay, even taking jitter into account.
Flux
that retries on onError with exponentially growing randomized delays between retries.
public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff)
Flux
up to numRetries
times using a
randomized exponential backoff strategy. The jitter factor is 50%
but the effective backoff delay cannot be less than firstBackoff
nor more
than maxBackoff
.
The randomized exponential backoff is good at preventing two typical issues with other simpler backoff strategies, namely:
numRetries
- the maximum number of attempts before an IllegalStateException is raised (having the original retry-triggering exception as cause).
firstBackoff
- the first backoff delay to apply, which then grows exponentially. Also the minimum delay, even taking jitter into account.
maxBackoff
- the maximum delay to apply despite exponential growth and jitter.
Flux
that retries on onError with exponentially growing randomized delays between retries.
public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor)
Flux
up to numRetries
times using a
randomized exponential backoff strategy, randomized with a user-provided jitter
factor between 0.d
(no jitter) and 1.0
(default is 0.5
).
Even with the jitter, the effective backoff delay cannot be less than
firstBackoff
nor more than maxBackoff
.
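To make the interplay of firstBackoff, maxBackoff and the jitter factor concrete, here is a plain-JDK sketch of one possible way to compute such a delay. It is an assumption for illustration only, not Reactor's actual formula: BackoffSketch, its delay helper and the doubling-per-attempt growth are all hypothetical, and the sketch assumes firstBackoff is not greater than maxBackoff.

```java
import java.time.Duration;
import java.util.Random;

final class BackoffSketch {
    // Hypothetical helper: delay before retry number `attempt` (0-based).
    static Duration delay(int attempt, Duration first, Duration max,
                          double jitterFactor, Random random) {
        long firstMs = first.toMillis();
        long maxMs = max.toMillis();

        // exponential growth (assumed to double per attempt), capped at max
        long base = firstMs;
        for (int i = 0; i < attempt && base < maxMs; i++) {
            base = (base > maxMs / 2) ? maxMs : base * 2;
        }

        // random offset in [-spread, +spread], where spread = base * jitterFactor
        long spread = (long) (base * jitterFactor);
        long jitter = (long) ((random.nextDouble() * 2.0 - 1.0) * spread);

        // even with jitter, stay within [first, max]
        long effective = Math.max(firstMs, Math.min(maxMs, base + jitter));
        return Duration.ofMillis(effective);
    }
}
```

With a jitter factor of 0.0 the sketch degenerates to a plain capped exponential backoff. Any non-zero factor spreads the delays while the final clamp keeps them between firstBackoff and maxBackoff.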
The randomized exponential backoff is good at preventing two typical issues with other simpler backoff strategies, namely:
numRetries
- the maximum number of attempts before an IllegalStateException is raised (having the original retry-triggering exception as cause).
firstBackoff
- the first backoff delay to apply, which then grows exponentially. Also the minimum delay, even taking jitter into account.
maxBackoff
- the maximum delay to apply despite exponential growth and jitter.
jitterFactor
- the jitter percentage (as a double between 0.0 and 1.0).
Flux
that retries on onError with exponentially growing randomized delays between retries.
public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor, Scheduler backoffScheduler)
Flux
up to numRetries
times using a
randomized exponential backoff strategy, randomized with a user-provided jitter
factor between 0.d
(no jitter) and 1.0
(default is 0.5
).
Even with the jitter, the effective backoff delay cannot be less than
firstBackoff
nor more than maxBackoff
. The delays and subsequent
attempts are executed on the provided backoff Scheduler
(see
Mono.delay(Duration, Scheduler)
).
The randomized exponential backoff is good at preventing two typical issues with other simpler backoff strategies, namely:
numRetries
- the maximum number of attempts before an IllegalStateException is raised (having the original retry-triggering exception as cause).
firstBackoff
- the first backoff delay to apply, which then grows exponentially. Also the minimum delay, even taking jitter into account.
maxBackoff
- the maximum delay to apply despite exponential growth and jitter.
backoffScheduler
- the Scheduler on which the delays and subsequent attempts are executed.
jitterFactor
- the jitter percentage (as a double between 0.0 and 1.0).
Flux
that retries on onError with exponentially growing randomized delays between retries.
public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, Scheduler backoffScheduler)
Flux
up to numRetries
times using a
randomized exponential backoff strategy. The jitter factor is 50%
but the effective backoff delay cannot be less than firstBackoff
nor more
than maxBackoff
. The delays and subsequent attempts are materialized on the
provided backoff Scheduler
(see Mono.delay(Duration, Scheduler)
).
The randomized exponential backoff is good at preventing two typical issues with other simpler backoff strategies, namely:
numRetries
- the maximum number of attempts before an IllegalStateException is raised (having the original retry-triggering exception as cause).
firstBackoff
- the first backoff delay to apply, which then grows exponentially. Also the minimum delay, even taking jitter into account.
maxBackoff
- the maximum delay to apply despite exponential growth and jitter.
backoffScheduler
- the Scheduler on which the delays and subsequent attempts are executed.
Flux
that retries on onError with exponentially growing randomized delays between retries.
public final Flux<T> retryWhen(Function<Flux<Throwable>,? extends Publisher<?>> whenFactory)
Flux
when a companion sequence signals
an item in response to this Flux
error signal
If the companion sequence signals when the Flux
is active, the retry
attempt is suppressed and any terminal signal will terminate the Flux
source with the same signal
immediately.
Note that if the companion Publisher
created by the whenFactory
emits Context
as trigger objects, these Context
will REPLACE the
operator's own Context. Please be careful there: replacing the
Context means that some keys you don't own could be removed, breaking libraries
that depend on them. As a result, the recommended approach is to always create such
a Context
trigger by starting from the original Context (ensuring the trigger
contains all the keys from the original, unless you absolutely know you want to
remove one of these keys):
.retryWhen(errorCurrentAttempt -> errorCurrentAttempt
    .flatMap(e -> Mono.subscriberContext().map(ctx -> Tuples.of(e, ctx)))
    .flatMap(t2 -> {
        Throwable lastError = t2.getT1();
        Context ctx = t2.getT2();
        int rl = ctx.getOrDefault("retriesLeft", 0);
        if (rl > 0) {
            // /!\ THE ctx.put HERE IS THE ESSENTIAL PART /!\
            return Mono.just(ctx.put("retriesLeft", rl - 1)
                                .put("lastError", lastError));
        } else {
            return Mono.error(new IllegalStateException("retries exhausted", lastError));
        }
    })
)
public final Flux<T> sample(Duration timespan)
Flux
by periodically emitting an item corresponding to that
Flux
latest emitted value within the periodical time window.
Note that if some elements are emitted quicker than the timespan just before source
completion, the last of these elements will be emitted along with the onComplete
signal.
timespan
- the duration of the window after which to emit the latest observed item
Flux
sampled to the last item seen over each periodic window
public final <U> Flux<T> sample(Publisher<U> sampler)
Flux
by emitting an item corresponding to that Flux
latest emitted value whenever a companion sampler Publisher
signals a value.
Termination of either Publisher
will result in termination for the Subscriber
as well.
Note that if some elements are emitted just before source completion and before a
last sampler can trigger, the last of these elements will be emitted along with the
onComplete signal.
Both Publisher
will run in unbounded mode because the backpressure
would interfere with the sampling precision.
public final Flux<T> sampleFirst(Duration timespan)
Flux
then skip the values that follow
within a given duration.
timespan
- the duration during which to skip values after each sample
Flux
sampled to the first item of each duration-based window
public final <U> Flux<T> sampleFirst(Function<? super T,? extends Publisher<U>> samplerFactory)
Flux
then skip the values that follow
before the next signal from a companion sampler Publisher
.
U
- the companion reified type
samplerFactory
- supply a companion sampler Publisher which signals the end of the skip window
Flux
sampled to the first item observed in each window closed by the sampler signals
public final <U> Flux<T> sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory)
Flux
only if there were no new values emitted
during the window defined by a companion Publisher
derived from that particular
value.
Note that this means that the last value in the sequence is always emitted.
U
- the companion reified type
throttlerFactory
- supply a companion sampler Publisher which signals the end of the window during which no new emission should occur. If no new value is emitted during that window, the original value triggering the window is emitted.
Flux
sampled to items not followed by any other item within a window defined by a companion Publisher
public final <U> Flux<T> sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory, int maxConcurrency)
Flux
only if there were no new values emitted
during the window defined by a companion Publisher
derived from that particular
value.
The provided maxConcurrency will keep a bounded maximum of concurrent timeouts and drop any new items until at least one timeout terminates.
Note that this means that the last value in the sequence is always emitted.
U
- the throttling type
throttlerFactory
- supply a companion sampler Publisher which signals the end of the window during which no new emission should occur. If no new value is emitted during that window, the original value triggering the window is emitted.
maxConcurrency
- the maximum number of concurrent timeouts
Flux
sampled to items not followed by any other item within a window defined by a companion Publisher
public final <A> Flux<A> scan(A initial, BiFunction<A,? super T,A> accumulator)
Flux
values with an accumulator BiFunction
and
also emit the intermediate results of this function.
The accumulation works as follows:
result[0] = initialValue;
result[1] = accumulator(result[0], source[0])
result[2] = accumulator(result[1], source[1])
result[3] = accumulator(result[2], source[2])
...
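The accumulation listing above can be traced with a small plain-JDK sketch that records every intermediate result, seed included. ScanSketch is a hypothetical name. The sketch works on an in-memory list and does not model the asynchronous, backpressured nature of the real operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

final class ScanSketch {
    // Hypothetical helper: returns result[0..n] as described in the listing.
    static <T, A> List<A> scan(List<T> source, A initial, BiFunction<A, ? super T, A> accumulator) {
        List<A> results = new ArrayList<>();
        A acc = initial;
        results.add(acc); // result[0] is the seed itself
        for (T item : source) {
            acc = accumulator.apply(acc, item); // result[i + 1] = accumulator(result[i], source[i])
            results.add(acc);
        }
        return results;
    }
}
```

Summing 1, 2, 3 with a seed of 0 gives 0, 1, 3, 6. The output has one more entry than the source because the seed is included.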
A
- the accumulated type
initial
- the initial leftmost argument to pass to the reduce function
accumulator
- the accumulating BiFunction
Flux
starting with initial state
public final Flux<T> scan(BiFunction<T,T,T> accumulator)
Flux
values with an accumulator BiFunction
and
also emit the intermediate results of this function.
Unlike scan(Object, BiFunction)
, this operator doesn't take an initial value
but treats the first Flux
value as initial value.
The accumulation works as follows:
result[0] = source[0]
result[1] = accumulator(result[0], source[1])
result[2] = accumulator(result[1], source[2])
result[3] = accumulator(result[2], source[3])
...
accumulator
- the accumulating BiFunction
Flux
public final <A> Flux<A> scanWith(Supplier<A> initial, BiFunction<A,? super T,A> accumulator)
Flux
values with the help of an accumulator BiFunction
and also emits the intermediate results. A seed value is lazily provided by a
Supplier
invoked for each Subscriber
.
The accumulation works as follows:
result[0] = initialValue;
result[1] = accumulator(result[0], source[0])
result[2] = accumulator(result[1], source[1])
result[3] = accumulator(result[2], source[2])
...
A
- the accumulated type
initial
- the supplier providing the seed, the leftmost parameter initially passed to the reduce function
accumulator
- the accumulating BiFunction
Flux
starting with initial state
public final Flux<T> share()
Flux
that multicasts (shares) the original Flux
.
As long as there is at least one Subscriber
this Flux
will be subscribed and
emitting data.
When all subscribers have cancelled it will cancel the source
Flux
.
This is an alias for publish().refCount().
public final Mono<T> single()
Flux
source or signal
NoSuchElementException
for an empty source, or
IndexOutOfBoundsException
for a source with more than one element.
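The three outcomes described for single() can be sketched over a plain Iterable using only the JDK. SingleSketch is a hypothetical name, and the exceptions are thrown directly here, whereas the real operator delivers them as error signals through the returned Mono.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

final class SingleSketch {
    // Hypothetical helper: returns the only element, or fails as documented.
    static <T> T single(Iterable<T> source) {
        Iterator<T> it = source.iterator();
        if (!it.hasNext()) {
            throw new NoSuchElementException("source was empty");
        }
        T value = it.next();
        if (it.hasNext()) {
            throw new IndexOutOfBoundsException("source had more than one element");
        }
        return value;
    }
}
```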
Mono
with the single item or an error signal
public final Mono<T> single(T defaultValue)
Flux
source and emit a default
value for an empty source, but signal an IndexOutOfBoundsException
for a
source with more than one element.
public final Mono<T> singleOrEmpty()
Flux
source, and accept an empty
source but signal an IndexOutOfBoundsException
for a source with more than
one element.
Mono
with the expected single item, no item or an error
public final Flux<T> skip(Duration timespan)
Flux
emitted within the specified initial duration.
timespan
- the initial time window during which to drop elements
Flux
dropping at the beginning until the end of the given duration
public final Flux<T> skip(long skipped)
Flux
then
emit the remaining elements.
skipped
- the number of elements to drop
Flux
with the specified number of elements skipped at
the beginning
public final Flux<T> skipLast(int n)
Flux
sequence.
n
- the number of elements to drop before completion
Flux
dropping the specified number of elements at the end of the
sequence
public final Flux<T> sort()
Flux
by collecting and sorting them in the background
then emitting the sorted sequence once this sequence completes.
Each item emitted by the Flux
must implement Comparable
with
respect to all other items in the sequence.
Note that calling sort
with long, non-terminating or infinite sources
might cause OutOfMemoryError
. Use sequence splitting like window(int)
to sort batches in that case.
Flux
ClassCastException
- if any item emitted by the Flux
does not implement
Comparable
with respect to all other items emitted by the Flux
public final Flux<T> sort(Comparator<? super T> sortFunction)
Flux
using a Comparator
function, by
collecting and sorting elements in the background then emitting the sorted sequence
once this sequence completes.
Note that calling sort
with long, non-terminating or infinite sources
might cause OutOfMemoryError
@SafeVarargs public final Flux<T> startWith(T... values)
Flux
sequence.
public final Disposable subscribe()
Flux
and request unbounded demand.
This version doesn't specify any consumption behavior for the events from the chain, especially no error handling, so other variants should usually be preferred.
Disposable
that can be used to cancel the underlying Subscription
public final Disposable subscribe(Consumer<? super T> consumer)
Consumer
to this Flux
that will consume all the
elements in the sequence. It will request an unbounded demand (Long.MAX_VALUE
).
For a passive version that observes and forwards incoming data, see doOnNext(java.util.function.Consumer)
.
For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber)
with a BaseSubscriber
.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
consumer
- the consumer to invoke on each value (onNext signal)
Disposable
that can be used to cancel the underlying Subscription
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer)
Flux
with a Consumer
that will consume all the
elements in the sequence, as well as a Consumer
that will handle errors.
The subscription will request an unbounded demand (Long.MAX_VALUE
).
For a passive version that observes and forwards incoming data, see
doOnNext(java.util.function.Consumer)
and doOnError(java.util.function.Consumer)
.
For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber)
with a BaseSubscriber
.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumers are not invoked when executing in a main thread or a unit test for instance.
consumer
- the consumer to invoke on each next signal
errorConsumer
- the consumer to invoke on error signal
Disposable
that can be used to cancel the underlying Subscription
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer)
Consumer
to this Flux
that will respectively consume all the
elements in the sequence, handle errors and react to completion. The subscription
will request unbounded demand (Long.MAX_VALUE
).
For a passive version that observes and forwards incoming data, see doOnNext(java.util.function.Consumer)
,
doOnError(java.util.function.Consumer)
and doOnComplete(Runnable)
.
For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber)
with a BaseSubscriber
.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
consumer
- the consumer to invoke on each value
errorConsumer
- the consumer to invoke on error signal
completeConsumer
- the consumer to invoke on complete signal
Disposable
that can be used to cancel the underlying Subscription
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer)
Consumer
to this Flux
that will respectively consume all the
elements in the sequence, handle errors, react to completion, and request upon subscription.
It will let the provided subscriptionConsumer
request the adequate amount of data, or request unbounded demand
Long.MAX_VALUE
if no such consumer is provided.
For a passive version that observes and forwards incoming data, see doOnNext(java.util.function.Consumer)
,
doOnError(java.util.function.Consumer)
, doOnComplete(Runnable)
and doOnSubscribe(Consumer)
.
For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber)
with a BaseSubscriber
.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
consumer
- the consumer to invoke on each value
errorConsumer
- the consumer to invoke on error signal
completeConsumer
- the consumer to invoke on complete signal
subscriptionConsumer
- the consumer to invoke on subscribe signal, to be used for the initial request, or null for max request
Disposable
that can be used to cancel the underlying Subscription
public abstract void subscribe(CoreSubscriber<? super T> actual)
Publisher.subscribe(Subscriber)
that will bypass
Hooks.onLastOperator(Function)
pointcut.
In addition to behaving as expected by Publisher.subscribe(Subscriber)
in a controlled manner, it supports direct subscribe-time Context
passing.
actual
- the Subscriber interested in the published sequence
subscribe(Subscriber)
public final void subscribe(Subscriber<? super T> actual)
public final Flux<T> subscribeOn(Scheduler scheduler)
Scheduler
's Scheduler.Worker
.
As such, placing this operator anywhere in the chain will also impact the execution
context of onNext/onError/onComplete signals from the beginning of the chain up to
the next occurrence of a publishOn
.
Note that if you are using an eager or blocking
create(Consumer, FluxSink.OverflowStrategy)
as the source, it can lead to deadlocks due to requests piling up behind the emitter.
In such case, you should call subscribeOn(scheduler, false)
instead.
Typically used for slow publisher e.g., blocking IO, fast consumer(s) scenarios.
flux.subscribeOn(Schedulers.single()).subscribe()
Note that Scheduler.Worker.schedule(Runnable)
raising
RejectedExecutionException
on late
Subscription.request(long)
will be propagated to the request caller.
scheduler
- a Scheduler providing the Scheduler.Worker where to subscribe
Flux
requesting asynchronously
publishOn(Scheduler), subscribeOn(Scheduler, boolean)
public final Flux<T> subscribeOn(Scheduler scheduler, boolean requestOnSeparateThread)
Scheduler
's Scheduler.Worker
.
Request will be run on that worker too depending on the requestOnSeparateThread
parameter (which defaults to true in the subscribeOn(Scheduler)
version).
As such, placing this operator anywhere in the chain will also impact the execution
context of onNext/onError/onComplete signals from the beginning of the chain up to
the next occurrence of a publishOn
.
Note that if you are using an eager or blocking
create(Consumer, FluxSink.OverflowStrategy)
as the source, it can lead to deadlocks due to requests piling up behind the emitter.
Thus this operator has a requestOnSeparateThread
parameter, which should be
set to false
in this case.
Typically used for slow publisher e.g., blocking IO, fast consumer(s) scenarios.
flux.subscribeOn(Schedulers.single()).subscribe()
Note that Scheduler.Worker.schedule(Runnable)
raising
RejectedExecutionException
on late
Subscription.request(long)
will be propagated to the request caller.
scheduler
- a Scheduler providing the Scheduler.Worker where to subscribe
requestOnSeparateThread
- whether or not to also perform requests on the worker. true to behave like subscribeOn(Scheduler)
Flux
requesting asynchronously
publishOn(Scheduler), subscribeOn(Scheduler)
public final Flux<T> subscriberContext(Context mergeContext)
Context
by adding all values
from the given Context
, producing a new Context
that is propagated
upstream.
The Context
propagation happens once per subscription (not on each onNext):
it is done during the subscribe(Subscriber)
phase, which runs from
the last operator of a chain towards the first.
So this operator enriches a Context
coming from under it in the chain
(downstream, by default an empty one) and makes the new enriched Context
visible to operators above it in the chain.
public final Flux<T> subscriberContext(Function<Context,Context> doOnContext)
Context
by applying a Function
to it, producing a new Context
that is propagated upstream.
The Context
propagation happens once per subscription (not on each onNext):
it is done during the subscribe(Subscriber)
phase, which runs from
the last operator of a chain towards the first.
So this operator enriches a Context
coming from under it in the chain
(downstream, by default an empty one) and makes the new enriched Context
visible to operators above it in the chain.
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber)
Subscriber
to this Flux
and return said
Subscriber
(e.g. a FluxProcessor
).
flux.subscribeWith(WorkQueueProcessor.create()).subscribe()
If you need more control over backpressure and the request, use a BaseSubscriber.
E
- the reified type from the input/output subscriber
subscriber
- the Subscriber to subscribe with and return
Subscriber
public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate)
Publisher
if this sequence is completed without any data.
public final <V> Flux<V> switchMap(Function<? super T,Publisher<? extends V>> fn, int prefetch)
public final <V> Flux<V> switchOnFirst(BiFunction<Signal<? extends T>,Flux<T>,Publisher<? extends V>> transformer)
Flux
once it emits its first element, making a
conditional transformation possible. This operator first requests one element
from the source then applies a transformation derived from the first Signal
and the source. The whole source (including the first signal) is passed as second
argument to the BiFunction
and it is very strongly advised to always build
upon it with operators (see below).
Note that the source might complete or error immediately instead of emitting,
in which case the Signal
would be onComplete or onError. It is NOT
necessarily an onNext Signal, and must be checked accordingly.
For example, this operator could be used to define a dynamic transformation that depends on the first element (which could contain routing metadata for instance):
fluxOfIntegers.switchOnFirst((signal, flux) -> {
    if (signal.hasValue()) {
        ColoredShape firstColor = signal.get();
        return flux.filter(v -> !v.hasSameColorAs(firstColor));
    }
    return flux; //either early complete or error, this forwards the termination in any case
    //`return flux.onErrorResume(t -> Mono.empty());` instead would suppress an early error
    //`return Flux.just(1,2,3);` instead would suppress an early error and return 1, 2, 3.
    //It would also only cancel the original `flux` at the completion of `just`.
})
It is advised to return a Publisher
derived from the original Flux
in all cases, as not doing so would keep the original Publisher
open and
hanging with a single request until the inner Publisher
terminates or
the whole Flux
is cancelled.
V
- the item type in the returned Flux
transformer
- A BiFunction
executed once the first signal is
available and used to transform the source conditionally. The whole source (including
first signal) is passed as second argument to the BiFunction.
Flux
that transforms the upstream once a signal is available
public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers)
Flux
that mirrors the most recently emitted Publisher
,
forwarding its data until a new Publisher
comes in from the source.
The resulting Flux
will complete once there are no new Publisher
in
the source (source has completed) and the last mirrored Publisher
has also
completed.
T
- the produced type
mergedPublishers
- The Publisher
of Publisher
to switch on and mirror.
FluxProcessor
accepting publishers and producing T
public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers, int prefetch)
Flux
that mirrors the most recently emitted Publisher
,
forwarding its data until a new Publisher
comes in from the source.
The resulting Flux
will complete once there are no new Publisher
in
the source (source has completed) and the last mirrored Publisher
has also
completed.
T
- the produced type
mergedPublishers
- The Publisher
of Publisher
to switch on and mirror.
prefetch
- the inner source request size
FluxProcessor
accepting publishers and producing T
public final Flux<T> tag(String key, String value)
Set
of
all tags throughout the publisher chain by using Scannable.tags()
(as
traversed
by Scannable.parents()
).
key
- a tag key
value
- a tag value
public final Flux<T> take(long n)
Flux
, if available.
If N is zero, the resulting Flux
completes as soon as this Flux
signals its first value (which is not relayed, though).
Note that this operator doesn't manipulate the backpressure requested amount.
Rather, it merely lets requests from downstream propagate as is and cancels once
N elements have been emitted. As a result, the source could produce a lot of
extraneous elements in the meantime. If that behavior is undesirable and you do
not own the request from downstream (e.g. prefetching operators), consider
using limitRequest(long)
instead.
n
- the number of items to emit from this Flux
Flux
limited to size N
limitRequest(long)
public final Flux<T> takeLast(int n)
Flux
emitted before its completion.
public final Flux<T> takeUntil(Predicate<? super T> predicate)
Flux
until the given Predicate
matches.
This includes the matching data (unlike takeWhile(java.util.function.Predicate<? super T>)
).
public final Flux<T> takeWhile(Predicate<? super T> continuePredicate)
Flux
while a predicate returns TRUE
for the values (checked before each value is delivered).
This only includes the matching data (unlike takeUntil(java.util.function.Predicate<? super T>)
).
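The only difference between takeUntil and takeWhile is whether the boundary element is relayed. The sketch below is a plain-Java model of that documented behavior over a List; it is not Reactor's implementation, and the names TakeModel, takeUntil and takeWhile used here are hypothetical helpers introduced purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model class (not part of Reactor): mimics the documented
// element selection of takeUntil / takeWhile on an in-memory List.
final class TakeModel {

    // Relays elements up to AND including the first one matching the predicate.
    static <T> List<T> takeUntil(List<T> source, Predicate<? super T> predicate) {
        List<T> out = new ArrayList<>();
        for (T value : source) {
            out.add(value);
            if (predicate.test(value)) {
                break;
            }
        }
        return out;
    }

    // Relays elements while the predicate holds; the predicate is checked
    // before each value is delivered, so the first failing value is excluded.
    static <T> List<T> takeWhile(List<T> source, Predicate<? super T> continuePredicate) {
        List<T> out = new ArrayList<>();
        for (T value : source) {
            if (!continuePredicate.test(value)) {
                break;
            }
            out.add(value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(takeUntil(source, v -> v == 3)); // [1, 2, 3]
        System.out.println(takeWhile(source, v -> v < 3));  // [1, 2]
    }
}
```

In both calls the boundary is the value 3: takeUntil relays it, takeWhile does not.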
public final Mono<Void> then()
Mono<Void>
that completes when this Flux
completes.
This will actively ignore the sequence and only replay completion or error signals.
public final Mono<Void> thenEmpty(Publisher<Void> other)
Mono<Void>
that waits for this Flux
to complete then
for a supplied Publisher<Void>
to also complete. The
second completion signal is replayed, or any error signal that occurs instead.
public final Flux<T> timeout(Duration timeout)
TimeoutException
as soon as no item is emitted within the
given Duration
from the previous emission (or the subscription for the first item).
public final Flux<T> timeout(Duration timeout, @Nullable Publisher<? extends T> fallback)
Flux
as soon as no item is emitted within the
given Duration
from the previous emission (or the subscription for the first item).
If the given Publisher
is null, signal a TimeoutException
instead.
public final Flux<T> timeout(Duration timeout, @Nullable Publisher<? extends T> fallback, Scheduler timer)
Flux
as soon as no item is emitted within the
given Duration
from the previous emission (or the subscription for the
first item), as measured on the specified Scheduler
.
If the given Publisher
is null, signal a TimeoutException
instead.
public final Flux<T> timeout(Duration timeout, Scheduler timer)
TimeoutException
as soon as no item is emitted within the
given Duration
from the previous emission (or the subscription for the first
item), as measured by the specified Scheduler
.
public final <U> Flux<T> timeout(Publisher<U> firstTimeout)
TimeoutException
in case the first item from this Flux
has
not been emitted before the given Publisher
emits.
public final <U,V> Flux<T> timeout(Publisher<U> firstTimeout, Function<? super T,? extends Publisher<V>> nextTimeoutFactory)
TimeoutException
in case the first item from this Flux
has
not been emitted before the firstTimeout
Publisher
emits, and whenever
each subsequent element is not emitted before a Publisher
generated from
the latest element signals.
U
- the type of the elements of the first timeout Publisher
V
- the type of the elements of the subsequent timeout Publishers
firstTimeout
- the timeout Publisher
that must not emit before the first signal from this Flux
nextTimeoutFactory
- the timeout Publisher
factory for each next item
Flux
that can time out if each element does not come before
a signal from a per-item companion Publisher
public final <U,V> Flux<T> timeout(Publisher<U> firstTimeout, Function<? super T,? extends Publisher<V>> nextTimeoutFactory, Publisher<? extends T> fallback)
Publisher
in case the first item from this Flux
has
not been emitted before the firstTimeout
Publisher
emits, and whenever
each subsequent element is not emitted before a Publisher
generated from
the latest element signals.
U
- the type of the elements of the first timeout Publisher
V
- the type of the elements of the subsequent timeout Publishers
firstTimeout
- the timeout Publisher
that must not emit before the first signal from this Flux
nextTimeoutFactory
- the timeout Publisher
factory for each next item
fallback
- the fallback Publisher
to subscribe when a timeout occurs
Flux
that can time out if each element does not come before
a signal from a per-item companion Publisher
public final Flux<Tuple2<Long,T>> timestamp()
Tuple2
pair of T1 the current clock time in
millis (as a Long
measured by the parallel
Scheduler) and T2 the emitted data (as a T
), for each item from this Flux
.
Flux
public final Iterable<T> toIterable()
Flux
into a lazy Iterable
blocking on
Iterator.next()
calls.
Note that iterating from within threads marked as "non-blocking only" is illegal and will
cause an IllegalStateException
to be thrown, but obtaining the Iterable
itself within these threads is ok.
Iterable
public final Iterable<T> toIterable(int batchSize)
Flux
into a lazy Iterable
blocking on
Iterator.next()
calls.
Note that iterating from within threads marked as "non-blocking only" is illegal and will
cause an IllegalStateException
to be thrown, but obtaining the Iterable
itself within these threads is ok.
public final Iterable<T> toIterable(int batchSize, @Nullable Supplier<Queue<T>> queueProvider)
Flux
into a lazy Iterable
blocking on
Iterator.next()
calls.
Note that iterating from within threads marked as "non-blocking only" is illegal and will
cause an IllegalStateException
to be thrown, but obtaining the Iterable
itself within these threads is ok.
public final Stream<T> toStream()
Flux
into a lazy Stream
blocking for each source
onNext
call.
Note that iterating from within threads marked as "non-blocking only" is illegal and will
cause an IllegalStateException
to be thrown, but obtaining the Stream
itself or applying lazy intermediate operations on the stream within these threads is ok.
Stream
of unknown size with onClose attached to Subscription.cancel()
public final Stream<T> toStream(int batchSize)
Flux
into a lazy Stream
blocking for each source
onNext
call.
Note that iterating from within threads marked as "non-blocking only" is illegal and will
cause an IllegalStateException
to be thrown, but obtaining the Stream
itself or applying lazy intermediate operations on the stream within these threads is ok.
batchSize
- the bounded capacity to prefetch from this Flux
or
Integer.MAX_VALUE
for unbounded demand
Stream
of unknown size with onClose attached to Subscription.cancel()
public final <V> Flux<V> transform(Function<? super Flux<T>,? extends Publisher<V>> transformer)
Flux
in order to generate a target Flux
. Unlike compose(Function)
, the
provided function is executed as part of assembly.
// element type Integer is assumed here for illustration
Function<Flux<Integer>, Flux<Integer>> applySchedulers = flux -> flux.subscribeOn(Schedulers.elastic())
                                                                     .publishOn(Schedulers.parallel());
flux.transform(applySchedulers).map(v -> v * v).subscribe();
V
- the item type in the returned Flux
transformer
- the Function
to immediately map this Flux
into a target Flux
instance.
Flux
for deferred composition of Flux for each Subscriber
,
for a loose conversion to an arbitrary type
public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)
Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer may override the terminal event.
For an asynchronous version of the cleanup, with distinct path for onComplete, onError
and cancel terminations, see usingWhen(Publisher, Function, Function, Function, Function)
.
T
- emitted type
D
- resource type
resourceSupplier
- a Callable
that is called on subscribe to generate the resource
sourceSupplier
- a factory to derive a Publisher
from the supplied resource
resourceCleanup
- a resource cleanup callback invoked on completion
Flux
built around a disposable resource
usingWhen(Publisher, Function, Function, Function, Function)
,
usingWhen(Publisher, Function, Function)
public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager)
For an asynchronous version of the cleanup, with distinct path for onComplete, onError
and cancel terminations, see usingWhen(Publisher, Function, Function, Function, Function)
.
T
- emitted type
D
- resource type
resourceSupplier
- a Callable
that is called on subscribe to generate the resource
sourceSupplier
- a factory to derive a Publisher
from the supplied resource
resourceCleanup
- a resource cleanup callback invoked on completion
eager
- true to clean before terminating downstream subscribers
Flux
built around a disposable resource
usingWhen(Publisher, Function, Function, Function, Function)
,
usingWhen(Publisher, Function, Function)
public static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> resourceClosure, Function<? super D,? extends Publisher<?>> asyncCleanup)
Publisher
for each individual Subscriber
,
while streaming the values from a Publisher
derived from the same resource.
Whenever the resulting sequence terminates, a provided Function
generates
a "cleanup" Publisher
that is invoked but doesn't change the content of the
main sequence. Instead it just defers the termination (unless it errors, in which case
the error suppresses the original termination signal).
Note that if the resource supplying Publisher
emits more than one resource, the
subsequent resources are dropped (Operators.onNextDropped(Object, Context)
). If
the publisher errors AFTER having emitted one resource, the error is also silently dropped
(Operators.onErrorDropped(Throwable, Context)
).
An empty completion or error without at least one onNext signal triggers a short-circuit
of the main sequence with the same terminal signal (no resource is established, no
cleanup is invoked).
T
- the type of elements emitted by the resource closure, and thus the main sequence
D
- the type of the resource object
resourceSupplier
- a Publisher
that "generates" the resource,
subscribed for each subscription to the main sequence
resourceClosure
- a factory to derive a Publisher
from the supplied resource
asyncCleanup
- an asynchronous resource cleanup invoked when the resource
closure terminates (with onComplete, onError or cancel)
Flux
built around a "transactional" resource, with asynchronous
cleanup on all terminations (onComplete, onError, cancel)
usingWhen(Publisher, Function, Function, Function, Function)
public static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> resourceClosure, Function<? super D,? extends Publisher<?>> asyncComplete, Function<? super D,? extends Publisher<?>> asyncError)
Publisher
for each individual Subscriber
,
while streaming the values from a Publisher
derived from the same resource.
Whenever the resulting sequence terminates, the relevant Function
generates
a "cleanup" Publisher
that is invoked but doesn't change the content of the
main sequence. Instead it just defers the termination (unless it errors, in which case
the error suppresses the original termination signal).
Individual cleanups can also be associated with main sequence cancellation and error terminations.
Note that if the resource supplying Publisher
emits more than one resource, the
subsequent resources are dropped (Operators.onNextDropped(Object, Context)
). If
the publisher errors AFTER having emitted one resource, the error is also silently dropped
(Operators.onErrorDropped(Throwable, Context)
).
An empty completion or error without at least one onNext signal triggers a short-circuit
of the main sequence with the same terminal signal (no resource is established, no
cleanup is invoked).
Additionally, the terminal signal is replaced by any error that might have happened
in the terminating Publisher.
Finally, early cancellations will cancel the resource supplying Publisher.
T
- the type of elements emitted by the resource closure, and thus the main sequence
D
- the type of the resource object
resourceSupplier
- a Publisher
that "generates" the resource,
subscribed for each subscription to the main sequence
resourceClosure
- a factory to derive a Publisher
from the supplied resource
asyncComplete
- an asynchronous resource cleanup invoked if the resource closure terminates with onComplete or is cancelled
asyncError
- an asynchronous resource cleanup invoked if the resource closure terminates with onError
Flux
built around a "transactional" resource, with several
termination paths triggering asynchronous cleanup sequences
usingWhen(Publisher, Function, Function, Function, Function)
public static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> resourceClosure, Function<? super D,? extends Publisher<?>> asyncComplete, Function<? super D,? extends Publisher<?>> asyncError, Function<? super D,? extends Publisher<?>> asyncCancel)
Publisher
for each individual Subscriber
,
while streaming the values from a Publisher
derived from the same resource.
Whenever the resulting sequence terminates, the relevant Function
generates
a "cleanup" Publisher
that is invoked but doesn't change the content of the
main sequence. Instead it just defers the termination (unless it errors, in which case
the error suppresses the original termination signal).
Individual cleanups can also be associated with main sequence cancellation and error terminations.
Note that if the resource supplying Publisher
emits more than one resource, the
subsequent resources are dropped (Operators.onNextDropped(Object, Context)
). If
the publisher errors AFTER having emitted one resource, the error is also silently dropped
(Operators.onErrorDropped(Throwable, Context)
).
An empty completion or error without at least one onNext signal triggers a short-circuit
of the main sequence with the same terminal signal (no resource is established, no
cleanup is invoked).
Additionally, the terminal signal is replaced by any error that might have happened
in the terminating Publisher.
Finally, early cancellations will cancel the resource supplying Publisher.
T
- the type of elements emitted by the resource closure, and thus the main sequence
D
- the type of the resource object
resourceSupplier
- a Publisher
that "generates" the resource,
subscribed for each subscription to the main sequence
resourceClosure
- a factory to derive a Publisher
from the supplied resource
asyncComplete
- an asynchronous resource cleanup invoked if the resource closure terminates with onComplete
asyncError
- an asynchronous resource cleanup invoked if the resource closure terminates with onError
asyncCancel
- an asynchronous resource cleanup invoked if the resource closure is cancelled.
When null, the asyncComplete path is used instead.
Flux
built around a "transactional" resource, with several
termination paths triggering asynchronous cleanup sequences
usingWhen(Publisher, Function, Function, Function)
public final Flux<Flux<T>> window(Duration windowingTimespan, Duration openWindowEvery)
Flux
sequence into multiple Flux
windows that open
for a given windowingTimespan
Duration
, after which it closes with onComplete.
Each window is opened at a regular openWindowEvery
interval, starting from the
first item.
Both durations are measured on the parallel
Scheduler.
When windowingTimespan < openWindowEvery : dropping windows
When windowingTimespan > openWindowEvery : overlapping windows
When windowingTimespan == openWindowEvery : exact windows
windowingTimespan
- the maximum Flux
window Duration
openWindowEvery
- the period of time at which to create new Flux
windows
Flux
of Flux
windows opened at regular intervals and
closed after a Duration
public final Flux<Flux<T>> window(Duration windowingTimespan, Duration openWindowEvery, Scheduler timer)
Flux
sequence into multiple Flux
windows that open
for a given windowingTimespan
Duration
, after which it closes with onComplete.
Each window is opened at a regular openWindowEvery
interval, starting from the
first item.
Both durations are measured on the provided Scheduler
.
When windowingTimespan < openWindowEvery : dropping windows
When windowingTimespan > openWindowEvery : overlapping windows
When windowingTimespan == openWindowEvery : exact windows
windowingTimespan
- the maximum Flux
window Duration
openWindowEvery
- the period of time at which to create new Flux
windows
timer
- a time-capable Scheduler
instance to run on
Flux
of Flux
windows opened at regular intervals and
closed after a Duration
public final Flux<Flux<T>> window(Duration windowingTimespan, Scheduler timer)
Flux
sequence into continuous, non-overlapping windows that open
for a windowingTimespan
Duration
(as measured on the provided Scheduler
).
windowingTimespan
- the Duration
to delimit Flux
windows
timer
- a time-capable Scheduler
instance to run on
Flux
of Flux
windows continuously opened for a given Duration
public final Flux<Flux<T>> window(int maxSize, int skip)
Flux
sequence into multiple Flux
windows of size
maxSize
, that each open every skip
elements in the source.
When maxSize < skip : dropping windows
When maxSize > skip : overlapping windows
When maxSize == skip : exact windows
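The three cases above can be illustrated with a plain-Java model over a List. This is only a sketch of the documented window boundaries, not Reactor's implementation: WindowModel and windows are hypothetical names, and the way trailing partial windows are shown is an assumption of this model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model class (not part of Reactor): computes which source
// elements would land in each window for a given maxSize and skip.
final class WindowModel {

    // A window opens at every index that is a multiple of skip and holds
    // at most maxSize consecutive elements starting at that index.
    static <T> List<List<T>> windows(List<T> source, int maxSize, int skip) {
        if (maxSize <= 0 || skip <= 0) {
            throw new IllegalArgumentException("maxSize and skip must be > 0");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + maxSize, source.size());
            result.add(new ArrayList<>(source.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        // maxSize < skip: dropping windows (3 and 6 fall between windows)
        System.out.println(windows(source, 2, 3)); // [[1, 2], [4, 5], [7]]
        // maxSize > skip: overlapping windows (3, 5 and 7 appear twice)
        System.out.println(windows(source, 3, 2)); // [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7]]
        // maxSize == skip: exact windows (every element appears once)
        System.out.println(windows(source, 3, 3)); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```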
maxSize
- the maximum number of items to emit in the window before closing it
skip
- the number of items to count before opening and emitting a new window
Flux
of Flux
windows based on element count and opened every skip elements
public final Flux<Flux<T>> window(Publisher<?> boundary)
Flux
sequence into continuous, non-overlapping windows
where the window boundary is signalled by another Publisher
boundary
- a Publisher
to emit any item for a split signal and complete to terminate
Flux
of Flux
windows delimited by a given Publisher
public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime)
Flux
sequence into multiple Flux
windows containing
maxSize
elements (or less for the final window) and starting from the first item.
Each Flux
window will onComplete once it contains maxSize
elements
OR it has been open for the given Duration
(as measured on the parallel
Scheduler).
maxSize
- the maximum number of items to emit in the window before closing it
maxTime
- the maximum Duration
since the window was opened before closing it
Flux
of Flux
windows based on element count and duration
public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime, Scheduler timer)
Flux
sequence into multiple Flux
windows containing
maxSize
elements (or less for the final window) and starting from the first item.
Each Flux
window will onComplete once it contains maxSize
elements
OR it has been open for the given Duration
(as measured on the provided
Scheduler
).
maxSize
- the maximum number of items to emit in the window before closing it
maxTime
- the maximum Duration
since the window was opened before closing it
timer
- a time-capable Scheduler
instance to run on
Flux
of Flux
windows based on element count and duration
public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger)
Flux
sequence into multiple Flux
windows delimited by the
given predicate. A new window is opened each time the predicate returns true, at which
point the previous window will receive the triggering element then onComplete.
Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window errors). This variant shouldn't expose empty windows, as the separators are emitted into the windows they close.
boundaryTrigger
- a predicate that triggers the next window when it becomes true.
Flux
of Flux
windows, bounded depending
on the predicate.
public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore)
Flux
sequence into multiple Flux
windows delimited by the
given predicate. A new window is opened each time the predicate returns true.
Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window completes or errors).
If cutBefore
is true, the old window will onComplete and the triggering
element will be emitted in the new window, which becomes immediately available.
This variant can emit an empty window if the sequence starts with a separator.
Otherwise, the triggering element will be emitted in the old window before it does
onComplete, similar to windowUntil(Predicate)
. This variant shouldn't
expose empty windows, as the separators are emitted into the windows they close.
boundaryTrigger
- a predicate that triggers the next window when it becomes true.
cutBefore
- set to true to include the triggering element in the new window rather than the old.
Flux
of Flux
windows, bounded depending
on the predicate.
public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore, int prefetch)
Flux
sequence into multiple Flux
windows delimited by the given
predicate and using a prefetch. A new window is opened each time the predicate
returns true.
Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window completes or errors).
If cutBefore
is true, the old window will onComplete and the triggering
element will be emitted in the new window. This variant can emit an empty window
if the sequence starts with a separator.
Otherwise, the triggering element will be emitted in the old window before it does
onComplete, similar to windowUntil(Predicate)
. This variant shouldn't
expose empty windows, as the separators are emitted into the windows they close.
boundaryTrigger
- a predicate that triggers the next window when it becomes true.
cutBefore
- set to true to include the triggering element in the new window rather than the old.
prefetch
- the request size to use for this Flux.
Flux
of Flux
windows, bounded depending
on the predicate.
public final <U,V> Flux<Flux<T>> windowWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector)
Flux
sequence into potentially overlapping windows controlled by items of a
start Publisher
and end Publisher
derived from the start values.
When Open signal is strictly not overlapping Close signal : dropping windows
When Open signal is strictly more frequent than Close signal : overlapping windows
When Open signal is exactly coordinated with Close signal : exact windows
U
- the type of the sequence opening windows
V
- the type of the sequence closing windows opened by the bucketOpening Publisher's elements
bucketOpening
- a Publisher
that opens a new window when it emits any item
closeSelector
- a Function
given an opening signal and returning a Publisher
that
will close the window when emitting
Flux
of Flux
windows opened by signals from a first
Publisher
and lasting until a selected second Publisher
emits
public final Flux<Flux<T>> windowWhile(Predicate<T> inclusionPredicate)
Flux
sequence into multiple Flux
windows that stay open
while a given predicate matches the source elements. Once the predicate returns
false, the window closes with an onComplete and the triggering element is discarded.
Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window completes or errors). Empty windows can happen when a sequence starts with a separator or contains multiple separators, but a sequence that finishes with a separator won't cause a remainder empty window to be emitted.
inclusionPredicate
- a predicate that triggers the next window when it becomes false.
Flux
of Flux
windows, each containing
subsequent elements that all passed a predicate.
public final Flux<Flux<T>> windowWhile(Predicate<T> inclusionPredicate, int prefetch)
Flux
sequence into multiple Flux
windows that stay open
while a given predicate matches the source elements. Once the predicate returns
false, the window closes with an onComplete and the triggering element is discarded.
Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window completes or errors). Empty windows can happen when a sequence starts with a separator or contains multiple separators, but a sequence that finishes with a separator won't cause a remainder empty window to be emitted.
inclusionPredicate
- a predicate that triggers the next window when it becomes false.
prefetch
- the request size to use for this Flux.
Flux
of Flux
windows, each containing
subsequent elements that all passed a predicate.
public final <U,R> Flux<R> withLatestFrom(Publisher<? extends U> other, BiFunction<? super T,? super U,? extends R> resultSelector)
Flux
and another
Publisher
through a BiFunction
and emits the result.
The operator will drop values from this Flux
until the other
Publisher
produces any value.
If the other Publisher
completes without any value, the sequence is completed.
U
- the other Publisher
sequence type
R
- the result type
other
- the Publisher
to combine with
resultSelector
- the bi-function called with each pair of source and other
elements that should return a single value to be emitted
Flux
gated by another Publisher
@SafeVarargs public static <I,O> Flux<O> zip(Function<? super Object[],? extends O> combinator, int prefetch, Publisher<? extends I>... sources)
I
- the type of the input sources
O
- the combined produced type
combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstream
prefetch
- individual source request size
sources
- the array providing sources to zip
Flux
@SafeVarargs public static <I,O> Flux<O> zip(Function<? super Object[],? extends O> combinator, Publisher<? extends I>... sources)
I
- the type of the input sources
O
- the combined produced type
combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstream
sources
- the array providing sources to zip
Flux
public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources, Function<? super Object[],? extends O> combinator)
Iterable.iterator()
will be called on each Publisher.subscribe(Subscriber)
.
public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources, int prefetch, Function<? super Object[],? extends O> combinator)
Iterable.iterator()
will be called on each Publisher.subscribe(Subscriber)
.
O
- the combined produced type
sources
- the Iterable
providing sources to zip
prefetch
- the inner source request size
combinator
- The aggregate function that will receive a unique value from each upstream and return the value
to signal downstream
Flux
public static <TUPLE extends Tuple2,V> Flux<V> zip(Publisher<? extends Publisher<?>> sources, Function<? super TUPLE,? extends V> combinator)
Note that the Publisher
sources from the outer Publisher
will
accumulate into an exhaustive list before starting zip operation.
TUPLE
- the raw tuple type
V
- The produced output after transformation by the given combinator
sources
- The Publisher
of Publisher
sources to zip. A finite publisher is required.
combinator
- The aggregate function that will receive a unique value from each upstream and return the value
to signal downstream
Flux
based on the produced value
public static <T1,T2> Flux<Tuple2<T1,T2>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2)
Tuple2
.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
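The step-wise pairing described above, which stops as soon as any source runs out, can be modeled in plain Java over two Lists. This is a sketch of the documented behavior only, not Reactor's implementation; ZipModel and its zip helper are hypothetical names, and a combinator is used here in place of a Tuple2 to keep the model self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical model class (not part of Reactor): pairs the i-th element of
// each source and stops when the shorter source has no more elements.
final class ZipModel {

    static <A, B, O> List<O> zip(List<A> first,
                                 List<B> second,
                                 BiFunction<? super A, ? super B, ? extends O> combinator) {
        // "until any of the sources completes": the shorter list bounds the output
        int steps = Math.min(first.size(), second.size());
        List<O> out = new ArrayList<>(steps);
        for (int i = 0; i < steps; i++) {
            out.add(combinator.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("a", "b", "c");
        List<Integer> numbers = Arrays.asList(1, 2);
        List<String> zipped = zip(letters, numbers, (s, i) -> s + i);
        System.out.println(zipped); // [a1, b2]
    }
}
```

The third letter is never combined because the second source has already completed after two elements.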
public static <T1,T2,O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends O> combinator)
T1
- type of the value from source1
T2
- type of the value from source2
O
- The produced output after transformation by the combinator
source1
- The first Publisher
source to zip.
source2
- The second Publisher
source to zip.
combinator
- The aggregate function that will receive a unique value from each upstream and return the
value to signal downstream
Flux
public static <T1,T2,T3> Flux<Tuple3<T1,T2,T3>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3)
Tuple3
.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1
- type of the value from source1
T2
- type of the value from source2
T3
- type of the value from source3
source1
- The first upstream Publisher
to subscribe to.
source2
- The second upstream Publisher
to subscribe to.
source3
- The third upstream Publisher
to subscribe to.
Flux
public static <T1,T2,T3,T4> Flux<Tuple4<T1,T2,T3,T4>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4)
Tuple4
.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1
- type of the value from source1
T2
- type of the value from source2
T3
- type of the value from source3
T4
- type of the value from source4
source1
- The first upstream Publisher
to subscribe to.
source2
- The second upstream Publisher
to subscribe to.
source3
- The third upstream Publisher
to subscribe to.
source4
- The fourth upstream Publisher
to subscribe to.
Flux
public static <T1,T2,T3,T4,T5> Flux<Tuple5<T1,T2,T3,T4,T5>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5)
Tuple5
.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
T1
- type of the value from source1
T2
- type of the value from source2
T3
- type of the value from source3
T4
- type of the value from source4
T5
- type of the value from source5
source1
- The first upstream Publisher
to subscribe to.
source2
- The second upstream Publisher
to subscribe to.
source3
- The third upstream Publisher
to subscribe to.
source4
- The fourth upstream Publisher
to subscribe to.
source5
- The fifth upstream Publisher
to subscribe to.
Flux
public static <T1,T2,T3,T4,T5,T6> Flux<Tuple6<T1,T2,T3,T4,T5,T6>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6)
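Zip six sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple6.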
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
source6 - The sixth upstream Publisher to subscribe to.
Returns:
a zipped Flux
public static <T1,T2,T3,T4,T5,T6,T7> Flux<Tuple7<T1,T2,T3,T4,T5,T6,T7>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7)
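Zip seven sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple7.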
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
T7 - type of the value from source7
Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
source6 - The sixth upstream Publisher to subscribe to.
source7 - The seventh upstream Publisher to subscribe to.
Returns:
a zipped Flux
public static <T1,T2,T3,T4,T5,T6,T7,T8> Flux<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7, Publisher<? extends T8> source8)
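Zip eight sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple8.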
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
T7 - type of the value from source7
T8 - type of the value from source8
Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
source6 - The sixth upstream Publisher to subscribe to.
source7 - The seventh upstream Publisher to subscribe to.
source8 - The eighth upstream Publisher to subscribe to.
Returns:
a zipped Flux
public final <T2> Flux<Tuple2<T,T2>> zipWith(Publisher<? extends T2> source2)
Zip this Flux with another Publisher source, that is to say wait for both to emit one element and combine these elements once into a Tuple2.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
public final <T2,V> Flux<V> zipWith(Publisher<? extends T2> source2, BiFunction<? super T,? super T2,? extends V> combinator)
Zip this Flux with another Publisher source, that is to say wait for both to emit one element and combine these elements using a combinator BiFunction.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
Type Parameters:
T2 - type of the value from source2
V - The produced output after transformation by the combinator
Parameters:
source2 - The second source Publisher to zip with this Flux.
combinator - The aggregate function that will receive a unique value from each source and return the value to signal downstream
Returns:
a zipped Flux
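A short sketch of zipWith with a combinator, assuming reactor-core is on the classpath. The class and method names are hypothetical. The combinator receives one element from each side and returns the value that is signalled downstream, so no Tuple2 is created.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class, not part of Reactor.
class ZipWithCombinatorSketch {

    // Pairs letters with numbers and concatenates each pair into a String.
    static List<String> zipLettersWithNumbers() {
        Flux<String> letters = Flux.just("a", "b", "c");
        Flux<Integer> numbers = Flux.range(1, 2);   // emits 1, 2 then completes

        return letters.zipWith(numbers, (letter, number) -> letter + number)
                      .collectList()
                      .block();                     // blocking is for demonstration only
    }

    public static void main(String[] args) {
        System.out.println(zipLettersWithNumbers()); // [a1, b2]
    }
}
```

Because numbers completes after two elements, "c" is never combined.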
public final <T2> Flux<Tuple2<T,T2>> zipWith(Publisher<? extends T2> source2, int prefetch)
Zip this Flux with another Publisher source, that is to say wait for both to emit one element and combine these elements once into a Tuple2.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
public final <T2,V> Flux<V> zipWith(Publisher<? extends T2> source2, int prefetch, BiFunction<? super T,? super T2,? extends V> combinator)
Zip this Flux with another Publisher source, that is to say wait for both to emit one element and combine these elements using a combinator BiFunction.
The operator will continue doing so until any of the sources completes.
Errors will immediately be forwarded.
This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
Type Parameters:
T2 - type of the value from source2
V - The produced output after transformation by the combinator
Parameters:
source2 - The second source Publisher to zip with this Flux.
prefetch - the request size to use for this Flux and the other Publisher
combinator - The aggregate function that will receive a unique value from each source and return the value to signal downstream
Returns:
a zipped Flux
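The prefetch variant can be sketched as follows, assuming reactor-core is on the classpath. The class name, method name, and the seenRequests list are hypothetical. The list records the request amounts that reach the left source; with a prefetch of 1 these are expected to stay small, but the exact request pattern is an implementation detail and is not asserted here.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class, not part of Reactor.
class ZipWithPrefetchSketch {

    // Zips with prefetch = 1 and records the request amounts seen by the left source.
    static List<String> zipWithSmallPrefetch(List<Long> seenRequests) {
        Flux<Integer> left = Flux.range(1, 5)
                                 .hide()   // hide() prevents operator fusion, so request(n) calls stay observable
                                 .doOnRequest(n -> seenRequests.add(n));
        Flux<String> right = Flux.just("a", "b", "c");

        return left.zipWith(right, 1, (number, letter) -> number + letter)
                   .collectList()
                   .block();               // blocking is for demonstration only
    }

    public static void main(String[] args) {
        List<Long> seenRequests = new ArrayList<>();
        System.out.println(zipWithSmallPrefetch(seenRequests)); // [1a, 2b, 3c]
        System.out.println(seenRequests);                       // request amounts observed upstream
    }
}
```

A small prefetch limits how many elements are buffered per source while waiting for the other side, at the cost of more frequent requests.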
public final <T2> Flux<Tuple2<T,T2>> zipWithIterable(Iterable<? extends T2> iterable)
public final <T2,V> Flux<V> zipWithIterable(Iterable<? extends T2> iterable, BiFunction<? super T,? super T2,? extends V> zipper)
Zip elements from this Flux with the content of an Iterable, that is to say combine one element from each, pairwise, using the given zipper BiFunction.
Type Parameters:
T2 - the value type of the other iterable sequence
V - the result type
Parameters:
iterable - the Iterable to zip with
zipper - the BiFunction pair combinator
Returns:
a zipped Flux
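A minimal sketch of zipWithIterable with a zipper, assuming reactor-core is on the classpath. The class and method names are hypothetical. The Iterable is shorter than the Flux here, so the pairing should stop once the Iterable is exhausted.

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class, not part of Reactor.
class ZipWithIterableSketch {

    // Pairs each Flux element with the next element of a plain List.
    static List<String> zipWithList() {
        Flux<String> letters = Flux.just("a", "b", "c");
        List<Integer> numbers = Arrays.asList(1, 2);   // shorter than the Flux

        return letters.zipWithIterable(numbers, (letter, number) -> letter + ":" + number)
                      .collectList()
                      .block();                        // blocking is for demonstration only
    }

    public static void main(String[] args) {
        System.out.println(zipWithList()); // [a:1, b:2]
    }
}
```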