Handling Errors
For a quick look at the available operators for error handling, see the relevant operator decision tree.
In Reactive Streams, errors are terminal events. As soon as an error occurs, it stops the sequence and gets propagated down the chain of operators to the last step, the Subscriber you defined and its onError method.
Such errors should still be dealt with at the application level. For instance, you might display an error notification in a UI or send a meaningful error payload in a REST endpoint. For this reason, the subscriber’s onError method should always be defined.
If not defined, onError throws an UnsupportedOperationException. You can further detect and triage it with the Exceptions.isErrorCallbackNotImplemented method.
Reactor also offers alternative means of dealing with errors in the middle of the chain, as error-handling operators. The following example shows how to do so:
Flux.just(1, 2, 0)
    .map(i -> "100 / " + i + " = " + (100 / i)) // this triggers an error with 0
    .onErrorReturn("Divided by zero :("); // error handling example
Before you learn about error-handling operators, you must keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it.
Now we can consider each means of error handling one-by-one. When relevant, we draw a parallel with imperative programming’s try patterns.
1. Error Handling Operators
You may be familiar with several ways of dealing with exceptions in a try-catch block. Most notably, these include the following:
- Catch and return a static default value.
- Catch and execute an alternative path with a fallback method.
- Catch and dynamically compute a fallback value.
- Catch, wrap to a BusinessException, and re-throw.
- Catch, log an error-specific message, and re-throw.
- Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.
All of these have equivalents in Reactor, in the form of error-handling operators. Before looking into these operators, we first want to establish a parallel between a reactive chain and a try-catch block.
When subscribing, the onError callback at the end of the chain is akin to a catch block. There, execution skips to the catch in case an Exception is thrown, as the following example shows:
Flux<String> s = Flux.range(1, 10)
    .map(v -> doSomethingDangerous(v)) (1)
    .map(v -> doSecondTransform(v)); (2)
s.subscribe(value -> System.out.println("RECEIVED " + value), (3)
            error -> System.err.println("CAUGHT " + error) (4)
);
1 | A transformation that can throw an exception is performed. |
2 | If everything went well, a second transformation is performed. |
3 | Each successfully transformed value is printed out. |
4 | In case of an error, the sequence terminates and an error message is displayed. |
The preceding example is conceptually similar to the following try-catch block:
try {
    for (int i = 1; i < 11; i++) {
        String v1 = doSomethingDangerous(i); (1)
        String v2 = doSecondTransform(v1); (2)
        System.out.println("RECEIVED " + v2);
    }
} catch (Throwable t) {
    System.err.println("CAUGHT " + t); (3)
}
1 | If an exception is thrown here… |
2 | …the rest of the loop is skipped… |
3 | … and the execution goes straight to here. |
Now that we have established a parallel, we can look at the different error handling cases and their equivalent operators.
1.1. Static Fallback Value
The equivalent of “Catch and return a static default value” is onErrorReturn.
The following example shows the imperative pattern:
try {
return doSomethingDangerous(10);
}
catch (Throwable error) {
return "RECOVERED";
}
The following example shows the Reactor equivalent:
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn("RECOVERED");
You also have the option of applying a Predicate on the exception to decide whether or not to recover, as the following example shows:
Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn(e -> "boom10".equals(e.getMessage()), "recovered10"); (1)
1 | Recover only if the message of the exception is "boom10" |
1.2. Catch and Swallow the Error
If you don’t even want to replace the exception with a fallback value, but instead want to ignore it and only propagate the elements produced so far, what you want is essentially to replace the onError signal with an onComplete signal. This can be done with the onErrorComplete operator:
Flux.just(10,20,30)
.map(this::doSomethingDangerousOn30)
.onErrorComplete(); (1)
1 | Recover by turning the onError into an onComplete |
Like onErrorReturn, onErrorComplete has variants that let you filter which exceptions to fall back on, based either on the exception’s class or on a Predicate.
1.3. Fallback Method
If you want more than a single default value and you have an alternative (safer) way of processing your data, you can use onErrorResume. This would be the equivalent of “Catch and execute an alternative path with a fallback method”.
For example, if your nominal process fetches data from an external and unreliable service, but you also keep a local cache of the same data that may be slightly out of date but is more reliable, you could do the following:
String v1;
try {
    v1 = callExternalService("key1");
}
catch (Throwable error) {
    v1 = getFromCache("key1");
}
String v2;
try {
    v2 = callExternalService("key2");
}
catch (Throwable error) {
    v2 = getFromCache("key2");
}
The following example shows the Reactor equivalent:
Flux.just("key1", "key2")
    .flatMap(k -> callExternalService(k) (1)
        .onErrorResume(e -> getFromCache(k)) (2)
    );
1 | For each key, asynchronously call the external service. |
2 | If the external service call fails, fall back to the cache for that key. Note that we always apply the same fallback, whatever the source error, e, is. |
Like onErrorReturn, onErrorResume has variants that let you filter which exceptions to fall back on, based either on the exception’s class or on a Predicate. The fact that it takes a Function also lets you choose a different fallback sequence to switch to, depending on the error encountered. The following example shows how to do so:
Flux.just("timeout1", "unknown", "key2")
    .flatMap(k -> callExternalService(k)
        .onErrorResume(error -> { (1)
            if (error instanceof TimeoutException) (2)
                return getFromCache(k);
            else if (error instanceof UnknownKeyException) (3)
                return registerNewEntry(k, "DEFAULT");
            else
                return Flux.error(error); (4)
        })
    );
1 | The function allows dynamically choosing how to continue. |
2 | If the source times out, hit the local cache. |
3 | If the source says the key is unknown, create a new entry. |
4 | In all other cases, “re-throw”. |
1.4. Dynamic Fallback Value
Even if you do not have an alternative (safer) way of processing your data, you might want to compute a fallback value out of the exception you received. This would be the equivalent of “Catch and dynamically compute a fallback value”.
For instance, if your return type (MyWrapper) has a variant dedicated to holding an exception (think CompletableFuture.complete(T success) versus CompletableFuture.completeExceptionally(Throwable error)), you could instantiate the error-holding variant and pass the exception.
An imperative example would look like the following:
try {
Value v = erroringMethod();
return MyWrapper.fromValue(v);
}
catch (Throwable error) {
return MyWrapper.fromError(error);
}
You can do this reactively in the same way as the fallback method solution, by using onErrorResume, with a tiny bit of boilerplate, as follows:
erroringFlux.onErrorResume(error -> Mono.just( (1)
MyWrapper.fromError(error) (2)
));
1 | Since you expect a MyWrapper representation of the error, you need to get a Mono<MyWrapper> for onErrorResume. We use Mono.just() for that. |
2 | We need to compute the value out of the exception. Here, we achieved that by wrapping the exception with a relevant MyWrapper factory method. |
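The MyWrapper type used in this section is only described, never defined. As a minimal sketch, assuming a generic holder with the two factory methods named in the text, it could look something like the following. The fields, the accessors, and the generic parameter are illustrative assumptions and are not part of Reactor:

```java
import java.util.Optional;

// Hypothetical wrapper type: holds either a value or the error that replaced it.
final class MyWrapper<T> {
    private final T value;          // null when this wrapper holds an error
    private final Throwable error;  // null when this wrapper holds a value

    private MyWrapper(T value, Throwable error) {
        this.value = value;
        this.error = error;
    }

    // Factory for the nominal case.
    static <T> MyWrapper<T> fromValue(T value) {
        return new MyWrapper<>(value, null);
    }

    // Factory for the error-holding variant.
    static <T> MyWrapper<T> fromError(Throwable error) {
        return new MyWrapper<>(null, error);
    }

    boolean isError() {
        return error != null;
    }

    Optional<T> value() {
        return Optional.ofNullable(value);
    }

    Optional<Throwable> error() {
        return Optional.ofNullable(error);
    }

    public static void main(String[] args) {
        MyWrapper<String> ok = MyWrapper.fromValue("data");
        MyWrapper<String> failed = MyWrapper.fromError(new IllegalStateException("boom"));
        System.out.println(ok.isError() + " " + ok.value().orElse("none"));
        System.out.println(failed.isError() + " "
                + failed.error().map(Throwable::getMessage).orElse("none"));
    }
}
```

With a type shaped like this, downstream consumers receive an ordinary onNext carrying the error-holding variant, so the sequence completes normally instead of terminating with onError.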
1.5. Catch and Rethrow
“Catch, wrap to a BusinessException, and re-throw” looks like the following in the imperative world:
try {
return callExternalService(k);
}
catch (Throwable error) {
throw new BusinessException("oops, SLA exceeded", error);
}
In the “fallback method” example, the last line inside the flatMap gives us a hint at achieving the same reactively, as follows:
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorResume(original -> Flux.error(
new BusinessException("oops, SLA exceeded", original))
);
However, there is a more straightforward way of achieving the same effect with onErrorMap:
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
1.6. Log or React on the Side
For cases where you want the error to continue propagating but still want to react to it without modifying the sequence (logging it, for instance), you can use the doOnError operator. This is the equivalent of the “Catch, log an error-specific message, and re-throw” pattern, as the following example shows:
try {
return callExternalService(k);
}
catch (RuntimeException error) {
//make a record of the error
log("uh oh, falling back, service failed for key " + k);
throw error;
}
The doOnError operator, as well as all operators prefixed with doOn, are sometimes referred to as having a “side-effect”. They let you peek inside the sequence’s events without modifying them.
Like the imperative example shown earlier, the following example still propagates the error yet ensures that we at least log that the external service had a failure:
LongAdder failureStat = new LongAdder();
Flux<String> flux =
    Flux.just("unknown")
        .flatMap(k -> callExternalService(k) (1)
            .doOnError(e -> {
                failureStat.increment();
                log("uh oh, falling back, service failed for key " + k); (2)
            })
            (3)
        );
1 | The external service call that can fail… |
2 | …is decorated with a logging and stats side-effect… |
3 | …after which, it still terminates with an error, unless we use an error-recovery operator here. |
We can also imagine we have statistic counters to increment as a second error side-effect.
1.7. Using Resources and the Finally Block
The last parallel to draw with imperative programming is the cleanup that can be done either by using the finally block to clean up resources or by using a Java 7 “try-with-resource” construct, both shown below:
Stats stats = new Stats();
stats.startTimer();
try {
    doSomethingDangerous();
}
finally {
    stats.stopTimerAndRecordTiming();
}
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
    return disposableInstance.toString();
}
Both have their Reactor equivalents: doFinally and using.
doFinally is about side-effects that you want to be executed whenever the sequence terminates (with onComplete or onError) or is cancelled. It gives you a hint as to what kind of termination triggered the side-effect. The following example shows how to use doFinally:
doFinally()
Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();
Flux<String> flux =
    Flux.just("foo", "bar")
        .doOnSubscribe(s -> stats.startTimer())
        .doFinally(type -> { (1)
            stats.stopTimerAndRecordTiming(); (2)
            if (type == SignalType.CANCEL) (3)
                statsCancel.increment();
        })
        .take(1); (4)
1 | doFinally consumes a SignalType for the type of termination. |
2 | Similarly to finally blocks, we always record the timing. |
3 | Here we also increment statistics in case of cancellation only. |
4 | take(1) requests exactly 1 from upstream, and cancels after one item is emitted. |
On the other hand, using handles the case where a Flux is derived from a resource and that resource must be acted upon whenever processing is done. In the following example, we replace the AutoCloseable interface of “try-with-resource” with a Disposable:
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
    @Override
    public void dispose() {
        isDisposed.set(true); (4)
    }
    @Override
    public String toString() {
        return "DISPOSABLE";
    }
};
Now we can do the reactive equivalent of “try-with-resource” on it, which looks like the following:
using()
Flux<String> flux =
    Flux.using(
        () -> disposableInstance, (1)
        disposable -> Flux.just(disposable.toString()), (2)
        Disposable::dispose (3)
    );
1 | The first lambda generates the resource. Here, we return our mock Disposable . |
2 | The second lambda processes the resource, returning a Flux<T> . |
3 | The third lambda is called when the Flux from <2> terminates or is cancelled, to clean up resources. |
4 | After subscription and execution of the sequence, the isDisposed atomic boolean becomes true. |
1.8. Demonstrating the Terminal Aspect of onError
In order to demonstrate that all these operators cause the upstream original sequence to terminate when an error happens, we can use a more visual example with a Flux.interval. The interval operator ticks every x units of time with an increasing Long value. The following example uses an interval operator:
Flux<String> flux =
    Flux.interval(Duration.ofMillis(250))
        .map(input -> {
            if (input < 3) return "tick " + input;
            throw new RuntimeException("boom");
        })
        .onErrorReturn("Uh oh");
flux.subscribe(System.out::println);
Thread.sleep(2100); (1)
1 | Note that interval executes on a timer Scheduler by default. If we want to run that example in a main class, we would need to add a sleep call here so that the application does not exit immediately without any value being produced. |
The preceding example prints out one line every 250ms, as follows:
tick 0
tick 1
tick 2
Uh oh
Even with one extra second of runtime, no more ticks come in from the interval. The sequence was indeed terminated by the error.
1.9. Retrying
There is another operator of interest with regard to error handling, and you might be tempted to use it in the case described in the previous section. retry, as its name indicates, lets you retry an error-producing sequence.
The thing to keep in mind is that it works by re-subscribing to the upstream Flux. This is really a different sequence, and the original one is still terminated. To verify that, we can re-use the previous example and append a retry(1) to retry once instead of using onErrorReturn. The following example shows how to do so:
Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .retry(1)
    .elapsed() (1)
    .subscribe(System.out::println, System.err::println); (2)
Thread.sleep(2100); (3)
1 | elapsed associates each value with the duration since the previous value was emitted. |
2 | We also want to see when there is an onError . |
3 | Ensure we have enough time for our 4x2 ticks. |
The preceding example produces the following output:
259,tick 0
249,tick 1
251,tick 2
506,tick 0 (1)
248,tick 1
253,tick 2
java.lang.RuntimeException: boom
1 | A new interval started, from tick 0. The additional 250ms duration is coming from the 4th tick, the one that causes the exception and subsequent retry. |
As you can see from the preceding example, retry(1) merely re-subscribed to the original interval once, restarting the tick from 0. The second time around, since the exception still occurs, it gives up and propagates the error downstream.
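In keeping with the imperative parallels drawn earlier, the behavior of retry(n) can be approximated by a loop that re-runs a whole attempt from scratch and rethrows the latest failure once the retries are used up. The following is only a sketch of that analogy in plain Java. ImperativeRetry and its retry helper are hypothetical names, and nothing here is Reactor code:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class ImperativeRetry {

    // Hypothetical helper: runs the attempt once, then re-runs it from scratch
    // up to maxRetries more times if it throws.
    static <T> T retry(Supplier<T> attempt, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException last = null;
        for (int i = 0; i <= maxRetries; i++) {
            try {
                return attempt.get();   // each call is a brand-new attempt
            } catch (RuntimeException e) {
                last = e;               // remember the failure and loop again
            }
        }
        throw last;                     // retries exhausted: propagate the latest error
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        try {
            retry(() -> {
                attempts.incrementAndGet();
                throw new RuntimeException("boom");
            }, 1);
        } catch (RuntimeException e) {
            // prints "gave up after 2 attempts: boom"
            System.out.println("gave up after " + attempts.get() + " attempts: " + e.getMessage());
        }
    }
}
```

As with the interval example, the second attempt does not resume where the first one failed. It starts over, which is why the ticks restart from 0.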
There is a more advanced version of retry (called retryWhen) that uses a “companion” Flux to tell whether or not a particular failure should retry. This companion Flux is created by the operator but decorated by the user, in order to customize the retry condition.
The companion Flux is a Flux<RetrySignal> that gets passed to a Retry strategy/function, supplied as the sole parameter of retryWhen. As the user, you define that function and make it return a new Publisher<?>. The Retry class is an abstract class, but it offers a factory method if you want to transform the companion with a simple lambda (Retry.from(Function)).
Retry cycles go as follows:
- Each time an error happens (giving potential for a retry), a RetrySignal is emitted into the companion Flux, which has been decorated by your function. Having a Flux here gives a bird’s-eye view of all the attempts so far. The RetrySignal gives access to the error as well as metadata around it.
- If the companion Flux emits a value, a retry happens.
- If the companion Flux completes, the error is swallowed, the retry cycle stops, and the resulting sequence completes, too.
- If the companion Flux produces an error (e), the retry cycle stops and the resulting sequence errors with e.
The distinction between the previous two cases is important. Simply completing the companion would effectively swallow an error. Consider the following way of emulating retry(3) by using retryWhen:
Flux<String> flux = Flux
    .<String>error(new IllegalArgumentException()) (1)
    .doOnError(System.out::println) (2)
    .retryWhen(Retry.from(companion -> (3)
        companion.take(3))); (4)
1 | This continuously produces errors, calling for retry attempts. |
2 | doOnError before the retry lets us log and see all failures. |
3 | The Retry is adapted from a very simple Function lambda |
4 | Here, we consider the first three errors as retry-able (take(3) ) and then give up. |
In effect, the preceding example results in an empty Flux, but it completes successfully. Since retry(3) on the same Flux would have terminated with the latest error, this retryWhen example is not exactly the same as a retry(3).
Getting to the same behavior involves a few additional tricks:
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
    Flux.<String>error(new IllegalArgumentException())
        .doOnError(e -> errorCount.incrementAndGet())
        .retryWhen(Retry.from(companion -> (1)
            companion.map(rs -> { (2)
                if (rs.totalRetries() < 3) return rs.totalRetries(); (3)
                else throw Exceptions.propagate(rs.failure()); (4)
            })
        ));
1 | We customize Retry by adapting from a Function lambda rather than providing a concrete class |
2 | The companion emits RetrySignal objects, which carry the number of retries so far and the last failure |
3 | To allow for three retries, we consider indexes < 3 and return a value to emit (here we simply return the index). |
4 | In order to terminate the sequence in error, we throw the original exception after these three retries. |
You can use the builders exposed in Retry to achieve the same in a more fluent manner, as well as more finely tuned retry strategies. For example: errorFlux.retryWhen(Retry.max(3));
You can use similar code to implement an “exponential backoff and retry” pattern, as shown in the FAQ.
The core-provided Retry helpers, RetrySpec and RetryBackoffSpec, both allow advanced customizations like:
- setting the filter(Predicate) for the exceptions that can trigger a retry
- modifying such a previously set filter through modifyErrorFilter(Function)
- triggering a side effect like logging around the retry trigger (i.e., for backoff, before and after the delay), provided the retry is validated (doBeforeRetry() and doAfterRetry() are additive)
- triggering an asynchronous Mono<Void> around the retry trigger, which lets you add asynchronous behavior on top of the base delay but thus further delays the trigger (doBeforeRetryAsync and doAfterRetryAsync are additive)
- customizing the exception in case the maximum number of attempts has been reached, through onRetryExhaustedThrow(BiFunction). By default, Exceptions.retryExhausted(…) is used, which can be distinguished with Exceptions.isRetryExhausted(Throwable)
- activating the handling of transient errors (see below)
1.9.1. Retrying with Transient Errors
Some long-lived sources may see sporadic bursts of errors followed by longer periods of time during which all is running smoothly. This documentation refers to this pattern of errors as transient errors.
In such cases, it would be desirable to deal with each burst in isolation, so that the next burst doesn’t inherit the retry state from the previous one.
For instance, with an exponential backoff strategy each subsequent burst should delay retry attempts starting from the minimum backoff Duration instead of an ever-growing one.
The RetrySignal interface, which represents retryWhen state, has a totalRetriesInARow() value which can be used for this. Instead of the usual monotonically-increasing totalRetries() index, this secondary index is reset to 0 each time an error is recovered from by the retry (i.e., when a retry attempt results in an incoming onNext instead of an onError again).
When setting the transientErrors(boolean) configuration parameter to true in the RetrySpec or RetryBackoffSpec, the resulting strategy makes use of that totalRetriesInARow() index, effectively dealing with transient errors. These specs compute the retry pattern from the index, so in effect all other configuration parameters of the spec apply to each burst of error independently.
AtomicInteger errorCount = new AtomicInteger(); (1)
Flux<Integer> transientFlux = httpRequest.get() (2)
    .doOnError(e -> errorCount.incrementAndGet());
transientFlux.retryWhen(Retry.max(2).transientErrors(true)) (3)
    .blockLast();
assertThat(errorCount).hasValue(6); (4)
1 | We will count the number of errors in the retried sequence for illustration. |
2 | We assume an HTTP request source, e.g., a streaming endpoint that will sometimes fail two times in a row, then recover. |
3 | We use retryWhen on that source, configured for at most 2 retry attempts, but in transientErrors mode. |
4 | At the end, a valid response is achieved and the transientFlux successfully completes after 6 attempts have been registered in errorCount . |
Without transientErrors(true), the configured maximum of 2 attempts would be exceeded by the second burst and the whole sequence would ultimately fail.
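The difference between the two indexes can be illustrated without any reactive code. The following sketch is a simplified model, not Reactor's implementation: TransientRetryModel and survives are hypothetical names, and the scripted list of outcomes stands in for a source that emits a value (true) or an error (false) on each attempt. It assumes that the retry budget is exhausted once the relevant index reaches the configured maximum:

```java
import java.util.Arrays;
import java.util.List;

final class TransientRetryModel {

    // Replays scripted outcomes (true = onNext, false = onError) and reports
    // whether a "max retries" budget survives them.
    static boolean survives(List<Boolean> outcomes, long maxAttempts, boolean transientErrors) {
        long totalRetries = 0;        // grows for the whole life of the sequence
        long totalRetriesInARow = 0;  // reset whenever a value gets through
        for (boolean success : outcomes) {
            if (success) {
                totalRetriesInARow = 0;
                continue;
            }
            long index = transientErrors ? totalRetriesInARow : totalRetries;
            if (index >= maxAttempts) {
                return false;         // budget exhausted: the sequence fails
            }
            totalRetries++;
            totalRetriesInARow++;
        }
        return true;
    }

    public static void main(String[] args) {
        // Three bursts of two errors, each followed by a successful value.
        List<Boolean> script = Arrays.asList(
                false, false, true, false, false, true, false, false, true);
        System.out.println(survives(script, 2, true));   // prints true
        System.out.println(survives(script, 2, false));  // prints false
    }
}
```

In transient mode, each burst starts again from index 0, so six errors are absorbed by a budget of 2. With the monotonic index, the first error of the second burst already finds the budget spent.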
If you want to locally try this without an actual http remote endpoint, you can implement a pseudo
|
2. Handling Exceptions in Operators or Functions
In general, all operators can themselves contain code that potentially triggers an exception or calls to a user-defined callback that can similarly fail, so they all contain some form of error handling.
As a rule of thumb, an unchecked exception is always propagated through onError. For instance, throwing a RuntimeException inside a map function translates to an onError event, as the following code shows:
Flux.just("foo")
    .map(s -> { throw new IllegalArgumentException(s); })
    .subscribe(v -> System.out.println("GOT VALUE"),
               e -> System.out.println("ERROR: " + e));
The preceding code prints out the following:
ERROR: java.lang.IllegalArgumentException: foo
You can tune the Exception before it is passed to onError, through the use of a hook.
Reactor, however, defines a set of exceptions (such as OutOfMemoryError) that are always deemed to be fatal. See the Exceptions.throwIfFatal method. These errors mean that Reactor cannot keep operating and are thrown rather than propagated.
Internally, there are also cases where an unchecked exception still cannot be propagated (most notably during the subscribe and request phases), due to concurrency races that could lead to double onError or onComplete conditions. When these races happen, the error that cannot be propagated is “dropped”. These cases can still be managed to some extent by using customizable hooks. See Dropping Hooks.
You may ask: “What about checked exceptions?”
If, for example, you need to call some method that declares it throws exceptions, you still have to deal with those exceptions in a try-catch block. You have several options, though:
- Catch the exception and recover from it. The sequence continues normally.
- Catch the exception, wrap it into an unchecked exception, and then throw it (interrupting the sequence). The Exceptions utility class can help you with that (we get to that next).
- If you need to return a Flux (for example, you are in a flatMap), wrap the exception in an error-producing Flux, as follows: return Flux.error(checkedException). (The sequence also terminates.)
Reactor has an Exceptions utility class that you can use to ensure that exceptions are wrapped only if they are checked exceptions:
- Use the Exceptions.propagate method to wrap exceptions, if necessary. It also calls throwIfFatal first and does not wrap RuntimeException.
- Use the Exceptions.unwrap method to get the original unwrapped exception (going back to the root cause of a hierarchy of reactor-specific exceptions).
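To make the "wrap only if checked, unwrap later" contract concrete, here is a rough plain-Java sketch of the idea. It is not Reactor's implementation: WrapIfChecked and its Wrapped carrier are hypothetical stand-ins, and the fatal-error check that the real propagate performs first is left out:

```java
import java.io.IOException;

final class WrapIfChecked {

    // Hypothetical unchecked carrier for a checked exception.
    static final class Wrapped extends RuntimeException {
        Wrapped(Throwable cause) {
            super(cause);
        }
    }

    // Unchecked exceptions pass through untouched; anything else is wrapped.
    // (Fatal-error handling is omitted in this sketch.)
    static RuntimeException propagate(Throwable t) {
        if (t instanceof RuntimeException) {
            return (RuntimeException) t;
        }
        return new Wrapped(t);
    }

    // Peels off the carrier, if present, to recover the original exception.
    static Throwable unwrap(Throwable t) {
        Throwable current = t;
        while (current instanceof Wrapped) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        IOException checked = new IOException("boom");
        RuntimeException carried = propagate(checked);
        System.out.println(unwrap(carried) == checked);        // prints true

        IllegalStateException unchecked = new IllegalStateException("oops");
        System.out.println(propagate(unchecked) == unchecked); // prints true
    }
}
```

The point of the round trip is that code at the end of the chain can still test for the original checked type, which a generic RuntimeException wrapper would otherwise hide.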
Consider the following example of a map that uses a conversion method that can throw an IOException:
public String convert(int i) throws IOException {
    if (i > 3) {
        throw new IOException("boom " + i);
    }
    return "OK " + i;
}
Now imagine that you want to use that method in a map. You must now explicitly catch the exception, and your map function cannot re-throw it. So you can propagate it to the map’s onError method as a RuntimeException, as follows:
Flux<String> converted = Flux
    .range(1, 10)
    .map(i -> {
        try { return convert(i); }
        catch (IOException e) { throw Exceptions.propagate(e); }
    });
Later on, when subscribing to the preceding Flux and reacting to errors (such as in the UI), you could revert to the original exception if you want to do something special for IOExceptions. The following example shows how to do so:
converted.subscribe(
    v -> System.out.println("RECEIVED: " + v),
    e -> {
        if (Exceptions.unwrap(e) instanceof IOException) {
            System.out.println("Something bad happened with I/O");
        } else {
            System.out.println("Something bad happened");
        }
    }
);