Class Mono<T>

java.lang.Object
reactor.core.publisher.Mono<T>
Type Parameters:
T - the type of the single value of this class
All Implemented Interfaces:
Publisher<T>, CorePublisher<T>
Direct Known Subclasses:
MonoOperator, MonoProcessor

public abstract class Mono<T> extends Object implements CorePublisher<T>
A Reactive Streams Publisher with basic rx operators that emits at most one item via the onNext signal then terminates with an onComplete signal (successful Mono, with or without value), or only emits a single onError signal (failed Mono).

Most Mono implementations are expected to immediately call Subscriber.onComplete() after having called Subscriber#onNext(T). Mono.never() is an outlier: it doesn't emit any signal, which is not technically forbidden although not terribly useful outside of tests. On the other hand, a combination of onNext and onError is explicitly forbidden.

The recommended way to learn about the Mono API and discover new operators is through the reference documentation, rather than through this javadoc (as opposed to learning more about individual operators). See the "which operator do I need?" appendix.

The rx operators will offer aliases for input Mono type to preserve the "at most one" property of the resulting Mono. For instance flatMap returns a Mono, while there is a flatMapMany alias with possibly more than 1 emission.

Mono<Void> should be used for Publisher that just completes without any value.

It is intended to be used in implementations and return types, input parameters should keep using raw Publisher as much as possible.

Note that using state in the java.util.function / lambdas used within Mono operators should be avoided, as these may be shared between several Subscribers.

Author:
Sebastien Deleuze, Stephane Maldini, David Karnok, Simon Baslé, Injae Kim
See Also:
  • Constructor Details

    • Mono

      public Mono()
  • Method Details

    • create

      public static <T> Mono<T> create(Consumer<MonoSink<T>> callback)
      Creates a deferred emitter that can be used with callback-based APIs to signal at most one value, a complete or an error signal.

      Bridging legacy API involves mostly boilerplate code due to the lack of standard types and methods. There are two kinds of API surfaces: 1) addListener/removeListener and 2) callback-handler.

      1) addListener/removeListener pairs
      To work with such API one has to instantiate the listener, call the sink from the listener then register it with the source:

      
       Mono.<String>create(sink -> {
           HttpListener listener = event -> {
               if (event.getResponseCode() >= 400) {
                   sink.error(new RuntimeException("Failed"));
               } else {
                   String body = event.getBody();
                   if (body.isEmpty()) {
                       sink.success();
                   } else {
                       sink.success(body.toLowerCase());
                   }
               }
           };
      
           client.addListener(listener);
      
           sink.onDispose(() -> client.removeListener(listener));
       });
       
      Note that this works only with single-value emitting listeners. Otherwise, all subsequent signals are dropped. You may have to add client.removeListener(this); to the listener's body.

      2) callback handler
      This requires a similar instantiation pattern such as above, but usually the successful completion and error are separated into different methods. In addition, the legacy API may or may not support some cancellation mechanism.

      
       Mono.<String>create(sink -> {
           Callback<String> callback = new Callback<String>() {
               @Override
               public void onResult(String data) {
                   sink.success(data.toLowerCase());
               }
      
               @Override
               public void onError(Exception e) {
                   sink.error(e);
               }
           }
      
           // without cancellation support:
      
           client.call("query", callback);
      
           // with cancellation support:
      
           AutoCloseable cancel = client.call("query", callback);
           sink.onDispose(() -> {
               try {
                   cancel.close();
               } catch (Exception ex) {
                   Exceptions.onErrorDropped(ex);
               }
           });
       });
       
      Type Parameters:
      T - The type of the value emitted
      Parameters:
      callback - Consume the MonoSink provided per-subscriber by Reactor to generate signals.
      Returns:
      a Mono
    • defer

      public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier)
      Create a Mono provider that will supply a target Mono to subscribe to for each Subscriber downstream.

      Type Parameters:
      T - the element type of the returned Mono instance
      Parameters:
      supplier - a Mono factory
      Returns:
      a deferred Mono
      See Also:
    • deferContextual

      public static <T> Mono<T> deferContextual(Function<ContextView,? extends Mono<? extends T>> contextualMonoFactory)
      Create a Mono provider that will supply a target Mono to subscribe to for each Subscriber downstream. This operator behaves the same way as defer(Supplier), but accepts a Function that will receive the current ContextView as an argument.

      Type Parameters:
      T - the element type of the returned Mono instance
      Parameters:
      contextualMonoFactory - a Mono factory
      Returns:
      a deferred Mono deriving actual Mono from context values for each subscription
    • delay

      public static Mono<Long> delay(Duration duration)
      Create a Mono which delays an onNext signal by a given duration on a default Scheduler and completes. If the demand cannot be produced in time, an onError will be signalled instead. The delay is introduced through the parallel default Scheduler.

      Parameters:
      duration - the duration of the delay
      Returns:
      a new Mono
    • delay

      public static Mono<Long> delay(Duration duration, Scheduler timer)
      Create a Mono which delays an onNext signal by a given duration on a provided Scheduler and completes. If the demand cannot be produced in time, an onError will be signalled instead.

      Parameters:
      duration - the Duration of the delay
      timer - a time-capable Scheduler instance to run on
      Returns:
      a new Mono
    • empty

      public static <T> Mono<T> empty()
      Create a Mono that completes without emitting any item.

      Type Parameters:
      T - the reified Subscriber type
      Returns:
      a completed Mono
    • error

      public static <T> Mono<T> error(Throwable error)
      Create a Mono that terminates with the specified error immediately after being subscribed to.

      Type Parameters:
      T - the reified Subscriber type
      Parameters:
      error - the onError signal
      Returns:
      a failing Mono
    • error

      public static <T> Mono<T> error(Supplier<? extends Throwable> errorSupplier)
      Create a Mono that terminates with an error immediately after being subscribed to. The Throwable is generated by a Supplier, invoked each time there is a subscription and allowing for lazy instantiation.

      Type Parameters:
      T - the reified Subscriber type
      Parameters:
      errorSupplier - the error signal Supplier to invoke for each Subscriber
      Returns:
      a failing Mono
    • first

      @SafeVarargs @Deprecated public static <T> Mono<T> first(Mono<? extends T>... monos)
      Deprecated.
      use firstWithSignal(Mono[]). To be removed in reactor 3.5.
      Pick the first Mono to emit any signal (value, empty completion or error) and replay that signal, effectively behaving like the fastest of these competing sources.

      Type Parameters:
      T - The type of the function result.
      Parameters:
      monos - The deferred monos to use.
      Returns:
      a new Mono behaving like the fastest of its sources.
    • first

      @Deprecated public static <T> Mono<T> first(Iterable<? extends Mono<? extends T>> monos)
      Deprecated.
      use firstWithSignal(Iterable). To be removed in reactor 3.5.
      Pick the first Mono to emit any signal (value, empty completion or error) and replay that signal, effectively behaving like the fastest of these competing sources.

      Type Parameters:
      T - The type of the function result.
      Parameters:
      monos - The deferred monos to use.
      Returns:
      a new Mono behaving like the fastest of its sources.
    • firstWithSignal

      @SafeVarargs public static <T> Mono<T> firstWithSignal(Mono<? extends T>... monos)
      Pick the first Mono to emit any signal (value, empty completion or error) and replay that signal, effectively behaving like the fastest of these competing sources.

      Type Parameters:
      T - The type of the function result.
      Parameters:
      monos - The deferred monos to use.
      Returns:
      a new Mono behaving like the fastest of its sources.
    • firstWithSignal

      public static <T> Mono<T> firstWithSignal(Iterable<? extends Mono<? extends T>> monos)
      Pick the first Mono to emit any signal (value, empty completion or error) and replay that signal, effectively behaving like the fastest of these competing sources.

      Type Parameters:
      T - The type of the function result.
      Parameters:
      monos - The deferred monos to use.
      Returns:
      a new Mono behaving like the fastest of its sources.
    • firstWithValue

      public static <T> Mono<T> firstWithValue(Iterable<? extends Mono<? extends T>> monos)
      Pick the first Mono source to emit any value and replay that signal, effectively behaving like the source that first emits an onNext.

      Valued sources always "win" over an empty source (one that only emits onComplete) or a failing source (one that only emits onError).

      When no source can provide a value, this operator fails with a NoSuchElementException (provided there are at least two sources). This exception has a composite as its cause that can be used to inspect what went wrong with each source (so the composite has as many elements as there are sources).

      Exceptions from failing sources are directly reflected in the composite at the index of the failing source. For empty sources, a NoSuchElementException is added at their respective index. One can use Exceptions.unwrapMultiple(topLevel.getCause()) to easily inspect these errors as a List.

      Note that like in firstWithSignal(Iterable), an infinite source can be problematic if no other source emits onNext.

      Type Parameters:
      T - The type of the element in the sources and the resulting mono
      Parameters:
      monos - An Iterable of the competing source monos
      Returns:
      a new Mono behaving like the fastest of its sources
    • firstWithValue

      @SafeVarargs public static <T> Mono<T> firstWithValue(Mono<? extends T> first, Mono<? extends T>... others)
      Pick the first Mono source to emit any value and replay that signal, effectively behaving like the source that first emits an onNext.

      Valued sources always "win" over an empty source (one that only emits onComplete) or a failing source (one that only emits onError).

      When no source can provide a value, this operator fails with a NoSuchElementException (provided there are at least two sources). This exception has a composite as its cause that can be used to inspect what went wrong with each source (so the composite has as many elements as there are sources).

      Exceptions from failing sources are directly reflected in the composite at the index of the failing source. For empty sources, a NoSuchElementException is added at their respective index. One can use Exceptions.unwrapMultiple(topLevel.getCause()) to easily inspect these errors as a List.

      Note that like in firstWithSignal(Mono[]), an infinite source can be problematic if no other source emits onNext. In case the first source is already an array-based firstWithValue(Mono, Mono[]) instance, nesting is avoided: a single new array-based instance is created with all the sources from first plus all the others sources at the same level.

      Type Parameters:
      T - The type of the element in the sources and the resulting mono
      Parameters:
      first - the first competing source Mono
      others - the other competing sources Mono
      Returns:
      a new Mono behaving like the fastest of its sources
    • from

      public static <T> Mono<T> from(Publisher<? extends T> source)
      Expose the specified Publisher with the Mono API, and ensure it will emit 0 or 1 item. The source emitter will be cancelled on the first `onNext`.

      Hooks.onEachOperator(String, Function) and similar assembly hooks are applied unless the source is already a Mono (including Mono that was decorated as a Flux, see Flux.from(Publisher)).

      Type Parameters:
      T - the source type
      Parameters:
      source - the Publisher source
      Returns:
      the next item emitted as a Mono
    • fromCallable

      public static <T> Mono<T> fromCallable(Callable<? extends @Nullable T> callable)
      Create a Mono producing its value using the provided Callable. If the Callable resolves to null, the resulting Mono completes empty.

      Type Parameters:
      T - type of the expected value
      Parameters:
      callable - Callable that will produce the value
      Returns:
      A Mono.
    • fromCompletionStage

      public static <T> Mono<T> fromCompletionStage(CompletionStage<? extends T> completionStage)
      Create a Mono, producing its value using the provided CompletionStage.

      If the completionStage is also a Future, cancelling the Mono will cancel the future. Use fromFuture(CompletableFuture, boolean) with suppressCancellation set to true if you need to suppress cancellation propagation.

      Type Parameters:
      T - type of the expected value
      Parameters:
      completionStage - CompletionStage that will produce a value (or a null to complete immediately)
      Returns:
      A Mono.
    • fromCompletionStage

      public static <T> Mono<T> fromCompletionStage(Supplier<? extends CompletionStage<? extends T>> stageSupplier)
      Create a Mono that wraps a lazily-supplied CompletionStage on subscription, emitting the value produced by the CompletionStage.

      If the completionStage is also a Future, cancelling the Mono will cancel the future. Use fromFuture(CompletableFuture, boolean) with suppressCancellation set to true if you need to suppress cancellation propagation.

      Type Parameters:
      T - type of the expected value
      Parameters:
      stageSupplier - The Supplier of a CompletionStage that will produce a value (or a null to complete immediately). This allows lazy triggering of CompletionStage-based APIs.
      Returns:
      A Mono.
    • fromDirect

      public static <I> Mono<I> fromDirect(Publisher<? extends I> source)
      Convert a Publisher to a Mono without any cardinality check (ie this method doesn't cancel the source past the first element). Conversion transparently returns Mono sources without wrapping and otherwise supports Fuseable sources. Note this is an advanced interoperability operator that implies you know the Publisher you are converting follows the Mono semantics and only ever emits one element.

      Hooks.onEachOperator(String, Function) and similar assembly hooks are applied unless the source is already a Mono.

      Type Parameters:
      I - type of the value emitted by the publisher
      Parameters:
      source - the Mono-compatible Publisher to wrap
      Returns:
      a wrapped Mono
    • fromFuture

      public static <T> Mono<T> fromFuture(CompletableFuture<? extends T> future)
      Create a Mono, producing its value using the provided CompletableFuture and cancelling the future if the Mono gets cancelled.

      Use fromFuture(CompletableFuture, boolean) with suppressCancellation set to true if you need to suppress cancellation propagation.

      Type Parameters:
      T - type of the expected value
      Parameters:
      future - CompletableFuture that will produce a value (or a null to complete immediately)
      Returns:
      A Mono.
      See Also:
    • fromFuture

      public static <T> Mono<T> fromFuture(CompletableFuture<? extends T> future, boolean suppressCancel)
      Create a Mono, producing its value using the provided CompletableFuture and optionally cancelling the future if the Mono gets cancelled (if suppressCancel == false).

      Type Parameters:
      T - type of the expected value
      Parameters:
      future - CompletableFuture that will produce a value (or a null to complete immediately)
      suppressCancel - true to prevent cancellation of the future when the Mono is cancelled, false otherwise (the default)
      Returns:
      A Mono.
    • fromFuture

      public static <T> Mono<T> fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier)
      Create a Mono that wraps a lazily-supplied CompletableFuture on subscription, emitting the value produced by the future and cancelling the future if the Mono gets cancelled.

      Type Parameters:
      T - type of the expected value
      Parameters:
      futureSupplier - The Supplier of a CompletableFuture that will produce a value (or a null to complete immediately). This allows lazy triggering of future-based APIs.
      Returns:
      A Mono.
      See Also:
    • fromFuture

      public static <T> Mono<T> fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier, boolean suppressCancel)
      Create a Mono that wraps a lazily-supplied CompletableFuture on subscription, emitting the value produced by the future and optionally cancelling the future if the Mono gets cancelled (if suppressCancel == false).

      Type Parameters:
      T - type of the expected value
      Parameters:
      futureSupplier - The Supplier of a CompletableFuture that will produce a value (or a null to complete immediately). This allows lazy triggering of future-based APIs.
      suppressCancel - true to prevent cancellation of the future when the Mono is cancelled, false otherwise (the default)
      Returns:
      A Mono.
      See Also:
    • fromRunnable

      public static <T> Mono<T> fromRunnable(Runnable runnable)
      Create a Mono that completes empty once the provided Runnable has been executed.

      Type Parameters:
      T - The generic type of the upstream, which is preserved by this operator
      Parameters:
      runnable - Runnable that will be executed before emitting the completion signal
      Returns:
      A Mono.
    • fromSupplier

      public static <T> Mono<T> fromSupplier(Supplier<? extends @Nullable T> supplier)
      Create a Mono, producing its value using the provided Supplier. If the Supplier resolves to null, the resulting Mono completes empty.

      Type Parameters:
      T - type of the expected value
      Parameters:
      supplier - Supplier that will produce the value
      Returns:
      A Mono.
    • ignoreElements

      public static <T> Mono<T> ignoreElements(Publisher<T> source)
      Create a new Mono that ignores elements from the source (dropping them), but completes when the source completes.

      Discard Support: This operator discards the element from the source.

      Type Parameters:
      T - the source type of the ignored data
      Parameters:
      source - the Publisher to ignore
      Returns:
      a new completable Mono.
    • just

      public static <T> Mono<T> just(T data)
      Create a new Mono that emits the specified item, which is captured at instantiation time.

      Type Parameters:
      T - the type of the produced item
      Parameters:
      data - the only item to onNext
      Returns:
      a Mono.
    • justOrEmpty

      public static <T> Mono<T> justOrEmpty(@Nullable Optional<? extends T> data)
      Create a new Mono that emits the specified item if Optional.isPresent() otherwise only emits onComplete.

      Type Parameters:
      T - the type of the produced item
      Parameters:
      data - the Optional item to onNext or onComplete if not present
      Returns:
      a Mono.
    • justOrEmpty

      public static <T> Mono<T> justOrEmpty(@Nullable T data)
      Create a new Mono that emits the specified item if non null otherwise only emits onComplete.

      Type Parameters:
      T - the type of the produced item
      Parameters:
      data - the item to onNext or onComplete if null
      Returns:
      a Mono.
    • never

      public static <T> Mono<T> never()
      Return a Mono that will never signal any data, error or completion signal, essentially running indefinitely.

      Type Parameters:
      T - the Subscriber type target
      Returns:
      a never completing Mono
    • sequenceEqual

      public static <T> Mono<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2)
      Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the same by comparing the items emitted by each Publisher pairwise.

      Type Parameters:
      T - the type of items emitted by each Publisher
      Parameters:
      source1 - the first Publisher to compare
      source2 - the second Publisher to compare
      Returns:
      a Mono that emits a Boolean value that indicates whether the two sequences are the same
    • sequenceEqual

      public static <T> Mono<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2, BiPredicate<? super T,? super T> isEqual)
      Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the same by comparing the items emitted by each Publisher pairwise based on the results of a specified equality function.

      Type Parameters:
      T - the type of items emitted by each Publisher
      Parameters:
      source1 - the first Publisher to compare
      source2 - the second Publisher to compare
      isEqual - a function used to compare items emitted by each Publisher
      Returns:
      a Mono that emits a Boolean value that indicates whether the two Publisher two sequences are the same according to the specified function
    • sequenceEqual

      public static <T> Mono<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2, BiPredicate<? super T,? super T> isEqual, int prefetch)
      Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the same by comparing the items emitted by each Publisher pairwise based on the results of a specified equality function.

      Type Parameters:
      T - the type of items emitted by each Publisher
      Parameters:
      source1 - the first Publisher to compare
      source2 - the second Publisher to compare
      isEqual - a function used to compare items emitted by each Publisher
      prefetch - the number of items to prefetch from the first and second source Publisher
      Returns:
      a Mono that emits a Boolean value that indicates whether the two Publisher two sequences are the same according to the specified function
    • using

      public static <T, D> Mono<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Mono<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager)
      Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a Mono derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.

      • For eager cleanup, unlike in Flux, in the case of a valued Mono the cleanup happens just before passing the value to downstream. In all cases, exceptions raised by the eager cleanup Consumer may override the terminal event, discarding the element if the derived Mono was valued.
      • Non-eager cleanup will drop any exception.

      Type Parameters:
      T - emitted type
      D - resource type
      Parameters:
      resourceSupplier - a Callable that is called on subscribe to create the resource
      sourceSupplier - a Mono factory to create the Mono depending on the created resource
      resourceCleanup - invoked on completion to clean-up the resource
      eager - set to true to clean before any signal (including onNext) is passed downstream
      Returns:
      new Mono
    • using

      public static <T, D> Mono<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Mono<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)
      Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a Mono derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.

      Unlike in Flux, in the case of a valued Mono the cleanup happens just before passing the value to downstream. In all cases, exceptions raised by the cleanup Consumer may override the terminal event, discarding the element if the derived Mono was valued.

      Type Parameters:
      T - emitted type
      D - resource type
      Parameters:
      resourceSupplier - a Callable that is called on subscribe to create the resource
      sourceSupplier - a Mono factory to create the Mono depending on the created resource
      resourceCleanup - invoked on completion to clean-up the resource
      Returns:
      new Mono
    • using

      public static <T, D extends AutoCloseable> Mono<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Mono<? extends T>> sourceSupplier)
      Uses an AutoCloseable resource, generated by a supplier for each individual Subscriber, while streaming the value from a Mono derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.

      Unlike in Flux, in the case of a valued Mono the cleanup happens just before passing the value to downstream. In all cases, exceptions raised by the cleanup Consumer may override the terminal event, discarding the element if the derived Mono was valued.

      Type Parameters:
      T - emitted type
      D - resource type
      Parameters:
      resourceSupplier - a Callable that is called on subscribe to create the resource
      sourceSupplier - a Mono factory to create the Mono depending on the created resource
      Returns:
      new Mono
    • using

      public static <T, D extends AutoCloseable> Mono<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Mono<? extends T>> sourceSupplier, boolean eager)
      Uses an AutoCloseable resource, generated by a supplier for each individual Subscriber, while streaming the value from a Mono derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.

      • For eager cleanup, Unlike in Flux, in the case of a valued Mono the cleanup happens just before passing the value to downstream. In all cases, exceptions raised by the cleanup Consumer may override the terminal event, discarding the element if the derived Mono was valued.
      • Non-eager cleanup will drop any exception.

      Type Parameters:
      T - emitted type
      D - resource type
      Parameters:
      resourceSupplier - a Callable that is called on subscribe to create the resource
      sourceSupplier - a Mono factory to create the Mono depending on the created resource
      eager - set to true to clean before any signal (including onNext) is passed downstream
      Returns:
      new Mono
    • usingWhen

      public static <T, D> Mono<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D,? extends Mono<? extends T>> resourceClosure, Function<? super D,? extends Publisher<?>> asyncCleanup)
      Uses a resource, generated by a Publisher for each individual Subscriber, to derive a Mono. Note that all steps of the operator chain that would need the resource to be in an open stable state need to be described inside the resourceClosure Function.

      Unlike in the Flux counterpart, ALL signals are deferred until the Mono terminates and the relevant Function generates and invokes a "cleanup" Publisher. This is because a failure in the cleanup Publisher must result in a lone onError signal in the downstream Mono (any potential value in the derived Mono is discarded). Here are the various scenarios that can play out:

      • empty Mono, asyncCleanup ends with onComplete(): downstream receives onComplete()
      • empty Mono, asyncCleanup ends with onError(t): downstream receives onError(t)
      • valued Mono, asyncCleanup ends with onComplete(): downstream receives onNext(value),onComplete()
      • valued Mono, asyncCleanup ends with onError(t): downstream receives onError(t), value is discarded
      • error(e) Mono, asyncCleanup ends with onComplete(): downstream receives onError(e)
      • error(e) Mono, asyncCleanup ends with onError(t): downstream receives onError(t), t suppressing e

      Note that if the resource supplying Publisher emits more than one resource, the subsequent resources are dropped (Operators.onNextDropped(Object, Context)). If the publisher errors AFTER having emitted one resource, the error is also silently dropped (Operators.onErrorDropped(Throwable, Context)). An empty completion or error without at least one onNext signal (no resource supplied) triggers a short-circuit of the main sequence with the same terminal signal (no cleanup is invoked).

      Discard Support: This operator discards any source element if the asyncCleanup handler fails.

      Type Parameters:
      T - the type of elements emitted by the resource closure, and thus the main sequence
      D - the type of the resource object
      Parameters:
      resourceSupplier - a Publisher that "generates" the resource, subscribed for each subscription to the main sequence
      resourceClosure - a factory to derive a Mono from the supplied resource
      asyncCleanup - an asynchronous resource cleanup invoked when the resource closure terminates (with onComplete, onError or cancel)
      Returns:
      a new Mono built around a "transactional" resource, with deferred emission until the asynchronous cleanup sequence completes
    • usingWhen

      public static <T, D> Mono<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D,? extends Mono<? extends T>> resourceClosure, Function<? super D,? extends Publisher<?>> asyncComplete, BiFunction<? super D,? super Throwable,? extends Publisher<?>> asyncError, Function<? super D,? extends Publisher<?>> asyncCancel)
      Uses a resource, generated by a Publisher for each individual Subscriber, to derive a Mono.Note that all steps of the operator chain that would need the resource to be in an open stable state need to be described inside the resourceClosure Function.

      Unlike in the Flux counterpart, ALL signals are deferred until the Mono terminates and the relevant Function generates and invokes a "cleanup" Publisher. This is because a failure in the cleanup Publisher must result in a lone onError signal in the downstream Mono (any potential value in the derived Mono is discarded). Here are the various scenarios that can play out:

      • empty Mono, asyncComplete ends with onComplete(): downstream receives onComplete()
      • empty Mono, asyncComplete ends with onError(t): downstream receives onError(t)
      • valued Mono, asyncComplete ends with onComplete(): downstream receives onNext(value),onComplete()
      • valued Mono, asyncComplete ends with onError(t): downstream receives onError(t), value is discarded
      • error(e) Mono, errorComplete ends with onComplete(): downstream receives onError(e)
      • error(e) Mono, errorComplete ends with onError(t): downstream receives onError(t), t suppressing e

      Individual cleanups can also be associated with mono cancellation and error terminations:

      Note that if the resource supplying Publisher emits more than one resource, the subsequent resources are dropped (Operators.onNextDropped(Object, Context)). If the publisher errors AFTER having emitted one resource, the error is also silently dropped (Operators.onErrorDropped(Throwable, Context)). An empty completion or error without at least one onNext signal (no resource supplied) triggers a short-circuit of the main sequence with the same terminal signal (no cleanup is invoked).

      Discard Support: This operator discards the element if the asyncComplete handler fails.

      Type Parameters:
      T - the type of elements emitted by the resource closure, and thus the main sequence
      D - the type of the resource object
      Parameters:
      resourceSupplier - a Publisher that "generates" the resource, subscribed for each subscription to the main sequence
      resourceClosure - a factory to derive a Mono from the supplied resource
      asyncComplete - an asynchronous resource cleanup invoked if the resource closure terminates with onComplete
      asyncError - an asynchronous resource cleanup invoked if the resource closure terminates with onError. The terminating error is provided to the BiFunction
      asyncCancel - an asynchronous resource cleanup invoked if the resource closure is cancelled. When null, the asyncComplete path is used instead.
      Returns:
      a new Mono built around a "transactional" resource, with several termination path triggering asynchronous cleanup sequences
    • when

      public static Mono<Void> when(Publisher<?>... sources)
      Aggregate given publishers into a new Mono that will be fulfilled when all of the given sources have completed. An error will cause pending results to be cancelled and immediate error emission to the returned Mono.

      Parameters:
      sources - The sources to use.
      Returns:
      a Mono.
    • when

      public static Mono<Void> when(Iterable<? extends Publisher<?>> sources)
      Aggregate given publishers into a new Mono that will be fulfilled when all of the given Publishers have completed. An error will cause pending results to be cancelled and immediate error emission to the returned Mono.

      Parameters:
      sources - The sources to use.
      Returns:
      a Mono.
    • whenDelayError

      public static Mono<Void> whenDelayError(Iterable<? extends Publisher<?>> sources)
      Aggregate given publishers into a new Mono that will be fulfilled when all of the given sources have completed. Errors from the sources are delayed. If several Publishers error, the exceptions are combined (as suppressed exceptions on a root exception).

      Parameters:
      sources - The sources to use.
      Returns:
      a Mono.
    • whenDelayError

      public static Mono<Void> whenDelayError(Publisher<?>... sources)
      Merge given publishers into a new Mono that will be fulfilled when all of the given sources have completed. Errors from the sources are delayed. If several Publishers error, the exceptions are combined (as suppressed exceptions on a root exception).

      Parameters:
      sources - The sources to use.
      Returns:
      a Mono.
    • zip

      public static <T1, T2> Mono<Tuple2<T1,T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2)
      Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple2. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.

      Type Parameters:
      T1 - type of the value from p1
      T2 - type of the value from p2
      Parameters:
      p1 - The first upstream Publisher to subscribe to.
      p2 - The second upstream Publisher to subscribe to.
      Returns:
      a Mono.
    • zip

      public static <T1, T2, O> Mono<O> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1,? super T2,? extends O> combinator)
      Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values as defined by the combinator function. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.

      Type Parameters:
      T1 - type of the value from p1
      T2 - type of the value from p2
      O - output value
      Parameters:
      p1 - The first upstream Publisher to subscribe to.
      p2 - The second upstream Publisher to subscribe to.
      combinator - a BiFunction combinator function when both sources complete
      Returns:
      a Mono.
    • zip

      public static <T1, T2, T3> Mono<Tuple3<T1,T2,T3>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3)
      Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple3. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.

      Type Parameters:
      T1 - type of the value from p1
      T2 - type of the value from p2
      T3 - type of the value from p3
      Parameters:
      p1 - The first upstream Publisher to subscribe to.
      p2 - The second upstream Publisher to subscribe to.
      p3 - The third upstream Publisher to subscribe to.
      Returns:
      a Mono.
    • zip

      public static <T1, T2, T3, T4> Mono<Tuple4<T1,T2,T3,T4>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4)
      Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple4. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.

      Type Parameters:
      T1 - type of the value from p1
      T2 - type of the value from p2
      T3 - type of the value from p3
      T4 - type of the value from p4
      Parameters:
      p1 - The first upstream Publisher to subscribe to.
      p2 - The second upstream Publisher to subscribe to.
      p3 - The third upstream Publisher to subscribe to.
      p4 - The fourth upstream Publisher to subscribe to.
      Returns:
      a Mono.
    • zip

      public static <T1, T2, T3, T4, T5> Mono<Tuple5<T1,T2,T3,T4,T5>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5)
      Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple5. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.

      Type Parameters:
      T1 - type of the value from p1
      T2 - type of the value from p2
      T3 - type of the value from p3
      T4 - type of the value from p4
      T5 - type of the value from p5
      Parameters:
      p1 - The first upstream Publisher to subscribe to.
      p2 - The second upstream Publisher to subscribe to.
      p3 - The third upstream Publisher to subscribe to.
      p4 - The fourth upstream Publisher to subscribe to.
      p5 - The fifth upstream Publisher to subscribe to.
      Returns:
      a Mono.
    • zip

      public static <T1, T2, T3, T4, T5, T6> Mono<Tuple6<T1,T2,T3,T4,T5,T6>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6)
      Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple6. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.

      Type Parameters:
      T1 - type of the value from p1
      T2 - type of the value from p2
      T3 - type of the value from p3
      T4 - type of the value from p4
      T5 - type of the value from p5
      T6 - type of the value from p6
      Parameters:
      p1 - The first upstream Publisher to subscribe to.
      p2 - The second upstream Publisher to subscribe to.
      p3 - The third upstream Publisher to subscribe to.
      p4 - The fourth upstream Publisher to subscribe to.
      p5 - The fifth upstream Publisher to subscribe to.
      p6 - The sixth upstream Publisher to subscribe to.
      Returns:
      a Mono.
    • zip

      public static <T1, T2, T3, T4, T5, T6, T7> Mono<Tuple7<T1,T2,T3,T4,T5,T6,T7>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6, Mono<? extends T7> p7)
      Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple7. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.

      Type Parameters:
      T1 - type of the value from p1
      T2 - type of the value from p2
      T3 - type of the value from p3
      T4 - type of the value from p4
      T5 - type of the value from p5
      T6 - type of the value from p6
      T7 - type of the value from p7
      Parameters:
      p1 - The first upstream Publisher to subscribe to.
      p2 - The second upstream Publisher to subscribe to.
      p3 - The third upstream Publisher to subscribe to.
      p4 - The fourth upstream Publisher to subscribe to.
      p5 - The fifth upstream Publisher to subscribe to.
      p6 - The sixth upstream Publisher to subscribe to.
      p7 - The seventh upstream Publisher to subscribe to.
      Returns:
      a Mono.
    • zip

      public static <T1, T2, T3, T4, T5, T6, T7, T8> Mono<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6, Mono<? extends T7> p7, Mono<? extends T8> p8)
      Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple8. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.

      Type Parameters:
      T1 - type of the value from p1
      T2 - type of the value from p2
      T3 - type of the value from p3
      T4 - type of the value from p4
      T5 - type of the value from p5
      T6 - type of the value from p6
      T7 - type of the value from p7
      T8 - type of the value from p8
      Parameters:
      p1 - The first upstream Publisher to subscribe to.
      p2 - The second upstream Publisher to subscribe to.
      p3 - The third upstream Publisher to subscribe to.
      p4 - The fourth upstream Publisher to subscribe to.
      p5 - The fifth upstream Publisher to subscribe to.
      p6 - The sixth upstream Publisher to subscribe to.
      p7 - The seventh upstream Publisher to subscribe to.
      p8 - The eight upstream Publisher to subscribe to.
      Returns:
      a Mono.
    • zip

      public static <R> Mono<R> zip(Iterable<? extends Mono<?>> monos, Function<? super Object[],? extends R> combinator)
      Aggregate given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values according to the provided combinator function. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.

      Type Parameters:
      R - the combined result
      Parameters:
      monos - The monos to use.
      combinator - the function to transform the combined array into an arbitrary object.
      Returns:
      a Mono.
    • zip

      public static <R> Mono<R> zip(Function<? super Object[],? extends R> combinator, Mono<?>... monos)
      Aggregate given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values according to the provided combinator function. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.

      Type Parameters:
      R - the combined result
      Parameters:
      monos - The monos to use.
      combinator - the function to transform the combined array into an arbitrary object.
      Returns:
      a Mono.
    • zipDelayError

      public static <T1, T2> Mono<Tuple2<T1,T2>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2)
      Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple2 and delaying errors. If a Mono source completes without value, the other source is run to completion then the resulting Mono completes empty. If both Monos error, the two exceptions are combined (as suppressed exceptions on a root exception).

      Type Parameters:
      T1 - type of the value from p1
      T2 - type of the value from p2
      Parameters:
      p1 - The first upstream Publisher to subscribe to.
      p2 - The second upstream Publisher to subscribe to.
      Returns:
      a Mono.
    • zipDelayError

      public static <T1, T2, T3> Mono<Tuple3<T1,T2,T3>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3)
      Merge given monos into a new Mono that will be fulfilled when all of the given Mono Monos have produced an item, aggregating their values into a Tuple3 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).

      Type Parameters:
      T1 - type of the value from p1
      T2 - type of the value from p2
      T3 - type of the value from p3
      Parameters:
      p1 - The first upstream Publisher to subscribe to.
      p2 - The second upstream Publisher to subscribe to.
      p3 - The third upstream Publisher to subscribe to.
      Returns:
      a Mono.
    • zipDelayError

      public static <T1, T2, T3, T4> Mono<Tuple4<T1,T2,T3,T4>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4)
      Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple4 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).

      Type Parameters:
      T1 - type of the value from p1
      T2 - type of the value from p2
      T3 - type of the value from p3
      T4 - type of the value from p4
      Parameters:
      p1 - The first upstream Publisher to subscribe to.
      p2 - The second upstream Publisher to subscribe to.
      p3 - The third upstream Publisher to subscribe to.
      p4 - The fourth upstream Publisher to subscribe to.
      Returns:
      a Mono.
    • zipDelayError

      public static <T1, T2, T3, T4, T5> Mono<Tuple5<T1,T2,T3,T4,T5>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5)
      Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple5 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).

      Type Parameters:
      T1 - type of the value from p1
      T2 - type of the value from p2
      T3 - type of the value from p3
      T4 - type of the value from p4
      T5 - type of the value from p5
      Parameters:
      p1 - The first upstream Publisher to subscribe to.
      p2 - The second upstream Publisher to subscribe to.
      p3 - The third upstream Publisher to subscribe to.
      p4 - The fourth upstream Publisher to subscribe to.
      p5 - The fifth upstream Publisher to subscribe to.
      Returns:
      a Mono.
    • zipDelayError

      public static <T1, T2, T3, T4, T5, T6> Mono<Tuple6<T1,T2,T3,T4,T5,T6>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6)
      Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple6 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).

      Type Parameters:
      T1 - type of the value from p1
      T2 - type of the value from p2
      T3 - type of the value from p3
      T4 - type of the value from p4
      T5 - type of the value from p5
      T6 - type of the value from p6
      Parameters:
      p1 - The first upstream Publisher to subscribe to.
      p2 - The second upstream Publisher to subscribe to.
      p3 - The third upstream Publisher to subscribe to.
      p4 - The fourth upstream Publisher to subscribe to.
      p5 - The fifth upstream Publisher to subscribe to.
      p6 - The sixth upstream Publisher to subscribe to.
      Returns:
      a Mono.
    • zipDelayError

      public static <T1, T2, T3, T4, T5, T6, T7> Mono<Tuple7<T1,T2,T3,T4,T5,T6,T7>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6, Mono<? extends T7> p7)
      Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple7 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).

      Type Parameters:
      T1 - type of the value from p1
      T2 - type of the value from p2
      T3 - type of the value from p3
      T4 - type of the value from p4
      T5 - type of the value from p5
      T6 - type of the value from p6
      T7 - type of the value from p7
      Parameters:
      p1 - The first upstream Publisher to subscribe to.
      p2 - The second upstream Publisher to subscribe to.
      p3 - The third upstream Publisher to subscribe to.
      p4 - The fourth upstream Publisher to subscribe to.
      p5 - The fifth upstream Publisher to subscribe to.
      p6 - The sixth upstream Publisher to subscribe to.
      p7 - The seventh upstream Publisher to subscribe to.
      Returns:
      a Mono.
    • zipDelayError

      public static <T1, T2, T3, T4, T5, T6, T7, T8> Mono<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6, Mono<? extends T7> p7, Mono<? extends T8> p8)
      Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple8 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).

      Type Parameters:
      T1 - type of the value from p1
      T2 - type of the value from p2
      T3 - type of the value from p3
      T4 - type of the value from p4
      T5 - type of the value from p5
      T6 - type of the value from p6
      T7 - type of the value from p7
      T8 - type of the value from p8
      Parameters:
      p1 - The first upstream Publisher to subscribe to.
      p2 - The second upstream Publisher to subscribe to.
      p3 - The third upstream Publisher to subscribe to.
      p4 - The fourth upstream Publisher to subscribe to.
      p5 - The fifth upstream Publisher to subscribe to.
      p6 - The sixth upstream Publisher to subscribe to.
      p7 - The seventh upstream Publisher to subscribe to.
      p8 - The eight upstream Publisher to subscribe to.
      Returns:
      a Mono.
    • zipDelayError

      public static <R> Mono<R> zipDelayError(Iterable<? extends Mono<?>> monos, Function<? super Object[],? extends R> combinator)
      Aggregate given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item. Errors from the sources are delayed. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).

      Type Parameters:
      R - the combined result
      Parameters:
      monos - The monos to use.
      combinator - the function to transform the combined array into an arbitrary object.
      Returns:
      a Mono.
    • zipDelayError

      public static <R> Mono<R> zipDelayError(Function<? super Object[],? extends R> combinator, Mono<?>... monos)
      Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values according to the provided combinator function and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).

      Type Parameters:
      R - the combined result
      Parameters:
      monos - The monos to use.
      combinator - the function to transform the combined array into an arbitrary object.
      Returns:
      a combined Mono.
    • as

      public final <P> P as(Function<? super Mono<T>,P> transformer)
      Transform this Mono into a target type.
       mono.as(Flux::from).subscribe() 
       
      Type Parameters:
      P - the returned instance type
      Parameters:
      transformer - the Function to immediately map this Mono into a target type
      Returns:
      the Mono transformed to an instance of P
      See Also:
    • and

      public final Mono<Void> and(Publisher<?> other)
      Join the termination signals from this mono and another source into the returned void mono

      Parameters:
      other - the Publisher to wait for complete
      Returns:
      a new combined Mono
      See Also:
    • block

      public @Nullable T block()
      Subscribe to this Mono and block indefinitely until a next signal is received. Returns that value, or null if the Mono completes empty. In case the Mono errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).

      Note that each block() will trigger a new subscription: in other words, the result might miss signal from hot publishers.

      Returns:
      T the result
    • block

      public @Nullable T block(Duration timeout)
      Subscribe to this Mono and block until a next signal is received or a timeout expires. Returns that value, or null if the Mono completes empty. In case the Mono errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). If the provided timeout expires, a RuntimeException is thrown with a TimeoutException as the cause.

      Note that each block() will trigger a new subscription: in other words, the result might miss signal from hot publishers.

      Parameters:
      timeout - maximum time period to wait for before raising a RuntimeException with a TimeoutException as the cause
      Returns:
      T the result
    • blockOptional

      public Optional<T> blockOptional()
      Subscribe to this Mono and block indefinitely until a next signal is received or the Mono completes empty. Returns an Optional, which can be used to replace the empty case with an Exception via Optional.orElseThrow(Supplier). In case the Mono itself errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).

      Note that each blockOptional() will trigger a new subscription: in other words, the result might miss signal from hot publishers.

      Returns:
      T the result
    • blockOptional

      public Optional<T> blockOptional(Duration timeout)
      Subscribe to this Mono and block until a next signal is received, the Mono completes empty or a timeout expires. Returns an Optional for the first two cases, which can be used to replace the empty case with an Exception via Optional.orElseThrow(Supplier). In case the Mono itself errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). If the provided timeout expires, a RuntimeException is thrown with a TimeoutException as the cause.

      Note that each block() will trigger a new subscription: in other words, the result might miss signal from hot publishers.

      Parameters:
      timeout - maximum time period to wait for before raising a RuntimeException with a TimeoutException as the cause
      Returns:
      T the result
    • cast

      public final <E> Mono<E> cast(Class<E> clazz)
      Cast the current Mono produced type into a target produced type.

      Type Parameters:
      E - the Mono output type
      Parameters:
      clazz - the target type to cast to
      Returns:
      a casted Mono
    • cache

      public final Mono<T> cache()
      Turn this Mono into a hot source and cache last emitted signals for further Subscriber. Completion and Error will also be replayed.

      Once the first subscription is made to this Mono, the source is subscribed to and the signal will be cached, indefinitely. This process cannot be cancelled.

      In the face of multiple concurrent subscriptions, this operator ensures that only one subscription is made to the source.

      Returns:
      a replaying Mono
    • cache

      public final Mono<T> cache(Duration ttl)
      Turn this Mono into a hot source and cache last emitted signals for further Subscriber, with an expiry timeout.

      Completion and Error will also be replayed until ttl triggers in which case the next Subscriber will start over a new subscription.

      Cache loading (ie. subscription to the source) is triggered atomically by the first subscription to an uninitialized or expired cache, which guarantees that a single cache load happens at a time (and other subscriptions will get notified of the newly cached value when it arrives).

      Returns:
      a replaying Mono
    • cache

      public final Mono<T> cache(Duration ttl, Scheduler timer)
      Turn this Mono into a hot source and cache last emitted signals for further Subscriber, with an expiry timeout.

      Completion and Error will also be replayed until ttl triggers in which case the next Subscriber will start over a new subscription.

      Cache loading (ie. subscription to the source) is triggered atomically by the first subscription to an uninitialized or expired cache, which guarantees that a single cache load happens at a time (and other subscriptions will get notified of the newly cached value when it arrives).

      Parameters:
      ttl - Time-to-live for each cached item and post termination.
      timer - the Scheduler on which to measure the duration.
      Returns:
      a replaying Mono
    • cache

      public final Mono<T> cache(Function<? super T,Duration> ttlForValue, Function<Throwable,Duration> ttlForError, Supplier<Duration> ttlForEmpty)
      Turn this Mono into a hot source and cache last emitted signal for further Subscriber, with an expiry timeout (TTL) that depends on said signal. A TTL of Long.MAX_VALUE milliseconds is interpreted as indefinite caching of the signal (no cache cleanup is scheduled, so the signal is retained as long as this Mono is not garbage collected).

      Empty completion and Error will also be replayed according to their respective TTL, so transient errors can be "retried" by letting the Function return Duration.ZERO. Such a transient exception would then be propagated to the first subscriber but the following subscribers would trigger a new source subscription.

      Exceptions in the TTL generators themselves are processed like the Duration.ZERO case, except the original signal is suppressed (in case of onError) or dropped (in case of onNext).

      Note that subscribers that come in perfectly simultaneously could receive the same cached signal even if the TTL is set to zero.

      Cache loading (ie. subscription to the source) is triggered atomically by the first subscription to an uninitialized or expired cache, which guarantees that a single cache load happens at a time (and other subscriptions will get notified of the newly cached value when it arrives).

      Parameters:
      ttlForValue - the TTL-generating Function invoked when source is valued
      ttlForError - the TTL-generating Function invoked when source is erroring
      ttlForEmpty - the TTL-generating Supplier invoked when source is empty
      Returns:
      a replaying Mono
    • cache

      public final Mono<T> cache(Function<? super T,Duration> ttlForValue, Function<Throwable,Duration> ttlForError, Supplier<Duration> ttlForEmpty, Scheduler timer)
      Turn this Mono into a hot source and cache last emitted signal for further Subscriber, with an expiry timeout (TTL) that depends on said signal. A TTL of Long.MAX_VALUE milliseconds is interpreted as indefinite caching of the signal (no cache cleanup is scheduled, so the signal is retained as long as this Mono is not garbage collected).

      Empty completion and Error will also be replayed according to their respective TTL, so transient errors can be "retried" by letting the Function return Duration.ZERO. Such a transient exception would then be propagated to the first subscriber but the following subscribers would trigger a new source subscription.

      Exceptions in the TTL generators themselves are processed like the Duration.ZERO case, except the original signal is suppressed (in case of onError) or dropped (in case of onNext).

      Note that subscribers that come in perfectly simultaneously could receive the same cached signal even if the TTL is set to zero.

      Cache loading (ie. subscription to the source) is triggered atomically by the first subscription to an uninitialized or expired cache, which guarantees that a single cache load happens at a time (and other subscriptions will get notified of the newly cached value when it arrives).

      Parameters:
      ttlForValue - the TTL-generating Function invoked when source is valued
      ttlForError - the TTL-generating Function invoked when source is erroring
      ttlForEmpty - the TTL-generating Supplier invoked when source is empty
      timer - the Scheduler on which to measure the duration.
      Returns:
      a replaying Mono
    • cacheInvalidateIf

      public final Mono<T> cacheInvalidateIf(Predicate<? super T> invalidationPredicate)
      Cache onNext signal received from the source and replay it to other subscribers, while allowing invalidation by verifying the cached value against the given Predicate each time a late subscription occurs. Note that the Predicate is only evaluated if the cache is currently populated, ie. it is not applied upon receiving the source onNext signal. For late subscribers, if the predicate returns true the cache is invalidated and a new subscription is made to the source in an effort to refresh the cache with a more up-to-date value to be passed to the new subscriber.

      The predicate is not strictly evaluated once per downstream subscriber. Rather, subscriptions happening in concurrent batches will trigger a single evaluation of the predicate. Similarly, a batch of subscriptions happening before the cache is populated (ie. before this operator receives an onNext signal after an invalidation) will always receive the incoming value without going through the Predicate. The predicate is only triggered by subscribers that come in AFTER the cache is populated. Therefore, it is possible that pre-population subscribers receive an "invalid" value, especially if the object can switch from a valid to an invalid state in a short amount of time (eg. between creation, cache population and propagation to the downstream subscriber(s)).

      If the cached value needs to be discarded in case of invalidation, the recommended way is to do so in the predicate directly. Note that some downstream subscribers might still be using or storing the value, for example if they haven't requested anything yet.

      As this form of caching is explicitly value-oriented, empty source completion signals and error signals are NOT cached. It is always possible to use materialize() to cache these (further using filter(Predicate) if one wants to only consider empty sources or error sources).

      Predicate is applied differently depending on whether the cache is populated or not:

      • IF EMPTY
        • first incoming subscriber creates a new COORDINATOR and adds itself
      • IF COORDINATOR
        1. each incoming subscriber is added to the current "batch" (COORDINATOR)
        2. once the value is received, the predicate is applied ONCE
          1. mismatch: all the batch is terminated with an error -> we're back to init state, next subscriber will trigger a new coordinator and a new subscription
          2. ok: all the batch is completed with the value -> cache is now POPULATED
      • IF POPULATED
        1. each incoming subscriber causes the predicate to apply
        2. if ok: complete that subscriber with the value
        3. if mismatch, swap the current POPULATED with a new COORDINATOR and add the subscriber to that coordinator
        4. imagining a race between sub1 and sub2:
          1. OK NOK will naturally lead to sub1 completing and sub2 being put on wait inside a new COORDINATOR
          2. NOK NOK will race swap of POPULATED with COORDINATOR1 and COORDINATOR2 respectively
            1. if sub1 swaps, sub2 will dismiss the COORDINATOR2 it failed to swap and loop back, see COORDINATOR1 and add itself
            2. if sub2 swaps, the reverse happens
            3. if value is populated in the time it takes for sub2 to loop back, sub2 sees a value and triggers the predicate again (hopefully passing)

      Cancellation is only possible for downstream subscribers when they've been added to a COORDINATOR. Subscribers that are received when POPULATED will either be completed right away or (if the predicate fails) end up being added to a COORDINATOR.

      When cancelling a COORDINATOR-issued subscription:

      1. removes itself from batch
      2. if 0 subscribers remaining
        1. swap COORDINATOR with EMPTY
        2. COORDINATOR cancels its source

      The fact that COORDINATOR cancels its source when no more subscribers remain is important, because it prevents issues with a never() source or a source that never produces a value passing the predicate (assuming timeouts on the subscriber).

      Parameters:
      invalidationPredicate - the Predicate used for cache invalidation. Returning true means the value is invalid and should be removed from the cache.
      Returns:
      a new cached Mono which can be invalidated
    • cacheInvalidateWhen

      public final Mono<T> cacheInvalidateWhen(Function<? super T,Mono<Void>> invalidationTriggerGenerator)
      Cache onNext signal received from the source and replay it to other subscribers, while allowing invalidation via a Mono<Void> companion trigger generated from the currently cached value.

      As this form of caching is explicitly value-oriented, empty source completion signals and error signals are NOT cached. It is always possible to use materialize() to cache these (further using filter(Predicate) if one wants to only consider empty sources or error sources). The exception is still propagated to the subscribers that have accumulated between the time the source has been subscribed to and the time the onError/onComplete terminal signal is received. An empty source is turned into a NoSuchElementException onError.

      Completion of the trigger will invalidate the cached element, so the next subscriber that comes in will trigger a new subscription to the source, re-populating the cache and re-creating a new trigger out of that value.

      • If the trigger completes with an error, all registered subscribers are terminated with the same error.
      • If all the subscribers are cancelled before the cache is populated (ie. an attempt to cache a never()), the source subscription is cancelled.
      • Cancelling a downstream subscriber once the cache has been populated is not necessarily relevant, as the value will be immediately replayed on subscription, which usually means within onSubscribe (so earlier than any cancellation can happen). That said the operator will make best efforts to detect such cancellations and avoid propagating the value to these subscribers.

      If the cached value needs to be discarded in case of invalidation, use the cacheInvalidateWhen(Function, Consumer) version. Note that some downstream subscribers might still be using or storing the value, for example if they haven't requested anything yet.

      Trigger is generated only after a subscribers in the COORDINATOR have received the value, and only once. The only way to get out of the POPULATED state is to use the trigger, so there cannot be multiple trigger subscriptions, nor concurrent triggering.

      Cancellation is only possible for downstream subscribers when they've been added to a COORDINATOR. Subscribers that are received when POPULATED will either be completed right away or (if the predicate fails) end up being added to a COORDINATOR.

      When cancelling a COORDINATOR-issued subscription:

      1. removes itself from batch
      2. if 0 subscribers remaining
        1. swap COORDINATOR with EMPTY
        2. COORDINATOR cancels its source

      The fact that COORDINATOR cancels its source when no more subscribers remain is important, because it prevents issues with a never() source or a source that never produces a value passing the predicate (assuming timeouts on the subscriber).

      Parameters:
      invalidationTriggerGenerator - the Function that generates new Mono<Void> triggers used for invalidation
      Returns:
      a new cached Mono which can be invalidated
    • cacheInvalidateWhen

      public final Mono<T> cacheInvalidateWhen(Function<? super T,Mono<Void>> invalidationTriggerGenerator, Consumer<? super T> onInvalidate)
      Cache onNext signal received from the source and replay it to other subscribers, while allowing invalidation via a Mono<Void> companion trigger generated from the currently cached value.

      As this form of caching is explicitly value-oriented, empty source completion signals and error signals are NOT cached. It is always possible to use materialize() to cache these (further using filter(Predicate) if one wants to only consider empty sources or error sources). The exception is still propagated to the subscribers that have accumulated between the time the source has been subscribed to and the time the onError/onComplete terminal signal is received. An empty source is turned into a NoSuchElementException onError.

      Completion of the trigger will invalidate the cached element, so the next subscriber that comes in will trigger a new subscription to the source, re-populating the cache and re-creating a new trigger out of that value.

      • If the trigger completes with an error, all registered subscribers are terminated with the same error.
      • If all the subscribers are cancelled before the cache is populated (ie. an attempt to cache a never()), the source subscription is cancelled.
      • Cancelling a downstream subscriber once the cache has been populated is not necessarily relevant, as the value will be immediately replayed on subscription, which usually means within onSubscribe (so earlier than any cancellation can happen). That said the operator will make best efforts to detect such cancellations and avoid propagating the value to these subscribers.

      Once a cached value is invalidated, it is passed to the provided Consumer (which MUST complete normally). Note that some downstream subscribers might still be using or storing the value, for example if they haven't requested anything yet.

      Trigger is generated only after a subscribers in the COORDINATOR have received the value, and only once. The only way to get out of the POPULATED state is to use the trigger, so there cannot be multiple trigger subscriptions, nor concurrent triggering.

      Cancellation is only possible for downstream subscribers when they've been added to a COORDINATOR. Subscribers that are received when POPULATED will either be completed right away or (if the predicate fails) end up being added to a COORDINATOR.

      When cancelling a COORDINATOR-issued subscription:

      1. removes itself from batch
      2. if 0 subscribers remaining
        1. swap COORDINATOR with EMPTY
        2. COORDINATOR cancels its source

      The fact that COORDINATOR cancels its source when no more subscribers remain is important, because it prevents issues with a never() source or a source that never produces a value passing the predicate (assuming timeouts on the subscriber).

      Parameters:
      invalidationTriggerGenerator - the Function that generates new Mono<Void> triggers used for invalidation
      onInvalidate - the Consumer that will be applied to cached value upon invalidation
      Returns:
      a new cached Mono which can be invalidated
    • cancelOn

      public final Mono<T> cancelOn(Scheduler scheduler)
      Prepare this Mono so that subscribers will cancel from it on a specified Scheduler.

      Parameters:
      scheduler - the Scheduler to signal cancel on
      Returns:
      a scheduled cancel Mono
    • checkpoint

      public final Mono<T> checkpoint()
      Activate traceback (full assembly tracing) for this particular Mono, in case of an error upstream of the checkpoint. Tracing incurs the cost of an exception stack trace creation.

      It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.

      The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).

      Returns:
      the assembly tracing Mono
    • checkpoint

      public final Mono<T> checkpoint(String description)
      Activate traceback (assembly marker) for this particular Mono by giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(), this doesn't create a filled stack trace, avoiding the main cost of the operator. However, as a trade-off the description must be unique enough for the user to find out where this Mono was assembled. If you only want a generic description, and still rely on the stack trace to find the assembly site, use the checkpoint(String, boolean) variant.

      It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.

      The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).

      Parameters:
      description - a unique enough description to include in the light assembly traceback.
      Returns:
      the assembly marked Mono
    • checkpoint

      public final Mono<T> checkpoint(@Nullable String description, boolean forceStackTrace)
      Activate traceback (full assembly tracing or the lighter assembly marking depending on the forceStackTrace option).

      By setting the forceStackTrace parameter to true, activate assembly tracing for this particular Mono and give it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(String), this will incur the cost of an exception stack trace creation. The description could for example be a meaningful name for the assembled mono or a wider correlation ID, since the stack trace will always provide enough information to locate where this Flux was assembled.

      By setting forceStackTrace to false, behaves like checkpoint(String) and is subject to the same caveat in choosing the description.

      It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly marker.

      The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).

      Parameters:
      description - a description (must be unique enough if forceStackTrace is set to false).
      forceStackTrace - false to make a light checkpoint without a stacktrace, true to use a stack trace.
      Returns:
      the assembly marked Mono.
    • concatWith

      public final Flux<T> concatWith(Publisher<? extends T> other)
      Concatenate emissions of this Mono with the provided Publisher (no interleave).

      Parameters:
      other - the Publisher sequence to concat after this Flux
      Returns:
      a concatenated Flux
    • contextCapture

      public final Mono<T> contextCapture()
      If context-propagation library is on the classpath, this is a convenience shortcut to capture thread local values during the subscription phase and put them in the Context that is visible upstream of this operator.

      As a result this operator should generally be used as close as possible to the end of the chain / subscription point.

      If the ContextView visible upstream is not empty, a small subset of operators will automatically restore the context snapshot (handle, tap). If context-propagation is not available at runtime, this operator simply returns the current Mono instance.

      Returns:
      a new Flux where context-propagation API has been used to capture entries and inject them into the Context
      See Also:
    • contextWrite

      public final Mono<T> contextWrite(ContextView contextToAppend)
      Enrich the Context visible from downstream for the benefit of upstream operators, by making all values from the provided ContextView visible on top of pairs from downstream.

      A Context (and its ContextView) is tied to a given subscription and is read by querying the downstream Subscriber. Subscriber that don't enrich the context instead access their own downstream's context. As a result, this operator conceptually enriches a Context coming from under it in the chain (downstream, by default an empty one) and makes the new enriched Context visible to operators above it in the chain.

      Parameters:
      contextToAppend - the ContextView to merge with the downstream Context, resulting in a new more complete Context that will be visible from upstream.
      Returns:
      a contextualized Mono
      See Also:
    • contextWrite

      public final Mono<T> contextWrite(Function<Context,Context> contextModifier)
      Enrich the Context visible from downstream for the benefit of upstream operators, by applying a Function to the downstream Context.

      The Function takes a Context for convenience, allowing to easily call write APIs to return a new Context.

      A Context (and its ContextView) is tied to a given subscription and is read by querying the downstream Subscriber. Subscriber that don't enrich the context instead access their own downstream's context. As a result, this operator conceptually enriches a Context coming from under it in the chain (downstream, by default an empty one) and makes the new enriched Context visible to operators above it in the chain.

      Parameters:
      contextModifier - the Function to apply to the downstream Context, resulting in a new more complete Context that will be visible from upstream.
      Returns:
      a contextualized Mono
      See Also:
    • defaultIfEmpty

      public final Mono<T> defaultIfEmpty(T defaultV)
      Provide a default single value if this mono is completed without any data

      Parameters:
      defaultV - the alternate value if this sequence is empty
      Returns:
      a new Mono
      See Also:
    • delayElement

      public final Mono<T> delayElement(Duration delay)
      Delay this Mono element (Subscriber.onNext(T) signal) by a given duration. Empty Monos or error signals are not delayed.

      Note that the scheduler on which the Mono chain continues execution will be the parallel scheduler if the mono is valued, or the current scheduler if the mono completes empty or errors.

      Parameters:
      delay - duration by which to delay the Subscriber.onNext(T) signal
      Returns:
      a delayed Mono
    • delayElement

      public final Mono<T> delayElement(Duration delay, Scheduler timer)
      Delay this Mono element (Subscriber.onNext(T) signal) by a given Duration, on a particular Scheduler. Empty monos or error signals are not delayed.

      Note that the scheduler on which the mono chain continues execution will be the scheduler provided if the mono is valued, or the current scheduler if the mono completes empty or errors.

      Parameters:
      delay - Duration by which to delay the Subscriber.onNext(T) signal
      timer - a time-capable Scheduler instance to delay the value signal on
      Returns:
      a delayed Mono
    • delayUntil

      public final Mono<T> delayUntil(Function<? super T,? extends Publisher<?>> triggerProvider)
      Subscribe to this Mono and another Publisher that is generated from this Mono's element and which will be used as a trigger for relaying said element.

      That is to say, the resulting Mono delays until this Mono's element is emitted, generates a trigger Publisher and then delays again until the trigger Publisher terminates.

      Note that contiguous calls to all delayUntil are fused together. The triggers are generated and subscribed to in sequence, once the previous trigger completes. Error is propagated immediately downstream. In both cases, an error in the source is immediately propagated.

      Parameters:
      triggerProvider - a Function that maps this Mono's value into a Publisher whose termination will trigger relaying the value.
      Returns:
      this Mono, but delayed until the derived publisher terminates.
    • delaySubscription

      public final Mono<T> delaySubscription(Duration delay)
      Delay the subscription to this Mono source until the given period elapses.

      Parameters:
      delay - duration before subscribing this Mono
      Returns:
      a delayed Mono
    • delaySubscription

      public final Mono<T> delaySubscription(Duration delay, Scheduler timer)
      Delay the subscription to this Mono source until the given Duration elapses.

      Parameters:
      delay - Duration before subscribing this Mono
      timer - a time-capable Scheduler instance to run on
      Returns:
      a delayed Mono
    • delaySubscription

      public final <U> Mono<T> delaySubscription(Publisher<U> subscriptionDelay)
      Delay the subscription to this Mono until another Publisher signals a value or completes.

      Type Parameters:
      U - the other source type
      Parameters:
      subscriptionDelay - a Publisher to signal by next or complete this subscribe(Subscriber)
      Returns:
      a delayed Mono
    • dematerialize

      public final <X> Mono<X> dematerialize()
      An operator working only if this Mono emits onNext, onError or onComplete Signal instances, transforming these materialized signals into real signals on the Subscriber. The error Signal will trigger onError and complete Signal will trigger onComplete.

      Type Parameters:
      X - the dematerialized type
      Returns:
      a dematerialized Mono
      See Also:
    • doAfterTerminate

      public final Mono<T> doAfterTerminate(Runnable afterTerminate)
      Add behavior (side-effect) triggered after the Mono terminates, either by completing downstream successfully or with an error.

      The relevant signal is propagated downstream, then the Runnable is executed.

      Parameters:
      afterTerminate - the callback to call after Subscriber.onComplete() or Subscriber.onError(java.lang.Throwable)
      Returns:
      an observed Mono
    • doFirst

      public final Mono<T> doFirst(Runnable onFirst)
      Add behavior (side-effect) triggered before the Mono is subscribed to, which should be the first event after assembly time.

      Note that when several doFirst(Runnable) operators are used anywhere in a chain of operators, their order of execution is reversed compared to the declaration order (as subscribe signal flows backward, from the ultimate subscriber to the source publisher):

      
       Mono.just(1v)
           .doFirst(() -> System.out.println("three"))
           .doFirst(() -> System.out.println("two"))
           .doFirst(() -> System.out.println("one"));
       //would print one two three
       
       

      In case the Runnable throws an exception, said exception will be directly propagated to the subscribing Subscriber along with a no-op Subscription, similarly to what error(Throwable) does. Otherwise, after the handler has executed, the Subscriber is directly subscribed to the original source Mono (this).

      This side-effect method provides stronger first guarantees compared to doOnSubscribe(Consumer), which is triggered once the Subscription has been set up and passed to the Subscriber.

      Parameters:
      onFirst - the callback to execute before the Mono is subscribed to
      Returns:
      an observed Mono
    • doFinally

      public final Mono<T> doFinally(Consumer<SignalType> onFinally)
      Add behavior triggering after the Mono terminates for any reason, including cancellation. The terminating event (SignalType.ON_COMPLETE, SignalType.ON_ERROR and SignalType.CANCEL) is passed to the consumer, which is executed after the signal has been passed downstream.

      Note that the fact that the signal is propagated downstream before the callback is executed means that several doFinally in a row will be executed in reverse order. If you want to assert the execution of the callback please keep in mind that the Mono will complete before it is executed, so its effect might not be visible immediately after eg. a block().

      Parameters:
      onFinally - the callback to execute after a terminal signal (complete, error or cancel)
      Returns:
      an observed Mono
    • doOnCancel

      public final Mono<T> doOnCancel(Runnable onCancel)
      Add behavior triggered when the Mono is cancelled.

      The handler is executed first, then the cancel signal is propagated upstream to the source.

      Parameters:
      onCancel - the callback to call on Subscription.cancel()
      Returns:
      a new Mono
    • doOnDiscard

      public final <R> Mono<T> doOnDiscard(Class<R> type, Consumer<? super R> discardHook)
      Potentially modify the behavior of the whole chain of operators upstream of this one to conditionally clean up elements that get discarded by these operators.

      The discardHook MUST be idempotent and safe to use on any instance of the desired type. Calls to this method are additive, and the order of invocation of the discardHook is the same as the order of declaration (calling .filter(...).doOnDiscard(first).doOnDiscard(second) will let the filter invoke first then second handlers).

      Two main categories of discarding operators exist:

      • filtering operators, dropping some source elements as part of their designed behavior
      • operators that prefetch a few elements and keep them around pending a request, but get cancelled/in error
      WARNING: Not all operators support this instruction. The ones that do are identified in the javadoc by the presence of a Discard Support section.
      Parameters:
      type - the Class of elements in the upstream chain of operators that this cleanup hook should take into account.
      discardHook - a Consumer of elements in the upstream chain of operators that performs the cleanup.
      Returns:
      a Mono that cleans up matching elements that get discarded upstream of it.
    • doOnNext

      public final Mono<T> doOnNext(Consumer<? super T> onNext)
      Add behavior triggered when the Mono emits a data successfully.

      The Consumer is executed first, then the onNext signal is propagated downstream.

      Parameters:
      onNext - the callback to call on Subscriber.onNext(T)
      Returns:
      a new Mono
    • doOnSuccess

      public final Mono<T> doOnSuccess(Consumer<? super @Nullable T> onSuccess)
      Add behavior triggered as soon as the Mono can be considered to have completed successfully. The value passed to the Consumer reflects the type of completion:
      • null : completed without data. handler is executed right before onComplete is propagated downstream
      • T: completed with data. handler is executed right before onNext is propagated downstream

      The Consumer is executed before propagating either onNext or onComplete downstream.

      Parameters:
      onSuccess - the callback to call on, argument is null if the Mono completes without data Subscriber.onNext(T) or Subscriber.onComplete() without preceding Subscriber.onNext(T)
      Returns:
      a new Mono
    • doOnEach

      public final Mono<T> doOnEach(Consumer<? super Signal<T>> signalConsumer)
      Add behavior triggered when the Mono emits an item, fails with an error or completes successfully. All these events are represented as a Signal that is passed to the side-effect callback. Note that this is an advanced operator, typically used for monitoring of a Mono. These Signal have a Context associated to them.

      The Consumer is executed first, then the relevant signal is propagated downstream.

      Parameters:
      signalConsumer - the mandatory callback to call on Subscriber.onNext(Object), Subscriber.onError(Throwable) and Subscriber.onComplete()
      Returns:
      an observed Mono
      See Also:
    • doOnError

      public final Mono<T> doOnError(Consumer<? super Throwable> onError)
      Add behavior triggered when the Mono completes with an error.

      The Consumer is executed first, then the onError signal is propagated downstream.

      Parameters:
      onError - the error callback to call on Subscriber.onError(Throwable)
      Returns:
      a new Mono
    • doOnError

      public final <E extends Throwable> Mono<T> doOnError(Class<E> exceptionType, Consumer<? super E> onError)
      Add behavior triggered when the Mono completes with an error matching the given exception type.

      The Consumer is executed first, then the onError signal is propagated downstream.

      Type Parameters:
      E - type of the error to handle
      Parameters:
      exceptionType - the type of exceptions to handle
      onError - the error handler for relevant errors
      Returns:
      an observed Mono
    • doOnError

      public final Mono<T> doOnError(Predicate<? super Throwable> predicate, Consumer<? super Throwable> onError)
      Add behavior triggered when the Mono completes with an error matching the given predicate.

      The Consumer is executed first, then the onError signal is propagated downstream.

      Parameters:
      predicate - the matcher for exceptions to handle
      onError - the error handler for relevant error
      Returns:
      an observed Mono
    • doOnRequest

      public final Mono<T> doOnRequest(LongConsumer consumer)
      Add behavior triggering a LongConsumer when the Mono receives any request.

      Note that non fatal error raised in the callback will not be propagated and will simply trigger Operators.onOperatorError(Throwable, Context).

      The LongConsumer is executed first, then the request signal is propagated upstream to the parent.

      Parameters:
      consumer - the consumer to invoke on each request
      Returns:
      an observed Mono
    • doOnSubscribe

      public final Mono<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe)
      Add behavior (side-effect) triggered when the Mono is being subscribed, that is to say when a Subscription has been produced by the Publisher and is being passed to the Subscriber.onSubscribe(Subscription).

      This method is not intended for capturing the subscription and calling its methods, but for side effects like monitoring. For instance, the correct way to cancel a subscription is to call Disposable.dispose() on the Disposable returned by subscribe().

      The Consumer is executed first, then the Subscription is propagated downstream to the next subscriber in the chain that is being established.

      Parameters:
      onSubscribe - the callback to call on Subscriber.onSubscribe(Subscription)
      Returns:
      a new Mono
      See Also:
    • doOnTerminate

      public final Mono<T> doOnTerminate(Runnable onTerminate)
      Add behavior triggered when the Mono terminates, either by completing with a value, completing empty or failing with an error. Unlike in Flux.doOnTerminate(Runnable), the simple fact that a Mono emits onNext implies completion, so the handler is invoked BEFORE the element is propagated (same as with doOnSuccess(Consumer)).

      The Runnable is executed first, then the onNext/onComplete/onError signal is propagated downstream.

      Parameters:
      onTerminate - the callback to call Subscriber.onNext(T), Subscriber.onComplete() without preceding Subscriber.onNext(T) or Subscriber.onError(java.lang.Throwable)
      Returns:
      a new Mono
    • elapsed

      public final Mono<Tuple2<Long,T>> elapsed()
      Map this Mono into Tuple2<Long, T> of timemillis and source data. The timemillis corresponds to the elapsed time between the subscribe and the first next signal, as measured by the parallel scheduler.

      Returns:
      a new Mono that emits a tuple of time elapsed in milliseconds and matching data
      See Also:
    • elapsed

      public final Mono<Tuple2<Long,T>> elapsed(Scheduler scheduler)
      Map this Mono sequence into Tuple2<Long, T> of timemillis and source data. The timemillis corresponds to the elapsed time between the subscribe and the first next signal, as measured by the provided Scheduler.

      Parameters:
      scheduler - a Scheduler instance to read time from
      Returns:
      a new Mono that emits a tuple of time elapsed in milliseconds and matching data
      See Also:
    • expandDeep

      public final Flux<T> expandDeep(Function<? super T,? extends Publisher<? extends T>> expander, int capacityHint)
      Recursively expand elements into a graph and emit all the resulting element, in a depth-first traversal order.

      That is: emit the value from this Mono, expand it and emit the first value at this first level of recursion, and so on... When no more recursion is possible, backtrack to the previous level and re-apply the strategy.

      For example, given the hierarchical structure

        A
         - AA
           - aa1
         - AB
           - ab1
         - a1
       
      Expands Mono.just(A) into
        A
        AA
        aa1
        AB
        ab1
        a1
       
      Parameters:
      expander - the Function applied at each level of recursion to expand values into a Publisher, producing a graph.
      capacityHint - a capacity hint to prepare the inner queues to accommodate n elements per level of recursion.
      Returns:
      this Mono expanded depth-first to a Flux
    • expandDeep

      public final Flux<T> expandDeep(Function<? super T,? extends Publisher<? extends T>> expander)
      Recursively expand elements into a graph and emit all the resulting element, in a depth-first traversal order.

      That is: emit the value from this Mono, expand it and emit the first value at this first level of recursion, and so on... When no more recursion is possible, backtrack to the previous level and re-apply the strategy.

      For example, given the hierarchical structure

        A
         - AA
           - aa1
         - AB
           - ab1
         - a1
       
      Expands Mono.just(A) into
        A
        AA
        aa1
        AB
        ab1
        a1
       
      Parameters:
      expander - the Function applied at each level of recursion to expand values into a Publisher, producing a graph.
      Returns:
      this Mono expanded depth-first to a Flux
    • expand

      public final Flux<T> expand(Function<? super T,? extends Publisher<? extends T>> expander, int capacityHint)
      Recursively expand elements into a graph and emit all the resulting element using a breadth-first traversal strategy.

      That is: emit the value from this Mono first, then expand it at a first level of recursion and emit all of the resulting values, then expand all of these at a second level and so on...

      For example, given the hierarchical structure

        A
         - AA
           - aa1
         - AB
           - ab1
         - a1
       
      Expands Mono.just(A) into
        A
        AA
        AB
        a1
        aa1
        ab1
       
      Parameters:
      expander - the Function applied at each level of recursion to expand values into a Publisher, producing a graph.
      capacityHint - a capacity hint to prepare the inner queues to accommodate n elements per level of recursion.
      Returns:
      this Mono expanded breadth-first to a Flux
    • expand

      public final Flux<T> expand(Function<? super T,? extends Publisher<? extends T>> expander)
      Recursively expand elements into a graph and emit all the resulting element using a breadth-first traversal strategy.

      That is: emit the value from this Mono first, then expand it at a first level of recursion and emit all of the resulting values, then expand all of these at a second level and so on...

      For example, given the hierarchical structure

        A
         - AA
           - aa1
         - AB
           - ab1
         - a1
       
      Expands Mono.just(A) into
        A
        AA
        AB
        a1
        aa1
        ab1
       
      Parameters:
      expander - the Function applied at each level of recursion to expand values into a Publisher, producing a graph.
      Returns:
      this Mono expanded breadth-first to a Flux
    • filter

      public final Mono<T> filter(Predicate<? super T> tester)
      If this Mono is valued, test the result and replay it if predicate returns true. Otherwise complete without value.

      Discard Support: This operator discards the element if it does not match the filter. It also discards upon cancellation or error triggered by a data signal.

      Parameters:
      tester - the predicate to evaluate
      Returns:
      a filtered Mono
    • filterWhen

      public final Mono<T> filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate)
      If this Mono is valued, test the value asynchronously using a generated Publisher<Boolean> test. The value from the Mono is replayed if the first item emitted by the test is true. It is dropped if the test is either empty or its first emitted value is false.

      Note that only the first value of the test publisher is considered, and unless it is a Mono, test will be cancelled after receiving that first value.

      Discard Support: This operator discards the element if it does not match the filter. It also discards upon cancellation or error triggered by a data signal.

      Parameters:
      asyncPredicate - the function generating a Publisher of Boolean to filter the Mono with
      Returns:
      a filtered Mono
    • flatMap

      public final <R> Mono<R> flatMap(Function<? super T,? extends Mono<? extends R>> transformer)
      Transform the item emitted by this Mono asynchronously, returning the value emitted by another Mono (possibly changing the value type).

      Type Parameters:
      R - the result type bound
      Parameters:
      transformer - the function to dynamically bind a new Mono
      Returns:
      a new Mono with an asynchronously mapped value.
    • flatMapMany

      public final <R> Flux<R> flatMapMany(Function<? super T,? extends Publisher<? extends R>> mapper)
      Transform the item emitted by this Mono into a Publisher, then forward its emissions into the returned Flux.

      Type Parameters:
      R - the merged sequence type
      Parameters:
      mapper - the Function to produce a sequence of R from the eventual passed Subscriber.onNext(T)
      Returns:
      a new Flux as the sequence is not guaranteed to be single at most
    • flatMapMany

      public final <R> Flux<R> flatMapMany(Function<? super T,? extends Publisher<? extends R>> mapperOnNext, Function<? super Throwable,? extends Publisher<? extends R>> mapperOnError, Supplier<? extends Publisher<? extends R>> mapperOnComplete)
      Transform the signals emitted by this Mono into signal-specific Publishers, then forward the applicable Publisher's emissions into the returned Flux.

      Type Parameters:
      R - the type of the produced inner sequence
      Parameters:
      mapperOnNext - the Function to call on next data and returning a sequence to merge
      mapperOnError - the Function to call on error signal and returning a sequence to merge
      mapperOnComplete - the Function to call on complete signal and returning a sequence to merge
      Returns:
      a new Flux as the sequence is not guaranteed to be single at most
      See Also:
    • flatMapIterable

      public final <R> Flux<R> flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
      Transform the item emitted by this Mono into Iterable, then forward its elements into the returned Flux. The Iterable.iterator() method will be called at least once and at most twice.

      This operator inspects each Iterable's Spliterator to assess if the iteration can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)). Since the default Spliterator wraps the Iterator we can have two Iterable.iterator() calls per iterable. This second invocation is skipped on a Collection however, a type which is assumed to be always finite.

      Discard Support: Upon cancellation, this operator discards T elements it prefetched and, in some cases, attempts to discard remainder of the currently processed Iterable (if it can safely ensure the iterator is finite). Note that this means each Iterable's Iterable.iterator() method could be invoked twice.

      Type Parameters:
      R - the merged output sequence type
      Parameters:
      mapper - the Function to transform input item into a sequence Iterable
      Returns:
      a merged Flux
    • flux

      public final Flux<T> flux()
      Convert this Mono to a Flux
      Returns:
      a Flux variant of this Mono
    • hasElement

      public final Mono<Boolean> hasElement()
      Emit a single boolean true if this Mono has an element.

      Returns:
      a new Mono with true if a value is emitted and false otherwise
    • handle

      public final <R> Mono<R> handle(BiConsumer<? super T,SynchronousSink<R>> handler)
      Handle the items emitted by this Mono by calling a biconsumer with the output sink for each onNext. At most one SynchronousSink.next(Object) call must be performed and/or 0 or 1 SynchronousSink.error(Throwable) or SynchronousSink.complete().

      When the context-propagation library is available at runtime and the downstream ContextView is not empty, this operator implicitly uses the library to restore thread locals around the handler BiConsumer. Typically, this would be done in conjunction with the use of contextCapture() operator down the chain.

      Type Parameters:
      R - the transformed type
      Parameters:
      handler - the handling BiConsumer
      Returns:
      a transformed Mono
    • hide

      public final Mono<T> hide()
      Hides the identity of this Mono instance.

      The main purpose of this operator is to prevent certain identity-based optimizations from happening, mostly for diagnostic purposes.

      Returns:
      a new Mono preventing Publisher / Subscription based Reactor optimizations
    • ignoreElement

      public final Mono<T> ignoreElement()
      Ignores onNext signal (dropping it) and only propagates termination events.

      Discard Support: This operator discards the source element.

      Returns:
      a new empty Mono representing the completion of this Mono.
    • log

      public final Mono<T> log()
      Observe all Reactive Streams signals and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.

      The default log category will be "reactor.Mono", followed by a suffix generated from the source operator, e.g. "reactor.Mono.Map".

      Returns:
      a new Mono that logs signals
      See Also:
    • log

      public final Mono<T> log(@Nullable String category)
      Observe all Reactive Streams signals and use Logger support to handle trace implementation. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.

      Parameters:
      category - to be mapped into logger configuration (e.g. org.springframework .reactor). If category ends with "." like "reactor.", a generated operator suffix will complete, e.g. "reactor.Flux.Map".
      Returns:
      a new Mono
    • log

      public final Mono<T> log(@Nullable String category, Level level, SignalType... options)
      Observe Reactive Streams signals matching the passed flags options and use Logger support to handle trace implementation. Default will use the passed Level and java.util.logging. If SLF4J is available, it will be used instead. Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
           mono.log("category", SignalType.ON_NEXT, SignalType.ON_ERROR)
       

      Parameters:
      category - to be mapped into logger configuration (e.g. org.springframework .reactor). If category ends with "." like "reactor.", a generated operator suffix will complete, e.g. "reactor.Flux.Map".
      level - the Level to enforce for this tracing Mono (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
      options - a vararg SignalType option to filter log messages
      Returns:
      a new Mono
    • log

      public final Mono<T> log(@Nullable String category, Level level, boolean showOperatorLine, SignalType... options)
      Observe Reactive Streams signals matching the passed filter options and use Logger support to handle trace implementation. Default will use the passed Level and java.util.logging. If SLF4J is available, it will be used instead. Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
           mono.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
       

      Parameters:
      category - to be mapped into logger configuration (e.g. org.springframework .reactor). If category ends with "." like "reactor.", a generated operator suffix will complete, e.g. "reactor.Mono.Map".
      level - the Level to enforce for this tracing Mono (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
      showOperatorLine - capture the current stack to display operator class/line number.
      options - a vararg SignalType option to filter log messages
      Returns:
      a new unaltered Mono
    • log

      public final Mono<T> log(Logger logger)
      Observe Reactive Streams signals matching the passed filter options and trace them using a specific user-provided Logger, at Level.INFO level.

      Parameters:
      logger - the Logger to use, instead of resolving one through a category.
      Returns:
      a new Mono that logs signals
    • log

      public final Mono<T> log(Logger logger, Level level, boolean showOperatorLine, SignalType... options)
      Observe Reactive Streams signals matching the passed filter options and trace them using a specific user-provided Logger, at the given Level.

      Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:

           flux.log(myCustomLogger, Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
       

      Parameters:
      logger - the Logger to use, instead of resolving one through a category.
      level - the Level to enforce for this tracing Flux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
      showOperatorLine - capture the current stack to display operator class/line number.
      options - a vararg SignalType option to filter log messages
      Returns:
      a new Mono that logs signals
    • map

      public final <R> Mono<R> map(Function<? super T,? extends R> mapper)
      Transform the item emitted by this Mono by applying a synchronous function to it.

      Type Parameters:
      R - the transformed type
      Parameters:
      mapper - the synchronous transforming Function
      Returns:
      a new Mono
    • mapNotNull

      public final <R> Mono<R> mapNotNull(Function<? super T,? extends @Nullable R> mapper)
      Transform the item emitted by this Mono by applying a synchronous function to it, which is allowed to produce a null value. In that case, the resulting Mono completes immediately. This operator effectively behaves like map(Function) followed by filter(Predicate) although null is not a supported value, so it can't be filtered out.

      Type Parameters:
      R - the transformed type
      Parameters:
      mapper - the synchronous transforming Function
      Returns:
      a new Mono
    • materialize

      public final Mono<Signal<T>> materialize()
      Transform incoming onNext, onError and onComplete signals into Signal instances, materializing these signals. Since the error is materialized as a Signal, the propagation will be stopped and onComplete will be emitted. Complete signal will first emit a Signal.complete() and then effectively complete the flux. All these Signal have a Context associated to them.

      Returns:
      a Mono of materialized Signal
      See Also:
    • mergeWith

      public final Flux<T> mergeWith(Publisher<? extends T> other)
      Merge emissions of this Mono with the provided Publisher. The element from the Mono may be interleaved with the elements of the Publisher.

      Parameters:
      other - the Publisher to merge with
      Returns:
      a new Flux as the sequence is not guaranteed to be at most 1
    • metrics

      @Deprecated public final Mono<T> metrics()
      Deprecated.
      Prefer using the tap(SignalListenerFactory) with the SignalListenerFactory provided by the new reactor-core-micrometer module. To be removed in 3.6.0 at the earliest.
      Activate metrics for this sequence, provided there is an instrumentation facade on the classpath (otherwise this method is a pure no-op).

      Metrics are gathered on Subscriber events, and it is recommended to also name (and optionally tag) the sequence.

      The name serves as a prefix in the reported metrics names. In case no name has been provided, the default name "reactor" will be applied.

      The MeterRegistry used by reactor can be configured via Metrics.MicrometerConfiguration.useRegistry(MeterRegistry) prior to using this operator, the default being Metrics.globalRegistry.

      Returns:
      an instrumented Mono
      See Also:
    • name

      public final Mono<T> name(String name)
      Give a name to this sequence, which can be retrieved using Scannable.name() as long as this is the first reachable Scannable.parents().

      The name is typically visible at assembly time by the tap(SignalListenerFactory) operator, which could for example be configured with a metrics listener using the name as a prefix for meters' id.

      Parameters:
      name - a name for the sequence
      Returns:
      the same sequence, but bearing a name
      See Also:
    • or

      public final Mono<T> or(Mono<? extends T> other)
      Emit the first available signal from this mono or the other mono.

      Parameters:
      other - the racing other Mono to compete with for the signal
      Returns:
      a new Mono
      See Also:
    • ofType

      public final <U> Mono<U> ofType(Class<U> clazz)
      Evaluate the emitted value against the given Class type. If the value matches the type, it is passed into the new Mono. Otherwise the value is ignored.

      Parameters:
      clazz - the Class type to test values against
      Returns:
      a new Mono filtered on the requested type
    • onErrorComplete

      public final Mono<T> onErrorComplete()
      Simply complete the sequence by replacing an onError signal with an onComplete signal. All other signals are propagated as-is.

      Returns:
      a new Mono falling back on completion when an onError occurs
      See Also:
    • onErrorComplete

      public final Mono<T> onErrorComplete(Class<? extends Throwable> type)
      Simply complete the sequence by replacing an onError signal with an onComplete signal if the error matches the given Class. All other signals, including non-matching onError, are propagated as-is.

      Returns:
      a new Mono falling back on completion when a matching error occurs
      See Also:
    • onErrorComplete

      public final Mono<T> onErrorComplete(Predicate<? super Throwable> predicate)
      Simply complete the sequence by replacing an onError signal with an onComplete signal if the error matches the given Predicate. All other signals, including non-matching onError, are propagated as-is.

      Returns:
      a new Mono falling back on completion when a matching error occurs
      See Also:
    • onErrorContinue

      public final Mono<T> onErrorContinue(BiConsumer<Throwable,Object> errorConsumer)
      Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. The recovered error and associated value are notified via the provided BiConsumer. Alternatively, throwing from that biconsumer will propagate the thrown exception downstream in place of the original error, which is added as a suppressed exception to the new one.

      This operator is offered on Mono mainly as a way to propagate the configuration to upstream Flux. The mode doesn't really make sense on a Mono, since we're sure there will be no further value to continue with. onErrorResume(Function) is a more classical fit.

      Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream operators, it requires specific operator support to work, and the scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour.)

      In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use doOnError to log the error, and onErrorResume(e -> Mono.empty()) to drop erroneous elements:

       .flatMap(id -> repository.retrieveById(id)
                                .doOnError(System.err::println)
                                .onErrorResume(e -> Mono.empty()))
       

      This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.

      Parameters:
      errorConsumer - a BiConsumer fed with errors matching the Class and the value that triggered the error.
      Returns:
      a Mono that attempts to continue processing on errors.
    • onErrorContinue

      public final <E extends Throwable> Mono<T> onErrorContinue(Class<E> type, BiConsumer<Throwable,Object> errorConsumer)
      Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. Only errors matching the specified type are recovered from. The recovered error and associated value are notified via the provided BiConsumer. Alternatively, throwing from that biconsumer will propagate the thrown exception downstream in place of the original error, which is added as a suppressed exception to the new one.

      This operator is offered on Mono mainly as a way to propagate the configuration to upstream Flux. The mode doesn't really make sense on a Mono, since we're sure there will be no further value to continue with. onErrorResume(Function) is a more classical fit.

      Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream operators, it requires specific operator support to work, and the scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour.)

      In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use doOnError to log the error, and onErrorResume(e -> Mono.empty()) to drop erroneous elements:

       .flatMap(id -> repository.retrieveById(id)
                                .doOnError(MyException.class, System.err::println)
                                .onErrorResume(MyException.class, e -> Mono.empty()))
       

      This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.

      Parameters:
      type - the Class of Exception that are resumed from.
      errorConsumer - a BiConsumer fed with errors matching the Class and the value that triggered the error.
      Returns:
      a Mono that attempts to continue processing on some errors.
    • onErrorContinue

      public final <E extends Throwable> Mono<T> onErrorContinue(Predicate<E> errorPredicate, BiConsumer<Throwable,Object> errorConsumer)
      Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. Only errors matching the Predicate are recovered from (note that this predicate can be applied several times and thus must be idempotent). The recovered error and associated value are notified via the provided BiConsumer. Alternatively, throwing from that biconsumer will propagate the thrown exception downstream in place of the original error, which is added as a suppressed exception to the new one.

      This operator is offered on Mono mainly as a way to propagate the configuration to upstream Flux. The mode doesn't really make sense on a Mono, since we're sure there will be no further value to continue with. onErrorResume(Function) is a more classical fit.

      Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream operators, it requires specific operator support to work, and the scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour.)

      In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use doOnError to log the error, and onErrorResume(e -> Mono.empty()) to drop erroneous elements:

       .flatMap(id -> repository.retrieveById(id)
                                .doOnError(errorPredicate, System.err::println)
                                .onErrorResume(errorPredicate, e -> Mono.empty()))
       

      This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.

      Parameters:
      errorPredicate - a Predicate used to filter which errors should be resumed from. This MUST be idempotent, as it can be used several times.
      errorConsumer - a BiConsumer fed with errors matching the predicate and the value that triggered the error.
      Returns:
      a Mono that attempts to continue processing on some errors.
    • onErrorStop

      public final Mono<T> onErrorStop()
      If an onErrorContinue(BiConsumer) variant has been used downstream, reverts to the default 'STOP' mode where errors are terminal events upstream. It can be used for easier scoping of the on next failure strategy or to override the inherited strategy in a sub-stream (for example in a flatMap). It has no effect if onErrorContinue(BiConsumer) has not been used downstream.
      Returns:
      a Mono that terminates on errors, even if onErrorContinue(BiConsumer) was used downstream
    • onErrorMap

      public final Mono<T> onErrorMap(Predicate<? super Throwable> predicate, Function<? super Throwable,? extends Throwable> mapper)
      Transform an error emitted by this Mono by synchronously applying a function to it if the error matches the given predicate. Otherwise let the error pass through.

      Parameters:
      predicate - the error predicate
      mapper - the error transforming Function
      Returns:
      a Mono that transforms some source errors to other errors
    • onErrorMap

      public final Mono<T> onErrorMap(Function<? super Throwable,? extends Throwable> mapper)
      Transform any error emitted by this Mono by synchronously applying a function to it.

      Parameters:
      mapper - the error transforming Function
      Returns:
      a Mono that transforms source errors to other errors
    • onErrorMap

      public final <E extends Throwable> Mono<T> onErrorMap(Class<E> type, Function<? super E,? extends Throwable> mapper)
      Transform an error emitted by this Mono by synchronously applying a function to it if the error matches the given type. Otherwise let the error pass through.

      Type Parameters:
      E - the error type
      Parameters:
      type - the class of the exception type to react to
      mapper - the error transforming Function
      Returns:
      a Mono that transforms some source errors to other errors
    • onErrorResume

      public final Mono<T> onErrorResume(Function<? super Throwable,? extends Mono<? extends T>> fallback)
      Subscribe to a fallback publisher when any error occurs, using a function to choose the fallback depending on the error.

      Parameters:
      fallback - the function to choose the fallback to an alternative Mono
      Returns:
      a Mono falling back upon source onError
      See Also:
    • onErrorResume

      public final <E extends Throwable> Mono<T> onErrorResume(Class<E> type, Function<? super E,? extends Mono<? extends T>> fallback)
      Subscribe to a fallback publisher when an error matching the given type occurs, using a function to choose the fallback depending on the error.

      Type Parameters:
      E - the error type
      Parameters:
      type - the error type to match
      fallback - the function to choose the fallback to an alternative Mono
      Returns:
      a Mono falling back upon source onError
      See Also:
    • onErrorResume

      public final Mono<T> onErrorResume(Predicate<? super Throwable> predicate, Function<? super Throwable,? extends Mono<? extends T>> fallback)
      Subscribe to a fallback publisher when an error matching a given predicate occurs.

      Parameters:
      predicate - the error predicate to match
      fallback - the function to choose the fallback to an alternative Mono
      Returns:
      a Mono falling back upon source onError
      See Also:
    • onErrorReturn

      public final Mono<T> onErrorReturn(T fallbackValue)
      Simply emit a captured fallback value when any error is observed on this Mono.

      Parameters:
      fallbackValue - the value to emit if an error occurs
      Returns:
      a new falling back Mono
      See Also:
    • onErrorReturn

      public final <E extends Throwable> Mono<T> onErrorReturn(Class<E> type, T fallbackValue)
      Simply emit a captured fallback value when an error of the specified type is observed on this Mono.

      Type Parameters:
      E - the error type
      Parameters:
      type - the error type to match
      fallbackValue - the value to emit if an error occurs that matches the type
      Returns:
      a new falling back Mono
      See Also:
    • onErrorReturn

      public final Mono<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)
      Simply emit a captured fallback value when an error matching the given predicate is observed on this Mono.

      Parameters:
      predicate - the error predicate to match
      fallbackValue - the value to emit if an error occurs that matches the predicate
      Returns:
      a new Mono
      See Also:
    • onTerminateDetach

      public final Mono<T> onTerminateDetach()
      Detaches both the child Subscriber and the Subscription on termination or cancellation.

      This should help with odd retention scenarios when running with non-reactor Subscriber.

      Returns:
      a detachable Mono
    • publish

      public final <R> Mono<R> publish(Function<? super Mono<T>,? extends Mono<? extends R>> transform)
      Share a Mono for the duration of a function that may transform it and consume it as many times as necessary without causing multiple subscriptions to the upstream.
      Type Parameters:
      R - the output value type
      Parameters:
      transform - the transformation function
      Returns:
      a new Mono
    • publishOn

      public final Mono<T> publishOn(Scheduler scheduler)
      Run onNext, onComplete and onError on a supplied Scheduler Worker.

      This operator influences the threading context where the rest of the operators in the chain below it will execute, up to a new occurrence of publishOn.

      Typically used for fast publisher, slow consumer(s) scenarios.

       mono.publishOn(Schedulers.single()).subscribe() 
       
      Parameters:
      scheduler - a Scheduler providing the Scheduler.Worker where to publish
      Returns:
      an asynchronously producing Mono
    • repeat

      public final Flux<T> repeat()
      Repeatedly and indefinitely subscribe to the source upon completion of the previous subscription.

      Returns:
      an indefinitely repeated Flux on onComplete
    • repeat

      public final Flux<T> repeat(BooleanSupplier predicate)
      Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.

      Parameters:
      predicate - the boolean to evaluate on onComplete.
      Returns:
      a Flux that repeats on onComplete while the predicate matches
    • repeat

      public final Flux<T> repeat(long numRepeat)
      Repeatedly subscribe to the source numRepeat times. This results in numRepeat + 1 total subscriptions to the original source. As a consequence, using 0 plays the original sequence once.

      Parameters:
      numRepeat - the number of times to re-subscribe on onComplete (positive, or 0 for original sequence only)
      Returns:
      a Flux that repeats on onComplete, up to the specified number of repetitions
    • repeat

      public final Flux<T> repeat(long numRepeat, BooleanSupplier predicate)
      Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription. A specified maximum of repeat will limit the number of re-subscribe.

      Parameters:
      numRepeat - the number of times to re-subscribe on complete (positive, or 0 for original sequence only)
      predicate - the boolean to evaluate on onComplete
      Returns:
      a Flux that repeats on onComplete while the predicate matches, up to the specified number of repetitions
    • repeatWhen

      public final Flux<T> repeatWhen(Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
      Repeatedly subscribe to this Mono when a companion sequence emits elements in response to the flux completion signal. Any terminal signal from the companion sequence will terminate the resulting Flux with the same signal immediately.

      If the companion sequence signals when this Mono is active, the repeat attempt is suppressed.

      Note that if the companion Publisher created by the repeatFactory emits Context as trigger objects, the content of these Context will be added to the operator's own Context.

      Parameters:
      repeatFactory - the Function that returns the associated Publisher companion, given a Flux that signals each onComplete as a Long representing the number of source elements emitted in the latest attempt (0 or 1).
      Returns:
      a Flux that repeats on onComplete when the companion Publisher produces an onNext signal
    • repeatWhenEmpty

      public final Mono<T> repeatWhenEmpty(Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
      Repeatedly subscribe to this Mono as long as the current subscription to this Mono completes empty and the companion Publisher produces an onNext signal.

      Any terminal signal will terminate the resulting Mono with the same signal immediately.

      Parameters:
      repeatFactory - the Function that returns the associated Publisher companion, given a Flux that signals each onComplete as a 0-based incrementing Long.
      Returns:
      a Mono that resubscribes to this Mono if the previous subscription was empty, as long as the companion Publisher produces an onNext signal
    • repeatWhenEmpty

      public final Mono<T> repeatWhenEmpty(int maxRepeat, Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
      Repeatedly subscribe to this Mono as long as the current subscription to this Mono completes empty and the companion Publisher produces an onNext signal.

      Any terminal signal will terminate the resulting Mono with the same signal immediately.

      Emits an IllegalStateException if maxRepeat is exceeded (provided it is different from Integer.MAX_VALUE).

      Parameters:
      maxRepeat - the maximum number of repeats (infinite if Integer.MAX_VALUE)
      repeatFactory - the Function that returns the associated Publisher companion, given a Flux that signals each onComplete as a 0-based incrementing Long.
      Returns:
      a Mono that resubscribes to this Mono if the previous subscription was empty, as long as the companion Publisher produces an onNext signal and the maximum number of repeats isn't exceeded.
    • retry

      public final Mono<T> retry()
      Re-subscribes to this Mono sequence if it signals any error, indefinitely.

      Returns:
      a Mono that retries on onError
    • retry

      public final Mono<T> retry(long numRetries)
      Re-subscribes to this Mono sequence if it signals any error, for a fixed number of times.

      Note that passing Long.MAX_VALUE is treated as infinite retry.

      Parameters:
      numRetries - the number of times to tolerate an error
      Returns:
      a Mono that retries on onError up to the specified number of retry attempts.
    • retryWhen

      public final Mono<T> retryWhen(Retry retrySpec)
      Retries this Mono in response to signals emitted by a companion Publisher. The companion is generated by the provided Retry instance, see Retry.max(long), Retry.maxInARow(long) and Retry.backoff(long, Duration) for readily available strategy builders.

      The operator generates a base for the companion, a Flux of Retry.RetrySignal which each give metadata about each retryable failure whenever this Mono signals an error. The final companion should be derived from that base companion and emit data in response to incoming onNext (although it can emit less elements, or delay the emissions).

      Terminal signals in the companion terminate the sequence with the same signal, so emitting an Subscriber.onError(Throwable) will fail the resulting Mono with that same error.

      Note that the Retry.RetrySignal state can be transient and change between each source onError or onNext. If processed with a delay, this could lead to the represented state being out of sync with the state at which the retry was evaluated. Map it to Retry.RetrySignal.copy() right away to mediate this.

      Note that if the companion Publisher created by the whenFactory emits Context as trigger objects, these Context will be merged with the previous Context:

       
       Retry customStrategy = Retry.from(companion -> companion.handle((retrySignal, sink) -> {
       	    Context ctx = sink.currentContext();
       	    int rl = ctx.getOrDefault("retriesLeft", 0);
       	    if (rl > 0) {
      		    sink.next(Context.of(
      		        "retriesLeft", rl - 1,
      		        "lastError", retrySignal.failure()
      		    ));
       	    } else {
       	        sink.error(Exceptions.retryExhausted("retries exhausted", retrySignal.failure()));
       	    }
       }));
       Mono<T> retried = originalMono.retryWhen(customStrategy);
       
      Parameters:
      retrySpec - the Retry strategy that will generate the companion Publisher, given a Flux that signals each onError as a Retry.RetrySignal.
      Returns:
      a Mono that retries on onError when a companion Publisher produces an onNext signal
      See Also:
    • share

      public final Mono<T> share()
      Prepare a Mono which shares this Mono result similar to Flux.shareNext(). This will effectively turn this Mono into a hot task when the first Subscriber subscribes using subscribe() API. Further Subscriber will share the same Subscription and therefore the same result. When all subscribers have cancelled it will cancel the source Mono.

      Returns:
      a new Mono
    • single

      public final Mono<T> single()
      Expect exactly one item from this Mono source or signal NoSuchElementException for an empty source.

      Note Mono doesn't need Flux.single(Object), since it is equivalent to defaultIfEmpty(Object) in a Mono.

      Returns:
      a Mono with the single item or an error signal
    • singleOptional

      public final Mono<Optional<T>> singleOptional()
      Wrap the item produced by this Mono source into an Optional or emit an empty Optional for an empty source.

      Returns:
      a Mono with an Optional containing the item, an empty optional or an error signal
    • subscribe

      public final Disposable subscribe()
      Subscribe to this Mono and request unbounded demand.

      This version doesn't specify any consumption behavior for the events from the chain, especially no error handling, so other variants should usually be preferred.

      Returns:
      a new Disposable that can be used to cancel the underlying Subscription
    • subscribe

      public final Disposable subscribe(Consumer<? super T> consumer)
      Subscribe a Consumer to this Mono that will consume all the sequence. It will request an unbounded demand (Long.MAX_VALUE).

      For a passive version that observe and forward incoming data see doOnNext(java.util.function.Consumer).

      Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.

      Parameters:
      consumer - the consumer to invoke on each value (onNext signal)
      Returns:
      a new Disposable that can be used to cancel the underlying Subscription
    • subscribe

      public final Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer)
      Subscribe to this Mono with a Consumer that will consume all the elements in the sequence, as well as a Consumer that will handle errors. The subscription will request an unbounded demand (Long.MAX_VALUE).

      For a passive version that observe and forward incoming data see doOnSuccess(Consumer) and doOnError(java.util.function.Consumer).

      Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.

      Parameters:
      consumer - the consumer to invoke on each next signal
      errorConsumer - the consumer to invoke on error signal
      Returns:
      a new Disposable that can be used to cancel the underlying Subscription
    • subscribe

      public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer)
      Subscribe Consumer to this Mono that will respectively consume all the elements in the sequence, handle errors and react to completion. The subscription will request unbounded demand (Long.MAX_VALUE).

      For a passive version that observe and forward incoming data see doOnSuccess(Consumer) and doOnError(java.util.function.Consumer).

      Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.

      Parameters:
      consumer - the consumer to invoke on each value
      errorConsumer - the consumer to invoke on error signal
      completeConsumer - the consumer to invoke on complete signal
      Returns:
      a new Disposable that can be used to cancel the underlying Subscription
    • subscribe

      public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer)
      Subscribe Consumer to this Mono that will respectively consume all the elements in the sequence, handle errors, react to completion, and request upon subscription. It will let the provided subscriptionConsumer request the adequate amount of data, or request unbounded demand Long.MAX_VALUE if no such consumer is provided.

      For a passive version that observe and forward incoming data see doOnSuccess(Consumer) and doOnError(java.util.function.Consumer).

      Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.

      Parameters:
      consumer - the consumer to invoke on each value
      errorConsumer - the consumer to invoke on error signal
      completeConsumer - the consumer to invoke on complete signal
      subscriptionConsumer - the consumer to invoke on subscribe signal, to be used for the initial request, or null for max request
      Returns:
      a new Disposable that can be used to cancel the underlying Subscription
    • subscribe

      public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Context initialContext)
      Subscribe Consumer to this Mono that will respectively consume all the elements in the sequence, handle errors and react to completion. Additionally, a Context is tied to the subscription. At subscription, an unbounded request is implicitly made.

      For a passive version that observe and forward incoming data see doOnSuccess(Consumer) and doOnError(java.util.function.Consumer).

      Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.

      Parameters:
      consumer - the consumer to invoke on each value
      errorConsumer - the consumer to invoke on error signal
      completeConsumer - the consumer to invoke on complete signal
      initialContext - the Context for the subscription
      Returns:
      a new Disposable that can be used to cancel the underlying Subscription
    • subscribe

      public final void subscribe(Subscriber<? super T> actual)
      Specified by:
      subscribe in interface Publisher<T>
    • subscribe

      public abstract void subscribe(CoreSubscriber<? super T> actual)
      An internal Publisher.subscribe(Subscriber) that will bypass Hooks.onLastOperator(Function) pointcut.

      In addition to behave as expected by Publisher.subscribe(Subscriber) in a controlled manner, it supports direct subscribe-time Context passing.

      Specified by:
      subscribe in interface CorePublisher<T>
      Parameters:
      actual - the Subscriber interested into the published sequence
      See Also:
    • subscribeOn

      public final Mono<T> subscribeOn(Scheduler scheduler)
      Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn.

       mono.subscribeOn(Schedulers.parallel()).subscribe() 
       
      Parameters:
      scheduler - a Scheduler providing the Scheduler.Worker where to subscribe
      Returns:
      a Mono requesting asynchronously
      See Also:
    • subscribeWith

      public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber)
      Subscribe the given Subscriber to this Mono and return said Subscriber, allowing subclasses with a richer API to be used fluently.
      Type Parameters:
      E - the reified type of the Subscriber for chaining
      Parameters:
      subscriber - the Subscriber to subscribe with
      Returns:
      the passed Subscriber after subscribing it to this Mono
    • switchIfEmpty

      public final Mono<T> switchIfEmpty(Mono<? extends T> alternate)
      Fallback to an alternative Mono if this mono is completed without data

      Parameters:
      alternate - the alternate mono if this mono is empty
      Returns:
      a Mono falling back upon source completing without elements
      See Also:
    • tag

      public final Mono<T> tag(String key, String value)
      Tag this mono with a key/value pair. These can be retrieved as a Set of all tags throughout the publisher chain by using Scannable.tags() (as traversed by Scannable.parents()).

      The name is typically visible at assembly time by the tap(SignalListenerFactory) operator, which could for example be configured with a metrics listener applying the tag(s) to its meters.

      Parameters:
      key - a tag key
      value - a tag value
      Returns:
      the same sequence, but bearing tags
      See Also:
    • take

      public final Mono<T> take(Duration duration)
      Give this Mono a chance to resolve within a specified time frame but complete if it doesn't. This works a bit like timeout(Duration) except that the resulting Mono completes rather than errors when the timer expires.

      The timeframe is evaluated using the parallel Scheduler.

      Parameters:
      duration - the maximum duration to wait for the source Mono to resolve.
      Returns:
      a new Mono that will propagate the signals from the source unless no signal is received for duration, in which case it completes.
    • take

      public final Mono<T> take(Duration duration, Scheduler timer)
      Give this Mono a chance to resolve within a specified time frame but complete if it doesn't. This works a bit like timeout(Duration) except that the resulting Mono completes rather than errors when the timer expires.

      The timeframe is evaluated using the provided Scheduler.

      Parameters:
      duration - the maximum duration to wait for the source Mono to resolve.
      timer - the Scheduler on which to measure the duration.
      Returns:
      a new Mono that will propagate the signals from the source unless no signal is received for duration, in which case it completes.
    • takeUntilOther

      public final Mono<T> takeUntilOther(Publisher<?> other)
      Give this Mono a chance to resolve before a companion Publisher emits. If the companion emits before any signal from the source, the resulting Mono will complete. Otherwise, it will relay signals from the source.

      Parameters:
      other - a companion Publisher that shortcircuits the source with an onComplete signal if it emits before the source emits.
      Returns:
      a new Mono that will propagate the signals from the source unless a signal is first received from the companion Publisher, in which case it completes.
    • tap

      public final Mono<T> tap(Supplier<SignalListener<T>> simpleListenerGenerator)
      Tap into Reactive Streams signals emitted or received by this Mono and notify a stateful per-Subscriber SignalListener.

      Any exception thrown by the SignalListener methods causes the subscription to be cancelled and the subscriber to be terminated with an onError signal of that exception. Note that SignalListener.doFinally(SignalType), SignalListener.doAfterComplete() and SignalListener.doAfterError(Throwable) instead just drop the exception.

      This simplified variant assumes the state is purely initialized within the Supplier, as it is called for each incoming Subscriber without additional context.

      When the context-propagation library is available at runtime and the downstream ContextView is not empty, this operator implicitly uses the library to restore thread locals around all invocations of SignalListener methods. Typically, this would be done in conjunction with the use of contextCapture() operator down the chain.

      Parameters:
      simpleListenerGenerator - the Supplier to create a new SignalListener on each subscription
      Returns:
      a new Mono with side effects defined by generated SignalListener
      See Also:
    • tap

      public final Mono<T> tap(Function<ContextView,SignalListener<T>> listenerGenerator)
      Tap into Reactive Streams signals emitted or received by this Mono and notify a stateful per-Subscriber SignalListener.

      Any exception thrown by the SignalListener methods causes the subscription to be cancelled and the subscriber to be terminated with an onError signal of that exception. Note that SignalListener.doFinally(SignalType), SignalListener.doAfterComplete() and SignalListener.doAfterError(Throwable) instead just drop the exception.

      This simplified variant allows the SignalListener to be constructed for each subscription with access to the incoming Subscriber's ContextView.

      When the context-propagation library is available at runtime and the ContextView is not empty, this operator implicitly uses the library to restore thread locals around all invocations of SignalListener methods. Typically, this would be done in conjunction with the use of contextCapture() operator down the chain.

      Parameters:
      listenerGenerator - the Function to create a new SignalListener on each subscription
      Returns:
      a new Mono with side effects defined by generated SignalListener
      See Also:
    • tap

      public final Mono<T> tap(SignalListenerFactory<T,?> listenerFactory)
      Tap into Reactive Streams signals emitted or received by this Mono and notify a stateful per-Subscriber SignalListener created by the provided SignalListenerFactory.

      The factory will initialize a state object for each Flux or Mono instance it is used with, and that state will be cached and exposed for each incoming Subscriber in order to generate the associated listener.

      Any exception thrown by the SignalListener methods causes the subscription to be cancelled and the subscriber to be terminated with an onError signal of that exception. Note that SignalListener.doFinally(SignalType), SignalListener.doAfterComplete() and SignalListener.doAfterError(Throwable) instead just drop the exception.

      When the context-propagation library is available at runtime and the downstream ContextView is not empty, this operator implicitly uses the library to restore thread locals around all invocations of SignalListener methods. Typically, this would be done in conjunction with the use of contextCapture() operator down the chain.

      Parameters:
      listenerFactory - the SignalListenerFactory to create a new SignalListener on each subscription
      Returns:
      a new Mono with side effects defined by generated SignalListener
      See Also:
    • then

      public final Mono<Void> then()
      Return a Mono<Void> which only replays complete and error signals from this Mono.

      Discard Support: This operator discards the element from the source.

      Returns:
      a Mono ignoring its payload (actively dropping)
    • then

      public final <V> Mono<V> then(Mono<V> other)
      Let this Mono complete then play another Mono.

      In other words ignore element from this Mono and transform its completion signal into the emission and completion signal of a provided Mono<V>. Error signal is replayed in the resulting Mono<V>.

      Discard Support: This operator discards the element from the source.

      Type Parameters:
      V - the element type of the supplied Mono
      Parameters:
      other - a Mono to emit from after termination
      Returns:
      a new Mono that emits from the supplied Mono
    • thenReturn

      public final <V> Mono<V> thenReturn(V value)
      Let this Mono complete successfully, then emit the provided value. On an error in the original Mono, the error signal is propagated instead.

      Discard Support: This operator discards the element from the source.

      Type Parameters:
      V - the element type of the supplied value
      Parameters:
      value - a value to emit after successful termination
      Returns:
      a new Mono that emits the supplied value
    • thenEmpty

      public final Mono<Void> thenEmpty(Publisher<Void> other)
      Return a Mono<Void> that waits for this Mono to complete then for a supplied Publisher<Void> to also complete. The second completion signal is replayed, or any error signal that occurs instead.

      Discard Support: This operator discards the element from the source.

      Parameters:
      other - a Publisher to wait for after this Mono's termination
      Returns:
      a new Mono completing when both publishers have completed in sequence
    • thenMany

      public final <V> Flux<V> thenMany(Publisher<V> other)
      Let this Mono complete successfully then play another Publisher. On an error in the original Mono, the error signal is propagated instead.

      In other words ignore the element from this mono and transform the completion signal into a Flux<V> that will emit elements from the provided Publisher.

      Discard Support: This operator discards the element from the source.

      Type Parameters:
      V - the element type of the supplied Publisher
      Parameters:
      other - a Publisher to emit from after termination
      Returns:
      a new Flux that emits from the supplied Publisher after this Mono completes.
    • timed

      public final Mono<Timed<T>> timed()
      Times this Mono Subscriber.onNext(Object) event, encapsulated into a Timed object that lets downstream consumer look at various time information gathered with nanosecond resolution using the default clock (Schedulers.parallel()):

      The Timed object instances are safe to store and use later, as they are created as an immutable wrapper around the <T> value and immediately passed downstream.

      Returns:
      a timed Mono
      See Also:
    • timed

      public final Mono<Timed<T>> timed(Scheduler clock)
      Times this Mono Subscriber.onNext(Object) event, encapsulated into a Timed object that lets downstream consumer look at various time information gathered with nanosecond resolution using the provided Scheduler as a clock:

      The Timed object instances are safe to store and use later, as they are created as an immutable wrapper around the <T> value and immediately passed downstream.

      Returns:
      a timed Mono
      See Also:
    • timeout

      public final Mono<T> timeout(Duration timeout)
      Propagate a TimeoutException in case no item arrives within the given Duration.

      Parameters:
      timeout - the timeout before the onNext signal from this Mono
      Returns:
      a Mono that can time out
    • timeout

      public final Mono<T> timeout(Duration timeout, Mono<? extends T> fallback)
      Switch to a fallback Mono in case no item arrives within the given Duration.

      If the fallback Mono is null, signal a TimeoutException instead.

      Parameters:
      timeout - the timeout before the onNext signal from this Mono
      fallback - the fallback Mono to subscribe to when a timeout occurs
      Returns:
      a Mono that will fallback to a different Mono in case of timeout
    • timeout

      public final Mono<T> timeout(Duration timeout, Scheduler timer)
      Signal a TimeoutException error in case an item doesn't arrive before the given period, as measured on the provided Scheduler.

      Parameters:
      timeout - the timeout before the onNext signal from this Mono
      timer - a time-capable Scheduler instance to run the delay on
      Returns:
      an expirable Mono
    • timeout

      public final Mono<T> timeout(Duration timeout, @Nullable Mono<? extends T> fallback, Scheduler timer)
      Switch to a fallback Mono in case an item doesn't arrive before the given period, as measured on the provided Scheduler.

      If the given Mono is null, signal a TimeoutException.

      Parameters:
      timeout - the timeout before the onNext signal from this Mono
      fallback - the fallback Mono to subscribe when a timeout occurs
      timer - a time-capable Scheduler instance to run on
      Returns:
      an expirable Mono with a fallback Mono
    • timeout

      public final <U> Mono<T> timeout(Publisher<U> firstTimeout)
      Signal a TimeoutException in case the item from this Mono has not been emitted before the given Publisher emits.

      Type Parameters:
      U - the element type of the timeout Publisher
      Parameters:
      firstTimeout - the timeout Publisher that must not emit before the first signal from this Mono
      Returns:
      an expirable Mono if the item does not come before a Publisher signals
    • timeout

      public final <U> Mono<T> timeout(Publisher<U> firstTimeout, Mono<? extends T> fallback)
      Switch to a fallback Publisher in case the item from this Mono has not been emitted before the given Publisher emits.

      Type Parameters:
      U - the element type of the timeout Publisher
      Parameters:
      firstTimeout - the timeout Publisher that must not emit before the first signal from this Mono
      fallback - the fallback Publisher to subscribe when a timeout occurs
      Returns:
      an expirable Mono with a fallback Mono if the item doesn't come before a Publisher signals
    • timestamp

      public final Mono<Tuple2<Long,T>> timestamp()
      If this Mono is valued, emit a Tuple2 pair of T1 the current clock time in millis (as a Long measured by the parallel Scheduler) and T2 the emitted data (as a T).

      Returns:
      a timestamped Mono
      See Also:
    • timestamp

      public final Mono<Tuple2<Long,T>> timestamp(Scheduler scheduler)
      If this Mono is valued, emit a Tuple2 pair of T1 the current clock time in millis (as a Long measured by the provided Scheduler) and T2 the emitted data (as a T).

      The provider Scheduler will be asked to provide time with a granularity of TimeUnit.MILLISECONDS. In order for this operator to work as advertised, the provided Scheduler should thus return results that can be interpreted as unix timestamps.

      Parameters:
      scheduler - a Scheduler instance to read time from
      Returns:
      a timestamped Mono
      See Also:
    • toFuture

      public final CompletableFuture<@Nullable T> toFuture()
      Transform this Mono into a CompletableFuture completing on onNext or onComplete and failing on onError.

      Returns:
      a CompletableFuture
    • transform

      public final <V> Mono<V> transform(Function<? super Mono<T>,? extends Publisher<V>> transformer)
      Transform this Mono in order to generate a target Mono. Unlike transformDeferred(Function), the provided function is executed as part of assembly.
       Function<Mono, Mono> applySchedulers = mono -> mono.subscribeOn(Schedulers.io())
                                                          .publishOn(Schedulers.parallel());
       mono.transform(applySchedulers).map(v -> v * v).subscribe();
       

      Type Parameters:
      V - the item type in the returned Mono
      Parameters:
      transformer - the Function to immediately map this Mono into a target Mono instance.
      Returns:
      a new Mono
      See Also:
    • transformDeferred

      public final <V> Mono<V> transformDeferred(Function<? super Mono<T>,? extends Publisher<V>> transformer)
      Defer the given transformation to this Mono in order to generate a target Mono type. A transformation will occur for each Subscriber. For instance:
       mono.transformDeferred(original -> original.log());
       

      Type Parameters:
      V - the item type in the returned Publisher
      Parameters:
      transformer - the Function to lazily map this Mono into a target Mono instance upon subscription.
      Returns:
      a new Mono
      See Also:
    • transformDeferredContextual

      public final <V> Mono<V> transformDeferredContextual(BiFunction<? super Mono<T>,? super ContextView,? extends Publisher<V>> transformer)
      Defer the given transformation to this Mono in order to generate a target Mono type. A transformation will occur for each Subscriber. In addition, the transforming BiFunction exposes the ContextView of each Subscriber. For instance:
       Mono<T> monoLogged = mono.transformDeferredContextual((original, ctx) -> original.log("for RequestID" + ctx.get("RequestID"))
       //...later subscribe. Each subscriber has its Context with a RequestID entry
       monoLogged.contextWrite(Context.of("RequestID", "requestA").subscribe();
       monoLogged.contextWrite(Context.of("RequestID", "requestB").subscribe();
       

      Type Parameters:
      V - the item type in the returned Publisher
      Parameters:
      transformer - the BiFunction to lazily map this Mono into a target Mono instance upon subscription, with access to ContextView
      Returns:
      a new Mono
      See Also:
    • zipWhen

      public final <T2> Mono<Tuple2<T,T2>> zipWhen(Function<T,Mono<? extends T2>> rightGenerator)
      Wait for the result from this mono, use it to create a second mono via the provided rightGenerator function and combine both results into a Tuple2.

      Type Parameters:
      T2 - the element type of the other Mono instance
      Parameters:
      rightGenerator - the Function to generate a Mono to combine with
      Returns:
      a new combined Mono
    • zipWhen

      public final <T2, O> Mono<O> zipWhen(Function<T,Mono<? extends T2>> rightGenerator, BiFunction<T,T2,O> combinator)
      Wait for the result from this mono, use it to create a second mono via the provided rightGenerator function and combine both results into an arbitrary O object, as defined by the provided combinator function.

      Type Parameters:
      T2 - the element type of the other Mono instance
      O - the element type of the combination
      Parameters:
      rightGenerator - the Function to generate a Mono to combine with
      combinator - a BiFunction combinator function when both sources complete
      Returns:
      a new combined Mono
    • zipWith

      public final <T2> Mono<Tuple2<T,T2>> zipWith(Mono<? extends T2> other)
      Combine the result from this mono and another into a Tuple2.

      An error or empty completion of any source will cause the other source to be cancelled and the resulting Mono to immediately error or complete, respectively.

      Type Parameters:
      T2 - the element type of the other Mono instance
      Parameters:
      other - the Mono to combine with
      Returns:
      a new combined Mono
    • zipWith

      public final <T2, O> Mono<O> zipWith(Mono<? extends T2> other, BiFunction<? super T,? super T2,? extends O> combinator)
      Combine the result from this mono and another into an arbitrary O object, as defined by the provided combinator function.

      An error or empty completion of any source will cause the other source to be cancelled and the resulting Mono to immediately error or complete, respectively.

      Type Parameters:
      T2 - the element type of the other Mono instance
      O - the element type of the combination
      Parameters:
      other - the Mono to combine with
      combinator - a BiFunction combinator function when both sources complete
      Returns:
      a new combined Mono
    • onAssembly

      protected static <T> Mono<T> onAssembly(Mono<T> source)
      To be used by custom operators: invokes assembly Hooks pointcut given a Mono, potentially returning a new Mono. This is for example useful to activate cross-cutting concerns at assembly time, eg. a generalized checkpoint().
      Type Parameters:
      T - the value type
      Parameters:
      source - the source to apply assembly hooks onto
      Returns:
      the source, potentially wrapped with assembly time cross-cutting behavior
    • toString

      public String toString()
      Overrides:
      toString in class Object