Type Parameters:
T - the value type

public abstract class ParallelFlux<T> extends Object implements CorePublisher<T>

A ParallelFlux publishes to an array of Subscribers, in parallel 'rails' (or 'groups').

Use from(org.reactivestreams.Publisher<? extends T>) to start processing a regular Publisher in 'rails', each of which covers a subset of the original Publisher's data. Flux.parallel() is a convenient shortcut to achieve that on a Flux.

Use runOn(reactor.core.scheduler.Scheduler) to specify where each 'rail' should run, thread-wise.

Use sequential() to merge the rails back into a single Flux.

Use then() to listen for the termination of all rails in the produced Mono.

Use subscribe(Subscriber) if you simply want to subscribe to the merged sequence. Note that other variants like subscribe(Consumer) instead perform multiple subscribes, one on each rail (which means that the lambdas should be as stateless and side-effect free as possible).
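The split, run and merge steps described above can be sketched as follows. This is a minimal illustration, not a canonical recipe: the class name `ParallelFluxOverviewSketch`, the parallelism of 4 and the squaring function are assumptions chosen for the example.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// Hypothetical class name, used only for this illustration.
public class ParallelFluxOverviewSketch {

    // Squares 1..10 on four rails, then merges the rails back into one Flux.
    static List<Integer> squaresInParallel() {
        return Flux.range(1, 10)
                .parallel(4)                  // Flux.parallel shortcut: split into 4 rails
                .runOn(Schedulers.parallel()) // each rail observes its values on a Scheduler worker
                .map(i -> i * i)              // the mapper may be called from several threads
                .sequential()                 // merge the rails back into a single Flux
                .sort()                       // merged order may differ from source order, so sort
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(squaresInParallel());
    }
}
```

The explicit `sort()` is there because nothing in the merged sequence guarantees that values arrive in the original source order once the rails run on different workers.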
Constructor and Description |
---|
ParallelFlux() |
Modifier and Type | Method and Description |
---|---|
<U> U | as(Function<? super ParallelFlux<T>,U> converter): Perform a fluent transformation to a value via a converter function which receives this ParallelFlux. |
ParallelFlux<T> | checkpoint(): Activate traceback (full assembly tracing) for this particular ParallelFlux, in case of an error upstream of the checkpoint. |
ParallelFlux<T> | checkpoint(String description): Activate traceback (assembly marker) for this particular ParallelFlux by giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. |
ParallelFlux<T> | checkpoint(String description, boolean forceStackTrace): Activate traceback (full assembly tracing or the lighter assembly marking depending on the forceStackTrace option). |
<C> ParallelFlux<C> | collect(Supplier<? extends C> collectionSupplier, BiConsumer<? super C,? super T> collector): Collect the elements in each rail into a collection supplied via a collectionSupplier and filled via a collector action, emitting the collection at the end. |
Mono<List<T>> | collectSortedList(Comparator<? super T> comparator): Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher. |
Mono<List<T>> | collectSortedList(Comparator<? super T> comparator, int capacityHint): Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher. |
<R> ParallelFlux<R> | concatMap(Function<? super T,? extends Publisher<? extends R>> mapper): Generates and concatenates Publishers on each 'rail', signalling errors immediately and generating 2 publishers upfront. |
<R> ParallelFlux<R> | concatMap(Function<? super T,? extends Publisher<? extends R>> mapper, int prefetch): Generates and concatenates Publishers on each 'rail', signalling errors immediately and using the given prefetch amount for generating Publishers upfront. |
<R> ParallelFlux<R> | concatMapDelayError(Function<? super T,? extends Publisher<? extends R>> mapper): Generates and concatenates Publishers on each 'rail', delaying errors and generating 2 publishers upfront. |
ParallelFlux<T> | doAfterTerminate(Runnable afterTerminate): Run the specified runnable when a 'rail' completes or signals an error. |
ParallelFlux<T> | doOnCancel(Runnable onCancel): Run the specified runnable when a 'rail' receives a cancellation. |
ParallelFlux<T> | doOnComplete(Runnable onComplete): Run the specified runnable when a 'rail' completes. |
ParallelFlux<T> | doOnEach(Consumer<? super Signal<T>> signalConsumer): Triggers side-effects when the ParallelFlux emits an item, fails with an error or completes successfully. |
ParallelFlux<T> | doOnError(Consumer<? super Throwable> onError): Call the specified consumer with the exception passing through any 'rail'. |
ParallelFlux<T> | doOnNext(Consumer<? super T> onNext): Call the specified consumer with the current element passing through any 'rail'. |
ParallelFlux<T> | doOnRequest(LongConsumer onRequest): Call the specified consumer with the request amount if any rail receives a request. |
ParallelFlux<T> | doOnSubscribe(Consumer<? super Subscription> onSubscribe): Call the specified callback when a 'rail' receives a Subscription from its upstream. |
ParallelFlux<T> | doOnTerminate(Runnable onTerminate): Triggered when the ParallelFlux terminates, either by completing successfully or with an error. |
ParallelFlux<T> | filter(Predicate<? super T> predicate): Filters the source values on each 'rail'. |
<R> ParallelFlux<R> | flatMap(Function<? super T,? extends Publisher<? extends R>> mapper): Generates and flattens Publishers on each 'rail'. |
<R> ParallelFlux<R> | flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError): Generates and flattens Publishers on each 'rail', optionally delaying errors. |
<R> ParallelFlux<R> | flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency): Generates and flattens Publishers on each 'rail', optionally delaying errors and with a given maximum number of simultaneous subscriptions to the inner Publishers. |
<R> ParallelFlux<R> | flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency, int prefetch): Generates and flattens Publishers on each 'rail', optionally delaying errors, with a given maximum number of simultaneous subscriptions to the inner Publishers and using the given prefetch amount for the inner Publishers. |
static <T> ParallelFlux<T> | from(Publisher<? extends T> source): Take a Publisher and prepare to consume it on multiple 'rails' (one per CPU core) in a round-robin fashion. |
static <T> ParallelFlux<T> | from(Publisher<? extends T> source, int parallelism): Take a Publisher and prepare to consume it on parallelism number of 'rails', possibly ordered and in a round-robin fashion. |
static <T> ParallelFlux<T> | from(Publisher<? extends T> source, int parallelism, int prefetch, Supplier<Queue<T>> queueSupplier): Take a Publisher and prepare to consume it on parallelism number of 'rails' in a round-robin fashion, using a custom prefetch amount and queue for dealing with the source Publisher's values. |
static <T> ParallelFlux<T> | from(Publisher<T>... publishers): Wraps multiple Publishers into a ParallelFlux which runs them in parallel and unordered. |
int | getPrefetch(): The prefetch configuration of the component. |
Flux<GroupedFlux<Integer,T>> | groups(): Exposes the 'rails' as individual GroupedFlux instances, keyed by the rail index (zero based). |
ParallelFlux<T> | hide(): Hides the identities of this ParallelFlux and its Subscription as well. |
ParallelFlux<T> | log(): Observe all Reactive Streams signals and use Logger support to handle trace implementation. |
ParallelFlux<T> | log(String category): Observe all Reactive Streams signals and use Logger support to handle trace implementation. |
ParallelFlux<T> | log(String category, Level level, boolean showOperatorLine, SignalType... options): Observe Reactive Streams signals matching the passed filter options and use Logger support to handle trace implementation. |
ParallelFlux<T> | log(String category, Level level, SignalType... options): Observe Reactive Streams signals matching the passed filter options and use Logger support to handle trace implementation. |
<U> ParallelFlux<U> | map(Function<? super T,? extends U> mapper): Maps the source values on each 'rail' to another value. |
ParallelFlux<T> | name(String name): Give a name to this sequence, which can be retrieved using Scannable.name() as long as this is the first reachable Scannable.parents(). |
protected static <T> ParallelFlux<T> | onAssembly(ParallelFlux<T> source) |
Flux<T> | ordered(Comparator<? super T> comparator): Merges the values from each 'rail', choosing which one to merge by way of a provided Comparator and picking the smallest of all rails. |
Flux<T> | ordered(Comparator<? super T> comparator, int prefetch): Merges the values from each 'rail', choosing which one to merge by way of a provided Comparator and picking the smallest of all rails. |
abstract int | parallelism(): Returns the number of expected parallel Subscribers. |
Mono<T> | reduce(BiFunction<T,T,T> reducer): Reduces all values within a 'rail' and across 'rails' with a reducer function into a single sequential value. |
<R> ParallelFlux<R> | reduce(Supplier<R> initialSupplier, BiFunction<R,? super T,R> reducer): Reduces all values within a 'rail' to a single value (with a possibly different type) via a reducer function that is initialized on each rail from an initialSupplier value. |
ParallelFlux<T> | runOn(Scheduler scheduler): Specifies where each 'rail' will observe its incoming values with possible work-stealing and default prefetch amount. |
ParallelFlux<T> | runOn(Scheduler scheduler, int prefetch): Specifies where each 'rail' will observe its incoming values with possible work-stealing and a given prefetch amount. |
Flux<T> | sequential(): Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Publisher sequence, running with a default prefetch value for the rails. |
Flux<T> | sequential(int prefetch): Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Publisher sequence, running with a given prefetch value for the rails. |
Flux<T> | sorted(Comparator<? super T> comparator): Sorts the 'rails' of this ParallelFlux and returns a Publisher that sequentially picks the smallest next value from the rails. |
Flux<T> | sorted(Comparator<? super T> comparator, int capacityHint): Sorts the 'rails' of this ParallelFlux and returns a Publisher that sequentially picks the smallest next value from the rails. |
Disposable | subscribe(): Subscribes to this ParallelFlux and triggers the execution chain for all 'rails'. |
Disposable | subscribe(Consumer<? super T> onNext): Subscribes to this ParallelFlux by providing an onNext callback and triggers the execution chain for all 'rails'. |
Disposable | subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError): Subscribes to this ParallelFlux by providing an onNext and onError callback and triggers the execution chain for all 'rails'. |
Disposable | subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Runnable onComplete): Subscribes to this ParallelFlux by providing an onNext, onError and onComplete callback and triggers the execution chain for all 'rails'. |
Disposable | subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Runnable onComplete, Consumer<? super Subscription> onSubscribe): Subscribes to this ParallelFlux by providing an onNext, onError, onComplete and onSubscribe callback and triggers the execution chain for all 'rails'. |
Disposable | subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Runnable onComplete, Context initialContext): Subscribes to this ParallelFlux by providing an onNext, onError and onComplete callback as well as an initial Context, then triggers the execution chain for all 'rails'. |
void | subscribe(CoreSubscriber<? super T> s): An internal Publisher.subscribe(Subscriber) that will bypass Hooks.onLastOperator(Function) pointcut. |
abstract void | subscribe(CoreSubscriber<? super T>[] subscribers): Subscribes an array of Subscribers to this ParallelFlux and triggers the execution chain for all 'rails'. |
void | subscribe(Subscriber<? super T> s): Merge the rails into a sequential() Flux and subscribe to said Flux. |
ParallelFlux<T> | tag(String key, String value): Tag this ParallelFlux with a key/value pair. |
Mono<Void> | then(): Emit an onComplete or onError signal once all values across 'rails' have been observed. |
String | toString() |
<U> ParallelFlux<U> | transform(Function<? super ParallelFlux<T>,ParallelFlux<U>> composer): Allows composing operators, in assembly time, on top of this ParallelFlux and returns another ParallelFlux with composed features. |
<U> ParallelFlux<U> | transformGroups(Function<? super GroupedFlux<Integer,T>,? extends Publisher<? extends U>> composer): Allows composing operators off the groups (or 'rails'), as individual GroupedFlux instances keyed by the zero based rail's index. |
protected boolean | validate(Subscriber<?>[] subscribers): Validates the number of subscribers and returns true if their number matches the parallelism level of this ParallelFlux. |

public static <T> ParallelFlux<T> from(Publisher<? extends T> source)
Take a Publisher and prepare to consume it on multiple 'rails' (one per CPU core) in a round-robin fashion. Equivalent to Flux.parallel().
Type Parameters:
T - the value type
Parameters:
source - the source Publisher
Returns:
the ParallelFlux instance

public static <T> ParallelFlux<T> from(Publisher<? extends T> source, int parallelism)
Take a Publisher and prepare to consume it on parallelism number of 'rails', possibly ordered and in a round-robin fashion.
Type Parameters:
T - the value type
Parameters:
source - the source Publisher
parallelism - the number of parallel rails
Returns:
the ParallelFlux instance

public static <T> ParallelFlux<T> from(Publisher<? extends T> source, int parallelism, int prefetch, Supplier<Queue<T>> queueSupplier)
Take a Publisher and prepare to consume it on parallelism number of 'rails' in a round-robin fashion, using a custom prefetch amount and queue for dealing with the source Publisher's values.
Type Parameters:
T - the value type
Parameters:
source - the source Publisher
parallelism - the number of parallel rails
prefetch - the number of values to prefetch from the source
queueSupplier - the queue structure supplier to hold the prefetched values from the source until there is a rail ready to process it
Returns:
the ParallelFlux instance

@SafeVarargs public static <T> ParallelFlux<T> from(Publisher<T>... publishers)
Wraps multiple Publishers into a ParallelFlux which runs them in parallel and unordered.
Type Parameters:
T - the value type
Parameters:
publishers - the array of publishers
Returns:
the ParallelFlux instance

public final <U> U as(Function<? super ParallelFlux<T>,U> converter)
Perform a fluent transformation to a value via a converter function which receives this ParallelFlux.
Type Parameters:
U - the output value type
Parameters:
converter - the converter function from ParallelFlux to some type

public final ParallelFlux<T> checkpoint()
Activate traceback (full assembly tracing) for this particular ParallelFlux, in case of an error upstream of the checkpoint. Tracing incurs the cost of an exception stack trace creation.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
Returns:
the assembly tracing ParallelFlux

public final ParallelFlux<T> checkpoint(String description)
Activate traceback (assembly marker) for this particular ParallelFlux by giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(), this doesn't create a filled stack trace, avoiding the main cost of the operator. However, as a trade-off the description must be unique enough for the user to find out where this ParallelFlux was assembled. If you only want a generic description, and still rely on the stack trace to find the assembly site, use the checkpoint(String, boolean) variant.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
Parameters:
description - a unique enough description to include in the light assembly traceback
Returns:
the assembly marked ParallelFlux
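As a rough sketch of the light checkpoint described above, the following pipeline fails upstream of a described checkpoint and captures the error that reaches the subscriber. The class name `ParallelCheckpointSketch`, the description `"after-division"` and the failing division are assumptions made for this illustration only.

```java
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;

// Hypothetical class name, used only for this illustration.
public class ParallelCheckpointSketch {

    // Returns the error that reaches the subscriber after passing the checkpoint.
    static Throwable failWithCheckpoint() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        Flux.range(1, 4)
                .parallel(2)
                .map(i -> 10 / (i - 3))       // i == 3 divides by zero, upstream of the checkpoint
                .checkpoint("after-division") // light marker: description only, no filled stack trace
                .sequential()
                .onErrorResume(e -> {
                    seen.set(e);              // keep the error for inspection
                    return Flux.empty();
                })
                .blockLast();
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable error = failWithCheckpoint();
        System.out.println(error);
        // Per the description above, the traceback travels as a suppressed exception.
        for (Throwable suppressed : error.getSuppressed()) {
            System.out.println("suppressed: " + suppressed);
        }
    }
}
```

Because the description is the only locator in the light variant, a value such as `"after-division"` is only useful if it is unique within the application.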

public final ParallelFlux<T> checkpoint(String description, boolean forceStackTrace)
Activate traceback (full assembly tracing or the lighter assembly marking depending on the forceStackTrace option).
By setting the forceStackTrace parameter to true, activate assembly tracing for this particular ParallelFlux and give it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(String), this will incur the cost of an exception stack trace creation. The description could for example be a meaningful name for the assembled ParallelFlux or a wider correlation ID, since the stack trace will always provide enough information to locate where this ParallelFlux was assembled.
By setting forceStackTrace to false, behaves like checkpoint(String) and is subject to the same caveat in choosing the description.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly marker.
The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
Parameters:
description - a description (must be unique enough if forceStackTrace is set to false)
forceStackTrace - false to make a light checkpoint without a stack trace, true to use a stack trace
Returns:
the assembly marked ParallelFlux

public final <C> ParallelFlux<C> collect(Supplier<? extends C> collectionSupplier, BiConsumer<? super C,? super T> collector)
Collect the elements in each rail into a collection supplied via a collectionSupplier and filled via a collector action, emitting the collection at the end.
Type Parameters:
C - the collection type
Parameters:
collectionSupplier - the supplier of the collection in each rail
collector - the collector, taking the per-rail collection and the current item
Returns:
the new ParallelFlux instance

public final Mono<List<T>> collectSortedList(Comparator<? super T> comparator)
Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher.
This operator requires a finite source ParallelFlux.
Parameters:
comparator - the comparator to compare elements

public final Mono<List<T>> collectSortedList(Comparator<? super T> comparator, int capacityHint)
Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher.
This operator requires a finite source ParallelFlux.
Parameters:
comparator - the comparator to compare elements
capacityHint - the expected number of total elements

public final <R> ParallelFlux<R> concatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
Generates and concatenates Publishers on each 'rail', signalling errors immediately and generating 2 publishers upfront.
Type Parameters:
R - the result type
Parameters:
mapper - the function to map each rail's value into a Publisher
Returns:
the new ParallelFlux instance

public final <R> ParallelFlux<R> concatMap(Function<? super T,? extends Publisher<? extends R>> mapper, int prefetch)
Generates and concatenates Publishers on each 'rail', signalling errors immediately and using the given prefetch amount for generating Publishers upfront.
Type Parameters:
R - the result type
Parameters:
mapper - the function to map each rail's value into a Publisher
prefetch - the number of items to prefetch from each inner Publisher
Returns:
the new ParallelFlux instance

public final <R> ParallelFlux<R> concatMapDelayError(Function<? super T,? extends Publisher<? extends R>> mapper)
Generates and concatenates Publishers on each 'rail', delaying errors and generating 2 publishers upfront.
Type Parameters:
R - the result type
Parameters:
mapper - the function to map each rail's value into a Publisher
Returns:
the new ParallelFlux instance

public final ParallelFlux<T> doAfterTerminate(Runnable afterTerminate)
Run the specified runnable when a 'rail' completes or signals an error.
Parameters:
afterTerminate - the callback
Returns:
the new ParallelFlux instance

public final ParallelFlux<T> doOnCancel(Runnable onCancel)
Run the specified runnable when a 'rail' receives a cancellation.
Parameters:
onCancel - the callback
Returns:
the new ParallelFlux instance

public final ParallelFlux<T> doOnComplete(Runnable onComplete)
Run the specified runnable when a 'rail' completes.
Parameters:
onComplete - the callback
Returns:
the new ParallelFlux instance

public final ParallelFlux<T> doOnEach(Consumer<? super Signal<T>> signalConsumer)
Triggers side-effects when the ParallelFlux emits an item, fails with an error or completes successfully. All these events are represented as a Signal that is passed to the side-effect callback. Note that with ParallelFlux and the lambda-based subscribes or the array-based one, onError and onComplete will be invoked as many times as there are rails, resulting in as many corresponding Signal being seen in the callback.
Use of subscribe(Subscriber), which calls sequential(), might cancel some rails, resulting in fewer signals being observed. This is an advanced operator, typically used for monitoring of a ParallelFlux.
Parameters:
signalConsumer - the mandatory callback to call on Subscriber.onNext(Object), Subscriber.onError(Throwable) and Subscriber.onComplete()
Returns:
an observed ParallelFlux
See Also:
doOnNext(Consumer), doOnError(Consumer), doOnComplete(Runnable), subscribe(CoreSubscriber[]), Signal
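The note about terminal signals being seen once per rail can be illustrated with a small counting sketch. It assumes a synchronous source (no runOn) so that the count is final when subscribe returns; the class name `ParallelDoOnEachSketch` and the helper `completeSignals` are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

// Hypothetical class name, used only for this illustration.
public class ParallelDoOnEachSketch {

    // Counts the onComplete signals that doOnEach observes with a lambda-based subscribe.
    static int completeSignals(int rails) {
        AtomicInteger completes = new AtomicInteger();
        Flux.range(1, 6)
                .parallel(rails)
                .doOnEach(signal -> {
                    if (signal.isOnComplete()) {
                        completes.incrementAndGet();
                    }
                })
                // Lambda-based subscribe: one subscriber per rail. Without runOn,
                // everything runs on the calling thread before subscribe returns.
                .subscribe(value -> { });
        return completes.get();
    }

    public static void main(String[] args) {
        System.out.println("onComplete signals seen: " + completeSignals(3));
    }
}
```

With the lambda-based subscribe, the counter is expected to match the number of rails rather than 1, which is why callbacks passed to doOnEach should not assume a single terminal signal.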
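
public final ParallelFlux<T> doOnError(Consumer<? super Throwable> onError)
Call the specified consumer with the exception passing through any 'rail'.
Parameters:
onError - the callback
Returns:
the new ParallelFlux instance

public final ParallelFlux<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe)
Call the specified callback when a 'rail' receives a Subscription from its upstream.
This method is not intended for capturing the subscription and calling its methods, but for side effects like monitoring. For instance, the correct way to cancel a subscription is to call Disposable.dispose() on the Disposable returned by subscribe().
Parameters:
onSubscribe - the callback
Returns:
the new ParallelFlux instance

public final ParallelFlux<T> doOnNext(Consumer<? super T> onNext)
Call the specified consumer with the current element passing through any 'rail'.
Parameters:
onNext - the callback
Returns:
the new ParallelFlux instance

public final ParallelFlux<T> doOnRequest(LongConsumer onRequest)
Call the specified consumer with the request amount if any rail receives a request.
Parameters:
onRequest - the callback
Returns:
the new ParallelFlux instance

public final ParallelFlux<T> doOnTerminate(Runnable onTerminate)
Triggered when the ParallelFlux terminates, either by completing successfully or with an error.
Parameters:
onTerminate - the callback to call on Subscriber.onComplete() or Subscriber.onError(java.lang.Throwable)
Returns:
an observed ParallelFlux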

public final ParallelFlux<T> filter(Predicate<? super T> predicate)
Filters the source values on each 'rail'.
Note that the same predicate may be called from multiple threads concurrently.
Parameters:
predicate - the function returning true to keep a value or false to drop a value
Returns:
the new ParallelFlux instance

public final <R> ParallelFlux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
Generates and flattens Publishers on each 'rail'.
Errors are not delayed; unbounded concurrency and the default inner prefetch are used.
Type Parameters:
R - the result type
Parameters:
mapper - the function to map each rail's value into a Publisher
Returns:
the new ParallelFlux instance

public final <R> ParallelFlux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError)
Generates and flattens Publishers on each 'rail', optionally delaying errors.
It uses unbounded concurrency along with the default inner prefetch.
Type Parameters:
R - the result type
Parameters:
mapper - the function to map each rail's value into a Publisher
delayError - whether errors from the main and the inner sources should be delayed until all of them terminate
Returns:
the new ParallelFlux instance

public final <R> ParallelFlux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency)
Generates and flattens Publishers on each 'rail', optionally delaying errors and with a given maximum number of simultaneous subscriptions to the inner Publishers.
It uses the default inner prefetch.
Type Parameters:
R - the result type
Parameters:
mapper - the function to map each rail's value into a Publisher
delayError - whether errors from the main and the inner sources should be delayed until all of them terminate
maxConcurrency - the maximum number of simultaneous subscriptions to the generated inner Publishers
Returns:
the new ParallelFlux instance

public final <R> ParallelFlux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency, int prefetch)
Generates and flattens Publishers on each 'rail', optionally delaying errors, with a given maximum number of simultaneous subscriptions to the inner Publishers and using the given prefetch amount for the inner Publishers.
Type Parameters:
R - the result type
Parameters:
mapper - the function to map each rail's value into a Publisher
delayError - whether errors from the main and the inner sources should be delayed until all of them terminate
maxConcurrency - the maximum number of simultaneous subscriptions to the generated inner Publishers
prefetch - the number of items to prefetch from each inner Publisher
Returns:
the new ParallelFlux instance

public final Flux<GroupedFlux<Integer,T>> groups()
Exposes the 'rails' as individual GroupedFlux instances, keyed by the rail index (zero based).
Each group can be consumed only once; requests and cancellation compose through. Note that cancelling only one rail may result in undefined behavior.
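A minimal sketch of consuming the rails as keyed groups follows. It collects each rail's values under the rail index; the class name `ParallelGroupsSketch`, the helper `valuesPerRail` and the use of `Map.entry` (Java 9+) are assumptions for the example.

```java
import java.util.List;
import java.util.Map;

import reactor.core.publisher.Flux;

// Hypothetical class name, used only for this illustration.
public class ParallelGroupsSketch {

    // Collects the values seen on each rail, keyed by the zero-based rail index.
    static Map<Integer, List<Integer>> valuesPerRail() {
        return Flux.range(1, 6)
                .parallel(2)
                .groups()
                // Each GroupedFlux is consumed exactly once, here by collectList().
                .flatMap(rail -> rail.collectList()
                        .map(values -> Map.entry(rail.key(), values)))
                .collectMap(Map.Entry::getKey, Map.Entry::getValue)
                .block();
    }

    public static void main(String[] args) {
        System.out.println(valuesPerRail());
    }
}
```

All groups are consumed to completion here, which avoids the undefined behavior mentioned above for cancelling a single rail.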

public final ParallelFlux<T> hide()
Hides the identities of this ParallelFlux and its Subscription as well.
Returns:
a new ParallelFlux defeating any Publisher / Subscription feature-detection

public final ParallelFlux<T> log()
Observe all Reactive Streams signals and use Logger support to handle trace implementation. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.
The default log category will be "reactor.*"; a generated operator suffix will complete it, e.g. "reactor.Parallel.Map".
Returns:
a new unaltered ParallelFlux

public final ParallelFlux<T> log(@Nullable String category)
Observe all Reactive Streams signals and use Logger support to handle trace implementation. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.
Parameters:
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will complete it, e.g. "reactor.Parallel.Map".
Returns:
a new unaltered ParallelFlux

public final ParallelFlux<T> log(@Nullable String category, Level level, SignalType... options)
Observe Reactive Streams signals matching the passed filter options and use Logger support to handle trace implementation. Default will use the passed Level and java.util.logging. If SLF4J is available, it will be used instead.
Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
ParallelFlux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
Parameters:
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will complete it, e.g. "reactor.Parallel.Map".
level - the Level to enforce for this tracing ParallelFlux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
options - a vararg SignalType option to filter log messages
Returns:
a new unaltered ParallelFlux
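The signal filtering described above can be tried with a short sketch such as the one below. The category `"demo.parallel"` and the class name `ParallelLogSketch` are made up for the example; the actual log output depends on the logging backend found at runtime.

```java
import java.util.List;
import java.util.logging.Level;

import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

// Hypothetical class name, used only for this illustration.
public class ParallelLogSketch {

    // Logs only onNext and onError signals of each rail, then merges the rails.
    static List<Integer> loggedValues() {
        return Flux.range(1, 4)
                .parallel(2)
                // "demo.parallel" is an illustrative category, not a Reactor default.
                .log("demo.parallel", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
                .sequential()
                .sort()
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(loggedValues());
    }
}
```

Logging only observes the signals, so the values collected downstream are the same as without the log operator.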

public final ParallelFlux<T> log(@Nullable String category, Level level, boolean showOperatorLine, SignalType... options)
Observe Reactive Streams signals matching the passed filter options and use Logger support to handle trace implementation. Default will use the passed Level and java.util.logging. If SLF4J is available, it will be used instead.
Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
ParallelFlux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
Parameters:
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will complete it, e.g. "reactor.Parallel.Map".
level - the Level to enforce for this tracing ParallelFlux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
showOperatorLine - capture the current stack to display operator class/line number
options - a vararg SignalType option to filter log messages
Returns:
a new unaltered ParallelFlux

public final <U> ParallelFlux<U> map(Function<? super T,? extends U> mapper)
Maps the source values on each 'rail' to another value.
Note that the same mapper function may be called from multiple threads concurrently.
Type Parameters:
U - the output value type
Parameters:
mapper - the mapper function turning Ts into Us
Returns:
the new ParallelFlux instance

public final ParallelFlux<T> name(String name)
Give a name to this sequence, which can be retrieved using Scannable.name() as long as this is the first reachable Scannable.parents().
Parameters:
name - a name for the sequence

public final Flux<T> ordered(Comparator<? super T> comparator)
Merges the values from each 'rail', choosing which one to merge by way of a provided Comparator and picking the smallest of all rails. The result is exposed back as a Flux.
This version uses a default prefetch of Queues.SMALL_BUFFER_SIZE.
Parameters:
comparator - the comparator to choose the smallest value available from all rails
See Also:
ordered(Comparator, int)

public final Flux<T> ordered(Comparator<? super T> comparator, int prefetch)
Merges the values from each 'rail', choosing which one to merge by way of a provided Comparator and picking the smallest of all rails. The result is exposed back as a Flux.
Parameters:
comparator - the comparator to choose the smallest value available from all rails
prefetch - the prefetch to use
See Also:
ordered(Comparator)
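To illustrate the ordered merge, here is a small sketch in which each rail receives an ascending subsequence of the source, so always picking the smallest head across rails should restore the overall order. The class name `ParallelOrderedSketch` and the chosen range are assumptions for the example.

```java
import java.util.Comparator;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// Hypothetical class name, used only for this illustration.
public class ParallelOrderedSketch {

    // Splits an ascending range over two rails and merges it back in comparator order.
    static List<Integer> mergedInOrder() {
        return Flux.range(1, 8)
                .parallel(2)                        // each rail keeps the source order of its own values
                .runOn(Schedulers.parallel())
                .ordered(Comparator.naturalOrder()) // pick the smallest available value across rails
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(mergedInOrder());
    }
}
```

The merge only compares the values currently at the head of each rail, so a fully ordered result relies on every rail being ordered by the same comparator to begin with.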

public abstract int parallelism()
Returns the number of expected parallel Subscribers.
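A short sketch of querying the parallelism level, which is also the array length that subscribe(CoreSubscriber[]) expects. The class name `ParallelismSketch` and the helper `railsOf` are hypothetical.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.ParallelFlux;

// Hypothetical class name, used only for this illustration.
public class ParallelismSketch {

    // Builds a ParallelFlux with the requested number of rails and reports its parallelism.
    static int railsOf(int requested) {
        ParallelFlux<Integer> rails = ParallelFlux.from(Flux.range(1, 100), requested);
        return rails.parallelism();
    }

    public static void main(String[] args) {
        System.out.println(railsOf(3));
    }
}
```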

public final Mono<T> reduce(BiFunction<T,T,T> reducer)
Reduces all values within a 'rail' and across 'rails' with a reducer function into a single sequential value.
Note that the same reducer function may be called from multiple threads concurrently.
Parameters:
reducer - the function to reduce two values into one
Returns:
the new Mono instance emitting the reduced value, or empty if the ParallelFlux was empty

public final <R> ParallelFlux<R> reduce(Supplier<R> initialSupplier, BiFunction<R,? super T,R> reducer)
Reduces all values within a 'rail' to a single value (with a possibly different type) via a reducer function that is initialized on each rail from an initialSupplier value.
Note that the same reducer function may be called from multiple threads concurrently.
Type Parameters:
R - the reduced output type
Parameters:
initialSupplier - the supplier for the initial value
reducer - the function to reduce a previous output of reduce (or the initial value supplied) with a current source value
Returns:
the new ParallelFlux instance

public final ParallelFlux<T> runOn(Scheduler scheduler)
Specifies where each 'rail' will observe its incoming values with possible work-stealing and default prefetch amount.
This operator uses the default prefetch size returned by Queues.SMALL_BUFFER_SIZE.
The operator will call Scheduler.createWorker() as many times as this ParallelFlux's parallelism level.
No assumptions are made about the Scheduler's parallelism level. If the Scheduler's parallelism level is lower than the ParallelFlux's, some rails may end up on the same thread/worker.
This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.
Parameters:
scheduler - the scheduler to use
Returns:
the new ParallelFlux instance

public final ParallelFlux<T> runOn(Scheduler scheduler, int prefetch)
Specifies where each 'rail' will observe its incoming values with possible work-stealing and a given prefetch amount.
The operator will call Scheduler.createWorker() as many times as this ParallelFlux's parallelism level.
No assumptions are made about the Scheduler's parallelism level. If the Scheduler's parallelism level is lower than the ParallelFlux's, some rails may end up on the same thread/worker.
This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.
Parameters:
scheduler - the scheduler to use
prefetch - the number of values to request on each 'rail' from the source
Returns:
the new ParallelFlux instance

public final Flux<T> sequential()
Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Publisher sequence, running with a default prefetch value for the rails.
This operator uses the default prefetch size returned by Queues.SMALL_BUFFER_SIZE.
See Also:
sequential(int)

public final Flux<T> sequential(int prefetch)
Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Publisher sequence, running with a given prefetch value for the rails.
Parameters:
prefetch - the prefetch amount to use for each rail

public final Flux<T> sorted(Comparator<? super T> comparator)
Sorts the 'rails' of this ParallelFlux and returns a Publisher that sequentially picks the smallest next value from the rails.
This operator requires a finite source ParallelFlux.
Parameters:
comparator - the comparator to use

public final Flux<T> sorted(Comparator<? super T> comparator, int capacityHint)
Sorts the 'rails' of this ParallelFlux and returns a Publisher that sequentially picks the smallest next value from the rails.
This operator requires a finite source ParallelFlux.
Parameters:
comparator - the comparator to use
capacityHint - the expected number of total elements

public abstract void subscribe(CoreSubscriber<? super T>[] subscribers)
Subscribes an array of Subscribers to this ParallelFlux and triggers the execution chain for all 'rails'.
Parameters:
subscribers - the subscribers array to run in parallel; the number of items must be equal to the parallelism level of this ParallelFlux

public final Disposable subscribe()
Subscribes to this ParallelFlux and triggers the execution chain for all 'rails'.

public final Disposable subscribe(Consumer<? super T> onNext)
Subscribes to this ParallelFlux by providing an onNext callback and triggers the execution chain for all 'rails'.
Parameters:
onNext - consumer of onNext signals

public final Disposable subscribe(@Nullable Consumer<? super T> onNext, Consumer<? super Throwable> onError)
Subscribes to this ParallelFlux by providing an onNext and onError callback and triggers the execution chain for all 'rails'.
Parameters:
onNext - consumer of onNext signals
onError - consumer of error signal

public final Disposable subscribe(@Nullable Consumer<? super T> onNext, @Nullable Consumer<? super Throwable> onError, @Nullable Runnable onComplete)
Subscribes to this ParallelFlux by providing an onNext, onError and onComplete callback and triggers the execution chain for all 'rails'.
Parameters:
onNext - consumer of onNext signals
onError - consumer of error signal
onComplete - callback on completion signal

public final void subscribe(CoreSubscriber<? super T> s)
Description copied from interface: CorePublisher
An internal Publisher.subscribe(Subscriber) that will bypass Hooks.onLastOperator(Function) pointcut.
In addition to behaving as expected by Publisher.subscribe(Subscriber) in a controlled manner, it supports direct subscribe-time Context passing.
Specified by:
subscribe in interface CorePublisher<T>
Parameters:
s - the Subscriber interested into the published sequence
See Also:
Publisher.subscribe(Subscriber)

public final Disposable subscribe(@Nullable Consumer<? super T> onNext, @Nullable Consumer<? super Throwable> onError, @Nullable Runnable onComplete, @Nullable Consumer<? super Subscription> onSubscribe)
Subscribes to this ParallelFlux by providing an onNext, onError, onComplete and onSubscribe callback and triggers the execution chain for all 'rails'.
Parameters:
onNext - consumer of onNext signals
onError - consumer of error signal
onComplete - callback on completion signal
onSubscribe - consumer of the subscription signal

public final Disposable subscribe(@Nullable Consumer<? super T> onNext, @Nullable Consumer<? super Throwable> onError, @Nullable Runnable onComplete, @Nullable Context initialContext)
Subscribes to this ParallelFlux by providing an onNext, onError and onComplete callback as well as an initial Context, then triggers the execution chain for all 'rails'.
Parameters:
onNext - consumer of onNext signals
onError - consumer of error signal
onComplete - callback on completion signal
initialContext - Context for the rails

public final void subscribe(Subscriber<? super T> s)
Merge the rails into a sequential() Flux and subscribe to said Flux.
Specified by:
subscribe in interface Publisher<T>
Parameters:
s - the subscriber to use on sequential() Flux

public final ParallelFlux<T> tag(String key, String value)
Tag this ParallelFlux with a key/value pair. These can be retrieved as a Set of all tags throughout the publisher chain by using Scannable.tags() (as traversed by Scannable.parents()).
Parameters:
key - a tag key
value - a tag value

public final Mono<Void> then()
Emit an onComplete or onError signal once all values across 'rails' have been observed.
Returns:
the new Mono instance, which completes or errors once all 'rails' have terminated

public final <U> ParallelFlux<U> transform(Function<? super ParallelFlux<T>,ParallelFlux<U>> composer)
Allows composing operators, in assembly time, on top of this ParallelFlux and returns another ParallelFlux with composed features.
Type Parameters:
U - the output value type
Parameters:
composer - the composer function from ParallelFlux (this) to another ParallelFlux
Returns:
the ParallelFlux returned by the function

public final <U> ParallelFlux<U> transformGroups(Function<? super GroupedFlux<Integer,T>,? extends Publisher<? extends U>> composer)
Allows composing operators off the groups (or 'rails'), as individual GroupedFlux instances keyed by the zero based rail's index. The transformed groups are parallelized back once the transformation has been applied.
Since groups are generated anew for each subscription, this is all done in a "lazy" fashion where each subscription triggers a distinct application of the Function.
Note that like in groups(), requests and cancellation compose through, and cancelling only one rail may result in undefined behavior.
Type Parameters:
U - the type of the resulting parallelized flux
Parameters:
composer - the composition function to apply on each rail
Returns:
a ParallelFlux of the composed groups

protected final boolean validate(Subscriber<?>[] subscribers)
Validates the number of subscribers and returns true if their number matches the parallelism level of this ParallelFlux.
Parameters:
subscribers - the array of Subscribers
Returns:
true if the number of subscribers matches the parallelism level

public int getPrefetch()
The prefetch configuration of the component.

protected static <T> ParallelFlux<T> onAssembly(ParallelFlux<T> source)
Type Parameters:
T - the value type
Parameters:
source - the source to wrap