Class Flux<T>

java.lang.Object
reactor.core.publisher.Flux<T>
Type Parameters:
T - the element type of this Reactive Streams Publisher
All Implemented Interfaces:
Publisher<T>, CorePublisher<T>
Direct Known Subclasses:
ConnectableFlux, FluxOperator, FluxProcessor, GroupedFlux

public abstract class Flux<T> extends Object implements CorePublisher<T>
A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).

The recommended way to learn about the Flux API and discover new operators is through the reference documentation, rather than through this javadoc (as opposed to learning more about individual operators). See the "which operator do I need?" appendix.

It is intended to be used in implementations and return types. Input parameters should keep using raw Publisher as much as possible.

If it is known that the underlying Publisher will emit 0 or 1 element, Mono should be used instead.

Note that using state in the java.util.function / lambdas used within Flux operators should be avoided, as these may be shared between several Subscribers.

subscribe(CoreSubscriber) is an internal extension to subscribe(Subscriber) used internally for Context passing. User provided Subscriber may be passed to this "subscribe" extension but will loose the available per-subscribe Hooks.onLastOperator(java.util.function.Function<? super org.reactivestreams.Publisher<java.lang.Object>, ? extends org.reactivestreams.Publisher<java.lang.Object>>).

Author:
Sebastien Deleuze, Stephane Maldini, David Karnok, Simon Baslé, Injae Kim
See Also:
  • Constructor Details

    • Flux

      public Flux()
  • Method Details

    • combineLatest

      @SafeVarargs public static <T, V> Flux<V> combineLatest(Function<Object[],V> combinator, Publisher<? extends T>... sources)
      Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources.

      Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.

      Type Parameters:
      T - type of the value from sources
      V - The produced output after transformation by the given combinator
      Parameters:
      sources - The Publisher sources to combine values from
      combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
      Returns:
      a Flux based on the produced combinations
    • combineLatest

      @SafeVarargs public static <T, V> Flux<V> combineLatest(Function<Object[],V> combinator, int prefetch, Publisher<? extends T>... sources)
      Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources.

      Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.

      Type Parameters:
      T - type of the value from sources
      V - The produced output after transformation by the given combinator
      Parameters:
      sources - The Publisher sources to combine values from
      prefetch - The demand sent to each combined source Publisher
      combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
      Returns:
      a Flux based on the produced combinations
    • combineLatest

      public static <T1, T2, V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends V> combinator)
      Build a Flux whose data are generated by the combination of the most recently published value from each of two Publisher sources.

      Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.

      Type Parameters:
      T1 - type of the value from source1
      T2 - type of the value from source2
      V - The produced output after transformation by the given combinator
      Parameters:
      source1 - The first Publisher source to combine values from
      source2 - The second Publisher source to combine values from
      combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
      Returns:
      a Flux based on the produced combinations
    • combineLatest

      public static <T1, T2, T3, V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Function<Object[],V> combinator)
      Build a Flux whose data are generated by the combination of the most recently published value from each of three Publisher sources.

      Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.

      Type Parameters:
      T1 - type of the value from source1
      T2 - type of the value from source2
      T3 - type of the value from source3
      V - The produced output after transformation by the given combinator
      Parameters:
      source1 - The first Publisher source to combine values from
      source2 - The second Publisher source to combine values from
      source3 - The third Publisher source to combine values from
      combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
      Returns:
      a Flux based on the produced combinations
    • combineLatest

      public static <T1, T2, T3, T4, V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Function<Object[],V> combinator)
      Build a Flux whose data are generated by the combination of the most recently published value from each of four Publisher sources.

      Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.

      Type Parameters:
      T1 - type of the value from source1
      T2 - type of the value from source2
      T3 - type of the value from source3
      T4 - type of the value from source4
      V - The produced output after transformation by the given combinator
      Parameters:
      source1 - The first Publisher source to combine values from
      source2 - The second Publisher source to combine values from
      source3 - The third Publisher source to combine values from
      source4 - The fourth Publisher source to combine values from
      combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
      Returns:
      a Flux based on the produced combinations
    • combineLatest

      public static <T1, T2, T3, T4, T5, V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Function<Object[],V> combinator)
      Build a Flux whose data are generated by the combination of the most recently published value from each of five Publisher sources.

      Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.

      Type Parameters:
      T1 - type of the value from source1
      T2 - type of the value from source2
      T3 - type of the value from source3
      T4 - type of the value from source4
      T5 - type of the value from source5
      V - The produced output after transformation by the given combinator
      Parameters:
      source1 - The first Publisher source to combine values from
      source2 - The second Publisher source to combine values from
      source3 - The third Publisher source to combine values from
      source4 - The fourth Publisher source to combine values from
      source5 - The fifth Publisher source to combine values from
      combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
      Returns:
      a Flux based on the produced combinations
    • combineLatest

      public static <T1, T2, T3, T4, T5, T6, V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Function<Object[],V> combinator)
      Build a Flux whose data are generated by the combination of the most recently published value from each of six Publisher sources.

      Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.

      Type Parameters:
      T1 - type of the value from source1
      T2 - type of the value from source2
      T3 - type of the value from source3
      T4 - type of the value from source4
      T5 - type of the value from source5
      T6 - type of the value from source6
      V - The produced output after transformation by the given combinator
      Parameters:
      source1 - The first Publisher source to combine values from
      source2 - The second Publisher source to combine values from
      source3 - The third Publisher source to combine values from
      source4 - The fourth Publisher source to combine values from
      source5 - The fifth Publisher source to combine values from
      source6 - The sixth Publisher source to combine values from
      combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
      Returns:
      a Flux based on the produced combinations
    • combineLatest

      public static <T, V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<Object[],V> combinator)
      Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources provided in an Iterable.

      Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.

      Type Parameters:
      T - The common base type of the values from sources
      V - The produced output after transformation by the given combinator
      Parameters:
      sources - The list of Publisher sources to combine values from
      combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
      Returns:
      a Flux based on the produced combinations
    • combineLatest

      public static <T, V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, int prefetch, Function<Object[],V> combinator)
      Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources provided in an Iterable.

      Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.

      Type Parameters:
      T - The common base type of the values from sources
      V - The produced output after transformation by the given combinator
      Parameters:
      sources - The list of Publisher sources to combine values from
      prefetch - demand produced to each combined source Publisher
      combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
      Returns:
      a Flux based on the produced combinations
    • concat

      public static <T> Flux<T> concat(Iterable<? extends Publisher<? extends T>> sources)
      Concatenate all sources provided in an Iterable, forwarding elements emitted by the sources downstream.

      Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.

      Type Parameters:
      T - The type of values in both source and output sequences
      Parameters:
      sources - The Iterable of Publisher to concatenate
      Returns:
      a new Flux concatenating all source sequences
    • concatWithValues

      @SafeVarargs public final Flux<T> concatWithValues(T... values)
      Concatenates the values to the end of the Flux

      Parameters:
      values - The values to concatenate
      Returns:
      a new Flux concatenating all source sequences
    • concat

      public static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources)
      Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.

      Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.

      Type Parameters:
      T - The type of values in both source and output sequences
      Parameters:
      sources - The Publisher of Publisher to concatenate
      Returns:
      a new Flux concatenating all inner sources sequences
    • concat

      public static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources, int prefetch)
      Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.

      Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.

      Type Parameters:
      T - The type of values in both source and output sequences
      Parameters:
      sources - The Publisher of Publisher to concatenate
      prefetch - the number of Publishers to prefetch from the outer Publisher
      Returns:
      a new Flux concatenating all inner sources sequences
    • concat

      @SafeVarargs public static <T> Flux<T> concat(Publisher<? extends T>... sources)
      Concatenate all sources provided as a vararg, forwarding elements emitted by the sources downstream.

      Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.

      Type Parameters:
      T - The type of values in both source and output sequences
      Parameters:
      sources - The Publisher of Publisher to concat
      Returns:
      a new Flux concatenating all source sequences
    • concatDelayError

      public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources)
      Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.

      Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.

      Type Parameters:
      T - The type of values in both source and output sequences
      Parameters:
      sources - The Publisher of Publisher to concatenate
      Returns:
      a new Flux concatenating all inner sources sequences, delaying errors
    • concatDelayError

      public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, int prefetch)
      Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.

      Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.

      Type Parameters:
      T - The type of values in both source and output sequences
      Parameters:
      sources - The Publisher of Publisher to concatenate
      prefetch - number of elements to prefetch from the source, to be turned into inner Publishers
      Returns:
      a new Flux concatenating all inner sources sequences until complete or error
    • concatDelayError

      public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, boolean delayUntilEnd, int prefetch)
      Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.

      Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes.

      Errors do not interrupt the main sequence but are propagated after the current concat backlog if delayUntilEnd is false or after all sources have had a chance to be concatenated if delayUntilEnd is true.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.

      Type Parameters:
      T - The type of values in both source and output sequences
      Parameters:
      sources - The Publisher of Publisher to concatenate
      delayUntilEnd - delay error until all sources have been consumed instead of after the current source
      prefetch - the number of Publishers to prefetch from the outer Publisher
      Returns:
      a new Flux concatenating all inner sources sequences until complete or error
    • concatDelayError

      @SafeVarargs public static <T> Flux<T> concatDelayError(Publisher<? extends T>... sources)
      Concatenate all sources provided as a vararg, forwarding elements emitted by the sources downstream.

      Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.

      Type Parameters:
      T - The type of values in both source and output sequences
      Parameters:
      sources - The Publisher of Publisher to concat
      Returns:
      a new Flux concatenating all source sequences
    • create

      public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
      Programmatically create a Flux with the capability of emitting multiple elements in a synchronous or asynchronous manner through the FluxSink API. This includes emitting elements from multiple threads.

      This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).

      For example:

      
       Flux.<String>create(emitter -> {
      
           ActionListener al = e -> {
               emitter.next(textField.getText());
           };
           // without cleanup support:
      
           button.addActionListener(al);
      
           // with cleanup support:
      
           button.addActionListener(al);
           emitter.onDispose(() -> {
               button.removeListener(al);
           });
       });
       

      Discard Support: The FluxSink exposed by this operator buffers in case of overflow. The buffer is discarded when the main sequence is cancelled.

      Type Parameters:
      T - The type of values in the sequence
      Parameters:
      emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
      Returns:
      a Flux
      See Also:
    • create

      public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
      Programmatically create a Flux with the capability of emitting multiple elements in a synchronous or asynchronous manner through the FluxSink API. This includes emitting elements from multiple threads.

      This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).

      For example:

      
       Flux.<String>create(emitter -> {
      
           ActionListener al = e -> {
               emitter.next(textField.getText());
           };
           // without cleanup support:
      
           button.addActionListener(al);
      
           // with cleanup support:
      
           button.addActionListener(al);
           emitter.onDispose(() -> {
               button.removeListener(al);
           });
       }, FluxSink.OverflowStrategy.LATEST);
       

      Discard Support: The FluxSink exposed by this operator discards elements as relevant to the chosen FluxSink.OverflowStrategy. For example, the FluxSink.OverflowStrategy.DROP discards each items as they are being dropped, while FluxSink.OverflowStrategy.BUFFER will discard the buffer upon cancellation.

      Type Parameters:
      T - The type of values in the sequence
      Parameters:
      backpressure - the backpressure mode, see FluxSink.OverflowStrategy for the available backpressure modes
      emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
      Returns:
      a Flux
      See Also:
    • push

      public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter)
      Programmatically create a Flux with the capability of emitting multiple elements from a single-threaded producer through the FluxSink API. For a multi-threaded capable alternative, see create(Consumer).

      This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).

      For example:

      
       Flux.<String>push(emitter -> {
      
               ActionListener al = e -> {
                       emitter.next(textField.getText());
               };
               // without cleanup support:
      
               button.addActionListener(al);
      
               // with cleanup support:
      
               button.addActionListener(al);
               emitter.onDispose(() -> {
                       button.removeListener(al);
               });
       });
       

      Discard Support: The FluxSink exposed by this operator buffers in case of overflow. The buffer is discarded when the main sequence is cancelled.

      Type Parameters:
      T - The type of values in the sequence
      Parameters:
      emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
      Returns:
      a Flux
      See Also:
    • push

      public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
      Programmatically create a Flux with the capability of emitting multiple elements from a single-threaded producer through the FluxSink API. For a multi-threaded capable alternative, see create(Consumer, reactor.core.publisher.FluxSink.OverflowStrategy).

      This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).

      For example:

      
       Flux.<String>push(emitter -> {
      
               ActionListener al = e -> {
                       emitter.next(textField.getText());
               };
               // without cleanup support:
      
               button.addActionListener(al);
      
               // with cleanup support:
      
               button.addActionListener(al);
               emitter.onDispose(() -> {
                       button.removeListener(al);
               });
       }, FluxSink.OverflowStrategy.LATEST);
       

      Discard Support: The FluxSink exposed by this operator discards elements as relevant to the chosen FluxSink.OverflowStrategy. For example, the FluxSink.OverflowStrategy.DROP discards each items as they are being dropped, while FluxSink.OverflowStrategy.BUFFER will discard the buffer upon cancellation.

      Type Parameters:
      T - The type of values in the sequence
      Parameters:
      backpressure - the backpressure mode, see FluxSink.OverflowStrategy for the available backpressure modes
      emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
      Returns:
      a Flux
      See Also:
    • defer

      public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier)
      Lazily supply a Publisher every time a Subscription is made on the resulting Flux, so the actual source instantiation is deferred until each subscribe and the Supplier can create a subscriber-specific instance. If the supplier doesn't generate a new instance however, this operator will effectively behave like from(Publisher).

      Type Parameters:
      T - the type of values passing through the Flux
      Parameters:
      supplier - the Publisher Supplier to call on subscribe
      Returns:
      a deferred Flux
      See Also:
    • deferContextual

      public static <T> Flux<T> deferContextual(Function<ContextView,? extends Publisher<T>> contextualPublisherFactory)
      Lazily supply a Publisher every time a Subscription is made on the resulting Flux, so the actual source instantiation is deferred until each subscribe and the Function can create a subscriber-specific instance. This operator behaves the same way as defer(Supplier), but accepts a Function that will receive the current ContextView as an argument. If the function doesn't generate a new instance however, this operator will effectively behave like from(Publisher).

      Type Parameters:
      T - the type of values passing through the Flux
      Parameters:
      contextualPublisherFactory - the Publisher Function to call on subscribe
      Returns:
      a deferred Flux deriving actual Flux from context values for each subscription
    • empty

      public static <T> Flux<T> empty()
      Create a Flux that completes without emitting any item.

      Type Parameters:
      T - the reified type of the target Subscriber
      Returns:
      an empty Flux
    • error

      public static <T> Flux<T> error(Throwable error)
      Create a Flux that terminates with the specified error immediately after being subscribed to.

      Type Parameters:
      T - the reified type of the target Subscriber
      Parameters:
      error - the error to signal to each Subscriber
      Returns:
      a new failing Flux
    • error

      public static <T> Flux<T> error(Supplier<? extends Throwable> errorSupplier)
      Create a Flux that terminates with an error immediately after being subscribed to. The Throwable is generated by a Supplier, invoked each time there is a subscription and allowing for lazy instantiation.

      Type Parameters:
      T - the reified type of the target Subscriber
      Parameters:
      errorSupplier - the error signal Supplier to invoke for each Subscriber
      Returns:
      a new failing Flux
    • error

      public static <O> Flux<O> error(Throwable throwable, boolean whenRequested)
      Create a Flux that terminates with the specified error, either immediately after being subscribed to or after being first requested.

      Type Parameters:
      O - the reified type of the target Subscriber
      Parameters:
      throwable - the error to signal to each Subscriber
      whenRequested - if true, will onError on the first request instead of subscribe().
      Returns:
      a new failing Flux
    • first

      @SafeVarargs @Deprecated public static <I> Flux<I> first(Publisher<? extends I>... sources)
      Deprecated.
      use firstWithSignal(Publisher[]). To be removed in reactor 3.5.
      Pick the first Publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.

      Type Parameters:
      I - The type of values in both source and output sequences
      Parameters:
      sources - The competing source publishers
      Returns:
      a new Flux behaving like the fastest of its sources
    • first

      @Deprecated public static <I> Flux<I> first(Iterable<? extends Publisher<? extends I>> sources)
      Deprecated.
      use firstWithSignal(Iterable). To be removed in reactor 3.5.
      Pick the first Publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.

      Type Parameters:
      I - The type of values in both source and output sequences
      Parameters:
      sources - The competing source publishers
      Returns:
      a new Flux behaving like the fastest of its sources
    • firstWithSignal

      @SafeVarargs public static <I> Flux<I> firstWithSignal(Publisher<? extends I>... sources)
      Pick the first Publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.

      Type Parameters:
      I - The type of values in both source and output sequences
      Parameters:
      sources - The competing source publishers
      Returns:
      a new Flux behaving like the fastest of its sources
    • firstWithSignal

      public static <I> Flux<I> firstWithSignal(Iterable<? extends Publisher<? extends I>> sources)
      Pick the first Publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.

      Type Parameters:
      I - The type of values in both source and output sequences
      Parameters:
      sources - The competing source publishers
      Returns:
      a new Flux behaving like the fastest of its sources
    • firstWithValue

      public static <I> Flux<I> firstWithValue(Iterable<? extends Publisher<? extends I>> sources)
      Pick the first Publisher to emit any value and replay all values from that Publisher, effectively behaving like the source that first emits an onNext.

      Sources with values always "win" over empty sources (ones that only emit onComplete) or failing sources (ones that only emit onError).

      When no source can provide a value, this operator fails with a NoSuchElementException (provided there are at least two sources). This exception has a composite as its cause that can be used to inspect what went wrong with each source (so the composite has as many elements as there are sources).

      Exceptions from failing sources are directly reflected in the composite at the index of the failing source. For empty sources, a NoSuchElementException is added at their respective index. One can use Exceptions.unwrapMultiple(topLevel.getCause()) to easily inspect these errors as a List.

      Note that like in firstWithSignal(Iterable), an infinite source can be problematic if no other source emits onNext.

      Type Parameters:
      I - The type of values in both source and output sequences
      Parameters:
      sources - An Iterable of the competing source publishers
      Returns:
      a new Flux behaving like the fastest of its sources
    • firstWithValue

      @SafeVarargs public static <I> Flux<I> firstWithValue(Publisher<? extends I> first, Publisher<? extends I>... others)
      Pick the first Publisher to emit any value and replay all values from that Publisher, effectively behaving like the source that first emits an onNext.

      Sources with values always "win" over an empty source (ones that only emit onComplete) or failing sources (ones that only emit onError).

      When no source can provide a value, this operator fails with a NoSuchElementException (provided there are at least two sources). This exception has a composite as its cause that can be used to inspect what went wrong with each source (so the composite has as many elements as there are sources).

      Exceptions from failing sources are directly reflected in the composite at the index of the failing source. For empty sources, a NoSuchElementException is added at their respective index. One can use Exceptions.unwrapMultiple(topLevel.getCause()) to easily inspect these errors as a List.

      Note that like in firstWithSignal(Publisher[]), an infinite source can be problematic if no other source emits onNext. In case the first source is already an array-based firstWithValue(Publisher, Publisher[]) instance, nesting is avoided: a single new array-based instance is created with all the sources from first plus all the others sources at the same level.

      Type Parameters:
      I - The type of values in both source and output sequences
      Parameters:
      first - The first competing source publisher
      others - The other competing source publishers
      Returns:
      a new Flux behaving like the fastest of its sources
    • from

      public static <T> Flux<T> from(Publisher<? extends T> source)
      Decorate the specified Publisher with the Flux API.

      Hooks.onEachOperator(String, Function) and similar assembly hooks are applied unless the source is already a Flux.

      Type Parameters:
      T - The type of values in both source and output sequences
      Parameters:
      source - the source to decorate
      Returns:
      a new Flux
    • fromArray

      public static <T> Flux<T> fromArray(T[] array)
      Create a Flux that emits the items contained in the provided array.

      Type Parameters:
      T - The type of values in the source array and resulting Flux
      Parameters:
      array - the array to read data from
      Returns:
      a new Flux
    • fromIterable

      public static <T> Flux<T> fromIterable(Iterable<? extends T> it)
      Create a Flux that emits the items contained in the provided Iterable. The Iterable.iterator() method will be invoked at least once and at most twice for each subscriber.

      This operator inspects the Iterable's Spliterator to assess if the iteration can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)). Since the default Spliterator wraps the Iterator we can have two Iterable.iterator() calls. This second invocation is skipped on a Collection however, a type which is assumed to be always finite.

      Discard Support: Upon cancellation, this operator attempts to discard the remainder of the Iterable if it can safely ensure the iterator is finite. Note that this means the Iterable.iterator() method could be invoked twice.

      Type Parameters:
      T - The type of values in the source Iterable and resulting Flux
      Parameters:
      it - the Iterable to read data from
      Returns:
      a new Flux
    • fromStream

      public static <T> Flux<T> fromStream(Stream<? extends T> s)
      Create a Flux that emits the items contained in the provided Stream. Keep in mind that a Stream cannot be re-used, which can be problematic in case of multiple subscriptions or re-subscription (like with repeat() or retry()). The Stream is closed automatically by the operator on cancellation, error or completion.

      Discard Support: Upon cancellation, this operator attempts to discard remainder of the Stream through its open Spliterator, if it can safely ensure it is finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)).

      Type Parameters:
      T - The type of values in the source Stream and resulting Flux
      Parameters:
      s - the Stream to read data from
      Returns:
      a new Flux
    • fromStream

      public static <T> Flux<T> fromStream(Supplier<Stream<? extends T>> streamSupplier)
      Create a Flux that emits the items contained in a Stream created by the provided Supplier for each subscription. The Stream is closed automatically by the operator on cancellation, error or completion.

      Discard Support: Upon cancellation, this operator attempts to discard remainder of the Stream through its open Spliterator, if it can safely ensure it is finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)).

      Type Parameters:
      T - The type of values in the source Stream and resulting Flux
      Parameters:
      streamSupplier - the Supplier that generates the Stream from which to read data
      Returns:
      a new Flux
    • generate

      public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
      Programmatically create a Flux by generating signals one-by-one via a consumer callback.

      Type Parameters:
      T - the value type emitted
      Parameters:
      generator - Consume the SynchronousSink provided per-subscriber by Reactor to generate a single signal on each pass.
      Returns:
      a Flux
    • generate

      public static <T, S extends @Nullable Object> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator)
      Programmatically create a Flux by generating signals one-by-one via a consumer callback and some state. The stateSupplier may return null.

      Type Parameters:
      T - the value type emitted
      S - the per-subscriber custom state type
      Parameters:
      stateSupplier - called for each incoming Subscriber to provide the initial state for the generator bifunction
      generator - Consume the SynchronousSink provided per-subscriber by Reactor as well as the current state to generate a single signal on each pass and return a (new) state.
      Returns:
      a Flux
    • generate

      public static <T, S extends @Nullable Object> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator, Consumer<? super S> stateConsumer)
      Programmatically create a Flux by generating signals one-by-one via a consumer callback and some state, with a final cleanup callback. The stateSupplier may return null but your cleanup stateConsumer will need to handle the null case.

      Type Parameters:
      T - the value type emitted
      S - the per-subscriber custom state type
      Parameters:
      stateSupplier - called for each incoming Subscriber to provide the initial state for the generator bifunction
      generator - Consume the SynchronousSink provided per-subscriber by Reactor as well as the current state to generate a single signal on each pass and return a (new) state.
      stateConsumer - called after the generator has terminated or the downstream cancelled, receiving the last state to be handled (i.e., release resources or do other cleanup).
      Returns:
      a Flux
    • interval

      public static Flux<Long> interval(Duration period)
      Create a Flux that emits long values starting with 0 and incrementing at specified time intervals on the global timer. The first element is emitted after an initial delay equal to the period. If demand is not produced in time, an onError will be signalled with an overflow IllegalStateException detailing the tick that couldn't be emitted. In normal conditions, the Flux will never complete.

      Runs on the Schedulers.parallel() Scheduler.

      Parameters:
      period - the period Duration between each increment
      Returns:
      a new Flux emitting increasing numbers at regular intervals
    • interval

      public static Flux<Long> interval(Duration delay, Duration period)
      Create a Flux that emits long values starting with 0 and incrementing at specified time intervals, after an initial delay, on the global timer. If demand is not produced in time, an onError will be signalled with an overflow IllegalStateException detailing the tick that couldn't be emitted. In normal conditions, the Flux will never complete.

      Runs on the Schedulers.parallel() Scheduler.

      Parameters:
      delay - the Duration to wait before emitting 0l
      period - the period Duration before each following increment
      Returns:
      a new Flux emitting increasing numbers at regular intervals
    • interval

      public static Flux<Long> interval(Duration period, Scheduler timer)
      Create a Flux that emits long values starting with 0 and incrementing at specified time intervals, on the specified Scheduler. The first element is emitted after an initial delay equal to the period. If demand is not produced in time, an onError will be signalled with an overflow IllegalStateException detailing the tick that couldn't be emitted. In normal conditions, the Flux will never complete.

      Parameters:
      period - the period Duration between each increment
      timer - a time-capable Scheduler instance to run on
      Returns:
      a new Flux emitting increasing numbers at regular intervals
    • interval

      public static Flux<Long> interval(Duration delay, Duration period, Scheduler timer)
      Create a Flux that emits long values starting with 0 and incrementing at specified time intervals, after an initial delay, on the specified Scheduler. If demand is not produced in time, an onError will be signalled with an overflow IllegalStateException detailing the tick that couldn't be emitted. In normal conditions, the Flux will never complete.

      Parameters:
      delay - the Duration to wait before emitting 0l
      period - the period Duration before each following increment
      timer - a time-capable Scheduler instance to run on
      Returns:
      a new Flux emitting increasing numbers at regular intervals
    • just

      @SafeVarargs public static <T> Flux<T> just(T... data)
      Create a Flux that emits the provided elements and then completes.

      Type Parameters:
      T - the emitted data type
      Parameters:
      data - the elements to emit, as a vararg
      Returns:
      a new Flux
    • just

      public static <T> Flux<T> just(T data)
      Create a new Flux that will only emit a single element then onComplete.

      Type Parameters:
      T - the emitted data type
      Parameters:
      data - the single element to emit
      Returns:
      a new Flux
    • merge

      public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source)
      Merge data from Publisher sequences emitted by the passed Publisher into an interleaved merged sequence. Unlike concat, inner sources are subscribed to eagerly.

      Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

      Type Parameters:
      T - the merged type
      Parameters:
      source - a Publisher of Publisher sources to merge
      Returns:
      a merged Flux
    • merge

      public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency)
      Merge data from Publisher sequences emitted by the passed Publisher into an interleaved merged sequence. Unlike concat, inner sources are subscribed to eagerly (but at most concurrency sources are subscribed to at the same time).

      Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

      Type Parameters:
      T - the merged type
      Parameters:
      source - a Publisher of Publisher sources to merge
      concurrency - the request produced to the main source thus limiting concurrent merge backlog
      Returns:
      a merged Flux
    • merge

      public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency, int prefetch)
      Merge data from Publisher sequences emitted by the passed Publisher into an interleaved merged sequence. Unlike concat, inner sources are subscribed to eagerly (but at most concurrency sources are subscribed to at the same time).

      Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

      Type Parameters:
      T - the merged type
      Parameters:
      source - a Publisher of Publisher sources to merge
      concurrency - the request produced to the main source thus limiting concurrent merge backlog
      prefetch - the inner source request size
      Returns:
      a merged Flux
    • merge

      public static <I> Flux<I> merge(Iterable<? extends Publisher<? extends I>> sources)
      Merge data from Publisher sequences contained in an Iterable into an interleaved merged sequence. Unlike concat, inner sources are subscribed to eagerly. A new Iterator will be created for each subscriber.

      Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

      Type Parameters:
      I - The source type of the data sequence
      Parameters:
      sources - the Iterable of sources to merge (will be lazily iterated on subscribe)
      Returns:
      a merged Flux
    • merge

      @SafeVarargs public static <I> Flux<I> merge(Publisher<? extends I>... sources)
      Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike concat, sources are subscribed to eagerly.

      Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

      Type Parameters:
      I - The source type of the data sequence
      Parameters:
      sources - the array of Publisher sources to merge
      Returns:
      a merged Flux
    • merge

      @SafeVarargs public static <I> Flux<I> merge(int prefetch, Publisher<? extends I>... sources)
      Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike concat, sources are subscribed to eagerly.

      Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

      Type Parameters:
      I - The source type of the data sequence
      Parameters:
      sources - the array of Publisher sources to merge
      prefetch - the inner source request size
      Returns:
      a fresh Reactive Flux publisher ready to be subscribed
    • mergeDelayError

      @SafeVarargs public static <I> Flux<I> mergeDelayError(int prefetch, Publisher<? extends I>... sources)
      Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike concat, sources are subscribed to eagerly. This variant will delay any error until after the rest of the merge backlog has been processed.

      Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

      Type Parameters:
      I - The source type of the data sequence
      Parameters:
      sources - the array of Publisher sources to merge
      prefetch - the inner source request size
      Returns:
      a fresh Reactive Flux publisher ready to be subscribed
    • mergePriority

      @SafeVarargs public static <I extends Comparable<? super I>> Flux<I> mergePriority(Publisher<? extends I>... sources)
      Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by their natural order) as they arrive. This is not a sort(), as it doesn't consider the whole of each sequences. Unlike mergeComparing, this operator does not wait for a value from each source to arrive either.

      While this operator does retrieve at most one value from each source, it only compares values when two or more sources emit at the same time. In that case it picks the smallest of these competing values and continues doing so as long as there is demand. It is therefore best suited for asynchronous sources where you do not want to wait for a value from each source before emitting a value downstream.

      Type Parameters:
      I - a Comparable merged type that has a natural order
      Parameters:
      sources - Publisher sources of Comparable to merge
      Returns:
      a merged Flux that compares the latest available value from each source, publishing the smallest value and replenishing the source that produced it.
    • mergePriority

      @SafeVarargs public static <T> Flux<T> mergePriority(Comparator<? super T> comparator, Publisher<? extends T>... sources)
      Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator) as they arrive. This is not a sort(Comparator), as it doesn't consider the whole of each sequences. Unlike mergeComparing, this operator does not wait for a value from each source to arrive either.

      While this operator does retrieve at most one value from each source, it only compares values when two or more sources emit at the same time. In that case it picks the smallest of these competing values and continues doing so as long as there is demand. It is therefore best suited for asynchronous sources where you do not want to wait for a value from each source before emitting a value downstream.

      Type Parameters:
      T - the merged type
      Parameters:
      comparator - the Comparator to use to find the smallest value
      sources - Publisher sources to merge
      Returns:
      a merged Flux that compares the latest available value from each source, publishing the smallest value and replenishing the source that produced it.
    • mergePriority

      @SafeVarargs public static <T> Flux<T> mergePriority(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
      Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator) as they arrive. This is not a sort(Comparator), as it doesn't consider the whole of each sequences. Unlike mergeComparing, this operator does not wait for a value from each source to arrive either.

      While this operator does retrieve at most one value from each source, it only compares values when two or more sources emit at the same time. In that case it picks the smallest of these competing values and continues doing so as long as there is demand. It is therefore best suited for asynchronous sources where you do not want to wait for a value from each source before emitting a value downstream.

      Type Parameters:
      T - the merged type
      Parameters:
      prefetch - the number of elements to prefetch from each source (avoiding too many small requests to the source when picking)
      comparator - the Comparator to use to find the smallest value
      sources - Publisher sources to merge
      Returns:
      a merged Flux that compares the latest available value from each source, publishing the smallest value and replenishing the source that produced it.
    • mergePriorityDelayError

      @SafeVarargs public static <T> Flux<T> mergePriorityDelayError(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
      Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator) as they arrive. This is not a sort(Comparator), as it doesn't consider the whole of each sequences. Unlike mergeComparing, this operator does not wait for a value from each source to arrive either.

      While this operator does retrieve at most one value from each source, it only compares values when two or more sources emit at the same time. In that case it picks the smallest of these competing values and continues doing so as long as there is demand. It is therefore best suited for asynchronous sources where you do not want to wait for a value from each source before emitting a value downstream.

      Note that it is delaying errors until all data is consumed.

      Type Parameters:
      T - the merged type
      Parameters:
      prefetch - the number of elements to prefetch from each source (avoiding too many small requests to the source when picking)
      comparator - the Comparator to use to find the smallest value
      sources - Publisher sources to merge
      Returns:
      a merged Flux that compares the latest available value from each source, publishing the smallest value and replenishing the source that produced it.
    • mergeComparing

      @SafeVarargs public static <I extends Comparable<? super I>> Flux<I> mergeComparing(Publisher<? extends I>... sources)
      Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by their natural order). This is not a sort(), as it doesn't consider the whole of each sequences.

      Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.

      Type Parameters:
      I - a Comparable merged type that has a natural order
      Parameters:
      sources - Publisher sources of Comparable to merge
      Returns:
      a merged Flux that , subscribing early but keeping the original ordering
    • mergeComparing

      @SafeVarargs public static <T> Flux<T> mergeComparing(Comparator<? super T> comparator, Publisher<? extends T>... sources)
      Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a sort(Comparator), as it doesn't consider the whole of each sequences.

      Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.

      Type Parameters:
      T - the merged type
      Parameters:
      comparator - the Comparator to use to find the smallest value
      sources - Publisher sources to merge
      Returns:
      a merged Flux that compares latest values from each source, using the smallest value and replenishing the source that produced it
    • mergeComparing

      @SafeVarargs public static <T> Flux<T> mergeComparing(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
      Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a sort(Comparator), as it doesn't consider the whole of each sequences.

      Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.

      Type Parameters:
      T - the merged type
      Parameters:
      prefetch - the number of elements to prefetch from each source (avoiding too many small requests to the source when picking)
      comparator - the Comparator to use to find the smallest value
      sources - Publisher sources to merge
      Returns:
      a merged Flux that compares latest values from each source, using the smallest value and replenishing the source that produced it
    • mergeComparingDelayError

      @SafeVarargs public static <T> Flux<T> mergeComparingDelayError(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
      Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a sort(Comparator), as it doesn't consider the whole of each sequences.

      Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.

      Note that it is delaying errors until all data is consumed.

      Type Parameters:
      T - the merged type
      Parameters:
      prefetch - the number of elements to prefetch from each source (avoiding too many small requests to the source when picking)
      comparator - the Comparator to use to find the smallest value
      sources - Publisher sources to merge
      Returns:
      a merged Flux that compares latest values from each source, using the smallest value and replenishing the source that produced it
    • mergeOrdered

      @SafeVarargs @Deprecated public static <I extends Comparable<? super I>> Flux<I> mergeOrdered(Publisher<? extends I>... sources)
      Deprecated.
      Use mergeComparingDelayError(int, Comparator, Publisher[]) instead (as mergeComparing(Publisher[]) don't have this operator's delayError behavior). To be removed in 3.6.0 at the earliest.
      Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by their natural order). This is not a sort(), as it doesn't consider the whole of each sequences.

      Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.

      Note that it is delaying errors until all data is consumed.

      Type Parameters:
      I - a Comparable merged type that has a natural order
      Parameters:
      sources - Publisher sources of Comparable to merge
      Returns:
      a merged Flux that compares latest values from each source, using the smallest value and replenishing the source that produced it
    • mergeOrdered

      @SafeVarargs @Deprecated public static <T> Flux<T> mergeOrdered(Comparator<? super T> comparator, Publisher<? extends T>... sources)
      Deprecated.
      Use mergeComparingDelayError(int, Comparator, Publisher[]) instead (as mergeComparing(Publisher[]) don't have this operator's delayError behavior). To be removed in 3.6.0 at the earliest.
      Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a sort(Comparator), as it doesn't consider the whole of each sequences.

      Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.

      Note that it is delaying errors until all data is consumed.

      Type Parameters:
      T - the merged type
      Parameters:
      comparator - the Comparator to use to find the smallest value
      sources - Publisher sources to merge
      Returns:
      a merged Flux that compares latest values from each source, using the smallest value and replenishing the source that produced it
    • mergeOrdered

      @SafeVarargs @Deprecated public static <T> Flux<T> mergeOrdered(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
      Deprecated.
      Use mergeComparingDelayError(int, Comparator, Publisher[]) instead (as mergeComparing(Publisher[]) don't have this operator's delayError behavior). To be removed in 3.6.0 at the earliest.
      Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a sort(Comparator), as it doesn't consider the whole of each sequences.

      Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.

      Note that it is delaying errors until all data is consumed.

      Type Parameters:
      T - the merged type
      Parameters:
      prefetch - the number of elements to prefetch from each source (avoiding too many small requests to the source when picking)
      comparator - the Comparator to use to find the smallest value
      sources - Publisher sources to merge
      Returns:
      a merged Flux that compares latest values from each source, using the smallest value and replenishing the source that produced it
    • mergeSequential

      public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources)
      Merge data from Publisher sequences emitted by the passed Publisher into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.

      Type Parameters:
      T - the merged type
      Parameters:
      sources - a Publisher of Publisher sources to merge
      Returns:
      a merged Flux, subscribing early but keeping the original ordering
    • mergeSequential

      public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)
      Merge data from Publisher sequences emitted by the passed Publisher into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to eagerly (but at most maxConcurrency sources at a time). Unlike merge, their emitted values are merged into the final sequence in subscription order.

      Type Parameters:
      T - the merged type
      Parameters:
      sources - a Publisher of Publisher sources to merge
      prefetch - the inner source request size
      maxConcurrency - the request produced to the main source thus limiting concurrent merge backlog
      Returns:
      a merged Flux, subscribing early but keeping the original ordering
    • mergeSequentialDelayError

      public static <T> Flux<T> mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)
      Merge data from Publisher sequences emitted by the passed Publisher into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to eagerly (but at most maxConcurrency sources at a time). Unlike merge, their emitted values are merged into the final sequence in subscription order. This variant will delay any error until after the rest of the mergeSequential backlog has been processed.

      Type Parameters:
      T - the merged type
      Parameters:
      sources - a Publisher of Publisher sources to merge
      prefetch - the inner source request size
      maxConcurrency - the request produced to the main source thus limiting concurrent merge backlog
      Returns:
      a merged Flux, subscribing early but keeping the original ordering
    • mergeSequential

      @SafeVarargs public static <I> Flux<I> mergeSequential(Publisher<? extends I>... sources)
      Merge data from Publisher sequences provided in an array/vararg into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.

      Type Parameters:
      I - the merged type
      Parameters:
      sources - a number of Publisher sequences to merge
      Returns:
      a merged Flux, subscribing early but keeping the original ordering
    • mergeSequential

      @SafeVarargs public static <I> Flux<I> mergeSequential(int prefetch, Publisher<? extends I>... sources)
      Merge data from Publisher sequences provided in an array/vararg into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.

      Type Parameters:
      I - the merged type
      Parameters:
      prefetch - the inner source request size
      sources - a number of Publisher sequences to merge
      Returns:
      a merged Flux, subscribing early but keeping the original ordering
    • mergeSequentialDelayError

      @SafeVarargs public static <I> Flux<I> mergeSequentialDelayError(int prefetch, Publisher<? extends I>... sources)
      Merge data from Publisher sequences provided in an array/vararg into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order. This variant will delay any error until after the rest of the mergeSequential backlog has been processed.

      Type Parameters:
      I - the merged type
      Parameters:
      prefetch - the inner source request size
      sources - a number of Publisher sequences to merge
      Returns:
      a merged Flux, subscribing early but keeping the original ordering
    • mergeSequential

      public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources)
      Merge data from Publisher sequences provided in an Iterable into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.

      Type Parameters:
      I - the merged type
      Parameters:
      sources - an Iterable of Publisher sequences to merge
      Returns:
      a merged Flux, subscribing early but keeping the original ordering
    • mergeSequential

      public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch)
      Merge data from Publisher sequences provided in an Iterable into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly (but at most maxConcurrency sources at a time). Unlike merge, their emitted values are merged into the final sequence in subscription order.

      Type Parameters:
      I - the merged type
      Parameters:
      sources - an Iterable of Publisher sequences to merge
      maxConcurrency - the request produced to the main source thus limiting concurrent merge backlog
      prefetch - the inner source request size
      Returns:
      a merged Flux, subscribing early but keeping the original ordering
    • mergeSequentialDelayError

      public static <I> Flux<I> mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch)
      Merge data from Publisher sequences provided in an Iterable into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly (but at most maxConcurrency sources at a time). Unlike merge, their emitted values are merged into the final sequence in subscription order. This variant will delay any error until after the rest of the mergeSequential backlog has been processed.

      Type Parameters:
      I - the merged type
      Parameters:
      sources - an Iterable of Publisher sequences to merge
      maxConcurrency - the request produced to the main source thus limiting concurrent merge backlog
      prefetch - the inner source request size
      Returns:
      a merged Flux, subscribing early but keeping the original ordering
    • never

      public static <T> Flux<T> never()
      Create a Flux that will never signal any data, error or completion signal.

      Type Parameters:
      T - the Subscriber type target
      Returns:
      a never completing Flux
    • range

      public static Flux<Integer> range(int start, int count)
      Build a Flux that will only emit a sequence of count incrementing integers, starting from start. That is, emit integers between start (included) and start + count (excluded) then complete.

      Parameters:
      start - the first integer to be emit
      count - the total number of incrementing values to emit, including the first value
      Returns:
      a ranged Flux
    • switchOnNext

      public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers)
      Creates a Flux that mirrors the most recently emitted Publisher, forwarding its data until a new Publisher comes in the source.

      The resulting Flux will complete once there are no new Publisher in the source (source has completed) and the last mirrored Publisher has also completed.

      This operator requests the mergedPublishers source for an unbounded amount of inner publishers, but doesn't request each inner Publisher unless the downstream has made a corresponding request (no prefetch on publishers emitted by mergedPublishers).

      Type Parameters:
      T - the produced type
      Parameters:
      mergedPublishers - The Publisher of Publisher to switch on and mirror.
      Returns:
      a Flux accepting publishers and producing T
    • switchOnNext

      @Deprecated public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers, int prefetch)
      Deprecated.
      to be removed in 3.6.0 at the earliest. In 3.5.0, you should replace calls with prefetch=0 with calls to switchOnNext(mergedPublishers), as the default behavior of the single-parameter variant will then change to prefetch=0.
      Creates a Flux that mirrors the most recently emitted Publisher, forwarding its data until a new Publisher comes in the source.

      The resulting Flux will complete once there are no new Publisher in the source (source has completed) and the last mirrored Publisher has also completed.

      Type Parameters:
      T - the produced type
      Parameters:
      mergedPublishers - The Publisher of Publisher to switch on and mirror.
      prefetch - the inner source request size
      Returns:
      a Flux accepting publishers and producing T
    • using

      public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)
      Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.

      Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer may override the terminal event.

      For an asynchronous version of the cleanup, with distinct path for onComplete, onError and cancel terminations, see usingWhen(Publisher, Function, Function, BiFunction, Function).

      Type Parameters:
      T - emitted type
      D - resource type
      Parameters:
      resourceSupplier - a Callable that is called on subscribe to generate the resource
      sourceSupplier - a factory to derive a Publisher from the supplied resource
      resourceCleanup - a resource cleanup callback invoked on completion
      Returns:
      a new Flux built around a disposable resource
      See Also:
    • using

      public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager)
      Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.

      • Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer may override the terminal event.
      • Non-eager cleanup will drop any exception.

      For an asynchronous version of the cleanup, with distinct path for onComplete, onError and cancel terminations, see usingWhen(Publisher, Function, Function, BiFunction, Function).

      Type Parameters:
      T - emitted type
      D - resource type
      Parameters:
      resourceSupplier - a Callable that is called on subscribe to generate the resource
      sourceSupplier - a factory to derive a Publisher from the supplied resource
      resourceCleanup - a resource cleanup callback invoked on completion
      eager - true to clean before terminating downstream subscribers
      Returns:
      a new Flux built around a disposable resource
      See Also:
    • using

      public static <T, D extends AutoCloseable> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier)
      Uses an AutoCloseable resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.

      Eager AutoCloseable resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer may override the terminal event.

      For an asynchronous version of the cleanup, with distinct path for onComplete, onError and cancel terminations, see usingWhen(Publisher, Function, Function, BiFunction, Function).

      Type Parameters:
      T - emitted type
      D - resource type
      Parameters:
      resourceSupplier - a Callable that is called on subscribe to generate the resource
      sourceSupplier - a factory to derive a Publisher from the supplied resource
      Returns:
      a new Flux built around a disposable resource
      See Also:
    • using

      public static <T, D extends AutoCloseable> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, boolean eager)
      Uses an AutoCloseable resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.

      • Eager AutoCloseable resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer may override the terminal event.
      • Non-eager cleanup will drop any exception.

      For an asynchronous version of the cleanup, with distinct path for onComplete, onError and cancel terminations, see usingWhen(Publisher, Function, Function, BiFunction, Function).

      Type Parameters:
      T - emitted type
      D - resource type
      Parameters:
      resourceSupplier - a Callable that is called on subscribe to generate the resource
      sourceSupplier - a factory to derive a Publisher from the supplied resource
      eager - true to clean before terminating downstream subscribers
      Returns:
      a new Flux built around a disposable resource
      See Also:
    • usingWhen

      public static <T, D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> resourceClosure, Function<? super D,? extends Publisher<?>> asyncCleanup)
      Uses a resource, generated by a Publisher for each individual Subscriber, while streaming the values from a Publisher derived from the same resource. Whenever the resulting sequence terminates, a provided Function generates a "cleanup" Publisher that is invoked but doesn't change the content of the main sequence. Instead it just defers the termination (unless it errors, in which case the error suppresses the original termination signal).

      Note that if the resource supplying Publisher emits more than one resource, the subsequent resources are dropped (Operators.onNextDropped(Object, Context)). If the publisher errors AFTER having emitted one resource, the error is also silently dropped (Operators.onErrorDropped(Throwable, Context)). An empty completion or error without at least one onNext signal triggers a short-circuit of the main sequence with the same terminal signal (no resource is established, no cleanup is invoked).

      Type Parameters:
      T - the type of elements emitted by the resource closure, and thus the main sequence
      D - the type of the resource object
      Parameters:
      resourceSupplier - a Publisher that "generates" the resource, subscribed for each subscription to the main sequence
      resourceClosure - a factory to derive a Publisher from the supplied resource
      asyncCleanup - an asynchronous resource cleanup invoked when the resource closure terminates (with onComplete, onError or cancel)
      Returns:
      a new Flux built around a "transactional" resource, with asynchronous cleanup on all terminations (onComplete, onError, cancel)
    • usingWhen

      public static <T, D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> resourceClosure, Function<? super D,? extends Publisher<?>> asyncComplete, BiFunction<? super D,? super Throwable,? extends Publisher<?>> asyncError, Function<? super D,? extends Publisher<?>> asyncCancel)
      Uses a resource, generated by a Publisher for each individual Subscriber, while streaming the values from a Publisher derived from the same resource. Note that all steps of the operator chain that would need the resource to be in an open stable state need to be described inside the resourceClosure Function.

      Whenever the resulting sequence terminates, the relevant Function generates a "cleanup" Publisher that is invoked but doesn't change the content of the main sequence. Instead it just defers the termination (unless it errors, in which case the error suppresses the original termination signal).

      Individual cleanups can also be associated with main sequence cancellation and error terminations:

      Note that if the resource supplying Publisher emits more than one resource, the subsequent resources are dropped (Operators.onNextDropped(Object, Context)). If the publisher errors AFTER having emitted one resource, the error is also silently dropped (Operators.onErrorDropped(Throwable, Context)). An empty completion or error without at least one onNext signal triggers a short-circuit of the main sequence with the same terminal signal (no resource is established, no cleanup is invoked).

      Additionally, the terminal signal is replaced by any error that might have happened in the terminating Publisher:

      Finally, early cancellations will cancel the resource supplying Publisher:

      Type Parameters:
      T - the type of elements emitted by the resource closure, and thus the main sequence
      D - the type of the resource object
      Parameters:
      resourceSupplier - a Publisher that "generates" the resource, subscribed for each subscription to the main sequence
      resourceClosure - a factory to derive a Publisher from the supplied resource
      asyncComplete - an asynchronous resource cleanup invoked if the resource closure terminates with onComplete
      asyncError - an asynchronous resource cleanup invoked if the resource closure terminates with onError. The terminating error is provided to the BiFunction
      asyncCancel - an asynchronous resource cleanup invoked if the resource closure is cancelled. When null, the asyncComplete path is used instead.
      Returns:
      a new Flux built around a "transactional" resource, with several termination path triggering asynchronous cleanup sequences
      See Also:
    • zip

      public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends O> combinator)
      Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator). The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

      Type Parameters:
      T1 - type of the value from source1
      T2 - type of the value from source2
      O - The produced output after transformation by the combinator
      Parameters:
      source1 - The first Publisher source to zip.
      source2 - The second Publisher source to zip.
      combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
      Returns:
      a zipped Flux
    • zip

      public static <T1, T2> Flux<Tuple2<T1,T2>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2)
      Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

      Type Parameters:
      T1 - type of the value from source1
      T2 - type of the value from source2
      Parameters:
      source1 - The first Publisher source to zip.
      source2 - The second Publisher source to zip.
      Returns:
      a zipped Flux
    • zip

      public static <T1, T2, T3> Flux<Tuple3<T1,T2,T3>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3)
      Zip three sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple3. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

      Type Parameters:
      T1 - type of the value from source1
      T2 - type of the value from source2
      T3 - type of the value from source3
      Parameters:
      source1 - The first upstream Publisher to subscribe to.
      source2 - The second upstream Publisher to subscribe to.
      source3 - The third upstream Publisher to subscribe to.
      Returns:
      a zipped Flux
    • zip

      public static <T1, T2, T3, T4> Flux<Tuple4<T1,T2,T3,T4>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4)
      Zip four sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple4. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

      Type Parameters:
      T1 - type of the value from source1
      T2 - type of the value from source2
      T3 - type of the value from source3
      T4 - type of the value from source4
      Parameters:
      source1 - The first upstream Publisher to subscribe to.
      source2 - The second upstream Publisher to subscribe to.
      source3 - The third upstream Publisher to subscribe to.
      source4 - The fourth upstream Publisher to subscribe to.
      Returns:
      a zipped Flux
    • zip

      public static <T1, T2, T3, T4, T5> Flux<Tuple5<T1,T2,T3,T4,T5>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5)
      Zip five sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple5. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

      Type Parameters:
      T1 - type of the value from source1
      T2 - type of the value from source2
      T3 - type of the value from source3
      T4 - type of the value from source4
      T5 - type of the value from source5
      Parameters:
      source1 - The first upstream Publisher to subscribe to.
      source2 - The second upstream Publisher to subscribe to.
      source3 - The third upstream Publisher to subscribe to.
      source4 - The fourth upstream Publisher to subscribe to.
      source5 - The fifth upstream Publisher to subscribe to.
      Returns:
      a zipped Flux
    • zip

      public static <T1, T2, T3, T4, T5, T6> Flux<Tuple6<T1,T2,T3,T4,T5,T6>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6)
      Zip six sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple6. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

      Type Parameters:
      T1 - type of the value from source1
      T2 - type of the value from source2
      T3 - type of the value from source3
      T4 - type of the value from source4
      T5 - type of the value from source5
      T6 - type of the value from source6
      Parameters:
      source1 - The first upstream Publisher to subscribe to.
      source2 - The second upstream Publisher to subscribe to.
      source3 - The third upstream Publisher to subscribe to.
      source4 - The fourth upstream Publisher to subscribe to.
      source5 - The fifth upstream Publisher to subscribe to.
      source6 - The sixth upstream Publisher to subscribe to.
      Returns:
      a zipped Flux
    • zip

      public static <T1, T2, T3, T4, T5, T6, T7> Flux<Tuple7<T1,T2,T3,T4,T5,T6,T7>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7)
      Zip seven sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple7. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

      Type Parameters:
      T1 - type of the value from source1
      T2 - type of the value from source2
      T3 - type of the value from source3
      T4 - type of the value from source4
      T5 - type of the value from source5
      T6 - type of the value from source6
      T7 - type of the value from source7
      Parameters:
      source1 - The first upstream Publisher to subscribe to.
      source2 - The second upstream Publisher to subscribe to.
      source3 - The third upstream Publisher to subscribe to.
      source4 - The fourth upstream Publisher to subscribe to.
      source5 - The fifth upstream Publisher to subscribe to.
      source6 - The sixth upstream Publisher to subscribe to.
      source7 - The seventh upstream Publisher to subscribe to.
      Returns:
      a zipped Flux
    • zip

      public static <T1, T2, T3, T4, T5, T6, T7, T8> Flux<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7, Publisher<? extends T8> source8)
      Zip eight sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple8. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

      Type Parameters:
      T1 - type of the value from source1
      T2 - type of the value from source2
      T3 - type of the value from source3
      T4 - type of the value from source4
      T5 - type of the value from source5
      T6 - type of the value from source6
      T7 - type of the value from source7
      T8 - type of the value from source8
      Parameters:
      source1 - The first upstream Publisher to subscribe to.
      source2 - The second upstream Publisher to subscribe to.
      source3 - The third upstream Publisher to subscribe to.
      source4 - The fourth upstream Publisher to subscribe to.
      source5 - The fifth upstream Publisher to subscribe to.
      source6 - The sixth upstream Publisher to subscribe to.
      source7 - The seventh upstream Publisher to subscribe to.
      source8 - The eight upstream Publisher to subscribe to.
      Returns:
      a zipped Flux
    • zip

      public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources, Function<? super Object[],? extends O> combinator)
      Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator). The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. The Iterable.iterator() will be called on each Publisher.subscribe(Subscriber).

      Type Parameters:
      O - the combined produced type
      Parameters:
      sources - the Iterable providing sources to zip
      combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
      Returns:
      a zipped Flux
    • zip

      public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources, int prefetch, Function<? super Object[],? extends O> combinator)
      Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator). The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. The Iterable.iterator() will be called on each Publisher.subscribe(Subscriber).

      Type Parameters:
      O - the combined produced type
      Parameters:
      sources - the Iterable providing sources to zip
      prefetch - the inner source request size
      combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
      Returns:
      a zipped Flux
    • zip

      @SafeVarargs public static <I, O> Flux<O> zip(Function<? super Object[],? extends O> combinator, Publisher<? extends I>... sources)
      Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator). The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

      Type Parameters:
      I - the type of the input sources
      O - the combined produced type
      Parameters:
      combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
      sources - the array providing sources to zip
      Returns:
      a zipped Flux
    • zip

      @SafeVarargs public static <I, O> Flux<O> zip(Function<? super Object[],? extends O> combinator, int prefetch, Publisher<? extends I>... sources)
      Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator). The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

      Type Parameters:
      I - the type of the input sources
      O - the combined produced type
      Parameters:
      combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
      prefetch - individual source request size
      sources - the array providing sources to zip
      Returns:
      a zipped Flux
    • zip

      public static <TUPLE extends Tuple2, V> Flux<V> zip(Publisher<? extends Publisher<?>> sources, Function<? super TUPLE,? extends V> combinator)
      Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator). The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

      Note that the Publisher sources from the outer Publisher will accumulate into an exhaustive list before starting zip operation.

      Type Parameters:
      TUPLE - the raw tuple type
      V - The produced output after transformation by the given combinator
      Parameters:
      sources - The Publisher of Publisher sources to zip. A finite publisher is required.
      combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
      Returns:
      a Flux based on the produced value
    • all

      public final Mono<Boolean> all(Predicate<? super T> predicate)
      Emit a single boolean true if all values of this sequence match the Predicate.

      The implementation uses short-circuit logic and completes with false if the predicate doesn't match a value.

      Parameters:
      predicate - the Predicate that needs to apply to all emitted items
      Returns:
      a new Mono with true if all values satisfies a predicate and false otherwise
    • any

      public final Mono<Boolean> any(Predicate<? super T> predicate)
      Emit a single boolean true if any of the values of this Flux sequence match the predicate.

      The implementation uses short-circuit logic and completes with true if the predicate matches a value.

      Parameters:
      predicate - the Predicate that needs to apply to at least one emitted item
      Returns:
      a new Mono with true if any value satisfies a predicate and false otherwise
    • as

      public final <P> P as(Function<? super Flux<T>,P> transformer)
      Transform this Flux into a target type.
       flux.as(Mono::from).subscribe() 
       
      Type Parameters:
      P - the returned instance type
      Parameters:
      transformer - the Function to immediately map this Flux into a target type instance.
      Returns:
      the Flux transformed to an instance of P
      See Also:
    • blockFirst

      public final @Nullable T blockFirst()
      Subscribe to this Flux and block indefinitely until the upstream signals its first value or completes. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).

      Note that each blockFirst() will trigger a new subscription: in other words, the result might miss signal from hot publishers.

      Returns:
      the first value or null
    • blockFirst

      public final @Nullable T blockFirst(Duration timeout)
      Subscribe to this Flux and block until the upstream signals its first value, completes or a timeout expires. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). If the provided timeout expires, a RuntimeException is thrown with a TimeoutException as the cause.

      Note that each blockFirst() will trigger a new subscription: in other words, the result might miss signal from hot publishers.

      Parameters:
      timeout - maximum time period to wait for before raising a RuntimeException with a TimeoutException as the cause
      Returns:
      the first value or null
    • blockLast

      public final @Nullable T blockLast()
      Subscribe to this Flux and block indefinitely until the upstream signals its last value or completes. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).

      Note that each blockLast() will trigger a new subscription: in other words, the result might miss signal from hot publishers.

      Returns:
      the last value or null
    • blockLast

      public final @Nullable T blockLast(Duration timeout)
      Subscribe to this Flux and block until the upstream signals its last value, completes or a timeout expires. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). If the provided timeout expires, a RuntimeException is thrown with a TimeoutException as the cause.

      Note that each blockLast() will trigger a new subscription: in other words, the result might miss signal from hot publishers.

      Parameters:
      timeout - maximum time period to wait for before raising a RuntimeException with a TimeoutException as the cause
      Returns:
      the last value or null
    • buffer

      public final Flux<List<T>> buffer()
      Collect all incoming values into a single List buffer that will be emitted by the returned Flux once this Flux completes.

      Discard Support: This operator discards the buffer upon cancellation or error triggered by a data signal.

      Returns:
      a buffered Flux of at most one List
      See Also:
    • buffer

      public final Flux<List<T>> buffer(int maxSize)
      Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes.

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.

      Parameters:
      maxSize - the maximum collected size
      Returns:
      a microbatched Flux of List
    • buffer

      public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, Supplier<C> bufferSupplier)
      Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes.

      Note that if buffers provided by the bufferSupplier return false upon invocation of Collection.add(Object) for a given element, that element will be discarded.

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal, as well as latest unbuffered element if the bufferSupplier fails.

      Type Parameters:
      C - the Collection buffer type
      Parameters:
      maxSize - the maximum collected size
      bufferSupplier - a Supplier of the concrete Collection to use for each buffer
      Returns:
      a microbatched Flux of Collection
    • buffer

      public final Flux<List<T>> buffer(int maxSize, int skip)
      Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes. Buffers can be created with gaps, as a new buffer will be created every time skip values have been emitted by the source.

      When maxSize < skip : dropping buffers

      When maxSize > skip : overlapping buffers

      When maxSize == skip : exact buffers

      Discard Support: This operator discards elements in between buffers (in the case of dropping buffers). It also discards the currently open buffer upon cancellation or error triggered by a data signal. Note however that overlapping buffer variant DOES NOT discard, as this might result in an element being discarded from an early buffer while it is still valid in a more recent buffer.

      Parameters:
      skip - the number of items to count before creating a new buffer
      maxSize - the max collected size
      Returns:
      a microbatched Flux of possibly overlapped or gapped List
    • buffer

      public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, int skip, Supplier<C> bufferSupplier)
      Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes. Buffers can be created with gaps, as a new buffer will be created every time skip values have been emitted by the source

      When maxSize < skip : dropping buffers

      When maxSize > skip : overlapping buffers

      When maxSize == skip : exact buffers

      Note for exact buffers: If buffers provided by the bufferSupplier return false upon invocation of Collection.add(Object) for a given element, that element will be discarded.

      Discard Support: This operator discards elements in between buffers (in the case of dropping buffers). It also discards the currently open buffer upon cancellation or error triggered by a data signal. Note however that overlapping buffer variant DOES NOT discard, as this might result in an element being discarded from an early buffer while it is still valid in a more recent buffer.

      Type Parameters:
      C - the Collection buffer type
      Parameters:
      skip - the number of items to count before creating a new buffer
      maxSize - the max collected size
      bufferSupplier - a Supplier of the concrete Collection to use for each buffer
      Returns:
      a microbatched Flux of possibly overlapped or gapped Collection
    • buffer

      public final Flux<List<T>> buffer(Publisher<?> other)
      Collect incoming values into multiple List buffers, as delimited by the signals of a companion Publisher this operator will subscribe to.

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.

      Parameters:
      other - the companion Publisher whose signals trigger new buffers
      Returns:
      a microbatched Flux of List delimited by signals from a Publisher
    • buffer

      public final <C extends Collection<? super T>> Flux<C> buffer(Publisher<?> other, Supplier<C> bufferSupplier)
      Collect incoming values into multiple user-defined Collection buffers, as delimited by the signals of a companion Publisher this operator will subscribe to.

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal, and the last received element when the bufferSupplier fails.

      Type Parameters:
      C - the Collection buffer type
      Parameters:
      other - the companion Publisher whose signals trigger new buffers
      bufferSupplier - a Supplier of the concrete Collection to use for each buffer
      Returns:
      a microbatched Flux of Collection delimited by signals from a Publisher
    • buffer

      public final Flux<List<T>> buffer(Duration bufferingTimespan)
      Collect incoming values into multiple List buffers that will be emitted by the returned Flux every bufferingTimespan.

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.

      Parameters:
      bufferingTimespan - the duration from buffer creation until a buffer is closed and emitted
      Returns:
      a microbatched Flux of List delimited by the given time span
    • buffer

      public final Flux<List<T>> buffer(Duration bufferingTimespan, Duration openBufferEvery)
      Collect incoming values into multiple List buffers created at a given openBufferEvery period. Each buffer will last until the bufferingTimespan has elapsed, thus emitting the bucket in the resulting Flux.

      When bufferingTimespan < openBufferEvery : dropping buffers

      When bufferingTimespan > openBufferEvery : overlapping buffers

      When bufferingTimespan == openBufferEvery : exact buffers

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. It DOES NOT provide strong guarantees in the case of overlapping buffers, as elements might get discarded too early (from the first of two overlapping buffers for instance).

      Parameters:
      bufferingTimespan - the duration from buffer creation until a buffer is closed and emitted
      openBufferEvery - the interval at which to create a new buffer
      Returns:
      a microbatched Flux of List delimited by the given period openBufferEvery and sized by bufferingTimespan
    • buffer

      public final Flux<List<T>> buffer(Duration bufferingTimespan, Scheduler timer)
      Collect incoming values into multiple List buffers that will be emitted by the returned Flux every bufferingTimespan, as measured on the provided Scheduler.

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.

      Parameters:
      bufferingTimespan - the duration from buffer creation until a buffer is closed and emitted
      timer - a time-capable Scheduler instance to run on
      Returns:
      a microbatched Flux of List delimited by the given period
    • buffer

      public final Flux<List<T>> buffer(Duration bufferingTimespan, Duration openBufferEvery, Scheduler timer)
      Collect incoming values into multiple List buffers created at a given openBufferEvery period, as measured on the provided Scheduler. Each buffer will last until the bufferingTimespan has elapsed (also measured on the scheduler), thus emitting the bucket in the resulting Flux.

      When bufferingTimespan < openBufferEvery : dropping buffers

      When bufferingTimespan > openBufferEvery : overlapping buffers

      When bufferingTimespan == openBufferEvery : exact buffers

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. It DOES NOT provide strong guarantees in the case of overlapping buffers, as elements might get discarded too early (from the first of two overlapping buffers for instance).

      Parameters:
      bufferingTimespan - the duration from buffer creation until a buffer is closed and emitted
      openBufferEvery - the interval at which to create a new buffer
      timer - a time-capable Scheduler instance to run on
      Returns:
      a microbatched Flux of List delimited by the given period openBufferEvery and sized by bufferingTimespan
    • bufferTimeout

      public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime)
      Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses.

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.

      Parameters:
      maxSize - the max collected size
      maxTime - the timeout enforcing the release of a partial buffer
      Returns:
      a microbatched Flux of List delimited by given size or a given period timeout
    • bufferTimeout

      public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Supplier<C> bufferSupplier)
      Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses.

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.

      Type Parameters:
      C - the Collection buffer type
      Parameters:
      maxSize - the max collected size
      maxTime - the timeout enforcing the release of a partial buffer
      bufferSupplier - a Supplier of the concrete Collection to use for each buffer
      Returns:
      a microbatched Flux of Collection delimited by given size or a given period timeout
    • bufferTimeout

      public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer)
      Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler.

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.

      Parameters:
      maxSize - the max collected size
      maxTime - the timeout enforcing the release of a partial buffer
      timer - a time-capable Scheduler instance to run on
      Returns:
      a microbatched Flux of List delimited by given size or a given period timeout
    • bufferTimeout

      public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer, Supplier<C> bufferSupplier)
      Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler.

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.

      Type Parameters:
      C - the Collection buffer type
      Parameters:
      maxSize - the max collected size
      maxTime - the timeout enforcing the release of a partial buffer
      timer - a time-capable Scheduler instance to run on
      bufferSupplier - a Supplier of the concrete Collection to use for each buffer
      Returns:
      a microbatched Flux of Collection delimited by given size or a given period timeout
    • bufferTimeout

      public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, boolean fairBackpressure)
      Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses.

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.

      Parameters:
      maxSize - the max collected size
      maxTime - the timeout enforcing the release of a partial buffer
      fairBackpressure - If true, prefetches maxSize * 4 from upstream and replenishes the buffer when the downstream demand is satisfactory. When false, no prefetching takes place and a single buffer is always ready to be pushed downstream.
      Returns:
      a microbatched Flux of List delimited by given size or a given period timeout
    • bufferTimeout

      public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer, boolean fairBackpressure)
      Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler.

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.

      Parameters:
      maxSize - the max collected size
      maxTime - the timeout enforcing the release of a partial buffer
      timer - a time-capable Scheduler instance to run on
      fairBackpressure - If true, prefetches maxSize * 4 from upstream and replenishes the buffer when the downstream demand is satisfactory. When false, no prefetching takes place and a single buffer is always ready to be pushed downstream.
      Returns:
      a microbatched Flux of List delimited by given size or a given period timeout
    • bufferTimeout

      public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Supplier<C> bufferSupplier, boolean fairBackpressure)
      Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses.

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.

      Type Parameters:
      C - the Collection buffer type
      Parameters:
      maxSize - the max collected size
      maxTime - the timeout enforcing the release of a partial buffer
      bufferSupplier - a Supplier of the concrete Collection to use for each buffer
      fairBackpressure - If true, prefetches maxSize * 4 from upstream and replenishes the buffer when the downstream demand is satisfactory. When false, no prefetching takes place and a single buffer is always ready to be pushed downstream.
      Returns:
      a microbatched Flux of Collection delimited by given size or a given period timeout
    • bufferTimeout

      public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer, Supplier<C> bufferSupplier, boolean fairBackpressure)
      Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler.

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.

      Type Parameters:
      C - the Collection buffer type
      Parameters:
      maxSize - the max collected size
      maxTime - the timeout enforcing the release of a partial buffer
      timer - a time-capable Scheduler instance to run on
      bufferSupplier - a Supplier of the concrete Collection to use for each buffer
      fairBackpressure - If true, prefetches maxSize * 4 from upstream and replenishes the buffer when the downstream demand is satisfactory. When false, no prefetching takes place and a single buffer is always ready to be pushed downstream.
      Returns:
      a microbatched Flux of Collection delimited by given size or a given period timeout
    • bufferUntil

      public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate)
      Collect incoming values into multiple List buffers that will be emitted by the resulting Flux each time the given predicate returns true. Note that the element that triggers the predicate to return true (and thus closes a buffer) is included as last element in the emitted buffer.

      On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.

      Parameters:
      predicate - a predicate that triggers the next buffer when it becomes true.
      Returns:
      a microbatched Flux of List
    • bufferUntil

      public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate, boolean cutBefore)
      Collect incoming values into multiple List buffers that will be emitted by the resulting Flux each time the given predicate returns true. Note that the buffer into which the element that triggers the predicate to return true (and thus closes a buffer) is included depends on the cutBefore parameter: set it to true to include the boundary element in the newly opened buffer, false to include it in the closed buffer (as in bufferUntil(Predicate)).

      On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.

      Parameters:
      predicate - a predicate that triggers the next buffer when it becomes true.
      cutBefore - set to true to include the triggering element in the new buffer rather than the old.
      Returns:
      a microbatched Flux of List
    • bufferUntilChanged

      public final Flux<List<T>> bufferUntilChanged()
      Collect subsequent repetitions of an element (that is, if they arrive right after one another) into multiple List buffers that will be emitted by the resulting Flux.

      Returns:
      a microbatched Flux of List
    • bufferUntilChanged

      public final <V> Flux<List<T>> bufferUntilChanged(Function<? super T,? extends V> keySelector)
      Collect subsequent repetitions of an element (that is, if they arrive right after one another), as compared by a key extracted through the user provided Function, into multiple List buffers that will be emitted by the resulting Flux.

      Parameters:
      keySelector - function to compute comparison key for each element
      Returns:
      a microbatched Flux of List
    • bufferUntilChanged

      public final <V> Flux<List<T>> bufferUntilChanged(Function<? super T,? extends V> keySelector, BiPredicate<? super V,? super V> keyComparator)
      Collect subsequent repetitions of an element (that is, if they arrive right after one another), as compared by a key extracted through the user provided Function and compared using a supplied BiPredicate, into multiple List buffers that will be emitted by the resulting Flux.

      Parameters:
      keySelector - function to compute comparison key for each element
      keyComparator - predicate used to compare keys
      Returns:
      a microbatched Flux of List
    • bufferWhile

      public final Flux<List<T>> bufferWhile(Predicate<? super T> predicate)
      Collect incoming values into multiple List buffers that will be emitted by the resulting Flux. Each buffer continues aggregating values while the given predicate returns true, and a new buffer is created as soon as the predicate returns false... Note that the element that triggers the predicate to return false (and thus closes a buffer) is NOT included in any emitted buffer.

      On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal, as well as the buffer-triggering element.

      Parameters:
      predicate - a predicate that triggers the next buffer when it becomes false.
      Returns:
      a microbatched Flux of List
    • bufferWhen

      public final <U, V> Flux<List<T>> bufferWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector)
      Collect incoming values into multiple List buffers started each time an opening companion Publisher emits. Each buffer will last until the corresponding closing companion Publisher emits, thus releasing the buffer to the resulting Flux.

      When Open signal is strictly not overlapping Close signal : dropping buffers (see green marbles in diagram below).

      When Open signal is strictly more frequent than Close signal : overlapping buffers (see second and third buffers in diagram below).

      When Open signal is exactly coordinated with Close signal : exact buffers

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. It DOES NOT provide strong guarantees in the case of overlapping buffers, as elements might get discarded too early (from the first of two overlapping buffers for instance).

      Type Parameters:
      U - the element type of the buffer-opening sequence
      V - the element type of the buffer-closing sequence
      Parameters:
      bucketOpening - a companion Publisher to subscribe for buffer creation signals.
      closeSelector - a factory that, given a buffer opening signal, returns a companion Publisher to subscribe to for buffer closure and emission signals.
      Returns:
      a microbatched Flux of List delimited by an opening Publisher and a relative closing Publisher
    • bufferWhen

      public final <U, V, C extends Collection<? super T>> Flux<C> bufferWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector, Supplier<C> bufferSupplier)
      Collect incoming values into multiple user-defined Collection buffers started each time an opening companion Publisher emits. Each buffer will last until the corresponding closing companion Publisher emits, thus releasing the buffer to the resulting Flux.

      When Open signal is strictly not overlapping Close signal : dropping buffers (see green marbles in diagram below).

      When Open signal is strictly more frequent than Close signal : overlapping buffers (see second and third buffers in diagram below).

      Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. It DOES NOT provide strong guarantees in the case of overlapping buffers, as elements might get discarded too early (from the first of two overlapping buffers for instance).

      Type Parameters:
      U - the element type of the buffer-opening sequence
      V - the element type of the buffer-closing sequence
      C - the Collection buffer type
      Parameters:
      bucketOpening - a companion Publisher to subscribe for buffer creation signals.
      closeSelector - a factory that, given a buffer opening signal, returns a companion Publisher to subscribe to for buffer closure and emission signals.
      bufferSupplier - a Supplier of the concrete Collection to use for each buffer
      Returns:
      a microbatched Flux of Collection delimited by an opening Publisher and a relative closing Publisher
    • cache

      public final Flux<T> cache()
      Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded volume of onNext signals. Completion and Error will also be replayed.

      Returns:
      a replaying Flux
    • cache

      public final Flux<T> cache(int history)
      Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size onNext signals. Completion and Error will also be replayed.

      Note that cache(0) will only cache the terminal signal without expiration.

      Parameters:
      history - number of elements retained in cache
      Returns:
      a replaying Flux
    • cache

      public final Flux<T> cache(Duration ttl)
      Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded history but apply a per-item expiry timeout

      Completion and Error will also be replayed until ttl triggers in which case the next Subscriber will start over a new subscription.

      Parameters:
      ttl - Time-to-live for each cached item and post termination.
      Returns:
      a replaying Flux
    • cache

      public final Flux<T> cache(Duration ttl, Scheduler timer)
      Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded history but apply a per-item expiry timeout

      Completion and Error will also be replayed until ttl triggers in which case the next Subscriber will start over a new subscription.

      Parameters:
      ttl - Time-to-live for each cached item and post termination.
      timer - the Scheduler on which to measure the duration.
      Returns:
      a replaying Flux
    • cache

      public final Flux<T> cache(int history, Duration ttl)
      Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size and apply a per-item expiry timeout.

      Completion and Error will also be replayed until ttl triggers in which case the next Subscriber will start over a new subscription.

      Parameters:
      history - number of elements retained in cache
      ttl - Time-to-live for each cached item and post termination.
      Returns:
      a replaying Flux
    • cache

      public final Flux<T> cache(int history, Duration ttl, Scheduler timer)
      Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size and apply a per-item expiry timeout.

      Completion and Error will also be replayed until ttl triggers in which case the next Subscriber will start over a new subscription.

      Parameters:
      history - number of elements retained in cache
      ttl - Time-to-live for each cached item and post termination.
      timer - the Scheduler on which to measure the duration.
      Returns:
      a replaying Flux
    • cast

      public final <E> Flux<E> cast(Class<E> clazz)
      Cast the current Flux produced type into a target produced type.

      Type Parameters:
      E - the Flux output type
      Parameters:
      clazz - the target class to cast to
      Returns:
      a casted Flux
    • cancelOn

      public final Flux<T> cancelOn(Scheduler scheduler)
      Prepare this Flux so that subscribers will cancel from it on a specified Scheduler.

      Parameters:
      scheduler - the Scheduler to signal cancel on
      Returns:
      a scheduled cancel Flux
    • checkpoint

      public final Flux<T> checkpoint()
      Activate traceback (full assembly tracing) for this particular Flux, in case of an error upstream of the checkpoint. Tracing incurs the cost of an exception stack trace creation.

      It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with the traceback.

      The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).

      Returns:
      the assembly tracing Flux.
    • checkpoint

      public final Flux<T> checkpoint(String description)
      Activate traceback (assembly marker) for this particular Flux by giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(), this doesn't create a filled stack trace, avoiding the main cost of the operator. However, as a trade-off the description must be unique enough for the user to find out where this Flux was assembled. If you only want a generic description, and still rely on the stack trace to find the assembly site, use the checkpoint(String, boolean) variant.

      It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.

      The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).

      Parameters:
      description - a unique enough description to include in the light assembly traceback.
      Returns:
      the assembly marked Flux
    • checkpoint

      public final Flux<T> checkpoint(@Nullable String description, boolean forceStackTrace)
      Activate traceback (full assembly tracing or the lighter assembly marking depending on the forceStackTrace option).

      By setting the forceStackTrace parameter to true, activate assembly tracing for this particular Flux and give it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(String), this will incur the cost of an exception stack trace creation. The description could for example be a meaningful name for the assembled flux or a wider correlation ID, since the stack trace will always provide enough information to locate where this Flux was assembled.

      By setting forceStackTrace to false, behaves like checkpoint(String) and is subject to the same caveat in choosing the description.

      It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly marker.

      The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).

      Parameters:
      description - a description (must be unique enough if forceStackTrace is set to false).
      forceStackTrace - false to make a light checkpoint without a stacktrace, true to use a stack trace.
      Returns:
      the assembly marked Flux.
    • collect

      public final <E> Mono<E> collect(Supplier<E> containerSupplier, BiConsumer<E,? super T> collector)
      Collect all elements emitted by this Flux into a user-defined container, by applying a collector BiConsumer taking the container and each element. The collected result will be emitted when this sequence completes, emitting the empty container if the sequence was empty.

      Discard Support: This operator discards the container upon cancellation or error triggered by a data signal. Either the container type is a Collection (in which case individual elements are discarded) or not (in which case the entire container is discarded). In case the collector BiConsumer fails to accumulate an element, the container is discarded as above and the triggering element is also discarded.

      Type Parameters:
      E - the container type
      Parameters:
      containerSupplier - the supplier of the container instance for each Subscriber
      collector - a consumer of both the container instance and the value being currently collected
      Returns:
      a Mono of the collected container on complete
    • collect

      public final <R, A> Mono<R> collect(Collector<? super T,A,? extends R> collector)
      Collect all elements emitted by this Flux into a container, by applying a Java 8 Stream API Collector The collected result will be emitted when this sequence completes, emitting the empty container if the sequence was empty.

      Discard Support: This operator discards the intermediate container (see Collector.supplier()) upon cancellation, error or exception while applying the Collector.finisher(). Either the container type is a Collection (in which case individual elements are discarded) or not (in which case the entire container is discarded). In case the accumulator BiConsumer of the collector fails to accumulate an element into the intermediate container, the container is discarded as above and the triggering element is also discarded.

      Type Parameters:
      A - The mutable accumulation type
      R - the container type
      Parameters:
      collector - the Collector
      Returns:
      a Mono of the collected container on complete
    • collectList

      public final Mono<List<T>> collectList()
      Collect all elements emitted by this Flux into a List that is emitted by the resulting Mono when this sequence completes, emitting the empty List if the sequence was empty.

      Discard Support: This operator discards the elements in the List upon cancellation or error triggered by a data signal.

      Returns:
      a Mono of a List of all values from this Flux
    • collectMap

      public final <K> Mono<Map<K,T>> collectMap(Function<? super T,? extends K> keyExtractor)
      Collect all elements emitted by this Flux into a hashed Map that is emitted by the resulting Mono when this sequence completes, emitting the empty Map if the sequence was empty. The key is extracted from each element by applying the keyExtractor Function. In case several elements map to the same key, the associated value will be the most recently emitted element.

      Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the map.

      Type Parameters:
      K - the type of the key extracted from each source element
      Parameters:
      keyExtractor - a Function to map elements to a key for the Map
      Returns:
      a Mono of a Map of key-element pairs (only including latest element in case of key conflicts)
    • collectMap

      public final <K, V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor)
      Collect all elements emitted by this Flux into a hashed Map that is emitted by the resulting Mono when this sequence completes, emitting the empty Map if the sequence was empty. The key is extracted from each element by applying the keyExtractor Function, and the value is extracted by the valueExtractor Function. In case several elements map to the same key, the associated value will be derived from the most recently emitted element.

      Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the map.

      Type Parameters:
      K - the type of the key extracted from each source element
      V - the type of the value extracted from each source element
      Parameters:
      keyExtractor - a Function to map elements to a key for the Map
      valueExtractor - a Function to map elements to a value for the Map
      Returns:
      a Mono of a Map of key-element pairs (only including latest element's value in case of key conflicts)
    • collectMap

      public final <K, V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor, Supplier<Map<K,V>> mapSupplier)
      Collect all elements emitted by this Flux into a user-defined Map that is emitted by the resulting Mono when this sequence completes, emitting the empty Map if the sequence was empty. The key is extracted from each element by applying the keyExtractor Function, and the value is extracted by the valueExtractor Function. In case several elements map to the same key, the associated value will be derived from the most recently emitted element.

      Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the map.

      Type Parameters:
      K - the type of the key extracted from each source element
      V - the type of the value extracted from each source element
      Parameters:
      keyExtractor - a Function to map elements to a key for the Map
      valueExtractor - a Function to map elements to a value for the Map
      mapSupplier - a Map factory called for each Subscriber
      Returns:
      a Mono of a Map of key-value pairs (only including latest element's value in case of key conflicts)
    • collectMultimap

      public final <K> Mono<Map<K,Collection<T>>> collectMultimap(Function<? super T,? extends K> keyExtractor)
      Collect all elements emitted by this Flux into a multimap that is emitted by the resulting Mono when this sequence completes, emitting the empty multimap if the sequence was empty. The key is extracted from each element by applying the keyExtractor Function, and every element mapping to the same key is stored in the List associated to said key.

      Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the list values in the map.

      Type Parameters:
      K - the type of the key extracted from each source element
      Parameters:
      keyExtractor - a Function to map elements to a key for the Map
      Returns:
      a Mono of a Map of key-List(elements) pairs
    • collectMultimap

      public final <K, V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor)
      Collect all elements emitted by this Flux into a multimap that is emitted by the resulting Mono when this sequence completes, emitting the empty multimap if the sequence was empty. The key is extracted from each element by applying the keyExtractor Function, and every element mapping to the same key is converted by the valueExtractor Function to a value stored in the List associated to said key.

      Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the list values in the map.

      Type Parameters:
      K - the type of the key extracted from each source element
      V - the type of the value extracted from each source element
      Parameters:
      keyExtractor - a Function to map elements to a key for the Map
      valueExtractor - a Function to map elements to a value for the Map
      Returns:
      a Mono of a Map of key-List(values) pairs
    • collectMultimap

      public final <K, V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor, Supplier<Map<K,Collection<V>>> mapSupplier)
      Collect all elements emitted by this Flux into a user-defined multimap that is emitted by the resulting Mono when this sequence completes, emitting the empty multimap if the sequence was empty. The key is extracted from each element by applying the keyExtractor Function, and every element mapping to the same key is converted by the valueExtractor Function to a value stored in the Collection associated to said key.

      Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the list values in the map.

      Type Parameters:
      K - the type of the key extracted from each source element
      V - the type of the value extracted from each source element
      Parameters:
      keyExtractor - a Function to map elements to a key for the Map
      valueExtractor - a Function to map elements to a value for the Map
      mapSupplier - a multimap (Map of Collection) factory called for each Subscriber
      Returns:
      a Mono of a Map of key-Collection(values) pairs
    • collectSortedList

      public final Mono<List<T>> collectSortedList()
      Collect all elements emitted by this Flux until this sequence completes, and then sort them in natural order into a List that is emitted by the resulting Mono. If the sequence was empty, empty List will be emitted.

      Discard Support: This operator is based on collectList(), and as such discards the elements in the List individually upon cancellation or error triggered by a data signal.

      Returns:
      a Mono of a sorted List of all values from this Flux, in natural order
    • collectSortedList

      public final Mono<List<T>> collectSortedList(@Nullable Comparator<? super T> comparator)
      Collect all elements emitted by this Flux until this sequence completes, and then sort them using a Comparator into a List that is emitted by the resulting Mono. If the sequence was empty, empty List will be emitted.

      Discard Support: This operator is based on collectList(), and as such discards the elements in the List individually upon cancellation or error triggered by a data signal.

      Parameters:
      comparator - a Comparator to sort the items of this sequences
      Returns:
      a Mono of a sorted List of all values from this Flux
    • concatMap

      public final <V> Flux<V> concatMap(Function<? super T,? extends Publisher<? extends V>> mapper)
      Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.

      There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:

      • Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
      • Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
      • Interleaving: this operator does not let values from different inners interleave (concatenation).

      Errors will immediately short circuit current concat backlog. Note that no prefetching is done on the source, which gets requested only if there is downstream demand.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.

      Type Parameters:
      V - the produced concatenated type
      Parameters:
      mapper - the function to transform this sequence of T into concatenated sequences of V
      Returns:
      a concatenated Flux
    • concatMap

      public final <V> Flux<V> concatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int prefetch)
      Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.

      There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:

      • Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
      • Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
      • Interleaving: this operator does not let values from different inners interleave (concatenation).

      Errors will immediately short circuit current concat backlog. The prefetch argument allows to give an arbitrary prefetch size to the upstream source, or to disable prefetching with 0.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.

      Type Parameters:
      V - the produced concatenated type
      Parameters:
      mapper - the function to transform this sequence of T into concatenated sequences of V
      prefetch - the number of values to prefetch from upstream source, or 0 to disable prefetching
      Returns:
      a concatenated Flux
    • concatMapDelayError

      public final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper)
      Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.

      There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:

      • Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
      • Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
      • Interleaving: this operator does not let values from different inners interleave (concatenation).

      Errors in the individual publishers will be delayed at the end of the whole concat sequence (possibly getting combined into a composite) if several sources error. Note that no prefetching is done on the source, which gets requested only if there is downstream demand.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.

      Type Parameters:
      V - the produced concatenated type
      Parameters:
      mapper - the function to transform this sequence of T into concatenated sequences of V
      Returns:
      a concatenated Flux
    • concatMapDelayError

      public final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, int prefetch)
      Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.

      There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:

      • Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
      • Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
      • Interleaving: this operator does not let values from different inners interleave (concatenation).

      Errors in the individual publishers will be delayed at the end of the whole concat sequence (possibly getting combined into a composite) if several sources error. The prefetch argument allows to give an arbitrary prefetch size to the upstream source, or to disable prefetching with 0.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.

      Type Parameters:
      V - the produced concatenated type
      Parameters:
      mapper - the function to transform this sequence of T into concatenated sequences of V
      prefetch - the number of values to prefetch from upstream source, or 0 to disable prefetching
      Returns:
      a concatenated Flux
    • concatMapDelayError

      public final <V> Flux<V> concatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, boolean delayUntilEnd, int prefetch)
      Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.

      There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:

      • Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
      • Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
      • Interleaving: this operator does not let values from different inners interleave (concatenation).

      Errors in the individual publishers will be delayed after the current concat backlog if delayUntilEnd is false or after all sources if delayUntilEnd is true. The prefetch argument allows to give an arbitrary prefetch size to the upstream source, or to disable prefetching with 0.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.

      Type Parameters:
      V - the produced concatenated type
      Parameters:
      mapper - the function to transform this sequence of T into concatenated sequences of V
      delayUntilEnd - delay error until all sources have been consumed instead of after the current source
      prefetch - the number of values to prefetch from upstream source, or 0 to disable prefetching
      Returns:
      a concatenated Flux
    • concatMapIterable

      public final <R> Flux<R> concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
      Transform the items emitted by this Flux into Iterable, then flatten the elements from those by concatenating them into a single Flux. For each iterable, Iterable.iterator() will be called at least once and at most twice.

      This operator inspects each Iterable's Spliterator to assess if the iteration can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)). Since the default Spliterator wraps the Iterator we can have two Iterable.iterator() calls per iterable. This second invocation is skipped on a Collection however, a type which is assumed to be always finite.

      Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is no notion of eager vs lazy inner subscription. The content of the Iterables are all played sequentially. Thus flatMapIterable and concatMapIterable are equivalent offered as a discoverability improvement for users that explore the API with the concat vs flatMap expectation.

      Discard Support: Upon cancellation, this operator discards T elements it prefetched and, in some cases, attempts to discard remainder of the currently processed Iterable (if it can safely ensure the iterator is finite). Note that this means each Iterable's Iterable.iterator() method could be invoked twice.

      Error Mode Support: This operator supports resuming on errors (including when fusion is enabled). Exceptions thrown by the consumer are passed to the onErrorContinue(BiConsumer) error consumer (the value consumer is not invoked, as the source element will be part of the sequence). The onNext signal is then propagated as normal.

      Type Parameters:
      R - the merged output sequence type
      Parameters:
      mapper - the Function to transform input sequence into N Iterable
      Returns:
      a concatenation of the values from the Iterables obtained from each element in this Flux
    • concatMapIterable

      public final <R> Flux<R> concatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper, int prefetch)
      Transform the items emitted by this Flux into Iterable, then flatten the emissions from those by concatenating them into a single Flux. The prefetch argument allows to give an arbitrary prefetch size to the upstream source. For each iterable, Iterable.iterator() will be called at least once and at most twice.

      This operator inspects each Iterable's Spliterator to assess if the iteration can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)). Since the default Spliterator wraps the Iterator we can have two Iterable.iterator() calls per iterable. This second invocation is skipped on a Collection however, a type which is assumed to be always finite.

      Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is no notion of eager vs lazy inner subscription. The content of the Iterables are all played sequentially. Thus flatMapIterable and concatMapIterable are equivalent offered as a discoverability improvement for users that explore the API with the concat vs flatMap expectation.

      Discard Support: Upon cancellation, this operator discards T elements it prefetched and, in some cases, attempts to discard remainder of the currently processed Iterable (if it can safely ensure the iterator is finite). Note that this means each Iterable's Iterable.iterator() method could be invoked twice.

      Error Mode Support: This operator supports resuming on errors (including when fusion is enabled). Exceptions thrown by the consumer are passed to the onErrorContinue(BiConsumer) error consumer (the value consumer is not invoked, as the source element will be part of the sequence). The onNext signal is then propagated as normal.

      Type Parameters:
      R - the merged output sequence type
      Parameters:
      mapper - the Function to transform input sequence into N Iterable
      prefetch - the number of values to request from the source upon subscription, to be transformed to Iterable
      Returns:
      a concatenation of the values from the Iterables obtained from each element in this Flux
    • concatWith

      public final Flux<T> concatWith(Publisher<? extends T> other)
      Concatenate emissions of this Flux with the provided Publisher (no interleave).

      Parameters:
      other - the Publisher sequence to concat after this Flux
      Returns:
      a concatenated Flux
    • contextCapture

      public final Flux<T> contextCapture()
      If context-propagation library is on the classpath, this is a convenience shortcut to capture thread local values during the subscription phase and put them in the Context that is visible upstream of this operator.

      As a result this operator should generally be used as close as possible to the end of the chain / subscription point.

      If the ContextView visible upstream is not empty, a small subset of operators will automatically restore the context snapshot (handle, tap). If context-propagation is not available at runtime, this operator simply returns the current Flux instance.

      Returns:
      a new Flux where context-propagation API has been used to capture entries and inject them into the Context
      See Also:
    • contextWrite

      public final Flux<T> contextWrite(ContextView contextToAppend)
      Enrich the Context visible from downstream for the benefit of upstream operators, by making all values from the provided ContextView visible on top of pairs from downstream.

      A Context (and its ContextView) is tied to a given subscription and is read by querying the downstream Subscriber. Subscriber that don't enrich the context instead access their own downstream's context. As a result, this operator conceptually enriches a Context coming from under it in the chain (downstream, by default an empty one) and makes the new enriched Context visible to operators above it in the chain.

      Parameters:
      contextToAppend - the ContextView to merge with the downstream Context, resulting in a new more complete Context that will be visible from upstream.
      Returns:
      a contextualized Flux
      See Also:
    • contextWrite

      public final Flux<T> contextWrite(Function<Context,Context> contextModifier)
      Enrich the Context visible from downstream for the benefit of upstream operators, by applying a Function to the downstream Context.

      The Function takes a Context for convenience, allowing to easily call write APIs to return a new Context.

      A Context (and its ContextView) is tied to a given subscription and is read by querying the downstream Subscriber. Subscriber that don't enrich the context instead access their own downstream's context. As a result, this operator conceptually enriches a Context coming from under it in the chain (downstream, by default an empty one) and makes the new enriched Context visible to operators above it in the chain.

      Parameters:
      contextModifier - the Function to apply to the downstream Context, resulting in a new more complete Context that will be visible from upstream.
      Returns:
      a contextualized Flux
      See Also:
    • count

      public final Mono<Long> count()
      Counts the number of values in this Flux. The count will be emitted when onComplete is observed.

      Returns:
      a new Mono of Long count
    • defaultIfEmpty

      public final Flux<T> defaultIfEmpty(T defaultV)
      Provide a default unique value if this sequence is completed without any data

      Parameters:
      defaultV - the alternate value if this sequence is empty
      Returns:
      a new Flux
    • delayElements

      public final Flux<T> delayElements(Duration delay)
      Delay each of this Flux elements (Subscriber.onNext(T) signals) by a given Duration. Signals are delayed and continue on the parallel default Scheduler, but empty sequences or immediate error signals are not delayed.

      Parameters:
      delay - duration by which to delay each Subscriber.onNext(T) signal
      Returns:
      a delayed Flux
      See Also:
    • delayElements

      public final Flux<T> delayElements(Duration delay, Scheduler timer)
      Delay each of this Flux elements (Subscriber.onNext(T) signals) by a given Duration. Signals are delayed and continue on a user-specified Scheduler, but empty sequences or immediate error signals are not delayed.

      Parameters:
      delay - period to delay each Subscriber.onNext(T) signal
      timer - a time-capable Scheduler instance to delay each signal on
      Returns:
      a delayed Flux
    • delaySequence

      public final Flux<T> delaySequence(Duration delay)
      Shift this Flux forward in time by a given Duration. Unlike with delayElements(Duration), elements are shifted forward in time as they are emitted, always resulting in the delay between two elements being the same as in the source (only the first element is visibly delayed from the previous event, that is the subscription). Signals are delayed and continue on the parallel Scheduler, but empty sequences or immediate error signals are not delayed.

      With this operator, a source emitting at 10Hz with a delaySequence Duration of 1s will still emit at 10Hz, with an initial "hiccup" of 1s. On the other hand, delayElements(Duration) would end up emitting at 1Hz.

      This is closer to delaySubscription(Duration), except the source is subscribed to immediately.

      Discard Support: This operator discards elements currently being delayed * if the sequence is cancelled during the delay.

      Parameters:
      delay - Duration to shift the sequence by
      Returns:
      a shifted Flux emitting at the same frequency as the source
    • delaySequence

      public final Flux<T> delaySequence(Duration delay, Scheduler timer)
      Shift this Flux forward in time by a given Duration. Unlike with delayElements(Duration, Scheduler), elements are shifted forward in time as they are emitted, always resulting in the delay between two elements being the same as in the source (only the first element is visibly delayed from the previous event, that is the subscription). Signals are delayed and continue on a user-specified Scheduler, but empty sequences or immediate error signals are not delayed.

      With this operator, a source emitting at 10Hz with a delaySequence Duration of 1s will still emit at 10Hz, with an initial "hiccup" of 1s. On the other hand, delayElements(Duration, Scheduler) would end up emitting at 1Hz.

      This is closer to delaySubscription(Duration, Scheduler), except the source is subscribed to immediately.

      Discard Support: This operator discards elements currently being delayed if the sequence is cancelled during the delay.

      Parameters:
      delay - Duration to shift the sequence by
      timer - a time-capable Scheduler instance to delay signals on
      Returns:
      a shifted Flux emitting at the same frequency as the source
    • delayUntil

      public final Flux<T> delayUntil(Function<? super T,? extends Publisher<?>> triggerProvider)
      Subscribe to this Flux and generate a Publisher from each of this Flux elements, each acting as a trigger for relaying said element.

      That is to say, the resulting Flux delays each of its emission until the associated trigger Publisher terminates.

      In case of an error either in the source or in a trigger, that error is propagated immediately downstream. Note that unlike with the Mono variant there is no fusion of subsequent calls.

      Parameters:
      triggerProvider - a Function that maps each element into a Publisher whose termination will trigger relaying the value.
      Returns:
      this Flux, but with elements delayed until their derived publisher terminates.
    • delaySubscription

      public final Flux<T> delaySubscription(Duration delay)
      Delay the subscription to this Flux source until the given period elapses. The delay is introduced through the parallel default Scheduler.

      Parameters:
      delay - duration before subscribing this Flux
      Returns:
      a delayed Flux
    • delaySubscription

      public final Flux<T> delaySubscription(Duration delay, Scheduler timer)
      Delay the subscription to this Flux source until the given period elapses, as measured on the user-provided Scheduler.

      Parameters:
      delay - Duration before subscribing this Flux
      timer - a time-capable Scheduler instance to run on
      Returns:
      a delayed Flux
    • delaySubscription

      public final <U> Flux<T> delaySubscription(Publisher<U> subscriptionDelay)
      Delay the subscription to this Flux source until another Publisher signals a value or completes.

      Type Parameters:
      U - the other source type
      Parameters:
      subscriptionDelay - a companion Publisher whose onNext/onComplete signal will trigger the subscription
      Returns:
      a delayed Flux
    • dematerialize

      public final <X> Flux<X> dematerialize()
      An operator working only if this Flux emits onNext, onError or onComplete Signal instances, transforming these materialized signals into real signals on the Subscriber. The error Signal will trigger onError and complete Signal will trigger onComplete.

      Type Parameters:
      X - the dematerialized type
      Returns:
      a dematerialized Flux
      See Also:
    • distinct

      public final Flux<T> distinct()
      For each Subscriber, track elements from this Flux that have been seen and filter out duplicates.

      The values themselves are recorded into a HashSet for distinct detection. Use distinct(Object::hashcode) if you want a more lightweight approach that doesn't retain all the objects, but is more susceptible to falsely considering two elements as distinct due to a hashcode collision.

      Discard Support: This operator discards elements that don't match the distinct predicate, but you should use the version with a cleanup if you need discarding of keys categorized by the operator as "seen". See distinct(Function, Supplier, BiPredicate, Consumer).

      Returns:
      a filtering Flux only emitting distinct values
    • distinct

      public final <V> Flux<T> distinct(Function<? super T,? extends V> keySelector)
      For each Subscriber, track elements from this Flux that have been seen and filter out duplicates, as compared by a key extracted through the user provided Function.

      Discard Support: This operator discards elements that don't match the distinct predicate, but you should use the version with a cleanup if you need discarding of keys categorized by the operator as "seen". See distinct(Function, Supplier, BiPredicate, Consumer).

      Type Parameters:
      V - the type of the key extracted from each value in this sequence
      Parameters:
      keySelector - function to compute comparison key for each element
      Returns:
      a filtering Flux only emitting values with distinct keys
    • distinct

      public final <V, C extends Collection<? super V>> Flux<T> distinct(Function<? super T,? extends V> keySelector, Supplier<C> distinctCollectionSupplier)
      For each Subscriber, track elements from this Flux that have been seen and filter out duplicates, as compared by a key extracted through the user provided Function and by the add method of the Collection supplied (typically a Set).

      Discard Support: This operator discards elements that don't match the distinct predicate, but you should use the version with a cleanup if you need discarding of keys categorized by the operator as "seen". See distinct(Function, Supplier, BiPredicate, Consumer).

      Type Parameters:
      V - the type of the key extracted from each value in this sequence
      C - the type of Collection used for distinct checking of keys
      Parameters:
      keySelector - function to compute comparison key for each element
      distinctCollectionSupplier - supplier of the Collection used for distinct check through add of the key.
      Returns:
      a filtering Flux only emitting values with distinct keys
    • distinct

      public final <V, C> Flux<T> distinct(Function<? super T,? extends V> keySelector, Supplier<C> distinctStoreSupplier, BiPredicate<C,V> distinctPredicate, Consumer<C> cleanup)
      For each Subscriber, track elements from this Flux that have been seen and filter out duplicates, as compared by applying a BiPredicate on an arbitrary user-supplied <C> store and a key extracted through the user provided Function. The BiPredicate should typically add the key to the arbitrary store for further comparison. A cleanup callback is also invoked on the store upon termination of the sequence.

      Discard Support: This operator discards elements that don't match the distinct predicate, but you should use the cleanup as well if you need discarding of keys categorized by the operator as "seen".

      Type Parameters:
      V - the type of the key extracted from each value in this sequence
      C - the type of store backing the BiPredicate
      Parameters:
      keySelector - function to compute comparison key for each element
      distinctStoreSupplier - supplier of the arbitrary store object used in distinct checks along the extracted key.
      distinctPredicate - the BiPredicate to apply to the arbitrary store + extracted key to perform a distinct check. Since nothing is assumed of the store, this predicate should also add the key to the store as necessary.
      cleanup - the cleanup callback to invoke on the store upon termination.
      Returns:
      a filtering Flux only emitting values with distinct keys
    • distinctUntilChanged

      public final Flux<T> distinctUntilChanged()
      Filter out subsequent repetitions of an element (that is, if they arrive right after one another).

      The last distinct value seen is retained for further comparison, which is done on the values themselves using the equals method. Use distinctUntilChanged(Object::hashcode) if you want a more lightweight approach that doesn't retain all the objects, but is more susceptible to falsely considering two elements as distinct due to a hashcode collision.

      Discard Support: Although this operator discards elements that are considered as "already seen", it is not recommended for cases where discarding is needed as the operator doesn't discard the "key" (in this context, the distinct instance that was last seen).

      Returns:
      a filtering Flux with only one occurrence in a row of each element (yet elements can repeat in the overall sequence)
    • distinctUntilChanged

      public final <V> Flux<T> distinctUntilChanged(Function<? super T,? extends V> keySelector)
      Filter out subsequent repetitions of an element (that is, if they arrive right after one another), as compared by a key extracted through the user provided Function using equality.

      Discard Support: This operator discards elements that are considered as "already seen". The keys themselves are not discarded.

      Type Parameters:
      V - the type of the key extracted from each value in this sequence
      Parameters:
      keySelector - function to compute comparison key for each element
      Returns:
      a filtering Flux with only one occurrence in a row of each element of the same key (yet element keys can repeat in the overall sequence)
    • distinctUntilChanged

      public final <V> Flux<T> distinctUntilChanged(Function<? super T,? extends V> keySelector, BiPredicate<? super V,? super V> keyComparator)
      Filter out subsequent repetitions of an element (that is, if they arrive right after one another), as compared by a key extracted through the user provided Function and then comparing keys with the supplied BiPredicate.

      Discard Support: This operator discards elements that are considered as "already seen" (for which the keyComparator returns true). The keys themselves are not discarded.

      Type Parameters:
      V - the type of the key extracted from each value in this sequence
      Parameters:
      keySelector - function to compute comparison key for each element
      keyComparator - predicate used to compare keys.
      Returns:
      a filtering Flux with only one occurrence in a row of each element of the same key for which the predicate returns true (yet element keys can repeat in the overall sequence)
    • doAfterTerminate

      public final Flux<T> doAfterTerminate(Runnable afterTerminate)
      Add behavior (side-effect) triggered after the Flux terminates, either by completing downstream successfully or with an error.

      The relevant signal is propagated downstream, then the Runnable is executed.

      Parameters:
      afterTerminate - the callback to call after Subscriber.onComplete() or Subscriber.onError(java.lang.Throwable)
      Returns:
      an observed Flux
    • doOnCancel

      public final Flux<T> doOnCancel(Runnable onCancel)
      Add behavior (side-effect) triggered when the Flux is cancelled.

      The handler is executed first, then the cancel signal is propagated upstream to the source.

      Parameters:
      onCancel - the callback to call on Subscription.cancel()
      Returns:
      an observed Flux
    • doOnComplete

      public final Flux<T> doOnComplete(Runnable onComplete)
      Add behavior (side-effect) triggered when the Flux completes successfully.

      The Runnable is executed first, then the onComplete signal is propagated downstream.

      Parameters:
      onComplete - the callback to call on Subscriber.onComplete()
      Returns:
      an observed Flux
    • doOnDiscard

      public final <R> Flux<T> doOnDiscard(Class<R> type, Consumer<? super R> discardHook)
      Potentially modify the behavior of the whole chain of operators upstream of this one to conditionally clean up elements that get discarded by these operators.

      The discardHook MUST be idempotent and safe to use on any instance of the desired type. Calls to this method are additive, and the order of invocation of the discardHook is the same as the order of declaration (calling .filter(...).doOnDiscard(first).doOnDiscard(second) will let the filter invoke first then second handlers).

      Two main categories of discarding operators exist:

      • filtering operators, dropping some source elements as part of their designed behavior
      • operators that prefetch a few elements and keep them around pending a request, but get cancelled/in error
      WARNING: Not all operators support this instruction. The ones that do are identified in the javadoc by the presence of a Discard Support section.
      Parameters:
      type - the Class of elements in the upstream chain of operators that this cleanup hook should take into account.
      discardHook - a Consumer of elements in the upstream chain of operators that performs the cleanup.
      Returns:
      a Flux that cleans up matching elements that get discarded upstream of it.
    • doOnEach

      public final Flux<T> doOnEach(Consumer<? super Signal<T>> signalConsumer)
      Add behavior (side-effects) triggered when the Flux emits an item, fails with an error or completes successfully. All these events are represented as a Signal that is passed to the side-effect callback. Note that this is an advanced operator, typically used for monitoring of a Flux. These Signal have a Context associated to them.

      The Consumer is executed first, then the relevant signal is propagated downstream.

      Parameters:
      signalConsumer - the mandatory callback to call on Subscriber.onNext(Object), Subscriber.onError(Throwable) and Subscriber.onComplete()
      Returns:
      an observed Flux
      See Also:
    • doOnError

      public final Flux<T> doOnError(Consumer<? super Throwable> onError)
      Add behavior (side-effect) triggered when the Flux completes with an error.

      The Consumer is executed first, then the onError signal is propagated downstream.

      Parameters:
      onError - the callback to call on Subscriber.onError(java.lang.Throwable)
      Returns:
      an observed Flux
    • doOnError

      public final <E extends Throwable> Flux<T> doOnError(Class<E> exceptionType, Consumer<? super E> onError)
      Add behavior (side-effect) triggered when the Flux completes with an error matching the given exception type.

      The Consumer is executed first, then the onError signal is propagated downstream.

      Type Parameters:
      E - type of the error to handle
      Parameters:
      exceptionType - the type of exceptions to handle
      onError - the error handler for each error
      Returns:
      an observed Flux
    • doOnError

      public final Flux<T> doOnError(Predicate<? super Throwable> predicate, Consumer<? super Throwable> onError)
      Add behavior (side-effect) triggered when the Flux completes with an error matching the given exception.

      The Consumer is executed first, then the onError signal is propagated downstream.

      Parameters:
      predicate - the matcher for exceptions to handle
      onError - the error handler for each error
      Returns:
      an observed Flux
    • doOnNext

      public final Flux<T> doOnNext(Consumer<? super T> onNext)
      Add behavior (side-effect) triggered when the Flux emits an item.

      The Consumer is executed first, then the onNext signal is propagated downstream.

      Error Mode Support: This operator supports resuming on errors (including when fusion is enabled). Exceptions thrown by the consumer are passed to the onErrorContinue(BiConsumer) error consumer (the value consumer is not invoked, as the source element will be part of the sequence). The onNext signal is then propagated as normal.

      Parameters:
      onNext - the callback to call on Subscriber.onNext(T)
      Returns:
      an observed Flux
    • doOnRequest

      public final Flux<T> doOnRequest(LongConsumer consumer)
      Add behavior (side-effect) triggering a LongConsumer when this Flux receives any request.

      Note that non fatal error raised in the callback will not be propagated and will simply trigger Operators.onOperatorError(Throwable, Context).

      The LongConsumer is executed first, then the request signal is propagated upstream to the parent.

      Parameters:
      consumer - the consumer to invoke on each request
      Returns:
      an observed Flux
    • doOnSubscribe

      public final Flux<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe)
      Add behavior (side-effect) triggered when the Flux is being subscribed, that is to say when a Subscription has been produced by the Publisher and is being passed to the Subscriber.onSubscribe(Subscription).

      This method is not intended for capturing the subscription and calling its methods, but for side effects like monitoring. For instance, the correct way to cancel a subscription is to call Disposable.dispose() on the Disposable returned by subscribe().

      The Consumer is executed first, then the Subscription is propagated downstream to the next subscriber in the chain that is being established.

      Parameters:
      onSubscribe - the callback to call on Subscriber.onSubscribe(org.reactivestreams.Subscription)
      Returns:
      an observed Flux
      See Also:
    • doOnTerminate

      public final Flux<T> doOnTerminate(Runnable onTerminate)
      Add behavior (side-effect) triggered when the Flux terminates, either by completing successfully or failing with an error.

      The Runnable is executed first, then the onComplete/onError signal is propagated downstream.

      Parameters:
      onTerminate - the callback to call on Subscriber.onComplete() or Subscriber.onError(java.lang.Throwable)
      Returns:
      an observed Flux
    • doFirst

      public final Flux<T> doFirst(Runnable onFirst)
      Add behavior (side-effect) triggered before the Flux is subscribed to, which should be the first event after assembly time.

      Note that when several doFirst(Runnable) operators are used anywhere in a chain of operators, their order of execution is reversed compared to the declaration order (as subscribe signal flows backward, from the ultimate subscriber to the source publisher):

      
       Flux.just(1, 2)
           .doFirst(() -> System.out.println("three"))
           .doFirst(() -> System.out.println("two"))
           .doFirst(() -> System.out.println("one"));
       //would print one two three
       
       

      In case the Runnable throws an exception, said exception will be directly propagated to the subscribing Subscriber along with a no-op Subscription, similarly to what error(Throwable) does. Otherwise, after the handler has executed, the Subscriber is directly subscribed to the original source Flux (this).

      This side-effect method provides stronger first guarantees compared to doOnSubscribe(Consumer), which is triggered once the Subscription has been set up and passed to the Subscriber.

      Parameters:
      onFirst - the callback to execute before the Flux is subscribed to
      Returns:
      an observed Flux
    • doFinally

      public final Flux<T> doFinally(Consumer<SignalType> onFinally)
      Add behavior (side-effect) triggered after the Flux terminates for any reason, including cancellation. The terminating event (SignalType.ON_COMPLETE, SignalType.ON_ERROR and SignalType.CANCEL) is passed to the consumer, which is executed after the signal has been passed downstream.

      Note that the fact that the signal is propagated downstream before the callback is executed means that several doFinally in a row will be executed in reverse order. If you want to assert the execution of the callback please keep in mind that the Flux will complete before it is executed, so its effect might not be visible immediately after eg. a blockLast().

      Parameters:
      onFinally - the callback to execute after a terminal signal (complete, error or cancel)
      Returns:
      an observed Flux
    • elapsed

      public final Flux<Tuple2<Long,T>> elapsed()
      Map this Flux into Tuple2<Long, T> of timemillis and source data. The timemillis corresponds to the elapsed time between each signal as measured by the parallel scheduler. First duration is measured between the subscription and the first element.

      Returns:
      a new Flux that emits a tuple of time elapsed in milliseconds and matching data
      See Also:
    • elapsed

      public final Flux<Tuple2<Long,T>> elapsed(Scheduler scheduler)
      Map this Flux into Tuple2<Long, T> of timemillis and source data. The timemillis corresponds to the elapsed time between each signal as measured by the provided Scheduler. First duration is measured between the subscription and the first element.

      Parameters:
      scheduler - a Scheduler instance to read time from
      Returns:
      a new Flux that emits tuples of time elapsed in milliseconds and matching data
      See Also:
    • elementAt

      public final Mono<T> elementAt(int index)
      Emit only the element at the given index position or IndexOutOfBoundsException if the sequence is shorter.

      Discard Support: This operator discards elements that appear before the requested index.

      Parameters:
      index - zero-based index of the only item to emit
      Returns:
      a Mono of the item at the specified zero-based index
    • elementAt

      public final Mono<T> elementAt(int index, T defaultValue)
      Emit only the element at the given index position or fall back to a default value if the sequence is shorter.

      Discard Support: This operator discards elements that appear before the requested index.

      Parameters:
      index - zero-based index of the only item to emit
      defaultValue - a default value to emit if the sequence is shorter
      Returns:
      a Mono of the item at the specified zero-based index or a default value
    • expandDeep

      public final Flux<T> expandDeep(Function<? super T,? extends Publisher<? extends T>> expander, int capacityHint)
      Recursively expand elements into a graph and emit all the resulting element, in a depth-first traversal order.

      That is: emit one value from this Flux, expand it and emit the first value at this first level of recursion, and so on... When no more recursion is possible, backtrack to the previous level and re-apply the strategy.

      For example, given the hierarchical structure

        A
         - AA
           - aa1
        B
         - BB
           - bb1
       
      Expands Flux.just(A, B) into
        A
        AA
        aa1
        B
        BB
        bb1
       
      Parameters:
      expander - the Function applied at each level of recursion to expand values into a Publisher, producing a graph.
      capacityHint - a capacity hint to prepare the inner queues to accommodate n elements per level of recursion.
      Returns:
      a Flux expanded depth-first
    • expandDeep

      public final Flux<T> expandDeep(Function<? super T,? extends Publisher<? extends T>> expander)
      Recursively expand elements into a graph and emit all the resulting element, in a depth-first traversal order.

      That is: emit one value from this Flux, expand it and emit the first value at this first level of recursion, and so on... When no more recursion is possible, backtrack to the previous level and re-apply the strategy.

      For example, given the hierarchical structure

        A
         - AA
           - aa1
        B
         - BB
           - bb1
       
      Expands Flux.just(A, B) into
        A
        AA
        aa1
        B
        BB
        bb1
       
      Parameters:
      expander - the Function applied at each level of recursion to expand values into a Publisher, producing a graph.
      Returns:
      a Flux expanded depth-first
    • expand

      public final Flux<T> expand(Function<? super T,? extends Publisher<? extends T>> expander, int capacityHint)
      Recursively expand elements into a graph and emit all the resulting element using a breadth-first traversal strategy.

      That is: emit the values from this Flux first, then expand each at a first level of recursion and emit all of the resulting values, then expand all of these at a second level and so on.

      For example, given the hierarchical structure

        A
         - AA
           - aa1
        B
         - BB
           - bb1
       
      Expands Flux.just(A, B) into
        A
        B
        AA
        BB
        aa1
        bb1
       
      Parameters:
      expander - the Function applied at each level of recursion to expand values into a Publisher, producing a graph.
      capacityHint - a capacity hint to prepare the inner queues to accommodate n elements per level of recursion.
      Returns:
      a breadth-first expanded Flux
    • expand

      public final Flux<T> expand(Function<? super T,? extends Publisher<? extends T>> expander)
      Recursively expand elements into a graph and emit all the resulting element using a breadth-first traversal strategy.

      That is: emit the values from this Flux first, then expand each at a first level of recursion and emit all of the resulting values, then expand all of these at a second level and so on..

      For example, given the hierarchical structure

        A
         - AA
           - aa1
        B
         - BB
           - bb1
       
      Expands Flux.just(A, B) into
        A
        B
        AA
        BB
        aa1
        bb1
       
      Parameters:
      expander - the Function applied at each level of recursion to expand values into a Publisher, producing a graph.
      Returns:
      a breadth-first expanded Flux
    • filter

      public final Flux<T> filter(Predicate<? super T> p)
      Evaluate each source value against the given Predicate. If the predicate test succeeds, the value is emitted. If the predicate test fails, the value is ignored and a request of 1 is made upstream.

      Discard Support: This operator discards elements that do not match the filter. It also discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.

      Error Mode Support: This operator supports resuming on errors (including when fusion is enabled). Exceptions thrown by the predicate are considered as if the predicate returned false: they cause the source value to be dropped and a new element (request(1)) being requested from upstream.

      Parameters:
      p - the Predicate to test values against
      Returns:
      a new Flux containing only values that pass the predicate test
    • filterWhen

      public final Flux<T> filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate)
      Test each value emitted by this Flux asynchronously using a generated Publisher<Boolean> test. A value is replayed if the first item emitted by its corresponding test is true. It is dropped if its test is either empty or its first emitted value is false.

      Note that only the first value of the test publisher is considered, and unless it is a Mono, test will be cancelled after receiving that first value. Test publishers are generated and subscribed to in sequence.

      Discard Support: This operator discards elements that do not match the filter. It also discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.

      Parameters:
      asyncPredicate - the function generating a Publisher of Boolean for each value, to filter the Flux with
      Returns:
      a filtered Flux
    • filterWhen

      public final Flux<T> filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate, int bufferSize)
      Test each value emitted by this Flux asynchronously using a generated Publisher<Boolean> test. A value is replayed if the first item emitted by its corresponding test is true. It is dropped if its test is either empty or its first emitted value is false.

      Note that only the first value of the test publisher is considered, and unless it is a Mono, test will be cancelled after receiving that first value. Test publishers are generated and subscribed to in sequence.

      Discard Support: This operator discards elements that do not match the filter. It also discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.

      Parameters:
      asyncPredicate - the function generating a Publisher of Boolean for each value, to filter the Flux with
      bufferSize - the maximum expected number of values to hold pending a result of their respective asynchronous predicates, rounded to the next power of two. This is capped depending on the size of the heap and the JVM limits, so be careful with large values (although eg. 65536 should still be fine). Also serves as the initial request size for the source.
      Returns:
      a filtered Flux
    • flatMap

      public final <R> Flux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
      Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.

      There are three dimensions to this operator that can be compared with flatMapSequential and concatMap:

      • Generation of inners and subscription: this operator is eagerly subscribing to its inners.
      • Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner element are flattened as they arrive.
      • Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).

      Discard Support: This operator discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.

      Error Mode Support: This operator supports resuming on errors in the mapper Function. Exceptions thrown by the mapper then behave as if it had mapped the value to an empty publisher. If the mapper does map to a scalar publisher (an optimization in which the value can be resolved immediately without subscribing to the publisher, e.g. a Mono.fromCallable(Callable)) but said publisher throws, this can be resumed from in the same manner.

      Type Parameters:
      R - the merged output sequence type
      Parameters:
      mapper - the Function to transform input sequence into N sequences Publisher
      Returns:
      a new Flux
    • flatMap

      public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency)
      Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.

      There are three dimensions to this operator that can be compared with flatMapSequential and concatMap:

      • Generation of inners and subscription: this operator is eagerly subscribing to its inners.
      • Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner element are flattened as they arrive.
      • Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).
      The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel. In turn, that argument shows the size of the first Subscription.request(long) to the upstream.

      Discard Support: This operator discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.

      Error Mode Support: This operator supports resuming on errors in the mapper Function. Exceptions thrown by the mapper then behave as if it had mapped the value to an empty publisher. If the mapper does map to a scalar publisher (an optimization in which the value can be resolved immediately without subscribing to the publisher, e.g. a Mono.fromCallable(Callable)) but said publisher throws, this can be resumed from in the same manner.

      Type Parameters:
      V - the merged output sequence type
      Parameters:
      mapper - the Function to transform input sequence into N sequences Publisher
      concurrency - the maximum number of in-flight inner sequences
      Returns:
      a new Flux
    • flatMap

      public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)
      Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.

      There are three dimensions to this operator that can be compared with flatMapSequential and concatMap:

      • Generation of inners and subscription: this operator is eagerly subscribing to its inners.
      • Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner element are flattened as they arrive.
      • Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).
      The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel. In turn, that argument shows the size of the first Subscription.request(long) to the upstream. The prefetch argument allows to give an arbitrary prefetch size to the merged Publisher (in other words prefetch size means the size of the first Subscription.request(long) to the merged Publisher).

      Discard Support: This operator discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.

      Error Mode Support: This operator supports resuming on errors in the mapper Function. Exceptions thrown by the mapper then behave as if it had mapped the value to an empty publisher. If the mapper does map to a scalar publisher (an optimization in which the value can be resolved immediately without subscribing to the publisher, e.g. a Mono.fromCallable(Callable)) but said publisher throws, this can be resumed from in the same manner.

      Type Parameters:
      V - the merged output sequence type
      Parameters:
      mapper - the Function to transform input sequence into N sequences Publisher
      concurrency - the maximum number of in-flight inner sequences
      prefetch - the maximum in-flight elements from each inner Publisher sequence
      Returns:
      a merged Flux
    • flatMapDelayError

      public final <V> Flux<V> flatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)
      Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.

      There are three dimensions to this operator that can be compared with flatMapSequential and concatMap:

      • Generation of inners and subscription: this operator is eagerly subscribing to its inners.
      • Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner element are flattened as they arrive.
      • Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).
      The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel. The prefetch argument allows to give an arbitrary prefetch size to the merged Publisher. This variant will delay any error until after the rest of the flatMap backlog has been processed.

      Discard Support: This operator discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.

      Error Mode Support: This operator supports resuming on errors in the mapper Function. Exceptions thrown by the mapper then behave as if it had mapped the value to an empty publisher. If the mapper does map to a scalar publisher (an optimization in which the value can be resolved immediately without subscribing to the publisher, e.g. a Mono.fromCallable(Callable)) but said publisher throws, this can be resumed from in the same manner.

      Type Parameters:
      V - the merged output sequence type
      Parameters:
      mapper - the Function to transform input sequence into N sequences Publisher
      concurrency - the maximum number of in-flight inner sequences
      prefetch - the maximum in-flight elements from each inner Publisher sequence
      Returns:
      a merged Flux
    • flatMap

      public final <R> Flux<R> flatMap(@Nullable Function<? super T,? extends Publisher<? extends R>> mapperOnNext, @Nullable Function<? super Throwable,? extends Publisher<? extends R>> mapperOnError, @Nullable Supplier<? extends Publisher<? extends R>> mapperOnComplete)
      Transform the signals emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave. Note that at least one of the signal mappers must be provided, and all provided mappers must produce a publisher.

      There are three dimensions to this operator that can be compared with flatMapSequential and concatMap:

      • Generation of inners and subscription: this operator is eagerly subscribing to its inners.
      • Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner element are flattened as they arrive.
      • Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).

      OnError will be transformed into completion signal after its mapping callback has been applied.

      Type Parameters:
      R - the output Publisher type target
      Parameters:
      mapperOnNext - the Function to call on next data and returning a sequence to merge. Use null to ignore (provided at least one other mapper is specified).
      mapperOnError - the Function to call on error signal and returning a sequence to merge. Use null to ignore (provided at least one other mapper is specified).
      mapperOnComplete - the Function to call on complete signal and returning a sequence to merge. Use null to ignore (provided at least one other mapper is specified).
      Returns:
      a new Flux
    • flatMapIterable

      public final <R> Flux<R> flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
      Transform the items emitted by this Flux into Iterable, then flatten the elements from those by merging them into a single Flux. For each iterable, Iterable.iterator() will be called at least once and at most twice.

      This operator inspects each Iterable's Spliterator to assess if the iteration can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)). Since the default Spliterator wraps the Iterator we can have two Iterable.iterator() calls per iterable. This second invocation is skipped on a Collection however, a type which is assumed to be always finite.

      Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is no notion of eager vs lazy inner subscription. The content of the Iterables are all played sequentially. Thus flatMapIterable and concatMapIterable are equivalent offered as a discoverability improvement for users that explore the API with the concat vs flatMap expectation.

      Discard Support: Upon cancellation, this operator discards T elements it prefetched and, in some cases, attempts to discard remainder of the currently processed Iterable (if it can safely ensure the iterator is finite). Note that this means each Iterable's Iterable.iterator() method could be invoked twice.

      Error Mode Support: This operator supports resuming on errors (including when fusion is enabled). Exceptions thrown by the consumer are passed to the onErrorContinue(BiConsumer) error consumer (the value consumer is not invoked, as the source element will be part of the sequence). The onNext signal is then propagated as normal.

      Type Parameters:
      R - the merged output sequence type
      Parameters:
      mapper - the Function to transform input sequence into N Iterable
      Returns:
      a concatenation of the values from the Iterables obtained from each element in this Flux
    • flatMapIterable

      public final <R> Flux<R> flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper, int prefetch)
      Transform the items emitted by this Flux into Iterable, then flatten the emissions from those by merging them into a single Flux. The prefetch argument allows to give an arbitrary prefetch size to the upstream source. For each iterable, Iterable.iterator() will be called at least once and at most twice.

      This operator inspects each Iterable's Spliterator to assess if the iteration can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)). Since the default Spliterator wraps the Iterator we can have two Iterable.iterator() calls per iterable. This second invocation is skipped on a Collection however, a type which is assumed to be always finite.

      Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is no notion of eager vs lazy inner subscription. The content of the Iterables are all played sequentially. Thus flatMapIterable and concatMapIterable are equivalent offered as a discoverability improvement for users that explore the API with the concat vs flatMap expectation.

      Discard Support: Upon cancellation, this operator discards T elements it prefetched and, in some cases, attempts to discard remainder of the currently processed Iterable (if it can safely ensure the iterator is finite). Note that this means each Iterable's Iterable.iterator() method could be invoked twice.

      Error Mode Support: This operator supports resuming on errors (including when fusion is enabled). Exceptions thrown by the consumer are passed to the onErrorContinue(BiConsumer) error consumer (the value consumer is not invoked, as the source element will be part of the sequence). The onNext signal is then propagated as normal.

      Type Parameters:
      R - the merged output sequence type
      Parameters:
      mapper - the Function to transform input sequence into N Iterable
      prefetch - the number of values to request from the source upon subscription, to be transformed to Iterable
      Returns:
      a concatenation of the values from the Iterables obtained from each element in this Flux
    • flatMapSequential

      public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper)
      Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, but merge them in the order of their source element.

      There are three dimensions to this operator that can be compared with flatMap and concatMap:

      • Generation of inners and subscription: this operator is eagerly subscribing to its inners (like flatMap).
      • Ordering of the flattened values: this operator queues elements from late inners until all elements from earlier inners have been emitted, thus emitting inner sequences as a whole, in an order that matches their source's order.
      • Interleaving: this operator does not let values from different inners interleave (similar looking result to concatMap, but due to queueing of values that would have been interleaved otherwise).

      That is to say, whenever a source element is emitted it is transformed to an inner Publisher. However, if such an early inner takes more time to complete than subsequent faster inners, the data from these faster inners will be queued until the earlier inner completes, so as to maintain source ordering.

      Type Parameters:
      R - the merged output sequence type
      Parameters:
      mapper - the Function to transform input sequence into N sequences Publisher
      Returns:
      a merged Flux, subscribing early but keeping the original ordering
    • flatMapSequential

      public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency)
      Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, but merge them in the order of their source element.

      There are three dimensions to this operator that can be compared with flatMap and concatMap:

      • Generation of inners and subscription: this operator is eagerly subscribing to its inners (like flatMap).
      • Ordering of the flattened values: this operator queues elements from late inners until all elements from earlier inners have been emitted, thus emitting inner sequences as a whole, in an order that matches their source's order.
      • Interleaving: this operator does not let values from different inners interleave (similar looking result to concatMap, but due to queueing of values that would have been interleaved otherwise).

      That is to say, whenever a source element is emitted it is transformed to an inner Publisher. However, if such an early inner takes more time to complete than subsequent faster inners, the data from these faster inners will be queued until the earlier inner completes, so as to maintain source ordering.

      The concurrency argument allows to control how many merged Publisher can happen in parallel.

      Type Parameters:
      R - the merged output sequence type
      Parameters:
      mapper - the Function to transform input sequence into N sequences Publisher
      maxConcurrency - the maximum number of in-flight inner sequences
      Returns:
      a merged Flux, subscribing early but keeping the original ordering
    • flatMapSequential

      public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch)
      Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, but merge them in the order of their source element.

      There are three dimensions to this operator that can be compared with flatMap and concatMap:

      • Generation of inners and subscription: this operator is eagerly subscribing to its inners (like flatMap).
      • Ordering of the flattened values: this operator queues elements from late inners until all elements from earlier inners have been emitted, thus emitting inner sequences as a whole, in an order that matches their source's order.
      • Interleaving: this operator does not let values from different inners interleave (similar looking result to concatMap, but due to queueing of values that would have been interleaved otherwise).

      That is to say, whenever a source element is emitted it is transformed to an inner Publisher. However, if such an early inner takes more time to complete than subsequent faster inners, the data from these faster inners will be queued until the earlier inner completes, so as to maintain source ordering.

      The concurrency argument allows to control how many merged Publisher can happen in parallel. The prefetch argument allows to give an arbitrary prefetch size to the merged Publisher.

      Type Parameters:
      R - the merged output sequence type
      Parameters:
      mapper - the Function to transform input sequence into N sequences Publisher
      maxConcurrency - the maximum number of in-flight inner sequences
      prefetch - the maximum in-flight elements from each inner Publisher sequence
      Returns:
      a merged Flux, subscribing early but keeping the original ordering
    • flatMapSequentialDelayError

      public final <R> Flux<R> flatMapSequentialDelayError(Function<? super T,? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch)
      Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, but merge them in the order of their source element.

      There are three dimensions to this operator that can be compared with flatMap and concatMap:

      • Generation of inners and subscription: this operator is eagerly subscribing to its inners (like flatMap).
      • Ordering of the flattened values: this operator queues elements from late inners until all elements from earlier inners have been emitted, thus emitting inner sequences as a whole, in an order that matches their source's order.
      • Interleaving: this operator does not let values from different inners interleave (similar looking result to concatMap, but due to queueing of values that would have been interleaved otherwise).

      That is to say, whenever a source element is emitted it is transformed to an inner Publisher. However, if such an early inner takes more time to complete than subsequent faster inners, the data from these faster inners will be queued until the earlier inner completes, so as to maintain source ordering.

      The concurrency argument allows to control how many merged Publisher can happen in parallel. The prefetch argument allows to give an arbitrary prefetch size to the merged Publisher. This variant will delay any error until after the rest of the flatMap backlog has been processed.

      Type Parameters:
      R - the merged output sequence type
      Parameters:
      mapper - the Function to transform input sequence into N sequences Publisher
      maxConcurrency - the maximum number of in-flight inner sequences
      prefetch - the maximum in-flight elements from each inner Publisher sequence
      Returns:
      a merged Flux, subscribing early but keeping the original ordering
    • getPrefetch

      public int getPrefetch()
      The prefetch configuration of the Flux
      Returns:
      the prefetch configuration of the Flux, -1 if unspecified
    • groupBy

      public final <K> Flux<GroupedFlux<K,T>> groupBy(Function<? super T,? extends K> keyMapper)
      Divide this sequence into dynamically created Flux (or groups) for each unique key, as produced by the provided keyMapper Function. Note that groupBy works best with a low cardinality of groups, so chose your keyMapper function accordingly.

      The groups need to be drained and consumed downstream for groupBy to work correctly. Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).

      To avoid deadlock, the concurrency of the subscriber to groupBy should be greater than or equal to the number of groups created. In that case every group has its own subscriber and progress can be made, even when the data publish pattern is arbitrary. Otherwise, when the number of groups exceeds downstream concurrency, the subscribers should be designed with caution, because if the consumption pattern doesn't match what can be accommodated in its producer buffer, the process may enter deadlock due to backpressure.

      Note that groups are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a specific group more than once: groups are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      Type Parameters:
      K - the key type extracted from each value of this sequence
      Parameters:
      keyMapper - the key mapping Function that evaluates an incoming data and returns a key.
      Returns:
      a Flux of GroupedFlux grouped sequences
    • groupBy

      public final <K> Flux<GroupedFlux<K,T>> groupBy(Function<? super T,? extends K> keyMapper, int prefetch)
      Divide this sequence into dynamically created Flux (or groups) for each unique key, as produced by the provided keyMapper Function. Note that groupBy works best with a low cardinality of groups, so chose your keyMapper function accordingly.

      The groups need to be drained and consumed downstream for groupBy to work correctly. Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).

      To avoid deadlock, the concurrency of the subscriber to groupBy should be greater than or equal to the number of groups created. In that case every group has its own subscriber and progress can be made, even when the data publish pattern is arbitrary. Otherwise, when the number of groups exceeds downstream concurrency, the subscribers should be designed with caution, because if the consumption pattern doesn't match what can be accommodated in its producer buffer, the process may enter deadlock due to backpressure.

      Note that groups are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a specific group more than once: groups are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      Type Parameters:
      K - the key type extracted from each value of this sequence
      Parameters:
      keyMapper - the key mapping Function that evaluates an incoming data and returns a key.
      prefetch - the number of values to prefetch from the source
      Returns:
      a Flux of GroupedFlux grouped sequences
    • groupBy

      public final <K, V> Flux<GroupedFlux<K,V>> groupBy(Function<? super T,? extends K> keyMapper, Function<? super T,? extends V> valueMapper)
      Divide this sequence into dynamically created Flux (or groups) for each unique key, as produced by the provided keyMapper Function. Source elements are also mapped to a different value using the valueMapper. Note that groupBy works best with a low cardinality of groups, so chose your keyMapper function accordingly.

      The groups need to be drained and consumed downstream for groupBy to work correctly. Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).

      To avoid deadlock, the concurrency of the subscriber to groupBy should be greater than or equal to the number of groups created. In that case every group has its own subscriber and progress can be made, even when the data publish pattern is arbitrary. Otherwise, when the number of groups exceeds downstream concurrency, the subscribers should be designed with caution, because if the consumption pattern doesn't match what can be accommodated in its producer buffer, the process may enter deadlock due to backpressure.

      Note that groups are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a specific group more than once: groups are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      Type Parameters:
      K - the key type extracted from each value of this sequence
      V - the value type extracted from each value of this sequence
      Parameters:
      keyMapper - the key mapping function that evaluates an incoming data and returns a key.
      valueMapper - the value mapping function that evaluates which data to extract for re-routing.
      Returns:
      a Flux of GroupedFlux grouped sequences
    • groupBy

      public final <K, V> Flux<GroupedFlux<K,V>> groupBy(Function<? super T,? extends K> keyMapper, Function<? super T,? extends V> valueMapper, int prefetch)
      Divide this sequence into dynamically created Flux (or groups) for each unique key, as produced by the provided keyMapper Function. Source elements are also mapped to a different value using the valueMapper. Note that groupBy works best with a low cardinality of groups, so chose your keyMapper function accordingly.

      The groups need to be drained and consumed downstream for groupBy to work correctly. Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).

      To avoid deadlock, the concurrency of the subscriber to groupBy should be greater than or equal to the number of groups created. In that case every group has its own subscriber and progress can be made, even when the data publish pattern is arbitrary. Otherwise, when the number of groups exceeds downstream concurrency, the subscribers should be designed with caution, because if the consumption pattern doesn't match what can be accommodated in its producer buffer, the process may enter deadlock due to backpressure.

      Note that groups are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a specific group more than once: groups are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      Type Parameters:
      K - the key type extracted from each value of this sequence
      V - the value type extracted from each value of this sequence
      Parameters:
      keyMapper - the key mapping function that evaluates an incoming data and returns a key.
      valueMapper - the value mapping function that evaluates which data to extract for re-routing.
      prefetch - the number of values to prefetch from the source
      Returns:
      a Flux of GroupedFlux grouped sequences
    • groupJoin

      public final <TRight, TLeftEnd, TRightEnd, R> Flux<R> groupJoin(Publisher<? extends TRight> other, Function<? super T,? extends Publisher<TLeftEnd>> leftEnd, Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd, BiFunction<? super T,? super Flux<TRight>,? extends R> resultSelector)
      Map values from two Publishers into time windows and emit combination of values in case their windows overlap. The emitted elements are obtained by passing the value from this Flux and a Flux emitting the value from the other Publisher to a BiFunction.

      There are no guarantees in what order the items get combined when multiple items from one or both source Publishers overlap.

      Unlike join(org.reactivestreams.Publisher<? extends TRight>, java.util.function.Function<? super T, ? extends org.reactivestreams.Publisher<TLeftEnd>>, java.util.function.Function<? super TRight, ? extends org.reactivestreams.Publisher<TRightEnd>>, java.util.function.BiFunction<? super T, ? super TRight, ? extends R>), items from the second Publisher will be provided as a Flux to the resultSelector.

      Type Parameters:
      TRight - the type of the elements from the right Publisher
      TLeftEnd - the type for this Flux window signals
      TRightEnd - the type for the right Publisher window signals
      R - the combined result type
      Parameters:
      other - the other Publisher to correlate items with
      leftEnd - a function that returns a Publisher whose emissions indicate the time window for the source value to be considered
      rightEnd - a function that returns a Publisher whose emissions indicate the time window for the right Publisher value to be considered
      resultSelector - a function that takes an item emitted by this Flux and a Flux representation of the overlapping item from the other Publisher and returns the value to be emitted by the resulting Flux
      Returns:
      a joining Flux
      See Also:
    • handle

      public final <R> Flux<R> handle(BiConsumer<? super T,SynchronousSink<R>> handler)
      Handle the items emitted by this Flux by calling a biconsumer with the output sink for each onNext. At most one SynchronousSink.next(Object) call must be performed and/or 0 or 1 SynchronousSink.error(Throwable) or SynchronousSink.complete().

      Error Mode Support: This operator supports resuming on errors (including when fusion is enabled) when the BiConsumer throws an exception or if an error is signaled explicitly via SynchronousSink.error(Throwable).

      When the context-propagation library is available at runtime and the downstream ContextView is not empty, this operator implicitly uses the library to restore thread locals around the handler BiConsumer. Typically, this would be done in conjunction with the use of contextCapture() operator down the chain.

      Type Parameters:
      R - the transformed type
      Parameters:
      handler - the handling BiConsumer
      Returns:
      a transformed Flux
    • hasElement

      public final Mono<Boolean> hasElement(T value)
      Emit a single boolean true if any of the elements of this Flux sequence is equal to the provided value.

      The implementation uses short-circuit logic and completes with true if an element matches the value.

      Parameters:
      value - constant compared to incoming signals
      Returns:
      a new Flux with true if any element is equal to a given value and false otherwise
    • hasElements

      public final Mono<Boolean> hasElements()
      Emit a single boolean true if this Flux sequence has at least one element.

      The implementation uses short-circuit logic and completes with true on onNext.

      Returns:
      a new Mono with true if any value is emitted and false otherwise
    • hide

      public Flux<T> hide()
      Hides the identities of this Flux instance.

      The main purpose of this operator is to prevent certain identity-based optimizations from happening, mostly for diagnostic purposes.

      Returns:
      a new Flux preventing Publisher / Subscription based Reactor optimizations
    • index

      public final Flux<Tuple2<Long,T>> index()
      Keep information about the order in which source values were received by indexing them with a 0-based incrementing long, returning a Flux of Tuple2<(index, value)>.

      Returns:
      an indexed Flux with each source value combined with its 0-based index.
    • index

      public final <I> Flux<I> index(BiFunction<? super Long,? super T,? extends I> indexMapper)
      Keep information about the order in which source values were received by indexing them internally with a 0-based incrementing long then combining this information with the source value into a I using the provided BiFunction, returning a Flux<I>.

      Typical usage would be to produce a Tuple2 similar to index(), but 1-based instead of 0-based:

      index((i, v) -> Tuples.of(i+1, v))

      Parameters:
      indexMapper - the BiFunction to use to combine elements and their index.
      Returns:
      an indexed Flux with each source value combined with its computed index.
    • ignoreElements

      public final Mono<T> ignoreElements()
      Ignores onNext signals (dropping them) and only propagate termination events.

      Discard Support: This operator discards the upstream's elements.

      Returns:
      a new empty Mono representing the completion of this Flux.
    • join

      public final <TRight, TLeftEnd, TRightEnd, R> Flux<R> join(Publisher<? extends TRight> other, Function<? super T,? extends Publisher<TLeftEnd>> leftEnd, Function<? super TRight,? extends Publisher<TRightEnd>> rightEnd, BiFunction<? super T,? super TRight,? extends R> resultSelector)
      Combine values from two Publishers in case their windows overlap. Each incoming value triggers a creation of a new Publisher via the given Function. If the Publisher signals its first value or completes, the time windows for the original element is immediately closed. The emitted elements are obtained by passing the values from this Flux and the other Publisher to a BiFunction.

      There are no guarantees in what order the items get combined when multiple items from one or both source Publishers overlap.

      Type Parameters:
      TRight - the type of the elements from the right Publisher
      TLeftEnd - the type for this Flux window signals
      TRightEnd - the type for the right Publisher window signals
      R - the combined result type
      Parameters:
      other - the other Publisher to correlate items with
      leftEnd - a function that returns a Publisher whose emissions indicate the time window for the source value to be considered
      rightEnd - a function that returns a Publisher whose emissions indicate the time window for the right Publisher value to be considered
      resultSelector - a function that takes an item emitted by each Publisher and returns the value to be emitted by the resulting Flux
      Returns:
      a joining Flux
      See Also:
    • last

      public final Mono<T> last()
      Emit the last element observed before complete signal as a Mono, or emit NoSuchElementException error if the source was empty. For a passive version use takeLast(int)

      Discard Support: This operator discards elements before the last.

      Returns:
      a Mono with the last value in this Flux
    • last

      public final Mono<T> last(T defaultValue)
      Emit the last element observed before complete signal as a Mono, or emit the defaultValue if the source was empty. For a passive version use takeLast(int)

      Discard Support: This operator discards elements before the last.

      Parameters:
      defaultValue - a single fallback item if this Flux is empty
      Returns:
      a Mono with the last value in this Flux
    • limitRate

      public final Flux<T> limitRate(int prefetchRate)
      Ensure that backpressure signals from downstream subscribers are split into batches capped at the provided prefetchRate when propagated upstream, effectively rate limiting the upstream Publisher.

      Note that this is an upper bound, and that this operator uses a prefetch-and-replenish strategy, requesting a replenishing amount when 75% of the prefetch amount has been emitted.

      Typically used for scenarios where consumer(s) request a large amount of data (eg. Long.MAX_VALUE) but the data source behaves better or can be optimized with smaller requests (eg. database paging, etc...). All data is still processed, unlike with take(long) which will cap the grand total request amount.

      Equivalent to flux.publishOn(Schedulers.immediate(), prefetchRate).subscribe() . Note that the prefetchRate is an upper bound, and that this operator uses a prefetch-and-replenish strategy, requesting a replenishing amount when 75% of the prefetch amount has been emitted.

      Parameters:
      prefetchRate - the limit to apply to downstream's backpressure
      Returns:
      a Flux limiting downstream's backpressure
      See Also:
    • limitRate

      public final Flux<T> limitRate(int highTide, int lowTide)
      Ensure that backpressure signals from downstream subscribers are split into batches capped at the provided highTide first, then replenishing at the provided lowTide, effectively rate limiting the upstream Publisher.

      Note that this is an upper bound, and that this operator uses a prefetch-and-replenish strategy, requesting a replenishing amount when 75% of the prefetch amount has been emitted.

      Typically used for scenarios where consumer(s) request a large amount of data (eg. Long.MAX_VALUE) but the data source behaves better or can be optimized with smaller requests (eg. database paging, etc...). All data is still processed, unlike with take(long) which will cap the grand total request amount.

      Similar to flux.publishOn(Schedulers.immediate(), prefetchRate).subscribe() , except with a customized "low tide" instead of the default 75%. Note that the smaller the lowTide is, the higher the potential for concurrency between request and data production. And thus the more extraneous replenishment requests this operator could make. For example, for a global downstream request of 14, with a highTide of 10 and a lowTide of 2, the operator would perform low tide requests (request(2)) seven times in a row, whereas with the default lowTide of 8 it would only perform one low tide request (request(8)). Using a lowTide equal to highTide reverts to the default 75% strategy, while using a lowTide of 0 disables the lowTide, resulting in all requests strictly adhering to the highTide.

      Parameters:
      highTide - the initial request amount
      lowTide - the subsequent (or replenishing) request amount, 0 to disable early replenishing, highTide to revert to a 75% replenish strategy.
      Returns:
      a Flux limiting downstream's backpressure and customizing the replenishment request amount
      See Also:
    • limitRequest

      @Deprecated public final Flux<T> limitRequest(long n)
      Deprecated.
      replace with take(n, true) in 3.4.x, then take(long) in 3.5.0. To be removed in 3.6.0 at the earliest. See https://github.com/reactor/reactor-core/issues/2339
      Take only the first N values from this Flux, if available. Furthermore, ensure that the total amount requested upstream is capped at n. If n is zero, the source isn't even subscribed to and the operator completes immediately upon subscription.

      Backpressure signals from downstream subscribers are smaller than the cap are propagated as is, but if they would cause the total requested amount to go over the cap, they are reduced to the minimum value that doesn't go over.

      As a result, this operator never let the upstream produce more elements than the cap. Typically useful for cases where a race between request and cancellation can lead the upstream to producing a lot of extraneous data, and such a production is undesirable (e.g. a source that would send the extraneous data over the network).

      Parameters:
      n - the number of elements to emit from this flux, which is also the backpressure cap for all of downstream's request
      Returns:
      a Flux of n elements from the source, that requests AT MOST n from upstream in total.
      See Also:
    • log

      public final Flux<T> log()
      Observe all Reactive Streams signals and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.

      The default log category will be "reactor.Flux.", followed by a suffix generated from the source operator, e.g. "reactor.Flux.Map".

      Returns:
      a new Flux that logs signals
    • log

      public final Flux<T> log(String category)
      Observe all Reactive Streams signals and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.

      Parameters:
      category - to be mapped into logger configuration (e.g. org.springframework .reactor). If category ends with "." like "reactor.", a generated operator suffix will be added, e.g. "reactor.Flux.Map".
      Returns:
      a new Flux that logs signals
    • log

      public final Flux<T> log(@Nullable String category, Level level, SignalType... options)
      Observe Reactive Streams signals matching the passed filter options and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.

      Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:

           flux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
       

      Parameters:
      category - to be mapped into logger configuration (e.g. org.springframework .reactor). If category ends with "." like "reactor.", a generated operator suffix will be added, e.g. "reactor.Flux.Map".
      level - the Level to enforce for this tracing Flux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
      options - a vararg SignalType option to filter log messages
      Returns:
      a new Flux that logs signals
    • log

      public final Flux<T> log(@Nullable String category, Level level, boolean showOperatorLine, SignalType... options)
      Observe Reactive Streams signals matching the passed filter options and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.

      Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:

           flux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
       

      Parameters:
      category - to be mapped into logger configuration (e.g. org.springframework .reactor). If category ends with "." like "reactor.", a generated operator suffix will be added, e.g. "reactor.Flux.Map".
      level - the Level to enforce for this tracing Flux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
      showOperatorLine - capture the current stack to display operator class/line number.
      options - a vararg SignalType option to filter log messages
      Returns:
      a new Flux that logs signals
    • log

      public final Flux<T> log(Logger logger)
      Observe Reactive Streams signals matching the passed filter options and trace them using a specific user-provided Logger, at Level.INFO level.

      Parameters:
      logger - the Logger to use, instead of resolving one through a category.
      Returns:
      a new Flux that logs signals
    • log

      public final Flux<T> log(Logger logger, Level level, boolean showOperatorLine, SignalType... options)
      Observe Reactive Streams signals matching the passed filter options and trace them using a specific user-provided Logger, at the given Level.

      Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:

           flux.log(myCustomLogger, Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
       

      Parameters:
      logger - the Logger to use, instead of resolving one through a category.
      level - the Level to enforce for this tracing Flux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
      showOperatorLine - capture the current stack to display operator class/line number (default in overload is false).
      options - a vararg SignalType option to filter log messages
      Returns:
      a new Flux that logs signals
    • map

      public final <V> Flux<V> map(Function<? super T,? extends V> mapper)
      Transform the items emitted by this Flux by applying a synchronous function to each item.

      Error Mode Support: This operator supports resuming on errors (including when fusion is enabled). Exceptions thrown by the mapper then cause the source value to be dropped and a new element (request(1)) being requested from upstream.

      Type Parameters:
      V - the transformed type
      Parameters:
      mapper - the synchronous transforming Function
      Returns:
      a transformed Flux
    • mapNotNull

      public final <V> Flux<V> mapNotNull(Function<? super T,? extends @Nullable V> mapper)
      Transform the items emitted by this Flux by applying a synchronous function to each item, which may produce null values. In that case, no value is emitted. This operator effectively behaves like map(Function) followed by filter(Predicate) although null is not a supported value, so it can't be filtered out.

      Error Mode Support: This operator supports resuming on errors (including when fusion is enabled). Exceptions thrown by the mapper then cause the source value to be dropped and a new element (request(1)) being requested from upstream.

      Type Parameters:
      V - the transformed type
      Parameters:
      mapper - the synchronous transforming Function
      Returns:
      a transformed Flux
    • materialize

      public final Flux<Signal<T>> materialize()
      Transform incoming onNext, onError and onComplete signals into Signal instances, materializing these signals. Since the error is materialized as a Signal, the propagation will be stopped and onComplete will be emitted. Complete signal will first emit a Signal.complete() and then effectively complete the flux. All these Signal have a Context associated to them.

      Returns:
      a Flux of materialized Signal
      See Also:
    • mergeOrderedWith

      @Deprecated public final Flux<T> mergeOrderedWith(Publisher<? extends T> other, Comparator<? super T> otherComparator)
      Deprecated.
      Use mergeComparingWith(Publisher, Comparator) instead (with the caveat that it defaults to NOT delaying errors, unlike this operator). To be removed in 3.6.0 at the earliest.
      Merge data from this Flux and a Publisher into a reordered merge sequence, by picking the smallest value from each sequence as defined by a provided Comparator. Note that subsequent calls are combined, and their comparators are in lexicographic order as defined by Comparator.thenComparing(Comparator).

      The combination step is avoided if the two Comparators are equal (which can easily be achieved by using the same reference, and is also always true of Comparator.naturalOrder()).

      Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

      Note that it is delaying errors until all data is consumed.

      Parameters:
      other - the Publisher to merge with
      otherComparator - the Comparator to use for merging
      Returns:
      a new Flux that compares latest values from the given publisher and this flux, using the smallest value and replenishing the source that produced it
    • mergeComparingWith

      public final Flux<T> mergeComparingWith(Publisher<? extends T> other, Comparator<? super T> otherComparator)
      Merge data from this Flux and a Publisher into a reordered merge sequence, by picking the smallest value from each sequence as defined by a provided Comparator. Note that subsequent calls are combined, and their comparators are in lexicographic order as defined by Comparator.thenComparing(Comparator).

      The combination step is avoided if the two Comparators are equal (which can easily be achieved by using the same reference, and is also always true of Comparator.naturalOrder()).

      Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

      mergeComparingWith doesn't delay errors by default, but it will inherit the delayError behavior of a mergeComparingDelayError directly above it.

      Parameters:
      other - the Publisher to merge with
      otherComparator - the Comparator to use for merging
      Returns:
      a new Flux that compares latest values from the given publisher and this flux, using the smallest value and replenishing the source that produced it
    • mergeWith

      public final Flux<T> mergeWith(Publisher<? extends T> other)
      Merge data from this Flux and a Publisher into an interleaved merged sequence. Unlike concat, inner sources are subscribed to eagerly.

      Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

      Parameters:
      other - the Publisher to merge with
      Returns:
      a new Flux
    • metrics

      @Deprecated public final Flux<T> metrics()
      Deprecated.
      Prefer using the tap(SignalListenerFactory) with the SignalListenerFactory provided by the new reactor-core-micrometer module. To be removed in 3.6.0 at the earliest.
      Activate metrics for this sequence, provided there is an instrumentation facade on the classpath (otherwise this method is a pure no-op).

      Metrics are gathered on Subscriber events, and it is recommended to also name (and optionally tag) the sequence.

      The name serves as a prefix in the reported metrics names. In case no name has been provided, the default name "reactor" will be applied.

      The MeterRegistry used by reactor can be configured via Metrics.MicrometerConfiguration.useRegistry(MeterRegistry) prior to using this operator, the default being Metrics.globalRegistry.

      Returns:
      an instrumented Flux
      See Also:
    • name

      public final Flux<T> name(String name)
      Give a name to this sequence, which can be retrieved using Scannable.name() as long as this is the first reachable Scannable.parents().

      The name is typically visible at assembly time by the tap(SignalListenerFactory) operator, which could for example be configured with a metrics listener using the name as a prefix for meters' id.

      Parameters:
      name - a name for the sequence
      Returns:
      the same sequence, but bearing a name
      See Also:
    • next

      public final Mono<T> next()
      Emit only the first item emitted by this Flux, into a new Mono. If called on an empty Flux, emits an empty Mono.

      Returns:
      a new Mono emitting the first value in this Flux
    • ofType

      public final <U> Flux<U> ofType(Class<U> clazz)
      Evaluate each accepted value against the given Class type. If the value matches the type, it is passed into the resulting Flux. Otherwise the value is ignored and a request of 1 is emitted.

      Parameters:
      clazz - the Class type to test values against
      Returns:
      a new Flux filtered on items of the requested type
    • onBackpressureBuffer

      public final Flux<T> onBackpressureBuffer()
      Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream. Errors will be delayed until the buffer gets consumed.

      Discard Support: This operator discards the buffered overflow elements upon cancellation or error triggered by a data signal.

      Returns:
      a backpressured Flux that buffers with unbounded capacity
    • onBackpressureBuffer

      public final Flux<T> onBackpressureBuffer(int maxSize)
      Request an unbounded demand and push to the returned Flux, or park up to maxSize elements when not enough demand is requested downstream. The first element past this buffer to arrive out of sync with the downstream subscriber's demand (the "overflowing" element) immediately triggers an overflow error and cancels the source. The Flux is going to terminate with an overflow error, but this error is delayed, which lets the subscriber make more requests for the content of the buffer.

      Discard Support: This operator discards the buffered overflow elements upon cancellation or error triggered by a data signal, as well as elements that are rejected by the buffer due to maxSize.

      Parameters:
      maxSize - maximum number of elements overflowing request before the source is cancelled
      Returns:
      a backpressured Flux that buffers with bounded capacity
    • onBackpressureBuffer

      public final Flux<T> onBackpressureBuffer(int maxSize, Consumer<? super T> onOverflow)
      Request an unbounded demand and push to the returned Flux, or park up to maxSize elements when not enough demand is requested downstream. The first element past this buffer to arrive out of sync with the downstream subscriber's demand (the "overflowing" element) is immediately passed to a Consumer and the source is cancelled. The Flux is going to terminate with an overflow error, but this error is delayed, which lets the subscriber make more requests for the content of the buffer.

      Note that should the cancelled source produce further overflowing elements, these would be passed to the onNextDropped hook.

      Discard Support: This operator discards the buffered overflow elements upon cancellation or error triggered by a data signal, as well as elements that are rejected by the buffer due to maxSize (even though they are passed to the onOverflow Consumer first).

      Parameters:
      maxSize - maximum number of elements overflowing request before callback is called and source is cancelled
      onOverflow - callback to invoke on overflow
      Returns:
      a backpressured Flux that buffers with a bounded capacity
    • onBackpressureBuffer

      public final Flux<T> onBackpressureBuffer(int maxSize, BufferOverflowStrategy bufferOverflowStrategy)
      Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream, within a maxSize limit. Over that limit, the overflow strategy is applied (see BufferOverflowStrategy).

      Note that for the ERROR strategy, the overflow error will be delayed after the current backlog is consumed.

      Discard Support: This operator discards the buffered overflow elements upon cancellation or error triggered by a data signal, as well as elements that are rejected by the buffer due to maxSize (even though they are passed to the bufferOverflowStrategy first).

      Parameters:
      maxSize - maximum buffer backlog size before overflow strategy is applied
      bufferOverflowStrategy - strategy to apply to overflowing elements
      Returns:
      a backpressured Flux that buffers up to a capacity then applies an overflow strategy
    • onBackpressureBuffer

      public final Flux<T> onBackpressureBuffer(int maxSize, Consumer<? super T> onBufferOverflow, BufferOverflowStrategy bufferOverflowStrategy)
      Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream, within a maxSize limit. Over that limit, the overflow strategy is applied (see BufferOverflowStrategy).

      A Consumer is immediately invoked when there is an overflow, receiving the value that was discarded because of the overflow (which can be different from the latest element emitted by the source in case of a DROP_LATEST strategy).

      Note that for the ERROR strategy, the overflow error will be delayed after the current backlog is consumed. The consumer is still invoked immediately.

      Discard Support: This operator discards the buffered overflow elements upon cancellation or error triggered by a data signal, as well as elements that are rejected by the buffer due to maxSize (even though they are passed to the onOverflow Consumer AND the bufferOverflowStrategy first).

      Parameters:
      maxSize - maximum buffer backlog size before overflow callback is called
      onBufferOverflow - callback to invoke on overflow
      bufferOverflowStrategy - strategy to apply to overflowing elements
      Returns:
      a backpressured Flux that buffers up to a capacity then applies an overflow strategy
    • onBackpressureBuffer

      public final Flux<T> onBackpressureBuffer(Duration ttl, int maxSize, Consumer<? super T> onBufferEviction)
      Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream, within a maxSize limit and for a maximum Duration of ttl (as measured on the parallel Scheduler). Over that limit, oldest elements from the source are dropped.

      Elements evicted based on the TTL are passed to a cleanup Consumer, which is also immediately invoked when there is an overflow.

      Discard Support: This operator discards its internal buffer of elements that overflow, after having applied the onBufferEviction handler.

      Parameters:
      ttl - maximum Duration for which an element is kept in the backlog
      maxSize - maximum buffer backlog size before overflow callback is called
      onBufferEviction - callback to invoke once TTL is reached or on overflow
      Returns:
      a backpressured Flux that buffers with a TTL and up to a capacity then applies an overflow strategy
    • onBackpressureBuffer

      public final Flux<T> onBackpressureBuffer(Duration ttl, int maxSize, Consumer<? super T> onBufferEviction, Scheduler scheduler)
      Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream, within a maxSize limit and for a maximum Duration of ttl (as measured on the provided Scheduler). Over that limit, oldest elements from the source are dropped.

      Elements evicted based on the TTL are passed to a cleanup Consumer, which is also immediately invoked when there is an overflow.

      Discard Support: This operator discards its internal buffer of elements that overflow, after having applied the onBufferEviction handler.

      Parameters:
      ttl - maximum Duration for which an element is kept in the backlog
      maxSize - maximum buffer backlog size before overflow callback is called
      onBufferEviction - callback to invoke once TTL is reached or on overflow
      scheduler - the scheduler on which to run the timeout check
      Returns:
      a backpressured Flux that buffers with a TTL and up to a capacity then applies an overflow strategy
    • onBackpressureDrop

      public final Flux<T> onBackpressureDrop()
      Request an unbounded demand and push to the returned Flux, or drop the observed elements if not enough demand is requested downstream.

      Discard Support: This operator discards elements that it drops.

      Returns:
      a backpressured Flux that drops overflowing elements
    • onBackpressureDrop

      public final Flux<T> onBackpressureDrop(Consumer<? super T> onDropped)
      Request an unbounded demand and push to the returned Flux, or drop and notify dropping Consumer with the observed elements if not enough demand is requested downstream.

      Discard Support: This operator discards elements that it drops after having passed them to the provided onDropped handler.

      Parameters:
      onDropped - the Consumer called when a value gets dropped due to lack of downstream requests
      Returns:
      a backpressured Flux that drops overflowing elements
    • onBackpressureError

      public final Flux<T> onBackpressureError()
      Request an unbounded demand and push to the returned Flux, or emit onError fom Exceptions.failWithOverflow() if not enough demand is requested downstream.

      Discard Support: This operator discards elements that it drops, after having propagated the error.

      Returns:
      a backpressured Flux that errors on overflowing elements
    • onBackpressureLatest

      public final Flux<T> onBackpressureLatest()
      Request an unbounded demand and push to the returned Flux, or only keep the most recent observed item if not enough demand is requested downstream.

      Discard Support: Each time a new element comes in (the new "latest"), this operator discards the previously retained element.

      Returns:
      a backpressured Flux that will only keep a reference to the last observed item
    • onErrorComplete

      public final Flux<T> onErrorComplete()
      Simply complete the sequence by replacing an onError signal with an onComplete signal. All other signals are propagated as-is.

      Returns:
      a new Flux falling back on completion when an onError occurs
      See Also:
    • onErrorComplete

      public final Flux<T> onErrorComplete(Class<? extends Throwable> type)
      Simply complete the sequence by replacing an onError signal with an onComplete signal if the error matches the given Class. All other signals, including non-matching onError, are propagated as-is.

      Returns:
      a new Flux falling back on completion when a matching error occurs
      See Also:
    • onErrorComplete

      public final Flux<T> onErrorComplete(Predicate<? super Throwable> predicate)
      Simply complete the sequence by replacing an onError signal with an onComplete signal if the error matches the given Predicate. All other signals, including non-matching onError, are propagated as-is.

      Returns:
      a new Flux falling back on completion when a matching error occurs
      See Also:
    • onErrorContinue

      public final Flux<T> onErrorContinue(BiConsumer<Throwable,Object> errorConsumer)
      Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. The recovered error and associated value are notified via the provided BiConsumer. Alternatively, throwing from that biconsumer will propagate the thrown exception downstream in place of the original error, which is added as a suppressed exception to the new one.

      Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream operators, it requires specific operator support to work, and the scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour.)

      In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use doOnError to log the error, and onErrorResume(e -> Mono.empty()) to drop erroneous elements:

       .flatMap(id -> repository.retrieveById(id)
                                .doOnError(System.err::println)
                                .onErrorResume(e -> Mono.empty()))
       

      This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.

      Parameters:
      errorConsumer - a BiConsumer fed with errors matching the predicate and the value that triggered the error.
      Returns:
      a Flux that attempts to continue processing on errors.
    • onErrorContinue

      public final <E extends Throwable> Flux<T> onErrorContinue(Class<E> type, BiConsumer<Throwable,Object> errorConsumer)
      Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. Only errors matching the specified type are recovered from. The recovered error and associated value are notified via the provided BiConsumer. Alternatively, throwing from that biconsumer will propagate the thrown exception downstream in place of the original error, which is added as a suppressed exception to the new one.

      Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream operators, it requires specific operator support to work, and the scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour.)

      In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use doOnError to log the error, and onErrorResume(e -> Mono.empty()) to drop erroneous elements:

       .flatMap(id -> repository.retrieveById(id)
                                .doOnError(MyException.class, System.err::println)
                                .onErrorResume(MyException.class, e -> Mono.empty()))
       

      This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.

      Parameters:
      type - the Class of Exception that are resumed from.
      errorConsumer - a BiConsumer fed with errors matching the Class and the value that triggered the error.
      Returns:
      a Flux that attempts to continue processing on some errors.
    • onErrorContinue

      public final <E extends Throwable> Flux<T> onErrorContinue(Predicate<E> errorPredicate, BiConsumer<Throwable,Object> errorConsumer)
      Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. Only errors matching the Predicate are recovered from (note that this predicate can be applied several times and thus must be idempotent). The recovered error and associated value are notified via the provided BiConsumer. Alternatively, throwing from that biconsumer will propagate the thrown exception downstream in place of the original error, which is added as a suppressed exception to the new one.

      Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream operators, it requires specific operator support to work, and the scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour.)

      In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use doOnError to log the error, and onErrorResume(e -> Mono.empty()) to drop erroneous elements:

       .flatMap(id -> repository.retrieveById(id)
                                .doOnError(errorPredicate, System.err::println)
                                .onErrorResume(errorPredicate, e -> Mono.empty()))
       

      This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.

      Parameters:
      errorPredicate - a Predicate used to filter which errors should be resumed from. This MUST be idempotent, as it can be used several times.
      errorConsumer - a BiConsumer fed with errors matching the predicate and the value that triggered the error.
      Returns:
      a Flux that attempts to continue processing on some errors.
    • onErrorStop

      public final Flux<T> onErrorStop()
      If an onErrorContinue(BiConsumer) variant has been used downstream, reverts to the default 'STOP' mode where errors are terminal events upstream. It can be used for easier scoping of the on next failure strategy or to override the inherited strategy in a sub-stream (for example in a flatMap). It has no effect if onErrorContinue(BiConsumer) has not been used downstream.
      Returns:
      a Flux that terminates on errors, even if onErrorContinue(BiConsumer) was used downstream
    • onErrorMap

      public final Flux<T> onErrorMap(Function<? super Throwable,? extends Throwable> mapper)
      Transform any error emitted by this Flux by synchronously applying a function to it.

      Parameters:
      mapper - the error transforming Function
      Returns:
      a Flux that transforms source errors to other errors
    • onErrorMap

      public final <E extends Throwable> Flux<T> onErrorMap(Class<E> type, Function<? super E,? extends Throwable> mapper)
      Transform an error emitted by this Flux by synchronously applying a function to it if the error matches the given type. Otherwise let the error pass through.

      Type Parameters:
      E - the error type
      Parameters:
      type - the class of the exception type to react to
      mapper - the error transforming Function
      Returns:
      a Flux that transforms some source errors to other errors
    • onErrorMap

      public final Flux<T> onErrorMap(Predicate<? super Throwable> predicate, Function<? super Throwable,? extends Throwable> mapper)
      Transform an error emitted by this Flux by synchronously applying a function to it if the error matches the given predicate. Otherwise let the error pass through.

      Parameters:
      predicate - the error predicate
      mapper - the error transforming Function
      Returns:
      a Flux that transforms some source errors to other errors
    • onErrorResume

      public final Flux<T> onErrorResume(Function<? super Throwable,? extends Publisher<? extends T>> fallback)
      Subscribe to a returned fallback publisher when any error occurs, using a function to choose the fallback depending on the error.

      Parameters:
      fallback - the function to choose the fallback to an alternative Publisher
      Returns:
      a Flux falling back upon source onError
    • onErrorResume

      public final <E extends Throwable> Flux<T> onErrorResume(Class<E> type, Function<? super E,? extends Publisher<? extends T>> fallback)
      Subscribe to a fallback publisher when an error matching the given type occurs, using a function to choose the fallback depending on the error.

      Type Parameters:
      E - the error type
      Parameters:
      type - the error type to match
      fallback - the function to choose the fallback to an alternative Publisher
      Returns:
      a Flux falling back upon source onError
    • onErrorResume

      public final Flux<T> onErrorResume(Predicate<? super Throwable> predicate, Function<? super Throwable,? extends Publisher<? extends T>> fallback)
      Subscribe to a fallback publisher when an error matching a given predicate occurs.

      Parameters:
      predicate - the error predicate to match
      fallback - the function to choose the fallback to an alternative Publisher
      Returns:
      a Flux falling back upon source onError
    • onErrorReturn

      public final Flux<T> onErrorReturn(T fallbackValue)
      Simply emit a captured fallback value when any error is observed on this Flux.

      Parameters:
      fallbackValue - the value to emit if an error occurs
      Returns:
      a new falling back Flux
      See Also:
    • onErrorReturn

      public final <E extends Throwable> Flux<T> onErrorReturn(Class<E> type, T fallbackValue)
      Simply emit a captured fallback value when an error of the specified type is observed on this Flux.

      Type Parameters:
      E - the error type
      Parameters:
      type - the error type to match
      fallbackValue - the value to emit if an error occurs that matches the type
      Returns:
      a new falling back Flux
      See Also:
    • onErrorReturn

      public final Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)
      Simply emit a captured fallback value when an error matching the given predicate is observed on this Flux.

      Parameters:
      predicate - the error predicate to match
      fallbackValue - the value to emit if an error occurs that matches the predicate
      Returns:
      a new falling back Flux
      See Also:
    • onTerminateDetach

      public final Flux<T> onTerminateDetach()
      Detaches both the child Subscriber and the Subscription on termination or cancellation.

      This is an advanced interoperability operator that should help with odd retention scenarios when running with non-reactor Subscriber.

      Returns:
      a detachable Flux
    • or

      public final Flux<T> or(Publisher<? extends T> other)
      Pick the first Publisher between this Flux and another publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.

      Parameters:
      other - the Publisher to race with
      Returns:
      the fastest sequence
      See Also:
    • parallel

      public final ParallelFlux<T> parallel()
      Prepare this Flux by dividing data on a number of 'rails' matching the number of CPU cores, in a round-robin fashion. Note that to actually perform the work in parallel, you should call ParallelFlux.runOn(Scheduler) afterward.

      Returns:
      a new ParallelFlux instance
    • parallel

      public final ParallelFlux<T> parallel(int parallelism)
      Prepare this Flux by dividing data on a number of 'rails' matching the provided parallelism parameter, in a round-robin fashion. Note that to actually perform the work in parallel, you should call ParallelFlux.runOn(Scheduler) afterward.

      Parameters:
      parallelism - the number of parallel rails
      Returns:
      a new ParallelFlux instance
    • parallel

      public final ParallelFlux<T> parallel(int parallelism, int prefetch)
      Prepare this Flux by dividing data on a number of 'rails' matching the provided parallelism parameter, in a round-robin fashion and using a custom prefetch amount and queue for dealing with the source Flux's values. Note that to actually perform the work in parallel, you should call ParallelFlux.runOn(Scheduler) afterward.

      Parameters:
      parallelism - the number of parallel rails
      prefetch - the number of values to prefetch from the source
      Returns:
      a new ParallelFlux instance
    • publish

      public final ConnectableFlux<T> publish()
      Prepare a ConnectableFlux which shares this Flux sequence and dispatches values to subscribers in a backpressure-aware manner. Prefetch will default to Queues.SMALL_BUFFER_SIZE. This will effectively turn any type of sequence into a hot sequence.

      Backpressure will be coordinated on Subscription.request(long) and if any Subscriber is missing demand (requested = 0), multicast will pause pushing/pulling.

      Returns:
      a new ConnectableFlux
    • publish

      public final ConnectableFlux<T> publish(int prefetch)
      Prepare a ConnectableFlux which shares this Flux sequence and dispatches values to subscribers in a backpressure-aware manner. This will effectively turn any type of sequence into a hot sequence.

      Backpressure will be coordinated on Subscription.request(long) and if any Subscriber is missing demand (requested = 0), multicast will pause pushing/pulling.

      Parameters:
      prefetch - bounded requested demand
      Returns:
      a new ConnectableFlux
    • publish

      public final <R> Flux<R> publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform)
      Shares a sequence for the duration of a function that may transform it and consume it as many times as necessary without causing multiple subscriptions to the upstream.
      Type Parameters:
      R - the output value type
      Parameters:
      transform - the transformation function
      Returns:
      a new Flux
    • publish

      public final <R> Flux<R> publish(Function<? super Flux<T>,? extends Publisher<? extends R>> transform, int prefetch)
      Shares a sequence for the duration of a function that may transform it and consume it as many times as necessary without causing multiple subscriptions to the upstream.
      Type Parameters:
      R - the output value type
      Parameters:
      transform - the transformation function
      prefetch - the request size
      Returns:
      a new Flux
    • publishNext

      @Deprecated public final Mono<T> publishNext()
      Deprecated.
      use shareNext() instead, or use `publish().next()` if you need to `connect(). To be removed in 3.5.0
      Prepare a Mono which shares this Flux sequence and dispatches the first observed item to subscribers in a backpressure-aware manner. This will effectively turn any type of sequence into a hot sequence when the first Subscriber subscribes.

      Returns:
      a new Mono
    • publishOn

      public final Flux<T> publishOn(Scheduler scheduler)
      Run onNext, onComplete and onError on a supplied Scheduler Worker.

      This operator influences the threading context where the rest of the operators in the chain below it will execute, up to a new occurrence of publishOn.

      Typically used for fast publisher, slow consumer(s) scenarios.

       flux.publishOn(Schedulers.single()).subscribe() 
       

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.

      Parameters:
      scheduler - a Scheduler providing the Scheduler.Worker where to publish
      Returns:
      a Flux producing asynchronously on a given Scheduler
    • publishOn

      public final Flux<T> publishOn(Scheduler scheduler, int prefetch)
      Run onNext, onComplete and onError on a supplied Scheduler Scheduler.Worker.

      This operator influences the threading context where the rest of the operators in the chain below it will execute, up to a new occurrence of publishOn.

      Typically used for fast publisher, slow consumer(s) scenarios.

       flux.publishOn(Schedulers.single()).subscribe() 
       

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.

      Parameters:
      scheduler - a Scheduler providing the Scheduler.Worker where to publish
      prefetch - the asynchronous boundary capacity
      Returns:
      a Flux producing asynchronously
    • publishOn

      public final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch)
      Run onNext, onComplete and onError on a supplied Scheduler Scheduler.Worker.

      This operator influences the threading context where the rest of the operators in the chain below it will execute, up to a new occurrence of publishOn.

      Typically used for fast publisher, slow consumer(s) scenarios.

       flux.publishOn(Schedulers.single()).subscribe() 
       

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.

      Parameters:
      scheduler - a Scheduler providing the Scheduler.Worker where to publish
      delayError - should the buffer be consumed before forwarding any error
      prefetch - the asynchronous boundary capacity
      Returns:
      a Flux producing asynchronously
    • reduce

      public final Mono<T> reduce(BiFunction<T,T,T> aggregator)
      Reduce the values from this Flux sequence into a single object of the same type than the emitted items. Reduction is performed using a BiFunction that takes the intermediate result of the reduction and the current value and returns the next intermediate value of the reduction. Note, BiFunction will not be invoked for a sequence with 0 or 1 elements. In case of one element's sequence, the result will be directly sent to the subscriber.

      Discard Support: This operator discards the internally accumulated value upon cancellation or error.

      Parameters:
      aggregator - the reducing BiFunction
      Returns:
      a reduced Flux
    • reduce

      public final <A> Mono<A> reduce(A initial, BiFunction<A,? super T,A> accumulator)
      Reduce the values from this Flux sequence into a single object matching the type of a seed value. Reduction is performed using a BiFunction that takes the intermediate result of the reduction and the current value and returns the next intermediate value of the reduction. First element is paired with the seed value, initial.

      Discard Support: This operator discards the internally accumulated value upon cancellation or error.

      Type Parameters:
      A - the type of the seed and the reduced object
      Parameters:
      accumulator - the reducing BiFunction
      initial - the seed, the initial leftmost argument to pass to the reducing BiFunction
      Returns:
      a reduced Flux
    • reduceWith

      public final <A> Mono<A> reduceWith(Supplier<A> initial, BiFunction<A,? super T,A> accumulator)
      Reduce the values from this Flux sequence into a single object matching the type of a lazily supplied seed value. Reduction is performed using a BiFunction that takes the intermediate result of the reduction and the current value and returns the next intermediate value of the reduction. First element is paired with the seed value, supplied via initial.

      Discard Support: This operator discards the internally accumulated value upon cancellation or error.

      Type Parameters:
      A - the type of the seed and the reduced object
      Parameters:
      accumulator - the reducing BiFunction
      initial - a Supplier of the seed, called on subscription and passed to the the reducing BiFunction
      Returns:
      a reduced Flux
    • repeat

      public final Flux<T> repeat()
      Repeatedly and indefinitely subscribe to the source upon completion of the previous subscription.

      Returns:
      an indefinitely repeated Flux on onComplete
    • repeat

      public final Flux<T> repeat(BooleanSupplier predicate)
      Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.

      Parameters:
      predicate - the boolean to evaluate on onComplete.
      Returns:
      a Flux that repeats on onComplete while the predicate matches
    • repeat

      public final Flux<T> repeat(long numRepeat)
      Repeatedly subscribe to the source numRepeat times. This results in numRepeat + 1 total subscriptions to the original source. As a consequence, using 0 plays the original sequence once.

      Parameters:
      numRepeat - the number of times to re-subscribe on onComplete (positive, or 0 for original sequence only)
      Returns:
      a Flux that repeats on onComplete, up to the specified number of repetitions
    • repeat

      public final Flux<T> repeat(long numRepeat, BooleanSupplier predicate)
      Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription. A specified maximum of repeat will limit the number of re-subscribe.

      Parameters:
      numRepeat - the number of times to re-subscribe on complete (positive, or 0 for original sequence only)
      predicate - the boolean to evaluate on onComplete
      Returns:
      a Flux that repeats on onComplete while the predicate matches, up to the specified number of repetitions
    • repeatWhen

      public final Flux<T> repeatWhen(Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
      Repeatedly subscribe to this Flux when a companion sequence emits elements in response to the flux completion signal. Any terminal signal from the companion sequence will terminate the resulting Flux with the same signal immediately.

      If the companion sequence signals when this Flux is active, the repeat attempt is suppressed.

      Note that if the companion Publisher created by the repeatFactory emits Context as trigger objects, these Context will be merged with the previous Context:

      
       .repeatWhen(emittedEachAttempt -> emittedEachAttempt.handle((lastEmitted, sink) -> {
                  Context ctx = sink.currentContext();
                  int rl = ctx.getOrDefault("repeatsLeft", 0);
                  if (rl > 0) {
                          sink.next(Context.of(
                              "repeatsLeft", rl - 1,
                              "emitted", lastEmitted
                          ));
                  } else {
                      sink.error(new IllegalStateException("repeats exhausted"));
                  }
       }))
       
      Parameters:
      repeatFactory - the Function that returns the associated Publisher companion, given a Flux that signals each onComplete as a Long representing the number of source elements emitted in the latest attempt.
      Returns:
      a Flux that repeats on onComplete when the companion Publisher produces an onNext signal
    • replay

      public final ConnectableFlux<T> replay()
      Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded amount of onNext signals. Completion and Error will also be replayed.

      Returns:
      a replaying ConnectableFlux
    • replay

      public final ConnectableFlux<T> replay(int history)
      Turn this Flux into a connectable hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size onNext signals. Completion and Error will also be replayed.

      Note that replay(0) will only cache the terminal signal without expiration.

      Re-connects are not supported.

      Parameters:
      history - number of events retained in history excluding complete and error
      Returns:
      a replaying ConnectableFlux
    • replay

      public final ConnectableFlux<T> replay(Duration ttl)
      Turn this Flux into a connectable hot source and cache last emitted signals for further Subscriber. Will retain each onNext up to the given per-item expiry timeout.

      Completion and Error will also be replayed until ttl triggers in which case the next Subscriber will start over a new subscription

      Parameters:
      ttl - Per-item and post termination timeout duration
      Returns:
      a replaying ConnectableFlux
    • replay

      public final ConnectableFlux<T> replay(int history, Duration ttl)
      Turn this Flux into a connectable hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size onNext signals with a per-item ttl.

      Completion and Error will also be replayed until ttl triggers in which case the next Subscriber will start over a new subscription

      Parameters:
      history - number of events retained in history excluding complete and error
      ttl - Per-item and post termination timeout duration
      Returns:
      a replaying ConnectableFlux
    • replay

      public final ConnectableFlux<T> replay(Duration ttl, Scheduler timer)
      Turn this Flux into a connectable hot source and cache last emitted signals for further Subscriber. Will retain onNext signal for up to the given Duration with a per-item ttl.

      Completion and Error will also be replayed until ttl triggers in which case the next Subscriber will start over a new subscription

      Parameters:
      ttl - Per-item and post termination timeout duration
      timer - a time-capable Scheduler instance to read current time from
      Returns:
      a replaying ConnectableFlux
    • replay

      public final ConnectableFlux<T> replay(int history, Duration ttl, Scheduler timer)
      Turn this Flux into a connectable hot source and cache last emitted signals for further Subscriber. Will retain up to the given history size onNext signals with a per-item ttl.

      Completion and Error will also be replayed until ttl triggers in which case the next Subscriber will start over a new subscription

      Parameters:
      history - number of events retained in history excluding complete and error
      ttl - Per-item and post termination timeout duration
      timer - a Scheduler instance to read current time from
      Returns:
      a replaying ConnectableFlux
    • retry

      public final Flux<T> retry()
      Re-subscribes to this Flux sequence if it signals any error, indefinitely.

      Returns:
      a Flux that retries on onError
    • retry

      public final Flux<T> retry(long numRetries)
      Re-subscribes to this Flux sequence if it signals any error, for a fixed number of times.

      Note that passing Long.MAX_VALUE is treated as infinite retry.

      Parameters:
      numRetries - the number of times to tolerate an error
      Returns:
      a Flux that retries on onError up to the specified number of retry attempts.
    • retryWhen

      public final Flux<T> retryWhen(Retry retrySpec)
      Retries this Flux in response to signals emitted by a companion Publisher. The companion is generated by the provided Retry instance, see Retry.max(long), Retry.maxInARow(long) and Retry.backoff(long, Duration) for readily available strategy builders.

      The operator generates a base for the companion, a Flux of Retry.RetrySignal which each give metadata about each retryable failure whenever this Flux signals an error. The final companion should be derived from that base companion and emit data in response to incoming onNext (although it can emit less elements, or delay the emissions).

      Terminal signals in the companion terminate the sequence with the same signal, so emitting an Subscriber.onError(Throwable) will fail the resulting Flux with that same error.

      Note that the Retry.RetrySignal state can be transient and change between each source onError or onNext. If processed with a delay, this could lead to the represented state being out of sync with the state at which the retry was evaluated. Map it to Retry.RetrySignal.copy() right away to mediate this.

      Note that if the companion Publisher created by the whenFactory emits Context as trigger objects, these Context will be merged with the previous Context:

       
       Retry customStrategy = Retry.from(companion -> companion.handle((retrySignal, sink) -> {
       	    ContextView ctx = sink.contextView();
       	    int rl = ctx.getOrDefault("retriesLeft", 0);
       	    if (rl > 0) {
      		    sink.next(Context.of(
      		        "retriesLeft", rl - 1,
      		        "lastError", retrySignal.failure()
      		    ));
       	    } else {
       	        sink.error(Exceptions.retryExhausted("retries exhausted", retrySignal.failure()));
       	    }
       }));
       Flux<T> retried = originalFlux.retryWhen(customStrategy);
       
      Parameters:
      retrySpec - the Retry strategy that will generate the companion Publisher, given a Flux that signals each onError as a Retry.RetrySignal.
      Returns:
      a Flux that retries on onError when a companion Publisher produces an onNext signal
      See Also:
    • sample

      public final Flux<T> sample(Duration timespan)
      Sample this Flux by periodically emitting an item corresponding to that Flux latest emitted value within the periodical time window. Note that if some elements are emitted quicker than the timespan just before source completion, the last of these elements will be emitted along with the onComplete signal.

      Discard Support: This operator discards elements that are not part of the sampling.

      Parameters:
      timespan - the duration of the window after which to emit the latest observed item
      Returns:
      a Flux sampled to the last item seen over each periodic window
    • sample

      public final <U> Flux<T> sample(Publisher<U> sampler)
      Sample this Flux by emitting an item corresponding to that Flux latest emitted value whenever a companion sampler Publisher signals a value.

      Termination of either Publisher will result in termination for the Subscriber as well. Note that if some elements are emitted just before source completion and before a last sampler can trigger, the last of these elements will be emitted along with the onComplete signal.

      Both Publisher will run in unbounded mode because the backpressure would interfere with the sampling precision.

      Discard Support: This operator discards elements that are not part of the sampling.

      Type Parameters:
      U - the type of the sampler sequence
      Parameters:
      sampler - the sampler companion Publisher
      Returns:
      a Flux sampled to the last item observed each time the sampler Publisher signals
    • sampleFirst

      public final Flux<T> sampleFirst(Duration timespan)
      Repeatedly take a value from this Flux then skip the values that follow within a given duration.

      Discard Support: This operator discards elements that are not part of the sampling.

      Parameters:
      timespan - the duration during which to skip values after each sample
      Returns:
      a Flux sampled to the first item of each duration-based window
    • sampleFirst

      public final <U> Flux<T> sampleFirst(Function<? super T,? extends Publisher<U>> samplerFactory)
      Repeatedly take a value from this Flux then skip the values that follow before the next signal from a companion sampler Publisher.

      Discard Support: This operator discards elements that are not part of the sampling.

      Type Parameters:
      U - the companion reified type
      Parameters:
      samplerFactory - supply a companion sampler Publisher which signals the end of the skip window
      Returns:
      a Flux sampled to the first item observed in each window closed by the sampler signals
    • sampleTimeout

      public final <U> Flux<T> sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory)
      Emit the latest value from this Flux only if there were no new values emitted during the window defined by a companion Publisher derived from that particular value.

      Note that this means that the last value in the sequence is always emitted.

      Discard Support: This operator discards elements that are not part of the sampling.

      Type Parameters:
      U - the companion reified type
      Parameters:
      throttlerFactory - supply a companion sampler Publisher which signals the end of the window during which no new emission should occur. If it is the case, the original value triggering the window is emitted.
      Returns:
      a Flux sampled to items not followed by any other item within a window defined by a companion Publisher
    • sampleTimeout

      public final <U> Flux<T> sampleTimeout(Function<? super T,? extends Publisher<U>> throttlerFactory, int maxConcurrency)
      Emit the latest value from this Flux only if there were no new values emitted during the window defined by a companion Publisher derived from that particular value.

      The provided maxConcurrency will keep a bounded maximum of concurrent timeouts and drop any new items until at least one timeout terminates.

      Note that this means that the last value in the sequence is always emitted.

      Discard Support: This operator discards elements that are not part of the sampling.

      Type Parameters:
      U - the throttling type
      Parameters:
      throttlerFactory - supply a companion sampler Publisher which signals the end of the window during which no new emission should occur. If it is the case, the original value triggering the window is emitted.
      maxConcurrency - the maximum number of concurrent timeouts
      Returns:
      a Flux sampled to items not followed by any other item within a window defined by a companion Publisher
    • scan

      public final Flux<T> scan(BiFunction<T,T,T> accumulator)
      Reduce this Flux values with an accumulator BiFunction and also emit the intermediate results of this function.

      Unlike scan(Object, BiFunction), this operator doesn't take an initial value but treats the first Flux value as initial value.
      The accumulation works as follows:

      
       result[0] = source[0]
       result[1] = accumulator(result[0], source[1])
       result[2] = accumulator(result[1], source[2])
       result[3] = accumulator(result[2], source[3])
       ...
       

      Parameters:
      accumulator - the accumulating BiFunction
      Returns:
      an accumulating Flux
    • scan

      public final <A> Flux<A> scan(A initial, BiFunction<A,? super T,A> accumulator)
      Reduce this Flux values with an accumulator BiFunction and also emit the intermediate results of this function.

      The accumulation works as follows:

      
       result[0] = initialValue;
       result[1] = accumulator(result[0], source[0])
       result[2] = accumulator(result[1], source[1])
       result[3] = accumulator(result[2], source[2])
       ...
       

      Type Parameters:
      A - the accumulated type
      Parameters:
      initial - the initial leftmost argument to pass to the reduce function
      accumulator - the accumulating BiFunction
      Returns:
      an accumulating Flux starting with initial state
    • scanWith

      public final <A> Flux<A> scanWith(Supplier<A> initial, BiFunction<A,? super T,A> accumulator)
      Reduce this Flux values with the help of an accumulator BiFunction and also emits the intermediate results. A seed value is lazily provided by a Supplier invoked for each Subscriber.

      The accumulation works as follows:

      
       result[0] = initialValue;
       result[1] = accumulator(result[0], source[0])
       result[2] = accumulator(result[1], source[1])
       result[3] = accumulator(result[2], source[2])
       ...
       

      Type Parameters:
      A - the accumulated type
      Parameters:
      initial - the supplier providing the seed, the leftmost parameter initially passed to the reduce function
      accumulator - the accumulating BiFunction
      Returns:
      an accumulating Flux starting with initial state
    • share

      public final Flux<T> share()
      Returns a new Flux that multicasts (shares) the original Flux. As long as there is at least one Subscriber this Flux will be subscribed and emitting data. When all subscribers have cancelled it will cancel the source Flux.

      This is an alias for publish().ConnectableFlux.refCount().

      Returns:
      a Flux that upon first subscribe causes the source Flux to subscribe once, late subscribers might therefore miss items.
    • shareNext

      public final Mono<T> shareNext()
      Prepare a Mono which shares this Flux sequence and dispatches the first observed item to subscribers. This will effectively turn any type of sequence into a hot sequence when the first Subscriber subscribes.

      Returns:
      a new Mono
    • single

      public final Mono<T> single()
      Expect and emit a single item from this Flux source or signal NoSuchElementException for an empty source, or IndexOutOfBoundsException for a source with more than one element.

      Returns:
      a Mono with the single item or an error signal
    • single

      public final Mono<T> single(T defaultValue)
      Expect and emit a single item from this Flux source and emit a default value for an empty source, but signal an IndexOutOfBoundsException for a source with more than one element.

      Parameters:
      defaultValue - a single fallback item if this Flux is empty
      Returns:
      a Mono with the expected single item, the supplied default value or an error signal
    • singleOrEmpty

      public final Mono<T> singleOrEmpty()
      Expect and emit a single item from this Flux source, and accept an empty source but signal an IndexOutOfBoundsException for a source with more than one element.

      Returns:
      a Mono with the expected single item, no item or an error
    • skip

      public final Flux<T> skip(long skipped)
      Skip the specified number of elements from the beginning of this Flux then emit the remaining elements.

      Discard Support: This operator discards elements that are skipped.

      Parameters:
      skipped - the number of elements to drop
      Returns:
      a dropping Flux with the specified number of elements skipped at the beginning
    • skip

      public final Flux<T> skip(Duration timespan)
      Skip elements from this Flux emitted within the specified initial duration.

      Discard Support: This operator discards elements that are skipped.

      Parameters:
      timespan - the initial time window during which to drop elements
      Returns:
      a Flux dropping at the beginning until the end of the given duration
    • skip

      public final Flux<T> skip(Duration timespan, Scheduler timer)
      Skip elements from this Flux emitted within the specified initial duration, as measured on the provided Scheduler.

      Discard Support: This operator discards elements that are skipped.

      Parameters:
      timespan - the initial time window during which to drop elements
      timer - a time-capable Scheduler instance to measure the time window on
      Returns:
      a Flux dropping at the beginning for the given duration
    • skipLast

      public final Flux<T> skipLast(int n)
      Skip a specified number of elements at the end of this Flux sequence.

      Discard Support: This operator discards elements that are skipped.

      Parameters:
      n - the number of elements to drop before completion
      Returns:
      a Flux dropping the specified number of elements at the end of the sequence
    • skipUntil

      public final Flux<T> skipUntil(Predicate<? super T> untilPredicate)
      Skips values from this Flux until a Predicate returns true for the value. The resulting Flux will include and emit the matching value.

      Discard Support: This operator discards elements that are skipped.

      Parameters:
      untilPredicate - the Predicate evaluated to stop skipping.
      Returns:
      a Flux dropping until the Predicate matches
    • skipUntilOther

      public final Flux<T> skipUntilOther(Publisher<?> other)
      Skip values from this Flux until a specified Publisher signals an onNext or onComplete.

      Discard Support: This operator discards elements that are skipped.

      Parameters:
      other - the companion Publisher to coordinate with to stop skipping
      Returns:
      a Flux dropping until the other Publisher emits
    • skipWhile

      public final Flux<T> skipWhile(Predicate<? super T> skipPredicate)
      Skips values from this Flux while a Predicate returns true for the value.

      Discard Support: This operator discards elements that are skipped.

      Parameters:
      skipPredicate - the Predicate that causes skipping while evaluating to true.
      Returns:
      a Flux dropping while the Predicate matches
    • sort

      public final Flux<T> sort()
      Sort elements from this Flux by collecting and sorting them in the background then emitting the sorted sequence once this sequence completes. Each item emitted by the Flux must implement Comparable with respect to all other items in the sequence.

      Note that calling sort with long, non-terminating or infinite sources might cause OutOfMemoryError. Use sequence splitting like window(int) to sort batches in that case.

      Returns:
      a sorted Flux
      Throws:
      ClassCastException - if any item emitted by the Flux does not implement Comparable with respect to all other items emitted by the Flux
    • sort

      public final Flux<T> sort(Comparator<? super T> sortFunction)
      Sort elements from this Flux using a Comparator function, by collecting and sorting elements in the background then emitting the sorted sequence once this sequence completes.

      Note that calling sort with long, non-terminating or infinite sources might cause OutOfMemoryError

      Parameters:
      sortFunction - a function that compares two items emitted by this Flux to indicate their sort order
      Returns:
      a sorted Flux
    • startWith

      public final Flux<T> startWith(Iterable<? extends T> iterable)
      Prepend the given Iterable before this Flux sequence.

      Parameters:
      iterable - the sequence of values to start the resulting Flux with
      Returns:
      a new Flux prefixed with elements from an Iterable
    • startWith

      @SafeVarargs public final Flux<T> startWith(T... values)
      Prepend the given values before this Flux sequence.

      Parameters:
      values - the array of values to start the resulting Flux with
      Returns:
      a new Flux prefixed with the given elements
    • startWith

      public final Flux<T> startWith(Publisher<? extends T> publisher)
      Prepend the given Publisher sequence to this Flux sequence.

      Parameters:
      publisher - the Publisher whose values to prepend
      Returns:
      a new Flux prefixed with the given Publisher sequence
    • subscribe

      public final Disposable subscribe()
      Subscribe to this Flux and request unbounded demand.

      This version doesn't specify any consumption behavior for the events from the chain, especially no error handling, so other variants should usually be preferred.

      Returns:
      a new Disposable that can be used to cancel the underlying Subscription
    • subscribe

      public final Disposable subscribe(Consumer<? super T> consumer)
      Subscribe a Consumer to this Flux that will consume all the elements in the sequence. It will request an unbounded demand (Long.MAX_VALUE).

      For a passive version that observe and forward incoming data see doOnNext(java.util.function.Consumer).

      For a version that gives you more control over backpressure and the request, see subscribe(Subscriber) with a BaseSubscriber.

      Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.

      Parameters:
      consumer - the consumer to invoke on each value (onNext signal)
      Returns:
      a new Disposable that can be used to cancel the underlying Subscription
    • subscribe

      public final Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer)
      Subscribe to this Flux with a Consumer that will consume all the elements in the sequence, as well as a Consumer that will handle errors. The subscription will request an unbounded demand (Long.MAX_VALUE).

      For a passive version that observe and forward incoming data see doOnNext(java.util.function.Consumer) and doOnError(java.util.function.Consumer).

      For a version that gives you more control over backpressure and the request, see subscribe(Subscriber) with a BaseSubscriber.

      Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumers are not invoked when executing in a main thread or a unit test for instance.

      Parameters:
      consumer - the consumer to invoke on each next signal
      errorConsumer - the consumer to invoke on error signal
      Returns:
      a new Disposable that can be used to cancel the underlying Subscription
    • subscribe

      public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer)
      Subscribe Consumer to this Flux that will respectively consume all the elements in the sequence, handle errors and react to completion. The subscription will request unbounded demand (Long.MAX_VALUE).

      For a passive version that observe and forward incoming data see doOnNext(java.util.function.Consumer), doOnError(java.util.function.Consumer) and doOnComplete(Runnable).

      For a version that gives you more control over backpressure and the request, see subscribe(Subscriber) with a BaseSubscriber.

      Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.

      Parameters:
      consumer - the consumer to invoke on each value
      errorConsumer - the consumer to invoke on error signal
      completeConsumer - the consumer to invoke on complete signal
      Returns:
      a new Disposable that can be used to cancel the underlying Subscription
    • subscribe

      @Deprecated public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer)
      Deprecated.
      Because users tend to forget to request the subsciption. If the behavior is really needed, consider using subscribeWith(Subscriber). To be removed in 3.5.
      Subscribe Consumer to this Flux that will respectively consume all the elements in the sequence, handle errors, react to completion, and request upon subscription. It will let the provided subscriptionConsumer request the adequate amount of data, or request unbounded demand Long.MAX_VALUE if no such consumer is provided.

      For a passive version that observe and forward incoming data see doOnNext(java.util.function.Consumer), doOnError(java.util.function.Consumer), doOnComplete(Runnable) and doOnSubscribe(Consumer).

      For a version that gives you more control over backpressure and the request, see subscribe(Subscriber) with a BaseSubscriber.

      Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.

      Parameters:
      consumer - the consumer to invoke on each value
      errorConsumer - the consumer to invoke on error signal
      completeConsumer - the consumer to invoke on complete signal
      subscriptionConsumer - the consumer to invoke on subscribe signal, to be used for the initial request, or null for max request
      Returns:
      a new Disposable that can be used to cancel the underlying Subscription
    • subscribe

      public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Context initialContext)
      Subscribe Consumer to this Flux that will respectively consume all the elements in the sequence, handle errors and react to completion. Additionally, a Context is tied to the subscription. At subscription, an unbounded request is implicitly made.

      For a passive version that observe and forward incoming data see doOnNext(java.util.function.Consumer), doOnError(java.util.function.Consumer), doOnComplete(Runnable) and doOnSubscribe(Consumer).

      For a version that gives you more control over backpressure and the request, see subscribe(Subscriber) with a BaseSubscriber.

      Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.

      Parameters:
      consumer - the consumer to invoke on each value
      errorConsumer - the consumer to invoke on error signal
      completeConsumer - the consumer to invoke on complete signal
      initialContext - the base Context tied to the subscription that will be visible to operators upstream
      Returns:
      a new Disposable that can be used to cancel the underlying Subscription
    • subscribe

      public final void subscribe(Subscriber<? super T> actual)
      Specified by:
      subscribe in interface Publisher<T>
    • subscribe

      public abstract void subscribe(CoreSubscriber<? super T> actual)
      An internal Publisher.subscribe(Subscriber) that will bypass Hooks.onLastOperator(Function) pointcut.

      In addition to behave as expected by Publisher.subscribe(Subscriber) in a controlled manner, it supports direct subscribe-time Context passing.

      Specified by:
      subscribe in interface CorePublisher<T>
      Parameters:
      actual - the Subscriber interested into the published sequence
      See Also:
    • subscribeOn

      public final Flux<T> subscribeOn(Scheduler scheduler)
      Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn.

      Note that if you are using an eager or blocking create(Consumer, FluxSink.OverflowStrategy) as the source, it can lead to deadlocks due to requests piling up behind the emitter. In such case, you should call subscribeOn(scheduler, false) instead.

      Typically used for slow publisher e.g., blocking IO, fast consumer(s) scenarios.

       flux.subscribeOn(Schedulers.single()).subscribe() 
       

      Note that Scheduler.Worker.schedule(Runnable) raising RejectedExecutionException on late Subscription.request(long) will be propagated to the request caller.

      Parameters:
      scheduler - a Scheduler providing the Scheduler.Worker where to subscribe
      Returns:
      a Flux requesting asynchronously
      See Also:
    • subscribeOn

      public final Flux<T> subscribeOn(Scheduler scheduler, boolean requestOnSeparateThread)
      Run subscribe and onSubscribe on a specified Scheduler's Scheduler.Worker. Request will be run on that worker too depending on the requestOnSeparateThread parameter (which defaults to true in the subscribeOn(Scheduler) version). As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn.

      Note that if you are using an eager or blocking create(Consumer, FluxSink.OverflowStrategy) as the source, it can lead to deadlocks due to requests piling up behind the emitter. Thus this operator has a requestOnSeparateThread parameter, which should be set to false in this case.

      Typically used for slow publisher e.g., blocking IO, fast consumer(s) scenarios.

       flux.subscribeOn(Schedulers.single()).subscribe() 
       

      Note that Scheduler.Worker.schedule(Runnable) raising RejectedExecutionException on late Subscription.request(long) will be propagated to the request caller.

      Parameters:
      scheduler - a Scheduler providing the Scheduler.Worker where to subscribe
      requestOnSeparateThread - whether or not to also perform requests on the worker. true to behave like subscribeOn(Scheduler)
      Returns:
      a Flux requesting asynchronously
      See Also:
    • subscribeWith

      public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber)
      Subscribe a provided instance of a subclass of Subscriber to this Flux and return said instance for further chaining calls. This is similar to as(Function), except a subscription is explicitly performed by this method.

      If you need more control over backpressure and the request, use a BaseSubscriber.

      Type Parameters:
      E - the reified type from the input/output subscriber
      Parameters:
      subscriber - the Subscriber to subscribe with and return
      Returns:
      the passed Subscriber
    • switchOnFirst

      public final <V> Flux<V> switchOnFirst(BiFunction<Signal<? extends T>,Flux<T>,Publisher<? extends V>> transformer)
      Transform the current Flux once it emits its first element, making a conditional transformation possible. This operator first requests one element from the source then applies a transformation derived from the first Signal and the source. The whole source (including the first signal) is passed as second argument to the BiFunction and it is very strongly advised to always build upon with operators (see below).

      Note that the source might complete or error immediately instead of emitting, in which case the Signal would be onComplete or onError. It is NOT necessarily an onNext Signal, and must be checked accordingly.

      For example, this operator could be used to define a dynamic transformation that depends on the first element (which could contain routing metadata for instance):

       
        fluxOfIntegers.switchOnFirst((signal, flux) -> {
            if (signal.hasValue()) {
                ColoredShape firstColor = signal.get();
                return flux.filter(v -> !v.hasSameColorAs(firstColor))
            }
            return flux; //either early complete or error, this forwards the termination in any case
            //`return flux.onErrorResume(t -> Mono.empty());` instead would suppress an early error
            //`return Flux.just(1,2,3);` instead would suppress an early error and return 1, 2, 3.
            //It would also only cancel the original `flux` at the completion of `just`.
        })
       
       

      It is advised to return a Publisher derived from the original Flux in all cases, as not doing so would keep the original Publisher open and hanging with a single request until the inner Publisher terminates or the whole Flux is cancelled.

      Type Parameters:
      V - the item type in the returned Flux
      Parameters:
      transformer - A BiFunction executed once the first signal is available and used to transform the source conditionally. The whole source (including first signal) is passed as second argument to the BiFunction.
      Returns:
      a new Flux that transform the upstream once a signal is available
    • switchOnFirst

      public final <V> Flux<V> switchOnFirst(BiFunction<Signal<? extends T>,Flux<T>,Publisher<? extends V>> transformer, boolean cancelSourceOnComplete)
      Transform the current Flux once it emits its first element, making a conditional transformation possible. This operator first requests one element from the source then applies a transformation derived from the first Signal and the source. The whole source (including the first signal) is passed as second argument to the BiFunction and it is very strongly advised to always build upon with operators (see below).

      Note that the source might complete or error immediately instead of emitting, in which case the Signal would be onComplete or onError. It is NOT necessarily an onNext Signal, and must be checked accordingly.

      For example, this operator could be used to define a dynamic transformation that depends on the first element (which could contain routing metadata for instance):

       
        fluxOfIntegers.switchOnFirst((signal, flux) -> {
            if (signal.hasValue()) {
                ColoredShape firstColor = signal.get();
                return flux.filter(v -> !v.hasSameColorAs(firstColor))
            }
            return flux; //either early complete or error, this forwards the termination in any case
            //`return flux.onErrorResume(t -> Mono.empty());` instead would suppress an early error
            //`return Flux.just(1,2,3);` instead would suppress an early error and return 1, 2, 3.
            //It would also only cancel the original `flux` at the completion of `just`.
        })
       
       

      It is advised to return a Publisher derived from the original Flux in all cases, as not doing so would keep the original Publisher open and hanging with a single request. In case the value of the cancelSourceOnComplete parameter is true the original publisher until the inner Publisher terminates or the whole Flux is cancelled. Otherwise the original publisher will hang forever.

      Type Parameters:
      V - the item type in the returned Flux
      Parameters:
      transformer - A BiFunction executed once the first signal is available and used to transform the source conditionally. The whole source (including first signal) is passed as second argument to the BiFunction.
      cancelSourceOnComplete - specify whether original publisher should be cancelled on onComplete from the derived one
      Returns:
      a new Flux that transform the upstream once a signal is available
    • switchIfEmpty

      public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate)
      Switch to an alternative Publisher if this sequence is completed without any data.

      Parameters:
      alternate - the alternative Publisher if this sequence is empty
      Returns:
      a new Flux that falls back on a Publisher if source is empty
    • switchMap

      public final <V> Flux<V> switchMap(Function<? super T,Publisher<? extends V>> fn)
      Switch to a new Publisher generated via a Function whenever this Flux produces an item. As such, the elements from each generated Publisher are emitted in the resulting Flux.

      This operator requests the source for an unbounded amount, but doesn't request each generated Publisher unless the downstream has made a corresponding request (no prefetch of inner publishers).

      Type Parameters:
      V - the type of the return value of the transformation function
      Parameters:
      fn - the Function to generate a Publisher for each source value
      Returns:
      a new Flux that emits values from an alternative Publisher for each source onNext
    • switchMap

      @Deprecated public final <V> Flux<V> switchMap(Function<? super T,Publisher<? extends V>> fn, int prefetch)
      Deprecated.
      to be removed in 3.6.0 at the earliest. In 3.5.0, you should replace calls with prefetch=0 with calls to switchMap(fn), as the default behavior of the single-parameter variant will then change to prefetch=0.
      Switch to a new Publisher generated via a Function whenever this Flux produces an item. As such, the elements from each generated Publisher are emitted in the resulting Flux.

      Type Parameters:
      V - the type of the return value of the transformation function
      Parameters:
      fn - the Function to generate a Publisher for each source value
      prefetch - the produced demand for inner sources
      Returns:
      a new Flux that emits values from an alternative Publisher for each source onNext
    • tag

      public final Flux<T> tag(String key, String value)
      Tag this flux with a key/value pair. These can be retrieved as a Set of all tags throughout the publisher chain by using Scannable.tags() (as traversed by Scannable.parents()).

      The name is typically visible at assembly time by the tap(SignalListenerFactory) operator, which could for example be configured with a metrics listener applying the tag(s) to its meters.

      Parameters:
      key - a tag key
      value - a tag value
      Returns:
      the same sequence, but bearing tags
      See Also:
    • take

      public final Flux<T> take(long n)
      Take only the first N values from this Flux, if available. If n is zero, the source isn't even subscribed to and the operator completes immediately upon subscription.

      This ensures that the total amount requested upstream is capped at n, although smaller requests can be made if the downstream makes requests < n. In any case, this operator never lets the upstream produce more elements than the cap, and it can be used to more strictly adhere to backpressure.

      This mode is typically useful for cases where a race between request and cancellation can lead the upstream to producing a lot of extraneous data, and such a production is undesirable (e.g. a source that would send the extraneous data over the network). It is equivalent to take(long, boolean) with limitRequest == true, If there is a requirement for unbounded upstream request (eg. for performance reasons), use take(long, boolean) with limitRequest=false instead.

      Parameters:
      n - the maximum number of items to request from upstream and emit from this Flux
      Returns:
      a Flux limited to size N
      See Also:
    • take

      public final Flux<T> take(long n, boolean limitRequest)
      Take only the first N values from this Flux, if available.

      If limitRequest == true, ensure that the total amount requested upstream is capped at n. In that configuration, this operator never let the upstream produce more elements than the cap, and it can be used to more strictly adhere to backpressure. If n is zero, the source isn't even subscribed to and the operator completes immediately upon subscription (the behavior inherited from take(long)).

      This mode is typically useful for cases where a race between request and cancellation can lead the upstream to producing a lot of extraneous data, and such a production is undesirable (e.g. a source that would send the extraneous data over the network).

      takeLimitRequestFalse

      If limitRequest == false this operator doesn't propagate the backpressure requested amount. Rather, it makes an unbounded request and cancels once N elements have been emitted. If n is zero, the source is subscribed to but immediately cancelled, then the operator completes.

      In this mode, the source could produce a lot of extraneous elements despite cancellation. If that behavior is undesirable and you do not own the request from downstream (e.g. prefetching operators), consider using limitRequest = true instead.

      Parameters:
      n - the number of items to emit from this Flux
      limitRequest - true to follow the downstream request more closely and limit the upstream request to n. false to request an unbounded amount from upstream.
      Returns:
      a Flux limited to size N
    • take

      public final Flux<T> take(Duration timespan)
      Relay values from this Flux until the specified Duration elapses.

      If the duration is zero, the resulting Flux completes as soon as this Flux signals its first value (which is not relayed, though).

      Parameters:
      timespan - the Duration of the time window during which to emit elements from this Flux
      Returns:
      a Flux limited to elements emitted within a specific duration
    • take

      public final Flux<T> take(Duration timespan, Scheduler timer)
      Relay values from this Flux until the specified Duration elapses, as measured on the specified Scheduler.

      If the duration is zero, the resulting Flux completes as soon as this Flux signals its first value (which is not relayed, though).

      Parameters:
      timespan - the Duration of the time window during which to emit elements from this Flux
      timer - a time-capable Scheduler instance to run on
      Returns:
      a Flux limited to elements emitted within a specific duration
    • takeLast

      public final Flux<T> takeLast(int n)
      Emit the last N values this Flux emitted before its completion.

      Parameters:
      n - the number of items from this Flux to retain and emit on onComplete
      Returns:
      a terminating Flux sub-sequence
    • takeUntil

      public final Flux<T> takeUntil(Predicate<? super T> predicate)
      Relay values from this Flux until the given Predicate matches. This includes the matching data (unlike takeWhile(java.util.function.Predicate<? super T>)). The predicate is tested before the element is emitted, so if the element is modified by the consumer, this won't affect the predicate. In case of an error during the predicate test, the current element is emitted before the error.

      Parameters:
      predicate - the Predicate that stops the taking of values from this Flux when returning true.
      Returns:
      a new Flux limited by the predicate
    • takeUntilOther

      public final Flux<T> takeUntilOther(Publisher<?> other)
      Relay values from this Flux until the given Publisher emits.

      Parameters:
      other - the companion Publisher that signals when to stop taking values from this Flux
      Returns:
      a new Flux limited by a companion Publisher
    • takeWhile

      public final Flux<T> takeWhile(Predicate<? super T> continuePredicate)
      Relay values from this Flux while a predicate returns TRUE for the values (checked before each value is delivered). This only includes the matching data (unlike takeUntil(java.util.function.Predicate<? super T>)).

      Parameters:
      continuePredicate - the Predicate invoked each onNext returning TRUE to relay a value or FALSE to terminate
      Returns:
      a new Flux taking values from the source while the predicate matches
    • tap

      public final Flux<T> tap(Supplier<SignalListener<T>> simpleListenerGenerator)
      Tap into Reactive Streams signals emitted or received by this Flux and notify a stateful per-Subscriber SignalListener.

      Any exception thrown by the SignalListener methods causes the subscription to be cancelled and the subscriber to be terminated with an onError signal of that exception. Note that SignalListener.doFinally(SignalType), SignalListener.doAfterComplete() and SignalListener.doAfterError(Throwable) instead just drop the exception.

      This simplified variant assumes the state is purely initialized within the Supplier, as it is called for each incoming Subscriber without additional context.

      When the context-propagation library is available at runtime and the downstream ContextView is not empty, this operator implicitly uses the library to restore thread locals around all invocations of SignalListener methods. Typically, this would be done in conjunction with the use of contextCapture() operator down the chain.

      Parameters:
      simpleListenerGenerator - the Supplier to create a new SignalListener on each subscription
      Returns:
      a new Flux with side effects defined by generated SignalListener
      See Also:
    • tap

      public final Flux<T> tap(Function<ContextView,SignalListener<T>> listenerGenerator)
      Tap into Reactive Streams signals emitted or received by this Flux and notify a stateful per-Subscriber SignalListener.

      Any exception thrown by the SignalListener methods causes the subscription to be cancelled and the subscriber to be terminated with an onError signal of that exception. Note that SignalListener.doFinally(SignalType), SignalListener.doAfterComplete() and SignalListener.doAfterError(Throwable) instead just drop the exception.

      This simplified variant allows the SignalListener to be constructed for each subscription with access to the incoming Subscriber's ContextView.

      When the context-propagation library is available at runtime and the ContextView is not empty, this operator implicitly uses the library to restore thread locals around all invocations of SignalListener methods. Typically, this would be done in conjunction with the use of contextCapture() operator down the chain.

      Parameters:
      listenerGenerator - the Function to create a new SignalListener on each subscription
      Returns:
      a new Flux with side effects defined by generated SignalListener
      See Also:
    • tap

      public final Flux<T> tap(SignalListenerFactory<T,?> listenerFactory)
      Tap into Reactive Streams signals emitted or received by this Flux and notify a stateful per-Subscriber SignalListener created by the provided SignalListenerFactory.

      The factory will initialize a state object for each Flux or Mono instance it is used with, and that state will be cached and exposed for each incoming Subscriber in order to generate the associated listener.

      Any exception thrown by the SignalListener methods causes the subscription to be cancelled and the subscriber to be terminated with an onError signal of that exception. Note that SignalListener.doFinally(SignalType), SignalListener.doAfterComplete() and SignalListener.doAfterError(Throwable) instead just drop the exception.

      When the context-propagation library is available at runtime and the downstream ContextView is not empty, this operator implicitly uses the library to restore thread locals around all invocations of SignalListener methods. Typically, this would be done in conjunction with the use of contextCapture() operator down the chain.

      Parameters:
      listenerFactory - the SignalListenerFactory to create a new SignalListener on each subscription
      Returns:
      a new Flux with side effects defined by generated SignalListener
      See Also:
    • then

      public final Mono<Void> then()
      Return a Mono<Void> that completes when this Flux completes. This will actively ignore the sequence and only replay completion or error signals.

      Discard Support: This operator discards elements from the source.

      Returns:
      a new Mono representing the termination of this Flux
    • then

      public final <V> Mono<V> then(Mono<V> other)
      Let this Flux complete then play signals from a provided Mono.

      In other words ignore element from this Flux and transform its completion signal into the emission and completion signal of a provided Mono<V>. Error signal is replayed in the resulting Mono<V>.

      Discard Support: This operator discards elements from the source.

      Type Parameters:
      V - the element type of the supplied Mono
      Parameters:
      other - a Mono to emit from after termination
      Returns:
      a new Flux that wait for source completion then emits from the supplied Mono
    • thenEmpty

      public final Mono<Void> thenEmpty(Publisher<Void> other)
      Return a Mono<Void> that waits for this Flux to complete then for a supplied Publisher<Void> to also complete. The second completion signal is replayed, or any error signal that occurs instead.

      Discard Support: This operator discards elements from the source.

      Parameters:
      other - a Publisher to wait for after this Flux's termination
      Returns:
      a new Mono completing when both publishers have completed in sequence
    • thenMany

      public final <V> Flux<V> thenMany(Publisher<V> other)
      Let this Flux complete then play another Publisher.

      In other words ignore element from this flux and transform the completion signal into a Publisher<V> that will emit elements from the provided Publisher.

      Discard Support: This operator discards elements from the source.

      Type Parameters:
      V - the element type of the supplied Publisher
      Parameters:
      other - a Publisher to emit from after termination
      Returns:
      a new Flux that emits from the supplied Publisher after this Flux completes.
    • timed

      public final Flux<Timed<T>> timed()
      Times Subscriber.onNext(Object) events, encapsulated into a Timed object that lets downstream consumer look at various time information gathered with nanosecond resolution using the default clock (Schedulers.parallel()):
      • Timed.elapsed(): the time in nanoseconds since last event, as a Duration. For the first onNext, "last event" is the subscription. Otherwise it is the previous onNext. This is functionally equivalent to elapsed(), with a more expressive and precise representation than a Tuple2 with a long.
      • Timed.timestamp(): the timestamp of this onNext, as an Instant (with nanoseconds part). This is functionally equivalent to timestamp(), with a more expressive and precise representation than a Tuple2 with a long.
      • Timed.elapsedSinceSubscription(): the time in nanoseconds since subscription, as a Duration.

      The Timed object instances are safe to store and use later, as they are created as an immutable wrapper around the <T> value and immediately passed downstream.

      Returns:
      a timed Flux
      See Also:
    • timed

      public final Flux<Timed<T>> timed(Scheduler clock)
      Times Subscriber.onNext(Object) events, encapsulated into a Timed object that lets downstream consumer look at various time information gathered with nanosecond resolution using the provided Scheduler as a clock:
      • Timed.elapsed(): the time in nanoseconds since last event, as a Duration. For the first onNext, "last event" is the subscription. Otherwise it is the previous onNext. This is functionally equivalent to elapsed(), with a more expressive and precise representation than a Tuple2 with a long.
      • Timed.timestamp(): the timestamp of this onNext, as an Instant (with nanoseconds part). This is functionally equivalent to timestamp(), with a more expressive and precise representation than a Tuple2 with a long.
      • Timed.elapsedSinceSubscription(): the time in nanoseconds since subscription, as a Duration.

      The Timed object instances are safe to store and use later, as they are created as an immutable wrapper around the <T> value and immediately passed downstream.

      Returns:
      a timed Flux
      See Also:
    • timeout

      public final Flux<T> timeout(Duration timeout)
      Propagate a TimeoutException as soon as no item is emitted within the given Duration from the previous emission (or the subscription for the first item).

      Parameters:
      timeout - the timeout between two signals from this Flux
      Returns:
      a Flux that can time out on a per-item basis
    • timeout

      public final Flux<T> timeout(Duration timeout, @Nullable Publisher<? extends T> fallback)
      Switch to a fallback Flux as soon as no item is emitted within the given Duration from the previous emission (or the subscription for the first item).

      If the given Publisher is null, signal a TimeoutException instead.

      Parameters:
      timeout - the timeout between two signals from this Flux
      fallback - the fallback Publisher to subscribe when a timeout occurs
      Returns:
      a Flux that will fallback to a different Publisher in case of a per-item timeout
    • timeout

      public final Flux<T> timeout(Duration timeout, Scheduler timer)
      Propagate a TimeoutException as soon as no item is emitted within the given Duration from the previous emission (or the subscription for the first item), as measured by the specified Scheduler.

      Parameters:
      timeout - the timeout Duration between two signals from this Flux
      timer - a time-capable Scheduler instance to run on
      Returns:
      a Flux that can time out on a per-item basis
    • timeout

      public final Flux<T> timeout(Duration timeout, @Nullable Publisher<? extends T> fallback, Scheduler timer)
      Switch to a fallback Flux as soon as no item is emitted within the given Duration from the previous emission (or the subscription for the first item), as measured on the specified Scheduler.

      If the given Publisher is null, signal a TimeoutException instead.

      Parameters:
      timeout - the timeout Duration between two signals from this Flux
      fallback - the fallback Publisher to subscribe when a timeout occurs
      timer - a time-capable Scheduler instance to run on
      Returns:
      a Flux that will fallback to a different Publisher in case of a per-item timeout
    • timeout

      public final <U> Flux<T> timeout(Publisher<U> firstTimeout)
      Signal a TimeoutException in case the first item from this Flux has not been emitted before the given Publisher emits.

      Type Parameters:
      U - the type of the timeout Publisher
      Parameters:
      firstTimeout - the companion Publisher that will trigger a timeout if emitting before the first signal from this Flux
      Returns:
      a Flux that can time out if the first item does not come before a signal from a companion Publisher
    • timeout

      public final <U, V> Flux<T> timeout(Publisher<U> firstTimeout, Function<? super T,? extends Publisher<V>> nextTimeoutFactory)
      Signal a TimeoutException in case the first item from this Flux has not been emitted before the firstTimeout Publisher emits, and whenever each subsequent elements is not emitted before a Publisher generated from the latest element signals.

      Discard Support: This operator discards an element if it comes right after the timeout.

      Type Parameters:
      U - the type of the elements of the first timeout Publisher
      V - the type of the elements of the subsequent timeout Publishers
      Parameters:
      firstTimeout - the timeout Publisher that must not emit before the first signal from this Flux
      nextTimeoutFactory - the timeout Publisher factory for each next item
      Returns:
      a Flux that can time out if each element does not come before a signal from a per-item companion Publisher
    • timeout

      public final <U, V> Flux<T> timeout(Publisher<U> firstTimeout, Function<? super T,? extends Publisher<V>> nextTimeoutFactory, Publisher<? extends T> fallback)
      Switch to a fallback Publisher in case the first item from this Flux has not been emitted before the firstTimeout Publisher emits, and whenever each subsequent elements is not emitted before a Publisher generated from the latest element signals.

      Type Parameters:
      U - the type of the elements of the first timeout Publisher
      V - the type of the elements of the subsequent timeout Publishers
      Parameters:
      firstTimeout - the timeout Publisher that must not emit before the first signal from this Flux
      nextTimeoutFactory - the timeout Publisher factory for each next item
      fallback - the fallback Publisher to subscribe when a timeout occurs
      Returns:
      a Flux that can time out if each element does not come before a signal from a per-item companion Publisher
    • timestamp

      public final Flux<Tuple2<Long,T>> timestamp()
      Emit a Tuple2 pair of T1 the current clock time in millis (as a Long measured by the parallel Scheduler) and T2 the emitted data (as a T), for each item from this Flux.

      Returns:
      a timestamped Flux
      See Also:
    • timestamp

      public final Flux<Tuple2<Long,T>> timestamp(Scheduler scheduler)
      Emit a Tuple2 pair of T1 the current clock time in millis (as a Long measured by the provided Scheduler) and T2 the emitted data (as a T), for each item from this Flux.

      The provider Scheduler will be asked to provide time with a granularity of TimeUnit.MILLISECONDS. In order for this operator to work as advertised, the provided Scheduler should thus return results that can be interpreted as unix timestamps.

      Parameters:
      scheduler - the Scheduler to read time from
      Returns:
      a timestamped Flux
      See Also:
    • toIterable

      public final Iterable<T> toIterable()
      Transform this Flux into a lazy Iterable blocking on Iterator.next() calls.

      Note that iterating from within threads marked as "non-blocking only" is illegal and will cause an IllegalStateException to be thrown, but obtaining the Iterable itself within these threads is ok.

      Returns:
      a blocking Iterable
    • toIterable

      public final Iterable<T> toIterable(int batchSize)
      Transform this Flux into a lazy Iterable blocking on Iterator.next() calls.

      Note that iterating from within threads marked as "non-blocking only" is illegal and will cause an IllegalStateException to be thrown, but obtaining the Iterable itself within these threads is ok.

      Parameters:
      batchSize - the bounded capacity to prefetch from this Flux or Integer.MAX_VALUE for unbounded demand
      Returns:
      a blocking Iterable
    • toIterable

      public final Iterable<T> toIterable(int batchSize, @Nullable Supplier<Queue<T>> queueProvider)
      Transform this Flux into a lazy Iterable blocking on Iterator.next() calls.

      Note that iterating from within threads marked as "non-blocking only" is illegal and will cause an IllegalStateException to be thrown, but obtaining the Iterable itself within these threads is ok.

      Parameters:
      batchSize - the bounded capacity to prefetch from this Flux or Integer.MAX_VALUE for unbounded demand
      queueProvider - the supplier of the queue implementation to be used for storing elements emitted faster than the iteration
      Returns:
      a blocking Iterable
    • toStream

      public final Stream<T> toStream()
      Transform this Flux into a lazy Stream blocking for each source onNext call.

      Note that iterating from within threads marked as "non-blocking only" is illegal and will cause an IllegalStateException to be thrown, but obtaining the Stream itself or applying lazy intermediate operation on the stream within these threads is ok.

      Returns:
      a Stream of unknown size with onClose attached to Subscription.cancel()
    • toStream

      public final Stream<T> toStream(int batchSize)
      Transform this Flux into a lazy Stream blocking for each source onNext call.

      Note that iterating from within threads marked as "non-blocking only" is illegal and will cause an IllegalStateException to be thrown, but obtaining the Stream itself or applying lazy intermediate operation on the stream within these threads is ok.

      Parameters:
      batchSize - the bounded capacity to prefetch from this Flux or Integer.MAX_VALUE for unbounded demand
      Returns:
      a Stream of unknown size with onClose attached to Subscription.cancel()
    • transform

      public final <V> Flux<V> transform(Function<? super Flux<T>,? extends Publisher<V>> transformer)
      Transform this Flux in order to generate a target Flux. Unlike transformDeferred(Function), the provided function is executed as part of assembly.
       Function<Flux, Flux> applySchedulers = flux -> flux.subscribeOn(Schedulers.boundedElastic())
                                                          .publishOn(Schedulers.parallel());
       flux.transform(applySchedulers).map(v -> v * v).subscribe();
       

      Type Parameters:
      V - the item type in the returned Flux
      Parameters:
      transformer - the Function to immediately map this Flux into a target Flux instance.
      Returns:
      a new Flux
      See Also:
    • transformDeferred

      public final <V> Flux<V> transformDeferred(Function<? super Flux<T>,? extends Publisher<V>> transformer)
      Defer the transformation of this Flux in order to generate a target Flux type. A transformation will occur for each Subscriber. For instance:
       flux.transformDeferred(original -> original.log());
       

      Type Parameters:
      V - the item type in the returned Publisher
      Parameters:
      transformer - the Function to lazily map this Flux into a target Publisher instance for each new subscriber
      Returns:
      a new Flux
      See Also:
    • transformDeferredContextual

      public final <V> Flux<V> transformDeferredContextual(BiFunction<? super Flux<T>,? super ContextView,? extends Publisher<V>> transformer)
      Defer the given transformation to this Flux in order to generate a target Flux type. A transformation will occur for each Subscriber. In addition, the transforming BiFunction exposes the ContextView of each Subscriber. For instance:
       Flux<T> fluxLogged = flux.transformDeferredContextual((original, ctx) -> original.log("for RequestID" + ctx.get("RequestID"))
       //...later subscribe. Each subscriber has its Context with a RequestID entry
       fluxLogged.contextWrite(Context.of("RequestID", "requestA").subscribe();
       fluxLogged.contextWrite(Context.of("RequestID", "requestB").subscribe();
       

      Type Parameters:
      V - the item type in the returned Publisher
      Parameters:
      transformer - the BiFunction to lazily map this Flux into a target Flux instance upon subscription, with access to ContextView
      Returns:
      a new Flux
      See Also:
    • window

      public final Flux<Flux<T>> window(int maxSize)
      Split this Flux sequence into multiple Flux windows containing maxSize elements (or less for the final window) and starting from the first item. Each Flux window will onComplete after maxSize items have been routed.

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.

      Parameters:
      maxSize - the maximum number of items to emit in the window before closing it
      Returns:
      a Flux of Flux windows based on element count
    • window

      public final Flux<Flux<T>> window(int maxSize, int skip)
      Split this Flux sequence into multiple Flux windows of size maxSize, that each open every skip elements in the source.

      When maxSize < skip : dropping windows

      When maxSize > skip : overlapping windows

      When maxSize == skip : exact windows

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: The overlapping variant DOES NOT discard elements, as they might be part of another still valid window. The exact window and dropping window variants bot discard elements they internally queued for backpressure upon cancellation or error triggered by a data signal. The dropping window variant also discards elements in between windows.

      Parameters:
      maxSize - the maximum number of items to emit in the window before closing it
      skip - the number of items to count before opening and emitting a new window
      Returns:
      a Flux of Flux windows based on element count and opened every skipCount
    • window

      public final Flux<Flux<T>> window(Publisher<?> boundary)
      Split this Flux sequence into continuous, non-overlapping windows where the window boundary is signalled by another Publisher

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors and those emitted by the boundary delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.

      Parameters:
      boundary - a Publisher to emit any item for a split signal and complete to terminate
      Returns:
      a Flux of Flux windows delimited by a given Publisher
    • window

      public final Flux<Flux<T>> window(Duration windowingTimespan)
      Split this Flux sequence into continuous, non-overlapping windows that open for a windowingTimespan Duration (as measured on the parallel Scheduler).

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.

      Parameters:
      windowingTimespan - the Duration to delimit Flux windows
      Returns:
      a Flux of Flux windows continuously opened for a given Duration
    • window

      public final Flux<Flux<T>> window(Duration windowingTimespan, Duration openWindowEvery)
      Split this Flux sequence into multiple Flux windows that open for a given windowingTimespan Duration, after which it closes with onComplete. Each window is opened at a regular timeShift interval, starting from the first item. Both durations are measured on the parallel Scheduler.

      When windowingTimespan < openWindowEvery : dropping windows

      When windowingTimespan > openWindowEvery : overlapping windows

      When windowingTimespan == openWindowEvery : exact windows

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: The overlapping variant DOES NOT discard elements, as they might be part of another still valid window. The exact window and dropping window variants bot discard elements they internally queued for backpressure upon cancellation or error triggered by a data signal. The dropping window variant also discards elements in between windows.

      Parameters:
      windowingTimespan - the maximum Flux window Duration
      openWindowEvery - the period of time at which to create new Flux windows
      Returns:
      a Flux of Flux windows opened at regular intervals and closed after a Duration
    • window

      public final Flux<Flux<T>> window(Duration windowingTimespan, Scheduler timer)
      Split this Flux sequence into continuous, non-overlapping windows that open for a windowingTimespan Duration (as measured on the provided Scheduler).

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.

      Parameters:
      windowingTimespan - the Duration to delimit Flux windows
      timer - a time-capable Scheduler instance to run on
      Returns:
      a Flux of Flux windows continuously opened for a given Duration
    • window

      public final Flux<Flux<T>> window(Duration windowingTimespan, Duration openWindowEvery, Scheduler timer)
      Split this Flux sequence into multiple Flux windows that open for a given windowingTimespan Duration, after which it closes with onComplete. Each window is opened at a regular timeShift interval, starting from the first item. Both durations are measured on the provided Scheduler.

      When windowingTimespan < openWindowEvery : dropping windows

      When windowingTimespan > openWindowEvery : overlapping windows

      When openWindowEvery == openWindowEvery : exact windows

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: The overlapping variant DOES NOT discard elements, as they might be part of another still valid window. The exact window and dropping window variants bot discard elements they internally queued for backpressure upon cancellation or error triggered by a data signal. The dropping window variant also discards elements in between windows.

      Parameters:
      windowingTimespan - the maximum Flux window Duration
      openWindowEvery - the period of time at which to create new Flux windows
      timer - a time-capable Scheduler instance to run on
      Returns:
      a Flux of Flux windows opened at regular intervals and closed after a Duration
    • windowTimeout

      public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime)
      Split this Flux sequence into multiple Flux windows containing maxSize elements (or less for the final window) and starting from the first item. Each Flux window will onComplete once it contains maxSize elements OR it has been open for the given Duration (as measured on the parallel Scheduler).

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.

      Parameters:
      maxSize - the maximum number of items to emit in the window before closing it
      maxTime - the maximum Duration since the window was opened before closing it
      Returns:
      a Flux of Flux windows based on element count and duration
    • windowTimeout

      public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime, boolean fairBackpressure)
      Split this Flux sequence into multiple Flux windows containing maxSize elements (or less for the final window) and starting from the first item. Each Flux window will onComplete once it contains maxSize elements OR it has been open for the given Duration (as measured on the parallel Scheduler).

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.

      Parameters:
      maxSize - the maximum number of items to emit in the window before closing it
      maxTime - the maximum Duration since the window was opened before closing it
      fairBackpressure - define whether operator request unbounded demand or prefetch by maxSize
      Returns:
      a Flux of Flux windows based on element count and duration
    • windowTimeout

      public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime, Scheduler timer)
      Split this Flux sequence into multiple Flux windows containing maxSize elements (or less for the final window) and starting from the first item. Each Flux window will onComplete once it contains maxSize elements OR it has been open for the given Duration (as measured on the provided Scheduler).

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.

      Parameters:
      maxSize - the maximum number of items to emit in the window before closing it
      maxTime - the maximum Duration since the window was opened before closing it
      timer - a time-capable Scheduler instance to run on
      Returns:
      a Flux of Flux windows based on element count and duration
    • windowTimeout

      public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime, Scheduler timer, boolean fairBackpressure)
      Split this Flux sequence into multiple Flux windows containing maxSize elements (or less for the final window) and starting from the first item. Each Flux window will onComplete once it contains maxSize elements OR it has been open for the given Duration (as measured on the provided Scheduler).

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.

      Parameters:
      maxSize - the maximum number of items to emit in the window before closing it
      maxTime - the maximum Duration since the window was opened before closing it
      timer - a time-capable Scheduler instance to run on
      fairBackpressure - define whether operator request unbounded demand or prefetch by maxSize
      Returns:
      a Flux of Flux windows based on element count and duration
    • windowUntil

      public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger)
      Split this Flux sequence into multiple Flux windows delimited by the given predicate. A new window is opened each time the predicate returns true, at which point the previous window will receive the triggering element then onComplete.

      Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window errors). This variant shouldn't expose empty windows, as the separators are emitted into the windows they close.

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.

      Parameters:
      boundaryTrigger - a predicate that triggers the next window when it becomes true.
      Returns:
      a Flux of Flux windows, bounded depending on the predicate.
    • windowUntil

      public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore)
      Split this Flux sequence into multiple Flux windows delimited by the given predicate. A new window is opened each time the predicate returns true.

      Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window completes or errors).

      If cutBefore is true, the old window will onComplete and the triggering element will be emitted in the new window, which becomes immediately available. This variant can emit an empty window if the sequence starts with a separator.

      Otherwise, the triggering element will be emitted in the old window before it does onComplete, similar to windowUntil(Predicate). This variant shouldn't expose empty windows, as the separators are emitted into the windows they close.

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.

      Parameters:
      boundaryTrigger - a predicate that triggers the next window when it becomes true.
      cutBefore - set to true to include the triggering element in the new window rather than the old.
      Returns:
      a Flux of Flux windows, bounded depending on the predicate.
    • windowUntil

      public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore, int prefetch)
      Split this Flux sequence into multiple Flux windows delimited by the given predicate and using a prefetch. A new window is opened each time the predicate returns true.

      Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window completes or errors).

      If cutBefore is true, the old window will onComplete and the triggering element will be emitted in the new window. This variant can emit an empty window if the sequence starts with a separator.

      Otherwise, the triggering element will be emitted in the old window before it does onComplete, similar to windowUntil(Predicate). This variant shouldn't expose empty windows, as the separators are emitted into the windows they close.

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.

      Parameters:
      boundaryTrigger - a predicate that triggers the next window when it becomes true.
      cutBefore - set to true to include the triggering element in the new window rather than the old.
      prefetch - the request size to use for this Flux.
      Returns:
      a Flux of Flux windows, bounded depending on the predicate.
    • windowUntilChanged

      public final Flux<Flux<T>> windowUntilChanged()
      Collect subsequent repetitions of an element (that is, if they arrive right after one another) into multiple Flux windows.

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.

      Returns:
      a microbatched Flux of Flux windows.
    • windowUntilChanged

      public final <V> Flux<Flux<T>> windowUntilChanged(Function<? super T,? super V> keySelector)
      Collect subsequent repetitions of an element (that is, if they arrive right after one another), as compared by a key extracted through the user provided Function, into multiple Flux windows.

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.

      Parameters:
      keySelector - function to compute comparison key for each element
      Returns:
      a microbatched Flux of Flux windows.
    • windowUntilChanged

      public final <V> Flux<Flux<T>> windowUntilChanged(Function<? super T,? extends V> keySelector, BiPredicate<? super V,? super V> keyComparator)
      Collect subsequent repetitions of an element (that is, if they arrive right after one another), as compared by a key extracted through the user provided Function and compared using a supplied BiPredicate, into multiple Flux windows.

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.

      Parameters:
      keySelector - function to compute comparison key for each element
      keyComparator - predicate used to compare keys
      Returns:
      a microbatched Flux of Flux windows.
    • windowWhile

      public final Flux<Flux<T>> windowWhile(Predicate<T> inclusionPredicate)
      Split this Flux sequence into multiple Flux windows that stay open while a given predicate matches the source elements. Once the predicate returns false, the window closes with an onComplete and the triggering element is discarded.

      Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window completes or errors). Empty windows can happen when a sequence starts with a separator or contains multiple separators, but a sequence that finishes with a separator won't cause a remainder empty window to be emitted.

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal, as well as the triggering element(s) (that doesn't match the predicate). Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.

      Parameters:
      inclusionPredicate - a predicate that triggers the next window when it becomes false.
      Returns:
      a Flux of Flux windows, each containing subsequent elements that all passed a predicate.
    • windowWhile

      public final Flux<Flux<T>> windowWhile(Predicate<T> inclusionPredicate, int prefetch)
      Split this Flux sequence into multiple Flux windows that stay open while a given predicate matches the source elements. Once the predicate returns false, the window closes with an onComplete and the triggering element is discarded.

      Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window completes or errors). Empty windows can happen when a sequence starts with a separator or contains multiple separators, but a sequence that finishes with a separator won't cause a remainder empty window to be emitted.

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal, as well as the triggering element(s) (that doesn't match the predicate). Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.

      Parameters:
      inclusionPredicate - a predicate that triggers the next window when it becomes false.
      prefetch - the request size to use for this Flux.
      Returns:
      a Flux of Flux windows, each containing subsequent elements that all passed a predicate.
    • windowWhen

      public final <U, V> Flux<Flux<T>> windowWhen(Publisher<U> bucketOpening, Function<? super U,? extends Publisher<V>> closeSelector)
      Split this Flux sequence into potentially overlapping windows controlled by items of a start Publisher and end Publisher derived from the start values.

      When Open signal is strictly not overlapping Close signal : dropping windows

      When Open signal is strictly more frequent than Close signal : overlapping windows

      When Open signal is exactly coordinated with Close signal : exact windows

      Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.

      To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.

      Discard Support: This operator DOES NOT discard elements.

      Type Parameters:
      U - the type of the sequence opening windows
      V - the type of the sequence closing windows opened by the bucketOpening Publisher's elements
      Parameters:
      bucketOpening - a Publisher that opens a new window when it emits any item
      closeSelector - a Function given an opening signal and returning a Publisher that will close the window when emitting
      Returns:
      a Flux of Flux windows opened by signals from a first Publisher and lasting until a selected second Publisher emits
    • withLatestFrom

      public final <U, R> Flux<R> withLatestFrom(Publisher<? extends U> other, BiFunction<? super T,? super U,? extends R> resultSelector)
      Combine the most recently emitted values from both this Flux and another Publisher through a BiFunction and emits the result.

      The operator will drop values from this Flux until the other Publisher produces any value.

      If the other Publisher completes without any value, the sequence is completed.

      Type Parameters:
      U - the other Publisher sequence type
      R - the result type
      Parameters:
      other - the Publisher to combine with
      resultSelector - the bi-function called with each pair of source and other elements that should return a single value to be emitted
      Returns:
      a combined Flux gated by another Publisher
    • zipWith

      public final <T2> Flux<Tuple2<T,T2>> zipWith(Publisher<? extends T2> source2)
      Zip this Flux with another Publisher source, that is to say wait for both to emit one element and combine these elements once into a Tuple2. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

      Type Parameters:
      T2 - type of the value from source2
      Parameters:
      source2 - The second source Publisher to zip with this Flux.
      Returns:
      a zipped Flux
    • zipWith

      public final <T2, V> Flux<V> zipWith(Publisher<? extends T2> source2, BiFunction<? super T,? super T2,? extends V> combinator)
      Zip this Flux with another Publisher source, that is to say wait for both to emit one element and combine these elements using a combinator BiFunction The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

      Type Parameters:
      T2 - type of the value from source2
      V - The produced output after transformation by the combinator
      Parameters:
      source2 - The second source Publisher to zip with this Flux.
      combinator - The aggregate function that will receive a unique value from each source and return the value to signal downstream
      Returns:
      a zipped Flux
    • zipWith

      public final <T2, V> Flux<V> zipWith(Publisher<? extends T2> source2, int prefetch, BiFunction<? super T,? super T2,? extends V> combinator)
      Zip this Flux with another Publisher source, that is to say wait for both to emit one element and combine these elements using a combinator BiFunction The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

      Type Parameters:
      T2 - type of the value from source2
      V - The produced output after transformation by the combinator
      Parameters:
      source2 - The second source Publisher to zip with this Flux.
      prefetch - the request size to use for this Flux and the other Publisher
      combinator - The aggregate function that will receive a unique value from each source and return the value to signal downstream
      Returns:
      a zipped Flux
    • zipWith

      public final <T2> Flux<Tuple2<T,T2>> zipWith(Publisher<? extends T2> source2, int prefetch)
      Zip this Flux with another Publisher source, that is to say wait for both to emit one element and combine these elements once into a Tuple2. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

      Type Parameters:
      T2 - type of the value from source2
      Parameters:
      source2 - The second source Publisher to zip with this Flux.
      prefetch - the request size to use for this Flux and the other Publisher
      Returns:
      a zipped Flux
    • zipWithIterable

      public final <T2> Flux<Tuple2<T,T2>> zipWithIterable(Iterable<? extends T2> iterable)
      Zip elements from this Flux with the content of an Iterable, that is to say combine one element from each, pairwise, into a Tuple2.

      Type Parameters:
      T2 - the value type of the other iterable sequence
      Parameters:
      iterable - the Iterable to zip with
      Returns:
      a zipped Flux
    • zipWithIterable

      public final <T2, V> Flux<V> zipWithIterable(Iterable<? extends T2> iterable, BiFunction<? super T,? super T2,? extends V> zipper)
      Zip elements from this Flux with the content of an Iterable, that is to say combine one element from each, pairwise, using the given zipper BiFunction.

      Type Parameters:
      T2 - the value type of the other iterable sequence
      V - the result type
      Parameters:
      iterable - the Iterable to zip with
      zipper - the BiFunction pair combinator
      Returns:
      a zipped Flux
    • onAssembly

      protected static <T> Flux<T> onAssembly(Flux<T> source)
      To be used by custom operators: invokes assembly Hooks pointcut given a Flux, potentially returning a new Flux. This is for example useful to activate cross-cutting concerns at assembly time, eg. a generalized checkpoint().
      Type Parameters:
      T - the value type
      Parameters:
      source - the source to apply assembly hooks onto
      Returns:
      the source, potentially wrapped with assembly time cross-cutting behavior
    • onAssembly

      protected static <T> ConnectableFlux<T> onAssembly(ConnectableFlux<T> source)
      To be used by custom operators: invokes assembly Hooks pointcut given a ConnectableFlux, potentially returning a new ConnectableFlux. This is for example useful to activate cross-cutting concerns at assembly time, eg. a generalized checkpoint().
      Type Parameters:
      T - the value type
      Parameters:
      source - the source to apply assembly hooks onto
      Returns:
      the source, potentially wrapped with assembly time cross-cutting behavior
    • toString

      public String toString()
      Overrides:
      toString in class Object