T
- the type of the single value of this classpublic abstract class Mono<T> extends Object implements Publisher<T>
Publisher
with basic rx operators that completes successfully by emitting an element, or
with an error.
The rx operators will offer aliases for input Mono
type to preserve the "at most one"
property of the resulting Mono
. For instance flatMap
returns a
Mono
, while there is a flatMapMany
alias with possibly more than 1 emission.
Mono<Void>
should be used for Publisher
that just completes without any value.
It is intended to be used in implementations and return types, input parameters should keep using raw Publisher
as much as possible.
Note that using state in the java.util.function
/ lambdas used within Mono operators
should be avoided, as these may be shared between several Subscribers
.
Flux
Constructor and Description |
---|
Mono() |
Modifier and Type | Method and Description |
---|---|
Mono<Void> |
and(Publisher<?> other)
Join the termination signals from this mono and another source into the returned
void mono
|
<P> P |
as(Function<? super Mono<T>,P> transformer)
Transform this
Mono into a target type. |
T |
block()
Subscribe to this
Mono and block indefinitely until a next signal is
received. |
T |
block(Duration timeout)
Subscribe to this
Mono and block until a next signal is
received or a timeout expires. |
Optional<T> |
blockOptional()
Subscribe to this
Mono and block indefinitely until a next signal is
received or the Mono completes empty. |
Optional<T> |
blockOptional(Duration timeout)
Subscribe to this
Mono and block until a next signal is
received, the Mono completes empty or a timeout expires. |
Mono<T> |
cache()
Turn this
Mono into a hot source and cache last emitted signals for further Subscriber . |
Mono<T> |
cache(Duration ttl)
Turn this
Mono into a hot source and cache last emitted signals for further
Subscriber , with an expiry timeout. |
Mono<T> |
cancelOn(Scheduler scheduler)
|
<E> Mono<E> |
cast(Class<E> clazz)
Cast the current
Mono produced type into a target produced type. |
Mono<T> |
checkpoint()
Activate assembly tracing for this particular
Mono , in case of an error
upstream of the checkpoint. |
Mono<T> |
checkpoint(String description)
Activate assembly marker for this particular
Mono by giving it a description that
will be reflected in the assembly traceback in case of an error upstream of the
checkpoint. |
Mono<T> |
checkpoint(String description,
boolean forceStackTrace)
Activate assembly tracing or the lighter assembly marking depending on the
forceStackTrace option. |
<V> Mono<V> |
compose(Function<? super Mono<T>,? extends Publisher<V>> transformer)
|
Flux<T> |
concatWith(Publisher<? extends T> other)
|
static <T> Mono<T> |
create(Consumer<MonoSink<T>> callback)
Creates a deferred emitter that can be used with callback-based
APIs to signal at most one value, a complete or an error signal.
|
Mono<T> |
defaultIfEmpty(T defaultV)
Provide a default single value if this mono is completed without any data
|
static <T> Mono<T> |
defer(Supplier<? extends Mono<? extends T>> supplier)
Create a
Mono provider that will supply a target Mono to subscribe to for
each Subscriber downstream. |
static Mono<Long> |
delay(Duration duration)
Create a Mono which delays an onNext signal by a given
duration
on a default Scheduler and completes. |
static Mono<Long> |
delay(Duration duration,
Scheduler timer)
|
Mono<T> |
delayElement(Duration delay)
Delay this
Mono element (Subscriber.onNext(T) signal) by a given
duration. |
Mono<T> |
delayElement(Duration delay,
Scheduler timer)
Delay this
Mono element (Subscriber.onNext(T) signal) by a given
Duration , on a particular Scheduler . |
Mono<T> |
delaySubscription(Duration delay)
Delay the
subscription to this Mono source until the given
period elapses. |
Mono<T> |
delaySubscription(Duration delay,
Scheduler timer)
|
<U> Mono<T> |
delaySubscription(Publisher<U> subscriptionDelay)
|
Mono<T> |
delayUntil(Function<? super T,? extends Publisher<?>> triggerProvider)
|
<X> Mono<X> |
dematerialize()
An operator working only if this
Mono emits onNext, onError or onComplete Signal
instances, transforming these materialized signals into
real signals on the Subscriber . |
Mono<T> |
doAfterSuccessOrError(BiConsumer<? super T,Throwable> afterSuccessOrError)
Add behavior triggered after the
Mono terminates, either by completing downstream successfully or with an error. |
Mono<T> |
doAfterTerminate(Runnable afterTerminate)
Add behavior (side-effect) triggered after the
Mono terminates, either by
completing downstream successfully or with an error. |
Mono<T> |
doFinally(Consumer<SignalType> onFinally)
Add behavior triggering after the
Mono terminates for any reason,
including cancellation. |
Mono<T> |
doOnCancel(Runnable onCancel)
Add behavior triggered when the
Mono is cancelled. |
Mono<T> |
doOnEach(Consumer<? super Signal<T>> signalConsumer)
Add behavior triggered when the
Mono emits an item, fails with an error
or completes successfully. |
<E extends Throwable> |
doOnError(Class<E> exceptionType,
Consumer<? super E> onError)
Add behavior triggered when the
Mono completes with an error matching the given exception type. |
Mono<T> |
doOnError(Consumer<? super Throwable> onError)
Add behavior triggered when the
Mono completes with an error. |
Mono<T> |
doOnError(Predicate<? super Throwable> predicate,
Consumer<? super Throwable> onError)
Add behavior triggered when the
Mono completes with an error matching the given predicate. |
Mono<T> |
doOnNext(Consumer<? super T> onNext)
Add behavior triggered when the
Mono emits a data successfully. |
Mono<T> |
doOnRequest(LongConsumer consumer)
Add behavior triggering a
LongConsumer when the Mono receives any request. |
Mono<T> |
doOnSubscribe(Consumer<? super Subscription> onSubscribe)
Add behavior triggered when the
Mono is subscribed. |
Mono<T> |
doOnSuccess(Consumer<? super T> onSuccess)
Add behavior triggered when the
Mono completes successfully. |
Mono<T> |
doOnSuccessOrError(BiConsumer<? super T,Throwable> onSuccessOrError)
Add behavior triggered when the
Mono terminates, either by completing successfully or with an error. |
Mono<T> |
doOnTerminate(Runnable onTerminate)
Add behavior triggered when the
Mono terminates, either by completing successfully or with an error. |
Mono<Tuple2<Long,T>> |
elapsed()
Map this
Mono into Tuple2<Long, T>
of timemillis and source data. |
Mono<Tuple2<Long,T>> |
elapsed(Scheduler scheduler)
Map this
Mono sequence into Tuple2<Long, T>
of timemillis and source data. |
static <T> Mono<T> |
empty()
Create a
Mono that completes without emitting any item. |
static <T> Mono<T> |
error(Throwable error)
Create a
Mono that terminates with the specified error immediately after
being subscribed to. |
Flux<T> |
expand(Function<? super T,? extends Publisher<? extends T>> expander)
Recursively expand elements into a graph and emit all the resulting element using
a breadth-first traversal strategy.
|
Flux<T> |
expand(Function<? super T,? extends Publisher<? extends T>> expander,
int capacityHint)
Recursively expand elements into a graph and emit all the resulting element using
a breadth-first traversal strategy.
|
Flux<T> |
expandDeep(Function<? super T,? extends Publisher<? extends T>> expander)
Recursively expand elements into a graph and emit all the resulting element,
in a depth-first traversal order.
|
Flux<T> |
expandDeep(Function<? super T,? extends Publisher<? extends T>> expander,
int capacityHint)
Recursively expand elements into a graph and emit all the resulting element,
in a depth-first traversal order.
|
Mono<T> |
filter(Predicate<? super T> tester)
If this
Mono is valued, test the result and replay it if predicate returns true. |
Mono<T> |
filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate)
If this
Mono is valued, test the value asynchronously using a generated
Publisher<Boolean> test. |
static <T> Mono<T> |
first(Iterable<? extends Mono<? extends T>> monos)
Pick the first available result coming from any of the given monos and populate a new Mono.
|
static <T> Mono<T> |
first(Mono<? extends T>... monos)
Pick the first
Mono to emit any signal (value, empty completion or error)
and replay that signal, effectively behaving like the fastest of these competing
sources. |
<R> Mono<R> |
flatMap(Function<? super T,? extends Mono<? extends R>> transformer)
|
<R> Flux<R> |
flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
|
<R> Flux<R> |
flatMapMany(Function<? super T,? extends Publisher<? extends R>> mapper)
|
<R> Flux<R> |
flatMapMany(Function<? super T,? extends Publisher<? extends R>> mapperOnNext,
Function<? super Throwable,? extends Publisher<? extends R>> mapperOnError,
Supplier<? extends Publisher<? extends R>> mapperOnComplete)
|
Flux<T> |
flux()
|
static <T> Mono<T> |
from(Publisher<? extends T> source)
|
static <T> Mono<T> |
fromCallable(Callable<? extends T> supplier)
|
static <T> Mono<T> |
fromCompletionStage(CompletionStage<? extends T> completionStage)
Create a
Mono , producing its value using the provided CompletionStage . |
static <I> Mono<I> |
fromDirect(Publisher<? extends I> source)
|
static <T> Mono<T> |
fromFuture(CompletableFuture<? extends T> future)
Create a
Mono , producing its value using the provided CompletableFuture . |
static <T> Mono<T> |
fromRunnable(Runnable runnable)
|
static <T> Mono<T> |
fromSupplier(Supplier<? extends T> supplier)
|
<R> Mono<R> |
handle(BiConsumer<? super T,SynchronousSink<R>> handler)
Handle the items emitted by this
Mono by calling a biconsumer with the
output sink for each onNext. |
Mono<Boolean> |
hasElement()
Emit a single boolean true if this
Mono has an element. |
Mono<T> |
hide()
Hides the identity of this
Mono instance. |
Mono<T> |
ignoreElement()
Ignores onNext signal (dropping it) and only propagates termination events.
|
static <T> Mono<T> |
ignoreElements(Publisher<T> source)
Create a new
Mono that ignores elements from the source (dropping them),
but completes when the source completes. |
static <T> Mono<T> |
just(T data)
Create a new
Mono that emits the specified item, which is captured at
instantiation time. |
static <T> Mono<T> |
justOrEmpty(Optional<? extends T> data)
Create a new
Mono that emits the specified item if Optional.isPresent() otherwise only emits
onComplete. |
static <T> Mono<T> |
justOrEmpty(T data)
Create a new
Mono that emits the specified item if non null otherwise only emits
onComplete. |
Mono<T> |
log()
Observe all Reactive Streams signals and trace them using
Logger support. |
Mono<T> |
log(String category)
Observe all Reactive Streams signals and use
Logger support to handle trace implementation. |
Mono<T> |
log(String category,
Level level,
boolean showOperatorLine,
SignalType... options)
Observe Reactive Streams signals matching the passed filter
options and
use Logger support to
handle trace
implementation. |
Mono<T> |
log(String category,
Level level,
SignalType... options)
Observe Reactive Streams signals matching the passed flags
options and use
Logger support to handle trace implementation. |
<R> Mono<R> |
map(Function<? super T,? extends R> mapper)
Transform the item emitted by this
Mono by applying a synchronous function to it. |
Mono<Signal<T>> |
materialize()
Transform incoming onNext, onError and onComplete signals into
Signal instances,
materializing these signals. |
Flux<T> |
mergeWith(Publisher<? extends T> other)
|
Mono<T> |
name(String name)
Give a name to this sequence, which can be retrieved using
Scannable.name()
as long as this is the first reachable Scannable.parents() . |
static <T> Mono<T> |
never()
Return a
Mono that will never signal any data, error or completion signal,
essentially running indefinitely. |
<U> Mono<U> |
ofType(Class<U> clazz)
Evaluate the emitted value against the given
Class type. |
protected static <T> Mono<T> |
onAssembly(Mono<T> source)
|
<E extends Throwable> |
onErrorMap(Class<E> type,
Function<? super E,? extends Throwable> mapper)
Transform an error emitted by this
Mono by synchronously applying a function
to it if the error matches the given type. |
Mono<T> |
onErrorMap(Function<? super Throwable,? extends Throwable> mapper)
Transform any error emitted by this
Mono by synchronously applying a function to it. |
Mono<T> |
onErrorMap(Predicate<? super Throwable> predicate,
Function<? super Throwable,? extends Throwable> mapper)
Transform an error emitted by this
Mono by synchronously applying a function
to it if the error matches the given predicate. |
<E extends Throwable> |
onErrorResume(Class<E> type,
Function<? super E,? extends Mono<? extends T>> fallback)
Subscribe to a fallback publisher when an error matching the given type
occurs, using a function to choose the fallback depending on the error.
|
Mono<T> |
onErrorResume(Function<? super Throwable,? extends Mono<? extends T>> fallback)
Subscribe to a fallback publisher when any error occurs, using a function to
choose the fallback depending on the error.
|
Mono<T> |
onErrorResume(Predicate<? super Throwable> predicate,
Function<? super Throwable,? extends Mono<? extends T>> fallback)
Subscribe to a fallback publisher when an error matching a given predicate
occurs.
|
<E extends Throwable> |
onErrorReturn(Class<E> type,
T fallbackValue)
Simply emit a captured fallback value when an error of the specified type is
observed on this
Mono . |
Mono<T> |
onErrorReturn(Predicate<? super Throwable> predicate,
T fallbackValue)
Simply emit a captured fallback value when an error matching the given predicate is
observed on this
Mono . |
Mono<T> |
onErrorReturn(T fallback)
Simply emit a captured fallback value when any error is observed on this
Mono . |
protected static <T> Mono<T> |
onLastAssembly(Mono<T> source)
|
Mono<T> |
onTerminateDetach()
Detaches both the child
Subscriber and the Subscription on
termination or cancellation. |
Mono<T> |
or(Mono<? extends T> other)
Emit the first available result from this mono or the other mono.
|
<R> Mono<R> |
publish(Function<? super Mono<T>,? extends Mono<? extends R>> transform)
Share a
Mono for the duration of a function that may transform it and
consume it as many times as necessary without causing multiple subscriptions
to the upstream. |
Mono<T> |
publishOn(Scheduler scheduler)
|
Flux<T> |
repeat()
Repeatedly and indefinitely subscribe to the source upon completion of the
previous subscription.
|
Flux<T> |
repeat(BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.
|
Flux<T> |
repeat(long numRepeat)
Repeatedly subscribe to the source numRepeat times.
|
Flux<T> |
repeat(long numRepeat,
BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous
subscription.
|
Flux<T> |
repeatWhen(Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
Repeatedly subscribe to this
Mono when a companion sequence emits elements in
response to the flux completion signal. |
Mono<T> |
repeatWhenEmpty(Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
|
Mono<T> |
repeatWhenEmpty(int maxRepeat,
Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
|
Mono<T> |
retry()
Re-subscribes to this
Mono sequence if it signals any error, indefinitely. |
Mono<T> |
retry(long numRetries)
Re-subscribes to this
Mono sequence if it signals any error, for a fixed
number of times. |
Mono<T> |
retry(long numRetries,
Predicate<? super Throwable> retryMatcher)
|
Mono<T> |
retry(Predicate<? super Throwable> retryMatcher)
|
Mono<T> |
retryWhen(Function<Flux<Throwable>,? extends Publisher<?>> whenFactory)
|
static <T> Mono<Boolean> |
sequenceEqual(Publisher<? extends T> source1,
Publisher<? extends T> source2)
Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the
same by comparing the items emitted by each Publisher pairwise.
|
static <T> Mono<Boolean> |
sequenceEqual(Publisher<? extends T> source1,
Publisher<? extends T> source2,
BiPredicate<? super T,? super T> isEqual)
Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the
same by comparing the items emitted by each Publisher pairwise based on the results of a specified
equality function.
|
static <T> Mono<Boolean> |
sequenceEqual(Publisher<? extends T> source1,
Publisher<? extends T> source2,
BiPredicate<? super T,? super T> isEqual,
int prefetch)
Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the
same by comparing the items emitted by each Publisher pairwise based on the results of a specified
equality function.
|
Mono<T> |
single()
Expect exactly one item from this
Mono source or signal
NoSuchElementException for an empty source. |
Disposable |
subscribe()
Subscribe to this
Mono and request unbounded demand. |
Disposable |
subscribe(Consumer<? super T> consumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer)
|
Disposable |
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer)
|
abstract void |
subscribe(CoreSubscriber<? super T> actual)
An internal
Publisher.subscribe(Subscriber) that will bypass
Hooks.onLastOperator(Function) pointcut. |
void |
subscribe(Subscriber<? super T> actual) |
Mono<T> |
subscribeOn(Scheduler scheduler)
Run subscribe, onSubscribe and request on a specified
Scheduler 's Scheduler.Worker . |
static Mono<Context> |
subscriberContext()
|
Mono<T> |
subscriberContext(Context mergeContext)
|
Mono<T> |
subscriberContext(Function<Context,Context> doOnContext)
|
<E extends Subscriber<? super T>> |
subscribeWith(E subscriber)
|
Mono<T> |
switchIfEmpty(Mono<? extends T> alternate)
Fallback to an alternative
Mono if this mono is completed without data |
Mono<T> |
tag(String key,
String value)
Tag this mono with a key/value pair.
|
Mono<T> |
take(Duration duration)
Give this Mono a chance to resolve within a specified time frame but complete if it
doesn't.
|
Mono<T> |
take(Duration duration,
Scheduler timer)
Give this Mono a chance to resolve within a specified time frame but complete if it
doesn't.
|
Mono<T> |
takeUntilOther(Publisher<?> other)
Give this Mono a chance to resolve before a companion
Publisher emits. |
Mono<Void> |
then()
Return a
Mono<Void> which only replays complete and error signals
from this Mono . |
<V> Mono<V> |
then(Mono<V> other)
Let this
Mono complete then play another Mono. |
Mono<Void> |
thenEmpty(Publisher<Void> other)
Return a
Mono<Void> that waits for this Mono to complete then
for a supplied Publisher<Void> to also complete. |
<V> Flux<V> |
thenMany(Publisher<V> other)
|
Mono<T> |
timeout(Duration timeout)
Propagate a
TimeoutException in case no item arrives within the given
Duration . |
Mono<T> |
timeout(Duration timeout,
Mono<? extends T> fallback)
|
Mono<T> |
timeout(Duration timeout,
Mono<? extends T> fallback,
Scheduler timer)
|
Mono<T> |
timeout(Duration timeout,
Scheduler timer)
Signal a
TimeoutException error in case an item doesn't arrive before the given period,
as measured on the provided Scheduler . |
<U> Mono<T> |
timeout(Publisher<U> firstTimeout)
Signal a
TimeoutException in case the item from this Mono has
not been emitted before the given Publisher emits. |
<U> Mono<T> |
timeout(Publisher<U> firstTimeout,
Mono<? extends T> fallback)
|
Mono<Tuple2<Long,T>> |
timestamp()
|
Mono<Tuple2<Long,T>> |
timestamp(Scheduler scheduler)
|
CompletableFuture<T> |
toFuture()
Transform this
Mono into a CompletableFuture completing on onNext or onComplete and failing on
onError. |
MonoProcessor<T> |
toProcessor()
Wrap this
Mono into a MonoProcessor (turning it hot and allowing to block,
cancel, as well as many other operations). |
String |
toString() |
<V> Mono<V> |
transform(Function<? super Mono<T>,? extends Publisher<V>> transformer)
|
static <T,D> Mono<T> |
using(Callable<? extends D> resourceSupplier,
Function<? super D,? extends Mono<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a
Mono derived from the same resource and makes sure the resource is released if the
sequence terminates or the Subscriber cancels.
|
static <T,D> Mono<T> |
using(Callable<? extends D> resourceSupplier,
Function<? super D,? extends Mono<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup,
boolean eager)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a
Mono derived from the same resource and makes sure the resource is released if the
sequence terminates or the Subscriber cancels.
|
static Mono<Void> |
when(Iterable<? extends Publisher<?>> sources)
Aggregate given publishers into a new Mono that will be
fulfilled when all of the given Publishers have been fulfilled.
|
static Mono<Void> |
when(Publisher<?>... sources)
Aggregate given publishers into a new Mono that will be fulfilled
when all of the given sources have been fulfilled.
|
static Mono<Void> |
whenDelayError(Iterable<? extends Publisher<?>> sources)
Aggregate given publishers into a new Mono that will be
fulfilled when all of the given sources have been fulfilled.
|
static Mono<Void> |
whenDelayError(Publisher<?>... sources)
Merge given publishers into a new Mono that will be fulfilled when
all of the given sources have been fulfilled.
|
static <R> Mono<R> |
zip(Function<? super Object[],? extends R> combinator,
Mono<?>... monos)
Aggregate given monos into a new Mono that will be fulfilled when all of the given Monos have been fulfilled, aggregating their values according to the provided combinator function.
|
static <R> Mono<R> |
zip(Iterable<? extends Mono<?>> monos,
Function<? super Object[],? extends R> combinator)
Aggregate given monos into a new Mono that will be fulfilled when all of the given Monos have been fulfilled, aggregating their values according to the provided combinator function.
|
static <T1,T2> Mono<Tuple2<T1,T2>> |
zip(Mono<? extends T1> p1,
Mono<? extends T2> p2)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have been fulfilled, aggregating their values into a
Tuple2 . |
static <T1,T2,O> Mono<O> |
zip(Mono<? extends T1> p1,
Mono<? extends T2> p2,
BiFunction<? super T1,? super T2,? extends O> combinator)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have been fulfilled, aggregating their values as defined by the combinator function.
|
static <T1,T2,T3> Mono<Tuple3<T1,T2,T3>> |
zip(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have been fulfilled, aggregating their values into a
Tuple3 . |
static <T1,T2,T3,T4> |
zip(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have been fulfilled, aggregating their values into a
Tuple4 . |
static <T1,T2,T3,T4,T5> |
zip(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have been fulfilled, aggregating their values into a
Tuple5 . |
static <T1,T2,T3,T4,T5,T6> |
zip(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5,
Mono<? extends T6> p6)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have been fulfilled, aggregating their values into a
Tuple6 . |
static <R> Mono<R> |
zipDelayError(Function<? super Object[],? extends R> combinator,
Mono<?>... monos)
Merge given monos into a new Mono that will be fulfilled when all of the
given Monos have been fulfilled, aggregating their values according to
the provided combinator function and delaying errors.
|
static <R> Mono<R> |
zipDelayError(Iterable<? extends Mono<?>> monos,
Function<? super Object[],? extends R> combinator)
Aggregate given monos into a new Mono that will be fulfilled when all of the given Monos have been fulfilled.
|
static <T1,T2> Mono<Tuple2<T1,T2>> |
zipDelayError(Mono<? extends T1> p1,
Mono<? extends T2> p2)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have been fulfilled, aggregating their values into a
Tuple2 and delaying errors. |
static <T1,T2,T3> Mono<Tuple3<T1,T2,T3>> |
zipDelayError(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3)
Merge given monos into a new Mono that will be fulfilled when all of the given Mono Monos
have been fulfilled, aggregating their values into a
Tuple3 and delaying errors. |
static <T1,T2,T3,T4> |
zipDelayError(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have been fulfilled, aggregating their values into a
Tuple4 and delaying errors. |
static <T1,T2,T3,T4,T5> |
zipDelayError(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have been fulfilled, aggregating their values into a
Tuple5 and delaying errors. |
static <T1,T2,T3,T4,T5,T6> |
zipDelayError(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5,
Mono<? extends T6> p6)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have been fulfilled, aggregating their values into a
Tuple6 and delaying errors. |
<T2> Mono<Tuple2<T,T2>> |
zipWhen(Function<T,Mono<? extends T2>> rightGenerator)
Wait for the result from this mono, use it to create a second mono via the
provided
rightGenerator function and combine both results into a Tuple2 . |
<T2,O> Mono<O> |
zipWhen(Function<T,Mono<? extends T2>> rightGenerator,
BiFunction<T,T2,O> combinator)
Wait for the result from this mono, use it to create a second mono via the
provided
rightGenerator function and combine both results into an arbitrary
O object, as defined by the provided combinator function. |
<T2> Mono<Tuple2<T,T2>> |
zipWith(Mono<? extends T2> other)
Combine the result from this mono and another into a
Tuple2 . |
<T2,O> Mono<O> |
zipWith(Mono<? extends T2> other,
BiFunction<? super T,? super T2,? extends O> combinator)
Combine the result from this mono and another into an arbitrary
O object,
as defined by the provided combinator function. |
public static <T> Mono<T> create(Consumer<MonoSink<T>> callback)
Bridging legacy API involves mostly boilerplate code due to the lack of standard types and methods. There are two kinds of API surfaces: 1) addListener/removeListener and 2) callback-handler.
1) addListener/removeListener pairs
To work with such API one has to instantiate the listener,
call the sink from the listener then register it with the source:
Mono.<String>create(sink -> {
HttpListener listener = event -> {
if (event.getResponseCode() >= 400) {
sink.error(new RuntimeExeption("Failed"));
} else {
String body = event.getBody();
if (body.isEmpty()) {
sink.success();
} else {
sink.success(body.toLowerCase());
}
}
};
client.addListener(listener);
sink.onDispose(() -> client.removeListener(listener));
});
Note that this works only with single-value emitting listeners. Otherwise,
all subsequent signals are dropped. You may have to add client.removeListener(this);
to the listener's body.
2) callback handler
This requires a similar instantiation pattern such as above, but usually the
successful completion and error are separated into different methods.
In addition, the legacy API may or may not support some cancellation mechanism.
Mono.<String>create(sink -> {
Callback<String> callback = new Callback<String>() {
@Override
public void onResult(String data) {
sink.success(data.toLowerCase());
}
@Override
public void onError(Exception e) {
sink.error(e);
}
}
// without cancellation support:
client.call("query", callback);
// with cancellation support:
AutoCloseable cancel = client.call("query", callback);
sink.onDispose(() -> {
try {
cancel.close();
} catch (Exception ex) {
Exceptions.onErrorDropped(ex);
}
});
});
public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier)
Mono
provider that will supply
a target Mono
to subscribe to for
each Subscriber
downstream.
public static Mono<Long> delay(Duration duration)
duration
on a default Scheduler and completes.
If the demand cannot be produced in time, an onError will be signalled instead.
The delay is introduced through the parallel
default Scheduler.
duration
- the duration of the delayMono
public static <T> Mono<T> empty()
Mono
that completes without emitting any item.
T
- the reified Subscriber
typeMono
public static <T> Mono<T> error(Throwable error)
Mono
that terminates with the specified error immediately after
being subscribed to.
T
- the reified Subscriber
typeerror
- the onError signalMono
@SafeVarargs public static <T> Mono<T> first(Mono<? extends T>... monos)
Mono
to emit any signal (value, empty completion or error)
and replay that signal, effectively behaving like the fastest of these competing
sources.
T
- The type of the function result.monos
- The deferred monos to use.Mono
behaving like the fastest of its sources.public static <T> Mono<T> first(Iterable<? extends Mono<? extends T>> monos)
T
- The type of the function result.monos
- The monos to use.Mono
.public static <T> Mono<T> fromCompletionStage(CompletionStage<? extends T> completionStage)
T
- type of the expected valuecompletionStage
- CompletionStage
that will produce a value (or a null to
complete immediately)Mono
.public static <I> Mono<I> fromDirect(Publisher<? extends I> source)
Publisher
to a Mono
without any cardinality check
(ie this method doesn't check if the source is already a Mono, nor cancels the
source past the first element). Conversion supports Fuseable
sources.
Note this is an advanced interoperability operator that implies you know the
Publisher
you are converting follows the Mono
semantics and only
ever emits one element.public static <T> Mono<T> fromFuture(CompletableFuture<? extends T> future)
T
- type of the expected valuefuture
- CompletableFuture
that will produce a value (or a null to
complete immediately)Mono
.fromCompletionStage for a generalization
public static <T> Mono<T> ignoreElements(Publisher<T> source)
Mono
that ignores elements from the source (dropping them),
but completes when the source completes.
public static <T> Mono<T> just(T data)
Mono
that emits the specified item, which is captured at
instantiation time.
T
- the type of the produced itemdata
- the only item to onNextMono
.public static <T> Mono<T> justOrEmpty(@Nullable Optional<? extends T> data)
Mono
that emits the specified item if Optional.isPresent()
otherwise only emits
onComplete.
public static <T> Mono<T> justOrEmpty(@Nullable T data)
Mono
that emits the specified item if non null otherwise only emits
onComplete.
T
- the type of the produced itemdata
- the item to onNext or onComplete if nullMono
.public static <T> Mono<T> never()
Mono
that will never signal any data, error or completion signal,
essentially running indefinitely.
T
- the Subscriber
type targetMono
public static <T> Mono<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2)
T
- the type of items emitted by each Publishersource1
- the first Publisher to comparesource2
- the second Publisher to comparepublic static <T> Mono<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2, BiPredicate<? super T,? super T> isEqual)
T
- the type of items emitted by each Publishersource1
- the first Publisher to comparesource2
- the second Publisher to compareisEqual
- a function used to compare items emitted by each Publisherpublic static <T> Mono<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2, BiPredicate<? super T,? super T> isEqual, int prefetch)
T
- the type of items emitted by each Publishersource1
- the first Publisher to comparesource2
- the second Publisher to compareisEqual
- a function used to compare items emitted by each Publisherprefetch
- the number of items to prefetch from the first and second source Publisherpublic static Mono<Context> subscriberContext()
Mono
emitting the Context
available on subscribe.
If no Context is available, the mono will simply emit the
empty Context
.
Mono
emitting current contextsubscribe(CoreSubscriber)
public static <T,D> Mono<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Mono<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager)
T
- emitted typeD
- resource typeresourceSupplier
- a Callable
that is called on subscribe to create the resourcesourceSupplier
- a Mono
factory to create the Mono depending on the created resourceresourceCleanup
- invoked on completion to clean-up the resourceeager
- push to true to clean before terminating downstream subscribersMono
public static <T,D> Mono<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Mono<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)
Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer may override the terminal event.
T
- emitted typeD
- resource typeresourceSupplier
- a Callable
that is called on subscribe to create the resourcesourceSupplier
- a Mono
factory to create the Mono depending on the created resourceresourceCleanup
- invoked on completion to clean-up the resourceMono
public static Mono<Void> when(Publisher<?>... sources)
Mono
.
sources
- The sources to use.Mono
.public static Mono<Void> when(Iterable<? extends Publisher<?>> sources)
Mono
.
sources
- The sources to use.Mono
.public static Mono<Void> whenDelayError(Iterable<? extends Publisher<?>> sources)
sources
- The sources to use.Mono
.public static Mono<Void> whenDelayError(Publisher<?>... sources)
sources
- The sources to use.Mono
.public static <T1,T2,O> Mono<O> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1,? super T2,? extends O> combinator)
Mono
.
T1
- type of the value from source1T2
- type of the value from source2O
- output valuep1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.combinator
- a BiFunction
combinator function when both sources
completeMono
.public static <T1,T2,T3> Mono<Tuple3<T1,T2,T3>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3)
Tuple3
.
An error will cause pending results to be cancelled and immediate error emission to the
returned Mono
.
T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.p3
- The third upstream Publisher
to subscribe to.Mono
.public static <T1,T2,T3,T4> Mono<Tuple4<T1,T2,T3,T4>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4)
Tuple4
.
An error will cause pending results to be cancelled and immediate error emission to the
returned Mono
.
T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.p3
- The third upstream Publisher
to subscribe to.p4
- The fourth upstream Publisher
to subscribe to.Mono
.public static <T1,T2,T3,T4,T5> Mono<Tuple5<T1,T2,T3,T4,T5>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5)
Tuple5
.
An error will cause pending results to be cancelled and immediate error emission to the
returned Mono
.
T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4T5
- type of the value from source5p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.p3
- The third upstream Publisher
to subscribe to.p4
- The fourth upstream Publisher
to subscribe to.p5
- The fifth upstream Publisher
to subscribe to.Mono
.public static <T1,T2,T3,T4,T5,T6> Mono<Tuple6<T1,T2,T3,T4,T5,T6>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6)
Tuple6
.
An error will cause pending results to be cancelled and immediate error emission to the
returned Mono
.
T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4T5
- type of the value from source5T6
- type of the value from source6p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.p3
- The third upstream Publisher
to subscribe to.p4
- The fourth upstream Publisher
to subscribe to.p5
- The fifth upstream Publisher
to subscribe to.p6
- The sixth upstream Publisher
to subscribe to.Mono
.public static <R> Mono<R> zip(Iterable<? extends Mono<?>> monos, Function<? super Object[],? extends R> combinator)
R
- the combined resultmonos
- The monos to use.combinator
- the function to transform the combined array into an arbitrary
object.Mono
.public static <R> Mono<R> zip(Function<? super Object[],? extends R> combinator, Mono<?>... monos)
Mono
.
R
- the combined resultmonos
- The monos to use.combinator
- the function to transform the combined array into an arbitrary
object.Mono
.public static <T1,T2> Mono<Tuple2<T1,T2>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2)
Tuple2
and delaying errors.
If both Monos error, the two exceptions are combined (as suppressed exceptions on a root exception).
public static <T1,T2,T3> Mono<Tuple3<T1,T2,T3>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3)
Tuple3
and delaying errors.
If several Monos error, the two exceptions are combined (as suppressed exceptions on a root exception).
T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.p3
- The third upstream Publisher
to subscribe to.Mono
.public static <T1,T2,T3,T4> Mono<Tuple4<T1,T2,T3,T4>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4)
Tuple4
and delaying errors.
If several Monos error, the exceptions are combined (as suppressed exceptions on a root exception).
T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.p3
- The third upstream Publisher
to subscribe to.p4
- The fourth upstream Publisher
to subscribe to.Mono
.public static <T1,T2,T3,T4,T5> Mono<Tuple5<T1,T2,T3,T4,T5>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5)
Tuple5
and delaying errors.
If several Monos error, the exceptions are combined (as suppressed exceptions on a root exception).
T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4T5
- type of the value from source5p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.p3
- The third upstream Publisher
to subscribe to.p4
- The fourth upstream Publisher
to subscribe to.p5
- The fifth upstream Publisher
to subscribe to.Mono
.public static <T1,T2,T3,T4,T5,T6> Mono<Tuple6<T1,T2,T3,T4,T5,T6>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6)
Tuple6
and delaying errors.
If several Monos error, the exceptions are combined (as suppressed exceptions on a root exception).
T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4T5
- type of the value from source5T6
- type of the value from source6p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.p3
- The third upstream Publisher
to subscribe to.p4
- The fourth upstream Publisher
to subscribe to.p5
- The fifth upstream Publisher
to subscribe to.p6
- The sixth upstream Publisher
to subscribe to.Mono
.public static <R> Mono<R> zipDelayError(Iterable<? extends Mono<?>> monos, Function<? super Object[],? extends R> combinator)
R
- the combined resultmonos
- The monos to use.combinator
- the function to transform the combined array into an arbitrary
object.Mono
.public static <R> Mono<R> zipDelayError(Function<? super Object[],? extends R> combinator, Mono<?>... monos)
R
- the combined resultmonos
- The monos to use.combinator
- the function to transform the combined array into an arbitrary
object.Mono
.public final <P> P as(Function<? super Mono<T>,P> transformer)
Mono
into a target type.
mono.as(Flux::from).subscribe()
P
- the returned instance typetransformer
- the Function
to immediately map this Mono
into a target typeMono
transformed to an instance of Pfor a bounded conversion to {@link Publisher}
public final Mono<Void> and(Publisher<?> other)
other
- the Publisher
to wait for
completewhen(org.reactivestreams.Publisher<?>...)
@Nullable public T block()
Mono
and block indefinitely until a next signal is
received. Returns that value, or null if the Mono completes empty. In case the Mono
errors, the original exception is thrown (wrapped in a RuntimeException
if
it was a checked exception).
Note that each block() will trigger a new subscription: in other words, the result might miss signal from hot publishers.
@Nullable public T block(Duration timeout)
Mono
and block until a next signal is
received or a timeout expires. Returns that value, or null if the Mono completes
empty. In case the Mono errors, the original exception is thrown (wrapped in a
RuntimeException
if it was a checked exception).
If the provided timeout expires,a RuntimeException
is thrown.
Note that each block() will trigger a new subscription: in other words, the result might miss signal from hot publishers.
timeout
- maximum time period to wait for before raising a RuntimeException
public Optional<T> blockOptional()
Mono
and block indefinitely until a next signal is
received or the Mono completes empty. Returns an Optional
, which can be used
to replace the empty case with an Exception via Optional.orElseThrow(Supplier)
.
In case the Mono itself errors, the original exception is thrown (wrapped in a
RuntimeException
if it was a checked exception).
Note that each blockOptional() will trigger a new subscription: in other words, the result might miss signal from hot publishers.
public Optional<T> blockOptional(Duration timeout)
Mono
and block until a next signal is
received, the Mono completes empty or a timeout expires. Returns an Optional
for the first two cases, which can be used to replace the empty case with an
Exception via Optional.orElseThrow(Supplier)
.
In case the Mono itself errors, the original exception is thrown (wrapped in a
RuntimeException
if it was a checked exception).
If the provided timeout expires, a RuntimeException
is thrown.
Note that each block() will trigger a new subscription: in other words, the result might miss signal from hot publishers.
timeout
- maximum time period to wait for before raising a RuntimeException
public final <E> Mono<E> cast(Class<E> clazz)
Mono
produced type into a target produced type.
public final Mono<T> cache()
Mono
into a hot source and cache last emitted signals for further Subscriber
.
Completion and Error will also be replayed.
Mono
public final Mono<T> cache(Duration ttl)
Mono
into a hot source and cache last emitted signals for further
Subscriber
, with an expiry timeout.
Completion and Error will also be replayed until ttl
triggers in which case
the next Subscriber
will start over a new subscription.
Mono
public final Mono<T> checkpoint()
Mono
, in case of an error
upstream of the checkpoint. Tracing incurs the cost of an exception stack trace
creation.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
Mono
public final Mono<T> checkpoint(String description)
Mono
by giving it a description that
will be reflected in the assembly traceback in case of an error upstream of the
checkpoint. Note that unlike checkpoint()
, this doesn't create a
filled stack trace, avoiding the main cost of the operator.
However, as a trade-off the description must be unique enough for the user to find
out where this Mono was assembled. If you only want a generic description, and
still rely on the stack trace to find the assembly site, use the
checkpoint(String, boolean)
variant.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
description
- a unique enough description to include in the light assembly traceback.Mono
public final Mono<T> checkpoint(@Nullable String description, boolean forceStackTrace)
forceStackTrace
option.
By setting the forceStackTrace
parameter to true, activate assembly
tracing for this particular Mono
and give it a description that
will be reflected in the assembly traceback in case of an error upstream of the
checkpoint. Note that unlike checkpoint(String)
, this will incur
the cost of an exception stack trace creation. The description could for
example be a meaningful name for the assembled mono or a wider correlation ID,
since the stack trace will always provide enough information to locate where this
Flux was assembled.
By setting forceStackTrace
to false, behaves like
checkpoint(String)
and is subject to the same caveat in choosing the
description.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly marker.
description
- a description (must be unique enough if forceStackTrace is push
to false).forceStackTrace
- false to make a light checkpoint without a stacktrace, true
to use a stack trace.Mono
.public final <V> Mono<V> compose(Function<? super Mono<T>,? extends Publisher<V>> transformer)
Mono
in order to generate a
target Mono
type. A transformation will occur for each
Subscriber
. For instance:
flux.compose(Mono::from).subscribe()
V
- the item type in the returned Publisher
transformer
- the Function
to lazily map this Mono
into a target Mono
instance upon subscription.Mono
as() for a loose conversion to an arbitrary type
,
transform(Function)
public final Mono<T> defaultIfEmpty(T defaultV)
defaultV
- the alternate value if this sequence is emptyMono
Flux.defaultIfEmpty(Object)
public final Mono<T> delayElement(Duration delay)
Mono
element (Subscriber.onNext(T)
signal) by a given
duration. Empty Monos or error signals are not delayed.
Note that the scheduler on which the Mono chain continues execution will be the
parallel
scheduler if the mono is valued, or the
current scheduler if the mono completes empty or errors.
delay
- duration by which to delay the Subscriber.onNext(T)
signalMono
public final Mono<T> delayElement(Duration delay, Scheduler timer)
Mono
element (Subscriber.onNext(T)
signal) by a given
Duration
, on a particular Scheduler
. Empty monos or error signals are not delayed.
Note that the scheduler on which the mono chain continues execution will be the scheduler provided if the mono is valued, or the current scheduler if the mono completes empty or errors.
delay
- Duration
by which to delay the Subscriber.onNext(T)
signaltimer
- a time-capable Scheduler
instance to delay the value signal onMono
public final Mono<T> delayUntil(Function<? super T,? extends Publisher<?>> triggerProvider)
Mono
and another Publisher
that is generated from
this Mono's element and which will be used as a trigger for relaying said element.
That is to say, the resulting Mono
delays until this Mono's element is
emitted, generates a trigger Publisher and then delays again until the trigger
Publisher terminates.
Note that contiguous calls to all delayUntil are fused together. The triggers are generated and subscribed to in sequence, once the previous trigger completes. Error is propagated immediately downstream. In both cases, an error in the source is immediately propagated.
public final <U> Mono<T> delaySubscription(Publisher<U> subscriptionDelay)
U
- the other source typesubscriptionDelay
- a
Publisher
to signal by next or complete this subscribe(Subscriber)
Mono
public final <X> Mono<X> dematerialize()
Mono
emits onNext, onError or onComplete Signal
instances, transforming these materialized
signals into
real signals on the Subscriber
.
The error Signal
will trigger onError and complete Signal
will trigger
onComplete.
X
- the dematerialized typeMono
materialize()
public final Mono<T> doAfterSuccessOrError(BiConsumer<? super T,Throwable> afterSuccessOrError)
Mono
terminates, either by completing downstream successfully or with an error.
The arguments will be null depending on success, success with data and error:
afterSuccessOrError
- the callback to call after Subscriber.onNext(T)
, Subscriber.onComplete()
without preceding Subscriber.onNext(T)
or Subscriber.onError(java.lang.Throwable)
Mono
public final Mono<T> doAfterTerminate(Runnable afterTerminate)
Mono
terminates, either by
completing downstream successfully or with an error.
afterTerminate
- the callback to call after Subscriber.onComplete()
or Subscriber.onError(java.lang.Throwable)
Flux
public final Mono<T> doFinally(Consumer<SignalType> onFinally)
Mono
terminates for any reason,
including cancellation. The terminating event (SignalType.ON_COMPLETE
,
SignalType.ON_ERROR
and SignalType.CANCEL
) is passed to the consumer,
which is executed after the signal has been passed downstream.
Note that the fact that the signal is propagated downstream before the callback is
executed means that several doFinally in a row will be executed in
reverse order. If you want to assert the execution of the callback
please keep in mind that the Mono will complete before it is executed, so its
effect might not be visible immediately after eg. a block()
.
onFinally
- the callback to execute after a terminal signal (complete, error
or cancel)Mono
public final Mono<T> doOnCancel(Runnable onCancel)
Mono
is cancelled.
onCancel
- the callback to call on Subscription.cancel()
Mono
public final Mono<T> doOnNext(Consumer<? super T> onNext)
Mono
emits a data successfully.
onNext
- the callback to call on Subscriber.onNext(T)
Mono
public final Mono<T> doOnSuccess(Consumer<? super T> onSuccess)
Mono
completes successfully.
onSuccess
- the callback to call on, argument is null if the Mono
completes without data
Subscriber.onNext(T)
or Subscriber.onComplete()
without preceding Subscriber.onNext(T)
Mono
public final Mono<T> doOnEach(Consumer<? super Signal<T>> signalConsumer)
Mono
emits an item, fails with an error
or completes successfully. All these events are represented as a Signal
that is passed to the side-effect callback. Note that this is an advanced operator,
typically used for monitoring of a Mono.signalConsumer
- the mandatory callback to call on
Subscriber.onNext(Object)
, Subscriber.onError(Throwable)
and
Subscriber.onComplete()
Mono
doOnNext(Consumer)
,
doOnError(Consumer)
,
materialize()
,
Signal
public final Mono<T> doOnError(Consumer<? super Throwable> onError)
Mono
completes with an error.
onError
- the error callback to call on Subscriber.onError(Throwable)
Mono
public final <E extends Throwable> Mono<T> doOnError(Class<E> exceptionType, Consumer<? super E> onError)
Mono
completes with an error matching the given exception type.
E
- type of the error to handleexceptionType
- the type of exceptions to handleonError
- the error handler for relevant errorsMono
public final Mono<T> doOnError(Predicate<? super Throwable> predicate, Consumer<? super Throwable> onError)
Mono
completes with an error matching the given predicate.
predicate
- the matcher for exceptions to handleonError
- the error handler for relevant errorMono
public final Mono<T> doOnRequest(LongConsumer consumer)
LongConsumer
when the Mono
receives any request.
Note that non fatal error raised in the callback will not be propagated and
will simply trigger Operators.onOperatorError(Throwable, Context)
.
consumer
- the consumer to invoke on each requestMono
public final Mono<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe)
Mono
is subscribed.
onSubscribe
- the callback to call on Subscriber.onSubscribe(Subscription)
Mono
public final Mono<T> doOnSuccessOrError(BiConsumer<? super T,Throwable> onSuccessOrError)
Mono
terminates, either by completing successfully or with an error.
onSuccessOrError
- the callback to call Subscriber.onNext(T)
, Subscriber.onComplete()
without preceding Subscriber.onNext(T)
or Subscriber.onError(java.lang.Throwable)
Mono
public final Mono<T> doOnTerminate(Runnable onTerminate)
Mono
terminates, either by completing successfully or with an error.
onTerminate
- the callback to call Subscriber.onNext(T)
, Subscriber.onComplete()
without preceding Subscriber.onNext(T)
or Subscriber.onError(java.lang.Throwable)
Mono
public final Mono<Tuple2<Long,T>> elapsed()
Mono
into Tuple2<Long, T>
of timemillis and source data. The timemillis corresponds to the elapsed time between
the subscribe and the first next signal, as measured by the parallel
scheduler.
Mono
that emits a tuple of time elapsed in milliseconds and matching datapublic final Mono<Tuple2<Long,T>> elapsed(Scheduler scheduler)
Mono
sequence into Tuple2<Long, T>
of timemillis and source data. The timemillis corresponds to the elapsed time between the subscribe and the first
next signal, as measured by the provided Scheduler
.
public final Flux<T> expandDeep(Function<? super T,? extends Publisher<? extends T>> expander, int capacityHint)
That is: emit the value from this Mono
, expand it and emit the first value
at this first level of recursion, and so on... When no more recursion is possible,
backtrack to the previous level and re-apply the strategy.
For example, given the hierarchical structure
A - AA - aa1 - AB - ab1 - a1Expands
Mono.just(A)
into
A AA aa1 AB ab1 a1
public final Flux<T> expandDeep(Function<? super T,? extends Publisher<? extends T>> expander)
That is: emit the value from this Mono
, expand it and emit the first value
at this first level of recursion, and so on... When no more recursion is possible,
backtrack to the previous level and re-apply the strategy.
For example, given the hierarchical structure
A - AA - aa1 - AB - ab1 - a1Expands
Mono.just(A)
into
A AA aa1 AB ab1 a1
public final Flux<T> expand(Function<? super T,? extends Publisher<? extends T>> expander, int capacityHint)
That is: emit the value from this Mono
first, then it each at a first level of
recursion and emit all of the resulting values, then expand all of these at a
second level and so on...
For example, given the hierarchical structure
A - AA - aa1 - AB - ab1 - a1Expands
Mono.just(A)
into
A AA AB a1 aa1 ab1
public final Flux<T> expand(Function<? super T,? extends Publisher<? extends T>> expander)
That is: emit the value from this Mono
first, then it each at a first level of
recursion and emit all of the resulting values, then expand all of these at a
second level and so on...
For example, given the hierarchical structure
A - AA - aa1 - AB - ab1 - a1Expands
Mono.just(A)
into
A AA AB a1 aa1 ab1
public final Mono<T> filter(Predicate<? super T> tester)
Mono
is valued, test the result and replay it if predicate returns true.
Otherwise complete without value.
tester
- the predicate to evaluateMono
public final Mono<T> filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate)
Mono
is valued, test the value asynchronously using a generated
Publisher<Boolean>
test. The value from the Mono is replayed if the
first item emitted by the test is true. It is dropped if the test is
either empty or its first emitted value is false.
Note that only the first value of the test publisher is considered, and unless it
is a Mono
, test will be cancelled after receiving that first value.
public final <R> Mono<R> flatMap(Function<? super T,? extends Mono<? extends R>> transformer)
public final <R> Flux<R> flatMapMany(Function<? super T,? extends Publisher<? extends R>> mapper)
Mono
into a Publisher, then forward
its emissions into the returned Flux
.
R
- the merged sequence typemapper
- the
Function
to produce a sequence of R from the the eventual passed Subscriber.onNext(T)
Flux
as the sequence is not guaranteed to be single at mostpublic final <R> Flux<R> flatMapMany(Function<? super T,? extends Publisher<? extends R>> mapperOnNext, Function<? super Throwable,? extends Publisher<? extends R>> mapperOnError, Supplier<? extends Publisher<? extends R>> mapperOnComplete)
Mono
into signal-specific Publishers,
then forward the applicable Publisher's emissions into the returned Flux
.
R
- the type of the produced inner sequencemapperOnNext
- the Function
to call on next data and returning a sequence to mergemapperOnError
- the Function
to call on error signal and returning a sequence to mergemapperOnComplete
- the Function
to call on complete signal and returning a sequence to mergeFlux
as the sequence is not guaranteed to be single at mostFlux.flatMap(Function, Function, Supplier)
public final <R> Flux<R> flatMapIterable(Function<? super T,? extends Iterable<? extends R>> mapper)
public final Mono<Boolean> hasElement()
Mono
has an element.
Mono
with true
if a value is emitted and false
otherwisepublic final <R> Mono<R> handle(BiConsumer<? super T,SynchronousSink<R>> handler)
Mono
by calling a biconsumer with the
output sink for each onNext. At most one SynchronousSink.next(Object)
call must be performed and/or 0 or 1 SynchronousSink.error(Throwable)
or
SynchronousSink.complete()
.R
- the transformed typehandler
- the handling BiConsumer
Mono
public final Mono<T> hide()
Mono
instance.
The main purpose of this operator is to prevent certain identity-based optimizations from happening, mostly for diagnostic purposes.
Mono
preventing Publisher
/ Subscription
based Reactor optimizationspublic final Mono<T> ignoreElement()
public final Mono<T> log()
Logger
support.
Default will use Level.INFO
and java.util.logging
.
If SLF4J is available, it will be used instead.
The default log category will be "reactor.Mono", followed by a suffix generated from the source operator, e.g. "reactor.Mono.Map".
Mono
that logs signalsFlux.log()
public final Mono<T> log(@Nullable String category)
Logger
support to handle trace implementation. Default will
use Level.INFO
and java.util.logging. If SLF4J is available, it will be used instead.
category
- to be mapped into logger configuration (e.g. org.springframework
.reactor). If category ends with "." like "reactor.", a generated operator
suffix will complete, e.g. "reactor.Flux.Map".Mono
public final Mono<T> log(@Nullable String category, Level level, SignalType... options)
options
and use
Logger
support to handle trace implementation. Default will use the passed
Level
and java.util.logging. If SLF4J is available, it will be used instead.
Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
mono.log("category", SignalType.ON_NEXT, SignalType.ON_ERROR)
category
- to be mapped into logger configuration (e.g. org.springframework
.reactor). If category ends with "." like "reactor.", a generated operator
suffix will complete, e.g. "reactor.Flux.Map".level
- the Level
to enforce for this tracing Mono (only FINEST, FINE,
INFO, WARNING and SEVERE are taken into account)options
- a vararg SignalType
option to filter log messagesMono
public final Mono<T> log(@Nullable String category, Level level, boolean showOperatorLine, SignalType... options)
options
and
use Logger
support to
handle trace
implementation. Default will
use the passed Level
and java.util.logging. If SLF4J is available, it will be used instead.
Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
mono.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
category
- to be mapped into logger configuration (e.g. org.springframework
.reactor). If category ends with "." like "reactor.", a generated operator
suffix will complete, e.g. "reactor.Mono.Map".level
- the Level
to enforce for this tracing Mono (only FINEST, FINE,
INFO, WARNING and SEVERE are taken into account)showOperatorLine
- capture the current stack to display operator
class/line number.options
- a vararg SignalType
option to filter log messagesMono
public final <R> Mono<R> map(Function<? super T,? extends R> mapper)
Mono
by applying a synchronous function to it.
public final Mono<Signal<T>> materialize()
Signal
instances,
materializing these signals.
Since the error is materialized as a Signal
, the propagation will be stopped and onComplete will be
emitted. Complete signal will first emit a Signal.complete()
and then effectively complete the flux.
Mono
of materialized Signal
dematerialize()
public final Mono<T> name(String name)
Scannable.name()
as long as this is the first reachable Scannable.parents()
.name
- a name for the sequencepublic final Mono<T> or(Mono<? extends T> other)
other
- the racing other Mono
to compete with for the resultMono
first(reactor.core.publisher.Mono<? extends T>...)
public final Mono<T> onErrorMap(Predicate<? super Throwable> predicate, Function<? super Throwable,? extends Throwable> mapper)
Mono
by synchronously applying a function
to it if the error matches the given predicate. Otherwise let the error pass through.
public final Mono<T> onErrorMap(Function<? super Throwable,? extends Throwable> mapper)
Mono
by synchronously applying a function to it.
public final <E extends Throwable> Mono<T> onErrorMap(Class<E> type, Function<? super E,? extends Throwable> mapper)
Mono
by synchronously applying a function
to it if the error matches the given type. Otherwise let the error pass through.
public final Mono<T> onErrorResume(Function<? super Throwable,? extends Mono<? extends T>> fallback)
fallback
- the function to choose the fallback to an alternative Mono
Mono
falling back upon source onErrorFlux.onErrorResume(java.util.function.Function<? super java.lang.Throwable, ? extends org.reactivestreams.Publisher<? extends T>>)
public final <E extends Throwable> Mono<T> onErrorResume(Class<E> type, Function<? super E,? extends Mono<? extends T>> fallback)
E
- the error typetype
- the error type to matchfallback
- the function to choose the fallback to an alternative Mono
Mono
falling back upon source onErrorFlux.onErrorResume(java.util.function.Function<? super java.lang.Throwable, ? extends org.reactivestreams.Publisher<? extends T>>)
public final Mono<T> onErrorResume(Predicate<? super Throwable> predicate, Function<? super Throwable,? extends Mono<? extends T>> fallback)
predicate
- the error predicate to matchfallback
- the function to choose the fallback to an alternative Mono
Mono
falling back upon source onErrorFlux.onErrorResume(java.util.function.Function<? super java.lang.Throwable, ? extends org.reactivestreams.Publisher<? extends T>>)
public final Mono<T> onErrorReturn(T fallback)
Mono
.
fallback
- the value to emit if an error occursMono
public final <E extends Throwable> Mono<T> onErrorReturn(Class<E> type, T fallbackValue)
Mono
.
E
- the error typetype
- the error type to matchfallbackValue
- the value to emit if an error occurs that matches the typeMono
public final Mono<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)
Mono
.
predicate
- the error predicate to matchfallbackValue
- the value to emit if an error occurs that matches the predicateMono
public final Mono<T> onTerminateDetach()
Subscriber
and the Subscription
on
termination or cancellation.
This should help with odd retention scenarios when running
with non-reactor Subscriber
.
Mono
public final <R> Mono<R> publish(Function<? super Mono<T>,? extends Mono<? extends R>> transform)
Mono
for the duration of a function that may transform it and
consume it as many times as necessary without causing multiple subscriptions
to the upstream.R
- the output value typetransform
- the transformation functionMono
public final Mono<T> publishOn(Scheduler scheduler)
Scheduler
Worker
.
This operator influences the threading context where the rest of the operators in
the chain below it will execute, up to a new occurrence of publishOn
.
Typically used for fast publisher, slow consumer(s) scenarios.
mono.publishOn(Schedulers.single()).subscribe()
scheduler
- a Scheduler
providing the Scheduler.Worker
where to publishMono
public final Flux<T> repeat()
Flux
on onCompletepublic final Flux<T> repeat(BooleanSupplier predicate)
predicate
- the boolean to evaluate on onComplete.Flux
that repeats on onComplete while the predicate matchespublic final Flux<T> repeat(long numRepeat)
numRepeat
- the number of times to re-subscribe on onCompleteFlux
that repeats on onComplete, up to the specified number of repetitionspublic final Flux<T> repeat(long numRepeat, BooleanSupplier predicate)
numRepeat
- the number of times to re-subscribe on completepredicate
- the boolean to evaluate on onCompleteFlux
that repeats on onComplete while the predicate matches,
up to the specified number of repetitionspublic final Flux<T> repeatWhen(Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
Mono
when a companion sequence emits elements in
response to the flux completion signal. Any terminal signal from the companion
sequence will terminate the resulting Flux
with the same signal immediately.
If the companion sequence signals when this Mono
is active, the repeat
attempt is suppressed.
repeatFactory
- the Function
that returns the associated Publisher
companion, given a Flux
that signals each onComplete as a Long
representing the number of source elements emitted in the latest attempt (0 or 1).Flux
that repeats on onComplete when the companion Publisher
produces an
onNext signalpublic final Mono<T> repeatWhenEmpty(Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
Mono
as long as the current subscription to this
Mono
completes empty and the companion Publisher
produces an onNext signal.
Any terminal signal will terminate the resulting Mono
with the same signal immediately.
repeatFactory
- the Function
that returns the associated Publisher
companion, given a Flux
that signals each onComplete as a 0-based incrementing Long
.Mono
that resubscribes to this Mono
if the previous subscription was empty,
as long as the companion Publisher
produces an onNext signalpublic final Mono<T> repeatWhenEmpty(int maxRepeat, Function<Flux<Long>,? extends Publisher<?>> repeatFactory)
Mono
as long as the current subscription to this
Mono
completes empty and the companion Publisher
produces an onNext signal.
Any terminal signal will terminate the resulting Mono
with the same signal immediately.
Emits an IllegalStateException
if maxRepeat
is exceeded (provided
it is different from Integer.MAX_VALUE
).
maxRepeat
- the maximum number of repeats (infinite if Integer.MAX_VALUE
)repeatFactory
- the Function
that returns the associated Publisher
companion, given a Flux
that signals each onComplete as a 0-based incrementing Long
.Mono
that resubscribes to this Mono
if the previous subscription was empty,
as long as the companion Publisher
produces an onNext signal and the maximum number of repeats isn't exceeded.public final Mono<T> retry()
Mono
sequence if it signals any error, indefinitely.
Mono
that retries on onErrorpublic final Mono<T> retry(long numRetries)
Mono
sequence if it signals any error, for a fixed
number of times.
Note that passing Long.MAX_VALUE is treated as infinite retry.
numRetries
- the number of times to tolerate an errorMono
that retries on onError up to the specified number of retry attempts.public final Mono<T> retry(Predicate<? super Throwable> retryMatcher)
Mono
sequence if it signals any error
that matches the given Predicate
, otherwise push the error downstream.
retryMatcher
- the predicate to evaluate if retry should occur based on a given error signalMono
that retries on onError if the predicates matches.public final Mono<T> retry(long numRetries, Predicate<? super Throwable> retryMatcher)
Mono
sequence up to the specified number of retries if it signals any
error that match the given Predicate
, otherwise push the error downstream.
numRetries
- the number of times to tolerate an errorretryMatcher
- the predicate to evaluate if retry should occur based on a given error signalMono
that retries on onError up to the specified number of retry
attempts, only if the predicate matches.public final Mono<T> retryWhen(Function<Flux<Throwable>,? extends Publisher<?>> whenFactory)
public final Mono<T> single()
Mono
source or signal
NoSuchElementException
for an empty source.
Note Mono doesn't need Flux.single(Object)
, since it is equivalent to
defaultIfEmpty(Object)
in a Mono
.
Mono
with the single item or an error signalpublic final Disposable subscribe()
Mono
and request unbounded demand.
This version doesn't specify any consumption behavior for the events from the chain, especially no error handling, so other variants should usually be preferred.
Disposable
that can be used to cancel the underlying Subscription
public final Disposable subscribe(Consumer<? super T> consumer)
Consumer
to this Mono
that will consume all the
sequence. It will request an unbounded demand (Long.MAX_VALUE
).
For a passive version that observe and forward incoming data see doOnNext(java.util.function.Consumer)
.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
consumer
- the consumer to invoke on each value (onNext signal)Disposable
that can be used to cancel the underlying Subscription
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer)
Mono
with a Consumer
that will consume all the
elements in the sequence, as well as a Consumer
that will handle errors.
The subscription will request an unbounded demand (Long.MAX_VALUE
).
For a passive version that observe and forward incoming data see doOnSuccess(Consumer)
and
doOnError(java.util.function.Consumer)
.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
consumer
- the consumer to invoke on each next signalerrorConsumer
- the consumer to invoke on error signalDisposable
that can be used to cancel the underlying Subscription
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer)
Consumer
to this Mono
that will respectively consume all the
elements in the sequence, handle errors and react to completion. The subscription
will request unbounded demand (Long.MAX_VALUE
).
For a passive version that observe and forward incoming data see doOnSuccess(Consumer)
and
doOnError(java.util.function.Consumer)
.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
consumer
- the consumer to invoke on each valueerrorConsumer
- the consumer to invoke on error signalcompleteConsumer
- the consumer to invoke on complete signalDisposable
that can be used to cancel the underlying Subscription
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer)
Consumer
to this Mono
that will respectively consume all the
elements in the sequence, handle errors, react to completion, and request upon subscription.
It will let the provided subscriptionConsumer
request the adequate amount of data, or request unbounded demand
Long.MAX_VALUE
if no such consumer is provided.
For a passive version that observe and forward incoming data see doOnSuccess(Consumer)
and
doOnError(java.util.function.Consumer)
.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
consumer
- the consumer to invoke on each valueerrorConsumer
- the consumer to invoke on error signalcompleteConsumer
- the consumer to invoke on complete signalsubscriptionConsumer
- the consumer to invoke on subscribe signal, to be used
for the initial request
, or null for max requestDisposable
that can be used to cancel the underlying Subscription
public final void subscribe(Subscriber<? super T> actual)
public abstract void subscribe(CoreSubscriber<? super T> actual)
Publisher.subscribe(Subscriber)
that will bypass
Hooks.onLastOperator(Function)
pointcut.
In addition to behave as expected by Publisher.subscribe(Subscriber)
in a controlled manner, it supports direct subscribe-time Context
passing.
actual
- the Subscriber
interested into the published sequencePublisher.subscribe(Subscriber)
public final Mono<T> subscriberContext(Context mergeContext)
Context
by adding all values
from the given Context
, producing a new Context
that is propagated
upstream.
The Context
propagation happens once per subscription (not on each onNext):
it is done during the subscribe(Subscriber)
phase, which runs from
the last operator of a chain towards the first.
So this operator enriches a Context
coming from under it in the chain
(downstream, by default an empty one) and passes the new enriched Context
to operators above it in the chain (upstream, by way of them using
Flux#subscribe(Subscriber,Context)
).
public final Mono<T> subscriberContext(Function<Context,Context> doOnContext)
Context
by applying a Function
to it, producing a new Context
that is propagated upstream.
The Context
propagation happens once per subscription (not on each onNext):
it is done during the subscribe(Subscriber)
phase, which runs from
the last operator of a chain towards the first.
So this operator enriches a Context
coming from under it in the chain
(downstream, by default an empty one) and passes the new enriched Context
to operators above it in the chain (upstream, by way of them using
Flux#subscribe(Subscriber,Context)
).
public final Mono<T> subscribeOn(Scheduler scheduler)
Scheduler
's Scheduler.Worker
.
As such, placing this operator anywhere in the chain will also impact the execution
context of onNext/onError/onComplete signals from the beginning of the chain up to
the next occurrence of a publishOn
.
mono.subscribeOn(Schedulers.parallel()).subscribe())
scheduler
- a Scheduler
providing the Scheduler.Worker
where to subscribeFlux
requesting asynchronouslypublishOn(Scheduler)
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber)
E
- the reified type of the Subscriber
for chainingsubscriber
- the Subscriber
to subscribe withSubscriber
after subscribing it to this Mono
public final Mono<T> switchIfEmpty(Mono<? extends T> alternate)
Mono
if this mono is completed without data
alternate
- the alternate mono if this mono is emptyMono
falling back upon source completing without elementsFlux.switchIfEmpty(org.reactivestreams.Publisher<? extends T>)
public final Mono<T> tag(String key, String value)
Set
of
all tags throughout the publisher chain by using Scannable.tags()
(as
traversed
by Scannable.parents()
).key
- a tag keyvalue
- a tag valuepublic final Mono<T> take(Duration duration)
timeout(Duration)
except that the resulting
Mono
completes rather than errors when the timer expires.
The timeframe is evaluated using the parallel Scheduler
.
duration
- the maximum duration to wait for the source Mono to resolve.Mono
that will propagate the signals from the source unless
no signal is received for duration
, in which case it completes.public final Mono<T> take(Duration duration, Scheduler timer)
timeout(Duration)
except that the resulting
Mono
completes rather than errors when the timer expires.
The timeframe is evaluated using the provided Scheduler
.
public final Mono<T> takeUntilOther(Publisher<?> other)
Publisher
emits. If
the companion emits before any signal from the source, the resulting Mono will
complete. Otherwise, it will relay signals from the source.public final Mono<Void> thenEmpty(Publisher<Void> other)
Mono<Void>
that waits for this Mono
to complete then
for a supplied Publisher<Void>
to also complete. The
second completion signal is replayed, or any error signal that occurs instead.
public final Mono<T> timeout(Duration timeout, Mono<? extends T> fallback)
Mono
in case no item arrives within the given Duration
.
If the fallback Mono
is null, signal a TimeoutException
instead.
public final Mono<T> timeout(Duration timeout, Scheduler timer)
TimeoutException
error in case an item doesn't arrive before the given period,
as measured on the provided Scheduler
.
public final Mono<T> timeout(Duration timeout, @Nullable Mono<? extends T> fallback, Scheduler timer)
Mono
in case an item doesn't arrive before the given period,
as measured on the provided Scheduler
.
If the given Mono
is null, signal a TimeoutException
.
public final <U> Mono<T> timeout(Publisher<U> firstTimeout)
TimeoutException
in case the item from this Mono
has
not been emitted before the given Publisher
emits.
public final <U> Mono<T> timeout(Publisher<U> firstTimeout, Mono<? extends T> fallback)
Publisher
in case the item from this Mono
has
not been emitted before the given Publisher
emits.
U
- the element type of the timeout PublisherfirstTimeout
- the timeout
Publisher
that must not emit before the first signal from this Mono
fallback
- the fallback Publisher
to subscribe when a timeout occursMono
with a fallback Mono
if the item doesn't
come before a Publisher
signalspublic final Mono<Tuple2<Long,T>> timestamp()
Mono
is valued, emit a Tuple2
pair of
T1 the current clock time in millis (as a Long
measured by the
parallel
Scheduler) and T2 the emitted data (as a T
).
Mono
public final CompletableFuture<T> toFuture()
Mono
into a CompletableFuture
completing on onNext or onComplete and failing on
onError.
CompletableFuture
public final MonoProcessor<T> toProcessor()
Mono
into a MonoProcessor
(turning it hot and allowing to block,
cancel, as well as many other operations). Note that the MonoProcessor
is
connected to
(which is equivalent to calling subscribe
on it).
MonoProcessor
to use to either retrieve value or cancel the underlying Subscription
public final <V> Mono<V> transform(Function<? super Mono<T>,? extends Publisher<V>> transformer)
Mono
in order to generate a target Mono
. Unlike compose(Function)
, the
provided function is executed as part of assembly.
Function<Mono, Mono> applySchedulers = mono -> mono.subscribeOn(Schedulers.io())
.publishOn(Schedulers.parallel());
mono.transform(applySchedulers).map(v -> v * v).subscribe();
V
- the item type in the returned Mono
transformer
- the Function
to immediately map this Mono
into a target Mono
instance.Mono
compose(Function) for deferred composition of {@link Mono} for each {@link Subscriber}
,
as(Function) for a loose conversion to an arbitrary type
public final <T2> Mono<Tuple2<T,T2>> zipWhen(Function<T,Mono<? extends T2>> rightGenerator)
rightGenerator
function and combine both results into a Tuple2
.
T2
- the element type of the other Mono instancerightGenerator
- the Function
to generate a Mono
to combine withpublic final <T2,O> Mono<O> zipWhen(Function<T,Mono<? extends T2>> rightGenerator, BiFunction<T,T2,O> combinator)
rightGenerator
function and combine both results into an arbitrary
O
object, as defined by the provided combinator
function.
T2
- the element type of the other Mono instanceO
- the element type of the combinationrightGenerator
- the Function
to generate a Mono
to combine withcombinator
- a BiFunction
combinator function when both sources completepublic final <T2> Mono<Tuple2<T,T2>> zipWith(Mono<? extends T2> other)
Tuple2
.
T2
- the element type of the other Mono instanceother
- the Mono
to combine withpublic final <T2,O> Mono<O> zipWith(Mono<? extends T2> other, BiFunction<? super T,? super T2,? extends O> combinator)
O
object,
as defined by the provided combinator
function.
T2
- the element type of the other Mono instanceO
- the element type of the combinationother
- the Mono
to combine withcombinator
- a BiFunction
combinator function when both sources
completeprotected static <T> Mono<T> onAssembly(Mono<T> source)
Hooks
pointcut given a
Mono
, potentially returning a new Mono
. This is for example useful
to activate cross-cutting concerns at assembly time, eg. a generalized
checkpoint()
.T
- the value typesource
- the source to apply assembly hooks ontoprotected static <T> Mono<T> onLastAssembly(Mono<T> source)
Hooks
pointcut given a
Mono
, potentially returning a new Mono
. This is for example useful
to activate cross-cutting concerns at assembly time, eg. a generalized
checkpoint()
.T
- the value typesource
- the source to apply assembly hooks onto