T - the element type of this Reactive Streams Publisher
public abstract class Flux<T> extends Object implements Publisher<T>
A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).
It is intended to be used in implementations and return types. Input parameters should keep using raw Publisher as much as possible.
If it is known that the underlying Publisher will emit 0 or 1 element, Mono should be used instead.
Note that using state in the java.util.function / lambdas used within Flux operators should be avoided, as these may be shared between several Subscribers.
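The hazard described in the note above can be sketched with plain java.util.function types. This is a minimal illustration that deliberately leaves Reactor out so that it runs on a bare JDK: the class `SharedStateSketch` and its methods `consume` and `statefulMapper` are hypothetical names, and `consume` only stands in for one Subscriber driving a mapping function the way an operator would. It is not how Flux is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class SharedStateSketch {

    // Hypothetical stand-in for one Subscriber: pushes two elements
    // through the mapping function and collects the results.
    public static List<String> consume(Function<String, String> mapper) {
        List<String> out = new ArrayList<>();
        for (String v : Arrays.asList("a", "b")) {
            out.add(mapper.apply(v));
        }
        return out;
    }

    // A stateful function: the counter is captured by the lambda, so every
    // consumer that reuses this same function object shares the counter.
    public static Function<String, String> statefulMapper() {
        int[] counter = {0};
        return v -> v + (++counter[0]);
    }

    public static void main(String[] args) {
        // One function instance reused by two consumers: state leaks across them.
        Function<String, String> shared = statefulMapper();
        System.out.println(consume(shared)); // [a1, b2]
        System.out.println(consume(shared)); // [a3, b4]

        // A fresh function (and fresh state) per consumer: results are repeatable.
        System.out.println(consume(statefulMapper())); // [a1, b2]
        System.out.println(consume(statefulMapper())); // [a1, b2]
    }
}
```

Within Flux itself, per-Subscriber state is usually obtained through the operators listed in the method summary that create it on each subscription, such as defer(Supplier) or generate(Callable, BiFunction) with its stateSupplier.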
See Also: Mono
Constructor and Description |
---|
Flux() |
Modifier and Type | Method and Description |
---|---|
Mono<Boolean> |
all(Predicate<? super T> predicate)
Emit a single boolean true if all values of this sequence match the Predicate. |
Mono<Boolean> |
any(Predicate<? super T> predicate)
Emit a single boolean true if any of the values of this
Flux sequence match
the predicate. |
<P> P |
as(Function<? super Flux<T>,P> transformer)
Immediately apply the given transformation to this
Flux in order to generate a target type. |
Flux<T> |
awaitOnSubscribe()
Intercepts the onSubscribe call and makes sure calls to Subscription methods
only happen after the child Subscriber has returned from its onSubscribe method.
|
T |
blockFirst()
Blocks until the upstream signals its first value or completes.
|
T |
blockFirst(Duration d)
Blocks until the upstream signals its first value or completes.
|
T |
blockFirstMillis(long timeout)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
T |
blockLast()
Blocks until the upstream completes and returns the last emitted value.
|
T |
blockLast(Duration d)
Blocks until the upstream completes and returns the last emitted value.
|
T |
blockLastMillis(long timeout)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<List<T>> |
buffer()
|
Flux<List<T>> |
buffer(Duration timespan)
|
Flux<List<T>> |
buffer(Duration timespan,
Duration timeshift)
Collect incoming values into multiple
List delimited by the given timeshift period. |
Flux<List<T>> |
buffer(Duration timespan,
Duration timeshift,
Scheduler timer)
Collect incoming values into multiple
List delimited by the given timeshift period. |
Flux<List<T>> |
buffer(Duration timespan,
Scheduler timer)
|
Flux<List<T>> |
buffer(int maxSize)
|
Flux<List<T>> |
buffer(int maxSize,
Duration timespan)
Deprecated.
use
bufferTimeout(int, Duration) instead, will be removed in 3.1.0 |
<C extends Collection<? super T>> Flux<C> |
buffer(int maxSize,
Duration timespan,
Supplier<C> bufferSupplier)
Deprecated.
use
bufferTimeout(int, Duration, Supplier) instead, will be removed in 3.1.0 |
Flux<List<T>> |
buffer(int maxSize,
int skip)
|
<C extends Collection<? super T>> Flux<C> |
buffer(int maxSize,
int skip,
Supplier<C> bufferSupplier)
Collect incoming values into multiple
Collection that will be pushed into
the returned Flux when the
given max size is reached or onComplete is received. |
<C extends Collection<? super T>> Flux<C> |
buffer(int maxSize,
Supplier<C> bufferSupplier)
Collect incoming values into multiple
Collection buckets that will be
pushed into the returned Flux
when the given max size is reached or onComplete is received. |
Flux<List<T>> |
buffer(Publisher<?> other)
|
<C extends Collection<? super T>> Flux<C> |
buffer(Publisher<?> other,
Supplier<C> bufferSupplier)
Collect incoming values into multiple
Collection delimited by the given Publisher signals. |
<U,V> Flux<List<T>> |
buffer(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector)
Deprecated.
will be removed in 3.1.0. Use
bufferWhen(Publisher, Function) instead. |
<U,V,C extends Collection<? super T>> Flux<C> |
buffer(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector,
Supplier<C> bufferSupplier)
Deprecated.
will be removed in 3.1.0. Use
bufferWhen(Publisher, Function, Supplier) instead. |
Flux<List<T>> |
bufferMillis(int maxSize,
long timespan)
Deprecated.
use
bufferTimeout(int, Duration) instead, will be removed in 3.1.0 |
Flux<List<T>> |
bufferMillis(int maxSize,
long timespan,
TimedScheduler timer)
Deprecated.
use
bufferTimeout(int, Duration, Scheduler) instead, will be removed in 3.1.0 |
<C extends Collection<? super T>> Flux<C> |
bufferMillis(int maxSize,
long timespan,
TimedScheduler timer,
Supplier<C> bufferSupplier)
Deprecated.
use
bufferTimeout(int, Duration, Scheduler, Supplier) instead, will be removed in 3.1.0 |
Flux<List<T>> |
bufferMillis(long timespan)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<List<T>> |
bufferMillis(long timespan,
long timeshift)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<List<T>> |
bufferMillis(long timespan,
long timeshift,
TimedScheduler timer)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<List<T>> |
bufferMillis(long timespan,
TimedScheduler timer)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<List<T>> |
bufferTimeout(int maxSize,
Duration timespan)
|
Flux<List<T>> |
bufferTimeout(int maxSize,
Duration timespan,
Scheduler timer)
|
<C extends Collection<? super T>> Flux<C> |
bufferTimeout(int maxSize,
Duration timespan,
Scheduler timer,
Supplier<C> bufferSupplier)
Collect incoming values into a Collection that will be pushed into the returned Flux every timespan OR maxSize items. |
<C extends Collection<? super T>> Flux<C> |
bufferTimeout(int maxSize,
Duration timespan,
Supplier<C> bufferSupplier)
Collect incoming values into a
Collection that will be pushed into the returned Flux every timespan OR
maxSize items. |
Flux<List<T>> |
bufferTimeoutMillis(int maxSize,
long timespan)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<List<T>> |
bufferTimeoutMillis(int maxSize,
long timespan,
TimedScheduler timer)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
<C extends Collection<? super T>> Flux<C> |
bufferTimeoutMillis(int maxSize,
long timespan,
TimedScheduler timer,
Supplier<C> bufferSupplier)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<List<T>> |
bufferUntil(Predicate<? super T> predicate)
|
Flux<List<T>> |
bufferUntil(Predicate<? super T> predicate,
boolean cutBefore)
|
<U,V> Flux<List<T>> |
bufferWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector)
|
<U,V,C extends Collection<? super T>> Flux<C> |
bufferWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector,
Supplier<C> bufferSupplier)
Collect incoming values into multiple user-defined
Collection buffers started each time an opening
companion Publisher emits. |
Flux<List<T>> |
bufferWhile(Predicate<? super T> predicate)
|
Flux<T> |
cache()
Turn this Flux into a hot source and cache last emitted signals for further Subscribers. |
Flux<T> |
cache(Duration ttl)
Turn this Flux into a hot source and cache last emitted signals for further Subscribers. |
Flux<T> |
cache(int history)
Turn this Flux into a hot source and cache last emitted signals for further Subscribers. |
Flux<T> |
cache(int history,
Duration ttl)
Turn this Flux into a hot source and cache last emitted signals for further Subscribers. |
Flux<T> |
cancelOn(Scheduler scheduler)
|
<E> Flux<E> |
cast(Class<E> clazz)
Cast the current
Flux produced type into a target produced type. |
Flux<T> |
checkpoint()
Activate assembly tracing for this particular Flux, in case of an error upstream of the checkpoint. |
Flux<T> |
checkpoint(String description)
Activate assembly tracing for this particular
Flux and give it
a description that will be reflected in the assembly traceback in case
of an error upstream of the checkpoint. |
<R,A> Mono<R> |
collect(Collector<? super T,A,? extends R> collector)
Collect the
Flux sequence with the given collector and supplied container on subscribe. |
<E> Mono<E> |
collect(Supplier<E> containerSupplier,
BiConsumer<E,? super T> collector)
Collect the
Flux sequence with the given collector and supplied container on subscribe. |
Mono<List<T>> |
collectList()
|
<K> Mono<Map<K,T>> |
collectMap(Function<? super T,? extends K> keyExtractor)
|
<K,V> Mono<Map<K,V>> |
collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor)
Convert this entire Flux sequence into a hashed map where the key is extracted by the given function and the value is the most recent extracted item for that key. |
<K,V> Mono<Map<K,V>> |
collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,V>> mapSupplier)
Convert this entire Flux sequence into a supplied map where the key is extracted by the given function and the value is the most recent extracted item for that key. |
<K> Mono<Map<K,Collection<T>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor)
Convert this
Flux sequence into a hashed map where the key is extracted by the given function and the value will be
all the emitted items for this key. |
<K,V> Mono<Map<K,Collection<V>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor)
Convert this
Flux sequence into a hashed map where the key is extracted by the given function and the value will be
all the extracted items for this key. |
<K,V> Mono<Map<K,Collection<V>>> |
collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,Collection<V>>> mapSupplier)
Convert this
Flux sequence into a supplied map where the key is extracted by the given function and the value will
be all the extracted items for this key. |
Mono<List<T>> |
collectSortedList()
|
Mono<List<T>> |
collectSortedList(Comparator<? super T> comparator)
|
static <T,V> Flux<V> |
combineLatest(Function<Object[],V> combinator,
int prefetch,
Publisher<? extends T>... sources)
Build a
Flux whose data are generated by the combination of the most recent published values from all
publishers. |
static <T,V> Flux<V> |
combineLatest(Function<Object[],V> combinator,
Publisher<? extends T>... sources)
Build a
Flux whose data are generated by the combination of the most recent published values from all
publishers. |
static <T,V> Flux<V> |
combineLatest(Iterable<? extends Publisher<? extends T>> sources,
Function<Object[],V> combinator)
Build a
Flux whose data are generated by the combination of the most recent published values from all
publishers. |
static <T,V> Flux<V> |
combineLatest(Iterable<? extends Publisher<? extends T>> sources,
int prefetch,
Function<Object[],V> combinator)
Build a
Flux whose data are generated by the combination of the most recent published values from all
publishers. |
static <T1,T2,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
BiFunction<? super T1,? super T2,? extends V> combinator)
Build a
Flux whose data are generated by the combination of the most recent published values from all
publishers. |
static <T1,T2,T3,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Function<Object[],V> combinator)
Build a
Flux whose data are generated by the combination of the most recent published values from all
publishers. |
static <T1,T2,T3,T4,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Function<Object[],V> combinator)
Build a
Flux whose data are generated by the combination of the most recent published values from all
publishers. |
static <T1,T2,T3,T4,T5,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Function<Object[],V> combinator)
Build a
Flux whose data are generated by the combination of the most recent published values from all
publishers. |
static <T1,T2,T3,T4,T5,T6,V> Flux<V> |
combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6,
Function<Object[],V> combinator)
Build a
Flux whose data are generated by the combination of the most recent published values from all
publishers. |
<V> Flux<V> |
compose(Function<? super Flux<T>,? extends Publisher<V>> transformer)
|
static <T> Flux<T> |
concat(Iterable<? extends Publisher<? extends T>> sources)
Concat all sources pulled from the supplied Iterator on Publisher.subscribe(Subscriber) from the passed Iterable until Iterator.hasNext() returns false. |
static <T> Flux<T> |
concat(Publisher<? extends Publisher<? extends T>> sources)
Concat all sources emitted as an onNext signal from a parent Publisher. |
static <T> Flux<T> |
concat(Publisher<? extends Publisher<? extends T>> sources,
int prefetch)
Concat all sources emitted as an onNext signal from a parent Publisher. |
static <T> Flux<T> |
concat(Publisher<? extends T>... sources)
Concat all sources pulled from the given
Publisher array. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources)
Concat all sources emitted as an onNext signal from a parent Publisher. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources,
boolean delayUntilEnd,
int prefetch)
Concat all sources emitted as an onNext signal from a parent Publisher. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends Publisher<? extends T>> sources,
int prefetch)
Concat all sources emitted as an onNext signal from a parent Publisher. |
static <T> Flux<T> |
concatDelayError(Publisher<? extends T>... sources)
Concat all sources pulled from the given
Publisher array. |
<V> Flux<V> |
concatMap(Function<? super T,? extends Publisher<? extends V>> mapper)
Bind dynamic sequences given this input sequence like flatMap(Function), but preserve ordering and concatenate emissions instead of merging (no interleave). |
<V> Flux<V> |
concatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int prefetch)
Bind dynamic sequences given this input sequence like flatMap(Function), but preserve ordering and concatenate emissions instead of merging (no interleave). |
<V> Flux<V> |
concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
boolean delayUntilEnd,
int prefetch)
Bind dynamic sequences given this input sequence like flatMap(Function), but preserve ordering and concatenate emissions instead of merging (no interleave). |
<V> Flux<V> |
concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
int prefetch)
Bind dynamic sequences given this input sequence like flatMap(Function), but preserve ordering and concatenate emissions instead of merging (no interleave). |
<V> Flux<V> |
concatMapDelayError(Function<? super T,Publisher<? extends V>> mapper)
Bind dynamic sequences given this input sequence like flatMap(Function), but preserve ordering and concatenate emissions instead of merging (no interleave). |
<R> Flux<R> |
concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
|
<R> Flux<R> |
concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper,
int prefetch)
|
Flux<T> |
concatWith(Publisher<? extends T> other)
|
Mono<Long> |
count()
Counts the number of values in this Flux. |
static <T> Flux<T> |
create(Consumer<? super FluxSink<T>> emitter)
Creates a Flux with multi-emission capabilities (synchronous or asynchronous) through
the FluxSink API.
|
static <T> Flux<T> |
create(Consumer<? super FluxSink<T>> emitter,
FluxSink.OverflowStrategy backpressure)
Creates a Flux with multi-emission capabilities (synchronous or asynchronous) through
the FluxSink API.
|
Flux<T> |
defaultIfEmpty(T defaultV)
Provide a single default value if this sequence completes without emitting any data.
|
static <T> Flux<T> |
defer(Supplier<? extends Publisher<T>> supplier)
Supply a Publisher every time subscribe is called on the returned Flux. |
Flux<T> |
delay(Duration delay)
Deprecated.
will be replaced by
delayElements(Duration) in 3.1.0 |
Flux<T> |
delayElements(Duration delay)
|
Flux<T> |
delayElements(Duration delay,
Scheduler timer)
|
Flux<T> |
delayElementsMillis(long delay)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<T> |
delayElementsMillis(long delay,
TimedScheduler timer)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<T> |
delayMillis(long delay)
Deprecated.
will be replaced by
delayElements(Duration) in 3.1.0 |
Flux<T> |
delayMillis(long delay,
TimedScheduler timer)
Deprecated.
will be replaced by
delayElements(Duration, Scheduler) in 3.1.0 |
Flux<T> |
delaySubscription(Duration delay)
Delay the
subscription to this Flux source until the given
period elapses. |
Flux<T> |
delaySubscription(Duration delay,
Scheduler timer)
Delay the
subscription to this Flux source until the given
period elapses. |
<U> Flux<T> |
delaySubscription(Publisher<U> subscriptionDelay)
Delay the subscription to the main source until another Publisher
signals a value or completes.
|
Flux<T> |
delaySubscriptionMillis(long delay)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<T> |
delaySubscriptionMillis(long delay,
TimedScheduler timer)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
<X> Flux<X> |
dematerialize()
|
Flux<T> |
distinct()
For each Subscriber, tracks the values of this Flux that have been seen and filters out duplicates. |
<V> Flux<T> |
distinct(Function<? super T,? extends V> keySelector)
For each Subscriber, tracks the values of this Flux that have been seen and filters out duplicates given the extracted key. |
Flux<T> |
distinctUntilChanged()
Filters out subsequent and repeated elements.
|
<V> Flux<T> |
distinctUntilChanged(Function<? super T,? extends V> keySelector)
Filters out subsequent and repeated elements provided a matching extracted key.
|
Flux<T> |
doAfterTerminate(Runnable afterTerminate)
Triggered after the
Flux terminates, either by completing downstream successfully or with an error. |
Flux<T> |
doFinally(Consumer<SignalType> onFinally)
Triggered after the Flux terminates for any reason, including cancellation. |
Flux<T> |
doOnCancel(Runnable onCancel)
Triggered when the
Flux is cancelled. |
Flux<T> |
doOnComplete(Runnable onComplete)
Triggered when the
Flux completes successfully. |
Flux<T> |
doOnEach(Consumer<? super Signal<T>> signalConsumer)
Triggers side-effects when the
Flux emits an item, fails with an error
or completes successfully. |
<E extends Throwable> Flux<T> |
doOnError(Class<E> exceptionType,
Consumer<? super E> onError)
Triggered when the
Flux completes with an error matching the given exception type. |
Flux<T> |
doOnError(Consumer<? super Throwable> onError)
Triggered when the
Flux completes with an error. |
Flux<T> |
doOnError(Predicate<? super Throwable> predicate,
Consumer<? super Throwable> onError)
Triggered when the
Flux completes with an error matching the given exception. |
Flux<T> |
doOnNext(Consumer<? super T> onNext)
Triggered when the
Flux emits an item. |
Flux<T> |
doOnRequest(LongConsumer consumer)
|
Flux<T> |
doOnSubscribe(Consumer<? super Subscription> onSubscribe)
Triggered when the
Flux is subscribed. |
Flux<T> |
doOnTerminate(Runnable onTerminate)
Triggered when the
Flux terminates, either by completing successfully or with an error. |
Flux<Tuple2<Long,T>> |
elapsed()
|
Flux<Tuple2<Long,T>> |
elapsed(Scheduler scheduler)
|
Flux<Tuple2<Long,T>> |
elapsed(TimedScheduler scheduler)
Deprecated.
|
Mono<T> |
elementAt(int index)
Emit only the element at the given index position, or signal IndexOutOfBoundsException if the sequence is shorter. |
Mono<T> |
elementAt(int index,
T defaultValue)
Emit only the element at the given index position, or signal the given default value if the sequence is shorter.
|
static <T> Flux<T> |
empty()
Create a
Flux that completes without emitting any item. |
static <T> Flux<T> |
error(Throwable error)
Create a
Flux that completes with the specified error. |
static <O> Flux<O> |
error(Throwable throwable,
boolean whenRequested)
Build a
Flux that will only emit an error signal to any new subscriber. |
Flux<T> |
filter(Predicate<? super T> p)
Evaluate each accepted value against the given Predicate. |
Flux<T> |
filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate)
Test each value emitted by this
Flux asynchronously using a generated
Publisher<Boolean> test. |
Flux<T> |
filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate,
int bufferSize)
Test each value emitted by this
Flux asynchronously using a generated
Publisher<Boolean> test. |
static <I> Flux<I> |
firstEmitting(Iterable<? extends Publisher<? extends I>> sources)
Select the source that wins the "ambiguous" race, that is, the first one to emit onNext, onComplete or onError.
|
static <I> Flux<I> |
firstEmitting(Publisher<? extends I>... sources)
Select the source that is first to emit onNext, onComplete or onError.
|
Flux<T> |
firstEmittingWith(Publisher<? extends T> other)
Emit from whichever of this publisher and the given publisher signals first.
|
<R> Flux<R> |
flatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
|
<R> Flux<R> |
flatMap(Function<? super T,? extends Publisher<? extends R>> mapperOnNext,
Function<Throwable,? extends Publisher<? extends R>> mapperOnError,
Supplier<? extends Publisher<? extends R>> mapperOnComplete)
|
<V> Flux<V> |
flatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
boolean delayError,
int concurrency,
int prefetch)
Deprecated.
use
flatMap(Function, int, int) or flatMapDelayError(Function, int, int)
instead, will be removed in 3.1.0. |
<V> Flux<V> |
flatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency)
|
<V> Flux<V> |
flatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency,
int prefetch)
|
<V> Flux<V> |
flatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency,
int prefetch)
|
<R> Flux<R> |
flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
|
<R> Flux<R> |
flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper,
int prefetch)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
boolean delayError,
int maxConcurrency,
int prefetch)
Deprecated.
use
flatMapSequential(Function, int, int) or flatMapSequentialDelayError(Function, int, int)
instead, will be removed in 3.1.0. |
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency)
|
<R> Flux<R> |
flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency,
int prefetch)
|
<R> Flux<R> |
flatMapSequentialDelayError(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency,
int prefetch)
|
static <T> Flux<T> |
from(Publisher<? extends T> source)
|
static <T> Flux<T> |
fromArray(T[] array)
|
static <T> Flux<T> |
fromIterable(Iterable<? extends T> it)
|
static <T> Flux<T> |
fromStream(Stream<? extends T> s)
|
static <T,S> Flux<T> |
generate(Callable<S> stateSupplier,
BiFunction<S,SynchronousSink<T>,S> generator)
Generate signals one-by-one via a function callback.
|
static <T,S> Flux<T> |
generate(Callable<S> stateSupplier,
BiFunction<S,SynchronousSink<T>,S> generator,
Consumer<? super S> stateConsumer)
Generate signals one-by-one via a function callback.
|
static <T> Flux<T> |
generate(Consumer<SynchronousSink<T>> generator)
Generate signals one-by-one via a consumer callback.
|
long |
getPrefetch()
The prefetch configuration of the
Flux |
<K> Flux<GroupedFlux<K,T>> |
groupBy(Function<? super T,? extends K> keyMapper)
Re-route this sequence into a dynamically created Flux for each unique key evaluated by the given key mapper. |
<K,V> Flux<GroupedFlux<K,V>> |
groupBy(Function<? super T,? extends K> keyMapper,
Function<? super T,? extends V> valueMapper)
Re-route this sequence into a dynamically created Flux for each unique key evaluated by the given key mapper. |
<K,V> Flux<GroupedFlux<K,V>> |
groupBy(Function<? super T,? extends K> keyMapper,
Function<? super T,? extends V> valueMapper,
int prefetch)
Re-route this sequence into a dynamically created Flux for each unique key evaluated by the given key mapper. |
<K> Flux<GroupedFlux<K,T>> |
groupBy(Function<? super T,? extends K> keyMapper,
int prefetch)
Re-route this sequence into a dynamically created Flux for each unique key evaluated by the given key mapper. |
<TRight,TLeftEnd,TRightEnd,R> Flux<R> |
groupJoin(Publisher<? extends TRight> other,
Function<? super T,? extends Publisher<TLeftEnd>> leftEnd,
Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd,
BiFunction<? super T,? super Flux<TRight>,? extends R> resultSelector)
Returns a
Flux that correlates two Publishers when they overlap in time
and groups the results. |
<R> Flux<R> |
handle(BiConsumer<? super T,SynchronousSink<R>> handler)
Handle the items emitted by this Flux by calling a BiConsumer with the output sink for each onNext. |
Mono<Boolean> |
hasElement(T value)
Emit a single boolean true if any of the values of this
Flux sequence match
the constant. |
Mono<Boolean> |
hasElements()
Emit a single boolean true if this
Flux sequence has at least one element. |
Flux<T> |
hide()
Hides the identities of this
Flux and its Subscription
as well. |
Mono<T> |
ignoreElements()
Ignores onNext signals (dropping them) and only reacts on termination.
|
static Flux<Long> |
interval(Duration period)
Create a Flux that emits an ever-incrementing long, starting with 0, every period on the global timer. |
static Flux<Long> |
interval(Duration delay,
Duration period)
Create a Flux that emits an ever-incrementing long, starting with 0, every period on the global timer, after the given initial delay. |
static Flux<Long> |
interval(Duration delay,
Duration period,
Scheduler timer)
Create a Flux that emits an ever-incrementing long, starting with 0, every period on the given timer, after the given initial delay. |
static Flux<Long> |
interval(Duration period,
Scheduler timer)
|
static Flux<Long> |
intervalMillis(long period)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
static Flux<Long> |
intervalMillis(long delay,
long period)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
static Flux<Long> |
intervalMillis(long delay,
long period,
TimedScheduler timer)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
static Flux<Long> |
intervalMillis(long period,
TimedScheduler timer)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
<TRight,TLeftEnd,TRightEnd,R> Flux<R> |
join(Publisher<? extends TRight> other,
Function<? super T,? extends Publisher<TLeftEnd>> leftEnd,
Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd,
BiFunction<? super T,? super TRight,? extends R> resultSelector)
Returns a
Flux that correlates two Publishers when they overlap in time
and groups the results. |
static <T> Flux<T> |
just(T... data)
Create a
Flux that emits the specified items and then completes. |
static <T> Flux<T> |
just(T data)
Create a new
Flux that will only emit the passed data then onComplete. |
Mono<T> |
last()
Signal the last element observed before complete signal or emit
NoSuchElementException error if the source was empty. |
Mono<T> |
last(T defaultValue)
Signal the last element observed before complete signal or emit
the defaultValue if empty.
|
Flux<T> |
limitRate(int prefetchRate)
Ensure that backpressure signals from downstream subscribers are capped at the provided prefetchRate when propagated upstream, effectively rate limiting the upstream Publisher. |
Flux<T> |
log()
Observe all Reactive Streams signals and use
Logger support to handle trace implementation. |
Flux<T> |
log(String category)
Observe all Reactive Streams signals and use
Logger support to handle trace implementation. |
Flux<T> |
log(String category,
Level level,
boolean showOperatorLine,
SignalType... options)
Observe Reactive Streams signals matching the passed filter options and use Logger support to handle trace implementation. |
Flux<T> |
log(String category,
Level level,
SignalType... options)
Observe Reactive Streams signals matching the passed filter options and use Logger support to handle trace implementation. |
<V> Flux<V> |
map(Function<? super T,? extends V> mapper)
Transform the items emitted by this
Flux by applying a function to each item. |
<E extends Throwable> Flux<T> |
mapError(Class<E> type,
Function<? super E,? extends Throwable> mapper)
Deprecated. Use onErrorMap(Function<? super Throwable, ? extends Throwable>) instead. Will be removed in 3.1.0. |
Flux<T> |
mapError(Function<? super Throwable,? extends Throwable> mapper)
Deprecated. Use onErrorMap(Function<? super Throwable, ? extends Throwable>) instead. Will be removed in 3.1.0. |
Flux<T> |
mapError(Predicate<? super Throwable> predicate,
Function<? super Throwable,? extends Throwable> mapper)
Deprecated. Use onErrorMap(Function<? super Throwable, ? extends Throwable>) instead. Will be removed in 3.1.0. |
Flux<Signal<T>> |
materialize()
Transform the incoming onNext, onError and onComplete signals into Signal. |
static <I> Flux<I> |
merge(int prefetch,
boolean delayError,
Publisher<? extends I>... sources)
Deprecated.
use
merge(int, Publisher[]) or mergeDelayError(int, Publisher[])
instead, will be removed in 3.1.0 |
static <I> Flux<I> |
merge(int prefetch,
Publisher<? extends I>... sources)
|
static <I> Flux<I> |
merge(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
merge(Publisher<? extends I>... sources)
|
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source)
|
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source,
int concurrency)
|
static <T> Flux<T> |
merge(Publisher<? extends Publisher<? extends T>> source,
int concurrency,
int prefetch)
|
static <I> Flux<I> |
mergeDelayError(int prefetch,
Publisher<? extends I>... sources)
|
static <I> Flux<I> |
mergeSequential(int prefetch,
boolean delayError,
Publisher<? extends I>... sources)
Deprecated.
use
mergeSequential(int, Publisher[]) or mergeSequentialDelayError(int, Publisher[])
instead, will be removed in 3.1.0 |
static <I> Flux<I> |
mergeSequential(int prefetch,
Publisher<? extends I>... sources)
Merge a number of
Publisher sequences into an ordered merged sequence. |
static <I> Flux<I> |
mergeSequential(Iterable<? extends Publisher<? extends I>> sources)
|
static <I> Flux<I> |
mergeSequential(Iterable<? extends Publisher<? extends I>> sources,
boolean delayError,
int maxConcurrency,
int prefetch)
Deprecated.
use
mergeSequential(Iterable, int, int) or mergeSequentialDelayError(Iterable, int, int)
instead, will be removed in 3.1.0. |
static <I> Flux<I> |
mergeSequential(Iterable<? extends Publisher<? extends I>> sources,
int maxConcurrency,
int prefetch)
|
static <I> Flux<I> |
mergeSequential(Publisher<? extends I>... sources)
Merge a number of
Publisher sequences into an ordered merged sequence. |
static <T> Flux<T> |
mergeSequential(Publisher<? extends Publisher<? extends T>> sources)
|
static <T> Flux<T> |
mergeSequential(Publisher<? extends Publisher<? extends T>> sources,
boolean delayError,
int maxConcurrency,
int prefetch)
Deprecated.
use
mergeSequential(Publisher, int, int) or mergeSequentialDelayError(Publisher, int, int)
instead, will be removed in 3.1.0. |
static <T> Flux<T> |
mergeSequential(Publisher<? extends Publisher<? extends T>> sources,
int maxConcurrency,
int prefetch)
|
static <I> Flux<I> |
mergeSequentialDelayError(int prefetch,
Publisher<? extends I>... sources)
Merge a number of
Publisher sequences into an ordered merged sequence. |
static <I> Flux<I> |
mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources,
int maxConcurrency,
int prefetch)
|
static <T> Flux<T> |
mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources,
int maxConcurrency,
int prefetch)
|
Flux<T> |
mergeWith(Publisher<? extends T> other)
|
static <T> Flux<T> |
never()
Create a
Flux that will never signal any data, error or completion signal. |
Mono<T> |
next()
Emit only the first item emitted by this
Flux . |
<U> Flux<U> |
ofType(Class<U> clazz)
Evaluate each accepted value against the given
Class type. |
protected static <T> ConnectableFlux<T> |
onAssembly(ConnectableFlux<T> source)
|
protected static <T> Flux<T> |
onAssembly(Flux<T> source)
|
Flux<T> |
onBackpressureBuffer()
Request an unbounded demand and push the returned
Flux , or park the observed elements if not enough
demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(int maxSize)
Request an unbounded demand and push the returned
Flux , or park the observed elements if not enough
demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(int maxSize,
BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push the returned
Flux , or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. |
Flux<T> |
onBackpressureBuffer(int maxSize,
Consumer<? super T> onOverflow)
Request an unbounded demand and push the returned
Flux , or park the observed elements if not enough
demand is requested downstream. |
Flux<T> |
onBackpressureBuffer(int maxSize,
Consumer<? super T> onBufferOverflow,
BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push the returned
Flux , or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. |
Flux<T> |
onBackpressureDrop()
Request an unbounded demand and push the returned
Flux , or drop the observed elements if not enough
demand is requested downstream. |
Flux<T> |
onBackpressureDrop(Consumer<? super T> onDropped)
|
Flux<T> |
onBackpressureError()
Request an unbounded demand and push the returned
Flux , or emit onError from Exceptions.failWithOverflow() if not enough demand is requested
downstream. |
Flux<T> |
onBackpressureLatest()
Request an unbounded demand and push the returned
Flux , or only keep the most recent observed item
if not enough demand is requested downstream. |
<E extends Throwable> |
onErrorMap(Class<E> type,
Function<? super E,? extends Throwable> mapper)
Transform the error emitted by this
Flux by applying a function if the
error matches the given type, otherwise let the error flow. |
Flux<T> |
onErrorMap(Function<? super Throwable,? extends Throwable> mapper)
Transform the error emitted by this
Flux by applying a function. |
Flux<T> |
onErrorMap(Predicate<? super Throwable> predicate,
Function<? super Throwable,? extends Throwable> mapper)
Transform the error emitted by this
Flux by applying a function if the
error matches the given predicate, otherwise let the error flow. |
<E extends Throwable> |
onErrorResume(Class<E> type,
Function<? super E,? extends Publisher<? extends T>> fallback)
Subscribe to a returned fallback publisher when an error matching the given type
occurs.
|
Flux<T> |
onErrorResume(Function<? super Throwable,? extends Publisher<? extends T>> fallback)
Subscribe to a returned fallback publisher when any error occurs.
|
Flux<T> |
onErrorResume(Predicate<? super Throwable> predicate,
Function<? super Throwable,? extends Publisher<? extends T>> fallback)
Subscribe to a returned fallback publisher when an error matching the given predicate
occurs.
|
<E extends Throwable> |
onErrorResumeWith(Class<E> type,
Function<? super E,? extends Publisher<? extends T>> fallback)
Deprecated.
use
onErrorResume(Class, Function) instead. Will be removed in 3.1.0. |
Flux<T> |
onErrorResumeWith(Function<? super Throwable,? extends Publisher<? extends T>> fallback)
Deprecated.
use
onErrorResume(Function) instead. Will be removed in 3.1.0. |
Flux<T> |
onErrorResumeWith(Predicate<? super Throwable> predicate,
Function<? super Throwable,? extends Publisher<? extends T>> fallback)
Deprecated.
use
onErrorResume(Predicate, Function) instead. Will be removed in 3.1.0. |
<E extends Throwable> |
onErrorReturn(Class<E> type,
T fallbackValue)
Fallback to the given value if an error of a given type is observed on this
Flux |
<E extends Throwable> |
onErrorReturn(Predicate<? super Throwable> predicate,
T fallbackValue)
Fallback to the given value if an error matching the given predicate is
observed on this
Flux |
Flux<T> |
onErrorReturn(T fallbackValue)
Fallback to the given value if an error is observed on this
Flux |
Flux<T> |
onTerminateDetach()
Detaches both the child
Subscriber and the Subscription on
termination or cancellation. |
ParallelFlux<T> |
parallel()
Prepare to consume this
Flux on a number of 'rails' matching the number of CPU cores,
in round-robin fashion. |
ParallelFlux<T> |
parallel(int parallelism)
Prepare to consume this
Flux on parallelism number of 'rails'
in round-robin fashion. |
ParallelFlux<T> |
parallel(int parallelism,
int prefetch)
|
ConnectableFlux<T> |
publish()
Prepare a
ConnectableFlux which shares this Flux sequence and dispatches values to
subscribers in a backpressure-aware manner. |
<R> Flux<R> |
publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform)
Shares a sequence for the duration of a function that may transform it and
consume it as many times as necessary without causing multiple subscriptions
to the upstream.
|
<R> Flux<R> |
publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform,
int prefetch)
Shares a sequence for the duration of a function that may transform it and
consume it as many times as necessary without causing multiple subscriptions
to the upstream.
|
ConnectableFlux<T> |
publish(int prefetch)
Prepare a
ConnectableFlux which shares this Flux sequence and dispatches values to
subscribers in a backpressure-aware manner. |
Mono<T> |
publishNext()
|
Flux<T> |
publishOn(Scheduler scheduler)
Run onNext, onComplete and onError on a supplied
Scheduler
Scheduler.Worker . |
Flux<T> |
publishOn(Scheduler scheduler,
boolean delayError,
int prefetch)
Run onNext, onComplete and onError on a supplied
Scheduler
Scheduler.Worker . |
Flux<T> |
publishOn(Scheduler scheduler,
int prefetch)
Run onNext, onComplete and onError on a supplied
Scheduler
Scheduler.Worker . |
static <T> Flux<T> |
push(Consumer<? super FluxSink<T>> emitter)
Creates a Flux with multi-emission capabilities from a single threaded producer
through the FluxSink API.
|
static <T> Flux<T> |
push(Consumer<? super FluxSink<T>> emitter,
FluxSink.OverflowStrategy backpressure)
Creates a Flux with multi-emission capabilities from a single threaded producer
through the FluxSink API.
|
static Flux<Integer> |
range(int start,
int count)
Build a
Flux that will only emit a sequence of count incrementing integers, from start (included) to start + count (excluded), then complete. |
<A> Mono<A> |
reduce(A initial,
BiFunction<A,? super T,A> accumulator)
Accumulate the values from this
Flux sequence into an object matching an initial value type. |
Mono<T> |
reduce(BiFunction<T,T,T> aggregator)
Aggregate the values from this
Flux sequence into an object of the same type as the
emitted items. |
<A> Mono<A> |
reduceWith(Supplier<A> initial,
BiFunction<A,? super T,A> accumulator)
Accumulate the values from this
Flux sequence into an object matching an initial value type. |
Flux<T> |
repeat()
Repeatedly subscribe to the source upon completion of the previous subscription.
|
Flux<T> |
repeat(BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.
|
Flux<T> |
repeat(long numRepeat)
Repeatedly subscribe to the source numRepeat times, each time after completion of the previous subscription.
|
Flux<T> |
repeat(long numRepeat,
BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous
subscription.
|
Flux<T> |
repeatWhen(Function<Flux<Long>,? extends Publisher<?>> whenFactory)
Repeatedly subscribe to this
Flux when a companion sequence signals a number of emitted elements in
response to the flux completion signal. |
ConnectableFlux<T> |
replay()
Turn this
Flux into a hot source and cache last emitted signals for further Subscriber . |
ConnectableFlux<T> |
replay(Duration ttl)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
ConnectableFlux<T> |
replay(Duration ttl,
Scheduler timer)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
ConnectableFlux<T> |
replay(int history)
Turn this
Flux into a connectable hot source and cache last emitted
signals for further Subscriber . |
ConnectableFlux<T> |
replay(int history,
Duration ttl)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
ConnectableFlux<T> |
replay(int history,
Duration ttl,
Scheduler timer)
Turn this
Flux into a connectable hot source and cache last emitted signals
for further Subscriber . |
ConnectableFlux<T> |
replayMillis(int history,
long ttl,
TimedScheduler timer)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
ConnectableFlux<T> |
replayMillis(long ttl,
TimedScheduler timer)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<T> |
retry()
Re-subscribes to this
Flux sequence if it signals any error,
indefinitely. |
Flux<T> |
retry(long numRetries)
Re-subscribes to this
Flux sequence if it signals any error,
for a fixed number of times. |
Flux<T> |
retry(long numRetries,
Predicate<Throwable> retryMatcher)
|
Flux<T> |
retry(Predicate<Throwable> retryMatcher)
|
Flux<T> |
retryWhen(Function<Flux<Throwable>,? extends Publisher<?>> whenFactory)
|
Flux<T> |
sample(Duration timespan)
Emit latest value for every given period of time.
|
<U> Flux<T> |
sample(Publisher<U> sampler)
|
Flux<T> |
sampleFirst(Duration timespan)
Take a value from this
Flux then use the duration provided to skip other values. |
<U> Flux<T> |
sampleFirst(Function<? super T,? extends Publisher<U>> samplerFactory)
|
Flux<T> |
sampleFirstMillis(long timespan)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<T> |
sampleMillis(long timespan)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
<U> Flux<T> |
sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory)
Emit the last value from this
Flux only if there were no new values emitted
during the time window provided by a publisher for that particular last value. |
<U> Flux<T> |
sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory,
int maxConcurrency)
Emit the last value from this
Flux only if there were no newer values emitted
during the time window provided by a publisher for that particular last value. |
<A> Flux<A> |
scan(A initial,
BiFunction<A,? super T,A> accumulator)
Aggregate this
Flux values with the help of an accumulator BiFunction
and emits the intermediate results. |
Flux<T> |
scan(BiFunction<T,T,T> accumulator)
Accumulate this
Flux values with an accumulator BiFunction and
returns the intermediate results of this function. |
<A> Flux<A> |
scanWith(Supplier<A> initial,
BiFunction<A,? super T,A> accumulator)
Aggregate this
Flux values with the help of an accumulator BiFunction
and emits the intermediate results. |
Flux<T> |
share()
|
Mono<T> |
single()
Expect and emit a single item from this
Flux source or signal
NoSuchElementException for an empty source, or
IndexOutOfBoundsException for a multi-item source. |
Mono<T> |
single(T defaultValue)
Expect and emit a single item from this
Flux source, emit the default value for an empty source, or signal
IndexOutOfBoundsException for a multi-item source. |
Mono<T> |
singleOrEmpty()
Expect and emit zero or a single item from this
Flux source or signal
IndexOutOfBoundsException for a multi-item source. |
Flux<T> |
skip(Duration timespan)
Skip elements from this
Flux for the given time period. |
Flux<T> |
skip(Duration timespan,
Scheduler timer)
Skip elements from this
Flux for the given time period. |
Flux<T> |
skip(long skipped)
Skip the specified number of elements from the start of this
Flux . |
Flux<T> |
skipLast(int n)
Skip the last specified number of elements from this
Flux . |
Flux<T> |
skipMillis(long timespan)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<T> |
skipMillis(long timespan,
TimedScheduler timer)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<T> |
skipUntil(Predicate<? super T> untilPredicate)
|
Flux<T> |
skipUntilOther(Publisher<?> other)
|
Flux<T> |
skipWhile(Predicate<? super T> skipPredicate)
|
Flux<T> |
sort()
|
Flux<T> |
sort(Comparator<? super T> sortFunction)
|
Flux<T> |
startWith(Iterable<? extends T> iterable)
|
Flux<T> |
startWith(Publisher<? extends T> publisher)
|
Flux<T> |
startWith(T... values)
Prepend the given values before this
Flux sequence. |
Disposable |
subscribe()
Start the chain and request unbounded demand.
|
Disposable |
subscribe(Consumer<? super T> consumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer,
int prefetch)
Deprecated.
prefer using
limitRate(prefetch).subscribe(...) to
subscribe(..., prefetch) |
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
int prefetch)
Deprecated.
prefer using
limitRate(prefetch).subscribe(...) to
subscribe(..., prefetch) |
Disposable |
subscribe(Consumer<? super T> consumer,
int prefetch)
Deprecated.
use limitRate + subscribe(Consumer)
|
Disposable |
subscribe(int prefetch)
Deprecated.
use
limitRate(int) |
Flux<T> |
subscribeOn(Scheduler scheduler)
Run subscribe, onSubscribe and request on a supplied
Scheduler |
<E extends Subscriber<? super T>> |
subscribeWith(E subscriber)
A chaining
Publisher.subscribe(Subscriber) alternative to inline composition type conversion to a hot
emitter (e.g. |
Flux<T> |
switchIfEmpty(Publisher<? extends T> alternate)
Provide an alternative if this sequence completes without any data.
|
<V> Flux<V> |
switchMap(Function<? super T,Publisher<? extends V>> fn)
|
<V> Flux<V> |
switchMap(Function<? super T,Publisher<? extends V>> fn,
int prefetch)
|
<E extends Throwable> |
switchOnError(Class<E> type,
Publisher<? extends T> fallback)
Deprecated.
use
onErrorResume(Function) with a t -> fallback lambda
instead. Will be removed in 3.1.0. |
Flux<T> |
switchOnError(Predicate<? super Throwable> predicate,
Publisher<? extends T> fallback)
Deprecated.
use
onErrorResume(Function) with a t -> fallback lambda
instead. Will be removed in 3.1.0. |
Flux<T> |
switchOnError(Publisher<? extends T> fallback)
Deprecated.
use
onErrorResume(Function) with a t -> fallback lambda
instead. Will be removed in 3.1.0. |
static <T> Flux<T> |
switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers)
Build a
Flux whose data are emitted by the most recently emitted Publisher . |
static <T> Flux<T> |
switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers,
int prefetch)
Build a
Flux whose data are emitted by the most recently emitted Publisher . |
Flux<T> |
take(Duration timespan)
Relay values from this
Flux until the given time period elapses. |
Flux<T> |
take(Duration timespan,
Scheduler timer)
Relay values from this
Flux until the given time period elapses. |
Flux<T> |
take(long n)
Take only the first N values from this
Flux . |
Flux<T> |
takeLast(int n)
Emit the last N values this
Flux emitted before its completion. |
Flux<T> |
takeMillis(long timespan)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<T> |
takeMillis(long timespan,
TimedScheduler timer)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<T> |
takeUntil(Predicate<? super T> predicate)
|
Flux<T> |
takeUntilOther(Publisher<?> other)
|
Flux<T> |
takeWhile(Predicate<? super T> continuePredicate)
Relay values while a predicate returns TRUE for the values
(checked before each value is delivered).
|
Mono<Void> |
then()
Return a
Mono<Void> that completes when this Flux completes. |
Mono<Void> |
then(Publisher<Void> other)
Deprecated.
use
thenEmpty(Publisher) instead, this alias will be
removed in 3.1.0 |
Mono<Void> |
then(Supplier<? extends Publisher<Void>> afterSupplier)
Deprecated.
removed in 3.1, use
thenEmpty(Publisher) with
defer(Supplier) . The competing overload was causing confusion and the generic was
not symmetric with Mono.then(Mono) . |
Mono<Void> |
thenEmpty(Publisher<Void> other)
Return a
Mono<Void> that waits for this Flux to complete then
for a supplied Publisher<Void> to also complete. |
<V> Flux<V> |
thenMany(Publisher<V> other)
|
<V> Flux<V> |
thenMany(Supplier<? extends Publisher<V>> afterSupplier)
Deprecated.
removed in 3.1, use
thenMany(Publisher) with
defer(Supplier) . The competing overload was called unnecessary by extended
feedback and aligns with removing of Supplier of Publisher aliases elsewhere. |
Flux<T> |
timeout(Duration timeout)
Signal a
TimeoutException in case a per-item period fires before the
next item arrives from this Flux . |
Flux<T> |
timeout(Duration timeout,
Publisher<? extends T> fallback)
|
Flux<T> |
timeout(Duration timeout,
Publisher<? extends T> fallback,
Scheduler timer)
|
Flux<T> |
timeout(Duration timeout,
Scheduler timer)
Signal a
TimeoutException error in case a per-item
period fires before the next item arrives from this Flux . |
<U> Flux<T> |
timeout(Publisher<U> firstTimeout)
Signal a
TimeoutException in case a first item from this Flux has
not been emitted before the given Publisher emits. |
<U,V> Flux<T> |
timeout(Publisher<U> firstTimeout,
Function<? super T,? extends Publisher<V>> nextTimeoutFactory)
Signal a
TimeoutException in case a first item from this Flux has
not been emitted before the given Publisher emits. |
<U,V> Flux<T> |
timeout(Publisher<U> firstTimeout,
Function<? super T,? extends Publisher<V>> nextTimeoutFactory,
Publisher<? extends T> fallback)
|
Flux<T> |
timeoutMillis(long timeout)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<T> |
timeoutMillis(long timeout,
Publisher<? extends T> fallback)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<T> |
timeoutMillis(long timeout,
Publisher<? extends T> fallback,
TimedScheduler timer)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<T> |
timeoutMillis(long timeout,
TimedScheduler timer)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<Tuple2<Long,T>> |
timestamp()
|
Flux<Tuple2<Long,T>> |
timestamp(Scheduler scheduler)
|
Flux<Tuple2<Long,T>> |
timestamp(TimedScheduler scheduler)
Deprecated.
|
Iterable<T> |
toIterable()
|
Iterable<T> |
toIterable(long batchSize)
|
Iterable<T> |
toIterable(long batchSize,
Supplier<Queue<T>> queueProvider)
|
Stream<T> |
toStream()
|
Stream<T> |
toStream(int batchSize)
|
String |
toString() |
<V> Flux<V> |
transform(Function<? super Flux<T>,? extends Publisher<V>> transformer)
|
static <T,D> Flux<T> |
using(Callable<? extends D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a
Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or
the Subscriber cancels.
|
static <T,D> Flux<T> |
using(Callable<? extends D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup,
boolean eager)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a
Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or
the Subscriber cancels.
|
Flux<Flux<T>> |
window()
Deprecated.
will be removed in 3.1.0. Prefer using other variants, like
window(Publisher) , |
Flux<Flux<T>> |
window(Duration timespan)
Split this
Flux sequence into continuous, non-overlapping windows delimited by a given period. |
Flux<Flux<T>> |
window(Duration timespan,
Duration timeshift)
|
Flux<Flux<T>> |
window(Duration timespan,
Duration timeshift,
Scheduler timer)
|
Flux<Flux<T>> |
window(Duration timespan,
Scheduler timer)
Split this
Flux sequence into continuous, non-overlapping windows delimited by a given period. |
Flux<Flux<T>> |
window(int maxSize)
|
Flux<Flux<T>> |
window(int maxSize,
Duration timespan)
Deprecated.
use
windowTimeout(int, Duration) instead, will be removed in 3.1.0 |
Flux<Flux<T>> |
window(int maxSize,
int skip)
|
Flux<Flux<T>> |
window(Publisher<?> boundary)
|
<U,V> Flux<Flux<T>> |
window(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector)
Deprecated.
will be removed in 3.1.0. Use
windowWhen(Publisher, Function) instead. |
Flux<Flux<T>> |
windowMillis(int maxSize,
long timespan)
Deprecated.
use
windowTimeout(int, Duration) instead, will be removed in 3.1.0 |
Flux<Flux<T>> |
windowMillis(int maxSize,
long timespan,
TimedScheduler timer)
Deprecated.
use
windowTimeout(int, Duration, Scheduler) instead, will be removed in 3.1.0 |
Flux<Flux<T>> |
windowMillis(long timespan)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<Flux<T>> |
windowMillis(long timespan,
long timeshift,
TimedScheduler timer)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<Flux<T>> |
windowMillis(long timespan,
TimedScheduler timer)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<Flux<T>> |
windowTimeout(int maxSize,
Duration timespan)
|
Flux<Flux<T>> |
windowTimeout(int maxSize,
Duration timespan,
Scheduler timer)
|
Flux<Flux<T>> |
windowTimeoutMillis(int maxSize,
long timespan)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<Flux<T>> |
windowTimeoutMillis(int maxSize,
long timespan,
TimedScheduler timer)
Deprecated.
use the
Duration based variants instead, will be removed in 3.1.0 |
Flux<GroupedFlux<T,T>> |
windowUntil(Predicate<T> boundaryTrigger)
|
Flux<GroupedFlux<T,T>> |
windowUntil(Predicate<T> boundaryTrigger,
boolean cutBefore)
|
Flux<GroupedFlux<T,T>> |
windowUntil(Predicate<T> boundaryTrigger,
boolean cutBefore,
int prefetch)
|
<U,V> Flux<Flux<T>> |
windowWhen(Publisher<U> bucketOpening,
Function<? super U,? extends Publisher<V>> closeSelector)
|
Flux<GroupedFlux<T,T>> |
windowWhile(Predicate<T> inclusionPredicate)
|
Flux<GroupedFlux<T,T>> |
windowWhile(Predicate<T> inclusionPredicate,
int prefetch)
|
<U,R> Flux<R> |
withLatestFrom(Publisher<? extends U> other,
BiFunction<? super T,? super U,? extends R> resultSelector)
Combine values from this
Flux with values from another
Publisher through a BiFunction and emits the result. |
static <I,O> Flux<O> |
zip(Function<? super Object[],? extends O> combinator,
int prefetch,
Publisher<? extends I>... sources)
"Step-Merge" especially useful in Scatter-Gather scenarios.
|
static <I,O> Flux<O> |
zip(Function<? super Object[],? extends O> combinator,
Publisher<? extends I>... sources)
"Step-Merge" especially useful in Scatter-Gather scenarios.
|
static Flux<Tuple2> |
zip(Iterable<? extends Publisher<?>> sources)
Deprecated.
prefer using
zip(Iterable, Function) , will be removed in 3.1.0 |
static <O> Flux<O> |
zip(Iterable<? extends Publisher<?>> sources,
Function<? super Object[],? extends O> combinator)
"Step-Merge" especially useful in Scatter-Gather scenarios.
|
static <O> Flux<O> |
zip(Iterable<? extends Publisher<?>> sources,
int prefetch,
Function<? super Object[],? extends O> combinator)
"Step-Merge" especially useful in Scatter-Gather scenarios.
|
static <TUPLE extends Tuple2,V> |
zip(Publisher<? extends Publisher<?>> sources,
Function<? super TUPLE,? extends V> combinator)
"Step-Merge" especially useful in Scatter-Gather scenarios.
|
static <T1,T2> Flux<Tuple2<T1,T2>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2)
"Step-Merge" especially useful in Scatter-Gather scenarios.
|
static <T1,T2,O> Flux<O> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
BiFunction<? super T1,? super T2,? extends O> combinator)
"Step-Merge" especially useful in Scatter-Gather scenarios.
|
static <T1,T2,T3> Flux<Tuple3<T1,T2,T3>> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3)
"Step-Merge" especially useful in Scatter-Gather scenarios.
|
static <T1,T2,T3,T4> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4)
"Step-Merge" especially useful in Scatter-Gather scenarios.
|
static <T1,T2,T3,T4,T5> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5)
"Step-Merge" especially useful in Scatter-Gather scenarios.
|
static <T1,T2,T3,T4,T5,T6> |
zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6)
"Step-Merge" especially useful in Scatter-Gather scenarios.
|
<T2> Flux<Tuple2<T,T2>> |
zipWith(Publisher<? extends T2> source2)
"Step-Merge" especially useful in Scatter-Gather scenarios.
|
<T2,V> Flux<V> |
zipWith(Publisher<? extends T2> source2,
BiFunction<? super T,? super T2,? extends V> combinator)
"Step-Merge" especially useful in Scatter-Gather scenarios.
|
<T2> Flux<Tuple2<T,T2>> |
zipWith(Publisher<? extends T2> source2,
int prefetch)
"Step-Merge" especially useful in Scatter-Gather scenarios.
|
<T2,V> Flux<V> |
zipWith(Publisher<? extends T2> source2,
int prefetch,
BiFunction<? super T,? super T2,? extends V> combinator)
"Step-Merge" especially useful in Scatter-Gather scenarios.
|
<T2> Flux<Tuple2<T,T2>> |
zipWithIterable(Iterable<? extends T2> iterable)
|
<T2,V> Flux<V> |
zipWithIterable(Iterable<? extends T2> iterable,
BiFunction<? super T,? super T2,? extends V> zipper)
|
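The zip and zipWith rows above describe a "Step-Merge": the i-th element of each source is combined into one output, and the result ends when any source runs out. The sketch below is a plain-JDK model of that pairing over two in-memory Iterables. It is not Reactor code, and the names StepMergeSketch and stepMerge are hypothetical. With reactor-core on the classpath, the closest equivalent would be the zip(Publisher, Publisher, BiFunction) overload listed in the summary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// Plain-JDK model of "step-merge" pairing; an illustration only, not Reactor code.
class StepMergeSketch {

    // Combines the i-th element of each input and stops as soon as either input runs out.
    static <A, B, R> List<R> stepMerge(Iterable<? extends A> first,
                                       Iterable<? extends B> second,
                                       BiFunction<? super A, ? super B, ? extends R> combinator) {
        List<R> out = new ArrayList<>();
        Iterator<? extends A> a = first.iterator();
        Iterator<? extends B> b = second.iterator();
        while (a.hasNext() && b.hasNext()) {
            out.add(combinator.apply(a.next(), b.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> pairs = stepMerge(
                Arrays.asList(1, 2, 3),
                Arrays.asList("a", "b"),
                (n, s) -> n + s);
        System.out.println(pairs); // prints [1a, 2b]
    }
}
```

The third element of the longer input is never combined, which mirrors how a step-merge only emits once every source has contributed a value for that step.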
@SafeVarargs public static <T,V> Flux<V> combineLatest(Function<Object[],V> combinator, Publisher<? extends T>... sources)
Build a Flux whose data are generated by the combination of the most recent published values from all
publishers.
Type Parameters:
T - type of the value from sources
V - The produced output after transformation by the given combinator
Parameters:
sources - The upstream Publishers to subscribe to.
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations, 3.0
@SafeVarargs public static <T,V> Flux<V> combineLatest(Function<Object[],V> combinator, int prefetch, Publisher<? extends T>... sources)
Build a Flux whose data are generated by the combination of the most recent published values from all
publishers.
Type Parameters:
T - type of the value from sources
V - The produced output after transformation by the given combinator
Parameters:
sources - The upstream Publishers to subscribe to.
prefetch - demand produced to each combined source Publisher
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced combinations, 3.0
public static <T1,T2,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends V> combinator)
Build a Flux whose data are generated by the combination of the most recent published values from all
publishers.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
V - The produced output after transformation by the given combinator
Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced value
public static <T1,T2,T3,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recent published values from all
publishers.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
V - The produced output after transformation by the given combinator
Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced value
public static <T1,T2,T3,T4,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recent published values from all
publishers.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
V - The produced output after transformation by the given combinator
Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced value
public static <T1,T2,T3,T4,T5,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recent published values from all
publishers.
Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
V - The produced output after transformation by the given combinator
Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Returns:
a Flux based on the produced value
public static <T1,T2,T3,T4,T5,T6,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recent published values from all publishers.
T1
- type of the value from source1
T2
- type of the value from source2
T3
- type of the value from source3
T4
- type of the value from source4
T5
- type of the value from source5
T6
- type of the value from source6
V
- The produced output after transformation by the given combinator
source1
- The first upstream Publisher to subscribe to.
source2
- The second upstream Publisher to subscribe to.
source3
- The third upstream Publisher to subscribe to.
source4
- The fourth upstream Publisher to subscribe to.
source5
- The fifth upstream Publisher to subscribe to.
source6
- The sixth upstream Publisher to subscribe to.
combinator
- The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Flux based on the produced value
public static <T,V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recent published values from all publishers.
T
- The common base type of the source sequences
V
- The produced output after transformation by the given combinator
sources
- The list of upstream Publisher to subscribe to.
combinator
- The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Flux based on the produced value
public static <T,V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, int prefetch, Function<Object[],V> combinator)
Build a Flux whose data are generated by the combination of the most recent published values from all publishers.
T
- The common base type of the source sequences
V
- The produced output after transformation by the given combinator
sources
- The list of upstream Publisher to subscribe to.
prefetch
- demand produced to each combined source Publisher
combinator
- The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Flux based on the produced value
public static <T> Flux<T> concat(Iterable<? extends Publisher<? extends T>> sources)
Concat all sources pulled from the supplied Iterator on Publisher.subscribe(org.reactivestreams.Subscriber<? super T>) from the passed Iterable until Iterator.hasNext() returns false. A complete signal from each source will delimit the individual sequences and will eventually be passed to the returned Publisher.
public static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources, int prefetch)
@SafeVarargs public static <T> Flux<T> concat(Publisher<? extends T>... sources)
Concat all sources pulled from the given Publisher array.
A complete signal from each source will delimit the individual sequences and will eventually be passed to the returned Publisher.
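As an illustrative sketch (not part of the original reference), concatenating two finite sources might look as follows. The class and method names are hypothetical, and collectList().block() is used here only to gather the emitted values for inspection.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class ConcatExample {

    // Hypothetical helper: concatenates two sources and collects the result.
    static List<Integer> concatenated() {
        Flux<Integer> first = Flux.just(1, 2);
        Flux<Integer> second = Flux.just(3, 4);
        // The second source is only subscribed to once the first has completed.
        return Flux.concat(first, second).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(concatenated()); // [1, 2, 3, 4]
    }
}
```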
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources)
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, int prefetch)
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, boolean delayUntilEnd, int prefetch)
Concat all sources emitted as an onNext signal from a parent Publisher.
A complete signal from each source will delimit the individual sequences and will eventually be passed to the returned Publisher, which will stop listening if the main sequence has also completed.
Errors will be delayed after the current concat backlog if delayUntilEnd is false or after all sources if delayUntilEnd is true.
T
- The source type of the data sequence
sources
- The Publisher of Publisher to concat
delayUntilEnd
- delay error until all sources have been consumed instead of after the current source
prefetch
- the inner source request size
Flux concatenating all inner sources sequences until complete or error
@SafeVarargs public static <T> Flux<T> concatDelayError(Publisher<? extends T>... sources)
Concat all sources pulled from the given Publisher array.
A complete signal from each source will delimit the individual sequences and will eventually be passed to the returned Publisher.
Any error will be delayed until all sources have been concatenated.
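The delayed-error behaviour can be sketched as below. This is a hedged illustration with hypothetical class and method names: the first source fails after emitting one value, yet the second source is still consumed before the error reaches the subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class ConcatDelayErrorExample {

    // Hypothetical helper: records every signal observed by the subscriber.
    static List<String> signals() {
        List<String> seen = new ArrayList<>();
        // A source that emits "a" and then fails.
        Flux<String> failing =
            Flux.concat(Flux.just("a"), Flux.<String>error(new IllegalStateException("boom")));
        Flux.concatDelayError(failing, Flux.just("b"))
            .subscribe(seen::add, e -> seen.add("error: " + e.getMessage()));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(signals()); // [a, b, error: boom]
    }
}
```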
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure. It handles backpressure by buffering all signals if the downstream can't keep up. For example:
Flux.<String>create(emitter -> {
    ActionListener al = e -> {
        emitter.next(textField.getText());
    };
    // either, without cleanup support:
    button.addActionListener(al);
    // or, with cleanup support:
    button.addActionListener(al);
    emitter.onDispose(() -> {
        button.removeActionListener(al);
    });
});
T
- the value type
emitter
- the consumer that will receive a FluxSink for each individual Subscriber.
Flux
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure. For example:
Flux.<String>create(emitter -> {
    ActionListener al = e -> {
        emitter.next(textField.getText());
    };
    // either, without cleanup support:
    button.addActionListener(al);
    // or, with cleanup support:
    button.addActionListener(al);
    emitter.onDispose(() -> {
        button.removeActionListener(al);
    });
}, FluxSink.OverflowStrategy.LATEST);
T
- the value type
backpressure
- the backpressure mode, see FluxSink.OverflowStrategy for the available backpressure modes
emitter
- the consumer that will receive a FluxSink for each individual Subscriber.
Flux
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter)
This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure. For example:
Flux.<String>push(emitter -> {
    ActionListener al = e -> {
        emitter.next(textField.getText());
    };
    // either, without cleanup support:
    button.addActionListener(al);
    // or, with cleanup support:
    button.addActionListener(al);
    emitter.onDispose(() -> {
        button.removeActionListener(al);
    });
});
T
- the value type
emitter
- the consumer that will receive a FluxSink for each individual Subscriber.
Flux
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure. For example:
Flux.<String>push(emitter -> {
    ActionListener al = e -> {
        emitter.next(textField.getText());
    };
    // either, without cleanup support:
    button.addActionListener(al);
    // or, with cleanup support:
    button.addActionListener(al);
    emitter.onDispose(() -> {
        button.removeActionListener(al);
    });
}, FluxSink.OverflowStrategy.LATEST);
T
- the value type
backpressure
- the backpressure mode, see FluxSink.OverflowStrategy for the available backpressure modes
emitter
- the consumer that will receive a FluxSink for each individual Subscriber.
Flux
public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier)
Supply a Publisher every time subscribe is called on the returned flux. The passed Supplier will be invoked, and it's up to the developer to choose to return a new instance of a Publisher or to reuse one, effectively behaving like from(Publisher).
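A minimal sketch of the per-subscription behaviour of defer follows. The names are hypothetical, and the counter is used only to make each invocation of the Supplier visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

class DeferExample {

    // Hypothetical helper: subscribes twice and records what each subscription saw.
    static List<Integer> twoSubscriptions() {
        AtomicInteger counter = new AtomicInteger();
        // The supplier runs once per subscription, so each one gets a fresh Flux.
        Flux<Integer> deferred = Flux.defer(() -> Flux.just(counter.incrementAndGet()));
        List<Integer> seen = new ArrayList<>();
        seen.add(deferred.blockFirst());
        seen.add(deferred.blockFirst());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(twoSubscriptions()); // [1, 2]
    }
}
```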
public static <T> Flux<T> empty()
Create a Flux that completes without emitting any item.
T
- the reified type of the target Subscriber
Flux
public static <T> Flux<T> error(Throwable error)
Create a Flux that completes with the specified error.
T
- the reified type of the target Subscriber
error
- the error to signal to each Subscriber
Flux
public static <O> Flux<O> error(Throwable throwable, boolean whenRequested)
Build a Flux that will only emit an error signal to any new subscriber.
O
- the output type
throwable
- the error to signal to each Subscriber
whenRequested
- if true, will onError on the first request instead of subscribe().
Flux
@SafeVarargs public static <I> Flux<I> firstEmitting(Publisher<? extends I>... sources)
I
- The source type of the data sequence
sources
- The competing source publishers
Flux eventually subscribed to one of the sources or empty
public static <I> Flux<I> firstEmitting(Iterable<? extends Publisher<? extends I>> sources)
I
- The source type of the data sequence
sources
- The competing source publishers
Flux eventually subscribed to one of the sources or empty
public static <T> Flux<T> from(Publisher<? extends T> source)
T
- the source sequence type
source
- the source to decorate
Flux
public static <T> Flux<T> fromArray(T[] array)
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
T
- the value type emitted
generator
- the consumer called with the SynchronousSink API instance
Flux publisher ready to be subscribed
public static <T,S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator)
The stateSupplier may return null.
T
- the value type emitted
S
- the custom state per subscriber
stateSupplier
- called for each incoming Subscriber to provide the initial state for the generator bifunction
generator
- the bifunction called with the current state and the SynchronousSink API instance; it is expected to return a (new) state.
Flux publisher ready to be subscribed
public static <T,S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator, Consumer<? super S> stateConsumer)
The stateSupplier may return null, but your stateConsumer should be prepared to handle it.
T
- the value type emitted
S
- the custom state per subscriber
stateSupplier
- called for each incoming Subscriber to provide the initial state for the generator bifunction
generator
- the bifunction called with the current state and the SynchronousSink API instance; it is expected to return a (new) state.
stateConsumer
- called after the generator has terminated or the downstream cancelled, receiving the last state to be handled (e.g., to release resources or do other cleanup).
Flux publisher ready to be subscribed
public static Flux<Long> interval(Duration period)
Create a new Flux that emits an ever incrementing long starting with 0 every period on the global timer. If demand is not produced in time, an onError will be signalled. The Flux will never complete.
period
- The duration to wait before the next increment
Flux
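As a hedged sketch, the ticks produced by interval can be observed by bounding the otherwise endless sequence with take. The class and method names, and the 10 ms period, are illustrative choices only.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

class IntervalExample {

    // Hypothetical helper: collects the first three ticks of a 10 ms interval.
    static List<Long> firstThreeTicks() {
        return Flux.interval(Duration.ofMillis(10))
                   .take(3)          // the interval itself never completes
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(firstThreeTicks()); // [0, 1, 2]
    }
}
```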
public static Flux<Long> interval(Duration delay, Duration period)
Create a new Flux that emits an ever incrementing long starting with 0, first after the initial delay and then every period, on a global timer. If demand is not produced in time, an onError will be signalled. The Flux will never complete.
delay
- the delay to wait before emitting 0L
period
- the period before each following increment
Flux
@Deprecated public static Flux<Long> intervalMillis(long period)
Deprecated. Use the Duration based variants instead; will be removed in 3.1.0
Create a new Flux that emits an ever incrementing long starting with 0 every N milliseconds on the global timer. If demand is not produced in time, an onError will be signalled. The Flux will never complete.
period
- The number of milliseconds to wait before the next increment
Flux
@Deprecated public static Flux<Long> intervalMillis(long period, TimedScheduler timer)
Deprecated. Use the Duration based variants instead; will be removed in 3.1.0
@Deprecated public static Flux<Long> intervalMillis(long delay, long period)
Deprecated. Use the Duration based variants instead; will be removed in 3.1.0
Create a new Flux that emits an ever incrementing long starting with 0, first after the initial delay and then every period, on a global timer. If demand is not produced in time, an onError will be signalled. The Flux will never complete.
delay
- the delay in milliseconds to wait before emitting 0L
period
- the period in milliseconds before each following increment
Flux
@Deprecated public static Flux<Long> intervalMillis(long delay, long period, TimedScheduler timer)
Deprecated. Use the Duration based variants instead; will be removed in 3.1.0
@SafeVarargs public static <T> Flux<T> just(T... data)
Create a new Flux that emits the specified items and then completes.
T
- the emitted data type
data
- the consecutive data objects to emit
Flux
public static <T> Flux<T> just(T data)
Create a new Flux that will only emit the passed data and then onComplete.
T
- the emitted data type
data
- the unique data to emit
Flux
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency)
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency, int prefetch)
public static <I> Flux<I> merge(Iterable<? extends Publisher<? extends I>> sources)
Merge emitted Publisher sequences from the passed Iterable into an interleaved merged sequence.
Iterable.iterator() will be called for each Publisher.subscribe(org.reactivestreams.Subscriber<? super T>).
I
- The source type of the data sequence
sources
- the Iterable to lazily iterate on Publisher.subscribe(Subscriber)
Flux publisher ready to be subscribed
@SafeVarargs public static <I> Flux<I> merge(Publisher<? extends I>... sources)
Merge emitted Publisher sequences from the passed Publisher array into an interleaved merged sequence.
I
- The source type of the data sequence
sources
- the Publisher array to iterate on Publisher.subscribe(Subscriber)
Flux publisher ready to be subscribed
@SafeVarargs public static <I> Flux<I> merge(int prefetch, Publisher<? extends I>... sources)
Merge emitted Publisher sequences from the passed Publisher array into an interleaved merged sequence.
I
- The source type of the data sequence
sources
- the Publisher array to iterate on Publisher.subscribe(Subscriber)
prefetch
- the inner source request size
Flux publisher ready to be subscribed
@SafeVarargs public static <I> Flux<I> mergeDelayError(int prefetch, Publisher<? extends I>... sources)
Merge emitted Publisher sequences from the passed Publisher array into an interleaved merged sequence. This variant will delay any error until after the rest of the merge backlog has been processed.
I
- The source type of the data sequence
sources
- the Publisher array to iterate on Publisher.subscribe(Subscriber)
prefetch
- the inner source request size
Flux publisher ready to be subscribed
@SafeVarargs @Deprecated public static <I> Flux<I> merge(int prefetch, boolean delayError, Publisher<? extends I>... sources)
Deprecated. Use merge(int, Publisher[]) or mergeDelayError(int, Publisher[]) instead; will be removed in 3.1.0
Merge emitted Publisher sequences from the passed Publisher array into an interleaved merged sequence.
I
- The source type of the data sequence
sources
- the Publisher array to iterate on Publisher.subscribe(Subscriber)
prefetch
- the inner source request size
delayError
- whether any error should be delayed until after the current merge backlog
Flux publisher ready to be subscribed
public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources)
public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)
Merge Publisher sequences emitted by the passed Publisher into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
T
- the merged type
sources
- a Publisher of Publisher sequence to merge
prefetch
- the inner source request size
maxConcurrency
- the request produced to the main source thus limiting concurrent merge backlog
Flux, subscribing early but keeping the original ordering
public static <T> Flux<T> mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)
Merge Publisher sequences emitted by the passed Publisher into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order. This variant will delay any error until after the rest of the mergeSequential backlog has been processed.
T
- the merged type
sources
- a Publisher of Publisher sequence to merge
prefetch
- the inner source request size
maxConcurrency
- the request produced to the main source thus limiting concurrent merge backlog
Flux, subscribing early but keeping the original ordering
@Deprecated public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources, boolean delayError, int maxConcurrency, int prefetch)
Deprecated. Use mergeSequential(Publisher, int, int) or mergeSequentialDelayError(Publisher, int, int) instead; will be removed in 3.1.0.
Merge Publisher sequences emitted by the passed Publisher into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
T
- the merged type
sources
- a Publisher of Publisher sequence to merge
delayError
- whether any error should be delayed until after the current merge backlog
prefetch
- the inner source request size
maxConcurrency
- the request produced to the main source thus limiting concurrent merge backlog
Flux, subscribing early but keeping the original ordering
@SafeVarargs public static <I> Flux<I> mergeSequential(Publisher<? extends I>... sources)
Merge Publisher sequences into an ordered merged sequence.
Unlike concat, the inner publishers are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
@SafeVarargs public static <I> Flux<I> mergeSequential(int prefetch, Publisher<? extends I>... sources)
Merge Publisher sequences into an ordered merged sequence.
Unlike concat, the inner publishers are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
@SafeVarargs public static <I> Flux<I> mergeSequentialDelayError(int prefetch, Publisher<? extends I>... sources)
Merge Publisher sequences into an ordered merged sequence.
Unlike concat, the inner publishers are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
This variant will delay any error until after the rest of the mergeSequential backlog has been processed.
@SafeVarargs @Deprecated public static <I> Flux<I> mergeSequential(int prefetch, boolean delayError, Publisher<? extends I>... sources)
Deprecated. Use mergeSequential(int, Publisher[]) or mergeSequentialDelayError(int, Publisher[]) instead; will be removed in 3.1.0
Merge Publisher sequences into an ordered merged sequence.
Unlike concat, the inner publishers are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources)
public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch)
Merge Publisher sequences from an Iterable into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
I
- the merged type
sources
- an Iterable of Publisher sequences to merge
maxConcurrency
- the request produced to the main source thus limiting concurrent merge backlog
prefetch
- the inner source request size
Flux, subscribing early but keeping the original ordering
public static <I> Flux<I> mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch)
Merge Publisher sequences from an Iterable into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order. This variant will delay any error until after the rest of the mergeSequential backlog has been processed.
I
- the merged type
sources
- an Iterable of Publisher sequences to merge
maxConcurrency
- the request produced to the main source thus limiting concurrent merge backlog
prefetch
- the inner source request size
Flux, subscribing early but keeping the original ordering
@Deprecated public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources, boolean delayError, int maxConcurrency, int prefetch)
Deprecated. Use mergeSequential(Iterable, int, int) or mergeSequentialDelayError(Iterable, int, int) instead; will be removed in 3.1.0.
Merge Publisher sequences from an Iterable into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.
I
- the merged type
sources
- an Iterable of Publisher sequences to merge
delayError
- whether any error should be delayed until after the current merge backlog
maxConcurrency
- the request produced to the main source thus limiting concurrent merge backlog
prefetch
- the inner source request size
Flux, subscribing early but keeping the original ordering
public static <T> Flux<T> never()
Create a Flux that will never signal any data, error or completion signal.
T
- the Subscriber type target
Flux
public static Flux<Integer> range(int start, int count)
Build a Flux that will only emit a sequence of count incrementing integers, from start to start + count - 1, then complete.
start
- the first integer to be emitted
count
- the number of times to emit an increment, including the first value
Flux
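The bounds of range can be checked with a small sketch (hypothetical class and method names): count values are emitted, beginning at start.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class RangeExample {

    // Hypothetical helper: collects everything emitted by Flux.range.
    static List<Integer> values(int start, int count) {
        return Flux.range(start, count).collectList().block();
    }

    public static void main(String[] args) {
        // Three values starting at 5.
        System.out.println(values(5, 3)); // [5, 6, 7]
    }
}
```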
public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers)
Build a Flux whose data are emitted by the most recently emitted Publisher. The Flux will complete once both the source of publishers and the last switched-to Publisher have completed.
T
- the produced type
mergedPublishers
- The Publisher of switching Publisher to subscribe to.
Flux accepting publishers and producing T
public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers, int prefetch)
Build a Flux whose data are emitted by the most recently emitted Publisher. The Flux will complete once both the source of publishers and the last switched-to Publisher have completed.
T
- the produced type
mergedPublishers
- The Publisher of switching Publisher to subscribe to.
prefetch
- the inner source request size
Flux accepting publishers and producing T
public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)
Eager resource cleanup happens just before the source termination, and exceptions raised by the cleanup Consumer may override the terminal event.
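A hedged sketch of the resource lifecycle with the three-argument using follows. The class name, the "resource" string standing in for a real resource, and the log list are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class UsingExample {

    // Hypothetical helper: logs resource creation, emitted values and cleanup in order.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flux<String> flux = Flux.<String, String>using(
            () -> { log.add("open"); return "resource"; },   // resourceSupplier, called on subscribe
            res -> Flux.just(res + "-1", res + "-2"),        // sourceSupplier, derives the sequence
            res -> log.add("close"));                        // resourceCleanup
        flux.subscribe(log::add);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [open, resource-1, resource-2, close]
    }
}
```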
public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager)
T
- emitted type
D
- resource type
resourceSupplier
- a Callable that is called on subscribe
sourceSupplier
- a Publisher factory derived from the supplied resource
resourceCleanup
- invoked on completion
eager
- true to clean before terminating downstream subscribers
public static <T1,T2,O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends O> combinator)
T1
- type of the value from source1
T2
- type of the value from source2
O
- The produced output after transformation by the combinator
source1
- The first upstream Publisher to subscribe to.
source2
- The second upstream Publisher to subscribe to.
combinator
- The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Flux
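As an illustration with hypothetical names, zipping two sources of different lengths pairs values by position and stops once the shorter source completes.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class ZipExample {

    // Hypothetical helper: pairs letters with numbers using a BiFunction combinator.
    static List<String> zipped() {
        Flux<String> letters = Flux.just("a", "b", "c");
        Flux<Integer> numbers = Flux.just(1, 2);
        Flux<String> pairs = Flux.zip(letters, numbers, (s, i) -> s + i);
        return pairs.collectList().block();
    }

    public static void main(String[] args) {
        // "c" has no partner and is never combined.
        System.out.println(zipped()); // [a1, b2]
    }
}
```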
public static <T1,T2> Flux<Tuple2<T1,T2>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2)
public static <T1,T2,T3> Flux<Tuple3<T1,T2,T3>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3)
T1
- type of the value from source1
T2
- type of the value from source2
T3
- type of the value from source3
source1
- The first upstream Publisher to subscribe to.
source2
- The second upstream Publisher to subscribe to.
source3
- The third upstream Publisher to subscribe to.
Flux
public static <T1,T2,T3,T4> Flux<Tuple4<T1,T2,T3,T4>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4)
T1
- type of the value from source1
T2
- type of the value from source2
T3
- type of the value from source3
T4
- type of the value from source4
source1
- The first upstream Publisher to subscribe to.
source2
- The second upstream Publisher to subscribe to.
source3
- The third upstream Publisher to subscribe to.
source4
- The fourth upstream Publisher to subscribe to.
Flux
public static <T1,T2,T3,T4,T5> Flux<Tuple5<T1,T2,T3,T4,T5>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5)
T1
- type of the value from source1
T2
- type of the value from source2
T3
- type of the value from source3
T4
- type of the value from source4
T5
- type of the value from source5
source1
- The first upstream Publisher to subscribe to.
source2
- The second upstream Publisher to subscribe to.
source3
- The third upstream Publisher to subscribe to.
source4
- The fourth upstream Publisher to subscribe to.
source5
- The fifth upstream Publisher to subscribe to.
Flux
public static <T1,T2,T3,T4,T5,T6> Flux<Tuple6<T1,T2,T3,T4,T5,T6>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6)
T1
- type of the value from source1
T2
- type of the value from source2
T3
- type of the value from source3
T4
- type of the value from source4
T5
- type of the value from source5
T6
- type of the value from source6
source1
- The first upstream Publisher to subscribe to.
source2
- The second upstream Publisher to subscribe to.
source3
- The third upstream Publisher to subscribe to.
source4
- The fourth upstream Publisher to subscribe to.
source5
- The fifth upstream Publisher to subscribe to.
source6
- The sixth upstream Publisher to subscribe to.
Flux
@Deprecated public static Flux<Tuple2> zip(Iterable<? extends Publisher<?>> sources)
Deprecated. Use zip(Iterable, Function); will be removed in 3.1.0
Iterable.iterator() will be called on each Publisher.subscribe(Subscriber).
Note that this version is very limited compared to the alternatives where you either provide a fixed number of sources or a more meaningful combinator. Here we default to combining into Tuples, which limits us to at most 8 sources (Tuple8). Additionally, since a Tuple2 is returned, the usage will probably be limited to iterating over the Tuple (or casting to the correct TupleN, but if you can do that you're better off using the fixed size signatures like zip(Publisher, Publisher, Publisher)). Consider using zip(Iterable, Function) instead, with Arrays.asList(Object[]) as a combinator, if you really don't know the number of sources or if it can grow beyond 8.
sources
- the Iterable to iterate on Publisher.subscribe(Subscriber)
Flux
See also: zip(Publisher, Publisher, Publisher), zip(Iterable, Function)
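The zip(Iterable, Function) alternative recommended above might be used as in the following sketch. The class and method names are hypothetical, and the combinator copies the Object[] into a new List as a defensive choice rather than wrapping it directly with Arrays.asList.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

class ZipIterableExample {

    // Hypothetical helper: zips an arbitrary number of sources into Lists.
    static List<List<Object>> zipAll() {
        List<Flux<Integer>> sources = new ArrayList<>();
        sources.add(Flux.just(1, 2));
        sources.add(Flux.just(10, 20));
        sources.add(Flux.just(100, 200));
        // The combinator receives one value per source as an Object[].
        Flux<List<Object>> zipped =
            Flux.zip(sources, values -> new ArrayList<Object>(Arrays.asList(values)));
        return zipped.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(zipAll()); // [[1, 10, 100], [2, 20, 200]]
    }
}
```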
public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources, Function<? super Object[],? extends O> combinator)
Iterable.iterator() will be called on each Publisher.subscribe(Subscriber).
O
- the combined produced type
sources
- the Iterable to iterate on Publisher.subscribe(Subscriber)
combinator
- The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Flux
public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources, int prefetch, Function<? super Object[],? extends O> combinator)
Iterable.iterator() will be called on each Publisher.subscribe(Subscriber).
O
- the combined produced type
sources
- the Iterable to iterate on Publisher.subscribe(Subscriber)
prefetch
- the inner source request size
combinator
- The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Flux
@SafeVarargs public static <I,O> Flux<O> zip(Function<? super Object[],? extends O> combinator, Publisher<? extends I>... sources)
I
- the type of the input sources
O
- the combined produced type
combinator
- The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
sources
- the Publisher array to iterate on Publisher.subscribe(Subscriber)
Flux
@SafeVarargs public static <I,O> Flux<O> zip(Function<? super Object[],? extends O> combinator, int prefetch, Publisher<? extends I>... sources)
I
- the type of the input sources
O
- the combined produced type
combinator
- The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
prefetch
- individual source request size
sources
- the Publisher array to iterate on Publisher.subscribe(Subscriber)
Flux
public static <TUPLE extends Tuple2,V> Flux<V> zip(Publisher<? extends Publisher<?>> sources, Function<? super TUPLE,? extends V> combinator)
The Publisher of Publisher will accumulate into a list until completion before starting the zip operation. The operator will forward all combinations of the most recent items emitted by each published source until any of them completes. Errors will immediately be forwarded.
TUPLE
- the raw tuple type
V
- The produced output after transformation by the given combinator
sources
- The publisher of upstream Publisher to subscribe to.
combinator
- The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Flux based on the produced value
public final Mono<Boolean> all(Predicate<? super T> predicate)
Emit a single boolean true if all values of this sequence match the Predicate.
The implementation uses short-circuit logic and completes with false if the predicate doesn't match a value.
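A brief sketch of all, together with its counterpart any, is given below. The class and method names are hypothetical, and block() is used only to read the single boolean result.

```java
import reactor.core.publisher.Flux;

class AllAnyExample {

    // Hypothetical helper: true only if every value is even.
    static boolean allEven() {
        return Flux.just(2, 4, 6).all(i -> i % 2 == 0).block();
    }

    // Hypothetical helper: true as soon as one negative value is seen.
    static boolean anyNegative() {
        return Flux.just(1, -2, 3).any(i -> i < 0).block();
    }

    public static void main(String[] args) {
        System.out.println(allEven());     // true
        System.out.println(anyNegative()); // true
    }
}
```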
public final Mono<Boolean> any(Predicate<? super T> predicate)
Emit a single boolean true if any of the values of this Flux sequence match the predicate.
The implementation uses short-circuit logic and completes with true if the predicate matches a value.
predicate
- predicate tested upon values
Mono with true if any value satisfies a predicate and false otherwise
public final <P> P as(Function<? super Flux<T>,P> transformer)
Immediately apply the given transformation to this Flux in order to generate a target type.
flux.as(Mono::from).subscribe()
P
- the returned type
transformer
- the Function to immediately map this Flux into a target type instance.
for a bounded conversion to Publisher
public final Flux<T> awaitOnSubscribe()
This helps with child Subscribers that don't expect a recursive call from onSubscribe into their onNext. For example, a Subscriber may request immediately from its onSubscribe before it has finished its preparation, so that onNext runs into a half-prepared state. This can happen with Subscribers that were not written with Reactor's conventions in mind.
Flux
public final T blockFirst()
public final T blockFirst(Duration d)
d
- max duration timeout to wait for.
@Deprecated public final T blockFirstMillis(long timeout)
Deprecated. Use the Duration based variants instead; will be removed in 3.1.0
timeout
- max duration timeout in millis to wait for.
public final T blockLast()
public final T blockLast(Duration d)
d
- max duration timeout to wait for.
@Deprecated public final T blockLastMillis(long timeout)
Deprecated. Use the Duration based variants instead; will be removed in 3.1.0
timeout
- max duration timeout in millis to wait for.
public final Flux<List<T>> buffer()
Flux of at most one List
for an alternative collecting algorithm returning Mono
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, Supplier<C> bufferSupplier)
Collect incoming values into multiple Collection buckets that will be pushed into the returned Flux when the given max size is reached or onComplete is received.
C
- the supplied Collection type
maxSize
- the maximum collected size
bufferSupplier
- the collection to use for each data segment
Flux of Collection
public final Flux<List<T>> buffer(int maxSize, int skip)
Collect incoming values into multiple List buffers that will be pushed into the returned Flux when the given max size is reached or onComplete is received. A new container List is created each time skip elements have been received.
When Skip > Max Size : dropping buffers
When Skip < Max Size : overlapping buffers
When Skip == Max Size : exact buffers
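The three regimes can be pictured with a plain-Java sketch over a finite list. It only models the documented behaviour and is not Reactor's implementation; BufferSkipSketch and its buffer method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class BufferSkipSketch {
    // Models buffer(maxSize, skip): a new buffer opens every `skip` elements and is
    // emitted once it holds `maxSize` elements or when the input ends.
    static <T> List<List<T>> buffer(List<T> source, int maxSize, int skip) {
        if (maxSize <= 0 || skip <= 0) {
            throw new IllegalArgumentException("maxSize and skip must be positive");
        }
        List<List<T>> emitted = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + maxSize, source.size());
            emitted.add(new ArrayList<>(source.subList(start, end)));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println(buffer(source, 2, 3)); // dropping:    [[1, 2], [4, 5], [7]]
        System.out.println(buffer(source, 3, 2)); // overlapping: [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7]]
        System.out.println(buffer(source, 3, 3)); // exact:       [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

In the dropping case the elements 3 and 6 fall between buffers and appear in none of them.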
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, int skip, Supplier<C> bufferSupplier)
Collect incoming values into multiple Collection buffers that will be pushed into the returned Flux when the given max size is reached or onComplete is received. A new container Collection is created each time skip elements have been received.
When Skip > Max Size : dropping buffers
When Skip < Max Size : overlapping buffers
When Skip == Max Size : exact buffers
Type Parameters:
C - the supplied Collection type
Parameters:
skip - the number of items to skip before creating a new bucket
maxSize - the max collected size
bufferSupplier - the collection to use for each data segment
Returns:
a Flux of possibly overlapped or gapped Collection
public final <C extends Collection<? super T>> Flux<C> buffer(Publisher<?> other, Supplier<C> bufferSupplier)
Type Parameters:
C - the supplied Collection type
Parameters:
other - the other Publisher to subscribe to; each of its signals emits the current bucket and starts a new receiving bucket
bufferSupplier - the collection to use for each data segment
Returns:
a Flux of Collection delimited by a Publisher
@Deprecated public final <U,V> Flux<List<T>> buffer(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector)
Deprecated. use bufferWhen(Publisher, Function) instead.
Collect incoming values into multiple List delimited by the given Publisher signals. Each List bucket will last until the mapped Publisher receiving the boundary signal emits, thus releasing the bucket to the returned Flux.
When Open signal is strictly not overlapping Close signal : dropping buffers
When Open signal is strictly more frequent than Close signal : overlapping buffers
When Open signal is exactly coordinated with Close signal : exact buffers
Type Parameters:
U - the element type of the bucket-opening sequence
V - the element type of the bucket-closing sequence
Parameters:
bucketOpening - a Publisher to subscribe to for signals that create new receiving buckets.
closeSelector - a factory that, given the opening signal, returns a Publisher to subscribe to for emitting the corresponding bucket.
Returns:
a Flux of List delimited by an opening Publisher and a relative closing Publisher
@Deprecated public final <U,V,C extends Collection<? super T>> Flux<C> buffer(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector, Supplier<C> bufferSupplier)
Deprecated. use bufferWhen(Publisher, Function, Supplier) instead.
Collect incoming values into multiple Collection delimited by the given Publisher signals. Each Collection bucket will last until the mapped Publisher receiving the boundary signal emits, thus releasing the bucket to the returned Flux.
When Open signal is strictly not overlapping Close signal : dropping buffers
When Open signal is strictly more frequent than Close signal : overlapping buffers
When Open signal is exactly coordinated with Close signal : exact buffers
Type Parameters:
U - the element type of the bucket-opening sequence
V - the element type of the bucket-closing sequence
C - the supplied Collection type
Parameters:
bucketOpening - a Publisher to subscribe to for signals that create new receiving buckets.
closeSelector - a factory that, given the opening signal, returns a Publisher to subscribe to for emitting the corresponding bucket.
bufferSupplier - the collection to use for each data segment
Returns:
a Flux of Collection delimited by an opening Publisher and a relative closing Publisher
public final Flux<List<T>> buffer(Duration timespan, Duration timeshift)
Collect incoming values into multiple List delimited by the given timeshift period. Each List bucket will last until the timespan has elapsed, thus releasing the bucket to the returned Flux.
When timeshift > timespan : dropping buffers
When timeshift < timespan : overlapping buffers
When timeshift == timespan : exact buffers
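The relation between timeshift and timespan can be pictured on a list of event timestamps. This is an idealised plain-Java model (a bucket opens every timeshift and collects events for timespan), not Reactor's scheduler-driven implementation; TimeBufferSketch, buffer and the horizon parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class TimeBufferSketch {
    // Idealised model: buckets open at 0, timeshift, 2*timeshift, ... (before `horizon`)
    // and each one collects the timestamps t with open <= t < open + timespan.
    static List<List<Long>> buffer(List<Long> timestamps, long timespan, long timeshift, long horizon) {
        if (timespan <= 0 || timeshift <= 0) {
            throw new IllegalArgumentException("timespan and timeshift must be positive");
        }
        List<List<Long>> emitted = new ArrayList<>();
        for (long open = 0; open < horizon; open += timeshift) {
            long close = open + timespan;
            List<Long> bucket = new ArrayList<>();
            for (long t : timestamps) {
                if (t >= open && t < close) {
                    bucket.add(t);
                }
            }
            emitted.add(bucket);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Long> events = List.of(5L, 15L, 25L, 35L, 45L, 55L); // event times in ms
        System.out.println(buffer(events, 20, 30, 60)); // dropping:    [[5, 15], [35, 45]]
        System.out.println(buffer(events, 30, 20, 60)); // overlapping: [[5, 15, 25], [25, 35, 45], [45, 55]]
        System.out.println(buffer(events, 20, 20, 60)); // exact:       [[5, 15], [25, 35], [45, 55]]
    }
}
```

Events that land exactly on a boundary, and buckets that stay empty, are outside what this sketch tries to model.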
public final Flux<List<T>> buffer(Duration timespan, Duration timeshift, Scheduler timer)
Collect incoming values into multiple List delimited by the given timeshift period. Each List bucket will last until the timespan has elapsed, thus releasing the bucket to the returned Flux.
When timeshift > timespan : dropping buffers
When timeshift < timespan : overlapping buffers
When timeshift == timespan : exact buffers
@Deprecated public final Flux<List<T>> buffer(int maxSize, Duration timespan)
Deprecated. use bufferTimeout(int, Duration) instead, will be removed in 3.1.0
@Deprecated public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, Duration timespan, Supplier<C> bufferSupplier)
Deprecated. use bufferTimeout(int, Duration, Supplier) instead, will be removed in 3.1.0
Collect incoming values into a Collection that will be pushed into the returned Flux every timespan OR maxSize items.
Type Parameters:
C - the supplied Collection type
Parameters:
maxSize - the max collected size
timespan - the timeout to use to release a buffered list
bufferSupplier - the collection to use for each data segment
Returns:
a Flux of Collection delimited by given size or a given period timeout
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration timespan, Supplier<C> bufferSupplier)
Collect incoming values into a Collection that will be pushed into the returned Flux every timespan OR maxSize items.
Type Parameters:
C - the supplied Collection type
Parameters:
maxSize - the max collected size
timespan - the timeout to use to release a buffered list
bufferSupplier - the collection to use for each data segment
Returns:
a Flux of Collection delimited by given size or a given period timeout
public final Flux<List<T>> bufferTimeout(int maxSize, Duration timespan, Scheduler timer)
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration timespan, Scheduler timer, Supplier<C> bufferSupplier)
Collect incoming values into a Collection that will be pushed into the returned Flux every timespan OR maxSize items.
Type Parameters:
C - the supplied Collection type
Parameters:
maxSize - the max collected size
timespan - the timeout to use to release a buffered collection
timer - a time-capable Scheduler instance to run on
bufferSupplier - the collection to use for each data segment
Returns:
a Flux of Collection delimited by given size or a given period timeout
@Deprecated public final Flux<List<T>> bufferMillis(long timespan)
Deprecated. use the Duration based variants instead, will be removed in 3.1.0
@Deprecated public final Flux<List<T>> bufferMillis(long timespan, TimedScheduler timer)
Deprecated. use the Duration based variants instead, will be removed in 3.1.0
@Deprecated public final Flux<List<T>> bufferMillis(long timespan, long timeshift)
Deprecated. use the Duration based variants instead, will be removed in 3.1.0
Collect incoming values into multiple List delimited by the given timeshift period. Each List bucket will last until the timespan has elapsed, thus releasing the bucket to the returned Flux.
When timeshift > timespan : dropping buffers
When timeshift < timespan : overlapping buffers
When timeshift == timespan : exact buffers
@Deprecated public final Flux<List<T>> bufferMillis(long timespan, long timeshift, TimedScheduler timer)
Deprecated. use the Duration based variants instead, will be removed in 3.1.0
Collect incoming values into multiple List delimited by the given timeshift period. Each List bucket will last until the timespan has elapsed, thus releasing the bucket to the returned Flux.
When timeshift > timespan : dropping buffers
When timeshift < timespan : overlapping buffers
When timeshift == timespan : exact buffers
@Deprecated public final Flux<List<T>> bufferMillis(int maxSize, long timespan)
Deprecated. use bufferTimeout(int, Duration) instead, will be removed in 3.1.0
@Deprecated public final Flux<List<T>> bufferMillis(int maxSize, long timespan, TimedScheduler timer)
Deprecated. use bufferTimeout(int, Duration, Scheduler) instead, will be removed in 3.1.0
@Deprecated public final <C extends Collection<? super T>> Flux<C> bufferMillis(int maxSize, long timespan, TimedScheduler timer, Supplier<C> bufferSupplier)
Deprecated. use bufferTimeout(int, Duration, Scheduler, Supplier) instead, will be removed in 3.1.0
Collect incoming values into a Collection that will be pushed into the returned Flux every timespan OR maxSize items.
Type Parameters:
C - the supplied Collection type
Parameters:
maxSize - the max collected size
timespan - the timeout to use to release a buffered collection
timer - a time-capable Scheduler instance to run on
bufferSupplier - the collection to use for each data segment
Returns:
a Flux of Collection delimited by given size or a given period timeout
@Deprecated public final Flux<List<T>> bufferTimeoutMillis(int maxSize, long timespan)
Deprecated. use the Duration based variants instead, will be removed in 3.1.0
@Deprecated public final Flux<List<T>> bufferTimeoutMillis(int maxSize, long timespan, TimedScheduler timer)
Deprecated. use the Duration based variants instead, will be removed in 3.1.0
@Deprecated public final <C extends Collection<? super T>> Flux<C> bufferTimeoutMillis(int maxSize, long timespan, TimedScheduler timer, Supplier<C> bufferSupplier)
Deprecated. use the Duration based variants instead, will be removed in 3.1.0
Collect incoming values into a Collection that will be pushed into the returned Flux every timespan OR maxSize items.
Type Parameters:
C - the supplied Collection type
Parameters:
maxSize - the max collected size
timespan - the timeout to use to release a buffered collection
timer - a time-capable Scheduler instance to run on
bufferSupplier - the collection to use for each data segment
Returns:
a Flux of Collection delimited by given size or a given period timeout
public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate)
Collect incoming values into multiple List that will be pushed into the returned Flux each time the given predicate returns true. Note that the element that triggers the predicate to return true (and thus closes a buffer) is included as last element in the emitted buffer.
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate, boolean cutBefore)
Collect incoming values into multiple List that will be pushed into the returned Flux each time the given predicate returns true. Which buffer receives the element that triggers the predicate to return true (and thus closes a buffer) depends on the cutBefore parameter: set it to true to include the boundary element in the newly opened buffer, false to include it in the closed buffer (as in bufferUntil(Predicate)).
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
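The effect of cutBefore can be pictured with a plain-Java sketch over a finite list. It models only the documented placement of the boundary element and is not Reactor's implementation; BufferUntilSketch is a hypothetical name, and skipping a buffer that would be empty is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class BufferUntilSketch {
    // Models bufferUntil(predicate, cutBefore) on a finite, error-free input.
    static <T> List<List<T>> bufferUntil(List<T> source, Predicate<? super T> predicate, boolean cutBefore) {
        List<List<T>> emitted = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T value : source) {
            boolean boundary = predicate.test(value);
            if (boundary && cutBefore) {
                // Close the running buffer first; the boundary element starts the next one.
                if (!current.isEmpty()) { // assumption: empty buffers are not emitted
                    emitted.add(current);
                }
                current = new ArrayList<>();
                current.add(value);
            } else if (boundary) {
                // The boundary element is the last element of the buffer it closes.
                current.add(value);
                emitted.add(current);
                current = new ArrayList<>();
            } else {
                current.add(value);
            }
        }
        if (!current.isEmpty()) {
            emitted.add(current); // partial buffer emitted on completion
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println(bufferUntil(source, n -> n % 3 == 0, false)); // [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(bufferUntil(source, n -> n % 3 == 0, true));  // [[1, 2], [3, 4, 5], [6, 7]]
    }
}
```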
public final Flux<List<T>> bufferWhile(Predicate<? super T> predicate)
Collect incoming values into multiple List that will be pushed into the returned Flux. Each buffer continues aggregating values while the given predicate returns true, and a new buffer is created as soon as the predicate returns false. Note that the element that triggers the predicate to return false (and thus closes a buffer) is NOT included in any emitted buffer.
On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
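A plain-Java sketch of this rule on a finite list follows. It is a model of the documented behaviour, not Reactor's code; BufferWhileSketch is a hypothetical name, and not emitting empty buffers (for example between two consecutive rejected elements) is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class BufferWhileSketch {
    // Models bufferWhile(predicate) on a finite, error-free input.
    static <T> List<List<T>> bufferWhile(List<T> source, Predicate<? super T> predicate) {
        List<List<T>> emitted = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T value : source) {
            if (predicate.test(value)) {
                current.add(value);
            } else {
                // The rejected element closes the buffer and is itself discarded.
                if (!current.isEmpty()) { // assumption: empty buffers are not emitted
                    emitted.add(current);
                }
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            emitted.add(current); // partial buffer emitted on completion
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 0, 3, 4, 0, 0, 5);
        System.out.println(bufferWhile(source, n -> n != 0)); // [[1, 2], [3, 4], [5]]
    }
}
```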
public final <U,V> Flux<List<T>> bufferWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector)
Collect incoming values into multiple List buffers started each time an opening companion Publisher emits. Each buffer will last until the corresponding closing companion Publisher emits, thus releasing the buffer to the resulting Flux.
When Open signal is strictly not overlapping Close signal : dropping buffers
When Open signal is strictly more frequent than Close signal : overlapping buffers
When Open signal is exactly coordinated with Close signal : exact buffers
Type Parameters:
U - the element type of the buffer-opening sequence
V - the element type of the buffer-closing sequence
Parameters:
bucketOpening - a companion Publisher to subscribe to for buffer creation signals.
closeSelector - a factory that, given a buffer opening signal, returns a companion Publisher to subscribe to for buffer closure and emission signals.
Returns:
a Flux of List delimited by an opening Publisher and a relative closing Publisher
public final <U,V,C extends Collection<? super T>> Flux<C> bufferWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector, Supplier<C> bufferSupplier)
Collect incoming values into multiple Collection buffers started each time an opening companion Publisher emits. Each buffer will last until the corresponding closing companion Publisher emits, thus releasing the buffer to the resulting Flux.
When Open signal is strictly not overlapping Close signal : dropping buffers
When Open signal is strictly more frequent than Close signal : overlapping buffers
When Open signal is exactly coordinated with Close signal : exact buffers
Type Parameters:
U - the element type of the buffer-opening sequence
V - the element type of the buffer-closing sequence
C - the Collection buffer type
Parameters:
bucketOpening - a companion Publisher to subscribe to for buffer creation signals.
closeSelector - a factory that, given a buffer opening signal, returns a companion Publisher to subscribe to for buffer closure and emission signals.
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
Returns:
a Flux of Collection delimited by an opening Publisher and a relative closing Publisher
public final Flux<T> cache()
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded volume of onNext signals. Completion and Error will also be replayed.
Returns:
a replaying Flux
public final Flux<T> cache(int history)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size onNext signals. Completion and Error will also be replayed.
Parameters:
history - number of events retained in history excluding complete and error
Returns:
a replaying Flux
public final Flux<T> cache(Duration ttl)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded history with per-item expiry timeout. Completion and Error will also be replayed.
Parameters:
ttl - Time-to-live for each cached item.
Returns:
a replaying Flux
public final Flux<T> cache(int history, Duration ttl)
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size with per-item expiry timeout.
Parameters:
history - number of events retained in history excluding complete and error
ttl - Time-to-live for each cached item.
Returns:
a replaying Flux
public final <E> Flux<E> cast(Class<E> clazz)
Cast the current Flux produced type into a target produced type.
public final Flux<T> checkpoint()
Activate assembly tracing for this particular Flux, in case of an error upstream of the checkpoint.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
Returns:
the assembly tracing Flux.
public final Flux<T> checkpoint(String description)
Activate assembly tracing for this particular Flux and give it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
The description could for example be a meaningful name for the assembled flux or a wider correlation ID.
Parameters:
description - a description to include in the assembly traceback.
Returns:
the assembly tracing Flux.
public final <E> Mono<E> collect(Supplier<E> containerSupplier, BiConsumer<E,? super T> collector)
Collect the Flux sequence with the given collector and supplied container on subscribe. The collected result will be emitted when this sequence completes.
public final <R,A> Mono<R> collect(Collector<? super T,A,? extends R> collector)
Collect the Flux sequence with the given Collector, whose container is created on subscribe. The collected result will be emitted when this sequence completes.
public final <K> Mono<Map<K,T>> collectMap(Function<? super T,? extends K> keyExtractor)
Convert this Flux sequence into a hashed map where the key is extracted by the given Function and the value will be the most recent emitted item for this key.
Type Parameters:
K - the type of the key extracted from each value of this Flux instance
Parameters:
keyExtractor - a Function to route items into a keyed Collection
Returns:
a Mono of all last matched key-values from this Flux
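The "most recent item wins" rule can be pictured with a plain-Java sketch that folds a finite list into a HashMap. This is an illustration under that assumption, not Reactor's implementation; CollectMapSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class CollectMapSketch {
    // Models collectMap(keyExtractor): a later item with the same key replaces the earlier one.
    static <T, K> Map<K, T> collectMap(List<T> source, Function<? super T, ? extends K> keyExtractor) {
        Map<K, T> map = new HashMap<>();
        for (T item : source) {
            map.put(keyExtractor.apply(item), item); // put overwrites any previous value for the key
        }
        return map;
    }

    public static void main(String[] args) {
        Map<Character, String> byInitial = collectMap(
                List.of("apple", "avocado", "banana", "blueberry", "cherry"),
                s -> s.charAt(0));
        System.out.println(byInitial.get('a')); // avocado
        System.out.println(byInitial.size());   // 3
    }
}
```

Here "apple" and "banana" are lost because a later item shares their key; the collectMultimap variants keep every item per key instead.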
public final <K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor)
Convert this Flux sequence into a hashed map where the key is extracted by the given function and the value will be the most recent extracted item for this key.
Type Parameters:
K - the type of the key extracted from each value of this Flux instance
V - the type of the value extracted from each value of this Flux instance
Parameters:
keyExtractor - a Function to route items into a keyed Collection
valueExtractor - a Function to select the data to store from each item
Returns:
a Mono of all last matched key-values from this Flux
public final <K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor, Supplier<Map<K,V>> mapSupplier)
Convert this Flux sequence into a supplied map where the key is extracted by the given function and the value will be the most recent extracted item for this key.
Type Parameters:
K - the type of the key extracted from each value of this Flux instance
V - the type of the value extracted from each value of this Flux instance
Parameters:
keyExtractor - a Function to route items into a keyed Collection
valueExtractor - a Function to select the data to store from each item
mapSupplier - a Map factory called for each Subscriber
Returns:
a Mono of all last matched key-values from this Flux
public final <K> Mono<Map<K,Collection<T>>> collectMultimap(Function<? super T,? extends K> keyExtractor)
Convert this Flux sequence into a hashed map where the key is extracted by the given function and the value will be all the emitted items for this key.
Type Parameters:
K - the type of the key extracted from each value of this Flux instance
Parameters:
keyExtractor - a Function to route items into a keyed Collection
Returns:
a Mono of all matched key-values from this Flux
public final <K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor)
Convert this Flux sequence into a hashed map where the key is extracted by the given function and the value will be all the extracted items for this key.
Type Parameters:
K - the type of the key extracted from each value of this Flux instance
V - the type of the value extracted from each value of this Flux instance
Parameters:
keyExtractor - a Function to route items into a keyed Collection
valueExtractor - a Function to select the data to store from each item
Returns:
a Mono of all matched key-values from this Flux
public final <K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor, Supplier<Map<K,Collection<V>>> mapSupplier)
Convert this Flux sequence into a supplied map where the key is extracted by the given function and the value will be all the extracted items for this key.
Type Parameters:
K - the type of the key extracted from each value of this Flux instance
V - the type of the value extracted from each value of this Flux instance
Parameters:
keyExtractor - a Function to route items into a keyed Collection
valueExtractor - a Function to select the data to store from each item
mapSupplier - a Map factory called for each Subscriber
Returns:
a Mono of all matched key-values from this Flux
public final Mono<List<T>> collectSortedList(Comparator<? super T> comparator)
Accumulate this Flux sequence, sorted using the given comparator, in a List that is emitted to the returned Mono on onComplete.
Parameters:
comparator - a Comparator to sort the items of this sequence
Returns:
a Mono of all sorted values from this Flux
public final <V> Flux<V> compose(Function<? super Flux<T>,? extends Publisher<V>> transformer)
Defer the transformation of this Flux in order to generate a target Flux for each new Subscriber.
flux.compose(Mono::from).subscribe()
Type Parameters:
V - the item type in the returned Publisher
Parameters:
transformer - the Function to map this Flux into a target Publisher instance for each new subscriber
Returns:
a Flux
See Also:
for immediate transformation of Flux, for a loose conversion to an arbitrary type
public final <V> Flux<V> concatMap(Function<? super T,? extends Publisher<? extends V>> mapper)
Bind dynamic sequences given this input sequence like flatMap(Function), but preserve ordering and concatenate emissions instead of merging (no interleave).
Errors will immediately short circuit current concat backlog.
Type Parameters:
V - the produced concatenated type
Parameters:
mapper - the function to transform this sequence of T into concatenated sequences of V
Returns:
a Flux
public final <V> Flux<V> concatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int prefetch)
Bind dynamic sequences given this input sequence like flatMap(Function), but preserve ordering and concatenate emissions instead of merging (no interleave).
Errors will immediately short circuit current concat backlog.
Type Parameters:
V - the produced concatenated type
Parameters:
mapper - the function to transform this sequence of T into concatenated sequences of V
prefetch - the inner source produced demand
Returns:
a Flux
public final <V> Flux<V> concatMapDelayError(Function<? super T,Publisher<? extends V>> mapper)
Bind dynamic sequences given this input sequence like flatMap(Function), but preserve ordering and concatenate emissions instead of merging (no interleave).
Errors will be delayed after the current concat backlog.
Type Parameters:
V - the produced concatenated type
Parameters:
mapper - the function to transform this sequence of T into concatenated sequences of V
Returns:
a Flux
public final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, int prefetch)
Bind dynamic sequences given this input sequence like flatMap(Function), but preserve ordering and concatenate emissions instead of merging (no interleave).
Errors will be delayed until all concatenated sources terminate.
Type Parameters:
V - the produced concatenated type
Parameters:
mapper - the function to transform this sequence of T into concatenated sequences of V
prefetch - the inner source produced demand
Returns:
a Flux
public final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, boolean delayUntilEnd, int prefetch)
Bind dynamic sequences given this input sequence like flatMap(Function), but preserve ordering and concatenate emissions instead of merging (no interleave).
Errors will be delayed after the current concat backlog if delayUntilEnd is false or after all sources if delayUntilEnd is true.
Type Parameters:
V - the produced concatenated type
Parameters:
mapper - the function to transform this sequence of T into concatenated sequences of V
delayUntilEnd - delay error until all sources have been consumed instead of after the current source
prefetch - the inner source produced demand
Returns:
a Flux
public final <R> Flux<R> concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
Transform the items emitted by this Flux into Iterable, then flatten the elements from those by concatenating them into a single Flux.
Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is no notion of eager vs lazy inner subscription. The content of the Iterables is all played sequentially. Thus flatMapIterable and concatMapIterable are equivalent; both are offered as a discoverability improvement for users that explore the API with the concat vs flatMap expectation.
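The sequential flattening can be pictured with a plain-Java sketch over a finite list. It is only a model of the documented ordering, not Reactor's implementation; ConcatMapIterableSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class ConcatMapIterableSketch {
    // Models concatMapIterable(mapper): each item is mapped to an Iterable whose
    // elements are appended in order before the next item is processed.
    static <T, R> List<R> concatMapIterable(List<T> source,
                                            Function<? super T, ? extends Iterable<? extends R>> mapper) {
        List<R> flattened = new ArrayList<>();
        for (T item : source) {
            for (R element : mapper.apply(item)) {
                flattened.add(element);
            }
        }
        return flattened;
    }

    public static void main(String[] args) {
        List<String> words = concatMapIterable(
                List.of("a b", "c", "d e f"),
                line -> Arrays.asList(line.split(" ")));
        System.out.println(words); // [a, b, c, d, e, f]
    }
}
```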
public final <R> Flux<R> concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper, int prefetch)
Transform the items emitted by this Flux into Iterable, then flatten the emissions from those by concatenating them into a single Flux. The prefetch argument sets an arbitrary prefetch size for the merged Iterable.
Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is no notion of eager vs lazy inner subscription. The content of the Iterables is all played sequentially. Thus flatMapIterable and concatMapIterable are equivalent; both are offered as a discoverability improvement for users that explore the API with the concat vs flatMap expectation.
Type Parameters:
R - the merged output sequence type
Parameters:
mapper - the Function to transform input sequence into N Iterable
prefetch - the maximum in-flight elements from each inner Iterable sequence
Returns:
a Flux
public final Mono<Long> count()
Counts the number of values in this Flux. The count will be emitted when onComplete is observed.
public final Flux<T> defaultIfEmpty(T defaultV)
Parameters:
defaultV - the alternate value if this sequence is empty
Returns:
a Flux
@Deprecated public final Flux<T> delay(Duration delay)
Deprecated. will be replaced by delayElements(Duration) in 3.1.0
Parameters:
delay - duration to delay each Subscriber.onNext(T) signal
Returns:
a Flux
See Also:
delaySubscription to introduce a delay at the beginning of the sequence only
public final Flux<T> delayElements(Duration delay)
Parameters:
delay - duration to delay each Subscriber.onNext(T) signal
Returns:
a Flux
See Also:
delaySubscription to introduce a delay at the beginning of the sequence only
public final Flux<T> delayElements(Duration delay, Scheduler timer)
Delay each of this Flux elements (Subscriber.onNext(T) signals) by a given Duration. Signals are delayed and played on the given timer Scheduler.
Parameters:
delay - period to delay each Subscriber.onNext(T) signal
timer - a time-capable Scheduler instance to delay each signal on
Returns:
a Flux
@Deprecated public final Flux<T> delayMillis(long delay)
Deprecated. will be replaced by delayElements(Duration) in 3.1.0
Delay each of this Flux elements (Subscriber.onNext(T) signals) by a given duration in milliseconds.
Parameters:
delay - period to delay each Subscriber.onNext(T) signal, in milliseconds
Returns:
a Flux
@Deprecated public final Flux<T> delayElementsMillis(long delay)
Deprecated. use the Duration based variants instead, will be removed in 3.1.0
Delay each of this Flux elements (Subscriber.onNext(T) signals) by a given duration in milliseconds.
Parameters:
delay - period to delay each Subscriber.onNext(T) signal, in milliseconds
Returns:
a Flux
@Deprecated public final Flux<T> delayMillis(long delay, TimedScheduler timer)
Deprecated. will be replaced by delayElements(Duration, Scheduler) in 3.1.0
Delay each of this Flux elements (Subscriber.onNext(T) signals) by a given duration in milliseconds.
Parameters:
delay - period to delay each Subscriber.onNext(T) signal, in milliseconds
timer - a time-capable Scheduler instance to delay each signal on
Returns:
a Flux
@Deprecated public final Flux<T> delayElementsMillis(long delay, TimedScheduler timer)
Deprecated. use the Duration based variants instead, will be removed in 3.1.0
Delay each of this Flux elements (Subscriber.onNext(T) signals) by a given duration in milliseconds.
Parameters:
delay - period to delay each Subscriber.onNext(T) signal, in milliseconds
timer - a time-capable Scheduler instance to delay each signal on
Returns:
a Flux
public final Flux<T> delaySubscription(Duration delay)
Delay the subscription to this Flux source until the given period elapses. The delay is introduced through the parallel default Scheduler.
public final <U> Flux<T> delaySubscription(Publisher<U> subscriptionDelay)
Type Parameters:
U - the other source type
Parameters:
subscriptionDelay - a Publisher whose next or complete signal triggers this Publisher.subscribe(Subscriber)
Returns:
a Flux
@Deprecated public final Flux<T> delaySubscriptionMillis(long delay)
Deprecated. use the Duration based variants instead, will be removed in 3.1.0
Delay the subscription to this Flux source until the given period elapses. The delay is introduced through the parallel default Scheduler.
@Deprecated public final Flux<T> delaySubscriptionMillis(long delay, TimedScheduler timer)
Deprecated. use the Duration based variants instead, will be removed in 3.1.0
public final <X> Flux<X> dematerialize()
An operator working only if this Flux emits onNext, onError or onComplete Signal. The relative Subscriber callback will be invoked, error Signal will trigger onError and complete Signal will trigger onComplete.
Type Parameters:
X - the dematerialized type
Returns:
a Flux
public final <V> Flux<T> distinct(Function<? super T,? extends V> keySelector)
For each Subscriber, tracks this Flux values that have been seen and filters out duplicates given the extracted key.
Type Parameters:
V - the type of the key extracted from each value in this sequence
Parameters:
keySelector - function to compute comparison key for each element
Returns:
a Flux with values having distinct keys
public final Flux<T> distinctUntilChanged()
Returns:
a Flux with conflated repeated elements
public final <V> Flux<T> distinctUntilChanged(Function<? super T,? extends V> keySelector)
Type Parameters:
V - the type of the key extracted from each value in this sequence
Parameters:
keySelector - function to compute comparison key for each element
Returns:
a Flux with conflated repeated elements given a comparison key
public final Flux<T> doAfterTerminate(Runnable afterTerminate)
Triggered after the Flux terminates, either by completing downstream successfully or with an error.
Parameters:
afterTerminate - the callback to call after Subscriber.onComplete() or Subscriber.onError(java.lang.Throwable)
Returns:
a Flux
public final Flux<T> doOnCancel(Runnable onCancel)
Triggered when the Flux is cancelled.
Parameters:
onCancel - the callback to call on Subscription.cancel()
Returns:
a Flux
public final Flux<T> doOnComplete(Runnable onComplete)
Triggered when the Flux completes successfully.
Parameters:
onComplete - the callback to call on Subscriber.onComplete()
Returns:
a Flux
public final Flux<T> doOnEach(Consumer<? super Signal<T>> signalConsumer)
Triggers side-effects when the Flux emits an item, fails with an error or completes successfully. All these events are represented as a Signal that is passed to the side-effect callback. Note that this is an advanced operator, typically used for monitoring of a Flux.
Parameters:
signalConsumer - the mandatory callback to call on Subscriber.onNext(Object), Subscriber.onError(Throwable) and Subscriber.onComplete()
Returns:
a Flux
See Also:
doOnNext(Consumer), doOnError(Consumer), doOnComplete(Runnable), materialize(), Signal
public final Flux<T> doOnError(Consumer<? super Throwable> onError)
Triggered when the Flux completes with an error.
Parameters:
onError - the callback to call on Subscriber.onError(java.lang.Throwable)
Returns:
a Flux
public final <E extends Throwable> Flux<T> doOnError(Class<E> exceptionType, Consumer<? super E> onError)
Triggered when the Flux completes with an error matching the given exception type.
Type Parameters:
E - type of the error to handle
Parameters:
exceptionType - the type of exceptions to handle
onError - the error handler for each error
Returns:
a Flux
public final Flux<T> doOnError(Predicate<? super Throwable> predicate, Consumer<? super Throwable> onError)
Triggered when the Flux completes with an error matching the given exception.
Parameters:
predicate - the matcher for exceptions to handle
onError - the error handler for each error
Returns:
a Flux
public final Flux<T> doOnNext(Consumer<? super T> onNext)
Triggered when the Flux emits an item.
Parameters:
onNext - the callback to call on Subscriber.onNext(T)
Returns:
a Flux
public final Flux<T> doOnRequest(LongConsumer consumer)
Parameters:
consumer - the consumer to invoke on each request
Returns:
a Flux
public final Flux<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe)
Triggered when the Flux is subscribed.
Parameters:
onSubscribe - the callback to call on Subscriber.onSubscribe(org.reactivestreams.Subscription)
Returns:
a Flux
public final Flux<T> doOnTerminate(Runnable onTerminate)
Triggered when the Flux terminates, either by completing successfully or with an error.
Parameters:
onTerminate - the callback to call on Subscriber.onComplete() or Subscriber.onError(java.lang.Throwable)
Returns:
a Flux
public final Flux<T> doFinally(Consumer<SignalType> onFinally)
Triggered after the Flux terminates for any reason, including cancellation. The terminating event (SignalType.ON_COMPLETE, SignalType.ON_ERROR and SignalType.CANCEL) is passed to the consumer, which is executed after the signal has been passed downstream.
Note that the fact that the signal is propagated downstream before the callback is executed means that several doFinally in a row will be executed in reverse order. If you want to assert the execution of the callback please keep in mind that the Flux will complete before it is executed, so its effect might not be visible immediately after e.g. a blockLast().
Parameters:
onFinally - the callback to execute after a terminal signal (complete, error or cancel)
Returns:
a Flux
public final Flux<Tuple2<Long,T>> elapsed()
Map this Flux sequence into Tuple2 of T1 Long timemillis and T2 T associated data. The timemillis corresponds to the elapsed time between the subscribe and the first next signal OR between two next signals.
Returns:
a Flux that emits tuples of time elapsed in milliseconds and matching data
@Deprecated public final Flux<Tuple2<Long,T>> elapsed(TimedScheduler scheduler)
Deprecated. use elapsed(Scheduler) instead.
Map this Flux sequence into Tuple2 of T1 Long timemillis and T2 T associated data. The timemillis corresponds to the elapsed time between the subscribe and the first next signal OR between two next signals.
Parameters:
scheduler - a Scheduler instance to read time from
Returns:
a Flux that emits tuples of time elapsed in milliseconds and matching data
public final Flux<Tuple2<Long,T>> elapsed(Scheduler scheduler)
Map this Flux sequence into Tuple2 of T1 Long timemillis and T2 T associated data. The timemillis corresponds to the elapsed time between the subscribe and the first next signal OR between two next signals.
Parameters:
scheduler - a Scheduler instance to read time from
Returns:
a Flux that emits tuples of time elapsed in milliseconds and matching data
public final Mono<T> elementAt(int index)
Emit only the element at the given index position or IndexOutOfBoundsException if the sequence is shorter.
Parameters:
index - index of an item
Returns:
a Mono of the item at a specified index
public final Mono<T> elementAt(int index, T defaultValue)
Parameters:
index - index of an item
defaultValue - supply a default value if not found
Returns:
a Mono of the item at a specified index or a default value
public final Flux<T> filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate)
Test each value emitted by this Flux asynchronously using a generated
Publisher<Boolean> test. A value is replayed if the first item emitted
by its corresponding test is true. It is dropped if its test is either
empty or its first emitted value is false.
Note that only the first value of the test publisher is considered, and unless it
is a Mono, the test will be cancelled after receiving that first value. Test
publishers are generated and subscribed to in sequence.
public final Flux<T> filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate, int bufferSize)
Test each value emitted by this Flux asynchronously using a generated
Publisher<Boolean> test. A value is replayed if the first item emitted
by its corresponding test is true. It is dropped if its test is either
empty or its first emitted value is false.
Note that only the first value of the test publisher is considered, and unless it
is a Mono, the test will be cancelled after receiving that first value. Test
publishers are generated and subscribed to in sequence.
asyncPredicate
- the function generating a Publisher of Boolean for each value, to filter the Flux with
bufferSize
- the maximum expected number of values to hold pending a result of
their respective asynchronous predicates, rounded to the next power of two. This is
capped depending on the size of the heap and the JVM limits, so be careful with
large values (although e.g. 65536 should still be fine). Also serves as
the initial request size for the source.
Flux
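The filtering rule described above can be sketched as follows. This is a minimal illustration rather than a reference usage: the class name FilterWhenExample and the helper isEven are hypothetical, and the "asynchronous" test is simply a Mono wrapping a synchronous check.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class FilterWhenExample {

    // Hypothetical test publisher: wraps a synchronous check in a Mono<Boolean>.
    static Mono<Boolean> isEven(int value) {
        return Mono.just(value % 2 == 0);
    }

    static List<Integer> evens() {
        return Flux.just(1, 2, 3, 4)
                   // a value is kept only if its test publisher emits true first
                   .filterWhen(FilterWhenExample::isEven)
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(evens());
    }
}
```

A test publisher that completes empty would drop its value, just like one that emits false.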
public final Flux<T> firstEmittingWith(Publisher<? extends T> other)
other
- the Publisher to race with
firstEmitting(org.reactivestreams.Publisher<? extends I>...)
public final <R> Flux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency)
public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)
Transform the items emitted by this Flux into Publishers, then flatten the emissions from those by
merging them into a single Flux, so that they may interleave. The concurrency argument
controls how many merged Publishers can run in parallel. The prefetch argument sets an
arbitrary prefetch size for each merged Publisher.
V
- the merged output sequence type
mapper
- the Function to transform input sequence into N sequences Publisher
concurrency
- the maximum in-flight elements from this Flux sequence
prefetch
- the maximum in-flight elements from each inner Publisher sequence
Flux
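As a rough sketch of the merge described above, the following maps each item to a small inner Flux and caps the number of concurrently merged inners at 2. The class and method names are hypothetical. Because merged emissions may interleave, the result is sorted before being returned so that the example does not depend on emission order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import reactor.core.publisher.Flux;

final class FlatMapExample {

    static List<Integer> expandSorted() {
        List<Integer> merged = Flux.just(1, 2, 3)
                // each item becomes an inner Publisher of two values;
                // at most 2 inner Publishers are merged at the same time
                .flatMap(i -> Flux.just(i, i * 10), 2)
                .collectList()
                .block();
        // flatMap gives no ordering guarantee, so sort before comparing
        List<Integer> sorted = new ArrayList<>(merged);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(expandSorted());
    }
}
```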
public final <V> Flux<V> flatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)
Transform the items emitted by this Flux into Publishers, then flatten the emissions from those by
merging them into a single Flux, so that they may interleave. The concurrency argument
controls how many merged Publishers can run in parallel. The prefetch argument sets an
arbitrary prefetch size for each merged Publisher. This variant will delay any error until after the
rest of the flatMap backlog has been processed.
V
- the merged output sequence type
mapper
- the Function to transform input sequence into N sequences Publisher
concurrency
- the maximum in-flight elements from this Flux sequence
prefetch
- the maximum in-flight elements from each inner Publisher sequence
Flux
@Deprecated public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, boolean delayError, int concurrency, int prefetch)
Deprecated. Use flatMap(Function, int, int) or flatMapDelayError(Function, int, int) instead, will be removed in 3.1.0.
Transform the items emitted by this Flux into Publishers, then flatten the emissions from those by
merging them into a single Flux, so that they may interleave. The concurrency argument
controls how many merged Publishers can run in parallel. The prefetch argument sets an
arbitrary prefetch size for each merged Publisher.
V
- the merged output sequence type
mapper
- the Function to transform input sequence into N sequences Publisher
delayError
- should any error be delayed after current merge backlog
concurrency
- the maximum in-flight elements from this Flux sequence
prefetch
- the maximum in-flight elements from each inner Publisher sequence
Flux
public final <R> Flux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapperOnNext, Function<Throwable,? extends Publisher<? extends R>> mapperOnError, Supplier<? extends Publisher<? extends R>> mapperOnComplete)
Transform the signals emitted by this Flux into Publishers, then flatten the emissions from those by
merging them into a single Flux, so that they may interleave.
OnError will be transformed into a completion signal after its mapping callback has been applied.
R
- the output Publisher type target
mapperOnNext
- the Function to call on next data and returning a sequence to merge
mapperOnError
- the Function to call on error signal and returning a sequence to merge
mapperOnComplete
- the Supplier to call on complete signal and returning a sequence to merge
Flux
public final <R> Flux<R> flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
Transform the items emitted by this Flux into Iterable, then flatten the elements from those by
merging them into a single Flux.
Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is
no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially.
Thus flatMapIterable and concatMapIterable are equivalent, offered as a discoverability
improvement for users that explore the API with the concat vs flatMap expectation.
public final <R> Flux<R> flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper, int prefetch)
Transform the items emitted by this Flux into Iterable, then flatten the emissions from those by
merging them into a single Flux. The prefetch argument sets an
arbitrary prefetch size for the merged Iterable.
Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is
no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially.
Thus flatMapIterable and concatMapIterable are equivalent, offered as a discoverability
improvement for users that explore the API with the concat vs flatMap expectation.
R
- the merged output sequence type
mapper
- the Function to transform input sequence into N Iterable
prefetch
- the maximum in-flight elements from each inner Iterable sequence
Flux
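A short sketch of the sequential flattening described above, assuming each source item is a line of text split into words. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

final class FlatMapIterableExample {

    static List<String> words() {
        return Flux.just("a b", "c d e")
                   // each line becomes an Iterable of words, played in order
                   .flatMapIterable(line -> Arrays.asList(line.split(" ")))
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(words());
    }
}
```

Since the content of each Iterable is played sequentially, the words come out in source order.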
public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper)
Transform the items emitted by this Flux into Publishers, then flatten the
emissions from those by merging them into a single Flux, in order.
Unlike concatMap, transformed inner Publishers are subscribed to eagerly. Unlike
flatMap, their emitted elements are merged respecting the order of the original sequence.
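The ordering guarantee can be illustrated with inner publishers that finish in the opposite order to the source. This is a sketch under assumptions: the class name is hypothetical and the delays are arbitrary values chosen so that later inners complete first.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class FlatMapSequentialExample {

    static List<Integer> ordered() {
        return Flux.just(3, 2, 1)
                   // inner Monos are subscribed eagerly; the one for 1 finishes first,
                   // yet results are emitted in the order of the original sequence
                   .flatMapSequential(i -> Mono.just(i).delayElement(Duration.ofMillis(i * 50L)))
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(ordered());
    }
}
```

With plain flatMap the same pipeline would typically emit the values in completion order instead.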
public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency)
Transform the items emitted by this Flux into Publishers, then flatten the
emissions from those by merging them into a single Flux, in order.
Unlike concatMap, transformed inner Publishers are subscribed to eagerly. Unlike
flatMap, their emitted elements are merged respecting the order of the original
sequence. The maxConcurrency argument controls how many merged
Publishers can run in parallel.
public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch)
Transform the items emitted by this Flux into Publishers, then flatten the
emissions from those by merging them into a single Flux, in order.
Unlike concatMap, transformed inner Publishers are subscribed to eagerly. Unlike
flatMap, their emitted elements are merged respecting the order of the original
sequence. The maxConcurrency argument controls how many merged Publishers
can run in parallel. The prefetch argument sets an arbitrary prefetch
size for each merged Publisher.
R
- the merged output sequence type
mapper
- the Function to transform input sequence into N sequences Publisher
maxConcurrency
- the maximum in-flight elements from this Flux sequence
prefetch
- the maximum in-flight elements from each inner Publisher sequence
Flux, subscribing early but keeping the original ordering
public final <R> Flux<R> flatMapSequentialDelayError(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch)
Transform the items emitted by this Flux into Publishers, then flatten the
emissions from those by merging them into a single Flux, in order.
Unlike concatMap, transformed inner Publishers are subscribed to eagerly. Unlike
flatMap, their emitted elements are merged respecting the order of the original
sequence. The maxConcurrency argument controls how many merged Publishers
can run in parallel. The prefetch argument sets an arbitrary prefetch
size for each merged Publisher. This variant will delay any error until after the
rest of the flatMap backlog has been processed.
R
- the merged output sequence type
mapper
- the Function to transform input sequence into N sequences Publisher
maxConcurrency
- the maximum in-flight elements from this Flux sequence
prefetch
- the maximum in-flight elements from each inner Publisher sequence
Flux, subscribing early but keeping the original ordering
@Deprecated public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency, int prefetch)
Deprecated. Use flatMapSequential(Function, int, int) or flatMapSequentialDelayError(Function, int, int) instead, will be removed in 3.1.0.
Transform the items emitted by this Flux into Publishers, then flatten the
emissions from those by merging them into a single Flux, in order.
Unlike concatMap, transformed inner Publishers are subscribed to eagerly. Unlike
flatMap, their emitted elements are merged respecting the order of the original
sequence. The maxConcurrency argument controls how many merged Publishers
can run in parallel. The prefetch argument sets an arbitrary prefetch
size for each merged Publisher.
R
- the merged output sequence type
mapper
- the Function to transform input sequence into N sequences Publisher
delayError
- should any error be delayed after current merge backlog
maxConcurrency
- the maximum in-flight elements from this Flux sequence
prefetch
- the maximum in-flight elements from each inner Publisher sequence
Flux, subscribing early but keeping the original ordering
public long getPrefetch()
The prefetch configuration of the Flux
the prefetch configuration of the Flux, -1L if unspecified
public final <K> Flux<GroupedFlux<K,T>> groupBy(Function<? super T,? extends K> keyMapper)
Re-route this sequence into dynamically created Flux for each unique key evaluated by the given
key mapper.
K
- the key type extracted from each value of this sequence
keyMapper
- the key mapping Function that evaluates an incoming data and returns a key.
a Flux of GroupedFlux grouped sequences
public final <K> Flux<GroupedFlux<K,T>> groupBy(Function<? super T,? extends K> keyMapper, int prefetch)
Re-route this sequence into dynamically created Flux for each unique key evaluated by the given
key mapper.
K
- the key type extracted from each value of this sequence
keyMapper
- the key mapping Function that evaluates an incoming data and returns a key.
prefetch
- the number of values to prefetch from the source
a Flux of GroupedFlux grouped sequences
public final <K,V> Flux<GroupedFlux<K,V>> groupBy(Function<? super T,? extends K> keyMapper, Function<? super T,? extends V> valueMapper)
Re-route this sequence into dynamically created Flux for each unique key evaluated by the given
key mapper. It will use the given value mapper to extract the element to route.
K
- the key type extracted from each value of this sequence
V
- the value type extracted from each value of this sequence
keyMapper
- the key mapping function that evaluates an incoming data and returns a key.
valueMapper
- the value mapping function that evaluates which data to extract for re-routing.
a Flux of GroupedFlux grouped sequences
public final <K,V> Flux<GroupedFlux<K,V>> groupBy(Function<? super T,? extends K> keyMapper, Function<? super T,? extends V> valueMapper, int prefetch)
Re-route this sequence into dynamically created Flux for each unique key evaluated by the given
key mapper. It will use the given value mapper to extract the element to route.
K
- the key type extracted from each value of this sequence
V
- the value type extracted from each value of this sequence
keyMapper
- the key mapping function that evaluates an incoming data and returns a key.
valueMapper
- the value mapping function that evaluates which data to extract for re-routing.
prefetch
- the number of values to prefetch from the source
a Flux of GroupedFlux grouped sequences
public final <TRight,TLeftEnd,TRightEnd,R> Flux<R> groupJoin(Publisher<? extends TRight> other, Function<? super T,? extends Publisher<TLeftEnd>> leftEnd, Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd, BiFunction<? super T,? super Flux<TRight>,? extends R> resultSelector)
Returns a Flux that correlates two Publishers when they overlap in time
and groups the results.
There are no guarantees in what order the items get combined when multiple items from one or both source Publishers overlap.
Unlike join(Publisher, Function, Function, BiFunction), items from the right Publisher will be streamed
into the right resultSelector argument Flux.
TRight
- the type of the right Publisher
TLeftEnd
- this Flux timeout type
TRightEnd
- the right Publisher timeout type
R
- the combined result type
other
- the other Publisher to correlate items from the source Publisher with
leftEnd
- a function that returns a Publisher whose emissions indicate the
duration of the values of the source Publisher
rightEnd
- a function that returns a Publisher whose emissions indicate the
duration of the values of the right Publisher
resultSelector
- a function that takes an item emitted by each Publisher and returns the
value to be emitted by the resulting Publisher
Flux
public final <R> Flux<R> handle(BiConsumer<? super T,SynchronousSink<R>> handler)
Handle the items emitted by this Flux by calling a biconsumer with the
output sink for each onNext. At most one SynchronousSink.next(Object)
call must be performed and/or 0 or 1 SynchronousSink.error(Throwable) or
SynchronousSink.complete().
R
- the transformed type
handler
- the handling BiConsumer
Flux
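The "at most one next per onNext" contract can be sketched as a combined filter and map. The class name and the "even:" label are hypothetical; not calling the sink at all simply skips the item.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class HandleExample {

    static List<String> evenLabels() {
        return Flux.just(1, 2, 3, 4)
                   .<String>handle((value, sink) -> {
                       if (value % 2 == 0) {
                           // at most one next() call per incoming item
                           sink.next("even:" + value);
                       }
                       // odd values: the sink is left untouched, so nothing is emitted
                   })
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(evenLabels());
    }
}
```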
public final Mono<Boolean> hasElement(T value)
Emit a single boolean true if any of the values of this Flux sequence match
the constant.
The implementation uses short-circuit logic and completes with true if the constant matches a value.
value
- constant compared to incoming signals
a Mono with true if any value matches the constant and false otherwise
public final Mono<Boolean> hasElements()
Emit a single boolean true if this Flux sequence has at least one element.
The implementation uses short-circuit logic and completes with true on onNext.
a Mono with true if any value is emitted and false otherwise
public final Flux<T> hide()
Hides the identities of this Flux and its Subscription as well.
a Flux defeating any Publisher / Subscription feature-detection
public final Mono<T> ignoreElements()
Mono.
public final <TRight,TLeftEnd,TRightEnd,R> Flux<R> join(Publisher<? extends TRight> other, Function<? super T,? extends Publisher<TLeftEnd>> leftEnd, Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd, BiFunction<? super T,? super TRight,? extends R> resultSelector)
Returns a Flux that correlates two Publishers when they overlap in time
and groups the results.
There are no guarantees in what order the items get combined when multiple items from one or both source Publishers overlap.
TRight
- the type of the right Publisher
TLeftEnd
- this Flux timeout type
TRightEnd
- the right Publisher timeout type
R
- the combined result type
other
- the other Publisher to correlate items from the source Publisher with
leftEnd
- a function that returns a Publisher whose emissions indicate the
duration of the values of the source Publisher
rightEnd
- a function that returns a Publisher whose emissions indicate the
duration of the values of the right Publisher
resultSelector
- a function that takes an item emitted by each Publisher and returns the
value to be emitted by the resulting Publisher
Flux
public final Mono<T> last()
Signal the last element observed before the complete signal, or emit a NoSuchElementException
error if the source was empty.
For a passive version use takeLast(int).
Flux
public final Mono<T> last(T defaultValue)
takeLast(int)
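The difference between the two last variants can be sketched as follows. The class and method names are hypothetical. The last method assumes that block() surfaces the error signal as a thrown exception, and checks both the exception and its cause rather than relying on exactly how it is rethrown.

```java
import java.util.NoSuchElementException;
import reactor.core.publisher.Flux;

final class LastExample {

    static int lastOfRange() {
        // the sequence 1, 2, 3 completes, so the last element is 3
        return Flux.range(1, 3).last().block();
    }

    static int lastOrDefault() {
        // empty source: the default value is emitted instead
        return Flux.<Integer>empty().last(-1).block();
    }

    static boolean emptySignalsError() {
        try {
            Flux.<Integer>empty().last().block();
            return false;
        } catch (RuntimeException e) {
            // check the exception itself and its cause for NoSuchElementException
            return e instanceof NoSuchElementException
                    || e.getCause() instanceof NoSuchElementException;
        }
    }

    public static void main(String[] args) {
        System.out.println(lastOfRange() + " " + lastOrDefault() + " " + emptySignalsError());
    }
}
```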
public final Flux<T> limitRate(int prefetchRate)
Ensure that backpressure signals from downstream subscribers are capped at the provided
prefetchRate when propagated upstream, effectively rate limiting
the upstream Publisher.
Typically used for scenarios where consumer(s) request a large amount of data
(e.g. Long.MAX_VALUE) but the data source behaves better or can be optimized
with smaller requests (e.g. database paging). All data is still processed.
Equivalent to flux.publishOn(Schedulers.immediate(), prefetchRate).subscribe()
prefetchRate
- the limit to apply to downstream's backpressure
a Flux limiting downstream's backpressure
publishOn(Scheduler, int)
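A sketch of how the capped requests could be observed. The class name and the upstreamRequests list are hypothetical. hide() is used so the source cannot be fused with the operator, which would bypass request signals altogether; doOnRequest then records what the source actually sees.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;

final class LimitRateExample {

    // request amounts observed by the source, recorded for inspection
    static final List<Long> upstreamRequests = new CopyOnWriteArrayList<>();

    static List<Integer> run() {
        upstreamRequests.clear();
        return Flux.range(1, 100)
                   .hide() // defeat fusion so that requests are really propagated
                   .doOnRequest(n -> upstreamRequests.add(n))
                   .limitRate(10)
                   .collectList() // downstream requests an unbounded amount
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(run().size() + " items, requests: " + upstreamRequests);
    }
}
```

Although the downstream asks for everything at once, the source should only ever see requests of at most 10, and all 100 items are still delivered.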
public final Flux<T> log()
Observe all Reactive Streams signals and use Logger support to handle trace implementation. Default will
use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.
The default log category will be "reactor.*", a generated operator suffix will complete, e.g. "reactor.Flux.Map".
Flux
public final Flux<T> log(String category)
Observe all Reactive Streams signals and use Logger support to handle trace implementation. Default will
use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.
category
- to be mapped into logger configuration (e.g. org.springframework.reactor).
If category ends with "." like "reactor.", a generated operator
suffix will complete, e.g. "reactor.Flux.Map".
Flux
public final Flux<T> log(String category, Level level, SignalType... options)
Observe Reactive Streams signals matching the passed filter options and
use Logger support to handle trace implementation. Default will
use the passed Level and java.util.logging. If SLF4J is available, it will be used instead.
Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
flux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
category
- to be mapped into logger configuration (e.g. org.springframework.reactor).
If category ends with "." like "reactor.", a generated operator
suffix will complete, e.g. "reactor.Flux.Map".
level
- the Level to enforce for this tracing Flux (only FINEST, FINE,
INFO, WARNING and SEVERE are taken into account)
options
- a vararg SignalType option to filter log messages
Flux
public final Flux<T> log(String category, Level level, boolean showOperatorLine, SignalType... options)
Observe Reactive Streams signals matching the passed filter options and use
Logger support to handle trace implementation. Default will use the passed
Level and java.util.logging. If SLF4J is available, it will be used
instead.
Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
flux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
category
- to be mapped into logger configuration (e.g. org.springframework.reactor).
If category ends with "." like "reactor.", a generated operator
suffix will complete, e.g. "reactor.Flux.Map".
level
- the Level to enforce for this tracing Flux (only FINEST, FINE,
INFO, WARNING and SEVERE are taken into account)
showOperatorLine
- capture the current stack to display operator
class/line number.
options
- a vararg SignalType option to filter log messages
Flux
public final <V> Flux<V> map(Function<? super T,? extends V> mapper)
Transform the items emitted by this Flux by applying a function to each item.
@Deprecated public final Flux<T> mapError(Function<? super Throwable,? extends Throwable> mapper)
Deprecated. Use onErrorMap(Function) instead. Will be removed in 3.1.0.
Transform the error emitted by this Flux by applying a function.
@Deprecated public final <E extends Throwable> Flux<T> mapError(Class<E> type, Function<? super E,? extends Throwable> mapper)
Deprecated. Use onErrorMap(Function) instead. Will be removed in 3.1.0.
Transform the error emitted by this Flux by applying a function if the
error matches the given type, otherwise let the error flow.
@Deprecated public final Flux<T> mapError(Predicate<? super Throwable> predicate, Function<? super Throwable,? extends Throwable> mapper)
Deprecated. Use onErrorMap(Function) instead. Will be removed in 3.1.0.
Transform the error emitted by this Flux by applying a function if the
error matches the given predicate, otherwise let the error flow.
public final Flux<Signal<T>> materialize()
Transform the incoming onNext, onError and onComplete signals into Signal.
Since the error is materialized as a Signal, the propagation will be stopped and onComplete will be
emitted. A complete signal will first emit a Signal.complete()
and then effectively complete the flux.
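A minimal sketch of the error case described above: a sequence of two values followed by an error is materialized, so the error arrives as a regular element and the resulting Flux completes normally. The class name and the "boom" message are hypothetical.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;

final class MaterializeExample {

    static List<Signal<Integer>> signals() {
        return Flux.just(1, 2)
                   .concatWith(Flux.error(new IllegalStateException("boom")))
                   .materialize() // the error becomes a Signal element
                   .collectList() // completes normally, so block() does not throw
                   .block();
    }

    public static void main(String[] args) {
        signals().forEach(System.out::println);
    }
}
```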
public final Flux<T> onBackpressureBuffer()
Request an unbounded demand and push the returned Flux, or park the observed elements if not enough
demand is requested downstream. Errors will be delayed until the buffer gets consumed.
Flux
public final Flux<T> onBackpressureBuffer(int maxSize)
Request an unbounded demand and push the returned Flux, or park the observed elements if not enough
demand is requested downstream. Errors will be immediately emitted on overflow
regardless of the pending buffer.
maxSize
- maximum buffer backlog size before immediate error
Flux
public final Flux<T> onBackpressureBuffer(int maxSize, Consumer<? super T> onOverflow)
Request an unbounded demand and push the returned Flux, or park the observed elements if not enough
demand is requested downstream. Overflow error will be delayed after the current
backlog is consumed. However the Consumer will be immediately invoked.
maxSize
- maximum buffer backlog size before overflow callback is called
onOverflow
- callback to invoke on overflow
Flux
public final Flux<T> onBackpressureBuffer(int maxSize, BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push the returned Flux, or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. Over that limit, the overflow strategy is applied (see BufferOverflowStrategy).
Note that for the ERROR strategy, the overflow
error will be delayed after the current backlog is consumed.
maxSize
- maximum buffer backlog size before overflow strategy is applied
bufferOverflowStrategy
- strategy to apply to overflowing elements
Flux
public final Flux<T> onBackpressureBuffer(int maxSize, Consumer<? super T> onBufferOverflow, BufferOverflowStrategy bufferOverflowStrategy)
Request an unbounded demand and push the returned Flux, or park the observed
elements if not enough demand is requested downstream, within a maxSize
limit. Over that limit, the overflow strategy is applied (see BufferOverflowStrategy).
A Consumer is immediately invoked when there is an overflow, receiving the
value that was discarded because of the overflow (which can be different from the
latest element emitted by the source in case of a DROP_LATEST strategy).
Note that for the ERROR strategy, the overflow
error will be delayed after the current backlog is consumed. The consumer is still
invoked immediately.
maxSize
- maximum buffer backlog size before overflow callback is called
onBufferOverflow
- callback to invoke on overflow
bufferOverflowStrategy
- strategy to apply to overflowing elements
Flux
public final Flux<T> onBackpressureDrop()
Request an unbounded demand and push the returned Flux, or drop the observed elements if not enough
demand is requested downstream.
Flux
public final Flux<T> onBackpressureDrop(Consumer<? super T> onDropped)
Request an unbounded demand and push the returned Flux, or drop and notify the dropping Consumer
with the observed elements if not enough demand is requested downstream.
onDropped
- the Consumer called when a value gets dropped due to lack of downstream requests
Flux
public final Flux<T> onBackpressureError()
Request an unbounded demand and push the returned Flux, or emit onError from Exceptions.failWithOverflow()
if not enough demand is requested
downstream.
a Flux that errors on backpressure
public final Flux<T> onBackpressureLatest()
Request an unbounded demand and push the returned Flux, or only keep the most recent observed item
if not enough demand is requested downstream.
a Flux that will only keep a reference to the last observed item
public final Flux<T> onErrorMap(Function<? super Throwable,? extends Throwable> mapper)
Transform the error emitted by this Flux by applying a function.
public final <E extends Throwable> Flux<T> onErrorMap(Class<E> type, Function<? super E,? extends Throwable> mapper)
Transform the error emitted by this Flux by applying a function if the
error matches the given type, otherwise let the error flow.
public final Flux<T> onErrorMap(Predicate<? super Throwable> predicate, Function<? super Throwable,? extends Throwable> mapper)
Transform the error emitted by this Flux by applying a function if the
error matches the given predicate, otherwise let the error flow.
@Deprecated public final Flux<T> onErrorResumeWith(Function<? super Throwable,? extends Publisher<? extends T>> fallback)
Deprecated. Use onErrorResume(Function) instead. Will be removed in 3.1.0.
@Deprecated public final <E extends Throwable> Flux<T> onErrorResumeWith(Class<E> type, Function<? super E,? extends Publisher<? extends T>> fallback)
Deprecated. Use onErrorResume(Class, Function) instead. Will be removed in 3.1.0.
@Deprecated public final Flux<T> onErrorResumeWith(Predicate<? super Throwable> predicate, Function<? super Throwable,? extends Publisher<? extends T>> fallback)
Deprecated. Use onErrorResume(Predicate, Function) instead. Will be removed in 3.1.0.
public final Flux<T> onErrorResume(Function<? super Throwable,? extends Publisher<? extends T>> fallback)
public final <E extends Throwable> Flux<T> onErrorResume(Class<E> type, Function<? super E,? extends Publisher<? extends T>> fallback)
public final Flux<T> onErrorResume(Predicate<? super Throwable> predicate, Function<? super Throwable,? extends Publisher<? extends T>> fallback)
public final Flux<T> onErrorReturn(T fallbackValue)
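Fallback to the given value if an error is observed on this Flux.
fallbackValue
- alternate value on fallback
Flux
public final <E extends Throwable> Flux<T> onErrorReturn(Class<E> type, T fallbackValue)
Fallback to the given value if an error of a given type is observed on this Flux.
E
- the error type
type
- the error type to match
fallbackValue
- alternate value on fallback
Flux
public final <E extends Throwable> Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)
Fallback to the given value if an error matching the given predicate is observed on this Flux.
E
- the error type
predicate
- the error predicate to match
fallbackValue
- alternate value on fallback
Flux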
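The fallback variants can be compared on a sequence that fails after two values. This is a sketch: the class and method names and the fallback values are hypothetical. The non-matching case assumes block() rethrows the original error, and checks the exception and its cause to stay tolerant of wrapping.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class OnErrorReturnExample {

    // emits 1, 2 and then fails with an IllegalStateException
    static Flux<Integer> failing() {
        return Flux.just(1, 2).concatWith(Flux.error(new IllegalStateException("boom")));
    }

    static List<Integer> anyError() {
        return failing().onErrorReturn(-1).collectList().block();
    }

    static List<Integer> matchingType() {
        return failing().onErrorReturn(IllegalStateException.class, -2).collectList().block();
    }

    static boolean nonMatchingTypePropagates() {
        try {
            // IllegalArgumentException does not match, so the error flows through
            failing().onErrorReturn(IllegalArgumentException.class, -3).collectList().block();
            return false;
        } catch (RuntimeException e) {
            return e instanceof IllegalStateException
                    || e.getCause() instanceof IllegalStateException;
        }
    }

    public static void main(String[] args) {
        System.out.println(anyError() + " " + matchingType() + " " + nonMatchingTypePropagates());
    }
}
```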
public final Flux<T> onTerminateDetach()
Detaches both the child Subscriber and the Subscription on
termination or cancellation.
This should help with odd retention scenarios when running
with non-reactor Subscriber.
Flux
public final ParallelFlux<T> parallel()
Prepare to consume this Flux on a number of 'rails' matching the number of CPUs,
in round-robin fashion.
ParallelFlux instance
public final ParallelFlux<T> parallel(int parallelism)
Prepare to consume this Flux on parallelism number of 'rails'
in round-robin fashion.
parallelism
- the number of parallel rails
ParallelFlux instance
public final ParallelFlux<T> parallel(int parallelism, int prefetch)
Prepare to consume this Flux on parallelism number of 'rails'
in round-robin fashion and use custom prefetch amount and queue
for dealing with the source Flux's values.
parallelism
- the number of parallel rails
prefetch
- the number of values to prefetch from the source
ParallelFlux instance
public final ConnectableFlux<T> publish()
Prepare a ConnectableFlux which shares this Flux sequence and dispatches values to
subscribers in a backpressure-aware manner. Prefetch will default to QueueSupplier.SMALL_BUFFER_SIZE.
This will effectively turn any type of sequence into a hot sequence.
Backpressure will be coordinated on Subscription.request(long) and if any Subscriber is missing
demand (requested = 0), multicast will pause pushing/pulling.
ConnectableFlux
public final ConnectableFlux<T> publish(int prefetch)
Prepare a ConnectableFlux which shares this Flux sequence and dispatches values to
subscribers in a backpressure-aware manner. This will effectively turn any type of sequence into a hot sequence.
Backpressure will be coordinated on Subscription.request(long) and if any Subscriber is missing
demand (requested = 0), multicast will pause pushing/pulling.
prefetch
- bounded requested demand
ConnectableFlux
public final <R> Flux<R> publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform)
R
- the output value type
transform
- the transformation function
Flux
public final <R> Flux<R> publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform, int prefetch)
R
- the output value type
transform
- the transformation function
prefetch
- the request size
Flux
public final Mono<T> publishNext()
Prepare a Mono which shares this Flux sequence and dispatches the first observed item to
subscribers in a backpressure-aware manner.
This will effectively turn any type of sequence into a hot sequence when the first Subscriber subscribes.
Mono
public final Flux<T> publishOn(Scheduler scheduler)
Run onNext, onComplete and onError on a supplied Scheduler Scheduler.Worker.
Typically used for fast publisher, slow consumer(s) scenarios.
flux.publishOn(Schedulers.single()).subscribe()
scheduler
- a checked Scheduler.Worker factory
a Flux producing asynchronously
public final Flux<T> publishOn(Scheduler scheduler, int prefetch)
Run onNext, onComplete and onError on a supplied Scheduler Scheduler.Worker.
Typically used for fast publisher, slow consumer(s) scenarios.
flux.publishOn(Schedulers.single()).subscribe()
scheduler
- a checked Scheduler.Worker factory
prefetch
- the asynchronous boundary capacity
a Flux producing asynchronously
public final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch)
Run onNext, onComplete and onError on a supplied Scheduler Scheduler.Worker.
Typically used for fast publisher, slow consumer(s) scenarios.
flux.publishOn(Schedulers.single()).subscribe()
scheduler
- a checked Scheduler.Worker factory
delayError
- should the buffer be consumed before forwarding any error
prefetch
- the asynchronous boundary capacity
a Flux producing asynchronously
public final Mono<T> reduce(BiFunction<T,T,T> aggregator)
Aggregate the values from this Flux sequence into an object of the same type as the
emitted items. The left/right BiFunction arguments are the N-1 and N item, ignoring sequence
with 0 or 1 element only.
aggregator
- the aggregating BiFunction
Flux
public final <A> Mono<A> reduce(A initial, BiFunction<A,? super T,A> accumulator)
Accumulate the values from this Flux sequence into an object matching an initial value type.
The arguments are the N-1 or initial value and N current item.
A
- the type of the initial and reduced object
accumulator
- the reducing BiFunction
initial
- the initial left argument to pass to the reducing BiFunction
Flux
public final <A> Mono<A> reduceWith(Supplier<A> initial, BiFunction<A,? super T,A> accumulator)
Accumulate the values from this Flux sequence into an object matching an initial value type.
The arguments are the N-1 or initial value and N current item.
A
- the type of the initial and reduced object
accumulator
- the reducing BiFunction
initial
- the initial left argument supplied on subscription to the reducing BiFunction
Flux
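The three reducing variants can be sketched side by side. The class and method names are hypothetical. reduceWith is shown with a mutable container to illustrate why a Supplier is useful: each subscription gets its own initial object.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

final class ReduceExample {

    static int sum() {
        // same-type aggregation: (1 + 2), then (3 + 3), then (6 + 4)
        return Flux.range(1, 4).reduce(Integer::sum).block();
    }

    static String joined() {
        // the initial value "" fixes the result type to String
        return Flux.range(1, 3).reduce("", (acc, i) -> acc + i).block();
    }

    static List<Integer> collected() {
        // the initial list is supplied on subscription, not shared between subscribers
        return Flux.range(1, 3)
                   .<List<Integer>>reduceWith(ArrayList::new, (list, i) -> {
                       list.add(i);
                       return list;
                   })
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(sum() + " " + joined() + " " + collected());
    }
}
```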
public final Flux<T> repeat()
a repeated Flux on onComplete
public final Flux<T> repeat(BooleanSupplier predicate)
predicate
- the boolean to evaluate on onComplete.
a repeated Flux on onComplete
public final Flux<T> repeat(long numRepeat)
numRepeat
- the number of times to re-subscribe on onComplete
a repeated Flux on onComplete up to number of repeat specified
public final Flux<T> repeat(long numRepeat, BooleanSupplier predicate)
numRepeat
- the number of times to re-subscribe on complete
predicate
- the boolean to evaluate on onComplete
a repeated Flux on onComplete up to number of repeat specified OR matching predicate
public final Flux<T> repeatWhen(Function<Flux<Long>,? extends Publisher<?>> whenFactory)
Repeatedly subscribe to this Flux when a companion sequence signals a number of emitted elements in
response to the flux completion signal.
If the companion sequence signals when this Flux is active, the repeat
attempt is suppressed and any terminal signal will terminate this Flux
with the same signal immediately.
public final ConnectableFlux<T> replay()
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will
retain an unbounded amount of onNext signals. Completion and Error will also be
replayed.
ConnectableFlux
public final ConnectableFlux<T> replay(int history)
Turn this Flux into a connectable hot source and cache last emitted
signals for further Subscriber.
Will retain up to the given history size onNext signals. Completion and Error will also be
replayed.
history
- number of events retained in history excluding complete and error
ConnectableFlux
public final ConnectableFlux<T> replay(Duration ttl)
Flux
into a connectable hot source and cache last emitted signals
for further Subscriber
. Will retain each onNext up to the given per-item
expiry timeout. Completion and Error will also be replayed.
ttl
- Per-item timeout duration
ConnectableFlux
public final ConnectableFlux<T> replay(int history, Duration ttl)
Flux
into a connectable hot source and cache last emitted signals
for further Subscriber
. Will retain up to the given history size onNext
signals with a per-item ttl. Completion and Error will also be replayed.
history
- number of events retained in history excluding complete and error
ttl
- Per-item timeout Duration
ConnectableFlux
public final ConnectableFlux<T> replay(Duration ttl, Scheduler timer)
Flux
into a connectable hot source and cache last emitted signals
for further Subscriber
. Will retain onNext signal for up to the given
Duration
with a per-item ttl. Completion and Error will also be replayed.
ttl
- Per-item timeout Duration
timer
- a time-capable Scheduler
instance to read current time from
ConnectableFlux
public final ConnectableFlux<T> replay(int history, Duration ttl, Scheduler timer)
Flux
into a connectable hot source and cache last emitted signals
for further Subscriber
. Will retain up to the given history size onNext
signals with a per-item ttl. Completion and Error will also be replayed.
history
- number of events retained in history excluding complete and error
ttl
- Per-item timeout Duration
timer
- a Scheduler
instance to read current time from
ConnectableFlux
@Deprecated public final ConnectableFlux<T> replayMillis(long ttl, TimedScheduler timer)
Duration
based variants instead, will be removed in 3.1.0
Flux
into a connectable hot source and cache last emitted signals
for further Subscriber
. Will retain each onNext up to the given per-item
expiry timeout in milliseconds. Completion and Error will also be replayed.
ttl
- Per-item timeout duration in milliseconds
timer
- Scheduler
to read current time from
ConnectableFlux
@Deprecated public final ConnectableFlux<T> replayMillis(int history, long ttl, TimedScheduler timer)
Duration
based variants instead, will be removed in 3.1.0
Flux
into a connectable hot source and cache last emitted signals
for further Subscriber
. Will retain up to the given history size onNext
signals. Completion and Error will also be replayed.
history
- number of events retained in history excluding complete and error
ttl
- Per-item timeout duration in milliseconds
timer
- Scheduler
to read current time from
ConnectableFlux
public final Flux<T> retry()
Flux
sequence if it signals any error,
indefinitely.
The times == Long.MAX_VALUE is treated as infinite retry.
Flux
on onError
public final Flux<T> retry(long numRetries)
Flux
sequence if it signals any error
either indefinitely or a fixed number of times.
The times == Long.MAX_VALUE is treated as infinite retry.
numRetries
- the number of times to tolerate an error
Flux
on onError up to the specified number of retries.
public final Flux<T> retry(Predicate<Throwable> retryMatcher)
Flux
sequence if it signals any error
and the given Predicate
matches; otherwise push the error downstream.
retryMatcher
- the predicate to evaluate if retry should occur based on a given error signal
Flux
on onError if the predicate matches.
public final Flux<T> retry(long numRetries, Predicate<Throwable> retryMatcher)
Flux
sequence up to the specified number of retries if it signals any
error and the given Predicate
matches; otherwise push the error downstream.
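The combination of a retry budget and a matching predicate can be sketched in plain Java. This is a synchronous model of the decision rule only, not Reactor code: `RetrySketch` and its `retry` method are hypothetical names, a `Supplier` stands in for re-subscribing to the source, and treating `Long.MAX_VALUE` as "never decrement" mirrors the infinite-retry note above.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Plain-Java model of "retry up to N times if the predicate matches"; hypothetical. */
final class RetrySketch {

    static <T> T retry(long numRetries, Predicate<Throwable> retryMatcher, Supplier<T> source) {
        long remaining = numRetries;
        while (true) {
            try {
                // Each call models one subscription to the source.
                return source.get();
            } catch (RuntimeException e) {
                // Budget exhausted or error not matched: push the error downstream.
                if (remaining == 0 || !retryMatcher.test(e)) {
                    throw e;
                }
                // Long.MAX_VALUE is modelled as an unlimited budget.
                if (remaining != Long.MAX_VALUE) {
                    remaining--;
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        String value = retry(2, e -> e instanceof IllegalStateException, () -> {
            if (++attempts[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        });
        System.out.println(value + " after " + attempts[0] + " attempts"); // prints ok after 3 attempts
    }
}
```

With `numRetries` set to 1 in the same scenario, the second failure would be rethrown instead of retried.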
numRetries
- the number of times to tolerate an error
retryMatcher
- the predicate to evaluate if retry should occur based on a given error signal
Flux
on onError up to the specified number of retries and if the predicate
matches.
public final Flux<T> retryWhen(Function<Flux<Throwable>,? extends Publisher<?>> whenFactory)
public final Flux<T> sample(Duration timespan)
timespan
- the duration to emit the latest observed item
Flux
by last item over a period of time
public final <U> Flux<T> sample(Publisher<U> sampler)
Flux
and emit its latest value whenever the sampler Publisher
signals a value.
Termination of either Publisher
will result in termination for the Subscriber
as well.
Both Publishers
will run in unbounded mode because backpressure
would interfere with the sampling precision.
@Deprecated public final Flux<T> sampleMillis(long timespan)
Duration
based variants instead, will be removed in 3.1.0
timespan
- the period in milliseconds to emit the latest observed item
Flux
by last item over a period of time
public final Flux<T> sampleFirst(Duration timespan)
Flux
then use the duration provided to skip other values.
timespan
- the duration to exclude other values from this sequence
Flux
by first item over a period of time
public final <U> Flux<T> sampleFirst(Function<? super T,? extends Publisher<U>> samplerFactory)
@Deprecated public final Flux<T> sampleFirstMillis(long timespan)
Duration
based variants instead, will be removed in 3.1.0
Flux
then use the duration provided to skip other values.
timespan
- the period in milliseconds to exclude other values from this sequence
Flux
by first item over a period of time
public final <U> Flux<T> sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory)
Flux
only if there were no new values emitted
during the time window provided by a publisher for that particular last value.
U
- the companion reified type
throttlerFactory
- select a Publisher
companion to signal onNext or onComplete to stop checking
other values from this sequence and emit the selected item
Flux
by last single item observed before a companion Publisher
emits
public final <U> Flux<T> sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory, int maxConcurrency)
Flux
only if there were no newer values emitted
during the time window provided by a publisher for that particular last value.
The provided maxConcurrency will keep a bounded maximum of concurrent timeouts and drop any new items until at least one timeout terminates.
U
- the throttling type
throttlerFactory
- select a Publisher
companion to signal onNext or onComplete to stop checking
other values from this sequence and emit the selected item
maxConcurrency
- the maximum number of concurrent timeouts
Flux
by last single item observed before a companion Publisher
emits
public final Flux<T> scan(BiFunction<T,T,T> accumulator)
Flux
values with an accumulator BiFunction
and
returns the intermediate results of this function.
Unlike scan(Object, BiFunction)
, this operator doesn't take an initial value
but treats the first Flux
value as initial value.
The accumulation works as follows:
result[0] = source[0]
result[1] = accumulator(result[0], source[1])
result[2] = accumulator(result[1], source[2])
...
accumulator
- the accumulating BiFunction
Flux
public final <A> Flux<A> scan(A initial, BiFunction<A,? super T,A> accumulator)
Flux
values with the help of an accumulator BiFunction
and emits the intermediate results.
The accumulation works as follows:
result[0] = initialValue;
result[1] = accumulator(result[0], source[0])
result[2] = accumulator(result[1], source[1])
result[3] = accumulator(result[2], source[2])
...
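The recurrence above can be traced with a small plain-Java model. It is a sketch of the accumulation rule over a `List`, not Reactor code; `ScanSketch` and its `scan` method are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

/** Plain-Java model of the seeded scan recurrence; hypothetical, not Reactor code. */
final class ScanSketch {

    static <T, A> List<A> scan(A initial, List<T> source, BiFunction<A, ? super T, A> accumulator) {
        List<A> result = new ArrayList<>();
        A state = initial;
        result.add(state); // result[0] = initialValue
        for (T item : source) {
            state = accumulator.apply(state, item); // result[n + 1] = accumulator(result[n], source[n])
            result.add(state);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> runningSums = scan(0, Arrays.asList(1, 2, 3, 4), (acc, x) -> acc + x);
        System.out.println(runningSums); // prints [0, 1, 3, 6, 10]
    }
}
```

The result has one more entry than the source, because the initial value is emitted first.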
A
- the accumulated type
initial
- the initial argument to pass to the reduce function
accumulator
- the accumulating BiFunction
Flux
starting with initial state
public final <A> Flux<A> scanWith(Supplier<A> initial, BiFunction<A,? super T,A> accumulator)
Flux
values with the help of an accumulator BiFunction
and emits the intermediate results.
The accumulation works as follows:
result[0] = initialValue;
result[1] = accumulator(result[0], source[0])
result[2] = accumulator(result[1], source[1])
result[3] = accumulator(result[2], source[2])
...
A
- the accumulated type
initial
- the initial supplier to init the first value to pass to the reduce
function
accumulator
- the accumulating BiFunction
Flux
starting with initial state
public final Flux<T> share()
Flux
that multicasts (shares) the original Flux
.
As long as
there is at least one Subscriber
this Flux
will be subscribed and
emitting data.
When all subscribers have cancelled it will cancel the source
Flux
.
This is an alias for publish() followed by ConnectableFlux.refCount().
public final Mono<T> single()
Flux
source or signal
NoSuchElementException
(or a default generated value) for empty source,
IndexOutOfBoundsException
for a multi-item source.
Mono
with the eventual single item or an error signal
public final Mono<T> single(T defaultValue)
Flux
source or signal
NoSuchElementException
(or a default value) for empty source,
IndexOutOfBoundsException
for a multi-item source.
public final Mono<T> singleOrEmpty()
Flux
source or
IndexOutOfBoundsException
for a multi-item source.
Mono
with the eventual single item or no item
public final Flux<T> skip(long skipped)
Flux
.
skipped
- the number of elements to drop
Flux
until the specified skipped number of elements
public final Flux<T> skip(Duration timespan)
Flux
for the given time period.
timespan
- the time window to exclude next signals
Flux
until the end of the given timespan
public final Flux<T> skip(Duration timespan, Scheduler timer)
Flux
for the given time period.
public final Flux<T> skipLast(int n)
Flux
.
n
- the number of elements to ignore before completion
Flux
for the specified skipped number of elements before termination
@Deprecated public final Flux<T> skipMillis(long timespan)
Duration
based variants instead, will be removed in 3.1.0
Flux
for the given time period.
timespan
- the time window to exclude next signals
Flux
until the end of the given timespan
@Deprecated public final Flux<T> skipMillis(long timespan, TimedScheduler timer)
Duration
based variants instead, will be removed in 3.1.0
Flux
for the given time period.
public final Flux<T> sort()
Flux
that sorts the events emitted by source Flux
.
Each item emitted by the Flux
must implement Comparable
with
respect to all
other items in the sequence.
Note that calling sort
with long, non-terminating or infinite sources
might cause OutOfMemoryError
. Use sequence splitting like
window()
to sort batches in that case.
Flux
ClassCastException
- if any item emitted by the Flux
does not implement
Comparable
with respect to
all other items emitted by the Flux
public final Flux<T> sort(Comparator<? super T> sortFunction)
Flux
that sorts the events emitted by source Flux
given the Comparator
function.
Note that calling sort
with long, non-terminating or infinite sources
might cause OutOfMemoryError
@SafeVarargs public final Flux<T> startWith(T... values)
Flux
sequence.
values
- the array of values to start with
Flux
with given values
public final Disposable subscribe()
Disposable
task to execute to dispose and cancel the underlying Subscription
@Deprecated public final Disposable subscribe(int prefetch)
limitRate(int)
prefetch
demand
For a version that gives you more control over backpressure and the request, see
Publisher.subscribe(Subscriber)
with a BaseSubscriber
.
prefetch
- an arbitrary value
Disposable
task to execute to dispose and cancel the underlying Subscription
public final Disposable subscribe(Consumer<? super T> consumer)
Consumer
to this Flux
that will consume all the
sequence. It will request an unbounded demand.
For a passive version that observes and forwards incoming data see doOnNext(java.util.function.Consumer)
.
For a version that gives you more control over backpressure and the request, see
Publisher.subscribe(Subscriber)
with a BaseSubscriber
.
consumer
- the consumer to invoke on each value
Disposable
to dispose the Subscription
@Deprecated public final Disposable subscribe(Consumer<? super T> consumer, int prefetch)
Consumer
to this Flux
that will consume all the
sequence.
If prefetch is != Long.MAX_VALUE
, the Subscriber
will use it as
a prefetch strategy: first request N, then when 25% of N is left to be received on
onNext, request N x 0.75.
For a passive version that observes and forwards incoming data see doOnNext(java.util.function.Consumer)
.
For a version that gives you more control over backpressure and the request, see
Publisher.subscribe(Subscriber)
with a BaseSubscriber
.
consumer
- the consumer to invoke on each value
prefetch
- the prefetch amount, positive
Disposable
to dispose the Subscription
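The prefetch strategy described for this overload (request N, then request 75% of N once only 25% of N is still outstanding) can be traced with a small plain-Java model. This is a sketch of the arithmetic only, not Reactor code: `PrefetchSketch` is a hypothetical name, and rounding the replenish amount as `n - (n >> 2)` is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the "request N, then replenish 75%" strategy; hypothetical. */
final class PrefetchSketch {

    /** Returns the sequence of request(n) amounts issued while receiving totalItems items. */
    static List<Integer> requests(int prefetch, int totalItems) {
        if (prefetch <= 0) {
            throw new IllegalArgumentException("prefetch must be positive");
        }
        // Assumed rounding: 75% of prefetch, rounded up.
        int limit = prefetch - (prefetch >> 2);
        List<Integer> requests = new ArrayList<>();
        requests.add(prefetch); // first request N
        int receivedSinceLastRequest = 0;
        for (int i = 0; i < totalItems; i++) {
            // Once 75% of N has arrived, 25% is still outstanding: replenish.
            if (++receivedSinceLastRequest == limit) {
                requests.add(limit);
                receivedSinceLastRequest = 0;
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(requests(16, 40)); // prints [16, 12, 12, 12]
    }
}
```

In this model the outstanding demand never exceeds the prefetch value: each replenishment of 12 arrives when 4 of the 16 requested items are still pending.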
public final Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer)
Consumer
to this Flux
that will consume all the
sequence. It will request unbounded demand Long.MAX_VALUE
.
For a passive version that observes and forwards incoming data see
doOnNext(java.util.function.Consumer)
and doOnError(java.util.function.Consumer)
.
For a version that gives you more control over backpressure and the request, see
Publisher.subscribe(Subscriber)
with a BaseSubscriber
.
consumer
- the consumer to invoke on each next signal
errorConsumer
- the consumer to invoke on error signal
Disposable
to dispose the Subscription
public final Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer)
Consumer
to this Flux
that will consume all the
sequence. It will request unbounded demand Long.MAX_VALUE
.
For a passive version that observes and forwards incoming data see doOnNext(java.util.function.Consumer)
,
doOnError(java.util.function.Consumer)
and doOnComplete(Runnable)
.
For a version that gives you more control over backpressure and the request, see
Publisher.subscribe(Subscriber)
with a BaseSubscriber
.
consumer
- the consumer to invoke on each value
errorConsumer
- the consumer to invoke on error signal
completeConsumer
- the consumer to invoke on complete signal
Disposable
to dispose the Subscription
public final Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer)
Consumer
to this Flux
that will consume all the
sequence. It will let the provided subscriptionConsumer
request the adequate amount of data, or request unbounded demand
Long.MAX_VALUE
if no such consumer is provided.
For a passive version that observes and forwards incoming data see doOnNext(java.util.function.Consumer)
,
doOnError(java.util.function.Consumer)
, doOnComplete(Runnable)
and doOnSubscribe(Consumer)
.
For a version that gives you more control over backpressure and the request, see
Publisher.subscribe(Subscriber)
with a BaseSubscriber
.
consumer
- the consumer to invoke on each value
errorConsumer
- the consumer to invoke on error signal
completeConsumer
- the consumer to invoke on complete signal
subscriptionConsumer
- the consumer to invoke on subscribe signal, to be used
for the initial request
, or null for max request
Disposable
to dispose the Subscription
@Deprecated public final Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, int prefetch)
limitRate(prefetch).subscribe(...)
to
subscribe(..., prefetch)
Consumer
to this Flux
that will consume all the sequence.
If prefetch is != Long.MAX_VALUE
, the Subscriber
will use it as
a prefetch strategy: first request N, then when 25% of N is left to be received on
onNext, request N x 0.75.
For a passive version that observes and forwards
incoming data see doOnNext(java.util.function.Consumer)
, doOnError(java.util.function.Consumer)
, doOnComplete(Runnable)
and
doOnSubscribe(Consumer)
.
For a version that gives you more control over backpressure and the request, see
Publisher.subscribe(Subscriber)
with a BaseSubscriber
.
consumer
- the consumer to invoke on each value
errorConsumer
- the consumer to invoke on error signal
completeConsumer
- the consumer to invoke on complete signal
prefetch
- the demand to produce to this Flux
Disposable
to dispose the Subscription
limitRate(int)
,
subscribe(Consumer, Consumer, Runnable)
@Deprecated public final Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer, int prefetch)
limitRate(prefetch).subscribe(...)
to
subscribe(..., prefetch)
Consumer
to this Flux
that will consume all the sequence.
If prefetch is != Long.MAX_VALUE
, the Subscriber
will use it as
a prefetch strategy: first request N, then when 25% of N is left to be received on
onNext, request N x 0.75.
For a passive version that observes and forwards
incoming data see doOnNext(java.util.function.Consumer)
, doOnError(java.util.function.Consumer)
and doOnComplete(Runnable)
.
For a version that gives you more control over backpressure and the request, see
Publisher.subscribe(Subscriber)
with a BaseSubscriber
.
consumer
- the consumer to invoke on each value
errorConsumer
- the consumer to invoke on error signal
completeConsumer
- the consumer to invoke on complete signal
subscriptionConsumer
- the consumer to invoke on subscribe signal, to be used
for the initial request
, or null for max request
prefetch
- the demand to produce to this Flux
(deprecated)
Disposable
to dispose the Subscription
limitRate(int)
,
subscribe(Consumer, Consumer, Runnable, Consumer)
public final Flux<T> subscribeOn(Scheduler scheduler)
Scheduler
Typically used for slow publisher e.g., blocking IO, fast consumer(s) scenarios.
flux.subscribeOn(Schedulers.single()).subscribe()
scheduler
- a checked Scheduler.Worker
factory
Flux
requesting asynchronously
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber)
Publisher.subscribe(Subscriber)
alternative to inline composition type conversion to a hot
emitter (e.g. FluxProcessor
or MonoProcessor
).
flux.subscribeWith(WorkQueueProcessor.create()).subscribe()
If you need more control over backpressure and the request, use a BaseSubscriber
.
E
- the reified type from the input/output subscriber
subscriber
- the Subscriber
to subscribe and return
Subscriber
public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate)
alternate
- the alternate publisher if this sequence is empty
Flux
on source onComplete without elements
public final <V> Flux<V> switchMap(Function<? super T,Publisher<? extends V>> fn)
V
- the type of the return value of the transformation function
fn
- the transformation function
Flux
on source onNext
public final <V> Flux<V> switchMap(Function<? super T,Publisher<? extends V>> fn, int prefetch)
V
- the type of the return value of the transformation function
fn
- the transformation function
prefetch
- the produced demand for inner sources
Flux
on source onNext
@Deprecated public final <E extends Throwable> Flux<T> switchOnError(Class<E> type, Publisher<? extends T> fallback)
onErrorResume(java.util.function.Function<? super java.lang.Throwable, ? extends org.reactivestreams.Publisher<? extends T>>)
with a t -> fallback
lambda
instead. Will be removed in 3.1.0.
@Deprecated public final Flux<T> switchOnError(Predicate<? super Throwable> predicate, Publisher<? extends T> fallback)
onErrorResume(java.util.function.Function<? super java.lang.Throwable, ? extends org.reactivestreams.Publisher<? extends T>>)
with a t -> fallback
lambda
instead. Will be removed in 3.1.0.
@Deprecated public final Flux<T> switchOnError(Publisher<? extends T> fallback)
onErrorResume(java.util.function.Function<? super java.lang.Throwable, ? extends org.reactivestreams.Publisher<? extends T>>)
with a t -> fallback
lambda
instead. Will be removed in 3.1.0.
public final Flux<T> take(long n)
Flux
.
If N is zero, the Subscriber
gets completed if this Flux
completes, signals an error or
signals its first value (which is not relayed though).
public final Flux<T> take(Duration timespan)
Flux
until the given time period elapses.
If the time period is zero, the Subscriber
gets completed if this Flux
completes, signals an
error or
signals its first value (which is not relayed though).
public final Flux<T> take(Duration timespan, Scheduler timer)
Flux
until the given time period elapses.
If the time period is zero, the Subscriber
gets completed if this Flux
completes, signals an
error or signals its first value (which is not relayed though).
public final Flux<T> takeLast(int n)
Flux
emitted before its completion.
@Deprecated public final Flux<T> takeMillis(long timespan)
Duration
based variants instead, will be removed in 3.1.0
Flux
until the given time period elapses.
If the time period is zero, the Subscriber
gets completed if this Flux
completes, signals an
error or
signals its first value (which is not relayed though).
@Deprecated public final Flux<T> takeMillis(long timespan, TimedScheduler timer)
Duration
based variants instead, will be removed in 3.1.0
Flux
until the given time period elapses.
If the time period is zero, the Subscriber
gets completed if this Flux
completes, signals an
error or
signals its first value (which is not relayed though).
public final Flux<T> takeUntil(Predicate<? super T> predicate)
Flux
until the given Predicate
matches.
Unlike takeWhile(java.util.function.Predicate<? super T>)
, this will include the matched data.
public final Flux<T> takeWhile(Predicate<? super T> continuePredicate)
takeUntil(java.util.function.Predicate<? super T>)
, this will exclude the matched data.
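The difference between the two operators, whether the element that flips the predicate is relayed, can be shown with a plain-Java model over a `List`. This is a sketch rather than Reactor code; `TakeSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Plain-Java model contrasting takeUntil and takeWhile; hypothetical, not Reactor code. */
final class TakeSketch {

    /** Relays items until the predicate matches; the matching item is included. */
    static <T> List<T> takeUntil(List<T> source, Predicate<? super T> predicate) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            out.add(item);
            if (predicate.test(item)) {
                break;
            }
        }
        return out;
    }

    /** Relays items while the predicate matches; the first non-matching item is excluded. */
    static <T> List<T> takeWhile(List<T> source, Predicate<? super T> continuePredicate) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (!continuePredicate.test(item)) {
                break;
            }
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4);
        System.out.println(takeUntil(source, x -> x >= 3)); // prints [1, 2, 3]
        System.out.println(takeWhile(source, x -> x < 3));  // prints [1, 2]
    }
}
```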
public final Mono<Void> then()
Mono<Void>
that completes when this Flux
completes.
This will actively ignore the sequence and only replay completion or error signals.
Mono
@Deprecated public final Mono<Void> then(Publisher<Void> other)
thenEmpty(Publisher)
instead, this alias will be
removed in 3.1.0
Mono<Void>
that waits for this Flux
to complete then
for a supplied Publisher<Void>
to also complete. The
second completion signal is replayed, or any error signal that occurs instead.
@Deprecated public final Mono<Void> then(Supplier<? extends Publisher<Void>> afterSupplier)
thenEmpty(Publisher)
with
defer(java.util.function.Supplier<? extends org.reactivestreams.Publisher<T>>)
. The competing overload was causing confusion and the generic was
not symmetric with Mono.then(Mono)
.
public final Mono<Void> thenEmpty(Publisher<Void> other)
Mono<Void>
that waits for this Flux
to complete then
for a supplied Publisher<Void>
to also complete. The
second completion signal is replayed, or any error signal that occurs instead.
@Deprecated public final <V> Flux<V> thenMany(Supplier<? extends Publisher<V>> afterSupplier)
thenMany(Publisher)
with
defer(java.util.function.Supplier<? extends org.reactivestreams.Publisher<T>>)
. The competing overload was called unnecessary by extended
feedback and aligns with removing of Supplier of Publisher aliases elsewhere.
public final Flux<T> timeout(Duration timeout)
TimeoutException
in case a per-item period fires before the
next item arrives from this Flux
.
public final Flux<T> timeout(Duration timeout, Publisher<? extends T> fallback)
Publisher
in case a per-item period
fires before the next item arrives from this Flux
.
If the given Publisher
is null, signal a TimeoutException
.
public final Flux<T> timeout(Duration timeout, Scheduler timer)
TimeoutException
error in case a per-item
period fires before the next item arrives from this Flux
.
public final Flux<T> timeout(Duration timeout, Publisher<? extends T> fallback, Scheduler timer)
Publisher
in case a per-item period fires before the
next item arrives from this Flux
.
If the given Publisher
is null, signal a TimeoutException
.
public final <U> Flux<T> timeout(Publisher<U> firstTimeout)
TimeoutException
in case a first item from this Flux
has
not been emitted before the given Publisher
emits.
public final <U,V> Flux<T> timeout(Publisher<U> firstTimeout, Function<? super T,? extends Publisher<V>> nextTimeoutFactory)
TimeoutException
in case a first item from this Flux
has
not been emitted before the given Publisher
emits. The following items will be individually timed via
the factory provided Publisher
.
U
- the type of the elements of the first timeout Publisher
V
- the type of the elements of the subsequent timeout Publishers
firstTimeout
- the timeout Publisher
that must not emit before the first signal from this Flux
nextTimeoutFactory
- the timeout Publisher
factory for each next item
Flux
public final <U,V> Flux<T> timeout(Publisher<U> firstTimeout, Function<? super T,? extends Publisher<V>> nextTimeoutFactory, Publisher<? extends T> fallback)
Publisher
in case a first item from this Flux
has
not been emitted before the given Publisher
emits. The following items will be individually timed via
the factory provided Publisher
.
U
- the type of the elements of the first timeout Publisher
V
- the type of the elements of the subsequent timeout Publishers
firstTimeout
- the timeout Publisher
that must not emit before the first signal from this Flux
nextTimeoutFactory
- the timeout Publisher
factory for each next item
fallback
- the fallback Publisher
to subscribe when a timeout occurs
Flux
with a fallback Publisher
@Deprecated public final Flux<T> timeoutMillis(long timeout)
Duration
based variants instead, will be removed in 3.1.0
TimeoutException
error in case a per-item period in milliseconds fires
before the next item arrives from this Flux
.
@Deprecated public final Flux<T> timeoutMillis(long timeout, TimedScheduler timer)
Duration
based variants instead, will be removed in 3.1.0
TimeoutException
error in case a per-item period in milliseconds fires
before the next item arrives from this Flux
.
@Deprecated public final Flux<T> timeoutMillis(long timeout, Publisher<? extends T> fallback)
Duration
based variants instead, will be removed in 3.1.0
Publisher
in case a per-item period
fires before the next item arrives from this Flux
.
If the given Publisher
is null, signal a TimeoutException
.
@Deprecated public final Flux<T> timeoutMillis(long timeout, Publisher<? extends T> fallback, TimedScheduler timer)
Duration
based variants instead, will be removed in 3.1.0
Publisher
in case a per-item period
fires before the next item arrives from this Flux
.
If the given Publisher
is null, signal a TimeoutException
.
public final Flux<Tuple2<Long,T>> timestamp()
Tuple2
pair of T1 Long
current system time in
millis and T2 T
associated data for each item from this Flux
Flux
@Deprecated public final Flux<Tuple2<Long,T>> timestamp(TimedScheduler scheduler)
timestamp(Scheduler)
public Stream<T> toStream()
Stream
of unknown size with onClose attached to Subscription.cancel()
public Stream<T> toStream(int batchSize)
batchSize
- the bounded capacity to produce to this Flux
or Integer.MAX_VALUE
for unbounded
Stream
of unknown size with onClose attached to Subscription.cancel()
public final <V> Flux<V> transform(Function<? super Flux<T>,? extends Publisher<V>> transformer)
Flux
in order to generate a target Flux
. Unlike compose(Function)
, the
provided function is executed as part of assembly.
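// assuming flux is a Flux<Integer>; the lambda parameter is named f so it does not clash with flux
Function<Flux<Integer>, Flux<Integer>> applySchedulers = f -> f.subscribeOn(Schedulers.elastic()).publishOn(Schedulers.parallel());
flux.transform(applySchedulers).map(v -> v * v).subscribe();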
V
- the item type in the returned Flux
transformer
- the Function
to immediately map this Flux
into a target Flux
instance.
Flux
for deferred composition of Flux for each Subscriber
,
for a loose conversion to an arbitrary type
@Deprecated public final Flux<Flux<T>> window()
window(Publisher)
,
@Deprecated public final <U,V> Flux<Flux<T>> window(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector)
windowWhen(Publisher, Function)
instead.
Flux
sequence into potentially overlapping windows controlled by items of a
start Publisher
and end Publisher
derived from the start values.
When Open signal is strictly not overlapping Close signal : dropping windows
When Open signal is strictly more frequent than Close signal : overlapping windows
When Open signal is exactly coordinated with Close signal : exact windows
U
- the type of the sequence opening windows
V
- the type of the sequence closing windows opened by the bucketOpening Publisher's elements
bucketOpening
- a Publisher
to emit any item for a split signal and complete to terminate
closeSelector
- a Function
given an opening signal and returning a Publisher
that
emits to complete the window
Flux
delimiting its sub-sequences by a given Publisher
and lasting until
a selected Publisher
emits
public final Flux<Flux<T>> window(Duration timespan)
Flux
sequence into continuous, non-overlapping windows delimited by a given period.
public final Flux<Flux<T>> window(Duration timespan, Duration timeshift)
Flux
sequence into multiple Flux
delimited by the given timeshift
period, starting from the first item.
Each Flux
bucket will onComplete after timespan
period has elapsed.
When timeshift > timespan : dropping windows
When timeshift < timespan : overlapping windows
When timeshift == timespan : exact windows
public final Flux<Flux<T>> window(Duration timespan, Scheduler timer)
Flux
sequence into continuous, non-overlapping windows delimited by a given period.
public final Flux<Flux<T>> window(Duration timespan, Duration timeshift, Scheduler timer)
Flux
sequence into multiple Flux
delimited by the given timeshift
period, starting from the first item.
Each Flux
bucket will onComplete after timespan
period has elapsed.
When timeshift > timespan : dropping windows
When timeshift < timespan : overlapping windows
When timeshift == timespan : exact windows
@Deprecated public final Flux<Flux<T>> window(int maxSize, Duration timespan)
windowTimeout(int, Duration)
instead, will be removed in 3.1.0
public final Flux<Flux<T>> windowTimeout(int maxSize, Duration timespan, Scheduler timer)
@Deprecated public final Flux<Flux<T>> windowMillis(long timespan)
Duration
based variants instead, will be removed in 3.1.0
Flux
sequence into continuous, non-overlapping windows delimited by a given period.
@Deprecated public final Flux<Flux<T>> windowMillis(long timespan, TimedScheduler timer)
Duration
based variants instead, will be removed in 3.1.0
Flux
sequence into continuous, non-overlapping windows delimited by a given period.
@Deprecated public final Flux<Flux<T>> windowMillis(long timespan, long timeshift, TimedScheduler timer)
Duration
based variants instead, will be removed in 3.1.0
Flux
sequence into multiple Flux
delimited by the given timeshift
period, starting from the first item.
Each Flux
bucket will onComplete after timespan
period has elapsed.
When timeshift > timespan : dropping windows
When timeshift < timespan : overlapping windows
When timeshift == timespan : exact windows
timespan
- the maximum Flux
window duration in milliseconds
timeshift
- the period of time in milliseconds to create new Flux
windows
timer
- a time-capable Scheduler
instance to run on
Flux
of Flux
buckets delimited by an opening Publisher
and a selected closing Publisher
@Deprecated public final Flux<Flux<T>> windowMillis(int maxSize, long timespan)
windowTimeout(int, Duration)
instead, will be removed in 3.1.0
@Deprecated public final Flux<Flux<T>> windowMillis(int maxSize, long timespan, TimedScheduler timer)
windowTimeout(int, Duration, Scheduler)
instead, will be removed in 3.1.0
@Deprecated public final Flux<Flux<T>> windowTimeoutMillis(int maxSize, long timespan)
Duration
based variants instead, will be removed in 3.1.0
@Deprecated public final Flux<Flux<T>> windowTimeoutMillis(int maxSize, long timespan, TimedScheduler timer)
Duration
based variants instead, will be removed in 3.1.0
public final Flux<GroupedFlux<T,T>> windowUntil(Predicate<T> boundaryTrigger)
Flux
sequence into multiple Flux
delimited by the given
predicate. A new window is opened each time the predicate returns true, at which
point the previous window will receive the triggering element then onComplete.
boundaryTrigger
- a predicate that triggers the next window when it becomes true.
Flux
of GroupedFlux
windows, bounded depending
on the predicate and keyed with the value that triggered the new window.
public final Flux<GroupedFlux<T,T>> windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore)
Flux
sequence into multiple Flux
delimited by the given
predicate. A new window is opened each time the predicate returns true.
If cutBefore
is true, the old window will onComplete and the triggering
element will be emitted in the new window. Note it can mean that an empty window is
sometimes emitted, e.g. if the first element in the sequence immediately matches the
predicate.
Otherwise, the triggering element will be emitted in the old window before it does
onComplete, similar to windowUntil(Predicate)
.
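The effect of cutBefore can be illustrated with a plain-Java model that splits a list the way the description above splits a sequence. This is only a sketch of the described semantics, not Reactor's implementation: the class WindowUntilSketch and its split method are hypothetical, windows are modeled as lists, and keeping whatever window is open when the source ends (even an empty one) is an assumption the text does not settle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class WindowUntilSketch {

    // Models the described splitting on an in-memory list.
    static <T> List<List<T>> split(List<T> source, Predicate<T> boundaryTrigger, boolean cutBefore) {
        List<List<T>> windows = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T element : source) {
            if (!boundaryTrigger.test(element)) {
                current.add(element);
            } else if (cutBefore) {
                // Close the old window first; the trigger starts the new one.
                windows.add(current);
                current = new ArrayList<>();
                current.add(element);
            } else {
                // The trigger is the last element of the old window.
                current.add(element);
                windows.add(current);
                current = new ArrayList<>();
            }
        }
        // ASSUMPTION: the window still open at the end is kept as-is.
        windows.add(current);
        return windows;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        System.out.println(split(source, x -> x % 3 == 0, false)); // [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(split(source, x -> x % 3 == 0, true));  // [[1, 2], [3, 4, 5], [6, 7]]
        // First element matches with cutBefore = true: an empty leading window.
        System.out.println(split(Arrays.asList(3, 4), x -> x % 3 == 0, true)); // [[], [3, 4]]
    }
}
```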
boundaryTrigger
- a predicate that triggers the next window when it becomes true.
cutBefore
- set to true to include the triggering element in the new window rather than the old.
Returns: a Flux of GroupedFlux windows, bounded depending on the predicate and keyed with the value that triggered the new window.
public final Flux<GroupedFlux<T,T>> windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore, int prefetch)
Split this Flux sequence into multiple Flux windows delimited by the given predicate and using a prefetch. A new window is opened each time the predicate returns true.
If cutBefore is true, the old window will onComplete and the triggering element will be emitted in the new window. Note that this can sometimes produce an empty window, e.g. if the first element in the sequence immediately matches the predicate.
Otherwise, the triggering element will be emitted in the old window before it does onComplete, similar to windowUntil(Predicate).
boundaryTrigger
- a predicate that triggers the next window when it becomes true.
cutBefore
- set to true to include the triggering element in the new window rather than the old.
prefetch
- the request size to use for this Flux.
Returns: a Flux of GroupedFlux windows, bounded depending on the predicate and keyed with the value that triggered the new window.
public final Flux<GroupedFlux<T,T>> windowWhile(Predicate<T> inclusionPredicate)
Split this Flux sequence into multiple Flux windows that stay open while a given predicate matches the source elements. Once the predicate returns false, the window closes with an onComplete and the triggering element is discarded.
Note that for a sequence starting with a separator, or having several consecutive separators anywhere in the sequence, each occurrence will lead to an empty window.
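The empty-window cases mentioned in the note are easier to see on concrete data. The following plain-Java sketch models the described behavior on a list. It is not Reactor code: WindowWhileSketch and split are hypothetical names, and keeping the window that is open when the source ends is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class WindowWhileSketch {

    // Elements matching the predicate extend the current window; a non-matching
    // element (a "separator") closes the window and is itself discarded.
    static <T> List<List<T>> split(List<T> source, Predicate<T> inclusionPredicate) {
        List<List<T>> windows = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T element : source) {
            if (inclusionPredicate.test(element)) {
                current.add(element);
            } else {
                windows.add(current);
                current = new ArrayList<>();
            }
        }
        // ASSUMPTION: the window still open at the end is kept as-is.
        windows.add(current);
        return windows;
    }

    public static void main(String[] args) {
        // 0 acts as the separator; two consecutive separators yield an empty window.
        System.out.println(split(Arrays.asList(1, 2, 0, 3, 0, 0, 4), x -> x != 0)); // [[1, 2], [3], [], [4]]
        // A leading separator also yields an empty window.
        System.out.println(split(Arrays.asList(0, 1), x -> x != 0)); // [[], [1]]
    }
}
```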
inclusionPredicate
- a predicate that triggers the next window when it becomes false.
Returns: a Flux of GroupedFlux windows, each containing subsequent elements that all passed a predicate, and keyed with a separator element.
public final Flux<GroupedFlux<T,T>> windowWhile(Predicate<T> inclusionPredicate, int prefetch)
Split this Flux sequence into multiple Flux windows that stay open while a given predicate matches the source elements. Once the predicate returns false, the window closes with an onComplete and the triggering element is discarded.
Note that for a sequence starting with a separator, or having several consecutive separators anywhere in the sequence, each occurrence will lead to an empty window.
inclusionPredicate
- a predicate that triggers the next window when it becomes false.
prefetch
- the request size to use for this Flux.
Returns: a Flux of GroupedFlux windows, each containing subsequent elements that all passed a predicate, and keyed with a separator element.
public final <U,V> Flux<Flux<T>> windowWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector)
Split this Flux sequence into potentially overlapping windows controlled by items of a start Publisher and end Publisher derived from the start values.
When Open signal is strictly not overlapping Close signal : dropping windows
When Open signal is strictly more frequent than Close signal : overlapping windows
When Open signal is exactly coordinated with Close signal : exact windows
U
- the type of the sequence opening windows
V
- the type of the sequence closing windows opened by the bucketOpening Publisher's elements
bucketOpening
- a Publisher to emit any item for a split signal and complete to terminate
closeSelector
- a Function given an opening signal and returning a Publisher that emits to complete the window
Returns: a Flux delimiting its sub-sequences by a given Publisher and lasting until a selected Publisher emits
public final <U,R> Flux<R> withLatestFrom(Publisher<? extends U> other, BiFunction<? super T,? super U,? extends R> resultSelector)
Combine values from this Flux with values from another Publisher through a BiFunction and emit the result.
The operator will drop values from this Flux until the other Publisher produces any value.
If the other Publisher completes without any value, the sequence is completed.
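The "drop until the other side has produced a value" rule can be sketched as a tiny state holder in plain Java. This is a model of the described behavior only, not Reactor's implementation: WithLatestFromSketch, onOther and onSource are hypothetical names, an empty Optional stands for a dropped value, and completion signals are not modeled.

```java
import java.util.Optional;
import java.util.function.BiFunction;

final class WithLatestFromSketch<T, U, R> {

    private final BiFunction<? super T, ? super U, ? extends R> resultSelector;
    private U latest;          // most recent value seen from the other side
    private boolean hasLatest; // false until the other side has produced a value

    WithLatestFromSketch(BiFunction<? super T, ? super U, ? extends R> resultSelector) {
        this.resultSelector = resultSelector;
    }

    // Called for each value of the other sequence: only the latest one is kept.
    void onOther(U value) {
        latest = value;
        hasLatest = true;
    }

    // Called for each value of the main sequence: dropped (empty) until the
    // other side has produced something, otherwise combined with the latest value.
    Optional<R> onSource(T value) {
        if (!hasLatest) {
            return Optional.empty();
        }
        R result = resultSelector.apply(value, latest);
        return Optional.of(result);
    }

    public static void main(String[] args) {
        WithLatestFromSketch<String, Integer, String> combiner =
                new WithLatestFromSketch<>((s, n) -> s + n);
        System.out.println(combiner.onSource("a")); // Optional.empty
        combiner.onOther(1);
        System.out.println(combiner.onSource("b")); // Optional[b1]
        combiner.onOther(2);
        System.out.println(combiner.onSource("c")); // Optional[c2]
    }
}
```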
U
- the other Publisher sequence type
R
- the result type
other
- the Publisher to combine with
resultSelector
- the bi-function called with each pair of source and other elements that should return a single value to be emitted
Returns: a Flux gated by another Publisher
public final <T2> Flux<Tuple2<T,T2>> zipWith(Publisher<? extends T2> source2)
public final <T2,V> Flux<V> zipWith(Publisher<? extends T2> source2, BiFunction<? super T,? super T2,? extends V> combinator)
T2
- type of the value from source2
V
- The produced output after transformation by the combinator
source2
- The second upstream Publisher to subscribe to.
combinator
- The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Returns: a zipped Flux
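The role of the combinator, which receives one value from each upstream and returns the value to signal downstream, can be modeled on two in-memory lists. This is a plain-Java sketch rather than Reactor code: ZipWithSketch and zip are hypothetical names, and stopping as soon as either input runs out is an assumption based on how zip-style operators conventionally behave, since the text above does not state it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

final class ZipWithSketch {

    // Pairs elements positionally and maps each pair through the combinator.
    static <T, T2, V> List<V> zip(List<T> source, List<T2> source2, BiFunction<T, T2, V> combinator) {
        List<V> out = new ArrayList<>();
        Iterator<T> left = source.iterator();
        Iterator<T2> right = source2.iterator();
        // ASSUMPTION: the result ends when the shorter input is exhausted.
        while (left.hasNext() && right.hasNext()) {
            out.add(combinator.apply(left.next(), right.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("a", "b", "c");
        List<Integer> numbers = Arrays.asList(1, 2);
        System.out.println(zip(letters, numbers, (s, n) -> s + n)); // [a1, b2]
    }
}
```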
public final <T2,V> Flux<V> zipWith(Publisher<? extends T2> source2, int prefetch, BiFunction<? super T,? super T2,? extends V> combinator)
T2
- type of the value from source2
V
- The produced output after transformation by the combinator
source2
- The second upstream Publisher to subscribe to.
prefetch
- the request size to use for this Flux and the other Publisher
combinator
- The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
Returns: a zipped Flux
public final <T2> Flux<Tuple2<T,T2>> zipWith(Publisher<? extends T2> source2, int prefetch)
public final <T2> Flux<Tuple2<T,T2>> zipWithIterable(Iterable<? extends T2> iterable)
public final <T2,V> Flux<V> zipWithIterable(Iterable<? extends T2> iterable, BiFunction<? super T,? super T2,? extends V> zipper)
T2
- the value type of the other iterable sequence
V
- the result type
iterable
- the Iterable to pair with
zipper
- the BiFunction combinator
Returns: a zipped Flux
protected static <T> Flux<T> onAssembly(Flux<T> source)
T
- the value type
source
- the source to wrap
protected static <T> ConnectableFlux<T> onAssembly(ConnectableFlux<T> source)
T
- the value type
source
- the source to wrap