T
- the value typepublic abstract class ParallelFlux<T>
extends java.lang.Object
Use from()
to start processing a regular Publisher in 'rails'. Use runOn()
to introduce where each 'rail' shoud run on thread-vise. Use sequential()
to
merge the sources back into a single Publisher.
Constructor and Description |
---|
ParallelFlux() |
Modifier and Type | Method and Description |
---|---|
<U> U |
as(java.util.function.Function<? super ParallelFlux<T>,U> converter)
Perform a fluent transformation to a value via a converter function which receives
this ParallelFlux.
|
<C> ParallelFlux<C> |
collect(java.util.function.Supplier<C> collectionSupplier,
java.util.function.BiConsumer<C,T> collector)
Collect the elements in each rail into a collection supplied via a
collectionSupplier and collected into with a collector action, emitting the
collection at the end.
|
Flux<java.util.List<T>> |
collectSortedList(java.util.Comparator<? super T> comparator)
Sorts the 'rails' according to the comparator and returns a full sorted list as a
Publisher.
|
Flux<java.util.List<T>> |
collectSortedList(java.util.Comparator<? super T> comparator,
int capacityHint)
Sorts the 'rails' according to the comparator and returns a full sorted list as a
Publisher.
|
<U> ParallelFlux<U> |
compose(java.util.function.Function<? super ParallelFlux<T>,ParallelFlux<U>> composer)
Allows composing operators, in assembly time, on top of this
ParallelFlux and
returns another ParallelFlux with composed features. |
<R> ParallelFlux<R> |
concatMap(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)
Generates and concatenates Publishers on each 'rail', signalling errors immediately
and generating 2 publishers upfront.
|
<R> ParallelFlux<R> |
concatMap(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper,
int prefetch)
Generates and concatenates Publishers on each 'rail', signalling errors immediately
and using the given prefetch amount for generating Publishers upfront.
|
<R> ParallelFlux<R> |
concatMapDelayError(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)
Generates and concatenates Publishers on each 'rail', delaying errors
and generating 2 publishers upfront.
|
ParallelFlux<T> |
doAfterTerminated(java.lang.Runnable onAfterTerminate)
Run the specified runnable when a 'rail' completes or signals an error.
|
ParallelFlux<T> |
doOnCancel(java.util.function.Consumer<? super org.reactivestreams.Subscription> onSubscribe)
Call the specified callback when a 'rail' receives a Subscription from its
upstream.
|
ParallelFlux<T> |
doOnCancel(java.lang.Runnable onCancel)
Run the specified runnable when a 'rail' receives a cancellation.
|
ParallelFlux<T> |
doOnComplete(java.lang.Runnable onComplete)
Run the specified runnable when a 'rail' completes.
|
ParallelFlux<T> |
doOnError(java.util.function.Consumer<java.lang.Throwable> onError)
Call the specified consumer with the exception passing through any 'rail'.
|
ParallelFlux<T> |
doOnNext(java.util.function.Consumer<? super T> onNext)
Call the specified consumer with the current element passing through any 'rail'.
|
ParallelFlux<T> |
doOnRequest(java.util.function.LongConsumer onRequest)
Call the specified consumer with the request amount if any rail receives a
request.
|
ParallelFlux<T> |
filter(java.util.function.Predicate<? super T> predicate)
Filters the source values on each 'rail'.
|
<R> ParallelFlux<R> |
flatMap(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)
Generates and flattens Publishers on each 'rail'.
|
<R> ParallelFlux<R> |
flatMap(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper,
boolean delayError)
Generates and flattens Publishers on each 'rail', optionally delaying errors.
|
<R> ParallelFlux<R> |
flatMap(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper,
boolean delayError,
int maxConcurrency)
Generates and flattens Publishers on each 'rail', optionally delaying errors and
having a total number of simultaneous subscriptions to the inner Publishers.
|
<R> ParallelFlux<R> |
flatMap(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper,
boolean delayError,
int maxConcurrency,
int prefetch)
Generates and flattens Publishers on each 'rail', optionally delaying errors,
having a total number of simultaneous subscriptions to the inner Publishers and
using the given prefetch amount for the inner Publishers.
|
static <T> ParallelFlux<T> |
from(org.reactivestreams.Publisher<? extends T> source)
Take a Publisher and prepare to consume it on multiple 'rails' (number of CPUs) in
a round-robin fashion.
|
static <T> ParallelFlux<T> |
from(org.reactivestreams.Publisher<? extends T> source,
int parallelism)
Take a Publisher and prepare to consume it on parallallism number of 'rails' ,
possibly ordered and round-robin fashion.
|
static <T> ParallelFlux<T> |
from(org.reactivestreams.Publisher<? extends T> source,
int parallelism,
int prefetch,
java.util.function.Supplier<java.util.Queue<T>> queueSupplier)
Take a Publisher and prepare to consume it on parallallism number of 'rails'
and round-robin fashion and use custom prefetch amount and queue
for dealing with the source Publisher's values.
|
static <T> ParallelFlux<T> |
from(org.reactivestreams.Publisher<T>... publishers)
Wraps multiple Publishers into a
ParallelFlux which runs them in parallel and
unordered. |
long |
getPrefetch()
The prefetch configuration of the component
|
Flux<GroupedFlux<java.lang.Integer,T>> |
groups()
Exposes the 'rails' as individual GroupedFlux instances, keyed by the rail
index (zero based).
|
abstract boolean |
isOrdered()
Returns true if the parallel sequence has to be ordered when joining back.
|
<U> ParallelFlux<U> |
map(java.util.function.Function<? super T,? extends U> mapper)
Maps the source values on each 'rail' to another value.
|
abstract int |
parallelism()
Returns the number of expected parallel Subscribers.
|
Flux<T> |
reduce(java.util.function.BiFunction<T,T,T> reducer)
Reduces all values within a 'rail' and across 'rails' with a reducer function into
a single sequential value.
|
<R> ParallelFlux<R> |
reduce(java.util.function.Supplier<R> initialSupplier,
java.util.function.BiFunction<R,T,R> reducer)
Reduces all values within a 'rail' to a single value (with a possibly different
type) via a reducer function that is initialized on each rail from an
initialSupplier value.
|
ParallelFlux<T> |
runOn(Scheduler scheduler)
Specifies where each 'rail' will observe its incoming values with no work-stealing
and default prefetch amount.
|
ParallelFlux<T> |
runOn(Scheduler scheduler,
int prefetch)
Specifies where each 'rail' will observe its incoming values with possibly
work-stealing and a given prefetch amount.
|
Flux<T> |
sequential()
Merges the values from each 'rail' in a round-robin or same-order fashion and
exposes it as a regular Publisher sequence, running with a default prefetch value
for the rails.
|
Flux<T> |
sequential(int prefetch)
Merges the values from each 'rail' in a round-robin or same-order fashion and
exposes it as a regular Publisher sequence, running with a give prefetch value for
the rails.
|
Flux<T> |
sorted(java.util.Comparator<? super T> comparator)
Sorts the 'rails' of this
ParallelFlux and returns a Publisher that
sequentially picks the smallest next value from the rails. |
Flux<T> |
sorted(java.util.Comparator<? super T> comparator,
int capacityHint)
Sorts the 'rails' of this
ParallelFlux and returns a Publisher that
sequentially picks the smallest next value from the rails. |
void |
subscribe(java.util.function.Consumer<? super T> onNext)
Subscribes an array of Subscribers to this
ParallelFlux and triggers the
execution chain for all 'rails'. |
void |
subscribe(java.util.function.Consumer<? super T> onNext,
java.util.function.Consumer<? super java.lang.Throwable> onError)
Subscribes an array of Subscribers to this
ParallelFlux and triggers the
execution chain for all 'rails'. |
void |
subscribe(java.util.function.Consumer<? super T> onNext,
java.util.function.Consumer<? super java.lang.Throwable> onError,
java.lang.Runnable onComplete)
Subscribes an array of Subscribers to this
ParallelFlux and triggers the
execution chain for all 'rails'. |
abstract void |
subscribe(org.reactivestreams.Subscriber<? super T>[] subscribers)
Subscribes an array of Subscribers to this
ParallelFlux and triggers the
execution chain for all 'rails'. |
protected boolean |
validate(org.reactivestreams.Subscriber<?>[] subscribers)
Validates the number of subscribers and returns true if their number matches the
parallelism level of this ParallelFlux.
|
public static <T> ParallelFlux<T> from(org.reactivestreams.Publisher<? extends T> source)
T
- the value typesource
- the source PublisherParallelFlux
instancepublic static <T> ParallelFlux<T> from(org.reactivestreams.Publisher<? extends T> source, int parallelism)
T
- the value typesource
- the source Publisherparallelism
- the number of parallel railsParallelFlux
instancepublic static <T> ParallelFlux<T> from(org.reactivestreams.Publisher<? extends T> source, int parallelism, int prefetch, java.util.function.Supplier<java.util.Queue<T>> queueSupplier)
T
- the value typesource
- the source Publisherparallelism
- the number of parallel railsprefetch
- the number of values to prefetch from the sourcequeueSupplier
- the queue structure supplier to hold the prefetched values
from the source until there is a rail ready to process it.ParallelFlux
instance@SafeVarargs public static <T> ParallelFlux<T> from(org.reactivestreams.Publisher<T>... publishers)
ParallelFlux
which runs them in parallel and
unordered.T
- the value typepublishers
- the array of publishersParallelFlux
instancepublic final <U> U as(java.util.function.Function<? super ParallelFlux<T>,U> converter)
U
- the output value typeconverter
- the converter function from ParallelFlux
to some typepublic final <C> ParallelFlux<C> collect(java.util.function.Supplier<C> collectionSupplier, java.util.function.BiConsumer<C,T> collector)
C
- the collection typecollectionSupplier
- the supplier of the collection in each railcollector
- the collector, taking the per-rali collection and the current
itemParallelFlux
instancepublic final Flux<java.util.List<T>> collectSortedList(java.util.Comparator<? super T> comparator)
This operator requires a finite source ParallelFlux.
comparator
- the comparator to compare elementspublic final Flux<java.util.List<T>> collectSortedList(java.util.Comparator<? super T> comparator, int capacityHint)
This operator requires a finite source ParallelFlux.
comparator
- the comparator to compare elementscapacityHint
- the expected number of total elementspublic final <U> ParallelFlux<U> compose(java.util.function.Function<? super ParallelFlux<T>,ParallelFlux<U>> composer)
ParallelFlux
and
returns another ParallelFlux
with composed features.U
- the output value typecomposer
- the composer function from ParallelFlux
(this) to another
ParallelFluxParallelFlux
returned by the functionpublic final <R> ParallelFlux<R> concatMap(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)
R
- the result typemapper
- the function to map each rail's value into a Publisher source and the
inner Publishers (immediate, boundary, end)ParallelFlux
instancepublic final <R> ParallelFlux<R> concatMap(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper, int prefetch)
R
- the result typemapper
- the function to map each rail's value into a Publisherprefetch
- the number of items to prefetch from each inner Publisher source
and the inner Publishers (immediate, boundary, end)ParallelFlux
instancepublic final <R> ParallelFlux<R> concatMapDelayError(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)
R
- the result typemapper
- the function to map each rail's value into a Publisher
source and the inner Publishers (immediate, boundary, end)ParallelFlux
instancepublic final ParallelFlux<T> doAfterTerminated(java.lang.Runnable onAfterTerminate)
onAfterTerminate
- the callbackParallelFlux
instancepublic final ParallelFlux<T> doOnCancel(java.util.function.Consumer<? super org.reactivestreams.Subscription> onSubscribe)
onSubscribe
- the callbackParallelFlux
instancepublic final ParallelFlux<T> doOnCancel(java.lang.Runnable onCancel)
onCancel
- the callbackParallelFlux
instancepublic final ParallelFlux<T> doOnComplete(java.lang.Runnable onComplete)
onComplete
- the callbackParallelFlux
instancepublic final ParallelFlux<T> doOnError(java.util.function.Consumer<java.lang.Throwable> onError)
onError
- the callbackParallelFlux
instancepublic final ParallelFlux<T> doOnNext(java.util.function.Consumer<? super T> onNext)
onNext
- the callbackParallelFlux
instancepublic final ParallelFlux<T> doOnRequest(java.util.function.LongConsumer onRequest)
onRequest
- the callbackParallelFlux
instancepublic final ParallelFlux<T> filter(java.util.function.Predicate<? super T> predicate)
Note that the same predicate may be called from multiple threads concurrently.
predicate
- the function returning true to keep a value or false to drop a
valueParallelFlux
instancepublic final <R> ParallelFlux<R> flatMap(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)
Errors are not delayed and uses unbounded concurrency along with default inner prefetch.
R
- the result typemapper
- the function to map each rail's value into a PublisherParallelFlux
instancepublic final <R> ParallelFlux<R> flatMap(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper, boolean delayError)
It uses unbounded concurrency along with default inner prefetch.
R
- the result typemapper
- the function to map each rail's value into a PublisherdelayError
- should the errors from the main and the inner sources delayed
till everybody terminates?ParallelFlux
instancepublic final <R> ParallelFlux<R> flatMap(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency)
It uses a default inner prefetch.
R
- the result typemapper
- the function to map each rail's value into a PublisherdelayError
- should the errors from the main and the inner sources delayed
till everybody terminates?maxConcurrency
- the maximum number of simultaneous subscriptions to the
generated inner PublishersParallelFlux
instancepublic final <R> ParallelFlux<R> flatMap(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency, int prefetch)
R
- the result typemapper
- the function to map each rail's value into a PublisherdelayError
- should the errors from the main and the inner sources delayed
till everybody terminates?maxConcurrency
- the maximum number of simultaneous subscriptions to the
generated inner Publishersprefetch
- the number of items to prefetch from each inner PublisherParallelFlux
instancepublic final Flux<GroupedFlux<java.lang.Integer,T>> groups()
Each group can be consumed only once; requests and cancellation compose through. Note that cancelling only one rail may result in undefined behavior.
public abstract boolean isOrdered()
public final <U> ParallelFlux<U> map(java.util.function.Function<? super T,? extends U> mapper)
Note that the same mapper function may be called from multiple threads concurrently.
U
- the output value typemapper
- the mapper function turning Ts into Us.ParallelFlux
instancepublic abstract int parallelism()
public final Flux<T> reduce(java.util.function.BiFunction<T,T,T> reducer)
Note that the same reducer function may be called from multiple threads concurrently.
reducer
- the function to reduce two values into one.ParallelFlux
was emptypublic final <R> ParallelFlux<R> reduce(java.util.function.Supplier<R> initialSupplier, java.util.function.BiFunction<R,T,R> reducer)
Note that the same mapper function may be called from multiple threads concurrently.
R
- the reduced output typeinitialSupplier
- the supplier for the initial valuereducer
- the function to reduce a previous output of reduce (or the initial
value supplied) with a current source value.ParallelFlux
instancepublic final ParallelFlux<T> runOn(Scheduler scheduler)
This operator uses the default prefetch size returned by Loggers.SMALL_BUFFER_SIZE
.
The operator will call Scheduler.createWorker()
as many times as this
ParallelFlux's parallelism level is.
No assumptions are made about the Scheduler's parallelism level, if the Scheduler's parallelism level is lwer than the ParallelFlux's, some rails may end up on the same thread/worker.
This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.
scheduler
- the scheduler to useParallelFlux
instancepublic final ParallelFlux<T> runOn(Scheduler scheduler, int prefetch)
This operator uses the default prefetch size returned by Loggers.SMALL_BUFFER_SIZE
.
The operator will call Scheduler.createWorker()
as many times as this
ParallelFlux's parallelism level is.
No assumptions are made about the Scheduler's parallelism level, if the Scheduler's parallelism level is lwer than the ParallelFlux's, some rails may end up on the same thread/worker.
This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.
scheduler
- the scheduler to use that rail's worker has run out of work.prefetch
- the number of values to request on each 'rail' from the sourceParallelFlux
instancepublic final Flux<T> sequential()
This operator uses the default prefetch size returned by Loggers.SMALL_BUFFER_SIZE
.
sequential(int)
public final Flux<T> sequential(int prefetch)
prefetch
- the prefetch amount to use for each railpublic final Flux<T> sorted(java.util.Comparator<? super T> comparator)
ParallelFlux
and returns a Publisher that
sequentially picks the smallest next value from the rails.
This operator requires a finite source ParallelFlux.
comparator
- the comparator to usepublic final Flux<T> sorted(java.util.Comparator<? super T> comparator, int capacityHint)
ParallelFlux
and returns a Publisher that
sequentially picks the smallest next value from the rails.
This operator requires a finite source ParallelFlux.
comparator
- the comparator to usecapacityHint
- the expected number of total elementspublic abstract void subscribe(org.reactivestreams.Subscriber<? super T>[] subscribers)
ParallelFlux
and triggers the
execution chain for all 'rails'.subscribers
- the subscribers array to run in parallel, the number of items
must be equal to the parallelism level of this ParallelFluxpublic void subscribe(java.util.function.Consumer<? super T> onNext)
ParallelFlux
and triggers the
execution chain for all 'rails'.onNext
- must be equal to the parallelism level of this ParallelFluxpublic void subscribe(java.util.function.Consumer<? super T> onNext, java.util.function.Consumer<? super java.lang.Throwable> onError)
ParallelFlux
and triggers the
execution chain for all 'rails'.onNext
- onError
- must be equal to the parallelism level of this ParallelFluxpublic void subscribe(java.util.function.Consumer<? super T> onNext, java.util.function.Consumer<? super java.lang.Throwable> onError, java.lang.Runnable onComplete)
ParallelFlux
and triggers the
execution chain for all 'rails'.onNext
- onError
- onComplete
- must be equal to the parallelism level of this ParallelFluxprotected final boolean validate(org.reactivestreams.Subscriber<?>[] subscribers)
subscribers
- the array of Subscriberspublic long getPrefetch()