Class Flux<T>
- Type Parameters:
  T - the element type of this Reactive Streams Publisher
- All Implemented Interfaces:
  Publisher<T>, CorePublisher<T>
- Direct Known Subclasses:
  ConnectableFlux, FluxOperator, FluxProcessor, GroupedFlux

A Publisher with rx operators that emits 0 to N elements, and then completes
(successfully or with an error).
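The 0..N-then-terminate contract can be seen in a minimal sketch. This assumes reactor-core (io.projectreactor:reactor-core) is on the classpath; the class name FluxBasics is illustrative only.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class FluxBasics {
    // Build a Flux of three elements; it emits each one, then signals onComplete.
    static Flux<String> letters() {
        return Flux.just("a", "b", "c");
    }

    public static void main(String[] args) {
        // collectList() aggregates the 0..N elements into a single List emitted
        // by a Mono; block() waits for that terminal signal.
        List<String> seen = letters().collectList().block();
        System.out.println(seen); // [a, b, c]
    }
}
```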
The recommended way to learn about the Flux API and discover new operators is
through the reference documentation rather than this javadoc (the javadoc is
better suited to looking up individual operators). See the
"which operator do I need?" appendix.
It is intended to be used in implementations and return types. Input parameters should keep using raw
Publisher as much as possible.
If it is known that the underlying Publisher will emit 0 or 1 element, Mono should be used
instead.
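The Flux/Mono split can be illustrated with a short sketch, assuming reactor-core is on the classpath (the class name MonoVsFlux is illustrative only):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoVsFlux {
    public static void main(String[] args) {
        // Mono is the 0..1 counterpart of Flux: use it when the source is
        // known to emit at most one element.
        Mono<Integer> single = Mono.just(42);

        // A Flux of interest only for its first element can be narrowed
        // to a Mono with next().
        Mono<Integer> first = Flux.just(1, 2, 3).next();

        System.out.println(single.block()); // 42
        System.out.println(first.block());  // 1
    }
}
```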
Note that holding mutable state in the java.util.function instances / lambdas passed to
Flux operators should be avoided, as these may be shared between several Subscribers.
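The shared-state pitfall, and the usual fix of deferring state creation to subscription time, can be sketched as follows (assuming reactor-core on the classpath; class and variable names are illustrative):

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class SharedStatePitfall {
    public static void main(String[] args) {
        // BAD: the counter is captured once at assembly time, so every
        // subscriber increments the same instance.
        AtomicInteger shared = new AtomicInteger();
        Flux<Integer> leaky = Flux.range(1, 3).map(i -> shared.incrementAndGet());

        leaky.blockLast();                     // first subscriber sees 1..3
        System.out.println(leaky.blockLast()); // second subscriber sees 6, not 3

        // BETTER: defer creation so each subscription gets fresh state.
        Flux<Integer> clean = Flux.defer(() -> {
            AtomicInteger perSubscriber = new AtomicInteger();
            return Flux.range(1, 3).map(i -> perSubscriber.incrementAndGet());
        });

        clean.blockLast();
        System.out.println(clean.blockLast()); // 3 on every subscription
    }
}
```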
subscribe(CoreSubscriber) is an internal extension to
subscribe(Subscriber), used for Context passing. A user-provided
Subscriber may be passed to this "subscribe" extension but will lose the
per-subscribe Hooks.onLastOperator(java.util.function.Function<? super org.reactivestreams.Publisher<java.lang.Object>, ? extends org.reactivestreams.Publisher<java.lang.Object>>) hook.
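Context passing is normally driven through the public operators rather than the internal subscribe(CoreSubscriber) extension. A minimal sketch of a Context written downstream and read upstream, assuming reactor-core on the classpath (class name and the "user" key are illustrative):

```java
import reactor.core.publisher.Flux;

public class ContextExample {
    public static void main(String[] args) {
        // The Context travels upstream from the subscription site:
        // contextWrite() enriches it, and deferContextual() reads it
        // when the subscription reaches the source.
        Flux<String> greeting = Flux
                .deferContextual(ctx -> Flux.just("Hello, " + ctx.get("user")))
                .contextWrite(ctx -> ctx.put("user", "reactor"));

        System.out.println(greeting.blockFirst()); // Hello, reactor
    }
}
```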
- Author:
- Sebastien Deleuze, Stephane Maldini, David Karnok, Simon Baslé, Injae Kim
Constructor Summary
Method Summary
- all(Predicate<? super T> predicate) - Emit a single boolean true if all values of this sequence match the Predicate.
- any(Predicate<? super T> predicate) - Emit a single boolean true if any of the values of this Flux sequence match the predicate.
- final <P> P as(Function<? super Flux<T>, P> transformer) - Transform this Flux into a target type.
- blockFirst() - Subscribe to this Flux and block indefinitely until the upstream signals its first value or completes.
- blockFirst(Duration timeout) - Subscribe to this Flux and block until the upstream signals its first value, completes, or a timeout expires.
- blockLast() - Subscribe to this Flux and block indefinitely until the upstream signals its last value or completes.
- blockLast(Duration timeout) - Subscribe to this Flux and block until the upstream signals its last value, completes, or a timeout expires.
- buffer()
- buffer(int maxSize)
- buffer(int maxSize, int skip)
- final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, Supplier<C> bufferSupplier) - Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes.
- final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, int skip, Supplier<C> bufferSupplier) - Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes.
- buffer(Duration bufferingTimespan, Duration openBufferEvery) - Collect incoming values into multiple List buffers created at a given openBufferEvery period.
- final <C extends Collection<? super T>> Flux<C> buffer(Publisher<?> other, Supplier<C> bufferSupplier) - Collect incoming values into multiple user-defined Collection buffers, as delimited by the signals of a companion Publisher this operator will subscribe to.
- bufferTimeout(int maxSize, Duration maxTime)
- bufferTimeout(int maxSize, Duration maxTime, boolean fairBackpressure)
- final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Supplier<C> bufferSupplier) - Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses.
- final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Supplier<C> bufferSupplier, boolean fairBackpressure) - Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses.
- bufferTimeout(int maxSize, Duration maxTime, Scheduler timer)
- bufferTimeout(int maxSize, Duration maxTime, Scheduler timer, boolean fairBackpressure)
- final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer, Supplier<C> bufferSupplier) - Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler.
- final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer, Supplier<C> bufferSupplier, boolean fairBackpressure) - Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler.
- bufferUntil(Predicate<? super T> predicate)
- bufferUntil(Predicate<? super T> predicate, boolean cutBefore)
- bufferUntilChanged(Function<? super T, ? extends V> keySelector)
- bufferUntilChanged(Function<? super T, ? extends V> keySelector, BiPredicate<? super V, ? super V> keyComparator) - Collect subsequent repetitions of an element (that is, if they arrive right after one another), as compared by a key extracted through the user-provided Function and compared using a supplied BiPredicate, into multiple List buffers that will be emitted by the resulting Flux.
- bufferWhen(Publisher<U> bucketOpening, Function<? super U, ? extends Publisher<V>> closeSelector)
- final <U,V,C extends Collection<? super T>> Flux<C> bufferWhen(Publisher<U> bucketOpening, Function<? super U, ? extends Publisher<V>> closeSelector, Supplier<C> bufferSupplier) - Collect incoming values into multiple user-defined Collection buffers started each time an opening companion Publisher emits.
- bufferWhile(Predicate<? super T> predicate)
- cache() - Turn this Flux into a hot source and cache last emitted signals for further Subscribers.
- cache(int history) - Turn this Flux into a hot source and cache last emitted signals for further Subscribers.
- cache(Duration ttl) - Turn this Flux into a hot source and cache last emitted signals for further Subscribers.
- cache(Duration ttl, Scheduler timer) - Turn this Flux into a hot source and cache last emitted signals for further Subscribers.
- cache(int history, Duration ttl) - Turn this Flux into a hot source and cache last emitted signals for further Subscribers.
- cache(int history, Duration ttl, Scheduler timer) - Turn this Flux into a hot source and cache last emitted signals for further Subscribers.
- final <E> Flux<E> cast(Class<E> clazz) - Cast the current Flux produced type into a target produced type.
- checkpoint() - Activate traceback (full assembly tracing) for this particular Flux, in case of an error upstream of the checkpoint.
- checkpoint(@Nullable String description, boolean forceStackTrace) - Activate traceback (full assembly tracing or the lighter assembly marking depending on the forceStackTrace option).
- checkpoint(String description) - Activate traceback (assembly marker) for this particular Flux by giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint.
- final <E> Mono<E> collect(Supplier<E> containerSupplier, BiConsumer<E, ? super T> collector) - Collect all elements emitted by this Flux into a user-defined container, by applying a collector BiConsumer taking the container and each element.
- final <R,A> Mono<R> collect(Collector<? super T, A, ? extends R> collector)
- collectMap(Function<? super T, ? extends K> keyExtractor)
- collectMap(Function<? super T, ? extends K> keyExtractor, Function<? super T, ? extends V> valueExtractor)
- collectMap(Function<? super T, ? extends K> keyExtractor, Function<? super T, ? extends V> valueExtractor, Supplier<Map<K, V>> mapSupplier)
- final <K> Mono<Map<K, Collection<T>>> collectMultimap(Function<? super T, ? extends K> keyExtractor)
- final <K,V> Mono<Map<K, Collection<V>>> collectMultimap(Function<? super T, ? extends K> keyExtractor, Function<? super T, ? extends V> valueExtractor)
- final <K,V> Mono<Map<K, Collection<V>>> collectMultimap(Function<? super T, ? extends K> keyExtractor, Function<? super T, ? extends V> valueExtractor, Supplier<Map<K, Collection<V>>> mapSupplier)
- collectSortedList(@Nullable Comparator<? super T> comparator) - Collect all elements emitted by this Flux until this sequence completes, and then sort them using a Comparator into a List that is emitted by the resulting Mono.
- static <T,V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, int prefetch, Function<Object[], V> combinator)
- static <T,V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<Object[], V> combinator)
- static <T,V> Flux<V> combineLatest(Function<Object[], V> combinator, int prefetch, Publisher<? extends T>... sources)
- static <T,V> Flux<V> combineLatest(Function<Object[], V> combinator, Publisher<? extends T>... sources)
- static <T1,T2,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends V> combinator)
- static <T1,T2,T3,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Function<Object[], V> combinator)
- static <T1,T2,T3,T4,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Function<Object[], V> combinator)
- static <T1,T2,T3,T4,T5,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Function<Object[], V> combinator)
- static <T1,T2,T3,T4,T5,T6,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Function<Object[], V> combinator)
- static <T> Flux<T> concat(Iterable<? extends Publisher<? extends T>> sources) - Concatenate all sources provided in an Iterable, forwarding elements emitted by the sources downstream.
- static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources) - Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.
- static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources, int prefetch) - Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.
- static <T> Flux<T> concat(Publisher<? extends T>... sources) - Concatenate all sources provided as a vararg, forwarding elements emitted by the sources downstream.
- static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources) - Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.
- static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, boolean delayUntilEnd, int prefetch) - Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.
- static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, int prefetch) - Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream.
- static <T> Flux<T> concatDelayError(Publisher<? extends T>... sources) - Concatenate all sources provided as a vararg, forwarding elements emitted by the sources downstream.
- final <V> Flux<V> concatMap(Function<? super T, ? extends Publisher<? extends V>> mapper)
- final <V> Flux<V> concatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int prefetch)
- final <V> Flux<V> concatMapDelayError(Function<? super T, ? extends Publisher<? extends V>> mapper)
- final <V> Flux<V> concatMapDelayError(Function<? super T, ? extends Publisher<? extends V>> mapper, boolean delayUntilEnd, int prefetch)
- final <V> Flux<V> concatMapDelayError(Function<? super T, ? extends Publisher<? extends V>> mapper, int prefetch)
- final <R> Flux<R> concatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper)
- final <R> Flux<R> concatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper, int prefetch)
- concatWith(Publisher<? extends T> other)
- concatWithValues(T... values) - Concatenate the values to the end of the Flux.
- contextCapture() - If the context-propagation library is on the classpath, this is a convenience shortcut to capture thread-local values during the subscription phase and put them in the Context that is visible upstream of this operator.
- contextWrite(Function<Context, Context> contextModifier)
- contextWrite(ContextView contextToAppend) - Enrich the Context visible from downstream for the benefit of upstream operators, by making all values from the provided ContextView visible on top of pairs from downstream.
- count() - Counts the number of values in this Flux.
- static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
- static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
- defaultIfEmpty(T defaultV) - Provide a default unique value if this sequence is completed without any data.
- static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier) - Lazily supply a Publisher every time a Subscription is made on the resulting Flux, so the actual source instantiation is deferred until each subscribe and the Supplier can create a subscriber-specific instance.
- static <T> Flux<T> deferContextual(Function<ContextView, ? extends Publisher<T>> contextualPublisherFactory) - Lazily supply a Publisher every time a Subscription is made on the resulting Flux, so the actual source instantiation is deferred until each subscribe and the Function can create a subscriber-specific instance.
- delayElements(Duration delay)
- delayElements(Duration delay, Scheduler timer)
- delaySequence(Duration delay)
- delaySequence(Duration delay, Scheduler timer)
- delaySubscription(Duration delay) - Delay the subscription to this Flux source until the given period elapses.
- delaySubscription(Duration delay, Scheduler timer) - Delay the subscription to this Flux source until the given period elapses, as measured on the user-provided Scheduler.
- delaySubscription(Publisher<U> subscriptionDelay)
- delayUntil(Function<? super T, ? extends Publisher<?>> triggerProvider)
- final <X> Flux<X> dematerialize() - An operator working only if this Flux emits onNext, onError or onComplete Signal instances, transforming these materialized signals into real signals on the Subscriber.
- distinct() - For each Subscriber, track elements from this Flux that have been seen and filter out duplicates.
- distinct(Function<? super T, ? extends V> keySelector) - For each Subscriber, track elements from this Flux that have been seen and filter out duplicates, as compared by a key extracted through the user-provided Function.
- final <V,C extends Collection<? super V>> Flux<T> distinct(Function<? super T, ? extends V> keySelector, Supplier<C> distinctCollectionSupplier) - For each Subscriber, track elements from this Flux that have been seen and filter out duplicates, as compared by a key extracted through the user-provided Function and by the add method of the Collection supplied (typically a Set).
- distinct(Function<? super T, ? extends V> keySelector, Supplier<C> distinctStoreSupplier, BiPredicate<C, V> distinctPredicate, Consumer<C> cleanup) - For each Subscriber, track elements from this Flux that have been seen and filter out duplicates, as compared by applying a BiPredicate on an arbitrary user-supplied <C> store and a key extracted through the user-provided Function.
- distinctUntilChanged() - Filter out subsequent repetitions of an element (that is, if they arrive right after one another).
- distinctUntilChanged(Function<? super T, ? extends V> keySelector) - Filter out subsequent repetitions of an element (that is, if they arrive right after one another), as compared by a key extracted through the user-provided Function using equality.
- distinctUntilChanged(Function<? super T, ? extends V> keySelector, BiPredicate<? super V, ? super V> keyComparator) - Filter out subsequent repetitions of an element (that is, if they arrive right after one another), as compared by a key extracted through the user-provided Function and then comparing keys with the supplied BiPredicate.
- doAfterTerminate(Runnable afterTerminate) - Add behavior (side-effect) triggered after the Flux terminates, either by completing downstream successfully or with an error.
- doFinally(Consumer<SignalType> onFinally) - Add behavior (side-effect) triggered after the Flux terminates for any reason, including cancellation.
- doFirst(Runnable onFirst) - Add behavior (side-effect) triggered before the Flux is subscribed to, which should be the first event after assembly time.
- doOnCancel(Runnable onCancel) - Add behavior (side-effect) triggered when the Flux is cancelled.
- doOnComplete(Runnable onComplete) - Add behavior (side-effect) triggered when the Flux completes successfully.
- doOnDiscard(Class<R> type, Consumer<? super R> discardHook) - Potentially modify the behavior of the whole chain of operators upstream of this one to conditionally clean up elements that get discarded by these operators.
- doOnEach(Consumer<? super Signal<T>> signalConsumer) - Add behavior (side-effects) triggered when the Flux emits an item, fails with an error or completes successfully.
- doOnError(Class<E> exceptionType, Consumer<? super E> onError) - Add behavior (side-effect) triggered when the Flux completes with an error matching the given exception type.
- doOnError(Consumer<? super Throwable> onError) - Add behavior (side-effect) triggered when the Flux completes with an error.
- doOnError(Predicate<? super Throwable> predicate, Consumer<? super Throwable> onError) - Add behavior (side-effect) triggered when the Flux completes with an error matching the given predicate.
- doOnNext(Consumer<? super T> onNext) - Add behavior (side-effect) triggered when the Flux emits an item.
- doOnRequest(LongConsumer consumer) - Add behavior (side-effect) triggering a LongConsumer when this Flux receives any request.
- doOnSubscribe(Consumer<? super Subscription> onSubscribe) - Add behavior (side-effect) triggered when the Flux is being subscribed, that is to say when a Subscription has been produced by the Publisher and is being passed to Subscriber.onSubscribe(Subscription).
- doOnTerminate(Runnable onTerminate) - Add behavior (side-effect) triggered when the Flux terminates, either by completing successfully or failing with an error.
- elapsed() - Map this Flux into Tuple2<Long, T> of time (in milliseconds) and source data.
- elapsed(Scheduler scheduler) - Map this Flux into Tuple2<Long, T> of time (in milliseconds) and source data.
- elementAt(int index) - Emit only the element at the given index position, or an IndexOutOfBoundsException if the sequence is shorter.
- elementAt(int index, T defaultValue) - Emit only the element at the given index position, or fall back to a default value if the sequence is shorter.
- static <T> Flux<T> empty() - Create a Flux that completes without emitting any item.
- static <T> Flux<T> error(Throwable error) - Create a Flux that terminates with the specified error immediately after being subscribed to.
- static <O> Flux<O> error(Throwable throwable, boolean whenRequested) - Create a Flux that terminates with the specified error, either immediately after being subscribed to or after being first requested.
- static <T> Flux<T> error(Supplier<? extends Throwable> errorSupplier) - Create a Flux that terminates with an error immediately after being subscribed to.
- expand(Function<? super T, ? extends Publisher<? extends T>> expander) - Recursively expand elements into a graph and emit all the resulting elements using a breadth-first traversal strategy.
- expand(Function<? super T, ? extends Publisher<? extends T>> expander, int capacityHint) - Recursively expand elements into a graph and emit all the resulting elements using a breadth-first traversal strategy.
- expandDeep(Function<? super T, ? extends Publisher<? extends T>> expander) - Recursively expand elements into a graph and emit all the resulting elements, in a depth-first traversal order.
- expandDeep(Function<? super T, ? extends Publisher<? extends T>> expander, int capacityHint) - Recursively expand elements into a graph and emit all the resulting elements, in a depth-first traversal order.
- filter(Predicate<? super T> p) - Evaluate each source value against the given Predicate.
- filterWhen(Function<? super T, ? extends Publisher<Boolean>> asyncPredicate) - Test each value emitted by this Flux asynchronously using a generated Publisher<Boolean> test.
- filterWhen(Function<? super T, ? extends Publisher<Boolean>> asyncPredicate, int bufferSize) - Test each value emitted by this Flux asynchronously using a generated Publisher<Boolean> test.
- static <I> Flux<I> first(Iterable<? extends Publisher<? extends I>> sources) - Deprecated.
- static <I> Flux<I> first(Publisher<? extends I>... sources) - Deprecated.
- static <I> Flux<I> firstWithSignal(Iterable<? extends Publisher<? extends I>> sources)
- static <I> Flux<I> firstWithSignal(Publisher<? extends I>... sources)
- static <I> Flux<I> firstWithValue(Iterable<? extends Publisher<? extends I>> sources)
- static <I> Flux<I> firstWithValue(Publisher<? extends I> first, Publisher<? extends I>... others)
- final <R> Flux<R> flatMap(@Nullable Function<? super T, ? extends Publisher<? extends R>> mapperOnNext, @Nullable Function<? super Throwable, ? extends Publisher<? extends R>> mapperOnError, @Nullable Supplier<? extends Publisher<? extends R>> mapperOnComplete)
- final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
- final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency)
- final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)
- final <V> Flux<V> flatMapDelayError(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)
- final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper)
- final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper, int prefetch)
- final <R> Flux<R> flatMapSequential(Function<? super T, ? extends Publisher<? extends R>> mapper)
- final <R> Flux<R> flatMapSequential(Function<? super T, ? extends Publisher<? extends R>> mapper, int maxConcurrency)
- final <R> Flux<R> flatMapSequential(Function<? super T, ? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch)
- final <R> Flux<R> flatMapSequentialDelayError(Function<? super T, ? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch)
- static <T> Flux<T> from(Publisher<? extends T> source)
- static <T> Flux<T> fromArray(T[] array) - Create a Flux that emits the items contained in the provided array.
- static <T> Flux<T> fromIterable(Iterable<? extends T> it)
- static <T> Flux<T> fromStream(Supplier<Stream<? extends T>> streamSupplier)
- static <T> Flux<T> fromStream(Stream<? extends T> s)
- generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) - Programmatically create a Flux by generating signals one-by-one via a consumer callback and some state.
- generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer) - Programmatically create a Flux by generating signals one-by-one via a consumer callback and some state, with a final cleanup callback.
- static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator) - Programmatically create a Flux by generating signals one-by-one via a consumer callback.
- int getPrefetch() - The prefetch configuration of the Flux.
- final <K> Flux<GroupedFlux<K,T>> groupBy(Function<? super T, ? extends K> keyMapper)
- final <K> Flux<GroupedFlux<K,T>> groupBy(Function<? super T, ? extends K> keyMapper, int prefetch)
- final <K,V> Flux<GroupedFlux<K,V>> groupBy(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends V> valueMapper)
- final <K,V> Flux<GroupedFlux<K,V>> groupBy(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends V> valueMapper, int prefetch)
- final <TRight,TLeftEnd,TRightEnd,R> Flux<R> groupJoin(Publisher<? extends TRight> other, Function<? super T, ? extends Publisher<TLeftEnd>> leftEnd, Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd, BiFunction<? super T, ? super Flux<TRight>, ? extends R> resultSelector) - Map values from two Publishers into time windows and emit combinations of values in case their windows overlap.
- final <R> Flux<R> handle(BiConsumer<? super T, SynchronousSink<R>> handler) - Handle the items emitted by this Flux by calling a biconsumer with the output sink for each onNext.
- hasElement(T value) - Emit a single boolean true if any of the elements of this Flux sequence is equal to the provided value.
- hasElements() - Emit a single boolean true if this Flux sequence has at least one element.
- hide() - Hide the identities of this Flux instance.
- ignoreElements() - Ignore onNext signals (dropping them) and only propagate termination events.
- index() - Keep information about the order in which source values were received by indexing them with a 0-based incrementing long, returning a Flux of Tuple2<(index, value)>.
- final <I> Flux<I> index(BiFunction<? super Long, ? super T, ? extends I> indexMapper) - Keep information about the order in which source values were received by indexing them internally with a 0-based incrementing long, then combining this information with the source value into an I using the provided BiFunction, returning a Flux<I>.
- interval(Duration period) - Create a Flux that emits long values starting with 0 and incrementing at specified time intervals on the global timer.
- interval(Duration delay, Duration period) - Create a Flux that emits long values starting with 0 and incrementing at specified time intervals, after an initial delay, on the global timer.
- final <TRight,TLeftEnd,TRightEnd,R> Flux<R> join(Publisher<? extends TRight> other, Function<? super T, ? extends Publisher<TLeftEnd>> leftEnd, Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd, BiFunction<? super T, ? super TRight, ? extends R> resultSelector) - Combine values from two Publishers in case their windows overlap.
- static <T> Flux<T> just(T data) - Create a new Flux that will only emit a single element, then onComplete.
- static <T> Flux<T> just(T... data) - Create a Flux that emits the provided elements and then completes.
- last() - Emit the last element observed before the complete signal as a Mono, or emit a NoSuchElementException error if the source was empty.
- last(T defaultValue) - Emit the last element observed before the complete signal as a Mono, or emit the defaultValue if the source was empty.
- limitRate(int prefetchRate) - Ensure that backpressure signals from downstream subscribers are split into batches capped at the provided prefetchRate when propagated upstream, effectively rate limiting the upstream Publisher.
- limitRate(int highTide, int lowTide) - Ensure that backpressure signals from downstream subscribers are split into batches capped at the provided highTide first, then replenishing at the provided lowTide, effectively rate limiting the upstream Publisher.
- limitRequest(long n) - Deprecated. Replace with take(n, true) in 3.4.x, then take(long) in 3.5.0.
- log() - Observe all Reactive Streams signals and trace them using Logger support.
- log(@Nullable String category, Level level, boolean showOperatorLine, SignalType... options) - Observe Reactive Streams signals matching the passed filter options and trace them using Logger support.
- log(@Nullable String category, Level level, SignalType... options) - Observe Reactive Streams signals matching the passed filter options and trace them using Logger support.
- log(String category) - Observe all Reactive Streams signals and trace them using Logger support.
- log(Logger logger) - Observe Reactive Streams signals matching the passed filter options and trace them using a specific user-provided Logger, at the Level.INFO level.
- log(Logger logger, Level level, boolean showOperatorLine, SignalType... options)
- final <V> Flux<V> map(Function<? super T, ? extends V> mapper) - Transform the items emitted by this Flux by applying a synchronous function to each item.
- final <V> Flux<V> mapNotNull(Function<? super T, ? extends @Nullable V> mapper) - Transform the items emitted by this Flux by applying a synchronous function to each item, which may produce null values.
- materialize() - Transform incoming onNext, onError and onComplete signals into Signal instances, materializing these signals.
- static <I> Flux<I> merge(int prefetch, Publisher<? extends I>... sources) - Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence.
- static <I> Flux<I> merge(Iterable<? extends Publisher<? extends I>> sources)
- static <I> Flux<I> merge(Publisher<? extends I>... sources) - Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence.
- static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source)
- static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency)
- static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency, int prefetch)
- static <T> Flux<T> mergeComparing(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources) - Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator).
- static <T> Flux<T> mergeComparing(Comparator<? super T> comparator, Publisher<? extends T>... sources) - Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator).
- static <I extends Comparable<? super I>> Flux<I> mergeComparing(Publisher<? extends I>... sources) - Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by their natural order).
- static <T> Flux<T> mergeComparingDelayError(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources) - Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator).
- mergeComparingWith(Publisher<? extends T> other, Comparator<? super T> otherComparator) - Merge data from this Flux and a Publisher into a reordered merge sequence, by picking the smallest value from each sequence as defined by a provided Comparator.
- static <I> Flux<I> mergeDelayError(int prefetch, Publisher<? extends I>... sources) - Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence.
- static <T> Flux<T> mergeOrdered(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources) - Deprecated. Use mergeComparingDelayError(int, Comparator, Publisher[]) instead (as mergeComparing(Publisher[]) doesn't have this operator's delayError behavior).
- static <T> Flux<T> mergeOrdered(Comparator<? super T> comparator, Publisher<? extends T>... sources) - Deprecated. Use mergeComparingDelayError(int, Comparator, Publisher[]) instead (as mergeComparing(Publisher[]) doesn't have this operator's delayError behavior).
- static <I extends Comparable<? super I>> Flux<I> mergeOrdered(Publisher<? extends I>... sources) - Deprecated. Use mergeComparingDelayError(int, Comparator, Publisher[]) instead (as mergeComparing(Publisher[]) doesn't have this operator's delayError behavior).
- mergeOrderedWith(Publisher<? extends T> other, Comparator<? super T> otherComparator) - Deprecated. Use mergeComparingWith(Publisher, Comparator) instead (with the caveat that it defaults to NOT delaying errors, unlike this operator).
- static <T> Flux<T> mergePriority(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources) - Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator) as they arrive.
- static <T> Flux<T> mergePriority(Comparator<? super T> comparator, Publisher<? extends T>... sources) - Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator) as they arrive.
- static <I extends Comparable<? super I>> Flux<I> mergePriority(Publisher<? extends I>... sources) - Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by their natural order) as they arrive.
- static <T> Flux<T> mergePriorityDelayError(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources) - Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator) as they arrive.
- static <I> Flux<I> mergeSequential(int prefetch, Publisher<? extends I>... sources) - Merge data from Publisher sequences provided in an array/vararg into an ordered merged sequence.
- static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources)
- static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch)
- static <I> Flux<I> mergeSequential(Publisher<? extends I>... sources) - Merge data from Publisher sequences provided in an array/vararg into an ordered merged sequence.
- static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources)
- static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)
- static <I> Flux<I> mergeSequentialDelayError(int prefetch, Publisher<? extends I>... sources) - Merge data from Publisher sequences provided in an array/vararg into an ordered merged sequence.
- static <I> Flux<I> mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch)
- static <T> Flux<T> mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)
- metrics() - Deprecated. Prefer using tap(SignalListenerFactory) with the SignalListenerFactory provided by the new reactor-core-micrometer module.
- name(String name) - Give a name to this sequence, which can be retrieved using Scannable.name() as long as this is the first reachable Scannable.parents().
- static <T> Flux<T> never() - Create a Flux that will never signal any data, error or completion signal.
- next()
- final <U> Flux<U> ofType(Class<U> clazz) - Evaluate each accepted value against the given Class type.
- protected static <T> ConnectableFlux<T> onAssembly(ConnectableFlux<T> source) - To be used by custom operators: invokes assembly Hooks pointcut given a ConnectableFlux, potentially returning a new ConnectableFlux.
- protected static <T> Flux<T> onAssembly(Flux<T> source)
- onBackpressureBuffer() - Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream.
- onBackpressureBuffer(int maxSize) - Request an unbounded demand and push to the returned Flux, or park up to maxSize elements when not enough demand is requested downstream.
- onBackpressureBuffer(int maxSize, Consumer<? super T> onOverflow) - Request an unbounded demand and push to the returned Flux, or park up to maxSize elements when not enough demand is requested downstream.
- onBackpressureBuffer(int maxSize, Consumer<? super T> onBufferOverflow, BufferOverflowStrategy bufferOverflowStrategy) - Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream, within a maxSize limit.
- onBackpressureBuffer(int maxSize, BufferOverflowStrategy bufferOverflowStrategy) - Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream, within a maxSize limit.
- onBackpressureBuffer(Duration ttl, int maxSize, Consumer<? super T> onBufferEviction) - Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream, within a maxSize limit and for a maximum Duration of ttl (as measured on the parallel Scheduler).
- onBackpressureBuffer(Duration ttl, int maxSize, Consumer<? super T> onBufferEviction, Scheduler scheduler)
- onBackpressureDrop() - Request an unbounded demand and push to the returned Flux, or drop the observed elements if not enough demand is requested downstream.
- onBackpressureDrop(Consumer<? super T> onDropped)
- onBackpressureError() - Request an unbounded demand and push to the returned Flux, or emit onError from Exceptions.failWithOverflow() if not enough demand is requested downstream.
- onBackpressureLatest() - Request an unbounded demand and push to the returned Flux, or only keep the most recent observed item if not enough demand is requested downstream.
- onErrorComplete() - Simply complete the sequence by replacing an onError signal with an onComplete signal.
- onErrorComplete(Class<? extends Throwable> type) - Simply complete the sequence by replacing an onError signal with an onComplete signal if the error matches the given Class.
- onErrorComplete(Predicate<? super Throwable> predicate) - Simply complete the sequence by replacing an onError signal with an onComplete signal if the error matches the given Predicate.
- onErrorContinue(Class<E> type, BiConsumer<Throwable, Object> errorConsumer) - Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements.
- onErrorContinue(BiConsumer<Throwable, Object> errorConsumer) - Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements.
- onErrorContinue(Predicate<E> errorPredicate, BiConsumer<Throwable, Object> errorConsumer) - Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements.
- onErrorMap(Class<E> type, Function<? super E, ? extends Throwable> mapper) - Transform an error emitted by this Flux by synchronously applying a function to it if the error matches the given type.
- onErrorMap(Function<? super Throwable, ? extends Throwable> mapper) - Transform any error emitted by this Flux by synchronously applying a function to it.
- onErrorMap(Predicate<? super Throwable> predicate, Function<? super Throwable, ? extends Throwable> mapper) - Transform an error emitted by this Flux by synchronously applying a function to it if the error matches the given predicate.
- onErrorResume(Class<E> type, Function<? super E, ? extends Publisher<? extends T>> fallback) - Subscribe to a fallback publisher when an error matching the given type occurs, using a function to choose the fallback depending on the error.
- onErrorResume(Function<? super Throwable, ? extends Publisher<? extends T>> fallback) - Subscribe to a returned fallback publisher when any error occurs, using a function to choose the fallback depending on the error.
- onErrorResume(Predicate<? super Throwable> predicate, Function<? super Throwable, ? extends Publisher<? extends T>> fallback) - Subscribe to a fallback publisher when an error matching a given predicate occurs.
- onErrorReturn(Class<E> type, T fallbackValue) - Simply emit a captured fallback value when an error of the specified type is observed on this Flux.
- onErrorReturn(Predicate<?
super Throwable> predicate, T fallbackValue) Simply emit a captured fallback value when an error matching the given predicate is observed on thisFlux.onErrorReturn(T fallbackValue) Simply emit a captured fallback value when any error is observed on thisFlux.If anonErrorContinue(BiConsumer)variant has been used downstream, reverts to the default 'STOP' mode where errors are terminal events upstream.Detaches both the childSubscriberand theSubscriptionon termination or cancellation.final ParallelFlux<T>parallel()Prepare thisFluxby dividing data on a number of 'rails' matching the number of CPU cores, in a round-robin fashion.final ParallelFlux<T>parallel(int parallelism) Prepare thisFluxby dividing data on a number of 'rails' matching the providedparallelismparameter, in a round-robin fashion.final ParallelFlux<T>parallel(int parallelism, int prefetch) final ConnectableFlux<T>publish()Prepare aConnectableFluxwhich shares thisFluxsequence and dispatches values to subscribers in a backpressure-aware manner.final ConnectableFlux<T>publish(int prefetch) Prepare aConnectableFluxwhich shares thisFluxsequence and dispatches values to subscribers in a backpressure-aware manner.final <R> Flux<R>Shares a sequence for the duration of a function that may transform it and consume it as many times as necessary without causing multiple subscriptions to the upstream.final <R> Flux<R>Shares a sequence for the duration of a function that may transform it and consume it as many times as necessary without causing multiple subscriptions to the upstream.Deprecated.useshareNext()instead, or use `publish().next()` if you need to `connect().Run onNext, onComplete and onError on a suppliedSchedulerScheduler.Worker.Run onNext, onComplete and onError on a suppliedSchedulerScheduler.Worker.static <T> Flux<T>static <T> Flux<T>push(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure) range(int start, int count) final <A> Mono<A>reduce(A initial, BiFunction<A, ? 
super T, A> accumulator) Reduce the values from thisFluxsequence into a single object matching the type of a seed value.reduce(BiFunction<T, T, T> aggregator) Reduce the values from thisFluxsequence into a single object of the same type than the emitted items.final <A> Mono<A>reduceWith(Supplier<A> initial, BiFunction<A, ? super T, A> accumulator) Reduce the values from thisFluxsequence into a single object matching the type of a lazily supplied seed value.repeat()Repeatedly and indefinitely subscribe to the source upon completion of the previous subscription.repeat(long numRepeat) Repeatedly subscribe to the sourcenumRepeattimes.repeat(long numRepeat, BooleanSupplier predicate) Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.repeat(BooleanSupplier predicate) Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.repeatWhen(Function<Flux<Long>, ? extends Publisher<?>> repeatFactory) Repeatedly subscribe to thisFluxwhen a companion sequence emits elements in response to the flux completion signal.final ConnectableFlux<T>replay()Turn thisFluxinto a hot source and cache last emitted signals for furtherSubscriber.final ConnectableFlux<T>replay(int history) Turn thisFluxinto a connectable hot source and cache last emitted signals for furtherSubscriber.final ConnectableFlux<T>Turn thisFluxinto a connectable hot source and cache last emitted signals for furtherSubscriber.final ConnectableFlux<T>Turn thisFluxinto a connectable hot source and cache last emitted signals for furtherSubscriber.final ConnectableFlux<T>Turn thisFluxinto a connectable hot source and cache last emitted signals for furtherSubscriber.final ConnectableFlux<T>Turn thisFluxinto a connectable hot source and cache last emitted signals for furtherSubscriber.retry()Re-subscribes to thisFluxsequence if it signals any error, indefinitely.retry(long numRetries) Re-subscribes to 
thisFluxsequence if it signals any error, for a fixed number of times.sampleFirst(Duration timespan) Repeatedly take a value from thisFluxthen skip the values that follow within a given duration.sampleFirst(Function<? super T, ? extends Publisher<U>> samplerFactory) sampleTimeout(Function<? super T, ? extends Publisher<U>> throttlerFactory) sampleTimeout(Function<? super T, ? extends Publisher<U>> throttlerFactory, int maxConcurrency) final <A> Flux<A>scan(A initial, BiFunction<A, ? super T, A> accumulator) Reduce thisFluxvalues with an accumulatorBiFunctionand also emit the intermediate results of this function.scan(BiFunction<T, T, T> accumulator) Reduce thisFluxvalues with an accumulatorBiFunctionand also emit the intermediate results of this function.final <A> Flux<A>scanWith(Supplier<A> initial, BiFunction<A, ? super T, A> accumulator) Reduce thisFluxvalues with the help of an accumulatorBiFunctionand also emits the intermediate results.share()single()Expect and emit a single item from thisFluxsource or signalNoSuchElementExceptionfor an empty source, orIndexOutOfBoundsExceptionfor a source with more than one element.Expect and emit a single item from thisFluxsource and emit a default value for an empty source, but signal anIndexOutOfBoundsExceptionfor a source with more than one element.Expect and emit a single item from thisFluxsource, and accept an empty source but signal anIndexOutOfBoundsExceptionfor a source with more than one element.skip(long skipped) Skip the specified number of elements from the beginning of thisFluxthen emit the remaining elements.Skip elements from thisFluxemitted within the specified initial duration.skipLast(int n) Skip a specified number of elements at the end of thisFluxsequence.skipUntilOther(Publisher<?> other) sort()Sort elements from thisFluxby collecting and sorting them in the background then emitting the sorted sequence once this sequence completes.sort(Comparator<? 
super T> sortFunction) Sort elements from thisFluxusing aComparatorfunction, by collecting and sorting elements in the background then emitting the sorted sequence once this sequence completes.Prepend the given values before thisFluxsequence.final DisposableSubscribe to thisFluxand request unbounded demand.final Disposablesubscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer) final Disposablesubscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer) Deprecated.Because users tend to forget torequestthe subsciption.final Disposablesubscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Context initialContext) final Disposablefinal Disposablefinal voidsubscribe(Subscriber<? super T> actual) abstract voidsubscribe(CoreSubscriber<? super T> actual) An internalPublisher.subscribe(Subscriber)that will bypassHooks.onLastOperator(Function)pointcut.subscribeOn(Scheduler scheduler) Run subscribe, onSubscribe and request on a specifiedScheduler'sScheduler.Worker.subscribeOn(Scheduler scheduler, boolean requestOnSeparateThread) Run subscribe and onSubscribe on a specifiedScheduler'sScheduler.Worker.final <E extends Subscriber<? super T>>
EsubscribeWith(E subscriber) Subscribe a provided instance of a subclass ofSubscriberto thisFluxand return said instance for further chaining calls.switchIfEmpty(Publisher<? extends T> alternate) Switch to an alternativePublisherif this sequence is completed without any data.final <V> Flux<V>final <V> Flux<V>Deprecated.to be removed in 3.6.0 at the earliest.final <V> Flux<V>switchOnFirst(BiFunction<Signal<? extends T>, Flux<T>, Publisher<? extends V>> transformer) Transform the currentFluxonce it emits its first element, making a conditional transformation possible.final <V> Flux<V>switchOnFirst(BiFunction<Signal<? extends T>, Flux<T>, Publisher<? extends V>> transformer, boolean cancelSourceOnComplete) Transform the currentFluxonce it emits its first element, making a conditional transformation possible.static <T> Flux<T>switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers) static <T> Flux<T>switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers, int prefetch) Deprecated.to be removed in 3.6.0 at the earliest.Tag this flux with a key/value pair.take(long n) Take only the first N values from thisFlux, if available.take(long n, boolean limitRequest) Take only the first N values from thisFlux, if available.takeLast(int n) Emit the last N values thisFluxemitted before its completion.takeUntilOther(Publisher<?> other) Relay values from thisFluxwhile a predicate returns TRUE for the values (checked before each value is delivered).tap(Function<ContextView, SignalListener<T>> listenerGenerator) Tap into Reactive Streams signals emitted or received by thisFluxand notify a stateful per-SubscriberSignalListener.tap(Supplier<SignalListener<T>> simpleListenerGenerator) Tap into Reactive Streams signals emitted or received by thisFluxand notify a stateful per-SubscriberSignalListener.tap(SignalListenerFactory<T, ?> listenerFactory) Tap into Reactive Streams signals emitted or received by thisFluxand notify a stateful 
per-SubscriberSignalListenercreated by the providedSignalListenerFactory.then()Return aMono<Void>that completes when thisFluxcompletes.final <V> Mono<V>Return aMono<Void>that waits for thisFluxto complete then for a suppliedPublisher<Void>to also complete.final <V> Flux<V>timed()TimesSubscriber.onNext(Object)events, encapsulated into aTimedobject that lets downstream consumer look at various time information gathered with nanosecond resolution using the default clock (Schedulers.parallel()):Timed.elapsed(): the time in nanoseconds since last event, as aDuration.TimesSubscriber.onNext(Object)events, encapsulated into aTimedobject that lets downstream consumer look at various time information gathered with nanosecond resolution using the providedScheduleras a clock:Timed.elapsed(): the time in nanoseconds since last event, as aDuration.Propagate aTimeoutExceptionas soon as no item is emitted within the givenDurationfrom the previous emission (or the subscription for the first item).Propagate aTimeoutExceptionas soon as no item is emitted within the givenDurationfrom the previous emission (or the subscription for the first item), as measured by the specifiedScheduler.Signal aTimeoutExceptionin case the first item from thisFluxhas not been emitted before the givenPublisheremits.Signal aTimeoutExceptionin case the first item from thisFluxhas not been emitted before thefirstTimeoutPublisheremits, and whenever each subsequent elements is not emitted before aPublishergenerated from the latest element signals.timeout(Publisher<U> firstTimeout, Function<? super T, ? extends Publisher<V>> nextTimeoutFactory, Publisher<? extends T> fallback) toIterable(int batchSize) toIterable(int batchSize, @Nullable Supplier<Queue<T>> queueProvider) toStream()toStream(int batchSize) toString()final <V> Flux<V>final <V> Flux<V>transformDeferred(Function<? super Flux<T>, ? extends Publisher<V>> transformer) final <V> Flux<V>transformDeferredContextual(BiFunction<? super Flux<T>, ? 
super ContextView, ? extends Publisher<V>> transformer) static <T,D extends AutoCloseable>
Flux<T>using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier) Uses anAutoCloseableresource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.static <T,D extends AutoCloseable>
Flux<T>using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, boolean eager) Uses anAutoCloseableresource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup) Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager) Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> resourceClosure, Function<? super D, ? extends Publisher<?>> asyncCleanup) Uses a resource, generated by aPublisherfor each individualSubscriber, while streaming the values from aPublisherderived from the same resource.static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> resourceClosure, Function<? super D, ? extends Publisher<?>> asyncComplete, BiFunction<? super D, ? super Throwable, ? extends Publisher<?>> asyncError, Function<? super D, ? 
extends Publisher<?>> asyncCancel) Uses a resource, generated by aPublisherfor each individualSubscriber, while streaming the values from aPublisherderived from the same resource.window(int maxSize) window(int maxSize, int skip) windowTimeout(int maxSize, Duration maxTime) windowTimeout(int maxSize, Duration maxTime, boolean fairBackpressure) windowTimeout(int maxSize, Duration maxTime, Scheduler timer) windowTimeout(int maxSize, Duration maxTime, Scheduler timer, boolean fairBackpressure) windowUntil(Predicate<T> boundaryTrigger) windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore) windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore, int prefetch) Collect subsequent repetitions of an element (that is, if they arrive right after one another) into multipleFluxwindows.windowUntilChanged(Function<? super T, ? extends V> keySelector, BiPredicate<? super V, ? super V> keyComparator) Collect subsequent repetitions of an element (that is, if they arrive right after one another), as compared by a key extracted through the user providedFunctionand compared using a suppliedBiPredicate, into multipleFluxwindows.windowUntilChanged(Function<? super T, ? super V> keySelector) windowWhen(Publisher<U> bucketOpening, Function<? super U, ? extends Publisher<V>> closeSelector) windowWhile(Predicate<T> inclusionPredicate) windowWhile(Predicate<T> inclusionPredicate, int prefetch) final <U,R> Flux<R> withLatestFrom(Publisher<? extends U> other, BiFunction<? super T, ? super U, ? extends R> resultSelector) Combine the most recently emitted values from both thisFluxand anotherPublisherthrough aBiFunctionand emits the result.static <O> Flux<O>zip(Iterable<? extends Publisher<?>> sources, int prefetch, Function<? super Object[], ? 
extends O> combinator) Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator).static <O> Flux<O>Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator).static <I,O> Flux<O> zip(Function<? super Object[], ? extends O> combinator, int prefetch, Publisher<? extends I>... sources) Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator).static <I,O> Flux<O> Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator).Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator).Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into aTuple2.static <T1,T2, O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends O> combinator) Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator).zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3) Zip three sources together, that is to say wait for all the sources to emit one element and combine these elements once into aTuple3.zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? 
extends T4> source4) Zip four sources together, that is to say wait for all the sources to emit one element and combine these elements once into aTuple4.zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5) Zip five sources together, that is to say wait for all the sources to emit one element and combine these elements once into aTuple5.zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6) Zip six sources together, that is to say wait for all the sources to emit one element and combine these elements once into aTuple6.zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7) Zip seven sources together, that is to say wait for all the sources to emit one element and combine these elements once into aTuple7.zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7, Publisher<? extends T8> source8) Zip eight sources together, that is to say wait for all the sources to emit one element and combine these elements once into aTuple8.final <T2,V> Flux<V> zipWith(Publisher<? extends T2> source2, int prefetch, BiFunction<? super T, ? super T2, ? extends V> combinator) Zip thisFluxwith anotherPublishersource, that is to say wait for both to emit one element and combine these elements using acombinatorBiFunctionThe operator will continue doing so until any of the sources completes.final <T2,V> Flux<V> zipWith(Publisher<? extends T2> source2, BiFunction<? super T, ? 
super T2, ? extends V> combinator) Zip thisFluxwith anotherPublishersource, that is to say wait for both to emit one element and combine these elements using acombinatorBiFunctionThe operator will continue doing so until any of the sources completes.zipWithIterable(Iterable<? extends T2> iterable) final <T2,V> Flux<V> zipWithIterable(Iterable<? extends T2> iterable, BiFunction<? super T, ? super T2, ? extends V> zipper) Zip elements from thisFluxwith the content of anIterable, that is to say combine one element from each, pairwise, using the given zipperBiFunction.
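Several of the error-handling operators listed above follow the same pattern: they intercept the onError signal and substitute something downstream can consume. A minimal sketch of onErrorReturn, assuming reactor-core (io.projectreactor) is on the classpath; the class and method names are illustrative, not part of the Reactor API:

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Illustrative demo class, not part of Reactor.
public class OnErrorReturnExample {

    static List<Integer> withFallback() {
        return Flux.just(1, 2)
            .concatWith(Flux.error(new IllegalStateException("boom")))
            .onErrorReturn(-1) // the onError signal becomes one final element
            .collectList()
            .block();
    }
}
```

The fallback value is emitted in place of the error, after which the sequence completes normally.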
-
Constructor Details
-
Flux
public Flux()
-
-
Method Details
-
combineLatest
@SafeVarargs
public static <T,V> Flux<V> combineLatest(Function<Object[], V> combinator, Publisher<? extends T>... sources)
Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
- Type Parameters:
T - type of the value from sources
V - The produced output after transformation by the given combinator
- Parameters:
sources - The Publisher sources to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
- Returns:
- a
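As a concrete illustration of the vararg variant above, the following sketch combines the latest value from each of two sources through the Object[]-based combinator. It assumes reactor-core is on the classpath; the class and method names are illustrative, not part of the Reactor API. Single-element sources are used so the output is deterministic.

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Illustrative demo class, not part of Reactor.
public class CombineLatestExample {

    static List<Integer> sums() {
        Flux<Integer> a = Flux.just(1);
        Flux<Integer> b = Flux.just(2);
        // The combinator receives the most recent value from every source
        // as an Object[], in source order.
        return Flux.combineLatest(
                values -> (Integer) values[0] + (Integer) values[1],
                a, b)
            .collectList()
            .block();
    }
}
```

With each source emitting exactly one element, exactly one combination is produced before the merged sequence completes.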
-
combineLatest
@SafeVarargs
public static <T,V> Flux<V> combineLatest(Function<Object[], V> combinator, int prefetch, Publisher<? extends T>... sources)
Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
- Type Parameters:
T - type of the value from sources
V - The produced output after transformation by the given combinator
- Parameters:
sources - The Publisher sources to combine values from
prefetch - The demand sent to each combined source Publisher
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
- Returns:
- a
-
combineLatest
public static <T1,T2,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of two Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
- Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
V - The produced output after transformation by the given combinator
- Parameters:
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
- Returns:
- a
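The two-source overload above accepts a typed BiFunction, which avoids the Object[] cast of the vararg variants. A minimal sketch, assuming reactor-core is on the classpath (the class and method names are illustrative, not part of the Reactor API):

```java
import reactor.core.publisher.Flux;

// Illustrative demo class, not part of Reactor.
public class CombineLatestPairExample {

    static String latest() {
        Flux<String> labels = Flux.just("price:");
        Flux<Integer> numbers = Flux.just(10);
        // The BiFunction sees the latest value from each source, fully typed.
        return Flux.combineLatest(labels, numbers, (label, n) -> label + n)
            .blockLast();
    }
}
```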
-
combineLatest
public static <T1,T2,T3,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Function<Object[], V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of three Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
- Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
V - The produced output after transformation by the given combinator
- Parameters:
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
- Returns:
- a
-
combineLatest
public static <T1,T2,T3,T4,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Function<Object[], V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of four Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
- Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
V - The produced output after transformation by the given combinator
- Parameters:
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
source4 - The fourth Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
- Returns:
- a
-
combineLatest
public static <T1,T2,T3,T4,T5,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Function<Object[], V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of five Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
- Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
V - The produced output after transformation by the given combinator
- Parameters:
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
source4 - The fourth Publisher source to combine values from
source5 - The fifth Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
- Returns:
- a
-
combineLatest
public static <T1,T2,T3,T4,T5,T6,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Function<Object[], V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of six Publisher sources.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
- Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
V - The produced output after transformation by the given combinator
- Parameters:
source1 - The first Publisher source to combine values from
source2 - The second Publisher source to combine values from
source3 - The third Publisher source to combine values from
source4 - The fourth Publisher source to combine values from
source5 - The fifth Publisher source to combine values from
source6 - The sixth Publisher source to combine values from
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
- Returns:
- a
-
combineLatest
public static <T,V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<Object[], V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources provided in an Iterable.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
- Type Parameters:
T- The common base type of the values from sourcesV- The produced output after transformation by the given combinator- Parameters:
sources- The list ofPublishersources to combine values fromcombinator- The aggregate function that will receive the latest value from each upstream and return the value to signal downstream- Returns:
- a
Fluxbased on the produced combinations
-
combineLatest
public static <T, V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources, int prefetch, Function<Object[], V> combinator)
Build a Flux whose data are generated by the combination of the most recently published value from each of the Publisher sources provided in an Iterable.
Discard Support: This operator is NOT suited for types that need guaranteed discard of unpropagated elements, as it doesn't track which elements have been used by the combinator and which haven't. Furthermore, elements can and will be passed to the combinator multiple times.
- Type Parameters:
T - The common base type of the values from sources
V - The produced output after transformation by the given combinator
- Parameters:
sources - The list of Publisher sources to combine values from
prefetch - demand produced to each combined source Publisher
combinator - The aggregate function that will receive the latest value from each upstream and return the value to signal downstream
- Returns:
- a Flux based on the produced combinations
-
concat
Concatenate all sources provided in an Iterable, forwarding elements emitted by the sources downstream. Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
-
concatWithValues
Concatenates the values to the end of the Flux.
- Parameters:
values - The values to concatenate
- Returns:
- a new Flux concatenating all source sequences
-
concat
Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream. Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
-
concat
Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream. Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
-
concat
Concatenate all sources provided as a vararg, forwarding elements emitted by the sources downstream. Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
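A minimal sketch of concat's sequential behavior (the demo class and values are illustrative, not from this javadoc):

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class; shows the sequential nature of concat.
public class ConcatDemo {

    static List<Integer> concatenated() {
        // The second source is only subscribed to once the first completes,
        // so ordering is strictly source-by-source.
        return Flux.concat(Flux.just(1, 2), Flux.just(3, 4))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(concatenated()); // [1, 2, 3, 4]
    }
}
```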
-
concatDelayError
Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream. Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
-
concatDelayError
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, int prefetch)
Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream. Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
- Type Parameters:
T - The type of values in both source and output sequences
- Parameters:
sources - The Publisher of Publisher to concatenate
prefetch - number of elements to prefetch from the source, to be turned into inner Publishers
- Returns:
- a new Flux concatenating all inner source sequences until complete or error
-
concatDelayError
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, boolean delayUntilEnd, int prefetch)
Concatenate all sources emitted as an onNext signal from a parent Publisher, forwarding elements emitted by the sources downstream. Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes.
Errors do not interrupt the main sequence but are propagated after the current concat backlog if delayUntilEnd is false, or after all sources have had a chance to be concatenated if delayUntilEnd is true.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
- Type Parameters:
T - The type of values in both source and output sequences
- Parameters:
sources - The Publisher of Publisher to concatenate
delayUntilEnd - delay error until all sources have been consumed instead of after the current source
prefetch - the number of Publishers to prefetch from the outer Publisher
- Returns:
- a new Flux concatenating all inner source sequences until complete or error
-
concatDelayError
Concatenate all sources provided as a vararg, forwarding elements emitted by the sources downstream. Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Errors do not interrupt the main sequence but are propagated after the rest of the sources have had a chance to be concatenated.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
-
create
Programmatically create a Flux with the capability of emitting multiple elements in a synchronous or asynchronous manner through the FluxSink API. This includes emitting elements from multiple threads.
This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:

    Flux.<String>create(emitter -> {
        ActionListener al = e -> {
            emitter.next(textField.getText());
        };
        // without cleanup support:
        button.addActionListener(al);
        // with cleanup support:
        button.addActionListener(al);
        emitter.onDispose(() -> {
            button.removeListener(al);
        });
    });

Discard Support: The FluxSink exposed by this operator buffers in case of overflow. The buffer is discarded when the main sequence is cancelled.
-
create
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
Programmatically create a Flux with the capability of emitting multiple elements in a synchronous or asynchronous manner through the FluxSink API. This includes emitting elements from multiple threads.
This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:

    Flux.<String>create(emitter -> {
        ActionListener al = e -> {
            emitter.next(textField.getText());
        };
        // without cleanup support:
        button.addActionListener(al);
        // with cleanup support:
        button.addActionListener(al);
        emitter.onDispose(() -> {
            button.removeListener(al);
        });
    }, FluxSink.OverflowStrategy.LATEST);

Discard Support: The FluxSink exposed by this operator discards elements as relevant to the chosen FluxSink.OverflowStrategy. For example, FluxSink.OverflowStrategy.DROP discards each item as it is dropped, while FluxSink.OverflowStrategy.BUFFER will discard the buffer upon cancellation.
- Type Parameters:
T - The type of values in the sequence
- Parameters:
emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
backpressure - the backpressure mode, see FluxSink.OverflowStrategy for the available backpressure modes
- Returns:
- a Flux
- See Also:
-
push
Programmatically create a Flux with the capability of emitting multiple elements from a single-threaded producer through the FluxSink API. For a multi-threaded capable alternative, see create(Consumer).
This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:

    Flux.<String>push(emitter -> {
        ActionListener al = e -> {
            emitter.next(textField.getText());
        };
        // without cleanup support:
        button.addActionListener(al);
        // with cleanup support:
        button.addActionListener(al);
        emitter.onDispose(() -> {
            button.removeListener(al);
        });
    });

Discard Support: The FluxSink exposed by this operator buffers in case of overflow. The buffer is discarded when the main sequence is cancelled.
-
push
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
Programmatically create a Flux with the capability of emitting multiple elements from a single-threaded producer through the FluxSink API. For a multi-threaded capable alternative, see create(Consumer, reactor.core.publisher.FluxSink.OverflowStrategy).
This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:

    Flux.<String>push(emitter -> {
        ActionListener al = e -> {
            emitter.next(textField.getText());
        };
        // without cleanup support:
        button.addActionListener(al);
        // with cleanup support:
        button.addActionListener(al);
        emitter.onDispose(() -> {
            button.removeListener(al);
        });
    }, FluxSink.OverflowStrategy.LATEST);

Discard Support: The FluxSink exposed by this operator discards elements as relevant to the chosen FluxSink.OverflowStrategy. For example, FluxSink.OverflowStrategy.DROP discards each item as it is dropped, while FluxSink.OverflowStrategy.BUFFER will discard the buffer upon cancellation.
- Type Parameters:
T - The type of values in the sequence
- Parameters:
emitter - Consume the FluxSink provided per-subscriber by Reactor to generate signals.
backpressure - the backpressure mode, see FluxSink.OverflowStrategy for the available backpressure modes
- Returns:
- a Flux
- See Also:
-
defer
Lazily supply a Publisher every time a Subscription is made on the resulting Flux, so the actual source instantiation is deferred until each subscribe and the Supplier can create a subscriber-specific instance. If the supplier doesn't generate a new instance however, this operator will effectively behave like from(Publisher).
-
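A small sketch of the per-subscription laziness (the counter-based supplier below is an illustration, not from the javadoc):

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

// Hypothetical demo class; each subscription re-invokes the supplier.
public class DeferDemo {

    static final AtomicInteger counter = new AtomicInteger();

    static Flux<Integer> deferred() {
        // The supplier runs per subscription, so each subscriber
        // observes a freshly captured counter value.
        return Flux.defer(() -> Flux.just(counter.incrementAndGet()));
    }

    public static void main(String[] args) {
        Flux<Integer> flux = deferred();
        System.out.println(flux.blockFirst()); // first subscription
        System.out.println(flux.blockFirst()); // second subscription sees a new value
    }
}
```

Contrast with Flux.from, where the supplier would run once at assembly time and every subscriber would see the same captured value.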
deferContextual
public static <T> Flux<T> deferContextual(Function<ContextView, ? extends Publisher<T>> contextualPublisherFactory)
Lazily supply a Publisher every time a Subscription is made on the resulting Flux, so the actual source instantiation is deferred until each subscribe and the Function can create a subscriber-specific instance. This operator behaves the same way as defer(Supplier), but accepts a Function that will receive the current ContextView as an argument. If the function doesn't generate a new instance however, this operator will effectively behave like from(Publisher).
-
empty
Create a Flux that completes without emitting any item.
- Type Parameters:
T - the reified type of the target Subscriber
- Returns:
- an empty Flux
-
error
Create a Flux that terminates with the specified error immediately after being subscribed to.
- Type Parameters:
T - the reified type of the target Subscriber
- Parameters:
error - the error to signal to each Subscriber
- Returns:
- a new failing Flux
-
error
Create a Flux that terminates with an error immediately after being subscribed to. The Throwable is generated by a Supplier, invoked each time there is a subscription and allowing for lazy instantiation.
- Type Parameters:
T - the reified type of the target Subscriber
- Parameters:
errorSupplier - the error signal Supplier to invoke for each Subscriber
- Returns:
- a new failing Flux
-
error
Create a Flux that terminates with the specified error, either immediately after being subscribed to or after being first requested.
- Type Parameters:
O - the reified type of the target Subscriber
- Parameters:
throwable - the error to signal to each Subscriber
whenRequested - if true, will onError on the first request instead of at subscribe time
- Returns:
- a new failing Flux
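A brief illustration of the lazily supplied error variant (the demo class and the onErrorResume recovery are illustrative choices, not from this javadoc):

```java
import reactor.core.publisher.Flux;

// Hypothetical demo class; the supplier instantiates the error lazily,
// once per subscriber.
public class ErrorDemo {

    static String message() {
        return Flux.<String>error(() -> new IllegalStateException("boom"))
                .onErrorResume(e -> Flux.just(e.getMessage()))
                .blockFirst();
    }

    public static void main(String[] args) {
        System.out.println(message()); // boom
    }
}
```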
-
first
Deprecated. Use firstWithSignal(Publisher[]). To be removed in reactor 3.5.
Pick the first Publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.
- Type Parameters:
I - The type of values in both source and output sequences
- Parameters:
sources - The competing source publishers
- Returns:
- a new Flux behaving like the fastest of its sources
-
first
Deprecated. Use firstWithSignal(Iterable). To be removed in reactor 3.5.
Pick the first Publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.
- Type Parameters:
I - The type of values in both source and output sequences
- Parameters:
sources - The competing source publishers
- Returns:
- a new Flux behaving like the fastest of its sources
-
firstWithSignal
Pick the first Publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.
- Type Parameters:
I - The type of values in both source and output sequences
- Parameters:
sources - The competing source publishers
- Returns:
- a new Flux behaving like the fastest of its sources
-
firstWithSignal
Pick the first Publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.
- Type Parameters:
I - The type of values in both source and output sequences
- Parameters:
sources - The competing source publishers
- Returns:
- a new Flux behaving like the fastest of its sources
-
firstWithValue
Pick the first Publisher to emit any value and replay all values from that Publisher, effectively behaving like the source that first emits an onNext.
Sources with values always "win" over empty sources (ones that only emit onComplete) or failing sources (ones that only emit onError).
When no source can provide a value, this operator fails with a NoSuchElementException (provided there are at least two sources). This exception has a composite as its cause that can be used to inspect what went wrong with each source (so the composite has as many elements as there are sources).
Exceptions from failing sources are directly reflected in the composite at the index of the failing source. For empty sources, a NoSuchElementException is added at their respective index. One can use Exceptions.unwrapMultiple(topLevel.getCause()) to easily inspect these errors as a List.
Note that like in firstWithSignal(Iterable), an infinite source can be problematic if no other source emits onNext.
-
firstWithValue
@SafeVarargs public static <I> Flux<I> firstWithValue(Publisher<? extends I> first, Publisher<? extends I>... others)
Pick the first Publisher to emit any value and replay all values from that Publisher, effectively behaving like the source that first emits an onNext.
Sources with values always "win" over an empty source (ones that only emit onComplete) or failing sources (ones that only emit onError).
When no source can provide a value, this operator fails with a NoSuchElementException (provided there are at least two sources). This exception has a composite as its cause that can be used to inspect what went wrong with each source (so the composite has as many elements as there are sources).
Exceptions from failing sources are directly reflected in the composite at the index of the failing source. For empty sources, a NoSuchElementException is added at their respective index. One can use Exceptions.unwrapMultiple(topLevel.getCause()) to easily inspect these errors as a List.
Note that like in firstWithSignal(Publisher[]), an infinite source can be problematic if no other source emits onNext. In case the first source is already an array-based firstWithValue(Publisher, Publisher[]) instance, nesting is avoided: a single new array-based instance is created with all the sources from first plus all the others sources at the same level.
- Type Parameters:
I - The type of values in both source and output sequences
- Parameters:
first - The first competing source publisher
others - The other competing source publishers
- Returns:
- a new Flux behaving like the fastest of its sources
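A short sketch showing how a source with a value wins over an empty source (names and values are illustrative, not from this javadoc):

```java
import reactor.core.publisher.Flux;

// Hypothetical demo class; empty sources lose against sources with values.
public class FirstWithValueDemo {

    static Integer winner() {
        // Even though the empty source is listed first, the source
        // that actually emits an onNext wins the race.
        return Flux.firstWithValue(Flux.<Integer>empty(), Flux.just(42))
                .blockFirst();
    }

    public static void main(String[] args) {
        System.out.println(winner()); // 42
    }
}
```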
-
from
Decorate the specified Publisher with the Flux API.
Hooks.onEachOperator(String, Function) and similar assembly hooks are applied unless the source is already a Flux.
- Type Parameters:
T - The type of values in both source and output sequences
- Parameters:
source - the source to decorate
- Returns:
- a new Flux
-
fromArray
Create a Flux that emits the items contained in the provided array.
- Type Parameters:
T - The type of values in the source array and resulting Flux
- Parameters:
array - the array to read data from
- Returns:
- a new Flux
-
fromIterable
Create a Flux that emits the items contained in the provided Iterable. The Iterable.iterator() method will be invoked at least once and at most twice for each subscriber.
This operator inspects the Iterable's Spliterator to assess if the iteration can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)). Since the default Spliterator wraps the Iterator, we can have two Iterable.iterator() calls. This second invocation is skipped on a Collection however, a type which is assumed to be always finite.
Discard Support: Upon cancellation, this operator attempts to discard the remainder of the Iterable if it can safely ensure the iterator is finite. Note that this means the Iterable.iterator() method could be invoked twice.
-
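A minimal sketch of fromIterable over a finite Collection (demo class and values are illustrative, not from this javadoc):

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class; a Collection is iterated once per subscriber.
public class FromIterableDemo {

    static List<String> upperCased() {
        List<String> letters = Arrays.asList("a", "b", "c");
        // Each subscriber gets its own iteration of the Iterable.
        return Flux.fromIterable(letters)
                .map(String::toUpperCase)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(upperCased()); // [A, B, C]
    }
}
```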
fromStream
Create a Flux that emits the items contained in the provided Stream. Keep in mind that a Stream cannot be re-used, which can be problematic in case of multiple subscriptions or re-subscription (like with repeat() or retry()). The Stream is closed automatically by the operator on cancellation, error or completion.
Discard Support: Upon cancellation, this operator attempts to discard the remainder of the Stream through its open Spliterator, if it can safely ensure it is finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)).
-
fromStream
Create a Flux that emits the items contained in a Stream created by the provided Supplier for each subscription. The Stream is closed automatically by the operator on cancellation, error or completion.
Discard Support: Upon cancellation, this operator attempts to discard the remainder of the Stream through its open Spliterator, if it can safely ensure it is finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)).
-
generate
Programmatically create a Flux by generating signals one-by-one via a consumer callback.
- Type Parameters:
T - the value type emitted
- Parameters:
generator - Consume the SynchronousSink provided per-subscriber by Reactor to generate a single signal on each pass.
- Returns:
- a Flux
-
generate
public static <T, S extends @Nullable Object> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
Programmatically create a Flux by generating signals one-by-one via a consumer callback and some state. The stateSupplier may return null.
- Type Parameters:
T - the value type emitted
S - the per-subscriber custom state type
- Parameters:
stateSupplier - called for each incoming Subscriber to provide the initial state for the generator bifunction
generator - Consume the SynchronousSink provided per-subscriber by Reactor as well as the current state to generate a single signal on each pass and return a (new) state.
- Returns:
- a Flux
-
generate
public static <T, S extends @Nullable Object> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
Programmatically create a Flux by generating signals one-by-one via a consumer callback and some state, with a final cleanup callback. The stateSupplier may return null, but your cleanup stateConsumer will need to handle the null case.
- Type Parameters:
T - the value type emitted
S - the per-subscriber custom state type
- Parameters:
stateSupplier - called for each incoming Subscriber to provide the initial state for the generator bifunction
generator - Consume the SynchronousSink provided per-subscriber by Reactor as well as the current state to generate a single signal on each pass and return a (new) state.
stateConsumer - called after the generator has terminated or the downstream cancelled, receiving the last state to be handled (i.e., release resources or do other cleanup).
- Returns:
- a Flux
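The state-passing variant can be sketched with the classic Fibonacci generator (an illustration, not from this javadoc; the class name is hypothetical):

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class; state threads through each generator pass.
public class GenerateDemo {

    static List<Long> fibonacci(int count) {
        return Flux.<Long, long[]>generate(
                        () -> new long[]{0, 1},               // initial state
                        (state, sink) -> {
                            sink.next(state[0]);              // exactly one signal per pass
                            return new long[]{state[1], state[0] + state[1]};
                        })
                .take(count)                                  // the generator itself is unbounded
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(fibonacci(6)); // [0, 1, 1, 2, 3, 5]
    }
}
```

Note the contract: the generator must emit at most one onNext per pass, and the value it returns becomes the state for the next pass.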
-
interval
Create a Flux that emits long values starting with 0 and incrementing at specified time intervals on the global timer. The first element is emitted after an initial delay equal to the period. If demand is not produced in time, an onError will be signalled with an overflow IllegalStateException detailing the tick that couldn't be emitted. In normal conditions, the Flux will never complete.
Runs on the Schedulers.parallel() Scheduler.
-
interval
Create a Flux that emits long values starting with 0 and incrementing at specified time intervals, after an initial delay, on the global timer. If demand is not produced in time, an onError will be signalled with an overflow IllegalStateException detailing the tick that couldn't be emitted. In normal conditions, the Flux will never complete.
Runs on the Schedulers.parallel() Scheduler.
-
interval
Create a Flux that emits long values starting with 0 and incrementing at specified time intervals, on the specified Scheduler. The first element is emitted after an initial delay equal to the period. If demand is not produced in time, an onError will be signalled with an overflow IllegalStateException detailing the tick that couldn't be emitted. In normal conditions, the Flux will never complete.
-
interval
Create a Flux that emits long values starting with 0 and incrementing at specified time intervals, after an initial delay, on the specified Scheduler. If demand is not produced in time, an onError will be signalled with an overflow IllegalStateException detailing the tick that couldn't be emitted. In normal conditions, the Flux will never complete.
-
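Since an interval never completes on its own, a sketch typically bounds it with take (demo class and values are illustrative; timing assumes the default parallel Scheduler):

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class; interval is infinite, so cap it with take.
public class IntervalDemo {

    static List<Long> firstTicks() {
        return Flux.interval(Duration.ofMillis(10))
                .take(3)                 // bound the otherwise endless sequence
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(firstTicks()); // [0, 1, 2]
    }
}
```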
just
Create a Flux that emits the provided elements and then completes.
- Type Parameters:
T - the emitted data type
- Parameters:
data - the elements to emit, as a vararg
- Returns:
- a new Flux
-
just
Create a new Flux that will only emit a single element then onComplete.
- Type Parameters:
T - the emitted data type
- Parameters:
data - the single element to emit
- Returns:
- a new Flux
-
merge
Merge data from Publisher sequences emitted by the passed Publisher into an interleaved merged sequence. Unlike concat, inner sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
-
merge
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency)
Merge data from Publisher sequences emitted by the passed Publisher into an interleaved merged sequence. Unlike concat, inner sources are subscribed to eagerly (but at most concurrency sources are subscribed to at the same time).
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
-
merge
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency, int prefetch)
Merge data from Publisher sequences emitted by the passed Publisher into an interleaved merged sequence. Unlike concat, inner sources are subscribed to eagerly (but at most concurrency sources are subscribed to at the same time).
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
-
merge
Merge data from Publisher sequences contained in an Iterable into an interleaved merged sequence. Unlike concat, inner sources are subscribed to eagerly. A new Iterator will be created for each subscriber.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
-
merge
Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike concat, sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
-
merge
Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike concat, sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
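A minimal sketch of merge over two sources (the demo class is illustrative; with synchronous sources like these there is no real concurrency, while asynchronous sources would interleave by arrival):

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class; merge interleaves by arrival, unlike concat.
public class MergeDemo {

    static List<Integer> merged() {
        // Both sources are subscribed to eagerly; arrival order decides
        // the interleaving when sources are asynchronous.
        return Flux.merge(Flux.just(1, 2), Flux.just(3, 4))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(merged());
    }
}
```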
-
mergeDelayError
@SafeVarargs public static <I> Flux<I> mergeDelayError(int prefetch, Publisher<? extends I>... sources)
Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike concat, sources are subscribed to eagerly. This variant will delay any error until after the rest of the merge backlog has been processed.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
-
mergePriority
@SafeVarargs public static <I extends Comparable<? super I>> Flux<I> mergePriority(Publisher<? extends I>... sources)
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by their natural order) as they arrive. This is not a sort(), as it doesn't consider the whole of each sequence. Unlike mergeComparing, this operator does not wait for a value from each source to arrive either.
While this operator does retrieve at most one value from each source, it only compares values when two or more sources emit at the same time. In that case it picks the smallest of these competing values and continues doing so as long as there is demand. It is therefore best suited for asynchronous sources where you do not want to wait for a value from each source before emitting a value downstream.
- Type Parameters:
I - a Comparable merged type that has a natural order
- Parameters:
sources - Publisher sources of Comparable to merge
- Returns:
- a merged Flux that compares the latest available value from each source, publishing the smallest value and replenishing the source that produced it.
-
mergePriority
@SafeVarargs public static <T> Flux<T> mergePriority(Comparator<? super T> comparator, Publisher<? extends T>... sources)
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator) as they arrive. This is not a sort(Comparator), as it doesn't consider the whole of each sequence. Unlike mergeComparing, this operator does not wait for a value from each source to arrive either.
While this operator does retrieve at most one value from each source, it only compares values when two or more sources emit at the same time. In that case it picks the smallest of these competing values and continues doing so as long as there is demand. It is therefore best suited for asynchronous sources where you do not want to wait for a value from each source before emitting a value downstream.
- Type Parameters:
T - the merged type
- Parameters:
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
- Returns:
- a merged Flux that compares the latest available value from each source, publishing the smallest value and replenishing the source that produced it.
-
mergePriority
@SafeVarargs public static <T> Flux<T> mergePriority(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator) as they arrive. This is not a sort(Comparator), as it doesn't consider the whole of each sequence. Unlike mergeComparing, this operator does not wait for a value from each source to arrive either.
While this operator does retrieve at most one value from each source, it only compares values when two or more sources emit at the same time. In that case it picks the smallest of these competing values and continues doing so as long as there is demand. It is therefore best suited for asynchronous sources where you do not want to wait for a value from each source before emitting a value downstream.
- Type Parameters:
T - the merged type
- Parameters:
prefetch - the number of elements to prefetch from each source (avoiding too many small requests to the source when picking)
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
- Returns:
- a merged Flux that compares the latest available value from each source, publishing the smallest value and replenishing the source that produced it.
-
mergePriorityDelayError
@SafeVarargs public static <T> Flux<T> mergePriorityDelayError(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator) as they arrive. This is not a sort(Comparator), as it doesn't consider the whole of each sequence. Unlike mergeComparing, this operator does not wait for a value from each source to arrive either.
While this operator does retrieve at most one value from each source, it only compares values when two or more sources emit at the same time. In that case it picks the smallest of these competing values and continues doing so as long as there is demand. It is therefore best suited for asynchronous sources where you do not want to wait for a value from each source before emitting a value downstream.
Note that this variant delays errors until all data is consumed.
- Type Parameters:
T - the merged type
- Parameters:
prefetch - the number of elements to prefetch from each source (avoiding too many small requests to the source when picking)
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
- Returns:
- a merged Flux that compares the latest available value from each source, publishing the smallest value and replenishing the source that produced it.
-
mergeComparing
@SafeVarargs public static <I extends Comparable<? super I>> Flux<I> mergeComparing(Publisher<? extends I>... sources) Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by their natural order). This is not a sort(), as it doesn't consider the whole of each sequence. Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
- Type Parameters:
I - a Comparable merged type that has a natural order
- Parameters:
sources - Publisher sources of Comparable to merge
- Returns:
- a merged Flux that compares the latest values from each source, using the smallest value and replenishing the source that produced it
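A sketch (assuming Reactor on the classpath): when each source is itself sorted, the merged output is globally sorted, since the operator always emits the smallest of the currently held values:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class MergeComparingExample {
    public static void main(String[] args) {
        // Each source is already sorted; mergeComparing holds one value per
        // source, emits the smallest, then refills that source's slot.
        List<Integer> merged = Flux
                .mergeComparing(Flux.just(1, 4, 7), Flux.just(2, 5, 8))
                .collectList()
                .block();

        System.out.println(merged); // [1, 2, 4, 5, 7, 8]
    }
}
```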
-
mergeComparing
@SafeVarargs public static <T> Flux<T> mergeComparing(Comparator<? super T> comparator, Publisher<? extends T>... sources) Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a sort(Comparator), as it doesn't consider the whole of each sequence. Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
- Type Parameters:
T - the merged type
- Parameters:
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
- Returns:
- a merged Flux that compares the latest values from each source, using the smallest value and replenishing the source that produced it
-
mergeComparing
@SafeVarargs public static <T> Flux<T> mergeComparing(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources) Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a sort(Comparator), as it doesn't consider the whole of each sequence. Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
- Type Parameters:
T - the merged type
- Parameters:
prefetch - the number of elements to prefetch from each source (avoiding too many small requests to the source when picking)
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
- Returns:
- a merged Flux that compares the latest values from each source, using the smallest value and replenishing the source that produced it
-
mergeComparingDelayError
@SafeVarargs public static <T> Flux<T> mergeComparingDelayError(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources) Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a sort(Comparator), as it doesn't consider the whole of each sequence. Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
Note that this variant delays errors until all data has been consumed.
- Type Parameters:
T - the merged type
- Parameters:
prefetch - the number of elements to prefetch from each source (avoiding too many small requests to the source when picking)
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
- Returns:
- a merged Flux that compares the latest values from each source, using the smallest value and replenishing the source that produced it
-
mergeOrdered
@SafeVarargs @Deprecated public static <I extends Comparable<? super I>> Flux<I> mergeOrdered(Publisher<? extends I>... sources) Deprecated. Use mergeComparingDelayError(int, Comparator, Publisher[]) instead (as mergeComparing(Publisher[]) doesn't have this operator's delayError behavior). To be removed in 3.6.0 at the earliest. Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by their natural order). This is not a sort(), as it doesn't consider the whole of each sequence. Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
Note that this variant delays errors until all data has been consumed.
- Type Parameters:
I - a Comparable merged type that has a natural order
- Parameters:
sources - Publisher sources of Comparable to merge
- Returns:
- a merged Flux that compares the latest values from each source, using the smallest value and replenishing the source that produced it
-
mergeOrdered
@SafeVarargs @Deprecated public static <T> Flux<T> mergeOrdered(Comparator<? super T> comparator, Publisher<? extends T>... sources) Deprecated. Use mergeComparingDelayError(int, Comparator, Publisher[]) instead (as mergeComparing(Publisher[]) doesn't have this operator's delayError behavior). To be removed in 3.6.0 at the earliest. Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a sort(Comparator), as it doesn't consider the whole of each sequence. Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
Note that this variant delays errors until all data has been consumed.
- Type Parameters:
T - the merged type
- Parameters:
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
- Returns:
- a merged Flux that compares the latest values from each source, using the smallest value and replenishing the source that produced it
-
mergeOrdered
@SafeVarargs @Deprecated public static <T> Flux<T> mergeOrdered(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources) Deprecated. Use mergeComparingDelayError(int, Comparator, Publisher[]) instead (as mergeComparing(Publisher[]) doesn't have this operator's delayError behavior). To be removed in 3.6.0 at the earliest. Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a sort(Comparator), as it doesn't consider the whole of each sequence. Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
Note that this variant delays errors until all data has been consumed.
- Type Parameters:
T - the merged type
- Parameters:
prefetch - the number of elements to prefetch from each source (avoiding too many small requests to the source when picking)
comparator - the Comparator to use to find the smallest value
sources - Publisher sources to merge
- Returns:
- a merged Flux that compares the latest values from each source, using the smallest value and replenishing the source that produced it
-
mergeSequential
-
mergeSequential
public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch) Merge data from Publisher sequences emitted by the passed Publisher into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to eagerly (but at most maxConcurrency sources at a time). Unlike merge, their emitted values are merged into the final sequence in subscription order.
- Type Parameters:
T - the merged type
- Parameters:
sources - a Publisher of Publisher sources to merge
maxConcurrency - the request produced to the main source, thus limiting the concurrent merge backlog
prefetch - the inner source request size
- Returns:
- a merged Flux, subscribing early but keeping the original ordering
-
mergeSequentialDelayError
public static <T> Flux<T> mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch) Merge data from Publisher sequences emitted by the passed Publisher into an ordered merged sequence. Unlike concat, the inner publishers are subscribed to eagerly (but at most maxConcurrency sources at a time). Unlike merge, their emitted values are merged into the final sequence in subscription order. This variant will delay any error until after the rest of the mergeSequential backlog has been processed.
- Type Parameters:
T - the merged type
- Parameters:
sources - a Publisher of Publisher sources to merge
maxConcurrency - the request produced to the main source, thus limiting the concurrent merge backlog
prefetch - the inner source request size
- Returns:
- a merged Flux, subscribing early but keeping the original ordering
-
mergeSequential
Merge data from Publisher sequences provided in an array/vararg into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order. -
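A sketch (assuming Reactor on the classpath): both sources are subscribed eagerly, yet values are emitted strictly in subscription order:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class MergeSequentialExample {
    public static void main(String[] args) {
        // The second source is subscribed immediately, but its values are
        // queued until the first source completes.
        List<Integer> out = Flux
                .mergeSequential(Flux.just(1, 2), Flux.just(3, 4))
                .collectList()
                .block();

        System.out.println(out); // [1, 2, 3, 4]
    }
}
```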
mergeSequential
@SafeVarargs public static <I> Flux<I> mergeSequential(int prefetch, Publisher<? extends I>... sources) Merge data from Publisher sequences provided in an array/vararg into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order. -
mergeSequentialDelayError
@SafeVarargs public static <I> Flux<I> mergeSequentialDelayError(int prefetch, Publisher<? extends I>... sources) Merge data from Publisher sequences provided in an array/vararg into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order. This variant will delay any error until after the rest of the mergeSequential backlog has been processed. -
mergeSequential
-
mergeSequential
public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch) Merge data from Publisher sequences provided in an Iterable into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly (but at most maxConcurrency sources at a time). Unlike merge, their emitted values are merged into the final sequence in subscription order.
- Type Parameters:
I - the merged type
- Parameters:
sources - an Iterable of Publisher sequences to merge
maxConcurrency - the request produced to the main source, thus limiting the concurrent merge backlog
prefetch - the inner source request size
- Returns:
- a merged Flux, subscribing early but keeping the original ordering
-
mergeSequentialDelayError
public static <I> Flux<I> mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources, int maxConcurrency, int prefetch) Merge data from Publisher sequences provided in an Iterable into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly (but at most maxConcurrency sources at a time). Unlike merge, their emitted values are merged into the final sequence in subscription order. This variant will delay any error until after the rest of the mergeSequential backlog has been processed.
- Type Parameters:
I - the merged type
- Parameters:
sources - an Iterable of Publisher sequences to merge
maxConcurrency - the request produced to the main source, thus limiting the concurrent merge backlog
prefetch - the inner source request size
- Returns:
- a merged Flux, subscribing early but keeping the original ordering
-
never
Create a Flux that will never signal any data, error or completion signal.
- Type Parameters:
T - the Subscriber type target
- Returns:
- a never completing Flux
-
range
Build a Flux that will only emit a sequence of count incrementing integers, starting from start. That is, emit integers between start (included) and start + count (excluded), then complete.
- Parameters:
start - the first integer to be emitted
count - the total number of incrementing values to emit, including the first value
- Returns:
- a ranged Flux
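A sketch (assuming Reactor on the classpath) showing the included/excluded bounds:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class RangeExample {
    public static void main(String[] args) {
        // Emits start (included) up to start + count (excluded): 5, 6, 7.
        List<Integer> out = Flux.range(5, 3).collectList().block();
        System.out.println(out); // [5, 6, 7]
    }
}
```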
-
switchOnNext
public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers) Creates a Flux that mirrors the most recently emitted Publisher, forwarding its data until a new Publisher comes in the source.
The resulting Flux will complete once there are no new Publishers in the source (source has completed) and the last mirrored Publisher has also completed.
This operator requests the mergedPublishers source for an unbounded amount of inner publishers, but doesn't request each inner Publisher unless the downstream has made a corresponding request (no prefetch on publishers emitted by mergedPublishers). -
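A sketch (assuming Reactor on the classpath). With a synchronous outer source each inner publisher completes before the next switch, so nothing is cut off here; with asynchronous sources, a newly emitted inner publisher would cancel the previous one mid-stream:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class SwitchOnNextExample {
    public static void main(String[] args) {
        Flux<Flux<Integer>> outer = Flux.just(Flux.just(1, 2), Flux.just(3, 4));

        // Mirrors whichever inner publisher was emitted most recently.
        List<Integer> out = Flux.switchOnNext(outer).collectList().block();
        System.out.println(out);
    }
}
```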
switchOnNext
@Deprecated public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers, int prefetch) Deprecated. To be removed in 3.6.0 at the earliest. In 3.5.0, you should replace calls with prefetch=0 with calls to switchOnNext(mergedPublishers), as the default behavior of the single-parameter variant will then change to prefetch=0. -
using
public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup) Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource, and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
Eager resource cleanup happens just before the source termination, and exceptions raised by the cleanup Consumer may override the terminal event.
For an asynchronous version of the cleanup, with distinct paths for onComplete, onError and cancel terminations, see usingWhen(Publisher, Function, Function, BiFunction, Function).
- Type Parameters:
T - emitted type
D - resource type
- Parameters:
resourceSupplier - a Callable that is called on subscribe to generate the resource
sourceSupplier - a factory to derive a Publisher from the supplied resource
resourceCleanup - a resource cleanup callback invoked on completion
- Returns:
- a new Flux built around a disposable resource
- See Also:
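A sketch (assuming Reactor on the classpath), with an AtomicBoolean standing in for a real closeable resource such as a connection:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Flux;

public class UsingExample {
    public static void main(String[] args) {
        AtomicBoolean open = new AtomicBoolean();

        List<Integer> out = Flux.using(
                () -> { open.set(true); return open; },  // acquire, once per subscriber
                resource -> Flux.range(1, 3),            // derive the sequence from it
                resource -> resource.set(false))         // cleanup on terminate/cancel
            .collectList()
            .block();

        System.out.println(out + " open=" + open.get()); // [1, 2, 3] open=false
    }
}
```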
-
using
public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager) Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource, and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
- Eager resource cleanup happens just before the source termination, and exceptions raised by the cleanup Consumer may override the terminal event.
- Non-eager cleanup will drop any exception.
For an asynchronous version of the cleanup, with distinct paths for onComplete, onError and cancel terminations, see usingWhen(Publisher, Function, Function, BiFunction, Function).
- Type Parameters:
T - emitted type
D - resource type
- Parameters:
resourceSupplier - a Callable that is called on subscribe to generate the resource
sourceSupplier - a factory to derive a Publisher from the supplied resource
resourceCleanup - a resource cleanup callback invoked on completion
eager - true to clean before terminating downstream subscribers
- Returns:
- a new Flux built around a disposable resource
- See Also:
-
using
public static <T,D extends AutoCloseable> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier) Uses an AutoCloseable resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource, and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
Eager AutoCloseable resource cleanup happens just before the source termination, and exceptions raised by the cleanup Consumer may override the terminal event.
For an asynchronous version of the cleanup, with distinct paths for onComplete, onError and cancel terminations, see usingWhen(Publisher, Function, Function, BiFunction, Function). -
using
public static <T,D extends AutoCloseable> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, boolean eager) Uses an AutoCloseable resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource, and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
- Eager AutoCloseable resource cleanup happens just before the source termination, and exceptions raised by the cleanup Consumer may override the terminal event.
- Non-eager cleanup will drop any exception.
For an asynchronous version of the cleanup, with distinct paths for onComplete, onError and cancel terminations, see usingWhen(Publisher, Function, Function, BiFunction, Function).
- Type Parameters:
T - emitted type
D - resource type
- Parameters:
resourceSupplier - a Callable that is called on subscribe to generate the resource
sourceSupplier - a factory to derive a Publisher from the supplied resource
eager - true to clean before terminating downstream subscribers
- Returns:
- a new Flux built around a disposable resource
- See Also:
- Eager
-
usingWhen
public static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> resourceClosure, Function<? super D, ? extends Publisher<?>> asyncCleanup) Uses a resource, generated by a Publisher for each individual Subscriber, while streaming the values from a Publisher derived from the same resource. Whenever the resulting sequence terminates, a provided Function generates a "cleanup" Publisher that is invoked but doesn't change the content of the main sequence. Instead it just defers the termination (unless it errors, in which case the error suppresses the original termination signal).
Note that if the resource supplying Publisher emits more than one resource, the subsequent resources are dropped (Operators.onNextDropped(Object, Context)). If the publisher errors AFTER having emitted one resource, the error is also silently dropped (Operators.onErrorDropped(Throwable, Context)). An empty completion or error without at least one onNext signal triggers a short-circuit of the main sequence with the same terminal signal (no resource is established, no cleanup is invoked).
- Type Parameters:
T - the type of elements emitted by the resource closure, and thus the main sequence
D - the type of the resource object
- Parameters:
resourceSupplier - a Publisher that "generates" the resource, subscribed for each subscription to the main sequence
resourceClosure - a factory to derive a Publisher from the supplied resource
asyncCleanup - an asynchronous resource cleanup invoked when the resource closure terminates (with onComplete, onError or cancel)
- Returns:
- a new Flux built around a "transactional" resource, with asynchronous cleanup on all terminations (onComplete, onError, cancel)
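A sketch (assuming Reactor on the classpath). Because the cleanup Publisher defers the main sequence's termination, the resource is guaranteed released by the time block() returns:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class UsingWhenExample {
    public static void main(String[] args) {
        AtomicBoolean released = new AtomicBoolean();

        List<String> out = Flux.usingWhen(
                Mono.just("conn"),                                   // async resource supply
                conn -> Flux.just(conn + "-1", conn + "-2"),         // resource closure
                conn -> Mono.fromRunnable(() -> released.set(true))) // async cleanup
            .collectList()
            .block();

        System.out.println(out + " released=" + released.get());
        // [conn-1, conn-2] released=true
    }
}
```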
-
usingWhen
public static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> resourceClosure, Function<? super D, ? extends Publisher<?>> asyncComplete, BiFunction<? super D, ? super Throwable, ? extends Publisher<?>> asyncError, Function<? super D, ? extends Publisher<?>> asyncCancel) Uses a resource, generated by a Publisher for each individual Subscriber, while streaming the values from a Publisher derived from the same resource. Note that all steps of the operator chain that would need the resource to be in an open stable state need to be described inside the resourceClosure Function.
Whenever the resulting sequence terminates, the relevant Function generates a "cleanup" Publisher that is invoked but doesn't change the content of the main sequence. Instead it just defers the termination (unless it errors, in which case the error suppresses the original termination signal). Individual cleanups can also be associated with main sequence cancellation and error terminations.
Note that if the resource supplying Publisher emits more than one resource, the subsequent resources are dropped (Operators.onNextDropped(Object, Context)). If the publisher errors AFTER having emitted one resource, the error is also silently dropped (Operators.onErrorDropped(Throwable, Context)). An empty completion or error without at least one onNext signal triggers a short-circuit of the main sequence with the same terminal signal (no resource is established, no cleanup is invoked).
Additionally, the terminal signal is replaced by any error that might have happened in the terminating Publisher. Finally, early cancellations will cancel the resource supplying Publisher.
- Type Parameters:
T - the type of elements emitted by the resource closure, and thus the main sequence
D - the type of the resource object
- Parameters:
resourceSupplier - a Publisher that "generates" the resource, subscribed for each subscription to the main sequence
resourceClosure - a factory to derive a Publisher from the supplied resource
asyncComplete - an asynchronous resource cleanup invoked if the resource closure terminates with onComplete
asyncError - an asynchronous resource cleanup invoked if the resource closure terminates with onError. The terminating error is provided to the BiFunction
asyncCancel - an asynchronous resource cleanup invoked if the resource closure is cancelled. When null, the asyncComplete path is used instead.
- Returns:
- a new Flux built around a "transactional" resource, with several termination paths triggering asynchronous cleanup sequences
- See Also:
-
zip
public static <T1,T2,O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends O> combinator) Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator). The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
- Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
O - The produced output after transformation by the combinator
- Parameters:
source1 - The first Publisher source to zip.
source2 - The second Publisher source to zip.
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
- Returns:
- a zipped Flux
-
zip
public static <T1,T2> Flux<Tuple2<T1,T2>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2) Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. -
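A sketch (assuming Reactor on the classpath): elements are paired index by index, and the zip stops when the shorter source completes:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class ZipExample {
    public static void main(String[] args) {
        // Pairs (1,"a") and (2,"b"); 3 is discarded because the second
        // source completes after two elements.
        List<String> out = Flux
                .zip(Flux.just(1, 2, 3), Flux.just("a", "b"))
                .map(t -> t.getT1() + t.getT2())
                .collectList()
                .block();

        System.out.println(out); // [1a, 2b]
    }
}
```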
zip
public static <T1,T2,T3> Flux<Tuple3<T1,T2,T3>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3) Zip three sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple3. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
- Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
- Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
- Returns:
- a zipped Flux
-
zip
public static <T1,T2,T3,T4> Flux<Tuple4<T1,T2,T3,T4>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4) Zip four sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple4. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
- Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
- Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
- Returns:
- a zipped Flux
-
zip
public static <T1,T2,T3,T4,T5> Flux<Tuple5<T1,T2,T3,T4,T5>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5) Zip five sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple5. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
- Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
- Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
- Returns:
- a zipped Flux
-
zip
public static <T1,T2,T3,T4,T5,T6> Flux<Tuple6<T1,T2,T3,T4,T5,T6>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6) Zip six sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple6. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
- Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
- Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
source6 - The sixth upstream Publisher to subscribe to.
- Returns:
- a zipped Flux
-
zip
public static <T1,T2,T3,T4,T5,T6,T7> Flux<Tuple7<T1,T2,T3,T4,T5,T6,T7>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7) Zip seven sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple7. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
- Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
T7 - type of the value from source7
- Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
source6 - The sixth upstream Publisher to subscribe to.
source7 - The seventh upstream Publisher to subscribe to.
- Returns:
- a zipped Flux
-
zip
public static <T1,T2,T3,T4,T5,T6,T7,T8> Flux<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7, Publisher<? extends T8> source8) Zip eight sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple8. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
- Type Parameters:
T1 - type of the value from source1
T2 - type of the value from source2
T3 - type of the value from source3
T4 - type of the value from source4
T5 - type of the value from source5
T6 - type of the value from source6
T7 - type of the value from source7
T8 - type of the value from source8
- Parameters:
source1 - The first upstream Publisher to subscribe to.
source2 - The second upstream Publisher to subscribe to.
source3 - The third upstream Publisher to subscribe to.
source4 - The fourth upstream Publisher to subscribe to.
source5 - The fifth upstream Publisher to subscribe to.
source6 - The sixth upstream Publisher to subscribe to.
source7 - The seventh upstream Publisher to subscribe to.
source8 - The eighth upstream Publisher to subscribe to.
- Returns:
- a zipped Flux
-
zip
public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources, Function<? super Object[], ? extends O> combinator) Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator). The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. The Iterable.iterator() will be called on each Publisher.subscribe(Subscriber). -
zip
public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources, int prefetch, Function<? super Object[], ? extends O> combinator) Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator). The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. The Iterable.iterator() will be called on each Publisher.subscribe(Subscriber).
- Type Parameters:
O - the combined produced type
- Parameters:
sources - the Iterable providing sources to zip
prefetch - the inner source request size
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
- Returns:
- a zipped Flux
-
zip
@SafeVarargs public static <I,O> Flux<O> zip(Function<? super Object[], ? extends O> combinator, Publisher<? extends I>... sources) Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator). The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.- Type Parameters:
I - the type of the input sources
O - the combined produced type
- Parameters:
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
sources - the array providing sources to zip
- Returns:
- a zipped
Flux
-
zip
@SafeVarargs public static <I,O> Flux<O> zip(Function<? super Object[], ? extends O> combinator, int prefetch, Publisher<? extends I>... sources) Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator). The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.- Type Parameters:
I - the type of the input sources
O - the combined produced type
- Parameters:
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
prefetch - individual source request size
sources - the array providing sources to zip
- Returns:
- a zipped
Flux
-
zip
public static <TUPLE extends Tuple2,V> Flux<V> zip(Publisher<? extends Publisher<?>> sources, Function<? super TUPLE, ? extends V> combinator) Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator). The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. Note that the
Publisher sources from the outer Publisher will accumulate into an exhaustive list before starting the zip operation.
- Type Parameters:
TUPLE - the raw tuple type
V - the produced output after transformation by the given combinator
- Parameters:
sources - the Publisher of Publisher sources to zip. A finite publisher is required.
combinator - The aggregate function that will receive a unique value from each upstream and return the value to signal downstream
- Returns:
- a
Fluxbased on the produced value
-
all
Emit a single boolean true if all values of this sequence match the Predicate. The implementation uses short-circuit logic and completes with false if the predicate doesn't match a value.
-
any
Emit a single boolean true if any of the values of this Flux sequence match the predicate. The implementation uses short-circuit logic and completes with true if the predicate matches a value.
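Taken together, all and any act as short-circuiting quantifiers over the sequence. A minimal sketch (assuming reactor-core on the classpath):

```java
import reactor.core.publisher.Flux;

// all(): completes with false as soon as one value fails the predicate.
// any(): completes with true as soon as one value matches the predicate.
Boolean allPositive = Flux.just(1, 2, 3).all(i -> i > 0).block();  // true
Boolean anyAboveTwo = Flux.just(1, 2, 3).any(i -> i > 2).block();  // true
Boolean allAboveTwo = Flux.just(1, 2, 3).all(i -> i > 2).block();  // false
```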
-
as
Transform this Flux into a target type. Example: flux.as(Mono::from).subscribe() -
blockFirst
Subscribe to this Flux and block indefinitely until the upstream signals its first value or completes. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). Note that each blockFirst() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
- Returns:
- the first value or null
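For example (a sketch; blocking calls like this belong in tests or bridging code, not inside reactive pipelines):

```java
import reactor.core.publisher.Flux;

// blockFirst() returns the first emitted value and cancels the rest.
Integer first = Flux.just(1, 2, 3).blockFirst();    // 1
// An empty Flux completes without a value, so blockFirst() returns null.
Integer none = Flux.<Integer>empty().blockFirst();  // null
```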
-
blockFirst
Subscribe to this Flux and block until the upstream signals its first value, completes or a timeout expires. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). If the provided timeout expires, a RuntimeException is thrown with a TimeoutException as the cause. Note that each blockFirst() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
- Parameters:
timeout - maximum time period to wait for before raising a RuntimeException with a TimeoutException as the cause
- Returns:
- the first value or null
-
blockLast
Subscribe to this Flux and block indefinitely until the upstream signals its last value or completes. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). Note that each blockLast() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
- Returns:
- the last value or null
-
blockLast
Subscribe to this Flux and block until the upstream signals its last value, completes or a timeout expires. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). If the provided timeout expires, a RuntimeException is thrown with a TimeoutException as the cause. Note that each blockLast() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
- Parameters:
timeout - maximum time period to wait for before raising a RuntimeException with a TimeoutException as the cause
- Returns:
- the last value or null
-
buffer
Collect all incoming values into a single List buffer that will be emitted by the returned Flux once this Flux completes. Discard Support: This operator discards the buffer upon cancellation or error triggered by a data signal.
-
buffer
-
buffer
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, Supplier<C> bufferSupplier) Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes. Note that if buffers provided by the bufferSupplier return false upon invocation of
Collection.add(Object) for a given element, that element will be discarded. Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal, as well as the latest unbuffered element if the bufferSupplier fails.
- Type Parameters:
C - the Collection buffer type
- Parameters:
maxSize - the maximum collected size
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
- Returns:
- a microbatched Flux of Collection
-
buffer
Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes. Buffers can be created with gaps, as a new buffer will be created every time skip values have been emitted by the source. When maxSize < skip : dropping buffers
When maxSize > skip : overlapping buffers
When maxSize == skip : exact buffers
Discard Support: This operator discards elements in between buffers (in the case of dropping buffers). It also discards the currently open buffer upon cancellation or error triggered by a data signal. Note however that the overlapping buffer variant DOES NOT discard, as this might result in an element being discarded from an early buffer while it is still valid in a more recent buffer.
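The three regimes can be observed with small literal sequences (a sketch assuming reactor-core on the classpath):

```java
import java.util.List;
import reactor.core.publisher.Flux;

// maxSize < skip: gapped buffers, element 3 falls in the gap and is dropped.
List<List<Integer>> gapped = Flux.range(1, 5).buffer(2, 3).collectList().block();
// [[1, 2], [4, 5]]

// maxSize > skip: overlapping buffers, a new buffer opens every 2 elements.
List<List<Integer>> overlapping = Flux.range(1, 5).buffer(3, 2).collectList().block();
// [[1, 2, 3], [3, 4, 5], [5]]
```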
-
buffer
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, int skip, Supplier<C> bufferSupplier) Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the given max size is reached or once this Flux completes. Buffers can be created with gaps, as a new buffer will be created every time skip values have been emitted by the source. When maxSize < skip : dropping buffers
When maxSize > skip : overlapping buffers
When maxSize == skip : exact buffers
Note for exact buffers: If buffers provided by the bufferSupplier return false upon invocation of
Collection.add(Object) for a given element, that element will be discarded. Discard Support: This operator discards elements in between buffers (in the case of dropping buffers). It also discards the currently open buffer upon cancellation or error triggered by a data signal. Note however that the overlapping buffer variant DOES NOT discard, as this might result in an element being discarded from an early buffer while it is still valid in a more recent buffer.
- Type Parameters:
C - the Collection buffer type
- Parameters:
maxSize - the max collected size
skip - the number of items to count before creating a new buffer
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
- Returns:
- a microbatched Flux of possibly overlapped or gapped Collection
-
buffer
-
buffer
public final <C extends Collection<? super T>> Flux<C> buffer(Publisher<?> other, Supplier<C> bufferSupplier) Collect incoming values into multiple user-defined Collection buffers, as delimited by the signals of a companion Publisher this operator will subscribe to. Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal, and the last received element when the bufferSupplier fails.
- Type Parameters:
C - the Collection buffer type
- Parameters:
other - the companion Publisher whose signals trigger new buffers
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
- Returns:
- a microbatched Flux of Collection delimited by signals from a Publisher
-
buffer
-
buffer
Collect incoming values into multiple List buffers created at a given openBufferEvery period. Each buffer will last until the bufferingTimespan has elapsed, thus emitting the bucket in the resulting Flux. When bufferingTimespan < openBufferEvery : dropping buffers
When bufferingTimespan > openBufferEvery : overlapping buffers
When bufferingTimespan == openBufferEvery : exact buffers
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. It DOES NOT provide strong guarantees in the case of overlapping buffers, as elements might get discarded too early (from the first of two overlapping buffers for instance).
-
buffer
-
buffer
public final Flux<List<T>> buffer(Duration bufferingTimespan, Duration openBufferEvery, Scheduler timer) Collect incoming values into multiple List buffers created at a given openBufferEvery period, as measured on the provided Scheduler. Each buffer will last until the bufferingTimespan has elapsed (also measured on the scheduler), thus emitting the bucket in the resulting Flux. When bufferingTimespan < openBufferEvery : dropping buffers
When bufferingTimespan > openBufferEvery : overlapping buffers
When bufferingTimespan == openBufferEvery : exact buffers
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. It DOES NOT provide strong guarantees in the case of overlapping buffers, as elements might get discarded too early (from the first of two overlapping buffers for instance).
- Parameters:
bufferingTimespan - the duration from buffer creation until a buffer is closed and emitted
openBufferEvery - the interval at which to create a new buffer
timer - a time-capable Scheduler instance to run on
- Returns:
- a microbatched Flux of List delimited by the given period openBufferEvery and sized by bufferingTimespan
-
bufferTimeout
-
bufferTimeout
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Supplier<C> bufferSupplier) Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses. Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
- Type Parameters:
C - the Collection buffer type
- Parameters:
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
- Returns:
- a microbatched Flux of Collection delimited by given size or a given period timeout
-
bufferTimeout
Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler. Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
-
bufferTimeout
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer, Supplier<C> bufferSupplier) Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler. Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
- Type Parameters:
C - the Collection buffer type
- Parameters:
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
timer - a time-capable Scheduler instance to run on
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
- Returns:
- a microbatched Flux of Collection delimited by given size or a given period timeout
-
bufferTimeout
Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses. Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
- Parameters:
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
fairBackpressure - if true, prefetches maxSize * 4 from upstream and replenishes the buffer when the downstream demand is satisfactory. When false, no prefetching takes place and a single buffer is always ready to be pushed downstream.
- Returns:
- a microbatched Flux of List delimited by given size or a given period timeout
-
bufferTimeout
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer, boolean fairBackpressure) Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler. Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
- Parameters:
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
timer - a time-capable Scheduler instance to run on
fairBackpressure - if true, prefetches maxSize * 4 from upstream and replenishes the buffer when the downstream demand is satisfactory. When false, no prefetching takes place and a single buffer is always ready to be pushed downstream.
- Returns:
- a microbatched Flux of List delimited by given size or a given period timeout
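With a timeout that never fires, the size bound drives emission and completion flushes the remainder (a sketch assuming reactor-core on the classpath):

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

// The size bound (2) closes the first two buffers immediately;
// completion flushes the trailing partial buffer [5] before the timeout fires.
List<List<Integer>> batches = Flux.range(1, 5)
        .bufferTimeout(2, Duration.ofSeconds(30))
        .collectList()
        .block();
// [[1, 2], [3, 4], [5]]
```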
-
bufferTimeout
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Supplier<C> bufferSupplier, boolean fairBackpressure) Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses. Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
- Type Parameters:
C - the Collection buffer type
- Parameters:
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
fairBackpressure - if true, prefetches maxSize * 4 from upstream and replenishes the buffer when the downstream demand is satisfactory. When false, no prefetching takes place and a single buffer is always ready to be pushed downstream.
- Returns:
- a microbatched Flux of Collection delimited by given size or a given period timeout
-
bufferTimeout
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer, Supplier<C> bufferSupplier, boolean fairBackpressure) Collect incoming values into multiple user-defined Collection buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses, as measured on the provided Scheduler. Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
- Type Parameters:
C - the Collection buffer type
- Parameters:
maxSize - the max collected size
maxTime - the timeout enforcing the release of a partial buffer
timer - a time-capable Scheduler instance to run on
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
fairBackpressure - if true, prefetches maxSize * 4 from upstream and replenishes the buffer when the downstream demand is satisfactory. When false, no prefetching takes place and a single buffer is always ready to be pushed downstream.
- Returns:
- a microbatched Flux of Collection delimited by given size or a given period timeout
-
bufferUntil
Collect incoming values into multiple List buffers that will be emitted by the resulting Flux each time the given predicate returns true. Note that the element that triggers the predicate to return true (and thus closes a buffer) is included as the last element in the emitted buffer. On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
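For instance, closing a buffer on every even value (a sketch assuming reactor-core on the classpath):

```java
import java.util.List;
import reactor.core.publisher.Flux;

// An even value closes its buffer and is included in it as the last element;
// the trailing partial buffer [5] is flushed on completion.
List<List<Integer>> buffers = Flux.just(1, 2, 3, 4, 5)
        .bufferUntil(i -> i % 2 == 0)
        .collectList()
        .block();
// [[1, 2], [3, 4], [5]]
```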
-
bufferUntil
Collect incoming values into multiple List buffers that will be emitted by the resulting Flux each time the given predicate returns true. Note that the buffer into which the element that triggers the predicate to return true (and thus closes a buffer) is included depends on the cutBefore parameter: set it to true to include the boundary element in the newly opened buffer, false to include it in the closed buffer (as in bufferUntil(Predicate)). On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
-
bufferUntilChanged
-
bufferUntilChanged
-
bufferUntilChanged
public final <V> Flux<List<T>> bufferUntilChanged(Function<? super T, ? extends V> keySelector, BiPredicate<? super V, ? super V> keyComparator) Collect subsequent repetitions of an element (that is, if they arrive right after one another), as compared by a key extracted through the user-provided Function and compared using a supplied BiPredicate, into multiple List buffers that will be emitted by the resulting Flux. -
bufferWhile
Collect incoming values into multiple List buffers that will be emitted by the resulting Flux. Each buffer continues aggregating values while the given predicate returns true, and a new buffer is created as soon as the predicate returns false. Note that the element that triggers the predicate to return false (and thus closes a buffer) is NOT included in any emitted buffer. On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal, as well as the buffer-triggering element.
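The contrast with bufferUntil is that the boundary element is dropped entirely (a sketch assuming reactor-core on the classpath):

```java
import java.util.List;
import reactor.core.publisher.Flux;

// 3 fails the predicate: it closes the first buffer and is itself discarded.
List<List<Integer>> buffers = Flux.just(1, 2, 3, 4, 5)
        .bufferWhile(i -> i % 3 != 0)
        .collectList()
        .block();
// [[1, 2], [4, 5]]
```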
-
bufferWhen
public final <U,V> Flux<List<T>> bufferWhen(Publisher<U> bucketOpening, Function<? super U, ? extends Publisher<V>> closeSelector) Collect incoming values into multiple List buffers started each time an opening companion Publisher emits. Each buffer will last until the corresponding closing companion Publisher emits, thus releasing the buffer to the resulting Flux. When Open signal is strictly not overlapping Close signal : dropping buffers (see green marbles in diagram below).
When Open signal is strictly more frequent than Close signal : overlapping buffers (see second and third buffers in diagram below).
When Open signal is exactly coordinated with Close signal : exact buffers
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. It DOES NOT provide strong guarantees in the case of overlapping buffers, as elements might get discarded too early (from the first of two overlapping buffers for instance).
- Type Parameters:
U - the element type of the buffer-opening sequence
V - the element type of the buffer-closing sequence
- Parameters:
bucketOpening - a companion Publisher to subscribe to for buffer creation signals.
closeSelector - a factory that, given a buffer opening signal, returns a companion Publisher to subscribe to for buffer closure and emission signals.
- Returns:
- a microbatched Flux of List delimited by an opening Publisher and a relative closing Publisher
-
bufferWhen
public final <U,V,C extends Collection<? super T>> Flux<C> bufferWhen(Publisher<U> bucketOpening, Function<? super U, ? extends Publisher<V>> closeSelector, Supplier<C> bufferSupplier) Collect incoming values into multiple user-defined Collection buffers started each time an opening companion Publisher emits. Each buffer will last until the corresponding closing companion Publisher emits, thus releasing the buffer to the resulting Flux. When Open signal is strictly not overlapping Close signal : dropping buffers (see green marbles in diagram below).
When Open signal is strictly more frequent than Close signal : overlapping buffers (see second and third buffers in diagram below).
Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. It DOES NOT provide strong guarantees in the case of overlapping buffers, as elements might get discarded too early (from the first of two overlapping buffers for instance).
- Type Parameters:
U - the element type of the buffer-opening sequence
V - the element type of the buffer-closing sequence
C - the Collection buffer type
- Parameters:
bucketOpening - a companion Publisher to subscribe to for buffer creation signals.
closeSelector - a factory that, given a buffer opening signal, returns a companion Publisher to subscribe to for buffer closure and emission signals.
bufferSupplier - a Supplier of the concrete Collection to use for each buffer
- Returns:
- a microbatched Flux of Collection delimited by an opening Publisher and a relative closing Publisher
-
cache
Turn this Flux into a hot source and cache last emitted signals for further Subscribers. Will retain an unbounded volume of onNext signals. Completion and Error will also be replayed.
- Returns:
- a replaying
Flux
-
cache
Turn this Flux into a hot source and cache last emitted signals for further Subscribers. Will retain up to the given history size onNext signals. Completion and Error will also be replayed. Note that
cache(0) will only cache the terminal signal without expiration.
- Parameters:
history - number of elements retained in cache
- Returns:
- a replaying
Flux
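A sketch of the replay behavior (assuming reactor-core on the classpath):

```java
import java.util.List;
import reactor.core.publisher.Flux;

// cache(2): the first subscriber triggers the source; a late subscriber
// replays at most the 2 most recent elements plus the completion signal.
Flux<Integer> cached = Flux.range(1, 3).cache(2);
List<Integer> firstPass  = cached.collectList().block(); // [1, 2, 3]
List<Integer> secondPass = cached.collectList().block(); // [2, 3]
```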
-
cache
Turn this Flux into a hot source and cache last emitted signals for further Subscribers. Will retain an unbounded history but apply a per-item expiry timeout. Completion and Error will also be replayed until
ttl triggers, in which case the next Subscriber will start over a new subscription.
- Parameters:
ttl - Time-to-live for each cached item and post termination.
- Returns:
- a replaying
Flux
-
cache
Turn this Flux into a hot source and cache last emitted signals for further Subscribers. Will retain an unbounded history but apply a per-item expiry timeout. Completion and Error will also be replayed until
ttl triggers, in which case the next Subscriber will start over a new subscription. -
cache
Turn this Flux into a hot source and cache last emitted signals for further Subscribers. Will retain up to the given history size and apply a per-item expiry timeout. Completion and Error will also be replayed until
ttl triggers, in which case the next Subscriber will start over a new subscription.
- Parameters:
history - number of elements retained in cache
ttl - Time-to-live for each cached item and post termination.
- Returns:
- a replaying
Flux
-
cache
Turn this Flux into a hot source and cache last emitted signals for further Subscribers. Will retain up to the given history size and apply a per-item expiry timeout. Completion and Error will also be replayed until
ttl triggers, in which case the next Subscriber will start over a new subscription. -
cast
Cast the current Flux produced type into a target produced type. -
cancelOn
-
checkpoint
Activate traceback (full assembly tracing) for this particular Flux, in case of an error upstream of the checkpoint. Tracing incurs the cost of an exception stack trace creation. It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with the traceback.
The traceback is attached to the error as a
suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
- Returns:
- the assembly tracing
Flux.
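A sketch of where a checkpoint sits in a chain (assuming reactor-core on the classpath; the failing step is illustrative):

```java
import reactor.core.publisher.Flux;

// Errors raised upstream of checkpoint() carry the assembly traceback
// as a suppressed exception on the propagated error.
Flux<Integer> risky = Flux.just(1, 0)
        .map(i -> 10 / i)   // throws ArithmeticException on 0
        .checkpoint();

Throwable error = null;
try {
    risky.blockLast();
} catch (RuntimeException e) {
    error = e; // inspect e.getSuppressed() for the traceback
}
```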
-
checkpoint
Activate traceback (assembly marker) for this particular Flux by giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(), this doesn't create a filled stack trace, avoiding the main cost of the operator. However, as a trade-off the description must be unique enough for the user to find out where this Flux was assembled. If you only want a generic description, and still rely on the stack trace to find the assembly site, use the checkpoint(String, boolean) variant. It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with the assembly trace.
The traceback is attached to the error as a
suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
- Parameters:
description - a unique enough description to include in the light assembly traceback.
- Returns:
- the assembly marked
Flux
-
checkpoint
Activate traceback (full assembly tracing or the lighter assembly marking depending on the forceStackTrace option). By setting the
forceStackTrace parameter to true, activate assembly tracing for this particular Flux and give it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(String), this will incur the cost of an exception stack trace creation. The description could for example be a meaningful name for the assembled flux or a wider correlation ID, since the stack trace will always provide enough information to locate where this Flux was assembled. By setting
forceStackTrace to false, this behaves like checkpoint(String) and is subject to the same caveat in choosing the description. It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with the assembly marker.
The traceback is attached to the error as a
suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
- Parameters:
description - a description (must be unique enough if forceStackTrace is set to false).
forceStackTrace - false to make a light checkpoint without a stacktrace, true to use a stack trace.
- Returns:
- the assembly marked
Flux.
-
collect
Collect all elements emitted by this Flux into a user-defined container, by applying a collector BiConsumer taking the container and each element. The collected result will be emitted when this sequence completes, emitting the empty container if the sequence was empty. Discard Support: This operator discards the container upon cancellation or error triggered by a data signal. Either the container type is a
Collection (in which case individual elements are discarded) or not (in which case the entire container is discarded). In case the collector BiConsumer fails to accumulate an element, the container is discarded as above and the triggering element is also discarded.
- Type Parameters:
E - the container type
- Parameters:
containerSupplier - the supplier of the container instance for each Subscriber
collector - a consumer of both the container instance and the value being currently collected
- Returns:
- a Mono of the collected container on complete
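For example, accumulating into a mutable container (a sketch assuming reactor-core on the classpath):

```java
import reactor.core.publisher.Flux;

// One StringBuilder per Subscriber; append accumulates each element into it.
StringBuilder joined = Flux.just("a", "b", "c")
        .collect(StringBuilder::new, StringBuilder::append)
        .block();
// joined.toString() -> "abc"
```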
-
collect
Collect all elements emitted by this Flux into a container, by applying a Java 8 Stream API Collector. The collected result will be emitted when this sequence completes, emitting the empty container if the sequence was empty. Discard Support: This operator discards the intermediate container (see
Collector.supplier()) upon cancellation, error or exception while applying the Collector.finisher(). Either the container type is a Collection (in which case individual elements are discarded) or not (in which case the entire container is discarded). In case the accumulator BiConsumer of the collector fails to accumulate an element into the intermediate container, the container is discarded as above and the triggering element is also discarded. -
collectList
-
collectMap
Collect all elements emitted by this Flux into a hashed Map that is emitted by the resulting Mono when this sequence completes, emitting the empty Map if the sequence was empty. The key is extracted from each element by applying the keyExtractor Function. In case several elements map to the same key, the associated value will be the most recently emitted element. Discard Support: This operator discards the whole
Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the map. -
collectMap
public final <K,V> Mono<Map<K,V>> collectMap(Function<? super T, ? extends K> keyExtractor, Function<? super T, ? extends V> valueExtractor) Collect all elements emitted by this Flux into a hashed Map that is emitted by the resulting Mono when this sequence completes, emitting the empty Map if the sequence was empty. The key is extracted from each element by applying the keyExtractor Function, and the value is extracted by the valueExtractor Function. In case several elements map to the same key, the associated value will be derived from the most recently emitted element. Discard Support: This operator discards the whole
Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the map.
- Type Parameters:
K - the type of the key extracted from each source element
V - the type of the value extracted from each source element
- Parameters:
keyExtractor - a Function to map elements to a key for the Map
valueExtractor - a Function to map elements to a value for the Map
- Returns:
- a Mono of a Map of key-element pairs (only including the latest element's value in case of key conflicts)
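A small illustration of the last-element-wins behavior on key conflicts (a sketch assuming reactor-core on the classpath):

```java
import java.util.Map;
import reactor.core.publisher.Flux;

// Key: first character; value: length. "avocado" arrives after "apple",
// so its value wins for key 'a'.
Map<Character, Integer> lengthByInitial = Flux.just("apple", "banana", "avocado")
        .collectMap(s -> s.charAt(0), String::length)
        .block();
// {a=7, b=6}
```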
-
collectMap
public final <K,V> Mono<Map<K,V>> collectMap(Function<? super T, ? extends K> keyExtractor, Function<? super T, ? extends V> valueExtractor, Supplier<Map<K, V>> mapSupplier) Collect all elements emitted by this Flux into a user-defined Map that is emitted by the resulting Mono when this sequence completes, emitting the empty Map if the sequence was empty. The key is extracted from each element by applying the keyExtractor Function, and the value is extracted by the valueExtractor Function. In case several elements map to the same key, the associated value will be derived from the most recently emitted element. Discard Support: This operator discards the whole
Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the map.
- Type Parameters:
K - the type of the key extracted from each source element
V - the type of the value extracted from each source element
- Parameters:
keyExtractor - a Function to map elements to a key for the Map
valueExtractor - a Function to map elements to a value for the Map
mapSupplier - a Map factory called for each Subscriber
- Returns:
- a Mono of a Map of key-value pairs (only including the latest element's value in case of key conflicts)
-
collectMultimap
public final <K> Mono<Map<K,Collection<T>>> collectMultimap(Function<? super T, ? extends K> keyExtractor) Collect all elements emitted by this Flux into a multimap that is emitted by the resulting Mono when this sequence completes, emitting the empty multimap if the sequence was empty. The key is extracted from each element by applying the keyExtractor Function, and every element mapping to the same key is stored in the List associated to said key. Discard Support: This operator discards the whole
Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the list values in the map. -
collectMultimap
public final <K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T, ? extends K> keyExtractor, Function<? super T, ? extends V> valueExtractor)
Collect all elements emitted by this Flux into a multimap that is emitted by the resulting Mono when this sequence completes, emitting the empty multimap if the sequence was empty. The key is extracted from each element by applying the keyExtractor Function, and every element mapping to the same key is converted by the valueExtractor Function to a value stored in the List associated to said key.
Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the list values in the map.
- Type Parameters:
K - the type of the key extracted from each source element
V - the type of the value extracted from each source element
- Parameters:
keyExtractor - a Function to map elements to a key for the Map
valueExtractor - a Function to map elements to a value for the Map
- Returns:
- a Mono of a Map of key-List(values) pairs
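A short sketch contrasting this with collectMap: here, elements sharing a key are accumulated rather than overwritten (assumes reactor-core on the classpath; the class name is arbitrary):

```java
import java.util.Collection;
import java.util.Map;
import reactor.core.publisher.Flux;

public class CollectMultimapExample {
    public static void main(String[] args) {
        // "bb" and "cc" both have length 2, so both end up under key 2
        Map<Integer, Collection<String>> byLength = Flux.just("a", "bb", "cc")
                .collectMultimap(String::length)
                .block();
        System.out.println(byLength.get(2)); // [bb, cc]
    }
}
```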
-
collectMultimap
public final <K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T, ? extends K> keyExtractor, Function<? super T, ? extends V> valueExtractor, Supplier<Map<K, Collection<V>>> mapSupplier)
Collect all elements emitted by this Flux into a user-defined multimap that is emitted by the resulting Mono when this sequence completes, emitting the empty multimap if the sequence was empty. The key is extracted from each element by applying the keyExtractor Function, and every element mapping to the same key is converted by the valueExtractor Function to a value stored in the Collection associated to said key.
Discard Support: This operator discards the whole Map upon cancellation or error triggered by a data signal, so discard handlers will have to unpack the list values in the map.
- Type Parameters:
K - the type of the key extracted from each source element
V - the type of the value extracted from each source element
- Parameters:
keyExtractor - a Function to map elements to a key for the Map
valueExtractor - a Function to map elements to a value for the Map
mapSupplier - a multimap (Map of Collection) factory called for each Subscriber
- Returns:
- a Mono of a Map of key-Collection(values) pairs
-
collectSortedList
Collect all elements emitted by this Flux until this sequence completes, and then sort them in natural order into a List that is emitted by the resulting Mono. If the sequence was empty, an empty List will be emitted.
Discard Support: This operator is based on collectList(), and as such discards the elements in the List individually upon cancellation or error triggered by a data signal.
-
collectSortedList
Collect all elements emitted by this Flux until this sequence completes, and then sort them using a Comparator into a List that is emitted by the resulting Mono. If the sequence was empty, an empty List will be emitted.
Discard Support: This operator is based on collectList(), and as such discards the elements in the List individually upon cancellation or error triggered by a data signal.
- Parameters:
comparator - a Comparator to sort the items of this sequence
- Returns:
- a Mono of a sorted List of all values from this Flux
-
concatMap
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.
There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:
- Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
- Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
- Interleaving: this operator does not let values from different inners interleave (concatenation).
Errors will immediately short-circuit the current concat backlog. Note that no prefetching is done on the source, which gets requested only if there is downstream demand.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
- Type Parameters:
V - the produced concatenated type
- Parameters:
mapper - the function to transform this sequence of T into concatenated sequences of V
- Returns:
- a concatenated Flux
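A minimal sketch of the ordering guarantee described above (assumes reactor-core on the classpath; the class name is arbitrary):

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class ConcatMapExample {
    public static void main(String[] args) {
        // Each inner completes before the next one is subscribed,
        // so the flattened order mirrors the source order with no interleaving
        List<String> out = Flux.just(1, 2)
                .concatMap(i -> Flux.just(i + "a", i + "b"))
                .collectList()
                .block();
        System.out.println(out); // [1a, 1b, 2a, 2b]
    }
}
```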
-
concatMap
public final <V> Flux<V> concatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int prefetch)
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.
There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:
- Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
- Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
- Interleaving: this operator does not let values from different inners interleave (concatenation).
Errors will immediately short-circuit the current concat backlog. The prefetch argument allows giving an arbitrary prefetch size to the upstream source, or disabling prefetching with 0.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
- Type Parameters:
V - the produced concatenated type
- Parameters:
mapper - the function to transform this sequence of T into concatenated sequences of V
prefetch - the number of values to prefetch from the upstream source, or 0 to disable prefetching
- Returns:
- a concatenated Flux
-
concatMapDelayError
public final <V> Flux<V> concatMapDelayError(Function<? super T, ? extends Publisher<? extends V>> mapper)
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.
There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:
- Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
- Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
- Interleaving: this operator does not let values from different inners interleave (concatenation).
Errors in the individual publishers will be delayed until the end of the whole concat sequence (possibly getting combined into a composite) if several sources error. Note that no prefetching is done on the source, which gets requested only if there is downstream demand.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
- Type Parameters:
V - the produced concatenated type
- Parameters:
mapper - the function to transform this sequence of T into concatenated sequences of V
- Returns:
- a concatenated Flux
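A sketch of the delayed-error behavior: values from the healthy inner sources flow first, and the error only surfaces at the end (assumes reactor-core on the classpath; the class name is arbitrary):

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class ConcatMapDelayErrorExample {
    public static void main(String[] args) {
        List<Object> events = new ArrayList<>();
        Flux.just(1, 2, 3)
                .concatMapDelayError(i -> i == 2
                        ? Flux.error(new IllegalStateException("boom"))
                        : Flux.just(i * 10))
                .subscribe(events::add,
                           e -> events.add("error: " + e.getMessage()));
        // Values from sources 1 and 3 are delivered before the delayed error
        System.out.println(events); // [10, 30, error: boom]
    }
}
```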
-
concatMapDelayError
public final <V> Flux<V> concatMapDelayError(Function<? super T, ? extends Publisher<? extends V>> mapper, int prefetch)
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.
There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:
- Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
- Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
- Interleaving: this operator does not let values from different inners interleave (concatenation).
Errors in the individual publishers will be delayed until the end of the whole concat sequence (possibly getting combined into a composite) if several sources error. The prefetch argument allows giving an arbitrary prefetch size to the upstream source, or disabling prefetching with 0.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
- Type Parameters:
V - the produced concatenated type
- Parameters:
mapper - the function to transform this sequence of T into concatenated sequences of V
prefetch - the number of values to prefetch from the upstream source, or 0 to disable prefetching
- Returns:
- a concatenated Flux
-
concatMapDelayError
public final <V> Flux<V> concatMapDelayError(Function<? super T, ? extends Publisher<? extends V>> mapper, boolean delayUntilEnd, int prefetch)
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.
There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:
- Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
- Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
- Interleaving: this operator does not let values from different inners interleave (concatenation).
Errors in the individual publishers will be delayed until after the current concat backlog if delayUntilEnd is false, or until after all sources if delayUntilEnd is true. The prefetch argument allows giving an arbitrary prefetch size to the upstream source, or disabling prefetching with 0.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation.
- Type Parameters:
V - the produced concatenated type
- Parameters:
mapper - the function to transform this sequence of T into concatenated sequences of V
delayUntilEnd - delay error until all sources have been consumed instead of after the current source
prefetch - the number of values to prefetch from the upstream source, or 0 to disable prefetching
- Returns:
- a concatenated Flux
-
concatMapIterable
public final <R> Flux<R> concatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper)
Transform the items emitted by this Flux into Iterables, then flatten the elements from those by concatenating them into a single Flux. For each iterable, Iterable.iterator() will be called at least once and at most twice.
This operator inspects each Iterable's Spliterator to assess whether the iteration can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)). Since the default Spliterator wraps the Iterator, we can have two Iterable.iterator() calls per iterable. This second invocation is however skipped on a Collection, a type which is assumed to be always finite.
Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially. Thus flatMapIterable and concatMapIterable are equivalent, offered as a discoverability improvement for users who explore the API with the concat vs flatMap expectation.
Discard Support: Upon cancellation, this operator discards T elements it prefetched and, in some cases, attempts to discard the remainder of the currently processed Iterable (if it can safely ensure the iterator is finite). Note that this means each Iterable's Iterable.iterator() method could be invoked twice.
Error Mode Support: This operator supports resuming on errors (including when fusion is enabled). Exceptions thrown by the consumer are passed to the onErrorContinue(BiConsumer) error consumer (the value consumer is not invoked, as the source element will be part of the sequence). The onNext signal is then propagated as normal.
-
concatMapIterable
public final <R> Flux<R> concatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper, int prefetch)
Transform the items emitted by this Flux into Iterables, then flatten the emissions from those by concatenating them into a single Flux. The prefetch argument allows giving an arbitrary prefetch size to the upstream source. For each iterable, Iterable.iterator() will be called at least once and at most twice.
This operator inspects each Iterable's Spliterator to assess whether the iteration can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)). Since the default Spliterator wraps the Iterator, we can have two Iterable.iterator() calls per iterable. This second invocation is however skipped on a Collection, a type which is assumed to be always finite.
Note that unlike flatMap(Function) and concatMap(Function), with Iterable there is no notion of eager vs lazy inner subscription. The contents of the Iterables are all played sequentially. Thus flatMapIterable and concatMapIterable are equivalent, offered as a discoverability improvement for users who explore the API with the concat vs flatMap expectation.
Discard Support: Upon cancellation, this operator discards T elements it prefetched and, in some cases, attempts to discard the remainder of the currently processed Iterable (if it can safely ensure the iterator is finite). Note that this means each Iterable's Iterable.iterator() method could be invoked twice.
Error Mode Support: This operator supports resuming on errors (including when fusion is enabled). Exceptions thrown by the consumer are passed to the onErrorContinue(BiConsumer) error consumer (the value consumer is not invoked, as the source element will be part of the sequence). The onNext signal is then propagated as normal.
- Type Parameters:
R - the merged output sequence type
- Parameters:
mapper - the Function to transform the input sequence into N Iterables
prefetch - the number of values to request from the source upon subscription, to be transformed to Iterable
- Returns:
- a concatenation of the values from the Iterables obtained from each element in this Flux
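A minimal sketch: each source element is expanded into an Iterable whose contents are played in order (assumes reactor-core on the classpath; the class name is arbitrary):

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

public class ConcatMapIterableExample {
    public static void main(String[] args) {
        // Each CSV string becomes a List, and the lists are concatenated in order
        List<String> out = Flux.just("a,b", "c,d")
                .concatMapIterable(csv -> Arrays.asList(csv.split(",")))
                .collectList()
                .block();
        System.out.println(out); // [a, b, c, d]
    }
}
```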
-
concatWith
-
contextCapture
If the context-propagation library is on the classpath, this is a convenience shortcut to capture thread-local values during the subscription phase and put them in the Context that is visible upstream of this operator.
As a result this operator should generally be used as close as possible to the end of the chain / subscription point.
If the ContextView visible upstream is not empty, a small subset of operators will automatically restore the context snapshot (handle, tap). If context-propagation is not available at runtime, this operator simply returns the current Flux instance.
-
contextWrite
Enrich the Context visible from downstream for the benefit of upstream operators, by making all values from the provided ContextView visible on top of pairs from downstream.
A Context (and its ContextView) is tied to a given subscription and is read by querying the downstream Subscriber. Subscribers that don't enrich the context instead access their own downstream's context. As a result, this operator conceptually enriches a Context coming from under it in the chain (downstream, by default an empty one) and makes the new enriched Context visible to operators above it in the chain.
- Parameters:
contextToAppend - the ContextView to merge with the downstream Context, resulting in a new, more complete Context that will be visible from upstream.
- Returns:
- a contextualized Flux
- See Also:
-
contextWrite
Enrich the Context visible from downstream for the benefit of upstream operators, by applying a Function to the downstream Context.
The Function takes a Context for convenience, allowing it to easily call write APIs to return a new Context.
A Context (and its ContextView) is tied to a given subscription and is read by querying the downstream Subscriber. Subscribers that don't enrich the context instead access their own downstream's context. As a result, this operator conceptually enriches a Context coming from under it in the chain (downstream, by default an empty one) and makes the new enriched Context visible to operators above it in the chain.
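A sketch of the "enrich from below, read from above" behavior: the contextWrite sits downstream in the chain, and the deferContextual above it reads the enriched Context (assumes reactor-core on the classpath; the key name is arbitrary):

```java
import reactor.core.publisher.Flux;

public class ContextWriteExample {
    public static void main(String[] args) {
        // deferContextual (upstream) sees the value written by contextWrite (downstream)
        String greeting = Flux.deferContextual(ctx ->
                        Flux.just("Hello, " + ctx.get("user")))
                .contextWrite(ctx -> ctx.put("user", "alice"))
                .blockFirst();
        System.out.println(greeting); // Hello, alice
    }
}
```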
count
Counts the number of values in this Flux. The count will be emitted when onComplete is observed.
defaultIfEmpty
Provide a default unique value if this sequence is completed without any data.
- Parameters:
defaultV - the alternate value if this sequence is empty
- Returns:
- a new Flux
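A quick sketch of the fallback behavior (assumes reactor-core on the classpath; the class name is arbitrary):

```java
import reactor.core.publisher.Flux;

public class DefaultIfEmptyExample {
    public static void main(String[] args) {
        // The default is only used when the source completes empty
        String fromEmpty = Flux.<String>empty().defaultIfEmpty("fallback").blockFirst();
        String fromData  = Flux.just("data").defaultIfEmpty("fallback").blockFirst();
        System.out.println(fromEmpty); // fallback
        System.out.println(fromData);  // data
    }
}
```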
-
delayElements
Delay each of this Flux's elements (Subscriber.onNext(T) signals) by a given Duration. Signals are delayed and continue on the parallel default Scheduler, but empty sequences or immediate error signals are not delayed.
- Parameters:
delay - duration by which to delay each Subscriber.onNext(T) signal
- Returns:
- a delayed Flux
- See Also:
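A rough timing sketch: each onNext is held back by the given Duration, so three elements at 100ms each take roughly 300ms end to end (assumes reactor-core on the classpath; exact timing varies with scheduler load):

```java
import java.time.Duration;
import reactor.core.publisher.Flux;

public class DelayElementsExample {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        Flux.just(1, 2, 3)
                .delayElements(Duration.ofMillis(100)) // each onNext delayed by ~100ms
                .blockLast();
        long elapsed = System.currentTimeMillis() - start;
        System.out.println("took ~" + elapsed + "ms"); // roughly 300ms for 3 elements
    }
}
```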
-
delayElements
Delay each of this Flux's elements (Subscriber.onNext(T) signals) by a given Duration. Signals are delayed and continue on a user-specified Scheduler, but empty sequences or immediate error signals are not delayed.
- Parameters:
delay - period by which to delay each Subscriber.onNext(T) signal
timer - a time-capable Scheduler instance to delay each signal on
- Returns:
- a delayed Flux
-
delaySequence
Shift this Flux forward in time by a given Duration. Unlike with delayElements(Duration), elements are shifted forward in time as they are emitted, always resulting in the delay between two elements being the same as in the source (only the first element is visibly delayed from the previous event, that is the subscription). Signals are delayed and continue on the parallel Scheduler, but empty sequences or immediate error signals are not delayed.
With this operator, a source emitting at 10Hz with a delaySequence Duration of 1s will still emit at 10Hz, with an initial "hiccup" of 1s. On the other hand, delayElements(Duration) would end up emitting at 1Hz.
This is closer to delaySubscription(Duration), except the source is subscribed to immediately.
Discard Support: This operator discards elements currently being delayed if the sequence is cancelled during the delay.
-
delaySequence
Shift this Flux forward in time by a given Duration. Unlike with delayElements(Duration, Scheduler), elements are shifted forward in time as they are emitted, always resulting in the delay between two elements being the same as in the source (only the first element is visibly delayed from the previous event, that is the subscription). Signals are delayed and continue on a user-specified Scheduler, but empty sequences or immediate error signals are not delayed.
With this operator, a source emitting at 10Hz with a delaySequence Duration of 1s will still emit at 10Hz, with an initial "hiccup" of 1s. On the other hand, delayElements(Duration, Scheduler) would end up emitting at 1Hz.
This is closer to delaySubscription(Duration, Scheduler), except the source is subscribed to immediately.
Discard Support: This operator discards elements currently being delayed if the sequence is cancelled during the delay.
-
delayUntil
Subscribe to this Flux and generate a Publisher from each of this Flux's elements, each acting as a trigger for relaying said element.
That is to say, the resulting Flux delays each of its emissions until the associated trigger Publisher terminates.
In case of an error either in the source or in a trigger, that error is propagated immediately downstream. Note that unlike with the Mono variant there is no fusion of subsequent calls.
-
delaySubscription
Delay the subscription to this Flux source until the given period elapses. The delay is introduced through the parallel default Scheduler.
-
delaySubscription
Delay the subscription to this Flux source until the given period elapses, as measured on the user-provided Scheduler.
-
delaySubscription
- Type Parameters:
U - the other source type
- Parameters:
subscriptionDelay - a companion Publisher whose onNext/onComplete signal will trigger the subscription
- Returns:
- a delayed
Flux
-
dematerialize
An operator working only if this Flux emits onNext, onError or onComplete Signal instances, transforming these materialized signals into real signals on the Subscriber. The error Signal will trigger onError and the complete Signal will trigger onComplete.
- Type Parameters:
X - the dematerialized type
- Returns:
- a dematerialized Flux
- See Also:
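A short round-trip sketch: materialize turns the events into Signal objects, and dematerialize restores them (assumes reactor-core on the classpath; the class name is arbitrary):

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;

public class DematerializeExample {
    public static void main(String[] args) {
        // onNext(1), onNext(2), onComplete become three Signal instances...
        Flux<Signal<Integer>> signals = Flux.just(1, 2).materialize();
        // ...and dematerialize replays them as real signals again
        List<Integer> restored = signals.<Integer>dematerialize()
                .collectList()
                .block();
        System.out.println(restored); // [1, 2]
    }
}
```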
-
distinct
For each Subscriber, track elements from this Flux that have been seen and filter out duplicates.
The values themselves are recorded into a HashSet for distinct detection. Use distinct(Object::hashCode) if you want a more lightweight approach that doesn't retain all the objects, but is more susceptible to falsely considering two elements as distinct due to a hashCode collision.
Discard Support: This operator discards elements that don't match the distinct predicate, but you should use the version with a cleanup if you need discarding of keys categorized by the operator as "seen". See distinct(Function, Supplier, BiPredicate, Consumer).
- Returns:
- a filtering Flux only emitting distinct values
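A minimal sketch: every duplicate is dropped, even non-adjacent ones (assumes reactor-core on the classpath; the class name is arbitrary):

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class DistinctExample {
    public static void main(String[] args) {
        // The trailing 1 is also filtered out, since 1 was already seen
        List<Integer> out = Flux.just(1, 1, 2, 2, 3, 1)
                .distinct()
                .collectList()
                .block();
        System.out.println(out); // [1, 2, 3]
    }
}
```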
-
distinct
For each Subscriber, track elements from this Flux that have been seen and filter out duplicates, as compared by a key extracted through the user-provided Function.
Discard Support: This operator discards elements that don't match the distinct predicate, but you should use the version with a cleanup if you need discarding of keys categorized by the operator as "seen". See distinct(Function, Supplier, BiPredicate, Consumer).
- Type Parameters:
V - the type of the key extracted from each value in this sequence
- Parameters:
keySelector - function to compute a comparison key for each element
- Returns:
- a filtering Flux only emitting values with distinct keys
-
distinct
public final <V,C extends Collection<? super V>> Flux<T> distinct(Function<? super T, ? extends V> keySelector, Supplier<C> distinctCollectionSupplier)
For each Subscriber, track elements from this Flux that have been seen and filter out duplicates, as compared by a key extracted through the user-provided Function and by the add method of the Collection supplied (typically a Set).
Discard Support: This operator discards elements that don't match the distinct predicate, but you should use the version with a cleanup if you need discarding of keys categorized by the operator as "seen". See distinct(Function, Supplier, BiPredicate, Consumer).
- Type Parameters:
V - the type of the key extracted from each value in this sequence
C - the type of Collection used for distinct checking of keys
- Parameters:
keySelector - function to compute a comparison key for each element
distinctCollectionSupplier - supplier of the Collection used for distinct checks through add of the key.
- Returns:
- a filtering Flux only emitting values with distinct keys
-
distinct
public final <V,C> Flux<T> distinct(Function<? super T, ? extends V> keySelector, Supplier<C> distinctStoreSupplier, BiPredicate<C, V> distinctPredicate, Consumer<C> cleanup)
For each Subscriber, track elements from this Flux that have been seen and filter out duplicates, as compared by applying a BiPredicate on an arbitrary user-supplied <C> store and a key extracted through the user-provided Function. The BiPredicate should typically add the key to the arbitrary store for further comparison. A cleanup callback is also invoked on the store upon termination of the sequence.
Discard Support: This operator discards elements that don't match the distinct predicate, but you should use the cleanup as well if you need discarding of keys categorized by the operator as "seen".
- Type Parameters:
V - the type of the key extracted from each value in this sequence
C - the type of store backing the BiPredicate
- Parameters:
keySelector - function to compute a comparison key for each element
distinctStoreSupplier - supplier of the arbitrary store object used in distinct checks along with the extracted key.
distinctPredicate - the BiPredicate to apply to the arbitrary store + extracted key to perform a distinct check. Since nothing is assumed of the store, this predicate should also add the key to the store as necessary.
cleanup - the cleanup callback to invoke on the store upon termination.
- Returns:
- a filtering Flux only emitting values with distinct keys
-
distinctUntilChanged
Filter out subsequent repetitions of an element (that is, if they arrive right after one another).
The last distinct value seen is retained for further comparison, which is done on the values themselves using the equals method. Use distinctUntilChanged(Object::hashCode) if you want a more lightweight approach that doesn't retain all the objects, but is more susceptible to falsely considering two elements as distinct due to a hashCode collision.
Discard Support: Although this operator discards elements that are considered as "already seen", it is not recommended for cases where discarding is needed, as the operator doesn't discard the "key" (in this context, the distinct instance that was last seen).
- Returns:
- a filtering Flux with only one occurrence in a row of each element (yet elements can repeat in the overall sequence)
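A minimal sketch contrasting this with distinct(): only immediate repetitions are dropped, so an element can reappear later (assumes reactor-core on the classpath; the class name is arbitrary):

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class DistinctUntilChangedExample {
    public static void main(String[] args) {
        // Adjacent duplicates collapse, but the trailing 1 is emitted again
        List<Integer> out = Flux.just(1, 1, 2, 2, 1)
                .distinctUntilChanged()
                .collectList()
                .block();
        System.out.println(out); // [1, 2, 1]
    }
}
```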
-
distinctUntilChanged
Filter out subsequent repetitions of an element (that is, if they arrive right after one another), as compared by a key extracted through the user-provided Function using equality.
Discard Support: This operator discards elements that are considered as "already seen". The keys themselves are not discarded.
- Type Parameters:
V - the type of the key extracted from each value in this sequence
- Parameters:
keySelector - function to compute a comparison key for each element
- Returns:
- a filtering Flux with only one occurrence in a row of each element of the same key (yet element keys can repeat in the overall sequence)
-
distinctUntilChanged
public final <V> Flux<T> distinctUntilChanged(Function<? super T, ? extends V> keySelector, BiPredicate<? super V, ? super V> keyComparator)
Filter out subsequent repetitions of an element (that is, if they arrive right after one another), as compared by a key extracted through the user-provided Function and then comparing keys with the supplied BiPredicate.
Discard Support: This operator discards elements that are considered as "already seen" (for which the keyComparator returns true). The keys themselves are not discarded.
- Type Parameters:
V - the type of the key extracted from each value in this sequence
- Parameters:
keySelector - function to compute a comparison key for each element
keyComparator - predicate used to compare keys.
- Returns:
- a filtering Flux with only one occurrence in a row of each element of the same key for which the predicate returns true (yet element keys can repeat in the overall sequence)
-
doAfterTerminate
Add behavior (side-effect) triggered after the Flux terminates, either by completing downstream successfully or with an error.
The relevant signal is propagated downstream, then the Runnable is executed.
- Parameters:
afterTerminate - the callback to call after Subscriber.onComplete() or Subscriber.onError(java.lang.Throwable)
- Returns:
- an observed Flux
-
doOnCancel
Add behavior (side-effect) triggered when the Flux is cancelled.
The handler is executed first, then the cancel signal is propagated upstream to the source.
- Parameters:
onCancel - the callback to call on Subscription.cancel()
- Returns:
- an observed Flux
-
doOnComplete
Add behavior (side-effect) triggered when the Flux completes successfully.
The Runnable is executed first, then the onComplete signal is propagated downstream.
- Parameters:
onComplete - the callback to call on Subscriber.onComplete()
- Returns:
- an observed Flux
-
doOnDiscard
Potentially modify the behavior of the whole chain of operators upstream of this one to conditionally clean up elements that get discarded by these operators.
The discardHook MUST be idempotent and safe to use on any instance of the desired type. Calls to this method are additive, and the order of invocation of the discardHooks is the same as the order of declaration (calling .filter(...).doOnDiscard(first).doOnDiscard(second) will let the filter invoke the first then the second handler).
Two main categories of discarding operators exist:
- filtering operators, dropping some source elements as part of their designed behavior
- operators that prefetch a few elements and keep them around pending a request, but get cancelled or errored
- Parameters:
type - the Class of elements in the upstream chain of operators that this cleanup hook should take into account.
discardHook - a Consumer of elements in the upstream chain of operators that performs the cleanup.
- Returns:
- a Flux that cleans up matching elements that get discarded upstream of it.
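A sketch of the filtering category above: the hook is placed downstream of the filter, and sees the elements the filter drops (assumes reactor-core on the classpath; the class name is arbitrary):

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class DoOnDiscardExample {
    public static void main(String[] args) {
        List<Integer> discarded = new ArrayList<>();
        Flux.just(1, 2, 3, 4)
                .filter(i -> i % 2 == 0)                    // drops 1 and 3
                .doOnDiscard(Integer.class, discarded::add) // hook applies to the chain upstream of it
                .blockLast();
        System.out.println(discarded); // [1, 3]
    }
}
```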
-
doOnEach
Add behavior (side-effects) triggered when the Flux emits an item, fails with an error or completes successfully. All these events are represented as a Signal that is passed to the side-effect callback. Note that this is an advanced operator, typically used for monitoring of a Flux. These Signals have a Context associated with them.
The Consumer is executed first, then the relevant signal is propagated downstream.
- Parameters:
signalConsumer - the mandatory callback to call on Subscriber.onNext(Object), Subscriber.onError(Throwable) and Subscriber.onComplete()
- Returns:
- an observed Flux
- See Also:
-
doOnError
Add behavior (side-effect) triggered when the Flux completes with an error.
The Consumer is executed first, then the onError signal is propagated downstream.
- Parameters:
onError - the callback to call on Subscriber.onError(java.lang.Throwable)
- Returns:
- an observed Flux
-
doOnError
public final <E extends Throwable> Flux<T> doOnError(Class<E> exceptionType, Consumer<? super E> onError)
Add behavior (side-effect) triggered when the Flux completes with an error matching the given exception type.
The Consumer is executed first, then the onError signal is propagated downstream.
- Type Parameters:
E - type of the error to handle
- Parameters:
exceptionType - the type of exceptions to handle
onError - the error handler for each error
- Returns:
- an observed Flux
-
doOnError
public final Flux<T> doOnError(Predicate<? super Throwable> predicate, Consumer<? super Throwable> onError)
Add behavior (side-effect) triggered when the Flux completes with an error matching the given predicate.
The Consumer is executed first, then the onError signal is propagated downstream.
- Parameters:
predicate - the matcher for exceptions to handle
onError - the error handler for each error
- Returns:
- an observed Flux
-
doOnNext
Add behavior (side-effect) triggered when the Flux emits an item.
The Consumer is executed first, then the onNext signal is propagated downstream.
Error Mode Support: This operator supports resuming on errors (including when fusion is enabled). Exceptions thrown by the consumer are passed to the onErrorContinue(BiConsumer) error consumer (the value consumer is not invoked, as the source element will be part of the sequence). The onNext signal is then propagated as normal.
- Parameters:
onNext - the callback to call on Subscriber.onNext(T)
- Returns:
- an observed Flux
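A minimal sketch: the side effect runs for each element before the value travels further downstream (assumes reactor-core on the classpath; the class name is arbitrary):

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class DoOnNextExample {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        Flux.just("a", "b")
                .doOnNext(seen::add)   // side effect runs before the onNext propagates
                .blockLast();
        System.out.println(seen); // [a, b]
    }
}
```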
-
doOnRequest
Add behavior (side-effect) triggering a LongConsumer when this Flux receives any request.
Note that a non-fatal error raised in the callback will not be propagated and will simply trigger Operators.onOperatorError(Throwable, Context).
The LongConsumer is executed first, then the request signal is propagated upstream to the parent.
- Parameters:
consumer - the consumer to invoke on each request
- Returns:
- an observed Flux
-
doOnSubscribe
Add behavior (side-effect) triggered when the Flux is being subscribed, that is to say when a Subscription has been produced by the Publisher and is being passed to Subscriber.onSubscribe(Subscription).
This method is not intended for capturing the subscription and calling its methods, but for side effects like monitoring. For instance, the correct way to cancel a subscription is to call Disposable.dispose() on the Disposable returned by subscribe().
The Consumer is executed first, then the Subscription is propagated downstream to the next subscriber in the chain that is being established.
- Parameters:
onSubscribe - the callback to call on Subscriber.onSubscribe(org.reactivestreams.Subscription)
- Returns:
- an observed Flux
- See Also:
-
doOnTerminate
Add behavior (side-effect) triggered when the Flux terminates, either by completing successfully or failing with an error. The
Runnable is executed first, then the onComplete/onError signal is propagated downstream.- Parameters:
onTerminate - the callback to call on Subscriber.onComplete() or Subscriber.onError(java.lang.Throwable)- Returns:
- an observed
Flux
-
doFirst
Add behavior (side-effect) triggered before the Flux is subscribed to, which should be the first event after assembly time. Note that when several
doFirst(Runnable) operators are used anywhere in a chain of operators, their order of execution is reversed compared to the declaration order (as the subscribe signal flows backward, from the ultimate subscriber to the source publisher):
    Flux.just(1, 2)
        .doFirst(() -> System.out.println("three"))
        .doFirst(() -> System.out.println("two"))
        .doFirst(() -> System.out.println("one"));
    // would print one two three
In case the
Runnable throws an exception, said exception will be directly propagated to the subscribing Subscriber along with a no-op Subscription, similarly to what error(Throwable) does. Otherwise, after the handler has executed, the Subscriber is directly subscribed to the original source Flux (this). This side-effect method provides stronger "first" guarantees compared to
doOnSubscribe(Consumer), which is triggered once the Subscription has been set up and passed to the Subscriber. -
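The reversed execution order can be made testable by recording into a list instead of printing (class name is illustrative; assumes reactor-core on the classpath):

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class DoFirstDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flux.just(1, 2)
            .doFirst(() -> log.add("three"))
            .doFirst(() -> log.add("two"))
            .doFirst(() -> log.add("one"))
            .blockLast();
        // the last-declared doFirst runs first, so the log reads one, two, three
        return log;
    }
}
```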
doFinally
Add behavior (side-effect) triggered after the Flux terminates for any reason, including cancellation. The terminating event (SignalType.ON_COMPLETE, SignalType.ON_ERROR or SignalType.CANCEL) is passed to the consumer, which is executed after the signal has been passed downstream. Note that because the signal is propagated downstream before the callback is executed, several doFinally in a row will be executed in reverse order. If you want to assert the execution of the callback, please keep in mind that the Flux will complete before it is executed, so its effect might not be visible immediately after, e.g., a
blockLast().- Parameters:
onFinally - the callback to execute after a terminal signal (complete, error or cancel)- Returns:
- an observed
Flux
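A minimal sketch recording the terminating SignalType (class name is illustrative; assumes reactor-core on the classpath). With a synchronous source like just(), the callback has already run by the time blockLast() returns; with truly asynchronous pipelines the doc's caveat above applies:

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

class DoFinallyDemo {
    static SignalType lastSignal() {
        AtomicReference<SignalType> sig = new AtomicReference<>();
        // the consumer receives ON_COMPLETE, ON_ERROR or CANCEL
        Flux.just(1, 2, 3).doFinally(sig::set).blockLast();
        return sig.get();
    }
}
```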
-
elapsed
Map this Flux into Tuple2<Long, T> of a time (in milliseconds) and source data. The time corresponds to the elapsed time between each signal, as measured by the parallel scheduler. The first duration is measured between the subscription and the first element.- Returns:
- a new
Flux that emits a tuple of time elapsed in milliseconds and matching data - See Also:
-
elapsed
Map this Flux into Tuple2<Long, T> of a time (in milliseconds) and source data. The time corresponds to the elapsed time between each signal, as measured by the provided Scheduler. The first duration is measured between the subscription and the first element.- Parameters:
scheduler - a Scheduler instance to read time from- Returns:
- a new
Flux that emits tuples of time elapsed in milliseconds and matching data - See Also:
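A minimal sketch of elapsed() (class name is illustrative; assumes reactor-core on the classpath). Each element is wrapped in a tuple whose first component is the milliseconds since the previous signal (or since subscription, for the first element):

```java
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

class ElapsedDemo {
    static Tuple2<Long, String> first() {
        // T1 is the elapsed time in ms, T2 the original element
        return Flux.just("a", "b").elapsed().blockFirst();
    }
}
```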
-
elementAt
Emit only the element at the given index position, or signal an IndexOutOfBoundsException if the sequence is shorter. Discard Support: This operator discards elements that appear before the requested index.
- Parameters:
index - zero-based index of the only item to emit- Returns:
- a
Mono of the item at the specified zero-based index
-
elementAt
Emit only the element at the given index position, or fall back to a default value if the sequence is shorter. Discard Support: This operator discards elements that appear before the requested index.
- Parameters:
index - zero-based index of the only item to emit defaultValue - a default value to emit if the sequence is shorter- Returns:
- a
Mono of the item at the specified zero-based index, or the default value
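A minimal sketch covering both variants (class name is illustrative; assumes reactor-core on the classpath):

```java
import reactor.core.publisher.Flux;

class ElementAtDemo {
    static String third() {
        // zero-based: index 2 is the third element
        return Flux.just("a", "b", "c").elementAt(2).block();
    }

    static String fallback() {
        // the sequence is shorter than index 5, so the default value is emitted
        return Flux.just("a").elementAt(5, "none").block();
    }
}
```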
-
expandDeep
public final Flux<T> expandDeep(Function<? super T, ? extends Publisher<? extends T>> expander, int capacityHint) Recursively expand elements into a graph and emit all the resulting elements, in a depth-first traversal order. That is: emit one value from this
Flux, expand it and emit the first value at this first level of recursion, and so on... When no more recursion is possible, backtrack to the previous level and re-apply the strategy. For example, given the hierarchical structure
    A
      - AA
        - aa1
    B
      - BB
        - bb1
expanding Flux.just(A, B) results in A AA aa1 B BB bb1
-
expandDeep
Recursively expand elements into a graph and emit all the resulting elements, in a depth-first traversal order. That is: emit one value from this
Flux, expand it and emit the first value at this first level of recursion, and so on... When no more recursion is possible, backtrack to the previous level and re-apply the strategy. For example, given the hierarchical structure
    A
      - AA
        - aa1
    B
      - BB
        - bb1
expanding Flux.just(A, B) results in A AA aa1 B BB bb1
-
expand
public final Flux<T> expand(Function<? super T, ? extends Publisher<? extends T>> expander, int capacityHint) Recursively expand elements into a graph and emit all the resulting elements using a breadth-first traversal strategy. That is: emit the values from this
Flux first, then expand each at a first level of recursion and emit all of the resulting values, then expand all of these at a second level and so on. For example, given the hierarchical structure
    A
      - AA
        - aa1
    B
      - BB
        - bb1
expanding Flux.just(A, B) results in A B AA BB aa1 bb1
-
expand
Recursively expand elements into a graph and emit all the resulting elements using a breadth-first traversal strategy. That is: emit the values from this
Flux first, then expand each at a first level of recursion and emit all of the resulting values, then expand all of these at a second level and so on. For example, given the hierarchical structure
    A
      - AA
        - aa1
    B
      - BB
        - bb1
expanding Flux.just(A, B) results in A B AA BB aa1 bb1
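The two traversal strategies can be contrasted on a tiny hand-built hierarchy (class name and map are illustrative; assumes reactor-core on the classpath):

```java
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;

class ExpandDemo {
    // hierarchy from the javadoc example: A -> AA -> aa1, B -> BB -> bb1
    static final Map<String, List<String>> CHILDREN = Map.of(
            "A", List.of("AA"), "AA", List.of("aa1"),
            "B", List.of("BB"), "BB", List.of("bb1"));

    static List<String> depthFirst() {
        return Flux.just("A", "B")
                .expandDeep(n -> Flux.fromIterable(CHILDREN.getOrDefault(n, List.of())))
                .collectList().block();
    }

    static List<String> breadthFirst() {
        return Flux.just("A", "B")
                .expand(n -> Flux.fromIterable(CHILDREN.getOrDefault(n, List.of())))
                .collectList().block();
    }
}
```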
-
filter
Evaluate each source value against the given Predicate. If the predicate test succeeds, the value is emitted. If the predicate test fails, the value is ignored and a request of 1 is made upstream. Discard Support: This operator discards elements that do not match the filter. It also discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.
Error Mode Support: This operator supports
resuming on errors (including when fusion is enabled). Exceptions thrown by the predicate are considered as if the predicate returned false: they cause the source value to be dropped and a new element (request(1)) to be requested from upstream. -
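A minimal sketch of filter (class name is illustrative; assumes reactor-core on the classpath):

```java
import java.util.List;
import reactor.core.publisher.Flux;

class FilterDemo {
    static List<Integer> evens() {
        // values failing the predicate are dropped and one more is requested upstream
        return Flux.range(1, 6).filter(i -> i % 2 == 0).collectList().block();
    }
}
```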
filterWhen
Test each value emitted by this Flux asynchronously using a generated Publisher<Boolean> test. A value is replayed if the first item emitted by its corresponding test is true. It is dropped if its test is either empty or its first emitted value is false. Note that only the first value of the test publisher is considered, and unless it is a
Mono, the test will be cancelled after receiving that first value. Test publishers are generated and subscribed to in sequence. Discard Support: This operator discards elements that do not match the filter. It also discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.
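The asynchronous predicate is simply a function producing a Publisher<Boolean> per element; a sketch using Mono.just as the (here trivially synchronous) test (class name is illustrative; assumes reactor-core on the classpath):

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class FilterWhenDemo {
    static List<Integer> evens() {
        return Flux.range(1, 6)
                // each element gets its own Mono<Boolean> test; only true keeps the value
                .filterWhen(i -> Mono.just(i % 2 == 0))
                .collectList().block();
    }
}
```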
-
filterWhen
public final Flux<T> filterWhen(Function<? super T, ? extends Publisher<Boolean>> asyncPredicate, int bufferSize) Test each value emitted by this Flux asynchronously using a generated Publisher<Boolean> test. A value is replayed if the first item emitted by its corresponding test is true. It is dropped if its test is either empty or its first emitted value is false. Note that only the first value of the test publisher is considered, and unless it is a
Mono, the test will be cancelled after receiving that first value. Test publishers are generated and subscribed to in sequence. Discard Support: This operator discards elements that do not match the filter. It also discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.
- Parameters:
asyncPredicate - the function generating a Publisher of Boolean for each value, to filter the Flux with bufferSize - the maximum expected number of values to hold pending a result of their respective asynchronous predicates, rounded to the next power of two. This is capped depending on the size of the heap and the JVM limits, so be careful with large values (although, e.g., 65536 should still be fine). Also serves as the initial request size for the source.- Returns:
- a filtered
Flux
-
flatMap
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allows them to interleave. There are three dimensions to this operator that can be compared with
flatMapSequential and concatMap:- Generation of inners and subscription: this operator eagerly subscribes to its inners.
- Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner elements are flattened as they arrive.
- Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).
Discard Support: This operator discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.
Error Mode Support: This operator supports
resuming on errors in the mapper Function. Exceptions thrown by the mapper then behave as if it had mapped the value to an empty publisher. If the mapper does map to a scalar publisher (an optimization in which the value can be resolved immediately without subscribing to the publisher, e.g. a Mono.fromCallable(Callable)) but said publisher throws, this can be resumed from in the same manner. -
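A minimal sketch of the merge-based flattening (class name is illustrative; assumes reactor-core on the classpath). With synchronous inners the output order is deterministic; with asynchronous inners the values could interleave differently:

```java
import java.util.List;
import reactor.core.publisher.Flux;

class FlatMapDemo {
    static List<Integer> doubled() {
        return Flux.just(1, 2, 3)
                // each element maps to an inner publisher; inners are subscribed eagerly
                .flatMap(i -> Flux.just(i, i * 10))
                .collectList().block();
    }
}
```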
flatMap
public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency) Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allows them to interleave. There are three dimensions to this operator that can be compared with
flatMapSequential and concatMap:- Generation of inners and subscription: this operator eagerly subscribes to its inners.
- Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner elements are flattened as they arrive.
- Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).
The concurrency argument controls how many inner Publishers can be subscribed to and merged in parallel. In turn, that argument determines the size of the first Subscription.request(long) to the upstream. Discard Support: This operator discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.
Error Mode Support: This operator supports
resuming on errors in the mapper Function. Exceptions thrown by the mapper then behave as if it had mapped the value to an empty publisher. If the mapper does map to a scalar publisher (an optimization in which the value can be resolved immediately without subscribing to the publisher, e.g. a Mono.fromCallable(Callable)) but said publisher throws, this can be resumed from in the same manner. -
flatMap
public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency, int prefetch) Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allows them to interleave. There are three dimensions to this operator that can be compared with
flatMapSequential and concatMap:- Generation of inners and subscription: this operator eagerly subscribes to its inners.
- Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner elements are flattened as they arrive.
- Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).
The concurrency argument controls how many inner Publishers can be subscribed to and merged in parallel. In turn, that argument determines the size of the first Subscription.request(long) to the upstream. The prefetch argument lets you give an arbitrary prefetch size to the merged Publishers (in other words, the prefetch size is the size of the first Subscription.request(long) to each merged Publisher). Discard Support: This operator discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.
Error Mode Support: This operator supports
resuming on errors in the mapper Function. Exceptions thrown by the mapper then behave as if it had mapped the value to an empty publisher. If the mapper does map to a scalar publisher (an optimization in which the value can be resolved immediately without subscribing to the publisher, e.g. a Mono.fromCallable(Callable)) but said publisher throws, this can be resumed from in the same manner.- Type Parameters:
V - the merged output sequence type- Parameters:
mapper - the Function to transform the input sequence into N Publisher sequences concurrency - the maximum number of in-flight inner sequences prefetch - the maximum number of in-flight elements from each inner Publisher sequence- Returns:
- a merged
Flux
-
flatMapDelayError
public final <V> Flux<V> flatMapDelayError(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency, int prefetch) Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allows them to interleave. There are three dimensions to this operator that can be compared with
flatMapSequential and concatMap:- Generation of inners and subscription: this operator eagerly subscribes to its inners.
- Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner elements are flattened as they arrive.
- Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).
The concurrency argument controls how many inner Publishers can be subscribed to and merged in parallel. The prefetch argument lets you give an arbitrary prefetch size to the merged Publishers. This variant will delay any error until after the rest of the flatMap backlog has been processed. Discard Support: This operator discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.
Error Mode Support: This operator supports
resuming on errors in the mapper Function. Exceptions thrown by the mapper then behave as if it had mapped the value to an empty publisher. If the mapper does map to a scalar publisher (an optimization in which the value can be resolved immediately without subscribing to the publisher, e.g. a Mono.fromCallable(Callable)) but said publisher throws, this can be resumed from in the same manner.- Type Parameters:
V - the merged output sequence type- Parameters:
mapper - the Function to transform the input sequence into N Publisher sequences concurrency - the maximum number of in-flight inner sequences prefetch - the maximum number of in-flight elements from each inner Publisher sequence- Returns:
- a merged
Flux
-
flatMap
public final <R> Flux<R> flatMap(@Nullable Function<? super T, ? extends Publisher<? extends R>> mapperOnNext, @Nullable Function<? super Throwable, ? extends Publisher<? extends R>> mapperOnError, @Nullable Supplier<? extends Publisher<? extends R>> mapperOnComplete) Transform the signals emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allows them to interleave. Note that at least one of the signal mappers must be provided, and all provided mappers must produce a publisher. There are three dimensions to this operator that can be compared with
flatMapSequential and concatMap:- Generation of inners and subscription: this operator eagerly subscribes to its inners.
- Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner elements are flattened as they arrive.
- Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).
An onError signal will be transformed into a completion signal after its mapping callback has been applied.
- Type Parameters:
R - the output Publisher type target- Parameters:
mapperOnNext - the Function to call on next data, returning a sequence to merge. Use null to ignore (provided at least one other mapper is specified). mapperOnError - the Function to call on an error signal, returning a sequence to merge. Use null to ignore (provided at least one other mapper is specified). mapperOnComplete - the Function to call on the complete signal, returning a sequence to merge. Use null to ignore (provided at least one other mapper is specified).- Returns:
- a new
Flux
-
flatMapIterable
public final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper) Transform the items emitted by this Flux into Iterables, then flatten the elements from those by merging them into a single Flux. For each iterable, Iterable.iterator() will be called at least once and at most twice. This operator inspects each
Iterable's Spliterator to assess if the iteration can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)). Since the default Spliterator wraps the Iterator, we can have two Iterable.iterator() calls per iterable. This second invocation is skipped on a Collection however, a type which is assumed to be always finite. Note that unlike
flatMap(Function) and concatMap(Function), with Iterable there is no notion of eager vs lazy inner subscription. The content of the Iterables is all played sequentially. Thus flatMapIterable and concatMapIterable are equivalent, offered as a discoverability improvement for users that explore the API with the concat vs flatMap expectation. Discard Support: Upon cancellation, this operator discards
T elements it prefetched and, in some cases, attempts to discard the remainder of the currently processed Iterable (if it can safely ensure the iterator is finite). Note that this means each Iterable's Iterable.iterator() method could be invoked twice. Error Mode Support: This operator supports
resuming on errors (including when fusion is enabled). Exceptions thrown by the consumer are passed to the onErrorContinue(BiConsumer) error consumer (the value consumer is not invoked, as the source element will be part of the sequence). The onNext signal is then propagated as normal. -
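A minimal sketch mapping each element to an Iterable that is then flattened sequentially (class name is illustrative; assumes reactor-core on the classpath and Java 16+ for Stream.toList()):

```java
import java.util.List;
import reactor.core.publisher.Flux;

class FlatMapIterableDemo {
    static List<Character> letters() {
        return Flux.just("ab", "cd")
                // each String maps to an Iterable of its characters; contents play in order
                .flatMapIterable(s -> s.chars().mapToObj(c -> (char) c).toList())
                .collectList().block();
    }
}
```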
flatMapIterable
public final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper, int prefetch) Transform the items emitted by this Flux into Iterables, then flatten the emissions from those by merging them into a single Flux. The prefetch argument lets you give an arbitrary prefetch size to the upstream source. For each iterable, Iterable.iterator() will be called at least once and at most twice. This operator inspects each
Iterable's Spliterator to assess if the iteration can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)). Since the default Spliterator wraps the Iterator, we can have two Iterable.iterator() calls per iterable. This second invocation is skipped on a Collection however, a type which is assumed to be always finite. Note that unlike
flatMap(Function) and concatMap(Function), with Iterable there is no notion of eager vs lazy inner subscription. The content of the Iterables is all played sequentially. Thus flatMapIterable and concatMapIterable are equivalent, offered as a discoverability improvement for users that explore the API with the concat vs flatMap expectation. Discard Support: Upon cancellation, this operator discards
T elements it prefetched and, in some cases, attempts to discard the remainder of the currently processed Iterable (if it can safely ensure the iterator is finite). Note that this means each Iterable's Iterable.iterator() method could be invoked twice. Error Mode Support: This operator supports
resuming on errors (including when fusion is enabled). Exceptions thrown by the consumer are passed to the onErrorContinue(BiConsumer) error consumer (the value consumer is not invoked, as the source element will be part of the sequence). The onNext signal is then propagated as normal.- Type Parameters:
R - the merged output sequence type- Parameters:
mapper - the Function to transform the input sequence into N Iterables prefetch - the number of values to request from the source upon subscription, to be transformed to Iterable- Returns:
- a concatenation of the values from the Iterables obtained from each element in this
Flux
-
flatMapSequential
public final <R> Flux<R> flatMapSequential(Function<? super T, ? extends Publisher<? extends R>> mapper) Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, but merge them in the order of their source elements. There are three dimensions to this operator that can be compared with
flatMap and concatMap:- Generation of inners and subscription: this operator eagerly subscribes to its inners (like flatMap).
- Ordering of the flattened values: this operator queues elements from late inners until all elements from earlier inners have been emitted, thus emitting inner sequences as a whole, in an order that matches their source's order.
- Interleaving: this operator does not let values from different inners interleave (similar looking result to concatMap, but due to queueing of values that would have been interleaved otherwise).
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher. However, if such an early inner takes more time to complete than subsequent faster inners, the data from these faster inners will be queued until the earlier inner completes, so as to maintain source ordering. -
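The ordering guarantee can be observed by making earlier inners slower than later ones (class name and delays are illustrative; assumes reactor-core on the classpath):

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

class FlatMapSequentialDemo {
    static List<Integer> ordered() {
        return Flux.just(1, 2, 3)
                // earlier inners are slower, yet source order is preserved:
                // faster inners are queued until the earlier ones complete
                .flatMapSequential(i ->
                        Flux.just(i).delayElements(Duration.ofMillis(30 - i * 10L)))
                .collectList().block();
    }
}
```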
flatMapSequential
public final <R> Flux<R> flatMapSequential(Function<? super T, ? extends Publisher<? extends R>> mapper, int maxConcurrency) Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, but merge them in the order of their source elements. There are three dimensions to this operator that can be compared with
flatMap and concatMap:- Generation of inners and subscription: this operator eagerly subscribes to its inners (like flatMap).
- Ordering of the flattened values: this operator queues elements from late inners until all elements from earlier inners have been emitted, thus emitting inner sequences as a whole, in an order that matches their source's order.
- Interleaving: this operator does not let values from different inners interleave (similar looking result to concatMap, but due to queueing of values that would have been interleaved otherwise).
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher. However, if such an early inner takes more time to complete than subsequent faster inners, the data from these faster inners will be queued until the earlier inner completes, so as to maintain source ordering. The concurrency argument controls how many inner
Publishers can be subscribed to and merged in parallel. -
flatMapSequential
public final <R> Flux<R> flatMapSequential(Function<? super T, ? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch) Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, but merge them in the order of their source elements. There are three dimensions to this operator that can be compared with
flatMap and concatMap:- Generation of inners and subscription: this operator eagerly subscribes to its inners (like flatMap).
- Ordering of the flattened values: this operator queues elements from late inners until all elements from earlier inners have been emitted, thus emitting inner sequences as a whole, in an order that matches their source's order.
- Interleaving: this operator does not let values from different inners interleave (similar looking result to concatMap, but due to queueing of values that would have been interleaved otherwise).
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher. However, if such an early inner takes more time to complete than subsequent faster inners, the data from these faster inners will be queued until the earlier inner completes, so as to maintain source ordering. The concurrency argument controls how many inner
Publishers can be subscribed to and merged in parallel. The prefetch argument lets you give an arbitrary prefetch size to the merged Publishers.- Type Parameters:
R - the merged output sequence type- Parameters:
mapper - the Function to transform the input sequence into N Publisher sequences maxConcurrency - the maximum number of in-flight inner sequences prefetch - the maximum number of in-flight elements from each inner Publisher sequence- Returns:
- a merged
Flux, subscribing early but keeping the original ordering
-
flatMapSequentialDelayError
public final <R> Flux<R> flatMapSequentialDelayError(Function<? super T, ? extends Publisher<? extends R>> mapper, int maxConcurrency, int prefetch) Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, but merge them in the order of their source elements. There are three dimensions to this operator that can be compared with
flatMap and concatMap:- Generation of inners and subscription: this operator eagerly subscribes to its inners (like flatMap).
- Ordering of the flattened values: this operator queues elements from late inners until all elements from earlier inners have been emitted, thus emitting inner sequences as a whole, in an order that matches their source's order.
- Interleaving: this operator does not let values from different inners interleave (similar looking result to concatMap, but due to queueing of values that would have been interleaved otherwise).
That is to say, whenever a source element is emitted it is transformed to an inner
Publisher. However, if such an early inner takes more time to complete than subsequent faster inners, the data from these faster inners will be queued until the earlier inner completes, so as to maintain source ordering. The concurrency argument controls how many inner
Publishers can be subscribed to and merged in parallel. The prefetch argument lets you give an arbitrary prefetch size to the merged Publishers. This variant will delay any error until after the rest of the flatMap backlog has been processed.- Type Parameters:
R - the merged output sequence type- Parameters:
mapper - the Function to transform the input sequence into N Publisher sequences maxConcurrency - the maximum number of in-flight inner sequences prefetch - the maximum number of in-flight elements from each inner Publisher sequence- Returns:
- a merged
Flux, subscribing early but keeping the original ordering
-
getPrefetch
public int getPrefetch() The prefetch configuration of the Flux.- Returns:
- the prefetch configuration of the
Flux, -1 if unspecified
-
groupBy
Divide this sequence into dynamically created Flux (or groups) for each unique key, as produced by the provided keyMapper Function. Note that groupBy works best with a low cardinality of groups, so choose your keyMapper function accordingly. The groups need to be drained and consumed downstream for groupBy to work correctly. Notably, when the criteria produces a large number of groups, it can lead to hanging if the groups are not suitably consumed downstream (e.g. due to a
flatMap with a maxConcurrency parameter that is set too low). To avoid deadlock, the concurrency of the subscriber to groupBy should be greater than or equal to the number of groups created. In that case every group has its own subscriber and progress can be made, even when the data publish pattern is arbitrary. Otherwise, when the number of groups exceeds downstream concurrency, the subscribers should be designed with caution, because if the consumption pattern doesn't match what can be accommodated in its producer buffer, the process may enter deadlock due to backpressure.
Note that groups are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a specific group more than once: groups are unicast. This is most noticeable when trying to
retry() or repeat() a window, as these operators are based on re-subscription.- Type Parameters:
K - the key type extracted from each value of this sequence- Parameters:
keyMapper - the key mapping Function that evaluates an incoming data and returns a key.- Returns:
- a
Flux of GroupedFlux grouped sequences
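A minimal sketch with a low-cardinality key (class name is illustrative; assumes reactor-core on the classpath). Each GroupedFlux must be consumed; here the short, fully-buffered source allows concatMap to drain the groups one by one, whereas with many groups or an unbounded source a flatMap with sufficient concurrency would be required to avoid the deadlock described above:

```java
import java.util.List;
import reactor.core.publisher.Flux;

class GroupByDemo {
    static List<String> grouped() {
        return Flux.just(1, 2, 3, 4, 5)
                .groupBy(i -> i % 2 == 0 ? "even" : "odd")
                // groups are emitted in first-seen order: "odd" (from 1), then "even" (from 2)
                .concatMap(g -> g.collectList().map(list -> g.key() + "=" + list))
                .collectList().block();
    }
}
```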
-
groupBy
public final <K> Flux<GroupedFlux<K,T>> groupBy(Function<? super T, ? extends K> keyMapper, int prefetch) Divide this sequence into dynamically created Flux (or groups) for each unique key, as produced by the provided keyMapper Function. Note that groupBy works best with a low cardinality of groups, so choose your keyMapper function accordingly. The groups need to be drained and consumed downstream for groupBy to work correctly. Notably, when the criteria produces a large number of groups, it can lead to hanging if the groups are not suitably consumed downstream (e.g. due to a
flatMap with a maxConcurrency parameter that is set too low). To avoid deadlock, the concurrency of the subscriber to groupBy should be greater than or equal to the number of groups created. In that case every group has its own subscriber and progress can be made, even when the data publish pattern is arbitrary. Otherwise, when the number of groups exceeds downstream concurrency, the subscribers should be designed with caution, because if the consumption pattern doesn't match what can be accommodated in its producer buffer, the process may enter deadlock due to backpressure.
Note that groups are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a specific group more than once: groups are unicast. This is most noticeable when trying to
retry() or repeat() a window, as these operators are based on re-subscription.- Type Parameters:
K - the key type extracted from each value of this sequence- Parameters:
keyMapper - the key mapping Function that evaluates an incoming data and returns a key. prefetch - the number of values to prefetch from the source- Returns:
- a
FluxofGroupedFluxgrouped sequences
-
groupBy
public final <K,V> Flux<GroupedFlux<K,V>> groupBy(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends V> valueMapper) Divide this sequence into dynamically created Flux (or groups) for each unique key, as produced by the provided keyMapper Function. Source elements are also mapped to a different value using the valueMapper. Note that groupBy works best with a low cardinality of groups, so choose your keyMapper function accordingly. The groups need to be drained and consumed downstream for groupBy to work correctly. Notably, when the criteria produces a large number of groups, it can lead to hanging if the groups are not suitably consumed downstream (e.g. due to a
flatMap with a maxConcurrency parameter that is set too low). To avoid deadlock, the concurrency of the subscriber to groupBy should be greater than or equal to the number of groups created. In that case every group has its own subscriber and progress can be made, even when the data publish pattern is arbitrary. Otherwise, when the number of groups exceeds downstream concurrency, the subscribers should be designed with caution, because if the consumption pattern doesn't match what can be accommodated in its producer buffer, the process may enter deadlock due to backpressure.
Note that groups are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a specific group more than once: groups are unicast. This is most noticeable when trying to
retry() or repeat() a window, as these operators are based on re-subscription.- Type Parameters:
K - the key type extracted from each value of this sequence V - the value type extracted from each value of this sequence- Parameters:
keyMapper - the key mapping function that evaluates an incoming data and returns a key. valueMapper - the value mapping function that evaluates which data to extract for re-routing.- Returns:
- a
Flux of GroupedFlux grouped sequences
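As a rough illustration of the keyMapper/valueMapper pair, the following sketch (assuming reactor-core is on the classpath; class and method names are hypothetical) groups integers by parity while mapping each element to a String value, and drains every group via flatMap as recommended above:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class GroupByExample {
    public static List<String> groupedValues() {
        return Flux.just(1, 2, 3, 4, 5)
                // key: "odd"/"even"; value: the element rendered as a String
                .groupBy(i -> i % 2 == 0 ? "even" : "odd", i -> "v" + i)
                // each group must be consumed for groupBy to make progress
                .flatMap(g -> g.collectList().map(list -> g.key() + list))
                .collectList()
                .block();
    }
}
```

With only two groups the default flatMap concurrency comfortably exceeds the group count, so no deadlock can occur.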
-
groupBy
public final <K, V> Flux<GroupedFlux<K, V>> groupBy(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends V> valueMapper, int prefetch) Divide this sequence into dynamically created Flux (or groups) for each unique key, as produced by the provided keyMapper Function. Source elements are also mapped to a different value using the valueMapper. Note that groupBy works best with a low cardinality of groups, so choose your keyMapper function accordingly. The groups need to be drained and consumed downstream for groupBy to work correctly. Notably when the criteria produces a large number of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a
flatMap with a maxConcurrency parameter that is set too low). To avoid deadlock, the concurrency of the subscriber to groupBy should be greater than or equal to the number of groups created. In that case every group has its own subscriber and progress can be made, even when the data publish pattern is arbitrary. Otherwise, when the number of groups exceeds downstream concurrency, the subscribers should be designed with caution, because if the consumption pattern doesn't match what can be accommodated in its producer buffer, the process may enter a deadlock due to backpressure.
Note that groups are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a specific group more than once: groups are unicast. This is most noticeable when trying to
retry() or repeat() a group, as these operators are based on re-subscription. - Type Parameters:
K - the key type extracted from each value of this sequence. V - the value type extracted from each value of this sequence - Parameters:
keyMapper - the key mapping function that evaluates incoming data and returns a key. valueMapper - the value mapping function that evaluates which data to extract for re-routing. prefetch - the number of values to prefetch from the source - Returns:
- a
Flux of GroupedFlux grouped sequences
-
groupJoin
public final <TRight, TLeftEnd, TRightEnd, R> Flux<R> groupJoin(Publisher<? extends TRight> other, Function<? super T, ? extends Publisher<TLeftEnd>> leftEnd, Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd, BiFunction<? super T, ? super Flux<TRight>, ? extends R> resultSelector) Map values from two Publishers into time windows and emit combinations of values in case their windows overlap. The emitted elements are obtained by passing the value from this Flux and a Flux emitting the value from the other Publisher to a BiFunction. There are no guarantees in what order the items get combined when multiple items from one or both source Publishers overlap.
Unlike
join(org.reactivestreams.Publisher<? extends TRight>, java.util.function.Function<? super T, ? extends org.reactivestreams.Publisher<TLeftEnd>>, java.util.function.Function<? super TRight, ? extends org.reactivestreams.Publisher<TRightEnd>>, java.util.function.BiFunction<? super T, ? super TRight, ? extends R>), items from the second Publisher will be provided as a Flux to the resultSelector. - Type Parameters:
TRight - the type of the elements from the right Publisher. TLeftEnd - the type for this Flux window signals. TRightEnd - the type for the right Publisher window signals. R - the combined result type - Parameters:
other - the other Publisher to correlate items with. leftEnd - a function that returns a Publisher whose emissions indicate the time window for the source value to be considered. rightEnd - a function that returns a Publisher whose emissions indicate the time window for the right Publisher value to be considered. resultSelector - a function that takes an item emitted by this Flux and a Flux representation of the overlapping item from the other Publisher and returns the value to be emitted by the resulting Flux - Returns:
- a joining
Flux - See Also:
-
handle
Handle the items emitted by this Flux by calling a biconsumer with the output sink for each onNext. At most one SynchronousSink.next(Object) call must be performed and/or 0 or 1 SynchronousSink.error(Throwable) or SynchronousSink.complete(). Error Mode Support: This operator supports
resuming on errors (including when fusion is enabled) when the BiConsumer throws an exception or if an error is signaled explicitly via SynchronousSink.error(Throwable). When the context-propagation library is available at runtime and the downstream
ContextView is not empty, this operator implicitly uses the library to restore thread locals around the handler BiConsumer. Typically, this would be done in conjunction with the use of the contextCapture() operator down the chain. - Type Parameters:
R - the transformed type - Parameters:
handler - the handling BiConsumer - Returns:
- a transformed
Flux
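Because the sink may emit zero or one element per input, handle acts as a combined map-and-filter. A minimal sketch (assuming reactor-core is on the classpath; the class name is hypothetical) that keeps only even inputs, transformed:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class HandleExample {
    public static List<Integer> evensTimesTen() {
        return Flux.just(1, 2, 3, 4)
                .<Integer>handle((i, sink) -> {
                    if (i % 2 == 0) {
                        sink.next(i * 10); // emit only even inputs, transformed
                    }                      // odd inputs are silently skipped
                })
                .collectList()
                .block();
    }
}
```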
-
hasElement
Emit a single boolean true if any of the elements of this Flux sequence is equal to the provided value. The implementation uses short-circuit logic and completes with true if an element matches the value.
- Parameters:
value - constant compared to incoming signals - Returns:
- a new
Mono with true if any element is equal to the given value and false otherwise
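The short-circuit behavior can be seen in a small sketch (assuming reactor-core is on the classpath; the class name is hypothetical):

```java
import reactor.core.publisher.Flux;

public class HasElementExample {
    public static boolean containsTwo() {
        // completes with true as soon as 2 is seen; 3 is never requested
        return Flux.just(1, 2, 3).hasElement(2).block();
    }

    public static boolean containsNine() {
        // false is only emitted once the whole sequence has completed
        return Flux.just(1, 2, 3).hasElement(9).block();
    }
}
```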
-
hasElements
Emit a single boolean true if this Flux sequence has at least one element. The implementation uses short-circuit logic and completes with true on onNext.
- Returns:
- a new
Mono with true if any value is emitted and false otherwise
-
hide
Hides the identities of this Flux instance. The main purpose of this operator is to prevent certain identity-based optimizations from happening, mostly for diagnostic purposes.
- Returns:
- a new
Flux preventing Publisher / Subscription based Reactor optimizations
-
index
Keep information about the order in which source values were received by indexing them with a 0-based incrementing long, returning a Flux of Tuple2<(index, value)>. - Returns:
- an indexed
Flux with each source value combined with its 0-based index.
-
index
Keep information about the order in which source values were received by indexing them internally with a 0-based incrementing long, then combining this information with the source value into an I using the provided BiFunction, returning a Flux<I>. Typical usage would be to produce a
Tuple2 similar to index(), but 1-based instead of 0-based: index((i, v) -> Tuples.of(i+1, v)) - Parameters:
indexMapper - the BiFunction to use to combine elements and their index. - Returns:
- an indexed
Flux with each source value combined with its computed index.
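The 1-based variant mentioned above can be sketched as follows (assuming reactor-core is on the classpath; the class name is hypothetical):

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

public class IndexExample {
    public static List<String> oneBased() {
        return Flux.just("a", "b", "c")
                // shift the internal 0-based index to 1-based
                .index((i, v) -> Tuples.of(i + 1, v))
                .map(t -> t.getT1() + ":" + t.getT2())
                .collectList()
                .block();
    }
}
```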
-
ignoreElements
Ignores onNext signals (dropping them) and only propagates termination events. Discard Support: This operator discards the upstream's elements.
-
join
public final <TRight, TLeftEnd, TRightEnd, R> Flux<R> join(Publisher<? extends TRight> other, Function<? super T, ? extends Publisher<TLeftEnd>> leftEnd, Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd, BiFunction<? super T, ? super TRight, ? extends R> resultSelector) Combine values from two Publishers in case their windows overlap. Each incoming value triggers the creation of a new Publisher via the given Function. If that Publisher signals its first value or completes, the time window for the original element is immediately closed. The emitted elements are obtained by passing the values from this Flux and the other Publisher to a BiFunction. There are no guarantees in what order the items get combined when multiple items from one or both source Publishers overlap.
- Type Parameters:
TRight - the type of the elements from the right Publisher. TLeftEnd - the type for this Flux window signals. TRightEnd - the type for the right Publisher window signals. R - the combined result type - Parameters:
other - the other Publisher to correlate items with. leftEnd - a function that returns a Publisher whose emissions indicate the time window for the source value to be considered. rightEnd - a function that returns a Publisher whose emissions indicate the time window for the right Publisher value to be considered. resultSelector - a function that takes an item emitted by each Publisher and returns the value to be emitted by the resulting Flux - Returns:
- a joining
Flux - See Also:
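One way to see the windowing at work: with Flux.never() as both end selectors, no window ever closes, so every left value overlaps every right value and join degenerates into a cross product. A sketch (assuming reactor-core is on the classpath; the class name is hypothetical):

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class JoinExample {
    public static List<String> crossJoin() {
        return Flux.just(1, 2)
                .join(Flux.just("a", "b"),
                      v -> Flux.never(),  // left windows stay open
                      v -> Flux.never(),  // right windows stay open
                      (l, r) -> l + r)    // combine overlapping pairs
                .collectList()
                .block();
    }
}
```

Per the contract above, the order of the combinations is not guaranteed, only their presence.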
-
last
Emit the last element observed before the complete signal as a Mono, or emit a NoSuchElementException error if the source was empty. For a passive version use takeLast(int). Discard Support: This operator discards elements before the last.
-
last
Emit the last element observed before the complete signal as a Mono, or emit the defaultValue if the source was empty. For a passive version use takeLast(int). Discard Support: This operator discards elements before the last.
-
limitRate
Ensure that backpressure signals from downstream subscribers are split into batches capped at the provided prefetchRate when propagated upstream, effectively rate limiting the upstream Publisher. Note that this is an upper bound, and that this operator uses a prefetch-and-replenish strategy, requesting a replenishing amount when 75% of the prefetch amount has been emitted.
Typically used for scenarios where consumer(s) request a large amount of data (eg.
Long.MAX_VALUE) but the data source behaves better or can be optimized with smaller requests (eg. database paging, etc...). All data is still processed, unlike with take(long) which will cap the grand total request amount. Equivalent to
flux.publishOn(Schedulers.immediate(), prefetchRate).subscribe(). Note that the prefetchRate is an upper bound, and that this operator uses a prefetch-and-replenish strategy, requesting a replenishing amount when 75% of the prefetch amount has been emitted. - Parameters:
prefetchRate - the limit to apply to downstream's backpressure - Returns:
- a
Flux limiting downstream's backpressure - See Also:
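The prefetch-and-replenish behavior can be observed by recording the requests that limitRate makes upstream. A sketch (assuming reactor-core is on the classpath; the class name is hypothetical) where the downstream blockLast() requests an unbounded amount, yet the upstream only ever sees batches:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;

public class LimitRateExample {
    public static List<Long> upstreamRequests() {
        List<Long> requests = new CopyOnWriteArrayList<>();
        Flux.range(1, 30)
                .doOnRequest(requests::add) // record what limitRate asks upstream
                .limitRate(10)
                .blockLast();               // downstream demand is unbounded
        return requests;
    }
}
```

The first recorded request is the full prefetch of 10; subsequent requests are the 75% replenishment batches.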
-
limitRate
Ensure that backpressure signals from downstream subscribers are split into batches capped at the provided highTide first, then replenishing at the provided lowTide, effectively rate limiting the upstream Publisher. Note that this is an upper bound, and that this operator uses a prefetch-and-replenish strategy, requesting a replenishing amount when 75% of the prefetch amount has been emitted.
Typically used for scenarios where consumer(s) request a large amount of data (eg.
Long.MAX_VALUE) but the data source behaves better or can be optimized with smaller requests (eg. database paging, etc...). All data is still processed, unlike with take(long) which will cap the grand total request amount. Similar to
flux.publishOn(Schedulers.immediate(), prefetchRate).subscribe(), except with a customized "low tide" instead of the default 75%. Note that the smaller the lowTide is, the higher the potential for concurrency between request and data production, and thus the more extraneous replenishment requests this operator could make. For example, for a global downstream request of 14, with a highTide of 10 and a lowTide of 2, the operator would perform low tide requests (request(2)) seven times in a row, whereas with the default lowTide of 8 it would only perform one low tide request (request(8)). Using a lowTide equal to highTide reverts to the default 75% strategy, while using a lowTide of 0 disables the lowTide, resulting in all requests strictly adhering to the highTide. - Parameters:
highTide - the initial request amount. lowTide - the subsequent (or replenishing) request amount, 0 to disable early replenishing, highTide to revert to a 75% replenish strategy. - Returns:
- a
Flux limiting downstream's backpressure and customizing the replenishment request amount - See Also:
-
limitRequest
Deprecated. Replace with take(n, true) in 3.4.x, then take(long) in 3.5.0. To be removed in 3.6.0 at the earliest. See https://github.com/reactor/reactor-core/issues/2339. Take only the first N values from this Flux, if available. Furthermore, ensure that the total amount requested upstream is capped at n. If n is zero, the source isn't even subscribed to and the operator completes immediately upon subscription. Backpressure signals from downstream subscribers that are smaller than the cap are propagated as is, but if they would cause the total requested amount to go over the cap, they are reduced to the minimum value that doesn't go over.
As a result, this operator never lets the upstream produce more elements than the cap. Typically useful for cases where a race between request and cancellation can lead the upstream to produce a lot of extraneous data, and such production is undesirable (e.g. a source that would send the extraneous data over the network).
- Parameters:
n - the number of elements to emit from this flux, which is also the backpressure cap for all of downstream's requests - Returns:
- a
Flux of n elements from the source, that requests AT MOST n from upstream in total. - See Also:
-
log
Observe all Reactive Streams signals and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead. The default log category will be "reactor.Flux.", followed by a suffix generated from the source operator, e.g. "reactor.Flux.Map".
- Returns:
- a new
Flux that logs signals
-
log
Observe all Reactive Streams signals and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead. - Parameters:
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will be added, e.g. "reactor.Flux.Map". - Returns:
- a new
Flux that logs signals
-
log
Observe Reactive Streams signals matching the passed filter options and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead. Options allow fine grained filtering of the traced signals, for instance to only capture onNext and onError:
flux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR) - Parameters:
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will be added, e.g. "reactor.Flux.Map". level - the Level to enforce for this tracing Flux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account). options - a vararg SignalType option to filter log messages - Returns:
- a new
Flux that logs signals
-
log
public final Flux<T> log(@Nullable String category, Level level, boolean showOperatorLine, SignalType... options) Observe Reactive Streams signals matching the passed filter options and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead. Options allow fine grained filtering of the traced signals, for instance to only capture onNext and onError:
flux.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR) - Parameters:
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will be added, e.g. "reactor.Flux.Map". level - the Level to enforce for this tracing Flux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account). showOperatorLine - capture the current stack to display operator class/line number. options - a vararg SignalType option to filter log messages - Returns:
- a new
Flux that logs signals
-
log
Observe Reactive Streams signals matching the passed filter options and trace them using a specific user-provided Logger, at Level.INFO level. -
log
public final Flux<T> log(Logger logger, Level level, boolean showOperatorLine, SignalType... options) Observe Reactive Streams signals matching the passed filter options and trace them using a specific user-provided Logger, at the given Level. Options allow fine grained filtering of the traced signals, for instance to only capture onNext and onError:
flux.log(myCustomLogger, Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR) - Parameters:
logger - the Logger to use, instead of resolving one through a category. level - the Level to enforce for this tracing Flux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account). showOperatorLine - capture the current stack to display operator class/line number (default in overload is false). options - a vararg SignalType option to filter log messages - Returns:
- a new
Flux that logs signals
-
map
Transform the items emitted by this Flux by applying a synchronous function to each item. Error Mode Support: This operator supports
resuming on errors (including when fusion is enabled). Exceptions thrown by the mapper then cause the source value to be dropped and a new element (request(1)) to be requested from upstream. -
mapNotNull
Transform the items emitted by this Flux by applying a synchronous function to each item, which may produce null values. In that case, no value is emitted. This operator effectively behaves like map(Function) followed by filter(Predicate), although null is not a supported value, so it can't be filtered out. Error Mode Support: This operator supports
resuming on errors (including when fusion is enabled). Exceptions thrown by the mapper then cause the source value to be dropped and a new element (request(1)) to be requested from upstream. -
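The map-then-filter behavior can be sketched in a few lines (assuming reactor-core is on the classpath; the class name is hypothetical): a null-returning mapper simply skips the element.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class MapNotNullExample {
    public static List<Integer> dropTwos() {
        return Flux.just(1, 2, 3)
                // returning null means "no emission" for that element
                .mapNotNull(i -> i == 2 ? null : i * 10)
                .collectList()
                .block();
    }
}
```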
materialize
Transform incoming onNext, onError and onComplete signals into Signal instances, materializing these signals. Since the error is materialized as a Signal, propagation will be stopped and onComplete will be emitted. The complete signal will first emit a Signal.complete() and then effectively complete the flux. All these Signals have a Context associated with them. -
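In other words, every signal, including the terminal one, becomes an onNext carrying a Signal value. A sketch (assuming reactor-core is on the classpath; the class name is hypothetical):

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;

public class MaterializeExample {
    public static List<Signal<String>> signals() {
        // one source element yields two Signals: onNext("a") and onComplete
        return Flux.just("a")
                .materialize()
                .collectList()
                .block();
    }
}
```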
mergeOrderedWith
@Deprecated public final Flux<T> mergeOrderedWith(Publisher<? extends T> other, Comparator<? super T> otherComparator) Deprecated. Use mergeComparingWith(Publisher, Comparator) instead (with the caveat that it defaults to NOT delaying errors, unlike this operator). To be removed in 3.6.0 at the earliest. Merge data from this Flux and a Publisher into a reordered merge sequence, by picking the smallest value from each sequence as defined by a provided Comparator. Note that subsequent calls are combined, and their comparators are in lexicographic order as defined by Comparator.thenComparing(Comparator). The combination step is avoided if the two
Comparators are equal (which can easily be achieved by using the same reference, and is also always true of Comparator.naturalOrder()). Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
Note that this operator delays errors until all data is consumed.
- Parameters:
other - the Publisher to merge with. otherComparator - the Comparator to use for merging - Returns:
- a new
Flux that compares the latest values from the given publisher and this flux, using the smallest value and replenishing the source that produced it
-
mergeComparingWith
public final Flux<T> mergeComparingWith(Publisher<? extends T> other, Comparator<? super T> otherComparator) Merge data from this Flux and a Publisher into a reordered merge sequence, by picking the smallest value from each sequence as defined by a provided Comparator. Note that subsequent calls are combined, and their comparators are in lexicographic order as defined by Comparator.thenComparing(Comparator). The combination step is avoided if the two
Comparators are equal (which can easily be achieved by using the same reference, and is also always true of Comparator.naturalOrder()). Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
mergeComparingWith doesn't delay errors by default, but it will inherit the delayError behavior of a mergeComparingDelayError directly above it.
- Parameters:
other - the Publisher to merge with. otherComparator - the Comparator to use for merging - Returns:
- a new
Flux that compares the latest values from the given publisher and this flux, using the smallest value and replenishing the source that produced it
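When both sources are already sorted, this pick-the-smaller-head strategy yields a classic sorted merge. A sketch (assuming reactor-core is on the classpath; the class name is hypothetical):

```java
import java.util.Comparator;
import java.util.List;
import reactor.core.publisher.Flux;

public class MergeComparingExample {
    public static List<Integer> mergedSorted() {
        // two finite pre-sorted sources, merged by repeatedly
        // comparing the current head of each sequence
        return Flux.just(1, 3, 5)
                .mergeComparingWith(Flux.just(2, 4, 6), Comparator.naturalOrder())
                .collectList()
                .block();
    }
}
```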
-
mergeWith
Merge data from this Flux and a Publisher into an interleaved merged sequence. Unlike concat, inner sources are subscribed to eagerly. Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source. -
-
metrics
Deprecated. Prefer using tap(SignalListenerFactory) with the SignalListenerFactory provided by the new reactor-core-micrometer module. To be removed in 3.6.0 at the earliest. Activate metrics for this sequence, provided there is an instrumentation facade on the classpath (otherwise this method is a pure no-op). Metrics are gathered on
Subscriber events, and it is recommended to also name (and optionally tag) the sequence. The name serves as a prefix in the reported metrics names. In case no name has been provided, the default name "reactor" will be applied.
The
MeterRegistry used by reactor can be configured via Metrics.MicrometerConfiguration.useRegistry(MeterRegistry) prior to using this operator, the default being Metrics.globalRegistry. - Returns:
- an instrumented
Flux - See Also:
-
name
Give a name to this sequence, which can be retrieved using Scannable.name() as long as this is the first reachable Scannable.parents(). The name is typically visible at assembly time by the
tap(SignalListenerFactory) operator, which could for example be configured with a metrics listener using the name as a prefix for meters' ids. - Parameters:
name - a name for the sequence - Returns:
- the same sequence, but bearing a name
- See Also:
-
next
-
ofType
-
onBackpressureBuffer
Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream. Errors will be delayed until the buffer gets consumed. Discard Support: This operator discards the buffered overflow elements upon cancellation or error triggered by a data signal.
- Returns:
- a backpressured
Flux that buffers with unbounded capacity
-
onBackpressureBuffer
Request an unbounded demand and push to the returned Flux, or park up to maxSize elements when not enough demand is requested downstream. The first element past this buffer to arrive out of sync with the downstream subscriber's demand (the "overflowing" element) immediately triggers an overflow error and cancels the source. The Flux is going to terminate with an overflow error, but this error is delayed, which lets the subscriber make more requests for the content of the buffer. Discard Support: This operator discards the buffered overflow elements upon cancellation or error triggered by a data signal, as well as elements that are rejected by the buffer due to
maxSize. - Parameters:
maxSize - maximum number of elements overflowing request before the source is cancelled - Returns:
- a backpressured
Flux that buffers with bounded capacity
-
onBackpressureBuffer
Request an unbounded demand and push to the returned Flux, or park up to maxSize elements when not enough demand is requested downstream. The first element past this buffer to arrive out of sync with the downstream subscriber's demand (the "overflowing" element) is immediately passed to a Consumer and the source is cancelled. The Flux is going to terminate with an overflow error, but this error is delayed, which lets the subscriber make more requests for the content of the buffer. Note that should the cancelled source produce further overflowing elements, these would be passed to the
onNextDropped hook. Discard Support: This operator discards the buffered overflow elements upon cancellation or error triggered by a data signal, as well as elements that are rejected by the buffer due to
maxSize (even though they are passed to the onOverflow Consumer first). - Parameters:
maxSize - maximum number of elements overflowing request before the callback is called and the source is cancelled. onOverflow - callback to invoke on overflow - Returns:
- a backpressured
Flux that buffers with a bounded capacity
-
onBackpressureBuffer
public final Flux<T> onBackpressureBuffer(int maxSize, BufferOverflowStrategy bufferOverflowStrategy) Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream, within a maxSize limit. Over that limit, the overflow strategy is applied (see BufferOverflowStrategy). Note that for the
ERROR strategy, the overflow error will be delayed until the current backlog is consumed. Discard Support: This operator discards the buffered overflow elements upon cancellation or error triggered by a data signal, as well as elements that are rejected by the buffer due to
maxSize (even though they are passed to the bufferOverflowStrategy first). - Parameters:
maxSize - maximum buffer backlog size before the overflow strategy is applied. bufferOverflowStrategy - strategy to apply to overflowing elements - Returns:
- a backpressured
Flux that buffers up to a capacity then applies an overflow strategy
-
onBackpressureBuffer
public final Flux<T> onBackpressureBuffer(int maxSize, Consumer<? super T> onBufferOverflow, BufferOverflowStrategy bufferOverflowStrategy) Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream, within a maxSize limit. Over that limit, the overflow strategy is applied (see BufferOverflowStrategy). A
Consumer is immediately invoked when there is an overflow, receiving the value that was discarded because of the overflow (which can be different from the latest element emitted by the source in case of a DROP_LATEST strategy). Note that for the
ERROR strategy, the overflow error will be delayed until the current backlog is consumed. The consumer is still invoked immediately. Discard Support: This operator discards the buffered overflow elements upon cancellation or error triggered by a data signal, as well as elements that are rejected by the buffer due to
maxSize (even though they are passed to the onOverflow Consumer AND the bufferOverflowStrategy first). - Parameters:
maxSize - maximum buffer backlog size before the overflow callback is called. onBufferOverflow - callback to invoke on overflow. bufferOverflowStrategy - strategy to apply to overflowing elements - Returns:
- a backpressured
Flux that buffers up to a capacity then applies an overflow strategy
-
onBackpressureBuffer
public final Flux<T> onBackpressureBuffer(Duration ttl, int maxSize, Consumer<? super T> onBufferEviction) Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream, within a maxSize limit and for a maximum Duration of ttl (as measured on the parallel Scheduler). Over that limit, the oldest elements from the source are dropped. Elements evicted based on the TTL are passed to a cleanup
Consumer, which is also immediately invoked when there is an overflow. Discard Support: This operator discards its internal buffer of elements that overflow, after having applied the
onBufferEviction handler. - Parameters:
ttl - maximum Duration for which an element is kept in the backlog. maxSize - maximum buffer backlog size before the overflow callback is called. onBufferEviction - callback to invoke once the TTL is reached or on overflow - Returns:
- a backpressured
Flux that buffers with a TTL and up to a capacity, then applies an overflow strategy
-
onBackpressureBuffer
public final Flux<T> onBackpressureBuffer(Duration ttl, int maxSize, Consumer<? super T> onBufferEviction, Scheduler scheduler) Request an unbounded demand and push to the returned Flux, or park the observed elements if not enough demand is requested downstream, within a maxSize limit and for a maximum Duration of ttl (as measured on the provided Scheduler). Over that limit, the oldest elements from the source are dropped. Elements evicted based on the TTL are passed to a cleanup
Consumer, which is also immediately invoked when there is an overflow. Discard Support: This operator discards its internal buffer of elements that overflow, after having applied the
onBufferEviction handler. - Parameters:
ttl - maximum Duration for which an element is kept in the backlog. maxSize - maximum buffer backlog size before the overflow callback is called. onBufferEviction - callback to invoke once the TTL is reached or on overflow. scheduler - the scheduler on which to run the timeout check - Returns:
- a backpressured
Flux that buffers with a TTL and up to a capacity, then applies an overflow strategy
-
onBackpressureDrop
Request an unbounded demand and push to the returned Flux, or drop the observed elements if not enough demand is requested downstream. Discard Support: This operator discards elements that it drops.
- Returns:
- a backpressured
Flux that drops overflowing elements
-
onBackpressureDrop
Request an unbounded demand and push to the returned Flux, or drop the observed elements and notify the dropping Consumer if not enough demand is requested downstream. Discard Support: This operator discards elements that it drops, after having passed them to the provided
onDropped handler. - Parameters:
onDropped - the Consumer called when a value gets dropped due to lack of downstream requests - Returns:
- a backpressured
Flux that drops overflowing elements
-
onBackpressureError
Request an unbounded demand and push to the returned Flux, or emit an onError from Exceptions.failWithOverflow() if not enough demand is requested downstream. Discard Support: This operator discards elements that it drops, after having propagated the error.
- Returns:
- a backpressured
Flux that errors on overflowing elements
-
onBackpressureLatest
Request an unbounded demand and push to the returned Flux, or only keep the most recent observed item if not enough demand is requested downstream. Discard Support: Each time a new element comes in (the new "latest"), this operator discards the previously retained element.
- Returns:
- a backpressured
Flux that will only keep a reference to the last observed item
-
onErrorComplete
Simply complete the sequence by replacing an onError signal with an onComplete signal. All other signals are propagated as-is. - Returns:
- a new
Flux falling back on completion when an onError occurs - See Also:
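Because only the error signal is swapped for completion, values emitted before the error are kept. A sketch (assuming reactor-core is on the classpath; the class name is hypothetical):

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class OnErrorCompleteExample {
    public static List<Integer> truncated() {
        return Flux.just(1, 2)
                .concatWith(Flux.error(new IllegalStateException("boom")))
                .onErrorComplete()   // the onError becomes an onComplete
                .collectList()
                .block();            // no exception is thrown downstream
    }
}
```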
-
onErrorComplete
Simply complete the sequence by replacing an onError signal with an onComplete signal if the error matches the given Class. All other signals, including non-matching onError, are propagated as-is. - Returns:
- a new
Flux falling back on completion when a matching error occurs - See Also:
-
onErrorComplete
Simply complete the sequence by replacing an onError signal with an onComplete signal if the error matches the given Predicate. All other signals, including non-matching onError, are propagated as-is. - Returns:
- a new
Flux falling back on completion when a matching error occurs - See Also:
-
onErrorContinue
Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. The recovered error and associated value are notified via the provided BiConsumer. Alternatively, throwing from that biconsumer will propagate the thrown exception downstream in place of the original error, which is added as a suppressed exception to the new one. Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream, operators; it requires specific operator support to work; and its scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour).
In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use
doOnError to log the error, and onErrorResume(e -> Mono.empty()) to drop erroneous elements: .flatMap(id -> repository.retrieveById(id).doOnError(System.err::println).onErrorResume(e -> Mono.empty())) This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.
- Parameters:
errorConsumer - a BiConsumer fed with errors matching the predicate and the value that triggered the error. - Returns:
- a
Flux that attempts to continue processing on errors.
-
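A minimal sketch of the recovery behaviour described above (assuming reactor-core on the classpath; names are illustrative) — the element that triggered the error is dropped, reported to the BiConsumer, and the rest of the sequence continues:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;

public class OnErrorContinueDemo {
    // map supports onErrorContinue: the failing element (2) is dropped
    // and handed to the BiConsumer, while 1 and 3 flow through.
    public static List<Integer> values(List<String> dropped) {
        return Flux.just(1, 2, 3)
                   .map(i -> {
                       if (i == 2) throw new IllegalStateException("bad: " + i);
                       return i * 10;
                   })
                   .onErrorContinue((err, value) ->
                       dropped.add(value + " -> " + err.getMessage()))
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        List<String> dropped = new CopyOnWriteArrayList<>();
        System.out.println(values(dropped)); // [10, 30]
        System.out.println(dropped);         // [2 -> bad: 2]
    }
}
```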
onErrorContinue
public final <E extends Throwable> Flux<T> onErrorContinue(Class<E> type, BiConsumer<Throwable, Object> errorConsumer) Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. Only errors matching the specified type are recovered from. The recovered error and associated value are notified via the provided BiConsumer. Alternatively, throwing from that biconsumer will propagate the thrown exception downstream in place of the original error, which is added as a suppressed exception to the new one. Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream, operators; it requires specific operator support to work; and its scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour).
In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use
doOnErrorto log the error, andonErrorResume(e -> Mono.empty())to drop erroneous elements:.flatMap(id -> repository.retrieveById(id) .doOnError(MyException.class, System.err::println) .onErrorResume(MyException.class, e -> Mono.empty()))This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.
- Parameters:
type- theClassofExceptionthat are resumed from.errorConsumer- aBiConsumerfed with errors matching theClassand the value that triggered the error.- Returns:
- a
Fluxthat attempts to continue processing on some errors.
-
onErrorContinue
public final <E extends Throwable> Flux<T> onErrorContinue(Predicate<E> errorPredicate, BiConsumer<Throwable, Object> errorConsumer) Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. Only errors matching the Predicate are recovered from (note that this predicate can be applied several times and thus must be idempotent). The recovered error and associated value are notified via the provided BiConsumer. Alternatively, throwing from that biconsumer will propagate the thrown exception downstream in place of the original error, which is added as a suppressed exception to the new one. Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream, operators; it requires specific operator support to work; and its scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour).
In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use
doOnErrorto log the error, andonErrorResume(e -> Mono.empty())to drop erroneous elements:.flatMap(id -> repository.retrieveById(id) .doOnError(errorPredicate, System.err::println) .onErrorResume(errorPredicate, e -> Mono.empty()))This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.
- Parameters:
errorPredicate- aPredicateused to filter which errors should be resumed from. This MUST be idempotent, as it can be used several times.errorConsumer- aBiConsumerfed with errors matching the predicate and the value that triggered the error.- Returns:
- a
Fluxthat attempts to continue processing on some errors.
-
onErrorStop
If anonErrorContinue(BiConsumer)variant has been used downstream, reverts to the default 'STOP' mode where errors are terminal events upstream. It can be used for easier scoping of the on next failure strategy or to override the inherited strategy in a sub-stream (for example in a flatMap). It has no effect ifonErrorContinue(BiConsumer)has not been used downstream.- Returns:
- a
Fluxthat terminates on errors, even ifonErrorContinue(BiConsumer)was used downstream
-
onErrorMap
Transform any error emitted by thisFluxby synchronously applying a function to it. -
onErrorMap
public final <E extends Throwable> Flux<T> onErrorMap(Class<E> type, Function<? super E, ? extends Throwable> mapper) Transform an error emitted by thisFluxby synchronously applying a function to it if the error matches the given type. Otherwise let the error pass through. -
onErrorMap
public final Flux<T> onErrorMap(Predicate<? super Throwable> predicate, Function<? super Throwable, ? extends Throwable> mapper) Transform an error emitted by thisFluxby synchronously applying a function to it if the error matches the given predicate. Otherwise let the error pass through. -
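A minimal sketch of error mapping (assuming reactor-core on the classpath; names are illustrative) — a low-level exception is rewrapped into a domain-level one before it reaches downstream handlers:

```java
import java.io.IOException;

import reactor.core.publisher.Flux;

public class OnErrorMapDemo {
    // onErrorMap rewrites the error in place; onErrorResume is used here
    // only to observe which exception type actually reached downstream.
    public static String mappedErrorType() {
        return Flux.<String>error(new IOException("disk failure"))
                   .onErrorMap(IOException.class,
                               e -> new IllegalStateException("storage unavailable", e))
                   .onErrorResume(e -> Flux.just(e.getClass().getSimpleName()))
                   .blockFirst();
    }

    public static void main(String[] args) {
        System.out.println(mappedErrorType()); // IllegalStateException
    }
}
```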
onErrorResume
public final Flux<T> onErrorResume(Function<? super Throwable, ? extends Publisher<? extends T>> fallback) Subscribe to a returned fallback publisher when any error occurs, using a function to choose the fallback depending on the error. -
onErrorResume
public final <E extends Throwable> Flux<T> onErrorResume(Class<E> type, Function<? super E, ? extends Publisher<? extends T>> fallback) Subscribe to a fallback publisher when an error matching the given type occurs, using a function to choose the fallback depending on the error. -
onErrorResume
public final Flux<T> onErrorResume(Predicate<? super Throwable> predicate, Function<? super Throwable, ? extends Publisher<? extends T>> fallback) Subscribe to a fallback publisher when an error matching a given predicate occurs. -
onErrorReturn
Simply emit a captured fallback value when any error is observed on thisFlux.- Parameters:
fallbackValue- the value to emit if an error occurs- Returns:
- a new falling back
Flux - See Also:
-
onErrorReturn
Simply emit a captured fallback value when an error of the specified type is observed on thisFlux.- Type Parameters:
E- the error type- Parameters:
type- the error type to matchfallbackValue- the value to emit if an error occurs that matches the type- Returns:
- a new falling back
Flux - See Also:
-
onErrorReturn
Simply emit a captured fallback value when an error matching the given predicate is observed on thisFlux.- Parameters:
predicate- the error predicate to matchfallbackValue- the value to emit if an error occurs that matches the predicate- Returns:
- a new falling back
Flux - See Also:
-
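The two fallback flavours above can be contrasted in a short sketch (assuming reactor-core on the classpath; names are illustrative) — onErrorResume switches to a whole fallback publisher, while onErrorReturn emits a single captured value:

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class FallbackDemo {
    static Flux<Integer> failing() {
        return Flux.just(1, 2).concatWith(Flux.error(new IllegalStateException("boom")));
    }

    // onErrorResume: the error is replaced by subscribing to a fallback Publisher.
    public static List<Integer> resumed() {
        return failing().onErrorResume(e -> Flux.just(-1, -2)).collectList().block();
    }

    // onErrorReturn: the error is replaced by one captured fallback value.
    public static List<Integer> returned() {
        return failing().onErrorReturn(-1).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(resumed());  // [1, 2, -1, -2]
        System.out.println(returned()); // [1, 2, -1]
    }
}
```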
onTerminateDetach
Detaches both the childSubscriberand theSubscriptionon termination or cancellation.This is an advanced interoperability operator that should help with odd retention scenarios when running with non-reactor
Subscriber.- Returns:
- a detachable
Flux
-
or
Pick the firstPublisherbetween thisFluxand another publisher to emit any signal (onNext/onError/onComplete) and replay all signals from thatPublisher, effectively behaving like the fastest of these competing sources.- Parameters:
other- thePublisherto race with- Returns:
- the fastest sequence
- See Also:
-
parallel
Prepare thisFluxby dividing data on a number of 'rails' matching the number of CPU cores, in a round-robin fashion. Note that to actually perform the work in parallel, you should callParallelFlux.runOn(Scheduler)afterward.- Returns:
- a new
ParallelFluxinstance
-
parallel
Prepare thisFluxby dividing data on a number of 'rails' matching the providedparallelismparameter, in a round-robin fashion. Note that to actually perform the work in parallel, you should callParallelFlux.runOn(Scheduler)afterward.- Parameters:
parallelism- the number of parallel rails- Returns:
- a new
ParallelFluxinstance
-
parallel
Prepare thisFluxby dividing data on a number of 'rails' matching the providedparallelismparameter, in a round-robin fashion and using a custom prefetch amount and queue for dealing with the sourceFlux's values. Note that to actually perform the work in parallel, you should callParallelFlux.runOn(Scheduler)afterward.- Parameters:
parallelism- the number of parallel railsprefetch- the number of values to prefetch from the source- Returns:
- a new
ParallelFluxinstance
-
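A minimal sketch of the rails mechanism described above (assuming reactor-core on the classpath; names are illustrative) — note that runOn is what actually moves work onto the Scheduler, while parallel() alone only prepares the division:

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class ParallelDemo {
    // Splits the source onto 4 rails in round-robin fashion, runs the map
    // on the parallel Scheduler, then merges the rails back into one Flux.
    public static int sumOfSquares() {
        return Flux.range(1, 10)
                   .parallel(4)
                   .runOn(Schedulers.parallel())
                   .map(i -> i * i)
                   .sequential()            // merge rails (element order not guaranteed)
                   .reduce(0, Integer::sum) // order-insensitive aggregation
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares()); // 385
    }
}
```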
publish
Prepare aConnectableFluxwhich shares thisFluxsequence and dispatches values to subscribers in a backpressure-aware manner. Prefetch will default toQueues.SMALL_BUFFER_SIZE. This will effectively turn any type of sequence into a hot sequence.Backpressure will be coordinated on
Subscription.request(long)and if anySubscriberis missing demand (requested = 0), multicast will pause pushing/pulling.- Returns:
- a new
ConnectableFlux
-
publish
Prepare aConnectableFluxwhich shares thisFluxsequence and dispatches values to subscribers in a backpressure-aware manner. This will effectively turn any type of sequence into a hot sequence.Backpressure will be coordinated on
Subscription.request(long)and if anySubscriberis missing demand (requested = 0), multicast will pause pushing/pulling.- Parameters:
prefetch- bounded requested demand- Returns:
- a new
ConnectableFlux
-
publish
public final <R> Flux<R> publish(Function<? super Flux<T>, ? extends Publisher<? extends R>> transform) Shares a sequence for the duration of a function that may transform it and consume it as many times as necessary without causing multiple subscriptions to the upstream.- Type Parameters:
R- the output value type- Parameters:
transform- the transformation function- Returns:
- a new
Flux
-
publish
public final <R> Flux<R> publish(Function<? super Flux<T>, ? extends Publisher<? extends R>> transform, int prefetch) Shares a sequence for the duration of a function that may transform it and consume it as many times as necessary without causing multiple subscriptions to the upstream.- Type Parameters:
R- the output value type- Parameters:
transform- the transformation functionprefetch- the request size- Returns:
- a new
Flux
-
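A minimal sketch of the transform variant (assuming reactor-core on the classpath; names are illustrative) — inside the function, the shared view can be consumed twice without the upstream being subscribed to more than once:

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class PublishTransformDemo {
    // Both zip arms subscribe to the same shared view of the upstream,
    // yet the source Flux is only subscribed to once.
    public static List<Integer> values() {
        return Flux.range(1, 3)
                   .publish(shared -> shared.zipWith(shared.map(i -> i * 10),
                                                     Integer::sum))
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(values()); // [11, 22, 33]
    }
}
```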
publishNext
Deprecated. Use shareNext() instead, or use publish().next() if you need to connect(). To be removed in 3.5.0. Prepare a Mono which shares this Flux sequence and dispatches the first observed item to subscribers in a backpressure-aware manner. This will effectively turn any type of sequence into a hot sequence when the first Subscriber subscribes.- Returns:
- a new
Mono
-
publishOn
Run onNext, onComplete and onError on a suppliedSchedulerWorker.This operator influences the threading context where the rest of the operators in the chain below it will execute, up to a new occurrence of
publishOn.Typically used for fast publisher, slow consumer(s) scenarios.
flux.publishOn(Schedulers.single()).subscribe()Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
- Parameters:
scheduler- aSchedulerproviding theScheduler.Workerwhere to publish- Returns:
- a
Fluxproducing asynchronously on a givenScheduler
-
publishOn
Run onNext, onComplete and onError on a suppliedSchedulerScheduler.Worker.This operator influences the threading context where the rest of the operators in the chain below it will execute, up to a new occurrence of
publishOn.Typically used for fast publisher, slow consumer(s) scenarios.
flux.publishOn(Schedulers.single()).subscribe()Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
- Parameters:
scheduler- aSchedulerproviding theScheduler.Workerwhere to publishprefetch- the asynchronous boundary capacity- Returns:
- a
Fluxproducing asynchronously
-
publishOn
Run onNext, onComplete and onError on a suppliedSchedulerScheduler.Worker.This operator influences the threading context where the rest of the operators in the chain below it will execute, up to a new occurrence of
publishOn.Typically used for fast publisher, slow consumer(s) scenarios.
flux.publishOn(Schedulers.single()).subscribe()Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
- Parameters:
scheduler- aSchedulerproviding theScheduler.Workerwhere to publishdelayError- should the buffer be consumed before forwarding any errorprefetch- the asynchronous boundary capacity- Returns:
- a
Fluxproducing asynchronously
-
reduce
Reduce the values from this Flux sequence into a single object of the same type as the emitted items. Reduction is performed using a BiFunction that takes the intermediate result of the reduction and the current value and returns the next intermediate value of the reduction. Note, the BiFunction will not be invoked for a sequence with 0 or 1 elements. For a one-element sequence, the single element is sent directly to the subscriber. Discard Support: This operator discards the internally accumulated value upon cancellation or error.
- Parameters:
aggregator- the reducingBiFunction- Returns:
- a reduced
Flux
-
reduce
Reduce the values from thisFluxsequence into a single object matching the type of a seed value. Reduction is performed using aBiFunctionthat takes the intermediate result of the reduction and the current value and returns the next intermediate value of the reduction. First element is paired with the seed value, initial.Discard Support: This operator discards the internally accumulated value upon cancellation or error.
- Type Parameters:
A- the type of the seed and the reduced object- Parameters:
accumulator- the reducingBiFunctioninitial- the seed, the initial leftmost argument to pass to the reducingBiFunction- Returns:
- a reduced
Flux
-
reduceWith
Reduce the values from thisFluxsequence into a single object matching the type of a lazily supplied seed value. Reduction is performed using aBiFunctionthat takes the intermediate result of the reduction and the current value and returns the next intermediate value of the reduction. First element is paired with the seed value, supplied via initial.Discard Support: This operator discards the internally accumulated value upon cancellation or error.
- Type Parameters:
A- the type of the seed and the reduced object- Parameters:
accumulator- the reducingBiFunctioninitial- aSupplierof the seed, called on subscription and passed to the the reducingBiFunction- Returns:
- a reduced
Flux
-
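The three reduction variants above can be contrasted in a short sketch (assuming reactor-core on the classpath; names are illustrative):

```java
import reactor.core.publisher.Flux;

public class ReduceDemo {
    // No seed: the first element becomes the initial accumulator value.
    public static int sum() {
        return Flux.range(1, 5).reduce(Integer::sum).block();
    }

    // reduceWith defers seed creation until subscription time, so each
    // Subscriber gets its own freshly supplied seed.
    public static int seededSum() {
        return Flux.range(1, 5).reduceWith(() -> 100, Integer::sum).block();
    }

    public static void main(String[] args) {
        System.out.println(sum());       // 15
        System.out.println(seededSum()); // 115
    }
}
```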
repeat
Repeatedly and indefinitely subscribe to the source upon completion of the previous subscription.- Returns:
- an indefinitely repeated
Fluxon onComplete
-
repeat
Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.- Parameters:
predicate- the boolean to evaluate on onComplete.- Returns:
- a
Fluxthat repeats on onComplete while the predicate matches
-
repeat
Repeatedly subscribe to the sourcenumRepeattimes. This results innumRepeat + 1total subscriptions to the original source. As a consequence, using 0 plays the original sequence once.- Parameters:
numRepeat- the number of times to re-subscribe on onComplete (positive, or 0 for original sequence only)- Returns:
- a
Fluxthat repeats on onComplete, up to the specified number of repetitions
-
repeat
Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription. A specified maximum number of repeats limits the number of re-subscriptions.- Parameters:
numRepeat- the number of times to re-subscribe on complete (positive, or 0 for original sequence only)predicate- the boolean to evaluate on onComplete- Returns:
- a
Fluxthat repeats on onComplete while the predicate matches, up to the specified number of repetitions
-
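The numRepeat + 1 subscription count noted above is easy to see in a sketch (assuming reactor-core on the classpath; names are illustrative):

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class RepeatDemo {
    // repeat(2) re-subscribes twice after the first completion,
    // i.e. 3 subscriptions (and 3 replays of the sequence) in total.
    public static List<String> values() {
        return Flux.just("a", "b").repeat(2).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(values()); // [a, b, a, b, a, b]
    }
}
```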
repeatWhen
Repeatedly subscribe to thisFluxwhen a companion sequence emits elements in response to the flux completion signal. Any terminal signal from the companion sequence will terminate the resultingFluxwith the same signal immediately.If the companion sequence signals when this
Fluxis active, the repeat attempt is suppressed.Note that if the companion
Publishercreated by therepeatFactoryemitsContextas trigger objects, theseContextwill be merged with the previous Context:.repeatWhen(emittedEachAttempt -> emittedEachAttempt.handle((lastEmitted, sink) -> { Context ctx = sink.currentContext(); int rl = ctx.getOrDefault("repeatsLeft", 0); if (rl > 0) { sink.next(Context.of( "repeatsLeft", rl - 1, "emitted", lastEmitted )); } else { sink.error(new IllegalStateException("repeats exhausted")); } }))- Parameters:
repeatFactory- theFunctionthat returns the associatedPublishercompanion, given aFluxthat signals each onComplete as aLongrepresenting the number of source elements emitted in the latest attempt.- Returns:
- a
Fluxthat repeats on onComplete when the companionPublisherproduces an onNext signal
-
replay
Turn thisFluxinto a hot source and cache last emitted signals for furtherSubscriber. Will retain an unbounded amount of onNext signals. Completion and Error will also be replayed.- Returns:
- a replaying
ConnectableFlux
-
replay
Turn thisFluxinto a connectable hot source and cache last emitted signals for furtherSubscriber. Will retain up to the given history size onNext signals. Completion and Error will also be replayed.Note that
replay(0)will only cache the terminal signal without expiration.Re-connects are not supported.
- Parameters:
history- number of events retained in history excluding complete and error- Returns:
- a replaying
ConnectableFlux
-
replay
Turn thisFluxinto a connectable hot source and cache last emitted signals for furtherSubscriber. Will retain each onNext up to the given per-item expiry timeout.Completion and Error will also be replayed until
ttltriggers in which case the nextSubscriberwill start over a new subscription- Parameters:
ttl- Per-item and post termination timeout duration- Returns:
- a replaying
ConnectableFlux
-
replay
Turn thisFluxinto a connectable hot source and cache last emitted signals for furtherSubscriber. Will retain up to the given history size onNext signals with a per-item ttl.Completion and Error will also be replayed until
ttltriggers in which case the nextSubscriberwill start over a new subscription- Parameters:
history- number of events retained in history excluding complete and errorttl- Per-item and post termination timeout duration- Returns:
- a replaying
ConnectableFlux
-
replay
Turn thisFluxinto a connectable hot source and cache last emitted signals for furtherSubscriber. Will retain onNext signal for up to the givenDurationwith a per-item ttl.Completion and Error will also be replayed until
ttltriggers in which case the nextSubscriberwill start over a new subscription- Parameters:
ttl- Per-item and post termination timeout durationtimer- a time-capableSchedulerinstance to read current time from- Returns:
- a replaying
ConnectableFlux
-
replay
Turn thisFluxinto a connectable hot source and cache last emitted signals for furtherSubscriber. Will retain up to the given history size onNext signals with a per-item ttl.Completion and Error will also be replayed until
ttltriggers in which case the nextSubscriberwill start over a new subscription- Parameters:
history- number of events retained in history excluding complete and errorttl- Per-item and post termination timeout durationtimer- aSchedulerinstance to read current time from- Returns:
- a replaying
ConnectableFlux
-
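A minimal sketch of bounded-history replay (assuming reactor-core on the classpath; names are illustrative) — a subscriber arriving after the source has completed still receives the retained tail plus the replayed completion:

```java
import java.util.List;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

public class ReplayDemo {
    // replay(2) caches the last two onNext signals plus the terminal signal.
    public static List<Integer> lateSubscriberView() {
        ConnectableFlux<Integer> hot = Flux.range(1, 5).replay(2);
        hot.connect();                    // synchronous source runs and completes here
        return hot.collectList().block(); // late subscriber sees the retained tail
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriberView()); // [4, 5]
    }
}
```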
retry
Re-subscribes to thisFluxsequence if it signals any error, indefinitely.- Returns:
- a
Fluxthat retries on onError
-
retry
Re-subscribes to thisFluxsequence if it signals any error, for a fixed number of times.Note that passing Long.MAX_VALUE is treated as infinite retry.
- Parameters:
numRetries- the number of times to tolerate an error- Returns:
- a
Fluxthat retries on onError up to the specified number of retry attempts.
-
retryWhen
Retries thisFluxin response to signals emitted by a companionPublisher. The companion is generated by the providedRetryinstance, seeRetry.max(long),Retry.maxInARow(long)andRetry.backoff(long, Duration)for readily available strategy builders.The operator generates a base for the companion, a
FluxofRetry.RetrySignalwhich each give metadata about each retryable failure whenever thisFluxsignals an error. The final companion should be derived from that base companion and emit data in response to incoming onNext (although it can emit less elements, or delay the emissions).Terminal signals in the companion terminate the sequence with the same signal, so emitting an
Subscriber.onError(Throwable)will fail the resultingFluxwith that same error.Note that the
Retry.RetrySignalstate can be transient and change between each sourceonErrororonNext. If processed with a delay, this could lead to the represented state being out of sync with the state at which the retry was evaluated. Map it toRetry.RetrySignal.copy()right away to mediate this.Note that if the companion
Publishercreated by thewhenFactoryemitsContextas trigger objects, theseContextwill be merged with the previous Context:Retry customStrategy = Retry.from(companion -> companion.handle((retrySignal, sink) -> { ContextView ctx = sink.contextView(); int rl = ctx.getOrDefault("retriesLeft", 0); if (rl > 0) { sink.next(Context.of( "retriesLeft", rl - 1, "lastError", retrySignal.failure() )); } else { sink.error(Exceptions.retryExhausted("retries exhausted", retrySignal.failure())); } })); Flux<T> retried = originalFlux.retryWhen(customStrategy); -
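Beyond the custom-companion strategy shown above, the ready-made builders can be sketched as follows (assuming reactor-core on the classpath; names are illustrative) — a deferred source fails twice, then succeeds on the third subscription:

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

public class RetryWhenDemo {
    // Retry.max(5) re-subscribes on each error, up to five times.
    public static String eventualValue(AtomicInteger attempts) {
        return Flux.defer(() -> attempts.incrementAndGet() < 3
                       ? Flux.<String>error(new IllegalStateException("not yet"))
                       : Flux.just("ok"))
                   .retryWhen(Retry.max(5))
                   .blockFirst();
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        System.out.println(eventualValue(attempts)); // ok
        System.out.println(attempts.get());          // 3
    }
}
```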
sample
Sample thisFluxby periodically emitting an item corresponding to thatFluxlatest emitted value within the periodical time window. Note that if some elements are emitted quicker than the timespan just before source completion, the last of these elements will be emitted along with the onComplete signal.Discard Support: This operator discards elements that are not part of the sampling.
- Parameters:
timespan- the duration of the window after which to emit the latest observed item- Returns:
- a
Fluxsampled to the last item seen over each periodic window
-
sample
Sample thisFluxby emitting an item corresponding to thatFluxlatest emitted value whenever a companion samplerPublishersignals a value.Termination of either
Publisherwill result in termination for theSubscriberas well. Note that if some elements are emitted just before source completion and before a last sampler can trigger, the last of these elements will be emitted along with the onComplete signal.Both
Publishers will run in unbounded mode because backpressure would interfere with the sampling precision. Discard Support: This operator discards elements that are not part of the sampling.
-
sampleFirst
Repeatedly take a value from thisFluxthen skip the values that follow within a given duration.Discard Support: This operator discards elements that are not part of the sampling.
- Parameters:
timespan- the duration during which to skip values after each sample- Returns:
- a
Fluxsampled to the first item of each duration-based window
-
sampleFirst
-
sampleTimeout
Emit the latest value from thisFluxonly if there were no new values emitted during the window defined by a companionPublisherderived from that particular value.Note that this means that the last value in the sequence is always emitted.
Discard Support: This operator discards elements that are not part of the sampling.
- Type Parameters:
U- the companion reified type- Parameters:
throttlerFactory- supply a companion samplerPublisherwhich signals the end of the window during which no new emission should occur. If it is the case, the original value triggering the window is emitted.- Returns:
- a
Fluxsampled to items not followed by any other item within a window defined by a companionPublisher
-
sampleTimeout
public final <U> Flux<T> sampleTimeout(Function<? super T, ? extends Publisher<U>> throttlerFactory, int maxConcurrency) Emit the latest value from thisFluxonly if there were no new values emitted during the window defined by a companionPublisherderived from that particular value.The provided maxConcurrency will keep a bounded maximum of concurrent timeouts and drop any new items until at least one timeout terminates.
Note that this means that the last value in the sequence is always emitted.
Discard Support: This operator discards elements that are not part of the sampling.
- Type Parameters:
U- the throttling type- Parameters:
throttlerFactory- supply a companion samplerPublisherwhich signals the end of the window during which no new emission should occur. If it is the case, the original value triggering the window is emitted.maxConcurrency- the maximum number of concurrent timeouts- Returns:
- a
Fluxsampled to items not followed by any other item within a window defined by a companionPublisher
-
scan
Reduce thisFluxvalues with an accumulatorBiFunctionand also emit the intermediate results of this function.Unlike
scan(Object, BiFunction), this operator doesn't take an initial value but treats the firstFluxvalue as initial value.
The accumulation works as follows:result[0] = source[0] result[1] = accumulator(result[0], source[1]) result[2] = accumulator(result[1], source[2]) result[3] = accumulator(result[2], source[3]) ...- Parameters:
accumulator- the accumulatingBiFunction- Returns:
- an accumulating
Flux
-
scan
Reduce thisFluxvalues with an accumulatorBiFunctionand also emit the intermediate results of this function.The accumulation works as follows:
result[0] = initialValue; result[1] = accumulator(result[0], source[0]) result[2] = accumulator(result[1], source[1]) result[3] = accumulator(result[2], source[2]) ...- Type Parameters:
A- the accumulated type- Parameters:
initial- the initial leftmost argument to pass to the reduce functionaccumulator- the accumulatingBiFunction- Returns:
- an accumulating
Fluxstarting with initial state
-
scanWith
Reduce thisFluxvalues with the help of an accumulatorBiFunctionand also emits the intermediate results. A seed value is lazily provided by aSupplierinvoked for eachSubscriber.The accumulation works as follows:
result[0] = initialValue; result[1] = accumulator(result[0], source[0]) result[2] = accumulator(result[1], source[1]) result[3] = accumulator(result[2], source[2]) ...- Type Parameters:
A- the accumulated type- Parameters:
initial- the supplier providing the seed, the leftmost parameter initially passed to the reduce functionaccumulator- the accumulatingBiFunction- Returns:
- an accumulating
Fluxstarting with initial state
-
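The accumulation tables above can be verified with a short sketch (assuming reactor-core on the classpath; names are illustrative):

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class ScanDemo {
    // Without a seed: the first element is emitted as-is, then each
    // intermediate accumulation follows.
    public static List<Integer> runningSum() {
        return Flux.range(1, 4).scan(Integer::sum).collectList().block();
    }

    // With a seed: the seed itself is the first emission.
    public static List<Integer> seededRunningSum() {
        return Flux.range(1, 4).scan(0, Integer::sum).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(runningSum());       // [1, 3, 6, 10]
        System.out.println(seededRunningSum()); // [0, 1, 3, 6, 10]
    }
}
```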
single
Expect and emit a single item from thisFluxsource or signalNoSuchElementExceptionfor an empty source, orIndexOutOfBoundsExceptionfor a source with more than one element.- Returns:
- a
Monowith the single item or an error signal
-
single
Expect and emit a single item from thisFluxsource and emit a default value for an empty source, but signal anIndexOutOfBoundsExceptionfor a source with more than one element. -
singleOrEmpty
Expect and emit a single item from thisFluxsource, and accept an empty source but signal anIndexOutOfBoundsExceptionfor a source with more than one element.- Returns:
- a
Monowith the expected single item, no item or an error
-
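The three cardinality checks above can be contrasted in a sketch (assuming reactor-core on the classpath; names are illustrative) — exactly one element passes through, an empty source can fall back to a default, and more than one element is an error:

```java
import reactor.core.publisher.Flux;

public class SingleDemo {
    public static String results() {
        Integer one = Flux.just(42).single().block();                // exactly one item
        Integer defaulted = Flux.<Integer>empty().single(7).block(); // default for empty
        String overflow;
        try {
            Flux.just(1, 2).single().block();                        // two elements
            overflow = "no error";
        } catch (RuntimeException e) {
            overflow = "error"; // IndexOutOfBoundsException: more than one element
        }
        return one + "," + defaulted + "," + overflow;
    }

    public static void main(String[] args) {
        System.out.println(results()); // 42,7,error
    }
}
```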
skip
Skip the specified number of elements from the beginning of thisFluxthen emit the remaining elements.Discard Support: This operator discards elements that are skipped.
- Parameters:
skipped- the number of elements to drop- Returns:
- a dropping
Fluxwith the specified number of elements skipped at the beginning
-
skip
Skip elements from thisFluxemitted within the specified initial duration.Discard Support: This operator discards elements that are skipped.
- Parameters:
timespan- the initial time window during which to drop elements- Returns:
- a
Fluxdropping at the beginning until the end of the given duration
-
skip
-
skipLast
Skip a specified number of elements at the end of thisFluxsequence.Discard Support: This operator discards elements that are skipped.
- Parameters:
n- the number of elements to drop before completion- Returns:
- a
Fluxdropping the specified number of elements at the end of the sequence
-
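The count-based variants above, sketched briefly (assuming reactor-core on the classpath; names are illustrative):

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class SkipDemo {
    // Drops the first 2 elements, emits the rest.
    public static List<Integer> head() {
        return Flux.range(1, 5).skip(2).collectList().block();
    }

    // Drops the last 2 elements, which requires buffering that many items.
    public static List<Integer> tail() {
        return Flux.range(1, 5).skipLast(2).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(head()); // [3, 4, 5]
        System.out.println(tail()); // [1, 2, 3]
    }
}
```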
skipUntil
-
skipUntilOther
-
skipWhile
-
sort
Sort elements from this Flux by collecting and sorting them in the background, then emitting the sorted sequence once this sequence completes. Each item emitted by the Flux must implement Comparable with respect to all other items in the sequence. Note that calling
sort with long, non-terminating or infinite sources might cause an OutOfMemoryError. Use sequence splitting like window(int) to sort batches in that case.- Returns:
- a sorted
Flux - Throws:
ClassCastException- if any item emitted by theFluxdoes not implementComparablewith respect to all other items emitted by theFlux
-
sort
Sort elements from thisFluxusing aComparatorfunction, by collecting and sorting elements in the background then emitting the sorted sequence once this sequence completes.Note that calling
sort with long, non-terminating or infinite sources might cause an OutOfMemoryError. -
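Both sort variants, sketched briefly (assuming reactor-core on the classpath; names are illustrative) — natural ordering via Comparable, or an explicit Comparator:

```java
import java.util.Comparator;
import java.util.List;

import reactor.core.publisher.Flux;

public class SortDemo {
    // Natural order: elements must implement Comparable.
    public static List<Integer> natural() {
        return Flux.just(3, 1, 2).sort().collectList().block();
    }

    // Explicit Comparator: here, descending order.
    public static List<Integer> reversed() {
        return Flux.just(3, 1, 2).sort(Comparator.reverseOrder()).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(natural());  // [1, 2, 3]
        System.out.println(reversed()); // [3, 2, 1]
    }
}
```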
startWith
-
startWith
Prepend the given values before thisFluxsequence. -
startWith
-
subscribe
Subscribe to thisFluxand request unbounded demand.This version doesn't specify any consumption behavior for the events from the chain, especially no error handling, so other variants should usually be preferred.
- Returns:
- a new
Disposablethat can be used to cancel the underlyingSubscription
-
subscribe
Subscribe a Consumer to this Flux that will consume all the elements in the sequence. It will request an unbounded demand (Long.MAX_VALUE).For a passive version that observes and forwards incoming data see
doOnNext(java.util.function.Consumer).For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber)with aBaseSubscriber.Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
- Parameters:
consumer- the consumer to invoke on each value (onNext signal)- Returns:
- a new
Disposablethat can be used to cancel the underlyingSubscription
-
subscribe
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer) Subscribe to this Flux with a Consumer that will consume all the elements in the sequence, as well as a Consumer that will handle errors. The subscription will request an unbounded demand (Long.MAX_VALUE).For a passive version that observes and forwards incoming data see
doOnNext(java.util.function.Consumer)anddoOnError(java.util.function.Consumer).For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber)with aBaseSubscriber.Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumers are not invoked when executing in a main thread or a unit test for instance.
- Parameters:
consumer- the consumer to invoke on each next signalerrorConsumer- the consumer to invoke on error signal- Returns:
- a new
Disposablethat can be used to cancel the underlyingSubscription
-
subscribe
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer) Subscribe Consumers to this Flux that will respectively consume all the elements in the sequence, handle errors and react to completion. The subscription will request unbounded demand (Long.MAX_VALUE).For a passive version that observes and forwards incoming data see
doOnNext(java.util.function.Consumer),doOnError(java.util.function.Consumer)anddoOnComplete(Runnable).For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber)with aBaseSubscriber.Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
- Parameters:
consumer- the consumer to invoke on each valueerrorConsumer- the consumer to invoke on error signalcompleteConsumer- the consumer to invoke on complete signal- Returns:
- a new
Disposablethat can be used to cancel the underlyingSubscription
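A hedged sketch of the three-consumer variant (reactor-core assumed on the classpath), showing the completion callback firing after the last element:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class SubscribeCompleteExample {
    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        Flux.just("a", "b").subscribe(
                v -> events.add("next:" + v),   // each onNext element
                e -> events.add("error"),       // never invoked for this source
                () -> events.add("complete"));  // invoked on the onComplete signal
        System.out.println(events); // [next:a, next:b, complete]
    }
}
```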
-
subscribe
@Deprecated public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer) Deprecated. Because users tend to forget to request the subscription. If the behavior is really needed, consider using subscribeWith(Subscriber). To be removed in 3.5. Subscribe Consumers to this Flux that will respectively consume all the elements in the sequence, handle errors, react to completion, and request upon subscription. It will let the provided subscriptionConsumer request the adequate amount of data, or request unbounded demand Long.MAX_VALUE if no such consumer is provided. For a passive version that observes and forwards incoming data, see
doOnNext(java.util.function.Consumer),doOnError(java.util.function.Consumer),doOnComplete(Runnable)anddoOnSubscribe(Consumer).For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber)with aBaseSubscriber.Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
- Parameters:
consumer- the consumer to invoke on each valueerrorConsumer- the consumer to invoke on error signalcompleteConsumer- the consumer to invoke on complete signalsubscriptionConsumer- the consumer to invoke on subscribe signal, to be used for the initialrequest, or null for max request- Returns:
- a new
Disposablethat can be used to cancel the underlyingSubscription
-
subscribe
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Context initialContext) Subscribe Consumers to this Flux that will respectively consume all the elements in the sequence, handle errors and react to completion. Additionally, a Context is tied to the subscription. At subscription, an unbounded request is implicitly made. For a passive version that observes and forwards incoming data, see
doOnNext(java.util.function.Consumer),doOnError(java.util.function.Consumer),doOnComplete(Runnable)anddoOnSubscribe(Consumer).For a version that gives you more control over backpressure and the request, see
subscribe(Subscriber)with aBaseSubscriber.Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
- Parameters:
consumer- the consumer to invoke on each valueerrorConsumer- the consumer to invoke on error signalcompleteConsumer- the consumer to invoke on complete signalinitialContext- the baseContexttied to the subscription that will be visible to operators upstream- Returns:
- a new
Disposablethat can be used to cancel the underlyingSubscription
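A sketch of the Context-accepting variant (assuming reactor-core 3.4+; the "user" key is a hypothetical example), where an upstream deferContextual reads the Context that this subscribe call ties to the subscription:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.util.context.Context;

public class SubscribeContextExample {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        // deferContextual reads the Context supplied at subscribe time, visible upstream
        Flux.deferContextual(ctx -> Flux.just("user=" + ctx.get("user")))
            .subscribe(seen::add, e -> {}, () -> {}, Context.of("user", "alice"));
        System.out.println(seen); // [user=alice]
    }
}
```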
-
subscribe
-
subscribe
An internal Publisher.subscribe(Subscriber) variant that will bypass the Hooks.onLastOperator(Function) pointcut. In addition to behaving as expected by
Publisher.subscribe(Subscriber)in a controlled manner, it supports direct subscribe-timeContextpassing.- Specified by:
subscribein interfaceCorePublisher<T>- Parameters:
actual- theSubscriberinterested into the published sequence- See Also:
-
subscribeOn
Run subscribe, onSubscribe and request on a specifiedScheduler'sScheduler.Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of apublishOn.Note that if you are using an eager or blocking
create(Consumer, FluxSink.OverflowStrategy) as the source, it can lead to deadlocks due to requests piling up behind the emitter. In such a case, you should call subscribeOn(scheduler, false) instead. This operator is typically used in slow-publisher (e.g. blocking IO), fast-consumer scenarios.
flux.subscribeOn(Schedulers.single()).subscribe()Note that
Scheduler.Worker.schedule(Runnable)raisingRejectedExecutionExceptionon lateSubscription.request(long)will be propagated to the request caller.- Parameters:
scheduler- aSchedulerproviding theScheduler.Workerwhere to subscribe- Returns:
- a
Fluxrequesting asynchronously - See Also:
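A minimal sketch of subscribeOn moving the whole subscription (and thus the synchronous map below) onto a Scheduler worker, assuming reactor-core on the classpath:

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnExample {
    public static void main(String[] args) {
        // The map callback runs on a boundedElastic worker, not the calling thread.
        String thread = Flux.range(1, 3)
                .map(i -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.boundedElastic())
                .blockFirst();
        System.out.println(thread.startsWith("boundedElastic")); // true with default naming
    }
}
```

blockFirst() is used here only to make the example synchronous; in production code one would subscribe rather than block.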
-
subscribeOn
Run subscribe and onSubscribe on a specifiedScheduler'sScheduler.Worker. Request will be run on that worker too depending on therequestOnSeparateThreadparameter (which defaults to true in thesubscribeOn(Scheduler)version). As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of apublishOn.Note that if you are using an eager or blocking
create(Consumer, FluxSink.OverflowStrategy) as the source, it can lead to deadlocks due to requests piling up behind the emitter. Thus this operator has a requestOnSeparateThread parameter, which should be set to false in this case. This operator is typically used in slow-publisher (e.g. blocking IO), fast-consumer scenarios.
flux.subscribeOn(Schedulers.single()).subscribe()Note that
Scheduler.Worker.schedule(Runnable)raisingRejectedExecutionExceptionon lateSubscription.request(long)will be propagated to the request caller.- Parameters:
scheduler- aSchedulerproviding theScheduler.Workerwhere to subscriberequestOnSeparateThread- whether or not to also perform requests on the worker.trueto behave likesubscribeOn(Scheduler)- Returns:
- a
Fluxrequesting asynchronously - See Also:
-
subscribeWith
Subscribe a provided instance of a subclass ofSubscriberto thisFluxand return said instance for further chaining calls. This is similar toas(Function), except a subscription is explicitly performed by this method.If you need more control over backpressure and the request, use a
BaseSubscriber.- Type Parameters:
E- the reified type from the input/output subscriber- Parameters:
subscriber- theSubscriberto subscribe with and return- Returns:
- the passed
Subscriber
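A hedged sketch of subscribeWith combined with a BaseSubscriber for manual request control (reactor-core assumed on the classpath):

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class SubscribeWithExample {
    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        // BaseSubscriber gives explicit control over the initial and subsequent requests.
        BaseSubscriber<Integer> sub = new BaseSubscriber<Integer>() {
            @Override protected void hookOnSubscribe(Subscription s) {
                request(2); // only ask for two elements
            }
            @Override protected void hookOnNext(Integer value) {
                seen.add(value); // no further request: demand stays at 2
            }
        };
        Flux.range(1, 10).subscribeWith(sub); // returns the same subscriber instance
        sub.cancel();
        System.out.println(seen); // [1, 2]
    }
}
```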
-
switchOnFirst
public final <V> Flux<V> switchOnFirst(BiFunction<Signal<? extends T>, Flux<T>, Publisher<? extends V>> transformer) Transform the current Flux once it emits its first element, making a conditional transformation possible. This operator first requests one element from the source, then applies a transformation derived from the first Signal and the source. The whole source (including the first signal) is passed as second argument to the BiFunction, and it is very strongly advised to always build upon it with operators (see below). Note that the source might complete or error immediately instead of emitting, in which case the
Signalwould be onComplete or onError. It is NOT necessarily an onNext Signal, and must be checked accordingly.For example, this operator could be used to define a dynamic transformation that depends on the first element (which could contain routing metadata for instance):
fluxOfIntegers.switchOnFirst((signal, flux) -> {
    if (signal.hasValue()) {
        ColoredShape firstColor = signal.get();
        return flux.filter(v -> !v.hasSameColorAs(firstColor));
    }
    return flux;
    // either early complete or error, this forwards the termination in any case
    // `return flux.onErrorResume(t -> Mono.empty());` instead would suppress an early error
    // `return Flux.just(1,2,3);` instead would suppress an early error and return 1, 2, 3.
    // It would also only cancel the original `flux` at the completion of `just`.
})
It is advised to return a
Publisherderived from the originalFluxin all cases, as not doing so would keep the originalPublisheropen and hanging with a single request until the innerPublisherterminates or the wholeFluxis cancelled.- Type Parameters:
V- the item type in the returnedFlux- Parameters:
transformer- ABiFunctionexecuted once the first signal is available and used to transform the source conditionally. The whole source (including first signal) is passed as second argument to the BiFunction.- Returns:
- a new
Fluxthat transform the upstream once a signal is available
-
switchOnFirst
public final <V> Flux<V> switchOnFirst(BiFunction<Signal<? extends T>, Flux<T>, Publisher<? extends V>> transformer, boolean cancelSourceOnComplete) Transform the current Flux once it emits its first element, making a conditional transformation possible. This operator first requests one element from the source, then applies a transformation derived from the first Signal and the source. The whole source (including the first signal) is passed as second argument to the BiFunction, and it is very strongly advised to always build upon it with operators (see below). Note that the source might complete or error immediately instead of emitting, in which case the
Signalwould be onComplete or onError. It is NOT necessarily an onNext Signal, and must be checked accordingly.For example, this operator could be used to define a dynamic transformation that depends on the first element (which could contain routing metadata for instance):
fluxOfIntegers.switchOnFirst((signal, flux) -> {
    if (signal.hasValue()) {
        ColoredShape firstColor = signal.get();
        return flux.filter(v -> !v.hasSameColorAs(firstColor));
    }
    return flux;
    // either early complete or error, this forwards the termination in any case
    // `return flux.onErrorResume(t -> Mono.empty());` instead would suppress an early error
    // `return Flux.just(1,2,3);` instead would suppress an early error and return 1, 2, 3.
    // It would also only cancel the original `flux` at the completion of `just`.
})
It is advised to return a
Publisher derived from the original Flux in all cases, as not doing so would keep the original Publisher open and hanging with a single request. In case the cancelSourceOnComplete parameter is true, the original publisher is cancelled once the inner Publisher terminates or the whole Flux is cancelled. Otherwise the original publisher will hang forever. - Type Parameters:
V- the item type in the returnedFlux- Parameters:
transformer- ABiFunctionexecuted once the first signal is available and used to transform the source conditionally. The whole source (including first signal) is passed as second argument to the BiFunction.cancelSourceOnComplete- specify whether original publisher should be cancelled ononCompletefrom the derived one- Returns:
- a new
Fluxthat transform the upstream once a signal is available
-
switchIfEmpty
Switch to an alternativePublisherif this sequence is completed without any data. -
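A minimal sketch of switchIfEmpty (assuming reactor-core on the classpath): the empty source triggers the alternative publisher.

```java
import reactor.core.publisher.Flux;

public class SwitchIfEmptyExample {
    public static void main(String[] args) {
        // The source completes without data, so the alternative publisher is subscribed.
        String result = Flux.<String>empty()
                .switchIfEmpty(Flux.just("fallback"))
                .blockFirst();
        System.out.println(result); // fallback
    }
}
```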
switchMap
Switch to a newPublishergenerated via aFunctionwhenever thisFluxproduces an item. As such, the elements from each generated Publisher are emitted in the resultingFlux.This operator requests the source for an unbounded amount, but doesn't request each generated
Publisherunless the downstream has made a corresponding request (no prefetch of inner publishers). -
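A hedged sketch of switchMap (reactor-core assumed): with a fully synchronous source each inner publisher completes before the next outer element arrives, so no inner sequence is cut short here.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class SwitchMapExample {
    public static void main(String[] args) {
        // Each outer element generates an inner Flux; a new outer element would
        // cancel a still-running inner one (not observable with this sync source).
        List<String> result = Flux.just("a", "b")
                .switchMap(s -> Flux.just(s + "1", s + "2"))
                .collectList()
                .block();
        System.out.println(result); // [a1, a2, b1, b2]
    }
}
```

The switching (cancellation of the previous inner publisher) only becomes visible with asynchronous inners, e.g. ones with delays.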
switchMap
@Deprecated public final <V> Flux<V> switchMap(Function<? super T, Publisher<? extends V>> fn, int prefetch) Deprecated.to be removed in 3.6.0 at the earliest. In 3.5.0, you should replace calls with prefetch=0 with calls to switchMap(fn), as the default behavior of the single-parameter variant will then change to prefetch=0. -
tag
Tag this flux with a key/value pair. These can be retrieved as aSetof all tags throughout the publisher chain by usingScannable.tags()(as traversed byScannable.parents()).The name is typically visible at assembly time by the
tap(SignalListenerFactory)operator, which could for example be configured with a metrics listener applying the tag(s) to its meters.- Parameters:
key- a tag keyvalue- a tag value- Returns:
- the same sequence, but bearing tags
- See Also:
-
take
Take only the first N values from thisFlux, if available. If n is zero, the source isn't even subscribed to and the operator completes immediately upon subscription.This ensures that the total amount requested upstream is capped at
n, although smaller requests can be made if the downstream makes requests < n. In any case, this operator never lets the upstream produce more elements than the cap, and it can be used to more strictly adhere to backpressure.This mode is typically useful for cases where a race between request and cancellation can lead the upstream to producing a lot of extraneous data, and such a production is undesirable (e.g. a source that would send the extraneous data over the network). It is equivalent to
take(long, boolean) with limitRequest == true. If there is a requirement for an unbounded upstream request (e.g. for performance reasons), use take(long, boolean) with limitRequest = false instead. -
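A minimal sketch of take(n) (assuming reactor-core on the classpath), capping both the emitted elements and the upstream request:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class TakeExample {
    public static void main(String[] args) {
        // Only the first 3 of 100 potential elements are emitted and requested.
        List<Integer> first3 = Flux.range(1, 100).take(3).collectList().block();
        System.out.println(first3); // [1, 2, 3]
    }
}
```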
take
Take only the first N values from thisFlux, if available.If
limitRequest == true, this ensures that the total amount requested upstream is capped at n. In that configuration, this operator never lets the upstream produce more elements than the cap, and it can be used to more strictly adhere to backpressure. If n is zero, the source isn't even subscribed to and the operator completes immediately upon subscription (the behavior inherited from take(long)). This mode is typically useful for cases where a race between request and cancellation can lead the upstream to produce a lot of extraneous data, and such production is undesirable (e.g. a source that would send the extraneous data over the network).
If
limitRequest == false, this operator doesn't propagate the backpressure requested amount. Rather, it makes an unbounded request and cancels once N elements have been emitted. If n is zero, the source is subscribed to but immediately cancelled, then the operator completes. In this mode, the source could produce a lot of extraneous elements despite cancellation. If that behavior is undesirable and you do not own the request from downstream (e.g. prefetching operators), consider using
limitRequest = trueinstead. -
take
-
take
-
takeLast
Emit the last N values thisFluxemitted before its completion. -
takeUntil
Relay values from thisFluxuntil the givenPredicatematches. This includes the matching data (unliketakeWhile(java.util.function.Predicate<? super T>)). The predicate is tested before the element is emitted, so if the element is modified by the consumer, this won't affect the predicate. In case of an error during the predicate test, the current element is emitted before the error. -
takeUntilOther
-
takeWhile
Relay values from thisFluxwhile a predicate returns TRUE for the values (checked before each value is delivered). This only includes the matching data (unliketakeUntil(java.util.function.Predicate<? super T>)). -
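A short sketch contrasting takeUntil and takeWhile (reactor-core assumed): the former includes the matching element, the latter excludes the first non-matching one.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class TakeUntilWhileExample {
    public static void main(String[] args) {
        Flux<Integer> src = Flux.just(1, 2, 3, 4);
        // takeUntil includes the element that made the predicate match...
        List<Integer> until = src.takeUntil(i -> i >= 3).collectList().block();
        // ...while takeWhile stops before the first element that fails the predicate.
        List<Integer> whileResult = src.takeWhile(i -> i < 3).collectList().block();
        System.out.println(until);       // [1, 2, 3]
        System.out.println(whileResult); // [1, 2]
    }
}
```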
tap
Tap into Reactive Streams signals emitted or received by thisFluxand notify a stateful per-SubscriberSignalListener.Any exception thrown by the
SignalListenermethods causes the subscription to be cancelled and the subscriber to be terminated with anonError signalof that exception. Note thatSignalListener.doFinally(SignalType),SignalListener.doAfterComplete()andSignalListener.doAfterError(Throwable)instead justdropthe exception.This simplified variant assumes the state is purely initialized within the
Supplier, as it is called for each incomingSubscriberwithout additional context.When the context-propagation library is available at runtime and the downstream
ContextViewis not empty, this operator implicitly uses the library to restore thread locals around all invocations ofSignalListenermethods. Typically, this would be done in conjunction with the use ofcontextCapture()operator down the chain.- Parameters:
simpleListenerGenerator- theSupplierto create a newSignalListeneron each subscription- Returns:
- a new
Fluxwith side effects defined by generatedSignalListener - See Also:
-
tap
Tap into Reactive Streams signals emitted or received by thisFluxand notify a stateful per-SubscriberSignalListener.Any exception thrown by the
SignalListenermethods causes the subscription to be cancelled and the subscriber to be terminated with anonError signalof that exception. Note thatSignalListener.doFinally(SignalType),SignalListener.doAfterComplete()andSignalListener.doAfterError(Throwable)instead justdropthe exception.This simplified variant allows the
SignalListenerto be constructed for each subscription with access to the incomingSubscriber'sContextView.When the context-propagation library is available at runtime and the
ContextViewis not empty, this operator implicitly uses the library to restore thread locals around all invocations ofSignalListenermethods. Typically, this would be done in conjunction with the use ofcontextCapture()operator down the chain.- Parameters:
listenerGenerator- theFunctionto create a newSignalListeneron each subscription- Returns:
- a new
Fluxwith side effects defined by generatedSignalListener - See Also:
-
tap
Tap into Reactive Streams signals emitted or received by thisFluxand notify a stateful per-SubscriberSignalListenercreated by the providedSignalListenerFactory.The factory will initialize a
state objectfor eachFluxorMonoinstance it is used with, and that state will be cached and exposed for each incomingSubscriberin order to generate the associatedlistener.Any exception thrown by the
SignalListenermethods causes the subscription to be cancelled and the subscriber to be terminated with anonError signalof that exception. Note thatSignalListener.doFinally(SignalType),SignalListener.doAfterComplete()andSignalListener.doAfterError(Throwable)instead justdropthe exception.When the context-propagation library is available at runtime and the downstream
ContextViewis not empty, this operator implicitly uses the library to restore thread locals around all invocations ofSignalListenermethods. Typically, this would be done in conjunction with the use ofcontextCapture()operator down the chain.- Parameters:
listenerFactory- theSignalListenerFactoryto create a newSignalListeneron each subscription- Returns:
- a new
Fluxwith side effects defined by generatedSignalListener - See Also:
-
then
Return aMono<Void>that completes when thisFluxcompletes. This will actively ignore the sequence and only replay completion or error signals.Discard Support: This operator discards elements from the source.
-
then
Let thisFluxcomplete then play signals from a providedMono.In other words ignore element from this
Fluxand transform its completion signal into the emission and completion signal of a providedMono<V>. Error signal is replayed in the resultingMono<V>.Discard Support: This operator discards elements from the source.
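A minimal sketch of then(Mono) (assuming reactor-core on the classpath): the source elements are discarded and only completion triggers the provided Mono.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ThenExample {
    public static void main(String[] args) {
        // Elements 1..3 are ignored; completion is replaced by the Mono's signals.
        String result = Flux.range(1, 3)
                .then(Mono.just("done"))
                .block();
        System.out.println(result); // done
    }
}
```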
-
thenEmpty
Return aMono<Void>that waits for thisFluxto complete then for a suppliedPublisher<Void>to also complete. The second completion signal is replayed, or any error signal that occurs instead.Discard Support: This operator discards elements from the source.
-
thenMany
-
timed
Times Subscriber.onNext(Object) events, encapsulated into a Timed object that lets a downstream consumer look at various time information gathered with nanosecond resolution using the default clock (Schedulers.parallel()):
- Timed.elapsed(): the time in nanoseconds since the last event, as a Duration. For the first onNext, "last event" is the subscription. Otherwise it is the previous onNext. This is functionally equivalent to elapsed(), with a more expressive and precise representation than a Tuple2 with a long.
- Timed.timestamp(): the timestamp of this onNext, as an Instant (with nanoseconds part). This is functionally equivalent to timestamp(), with a more expressive and precise representation than a Tuple2 with a long.
- Timed.elapsedSinceSubscription(): the time in nanoseconds since subscription, as a Duration.
The
Timedobject instances are safe to store and use later, as they are created as an immutable wrapper around the<T>value and immediately passed downstream.- Returns:
- a timed
Flux - See Also:
-
timed
Times Subscriber.onNext(Object) events, encapsulated into a Timed object that lets a downstream consumer look at various time information gathered with nanosecond resolution using the provided Scheduler as a clock:
- Timed.elapsed(): the time in nanoseconds since the last event, as a Duration. For the first onNext, "last event" is the subscription. Otherwise it is the previous onNext. This is functionally equivalent to elapsed(), with a more expressive and precise representation than a Tuple2 with a long.
- Timed.timestamp(): the timestamp of this onNext, as an Instant (with nanoseconds part). This is functionally equivalent to timestamp(), with a more expressive and precise representation than a Tuple2 with a long.
- Timed.elapsedSinceSubscription(): the time in nanoseconds since subscription, as a Duration.
The
Timedobject instances are safe to store and use later, as they are created as an immutable wrapper around the<T>value and immediately passed downstream.- Returns:
- a timed
Flux - See Also:
-
timeout
Propagate aTimeoutExceptionas soon as no item is emitted within the givenDurationfrom the previous emission (or the subscription for the first item). -
timeout
Switch to a fallbackFluxas soon as no item is emitted within the givenDurationfrom the previous emission (or the subscription for the first item).If the given
Publisheris null, signal aTimeoutExceptioninstead. -
timeout
Propagate aTimeoutExceptionas soon as no item is emitted within the givenDurationfrom the previous emission (or the subscription for the first item), as measured by the specifiedScheduler. -
timeout
public final Flux<T> timeout(Duration timeout, @Nullable Publisher<? extends T> fallback, Scheduler timer) Switch to a fallbackFluxas soon as no item is emitted within the givenDurationfrom the previous emission (or the subscription for the first item), as measured on the specifiedScheduler.If the given
Publisheris null, signal aTimeoutExceptioninstead. -
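A hedged sketch of the fallback timeout variant (reactor-core assumed; the 50ms duration is arbitrary for illustration): a source that never emits is replaced by the fallback once the timeout expires.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;

public class TimeoutFallbackExample {
    public static void main(String[] args) {
        // Flux.never() emits nothing, so after 50ms the fallback sequence takes over.
        String result = Flux.<String>never()
                .timeout(Duration.ofMillis(50), Flux.just("fallback"))
                .blockFirst();
        System.out.println(result); // fallback
    }
}
```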
timeout
Signal aTimeoutExceptionin case the first item from thisFluxhas not been emitted before the givenPublisheremits. -
timeout
public final <U,V> Flux<T> timeout(Publisher<U> firstTimeout, Function<? super T, ? extends Publisher<V>> nextTimeoutFactory) Signal aTimeoutExceptionin case the first item from thisFluxhas not been emitted before thefirstTimeoutPublisheremits, and whenever each subsequent elements is not emitted before aPublishergenerated from the latest element signals.Discard Support: This operator discards an element if it comes right after the timeout.
- Type Parameters:
U- the type of the elements of the first timeout PublisherV- the type of the elements of the subsequent timeout Publishers- Parameters:
firstTimeout- the timeoutPublisherthat must not emit before the first signal from thisFluxnextTimeoutFactory- the timeoutPublisherfactory for each next item- Returns:
- a
Fluxthat can time out if each element does not come before a signal from a per-item companionPublisher
-
timeout
public final <U,V> Flux<T> timeout(Publisher<U> firstTimeout, Function<? super T, ? extends Publisher<V>> nextTimeoutFactory, Publisher<? extends T> fallback) Switch to a fallbackPublisherin case the first item from thisFluxhas not been emitted before thefirstTimeoutPublisheremits, and whenever each subsequent elements is not emitted before aPublishergenerated from the latest element signals.- Type Parameters:
U- the type of the elements of the first timeout PublisherV- the type of the elements of the subsequent timeout Publishers- Parameters:
firstTimeout- the timeoutPublisherthat must not emit before the first signal from thisFluxnextTimeoutFactory- the timeoutPublisherfactory for each next itemfallback- the fallbackPublisherto subscribe when a timeout occurs- Returns:
- a
Fluxthat can time out if each element does not come before a signal from a per-item companionPublisher
-
timestamp
Emit aTuple2pair of T1 the current clock time in millis (as aLongmeasured by theparallelScheduler) and T2 the emitted data (as aT), for each item from thisFlux.- Returns:
- a timestamped
Flux - See Also:
-
timestamp
Emit a Tuple2 pair of T1 the current clock time in millis (as a Long measured by the provided Scheduler) and T2 the emitted data (as a T), for each item from this Flux. The provided
Scheduler will be asked to provide time with a granularity of TimeUnit.MILLISECONDS. In order for this operator to work as advertised, the provided Scheduler should thus return results that can be interpreted as unix timestamps. -
toIterable
Transform thisFluxinto a lazyIterableblocking onIterator.next()calls.Note that iterating from within threads marked as "non-blocking only" is illegal and will cause an
IllegalStateExceptionto be thrown, but obtaining theIterableitself within these threads is ok.- Returns:
- a blocking
Iterable
-
toIterable
Transform thisFluxinto a lazyIterableblocking onIterator.next()calls.Note that iterating from within threads marked as "non-blocking only" is illegal and will cause an
IllegalStateExceptionto be thrown, but obtaining theIterableitself within these threads is ok. -
toIterable
Transform thisFluxinto a lazyIterableblocking onIterator.next()calls.Note that iterating from within threads marked as "non-blocking only" is illegal and will cause an
IllegalStateExceptionto be thrown, but obtaining theIterableitself within these threads is ok. -
toStream
Transform thisFluxinto a lazyStreamblocking for each sourceonNextcall.Note that iterating from within threads marked as "non-blocking only" is illegal and will cause an
IllegalStateExceptionto be thrown, but obtaining theStreamitself or applying lazy intermediate operation on the stream within these threads is ok.- Returns:
- a
Streamof unknown size with onClose attached toSubscription.cancel()
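A minimal sketch of toStream (assuming reactor-core on the classpath), bridging a Flux into the java.util.stream API with blocking pulls:

```java
import java.util.List;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;

public class ToStreamExample {
    public static void main(String[] args) {
        // Each Stream pull blocks until the source delivers the next onNext.
        List<Integer> doubled = Flux.range(1, 3)
                .toStream()
                .map(i -> i * 2)
                .collect(Collectors.toList());
        System.out.println(doubled); // [2, 4, 6]
    }
}
```

Since toStream blocks, it must not be used from threads marked as "non-blocking only".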
-
toStream
Transform thisFluxinto a lazyStreamblocking for each sourceonNextcall.Note that iterating from within threads marked as "non-blocking only" is illegal and will cause an
IllegalStateExceptionto be thrown, but obtaining theStreamitself or applying lazy intermediate operation on the stream within these threads is ok.- Parameters:
batchSize- the bounded capacity to prefetch from thisFluxorInteger.MAX_VALUEfor unbounded demand- Returns:
- a
Streamof unknown size with onClose attached toSubscription.cancel()
-
transform
Transform this Flux in order to generate a target Flux. Unlike transformDeferred(Function), the provided function is executed as part of assembly.
Function<Flux, Flux> applySchedulers = flux -> flux.subscribeOn(Schedulers.boundedElastic())
                                                   .publishOn(Schedulers.parallel());
flux.transform(applySchedulers).map(v -> v * v).subscribe();
-
transformDeferred
public final <V> Flux<V> transformDeferred(Function<? super Flux<T>, ? extends Publisher<V>> transformer) Defer the transformation of thisFluxin order to generate a targetFluxtype. A transformation will occur for eachSubscriber. For instance:flux.transformDeferred(original -> original.log());
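A hedged sketch (reactor-core assumed) making the per-Subscriber deferral observable with a counter: unlike transform, the function runs on each subscription rather than at assembly.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class TransformDeferredExample {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // The transformer runs once per Subscriber, not once at assembly time.
        Flux<Integer> flux = Flux.just(1).transformDeferred(f -> {
            calls.incrementAndGet();
            return f.map(i -> i * 10);
        });
        System.out.println(calls.get()); // 0 - nothing ran at assembly
        flux.blockLast();
        flux.blockLast();
        System.out.println(calls.get()); // 2 - once per subscription
    }
}
```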
-
transformDeferredContextual
public final <V> Flux<V> transformDeferredContextual(BiFunction<? super Flux<T>, ? super ContextView, ? extends Publisher<V>> transformer) Defer the given transformation to this Flux in order to generate a target Flux type. A transformation will occur for each Subscriber. In addition, the transforming BiFunction exposes the ContextView of each Subscriber. For instance:
Flux<T> fluxLogged = flux.transformDeferredContextual((original, ctx) ->
    original.log("for RequestID " + ctx.get("RequestID")));
// ...later subscribe. Each subscriber has its Context with a RequestID entry
fluxLogged.contextWrite(Context.of("RequestID", "requestA")).subscribe();
fluxLogged.contextWrite(Context.of("RequestID", "requestB")).subscribe();
- Type Parameters:
V- the item type in the returnedPublisher- Parameters:
transformer- theBiFunctionto lazily map thisFluxinto a targetFluxinstance upon subscription, with access toContextView- Returns:
- a new
Flux - See Also:
-
window
Split thisFluxsequence into multipleFluxwindows containingmaxSizeelements (or less for the final window) and starting from the first item. EachFluxwindow will onComplete aftermaxSizeitems have been routed.Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to
retry()orrepeat()a window, as these operators are based on re-subscription.To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window
Fluxare wrapped inExceptions.SourceException.Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
-
window
Split thisFluxsequence into multipleFluxwindows of sizemaxSize, that each open everyskipelements in the source.When maxSize < skip : dropping windows
When maxSize > skip : overlapping windows
When maxSize == skip : exact windows
Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to
retry()orrepeat()a window, as these operators are based on re-subscription.To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window
Flux are wrapped in Exceptions.SourceException. Discard Support: The overlapping variant DOES NOT discard elements, as they might be part of another still valid window. The exact window and dropping window variants both discard elements they internally queued for backpressure upon cancellation or error triggered by a data signal. The dropping window variant also discards elements in between windows.
-
window
Split this Flux sequence into continuous, non-overlapping windows where the window boundary is signalled by another Publisher. Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to
retry()orrepeat()a window, as these operators are based on re-subscription.To distinguish errors emitted by the processing of individual windows, source sequence errors and those emitted by the
boundarydelivered to the windowFluxare wrapped inExceptions.SourceException.Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
-
window
Split thisFluxsequence into continuous, non-overlapping windows that open for awindowingTimespanDuration(as measured on theparallelScheduler).Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to
retry()orrepeat()a window, as these operators are based on re-subscription.To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window
Fluxare wrapped inExceptions.SourceException.Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
-
window
Split this Flux sequence into multiple Flux windows that open for a given windowingTimespan Duration, after which they close with onComplete. A new window is opened at a regular openWindowEvery interval, starting from the first item. Both durations are measured on the parallel Scheduler.
When windowingTimespan < openWindowEvery : dropping windows
When windowingTimespan > openWindowEvery : overlapping windows
When windowingTimespan == openWindowEvery : exact windows
Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.
Discard Support: The overlapping variant DOES NOT discard elements, as they might be part of another still valid window. The exact window and dropping window variants both discard elements they internally queued for backpressure upon cancellation or error triggered by a data signal. The dropping window variant also discards elements in between windows.
-
window
Split this Flux sequence into continuous, non-overlapping windows that open for a windowingTimespan Duration (as measured on the provided Scheduler).
Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
-
window
public final Flux<Flux<T>> window(Duration windowingTimespan, Duration openWindowEvery, Scheduler timer)
Split this Flux sequence into multiple Flux windows that open for a given windowingTimespan Duration, after which they close with onComplete. A new window is opened at a regular openWindowEvery interval, starting from the first item. Both durations are measured on the provided Scheduler.
When windowingTimespan < openWindowEvery : dropping windows
When windowingTimespan > openWindowEvery : overlapping windows
When windowingTimespan == openWindowEvery : exact windows
Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.
Discard Support: The overlapping variant DOES NOT discard elements, as they might be part of another still valid window. The exact window and dropping window variants both discard elements they internally queued for backpressure upon cancellation or error triggered by a data signal. The dropping window variant also discards elements in between windows.
-
windowTimeout
Split this Flux sequence into multiple Flux windows containing maxSize elements (or fewer for the final window), starting from the first item. Each Flux window will onComplete once it contains maxSize elements OR it has been open for the given Duration (as measured on the parallel Scheduler).
Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
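As a sketch (reactor-core assumed on the classpath; names are illustrative), the size bound closes each window here well before the deliberately generous timeout could fire, which makes the result deterministic:

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class WindowTimeoutExample {
    public static void main(String[] args) {
        // maxSize=3 closes each window; the 10 s timeout never fires here.
        List<List<Integer>> windows = Flux.range(1, 10)
                .windowTimeout(3, Duration.ofSeconds(10))
                .concatMap(Flux::collectList)
                .collectList()
                .block();
        System.out.println(windows); // [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
    }
}
```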
-
windowTimeout
Split this Flux sequence into multiple Flux windows containing maxSize elements (or fewer for the final window), starting from the first item. Each Flux window will onComplete once it contains maxSize elements OR it has been open for the given Duration (as measured on the parallel Scheduler).
Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
- Parameters:
maxSize - the maximum number of items to emit in the window before closing it
maxTime - the maximum Duration since the window was opened before closing it
fairBackpressure - whether the operator requests unbounded demand or prefetches maxSize elements at a time
- Returns:
- a Flux of Flux windows based on element count and duration
-
windowTimeout
Split this Flux sequence into multiple Flux windows containing maxSize elements (or fewer for the final window), starting from the first item. Each Flux window will onComplete once it contains maxSize elements OR it has been open for the given Duration (as measured on the provided Scheduler).
Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
-
windowTimeout
public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime, Scheduler timer, boolean fairBackpressure)
Split this Flux sequence into multiple Flux windows containing maxSize elements (or fewer for the final window), starting from the first item. Each Flux window will onComplete once it contains maxSize elements OR it has been open for the given Duration (as measured on the provided Scheduler).
Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal.
- Parameters:
maxSize - the maximum number of items to emit in the window before closing it
maxTime - the maximum Duration since the window was opened before closing it
timer - a time-capable Scheduler instance to run on
fairBackpressure - whether the operator requests unbounded demand or prefetches maxSize elements at a time
- Returns:
- a Flux of Flux windows based on element count and duration
-
windowUntil
Split this Flux sequence into multiple Flux windows delimited by the given predicate. A new window is opened each time the predicate returns true, at which point the previous window will receive the triggering element then onComplete.
Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window errors). This variant shouldn't expose empty windows, as the separators are emitted into the windows they close.
Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.
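A minimal sketch of the predicate-delimited split (reactor-core assumed; names are illustrative). The triggering element closes its window and belongs to it:

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class WindowUntilExample {
    public static void main(String[] args) {
        // Each multiple of 3 closes the current window and is emitted into it.
        List<List<Integer>> windows = Flux.just(1, 2, 3, 4, 5, 6, 7)
                .windowUntil(i -> i % 3 == 0)
                .concatMap(Flux::collectList)
                .collectList()
                .block();
        System.out.println(windows); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```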
-
windowUntil
Split this Flux sequence into multiple Flux windows delimited by the given predicate. A new window is opened each time the predicate returns true.
Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window completes or errors).
If cutBefore is true, the old window will onComplete and the triggering element will be emitted in the new window, which becomes immediately available. This variant can emit an empty window if the sequence starts with a separator.
Otherwise, the triggering element will be emitted in the old window before it does onComplete, similar to windowUntil(Predicate). This variant shouldn't expose empty windows, as the separators are emitted into the windows they close.
Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.
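Contrast with the cutBefore variant (reactor-core assumed; names are illustrative), where the triggering element opens the new window instead of closing the old one:

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class WindowUntilCutBeforeExample {
    public static void main(String[] args) {
        // cutBefore=true: each multiple of 3 starts the NEW window.
        List<List<Integer>> windows = Flux.just(1, 2, 3, 4, 5, 6, 7)
                .windowUntil(i -> i % 3 == 0, true)
                .concatMap(Flux::collectList)
                .collectList()
                .block();
        System.out.println(windows); // [[1, 2], [3, 4, 5], [6, 7]]
    }
}
```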
-
windowUntil
public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore, int prefetch)
Split this Flux sequence into multiple Flux windows delimited by the given predicate and using a prefetch. A new window is opened each time the predicate returns true.
Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window completes or errors).
If cutBefore is true, the old window will onComplete and the triggering element will be emitted in the new window. This variant can emit an empty window if the sequence starts with a separator.
Otherwise, the triggering element will be emitted in the old window before it does onComplete, similar to windowUntil(Predicate). This variant shouldn't expose empty windows, as the separators are emitted into the windows they close.
Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.
- Parameters:
boundaryTrigger - a predicate that triggers the next window when it becomes true
cutBefore - set to true to include the triggering element in the new window rather than the old
prefetch - the request size to use for this Flux
- Returns:
- a Flux of Flux windows, bounded depending on the predicate
-
windowUntilChanged
Collect subsequent repetitions of an element (that is, if they arrive right after one another) into multiple Flux windows.
Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.
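A minimal sketch (reactor-core assumed; names are illustrative): consecutive equal elements land in the same window, and a repeated value that reappears later opens a fresh window:

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class WindowUntilChangedExample {
    public static void main(String[] args) {
        // Runs of equal elements are grouped; a later repeat of 1 starts anew.
        List<List<Integer>> windows = Flux.just(1, 1, 2, 2, 2, 1, 3)
                .windowUntilChanged()
                .concatMap(Flux::collectList)
                .collectList()
                .block();
        System.out.println(windows); // [[1, 1], [2, 2, 2], [1], [3]]
    }
}
```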
-
windowUntilChanged
Collect subsequent repetitions of an element (that is, if they arrive right after one another), as compared by a key extracted through the user-provided Function, into multiple Flux windows.
Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.
-
windowUntilChanged
public final <V> Flux<Flux<T>> windowUntilChanged(Function<? super T, ? extends V> keySelector, BiPredicate<? super V, ? super V> keyComparator)
Collect subsequent repetitions of an element (that is, if they arrive right after one another), as compared by a key extracted through the user-provided Function and compared using a supplied BiPredicate, into multiple Flux windows.
Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal. Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.
-
windowWhile
Split this Flux sequence into multiple Flux windows that stay open while a given predicate matches the source elements. Once the predicate returns false, the window closes with an onComplete and the triggering element is discarded.
Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window completes or errors). Empty windows can happen when a sequence starts with a separator or contains multiple separators, but a sequence that finishes with a separator won't cause a remainder empty window to be emitted.
Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal, as well as the triggering element(s) (which do not match the predicate). Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.
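A minimal sketch (reactor-core assumed; names are illustrative): an element failing the predicate acts as a separator, closing the window and being discarded rather than delivered:

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class WindowWhileExample {
    public static void main(String[] args) {
        // Zero fails the predicate: it closes the window and is discarded.
        List<List<Integer>> windows = Flux.just(1, 2, 0, 3, 4, 0, 5)
                .windowWhile(i -> i != 0)
                .concatMap(Flux::collectList)
                .collectList()
                .block();
        System.out.println(windows); // [[1, 2], [3, 4], [5]]
    }
}
```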
-
windowWhile
Split this Flux sequence into multiple Flux windows that stay open while a given predicate matches the source elements. Once the predicate returns false, the window closes with an onComplete and the triggering element is discarded.
Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window completes or errors). Empty windows can happen when a sequence starts with a separator or contains multiple separators, but a sequence that finishes with a separator won't cause a remainder empty window to be emitted.
Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.
Discard Support: This operator discards elements it internally queued for backpressure upon cancellation or error triggered by a data signal, as well as the triggering element(s) (which do not match the predicate). Upon cancellation of the current window, it also discards the remaining elements that were bound for it until the main sequence completes or creation of a new window is triggered.
-
windowWhen
public final <U,V> Flux<Flux<T>> windowWhen(Publisher<U> bucketOpening, Function<? super U, ? extends Publisher<V>> closeSelector)
Split this Flux sequence into potentially overlapping windows controlled by items of a start Publisher and an end Publisher derived from the start values.
When the Open signal is strictly not overlapping the Close signal : dropping windows
When the Open signal is strictly more frequent than the Close signal : overlapping windows
When the Open signal is exactly coordinated with the Close signal : exact windows
Note that windows are a live view of part of the underlying source publisher, and as such their lifecycle is tied to that source. As a result, it is not possible to subscribe to a window more than once: they are unicast. This is most noticeable when trying to retry() or repeat() a window, as these operators are based on re-subscription.
To distinguish errors emitted by the processing of individual windows, source sequence errors delivered to the window Flux are wrapped in Exceptions.SourceException.
Discard Support: This operator DOES NOT discard elements.
- Type Parameters:
U - the type of the sequence opening windows
V - the type of the sequence closing windows opened by the bucketOpening Publisher's elements
- Parameters:
bucketOpening - a Publisher that opens a new window when it emits any item
closeSelector - a Function given an opening signal and returning a Publisher that will close the window when emitting
- Returns:
- a Flux of Flux windows opened by signals from a first Publisher and lasting until a selected second Publisher emits
-
withLatestFrom
public final <U,R> Flux<R> withLatestFrom(Publisher<? extends U> other, BiFunction<? super T, ? super U, ? extends R> resultSelector)
Combine the most recently emitted values from both this Flux and another Publisher through a BiFunction and emit the result.
The operator will drop values from this Flux until the other Publisher produces any value.
If the other Publisher completes without any value, the sequence is completed.
- Type Parameters:
U - the other Publisher sequence type
R - the result type
- Parameters:
other - the Publisher to combine with
resultSelector - the bi-function called with each pair of source and other elements that should return a single value to be emitted
- Returns:
- a combined Flux gated by another Publisher
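A minimal sketch (reactor-core assumed; names are illustrative). Here the other Publisher emits its single value immediately on subscription, so every source element is paired with it:

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class WithLatestFromExample {
    public static void main(String[] args) {
        // "other" emits 100 up front, so no source element is dropped.
        List<Integer> result = Flux.just(1, 2, 3)
                .withLatestFrom(Flux.just(100), Integer::sum)
                .collectList()
                .block();
        System.out.println(result); // [101, 102, 103]
    }
}
```

With a slower other Publisher, leading source elements would be dropped until its first value arrives.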
-
zipWith
Zip this Flux with another Publisher source, that is to say wait for both to emit one element and combine these elements once into a Tuple2. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
-
zipWith
public final <T2,V> Flux<V> zipWith(Publisher<? extends T2> source2, BiFunction<? super T, ? super T2, ? extends V> combinator)
Zip this Flux with another Publisher source, that is to say wait for both to emit one element and combine these elements using a combinator BiFunction. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
- Type Parameters:
T2 - type of the value from source2
V - the produced output after transformation by the combinator
- Parameters:
source2 - the second source Publisher to zip with this Flux
combinator - the aggregate function that will receive a unique value from each source and return the value to signal downstream
- Returns:
- a zipped Flux
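A minimal sketch (reactor-core assumed; names are illustrative): zipping stops as soon as the shorter source completes, so the unpaired trailing element is never emitted:

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class ZipWithExample {
    public static void main(String[] args) {
        // Pairwise combination; the unmatched 3 is dropped when "a","b" completes.
        List<String> result = Flux.just(1, 2, 3)
                .zipWith(Flux.just("a", "b"), (n, s) -> s + n)
                .collectList()
                .block();
        System.out.println(result); // [a1, b2]
    }
}
```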
-
zipWith
public final <T2,V> Flux<V> zipWith(Publisher<? extends T2> source2, int prefetch, BiFunction<? super T, ? super T2, ? extends V> combinator)
Zip this Flux with another Publisher source, that is to say wait for both to emit one element and combine these elements using a combinator BiFunction. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
- Type Parameters:
T2 - type of the value from source2
V - the produced output after transformation by the combinator
- Parameters:
source2 - the second source Publisher to zip with this Flux
prefetch - the request size to use for this Flux and the other Publisher
combinator - the aggregate function that will receive a unique value from each source and return the value to signal downstream
- Returns:
- a zipped Flux
-
zipWith
Zip this Flux with another Publisher source, that is to say wait for both to emit one element and combine these elements once into a Tuple2. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
-
zipWithIterable
-
zipWithIterable
public final <T2,V> Flux<V> zipWithIterable(Iterable<? extends T2> iterable, BiFunction<? super T, ? super T2, ? extends V> zipper)
Zip elements from this Flux with the content of an Iterable, that is to say combine one element from each, pairwise, using the given zipper BiFunction.
- Type Parameters:
T2 - the value type of the other iterable sequence
V - the result type
- Parameters:
iterable - the Iterable to zip with
zipper - the BiFunction pair combinator
- Returns:
- a zipped Flux
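A minimal sketch (reactor-core assumed; names are illustrative): pairing stream elements with a plain in-memory collection, again truncated to the shorter side:

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class ZipWithIterableExample {
    public static void main(String[] args) {
        // Stream elements are paired with the Iterable's values in order.
        List<String> result = Flux.just("a", "b", "c")
                .zipWithIterable(List.of(1, 2), (s, n) -> s + n)
                .collectList()
                .block();
        System.out.println(result); // [a1, b2]
    }
}
```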
-
onAssembly
To be used by custom operators: invokes assembly Hooks pointcut given a Flux, potentially returning a new Flux. This is for example useful to activate cross-cutting concerns at assembly time, e.g. a generalized checkpoint().
- Type Parameters:
T - the value type
- Parameters:
source - the source to apply assembly hooks onto
- Returns:
- the source, potentially wrapped with assembly time cross-cutting behavior
-
onAssembly
To be used by custom operators: invokes assembly Hooks pointcut given a ConnectableFlux, potentially returning a new ConnectableFlux. This is for example useful to activate cross-cutting concerns at assembly time, e.g. a generalized checkpoint().
- Type Parameters:
T - the value type
- Parameters:
source - the source to apply assembly hooks onto
- Returns:
- the source, potentially wrapped with assembly time cross-cutting behavior
-
toString
-
firstWithSignal(Iterable).