Class ParallelFlux<T>
- Type Parameters:
T- the value type
- All Implemented Interfaces:
Publisher<T>,CorePublisher<T>
'groups').
Use from(reactor.core.publisher.ParallelFlux<T>) to start processing a regular Publisher in 'rails', which each
cover a subset of the original Publisher's data. Flux.parallel() is a
convenient shortcut to achieve that on a Flux.
Use runOn(reactor.core.scheduler.Scheduler) to introduce where each 'rail' should run on thread-wise.
Use sequential() to merge the sources back into a single Flux.
Use then() to listen for all rails termination in the produced Mono
subscribe(Subscriber) if you simply want to subscribe to the merged sequence.
Note that other variants like subscribe(Consumer) instead do multiple
subscribes, one on each rail (which means that the lambdas should be as stateless and
side-effect free as possible).
-
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionfinal <U> Uas(Function<? super ParallelFlux<T>, U> converter) Perform a fluent transformation to a value via a converter function which receives this ParallelFlux.final ParallelFlux<T>Activate traceback (full assembly tracing) for this particularParallelFlux, in case of an error upstream of the checkpoint.final ParallelFlux<T>checkpoint(String description) Activate traceback (assembly marker) for this particularParallelFluxby giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint.final ParallelFlux<T>checkpoint(String description, boolean forceStackTrace) Activate traceback (full assembly tracing or the lighter assembly marking depending on theforceStackTraceoption).final <C> ParallelFlux<C>collect(Supplier<? extends C> collectionSupplier, BiConsumer<? super C, ? super T> collector) Collect the elements in each rail into a collection supplied via a collectionSupplier and collected into with a collector action, emitting the collection at the end.collectSortedList(Comparator<? super T> comparator) Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher.collectSortedList(Comparator<? super T> comparator, int capacityHint) Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher.final <R> ParallelFlux<R>Generates and concatenates Publishers on each 'rail', signalling errors immediately and generating 2 publishers upfront.final <R> ParallelFlux<R>Generates and concatenates Publishers on each 'rail', signalling errors immediately and using the given prefetch amount for generating Publishers upfront.final <R> ParallelFlux<R>concatMapDelayError(Function<? super T, ? extends Publisher<? extends R>> mapper) Generates and concatenates Publishers on each 'rail', delaying errors and generating 2 publishers upfront.final ParallelFlux<T>doAfterTerminate(Runnable afterTerminate) Run the specified runnable when a 'rail' completes or signals an error.final ParallelFlux<T>doOnCancel(Runnable onCancel) Run the specified runnable when a 'rail' receives a cancellation.final ParallelFlux<T>doOnComplete(Runnable onComplete) Run the specified runnable when a 'rail' completes.final ParallelFlux<T>Triggers side-effects when theParallelFluxemits an item, fails with an error or completes successfully.final ParallelFlux<T>Call the specified consumer with the exception passing through any 'rail'.final ParallelFlux<T>Call the specified consumer with the current element passing through any 'rail'.final ParallelFlux<T>doOnRequest(LongConsumer onRequest) Call the specified consumer with the request amount if any rail receives a request.final ParallelFlux<T>doOnSubscribe(Consumer<? super Subscription> onSubscribe) Call the specified callback when a 'rail' receives a Subscription from its upstream.final ParallelFlux<T>doOnTerminate(Runnable onTerminate) Triggered when theParallelFluxterminates, either by completing successfully or with an error.final ParallelFlux<T>Filters the source values on each 'rail'.final <R> ParallelFlux<R>Generates and flattens Publishers on each 'rail'.final <R> ParallelFlux<R>Generates and flattens Publishers on each 'rail', optionally delaying errors.final <R> ParallelFlux<R>flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency) Generates and flattens Publishers on each 'rail', optionally delaying errors and having a total number of simultaneous subscriptions to the inner Publishers.final <R> ParallelFlux<R>flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency, int prefetch) Generates and flattens Publishers on each 'rail', optionally delaying errors, having a total number of simultaneous subscriptions to the inner Publishers and using the given prefetch amount for the inner Publishers.static <T> ParallelFlux<T>Take a Publisher and prepare to consume it on multiple 'rails' (one per CPU core) in a round-robin fashion.static <T> ParallelFlux<T>Take a Publisher and prepare to consume it onparallelismnumber of 'rails', possibly ordered and in a round-robin fashion.static <T> ParallelFlux<T>from(Publisher<? extends T> source, int parallelism, int prefetch, Supplier<Queue<T>> queueSupplier) Take a Publisher and prepare to consume it onparallelismnumber of 'rails' and in a round-robin fashion and use custom prefetch amount and queue for dealing with the source Publisher's values.static <T> ParallelFlux<T>Wraps multiple Publishers into aParallelFluxwhich runs them in parallel and unordered.intThe prefetch configuration of the componentfinal Flux<GroupedFlux<Integer,T>> groups()Exposes the 'rails' as individual GroupedFlux instances, keyed by the rail index (zero based).final ParallelFlux<T>hide()Hides the identities of thisParallelFluxand itsSubscriptionas well.final ParallelFlux<T>log()Observe all Reactive Streams signals and useLoggersupport to handle trace implementation.final ParallelFlux<T>Observe all Reactive Streams signals and useLoggersupport to handle trace implementation.final ParallelFlux<T>log(@Nullable String category, Level level, boolean showOperatorLine, SignalType... options) Observe Reactive Streams signals matching the passed filteroptionsand useLoggersupport to handle trace implementation.final ParallelFlux<T>log(@Nullable String category, Level level, SignalType... options) Observe Reactive Streams signals matching the passed filteroptionsand useLoggersupport to handle trace implementation.final <U> ParallelFlux<U>Maps the source values on each 'rail' to another value.final ParallelFlux<T>Give a name to this sequence, which can be retrieved usingScannable.name()as long as this is the first reachableScannable.parents().protected static <T> ParallelFlux<T>onAssembly(ParallelFlux<T> source) ordered(Comparator<? super T> comparator) Merges the values from each 'rail', but choose which one to merge by way of a providedComparator, picking the smallest of all rails.ordered(Comparator<? super T> comparator, int prefetch) Merges the values from each 'rail', but choose which one to merge by way of a providedComparator, picking the smallest of all rails.abstract intReturns the number of expected parallel Subscribers.reduce(BiFunction<T, T, T> reducer) Reduces all values within a 'rail' and across 'rails' with a reducer function into a single sequential value.final <R> ParallelFlux<R>reduce(Supplier<R> initialSupplier, BiFunction<R, ? super T, R> reducer) Reduces all values within a 'rail' to a single value (with a possibly different type) via a reducer function that is initialized on each rail from an initialSupplier value.final ParallelFlux<T>Specifies where each 'rail' will observe its incoming values with possible work-stealing and default prefetch amount.final ParallelFlux<T>Specifies where each 'rail' will observe its incoming values with possible work-stealing and a given prefetch amount.Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Publisher sequence, running with a default prefetch value for the rails.sequential(int prefetch) Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Publisher sequence, running with a give prefetch value for the rails.sorted(Comparator<? super T> comparator) Sorts the 'rails' of thisParallelFluxand returns a Publisher that sequentially picks the smallest next value from the rails.sorted(Comparator<? super T> comparator, int capacityHint) Sorts the 'rails' of thisParallelFluxand returns a Publisher that sequentially picks the smallest next value from the rails.final DisposableSubscribes to thisParallelFluxand triggers the execution chain for all 'rails'.final Disposablesubscribe(@Nullable Consumer<? super T> onNext, @Nullable Consumer<? super Throwable> onError, @Nullable Runnable onComplete) Subscribes to thisParallelFluxby providing an onNext, onError and onComplete callback and triggers the execution chain for all 'rails'.final Disposablesubscribe(@Nullable Consumer<? super T> onNext, @Nullable Consumer<? super Throwable> onError, @Nullable Runnable onComplete, @Nullable Consumer<? super Subscription> onSubscribe) Subscribes to thisParallelFluxby providing an onNext, onError, onComplete and onSubscribe callback and triggers the execution chain for all 'rails'.final Disposablesubscribe(@Nullable Consumer<? super T> onNext, @Nullable Consumer<? super Throwable> onError, @Nullable Runnable onComplete, @Nullable Context initialContext) Subscribes to thisParallelFluxby providing an onNext, onError and onComplete callback as well as an initialContext, then trigger the execution chain for all 'rails'.final DisposableSubscribes to thisParallelFluxby providing an onNext and onError callback and triggers the execution chain for all 'rails'.final DisposableSubscribes to thisParallelFluxby providing an onNext callback and triggers the execution chain for all 'rails'.final voidsubscribe(Subscriber<? super T> s) Merge the rails into asequential()Flux andsubscribeto said Flux.final voidsubscribe(CoreSubscriber<? super T> s) An internalPublisher.subscribe(Subscriber)that will bypassHooks.onLastOperator(Function)pointcut.abstract voidsubscribe(CoreSubscriber<? super T>[] subscribers) Subscribes an array of Subscribers to thisParallelFluxand triggers the execution chain for all 'rails'.final ParallelFlux<T>Tag this ParallelFlux with a key/value pair.then()Emit an onComplete or onError signal once all values across 'rails' have been observed.toString()final <U> ParallelFlux<U>transform(Function<? super ParallelFlux<T>, ParallelFlux<U>> composer) Allows composing operators, in assembly time, on top of thisParallelFluxand returns anotherParallelFluxwith composed features.final <U> ParallelFlux<U>transformGroups(Function<? super GroupedFlux<Integer, T>, ? extends Publisher<? extends U>> composer) Allows composing operators off the groups (or 'rails'), as individualGroupedFluxinstances keyed by the zero based rail's index.protected final booleanvalidate(Subscriber<?>[] subscribers) Validates the number of subscribers and returns true if their number matches the parallelism level of this ParallelFlux.
-
Constructor Details
-
ParallelFlux
public ParallelFlux()
-
-
Method Details
-
from
Take a Publisher and prepare to consume it on multiple 'rails' (one per CPU core) in a round-robin fashion. Equivalent toFlux.parallel().- Type Parameters:
T- the value type- Parameters:
source- the source Publisher- Returns:
- the
ParallelFluxinstance
-
from
Take a Publisher and prepare to consume it onparallelismnumber of 'rails', possibly ordered and in a round-robin fashion.- Type Parameters:
T- the value type- Parameters:
source- the source Publisherparallelism- the number of parallel rails- Returns:
- the new
ParallelFluxinstance
-
from
public static <T> ParallelFlux<T> from(Publisher<? extends T> source, int parallelism, int prefetch, Supplier<Queue<T>> queueSupplier) Take a Publisher and prepare to consume it onparallelismnumber of 'rails' and in a round-robin fashion and use custom prefetch amount and queue for dealing with the source Publisher's values.- Type Parameters:
T- the value type- Parameters:
source- the source Publisherparallelism- the number of parallel railsprefetch- the number of values to prefetch from the sourcequeueSupplier- the queue structure supplier to hold the prefetched values from the source until there is a rail ready to process it.- Returns:
- the new
ParallelFluxinstance
-
from
Wraps multiple Publishers into aParallelFluxwhich runs them in parallel and unordered.- Type Parameters:
T- the value type- Parameters:
publishers- the array of publishers- Returns:
- the new
ParallelFluxinstance
-
as
Perform a fluent transformation to a value via a converter function which receives this ParallelFlux.- Type Parameters:
U- the output value type- Parameters:
converter- the converter function fromParallelFluxto some type- Returns:
- the value returned by the converter function
-
checkpoint
Activate traceback (full assembly tracing) for this particularParallelFlux, in case of an error upstream of the checkpoint. Tracing incurs the cost of an exception stack trace creation.It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
The traceback is attached to the error as a
suppressed exception. As such, if the error is acomposite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected viaExceptions.isTraceback(Throwable).- Returns:
- the assembly tracing
ParallelFlux
-
checkpoint
Activate traceback (assembly marker) for this particularParallelFluxby giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlikecheckpoint(), this doesn't create a filled stack trace, avoiding the main cost of the operator. However, as a trade-off the description must be unique enough for the user to find out where this ParallelFlux was assembled. If you only want a generic description, and still rely on the stack trace to find the assembly site, use thecheckpoint(String, boolean)variant.It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
The traceback is attached to the error as a
suppressed exception. As such, if the error is acomposite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected viaExceptions.isTraceback(Throwable).- Parameters:
description- a unique enough description to include in the light assembly traceback.- Returns:
- the assembly marked
ParallelFlux
-
checkpoint
Activate traceback (full assembly tracing or the lighter assembly marking depending on theforceStackTraceoption).By setting the
forceStackTraceparameter to true, activate assembly tracing for this particularParallelFluxand give it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlikecheckpoint(String), this will incur the cost of an exception stack trace creation. The description could for example be a meaningful name for the assembled ParallelFlux or a wider correlation ID, since the stack trace will always provide enough information to locate where this ParallelFlux was assembled.By setting
forceStackTraceto false, behaves likecheckpoint(String)and is subject to the same caveat in choosing the description.It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly marker.
The traceback is attached to the error as a
suppressed exception. As such, if the error is acomposite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected viaExceptions.isTraceback(Throwable).- Parameters:
description- a description (must be unique enough if forceStackTrace is set to false).forceStackTrace- false to make a light checkpoint without a stacktrace, true to use a stack trace.- Returns:
- the assembly marked
ParallelFlux.
-
collect
public final <C> ParallelFlux<C> collect(Supplier<? extends C> collectionSupplier, BiConsumer<? super C, ? super T> collector) Collect the elements in each rail into a collection supplied via a collectionSupplier and collected into with a collector action, emitting the collection at the end.- Type Parameters:
C- the collection type- Parameters:
collectionSupplier- the supplier of the collection in each railcollector- the collector, taking the per-rail collection and the current item- Returns:
- the new
ParallelFluxinstance
-
collectSortedList
Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher.This operator requires a finite source ParallelFlux.
- Parameters:
comparator- the comparator to compare elements- Returns:
- the new Flux instance
-
collectSortedList
Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher.This operator requires a finite source ParallelFlux.
- Parameters:
comparator- the comparator to compare elementscapacityHint- the expected number of total elements- Returns:
- the new Mono instance
-
concatMap
public final <R> ParallelFlux<R> concatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) Generates and concatenates Publishers on each 'rail', signalling errors immediately and generating 2 publishers upfront.- Type Parameters:
R- the result type- Parameters:
mapper- the function to map each rail's value into a Publisher source and the inner Publishers (immediate, boundary, end)- Returns:
- the new
ParallelFluxinstance
-
concatMap
public final <R> ParallelFlux<R> concatMap(Function<? super T, ? extends Publisher<? extends R>> mapper, int prefetch) Generates and concatenates Publishers on each 'rail', signalling errors immediately and using the given prefetch amount for generating Publishers upfront.- Type Parameters:
R- the result type- Parameters:
mapper- the function to map each rail's value into a Publisherprefetch- the number of items to prefetch from each inner Publisher source and the inner Publishers (immediate, boundary, end)- Returns:
- the new
ParallelFluxinstance
-
concatMapDelayError
public final <R> ParallelFlux<R> concatMapDelayError(Function<? super T, ? extends Publisher<? extends R>> mapper) Generates and concatenates Publishers on each 'rail', delaying errors and generating 2 publishers upfront.- Type Parameters:
R- the result type- Parameters:
mapper- the function to map each rail's value into a Publisher source and the inner Publishers (immediate, boundary, end)- Returns:
- the new
ParallelFluxinstance
-
doAfterTerminate
Run the specified runnable when a 'rail' completes or signals an error.- Parameters:
afterTerminate- the callback- Returns:
- the new
ParallelFluxinstance
-
doOnCancel
Run the specified runnable when a 'rail' receives a cancellation.- Parameters:
onCancel- the callback- Returns:
- the new
ParallelFluxinstance
-
doOnComplete
Run the specified runnable when a 'rail' completes.- Parameters:
onComplete- the callback- Returns:
- the new
ParallelFluxinstance
-
doOnEach
Triggers side-effects when theParallelFluxemits an item, fails with an error or completes successfully. All these events are represented as aSignalthat is passed to the side-effect callback. Note that withParallelFluxand thelambda-based subscribesor thearray-based one, onError and onComplete will be invoked as many times as there are rails, resulting in as many correspondingSignalbeing seen in the callback.Use of
subscribe(Subscriber), which callssequential(), might cancel some rails, resulting in less signals being observed. This is an advanced operator, typically used for monitoring of a ParallelFlux.- Parameters:
signalConsumer- the mandatory callback to call onSubscriber.onNext(Object),Subscriber.onError(Throwable)andSubscriber.onComplete()- Returns:
- an observed
ParallelFlux - See Also:
-
doOnError
Call the specified consumer with the exception passing through any 'rail'.- Parameters:
onError- the callback- Returns:
- the new
ParallelFluxinstance
-
doOnSubscribe
Call the specified callback when a 'rail' receives a Subscription from its upstream.This method is not intended for capturing the subscription and calling its methods, but for side effects like monitoring. For instance, the correct way to cancel a subscription is to call
Disposable.dispose()on the Disposable returned bysubscribe().- Parameters:
onSubscribe- the callback- Returns:
- the new
ParallelFluxinstance
-
doOnNext
Call the specified consumer with the current element passing through any 'rail'.- Parameters:
onNext- the callback- Returns:
- the new
ParallelFluxinstance
-
doOnRequest
Call the specified consumer with the request amount if any rail receives a request.- Parameters:
onRequest- the callback- Returns:
- the new
ParallelFluxinstance
-
doOnTerminate
Triggered when theParallelFluxterminates, either by completing successfully or with an error.- Parameters:
onTerminate- the callback to call onSubscriber.onComplete()orSubscriber.onError(java.lang.Throwable)- Returns:
- an observed
ParallelFlux
-
filter
Filters the source values on each 'rail'.Note that the same predicate may be called from multiple threads concurrently.
- Parameters:
predicate- the function returning true to keep a value or false to drop a value- Returns:
- the new
ParallelFluxinstance
-
flatMap
public final <R> ParallelFlux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) Generates and flattens Publishers on each 'rail'.Errors are not delayed and uses unbounded concurrency along with default inner prefetch.
- Type Parameters:
R- the result type- Parameters:
mapper- the function to map each rail's value into a Publisher- Returns:
- the new
ParallelFluxinstance
-
flatMap
public final <R> ParallelFlux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper, boolean delayError) Generates and flattens Publishers on each 'rail', optionally delaying errors.It uses unbounded concurrency along with default inner prefetch.
- Type Parameters:
R- the result type- Parameters:
mapper- the function to map each rail's value into a PublisherdelayError- should the errors from the main and the inner sources delayed till everybody terminates?- Returns:
- the new
ParallelFluxinstance
-
flatMap
public final <R> ParallelFlux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency) Generates and flattens Publishers on each 'rail', optionally delaying errors and having a total number of simultaneous subscriptions to the inner Publishers.It uses a default inner prefetch.
- Type Parameters:
R- the result type- Parameters:
mapper- the function to map each rail's value into a PublisherdelayError- should the errors from the main and the inner sources delayed till everybody terminates?maxConcurrency- the maximum number of simultaneous subscriptions to the generated inner Publishers- Returns:
- the new
ParallelFluxinstance
-
flatMap
public final <R> ParallelFlux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency, int prefetch) Generates and flattens Publishers on each 'rail', optionally delaying errors, having a total number of simultaneous subscriptions to the inner Publishers and using the given prefetch amount for the inner Publishers.- Type Parameters:
R- the result type- Parameters:
mapper- the function to map each rail's value into a PublisherdelayError- should the errors from the main and the inner sources delayed till everybody terminates?maxConcurrency- the maximum number of simultaneous subscriptions to the generated inner Publishersprefetch- the number of items to prefetch from each inner Publisher- Returns:
- the new
ParallelFluxinstance
-
groups
Exposes the 'rails' as individual GroupedFlux instances, keyed by the rail index (zero based).Each group can be consumed only once; requests and cancellation compose through. Note that cancelling only one rail may result in undefined behavior.
- Returns:
- the new Flux instance
-
hide
Hides the identities of thisParallelFluxand itsSubscriptionas well.- Returns:
- a new
ParallelFluxdefeating anyPublisher/Subscriptionfeature-detection
-
log
Observe all Reactive Streams signals and useLoggersupport to handle trace implementation. Default will useLevel.INFOand java.util.logging. If SLF4J is available, it will be used instead.The default log category will be "reactor.*", a generated operator suffix will complete, e.g. "reactor.Parallel.Map".
- Returns:
- a new unaltered
ParallelFlux
-
log
Observe all Reactive Streams signals and useLoggersupport to handle trace implementation. Default will useLevel.INFOand java.util.logging. If SLF4J is available, it will be used instead.- Parameters:
category- to be mapped into logger configuration (e.g. org.springframework .reactor). If category ends with "." like "reactor.", a generated operator suffix will complete, e.g. "reactor.Parallel.Map".- Returns:
- a new unaltered
ParallelFlux
-
log
Observe Reactive Streams signals matching the passed filteroptionsand useLoggersupport to handle trace implementation. Default will use the passedLeveland java.util.logging. If SLF4J is available, it will be used instead.Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
ParallelFlux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)- Parameters:
category- to be mapped into logger configuration (e.g. org.springframework .reactor). If category ends with "." like "reactor.", a generated operator suffix will complete, e.g. "reactor.Parallel.Map".level- theLevelto enforce for this tracing ParallelFlux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)options- a varargSignalTypeoption to filter log messages- Returns:
- a new unaltered
ParallelFlux
-
log
public final ParallelFlux<T> log(@Nullable String category, Level level, boolean showOperatorLine, SignalType... options) Observe Reactive Streams signals matching the passed filteroptionsand useLoggersupport to handle trace implementation. Default will use the passedLeveland java.util.logging. If SLF4J is available, it will be used instead.Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
ParallelFlux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)- Parameters:
category- to be mapped into logger configuration (e.g. org.springframework .reactor). If category ends with "." like "reactor.", a generated operator suffix will complete, e.g. "reactor.ParallelFlux.Map".level- theLevelto enforce for this tracing ParallelFlux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)showOperatorLine- capture the current stack to display operator class/line number.options- a varargSignalTypeoption to filter log messages- Returns:
- a new unaltered
ParallelFlux
-
map
Maps the source values on each 'rail' to another value.Note that the same mapper function may be called from multiple threads concurrently.
- Type Parameters:
U- the output value type- Parameters:
mapper- the mapper function turning Ts into Us.- Returns:
- the new
ParallelFluxinstance
-
name
Give a name to this sequence, which can be retrieved usingScannable.name()as long as this is the first reachableScannable.parents().- Parameters:
name- a name for the sequence- Returns:
- the same sequence, but bearing a name
-
ordered
Merges the values from each 'rail', but choose which one to merge by way of a providedComparator, picking the smallest of all rails. The result is exposed back as aFlux.This version uses a default prefetch of
Queues.SMALL_BUFFER_SIZE.- Parameters:
comparator- the comparator to choose the smallest value available from all rails- Returns:
- the new Flux instance
- See Also:
-
ordered
Merges the values from each 'rail', but choose which one to merge by way of a providedComparator, picking the smallest of all rails. The result is exposed back as aFlux.- Parameters:
comparator- the comparator to choose the smallest value available from all railsprefetch- the prefetch to use- Returns:
- the new Flux instance
- See Also:
-
parallelism
public abstract int parallelism()Returns the number of expected parallel Subscribers.- Returns:
- the number of expected parallel Subscribers
-
reduce
Reduces all values within a 'rail' and across 'rails' with a reducer function into a single sequential value.Note that the same reducer function may be called from multiple threads concurrently.
- Parameters:
reducer- the function to reduce two values into one.- Returns:
- the new Mono instance emitting the reduced value or empty if the
ParallelFluxwas empty
-
reduce
public final <R> ParallelFlux<R> reduce(Supplier<R> initialSupplier, BiFunction<R, ? super T, R> reducer) Reduces all values within a 'rail' to a single value (with a possibly different type) via a reducer function that is initialized on each rail from an initialSupplier value.Note that the same mapper function may be called from multiple threads concurrently.
- Type Parameters:
R- the reduced output type- Parameters:
initialSupplier- the supplier for the initial valuereducer- the function to reduce a previous output of reduce (or the initial value supplied) with a current source value.- Returns:
- the new
ParallelFluxinstance
-
runOn
Specifies where each 'rail' will observe its incoming values with possible work-stealing and default prefetch amount.This operator uses the default prefetch size returned by
Queues.SMALL_BUFFER_SIZE.The operator will call
Scheduler.createWorker()as many times as this ParallelFlux's parallelism level is.No assumptions are made about the Scheduler's parallelism level, if the Scheduler's parallelism level is lower than the ParallelFlux's, some rails may end up on the same thread/worker.
This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.
- Parameters:
scheduler- the scheduler to use- Returns:
- the new
ParallelFluxinstance
-
runOn
Specifies where each 'rail' will observe its incoming values with possible work-stealing and a given prefetch amount.This operator uses the default prefetch size returned by
Queues.SMALL_BUFFER_SIZE.The operator will call
Scheduler.createWorker()as many times as this ParallelFlux's parallelism level is.No assumptions are made about the Scheduler's parallelism level, if the Scheduler's parallelism level is lower than the ParallelFlux's, some rails may end up on the same thread/worker.
This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.
- Parameters:
scheduler- the scheduler to use that rail's worker has run out of work.prefetch- the number of values to request on each 'rail' from the source- Returns:
- the new
ParallelFluxinstance
-
sequential
Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Publisher sequence, running with a default prefetch value for the rails.This operator uses the default prefetch size returned by
Queues.SMALL_BUFFER_SIZE.- Returns:
- the new Flux instance
- See Also:
-
sequential
Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Publisher sequence, running with a give prefetch value for the rails.- Parameters:
prefetch- the prefetch amount to use for each rail- Returns:
- the new Flux instance
-
sorted
Sorts the 'rails' of thisParallelFluxand returns a Publisher that sequentially picks the smallest next value from the rails.This operator requires a finite source ParallelFlux.
- Parameters:
comparator- the comparator to use- Returns:
- the new Flux instance
-
sorted
Sorts the 'rails' of thisParallelFluxand returns a Publisher that sequentially picks the smallest next value from the rails.This operator requires a finite source ParallelFlux.
- Parameters:
comparator- the comparator to usecapacityHint- the expected number of total elements- Returns:
- the new Flux instance
-
subscribe
Subscribes an array of Subscribers to thisParallelFluxand triggers the execution chain for all 'rails'.- Parameters:
subscribers- the subscribers array to run in parallel, the number of items must be equal to the parallelism level of this ParallelFlux
-
subscribe
Subscribes to thisParallelFluxand triggers the execution chain for all 'rails'. -
subscribe
Subscribes to thisParallelFluxby providing an onNext callback and triggers the execution chain for all 'rails'.- Parameters:
onNext- consumer of onNext signals
-
subscribe
public final Disposable subscribe(@Nullable Consumer<? super T> onNext, Consumer<? super Throwable> onError) Subscribes to thisParallelFluxby providing an onNext and onError callback and triggers the execution chain for all 'rails'.- Parameters:
onNext- consumer of onNext signalsonError- consumer of error signal
-
subscribe
public final Disposable subscribe(@Nullable Consumer<? super T> onNext, @Nullable Consumer<? super Throwable> onError, @Nullable Runnable onComplete) Subscribes to thisParallelFluxby providing an onNext, onError and onComplete callback and triggers the execution chain for all 'rails'.- Parameters:
onNext- consumer of onNext signalsonError- consumer of error signalonComplete- callback on completion signal
-
subscribe
Description copied from interface:CorePublisherAn internalPublisher.subscribe(Subscriber)that will bypassHooks.onLastOperator(Function)pointcut.In addition to behave as expected by
Publisher.subscribe(Subscriber)in a controlled manner, it supports direct subscribe-timeContextpassing.- Specified by:
subscribein interfaceCorePublisher<T>- Parameters:
s- theSubscriberinterested into the published sequence- See Also:
-
subscribe
public final Disposable subscribe(@Nullable Consumer<? super T> onNext, @Nullable Consumer<? super Throwable> onError, @Nullable Runnable onComplete, @Nullable Consumer<? super Subscription> onSubscribe) Subscribes to thisParallelFluxby providing an onNext, onError, onComplete and onSubscribe callback and triggers the execution chain for all 'rails'.- Parameters:
onNext- consumer of onNext signalsonError- consumer of error signalonComplete- callback on completion signalonSubscribe- consumer of the subscription signal
-
subscribe
public final Disposable subscribe(@Nullable Consumer<? super T> onNext, @Nullable Consumer<? super Throwable> onError, @Nullable Runnable onComplete, @Nullable Context initialContext) Subscribes to thisParallelFluxby providing an onNext, onError and onComplete callback as well as an initialContext, then trigger the execution chain for all 'rails'.- Parameters:
onNext- consumer of onNext signalsonError- consumer of error signalonComplete- callback on completion signalinitialContext-Contextfor the rails
-
subscribe
Merge the rails into asequential()Flux andsubscribeto said Flux.- Specified by:
subscribein interfacePublisher<T>- Parameters:
s- the subscriber to use onsequential()Flux
-
tag
Tag this ParallelFlux with a key/value pair. These can be retrieved as aSetof all tags throughout the publisher chain by usingScannable.tags()(as traversed byScannable.parents()).- Parameters:
key- a tag keyvalue- a tag value- Returns:
- the same sequence, but bearing tags
-
then
Emit an onComplete or onError signal once all values across 'rails' have been observed.- Returns:
- the new Mono instance emitting the reduced value or empty if the
ParallelFluxwas empty
-
transform
public final <U> ParallelFlux<U> transform(Function<? super ParallelFlux<T>, ParallelFlux<U>> composer) Allows composing operators, in assembly time, on top of thisParallelFluxand returns anotherParallelFluxwith composed features.- Type Parameters:
U- the output value type- Parameters:
composer- the composer function fromParallelFlux(this) to another ParallelFlux- Returns:
- the
ParallelFluxreturned by the function
-
transformGroups
public final <U> ParallelFlux<U> transformGroups(Function<? super GroupedFlux<Integer, T>, ? extends Publisher<? extends U>> composer) Allows composing operators off the groups (or 'rails'), as individualGroupedFluxinstances keyed by the zero based rail's index. The transformed groups areparallelizedback once the transformation has been applied. Since groups are generated anew per each subscription, this is all done in a "lazy" fashion where each subscription trigger distinct applications of theFunction.Note that like in
groups(), requests and cancellation compose through, and cancelling only one rail may result in undefined behavior.- Type Parameters:
U- the type of the resulting parallelized flux- Parameters:
composer- the composition function to apply on eachrail- Returns:
- a
ParallelFluxof the composed groups
-
toString
-
validate
Validates the number of subscribers and returns true if their number matches the parallelism level of this ParallelFlux.- Parameters:
subscribers- the array of Subscribers- Returns:
- true if the number of subscribers equals to the parallelism level
-
getPrefetch
public int getPrefetch()The prefetch configuration of the component- Returns:
- the prefetch configuration of the component
-
onAssembly
- Type Parameters:
T- the value type- Parameters:
source- the source to wrap- Returns:
- the potentially wrapped source
-