T
- the type of the single value of this classpublic abstract class Mono<T>
extends java.lang.Object
implements org.reactivestreams.Publisher<T>
Publisher
with basic rx operators that completes successfully by emitting an element, or
with an error.
The rx operators will offer aliases for input Mono
type to preserve the "at most one"
property of the resulting Mono
. For instance flatMap
returns a Flux
with
possibly
more than 1 emission. Its alternative enforcing Mono
input is then
.
Mono<Void>
should be used for Publisher
that just completes without any value.
It is intended to be used in implementations and return types, input parameters should keep using raw Publisher
as much as possible.
Flux
Constructor and Description |
---|
Mono() |
Modifier and Type | Method and Description |
---|---|
<T2> Mono<Tuple2<T,T2>> |
and(Mono<? extends T2> other)
Combine the result from this mono and another into a
Tuple2 . |
<P> P |
as(java.util.function.Function<? super Mono<T>,P> transformer)
Transform this
Mono into a target type. |
Mono<T> |
awaitOnSubscribe()
Intercepts the onSubscribe call and makes sure calls to Subscription methods
only happen after the child Subscriber has returned from its onSubscribe method.
|
T |
block()
Block until a next signal is received, will return null if onComplete, T if onNext, throw a
Exceptions.DownstreamException if checked error or origin RuntimeException if unchecked.
|
T |
block(java.time.Duration timeout)
Block until a next signal is received, will return null if onComplete, T if onNext, throw a
Exceptions.DownstreamException if checked error or origin RuntimeException if unchecked.
|
T |
blockMillis(long timeout)
Block until a next signal is received, will return null if onComplete, T if onNext, throw a
Exceptions.DownstreamException if checked error or origin RuntimeException if unchecked.
|
Mono<T> |
cache()
Turn this
Mono into a hot source and cache last emitted signals for further Subscriber . |
<E> Mono<E> |
cast(java.lang.Class<E> stream)
Cast the current
Mono produced type into a target produced type. |
<V> Mono<V> |
compose(java.util.function.Function<? super Mono<T>,? extends org.reactivestreams.Publisher<V>> transformer)
|
Flux<T> |
concatWith(org.reactivestreams.Publisher<? extends T> other)
Concatenate emissions of this
Mono with the provided Publisher
(no interleave). |
static <T> Mono<T> |
create(java.util.function.Consumer<MonoSink<T>> callback)
Creates a deferred emitter that can be used with callback-based
APIs to signal at most one value, a complete or an error signal.
|
Mono<T> |
defaultIfEmpty(T defaultV)
Provide a default unique value if this mono is completed without any data
|
static <T> Mono<T> |
defer(java.util.function.Supplier<? extends Mono<? extends T>> supplier)
|
static Mono<java.lang.Long> |
delay(java.time.Duration duration)
Create a Mono which delays an onNext signal of
duration of given unit and complete on the global timer. |
static Mono<java.lang.Long> |
delayMillis(long duration)
Create a Mono which delays an onNext signal of
duration milliseconds and complete. |
static Mono<java.lang.Long> |
delayMillis(long duration,
TimedScheduler timer)
Create a Mono which delays an onNext signal of
duration milliseconds and complete. |
Mono<T> |
delaySubscription(java.time.Duration delay)
Delay the
subscription to this Mono source until the given
period elapses. |
<U> Mono<T> |
delaySubscription(org.reactivestreams.Publisher<U> subscriptionDelay)
Delay the subscription to this
Mono until another Publisher
signals a value or completes. |
Mono<T> |
delaySubscriptionMillis(long delay)
Delay the
subscription to this Mono source until the given
period elapses. |
Mono<T> |
delaySubscriptionMillis(long delay,
TimedScheduler timer)
Delay the
subscription to this Mono source until the given
period elapses. |
<X> Mono<X> |
dematerialize()
|
Mono<T> |
doAfterTerminate(java.util.function.BiConsumer<? super T,java.lang.Throwable> afterTerminate)
Triggered after the
Mono terminates, either by completing downstream successfully or with an error. |
Mono<T> |
doOnCancel(java.lang.Runnable onCancel)
Triggered when the
Mono is cancelled. |
<E extends java.lang.Throwable> |
doOnError(java.lang.Class<E> exceptionType,
java.util.function.Consumer<? super E> onError)
Triggered when the
Mono completes with an error matching the given exception type. |
Mono<T> |
doOnError(java.util.function.Consumer<? super java.lang.Throwable> onError)
Triggered when the
Mono completes with an error. |
Mono<T> |
doOnError(java.util.function.Predicate<? super java.lang.Throwable> predicate,
java.util.function.Consumer<? super java.lang.Throwable> onError)
Triggered when the
Mono completes with an error matching the given exception. |
Mono<T> |
doOnNext(java.util.function.Consumer<? super T> onNext)
Triggered when the
Mono emits a data successfully. |
Mono<T> |
doOnRequest(java.util.function.LongConsumer consumer)
|
Mono<T> |
doOnSubscribe(java.util.function.Consumer<? super org.reactivestreams.Subscription> onSubscribe)
Triggered when the
Mono is subscribed. |
Mono<T> |
doOnSuccess(java.util.function.Consumer<? super T> onSuccess)
Triggered when the
Mono completes successfully. |
Mono<T> |
doOnTerminate(java.util.function.BiConsumer<? super T,java.lang.Throwable> onTerminate)
Triggered when the
Mono terminates, either by completing successfully or with an error. |
Mono<Tuple2<java.lang.Long,T>> |
elapsed()
|
static <T> Mono<T> |
empty()
Create a
Mono that completes without emitting any item. |
static <T> Mono<java.lang.Void> |
empty(org.reactivestreams.Publisher<T> source)
Create a new
Mono that ignores onNext (dropping them) and only react on Completion signal. |
static <T> Mono<T> |
error(java.lang.Throwable error)
Create a
Mono that completes with the specified error immediately after onSubscribe. |
Mono<T> |
filter(java.util.function.Predicate<? super T> tester)
Test the result if any of this
Mono and replay it if predicate returns true. |
static <T> Mono<T> |
first(java.lang.Iterable<? extends Mono<? extends T>> monos)
Pick the first result coming from any of the given monos and populate a new Mono.
|
static <T> Mono<T> |
first(Mono<? extends T>... monos)
Pick the first result coming from any of the given monos and populate a new Mono.
|
<R> Flux<R> |
flatMap(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)
Transform the items emitted by a
Publisher into Publishers, then flatten the emissions from those by
merging them into a single Flux , so that they may interleave. |
<R> Flux<R> |
flatMap(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapperOnNext,
java.util.function.Function<java.lang.Throwable,? extends org.reactivestreams.Publisher<? extends R>> mapperOnError,
java.util.function.Supplier<? extends org.reactivestreams.Publisher<? extends R>> mapperOnComplete)
|
<R> Flux<R> |
flatMapIterable(java.util.function.Function<? super T,? extends java.lang.Iterable<? extends R>> mapper)
|
Flux<T> |
flux()
|
static <T> Mono<T> |
from(org.reactivestreams.Publisher<? extends T> source)
Expose the specified
Publisher with the Mono API, and ensure it will emit 0 or 1 item. |
static <T> Mono<T> |
fromCallable(java.util.concurrent.Callable<? extends T> supplier)
|
static <T> Mono<T> |
fromFuture(java.util.concurrent.CompletableFuture<? extends T> future)
|
static Mono<java.lang.Void> |
fromRunnable(java.lang.Runnable runnable)
Create a
Mono only producing a completion signal after using the given
runnable. |
static <T> Mono<T> |
fromSupplier(java.util.function.Supplier<? extends T> supplier)
|
<R> Mono<R> |
handle(java.util.function.BiConsumer<? super T,SynchronousSink<R>> handler)
Handle the items emitted by this
Mono by calling a biconsumer with the
output sink for each onNext. |
Mono<java.lang.Boolean> |
hasElement()
Emit a single boolean true if this
Mono has an element. |
Mono<T> |
hide()
Hides the identity of this
Mono instance. |
Mono<T> |
ignoreElement()
Ignores onNext signal (dropping it) and only reacts on termination.
|
static <T> Mono<T> |
ignoreElements(org.reactivestreams.Publisher<T> source)
Create a new
Mono that ignores onNext (dropping them) and only react on Completion signal. |
static <T> Mono<T> |
just(T data)
Create a new
Mono that emits the specified item. |
static <T> Mono<T> |
justOrEmpty(java.util.Optional<? extends T> data)
Create a new
Mono that emits the specified item if Optional.isPresent() otherwise only emits
onComplete. |
static <T> Mono<T> |
justOrEmpty(T data)
Create a new
Mono that emits the specified item if non null otherwise only emits
onComplete. |
Mono<T> |
log()
Observe all Reactive Streams signals and use
Logger support to handle trace implementation. |
Mono<T> |
log(java.lang.String category)
Observe all Reactive Streams signals and use
Logger support to handle trace implementation. |
Mono<T> |
log(java.lang.String category,
java.util.logging.Level level,
SignalType... options)
Observe Reactive Streams signals matching the passed flags
options and use Logger support to
handle trace
implementation. |
<R> Mono<R> |
map(java.util.function.Function<? super T,? extends R> mapper)
Transform the item emitted by this
Mono by applying a function to item emitted. |
<E extends java.lang.Throwable> |
mapError(java.lang.Class<E> type,
java.util.function.Function<? super E,? extends java.lang.Throwable> mapper)
Transform the error emitted by this
Mono by applying a function if the
error matches the given type, otherwise let the error flows. |
Mono<T> |
mapError(java.util.function.Function<java.lang.Throwable,? extends java.lang.Throwable> mapper)
Transform the error emitted by this
Flux by applying a function. |
Mono<T> |
mapError(java.util.function.Predicate<? super java.lang.Throwable> predicate,
java.util.function.Function<? super java.lang.Throwable,? extends java.lang.Throwable> mapper)
Transform the error emitted by this
Mono by applying a function if the
error matches the given predicate, otherwise let the error flows. |
Mono<Signal<T>> |
materialize()
Transform the incoming onNext, onError and onComplete signals into
Signal . |
Flux<T> |
mergeWith(org.reactivestreams.Publisher<? extends T> other)
Merge emissions of this
Mono with the provided Publisher . |
static <T> Mono<T> |
never()
Return a
Mono that will never signal any data, error or completion signal. |
protected static <T> Mono<T> |
onAssembly(Mono<T> source)
|
Mono<T> |
onTerminateDetach()
Detaches the both the child
Subscriber and the Subscription on
termination or cancellation. |
Mono<T> |
or(Mono<? extends T> other)
Emit the any of the result from this mono or from the given mono
|
<E extends java.lang.Throwable> |
otherwise(java.lang.Class<E> type,
java.util.function.Function<? super E,? extends Mono<? extends T>> fallback)
Subscribe to a returned fallback publisher when an error matching the given type
occurs.
|
Mono<T> |
otherwise(java.util.function.Function<? super java.lang.Throwable,? extends Mono<? extends T>> fallback)
Subscribe to a returned fallback publisher when any error occurs.
|
Mono<T> |
otherwise(java.util.function.Predicate<? super java.lang.Throwable> predicate,
java.util.function.Function<? super java.lang.Throwable,? extends Mono<? extends T>> fallback)
Subscribe to a returned fallback publisher when an error matching the given type
occurs.
|
Mono<T> |
otherwiseIfEmpty(Mono<? extends T> alternate)
Provide an alternative
Mono if this mono is completed without data |
<E extends java.lang.Throwable> |
otherwiseReturn(java.lang.Class<E> type,
T fallbackValue)
Fallback to the given value if an error of a given type is observed on this
Flux |
<E extends java.lang.Throwable> |
otherwiseReturn(java.util.function.Predicate<? super java.lang.Throwable> predicate,
T fallbackValue)
Fallback to the given value if an error matching the given predicate is
observed on this
Flux |
Mono<T> |
otherwiseReturn(T fallback)
Subscribe to a returned fallback value when any error occurs.
|
<R> Mono<R> |
publish(java.util.function.Function<? super Mono<T>,? extends Mono<? extends R>> transform)
Shares a
Mono for the duration of a function that may transform it and
consume it as many times as necessary without causing multiple subscriptions
to the upstream. |
Mono<T> |
publishOn(Scheduler scheduler)
Run onNext, onComplete and onError on a supplied
Scheduler |
Flux<T> |
repeat()
Repeatedly subscribe to the source completion of the previous subscription.
|
Flux<T> |
repeat(java.util.function.BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.
|
Flux<T> |
repeat(long numRepeat)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.
|
Flux<T> |
repeat(long numRepeat,
java.util.function.BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous
subscription.
|
Flux<T> |
repeatWhen(java.util.function.Function<Flux<java.lang.Long>,? extends org.reactivestreams.Publisher<?>> whenFactory)
Repeatedly subscribe to this
Flux when a companion sequence signals a number of emitted elements in
response to the flux completion signal. |
Mono<T> |
repeatWhenEmpty(java.util.function.Function<Flux<java.lang.Long>,? extends org.reactivestreams.Publisher<?>> repeatFactory)
Repeatedly subscribe to this
Mono until there is an onNext signal when a companion sequence signals a
number of emitted elements. |
Mono<T> |
repeatWhenEmpty(int maxRepeat,
java.util.function.Function<Flux<java.lang.Long>,? extends org.reactivestreams.Publisher<?>> repeatFactory)
Repeatedly subscribe to this
Mono until there is an onNext signal when a companion sequence signals a
number of emitted elements. |
Mono<T> |
retry()
Re-subscribes to this
Mono sequence if it signals any error
either indefinitely. |
Mono<T> |
retry(long numRetries)
Re-subscribes to this
Mono sequence if it signals any error
either indefinitely or a fixed number of times. |
Mono<T> |
retry(long numRetries,
java.util.function.Predicate<java.lang.Throwable> retryMatcher)
Re-subscribes to this
Mono sequence up to the specified number of retries if it signals any
error and the given Predicate matches otherwise push the error downstream. |
Mono<T> |
retry(java.util.function.Predicate<java.lang.Throwable> retryMatcher)
Re-subscribes to this
Mono sequence if it signals any error
and the given Predicate matches otherwise push the error downstream. |
Mono<T> |
retryWhen(java.util.function.Function<Flux<java.lang.Throwable>,? extends org.reactivestreams.Publisher<?>> whenFactory)
|
MonoProcessor<T> |
subscribe()
Start the chain and request unbounded demand.
|
Cancellation |
subscribe(java.util.function.Consumer<? super T> consumer)
Subscribe a
Consumer to this Mono that will consume all the
sequence. |
Cancellation |
subscribe(java.util.function.Consumer<? super T> consumer,
java.util.function.Consumer<? super java.lang.Throwable> errorConsumer)
Subscribe
Consumer to this Mono that will consume all the
sequence. |
Cancellation |
subscribe(java.util.function.Consumer<? super T> consumer,
java.util.function.Consumer<? super java.lang.Throwable> errorConsumer,
java.lang.Runnable completeConsumer)
Subscribe
Consumer to this Mono that will consume all the
sequence. |
Mono<T> |
subscribeOn(Scheduler scheduler)
|
<E extends org.reactivestreams.Subscriber<? super T>> |
subscribeWith(E subscriber)
Subscribe the
Mono with the givne Subscriber and return it. |
Mono<java.lang.Void> |
then()
Return a
Mono<Void> which only listens for complete and error signals from this Mono completes. |
<R> Mono<R> |
then(java.util.function.Function<? super T,? extends Mono<? extends R>> transformer)
|
<V> Mono<V> |
then(Mono<V> other)
Transform the terminal signal (error or completion) into
Mono<V> that will emit at most one result in the
returned Mono . |
<V> Mono<V> |
then(java.util.function.Supplier<? extends Mono<V>> sourceSupplier)
Transform the terminal signal (error or completion) into
Mono<V> that will emit at most one result in the
returned Mono . |
<V> Flux<V> |
thenMany(org.reactivestreams.Publisher<V> other)
Transform the terminal signal (error or completion) into
Publisher<V> that will emit at most one result in the
returned Flux . |
<V> Flux<V> |
thenMany(java.util.function.Supplier<? extends Mono<V>> sourceSupplier)
Transform the terminal signal (error or completion) into
Publisher<V> that will emit at most one result in the
returned Flux . |
Mono<T> |
timeout(java.time.Duration timeout)
Signal a
TimeoutException in case an item doesn't arrive before the given period. |
Mono<T> |
timeout(java.time.Duration timeout,
Mono<? extends T> fallback)
Switch to a fallback
Mono in case an item doesn't arrive before the given period. |
<U> Mono<T> |
timeout(org.reactivestreams.Publisher<U> firstTimeout)
Signal a
TimeoutException in case the item from this Mono has
not been emitted before the given Publisher emits. |
<U> Mono<T> |
timeout(org.reactivestreams.Publisher<U> firstTimeout,
Mono<? extends T> fallback)
Switch to a fallback
Publisher in case the item from this Mono has
not been emitted before the given Publisher emits. |
Mono<T> |
timeoutMillis(long timeout)
Signal a
TimeoutException error in case an item doesn't arrive before the given period. |
Mono<T> |
timeoutMillis(long timeout,
Mono<? extends T> fallback)
Switch to a fallback
Mono in case an item doesn't arrive before the given period. |
Mono<T> |
timeoutMillis(long timeout,
Mono<? extends T> fallback,
TimedScheduler timer)
Switch to a fallback
Mono in case an item doesn't arrive before the given period. |
Mono<T> |
timeoutMillis(long timeout,
TimedScheduler timer)
Signal a
TimeoutException error in case an item doesn't arrive before the given period. |
Mono<Tuple2<java.lang.Long,T>> |
timestamp()
|
java.util.concurrent.CompletableFuture<T> |
toFuture()
Transform this
Mono into a CompletableFuture completing on onNext or onComplete and failing on
onError. |
java.lang.String |
toString() |
<V> Mono<V> |
transform(java.util.function.Function<? super Mono<T>,? extends org.reactivestreams.Publisher<V>> transformer)
|
static <T,D> Mono<T> |
using(java.util.concurrent.Callable<? extends D> resourceSupplier,
java.util.function.Function<? super D,? extends Mono<? extends T>> sourceSupplier,
java.util.function.Consumer<? super D> resourceCleanup)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a
Mono derived from the same resource and makes sure the resource is released if the
sequence terminates or
the Subscriber cancels.
|
static <T,D> Mono<T> |
using(java.util.concurrent.Callable<? extends D> resourceSupplier,
java.util.function.Function<? super D,? extends Mono<? extends T>> sourceSupplier,
java.util.function.Consumer<? super D> resourceCleanup,
boolean eager)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a
Mono derived from the same resource and makes sure the resource is released if the
sequence terminates or
the Subscriber cancels.
|
static <R> Mono<R> |
when(java.util.function.Function<? super java.lang.Object[],? extends R> combinator,
Mono<?>... monos)
Aggregate given monos into a new a Mono that will be fulfilled when all of the given Monos have been fulfilled.
|
static <R> Mono<R> |
when(java.lang.Iterable<? extends Mono<?>> monos,
java.util.function.Function<? super java.lang.Object[],? extends R> combinator)
Aggregate given monos into a new a Mono that will be fulfilled when all of the given Monos have been fulfilled.
|
static Mono<java.lang.Void> |
when(java.lang.Iterable<? extends org.reactivestreams.Publisher<java.lang.Void>> sources)
Aggregate given void publishers into a new a Mono that will be
fulfilled when all of the given Monos have been fulfilled.
|
static <T1,T2> Mono<Tuple2<T1,T2>> |
when(Mono<? extends T1> p1,
Mono<? extends T2> p2)
Merge given monos into a new a Mono that will be fulfilled when all of the given Monos
have been fulfilled.
|
static <T1,T2,T3> Mono<Tuple3<T1,T2,T3>> |
when(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3)
Merge given monos into a new a Mono that will be fulfilled when all of the given Monos
have been fulfilled.
|
static <T1,T2,T3,T4> |
when(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4)
Merge given monos into a new a Mono that will be fulfilled when all of the given Monos
have been fulfilled.
|
static <T1,T2,T3,T4,T5> |
when(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5)
Merge given monos into a new a Mono that will be fulfilled when all of the given Monos
have been fulfilled.
|
static <T1,T2,T3,T4,T5,T6> |
when(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5,
Mono<? extends T6> p6)
Merge given monos into a new a Mono that will be fulfilled when all of the given Monos
have been fulfilled.
|
static Mono<java.lang.Void> |
when(org.reactivestreams.Publisher<java.lang.Void>... sources)
Aggregate given void publisher into a new a Mono that will be fulfilled
when all of the given Monos have been fulfilled.
|
static <R> Mono<R> |
whenDelayError(java.util.function.Function<? super java.lang.Object[],? extends R> combinator,
Mono<?>... monos)
Merge given monos into a new a Mono that will be fulfilled when all of the given Monos
have been fulfilled.
|
static <T1,T2> Mono<Tuple2<T1,T2>> |
whenDelayError(Mono<? extends T1> p1,
Mono<? extends T2> p2)
Merge given monos into a new a Mono that will be fulfilled when all of the given Monos
have been fulfilled.
|
static <T1,T2,T3> Mono<Tuple3<T1,T2,T3>> |
whenDelayError(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3)
Merge given monos into a new a Mono that will be fulfilled when all of the given Mono Monos
have been fulfilled.
|
static <T1,T2,T3,T4> |
whenDelayError(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4)
Merge given monos into a new a Mono that will be fulfilled when all of the given Monos
have been fulfilled.
|
static <T1,T2,T3,T4,T5> |
whenDelayError(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5)
Merge given monos into a new a Mono that will be fulfilled when all of the given Monos
have been fulfilled.
|
static <T1,T2,T3,T4,T5,T6> |
whenDelayError(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5,
Mono<? extends T6> p6)
Merge given monos into a new a Mono that will be fulfilled when all of the given Monos
have been fulfilled.
|
static Mono<java.lang.Void> |
whenDelayError(org.reactivestreams.Publisher<java.lang.Void>... sources)
Merge given void publishers into a new a Mono that will be fulfilled
when all of the given Monos
have been fulfilled.
|
static <T,V> Mono<V> |
zip(java.util.function.Function<? super java.lang.Object[],? extends V> combinator,
java.lang.Iterable<? extends Mono<? extends T>> monos)
Aggregate given monos into a new a Mono that will be fulfilled when all of the given Monos have been fulfilled.
|
static <T,V> Mono<V> |
zip(java.util.function.Function<? super java.lang.Object[],? extends V> combinator,
Mono<? extends T>... monos)
Aggregate given monos into a new a Mono that will be fulfilled when all of the given Monos have been fulfilled.
|
public static <T> Mono<T> create(java.util.function.Consumer<MonoSink<T>> callback)
Bridging legacy API involves mostly boilerplate code due to the lack of standard types and methods. There are two kinds of API surfaces: 1) addListener/removeListener and 2) callback-handler.
1) addListener/removeListener pairs
To work with such API one has to instantiate the listener,
wire up the SingleEmitter inside it then add the listener
to the source:
Mono.<String>create(sink -> {
HttpListener listener = event -> {
if (event.getResponseCode() >= 400) {
sink.error(new RuntimeExeption("Failed"));
} else {
String body = event.getBody();
if (body.isEmpty()) {
sink.success();
} else {
sink.success(body.toLowerCase());
}
}
};
client.addListener(listener);
sink.setCancellation(() -> client.removeListener(listener));
});
Note that this works only with single-value emitting listeners. Otherwise,
all subsequent signals are dropped. You may have to add client.removeListener(this);
to the listener's body.
2) callback handler
This requires a similar instantiation pattern such as above, but usually the
successful completion and error are separated into different methods.
In addition, the legacy API may or may not support some cancellation mechanism.
Mono.<String>create(sink -> {
Callback<String> callback = new Callback<String>() {
@Override
public void onResult(String data) {
sink.success(data.toLowerCase());
}
@Override
public void onError(Exception e) {
sink.error(e);
}
}
// without cancellation support:
client.call("query", callback);
// with cancellation support:
AutoCloseable cancel = client.call("query", callback);
sink.setCancellation(() -> {
try {
cancel.close();
} catch (Exception ex) {
Exceptions.onErrorDropped(ex);
}
});
});
public static <T> Mono<T> defer(java.util.function.Supplier<? extends Mono<? extends T>> supplier)
public static Mono<java.lang.Long> delay(java.time.Duration duration)
duration
of given unit and complete on the global timer.
If the demand cannot be produced in time, an onError will be signalled instead.
duration
- the duration of the delayMono
public static Mono<java.lang.Long> delayMillis(long duration)
duration
milliseconds and complete.
If the demand cannot be produced in time, an onError will be signalled instead.
duration
- the duration in milliseconds of the delayMono
public static Mono<java.lang.Long> delayMillis(long duration, TimedScheduler timer)
duration
milliseconds and complete.
If the demand cannot be produced in time, an onError will be signalled instead.
duration
- the duration in milliseconds of the delaytimer
- the TimedScheduler
to run onMono
public static <T> Mono<T> empty()
Mono
that completes without emitting any item.
T
- the reified Subscriber
typeMono
public static <T> Mono<java.lang.Void> empty(org.reactivestreams.Publisher<T> source)
Mono
that ignores onNext (dropping them) and only react on Completion signal.
T
- the reified Publisher
typesource
- the to ignore
Mono
.public static <T> Mono<T> error(java.lang.Throwable error)
Mono
that completes with the specified error immediately after onSubscribe.
T
- the reified Subscriber
typeerror
- the onError signalMono
@SafeVarargs public static <T> Mono<T> first(Mono<? extends T>... monos)
T
- The type of the function result.monos
- The deferred monos to use.Mono
.public static <T> Mono<T> first(java.lang.Iterable<? extends Mono<? extends T>> monos)
T
- The type of the function result.monos
- The monos to use.Mono
.public static <T> Mono<T> from(org.reactivestreams.Publisher<? extends T> source)
T
- the source typesource
- the Publisher
sourceMono
public static <T> Mono<T> fromCallable(java.util.concurrent.Callable<? extends T> supplier)
T
- type of the expected valuesupplier
- Callable
that will produce the valueMono
.public static <T> Mono<T> fromFuture(java.util.concurrent.CompletableFuture<? extends T> future)
T
- type of the expected valuefuture
- CompletableFuture
that will produce the value or null to
complete immediatelyMono
.public static Mono<java.lang.Void> fromRunnable(java.lang.Runnable runnable)
Mono
only producing a completion signal after using the given
runnable.
runnable
- Runnable
that will callback the completion signalMono
.public static <T> Mono<T> fromSupplier(java.util.function.Supplier<? extends T> supplier)
T
- type of the expected valuesupplier
- Supplier
that will produce the valueMono
.public static <T> Mono<T> ignoreElements(org.reactivestreams.Publisher<T> source)
Mono
that ignores onNext (dropping them) and only react on Completion signal.
T
- the source type of the ignored datasource
- the to ignore
Mono
.public static <T> Mono<T> just(T data)
Mono
that emits the specified item.
T
- the type of the produced itemdata
- the only item to onNextMono
.public static <T> Mono<T> justOrEmpty(java.util.Optional<? extends T> data)
Mono
that emits the specified item if Optional.isPresent()
otherwise only emits
onComplete.
T
- the type of the produced itemdata
- the Optional
item to onNext or onComplete if not presentMono
.public static <T> Mono<T> justOrEmpty(T data)
Mono
that emits the specified item if non null otherwise only emits
onComplete.
T
- the type of the produced itemdata
- the item to onNext or onComplete if nullMono
.public static <T> Mono<T> never()
Mono
that will never signal any data, error or completion signal.
T
- the Subscriber
type targetMono
public static <T,D> Mono<T> using(java.util.concurrent.Callable<? extends D> resourceSupplier, java.util.function.Function<? super D,? extends Mono<? extends T>> sourceSupplier, java.util.function.Consumer<? super D> resourceCleanup, boolean eager)
T
- emitted typeD
- resource typeresourceSupplier
- a Callable
that is called on subscribesourceSupplier
- a Mono
factory derived from the supplied resourceresourceCleanup
- invoked on completioneager
- true to clean before terminating downstream subscribersMono
public static <T,D> Mono<T> using(java.util.concurrent.Callable<? extends D> resourceSupplier, java.util.function.Function<? super D,? extends Mono<? extends T>> sourceSupplier, java.util.function.Consumer<? super D> resourceCleanup)
Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer may override the terminal even.
public static <T1,T2> Mono<Tuple2<T1,T2>> when(Mono<? extends T1> p1, Mono<? extends T2> p2)
Flux
.
T1
- type of the value from source1T2
- type of the value from source2p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.Mono
.public static <T1,T2,T3> Mono<Tuple3<T1,T2,T3>> when(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3)
Flux
.
T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.p3
- The third upstream Publisher
to subscribe to.Mono
.public static <T1,T2,T3,T4> Mono<Tuple4<T1,T2,T3,T4>> when(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4)
Flux
.
T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.p3
- The third upstream Publisher
to subscribe to.p4
- The fourth upstream Publisher
to subscribe to.Mono
.public static <T1,T2,T3,T4,T5> Mono<Tuple5<T1,T2,T3,T4,T5>> when(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5)
Flux
.
T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4T5
- type of the value from source5p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.p3
- The third upstream Publisher
to subscribe to.p4
- The fourth upstream Publisher
to subscribe to.p5
- The fifth upstream Publisher
to subscribe to.Mono
.public static <T1,T2,T3,T4,T5,T6> Mono<Tuple6<T1,T2,T3,T4,T5,T6>> when(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6)
Flux
.
T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4T5
- type of the value from source5T6
- type of the value from source6p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.p3
- The third upstream Publisher
to subscribe to.p4
- The fourth upstream Publisher
to subscribe to.p5
- The fifth upstream Publisher
to subscribe to.p6
- The sixth upstream Publisher
to subscribe to.Mono
.public static Mono<java.lang.Void> when(java.lang.Iterable<? extends org.reactivestreams.Publisher<java.lang.Void>> sources)
sources
- The sources to use.Mono
.public static <R> Mono<R> when(java.lang.Iterable<? extends Mono<?>> monos, java.util.function.Function<? super java.lang.Object[],? extends R> combinator)
R
- the combined resultmonos
- The monos to use.combinator
- the function to transform the combined array into an arbitrary
object.Mono
.@SafeVarargs public static Mono<java.lang.Void> when(org.reactivestreams.Publisher<java.lang.Void>... sources)
Flux
.
sources
- The sources to use.Mono
.public static <R> Mono<R> when(java.util.function.Function<? super java.lang.Object[],? extends R> combinator, Mono<?>... monos)
Flux
.
R
- the combined resultmonos
- The monos to use.combinator
- the function to transform the combined array into an arbitrary
object.Mono
.public static <T1,T2> Mono<Tuple2<T1,T2>> whenDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2)
T1
- type of the value from source1T2
- type of the value from source2p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.Mono
.public static <T1,T2,T3> Mono<Tuple3<T1,T2,T3>> whenDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3)
T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.p3
- The third upstream Publisher
to subscribe to.Mono
.public static <T1,T2,T3,T4> Mono<Tuple4<T1,T2,T3,T4>> whenDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4)
T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.p3
- The third upstream Publisher
to subscribe to.p4
- The fourth upstream Publisher
to subscribe to.Mono
.public static <T1,T2,T3,T4,T5> Mono<Tuple5<T1,T2,T3,T4,T5>> whenDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5)
T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4T5
- type of the value from source5p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.p3
- The third upstream Publisher
to subscribe to.p4
- The fourth upstream Publisher
to subscribe to.p5
- The fifth upstream Publisher
to subscribe to.Mono
.public static <T1,T2,T3,T4,T5,T6> Mono<Tuple6<T1,T2,T3,T4,T5,T6>> whenDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6)
T1
- type of the value from source1T2
- type of the value from source2T3
- type of the value from source3T4
- type of the value from source4T5
- type of the value from source5T6
- type of the value from source6p1
- The first upstream Publisher
to subscribe to.p2
- The second upstream Publisher
to subscribe to.p3
- The third upstream Publisher
to subscribe to.p4
- The fourth upstream Publisher
to subscribe to.p5
- The fifth upstream Publisher
to subscribe to.p6
- The sixth upstream Publisher
to subscribe to.Mono
.@SafeVarargs public static Mono<java.lang.Void> whenDelayError(org.reactivestreams.Publisher<java.lang.Void>... sources)
sources
- The sources to use.Mono
.public static <R> Mono<R> whenDelayError(java.util.function.Function<? super java.lang.Object[],? extends R> combinator, Mono<?>... monos)
R
- the combined resultmonos
- The monos to use.combinator
- the function to transform the combined array into an arbitrary
object.Mono
.@SafeVarargs public static <T,V> Mono<V> zip(java.util.function.Function<? super java.lang.Object[],? extends V> combinator, Mono<? extends T>... monos)
T
- The super incoming typeV
- The type of the function result.combinator
- the combinator Function
monos
- The monos to use.Mono
.public static <T,V> Mono<V> zip(java.util.function.Function<? super java.lang.Object[],? extends V> combinator, java.lang.Iterable<? extends Mono<? extends T>> monos)
T
- The type of the function result.V
- The result typecombinator
- the combinator Function
monos
- The monos to use.Mono
.public final <P> P as(java.util.function.Function<? super Mono<T>,P> transformer)
Mono
into a target type.
mono.as(Flux::from).subscribe()
P
- the returned instance typetransformer
- the Function
applying this Mono
Mono
to instance Pfor a bounded conversion to {@link Publisher}
public final <T2> Mono<Tuple2<T,T2>> and(Mono<? extends T2> other)
Tuple2
.
T2
- the element type of the other Mono instanceother
- the Mono
to combine withwhen(reactor.core.publisher.Mono<? extends T1>, reactor.core.publisher.Mono<? extends T2>)
public final Mono<T> awaitOnSubscribe()
This helps with child Subscribers that don't expect a recursive call from onSubscribe into their onNext because, for example, they request immediately from their onSubscribe but don't finish their preparation before that and onNext runs into a half-prepared state. This can happen with non Reactor based Subscribers.
Mono
public T block()
public final T block(java.time.Duration timeout)
RuntimeException
will be thrown.
Note that each block() will subscribe a new single (MonoSink) subscriber, in other words, the result might
miss signal from hot publishers.
timeout
- maximum time period to wait for before raising a RuntimeException
public T blockMillis(long timeout)
RuntimeException
will be thrown.
Note that each block() will subscribe a new single (MonoSink) subscriber, in other words, the result might
miss signal from hot publishers.
timeout
- maximum time period to wait for in milliseconds before raising a RuntimeException
public final <E> Mono<E> cast(java.lang.Class<E> stream)
Mono
produced type into a target produced type.
public final Mono<T> cache()
Mono
into a hot source and cache last emitted signals for further Subscriber
.
Completion and Error will also be replayed.
Mono
public final <V> Mono<V> compose(java.util.function.Function<? super Mono<T>,? extends org.reactivestreams.Publisher<V>> transformer)
Mono
in order to generate a
target Mono
type. A transformation will occur for each
Subscriber
.
flux.compose(Mono::from).subscribe()
V
- the item type in the returned Publisher
transformer
- the Function
to immediately map this Mono
into a target Mono
instance.Mono
for a loose conversion to an arbitrary type
public final Mono<T> defaultIfEmpty(T defaultV)
defaultV
- the alternate value if this sequence is emptyMono
Flux.defaultIfEmpty(Object)
public final <U> Mono<T> delaySubscription(org.reactivestreams.Publisher<U> subscriptionDelay)
U
- the other source typesubscriptionDelay
- a
Publisher
to signal by next or complete this Publisher.subscribe(Subscriber)
Mono
public final Mono<T> delaySubscriptionMillis(long delay, TimedScheduler timer)
delay
- period in milliseconds before subscribing this Mono
timer
- the TimedScheduler
to run onMono
public final <X> Mono<X> dematerialize()
Mono
is a emits onNext, onError or onComplete Signal
. The relative Subscriber
callback will be invoked, error Signal
will trigger onError and complete Signal
will trigger
onComplete.
X
- the dematerialized typeMono
public final Mono<T> doAfterTerminate(java.util.function.BiConsumer<? super T,java.lang.Throwable> afterTerminate)
Mono
terminates, either by completing downstream successfully or with an error.
The arguments will be null depending on success, success with data and error:
afterTerminate
- the callback to call after Subscriber.onNext(T)
, Subscriber.onComplete()
without preceding Subscriber.onNext(T)
or Subscriber.onError(java.lang.Throwable)
Mono
public final Mono<T> doOnCancel(java.lang.Runnable onCancel)
Mono
is cancelled.
onCancel
- the callback to call on Subscription.cancel()
Mono
public final Mono<T> doOnNext(java.util.function.Consumer<? super T> onNext)
Mono
emits a data successfully.
onNext
- the callback to call on Subscriber.onNext(T)
Mono
public final Mono<T> doOnSuccess(java.util.function.Consumer<? super T> onSuccess)
Mono
completes successfully.
public final Mono<T> doOnError(java.util.function.Consumer<? super java.lang.Throwable> onError)
Mono
completes with an error.
onError
- the error callback to call on Subscriber.onError(Throwable)
Mono
public final <E extends java.lang.Throwable> Mono<T> doOnError(java.lang.Class<E> exceptionType, java.util.function.Consumer<? super E> onError)
Mono
completes with an error matching the given exception type.
E
- type of the error to handleexceptionType
- the type of exceptions to handleonError
- the error handler for each errorMono
public final Mono<T> doOnError(java.util.function.Predicate<? super java.lang.Throwable> predicate, java.util.function.Consumer<? super java.lang.Throwable> onError)
Mono
completes with an error matching the given exception.
predicate
- the matcher for exceptions to handleonError
- the error handler for each errorMono
public final Mono<T> doOnRequest(java.util.function.LongConsumer consumer)
consumer
- the consumer to invoke on each requestMono
public final Mono<T> doOnSubscribe(java.util.function.Consumer<? super org.reactivestreams.Subscription> onSubscribe)
Mono
is subscribed.
onSubscribe
- the callback to call on Subscriber.onSubscribe(Subscription)
Mono
public final Mono<T> doOnTerminate(java.util.function.BiConsumer<? super T,java.lang.Throwable> onTerminate)
Mono
terminates, either by completing successfully or with an error.
onTerminate
- the callback to call Subscriber.onNext(T)
, Subscriber.onComplete()
without preceding Subscriber.onNext(T)
or Subscriber.onError(java.lang.Throwable)
Mono
public final Mono<Tuple2<java.lang.Long,T>> elapsed()
Mono
sequence into Tuple2
of T1 Long
timemillis and T2
T
associated data. The timemillis corresponds to the elapsed time between the subscribe and the first
next signal.
Mono
that emits a tuple of time elapsed in milliseconds and matching datapublic final Mono<T> filter(java.util.function.Predicate<? super T> tester)
Mono
and replay it if predicate returns true.
Otherwise complete without value.
tester
- the predicate to evaluateMono
public final <R> Flux<R> flatMap(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)
Publisher
into Publishers, then flatten the emissions from those by
merging them into a single Flux
, so that they may interleave.
R
- the merged sequence typemapper
- the
Function
to produce a sequence of R from the the eventual passed Subscriber.onNext(T)
Flux
as the sequence is not guaranteed to be single at mostpublic final <R> Flux<R> flatMap(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapperOnNext, java.util.function.Function<java.lang.Throwable,? extends org.reactivestreams.Publisher<? extends R>> mapperOnError, java.util.function.Supplier<? extends org.reactivestreams.Publisher<? extends R>> mapperOnComplete)
Flux
into Publishers, then flatten the emissions from those by
merging them into a single Flux
, so that they may interleave.
R
- the type of the produced merged sequencemapperOnNext
- the Function
to call on next data and returning a sequence to mergemapperOnError
- the Function
to call on error signal and returning a sequence to mergemapperOnComplete
- the Function
to call on complete signal and returning a sequence to mergeFlux
as the sequence is not guaranteed to be single at mostFlux.flatMap(Function, Function, Supplier)
public final <R> Flux<R> flatMapIterable(java.util.function.Function<? super T,? extends java.lang.Iterable<? extends R>> mapper)
Mono
into Iterable
, then flatten the elements from those by
merging them into a single Flux
. The prefetch argument allows to give an
arbitrary prefetch size to the merged Iterable
.
R
- the merged output sequence typemapper
- the Function
to transform input item into a sequence Iterable
Flux
public final Mono<java.lang.Boolean> hasElement()
Mono
has an element.
Mono
with true
if a value is emitted and false
otherwisepublic final <R> Mono<R> handle(java.util.function.BiConsumer<? super T,SynchronousSink<R>> handler)
Mono
by calling a biconsumer with the
output sink for each onNext. At most one SynchronousSink.next(Object)
call must be performed and/or 0 or 1 SynchronousSink.error(Throwable)
or
SynchronousSink.complete()
.R
- the transformed typehandler
- the handling BiConsumer
Mono
public final Mono<T> hide()
Mono
instance.
The main purpose of this operator is to prevent certain identity-based optimizations from happening, mostly for diagnostic purposes.
Mono
instancepublic final Mono<T> ignoreElement()
Mono
.public final Mono<T> log()
Logger
support to handle trace implementation. Default will
use Level.INFO
and java.util.logging. If SLF4J is available, it will be used instead.
The default log category will be "Mono".
Mono
Flux.log()
public final Mono<T> log(java.lang.String category)
Logger
support to handle trace implementation. Default will
use Level.INFO
and java.util.logging. If SLF4J is available, it will be used instead.
category
- to be mapped into logger configuration (e.g. org.springframework.reactor).Mono
public final Mono<T> log(java.lang.String category, java.util.logging.Level level, SignalType... options)
options
and use Logger
support to
handle trace
implementation. Default will
use the passed Level
and java.util.logging. If SLF4J is available, it will be used instead.
Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
mono.log("category", SignalType.ON_NEXT, SignalType.ON_ERROR)
category
- to be mapped into logger configuration (e.g. org.springframework.reactor).level
- the level to enforce for this tracing Fluxoptions
- a vararg SignalType
option to filter log messagesMono
public final <R> Mono<R> map(java.util.function.Function<? super T,? extends R> mapper)
Mono
by applying a function to item emitted.
R
- the transformed typemapper
- the transforming functionMono
public final Mono<T> mapError(java.util.function.Function<java.lang.Throwable,? extends java.lang.Throwable> mapper)
Flux
by applying a function.
mapper
- the error transforming Function
Flux
public final <E extends java.lang.Throwable> Mono<T> mapError(java.lang.Class<E> type, java.util.function.Function<? super E,? extends java.lang.Throwable> mapper)
Mono
by applying a function if the
error matches the given type, otherwise let the error flows.
E
- the error typetype
- the type to matchmapper
- the error transforming Function
Mono
public final Mono<T> mapError(java.util.function.Predicate<? super java.lang.Throwable> predicate, java.util.function.Function<? super java.lang.Throwable,? extends java.lang.Throwable> mapper)
Mono
by applying a function if the
error matches the given predicate, otherwise let the error flows.
predicate
- the error predicatemapper
- the error transforming Function
Mono
public final Mono<Signal<T>> materialize()
Signal
.
Since the error is materialized as a Signal
, the propagation will be stopped and onComplete will be
emitted. Complete signal will first emit a Signal.complete()
and then effectively complete the flux.
public final Flux<T> mergeWith(org.reactivestreams.Publisher<? extends T> other)
other
- the other Publisher
to merge withFlux
as the sequence is not guaranteed to be at most 1public final Mono<T> or(Mono<? extends T> other)
other
- the racing other Mono
to compete with for the resultMono
first(reactor.core.publisher.Mono<? extends T>...)
public final Mono<T> otherwise(java.util.function.Function<? super java.lang.Throwable,? extends Mono<? extends T>> fallback)
fallback
- the function to map an alternative Mono
Mono
on source onErrorFlux.onErrorResumeWith(java.util.function.Function<? super java.lang.Throwable, ? extends org.reactivestreams.Publisher<? extends T>>)
public final <E extends java.lang.Throwable> Mono<T> otherwise(java.lang.Class<E> type, java.util.function.Function<? super E,? extends Mono<? extends T>> fallback)
public final Mono<T> otherwise(java.util.function.Predicate<? super java.lang.Throwable> predicate, java.util.function.Function<? super java.lang.Throwable,? extends Mono<? extends T>> fallback)
public final Mono<T> otherwiseIfEmpty(Mono<? extends T> alternate)
Mono
if this mono is completed without data
alternate
- the alternate mono if this mono is emptyMono
on source onComplete without elementsFlux.switchIfEmpty(org.reactivestreams.Publisher<? extends T>)
public final Mono<T> otherwiseReturn(T fallback)
fallback
- the value to emit if an error occursMono
Flux.onErrorReturn(T)
public final <E extends java.lang.Throwable> Mono<T> otherwiseReturn(java.lang.Class<E> type, T fallbackValue)
Flux
E
- the error typetype
- the error type to matchfallbackValue
- alternate value on fallbackFlux
public final <E extends java.lang.Throwable> Mono<T> otherwiseReturn(java.util.function.Predicate<? super java.lang.Throwable> predicate, T fallbackValue)
Flux
E
- the error typepredicate
- the error predicate to matchfallbackValue
- alternate value on fallbackMono
public final Mono<T> onTerminateDetach()
Subscriber
and the Subscription
on
termination or cancellation.
This should help with odd retention scenarios when running
with non-reactor Subscriber
.
Mono
public final <R> Mono<R> publish(java.util.function.Function<? super Mono<T>,? extends Mono<? extends R>> transform)
Mono
for the duration of a function that may transform it and
consume it as many times as necessary without causing multiple subscriptions
to the upstream.R
- the output value typetransform
- Mono
public final Mono<T> publishOn(Scheduler scheduler)
Scheduler
Typically used for fast publisher, slow consumer(s) scenarios.
mono.publishOn(Schedulers.single()).subscribe()
scheduler
- a checked Scheduler.Worker
factoryMono
public final Flux<T> repeat()
Flux
on onCompletepublic final Flux<T> repeat(java.util.function.BooleanSupplier predicate)
predicate
- the boolean to evaluate on onComplete.Flux
on onCompletepublic final Flux<T> repeat(long numRepeat)
numRepeat
- the number of times to re-subscribe on onCompleteFlux
on onComplete up to number of repeat specifiedpublic final Flux<T> repeat(long numRepeat, java.util.function.BooleanSupplier predicate)
numRepeat
- the number of times to re-subscribe on completepredicate
- the boolean to evaluate on onCompleteFlux
on onComplete up to number of repeat specified OR matching
predicatepublic final Flux<T> repeatWhen(java.util.function.Function<Flux<java.lang.Long>,? extends org.reactivestreams.Publisher<?>> whenFactory)
Flux
when a companion sequence signals a number of emitted elements in
response to the flux completion signal.
If the companion sequence signals when this Flux
is active, the repeat
attempt is suppressed and any terminal signal will terminate this Flux
with the same signal immediately.
public final Mono<T> repeatWhenEmpty(java.util.function.Function<Flux<java.lang.Long>,? extends org.reactivestreams.Publisher<?>> repeatFactory)
Mono
until there is an onNext signal when a companion sequence signals a
number of emitted elements.
If the companion sequence signals when this Mono
is active, the repeat
attempt is suppressed and any terminal signal will terminate this Flux
with the same signal immediately.
public final Mono<T> repeatWhenEmpty(int maxRepeat, java.util.function.Function<Flux<java.lang.Long>,? extends org.reactivestreams.Publisher<?>> repeatFactory)
Mono
until there is an onNext signal when a companion sequence signals a
number of emitted elements.
If the companion sequence signals when this Mono
is active, the repeat
attempt is suppressed and any terminal signal will terminate this Flux
with the same signal immediately.
Emits an IllegalStateException
if the max repeat is exceeded and different from Integer.MAX_VALUE
.
maxRepeat
- the maximum repeat number of time (infinite if Integer.MAX_VALUE
)repeatFactory
- the
Function
providing a Flux
signalling the current repeat index from 0 on onComplete and returning a Publisher
companion.Mono
on onComplete when the companion Publisher
produces an
onNext signalpublic final Mono<T> retry()
Mono
sequence if it signals any error
either indefinitely.
The times == Long.MAX_VALUE is treated as infinite retry.
Mono
on onErrorpublic final Mono<T> retry(long numRetries)
Mono
sequence if it signals any error
either indefinitely or a fixed number of times.
The times == Long.MAX_VALUE is treated as infinite retry.
numRetries
- the number of times to tolerate an errorMono
on onError up to the specified number of retries.public final Mono<T> retry(java.util.function.Predicate<java.lang.Throwable> retryMatcher)
Mono
sequence if it signals any error
and the given Predicate
matches otherwise push the error downstream.
retryMatcher
- the predicate to evaluate if retry should occur based on a given error signalMono
on onError if the predicates matches.public final Mono<T> retry(long numRetries, java.util.function.Predicate<java.lang.Throwable> retryMatcher)
Mono
sequence up to the specified number of retries if it signals any
error and the given Predicate
matches otherwise push the error downstream.
numRetries
- the number of times to tolerate an errorretryMatcher
- the predicate to evaluate if retry should occur based on a given error signalMono
on onError up to the specified number of retries and if the predicate
matches.public final Mono<T> retryWhen(java.util.function.Function<Flux<java.lang.Throwable>,? extends org.reactivestreams.Publisher<?>> whenFactory)
public final MonoProcessor<T> subscribe()
Runnable
task to execute to dispose and cancel the underlying Subscription
public final Cancellation subscribe(java.util.function.Consumer<? super T> consumer)
Consumer
to this Mono
that will consume all the
sequence.
For a passive version that observe and forward incoming data see doOnSuccess(Consumer)
and
doOnError(java.util.function.Consumer)
.
consumer
- the consumer to invoke on each valueRunnable
to dispose the Subscription
public final Cancellation subscribe(java.util.function.Consumer<? super T> consumer, java.util.function.Consumer<? super java.lang.Throwable> errorConsumer)
Consumer
to this Mono
that will consume all the
sequence.
For a passive version that observe and forward incoming data see doOnSuccess(Consumer)
and
doOnError(java.util.function.Consumer)
.
consumer
- the consumer to invoke on each next signalerrorConsumer
- the consumer to invoke on error signalRunnable
to dispose the Subscription
public final Cancellation subscribe(java.util.function.Consumer<? super T> consumer, java.util.function.Consumer<? super java.lang.Throwable> errorConsumer, java.lang.Runnable completeConsumer)
Consumer
to this Mono
that will consume all the
sequence.
For a passive version that observe and forward incoming data see doOnSuccess(Consumer)
and
doOnError(java.util.function.Consumer)
.
consumer
- the consumer to invoke on each valueerrorConsumer
- the consumer to invoke on error signalcompleteConsumer
- the consumer to invoke on complete signalCancellation
to dispose the Subscription
public final Mono<T> subscribeOn(Scheduler scheduler)
Mono
on a given worker assigned by the supplied Scheduler
.
mono.subscribeOn(Schedulers.parallel()).subscribe())
scheduler
- a checked Scheduler.Worker
factoryMono
public final <E extends org.reactivestreams.Subscriber<? super T>> E subscribeWith(E subscriber)
Mono
with the givne Subscriber
and return it.E
- the reified type of the Subscriber
for chainingsubscriber
- the Subscriber
to subscribeSubscriber
after subscribing it to this Mono
public final Mono<java.lang.Void> then()
Mono
igoring its payload (actively dropping)public final <R> Mono<R> then(java.util.function.Function<? super T,? extends Mono<? extends R>> transformer)
public final <V> Mono<V> then(Mono<V> other)
Mono<V>
that will emit at most one result in the
returned Mono
.
public final <V> Mono<V> then(java.util.function.Supplier<? extends Mono<V>> sourceSupplier)
Mono<V>
that will emit at most one result in the
returned Mono
.
public final <V> Flux<V> thenMany(org.reactivestreams.Publisher<V> other)
Publisher<V>
that will emit at most one result in the
returned Flux
.
V
- the element type of the supplied Monoother
- a Publisher
to emit from after terminationFlux
that emits from the supplied Publisher
public final <V> Flux<V> thenMany(java.util.function.Supplier<? extends Mono<V>> sourceSupplier)
Publisher<V>
that will emit at most one result in the
returned Flux
.
V
- the element type of the supplied MonosourceSupplier
- a Supplier
of Publisher
to emit from after
terminationFlux
that emits from the supplied Publisher
public final Mono<T> timeout(java.time.Duration timeout)
TimeoutException
in case an item doesn't arrive before the given period.
public final Mono<T> timeout(java.time.Duration timeout, Mono<? extends T> fallback)
Mono
in case an item doesn't arrive before the given period.
If the given Publisher
is null, signal a TimeoutException
.
public final <U> Mono<T> timeout(org.reactivestreams.Publisher<U> firstTimeout)
TimeoutException
in case the item from this Mono
has
not been emitted before the given Publisher
emits.
public final <U> Mono<T> timeout(org.reactivestreams.Publisher<U> firstTimeout, Mono<? extends T> fallback)
Publisher
in case the item from this Mono
has
not been emitted before the given Publisher
emits. The following items will be individually timed via
the factory provided Publisher
.
U
- the element type of the timeout PublisherfirstTimeout
- the timeout
Publisher
that must not emit before the first signal from this Mono
fallback
- the fallback Publisher
to subscribe when a timeout occursMono
with a fallback Publisher
public final Mono<T> timeoutMillis(long timeout)
TimeoutException
error in case an item doesn't arrive before the given period.
public final Mono<T> timeoutMillis(long timeout, TimedScheduler timer)
TimeoutException
error in case an item doesn't arrive before the given period.
timeout
- the timeout before the onNext signal from this Mono
timer
- the TimedScheduler
to run onMono
public final Mono<T> timeoutMillis(long timeout, Mono<? extends T> fallback)
Mono
in case an item doesn't arrive before the given period.
If the given Publisher
is null, signal a TimeoutException
.
public final Mono<T> timeoutMillis(long timeout, Mono<? extends T> fallback, TimedScheduler timer)
Mono
in case an item doesn't arrive before the given period.
If the given Publisher
is null, signal a TimeoutException
.
timeout
- the timeout before the onNext signal from this Mono
in millisecondsfallback
- the fallback Mono
to subscribe when a timeout occurstimer
- the TimedScheduler
to run onMono
with a fallback Mono
public final Mono<Tuple2<java.lang.Long,T>> timestamp()
Tuple2
pair of T1 Long
current system time in
millis and T2 T
associated data for the eventual item from this Mono
Mono
public final java.util.concurrent.CompletableFuture<T> toFuture()
Mono
into a CompletableFuture
completing on onNext or onComplete and failing on
onError.
CompletableFuture
public java.lang.String toString()
toString
in class java.lang.Object
public final <V> Mono<V> transform(java.util.function.Function<? super Mono<T>,? extends org.reactivestreams.Publisher<V>> transformer)
Mono
in order to generate a target Mono
. Unlike compose(Function)
, the
provided function is executed as part of assembly.
Function<Mono, Mono> applySchedulers = mono -> mono.subscribeOn(Schedulers.io()).publishOn(Schedulers.parallel());
mono.transform(applySchedulers).map(v -> v * v).subscribe(Subscribers.unbounded())
V
- the item type in the returned Mono
transformer
- the Function
to immediately map this Mono
into a target Mono
instance.Mono
for deferred composition of {@link Mono} for each {@link Subscriber}
,
for a loose conversion to an arbitrary type