Handling Errors

For a quick look at the available operators for error handling, see the relevant operator decision tree.

In Reactive Streams, errors are terminal events. As soon as an error occurs, it stops the sequence and gets propagated down the chain of operators to the last step, the Subscriber you defined and its onError method.

Such errors should still be dealt with at the application level. For instance, you might display an error notification in a UI or send a meaningful error payload in a REST endpoint. For this reason, the subscriber’s onError method should always be defined.

If not defined, onError throws an UnsupportedOperationException. You can further detect and triage it with the Exceptions.isErrorCallbackNotImplemented method.

Reactor also offers alternative means of dealing with errors in the middle of the chain, as error-handling operators. The following example shows how to do so:

Flux.just(1, 2, 0)
    .map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0
    .onErrorReturn("Divided by zero :("); // error handling example
Before you learn about error-handling operators, you must keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it.

Now we can consider each means of error handling one-by-one. When relevant, we make a parallel with imperative programming’s try patterns.

1. Error Handling Operators

You may be familiar with several ways of dealing with exceptions in a try-catch block. Most notably, these include the following:

  • Catch and return a static default value.

  • Catch and execute an alternative path with a fallback method.

  • Catch and dynamically compute a fallback value.

  • Catch, wrap to a BusinessException, and re-throw.

  • Catch, log an error-specific message, and re-throw.

  • Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.

All of these have equivalents in Reactor, in the form of error-handling operators. Before looking into these operators, we first want to establish a parallel between a reactive chain and a try-catch block.

When subscribing, the onError callback at the end of the chain is akin to a catch block. There, execution skips to the catch in case an Exception is thrown, as the following example shows:

Flux<String> s = Flux.range(1, 10)
    .map(v -> doSomethingDangerous(v)) (1)
    .map(v -> doSecondTransform(v)); (2)
s.subscribe(value -> System.out.println("RECEIVED " + value), (3)
            error -> System.err.println("CAUGHT " + error) (4)
);
1 A transformation that can throw an exception is performed.
2 If everything went well, a second transformation is performed.
3 Each successfully transformed value is printed out.
4 In case of an error, the sequence terminates and an error message is displayed.

The preceding example is conceptually similar to the following try-catch block:

try {
    for (int i = 1; i < 11; i++) {
        String v1 = doSomethingDangerous(i); (1)
        String v2 = doSecondTransform(v1); (2)
        System.out.println("RECEIVED " + v2);
    }
} catch (Throwable t) {
    System.err.println("CAUGHT " + t); (3)
}
1 If an exception is thrown here…​
2 …​the rest of the loop is skipped…​
3 …​ and the execution goes straight to here.

Now that we have established a parallel, we can look at the different error handling cases and their equivalent operators.

1.1. Static Fallback Value

The equivalent of “Catch and return a static default value” is onErrorReturn. The following example shows how to use it:

try {
  return doSomethingDangerous(10);
}
catch (Throwable error) {
  return "RECOVERED";
}

The following example shows the Reactor equivalent:

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn("RECOVERED");

You also have the option of applying a Predicate on the exception to decide whether or not to recover, as the following example shows:

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10"); (1)
1 Recover only if the message of the exception is "boom10"

1.2. Catch and swallow the error

If you don’t even want to replace the exception with a fallback value, but instead to ignore it and only propagate elements that have been produced so far, what you want is essentially replacing the onError signal with an onComplete signal. This can be done by the onErrorComplete operator:

Flux.just(10,20,30)
    .map(this::doSomethingDangerousOn30)
    .onErrorComplete(); (1)
1 Recover by turning the onError into an onComplete

Like onErrorReturn, onErrorComplete has variants that let you filter which exceptions to fall back on, based either on the exception’s class or on a Predicate.

1.3. Fallback Method

If you want more than a single default value and you have an alternative (safer) way of processing your data, you can use onErrorResume. This would be the equivalent of “Catch and execute an alternative path with a fallback method”.

For example, if your nominal process is fetching data from an external and unreliable service but you also keep a local cache of the same data that can be a bit more out of date but is more reliable, you could do the following:

String v1;
try {
  v1 = callExternalService("key1");
}
catch (Throwable error) {
  v1 = getFromCache("key1");
}

String v2;
try {
  v2 = callExternalService("key2");
}
catch (Throwable error) {
  v2 = getFromCache("key2");
}

The following example shows the Reactor equivalent:

Flux.just("key1", "key2")
    .flatMap(k -> callExternalService(k) (1)
        .onErrorResume(e -> getFromCache(k)) (2)
    );
1 For each key, asynchronously call the external service.
2 If the external service call fails, fall back to the cache for that key. Note that we always apply the same fallback, whatever the source error, e, is.

Like onErrorReturn, onErrorResume has variants that let you filter which exceptions to fall back on, based either on the exception’s class or on a Predicate. The fact that it takes a Function also lets you choose a different fallback sequence to switch to, depending on the error encountered. The following example shows how to do so:

Flux.just("timeout1", "unknown", "key2")
    .flatMap(k -> callExternalService(k)
        .onErrorResume(error -> { (1)
            if (error instanceof TimeoutException) (2)
                return getFromCache(k);
            else if (error instanceof UnknownKeyException)  (3)
                return registerNewEntry(k, "DEFAULT");
            else
                return Flux.error(error); (4)
        })
    );
1 The function allows dynamically choosing how to continue.
2 If the source times out, hit the local cache.
3 If the source says the key is unknown, create a new entry.
4 In all other cases, “re-throw”.

1.4. Dynamic Fallback Value

Even if you do not have an alternative (safer) way of processing your data, you might want to compute a fallback value out of the exception you received. This would be the equivalent of “Catch and dynamically compute a fallback value”.

For instance, if your return type (MyWrapper) has a variant dedicated to holding an exception (think Future.complete(T success) versus Future.completeExceptionally(Throwable error)), you could instantiate the error-holding variant and pass the exception.

An imperative example would look like the following:

try {
  Value v = erroringMethod();
  return MyWrapper.fromValue(v);
}
catch (Throwable error) {
  return MyWrapper.fromError(error);
}

You can do this reactively in the same way as the fallback method solution, by using onErrorResume, with a tiny bit of boilerplate, as follows:

erroringFlux.onErrorResume(error -> Mono.just( (1)
        MyWrapper.fromError(error) (2)
));
1 Since you expect a MyWrapper representation of the error, you need to get a Mono<MyWrapper> for onErrorResume. We use Mono.just() for that.
2 We need to compute the value out of the exception. Here, we achieved that by wrapping the exception with a relevant MyWrapper factory method.

1.5. Catch and Rethrow

"Catch, wrap to a BusinessException, and re-throw" looks like the following in the imperative world:

try {
  return callExternalService(k);
}
catch (Throwable error) {
  throw new BusinessException("oops, SLA exceeded", error);
}

In the “fallback method” example, the last line inside the flatMap gives us a hint at achieving the same reactively, as follows:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorResume(original -> Flux.error(
            new BusinessException("oops, SLA exceeded", original))
    );

However, there is a more straightforward way of achieving the same effect with onErrorMap:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));

1.6. Log or React on the Side

For cases where you want the error to continue propagating but still want to react to it without modifying the sequence (logging it, for instance), you can use the doOnError operator. This is the equivalent of “Catch, log an error-specific message, and re-throw” pattern, as the following example shows:

try {
  return callExternalService(k);
}
catch (RuntimeException error) {
  //make a record of the error
  log("uh oh, falling back, service failed for key " + k);
  throw error;
}

The doOnError operator, as well as all operators prefixed with doOn , are sometimes referred to as having a “side-effect”. They let you peek inside the sequence’s events without modifying them.

Like the imperative example shown earlier, the following example still propagates the error yet ensures that we at least log that the external service had a failure:

LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
    .flatMap(k -> callExternalService(k) (1)
        .doOnError(e -> {
            failureStat.increment();
            log("uh oh, falling back, service failed for key " + k); (2)
        })
        (3)
    );
1 The external service call that can fail…​
2 …​is decorated with a logging and stats side-effect…​
3 …​after which, it still terminates with an error, unless we use an error-recovery operator here.

We can also imagine we have statistic counters to increment as a second error side-effect.

1.7. Using Resources and the Finally Block

The last parallel to draw with imperative programming is the cleaning up that can be done either by using a “Use of the finally block to clean up resources” or by using a “Java 7 try-with-resource construct”, both shown below:

Imperative use of finally
Stats stats = new Stats();
stats.startTimer();
try {
  doSomethingDangerous();
}
finally {
  stats.stopTimerAndRecordTiming();
}
Imperative use of try-with-resource
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
  return disposableInstance.toString();
}

Both have their Reactor equivalents: doFinally and using.

doFinally is about side-effects that you want to be executed whenever the sequence terminates (with onComplete or onError) or is cancelled. It gives you a hint as to what kind of termination triggered the side-effect. The following example shows how to use doFinally:

Reactive finally: doFinally()
Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();

Flux<String> flux =
Flux.just("foo", "bar")
    .doOnSubscribe(s -> stats.startTimer())
    .doFinally(type -> { (1)
        stats.stopTimerAndRecordTiming();(2)
        if (type == SignalType.CANCEL) (3)
          statsCancel.increment();
    })
    .take(1); (4)
1 doFinally consumes a SignalType for the type of termination.
2 Similarly to finally blocks, we always record the timing.
3 Here we also increment statistics in case of cancellation only.
4 take(1) requests exactly 1 from upstream, and cancels after one item is emitted.

On the other hand, using handles the case where a Flux is derived from a resource and that resource must be acted upon whenever processing is done. In the following example, we replace the AutoCloseable interface of “try-with-resource” with a Disposable:

The Disposable resource
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
    @Override
    public void dispose() {
        isDisposed.set(true); (4)
    }

    @Override
    public String toString() {
        return "DISPOSABLE";
    }
};

Now we can do the reactive equivalent of “try-with-resource” on it, which looks like the following:

Reactive try-with-resource: using()
Flux<String> flux =
Flux.using(
        () -> disposableInstance, (1)
        disposable -> Flux.just(disposable.toString()), (2)
        Disposable::dispose (3)
);
1 The first lambda generates the resource. Here, we return our mock Disposable.
2 The second lambda processes the resource, returning a Flux<T>.
3 The third lambda is called when the Flux from <2> terminates or is cancelled, to clean up resources.
4 After subscription and execution of the sequence, the isDisposed atomic boolean becomes true.

1.8. Demonstrating the Terminal Aspect of onError

In order to demonstrate that all these operators cause the upstream original sequence to terminate when an error happens, we can use a more visual example with a Flux.interval. The interval operator ticks every x units of time with an increasing Long value. The following example uses an interval operator:

Flux<String> flux =
Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .onErrorReturn("Uh oh");

flux.subscribe(System.out::println);
Thread.sleep(2100); (1)
1 Note that interval executes on a timer Scheduler by default. If we want to run that example in a main class, we would need to add a sleep call here so that the application does not exit immediately without any value being produced.

The preceding example prints out one line every 250ms, as follows:

tick 0
tick 1
tick 2
Uh oh

Even with one extra second of runtime, no more tick comes in from the interval. The sequence was indeed terminated by the error.

1.9. Retrying

There is another operator of interest with regards to error handling, and you might be tempted to use it in the case described in the previous section. retry, as its name indicates, lets you retry an error-producing sequence.

The thing to keep in mind is that it works by re-subscribing to the upstream Flux. This is really a different sequence, and the original one is still terminated. To verify that, we can re-use the previous example and append a retry(1) to retry once instead of using onErrorReturn. The following example shows how to do so:

Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .retry(1)
    .elapsed() (1)
    .subscribe(System.out::println, System.err::println); (2)

Thread.sleep(2100); (3)
1 elapsed associates each value with the duration since previous value was emitted.
2 We also want to see when there is an onError.
3 Ensure we have enough time for our 4x2 ticks.

The preceding example produces the following output:

259,tick 0
249,tick 1
251,tick 2
506,tick 0 (1)
248,tick 1
253,tick 2
java.lang.RuntimeException: boom
1 A new interval started, from tick 0. The additional 250ms duration is coming from the 4th tick, the one that causes the exception and subsequent retry.

As you can see from the preceding example, retry(1) merely re-subscribed to the original interval once, restarting the tick from 0. The second time around, since the exception still occurs, it gives up and propagates the error downstream.

There is a more advanced version of retry (called retryWhen) that uses a “companion” Flux to tell whether or not a particular failure should retry. This companion Flux is created by the operator but decorated by the user, in order to customize the retry condition.

The companion Flux is a Flux<RetrySignal> that gets passed to a Retry strategy/function, supplied as the sole parameter of retryWhen. As the user, you define that function and make it return a new Publisher<?>. The Retry class is an abstract class, but it offers a factory method if you want to transform the companion with a simple lambda (Retry.from(Function)).

Retry cycles go as follows:

  1. Each time an error happens (giving potential for a retry), a RetrySignal is emitted into the companion Flux, which has been decorated by your function. Having a Flux here gives a bird eye’s view of all the attempts so far. The RetrySignal gives access to the error as well as metadata around it.

  2. If the companion Flux emits a value, a retry happens.

  3. If the companion Flux completes, the error is swallowed, the retry cycle stops, and the resulting sequence completes, too.

  4. If the companion Flux produces an error (e), the retry cycle stops and the resulting sequence errors with e.

The distinction between the previous two cases is important. Simply completing the companion would effectively swallow an error. Consider the following way of emulating retry(3) by using retryWhen:

Flux<String> flux = Flux
    .<String>error(new IllegalArgumentException()) (1)
    .doOnError(System.out::println) (2)
    .retryWhen(Retry.from(companion -> (3)
        companion.take(3))); (4)
1 This continuously produces errors, calling for retry attempts.
2 doOnError before the retry lets us log and see all failures.
3 The Retry is adapted from a very simple Function lambda
4 Here, we consider the first three errors as retry-able (take(3)) and then give up.

In effect, the preceding example results in an empty Flux, but it completes successfully. Since retry(3) on the same Flux would have terminated with the latest error, this retryWhen example is not exactly the same as a retry(3).

Getting to the same behavior involves a few additional tricks:

AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
		Flux.<String>error(new IllegalArgumentException())
				.doOnError(e -> errorCount.incrementAndGet())
				.retryWhen(Retry.from(companion -> (1)
						companion.map(rs -> { (2)
							if (rs.totalRetries() < 3) return rs.totalRetries(); (3)
							else throw Exceptions.propagate(rs.failure()); (4)
						})
				));
1 We customize Retry by adapting from a Function lambda rather than providing a concrete class
2 The companion emits RetrySignal objects, which bear number of retries so far and last failure
3 To allow for three retries, we consider indexes < 3 and return a value to emit (here we simply return the index).
4 In order to terminate the sequence in error, we throw the original exception after these three retries.
One can use the builders exposed in Retry to achieve the same in a more fluent manner, as well as more finely tuned retry strategies. For example: errorFlux.retryWhen(Retry.max(3));.
You can use similar code to implement an “exponential backoff and retry” pattern, as shown in the FAQ.

The core-provided Retry helpers, RetrySpec and RetryBackoffSpec, both allow advanced customizations like:

  • setting the filter(Predicate) for the exceptions that can trigger a retry

  • modifying such a previously set filter through modifyErrorFilter(Function)

  • triggering a side effect like logging around the retry trigger (ie for backoff before and after the delay), provided the retry is validated (doBeforeRetry() and doAfterRetry() are additive)

  • triggering an asynchronous Mono<Void> around the retry trigger, which allows to add asynchronous behavior on top of the base delay but thus further delay the trigger (doBeforeRetryAsync and doAfterRetryAsync are additive)

  • customizing the exception in case the maximum number of attempts has been reached, through onRetryExhaustedThrow(BiFunction). By default, Exceptions.retryExhausted(…​) is used, which can be distinguished with Exceptions.isRetryExhausted(Throwable)

  • activating the handling of transient errors (see below)

1.9.1. Retrying with transient errors

Some long-lived sources may see sporadic bursts of errors followed by longer periods of time during which all is running smoothly. This documentation refers to this pattern of errors as transient errors.

In such cases, it would be desirable to deal with each burst in isolation, so that the next burst doesn’t inherit the retry state from the previous one. For instance, with an exponential backoff strategy each subsequent burst should delay retry attempts starting from the minimum backoff Duration instead of an ever-growing one.

The RetrySignal interface, which represents retryWhen state, has a totalRetriesInARow() value which can be used for this. Instead of the usual monotonically-increasing totalRetries() index, this secondary index is reset to 0 each time an error is recovered from by the retry (ie. when a retry attempt results in an incoming onNext instead of an onError again).

When setting the transientErrors(boolean) configuration parameter to true in the RetrySpec or RetryBackoffSpec, the resulting strategy makes use of that totalRetriesInARow() index, effectively dealing with transient errors. These specs compute the retry pattern from the index, so in effect all other configuration parameters of the spec apply to each burst of error independently.

AtomicInteger errorCount = new AtomicInteger(); (1)
Flux<Integer> transientFlux = httpRequest.get() (2)
        .doOnError(e -> errorCount.incrementAndGet());

transientFlux.retryWhen(Retry.max(2).transientErrors(true))  (3)
             .blockLast();
assertThat(errorCount).hasValue(6); (4)
1 We will count the number of errors in the retried sequence for illustration.
2 We assume a http request source, eg. a streaming endpoint that will sometimes fail two times in a row, then recover.
3 We use retryWhen on that source, configured for at most 2 retry attempts, but in transientErrors mode.
4 At the end, a valid response is achieved and the transientFlux successfully completes after 6 attempts have been registered in errorCount.

Without the transientErrors(true), the configured maximum attempt of 2 would be exceeded by the second burst and the whole sequence would have ultimately failed.

If you want to locally try this without an actual http remote endpoint, you can implement a pseudo httpRequest method as a Supplier, as follows:

final AtomicInteger transientHelper = new AtomicInteger();
Supplier<Flux<Integer>> httpRequest = () ->
    Flux.generate(sink -> { (1)
        int i = transientHelper.getAndIncrement();
        if (i == 10) { (2)
            sink.next(i);
            sink.complete();
        }
        else if (i % 3 == 0) { (3)
            sink.next(i);
        }
        else {
            sink.error(new IllegalStateException("Transient error at " + i)); (4)
        }
    });
1 We generate a source that has bursts of errors.
2 It will successfully complete when the counter reaches 10.
3 If the transientHelper atomic is at a multiple of 3, we emit onNext and thus end the current burst.
4 In other cases we emit an onError. That’s 2 out of 3 times, so bursts of 2 onError interrupted by 1 onNext.

2. Handling Exceptions in Operators or Functions

In general, all operators can themselves contain code that potentially trigger an exception or calls to a user-defined callback that can similarly fail, so they all contain some form of error handling.

As a rule of thumb, an unchecked exception is always propagated through onError. For instance, throwing a RuntimeException inside a map function translates to an onError event, as the following code shows:

Flux.just("foo")
    .map(s -> { throw new IllegalArgumentException(s); })
    .subscribe(v -> System.out.println("GOT VALUE"),
               e -> System.out.println("ERROR: " + e));

The preceding code prints out the following:

ERROR: java.lang.IllegalArgumentException: foo
You can tune the Exception before it is passed to onError, through the use of a hook.

Reactor, however, defines a set of exceptions (such as OutOfMemoryError) that are always deemed to be fatal. See the Exceptions.throwIfFatal method. These errors mean that Reactor cannot keep operating and are thrown rather than propagated.

Internally, there are also cases where an unchecked exception still cannot be propagated (most notably during the subscribe and request phases), due to concurrency races that could lead to double onError or onComplete conditions. When these races happen, the error that cannot be propagated is “dropped”. These cases can still be managed to some extent by using customizable hooks. See Dropping Hooks.

You may ask: “What about checked exceptions?”

If, for example, you need to call some method that declares it throws exceptions, you still have to deal with those exceptions in a try-catch block. You have several options, though:

  1. Catch the exception and recover from it. The sequence continues normally.

  2. Catch the exception, wrap it into an unchecked exception, and then throw it (interrupting the sequence). The Exceptions utility class can help you with that (we get to that next).

  3. If you need to return a Flux (for example, you are in a flatMap), wrap the exception in an error-producing Flux, as follows: return Flux.error(checkedException). (The sequence also terminates.)

Reactor has an Exceptions utility class that you can use to ensure that exceptions are wrapped only if they are checked exceptions:

  • Use the Exceptions.propagate method to wrap exceptions, if necessary. It also calls throwIfFatal first and does not wrap RuntimeException.

  • Use the Exceptions.unwrap method to get the original unwrapped exception (going back to the root cause of a hierarchy of reactor-specific exceptions).

Consider the following example of a map that uses a conversion method that can throw an IOException:

public String convert(int i) throws IOException {
    if (i > 3) {
        throw new IOException("boom " + i);
    }
    return "OK " + i;
}

Now imagine that you want to use that method in a map. You must now explicitly catch the exception, and your map function cannot re-throw it. So you can propagate it to the map’s onError method as a RuntimeException, as follows:

Flux<String> converted = Flux
    .range(1, 10)
    .map(i -> {
        try { return convert(i); }
        catch (IOException e) { throw Exceptions.propagate(e); }
    });

Later on, when subscribing to the preceding Flux and reacting to errors (such as in the UI), you could revert back to the original exception if you want to do something special for IOExceptions. The following example shows how to do so:

converted.subscribe(
    v -> System.out.println("RECEIVED: " + v),
    e -> {
        if (Exceptions.unwrap(e) instanceof IOException) {
            System.out.println("Something bad happened with I/O");
        } else {
            System.out.println("Something bad happened");
        }
    }
);