Class Mono<T>
- Type Parameters:
T - the type of the single value of this class
- All Implemented Interfaces:
Publisher<T>, CorePublisher<T>
- Direct Known Subclasses:
MonoOperator, MonoProcessor
A Publisher with basic rx operators that emits at most one item via the
onNext signal and then terminates with an onComplete signal (successful Mono,
with or without value), or emits only a single onError signal (failed Mono).
Most Mono implementations are expected to call Subscriber.onComplete() immediately
after having called Subscriber.onNext(T). Mono.never() is an outlier: it doesn't
emit any signal, which is not technically forbidden although not terribly useful outside
of tests. On the other hand, a combination of onNext and onError is explicitly forbidden.
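The three permitted outcomes described above can be observed with a small sketch. The class and method names (MonoSignalsSketch, signalsOf) are illustrative assumptions, not part of the Reactor API; only Mono.just, Mono.empty, Mono.error and the three-argument subscribe are library calls.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Mono;

final class MonoSignalsSketch {

    // Subscribes to the given Mono and records, in order, the callbacks that fire.
    // The sources used below signal synchronously, so the list is complete on return.
    static <T> List<String> signalsOf(Mono<T> mono) {
        List<String> seen = new ArrayList<>();
        mono.subscribe(
                value -> seen.add("onNext(" + value + ")"),
                error -> seen.add("onError(" + error.getMessage() + ")"),
                () -> seen.add("onComplete"));
        return seen;
    }

    public static void main(String[] args) {
        // Valued Mono: one onNext followed by completion.
        System.out.println(signalsOf(Mono.just("a")));
        // Empty Mono: completion only.
        System.out.println(signalsOf(Mono.empty()));
        // Failed Mono: a single onError and nothing else.
        System.out.println(signalsOf(Mono.error(new IllegalStateException("boom"))));
    }
}
```

Note that no run ever records both a value and an error, which matches the rule that combining onNext and onError is forbidden.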
The recommended way to learn about the Mono API and discover new operators is
through the reference documentation rather than through this javadoc, which is better
suited to learning more about individual operators. See the
"which operator do I need?" appendix.
When an operator applied to a Mono could produce more than one element, the rx operators
offer aliases so that the "at most one" property of the resulting Mono is preserved.
For instance, flatMap returns a Mono, while the flatMapMany alias returns a Flux with
possibly more than one emission.
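A minimal sketch of the difference between the two, assuming a comma-separated input string. FlatMapAliasSketch and its methods are hypothetical names used only for illustration.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class FlatMapAliasSketch {

    // flatMap maps the single value to another Mono, so the result is still a Mono.
    static Mono<Integer> length(Mono<String> source) {
        return source.flatMap(s -> Mono.just(s.length()));
    }

    // flatMapMany maps the single value to a Publisher that may emit several
    // elements, so the result type widens to Flux.
    static Flux<String> parts(Mono<String> source) {
        return source.flatMapMany(s -> Flux.fromArray(s.split(",")));
    }

    // Convenience wrapper that blocks and collects the parts, for demonstration only.
    static List<String> partsOf(String csv) {
        return parts(Mono.just(csv)).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(length(Mono.just("a,b,c")).block()); // 5
        System.out.println(partsOf("a,b,c"));                   // [a, b, c]
    }
}
```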
Mono<Void> should be used for a Publisher that just completes without any value.
It is intended to be used in implementations and return types; input parameters should keep
using raw Publisher as much as possible.
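One way to read this guidance is sketched below: the method accepts a raw Publisher as input and exposes only its termination as a Mono<Void>. CompletionOnlySketch and whenDone are illustrative names, and converting through Flux.from(...).then() is just one possible approach.

```java
import org.reactivestreams.Publisher;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class CompletionOnlySketch {

    // Input stays a raw Publisher; the return type is Mono<Void> because only
    // the completion (or error) of the input is of interest to callers.
    static Mono<Void> whenDone(Publisher<?> input) {
        return Flux.from(input).then();
    }

    // block() on a Mono<Void> returns null once the source has completed.
    static boolean completesWithoutValue(Publisher<?> input) {
        return whenDone(input).block() == null;
    }

    public static void main(String[] args) {
        System.out.println(completesWithoutValue(Flux.just(1, 2, 3))); // true
    }
}
```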
Note that using state in the java.util.function / lambdas used within Mono operators
should be avoided, as these may be shared between several Subscribers.
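The following sketch illustrates why shared state is a hazard, assuming a simple counter captured by a lambda. SharedStateSketch and its methods are hypothetical. Wrapping the pipeline in Mono.defer is one common way to give each Subscriber its own state, not the only one.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

final class SharedStateSketch {

    // The counter is created once, when the Mono is assembled, so every
    // Subscriber increments the same instance.
    static Mono<Integer> sharedCounter() {
        AtomicInteger calls = new AtomicInteger();
        return Mono.fromSupplier(calls::incrementAndGet);
    }

    // The counter is created inside defer, so each Subscriber gets a fresh one.
    static Mono<Integer> perSubscriberCounter() {
        return Mono.defer(() -> {
            AtomicInteger calls = new AtomicInteger();
            return Mono.fromSupplier(calls::incrementAndGet);
        });
    }

    // Subscribes twice, one after the other, and returns both results.
    static List<Integer> subscribeTwice(Mono<Integer> mono) {
        return List.of(mono.block(), mono.block());
    }

    public static void main(String[] args) {
        System.out.println(subscribeTwice(sharedCounter()));        // [1, 2]
        System.out.println(subscribeTwice(perSubscriberCounter())); // [1, 1]
    }
}
```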
- Author:
- Sebastien Deleuze, Stephane Maldini, David Karnok, Simon Baslé, Injae Kim
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionJoin the termination signals from this mono and another source into the returned void monofinal <P> PTransform thisMonointo a target type.block()Subscribe to thisMonoand block indefinitely until a next signal is received.Subscribe to thisMonoand block until a next signal is received or a timeout expires.Subscribe to thisMonoand block indefinitely until a next signal is received or the Mono completes empty.blockOptional(Duration timeout) Subscribe to thisMonoand block until a next signal is received, the Mono completes empty or a timeout expires.cache()Turn thisMonointo a hot source and cache last emitted signals for furtherSubscriber.Turn thisMonointo a hot source and cache last emitted signals for furtherSubscriber, with an expiry timeout.Turn thisMonointo a hot source and cache last emitted signals for furtherSubscriber, with an expiry timeout.cache(Function<? super T, Duration> ttlForValue, Function<Throwable, Duration> ttlForError, Supplier<Duration> ttlForEmpty) Turn thisMonointo a hot source and cache last emitted signal for furtherSubscriber, with an expiry timeout (TTL) that depends on said signal.cache(Function<? super T, Duration> ttlForValue, Function<Throwable, Duration> ttlForError, Supplier<Duration> ttlForEmpty, Scheduler timer) Turn thisMonointo a hot source and cache last emitted signal for furtherSubscriber, with an expiry timeout (TTL) that depends on said signal.cacheInvalidateIf(Predicate<? super T> invalidationPredicate) cacheInvalidateWhen(Function<? super T, Mono<Void>> invalidationTriggerGenerator) CacheonNextsignal received from the source and replay it to other subscribers, while allowing invalidation via aMono<Void>companion trigger generated from the currently cached value.cacheInvalidateWhen(Function<? super T, Mono<Void>> invalidationTriggerGenerator, Consumer<? 
super T> onInvalidate) CacheonNextsignal received from the source and replay it to other subscribers, while allowing invalidation via aMono<Void>companion trigger generated from the currently cached value.final <E> Mono<E>Cast the currentMonoproduced type into a target produced type.Activate traceback (full assembly tracing) for this particularMono, in case of an error upstream of the checkpoint.checkpoint(@Nullable String description, boolean forceStackTrace) Activate traceback (full assembly tracing or the lighter assembly marking depending on theforceStackTraceoption).checkpoint(String description) Activate traceback (assembly marker) for this particularMonoby giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint.concatWith(Publisher<? extends T> other) If context-propagation library is on the classpath, this is a convenience shortcut to capture thread local values during the subscription phase and put them in theContextthat is visible upstream of this operator.contextWrite(Function<Context, Context> contextModifier) contextWrite(ContextView contextToAppend) Enrich theContextvisible from downstream for the benefit of upstream operators, by making all values from the providedContextViewvisible on top of pairs from downstream.static <T> Mono<T>Creates a deferred emitter that can be used with callback-based APIs to signal at most one value, a complete or an error signal.defaultIfEmpty(T defaultV) Provide a default single value if this mono is completed without any datastatic <T> Mono<T>Create aMonoprovider that willsupplya targetMonoto subscribe to for eachSubscriberdownstream.static <T> Mono<T>deferContextual(Function<ContextView, ? extends Mono<? 
extends T>> contextualMonoFactory) Create aMonoprovider that willsupplya targetMonoto subscribe to for eachSubscriberdownstream.Create a Mono which delays an onNext signal by a givendurationon a default Scheduler and completes.delayElement(Duration delay) Delay thisMonoelement (Subscriber.onNext(T)signal) by a given duration.delayElement(Duration delay, Scheduler timer) Delay thisMonoelement (Subscriber.onNext(T)signal) by a givenDuration, on a particularScheduler.delaySubscription(Duration delay) Delay thesubscriptionto thisMonosource until the given period elapses.delaySubscription(Duration delay, Scheduler timer) delaySubscription(Publisher<U> subscriptionDelay) delayUntil(Function<? super T, ? extends Publisher<?>> triggerProvider) final <X> Mono<X>An operator working only if thisMonoemits onNext, onError or onCompleteSignalinstances, transforming thesematerializedsignals into real signals on theSubscriber.doAfterTerminate(Runnable afterTerminate) Add behavior (side-effect) triggered after theMonoterminates, either by completing downstream successfully or with an error.doFinally(Consumer<SignalType> onFinally) Add behavior triggering after theMonoterminates for any reason, including cancellation.Add behavior (side-effect) triggered before theMonois subscribed to, which should be the first event after assembly time.doOnCancel(Runnable onCancel) Add behavior triggered when theMonois cancelled.doOnDiscard(Class<R> type, Consumer<? 
super R> discardHook) Potentially modify the behavior of the whole chain of operators upstream of this one to conditionally clean up elements that get discarded by these operators.Add behavior triggered when theMonoemits an item, fails with an error or completes successfully.Add behavior triggered when theMonocompletes with an error matching the given exception type.Add behavior triggered when theMonocompletes with an error.Add behavior triggered when theMonocompletes with an error matching the given predicate.Add behavior triggered when theMonoemits a data successfully.doOnRequest(LongConsumer consumer) Add behavior triggering aLongConsumerwhen theMonoreceives any request.doOnSubscribe(Consumer<? super Subscription> onSubscribe) Add behavior (side-effect) triggered when theMonois being subscribed, that is to say when aSubscriptionhas been produced by thePublisherand is being passed to theSubscriber.onSubscribe(Subscription).doOnSuccess(Consumer<? super @Nullable T> onSuccess) Add behavior triggered as soon as theMonocan be considered to have completed successfully.doOnTerminate(Runnable onTerminate) Add behavior triggered when theMonoterminates, either by completing with a value, completing empty or failing with an error.elapsed()Map thisMonointoTuple2<Long, T>of timemillis and source data.Map thisMonosequence intoTuple2<Long, T>of timemillis and source data.static <T> Mono<T>empty()Create aMonothat completes without emitting any item.static <T> Mono<T>Create aMonothat terminates with the specified error immediately after being subscribed to.static <T> Mono<T>Create aMonothat terminates with an error immediately after being subscribed to.Recursively expand elements into a graph and emit all the resulting element using a breadth-first traversal strategy.Recursively expand elements into a graph and emit all the resulting element using a breadth-first traversal strategy.expandDeep(Function<? super T, ? extends Publisher<? 
extends T>> expander) Recursively expand elements into a graph and emit all the resulting element, in a depth-first traversal order.expandDeep(Function<? super T, ? extends Publisher<? extends T>> expander, int capacityHint) Recursively expand elements into a graph and emit all the resulting element, in a depth-first traversal order.If thisMonois valued, test the result and replay it if predicate returns true.filterWhen(Function<? super T, ? extends Publisher<Boolean>> asyncPredicate) If thisMonois valued, test the value asynchronously using a generatedPublisher<Boolean>test.static <T> Mono<T>Deprecated.static <T> Mono<T>Deprecated.static <T> Mono<T>firstWithSignal(Iterable<? extends Mono<? extends T>> monos) Pick the firstMonoto emit any signal (value, empty completion or error) and replay that signal, effectively behaving like the fastest of these competing sources.static <T> Mono<T>firstWithSignal(Mono<? extends T>... monos) Pick the firstMonoto emit any signal (value, empty completion or error) and replay that signal, effectively behaving like the fastest of these competing sources.static <T> Mono<T>firstWithValue(Iterable<? extends Mono<? extends T>> monos) static <T> Mono<T>firstWithValue(Mono<? extends T> first, Mono<? extends T>... others) final <R> Mono<R>final <R> Flux<R>flatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper) final <R> Flux<R>flatMapMany(Function<? super T, ? extends Publisher<? extends R>> mapper) final <R> Flux<R>flatMapMany(Function<? super T, ? extends Publisher<? extends R>> mapperOnNext, Function<? super Throwable, ? extends Publisher<? extends R>> mapperOnError, Supplier<? extends Publisher<? extends R>> mapperOnComplete) flux()static <T> Mono<T>static <T> Mono<T>fromCallable(Callable<? extends @Nullable T> callable) static <T> Mono<T>fromCompletionStage(CompletionStage<? 
extends T> completionStage) Create aMono, producing its value using the providedCompletionStage.static <T> Mono<T>fromCompletionStage(Supplier<? extends CompletionStage<? extends T>> stageSupplier) Create aMonothat wraps a lazily-suppliedCompletionStageon subscription, emitting the value produced by theCompletionStage.static <I> Mono<I>fromDirect(Publisher<? extends I> source) static <T> Mono<T>fromFuture(CompletableFuture<? extends T> future) Create aMono, producing its value using the providedCompletableFutureand cancelling the future if the Mono gets cancelled.static <T> Mono<T>fromFuture(CompletableFuture<? extends T> future, boolean suppressCancel) Create aMono, producing its value using the providedCompletableFutureand optionally cancelling the future if the Mono gets cancelled (ifsuppressCancel == false).static <T> Mono<T>fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier) Create aMonothat wraps a lazily-suppliedCompletableFutureon subscription, emitting the value produced by the future and cancelling the future if the Mono gets cancelled.static <T> Mono<T>fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier, boolean suppressCancel) Create aMonothat wraps a lazily-suppliedCompletableFutureon subscription, emitting the value produced by the future and optionally cancelling the future if the Mono gets cancelled (ifsuppressCancel == false).static <T> Mono<T>fromRunnable(Runnable runnable) static <T> Mono<T>fromSupplier(Supplier<? extends @Nullable T> supplier) final <R> Mono<R>handle(BiConsumer<? 
super T, SynchronousSink<R>> handler) Handle the items emitted by thisMonoby calling a biconsumer with the output sink for each onNext.Emit a single boolean true if thisMonohas an element.hide()Hides the identity of thisMonoinstance.Ignores onNext signal (dropping it) and only propagates termination events.static <T> Mono<T>ignoreElements(Publisher<T> source) Create a newMonothat ignores elements from the source (dropping them), but completes when the source completes.static <T> Mono<T>just(T data) Create a newMonothat emits the specified item, which is captured at instantiation time.static <T> Mono<T>justOrEmpty(@Nullable Optional<? extends T> data) Create a newMonothat emits the specified item ifOptional.isPresent()otherwise only emits onComplete.static <T> Mono<T>justOrEmpty(@Nullable T data) Create a newMonothat emits the specified item if non null otherwise only emits onComplete.log()Observe all Reactive Streams signals and trace them usingLoggersupport.Observe all Reactive Streams signals and useLoggersupport to handle trace implementation.log(@Nullable String category, Level level, boolean showOperatorLine, SignalType... options) Observe Reactive Streams signals matching the passed filteroptionsand useLoggersupport to handle trace implementation.log(@Nullable String category, Level level, SignalType... options) Observe Reactive Streams signals matching the passed flagsoptionsand useLoggersupport to handle trace implementation.Observe Reactive Streams signals matching the passed filteroptionsand trace them using a specific user-providedLogger, atLevel.INFOlevel.log(Logger logger, Level level, boolean showOperatorLine, SignalType... options) final <R> Mono<R>Transform the item emitted by thisMonoby applying a synchronous function to it.final <R> Mono<R>mapNotNull(Function<? super T, ? 
extends @Nullable R> mapper) Transform the item emitted by thisMonoby applying a synchronous function to it, which is allowed to produce anullvalue.Transform incoming onNext, onError and onComplete signals intoSignalinstances, materializing these signals.metrics()Deprecated.Prefer using thetap(SignalListenerFactory)with theSignalListenerFactoryprovided by the new reactor-core-micrometer module.Give a name to this sequence, which can be retrieved usingScannable.name()as long as this is the first reachableScannable.parents().static <T> Mono<T>never()Return aMonothat will never signal any data, error or completion signal, essentially running indefinitely.final <U> Mono<U>Evaluate the emitted value against the givenClasstype.protected static <T> Mono<T>onAssembly(Mono<T> source) Simply complete the sequence by replacing anonError signalwith anonComplete signal.onErrorComplete(Class<? extends Throwable> type) Simply complete the sequence by replacing anonError signalwith anonComplete signalif the error matches the givenClass.onErrorComplete(Predicate<? super Throwable> predicate) Simply complete the sequence by replacing anonError signalwith anonComplete signalif the error matches the givenPredicate.onErrorContinue(Class<E> type, BiConsumer<Throwable, Object> errorConsumer) Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements.onErrorContinue(BiConsumer<Throwable, Object> errorConsumer) Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements.onErrorContinue(Predicate<E> errorPredicate, BiConsumer<Throwable, Object> errorConsumer) Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements.onErrorMap(Class<E> type, Function<? super E, ? 
extends Throwable> mapper) Transform an error emitted by thisMonoby synchronously applying a function to it if the error matches the given type.onErrorMap(Function<? super Throwable, ? extends Throwable> mapper) Transform any error emitted by thisMonoby synchronously applying a function to it.onErrorMap(Predicate<? super Throwable> predicate, Function<? super Throwable, ? extends Throwable> mapper) Transform an error emitted by thisMonoby synchronously applying a function to it if the error matches the given predicate.onErrorResume(Class<E> type, Function<? super E, ? extends Mono<? extends T>> fallback) Subscribe to a fallback publisher when an error matching the given type occurs, using a function to choose the fallback depending on the error.onErrorResume(Function<? super Throwable, ? extends Mono<? extends T>> fallback) Subscribe to a fallback publisher when any error occurs, using a function to choose the fallback depending on the error.onErrorResume(Predicate<? super Throwable> predicate, Function<? super Throwable, ? extends Mono<? extends T>> fallback) Subscribe to a fallback publisher when an error matching a given predicate occurs.onErrorReturn(Class<E> type, T fallbackValue) Simply emit a captured fallback value when an error of the specified type is observed on thisMono.onErrorReturn(Predicate<? 
super Throwable> predicate, T fallbackValue) Simply emit a captured fallback value when an error matching the given predicate is observed on thisMono.onErrorReturn(T fallbackValue) Simply emit a captured fallback value when any error is observed on thisMono.If anonErrorContinue(BiConsumer)variant has been used downstream, reverts to the default 'STOP' mode where errors are terminal events upstream.Detaches both the childSubscriberand theSubscriptionon termination or cancellation.Emit the first available signal from this mono or the other mono.final <R> Mono<R>Share aMonofor the duration of a function that may transform it and consume it as many times as necessary without causing multiple subscriptions to the upstream.repeat()Repeatedly and indefinitely subscribe to the source upon completion of the previous subscription.repeat(long numRepeat) Repeatedly subscribe to the source numRepeat times.repeat(long numRepeat, BooleanSupplier predicate) Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.repeat(BooleanSupplier predicate) Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.repeatWhen(Function<Flux<Long>, ? extends Publisher<?>> repeatFactory) Repeatedly subscribe to thisMonowhen a companion sequence emits elements in response to the flux completion signal.repeatWhenEmpty(int maxRepeat, Function<Flux<Long>, ? extends Publisher<?>> repeatFactory) repeatWhenEmpty(Function<Flux<Long>, ? extends Publisher<?>> repeatFactory) retry()Re-subscribes to thisMonosequence if it signals any error, indefinitely.retry(long numRetries) Re-subscribes to thisMonosequence if it signals any error, for a fixed number of times.sequenceEqual(Publisher<? extends T> source1, Publisher<? 
extends T> source2) Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the same by comparing the items emitted by each Publisher pairwise.sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2, BiPredicate<? super T, ? super T> isEqual) Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the same by comparing the items emitted by each Publisher pairwise based on the results of a specified equality function.sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2, BiPredicate<? super T, ? super T> isEqual, int prefetch) Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the same by comparing the items emitted by each Publisher pairwise based on the results of a specified equality function.share()single()Expect exactly one item from thisMonosource or signalNoSuchElementExceptionfor an empty source.Wrap the item produced by thisMonosource into an Optional or emit an empty Optional for an empty source.final DisposableSubscribe to thisMonoand request unbounded demand.final Disposablesubscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer) final Disposablesubscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer) final Disposablesubscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Context initialContext) final Disposablefinal Disposablefinal voidsubscribe(Subscriber<? super T> actual) abstract voidsubscribe(CoreSubscriber<? 
super T> actual) An internalPublisher.subscribe(Subscriber)that will bypassHooks.onLastOperator(Function)pointcut.subscribeOn(Scheduler scheduler) Run subscribe, onSubscribe and request on a specifiedScheduler'sScheduler.Worker.final <E extends Subscriber<? super T>>
EsubscribeWith(E subscriber) Subscribe the givenSubscriberto thisMonoand return saidSubscriber, allowing subclasses with a richer API to be used fluently.switchIfEmpty(Mono<? extends T> alternate) Fallback to an alternativeMonoif this mono is completed without dataTag this mono with a key/value pair.Give this Mono a chance to resolve within a specified time frame but complete if it doesn't.Give this Mono a chance to resolve within a specified time frame but complete if it doesn't.takeUntilOther(Publisher<?> other) Give this Mono a chance to resolve before a companionPublisheremits.tap(Function<ContextView, SignalListener<T>> listenerGenerator) Tap into Reactive Streams signals emitted or received by thisMonoand notify a stateful per-SubscriberSignalListener.tap(Supplier<SignalListener<T>> simpleListenerGenerator) Tap into Reactive Streams signals emitted or received by thisMonoand notify a stateful per-SubscriberSignalListener.tap(SignalListenerFactory<T, ?> listenerFactory) Tap into Reactive Streams signals emitted or received by thisMonoand notify a stateful per-SubscriberSignalListenercreated by the providedSignalListenerFactory.then()Return aMono<Void>which only replays complete and error signals from thisMono.final <V> Mono<V>Let thisMonocomplete then play another Mono.Return aMono<Void>that waits for thisMonoto complete then for a suppliedPublisher<Void>to also complete.final <V> Flux<V>final <V> Mono<V>thenReturn(V value) Let thisMonocomplete successfully, then emit the provided value.timed()Times thisMonoSubscriber.onNext(Object)event, encapsulated into aTimedobject that lets downstream consumer look at various time information gathered with nanosecond resolution using the default clock (Schedulers.parallel()):Timed.elapsed(): the time in nanoseconds since subscription, as aDuration.Times thisMonoSubscriber.onNext(Object)event, encapsulated into aTimedobject that lets downstream consumer look at various time information gathered with nanosecond resolution 
using the providedScheduleras a clock:Timed.elapsed(): the time in nanoseconds since subscription, as aDuration.Propagate aTimeoutExceptionin case no item arrives within the givenDuration.Signal aTimeoutExceptionerror in case an item doesn't arrive before the given period, as measured on the providedScheduler.Signal aTimeoutExceptionin case the item from thisMonohas not been emitted before the givenPublisheremits.final CompletableFuture<@Nullable T>toFuture()Transform thisMonointo aCompletableFuturecompleting on onNext or onComplete and failing on onError.toString()final <V> Mono<V>final <V> Mono<V>transformDeferred(Function<? super Mono<T>, ? extends Publisher<V>> transformer) final <V> Mono<V>transformDeferredContextual(BiFunction<? super Mono<T>, ? super ContextView, ? extends Publisher<V>> transformer) static <T,D extends AutoCloseable>
Mono<T>using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Mono<? extends T>> sourceSupplier) Uses anAutoCloseableresource, generated by a supplier for each individual Subscriber, while streaming the value from a Mono derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.static <T,D extends AutoCloseable>
Mono<T>using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Mono<? extends T>> sourceSupplier, boolean eager) Uses anAutoCloseableresource, generated by a supplier for each individual Subscriber, while streaming the value from a Mono derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.static <T,D> Mono<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Mono<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup) Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a Mono derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.static <T,D> Mono<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Mono<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager) Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a Mono derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.static <T,D> Mono<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D, ? extends Mono<? extends T>> resourceClosure, Function<? super D, ? extends Publisher<?>> asyncCleanup) static <T,D> Mono<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D, ? extends Mono<? extends T>> resourceClosure, Function<? super D, ? extends Publisher<?>> asyncComplete, BiFunction<? super D, ? super Throwable, ? extends Publisher<?>> asyncError, Function<? super D, ? 
extends Publisher<?>> asyncCancel) Uses a resource, generated by aPublisherfor each individualSubscriber, to derive aMono.Note that all steps of the operator chain that would need the resource to be in an open stable state need to be described inside theresourceClosureFunction.Aggregate given publishers into a new Mono that will be fulfilled when all of the given Publishers have completed.Aggregate given publishers into a new Mono that will be fulfilled when all of the given sources have completed.whenDelayError(Iterable<? extends Publisher<?>> sources) Aggregate given publishers into a new Mono that will be fulfilled when all of the given sources have completed.whenDelayError(Publisher<?>... sources) Merge given publishers into a new Mono that will be fulfilled when all of the given sources have completed.static <R> Mono<R>Aggregate given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values according to the provided combinator function.static <R> Mono<R>Aggregate given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values according to the provided combinator function.Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into aTuple2.static <T1,T2, O> Mono<O> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator) Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values as defined by the combinator function.Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into aTuple3.Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into aTuple4.zip(Mono<? extends T1> p1, Mono<? 
extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5) Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into aTuple5.zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6) Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into aTuple6.zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6, Mono<? extends T7> p7) Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into aTuple7.zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6, Mono<? extends T7> p7, Mono<? extends T8> p8) Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into aTuple8.static <R> Mono<R>zipDelayError(Iterable<? extends Mono<?>> monos, Function<? super Object[], ? extends R> combinator) Aggregate given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item.static <R> Mono<R>zipDelayError(Function<? super Object[], ? extends R> combinator, Mono<?>... monos) Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values according to the provided combinator function and delaying errors.zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2) Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into aTuple2and delaying errors.zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? 
extends T3> p3) Merge given monos into a new Mono that will be fulfilled when all of the given Mono Monos have produced an item, aggregating their values into aTuple3and delaying errors.zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4) Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into aTuple4and delaying errors.zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5) Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into aTuple5and delaying errors.zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6) Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into aTuple6and delaying errors.zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6, Mono<? extends T7> p7) Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into aTuple7and delaying errors.zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6, Mono<? extends T7> p7, Mono<? 
extends T8> p8) Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into aTuple8and delaying errors.Wait for the result from this mono, use it to create a second mono via the providedrightGeneratorfunction and combine both results into aTuple2.final <T2,O> Mono<O> Wait for the result from this mono, use it to create a second mono via the providedrightGeneratorfunction and combine both results into an arbitraryOobject, as defined by the providedcombinatorfunction.Combine the result from this mono and another into aTuple2.final <T2,O> Mono<O> zipWith(Mono<? extends T2> other, BiFunction<? super T, ? super T2, ? extends O> combinator) Combine the result from this mono and another into an arbitraryOobject, as defined by the providedcombinatorfunction.
-
Constructor Details
-
Mono
public Mono()
-
-
Method Details
-
create
Creates a deferred emitter that can be used with callback-based APIs to signal at most one value, a complete or an error signal.
Bridging legacy API involves mostly boilerplate code due to the lack of standard types and methods. There are two kinds of API surfaces: 1) addListener/removeListener and 2) callback-handler.
1) addListener/removeListener pairs
To work with such an API, one has to instantiate the listener, call the sink from the listener, then register it with the source:
    Mono.<String>create(sink -> {
        HttpListener listener = event -> {
            if (event.getResponseCode() >= 400) {
                sink.error(new RuntimeException("Failed"));
            } else {
                String body = event.getBody();
                if (body.isEmpty()) {
                    sink.success();
                } else {
                    sink.success(body.toLowerCase());
                }
            }
        };
        client.addListener(listener);
        sink.onDispose(() -> client.removeListener(listener));
    });
Note that this works only with single-value emitting listeners. Otherwise, all subsequent signals are dropped. You may have to add client.removeListener(this); to the listener's body.
2) callback handler
This requires an instantiation pattern similar to the one above, but usually the successful completion and error are separated into different methods. In addition, the legacy API may or may not support some cancellation mechanism.
    Mono.<String>create(sink -> {
        Callback<String> callback = new Callback<String>() {
            @Override
            public void onResult(String data) {
                sink.success(data.toLowerCase());
            }
            @Override
            public void onError(Exception e) {
                sink.error(e);
            }
        };
        // without cancellation support:
        client.call("query", callback);
        // with cancellation support:
        AutoCloseable cancel = client.call("query", callback);
        sink.onDispose(() -> {
            try {
                cancel.close();
            } catch (Exception ex) {
                Exceptions.onErrorDropped(ex);
            }
        });
    });
-
defer
Create a Mono provider that will supply a target Mono to subscribe to for each Subscriber downstream.
-
deferContextual
public static <T> Mono<T> deferContextual(Function<ContextView, ? extends Mono<? extends T>> contextualMonoFactory)
Create a Mono provider that will supply a target Mono to subscribe to for each Subscriber downstream. This operator behaves the same way as defer(Supplier), but accepts a Function that will receive the current ContextView as an argument.
-
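As a minimal sketch of defer and deferContextual (the class name, the method names and the "user" context key are assumptions made for this illustration), the following shows that the supplier runs once per subscription and that the contextual variant can read the subscriber's ContextView:

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import reactor.util.context.Context;

final class DeferSketch {

    // Subscribes twice to a deferred Mono; the supplier runs once per subscription.
    static int[] subscribeTwice() {
        AtomicInteger calls = new AtomicInteger();
        Mono<Integer> deferred = Mono.defer(() -> Mono.just(calls.incrementAndGet()));
        int first = deferred.block();
        int second = deferred.block();
        return new int[] { first, second };
    }

    // Builds a greeting from the hypothetical "user" key of the subscriber's Context.
    static String greet(String user) {
        Mono<String> greeting =
            Mono.deferContextual(ctx -> Mono.just("Hello, " + ctx.get("user")));
        return greeting.contextWrite(Context.of("user", user)).block();
    }

    public static void main(String[] args) {
        int[] seen = subscribeTwice();
        System.out.println(seen[0] + " then " + seen[1]);
        System.out.println(greet("alice"));
    }
}
```

block() is used here only to keep the sketch short; in reactive code the Mono would normally be returned or subscribed to instead.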
delay
Create a Mono which delays an onNext signal by a given duration on a default Scheduler and completes. If the demand cannot be produced in time, an onError will be signalled instead. The delay is introduced through the parallel default Scheduler.
- Parameters:
duration - the duration of the delay
- Returns:
a new Mono
-
delay
-
empty
Create aMonothat completes without emitting any item.- Type Parameters:
T- the reifiedSubscribertype- Returns:
- a completed
Mono
-
error
Create aMonothat terminates with the specified error immediately after being subscribed to.- Type Parameters:
T- the reifiedSubscribertype- Parameters:
error- the onError signal- Returns:
- a failing
Mono
-
error
Create aMonothat terminates with an error immediately after being subscribed to. TheThrowableis generated by aSupplier, invoked each time there is a subscription and allowing for lazy instantiation.- Type Parameters:
T- the reifiedSubscribertype- Parameters:
errorSupplier- the error signalSupplierto invoke for eachSubscriber- Returns:
- a failing
Mono
-
first
Deprecated. Use firstWithSignal(Mono[]). To be removed in reactor 3.5.
Pick the first Mono to emit any signal (value, empty completion or error) and replay that signal, effectively behaving like the fastest of these competing sources.
- Type Parameters:
T - The type of the function result.
- Parameters:
monos - The deferred monos to use.
- Returns:
a new Mono behaving like the fastest of its sources.
-
first
Deprecated. Use firstWithSignal(Iterable). To be removed in reactor 3.5.
Pick the first Mono to emit any signal (value, empty completion or error) and replay that signal, effectively behaving like the fastest of these competing sources.
- Type Parameters:
T - The type of the function result.
- Parameters:
monos - The deferred monos to use.
- Returns:
a new Mono behaving like the fastest of its sources.
-
firstWithSignal
Pick the first Mono to emit any signal (value, empty completion or error) and replay that signal, effectively behaving like the fastest of these competing sources.
- Type Parameters:
T - The type of the function result.
- Parameters:
monos - The deferred monos to use.
- Returns:
a new Mono behaving like the fastest of its sources.
-
firstWithSignal
Pick the first Mono to emit any signal (value, empty completion or error) and replay that signal, effectively behaving like the fastest of these competing sources.
- Type Parameters:
T - The type of the function result.
- Parameters:
monos - The deferred monos to use.
- Returns:
a new Mono behaving like the fastest of its sources.
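The difference between firstWithSignal and firstWithValue (documented in the next entries) can be sketched as follows. The class and method names are assumptions for this illustration, and the sources are deliberately synchronous so that the outcome does not depend on timing:

```java
import java.util.NoSuchElementException;
import java.util.Optional;

import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

final class FirstSketch {

    // An empty completion counts as a signal, so the empty source wins the race.
    static Optional<String> withSignal() {
        return Mono.firstWithSignal(Mono.<String>empty(), Mono.just("value"))
                   .blockOptional();
    }

    // Only onNext counts here, so the valued source wins over the empty one.
    static Optional<String> withValue() {
        return Mono.firstWithValue(Mono.<String>empty(), Mono.just("value"))
                   .blockOptional();
    }

    // No source produces a value: count the per-source errors in the composite cause.
    static int failureCount() {
        return Mono.firstWithValue(Mono.<String>empty(),
                                   Mono.<String>error(new IllegalStateException("boom")))
                   .map(v -> 0)
                   .onErrorResume(NoSuchElementException.class,
                       e -> Mono.just(Exceptions.unwrapMultiple(e.getCause()).size()))
                   .block();
    }
}
```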
-
firstWithValue
Pick the first Mono source to emit any value and replay that signal, effectively behaving like the source that first emits an onNext.
Valued sources always "win" over an empty source (one that only emits onComplete) or a failing source (one that only emits onError).
When no source can provide a value, this operator fails with a NoSuchElementException (provided there are at least two sources). This exception has a composite as its cause that can be used to inspect what went wrong with each source (so the composite has as many elements as there are sources).
Exceptions from failing sources are directly reflected in the composite at the index of the failing source. For empty sources, a NoSuchElementException is added at their respective index. One can use Exceptions.unwrapMultiple(topLevel.getCause()) to easily inspect these errors as a List.
Note that like in firstWithSignal(Iterable), an infinite source can be problematic if no other source emits onNext.
-
firstWithValue
@SafeVarargs public static <T> Mono<T> firstWithValue(Mono<? extends T> first, Mono<? extends T>... others)
Pick the first Mono source to emit any value and replay that signal, effectively behaving like the source that first emits an onNext.
Valued sources always "win" over an empty source (one that only emits onComplete) or a failing source (one that only emits onError).
When no source can provide a value, this operator fails with a NoSuchElementException (provided there are at least two sources). This exception has a composite as its cause that can be used to inspect what went wrong with each source (so the composite has as many elements as there are sources).
Exceptions from failing sources are directly reflected in the composite at the index of the failing source. For empty sources, a NoSuchElementException is added at their respective index. One can use Exceptions.unwrapMultiple(topLevel.getCause()) to easily inspect these errors as a List.
Note that like in firstWithSignal(Mono[]), an infinite source can be problematic if no other source emits onNext. In case the first source is already an array-based firstWithValue(Mono, Mono[]) instance, nesting is avoided: a single new array-based instance is created with all the sources from first plus all the others sources at the same level.
-
from
Expose the specified Publisher with the Mono API, and ensure it will emit 0 or 1 item. The source emitter will be cancelled on the first onNext.
Hooks.onEachOperator(String, Function) and similar assembly hooks are applied unless the source is already a Mono (including a Mono that was decorated as a Flux, see Flux.from(Publisher)).
-
fromCallable
-
fromCompletionStage
Create aMono, producing its value using the providedCompletionStage.If the completionStage is also a
Future, cancelling the Mono will cancel the future. UsefromFuture(CompletableFuture, boolean)withsuppressCancellationset totrueif you need to suppress cancellation propagation.- Type Parameters:
T- type of the expected value- Parameters:
completionStage-CompletionStagethat will produce a value (or a null to complete immediately)- Returns:
- A
Mono.
-
fromCompletionStage
public static <T> Mono<T> fromCompletionStage(Supplier<? extends CompletionStage<? extends T>> stageSupplier)
Create a Mono that wraps a lazily-supplied CompletionStage on subscription, emitting the value produced by the CompletionStage.
If the completionStage is also a Future, cancelling the Mono will cancel the future. Use fromFuture(CompletableFuture, boolean) with suppressCancellation set to true if you need to suppress cancellation propagation.
- Type Parameters:
T - type of the expected value
- Parameters:
stageSupplier - The Supplier of a CompletionStage that will produce a value (or a null to complete immediately). This allows lazy triggering of CompletionStage-based APIs.
- Returns:
A Mono.
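To illustrate the lazy triggering described above, here is a small sketch (class and method names are assumptions for this example) that counts how many times the Supplier is invoked before and after subscribing:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

final class LazyStageSketch {

    // Returns {stages created before subscribing, stages created after two subscriptions}.
    static int[] creationCounts() {
        AtomicInteger created = new AtomicInteger();
        Mono<String> mono = Mono.fromCompletionStage(() -> {
            created.incrementAndGet();
            return CompletableFuture.completedFuture("done");
        });
        int before = created.get(); // assembling the Mono has not invoked the Supplier
        mono.block();
        mono.block();
        return new int[] { before, created.get() };
    }

    public static void main(String[] args) {
        int[] counts = creationCounts();
        System.out.println(counts[0] + " before, " + counts[1] + " after");
    }
}
```

By contrast, passing an already created CompletionStage to the non-Supplier overload means the underlying work may have started before anyone subscribes.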
-
fromDirect
Convert aPublisherto aMonowithout any cardinality check (ie this method doesn't cancel the source past the first element). Conversion transparently returnsMonosources without wrapping and otherwise supportsFuseablesources. Note this is an advanced interoperability operator that implies you know thePublisheryou are converting follows theMonosemantics and only ever emits one element.Hooks.onEachOperator(String, Function)and similar assembly hooks are applied unless the source is already aMono. -
fromFuture
Create aMono, producing its value using the providedCompletableFutureand cancelling the future if the Mono gets cancelled.Use
fromFuture(CompletableFuture, boolean)withsuppressCancellationset totrueif you need to suppress cancellation propagation.- Type Parameters:
T- type of the expected value- Parameters:
future-CompletableFuturethat will produce a value (or a null to complete immediately)- Returns:
- A
Mono. - See Also:
-
fromFuture
Create aMono, producing its value using the providedCompletableFutureand optionally cancelling the future if the Mono gets cancelled (ifsuppressCancel == false).- Type Parameters:
T- type of the expected value- Parameters:
future-CompletableFuturethat will produce a value (or a null to complete immediately)suppressCancel-trueto prevent cancellation of the future when the Mono is cancelled,falseotherwise (the default)- Returns:
- A
Mono.
-
fromFuture
public static <T> Mono<T> fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier) Create aMonothat wraps a lazily-suppliedCompletableFutureon subscription, emitting the value produced by the future and cancelling the future if the Mono gets cancelled.- Type Parameters:
T- type of the expected value- Parameters:
futureSupplier- TheSupplierof aCompletableFuturethat will produce a value (or a null to complete immediately). This allows lazy triggering of future-based APIs.- Returns:
- A
Mono. - See Also:
-
fromFuture
public static <T> Mono<T> fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier, boolean suppressCancel) Create aMonothat wraps a lazily-suppliedCompletableFutureon subscription, emitting the value produced by the future and optionally cancelling the future if the Mono gets cancelled (ifsuppressCancel == false).- Type Parameters:
T- type of the expected value- Parameters:
futureSupplier- TheSupplierof aCompletableFuturethat will produce a value (or a null to complete immediately). This allows lazy triggering of future-based APIs.suppressCancel-trueto prevent cancellation of the future when the Mono is cancelled,falseotherwise (the default)- Returns:
- A
Mono. - See Also:
-
fromRunnable
-
fromSupplier
-
ignoreElements
Create a new Mono that ignores elements from the source (dropping them), but completes when the source completes.
Discard Support: This operator discards the element from the source.
-
just
Create a newMonothat emits the specified item, which is captured at instantiation time.- Type Parameters:
T- the type of the produced item- Parameters:
data- the only item to onNext- Returns:
- a
Mono.
-
justOrEmpty
Create a new Mono that emits the specified item if Optional.isPresent(), otherwise only emits onComplete.
-
justOrEmpty
Create a new Mono that emits the specified item if non null, otherwise only emits onComplete.
- Type Parameters:
T - the type of the produced item
- Parameters:
data - the item to onNext or onComplete if null
- Returns:
a Mono.
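Both justOrEmpty overloads can be sketched as below (the class and method names are assumptions for this illustration); an absent Optional or a null reference yields a Mono that only completes:

```java
import java.util.Optional;

import reactor.core.publisher.Mono;

final class JustOrEmptySketch {

    // Optional overload: emits the wrapped value if present, otherwise completes empty.
    static Optional<String> fromOptional(Optional<String> maybe) {
        return Mono.justOrEmpty(maybe).blockOptional();
    }

    // Nullable overload: emits the value if non null, otherwise completes empty.
    static Optional<String> fromNullable(String valueOrNull) {
        return Mono.justOrEmpty(valueOrNull).blockOptional();
    }

    public static void main(String[] args) {
        System.out.println(fromOptional(Optional.of("a")));
        System.out.println(fromNullable(null));
    }
}
```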
-
never
Return aMonothat will never signal any data, error or completion signal, essentially running indefinitely.- Type Parameters:
T- theSubscribertype target- Returns:
- a never completing
Mono
-
sequenceEqual
public static <T> Mono<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2) Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the same by comparing the items emitted by each Publisher pairwise.- Type Parameters:
T- the type of items emitted by each Publisher- Parameters:
source1- the first Publisher to comparesource2- the second Publisher to compare- Returns:
- a Mono that emits a Boolean value that indicates whether the two sequences are the same
-
sequenceEqual
public static <T> Mono<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2, BiPredicate<? super T, ? super T> isEqual) Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the same by comparing the items emitted by each Publisher pairwise based on the results of a specified equality function.- Type Parameters:
T- the type of items emitted by each Publisher- Parameters:
source1- the first Publisher to comparesource2- the second Publisher to compareisEqual- a function used to compare items emitted by each Publisher- Returns:
- a Mono that emits a Boolean value that indicates whether the two Publisher sequences are the same according to the specified function
-
sequenceEqual
public static <T> Mono<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2, BiPredicate<? super T, ? super T> isEqual, int prefetch) Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the same by comparing the items emitted by each Publisher pairwise based on the results of a specified equality function.- Type Parameters:
T- the type of items emitted by each Publisher- Parameters:
source1- the first Publisher to comparesource2- the second Publisher to compareisEqual- a function used to compare items emitted by each Publisherprefetch- the number of items to prefetch from the first and second source Publisher- Returns:
- a Mono that emits a Boolean value that indicates whether the two Publisher sequences are the same according to the specified function
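A short sketch of the pairwise comparison, with and without a custom equality function (class and method names are assumptions for this example):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class SequenceEqualSketch {

    // Same items in the same order.
    static boolean sameNumbers() {
        return Mono.sequenceEqual(Flux.just(1, 2, 3), Flux.just(1, 2, 3)).block();
    }

    // One sequence is shorter than the other.
    static boolean differentLengths() {
        return Mono.sequenceEqual(Flux.just(1, 2, 3), Flux.just(1, 2)).block();
    }

    // Items are compared pairwise with a caller-supplied equality function.
    static boolean sameIgnoringCase() {
        return Mono.sequenceEqual(Flux.just("a", "B"), Flux.just("A", "b"),
                                  String::equalsIgnoreCase).block();
    }
}
```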
-
using
public static <T,D> Mono<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Mono<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a Mono derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
- For eager cleanup, unlike in Flux, in the case of a valued Mono the cleanup happens just before passing the value to downstream. In all cases, exceptions raised by the eager cleanup Consumer may override the terminal event, discarding the element if the derived Mono was valued.
- Non-eager cleanup will drop any exception.
- Type Parameters:
T - emitted type
D - resource type
- Parameters:
resourceSupplier - a Callable that is called on subscribe to create the resource
sourceSupplier - a Mono factory to create the Mono depending on the created resource
resourceCleanup - invoked on completion to clean-up the resource
eager - set to true to clean before any signal (including onNext) is passed downstream
- Returns:
new Mono
-
using
public static <T,D> Mono<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Mono<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a Mono derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
Unlike in Flux, in the case of a valued Mono the cleanup happens just before passing the value to downstream. In all cases, exceptions raised by the cleanup Consumer may override the terminal event, discarding the element if the derived Mono was valued.
- Type Parameters:
T - emitted type
D - resource type
- Parameters:
resourceSupplier - a Callable that is called on subscribe to create the resource
sourceSupplier - a Mono factory to create the Mono depending on the created resource
resourceCleanup - invoked on completion to clean-up the resource
- Returns:
new Mono
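The resource lifecycle can be sketched with a hypothetical Connection type (defined only for this illustration, along with the class and method names). The example uses the three-argument overload and checks that the cleanup Consumer has run by the time the value has been obtained:

```java
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Mono;

final class UsingSketch {

    // Hypothetical resource type, not part of Reactor.
    static final class Connection {
        final AtomicBoolean closed = new AtomicBoolean();
        String query() { return "row"; }
        void close() { closed.set(true); }
    }

    // Returns {value read through the resource, whether the cleanup ran}.
    static Object[] readOnce() {
        Connection connection = new Connection();
        String value = Mono.<String, Connection>using(
                () -> connection,                  // resourceSupplier, called on subscribe
                c -> Mono.fromCallable(c::query),  // sourceSupplier, derives the Mono
                Connection::close)                 // resourceCleanup
            .block();
        return new Object[] { value, connection.closed.get() };
    }

    public static void main(String[] args) {
        Object[] result = readOnce();
        System.out.println(result[0] + ", closed=" + result[1]);
    }
}
```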
-
using
public static <T,D extends AutoCloseable> Mono<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Mono<? extends T>> sourceSupplier)
Uses an AutoCloseable resource, generated by a supplier for each individual Subscriber, while streaming the value from a Mono derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
Unlike in Flux, in the case of a valued Mono the cleanup happens just before passing the value to downstream. In all cases, exceptions raised by the cleanup Consumer may override the terminal event, discarding the element if the derived Mono was valued.
-
using
public static <T,D extends AutoCloseable> Mono<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Mono<? extends T>> sourceSupplier, boolean eager)
Uses an AutoCloseable resource, generated by a supplier for each individual Subscriber, while streaming the value from a Mono derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
- For eager cleanup, unlike in Flux, in the case of a valued Mono the cleanup happens just before passing the value to downstream. In all cases, exceptions raised by the cleanup Consumer may override the terminal event, discarding the element if the derived Mono was valued.
- Non-eager cleanup will drop any exception.
- Type Parameters:
T - emitted type
D - resource type
- Parameters:
resourceSupplier - a Callable that is called on subscribe to create the resource
sourceSupplier - a Mono factory to create the Mono depending on the created resource
eager - set to true to clean before any signal (including onNext) is passed downstream
- Returns:
new Mono
-
usingWhen
public static <T,D> Mono<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D, ? extends Mono<? extends T>> resourceClosure, Function<? super D, ? extends Publisher<?>> asyncCleanup)
Uses a resource, generated by a Publisher for each individual Subscriber, to derive a Mono. Note that all steps of the operator chain that would need the resource to be in an open stable state need to be described inside the resourceClosure Function.
Unlike in the Flux counterpart, ALL signals are deferred until the Mono terminates and the relevant Function generates and invokes a "cleanup" Publisher. This is because a failure in the cleanup Publisher must result in a lone onError signal in the downstream Mono (any potential value in the derived Mono is discarded). Here are the various scenarios that can play out:
- empty Mono, asyncCleanup ends with onComplete(): downstream receives onComplete()
- empty Mono, asyncCleanup ends with onError(t): downstream receives onError(t)
- valued Mono, asyncCleanup ends with onComplete(): downstream receives onNext(value), onComplete()
- valued Mono, asyncCleanup ends with onError(t): downstream receives onError(t), value is discarded
- error(e) Mono, asyncCleanup ends with onComplete(): downstream receives onError(e)
- error(e) Mono, asyncCleanup ends with onError(t): downstream receives onError(t), t suppressing e
Note that if the resource supplying Publisher emits more than one resource, the subsequent resources are dropped (Operators.onNextDropped(Object, Context)). If the publisher errors AFTER having emitted one resource, the error is also silently dropped (Operators.onErrorDropped(Throwable, Context)). An empty completion or error without at least one onNext signal (no resource supplied) triggers a short-circuit of the main sequence with the same terminal signal (no cleanup is invoked).
Discard Support: This operator discards any source element if the asyncCleanup handler fails.
- Type Parameters:
T - the type of elements emitted by the resource closure, and thus the main sequence
D - the type of the resource object
- Parameters:
resourceSupplier - a Publisher that "generates" the resource, subscribed for each subscription to the main sequence
resourceClosure - a factory to derive a Mono from the supplied resource
asyncCleanup - an asynchronous resource cleanup invoked when the resource closure terminates (with onComplete, onError or cancel)
- Returns:
a new Mono built around a "transactional" resource, with deferred emission until the asynchronous cleanup sequence completes
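Two of the scenarios listed above (valued Mono with a completing cleanup, valued Mono with a failing cleanup) can be sketched as follows. The Transaction type and all method names are hypothetical and exist only for this illustration; the error case is collapsed to the string "error" so the sketch makes no claim about the exact exception type seen downstream:

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

final class UsingWhenSketch {

    // Hypothetical transactional resource, not part of Reactor.
    static final class Transaction {
        final AtomicInteger cleanups = new AtomicInteger();

        Mono<Void> release() {
            return Mono.fromRunnable(cleanups::incrementAndGet);
        }

        Mono<Void> failingRelease() {
            return Mono.error(new IllegalStateException("cleanup failed"));
        }
    }

    // Valued Mono, cleanup completes: the value is emitted after the cleanup.
    static String cleanupCompletes(Transaction tx) {
        return Mono.<String, Transaction>usingWhen(
                Mono.just(tx), t -> Mono.just("result"), Transaction::release)
            .onErrorReturn("error")
            .block();
    }

    // Valued Mono, cleanup fails: the value is discarded and an error is signalled.
    static String cleanupFails(Transaction tx) {
        return Mono.<String, Transaction>usingWhen(
                Mono.just(tx), t -> Mono.just("result"), Transaction::failingRelease)
            .onErrorReturn("error")
            .block();
    }
}
```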
-
usingWhen
public static <T,D> Mono<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D, ? extends Mono<? extends T>> resourceClosure, Function<? super D, ? extends Publisher<?>> asyncComplete, BiFunction<? super D, ? super Throwable, ? extends Publisher<?>> asyncError, Function<? super D, ? extends Publisher<?>> asyncCancel)
Uses a resource, generated by a Publisher for each individual Subscriber, to derive a Mono. Note that all steps of the operator chain that would need the resource to be in an open stable state need to be described inside the resourceClosure Function.
Unlike in the Flux counterpart, ALL signals are deferred until the Mono terminates and the relevant Function generates and invokes a "cleanup" Publisher. This is because a failure in the cleanup Publisher must result in a lone onError signal in the downstream Mono (any potential value in the derived Mono is discarded). Here are the various scenarios that can play out:
- empty Mono, asyncComplete ends with onComplete(): downstream receives onComplete()
- empty Mono, asyncComplete ends with onError(t): downstream receives onError(t)
- valued Mono, asyncComplete ends with onComplete(): downstream receives onNext(value), onComplete()
- valued Mono, asyncComplete ends with onError(t): downstream receives onError(t), value is discarded
- error(e) Mono, asyncError ends with onComplete(): downstream receives onError(e)
- error(e) Mono, asyncError ends with onError(t): downstream receives onError(t), t suppressing e
Individual cleanups can also be associated with mono cancellation and error terminations.
Note that if the resource supplying Publisher emits more than one resource, the subsequent resources are dropped (Operators.onNextDropped(Object, Context)). If the publisher errors AFTER having emitted one resource, the error is also silently dropped (Operators.onErrorDropped(Throwable, Context)). An empty completion or error without at least one onNext signal (no resource supplied) triggers a short-circuit of the main sequence with the same terminal signal (no cleanup is invoked).
Discard Support: This operator discards the element if the asyncComplete handler fails.
- Type Parameters:
T - the type of elements emitted by the resource closure, and thus the main sequence
D - the type of the resource object
- Parameters:
resourceSupplier - a Publisher that "generates" the resource, subscribed for each subscription to the main sequence
resourceClosure - a factory to derive a Mono from the supplied resource
asyncComplete - an asynchronous resource cleanup invoked if the resource closure terminates with onComplete
asyncError - an asynchronous resource cleanup invoked if the resource closure terminates with onError. The terminating error is provided to the BiFunction
asyncCancel - an asynchronous resource cleanup invoked if the resource closure is cancelled. When null, the asyncComplete path is used instead.
- Returns:
a new Mono built around a "transactional" resource, with several termination paths triggering asynchronous cleanup sequences
-
when
Aggregate given publishers into a new Mono that will be fulfilled when all of the given sources have completed. An error will cause pending results to be cancelled and immediate error emission to the returnedMono.- Parameters:
sources- The sources to use.- Returns:
- a
Mono.
-
when
Aggregate given publishers into a new Mono that will be fulfilled when all of the given Publishers have completed. An error will cause pending results to be cancelled and immediate error emission to the returnedMono.- Parameters:
sources- The sources to use.- Returns:
- a
Mono.
-
whenDelayError
Aggregate given publishers into a new Mono that will be fulfilled when all of the given sources have completed. Errors from the sources are delayed. If several Publishers error, the exceptions are combined (as suppressed exceptions on a root exception).- Parameters:
sources- The sources to use.- Returns:
- a
Mono.
-
whenDelayError
Merge given publishers into a new Mono that will be fulfilled when all of the given sources have completed. Errors from the sources are delayed. If several Publishers error, the exceptions are combined (as suppressed exceptions on a root exception).- Parameters:
sources- The sources to use.- Returns:
- a
Mono.
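As a rough sketch of the error-delaying behaviour (class and method names are assumptions for this example), the following combines a failing source with a side-effecting one and checks that the second source still runs before the error is propagated:

```java
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Mono;

final class WhenSketch {

    // Returns {outcome of the aggregate, whether the second source still ran}.
    static Object[] delayedError() {
        AtomicBoolean secondRan = new AtomicBoolean();
        Mono<Void> failing = Mono.error(new IllegalStateException("first failed"));
        Mono<Void> sideEffect = Mono.fromRunnable(() -> secondRan.set(true));

        String outcome = Mono.whenDelayError(failing, sideEffect)
            .thenReturn("completed")
            .onErrorReturn("failed")
            .block();
        return new Object[] { outcome, secondRan.get() };
    }

    // All sources complete, so the aggregate completes too.
    static String allComplete() {
        Mono<Void> first = Mono.fromRunnable(() -> { });
        Mono<Void> second = Mono.empty();
        return Mono.when(first, second).thenReturn("completed").block();
    }
}
```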
-
zip
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple2. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.
-
zip
public static <T1,T2,O> Mono<O> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values as defined by the combinator function. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.
- Type Parameters:
T1 - type of the value from p1
T2 - type of the value from p2
O - output value
- Parameters:
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
combinator - a BiFunction combinator function when both sources complete
- Returns:
a Mono.
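A minimal sketch of the combinator overload (class and method names are assumptions for this example), covering both the valued case and the empty-completion case described above:

```java
import java.util.Optional;

import reactor.core.publisher.Mono;

final class ZipSketch {

    // Both sources are valued: the combinator result is emitted.
    static String combine() {
        Mono<String> zipped =
            Mono.zip(Mono.just("id-"), Mono.just(42), (prefix, n) -> prefix + n);
        return zipped.block();
    }

    // One source completes empty: the zipped Mono completes empty as well.
    static Optional<String> withEmptySource() {
        Mono<String> zipped =
            Mono.zip(Mono.just("id-"), Mono.<Integer>empty(), (prefix, n) -> prefix + n);
        return zipped.blockOptional();
    }
}
```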
-
zip
public static <T1,T2,T3> Mono<Tuple3<T1,T2,T3>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple3. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.
-
zip
public static <T1,T2,T3,T4> Mono<Tuple4<T1,T2,T3,T4>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple4. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.
- Type Parameters:
T1- type of the value from p1T2- type of the value from p2T3- type of the value from p3T4- type of the value from p4- Parameters:
p1- The first upstreamPublisherto subscribe to.p2- The second upstreamPublisherto subscribe to.p3- The third upstreamPublisherto subscribe to.p4- The fourth upstreamPublisherto subscribe to.- Returns:
- a
Mono.
-
zip
public static <T1,T2,T3,T4,T5> Mono<Tuple5<T1,T2,T3,T4,T5>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple5. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.
- Type Parameters:
T1- type of the value from p1T2- type of the value from p2T3- type of the value from p3T4- type of the value from p4T5- type of the value from p5- Parameters:
p1- The first upstreamPublisherto subscribe to.p2- The second upstreamPublisherto subscribe to.p3- The third upstreamPublisherto subscribe to.p4- The fourth upstreamPublisherto subscribe to.p5- The fifth upstreamPublisherto subscribe to.- Returns:
- a
Mono.
-
zip
public static <T1,T2,T3,T4,T5,T6> Mono<Tuple6<T1,T2,T3,T4,T5,T6>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple6. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.
- Type Parameters:
T1- type of the value from p1T2- type of the value from p2T3- type of the value from p3T4- type of the value from p4T5- type of the value from p5T6- type of the value from p6- Parameters:
p1- The first upstreamPublisherto subscribe to.p2- The second upstreamPublisherto subscribe to.p3- The third upstreamPublisherto subscribe to.p4- The fourth upstreamPublisherto subscribe to.p5- The fifth upstreamPublisherto subscribe to.p6- The sixth upstreamPublisherto subscribe to.- Returns:
- a
Mono.
-
zip
public static <T1,T2,T3,T4,T5,T6,T7> Mono<Tuple7<T1,T2,T3,T4,T5,T6,T7>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6, Mono<? extends T7> p7)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple7. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.
- Type Parameters:
T1- type of the value from p1T2- type of the value from p2T3- type of the value from p3T4- type of the value from p4T5- type of the value from p5T6- type of the value from p6T7- type of the value from p7- Parameters:
p1- The first upstreamPublisherto subscribe to.p2- The second upstreamPublisherto subscribe to.p3- The third upstreamPublisherto subscribe to.p4- The fourth upstreamPublisherto subscribe to.p5- The fifth upstreamPublisherto subscribe to.p6- The sixth upstreamPublisherto subscribe to.p7- The seventh upstreamPublisherto subscribe to.- Returns:
- a
Mono.
-
zip
public static <T1,T2,T3,T4,T5,T6,T7,T8> Mono<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6, Mono<? extends T7> p7, Mono<? extends T8> p8)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple8. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.
- Type Parameters:
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
T4 - type of the value from p4
T5 - type of the value from p5
T6 - type of the value from p6
T7 - type of the value from p7
T8 - type of the value from p8
- Parameters:
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
p4 - The fourth upstream Publisher to subscribe to.
p5 - The fifth upstream Publisher to subscribe to.
p6 - The sixth upstream Publisher to subscribe to.
p7 - The seventh upstream Publisher to subscribe to.
p8 - The eighth upstream Publisher to subscribe to.
- Returns:
a Mono.
-
zip
public static <R> Mono<R> zip(Iterable<? extends Mono<?>> monos, Function<? super Object[], ? extends R> combinator) Aggregate given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values according to the provided combinator function. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.- Type Parameters:
R- the combined result- Parameters:
monos- The monos to use.combinator- the function to transform the combined array into an arbitrary object.- Returns:
- a
Mono.
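The combinator variant described above can be sketched as follows. This is a minimal illustration that assumes reactor-core is on the classpath; the class and method names (`ZipCombinatorSketch`, `zipAll`, `zipWithEmpty`) are hypothetical and not part of the API.

```java
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Mono;

public class ZipCombinatorSketch {

    // Zips three sources; the combinator receives the values as an Object[] in source order.
    static String zipAll() {
        List<Mono<?>> sources = Arrays.asList(Mono.just("a"), Mono.just(1), Mono.just(true));
        return Mono.zip(sources, values -> values[0] + "-" + values[1] + "-" + values[2])
                   .block();
    }

    // One empty source makes the zipped Mono complete empty, so block() returns null
    // and the combinator is never invoked.
    static String zipWithEmpty() {
        List<Mono<?>> sources = Arrays.asList(Mono.just("a"), Mono.empty());
        return Mono.zip(sources, values -> "never called").block();
    }

    public static void main(String[] args) {
        System.out.println(zipAll());       // a-1-true
        System.out.println(zipWithEmpty()); // null
    }
}
```

Because the combinator only sees an `Object[]`, any casts inside it are unchecked; the fixed-arity `zip` overloads returning tuples keep the element types.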
-
zip
Aggregate given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values according to the provided combinator function. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.- Type Parameters:
R- the combined result- Parameters:
monos- The monos to use.combinator- the function to transform the combined array into an arbitrary object.- Returns:
- a
Mono.
-
zipDelayError
public static <T1, T2> Mono<Tuple2<T1, T2>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple2 and delaying errors. If a Mono source completes without value, the other source is run to completion then the resulting Mono completes empty. If both Monos error, the two exceptions are combined (as suppressed exceptions on a root exception).
-
zipDelayError
public static <T1, T2, T3> Mono<Tuple3<T1, T2, T3>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple3 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).
-
zipDelayError
public static <T1, T2, T3, T4> Mono<Tuple4<T1, T2, T3, T4>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple4 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).
- Type Parameters:
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
T4 - type of the value from p4
- Parameters:
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
p4 - The fourth upstream Publisher to subscribe to.
- Returns:
- a
Mono.
-
zipDelayError
public static <T1, T2, T3, T4, T5> Mono<Tuple5<T1, T2, T3, T4, T5>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple5 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).
- Type Parameters:
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
T4 - type of the value from p4
T5 - type of the value from p5
- Parameters:
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
p4 - The fourth upstream Publisher to subscribe to.
p5 - The fifth upstream Publisher to subscribe to.
- Returns:
- a
Mono.
-
zipDelayError
public static <T1, T2, T3, T4, T5, T6> Mono<Tuple6<T1, T2, T3, T4, T5, T6>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple6 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).
- Type Parameters:
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
T4 - type of the value from p4
T5 - type of the value from p5
T6 - type of the value from p6
- Parameters:
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
p4 - The fourth upstream Publisher to subscribe to.
p5 - The fifth upstream Publisher to subscribe to.
p6 - The sixth upstream Publisher to subscribe to.
- Returns:
- a
Mono.
-
zipDelayError
public static <T1, T2, T3, T4, T5, T6, T7> Mono<Tuple7<T1, T2, T3, T4, T5, T6, T7>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6, Mono<? extends T7> p7)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple7 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).
- Type Parameters:
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
T4 - type of the value from p4
T5 - type of the value from p5
T6 - type of the value from p6
T7 - type of the value from p7
- Parameters:
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
p4 - The fourth upstream Publisher to subscribe to.
p5 - The fifth upstream Publisher to subscribe to.
p6 - The sixth upstream Publisher to subscribe to.
p7 - The seventh upstream Publisher to subscribe to.
- Returns:
- a
Mono.
-
zipDelayError
public static <T1, T2, T3, T4, T5, T6, T7, T8> Mono<Tuple8<T1, T2, T3, T4, T5, T6, T7, T8>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6, Mono<? extends T7> p7, Mono<? extends T8> p8)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple8 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).
- Type Parameters:
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
T4 - type of the value from p4
T5 - type of the value from p5
T6 - type of the value from p6
T7 - type of the value from p7
T8 - type of the value from p8
- Parameters:
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
p4 - The fourth upstream Publisher to subscribe to.
p5 - The fifth upstream Publisher to subscribe to.
p6 - The sixth upstream Publisher to subscribe to.
p7 - The seventh upstream Publisher to subscribe to.
p8 - The eighth upstream Publisher to subscribe to.
- Returns:
- a
Mono.
-
zipDelayError
public static <R> Mono<R> zipDelayError(Iterable<? extends Mono<?>> monos, Function<? super Object[], ? extends R> combinator) Aggregate given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item. Errors from the sources are delayed. If a Mono source completes without value, all other sources are run to completion then the resultingMonocompletes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).- Type Parameters:
R- the combined result- Parameters:
monos- The monos to use.combinator- the function to transform the combined array into an arbitrary object.- Returns:
- a
Mono.
-
zipDelayError
public static <R> Mono<R> zipDelayError(Function<? super Object[], ? extends R> combinator, Mono<?>... monos) Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values according to the provided combinator function and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resultingMonocompletes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).- Type Parameters:
R- the combined result- Parameters:
monos- The monos to use.combinator- the function to transform the combined array into an arbitrary object.- Returns:
- a combined
Mono.
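The error-delaying behaviour of the zipDelayError family can be sketched as below. This is an illustration only, assuming reactor-core is on the classpath; the class and method names are hypothetical. The error is inspected inside the chain with `onErrorResume`, since a blocking call may attach its own suppressed exception to the error it rethrows.

```java
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

public class ZipDelayErrorSketch {

    // Both sources fail: returns how many exceptions the combined error carries.
    static int combinedErrorCount() {
        Mono<String> first = Mono.error(new IllegalStateException("first"));
        Mono<Integer> second = Mono.error(new IllegalArgumentException("second"));
        return Mono.zipDelayError(first, second)
                   .map(tuple -> 0)
                   // unwrapMultiple lists the exceptions held by a composite error
                   .onErrorResume(e -> Mono.just(Exceptions.unwrapMultiple(e).size()))
                   .block();
    }

    // One source is empty: the other source still runs, then the result completes empty.
    static boolean emptyAfterOtherSourceRan() {
        AtomicBoolean ran = new AtomicBoolean();
        Mono<String> empty = Mono.empty();
        Mono<Integer> valued = Mono.fromCallable(() -> {
            ran.set(true);
            return 1;
        });
        Object result = Mono.zipDelayError(empty, valued).block();
        return result == null && ran.get();
    }

    public static void main(String[] args) {
        System.out.println(combinedErrorCount());
        System.out.println(emptyAfterOtherSourceRan());
    }
}
```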
-
as
Transform this Mono into a target type.
mono.as(Flux::from).subscribe()
-
and
Join the termination signals from this mono and another source into the returned void mono- Parameters:
other- thePublisherto wait for complete- Returns:
- a new combined Mono
-
block
Subscribe to this Mono and block indefinitely until a next signal is received. Returns that value, or null if the Mono completes empty. In case the Mono errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).
Note that each block() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
- Returns:
- T the result
-
block
Subscribe to this Mono and block until a next signal is received or a timeout expires. Returns that value, or null if the Mono completes empty. In case the Mono errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). If the provided timeout expires, a RuntimeException is thrown with a TimeoutException as the cause.
Note that each block() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
- Parameters:
timeout- maximum time period to wait for before raising aRuntimeExceptionwith aTimeoutExceptionas the cause- Returns:
- T the result
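The three outcomes described for the blocking variants (empty completion, checked error, timeout) can be sketched as follows. This assumes reactor-core is on the classpath; the class and method names are hypothetical. Whether the timeout exception carries a TimeoutException cause is reported rather than assumed, as that detail may depend on the Reactor version in use.

```java
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeoutException;

import reactor.core.publisher.Mono;

public class BlockSketch {

    // An empty Mono makes block() return null.
    static Object blockOnEmpty() {
        return Mono.empty().block();
    }

    // A checked exception is rethrown wrapped in a RuntimeException.
    static boolean checkedErrorIsWrapped() {
        try {
            Mono.error(new IOException("disk")).block();
            return false;
        } catch (RuntimeException e) {
            return e.getCause() instanceof IOException;
        }
    }

    // A Mono that never signals makes block(Duration) throw once the timeout expires.
    static String blockOnNever() {
        try {
            Mono.never().block(Duration.ofMillis(100));
            return "no exception";
        } catch (RuntimeException e) {
            boolean timeoutCause = e.getCause() instanceof TimeoutException;
            return "RuntimeException, TimeoutException cause: " + timeoutCause;
        }
    }

    public static void main(String[] args) {
        System.out.println(blockOnEmpty());
        System.out.println(checkedErrorIsWrapped());
        System.out.println(blockOnNever());
    }
}
```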
-
blockOptional
Subscribe to this Mono and block indefinitely until a next signal is received or the Mono completes empty. Returns an Optional, which can be used to replace the empty case with an Exception via Optional.orElseThrow(Supplier). In case the Mono itself errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).
Note that each blockOptional() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
- Returns:
- T the result
-
blockOptional
Subscribe to this Mono and block until a next signal is received, the Mono completes empty or a timeout expires. Returns an Optional for the first two cases, which can be used to replace the empty case with an Exception via Optional.orElseThrow(Supplier). In case the Mono itself errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). If the provided timeout expires, a RuntimeException is thrown with a TimeoutException as the cause.
Note that each blockOptional() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
- Parameters:
timeout- maximum time period to wait for before raising aRuntimeExceptionwith aTimeoutExceptionas the cause- Returns:
- T the result
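Replacing the empty case with an exception, as suggested above, might look like the following sketch. It assumes reactor-core is on the classpath; `BlockOptionalSketch`, `valueOrFail` and the one-second timeout are illustrative choices.

```java
import java.time.Duration;
import java.util.NoSuchElementException;
import java.util.Optional;

import reactor.core.publisher.Mono;

public class BlockOptionalSketch {

    // Returns the value, or throws a NoSuchElementException if the source completes empty.
    static String valueOrFail(Mono<String> source) {
        Optional<String> result = source.blockOptional(Duration.ofSeconds(1));
        return result.orElseThrow(() -> new NoSuchElementException("source was empty"));
    }

    public static void main(String[] args) {
        System.out.println(valueOrFail(Mono.just("hello"))); // hello
        try {
            valueOrFail(Mono.empty());
        } catch (NoSuchElementException e) {
            System.out.println(e.getMessage()); // source was empty
        }
    }
}
```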
-
cast
Cast the currentMonoproduced type into a target produced type. -
cache
Turn thisMonointo a hot source and cache last emitted signals for furtherSubscriber. Completion and Error will also be replayed.Once the first subscription is made to this
Mono, the source is subscribed to and the signal will be cached, indefinitely. This process cannot be cancelled.In the face of multiple concurrent subscriptions, this operator ensures that only one subscription is made to the source.
- Returns:
- a replaying
Mono
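A small sketch of the replay behaviour, assuming reactor-core is on the classpath. The counter exists only to make the number of source subscriptions observable; the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

public class CacheSketch {

    // Returns {first value, second value, number of source subscriptions}.
    static int[] subscribeTwice() {
        AtomicInteger subscriptions = new AtomicInteger();
        Mono<Integer> cached = Mono.fromCallable(() -> subscriptions.incrementAndGet())
                                   .cache();
        int first = cached.block();  // subscribes to the source
        int second = cached.block(); // replays the cached signal
        return new int[] { first, second, subscriptions.get() };
    }

    public static void main(String[] args) {
        int[] result = subscribeTwice();
        System.out.println(result[0] + " " + result[1] + " " + result[2]); // 1 1 1
    }
}
```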
-
cache
Turn thisMonointo a hot source and cache last emitted signals for furtherSubscriber, with an expiry timeout.Completion and Error will also be replayed until
ttltriggers in which case the nextSubscriberwill start over a new subscription.Cache loading (ie. subscription to the source) is triggered atomically by the first subscription to an uninitialized or expired cache, which guarantees that a single cache load happens at a time (and other subscriptions will get notified of the newly cached value when it arrives).
- Returns:
- a replaying
Mono
-
cache
Turn thisMonointo a hot source and cache last emitted signals for furtherSubscriber, with an expiry timeout.Completion and Error will also be replayed until
ttltriggers in which case the nextSubscriberwill start over a new subscription.Cache loading (ie. subscription to the source) is triggered atomically by the first subscription to an uninitialized or expired cache, which guarantees that a single cache load happens at a time (and other subscriptions will get notified of the newly cached value when it arrives).
-
cache
public final Mono<T> cache(Function<? super T, Duration> ttlForValue, Function<Throwable, Duration> ttlForError, Supplier<Duration> ttlForEmpty) Turn thisMonointo a hot source and cache last emitted signal for furtherSubscriber, with an expiry timeout (TTL) that depends on said signal. A TTL ofLong.MAX_VALUEmilliseconds is interpreted as indefinite caching of the signal (no cache cleanup is scheduled, so the signal is retained as long as thisMonois not garbage collected).Empty completion and Error will also be replayed according to their respective TTL, so transient errors can be "retried" by letting the
FunctionreturnDuration.ZERO. Such a transient exception would then be propagated to the first subscriber but the following subscribers would trigger a new source subscription.Exceptions in the TTL generators themselves are processed like the
Duration.ZEROcase, except the original signal issuppressed(in case of onError) ordropped(in case of onNext).Note that subscribers that come in perfectly simultaneously could receive the same cached signal even if the TTL is set to zero.
Cache loading (ie. subscription to the source) is triggered atomically by the first subscription to an uninitialized or expired cache, which guarantees that a single cache load happens at a time (and other subscriptions will get notified of the newly cached value when it arrives).
-
cache
public final Mono<T> cache(Function<? super T, Duration> ttlForValue, Function<Throwable, Duration> ttlForError, Supplier<Duration> ttlForEmpty, Scheduler timer) Turn thisMonointo a hot source and cache last emitted signal for furtherSubscriber, with an expiry timeout (TTL) that depends on said signal. A TTL ofLong.MAX_VALUEmilliseconds is interpreted as indefinite caching of the signal (no cache cleanup is scheduled, so the signal is retained as long as thisMonois not garbage collected).Empty completion and Error will also be replayed according to their respective TTL, so transient errors can be "retried" by letting the
FunctionreturnDuration.ZERO. Such a transient exception would then be propagated to the first subscriber but the following subscribers would trigger a new source subscription.Exceptions in the TTL generators themselves are processed like the
Duration.ZEROcase, except the original signal issuppressed(in case of onError) ordropped(in case of onNext).Note that subscribers that come in perfectly simultaneously could receive the same cached signal even if the TTL is set to zero.
Cache loading (ie. subscription to the source) is triggered atomically by the first subscription to an uninitialized or expired cache, which guarantees that a single cache load happens at a time (and other subscriptions will get notified of the newly cached value when it arrives).
- Parameters:
ttlForValue- the TTL-generatingFunctioninvoked when source is valuedttlForError- the TTL-generatingFunctioninvoked when source is erroringttlForEmpty- the TTL-generatingSupplierinvoked when source is emptytimer- theScheduleron which to measure the duration.- Returns:
- a replaying
Mono
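The "retry transient errors" pattern described above can be sketched like this. It assumes reactor-core is on the classpath; the failing-once source and all names are made up for the illustration. Values are cached indefinitely, while errors and empty completions get a zero TTL.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

public class CacheTtlSketch {

    // Returns a summary of three successive subscriptions to the cached Mono.
    static String run() {
        AtomicInteger attempts = new AtomicInteger();
        // Hypothetical source that fails on the first attempt only.
        Mono<String> source = Mono.fromCallable(() -> {
            if (attempts.incrementAndGet() == 1) {
                throw new IllegalStateException("transient");
            }
            return "value";
        });
        Mono<String> cached = source.cache(
                value -> Duration.ofMillis(Long.MAX_VALUE), // keep values indefinitely
                error -> Duration.ZERO,                     // do not retain errors
                () -> Duration.ZERO);                       // do not retain empty completion

        String first;
        try {
            first = cached.block();
        } catch (IllegalStateException e) {
            first = "error: " + e.getMessage();
        }
        String second = cached.block(); // new source subscription
        String third = cached.block();  // replayed from the cache
        return first + " | " + second + " | " + third + " | attempts=" + attempts.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // error: transient | value | value | attempts=2
    }
}
```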
-
cacheInvalidateIf
CacheonNextsignal received from the source and replay it to other subscribers, while allowing invalidation by verifying the cached value against the givenPredicateeach time a late subscription occurs. Note that thePredicateis only evaluated if the cache is currently populated, ie. it is not applied upon receiving the sourceonNextsignal. For late subscribers, if the predicate returnstruethe cache is invalidated and a new subscription is made to the source in an effort to refresh the cache with a more up-to-date value to be passed to the new subscriber.The predicate is not strictly evaluated once per downstream subscriber. Rather, subscriptions happening in concurrent batches will trigger a single evaluation of the predicate. Similarly, a batch of subscriptions happening before the cache is populated (ie. before this operator receives an onNext signal after an invalidation) will always receive the incoming value without going through the
Predicate. The predicate is only triggered by subscribers that come in AFTER the cache is populated. Therefore, it is possible that pre-population subscribers receive an "invalid" value, especially if the object can switch from a valid to an invalid state in a short amount of time (eg. between creation, cache population and propagation to the downstream subscriber(s)).If the cached value needs to be discarded in case of invalidation, the recommended way is to do so in the predicate directly. Note that some downstream subscribers might still be using or storing the value, for example if they haven't requested anything yet.
As this form of caching is explicitly value-oriented, empty source completion signals and error signals are NOT cached. It is always possible to use
materialize()to cache these (further usingfilter(Predicate)if one wants to only consider empty sources or error sources).Predicate is applied differently depending on whether the cache is populated or not:
- IF EMPTY
- first incoming subscriber creates a new COORDINATOR and adds itself
- IF COORDINATOR
- each incoming subscriber is added to the current "batch" (COORDINATOR)
- once the value is received, the predicate is applied ONCE
- mismatch: all the batch is terminated with an error -> we're back to init state, next subscriber will trigger a new coordinator and a new subscription
- ok: all the batch is completed with the value -> cache is now POPULATED
- IF POPULATED
- each incoming subscriber causes the predicate to apply
- if ok: complete that subscriber with the value
- if mismatch, swap the current POPULATED with a new COORDINATOR and add the subscriber to that coordinator
- imagining a race between sub1 and sub2:
- OK NOK will naturally lead to sub1 completing and sub2 being put on wait inside a new COORDINATOR
- NOK NOK will race swap of POPULATED with COORDINATOR1 and COORDINATOR2 respectively
- if sub1 swaps, sub2 will dismiss the COORDINATOR2 it failed to swap and loop back, see COORDINATOR1 and add itself
- if sub2 swaps, the reverse happens
- if value is populated in the time it takes for sub2 to loop back, sub2 sees a value and triggers the predicate again (hopefully passing)
Cancellation is only possible for downstream subscribers when they've been added to a COORDINATOR. Subscribers that are received when POPULATED will either be completed right away or (if the predicate fails) end up being added to a COORDINATOR.
When cancelling a COORDINATOR-issued subscription:
- removes itself from batch
- if 0 subscribers remaining
- swap COORDINATOR with EMPTY
- COORDINATOR cancels its source
The fact that COORDINATOR cancels its source when no more subscribers remain is important, because it prevents issues with a never() source or a source that never produces a value passing the predicate (assuming timeouts on the subscriber).
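From the caller's side, the invalidation on late subscription can be sketched as follows. This assumes reactor-core (a version that provides cacheInvalidateIf) is on the classpath; the "minimum accepted version" predicate and all names are hypothetical. The predicate depends on the cached value itself, so a freshly loaded value is considered valid.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

public class CacheInvalidateIfSketch {

    // Returns {a, b, c, d, number of source subscriptions}.
    static int[] run() {
        AtomicInteger loads = new AtomicInteger();
        AtomicInteger minimumVersion = new AtomicInteger(1);
        Mono<Integer> cached = Mono.fromCallable(() -> loads.incrementAndGet())
                                   // invalidate when the cached version is too old
                                   .cacheInvalidateIf(version -> version < minimumVersion.get());

        int a = cached.block();  // loads version 1
        int b = cached.block();  // predicate passes, version 1 is replayed
        minimumVersion.set(2);   // version 1 is now considered stale
        int c = cached.block();  // predicate invalidates, source is re-subscribed
        int d = cached.block();  // version 2 is replayed
        return new int[] { a, b, c, d, loads.get() };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3] + " loads=" + r[4]); // 1 1 2 2 loads=2
    }
}
```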
-
cacheInvalidateWhen
public final Mono<T> cacheInvalidateWhen(Function<? super T, Mono<Void>> invalidationTriggerGenerator) CacheonNextsignal received from the source and replay it to other subscribers, while allowing invalidation via aMono<Void>companion trigger generated from the currently cached value.As this form of caching is explicitly value-oriented, empty source completion signals and error signals are NOT cached. It is always possible to use
materialize()to cache these (further usingfilter(Predicate)if one wants to only consider empty sources or error sources). The exception is still propagated to the subscribers that have accumulated between the time the source has been subscribed to and the time the onError/onComplete terminal signal is received. An empty source is turned into aNoSuchElementExceptiononError.Completion of the trigger will invalidate the cached element, so the next subscriber that comes in will trigger a new subscription to the source, re-populating the cache and re-creating a new trigger out of that value.
- If the trigger completes with an error, all registered subscribers are terminated with the same error.
-
If all the subscribers are cancelled before the cache is populated (ie. an attempt to
cache a
never()), the source subscription is cancelled. - Cancelling a downstream subscriber once the cache has been populated is not necessarily relevant, as the value will be immediately replayed on subscription, which usually means within onSubscribe (so earlier than any cancellation can happen). That said the operator will make best efforts to detect such cancellations and avoid propagating the value to these subscribers.
If the cached value needs to be discarded in case of invalidation, use the cacheInvalidateWhen(Function, Consumer) version. Note that some downstream subscribers might still be using or storing the value, for example if they haven't requested anything yet.
The trigger is generated only after the subscribers in the COORDINATOR have received the value, and only once. The only way to get out of the POPULATED state is to use the trigger, so there cannot be multiple trigger subscriptions, nor concurrent triggering.
Cancellation is only possible for downstream subscribers when they've been added to a COORDINATOR. Subscribers that are received when POPULATED will either be completed right away or (if the predicate fails) end up being added to a COORDINATOR.
When cancelling a COORDINATOR-issued subscription:
- removes itself from batch
- if 0 subscribers remaining
- swap COORDINATOR with EMPTY
- COORDINATOR cancels its source
The fact that COORDINATOR cancels its source when no more subscribers remain is important, because it prevents issues with a never() source or a source that never produces a value passing the predicate (assuming timeouts on the subscriber).
- Parameters:
invalidationTriggerGenerator- theFunctionthat generates newMono<Void>triggers used for invalidation- Returns:
- a new cached
Monowhich can be invalidated
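Trigger-based invalidation can be sketched as below, assuming reactor-core (a version that provides cacheInvalidateWhen and `Sinks`) is on the classpath. Using a `Sinks.Empty` per cached value as the companion trigger is one possible choice for the illustration, not the prescribed one; the names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class CacheInvalidateWhenSketch {

    // Returns {a, b, c, number of source subscriptions}.
    static int[] run() {
        AtomicInteger loads = new AtomicInteger();
        // Holds the trigger created for the currently cached value.
        AtomicReference<Sinks.Empty<Void>> currentTrigger = new AtomicReference<>();

        Mono<Integer> cached = Mono.fromCallable(() -> loads.incrementAndGet())
                .cacheInvalidateWhen(value -> {
                    Sinks.Empty<Void> trigger = Sinks.empty();
                    currentTrigger.set(trigger);
                    return trigger.asMono();
                });

        int a = cached.block();               // loads 1 and creates a trigger
        int b = cached.block();               // replays 1
        currentTrigger.get().tryEmitEmpty();  // completing the trigger invalidates the cache
        int c = cached.block();               // loads 2 and creates a new trigger
        return new int[] { a, b, c, loads.get() };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " loads=" + r[3]); // 1 1 2 loads=2
    }
}
```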
-
cacheInvalidateWhen
public final Mono<T> cacheInvalidateWhen(Function<? super T, Mono<Void>> invalidationTriggerGenerator, Consumer<? super T> onInvalidate) CacheonNextsignal received from the source and replay it to other subscribers, while allowing invalidation via aMono<Void>companion trigger generated from the currently cached value.As this form of caching is explicitly value-oriented, empty source completion signals and error signals are NOT cached. It is always possible to use
materialize()to cache these (further usingfilter(Predicate)if one wants to only consider empty sources or error sources). The exception is still propagated to the subscribers that have accumulated between the time the source has been subscribed to and the time the onError/onComplete terminal signal is received. An empty source is turned into aNoSuchElementExceptiononError.Completion of the trigger will invalidate the cached element, so the next subscriber that comes in will trigger a new subscription to the source, re-populating the cache and re-creating a new trigger out of that value.
- If the trigger completes with an error, all registered subscribers are terminated with the same error.
-
If all the subscribers are cancelled before the cache is populated (ie. an attempt to
cache a
never()), the source subscription is cancelled. - Cancelling a downstream subscriber once the cache has been populated is not necessarily relevant, as the value will be immediately replayed on subscription, which usually means within onSubscribe (so earlier than any cancellation can happen). That said the operator will make best efforts to detect such cancellations and avoid propagating the value to these subscribers.
Once a cached value is invalidated, it is passed to the provided Consumer (which MUST complete normally). Note that some downstream subscribers might still be using or storing the value, for example if they haven't requested anything yet.
The trigger is generated only after the subscribers in the COORDINATOR have received the value, and only once. The only way to get out of the POPULATED state is to use the trigger, so there cannot be multiple trigger subscriptions, nor concurrent triggering.
Cancellation is only possible for downstream subscribers when they've been added to a COORDINATOR. Subscribers that are received when POPULATED will either be completed right away or (if the predicate fails) end up being added to a COORDINATOR.
When cancelling a COORDINATOR-issued subscription:
- removes itself from batch
- if 0 subscribers remaining
- swap COORDINATOR with EMPTY
- COORDINATOR cancels its source
The fact that COORDINATOR cancels its source when no more subscribers remain is important, because it prevents issues with a never() source or a source that never produces a value passing the predicate (assuming timeouts on the subscriber).
- Parameters:
invalidationTriggerGenerator- theFunctionthat generates newMono<Void>triggers used for invalidationonInvalidate- theConsumerthat will be applied to cached value upon invalidation- Returns:
- a new cached
Monowhich can be invalidated
-
cancelOn
-
checkpoint
Activate traceback (full assembly tracing) for this particularMono, in case of an error upstream of the checkpoint. Tracing incurs the cost of an exception stack trace creation.It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
The traceback is attached to the error as a
suppressed exception. As such, if the error is acomposite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected viaExceptions.isTraceback(Throwable).- Returns:
- the assembly tracing
Mono
-
checkpoint
Activate traceback (assembly marker) for this particularMonoby giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlikecheckpoint(), this doesn't create a filled stack trace, avoiding the main cost of the operator. However, as a trade-off the description must be unique enough for the user to find out where this Mono was assembled. If you only want a generic description, and still rely on the stack trace to find the assembly site, use thecheckpoint(String, boolean)variant.It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
The traceback is attached to the error as a
suppressed exception. As such, if the error is acomposite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected viaExceptions.isTraceback(Throwable).- Parameters:
description- a unique enough description to include in the light assembly traceback.- Returns:
- the assembly marked
Mono
-
checkpoint
Activate traceback (full assembly tracing or the lighter assembly marking depending on the forceStackTrace option).
By setting the forceStackTrace parameter to true, activate assembly tracing for this particular Mono and give it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(String), this will incur the cost of an exception stack trace creation. The description could for example be a meaningful name for the assembled mono or a wider correlation ID, since the stack trace will always provide enough information to locate where this Mono was assembled.
By setting forceStackTrace to false, behaves like checkpoint(String) and is subject to the same caveat in choosing the description.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly marker.
The traceback is attached to the error as a
suppressed exception. As such, if the error is acomposite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected viaExceptions.isTraceback(Throwable).- Parameters:
description- a description (must be unique enough if forceStackTrace is set to false).forceStackTrace- false to make a light checkpoint without a stacktrace, true to use a stack trace.- Returns:
- the assembly marked
Mono.
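Detecting the traceback attached by a checkpoint can be sketched as follows. This assumes reactor-core (a version that provides `Exceptions.isTraceback`) is on the classpath; the description string and the names are illustrative.

```java
import java.util.Arrays;

import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

public class CheckpointSketch {

    // Returns true if the error observed downstream of the checkpoint
    // carries a traceback among its suppressed exceptions.
    static boolean errorCarriesTraceback() {
        return Mono.<String>error(new IllegalStateException("boom"))
                   .checkpoint("loading user profile") // light checkpoint, no stack trace
                   .map(value -> false)
                   .onErrorResume(e -> Mono.just(
                           Arrays.stream(e.getSuppressed()).anyMatch(Exceptions::isTraceback)))
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(errorCarriesTraceback());
    }
}
```

Passing `true` as a second argument to `checkpoint` would request the heavier variant with a captured stack trace.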
-
concatWith
-
contextCapture
If context-propagation library is on the classpath, this is a convenience shortcut to capture thread local values during the subscription phase and put them in theContextthat is visible upstream of this operator.As a result this operator should generally be used as close as possible to the end of the chain / subscription point.
If the
ContextViewvisible upstream is not empty, a small subset of operators will automatically restore the context snapshot (handle,tap). If context-propagation is not available at runtime, this operator simply returns the currentMonoinstance. -
contextWrite
Enrich theContextvisible from downstream for the benefit of upstream operators, by making all values from the providedContextViewvisible on top of pairs from downstream.A
Context(and itsContextView) is tied to a given subscription and is read by querying the downstreamSubscriber.Subscriberthat don't enrich the context instead access their own downstream's context. As a result, this operator conceptually enriches aContextcoming from under it in the chain (downstream, by default an empty one) and makes the new enrichedContextvisible to operators above it in the chain.- Parameters:
contextToAppend- theContextViewto merge with the downstreamContext, resulting in a new more completeContextthat will be visible from upstream.- Returns:
- a contextualized
Mono
-
contextWrite
Enrich theContextvisible from downstream for the benefit of upstream operators, by applying aFunctionto the downstreamContext.The
Functiontakes aContextfor convenience, allowing to easily callwrite APIsto return a newContext.A
Context(and itsContextView) is tied to a given subscription and is read by querying the downstreamSubscriber.Subscriberthat don't enrich the context instead access their own downstream's context. As a result, this operator conceptually enriches aContextcoming from under it in the chain (downstream, by default an empty one) and makes the new enrichedContextvisible to operators above it in the chain. -
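The upstream visibility of written context values can be sketched as below, assuming reactor-core (a version that provides `deferContextual`) is on the classpath. The "user" key and the class and method names are hypothetical.

```java
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class ContextWriteSketch {

    // The value written below deferContextual in the chain is visible to it.
    static String greet() {
        return Mono.deferContextual(ctx -> Mono.just("Hello " + ctx.get("user")))
                   .contextWrite(Context.of("user", "alice"))
                   .block();
    }

    // With two writes of the same key, the operator above both of them sees the
    // result of the write closest to it, which is applied on top of the other.
    static String closestWriteWins() {
        return Mono.deferContextual(ctx -> Mono.just("Hello " + ctx.get("user")))
                   .contextWrite(ctx -> ctx.put("user", "bob"))
                   .contextWrite(Context.of("user", "alice"))
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(greet());            // Hello alice
        System.out.println(closestWriteWins()); // Hello bob
    }
}
```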
defaultIfEmpty
Provide a default single value if this mono is completed without any data- Parameters:
defaultV- the alternate value if this sequence is empty- Returns:
- a new
Mono
-
delayElement
Delay thisMonoelement (Subscriber.onNext(T)signal) by a given duration. Empty Monos or error signals are not delayed.Note that the scheduler on which the Mono chain continues execution will be the
parallelscheduler if the mono is valued, or the current scheduler if the mono completes empty or errors.- Parameters:
delay- duration by which to delay theSubscriber.onNext(T)signal- Returns:
- a delayed
Mono
-
delayElement
Delay thisMonoelement (Subscriber.onNext(T)signal) by a givenDuration, on a particularScheduler. Empty monos or error signals are not delayed.Note that the scheduler on which the mono chain continues execution will be the scheduler provided if the mono is valued, or the current scheduler if the mono completes empty or errors.
- Parameters:
delay-Durationby which to delay theSubscriber.onNext(T)signaltimer- a time-capableSchedulerinstance to delay the value signal on- Returns:
- a delayed
Mono
-
delayUntil
Subscribe to this Mono and another Publisher that is generated from this Mono's element and which will be used as a trigger for relaying said element.
That is to say, the resulting Mono delays until this Mono's element is emitted, generates a trigger Publisher and then delays again until the trigger Publisher terminates.
Note that contiguous calls to delayUntil are fused together. The triggers are generated and subscribed to in sequence, each once the previous trigger completes. An error from a trigger is propagated immediately downstream, as is an error in the source.
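The ordering described for delayUntil can be sketched as follows, assuming reactor-core is on the classpath. `DelayUntilSketch` and the event strings are hypothetical; the trigger here is a synchronous `Mono.fromRunnable`, chosen only so the recorded order is deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import reactor.core.publisher.Mono;

final class DelayUntilSketch {

    // Records the order in which the source, the trigger and downstream run.
    static List<String> recordedEvents() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        String result = Mono.just("order-42")
            .doOnNext(v -> events.add("source emitted " + v))
            // The trigger is generated from the element; the element is
            // relayed only once the trigger Publisher terminates.
            .delayUntil(v -> Mono.fromRunnable(() -> events.add("trigger ran for " + v)))
            .doOnNext(v -> events.add("relayed " + v))
            .block();
        events.add("result " + result);
        return events;
    }

    public static void main(String[] args) {
        recordedEvents().forEach(System.out::println);
    }
}
```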
-
delaySubscription
-
delaySubscription
-
delaySubscription
- Type Parameters:
U- the other source type- Parameters:
subscriptionDelay- aPublisherto signal by next or complete thissubscribe(Subscriber)- Returns:
- a delayed
Mono
-
dematerialize
An operator working only if thisMonoemits onNext, onError or onCompleteSignalinstances, transforming thesematerializedsignals into real signals on theSubscriber. The errorSignalwill trigger onError and completeSignalwill trigger onComplete.- Type Parameters:
X- the dematerialized type- Returns:
- a dematerialized
Mono - See Also:
-
doAfterTerminate
Add behavior (side-effect) triggered after theMonoterminates, either by completing downstream successfully or with an error.The relevant signal is propagated downstream, then the
Runnableis executed.- Parameters:
afterTerminate- the callback to call afterSubscriber.onComplete()orSubscriber.onError(java.lang.Throwable)- Returns:
- an observed
Mono
-
doFirst
Add behavior (side-effect) triggered before the Mono is subscribed to, which should be the first event after assembly time.
Note that when several doFirst(Runnable) operators are used anywhere in a chain of operators, their order of execution is reversed compared to the declaration order (as the subscribe signal flows backward, from the ultimate subscriber to the source publisher):
Mono.just(1)
    .doFirst(() -> System.out.println("three"))
    .doFirst(() -> System.out.println("two"))
    .doFirst(() -> System.out.println("one"));
//would print one two three
In case the Runnable throws an exception, said exception will be directly propagated to the subscribing Subscriber along with a no-op Subscription, similarly to what error(Throwable) does. Otherwise, after the handler has executed, the Subscriber is directly subscribed to the original source Mono (this).
This side-effect method provides stronger first guarantees compared to doOnSubscribe(Consumer), which is triggered once the Subscription has been set up and passed to the Subscriber.
-
doFinally
Add behavior triggering after the Mono terminates for any reason, including cancellation. The terminating event (SignalType.ON_COMPLETE, SignalType.ON_ERROR and SignalType.CANCEL) is passed to the consumer, which is executed after the signal has been passed downstream.
Note that because the signal is propagated downstream before the callback is executed, several doFinally in a row will be executed in reverse order. If you want to assert the execution of the callback, keep in mind that the Mono will complete before it is executed, so its effect might not be visible immediately after e.g. a block().
- Parameters:
onFinally- the callback to execute after a terminal signal (complete, error or cancel)- Returns:
- an observed
Mono
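As a rough illustration of the three terminating events, the sketch below captures the SignalType handed to doFinally. It assumes reactor-core is on the classpath; `DoFinallySketch` and its methods are hypothetical names. The sources are deliberately synchronous, so the callback has already run by the time `block()` or `dispose()` returns, which is not guaranteed for asynchronous sources.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

final class DoFinallySketch {

    // Captures the terminating event of a (synchronous) source Mono.
    static SignalType afterBlocking(Mono<String> source) {
        AtomicReference<SignalType> seen = new AtomicReference<>();
        source.doFinally(seen::set)
              .onErrorResume(e -> Mono.empty()) // keep block() from rethrowing
              .block();
        return seen.get();
    }

    // Cancelling a Mono that never emits also reaches doFinally.
    static SignalType afterCancel() {
        AtomicReference<SignalType> seen = new AtomicReference<>();
        Mono.<String>never()
            .doFinally(seen::set)
            .subscribe()
            .dispose();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(afterBlocking(Mono.just("a")).name());
        System.out.println(afterBlocking(Mono.error(new IllegalStateException("boom"))).name());
        System.out.println(afterCancel().name());
    }
}
```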
-
doOnCancel
Add behavior triggered when theMonois cancelled.The handler is executed first, then the cancel signal is propagated upstream to the source.
- Parameters:
onCancel- the callback to call onSubscription.cancel()- Returns:
- a new
Mono
-
doOnDiscard
Potentially modify the behavior of the whole chain of operators upstream of this one to conditionally clean up elements that get discarded by these operators.The
discardHookMUST be idempotent and safe to use on any instance of the desired type. Calls to this method are additive, and the order of invocation of thediscardHookis the same as the order of declaration (calling.filter(...).doOnDiscard(first).doOnDiscard(second)will let the filter invokefirstthensecondhandlers).Two main categories of discarding operators exist:
- filtering operators, dropping some source elements as part of their designed behavior
- operators that prefetch a few elements and keep them around pending a request, but get cancelled/in error
- Parameters:
type- theClassof elements in the upstream chain of operators that this cleanup hook should take into account.discardHook- aConsumerof elements in the upstream chain of operators that performs the cleanup.- Returns:
- a
Monothat cleans up matching elements that get discarded upstream of it.
-
doOnNext
Add behavior triggered when the Mono emits data successfully.
The Consumer is executed first, then the onNext signal is propagated downstream.
- Parameters:
onNext - the callback to call on Subscriber.onNext(T)
- Returns:
- a new
Mono
-
doOnSuccess
Add behavior triggered as soon as the Mono can be considered to have completed successfully. The value passed to the Consumer reflects the type of completion:
- null: completed without data. The handler is executed right before onComplete is propagated downstream
- T: completed with data. The handler is executed right before onNext is propagated downstream
The Consumer is executed before propagating either onNext or onComplete downstream.
- Parameters:
onSuccess - the callback to call on Subscriber.onNext(T), or on Subscriber.onComplete() without preceding Subscriber.onNext(T); the argument is null if the Mono completes without data
- Returns:
- a new
Mono
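The null-versus-value distinction can be sketched like this, assuming reactor-core is on the classpath. `DoOnSuccessSketch` and the returned strings are hypothetical, chosen only to make the three outcomes visible.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Mono;

final class DoOnSuccessSketch {

    // Reports what the doOnSuccess callback observed for a given source.
    static String observed(Mono<String> source) {
        AtomicReference<String> seen = new AtomicReference<>("callback not invoked");
        source.doOnSuccess(v -> seen.set(v == null
                    ? "completed empty"        // onComplete without onNext
                    : "completed with " + v))  // onNext implies completion
              .onErrorResume(e -> Mono.empty()) // errors bypass doOnSuccess
              .block();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(observed(Mono.just("a")));
        System.out.println(observed(Mono.empty()));
        System.out.println(observed(Mono.error(new IllegalStateException("boom"))));
    }
}
```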
-
doOnEach
Add behavior triggered when theMonoemits an item, fails with an error or completes successfully. All these events are represented as aSignalthat is passed to the side-effect callback. Note that this is an advanced operator, typically used for monitoring of a Mono. TheseSignalhave aContextassociated to them.The
Consumeris executed first, then the relevant signal is propagated downstream.- Parameters:
signalConsumer- the mandatory callback to call onSubscriber.onNext(Object),Subscriber.onError(Throwable)andSubscriber.onComplete()- Returns:
- an observed
Mono - See Also:
-
doOnError
Add behavior triggered when theMonocompletes with an error.The
Consumeris executed first, then the onError signal is propagated downstream.- Parameters:
onError- the error callback to call onSubscriber.onError(Throwable)- Returns:
- a new
Mono
-
doOnError
public final <E extends Throwable> Mono<T> doOnError(Class<E> exceptionType, Consumer<? super E> onError) Add behavior triggered when theMonocompletes with an error matching the given exception type.The
Consumeris executed first, then the onError signal is propagated downstream.- Type Parameters:
E- type of the error to handle- Parameters:
exceptionType- the type of exceptions to handleonError- the error handler for relevant errors- Returns:
- an observed
Mono
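A small sketch of type-based matching, assuming reactor-core is on the classpath. `DoOnErrorSketch` and the log messages are hypothetical. The typed handler only runs for matching errors, and in both cases the error keeps propagating downstream after the side effect.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

final class DoOnErrorSketch {

    // Returns the side effects triggered by a Mono failing with the given error.
    static List<String> sideEffectsFor(Throwable failure) {
        List<String> log = new ArrayList<>();
        Mono.<String>error(failure)
            // Only invoked when the error is an IllegalArgumentException.
            .doOnError(IllegalArgumentException.class,
                       e -> log.add("bad argument: " + e.getMessage()))
            // Invoked for any error, since the signal is still propagated.
            .doOnError(e -> log.add("any error: " + e.getClass().getSimpleName()))
            .onErrorResume(e -> Mono.empty()) // swallow so block() returns
            .block();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(sideEffectsFor(new IllegalArgumentException("x")));
        System.out.println(sideEffectsFor(new IllegalStateException("y")));
    }
}
```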
-
doOnError
public final Mono<T> doOnError(Predicate<? super Throwable> predicate, Consumer<? super Throwable> onError) Add behavior triggered when theMonocompletes with an error matching the given predicate.The
Consumeris executed first, then the onError signal is propagated downstream.- Parameters:
predicate- the matcher for exceptions to handleonError- the error handler for relevant error- Returns:
- an observed
Mono
-
doOnRequest
Add behavior triggering aLongConsumerwhen theMonoreceives any request.Note that non fatal error raised in the callback will not be propagated and will simply trigger
Operators.onOperatorError(Throwable, Context).The
LongConsumeris executed first, then the request signal is propagated upstream to the parent.- Parameters:
consumer- the consumer to invoke on each request- Returns:
- an observed
Mono
-
doOnSubscribe
Add behavior (side-effect) triggered when theMonois being subscribed, that is to say when aSubscriptionhas been produced by thePublisherand is being passed to theSubscriber.onSubscribe(Subscription).This method is not intended for capturing the subscription and calling its methods, but for side effects like monitoring. For instance, the correct way to cancel a subscription is to call
Disposable.dispose()on the Disposable returned bysubscribe().The
Consumeris executed first, then theSubscriptionis propagated downstream to the next subscriber in the chain that is being established.- Parameters:
onSubscribe- the callback to call onSubscriber.onSubscribe(Subscription)- Returns:
- a new
Mono - See Also:
-
doOnTerminate
Add behavior triggered when the Mono terminates, either by completing with a value, completing empty or failing with an error. Unlike in Flux.doOnTerminate(Runnable), the simple fact that a Mono emits onNext implies completion, so the handler is invoked BEFORE the element is propagated (same as with doOnSuccess(Consumer)).
The Runnable is executed first, then the onNext/onComplete/onError signal is propagated downstream.
- Parameters:
onTerminate - the callback to call on Subscriber.onNext(T), Subscriber.onComplete() without preceding Subscriber.onNext(T) or Subscriber.onError(java.lang.Throwable)
- Returns:
- a new
Mono
-
elapsed
Map this Mono into Tuple2<Long, T> of elapsed milliseconds and source data. The elapsed time is measured between the subscribe and the first next signal, using the parallel scheduler.
-
elapsed
Map this Mono into Tuple2<Long, T> of elapsed milliseconds and source data. The elapsed time is measured between the subscribe and the first next signal, using the provided Scheduler.
-
expandDeep
public final Flux<T> expandDeep(Function<? super T, ? extends Publisher<? extends T>> expander, int capacityHint)
Recursively expand elements into a graph and emit all the resulting elements, in a depth-first traversal order.
That is: emit the value from this Mono, expand it and emit the first value at this first level of recursion, and so on... When no more recursion is possible, backtrack to the previous level and re-apply the strategy.
For example, given the hierarchical structure
A
 - AA
   - aa1
 - AB
   - ab1
 - a1
Expands Mono.just(A) into
A
AA
aa1
AB
ab1
a1
-
expandDeep
Recursively expand elements into a graph and emit all the resulting elements, in a depth-first traversal order.
That is: emit the value from this Mono, expand it and emit the first value at this first level of recursion, and so on... When no more recursion is possible, backtrack to the previous level and re-apply the strategy.
For example, given the hierarchical structure
A
 - AA
   - aa1
 - AB
   - ab1
 - a1
Expands Mono.just(A) into
A
AA
aa1
AB
ab1
a1
-
expand
public final Flux<T> expand(Function<? super T, ? extends Publisher<? extends T>> expander, int capacityHint)
Recursively expand elements into a graph and emit all the resulting elements using a breadth-first traversal strategy.
That is: emit the value from this Mono first, then expand it at a first level of recursion and emit all of the resulting values, then expand all of these at a second level and so on...
For example, given the hierarchical structure
A
 - AA
   - aa1
 - AB
   - ab1
 - a1
Expands Mono.just(A) into
A
AA
AB
a1
aa1
ab1
-
expand
Recursively expand elements into a graph and emit all the resulting elements using a breadth-first traversal strategy.
That is: emit the value from this Mono first, then expand it at a first level of recursion and emit all of the resulting values, then expand all of these at a second level and so on...
For example, given the hierarchical structure
A
 - AA
   - aa1
 - AB
   - ab1
 - a1
Expands Mono.just(A) into
A
AA
AB
a1
aa1
ab1
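To contrast the two traversal orders on the hierarchy used in the examples, here is a minimal sketch assuming reactor-core is on the classpath. `ExpandSketch`, its `CHILDREN` map and `childrenOf` helper are hypothetical stand-ins for whatever lookup produces a node's children.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class ExpandSketch {

    // Hypothetical in-memory tree: A -> (AA -> aa1), (AB -> ab1), a1
    private static final Map<String, List<String>> CHILDREN = new HashMap<>();
    static {
        CHILDREN.put("A", Arrays.asList("AA", "AB", "a1"));
        CHILDREN.put("AA", Collections.singletonList("aa1"));
        CHILDREN.put("AB", Collections.singletonList("ab1"));
    }

    // Expander: leaves return an empty Flux, which ends the recursion.
    private static Flux<String> childrenOf(String node) {
        return Flux.fromIterable(CHILDREN.getOrDefault(node, Collections.emptyList()));
    }

    static List<String> breadthFirst() {
        return Mono.just("A").expand(ExpandSketch::childrenOf).collectList().block();
    }

    static List<String> depthFirst() {
        return Mono.just("A").expandDeep(ExpandSketch::childrenOf).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("expand:     " + breadthFirst());
        System.out.println("expandDeep: " + depthFirst());
    }
}
```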
-
filter
If thisMonois valued, test the result and replay it if predicate returns true. Otherwise complete without value.Discard Support: This operator discards the element if it does not match the filter. It also discards upon cancellation or error triggered by a data signal.
- Parameters:
tester- the predicate to evaluate- Returns:
- a filtered
Mono
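For instance, a filter that keeps only even values could look like the sketch below (assuming reactor-core is on the classpath; `FilterSketch` and `keepEven` are hypothetical names). A rejected value leaves the Mono empty rather than failing it.

```java
import java.util.Optional;
import reactor.core.publisher.Mono;

final class FilterSketch {

    // Replays the value if it is even, otherwise completes without a value.
    static Optional<Integer> keepEven(int value) {
        return Mono.just(value)
                   .filter(v -> v % 2 == 0)
                   .blockOptional();
    }

    public static void main(String[] args) {
        System.out.println(keepEven(4));
        System.out.println(keepEven(3));
    }
}
```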
-
filterWhen
If thisMonois valued, test the value asynchronously using a generatedPublisher<Boolean>test. The value from the Mono is replayed if the first item emitted by the test is true. It is dropped if the test is either empty or its first emitted value is false.Note that only the first value of the test publisher is considered, and unless it is a
Mono, test will be cancelled after receiving that first value.Discard Support: This operator discards the element if it does not match the filter. It also discards upon cancellation or error triggered by a data signal.
-
flatMap
-
flatMapMany
Transform the item emitted by thisMonointo a Publisher, then forward its emissions into the returnedFlux.- Type Parameters:
R- the merged sequence type- Parameters:
mapper- theFunctionto produce a sequence of R from the eventual passedSubscriber.onNext(T)- Returns:
- a new
Fluxas the sequence is not guaranteed to be single at most
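A minimal sketch of going from one element to many, assuming reactor-core is on the classpath. `FlatMapManySketch` and `wordsOf` are hypothetical names; splitting a sentence is just a convenient stand-in for any mapper that returns a multi-valued Publisher.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class FlatMapManySketch {

    // Maps the single sentence to a Flux of its words.
    static List<String> wordsOf(String sentence) {
        return Mono.just(sentence)
                   .flatMapMany(s -> Flux.fromArray(s.split(" ")))
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(wordsOf("at most one"));
    }
}
```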
-
flatMapMany
public final <R> Flux<R> flatMapMany(Function<? super T, ? extends Publisher<? extends R>> mapperOnNext, Function<? super Throwable, ? extends Publisher<? extends R>> mapperOnError, Supplier<? extends Publisher<? extends R>> mapperOnComplete) Transform the signals emitted by thisMonointo signal-specific Publishers, then forward the applicable Publisher's emissions into the returnedFlux.- Type Parameters:
R- the type of the produced inner sequence- Parameters:
mapperOnNext - the Function to call on next data and returning a sequence to merge
mapperOnError - the Function to call on error signal and returning a sequence to merge
mapperOnComplete - the Supplier to call on complete signal and returning a sequence to merge
- Returns:
- a new
Fluxas the sequence is not guaranteed to be single at most - See Also:
-
flatMapIterable
public final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper) Transform the item emitted by thisMonointoIterable, then forward its elements into the returnedFlux. TheIterable.iterator()method will be called at least once and at most twice.This operator inspects each
Iterable'sSpliteratorto assess if the iteration can be guaranteed to be finite (seeOperators.onDiscardMultiple(Iterator, boolean, Context)). Since the default Spliterator wraps the Iterator we can have twoIterable.iterator()calls per iterable. This second invocation is skipped on aCollectionhowever, a type which is assumed to be always finite.Discard Support: Upon cancellation, this operator discards
Telements it prefetched and, in some cases, attempts to discard remainder of the currently processedIterable(if it can safely ensure the iterator is finite). Note that this means eachIterable'sIterable.iterator()method could be invoked twice. -
flux
-
hasElement
Emit a single boolean true if thisMonohas an element.- Returns:
- a new
Monowithtrueif a value is emitted andfalseotherwise
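As a quick illustration (assuming reactor-core is on the classpath; `HasElementSketch` and `present` are hypothetical names), hasElement turns presence or absence of a value into a boolean element:

```java
import reactor.core.publisher.Mono;

final class HasElementSketch {

    // True if the source emits a value, false if it completes empty.
    static boolean present(Mono<?> source) {
        return Boolean.TRUE.equals(source.hasElement().block());
    }

    public static void main(String[] args) {
        System.out.println(present(Mono.just("x")));
        System.out.println(present(Mono.empty()));
    }
}
```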
-
handle
Handle the items emitted by thisMonoby calling a biconsumer with the output sink for each onNext. At most oneSynchronousSink.next(Object)call must be performed and/or 0 or 1SynchronousSink.error(Throwable)orSynchronousSink.complete().When the context-propagation library is available at runtime and the downstream
ContextViewis not empty, this operator implicitly uses the library to restore thread locals around the handlerBiConsumer. Typically, this would be done in conjunction with the use ofcontextCapture()operator down the chain.- Type Parameters:
R- the transformed type- Parameters:
handler- the handlingBiConsumer- Returns:
- a transformed
Mono
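The sink contract can be sketched as a combined map-and-filter, assuming reactor-core is on the classpath. `HandleSketch` and `parsePositive` are hypothetical names. Calling `sink.next` at most once emits the transformed value; calling nothing lets the Mono complete empty.

```java
import java.util.Optional;
import reactor.core.publisher.Mono;

final class HandleSketch {

    // Parses the text and keeps the number only if it is strictly positive.
    static Optional<Integer> parsePositive(String raw) {
        return Mono.just(raw)
                   .<Integer>handle((text, sink) -> {
                       int n = Integer.parseInt(text);
                       if (n > 0) {
                           sink.next(n); // at most one next(...) per onNext
                       }
                       // no call on the sink: the resulting Mono completes empty
                   })
                   .blockOptional();
    }

    public static void main(String[] args) {
        System.out.println(parsePositive("7"));
        System.out.println(parsePositive("-3"));
    }
}
```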
-
hide
Hides the identity of thisMonoinstance.The main purpose of this operator is to prevent certain identity-based optimizations from happening, mostly for diagnostic purposes.
- Returns:
- a new
MonopreventingPublisher/Subscriptionbased Reactor optimizations
-
ignoreElement
Ignores onNext signal (dropping it) and only propagates termination events.Discard Support: This operator discards the source element.
-
log
Observe all Reactive Streams signals and trace them usingLoggersupport. Default will useLevel.INFOandjava.util.logging. If SLF4J is available, it will be used instead.The default log category will be "reactor.Mono", followed by a suffix generated from the source operator, e.g. "reactor.Mono.Map".
- Returns:
- a new
Monothat logs signals - See Also:
-
log
Observe all Reactive Streams signals and use Logger support to handle trace implementation. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.
- Parameters:
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will complete it, e.g. "reactor.Mono.Map".
- Returns:
- a new
Mono
-
log
Observe Reactive Streams signals matching the passed flags options and use Logger support to handle trace implementation. Default will use the passed Level and java.util.logging. If SLF4J is available, it will be used instead. Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
mono.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
- Parameters:
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will complete it, e.g. "reactor.Mono.Map".
level - the Level to enforce for this tracing Mono (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
options - a vararg SignalType option to filter log messages
- Returns:
- a new
Mono
-
log
public final Mono<T> log(@Nullable String category, Level level, boolean showOperatorLine, SignalType... options)
Observe Reactive Streams signals matching the passed filter options and use Logger support to handle trace implementation. Default will use the passed Level and java.util.logging. If SLF4J is available, it will be used instead. Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
mono.log("category", Level.INFO, false, SignalType.ON_NEXT, SignalType.ON_ERROR)
- Parameters:
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will complete it, e.g. "reactor.Mono.Map".
level - the Level to enforce for this tracing Mono (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
showOperatorLine - capture the current stack to display operator class/line number.
options - a vararg SignalType option to filter log messages
- Returns:
- a new unaltered
Mono
-
log
Observe Reactive Streams signals matching the passed filteroptionsand trace them using a specific user-providedLogger, atLevel.INFOlevel. -
log
public final Mono<T> log(Logger logger, Level level, boolean showOperatorLine, SignalType... options)
Observe Reactive Streams signals matching the passed filter options and trace them using a specific user-provided Logger, at the given Level.
Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
mono.log(myCustomLogger, Level.INFO, false, SignalType.ON_NEXT, SignalType.ON_ERROR)
- Parameters:
logger - the Logger to use, instead of resolving one through a category.
level - the Level to enforce for this tracing Mono (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
showOperatorLine - capture the current stack to display operator class/line number.
options - a vararg SignalType option to filter log messages
- Returns:
- a new
Monothat logs signals
-
map
Transform the item emitted by thisMonoby applying a synchronous function to it. -
mapNotNull
Transform the item emitted by thisMonoby applying a synchronous function to it, which is allowed to produce anullvalue. In that case, the resulting Mono completes immediately. This operator effectively behaves likemap(Function)followed byfilter(Predicate)althoughnullis not a supported value, so it can't be filtered out. -
materialize
Transform incoming onNext, onError and onComplete signals into Signal instances, materializing these signals. Since the error is materialized as a Signal, the propagation will be stopped and onComplete will be emitted. Complete signal will first emit a Signal.complete() and then effectively complete the Mono. All these Signal have a Context associated to them.
-
mergeWith
-
metrics
Deprecated. Prefer using the tap(SignalListenerFactory) with the SignalListenerFactory provided by the new reactor-core-micrometer module. To be removed in 3.6.0 at the earliest.
Activate metrics for this sequence, provided there is an instrumentation facade on the classpath (otherwise this method is a pure no-op).
Metrics are gathered on Subscriber events, and it is recommended to also name (and optionally tag) the sequence.
The name serves as a prefix in the reported metrics names. In case no name has been provided, the default name "reactor" will be applied.
The MeterRegistry used by reactor can be configured via Metrics.MicrometerConfiguration.useRegistry(MeterRegistry) prior to using this operator, the default being Metrics.globalRegistry.
- Returns:
- an instrumented
Mono - See Also:
-
name
Give a name to this sequence, which can be retrieved usingScannable.name()as long as this is the first reachableScannable.parents().The name is typically visible at assembly time by the
tap(SignalListenerFactory)operator, which could for example be configured with a metrics listener using the name as a prefix for meters' id.- Parameters:
name- a name for the sequence- Returns:
- the same sequence, but bearing a name
- See Also:
-
or
Emit the first available signal from this mono or the other mono. -
ofType
-
onErrorComplete
Simply complete the sequence by replacing anonError signalwith anonComplete signal. All other signals are propagated as-is.- Returns:
- a new
Monofalling back on completion when an onError occurs - See Also:
-
onErrorComplete
Simply complete the sequence by replacing anonError signalwith anonComplete signalif the error matches the givenClass. All other signals, including non-matching onError, are propagated as-is.- Returns:
- a new
Monofalling back on completion when a matching error occurs - See Also:
-
onErrorComplete
Simply complete the sequence by replacing anonError signalwith anonComplete signalif the error matches the givenPredicate. All other signals, including non-matching onError, are propagated as-is.- Returns:
- a new
Monofalling back on completion when a matching error occurs - See Also:
-
onErrorContinue
Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. The recovered error and associated value are notified via the providedBiConsumer. Alternatively, throwing from that biconsumer will propagate the thrown exception downstream in place of the original error, which is added as a suppressed exception to the new one.This operator is offered on
Monomainly as a way to propagate the configuration to upstreamFlux. The mode doesn't really make sense on aMono, since we're sure there will be no further value to continue with.onErrorResume(Function)is a more classical fit.Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream operators, it requires specific operator support to work, and the scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour.)
In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use doOnError to log the error, and onErrorResume(e -> Mono.empty()) to drop erroneous elements:
.flatMap(id -> repository.retrieveById(id)
                         .doOnError(System.err::println)
                         .onErrorResume(e -> Mono.empty()))
This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.
- Parameters:
errorConsumer - a BiConsumer fed with the error and the value that triggered it.
- Returns:
- a
Monothat attempts to continue processing on errors.
-
onErrorContinue
public final <E extends Throwable> Mono<T> onErrorContinue(Class<E> type, BiConsumer<Throwable, Object> errorConsumer) Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. Only errors matching the specifiedtypeare recovered from. The recovered error and associated value are notified via the providedBiConsumer. Alternatively, throwing from that biconsumer will propagate the thrown exception downstream in place of the original error, which is added as a suppressed exception to the new one.This operator is offered on
Monomainly as a way to propagate the configuration to upstreamFlux. The mode doesn't really make sense on aMono, since we're sure there will be no further value to continue with.onErrorResume(Function)is a more classical fit.Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream operators, it requires specific operator support to work, and the scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour.)
In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use doOnError to log the error, and onErrorResume(e -> Mono.empty()) to drop erroneous elements:
.flatMap(id -> repository.retrieveById(id)
                         .doOnError(MyException.class, System.err::println)
                         .onErrorResume(MyException.class, e -> Mono.empty()))
This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.
- Parameters:
type- theClassofExceptionthat are resumed from.errorConsumer- aBiConsumerfed with errors matching theClassand the value that triggered the error.- Returns:
- a
Monothat attempts to continue processing on some errors.
-
onErrorContinue
public final <E extends Throwable> Mono<T> onErrorContinue(Predicate<E> errorPredicate, BiConsumer<Throwable, Object> errorConsumer) Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. Only errors matching thePredicateare recovered from (note that this predicate can be applied several times and thus must be idempotent). The recovered error and associated value are notified via the providedBiConsumer. Alternatively, throwing from that biconsumer will propagate the thrown exception downstream in place of the original error, which is added as a suppressed exception to the new one.This operator is offered on
Monomainly as a way to propagate the configuration to upstreamFlux. The mode doesn't really make sense on aMono, since we're sure there will be no further value to continue with.onErrorResume(Function)is a more classical fit.Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream operators, it requires specific operator support to work, and the scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour.)
In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use doOnError to log the error, and onErrorResume(e -> Mono.empty()) to drop erroneous elements:
.flatMap(id -> repository.retrieveById(id)
                         .doOnError(errorPredicate, System.err::println)
                         .onErrorResume(errorPredicate, e -> Mono.empty()))
This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.
- Parameters:
errorPredicate- aPredicateused to filter which errors should be resumed from. This MUST be idempotent, as it can be used several times.errorConsumer- aBiConsumerfed with errors matching the predicate and the value that triggered the error.- Returns:
- a
Monothat attempts to continue processing on some errors.
-
onErrorStop
If anonErrorContinue(BiConsumer)variant has been used downstream, reverts to the default 'STOP' mode where errors are terminal events upstream. It can be used for easier scoping of the on next failure strategy or to override the inherited strategy in a sub-stream (for example in a flatMap). It has no effect ifonErrorContinue(BiConsumer)has not been used downstream.- Returns:
- a
Monothat terminates on errors, even ifonErrorContinue(BiConsumer)was used downstream
-
onErrorMap
public final Mono<T> onErrorMap(Predicate<? super Throwable> predicate, Function<? super Throwable, ? extends Throwable> mapper) Transform an error emitted by thisMonoby synchronously applying a function to it if the error matches the given predicate. Otherwise let the error pass through. -
onErrorMap
Transform any error emitted by thisMonoby synchronously applying a function to it. -
onErrorMap
public final <E extends Throwable> Mono<T> onErrorMap(Class<E> type, Function<? super E, ? extends Throwable> mapper) Transform an error emitted by thisMonoby synchronously applying a function to it if the error matches the given type. Otherwise let the error pass through. -
onErrorResume
public final Mono<T> onErrorResume(Function<? super Throwable, ? extends Mono<? extends T>> fallback) Subscribe to a fallback publisher when any error occurs, using a function to choose the fallback depending on the error. -
onErrorResume
public final <E extends Throwable> Mono<T> onErrorResume(Class<E> type, Function<? super E, ? extends Mono<? extends T>> fallback) Subscribe to a fallback publisher when an error matching the given type occurs, using a function to choose the fallback depending on the error. -
onErrorResume
public final Mono<T> onErrorResume(Predicate<? super Throwable> predicate, Function<? super Throwable, ? extends Mono<? extends T>> fallback) Subscribe to a fallback publisher when an error matching a given predicate occurs. -
onErrorReturn
Simply emit a captured fallback value when any error is observed on thisMono.- Parameters:
fallbackValue- the value to emit if an error occurs- Returns:
- a new falling back
Mono - See Also:
-
onErrorReturn
Simply emit a captured fallback value when an error of the specified type is observed on thisMono.- Type Parameters:
E- the error type- Parameters:
type- the error type to matchfallbackValue- the value to emit if an error occurs that matches the type- Returns:
- a new falling back
Mono - See Also:
-
onErrorReturn
Simply emit a captured fallback value when an error matching the given predicate is observed on thisMono.- Parameters:
predicate- the error predicate to matchfallbackValue- the value to emit if an error occurs that matches the predicate- Returns:
- a new
Mono - See Also:
-
onTerminateDetach
Detaches both the childSubscriberand theSubscriptionon termination or cancellation.This should help with odd retention scenarios when running with non-reactor
Subscriber.- Returns:
- a detachable
Mono
-
publish
Share aMonofor the duration of a function that may transform it and consume it as many times as necessary without causing multiple subscriptions to the upstream.- Type Parameters:
R- the output value type- Parameters:
transform- the transformation function- Returns:
- a new
Mono
-
publishOn
Run onNext, onComplete and onError on a supplied Scheduler Worker.
This operator influences the threading context where the rest of the operators in the chain below it will execute, up to a new occurrence of publishOn.
Typically used for fast publisher, slow consumer(s) scenarios.
mono.publishOn(Schedulers.single()).subscribe()
- Parameters:
scheduler - a Scheduler providing the Scheduler.Worker where to publish
- Returns:
- an asynchronously producing
Mono
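To make the thread switch observable, the sketch below records the thread that runs an operator placed after publishOn. It assumes reactor-core is on the classpath; `PublishOnSketch` and `threadSeenDownstream` are hypothetical names.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

final class PublishOnSketch {

    // Returns the name of the thread on which the downstream map(...) ran.
    static String threadSeenDownstream() {
        return Mono.just("payload")
                   .publishOn(Schedulers.single())
                   // Operators below publishOn run on the scheduler's worker.
                   .map(v -> Thread.currentThread().getName())
                   .block();
    }

    public static void main(String[] args) {
        System.out.println("caller thread:     " + Thread.currentThread().getName());
        System.out.println("downstream thread: " + threadSeenDownstream());
    }
}
```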
-
repeat
Repeatedly and indefinitely subscribe to the source upon completion of the previous subscription.- Returns:
- an indefinitely repeated
Fluxon onComplete
-
repeat
Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.
- Parameters:
predicate - the boolean to evaluate on onComplete.
- Returns:
- a Flux that repeats on onComplete while the predicate matches
-
repeat
Repeatedly subscribe to the source numRepeat times. This results in numRepeat + 1 total subscriptions to the original source. As a consequence, using 0 plays the original sequence once.
- Parameters:
numRepeat - the number of times to re-subscribe on onComplete (positive, or 0 for original sequence only)
- Returns:
- a Flux that repeats on onComplete, up to the specified number of repetitions
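The "numRepeat + 1 total subscriptions" rule can be checked with a source that counts its subscriptions. This is an illustrative sketch; RepeatSketch and subscribeThreeTimes are hypothetical names, and reactor-core is assumed to be available.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

final class RepeatSketch {

    // Each subscription to the source increments the counter and emits its new value.
    static List<Integer> subscribeThreeTimes() {
        AtomicInteger subscriptions = new AtomicInteger();
        return Mono.fromCallable(subscriptions::incrementAndGet)
                .repeat(2)       // 2 repeats after the original subscription, so 3 in total
                .collectList()   // repeat returns a Flux, gathered here into a list
                .block();
    }

    public static void main(String[] args) {
        System.out.println(subscribeThreeTimes()); // prints [1, 2, 3]
    }
}
```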
-
repeat
Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription. The specified maximum number of repeats limits the number of re-subscriptions.
- Parameters:
numRepeat - the number of times to re-subscribe on complete (positive, or 0 for original sequence only)
predicate - the boolean to evaluate on onComplete
- Returns:
- a Flux that repeats on onComplete while the predicate matches, up to the specified number of repetitions
-
repeatWhen
Repeatedly subscribe to this Mono when a companion sequence emits elements in response to the source completion signal. Any terminal signal from the companion sequence will terminate the resulting Flux with the same signal immediately.
If the companion sequence signals when this Mono is active, the repeat attempt is suppressed.
Note that if the companion Publisher created by the repeatFactory emits Context as trigger objects, the content of these Context will be added to the operator's own Context.
- Parameters:
repeatFactory - the Function that returns the associated Publisher companion, given a Flux that signals each onComplete as a Long representing the number of source elements emitted in the latest attempt (0 or 1).
- Returns:
- a Flux that repeats on onComplete when the companion Publisher produces an onNext signal
-
repeatWhenEmpty
Repeatedly subscribe to this Mono as long as the current subscription to this Mono completes empty and the companion Publisher produces an onNext signal.
Any terminal signal will terminate the resulting Mono with the same signal immediately.
- Parameters:
repeatFactory - the Function that returns the associated Publisher companion, given a Flux that signals each onComplete as a 0-based incrementing Long.
- Returns:
- a Mono that resubscribes to this Mono if the previous subscription was empty, as long as the companion Publisher produces an onNext signal
-
repeatWhenEmpty
public final Mono<T> repeatWhenEmpty(int maxRepeat, Function<Flux<Long>, ? extends Publisher<?>> repeatFactory)
Repeatedly subscribe to this Mono as long as the current subscription to this Mono completes empty and the companion Publisher produces an onNext signal.
Any terminal signal will terminate the resulting Mono with the same signal immediately.
Emits an IllegalStateException if maxRepeat is exceeded (provided it is different from Integer.MAX_VALUE).
- Parameters:
maxRepeat - the maximum number of repeats (infinite if Integer.MAX_VALUE)
repeatFactory - the Function that returns the associated Publisher companion, given a Flux that signals each onComplete as a 0-based incrementing Long.
- Returns:
- a Mono that resubscribes to this Mono if the previous subscription was empty, as long as the companion Publisher produces an onNext signal and the maximum number of repeats isn't exceeded.
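A common use of repeatWhenEmpty is polling a source that is empty until a result becomes available. The following is a sketch under assumptions: the polled source is simulated with a counter, the names RepeatWhenEmptySketch and pollUntilReady are hypothetical, and the 10 ms delay between polls is an arbitrary choice.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

final class RepeatWhenEmptySketch {

    // Simulated source: completes empty on the first two subscriptions, then emits "ready".
    static String pollUntilReady(AtomicInteger attempts) {
        Mono<String> poll = Mono.defer(() ->
                attempts.incrementAndGet() < 3 ? Mono.<String>empty() : Mono.just("ready"));

        return poll
                // At most 5 repeats; each companion onNext (delayed by 10 ms) triggers a resubscription.
                .repeatWhenEmpty(5, companion -> companion.delayElements(Duration.ofMillis(10)))
                .block();
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        System.out.println(pollUntilReady(attempts) + " after " + attempts.get() + " attempts");
    }
}
```

Had the source stayed empty beyond maxRepeat, the resulting Mono would fail with the IllegalStateException mentioned above.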
-
retry
Re-subscribes to this Mono sequence if it signals any error, indefinitely.
- Returns:
- a Mono that retries on onError
-
retry
Re-subscribes to this Mono sequence if it signals any error, for a fixed number of times.
Note that passing Long.MAX_VALUE is treated as infinite retry.
- Parameters:
numRetries - the number of times to tolerate an error
- Returns:
- a Mono that retries on onError up to the specified number of retry attempts.
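As an illustration of bounded retries, the sketch below uses a source that fails twice before succeeding. The failing source is simulated, and RetrySketch and flakyCall are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

final class RetrySketch {

    // Simulated flaky source: the first two subscriptions fail, the third one succeeds.
    static String flakyCall(AtomicInteger attempts) {
        Mono<String> flaky = Mono.fromCallable(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        });
        // Tolerate up to 3 errors; only 2 are needed here.
        return flaky.retry(3).block();
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        System.out.println(flakyCall(attempts) + " after " + attempts.get() + " attempts");
    }
}
```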
-
retryWhen
Retries this Mono in response to signals emitted by a companion Publisher. The companion is generated by the provided Retry instance, see Retry.max(long), Retry.maxInARow(long) and Retry.backoff(long, Duration) for readily available strategy builders.
The operator generates a base for the companion, a Flux of Retry.RetrySignal, each of which gives metadata about a retryable failure whenever this Mono signals an error. The final companion should be derived from that base companion and emit data in response to incoming onNext (although it can emit fewer elements, or delay the emissions).
Terminal signals in the companion terminate the sequence with the same signal, so emitting a Subscriber.onError(Throwable) will fail the resulting Mono with that same error.
Note that the Retry.RetrySignal state can be transient and change between each source onError or onNext. If processed with a delay, this could lead to the represented state being out of sync with the state at which the retry was evaluated. Map it to Retry.RetrySignal.copy() right away to mitigate this.
Note that if the companion Publisher created by the whenFactory emits Context as trigger objects, these Context will be merged with the previous Context:
    Retry customStrategy = Retry.from(companion -> companion.handle((retrySignal, sink) -> {
        Context ctx = sink.currentContext();
        int rl = ctx.getOrDefault("retriesLeft", 0);
        if (rl > 0) {
            sink.next(Context.of(
                "retriesLeft", rl - 1,
                "lastError", retrySignal.failure()
            ));
        } else {
            sink.error(Exceptions.retryExhausted("retries exhausted", retrySignal.failure()));
        }
    }));
    Mono<T> retried = originalMono.retryWhen(customStrategy);
-
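For the readily available builders mentioned above, a simpler sketch may help: a source that always fails, retried with Retry.max(2), ends with a retry-exhausted error once the budget is spent. RetryWhenSketch and exhaustsAfterTwoRetries are hypothetical names, and the always-failing source is simulated.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

final class RetryWhenSketch {

    // Returns true if the sequence terminated with a retry-exhausted error.
    static boolean exhaustsAfterTwoRetries(AtomicInteger attempts) {
        Mono<String> alwaysFailing = Mono.defer(() -> {
            attempts.incrementAndGet();  // count each subscription to the source
            return Mono.<String>error(new IllegalStateException("boom"));
        });
        try {
            alwaysFailing.retryWhen(Retry.max(2)).block();
            return false;
        } catch (RuntimeException e) {
            // The original failure is available as the cause of the exhaustion error.
            return Exceptions.isRetryExhausted(e);
        }
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        System.out.println("exhausted: " + exhaustsAfterTwoRetries(attempts)
                + ", subscriptions: " + attempts.get());
    }
}
```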
single
Expect exactly one item from this Mono source or signal NoSuchElementException for an empty source.
Note Mono doesn't need Flux.single(Object), since it is equivalent to defaultIfEmpty(Object) in a Mono.
- Returns:
- a Mono with the single item or an error signal
-
singleOptional
Wrap the item produced by this Mono source into an Optional or emit an empty Optional for an empty source.
- Returns:
- a Mono with an Optional containing the item, an empty optional or an error signal
-
subscribe
Subscribe to this Mono and request unbounded demand.
This version doesn't specify any consumption behavior for the events from the chain, especially no error handling, so other variants should usually be preferred.
- Returns:
- a new Disposable that can be used to cancel the underlying Subscription
-
subscribe
Subscribe a Consumer to this Mono that will consume all the sequence. It will request an unbounded demand (Long.MAX_VALUE).
For a passive version that observes and forwards incoming data see doOnNext(java.util.function.Consumer).
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
- Parameters:
consumer - the consumer to invoke on each value (onNext signal)
- Returns:
- a new Disposable that can be used to cancel the underlying Subscription
-
subscribe
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer)
Subscribe to this Mono with a Consumer that will consume all the elements in the sequence, as well as a Consumer that will handle errors. The subscription will request an unbounded demand (Long.MAX_VALUE).
For a passive version that observes and forwards incoming data see doOnSuccess(Consumer) and doOnError(java.util.function.Consumer).
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
- Parameters:
consumer - the consumer to invoke on each next signal
errorConsumer - the consumer to invoke on error signal
- Returns:
- a new Disposable that can be used to cancel the underlying Subscription
-
subscribe
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer)
Subscribe Consumer to this Mono that will respectively consume all the elements in the sequence, handle errors and react to completion. The subscription will request unbounded demand (Long.MAX_VALUE).
For a passive version that observes and forwards incoming data see doOnSuccess(Consumer) and doOnError(java.util.function.Consumer).
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
- Parameters:
consumer - the consumer to invoke on each value
errorConsumer - the consumer to invoke on error signal
completeConsumer - the consumer to invoke on complete signal
- Returns:
- a new Disposable that can be used to cancel the underlying Subscription
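The note about control returning immediately to the calling thread can be illustrated with a delayed source and a latch that makes the caller wait for the terminal signal. This is a sketch only; SubscribeSketch and consumeAsync are hypothetical names, and the 50 ms delay is arbitrary.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

final class SubscribeSketch {

    // Subscribes with value, error and completion callbacks, then waits for termination.
    static String consumeAsync() {
        AtomicReference<String> received = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        Disposable subscription = Mono.delay(Duration.ofMillis(50))
                .map(tick -> "value")
                .subscribe(
                        received::set,              // invoked on the value
                        error -> done.countDown(),  // invoked on error
                        done::countDown);           // invoked on completion

        // subscribe() has already returned here; the value arrives later on a timer thread.
        try {
            done.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            subscription.dispose();
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(consumeAsync());
    }
}
```

Without the latch, a main method or unit test could exit before the consumer runs, which is the impression the text above warns about.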
-
subscribe
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer)
Subscribe Consumer to this Mono that will respectively consume all the elements in the sequence, handle errors, react to completion, and request upon subscription. It will let the provided subscriptionConsumer request the adequate amount of data, or request unbounded demand Long.MAX_VALUE if no such consumer is provided.
For a passive version that observes and forwards incoming data see doOnSuccess(Consumer) and doOnError(java.util.function.Consumer).
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
- Parameters:
consumer - the consumer to invoke on each value
errorConsumer - the consumer to invoke on error signal
completeConsumer - the consumer to invoke on complete signal
subscriptionConsumer - the consumer to invoke on subscribe signal, to be used for the initial request, or null for max request
- Returns:
- a new Disposable that can be used to cancel the underlying Subscription
-
subscribe
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Context initialContext)
Subscribe Consumer to this Mono that will respectively consume all the elements in the sequence, handle errors and react to completion. Additionally, a Context is tied to the subscription. At subscription, an unbounded request is implicitly made.
For a passive version that observes and forwards incoming data see doOnSuccess(Consumer) and doOnError(java.util.function.Consumer).
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
- Parameters:
consumer - the consumer to invoke on each value
errorConsumer - the consumer to invoke on error signal
completeConsumer - the consumer to invoke on complete signal
initialContext - the Context for the subscription
- Returns:
- a new Disposable that can be used to cancel the underlying Subscription
-
subscribe
-
subscribe
An internal Publisher.subscribe(Subscriber) that will bypass Hooks.onLastOperator(Function) pointcut.
In addition to behaving as expected by Publisher.subscribe(Subscriber) in a controlled manner, it supports direct subscribe-time Context passing.
- Specified by:
subscribe in interface CorePublisher<T>
- Parameters:
actual - the Subscriber interested in the published sequence
- See Also:
-
subscribeOn
Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn.
    mono.subscribeOn(Schedulers.parallel()).subscribe()
- Parameters:
scheduler - a Scheduler providing the Scheduler.Worker where to subscribe
- Returns:
- a Mono requesting asynchronously
- See Also:
-
subscribeWith
Subscribe the given Subscriber to this Mono and return said Subscriber, allowing subclasses with a richer API to be used fluently.
- Type Parameters:
E - the reified type of the Subscriber for chaining
- Parameters:
subscriber - the Subscriber to subscribe with
- Returns:
- the passed Subscriber after subscribing it to this Mono
-
switchIfEmpty
Fallback to an alternative Mono if this mono is completed without data.
- Parameters:
alternate - the alternate mono if this mono is empty
- Returns:
- a Mono falling back upon source completing without elements
- See Also:
-
tag
Tag this mono with a key/value pair. These can be retrieved as a Set of all tags throughout the publisher chain by using Scannable.tags() (as traversed by Scannable.parents()).
The tags are typically visible at assembly time by the tap(SignalListenerFactory) operator, which could for example be configured with a metrics listener applying the tag(s) to its meters.
- Parameters:
key - a tag key
value - a tag value
- Returns:
- the same sequence, but bearing tags
- See Also:
-
take
Give this Mono a chance to resolve within a specified time frame but complete if it doesn't. This works a bit like timeout(Duration) except that the resulting Mono completes rather than errors when the timer expires.
The timeframe is evaluated using the parallel Scheduler.
- Parameters:
duration - the maximum duration to wait for the source Mono to resolve.
- Returns:
- a new Mono that will propagate the signals from the source unless no signal is received for duration, in which case it completes.
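The difference from timeout(Duration) is visible when the source never signals: the result is an empty completion rather than an error. A brief sketch, with hypothetical names TakeSketch, slowSource and fastSource:

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

final class TakeSketch {

    // The source never signals, so the timer wins and the Mono completes empty.
    static String slowSource() {
        return Mono.<String>never()
                .take(Duration.ofMillis(50))
                .block();   // block() returns null for an empty completion
    }

    // The source resolves well within the time frame, so its value is relayed.
    static String fastSource() {
        return Mono.just("fast")
                .take(Duration.ofSeconds(1))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(slowSource()); // prints null
        System.out.println(fastSource()); // prints fast
    }
}
```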
-
take
Give this Mono a chance to resolve within a specified time frame but complete if it doesn't. This works a bit like timeout(Duration) except that the resulting Mono completes rather than errors when the timer expires.
The timeframe is evaluated using the provided Scheduler.
-
takeUntilOther
Give this Mono a chance to resolve before a companion Publisher emits. If the companion emits before any signal from the source, the resulting Mono will complete. Otherwise, it will relay signals from the source.
-
tap
Tap into Reactive Streams signals emitted or received by this Mono and notify a stateful per-Subscriber SignalListener.
Any exception thrown by the SignalListener methods causes the subscription to be cancelled and the subscriber to be terminated with an onError signal of that exception. Note that SignalListener.doFinally(SignalType), SignalListener.doAfterComplete() and SignalListener.doAfterError(Throwable) instead just drop the exception.
This simplified variant assumes the state is purely initialized within the Supplier, as it is called for each incoming Subscriber without additional context.
When the context-propagation library is available at runtime and the downstream ContextView is not empty, this operator implicitly uses the library to restore thread locals around all invocations of SignalListener methods. Typically, this would be done in conjunction with the use of contextCapture() operator down the chain.
- Parameters:
simpleListenerGenerator - the Supplier to create a new SignalListener on each subscription
- Returns:
- a new Mono with side effects defined by generated SignalListener
- See Also:
-
tap
Tap into Reactive Streams signals emitted or received by this Mono and notify a stateful per-Subscriber SignalListener.
Any exception thrown by the SignalListener methods causes the subscription to be cancelled and the subscriber to be terminated with an onError signal of that exception. Note that SignalListener.doFinally(SignalType), SignalListener.doAfterComplete() and SignalListener.doAfterError(Throwable) instead just drop the exception.
This simplified variant allows the SignalListener to be constructed for each subscription with access to the incoming Subscriber's ContextView.
When the context-propagation library is available at runtime and the ContextView is not empty, this operator implicitly uses the library to restore thread locals around all invocations of SignalListener methods. Typically, this would be done in conjunction with the use of contextCapture() operator down the chain.
- Parameters:
listenerGenerator - the Function to create a new SignalListener on each subscription
- Returns:
- a new Mono with side effects defined by generated SignalListener
- See Also:
-
tap
Tap into Reactive Streams signals emitted or received by this Mono and notify a stateful per-Subscriber SignalListener created by the provided SignalListenerFactory.
The factory will initialize a state object for each Flux or Mono instance it is used with, and that state will be cached and exposed for each incoming Subscriber in order to generate the associated listener.
Any exception thrown by the SignalListener methods causes the subscription to be cancelled and the subscriber to be terminated with an onError signal of that exception. Note that SignalListener.doFinally(SignalType), SignalListener.doAfterComplete() and SignalListener.doAfterError(Throwable) instead just drop the exception.
When the context-propagation library is available at runtime and the downstream ContextView is not empty, this operator implicitly uses the library to restore thread locals around all invocations of SignalListener methods. Typically, this would be done in conjunction with the use of contextCapture() operator down the chain.
- Parameters:
listenerFactory - the SignalListenerFactory to create a new SignalListener on each subscription
- Returns:
- a new Mono with side effects defined by generated SignalListener
- See Also:
-
then
Return a Mono<Void> which only replays complete and error signals from this Mono.
Discard Support: This operator discards the element from the source.
- Returns:
- a Mono ignoring its payload (actively dropping)
-
then
Let this Mono complete then play another Mono.
In other words ignore element from this Mono and transform its completion signal into the emission and completion signal of a provided Mono<V>. Error signal is replayed in the resulting Mono<V>.
Discard Support: This operator discards the element from the source.
-
thenReturn
Let this Mono complete successfully, then emit the provided value. On an error in the original Mono, the error signal is propagated instead.
Discard Support: This operator discards the element from the source.
- Type Parameters:
V - the element type of the supplied value
- Parameters:
value - a value to emit after successful termination
- Returns:
- a new Mono that emits the supplied value
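Both branches of thenReturn (value replaced on success, error propagated on failure) can be sketched as follows. ThenReturnSketch and its methods are hypothetical names; the -1 sentinel supplied through onErrorReturn is only there to make the propagated error observable.

```java
import reactor.core.publisher.Mono;

final class ThenReturnSketch {

    // The source value is discarded and replaced by the supplied one.
    static Integer replaceValue() {
        return Mono.just("ignored")
                .thenReturn(42)
                .block();
    }

    // The source error skips thenReturn and reaches the downstream fallback.
    static Integer afterError() {
        return Mono.<String>error(new IllegalStateException("boom"))
                .thenReturn(42)
                .onErrorReturn(-1)
                .block();
    }

    public static void main(String[] args) {
        System.out.println(replaceValue()); // prints 42
        System.out.println(afterError());   // prints -1
    }
}
```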
-
thenEmpty
Return a Mono<Void> that waits for this Mono to complete then for a supplied Publisher<Void> to also complete. The second completion signal is replayed, or any error signal that occurs instead.
Discard Support: This operator discards the element from the source.
-
thenMany
Let this Mono complete successfully then play another Publisher. On an error in the original Mono, the error signal is propagated instead.
In other words ignore the element from this mono and transform the completion signal into a Flux<V> that will emit elements from the provided Publisher.
Discard Support: This operator discards the element from the source.
-
timed
Times this Mono Subscriber.onNext(Object) event, encapsulated into a Timed object that lets downstream consumer look at various time information gathered with nanosecond resolution using the default clock (Schedulers.parallel()):
- Timed.elapsed(): the time in nanoseconds since subscription, as a Duration. This is functionally equivalent to elapsed(), with a more expressive and precise representation than a Tuple2 with a long.
- Timed.timestamp(): the timestamp of this onNext, as an Instant (with nanoseconds part). This is functionally equivalent to timestamp(), with a more expressive and precise representation than a Tuple2 with a long.
- Timed.elapsedSinceSubscription(): for Mono this is the same as Timed.elapsed().
The Timed object instances are safe to store and use later, as they are created as an immutable wrapper around the <T> value and immediately passed downstream.
- Returns:
- a timed Mono
- See Also:
-
timed
Times this Mono Subscriber.onNext(Object) event, encapsulated into a Timed object that lets downstream consumer look at various time information gathered with nanosecond resolution using the provided Scheduler as a clock:
- Timed.elapsed(): the time in nanoseconds since subscription, as a Duration. This is functionally equivalent to elapsed(), with a more expressive and precise representation than a Tuple2 with a long.
- Timed.timestamp(): the timestamp of this onNext, as an Instant (with nanoseconds part). This is functionally equivalent to timestamp(), with a more expressive and precise representation than a Tuple2 with a long.
- Timed.elapsedSinceSubscription(): for Mono this is the same as Timed.elapsed().
The Timed object instances are safe to store and use later, as they are created as an immutable wrapper around the <T> value and immediately passed downstream.
- Returns:
- a timed Mono
- See Also:
-
timeout
-
timeout
Switch to a fallback Mono in case no item arrives within the given Duration.
If the fallback Mono is null, signal a TimeoutException instead.
-
timeout
Signal a TimeoutException error in case an item doesn't arrive before the given period, as measured on the provided Scheduler.
-
timeout
public final Mono<T> timeout(Duration timeout, @Nullable Mono<? extends T> fallback, Scheduler timer)
Switch to a fallback Mono in case an item doesn't arrive before the given period, as measured on the provided Scheduler.
If the given Mono is null, signal a TimeoutException.
-
timeout
Signal a TimeoutException in case the item from this Mono has not been emitted before the given Publisher emits.
-
timeout
Switch to a fallback Publisher in case the item from this Mono has not been emitted before the given Publisher emits.
- Type Parameters:
U - the element type of the timeout Publisher
- Parameters:
firstTimeout - the timeout Publisher that must not emit before the first signal from this Mono
fallback - the fallback Publisher to subscribe when a timeout occurs
- Returns:
- an expirable Mono with a fallback Mono if the item doesn't come before a Publisher signals
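The duration-based timeout overloads described above can be sketched with a source that never emits: with a fallback the result switches to it, without one the sequence fails with a TimeoutException. TimeoutSketch and its method names are hypothetical, and the 50 ms period is arbitrary.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

final class TimeoutSketch {

    // No item arrives in time, so the fallback Mono is subscribed instead.
    static String withFallback() {
        return Mono.<String>never()
                .timeout(Duration.ofMillis(50), Mono.just("fallback"))
                .block();
    }

    // Without a fallback, the timeout surfaces as a TimeoutException.
    static boolean failsWithoutFallback() {
        try {
            Mono.<String>never().timeout(Duration.ofMillis(50)).block();
            return false;
        } catch (RuntimeException e) {
            // block() wraps checked exceptions; unwrap to inspect the original error.
            return Exceptions.unwrap(e) instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        System.out.println(withFallback());
        System.out.println(failsWithoutFallback());
    }
}
```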
-
timestamp
-
timestamp
If this Mono is valued, emit a Tuple2 pair of T1 the current clock time in millis (as a Long measured by the provided Scheduler) and T2 the emitted data (as a T).
The provided Scheduler will be asked to provide time with a granularity of TimeUnit.MILLISECONDS. In order for this operator to work as advertised, the provided Scheduler should thus return results that can be interpreted as unix timestamps.
-
toFuture
Transform this Mono into a CompletableFuture completing on onNext or onComplete and failing on onError.
- Returns:
- a CompletableFuture
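As a small illustration of the "completing on onNext or onComplete" wording: a valued Mono completes the future with its value, while an empty Mono completes it with null. ToFutureSketch and its methods are hypothetical names.

```java
import reactor.core.publisher.Mono;

final class ToFutureSketch {

    // The future completes with the emitted value.
    static String valued() {
        return Mono.just("v").toFuture().join();
    }

    // An empty Mono completes the future with null.
    static String empty() {
        return Mono.<String>empty().toFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(valued()); // prints v
        System.out.println(empty());  // prints null
    }
}
```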
-
transform
Transform this Mono in order to generate a target Mono. Unlike transformDeferred(Function), the provided function is executed as part of assembly.
    Function<Mono<Integer>, Mono<Integer>> applySchedulers = mono -> mono.subscribeOn(Schedulers.boundedElastic())
                                                                         .publishOn(Schedulers.parallel());
    mono.transform(applySchedulers).map(v -> v * v).subscribe();
-
transformDeferred
public final <V> Mono<V> transformDeferred(Function<? super Mono<T>, ? extends Publisher<V>> transformer)
Defer the given transformation to this Mono in order to generate a target Mono type. A transformation will occur for each Subscriber. For instance:
    mono.transformDeferred(original -> original.log());
-
transformDeferredContextual
public final <V> Mono<V> transformDeferredContextual(BiFunction<? super Mono<T>, ? super ContextView, ? extends Publisher<V>> transformer)
Defer the given transformation to this Mono in order to generate a target Mono type. A transformation will occur for each Subscriber. In addition, the transforming BiFunction exposes the ContextView of each Subscriber. For instance:
    Mono<T> monoLogged = mono.transformDeferredContextual((original, ctx) -> original.log("for RequestID" + ctx.get("RequestID")));
    //...later subscribe. Each subscriber has its Context with a RequestID entry
    monoLogged.contextWrite(Context.of("RequestID", "requestA")).subscribe();
    monoLogged.contextWrite(Context.of("RequestID", "requestB")).subscribe();
- Type Parameters:
V - the item type in the returned Publisher
- Parameters:
transformer - the BiFunction to lazily map this Mono into a target Mono instance upon subscription, with access to ContextView
- Returns:
- a new Mono
- See Also:
-
zipWhen
Wait for the result from this mono, use it to create a second mono via the provided rightGenerator function and combine both results into a Tuple2.
- Type Parameters:
T2 - the element type of the other Mono instance
- Parameters:
rightGenerator - the Function to generate a Mono to combine with
- Returns:
- a new combined Mono
-
zipWhen
public final <T2,O> Mono<O> zipWhen(Function<T, Mono<? extends T2>> rightGenerator, BiFunction<T, T2, O> combinator)
Wait for the result from this mono, use it to create a second mono via the provided rightGenerator function and combine both results into an arbitrary O object, as defined by the provided combinator function.
- Type Parameters:
T2 - the element type of the other Mono instance
O - the element type of the combination
- Parameters:
rightGenerator - the Function to generate a Mono to combine with
combinator - a BiFunction combinator function when both sources complete
- Returns:
- a new combined Mono
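Both zipWhen variants can be sketched with a dependent lookup, where the second Mono is derived from the first result. The lookup is simulated with Mono.just, and ZipWhenSketch, asTuple and combined are hypothetical names.

```java
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

final class ZipWhenSketch {

    // The second Mono is generated from the first value; both are paired in a Tuple2.
    static Tuple2<Integer, String> asTuple() {
        return Mono.just(7)
                .zipWhen(id -> Mono.just("user-" + id))
                .block();
    }

    // Same dependent lookup, but the pair is merged by a combinator instead of a Tuple2.
    static String combined() {
        return Mono.just(7)
                .zipWhen(id -> Mono.just("user-" + id), (id, name) -> id + ":" + name)
                .block();
    }

    public static void main(String[] args) {
        Tuple2<Integer, String> pair = asTuple();
        System.out.println(pair.getT1() + " / " + pair.getT2());
        System.out.println(combined());
    }
}
```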
-
zipWith
Combine the result from this mono and another into a Tuple2.
An error or empty completion of any source will cause the other source to be cancelled and the resulting Mono to immediately error or complete, respectively.
- Type Parameters:
T2 - the element type of the other Mono instance
- Parameters:
other - the Mono to combine with
- Returns:
- a new combined Mono
-
zipWith
public final <T2,O> Mono<O> zipWith(Mono<? extends T2> other, BiFunction<? super T, ? super T2, ? extends O> combinator)
Combine the result from this mono and another into an arbitrary O object, as defined by the provided combinator function.
An error or empty completion of any source will cause the other source to be cancelled and the resulting Mono to immediately error or complete, respectively.
- Type Parameters:
T2 - the element type of the other Mono instance
O - the element type of the combination
- Parameters:
other - the Mono to combine with
combinator - a BiFunction combinator function when both sources complete
- Returns:
- a new combined Mono
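The empty-completion rule for zipWith is easy to miss, so here is a short sketch contrasting two valued sources with one empty source. ZipWithSketch and its method names are hypothetical.

```java
import reactor.core.publisher.Mono;

final class ZipWithSketch {

    // Both sources emit, so the combinator runs.
    static String bothValued() {
        return Mono.just("a")
                .zipWith(Mono.just(1), (s, i) -> s + i)
                .block();
    }

    // One source is empty, so the result completes empty and the combinator never runs.
    static String oneEmpty() {
        return Mono.just("a")
                .zipWith(Mono.<Integer>empty(), (s, i) -> s + i)
                .block();   // block() returns null for an empty completion
    }

    public static void main(String[] args) {
        System.out.println(bothValued()); // prints a1
        System.out.println(oneEmpty());   // prints null
    }
}
```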
-
onAssembly
To be used by custom operators: invokes assembly Hooks pointcut given a Mono, potentially returning a new Mono. This is for example useful to activate cross-cutting concerns at assembly time, e.g. a generalized checkpoint().
- Type Parameters:
T - the value type
- Parameters:
source - the source to apply assembly hooks onto
- Returns:
- the source, potentially wrapped with assembly time cross-cutting behavior
-
toString
-
firstWithSignal(Iterable).