Replacing Default Schedulers
As we described in the Threading and Schedulers section, Reactor Core comes with several
Scheduler
implementations. While you can always create new instances through the new*
factory methods, each Scheduler
flavor also has a default singleton instance that is
accessible through the direct factory method (such as Schedulers.boundedElastic()
versus
Schedulers.newBoundedElastic(…)
).
These default instances are the ones used by operators that need a Scheduler
to work
when you do not explicitly specify one. For example, Flux#delayElements(Duration)
uses
the Schedulers.parallel()
instance.
In some cases, however, you might need to change these default instances with something
else in a cross-cutting way, without having to make sure every single operator you call
has your specific Scheduler
as a parameter. An example is measuring the time every
single scheduled task takes by wrapping the real schedulers, for instrumentation
purposes. In other words, you might want to change the default Schedulers
.
Changing the default schedulers is possible through the Schedulers.Factory
class. By
default, a Factory
creates all the standard Scheduler
through similarly named
methods. You can override each of these with your custom implementation.
Additionally, the factory exposes one additional customization method:
decorateExecutorService
. It is invoked during the creation of every Reactor Core
Scheduler
that is backed by a ScheduledExecutorService
(even non-default instances,
such as those created by calls to Schedulers.newParallel()
).
This lets you tune the ScheduledExecutorService
to be used: The default one is exposed
as a Supplier
and, depending on the type of Scheduler
being configured, you can choose
to entirely bypass that supplier and return your own instance or you can get()
the
default instance and wrap it.
Once you create a Factory that fits your needs, you must install it by calling
Schedulers.setFactory(Factory) .
|
Finally, there is a last customizable hook in Schedulers
: onHandleError
. This hook is
invoked whenever a Runnable
task submitted to a Scheduler
throws an Exception
(note
that if there is an UncaughtExceptionHandler
set for the Thread
that ran the task,
both the handler and the hook are invoked).