Class ParallelFlux<T>

java.lang.Object
reactor.core.publisher.ParallelFlux<T>
Type Parameters:
T - the value type
All Implemented Interfaces:
Publisher<T>, CorePublisher<T>

public abstract class ParallelFlux<T> extends Object implements CorePublisher<T>
A ParallelFlux publishes to an array of Subscribers, in parallel 'rails' (or 'groups').

Use from(org.reactivestreams.Publisher<? extends T>) to start processing a regular Publisher in 'rails', each of which covers a subset of the original Publisher's data. Flux.parallel() is a convenient shortcut to achieve that on a Flux.

Use runOn(reactor.core.scheduler.Scheduler) to specify the Scheduler, and therefore the threads, on which each 'rail' runs.

Use sequential() to merge the sources back into a single Flux.

Use then() to listen for the termination of all rails in the produced Mono.

Use subscribe(Subscriber) if you simply want to subscribe to the merged sequence. Note that other variants like subscribe(Consumer) instead subscribe multiple times, once per rail (which means that the lambdas should be as stateless and side-effect free as possible).
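
The typical lifecycle described above (split into rails, pick a Scheduler, process, merge back) can be sketched as follows. This is a minimal illustration, not a canonical usage: the class name ParallelSquares and the method squares are hypothetical, and the final sort is only there because the merge order of sequential() is not guaranteed.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// Hypothetical example class; not part of Reactor.
class ParallelSquares {

    // Squares 1..count on `rails` parallel rails, then merges back into one sequence.
    static List<Integer> squares(int count, int rails) {
        return Flux.range(1, count)
                .parallel(rails)               // split the source into rails
                .runOn(Schedulers.parallel())  // run each rail on a worker of this Scheduler
                .map(i -> i * i)               // may be invoked concurrently across rails
                .sequential()                  // merge the rails back into a single Flux
                .collectSortedList()           // merge order is not guaranteed, so sort
                .block();
    }

    public static void main(String[] args) {
        System.out.println(squares(5, 2));
    }
}
```

Without the runOn call, the rails would still exist but all work would happen on the subscribing thread.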

  • Constructor Details

    • ParallelFlux

      public ParallelFlux()
  • Method Details

    • from

      public static <T> ParallelFlux<T> from(Publisher<? extends T> source)
      Take a Publisher and prepare to consume it on multiple 'rails' (one per CPU core) in a round-robin fashion. Equivalent to Flux.parallel().
      Type Parameters:
      T - the value type
      Parameters:
      source - the source Publisher
      Returns:
      the ParallelFlux instance
    • from

      public static <T> ParallelFlux<T> from(Publisher<? extends T> source, int parallelism)
      Take a Publisher and prepare to consume it on parallelism number of 'rails', possibly ordered and in a round-robin fashion.
      Type Parameters:
      T - the value type
      Parameters:
      source - the source Publisher
      parallelism - the number of parallel rails
      Returns:
      the new ParallelFlux instance
    • from

      public static <T> ParallelFlux<T> from(Publisher<? extends T> source, int parallelism, int prefetch, Supplier<Queue<T>> queueSupplier)
      Take a Publisher and prepare to consume it on parallelism number of 'rails' in a round-robin fashion, using a custom prefetch amount and queue for dealing with the source Publisher's values.
      Type Parameters:
      T - the value type
      Parameters:
      source - the source Publisher
      parallelism - the number of parallel rails
      prefetch - the number of values to prefetch from the source
      queueSupplier - the queue structure supplier to hold the prefetched values from the source until there is a rail ready to process it.
      Returns:
      the new ParallelFlux instance
    • from

      @SafeVarargs public static <T> ParallelFlux<T> from(Publisher<T>... publishers)
      Wraps multiple Publishers into a ParallelFlux which runs them in parallel and unordered.
      Type Parameters:
      T - the value type
      Parameters:
      publishers - the array of publishers
      Returns:
      the new ParallelFlux instance
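
      As a rough sketch of how two of the from variants differ, the snippet below builds one ParallelFlux with an explicit rail count and one from several Publishers (one rail per Publisher), then reads back parallelism(). The class and method names are hypothetical and only serve the illustration.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.ParallelFlux;

// Hypothetical example class; not part of Reactor.
class FromVariants {

    // One source split over an explicit number of rails.
    static int explicitRails() {
        return ParallelFlux.from(Flux.range(1, 100), 3).parallelism();
    }

    // Several sources, each becoming its own rail.
    static int oneRailPerPublisher() {
        return ParallelFlux.from(Flux.just("a", "b"), Flux.just("c"), Flux.just("d"))
                .parallelism();
    }

    public static void main(String[] args) {
        System.out.println(explicitRails() + " / " + oneRailPerPublisher());
    }
}
```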
    • as

      public final <U> U as(Function<? super ParallelFlux<T>,U> converter)
      Perform a fluent transformation to a value via a converter function which receives this ParallelFlux.
      Type Parameters:
      U - the output value type
      Parameters:
      converter - the converter function from ParallelFlux to some type
      Returns:
      the value returned by the converter function
    • checkpoint

      public final ParallelFlux<T> checkpoint()
      Activate traceback (full assembly tracing) for this particular ParallelFlux, in case of an error upstream of the checkpoint. Tracing incurs the cost of an exception stack trace creation.

      It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.

      The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).

      Returns:
      the assembly tracing ParallelFlux
    • checkpoint

      public final ParallelFlux<T> checkpoint(String description)
      Activate traceback (assembly marker) for this particular ParallelFlux by giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(), this doesn't create a filled stack trace, avoiding the main cost of the operator. However, as a trade-off the description must be unique enough for the user to find out where this ParallelFlux was assembled. If you only want a generic description, and still rely on the stack trace to find the assembly site, use the checkpoint(String, boolean) variant.

      It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.

      The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).

      Parameters:
      description - a unique enough description to include in the light assembly traceback.
      Returns:
      the assembly marked ParallelFlux
    • checkpoint

      public final ParallelFlux<T> checkpoint(String description, boolean forceStackTrace)
      Activate traceback (full assembly tracing or the lighter assembly marking depending on the forceStackTrace option).

      By setting the forceStackTrace parameter to true, activate assembly tracing for this particular ParallelFlux and give it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(String), this will incur the cost of an exception stack trace creation. The description could for example be a meaningful name for the assembled ParallelFlux or a wider correlation ID, since the stack trace will always provide enough information to locate where this ParallelFlux was assembled.

      By setting forceStackTrace to false, behaves like checkpoint(String) and is subject to the same caveat in choosing the description.

      It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly marker.

      The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).

      Parameters:
      description - a description (must be unique enough if forceStackTrace is set to false).
      forceStackTrace - false to make a light checkpoint without a stacktrace, true to use a stack trace.
      Returns:
      the assembly marked ParallelFlux.
    • collect

      public final <C> ParallelFlux<C> collect(Supplier<? extends C> collectionSupplier, BiConsumer<? super C,? super T> collector)
      Collect the elements in each rail into a collection supplied via a collectionSupplier and collected into with a collector action, emitting the collection at the end.
      Type Parameters:
      C - the collection type
      Parameters:
      collectionSupplier - the supplier of the collection in each rail
      collector - the collector, taking the per-rail collection and the current item
      Returns:
      the new ParallelFlux instance
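
      A minimal sketch of per-rail collection, assuming a plain ArrayList as the container: each rail fills its own list and emits it when the rail completes, so the merged result holds one list per rail. PerRailCollect and collectPerRail are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class; not part of Reactor.
class PerRailCollect {

    // Returns one list per rail; how elements are spread over rails is not asserted here.
    static List<List<Integer>> collectPerRail(int count, int rails) {
        return Flux.range(1, count)
                .parallel(rails)
                .<List<Integer>>collect(ArrayList::new, (list, item) -> list.add(item))
                .sequential()   // one emitted list per rail
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(collectPerRail(10, 3));
    }
}
```

      Because each rail owns its container, the collector itself needs no synchronization.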
    • collectSortedList

      public final Mono<List<T>> collectSortedList(Comparator<? super T> comparator)
      Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher.

      This operator requires a finite source ParallelFlux.

      Parameters:
      comparator - the comparator to compare elements
      Returns:
      the new Mono instance
    • collectSortedList

      public final Mono<List<T>> collectSortedList(Comparator<? super T> comparator, int capacityHint)
      Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher.

      This operator requires a finite source ParallelFlux.

      Parameters:
      comparator - the comparator to compare elements
      capacityHint - the expected number of total elements
      Returns:
      the new Mono instance
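
      For illustration, a small sketch of collectSortedList on a finite source. The names SortedCollect and sortedList are hypothetical, and natural ordering is just one possible comparator.

```java
import java.util.Comparator;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class; not part of Reactor.
class SortedCollect {

    // Sorts across all rails and returns a single, fully sorted list.
    static List<Integer> sortedList() {
        return Flux.just(5, 3, 1, 4, 2)
                .parallel(2)
                .collectSortedList(Comparator.naturalOrder())
                .block();
    }

    public static void main(String[] args) {
        System.out.println(sortedList());
    }
}
```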
    • concatMap

      public final <R> ParallelFlux<R> concatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
      Generates and concatenates Publishers on each 'rail', signalling errors immediately and generating 2 publishers upfront.
      Type Parameters:
      R - the result type
      Parameters:
      mapper - the function to map each rail's value into a Publisher
      Returns:
      the new ParallelFlux instance
    • concatMap

      public final <R> ParallelFlux<R> concatMap(Function<? super T,? extends Publisher<? extends R>> mapper, int prefetch)
      Generates and concatenates Publishers on each 'rail', signalling errors immediately and using the given prefetch amount for generating Publishers upfront.
      Type Parameters:
      R - the result type
      Parameters:
      mapper - the function to map each rail's value into a Publisher
      prefetch - the number of items to prefetch from each inner Publisher
      Returns:
      the new ParallelFlux instance
    • concatMapDelayError

      public final <R> ParallelFlux<R> concatMapDelayError(Function<? super T,? extends Publisher<? extends R>> mapper)
      Generates and concatenates Publishers on each 'rail', delaying errors and generating 2 publishers upfront.
      Type Parameters:
      R - the result type
      Parameters:
      mapper - the function to map each rail's value into a Publisher
      Returns:
      the new ParallelFlux instance
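
      The concatMap variants keep the inner Publishers of a given rail in order. A minimal sketch, with the hypothetical names PerRailConcat and expand: each value is mapped to a two-element inner Flux. Ordering is only guaranteed within one rail, so the merged order across several rails should not be relied upon.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class; not part of Reactor.
class PerRailConcat {

    // Maps each value i to the inner sequence (i, i * 10), concatenated per rail.
    static List<Integer> expand(int count, int rails) {
        return Flux.range(1, count)
                .parallel(rails)
                .concatMap(i -> Flux.just(i, i * 10))
                .sequential()
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        // With a single rail the overall order is the source order.
        System.out.println(expand(3, 1));
    }
}
```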
    • doAfterTerminate

      public final ParallelFlux<T> doAfterTerminate(Runnable afterTerminate)
      Run the specified runnable when a 'rail' completes or signals an error.
      Parameters:
      afterTerminate - the callback
      Returns:
      the new ParallelFlux instance
    • doOnCancel

      public final ParallelFlux<T> doOnCancel(Runnable onCancel)
      Run the specified runnable when a 'rail' receives a cancellation.
      Parameters:
      onCancel - the callback
      Returns:
      the new ParallelFlux instance
    • doOnComplete

      public final ParallelFlux<T> doOnComplete(Runnable onComplete)
      Run the specified runnable when a 'rail' completes.
      Parameters:
      onComplete - the callback
      Returns:
      the new ParallelFlux instance
    • doOnEach

      public final ParallelFlux<T> doOnEach(Consumer<? super Signal<T>> signalConsumer)
      Triggers side-effects when the ParallelFlux emits an item, fails with an error or completes successfully. All these events are represented as a Signal that is passed to the side-effect callback. Note that with ParallelFlux and the lambda-based subscribes or the array-based one, onError and onComplete will be invoked as many times as there are rails, resulting in as many corresponding Signal being seen in the callback.

      Use of subscribe(Subscriber), which calls sequential(), might cancel some rails, resulting in fewer signals being observed. This is an advanced operator, typically used for monitoring of a ParallelFlux.

      Parameters:
      signalConsumer - the mandatory callback to call on Subscriber.onNext(Object), Subscriber.onError(Throwable) and Subscriber.onComplete()
      Returns:
      an observed ParallelFlux
    • doOnError

      public final ParallelFlux<T> doOnError(Consumer<? super Throwable> onError)
      Call the specified consumer with the exception passing through any 'rail'.
      Parameters:
      onError - the callback
      Returns:
      the new ParallelFlux instance
    • doOnSubscribe

      public final ParallelFlux<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe)
      Call the specified callback when a 'rail' receives a Subscription from its upstream.

      This method is not intended for capturing the subscription and calling its methods, but for side effects like monitoring. For instance, the correct way to cancel a subscription is to call Disposable.dispose() on the Disposable returned by subscribe().

      Parameters:
      onSubscribe - the callback
      Returns:
      the new ParallelFlux instance
    • doOnNext

      public final ParallelFlux<T> doOnNext(Consumer<? super T> onNext)
      Call the specified consumer with the current element passing through any 'rail'.
      Parameters:
      onNext - the callback
      Returns:
      the new ParallelFlux instance
    • doOnRequest

      public final ParallelFlux<T> doOnRequest(LongConsumer onRequest)
      Call the specified consumer with the request amount if any rail receives a request.
      Parameters:
      onRequest - the callback
      Returns:
      the new ParallelFlux instance
    • doOnTerminate

      public final ParallelFlux<T> doOnTerminate(Runnable onTerminate)
      Triggered when the ParallelFlux terminates, either by completing successfully or with an error.
      Parameters:
      onTerminate - the callback to call on Subscriber.onComplete() or Subscriber.onError(java.lang.Throwable)
      Returns:
      an observed ParallelFlux
    • filter

      public final ParallelFlux<T> filter(Predicate<? super T> predicate)
      Filters the source values on each 'rail'.

      Note that the same predicate may be called from multiple threads concurrently.

      Parameters:
      predicate - the function returning true to keep a value or false to drop a value
      Returns:
      the new ParallelFlux instance
    • flatMap

      public final <R> ParallelFlux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
      Generates and flattens Publishers on each 'rail'.

      Errors are not delayed, and the operator uses unbounded concurrency along with the default inner prefetch.

      Type Parameters:
      R - the result type
      Parameters:
      mapper - the function to map each rail's value into a Publisher
      Returns:
      the new ParallelFlux instance
    • flatMap

      public final <R> ParallelFlux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError)
      Generates and flattens Publishers on each 'rail', optionally delaying errors.

      It uses unbounded concurrency along with default inner prefetch.

      Type Parameters:
      R - the result type
      Parameters:
      mapper - the function to map each rail's value into a Publisher
      delayError - whether errors from the main and the inner sources should be delayed until all of them terminate
      Returns:
      the new ParallelFlux instance
    • flatMap

      public final <R> ParallelFlux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency)
      Generates and flattens Publishers on each 'rail', optionally delaying errors and limiting the total number of simultaneous subscriptions to the inner Publishers.

      It uses a default inner prefetch.

      Type Parameters:
      R - the result type
      Parameters:
      mapper - the function to map each rail's value into a Publisher
      delayError - whether errors from the main and the inner sources should be delayed until all of them terminate
      maxConcurrency - the maximum number of simultaneous subscriptions to the generated inner Publishers
      Returns:
      the new ParallelFlux instance
    • flatMap

      public final <R> ParallelFlux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency, int prefetch)
      Generates and flattens Publishers on each 'rail', optionally delaying errors, limiting the total number of simultaneous subscriptions to the inner Publishers and using the given prefetch amount for the inner Publishers.
      Type Parameters:
      R - the result type
      Parameters:
      mapper - the function to map each rail's value into a Publisher
      delayError - whether errors from the main and the inner sources should be delayed until all of them terminate
      maxConcurrency - the maximum number of simultaneous subscriptions to the generated inner Publishers
      prefetch - the number of items to prefetch from each inner Publisher
      Returns:
      the new ParallelFlux instance
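
      As an illustration of the flatMap overloads, the sketch below uses the three-argument variant with errors not delayed and a caller-supplied concurrency limit. PerRailFlatMap and expand are hypothetical names. Unlike concatMap, flatMap makes no ordering promise, so the result is sorted before being returned.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class; not part of Reactor.
class PerRailFlatMap {

    // Maps each value i to the inner sequence (i, -i) and flattens it on each rail.
    static List<Integer> expand(int count, int rails, int maxConcurrency) {
        return Flux.range(1, count)
                .parallel(rails)
                .flatMap(i -> Flux.just(i, -i), false, maxConcurrency)
                .sequential()
                .collectSortedList()   // emission order is not guaranteed, so sort
                .block();
    }

    public static void main(String[] args) {
        System.out.println(expand(3, 2, 4));
    }
}
```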
    • groups

      public final Flux<GroupedFlux<Integer,T>> groups()
      Exposes the 'rails' as individual GroupedFlux instances, keyed by the rail index (zero based).

      Each group can be consumed only once; requests and cancellation compose through. Note that cancelling only one rail may result in undefined behavior.

      Returns:
      the new Flux instance
    • hide

      public final ParallelFlux<T> hide()
      Hides the identities of this ParallelFlux and its Subscription as well.
      Returns:
      a new ParallelFlux defeating any Publisher / Subscription feature-detection
    • log

      public final ParallelFlux<T> log()
      Observe all Reactive Streams signals and use Logger support to handle trace implementation. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.

      The default log category will be "reactor.*", a generated operator suffix will complete, e.g. "reactor.Parallel.Map".

      Returns:
      a new unaltered ParallelFlux
    • log

      public final ParallelFlux<T> log(@Nullable String category)
      Observe all Reactive Streams signals and use Logger support to handle trace implementation. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.

      Parameters:
      category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will complete, e.g. "reactor.Parallel.Map".
      Returns:
      a new unaltered ParallelFlux
    • log

      public final ParallelFlux<T> log(@Nullable String category, Level level, SignalType... options)
      Observe Reactive Streams signals matching the passed filter options and use Logger support to handle trace implementation. Default will use the passed Level and java.util.logging. If SLF4J is available, it will be used instead.

      Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:

           ParallelFlux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)

      Parameters:
      category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will complete, e.g. "reactor.Parallel.Map".
      level - the Level to enforce for this tracing ParallelFlux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
      options - a vararg SignalType option to filter log messages
      Returns:
      a new unaltered ParallelFlux
    • log

      public final ParallelFlux<T> log(@Nullable String category, Level level, boolean showOperatorLine, SignalType... options)
      Observe Reactive Streams signals matching the passed filter options and use Logger support to handle trace implementation. Default will use the passed Level and java.util.logging. If SLF4J is available, it will be used instead.

      Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:

           ParallelFlux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)

      Parameters:
      category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will complete, e.g. "reactor.ParallelFlux.Map".
      level - the Level to enforce for this tracing ParallelFlux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
      showOperatorLine - capture the current stack to display operator class/line number.
      options - a vararg SignalType option to filter log messages
      Returns:
      a new unaltered ParallelFlux
    • map

      public final <U> ParallelFlux<U> map(Function<? super T,? extends U> mapper)
      Maps the source values on each 'rail' to another value.

      Note that the same mapper function may be called from multiple threads concurrently.

      Type Parameters:
      U - the output value type
      Parameters:
      mapper - the mapper function turning Ts into Us.
      Returns:
      the new ParallelFlux instance
    • name

      public final ParallelFlux<T> name(String name)
      Give a name to this sequence, which can be retrieved using Scannable.name() as long as this is the first reachable Scannable.parents().
      Parameters:
      name - a name for the sequence
      Returns:
      the same sequence, but bearing a name
    • ordered

      public final Flux<T> ordered(Comparator<? super T> comparator)
      Merges the values from each 'rail', but chooses which one to merge by way of a provided Comparator, picking the smallest of all rails. The result is exposed back as a Flux.

      This version uses a default prefetch of Queues.SMALL_BUFFER_SIZE.

      Parameters:
      comparator - the comparator to choose the smallest value available from all rails
      Returns:
      the new Flux instance
    • ordered

      public final Flux<T> ordered(Comparator<? super T> comparator, int prefetch)
      Merges the values from each 'rail', but chooses which one to merge by way of a provided Comparator, picking the smallest of all rails. The result is exposed back as a Flux.
      Parameters:
      comparator - the comparator to choose the smallest value available from all rails
      prefetch - the prefetch to use
      Returns:
      the new Flux instance
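
      A minimal sketch of ordered, assuming each rail is itself already sorted (here because the source is ascending and every rail receives a subsequence of it): always picking the smallest head among the rails then restores the global order, even when the rails run on different threads. OrderedMerge and mergeInOrder are hypothetical names.

```java
import java.util.Comparator;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// Hypothetical example class; not part of Reactor.
class OrderedMerge {

    // Splits an ascending range over rails and merges it back in ascending order.
    static List<Integer> mergeInOrder(int count, int rails) {
        return Flux.range(1, count)
                .parallel(rails)
                .runOn(Schedulers.parallel())
                .ordered(Comparator.naturalOrder())  // pick the smallest value across rails
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(mergeInOrder(10, 3));
    }
}
```

      If the rails are not individually sorted, ordered does not sort them; sorted or collectSortedList are the operators for that case.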
    • parallelism

      public abstract int parallelism()
      Returns the number of expected parallel Subscribers.
      Returns:
      the number of expected parallel Subscribers
    • reduce

      public final Mono<T> reduce(BiFunction<T,T,T> reducer)
      Reduces all values within a 'rail' and across 'rails' with a reducer function into a single sequential value.

      Note that the same reducer function may be called from multiple threads concurrently.

      Parameters:
      reducer - the function to reduce two values into one.
      Returns:
      the new Mono instance emitting the reduced value or empty if the ParallelFlux was empty
    • reduce

      public final <R> ParallelFlux<R> reduce(Supplier<R> initialSupplier, BiFunction<R,? super T,R> reducer)
      Reduces all values within a 'rail' to a single value (with a possibly different type) via a reducer function that is initialized on each rail from an initialSupplier value.

      Note that the same reducer function may be called from multiple threads concurrently.

      Type Parameters:
      R - the reduced output type
      Parameters:
      initialSupplier - the supplier for the initial value
      reducer - the function to reduce a previous output of reduce (or the initial value supplied) with a current source value.
      Returns:
      the new ParallelFlux instance
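
      The two reduce overloads differ in what they return: the single-argument form reduces within and across rails into one Mono, while the seeded form yields one partial result per rail. A short sketch under those assumptions, with hypothetical names ParallelReduce, total and partialSums:

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class; not part of Reactor.
class ParallelReduce {

    // Reduces within each rail and then across rails into a single value.
    static Integer total(int count, int rails) {
        return Flux.range(1, count)
                .parallel(rails)
                .reduce(Integer::sum)
                .block();
    }

    // Reduces each rail separately, starting from a per-rail seed of 0.
    static List<Integer> partialSums(int count, int rails) {
        return Flux.range(1, count)
                .parallel(rails)
                .reduce(() -> 0, (acc, value) -> acc + value)
                .sequential()   // one partial sum per rail
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(total(100, 4) + " " + partialSums(100, 4));
    }
}
```

      The reducer passed to the single-argument form should be associative, since it is applied both inside a rail and to the per-rail results.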
    • runOn

      public final ParallelFlux<T> runOn(Scheduler scheduler)
      Specifies where each 'rail' will observe its incoming values with possible work-stealing and default prefetch amount.

      This operator uses the default prefetch size returned by Queues.SMALL_BUFFER_SIZE.

      The operator will call Scheduler.createWorker() as many times as this ParallelFlux's parallelism level is.

      No assumptions are made about the Scheduler's parallelism level; if the Scheduler's parallelism level is lower than the ParallelFlux's, some rails may end up on the same thread/worker.

      This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.

      Parameters:
      scheduler - the scheduler to use
      Returns:
      the new ParallelFlux instance
    • runOn

      public final ParallelFlux<T> runOn(Scheduler scheduler, int prefetch)
      Specifies where each 'rail' will observe its incoming values with possible work-stealing and a given prefetch amount.

      This operator uses the given prefetch amount rather than the default Queues.SMALL_BUFFER_SIZE.

      The operator will call Scheduler.createWorker() as many times as this ParallelFlux's parallelism level is.

      No assumptions are made about the Scheduler's parallelism level; if the Scheduler's parallelism level is lower than the ParallelFlux's, some rails may end up on the same thread/worker.

      This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.

      Parameters:
      scheduler - the scheduler to use
      prefetch - the number of values to request on each 'rail' from the source
      Returns:
      the new ParallelFlux instance
    • sequential

      public final Flux<T> sequential()
      Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Publisher sequence, running with a default prefetch value for the rails.

      This operator uses the default prefetch size returned by Queues.SMALL_BUFFER_SIZE.

      Returns:
      the new Flux instance
    • sequential

      public final Flux<T> sequential(int prefetch)
      Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Publisher sequence, running with a given prefetch value for the rails.
      Parameters:
      prefetch - the prefetch amount to use for each rail
      Returns:
      the new Flux instance
    • sorted

      public final Flux<T> sorted(Comparator<? super T> comparator)
      Sorts the 'rails' of this ParallelFlux and returns a Publisher that sequentially picks the smallest next value from the rails.

      This operator requires a finite source ParallelFlux.

      Parameters:
      comparator - the comparator to use
      Returns:
      the new Flux instance
    • sorted

      public final Flux<T> sorted(Comparator<? super T> comparator, int capacityHint)
      Sorts the 'rails' of this ParallelFlux and returns a Publisher that sequentially picks the smallest next value from the rails.

      This operator requires a finite source ParallelFlux.

      Parameters:
      comparator - the comparator to use
      capacityHint - the expected number of total elements
      Returns:
      the new Flux instance
    • subscribe

      public abstract void subscribe(CoreSubscriber<? super T>[] subscribers)
      Subscribes an array of Subscribers to this ParallelFlux and triggers the execution chain for all 'rails'.
      Parameters:
      subscribers - the subscribers array to run in parallel, the number of items must be equal to the parallelism level of this ParallelFlux
    • subscribe

      public final Disposable subscribe()
      Subscribes to this ParallelFlux and triggers the execution chain for all 'rails'.
    • subscribe

      public final Disposable subscribe(Consumer<? super T> onNext)
      Subscribes to this ParallelFlux by providing an onNext callback and triggers the execution chain for all 'rails'.
      Parameters:
      onNext - consumer of onNext signals
    • subscribe

      public final Disposable subscribe(@Nullable Consumer<? super T> onNext, Consumer<? super Throwable> onError)
      Subscribes to this ParallelFlux by providing an onNext and onError callback and triggers the execution chain for all 'rails'.
      Parameters:
      onNext - consumer of onNext signals
      onError - consumer of error signal
    • subscribe

      public final Disposable subscribe(@Nullable Consumer<? super T> onNext, @Nullable Consumer<? super Throwable> onError, @Nullable Runnable onComplete)
      Subscribes to this ParallelFlux by providing an onNext, onError and onComplete callback and triggers the execution chain for all 'rails'.
      Parameters:
      onNext - consumer of onNext signals
      onError - consumer of error signal
      onComplete - callback on completion signal
    • subscribe

      public final void subscribe(CoreSubscriber<? super T> s)
      Description copied from interface: CorePublisher
      An internal Publisher.subscribe(Subscriber) that will bypass Hooks.onLastOperator(Function) pointcut.

      In addition to behaving as expected by Publisher.subscribe(Subscriber) in a controlled manner, it supports direct subscribe-time Context passing.

      Specified by:
      subscribe in interface CorePublisher<T>
      Parameters:
      s - the Subscriber interested in the published sequence
    • subscribe

      public final Disposable subscribe(@Nullable Consumer<? super T> onNext, @Nullable Consumer<? super Throwable> onError, @Nullable Runnable onComplete, @Nullable Consumer<? super Subscription> onSubscribe)
      Subscribes to this ParallelFlux by providing an onNext, onError, onComplete and onSubscribe callback and triggers the execution chain for all 'rails'.
      Parameters:
      onNext - consumer of onNext signals
      onError - consumer of error signal
      onComplete - callback on completion signal
      onSubscribe - consumer of the subscription signal
    • subscribe

      public final Disposable subscribe(@Nullable Consumer<? super T> onNext, @Nullable Consumer<? super Throwable> onError, @Nullable Runnable onComplete, @Nullable Context initialContext)
      Subscribes to this ParallelFlux by providing an onNext, onError and onComplete callback as well as an initial Context, then triggers the execution chain for all 'rails'.
      Parameters:
      onNext - consumer of onNext signals
      onError - consumer of error signal
      onComplete - callback on completion signal
      initialContext - Context for the rails
    • subscribe

      public final void subscribe(Subscriber<? super T> s)
      Merge the rails into a sequential() Flux and subscribe to said Flux.
      Specified by:
      subscribe in interface Publisher<T>
      Parameters:
      s - the subscriber to use on sequential() Flux
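
      The lambda-based subscribe variants attach the same callbacks to every rail, so terminal callbacks run once per rail rather than once overall. The sketch below counts onComplete invocations to make that visible. PerRailCallbacks and completionCalls are hypothetical names, and the example deliberately omits runOn so that, with this synchronous source, everything has finished by the time subscribe returns.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

// Hypothetical example class; not part of Reactor.
class PerRailCallbacks {

    // Returns how many times the onComplete callback ran for a lambda-based subscribe.
    static int completionCalls(int rails) {
        AtomicInteger completions = new AtomicInteger();
        Flux.range(1, 10)
                .parallel(rails)   // no runOn: work happens on the calling thread
                .subscribe(value -> { },
                           error -> { },
                           completions::incrementAndGet);  // invoked once per rail
        return completions.get();
    }

    public static void main(String[] args) {
        System.out.println(completionCalls(3));
    }
}
```

      When a single terminal signal is wanted instead, merging with sequential() or using then() before subscribing is the usual route.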
    • tag

      public final ParallelFlux<T> tag(String key, String value)
      Tag this ParallelFlux with a key/value pair. These can be retrieved as a Set of all tags throughout the publisher chain by using Scannable.tags() (as traversed by Scannable.parents()).
      Parameters:
      key - a tag key
      value - a tag value
      Returns:
      the same sequence, but bearing tags
    • then

      public final Mono<Void> then()
      Emit an onComplete or onError signal once all values across 'rails' have been observed.
      Returns:
      the new Mono instance, which completes or errors once all rails have terminated
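
      A small sketch of then() used to wait for side effects on all rails. AwaitAllRails and processAll are hypothetical names, and the AtomicInteger stands in for whatever thread-safe side effect the rails perform.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// Hypothetical example class; not part of Reactor.
class AwaitAllRails {

    // Processes every element on parallel rails and waits until all rails are done.
    static int processAll(int count, int rails) {
        AtomicInteger processed = new AtomicInteger();
        Flux.range(1, count)
                .parallel(rails)
                .runOn(Schedulers.parallel())
                .doOnNext(i -> processed.incrementAndGet())  // thread-safe side effect
                .then()     // Mono<Void> that terminates after all rails terminate
                .block();
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(processAll(50, 4));
    }
}
```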
    • transform

      public final <U> ParallelFlux<U> transform(Function<? super ParallelFlux<T>,ParallelFlux<U>> composer)
      Allows composing operators, at assembly time, on top of this ParallelFlux and returns another ParallelFlux with composed features.
      Type Parameters:
      U - the output value type
      Parameters:
      composer - the composer function from ParallelFlux (this) to another ParallelFlux
      Returns:
      the ParallelFlux returned by the function
    • transformGroups

      public final <U> ParallelFlux<U> transformGroups(Function<? super GroupedFlux<Integer,T>,? extends Publisher<? extends U>> composer)
      Allows composing operators off the groups (or 'rails'), as individual GroupedFlux instances keyed by the zero based rail's index. The transformed groups are parallelized back once the transformation has been applied. Since groups are generated anew per each subscription, this is all done in a "lazy" fashion where each subscription triggers distinct applications of the Function.

      Note that like in groups(), requests and cancellation compose through, and cancelling only one rail may result in undefined behavior.

      Type Parameters:
      U - the type of the resulting parallelized flux
      Parameters:
      composer - the composition function to apply on each rail
      Returns:
      a ParallelFlux of the composed groups
    • toString

      public String toString()
      Overrides:
      toString in class Object
    • validate

      protected final boolean validate(Subscriber<?>[] subscribers)
      Validates the number of subscribers and returns true if their number matches the parallelism level of this ParallelFlux.
      Parameters:
      subscribers - the array of Subscribers
      Returns:
      true if the number of subscribers equals the parallelism level
    • getPrefetch

      public int getPrefetch()
      The prefetch configuration of the component
      Returns:
      the prefetch configuration of the component
    • onAssembly

      protected static <T> ParallelFlux<T> onAssembly(ParallelFlux<T> source)
      Invoke the Hooks pointcut for the given ParallelFlux, returning a potentially new ParallelFlux
      Type Parameters:
      T - the value type
      Parameters:
      source - the source to wrap
      Returns:
      the potentially wrapped source