T - the value type

public abstract class ParallelFlux<T> extends Object implements Publisher<T>

A ParallelFlux publishes to an array of Subscribers, in parallel 'rails' (or 'groups').
Use from() to start processing a regular Publisher in 'rails', each of which
covers a subset of the original Publisher's data. Flux.parallel() is a
convenient shortcut to achieve that on a Flux. Use runOn() to
specify where each 'rail' should run, thread-wise.
Use sequential() to merge the sources back into a single Flux, or
subscribe(Subscriber) if you simply want to subscribe to the merged sequence.
Note that other variants like subscribe(Consumer) instead perform multiple
subscribes, one on each rail (which means that the lambdas should be as stateless and
side-effect free as possible).
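The 'rails' idea can be illustrated without Reactor at all. The sketch below is a plain-JDK model, not Reactor code: the class RailsSketch and its methods toRails and sequential are hypothetical names chosen for illustration. It deals values onto rails in round-robin order and then merges them back by visiting the rails in turn. With rails running on different threads, the merge order of a real ParallelFlux may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, JDK-only model of round-robin 'rails'; not part of Reactor.
class RailsSketch {

    // Deal the source values onto `parallelism` rails in round-robin order.
    static <T> List<List<T>> toRails(List<T> source, int parallelism) {
        List<List<T>> rails = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            rails.add(new ArrayList<>());
        }
        for (int i = 0; i < source.size(); i++) {
            rails.get(i % parallelism).add(source.get(i));
        }
        return rails;
    }

    // Merge the rails back into one sequence by visiting them in turn.
    static <T> List<T> sequential(List<List<T>> rails) {
        int longest = 0;
        for (List<T> rail : rails) {
            longest = Math.max(longest, rail.size());
        }
        List<T> merged = new ArrayList<>();
        for (int i = 0; i < longest; i++) {
            for (List<T> rail : rails) {
                if (i < rail.size()) {
                    merged.add(rail.get(i));
                }
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<Integer>> rails = toRails(Arrays.asList(1, 2, 3, 4, 5), 2);
        System.out.println(rails);             // [[1, 3, 5], [2, 4]]
        System.out.println(sequential(rails)); // [1, 2, 3, 4, 5]
    }
}
```

Each rail only ever sees its own subset of the data, which is why callbacks shared between rails should avoid mutable shared state.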
Constructor and Description |
---|
ParallelFlux() |
Modifier and Type | Method and Description |
---|---|
<U> U | as(Function<? super ParallelFlux<T>,U> converter) - Perform a fluent transformation to a value via a converter function which receives this ParallelFlux. |
ParallelFlux<T> | checkpoint() - Activate assembly tracing for this particular ParallelFlux, in case of an error upstream of the checkpoint. |
ParallelFlux<T> | checkpoint(String description) - Activate assembly tracing for this particular ParallelFlux and give it a description that will be reflected in the assembly traceback, in case of an error upstream of the checkpoint. |
<C> ParallelFlux<C> | collect(Supplier<? extends C> collectionSupplier, BiConsumer<? super C,? super T> collector) - Collect the elements in each rail into a collection supplied via a collectionSupplier and filled via a collector action, emitting the collection at the end. |
Mono<List<T>> | collectSortedList(Comparator<? super T> comparator) - Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher. |
Mono<List<T>> | collectSortedList(Comparator<? super T> comparator, int capacityHint) - Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher. |
<U> ParallelFlux<U> | composeGroup(Function<? super GroupedFlux<Integer,T>,? extends Publisher<? extends U>> composer) - Allows composing operators off the 'rails', as individual GroupedFlux instances keyed by the zero-based rail index. |
<R> ParallelFlux<R> | concatMap(Function<? super T,? extends Publisher<? extends R>> mapper) - Generates and concatenates Publishers on each 'rail', signalling errors immediately and generating 2 publishers upfront. |
<R> ParallelFlux<R> | concatMap(Function<? super T,? extends Publisher<? extends R>> mapper, int prefetch) - Generates and concatenates Publishers on each 'rail', signalling errors immediately and using the given prefetch amount for generating Publishers upfront. |
<R> ParallelFlux<R> | concatMapDelayError(Function<? super T,? extends Publisher<? extends R>> mapper) - Generates and concatenates Publishers on each 'rail', delaying errors and generating 2 publishers upfront. |
ParallelFlux<T> | doAfterTerminate(Runnable afterTerminate) - Run the specified runnable when a 'rail' completes or signals an error. |
ParallelFlux<T> | doOnCancel(Runnable onCancel) - Run the specified runnable when a 'rail' receives a cancellation. |
ParallelFlux<T> | doOnComplete(Runnable onComplete) - Run the specified runnable when a 'rail' completes. |
ParallelFlux<T> | doOnEach(Consumer<? super Signal<T>> signalConsumer) - Triggers side-effects when the ParallelFlux emits an item, fails with an error or completes successfully. |
ParallelFlux<T> | doOnError(Consumer<? super Throwable> onError) - Call the specified consumer with the exception passing through any 'rail'. |
ParallelFlux<T> | doOnNext(Consumer<? super T> onNext) - Call the specified consumer with the current element passing through any 'rail'. |
ParallelFlux<T> | doOnRequest(LongConsumer onRequest) - Call the specified consumer with the request amount if any rail receives a request. |
ParallelFlux<T> | doOnSubscribe(Consumer<? super Subscription> onSubscribe) - Call the specified callback when a 'rail' receives a Subscription from its upstream. |
ParallelFlux<T> | doOnTerminate(Runnable onTerminate) - Triggered when the ParallelFlux terminates, either by completing successfully or with an error. |
ParallelFlux<T> | filter(Predicate<? super T> predicate) - Filters the source values on each 'rail'. |
<R> ParallelFlux<R> | flatMap(Function<? super T,? extends Publisher<? extends R>> mapper) - Generates and flattens Publishers on each 'rail'. |
<R> ParallelFlux<R> | flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError) - Generates and flattens Publishers on each 'rail', optionally delaying errors. |
<R> ParallelFlux<R> | flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency) - Generates and flattens Publishers on each 'rail', optionally delaying errors and with a maximum number of simultaneous subscriptions to the inner Publishers. |
<R> ParallelFlux<R> | flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency, int prefetch) - Generates and flattens Publishers on each 'rail', optionally delaying errors, with a maximum number of simultaneous subscriptions to the inner Publishers and using the given prefetch amount for the inner Publishers. |
static <T> ParallelFlux<T> | from(Publisher<? extends T> source) - Take a Publisher and prepare to consume it on multiple 'rails' (number of CPUs) in a round-robin fashion. |
static <T> ParallelFlux<T> | from(Publisher<? extends T> source, int parallelism) - Take a Publisher and prepare to consume it on parallelism number of 'rails', possibly ordered and in a round-robin fashion. |
static <T> ParallelFlux<T> | from(Publisher<? extends T> source, int parallelism, int prefetch, Supplier<Queue<T>> queueSupplier) - Take a Publisher and prepare to consume it on parallelism number of 'rails' in a round-robin fashion, using a custom prefetch amount and queue for dealing with the source Publisher's values. |
static <T> ParallelFlux<T> | from(Publisher<T>... publishers) - Wraps multiple Publishers into a ParallelFlux which runs them in parallel and unordered. |
long | getPrefetch() - The prefetch configuration of the component. |
Flux<GroupedFlux<Integer,T>> | groups() - Exposes the 'rails' as individual GroupedFlux instances, keyed by the rail index (zero based). |
ParallelFlux<T> | hide() - Hides the identities of this ParallelFlux and its Subscription as well. |
boolean | isOrdered() - Deprecated. This accessor was initially exposed to introspect the internal ordering scenario for parallel rails. In effect, Reactor 3.0 and 3.1 only offer a non-ordered merge at the end, so this should always return false. |
ParallelFlux<T> | log() - Observe all Reactive Streams signals and use Logger support to handle trace implementation. |
ParallelFlux<T> | log(String category) - Observe all Reactive Streams signals and use Logger support to handle trace implementation. |
ParallelFlux<T> | log(String category, Level level, boolean showOperatorLine, SignalType... options) - Observe Reactive Streams signals matching the passed filter options and use Logger support to handle trace implementation. |
ParallelFlux<T> | log(String category, Level level, SignalType... options) - Observe Reactive Streams signals matching the passed filter options and use Logger support to handle trace implementation. |
<U> ParallelFlux<U> | map(Function<? super T,? extends U> mapper) - Maps the source values on each 'rail' to another value. |
protected static <T> ParallelFlux<T> | onAssembly(ParallelFlux<T> source) |
abstract int | parallelism() - Returns the number of expected parallel Subscribers. |
Mono<T> | reduce(BiFunction<T,T,T> reducer) - Reduces all values within a 'rail' and across 'rails' with a reducer function into a single sequential value. |
<R> ParallelFlux<R> | reduce(Supplier<R> initialSupplier, BiFunction<R,? super T,R> reducer) - Reduces all values within a 'rail' to a single value (with a possibly different type) via a reducer function that is initialized on each rail from an initialSupplier value. |
ParallelFlux<T> | runOn(Scheduler scheduler) - Specifies where each 'rail' will observe its incoming values with no work-stealing and default prefetch amount. |
ParallelFlux<T> | runOn(Scheduler scheduler, int prefetch) - Specifies where each 'rail' will observe its incoming values, possibly with work-stealing and with a given prefetch amount. |
Flux<T> | sequential() - Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Publisher sequence, running with a default prefetch value for the rails. |
Flux<T> | sequential(int prefetch) - Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Publisher sequence, running with a given prefetch value for the rails. |
Flux<T> | sorted(Comparator<? super T> comparator) - Sorts the 'rails' of this ParallelFlux and returns a Publisher that sequentially picks the smallest next value from the rails. |
Flux<T> | sorted(Comparator<? super T> comparator, int capacityHint) - Sorts the 'rails' of this ParallelFlux and returns a Publisher that sequentially picks the smallest next value from the rails. |
void | subscribe(Consumer<? super T> onNext) - Subscribes to this ParallelFlux by providing an onNext callback and triggers the execution chain for all 'rails'. |
void | subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) - Subscribes to this ParallelFlux by providing an onNext and onError callback and triggers the execution chain for all 'rails'. |
void | subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Runnable onComplete) - Subscribes to this ParallelFlux by providing an onNext, onError and onComplete callback and triggers the execution chain for all 'rails'. |
void | subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Runnable onComplete, Consumer<? super Subscription> onSubscribe) - Subscribes to this ParallelFlux by providing an onNext, onError, onComplete and onSubscribe callback and triggers the execution chain for all 'rails'. |
void | subscribe(Subscriber<? super T> s) - Merge the rails into a sequential() Flux and subscribe to said Flux. |
abstract void | subscribe(Subscriber<? super T>[] subscribers) - Subscribes an array of Subscribers to this ParallelFlux and triggers the execution chain for all 'rails'. |
<U> ParallelFlux<U> | transform(Function<? super ParallelFlux<T>,ParallelFlux<U>> composer) - Allows composing operators, in assembly time, on top of this ParallelFlux and returns another ParallelFlux with composed features. |
protected boolean | validate(Subscriber<?>[] subscribers) - Validates the number of subscribers and returns true if their number matches the parallelism level of this ParallelFlux. |
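public static <T> ParallelFlux<T> from(Publisher<? extends T> source)
Take a Publisher and prepare to consume it on multiple 'rails' (number of CPUs) in a round-robin fashion.
T - the value type
source - the source Publisher
Returns: the new ParallelFlux instance

public static <T> ParallelFlux<T> from(Publisher<? extends T> source, int parallelism)
Take a Publisher and prepare to consume it on parallelism number of 'rails', possibly ordered and in a round-robin fashion.
T - the value type
source - the source Publisher
parallelism - the number of parallel rails
Returns: the new ParallelFlux instance

public static <T> ParallelFlux<T> from(Publisher<? extends T> source, int parallelism, int prefetch, Supplier<Queue<T>> queueSupplier)
Take a Publisher and prepare to consume it on parallelism number of 'rails' in a round-robin fashion, using a custom prefetch amount and queue for dealing with the source Publisher's values.
T - the value type
source - the source Publisher
parallelism - the number of parallel rails
prefetch - the number of values to prefetch from the source
queueSupplier - the queue structure supplier to hold the prefetched values from the source until there is a rail ready to process it
Returns: the new ParallelFlux instance

@SafeVarargs public static <T> ParallelFlux<T> from(Publisher<T>... publishers)
Wraps multiple Publishers into a ParallelFlux which runs them in parallel and unordered.
T - the value type
publishers - the array of publishers
Returns: the new ParallelFlux instance

public final <U> U as(Function<? super ParallelFlux<T>,U> converter)
Perform a fluent transformation to a value via a converter function which receives this ParallelFlux.
U - the output value type
converter - the converter function from ParallelFlux to some type

public final ParallelFlux<T> checkpoint()
Activate assembly tracing for this particular ParallelFlux, in case of an error upstream of the checkpoint.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
Returns: the assembly tracing ParallelFlux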

public final ParallelFlux<T> checkpoint(String description)
Activate assembly tracing for this particular ParallelFlux and give it a description that will be reflected in the assembly traceback, in case of an error upstream of the checkpoint.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
The description could for example be a meaningful name for the assembled flux or a wider correlation ID.
description - a description to include in the assembly traceback
Returns: the assembly tracing ParallelFlux

public final <C> ParallelFlux<C> collect(Supplier<? extends C> collectionSupplier, BiConsumer<? super C,? super T> collector)
Collect the elements in each rail into a collection supplied via a collectionSupplier and filled via a collector action, emitting the collection at the end.
C - the collection type
collectionSupplier - the supplier of the collection in each rail
collector - the collector, taking the per-rail collection and the current item
Returns: the new ParallelFlux instance

public final Mono<List<T>> collectSortedList(Comparator<? super T> comparator)
Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher.
This operator requires a finite source ParallelFlux.
comparator - the comparator to compare elements

public final Mono<List<T>> collectSortedList(Comparator<? super T> comparator, int capacityHint)
Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher.
This operator requires a finite source ParallelFlux.
comparator - the comparator to compare elements
capacityHint - the expected number of total elements

public final <R> ParallelFlux<R> concatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
Generates and concatenates Publishers on each 'rail', signalling errors immediately and generating 2 publishers upfront.
R - the result type
mapper - the function to map each rail's value into a Publisher
Returns: the new ParallelFlux instance

public final <R> ParallelFlux<R> concatMap(Function<? super T,? extends Publisher<? extends R>> mapper, int prefetch)
Generates and concatenates Publishers on each 'rail', signalling errors immediately and using the given prefetch amount for generating Publishers upfront.
R - the result type
mapper - the function to map each rail's value into a Publisher
prefetch - the number of items to prefetch from each inner Publisher
Returns: the new ParallelFlux instance

public final <R> ParallelFlux<R> concatMapDelayError(Function<? super T,? extends Publisher<? extends R>> mapper)
Generates and concatenates Publishers on each 'rail', delaying errors and generating 2 publishers upfront.
R - the result type
mapper - the function to map each rail's value into a Publisher
Returns: the new ParallelFlux instance

public final ParallelFlux<T> doAfterTerminate(Runnable afterTerminate)
Run the specified runnable when a 'rail' completes or signals an error.
afterTerminate - the callback
Returns: the new ParallelFlux instance

public final ParallelFlux<T> doOnCancel(Runnable onCancel)
Run the specified runnable when a 'rail' receives a cancellation.
onCancel - the callback
Returns: the new ParallelFlux instance

public final ParallelFlux<T> doOnComplete(Runnable onComplete)
Run the specified runnable when a 'rail' completes.
onComplete - the callback
Returns: the new ParallelFlux instance

public final ParallelFlux<T> doOnEach(Consumer<? super Signal<T>> signalConsumer)
Triggers side-effects when the ParallelFlux emits an item, fails with an error or completes successfully. All these events are represented as a Signal that is passed to the side-effect callback. Note that with ParallelFlux and the lambda-based subscribes or the array-based one, onError and onComplete will be invoked as many times as there are rails, resulting in as many corresponding Signals being seen in the callback.
Use of subscribe(Subscriber), which calls sequential(), might cancel some rails, resulting in fewer signals being observed. This is an advanced operator, typically used for monitoring of a ParallelFlux.
signalConsumer - the mandatory callback to call on Subscriber.onNext(Object), Subscriber.onError(Throwable) and Subscriber.onComplete()
Returns: an observed ParallelFlux
See Also: doOnNext(Consumer), doOnError(Consumer), doOnComplete(Runnable), subscribe(Subscriber[]), Signal
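
public final ParallelFlux<T> doOnError(Consumer<? super Throwable> onError)
Call the specified consumer with the exception passing through any 'rail'.
onError - the callback
Returns: the new ParallelFlux instance

public final ParallelFlux<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe)
Call the specified callback when a 'rail' receives a Subscription from its upstream.
onSubscribe - the callback
Returns: the new ParallelFlux instance

public final ParallelFlux<T> doOnNext(Consumer<? super T> onNext)
Call the specified consumer with the current element passing through any 'rail'.
onNext - the callback
Returns: the new ParallelFlux instance

public final ParallelFlux<T> doOnRequest(LongConsumer onRequest)
Call the specified consumer with the request amount if any rail receives a request.
onRequest - the callback
Returns: the new ParallelFlux instance

public final ParallelFlux<T> doOnTerminate(Runnable onTerminate)
Triggered when the ParallelFlux terminates, either by completing successfully or with an error.
onTerminate - the callback to call on Subscriber.onComplete() or Subscriber.onError(java.lang.Throwable)
Returns: an observed ParallelFlux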

public final ParallelFlux<T> filter(Predicate<? super T> predicate)
Filters the source values on each 'rail'.
Note that the same predicate may be called from multiple threads concurrently.
predicate - the function returning true to keep a value or false to drop a value
Returns: the new ParallelFlux instance

public final <R> ParallelFlux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
Generates and flattens Publishers on each 'rail'.
Errors are not delayed; this overload uses unbounded concurrency along with the default inner prefetch.
R - the result type
mapper - the function to map each rail's value into a Publisher
Returns: the new ParallelFlux instance

public final <R> ParallelFlux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError)
Generates and flattens Publishers on each 'rail', optionally delaying errors.
It uses unbounded concurrency along with the default inner prefetch.
R - the result type
mapper - the function to map each rail's value into a Publisher
delayError - whether errors from the main and the inner sources should be delayed until all of them terminate
Returns: the new ParallelFlux instance

public final <R> ParallelFlux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency)
Generates and flattens Publishers on each 'rail', optionally delaying errors and with a maximum number of simultaneous subscriptions to the inner Publishers.
It uses the default inner prefetch.
R - the result type
mapper - the function to map each rail's value into a Publisher
delayError - whether errors from the main and the inner sources should be delayed until all of them terminate
maxConcurrency - the maximum number of simultaneous subscriptions to the generated inner Publishers
Returns: the new ParallelFlux instance

public final <R> ParallelFlux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency, int prefetch)
Generates and flattens Publishers on each 'rail', optionally delaying errors, with a maximum number of simultaneous subscriptions to the inner Publishers and using the given prefetch amount for the inner Publishers.
R - the result type
mapper - the function to map each rail's value into a Publisher
delayError - whether errors from the main and the inner sources should be delayed until all of them terminate
maxConcurrency - the maximum number of simultaneous subscriptions to the generated inner Publishers
prefetch - the number of items to prefetch from each inner Publisher
Returns: the new ParallelFlux instance

public final Flux<GroupedFlux<Integer,T>> groups()
Exposes the 'rails' as individual GroupedFlux instances, keyed by the rail index (zero based).
Each group can be consumed only once; requests and cancellation compose through. Note that cancelling only one rail may result in undefined behavior.
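
public final ParallelFlux<T> hide()
Hides the identities of this ParallelFlux and its Subscription as well.
Returns: a new ParallelFlux defeating any Publisher / Subscription feature-detection

@Deprecated public boolean isOrdered()
Deprecated. This accessor was initially exposed to introspect the internal ordering scenario for parallel rails. In effect, Reactor 3.0 and 3.1 only offer a non-ordered merge at the end, so this should always return false.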

public final ParallelFlux<T> log()
Observe all Reactive Streams signals and use Logger support to handle trace implementation. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.
The default log category will be "reactor.*"; a generated operator suffix completes it, e.g. "reactor.Parallel.Map".
Returns: a new ParallelFlux

public final ParallelFlux<T> log(String category)
Observe all Reactive Streams signals and use Logger support to handle trace implementation. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix completes it, e.g. "reactor.Parallel.Map".
Returns: a new ParallelFlux

public final ParallelFlux<T> log(String category, Level level, SignalType... options)
Observe Reactive Streams signals matching the passed filter options and use Logger support to handle trace implementation. Default will use the passed Level and java.util.logging. If SLF4J is available, it will be used instead.
Options allow fine-grained filtering of the traced signal, for instance to only capture onNext and onError:
ParallelFlux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix completes it, e.g. "reactor.Parallel.Map".
level - the Level to enforce for this tracing ParallelFlux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
options - a vararg SignalType option to filter log messages
Returns: a new ParallelFlux

public final ParallelFlux<T> log(String category, Level level, boolean showOperatorLine, SignalType... options)
Observe Reactive Streams signals matching the passed filter options and use Logger support to handle trace implementation. Default will use the passed Level and java.util.logging. If SLF4J is available, it will be used instead.
Options allow fine-grained filtering of the traced signal, for instance to only capture onNext and onError:
ParallelFlux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix completes it, e.g. "reactor.Parallel.Map".
level - the Level to enforce for this tracing ParallelFlux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
showOperatorLine - capture the current stack to display operator class/line number
options - a vararg SignalType option to filter log messages
Returns: a new ParallelFlux

public final <U> ParallelFlux<U> map(Function<? super T,? extends U> mapper)
Maps the source values on each 'rail' to another value.
Note that the same mapper function may be called from multiple threads concurrently.
U - the output value type
mapper - the mapper function turning Ts into Us
Returns: the new ParallelFlux instance

public abstract int parallelism()
Returns the number of expected parallel Subscribers.

public final Mono<T> reduce(BiFunction<T,T,T> reducer)
Reduces all values within a 'rail' and across 'rails' with a reducer function into a single sequential value.
Note that the same reducer function may be called from multiple threads concurrently.
reducer - the function to reduce two values into one
Returns: a Mono emitting the reduced value, or empty if the ParallelFlux was empty

public final <R> ParallelFlux<R> reduce(Supplier<R> initialSupplier, BiFunction<R,? super T,R> reducer)
Reduces all values within a 'rail' to a single value (with a possibly different type) via a reducer function that is initialized on each rail from an initialSupplier value.
Note that the same reducer function may be called from multiple threads concurrently.
R - the reduced output type
initialSupplier - the supplier for the initial value
reducer - the function to reduce a previous output of reduce (or the initial value supplied) with a current source value
Returns: the new ParallelFlux instance

public final ParallelFlux<T> runOn(Scheduler scheduler)
Specifies where each 'rail' will observe its incoming values with no work-stealing and default prefetch amount.
This operator uses the default prefetch size returned by QueueSupplier.SMALL_BUFFER_SIZE.
The operator will call Scheduler.createWorker() as many times as this ParallelFlux's parallelism level is.
No assumptions are made about the Scheduler's parallelism level; if it is lower than the ParallelFlux's, some rails may end up on the same thread/worker.
This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.
scheduler - the scheduler to use
Returns: the new ParallelFlux instance

public final ParallelFlux<T> runOn(Scheduler scheduler, int prefetch)
Specifies where each 'rail' will observe its incoming values, possibly with work-stealing and with a given prefetch amount.
The operator will call Scheduler.createWorker() as many times as this ParallelFlux's parallelism level is.
No assumptions are made about the Scheduler's parallelism level; if it is lower than the ParallelFlux's, some rails may end up on the same thread/worker.
This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.
scheduler - the scheduler to use
prefetch - the number of values to request on each 'rail' from the source
Returns: the new ParallelFlux instance

public final Flux<T> sequential()
Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Publisher sequence, running with a default prefetch value for the rails.
This operator uses the default prefetch size returned by QueueSupplier.SMALL_BUFFER_SIZE.
See Also: sequential(int)
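
The two-level behaviour of reduce(BiFunction) described above (first within each rail, then across rails) can be sketched with the JDK alone. This is a hypothetical model, not Reactor's implementation: the class ReduceSketch and method reduceRails are illustrative names, and rails are represented as plain lists.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

// Hypothetical, JDK-only model of a per-rail then cross-rail reduction.
class ReduceSketch {

    static <T> Optional<T> reduceRails(List<List<T>> rails, BinaryOperator<T> reducer) {
        Optional<T> result = Optional.empty();
        for (List<T> rail : rails) {
            // Step 1: reduce the values of this rail on their own.
            Optional<T> partial = rail.stream().reduce(reducer);
            if (partial.isPresent()) {
                // Step 2: fold the rail's partial result into the overall result.
                result = result.isPresent()
                        ? Optional.of(reducer.apply(result.get(), partial.get()))
                        : partial;
            }
        }
        // Stays empty when every rail was empty.
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> rails = Arrays.asList(
                Arrays.asList(1, 3, 5),
                Arrays.asList(2, 4));
        System.out.println(reduceRails(rails, Integer::sum)); // Optional[15]
    }
}
```

Because values are grouped per rail before being combined, the result only matches a plain left-to-right reduction of the source when the reducer is associative.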

public final Flux<T> sequential(int prefetch)
Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Publisher sequence, running with a given prefetch value for the rails.
prefetch - the prefetch amount to use for each rail

public final Flux<T> sorted(Comparator<? super T> comparator)
Sorts the 'rails' of this ParallelFlux and returns a Publisher that sequentially picks the smallest next value from the rails.
This operator requires a finite source ParallelFlux.
comparator - the comparator to use

public final Flux<T> sorted(Comparator<? super T> comparator, int capacityHint)
Sorts the 'rails' of this ParallelFlux and returns a Publisher that sequentially picks the smallest next value from the rails.
This operator requires a finite source ParallelFlux.
comparator - the comparator to use
capacityHint - the expected number of total elements

public abstract void subscribe(Subscriber<? super T>[] subscribers)
Subscribes an array of Subscribers to this ParallelFlux and triggers the execution chain for all 'rails'.
subscribers - the subscribers array to run in parallel; the number of items must be equal to the parallelism level of this ParallelFlux

public void subscribe(Consumer<? super T> onNext)
Subscribes to this ParallelFlux by providing an onNext callback and triggers the execution chain for all 'rails'.
onNext - consumer of onNext signals

public void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
Subscribes to this ParallelFlux by providing an onNext and onError callback and triggers the execution chain for all 'rails'.
onNext - consumer of onNext signals
onError - consumer of error signal

public void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Runnable onComplete)
Subscribes to this ParallelFlux by providing an onNext, onError and onComplete callback and triggers the execution chain for all 'rails'.
onNext - consumer of onNext signals
onError - consumer of error signal
onComplete - callback on completion signal

public void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Runnable onComplete, Consumer<? super Subscription> onSubscribe)
Subscribes to this ParallelFlux by providing an onNext, onError, onComplete and onSubscribe callback and triggers the execution chain for all 'rails'.
onNext - consumer of onNext signals
onError - consumer of error signal
onComplete - callback on completion signal
onSubscribe - consumer of the subscription signal

public final <U> ParallelFlux<U> transform(Function<? super ParallelFlux<T>,ParallelFlux<U>> composer)
Allows composing operators, in assembly time, on top of this ParallelFlux and returns another ParallelFlux with composed features.
U - the output value type
composer - the composer function from ParallelFlux (this) to another ParallelFlux
Returns: the ParallelFlux returned by the function

public final <U> ParallelFlux<U> composeGroup(Function<? super GroupedFlux<Integer,T>,? extends Publisher<? extends U>> composer)
Allows composing operators off the 'rails', as individual GroupedFlux instances keyed by the zero-based rail index. The transformed groups are parallelized back once the transformation has been applied.
Note that like in groups(), requests and cancellation compose through, and cancelling only one rail may result in undefined behavior.
U - the type of the resulting parallelized flux
composer - the composition function to apply on each rail
Returns: a ParallelFlux of the composed groups

protected final boolean validate(Subscriber<?>[] subscribers)
Validates the number of subscribers and returns true if their number matches the parallelism level of this ParallelFlux.
subscribers - the array of Subscribers

public long getPrefetch()
The prefetch configuration of the component.
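
To make the sorted(Comparator) description above concrete ("sorts the rails, then sequentially picks the smallest next value from the rails"), here is a JDK-only sketch of that two-step idea. It is a hypothetical model rather than Reactor's implementation: the class SortedRailsSketch and its sorted method are illustrative names, and rails are plain lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical, JDK-only model of "sort each rail, then merge by smallest head".
class SortedRailsSketch {

    static <T> List<T> sorted(List<List<T>> rails, Comparator<? super T> comparator) {
        // Step 1: sort a copy of each rail independently.
        List<List<T>> sortedRails = new ArrayList<>();
        int total = 0;
        for (List<T> rail : rails) {
            List<T> copy = new ArrayList<>(rail);
            copy.sort(comparator);
            sortedRails.add(copy);
            total += copy.size();
        }
        // Step 2: repeatedly pick the smallest not-yet-consumed head value.
        int[] heads = new int[sortedRails.size()];
        List<T> out = new ArrayList<>(total);
        while (out.size() < total) {
            int best = -1;
            for (int r = 0; r < sortedRails.size(); r++) {
                List<T> rail = sortedRails.get(r);
                if (heads[r] >= rail.size()) {
                    continue; // this rail is exhausted
                }
                if (best < 0 || comparator.compare(
                        rail.get(heads[r]),
                        sortedRails.get(best).get(heads[best])) < 0) {
                    best = r;
                }
            }
            out.add(sortedRails.get(best).get(heads[best]++));
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> rails = Arrays.asList(
                Arrays.asList(5, 1, 3),
                Arrays.asList(4, 2));
        System.out.println(sorted(rails, Comparator.<Integer>naturalOrder())); // [1, 2, 3, 4, 5]
    }
}
```

Step 1 needs every rail to be complete before any value can be emitted, which is consistent with the requirement that the source ParallelFlux be finite.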

public final void subscribe(Subscriber<? super T> s)
Merge the rails into a sequential() Flux and subscribe to said Flux.
Specified by: subscribe in interface Publisher<T>
s - the subscriber to use on sequential() Flux

protected static <T> ParallelFlux<T> onAssembly(ParallelFlux<T> source)
T - the value type
source - the source to wrap