Class Schedulers
Schedulers provides various Scheduler flavors usable by publishOn or subscribeOn:
- parallel(): Optimized for fast Runnable non-blocking executions
- single(): Optimized for low-latency Runnable one-off executions
- boundedElastic(): Optimized for longer executions, an alternative for blocking tasks where the number of active tasks (and threads) is capped
- immediate(): to immediately run submitted Runnable instead of scheduling them (somewhat of a no-op or "null object" Scheduler)
- fromExecutorService(ExecutorService): to create new instances around Executors
Factories prefixed with new (e.g. newBoundedElastic(int, int, String)) return a new instance of their flavor of Scheduler, while other factories like boundedElastic() return a shared instance, which is the one used by operators requiring that flavor as their default Scheduler. All instances are returned in an initialized state.
Since 3.6.0, boundedElastic() can run tasks on VirtualThread instances if the application runs on a Java 21+ runtime and the DEFAULT_BOUNDED_ELASTIC_ON_VIRTUAL_THREADS system property is set to true.
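The defaults above all follow the same pattern: read a system property, fall back to a computed value. Below is a minimal pure-Java sketch of that resolution logic; the property names are the real ones quoted in the Field Summary, but the helper class itself is hypothetical and not part of Reactor:

```java
// Hypothetical sketch of how Schedulers-style defaults resolve from
// system properties with a computed fallback; not Reactor's actual code.
public class SchedulerDefaults {

    static int intProperty(String key, int fallback) {
        String raw = System.getProperty(key);
        if (raw == null) return fallback;
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            return fallback; // malformed value: keep the documented fallback
        }
    }

    // reactor.schedulers.defaultPoolSize falls back to available processors
    static int defaultPoolSize() {
        return intProperty("reactor.schedulers.defaultPoolSize",
                Runtime.getRuntime().availableProcessors());
    }

    // reactor.schedulers.defaultBoundedElasticOnVirtualThreads falls back to false
    static boolean boundedElasticOnVirtualThreads() {
        return Boolean.parseBoolean(System.getProperty(
                "reactor.schedulers.defaultBoundedElasticOnVirtualThreads", "false"));
    }
}
```

Setting such a property must happen before the corresponding shared scheduler is first initialized, since the defaults are read on init.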
- Author:
- Stephane Maldini
-
Nested Class Summary
Nested Classes
- static interface Schedulers.Factory: Public factory hook to override Schedulers behavior globally
- static final class Schedulers.Snapshot: A restorable snapshot of Schedulers; it is also Disposable in case you don't want to restore the live Schedulers
Field Summary
Fields
- static final boolean DEFAULT_BOUNDED_ELASTIC_ON_VIRTUAL_THREADS: Default execution of enqueued tasks on Thread#ofVirtual for the global boundedElastic() Scheduler, initialized by system property reactor.schedulers.defaultBoundedElasticOnVirtualThreads and falls back to false.
- static final int DEFAULT_BOUNDED_ELASTIC_QUEUESIZE: Default maximum number of enqueued tasks PER THREAD for the global boundedElastic() Scheduler, initialized by system property reactor.schedulers.defaultBoundedElasticQueueSize and falls back to a bound of 100 000 tasks per backing thread.
- static final int DEFAULT_BOUNDED_ELASTIC_SIZE: Default maximum size for the global boundedElastic() Scheduler, initialized by system property reactor.schedulers.defaultBoundedElasticSize and falls back to 10 x number of processors available to the runtime on init.
- static final int DEFAULT_POOL_SIZE: Default pool size, initialized by system property reactor.schedulers.defaultPoolSize and falls back to the number of processors available to the runtime on init.
Constructor Summary
Constructors -
Method Summary
Methods
- static boolean addExecutorServiceDecorator(String key, BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> decorator): Set up an additional ScheduledExecutorService decorator for a given key only if that key is not already present.
- static Scheduler boundedElastic(): The common boundedElastic instance, a Scheduler that dynamically creates a bounded number of workers.
- static ScheduledExecutorService decorateExecutorService(Scheduler owner, ScheduledExecutorService original): This method is aimed at Scheduler implementors, enabling custom implementations that are backed by a ScheduledExecutorService to also have said executors decorated (ie. for instrumentation purposes).
- static void disableMetrics(): Deprecated. Prefer using Micrometer#timedScheduler from the reactor-core-micrometer module.
- static void enableMetrics(): Deprecated. Prefer using Micrometer#timedScheduler from the reactor-core-micrometer module.
- static Scheduler fromExecutor(Executor executor)
- static Scheduler fromExecutor(Executor executor, boolean trampoline)
- static Scheduler fromExecutorService(ExecutorService executorService): Create a Scheduler which uses a backing ExecutorService to schedule Runnables for async operators.
- static Scheduler fromExecutorService(ExecutorService executorService, String executorName): Create a Scheduler which uses a backing ExecutorService to schedule Runnables for async operators.
- static Scheduler immediate(): Executes tasks immediately instead of scheduling them.
- static boolean isInNonBlockingThread(): Check if calling a Reactor blocking API in the current Thread is forbidden or not.
- static boolean isNonBlockingThread(Thread t): Check if calling a Reactor blocking API in the given Thread is forbidden or not.
- static Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, String name): Scheduler that dynamically creates a bounded number of ExecutorService-based Workers, reusing them once the Workers have been shut down.
- static Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, String name, int ttlSeconds): Scheduler that dynamically creates a bounded number of ExecutorService-based Workers, reusing them once the Workers have been shut down.
- static Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, String name, int ttlSeconds, boolean daemon): Scheduler that dynamically creates a bounded number of ExecutorService-based Workers, reusing them once the Workers have been shut down.
- static Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, ThreadFactory threadFactory, int ttlSeconds): Scheduler that dynamically creates a bounded number of ExecutorService-based Workers, reusing them once the Workers have been shut down.
- static Scheduler newParallel(int parallelism, ThreadFactory threadFactory): Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work.
- static Scheduler newParallel(String name): Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work.
- static Scheduler newParallel(String name, int parallelism): Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work.
- static Scheduler newParallel(String name, int parallelism, boolean daemon): Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work.
- static Scheduler newSingle(String name): Scheduler that hosts a single-threaded ExecutorService-based worker.
- static Scheduler newSingle(String name, boolean daemon): Scheduler that hosts a single-threaded ExecutorService-based worker.
- static Scheduler newSingle(ThreadFactory threadFactory): Scheduler that hosts a single-threaded ExecutorService-based worker.
- static void onHandleError(String key, BiConsumer<Thread, ? super Throwable> subHook): Define a keyed hook part that is executed alongside other parts when a Scheduler has handled an error.
- static void onHandleError(BiConsumer<Thread, ? super Throwable> subHook): Define a hook anonymous part that is executed alongside keyed parts when a Scheduler has handled an error.
- static Runnable onSchedule(Runnable runnable): Applies the hooks registered with onScheduleHook(String, Function).
- static void onScheduleHook(String key, Function<Runnable, Runnable> decorator): Add or replace a named scheduling decorator.
- static Scheduler parallel(): The common parallel instance, a Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work.
- static void registerNonBlockingThreadPredicate(Predicate<Thread> predicate): Register a Predicate contributing to the detection of Threads in which blocking Reactor APIs are forbidden.
- static BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> removeExecutorServiceDecorator(String key): Remove an existing ScheduledExecutorService decorator if it has been set up via addExecutorServiceDecorator(String, BiFunction).
- static void resetFactory(): Re-apply default factory to Schedulers.
- static void resetFrom(@Nullable Schedulers.Snapshot snapshot): Replace the current Factory and shared Schedulers with the ones saved in a previously captured snapshot.
- static void resetNonBlockingThreadPredicate(): Unregisters all the Predicates registered so far via registerNonBlockingThreadPredicate(Predicate).
- static void resetOnHandleError(): Reset the onHandleError(BiConsumer) hook to the default no-op behavior, erasing all sub-hooks that might have been individually added via onHandleError(String, BiConsumer) or the whole hook set via onHandleError(BiConsumer).
- static void resetOnHandleError(String key): Reset a specific onHandleError hook part keyed to the provided String, removing that sub-hook if it has previously been defined via onHandleError(String, BiConsumer).
- static void resetOnScheduleHook(String key): Reset a specific onScheduleHook sub-hook if it has been set up via onScheduleHook(String, Function).
- static void resetOnScheduleHooks(): Remove all onScheduleHook sub-hooks.
- static void setExecutorServiceDecorator(String key, BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> decorator): Set up an additional ScheduledExecutorService decorator for a given key, even if that key is already present.
- static void setFactory(Schedulers.Factory factoryInstance)
- static Schedulers.Snapshot setFactoryWithSnapshot(Schedulers.Factory newFactory)
- static void shutdownNow(): Clear any cached Scheduler and call dispose on them.
- static Scheduler single(): The common single instance, a Scheduler that hosts a single-threaded ExecutorService-based worker.
- static Scheduler single(Scheduler original): Wraps a single Scheduler.Worker from some other Scheduler and provides Scheduler.Worker services on top of it.
-
Field Details
-
DEFAULT_POOL_SIZE
public static final int DEFAULT_POOL_SIZE
Default pool size, initialized by system property reactor.schedulers.defaultPoolSize and falls back to the number of processors available to the runtime on init.
- See Also:
-
DEFAULT_BOUNDED_ELASTIC_SIZE
public static final int DEFAULT_BOUNDED_ELASTIC_SIZE
Default maximum size for the global boundedElastic() Scheduler, initialized by system property reactor.schedulers.defaultBoundedElasticSize and falls back to 10 x number of processors available to the runtime on init.
-
DEFAULT_BOUNDED_ELASTIC_QUEUESIZE
public static final int DEFAULT_BOUNDED_ELASTIC_QUEUESIZE
Default maximum number of enqueued tasks PER THREAD for the global boundedElastic() Scheduler, initialized by system property reactor.schedulers.defaultBoundedElasticQueueSize and falls back to a bound of 100 000 tasks per backing thread.
- See Also:
-
DEFAULT_BOUNDED_ELASTIC_ON_VIRTUAL_THREADS
public static final boolean DEFAULT_BOUNDED_ELASTIC_ON_VIRTUAL_THREADS
Default execution of enqueued tasks on Thread#ofVirtual for the global boundedElastic() Scheduler, initialized by system property reactor.schedulers.defaultBoundedElasticOnVirtualThreads and falls back to false.
- See Also:
-
-
Constructor Details
-
Schedulers
public Schedulers()
-
-
Method Details
-
fromExecutor
Create a Scheduler which uses a backing Executor to schedule Runnables for async operators. Tasks scheduled with workers of this Scheduler are not guaranteed to run in FIFO order and strictly non-concurrently. If FIFO order is desired, use the trampoline parameter of fromExecutor(Executor, boolean). -
fromExecutor
Create a Scheduler which uses a backing Executor to schedule Runnables for async operators. Trampolining here means tasks submitted in a burst are queued by the Worker itself, which acts as a sole task from the perspective of the ExecutorService, so no reordering (but also no threading). -
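The trampolining behavior described above can be modeled in plain Java: the worker keeps its own FIFO queue and only ever has one drain task in flight inside the backing Executor, so queued tasks never reorder but also never run concurrently. This is an illustrative model, not Reactor's implementation:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative model of a trampolining worker: bursts of tasks are queued
// locally and drained by a single Runnable submitted to the Executor.
public class TrampolineWorker {
    private final Executor executor;
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean draining = new AtomicBoolean();

    public TrampolineWorker(Executor executor) { this.executor = executor; }

    public void schedule(Runnable task) {
        queue.add(task);
        // Only one drain loop is ever in flight, so the Executor sees a sole task.
        if (draining.compareAndSet(false, true)) {
            executor.execute(this::drain);
        }
    }

    private void drain() {
        Runnable next;
        while ((next = queue.poll()) != null) {
            next.run(); // FIFO order, no concurrency between queued tasks
        }
        draining.set(false);
        // Re-check in case a task slipped in after the queue looked empty.
        if (!queue.isEmpty() && draining.compareAndSet(false, true)) {
            executor.execute(this::drain);
        }
    }
}
```

The trade-off mentioned in the text is visible here: ordering is preserved at the cost of never exploiting more than one of the Executor's threads per worker.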
fromExecutorService
Create a Scheduler which uses a backing ExecutorService to schedule Runnables for async operators. Prefer using fromExecutorService(ExecutorService, String), especially if you plan on using metrics, as this gives the executor a meaningful identifier.
- Parameters:
executorService - an ExecutorService
- Returns:
- a new Scheduler
-
fromExecutorService
Create a Scheduler which uses a backing ExecutorService to schedule Runnables for async operators.
- Parameters:
executorService - an ExecutorService
- Returns:
- a new Scheduler
-
boundedElastic
The common boundedElastic instance, a Scheduler that dynamically creates a bounded number of workers. Depending on the available environment and the specified configuration, there are two types of implementations for this shared scheduler:
- An ExecutorService-based implementation tailored to run on Platform Thread instances. Every Worker is ExecutorService-based, reusing Threads once the Workers have been shut down. The underlying daemon threads can be evicted if idle for more than 60 seconds.
- As of 3.6.0, a thread-per-task implementation tailored for use with virtual threads. This implementation is enabled if the application runs on a JDK 21+ runtime and the system property DEFAULT_BOUNDED_ELASTIC_ON_VIRTUAL_THREADS is set to true. Every Worker is based on a custom implementation of the execution mechanism which ensures every submitted task runs on a new VirtualThread instance. This implementation has a shared instance of ScheduledExecutorService used to schedule delayed and periodic tasks such that when triggered they are offloaded to a dedicated new VirtualThread instance.
Both implementations share the same configuration:
- The maximum number of concurrent threads is bounded by a cap (by default ten times the number of available CPU cores, see DEFAULT_BOUNDED_ELASTIC_SIZE). Note: Consider increasing DEFAULT_BOUNDED_ELASTIC_SIZE with the thread-per-task implementation to run more concurrent VirtualThread instances underneath.
- The maximum number of task submissions that can be enqueued and deferred on each of these backing threads is bounded (by default 100K additional tasks, see DEFAULT_BOUNDED_ELASTIC_QUEUESIZE). Past that point, a RejectedExecutionException is thrown.
Threads backing a new Scheduler.Worker are picked from a pool or are created when needed. In the ExecutorService-based implementation, the pool is comprised either of idle or busy threads. When all threads are busy, a best-effort attempt is made at picking the thread backing the least number of workers. In the case of the thread-per-task implementation, new threads are always created up to the specified limit. Note that if a scheduling mechanism is backing a low number of workers, but these workers submit a lot of pending tasks, a second worker could end up being backed by the same mechanism and see tasks rejected. The picking of the backing mechanism is also done once and for all at worker creation, so tasks could be delayed due to two workers sharing the same backing mechanism and submitting long-running tasks, despite another backing mechanism becoming idle in the meantime.
Only one instance of this common scheduler will be created on the first call and is cached. The same instance is returned on subsequent calls until it is disposed.
One cannot directly dispose the common instances, as they are cached and shared between callers. They can however all be shut down together, or replaced by a change in Factory.
- Returns:
- the ExecutorService/thread-per-task-based boundedElastic instance, a Scheduler that dynamically creates workers with an upper bound to the number of backing threads and, after that, on the number of enqueued tasks.
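The per-thread queue cap can be approximated in plain Java with a ThreadPoolExecutor whose work queue is bounded and whose rejection policy aborts, mirroring the RejectedExecutionException described above. This is a simplified model of "one busy backing thread with a capped task queue", not boundedElastic itself:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Simplified model: a single busy backing thread with a bounded queue
// rejects submissions past the cap, like the documented queuedTaskCap.
public class CappedQueueDemo {
    public static int submitUntilRejected(int queueCap, int attempts) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCap),      // bounded per-thread queue
                new ThreadPoolExecutor.AbortPolicy());   // throw past the cap
        // First task occupies the single thread so the rest pile up in the queue.
        CountDownLatch block = new CountDownLatch(1);
        pool.execute(() -> {
            try { block.await(); } catch (InterruptedException ignored) { }
        });
        int accepted = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                pool.execute(() -> { });
                accepted++;
            }
        } catch (RejectedExecutionException expected) {
            // reached the cap: exactly queueCap tasks were queued
        } finally {
            block.countDown();
            pool.shutdown();
        }
        return accepted;
    }
}
```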
-
parallel
The common parallel instance, a Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work. Only one instance of this common scheduler will be created on the first call and is cached. The same instance is returned on subsequent calls until it is disposed.
One cannot directly dispose the common instances, as they are cached and shared between callers. They can however all be shut down together, or replaced by a change in Factory.
- Returns:
- the common parallel instance, a Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
-
immediate
Executes tasks immediately instead of scheduling them. As a consequence, tasks run on the thread that submitted them (e.g. the thread on which an operator is currently processing its onNext/onComplete/onError signals). This Scheduler is typically used as a "null object" for APIs that require a Scheduler when one doesn't want to change threads.
- Returns:
- a reusable Scheduler that executes tasks immediately instead of scheduling them
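The "run on the submitting thread" behavior is the same idea as a direct (caller-runs) executor in plain Java. A tiny sketch of why this makes such a scheduler a "null object" with respect to threading (class and method names are illustrative):

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

// Model of an "immediate" scheduler: execute() just invokes run() on the
// calling thread, so no thread hop ever happens.
public class ImmediateDemo {
    static final Executor IMMEDIATE = Runnable::run;

    public static boolean runsOnCallerThread() {
        Thread caller = Thread.currentThread();
        AtomicReference<Thread> observed = new AtomicReference<>();
        IMMEDIATE.execute(() -> observed.set(Thread.currentThread()));
        return observed.get() == caller;
    }
}
```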
-
newBoundedElastic
Scheduler that dynamically creates a bounded number of ExecutorService-based Workers, reusing them once the Workers have been shut down. The underlying (user) threads can be evicted if idle for more than 60 seconds.
The maximum number of created threads is bounded by the provided threadCap. The maximum number of task submissions that can be enqueued and deferred on each of these backing threads is bounded by the provided queuedTaskCap. Past that point, a RejectedExecutionException is thrown.
By order of preference, threads backing a new Scheduler.Worker are picked from the idle pool, created anew or reused from the busy pool. In the latter case, a best-effort attempt at picking the thread backing the least number of workers is made. Note that if a thread is backing a low number of workers, but these workers submit a lot of pending tasks, a second worker could end up being backed by the same thread and see tasks rejected. The picking of the backing thread is also done once and for all at worker creation, so tasks could be delayed due to two workers sharing the same backing thread and submitting long-running tasks, despite another backing thread becoming idle in the meantime.
Threads backing this scheduler are user threads, so they will prevent the JVM from exiting until their worker has been disposed AND they've been evicted by TTL, or the whole scheduler has been disposed.
Please note, this implementation is not designed to run tasks on VirtualThread. Please see Schedulers.Factory.newThreadPerTaskBoundedElastic(int, int, ThreadFactory) if you need a VirtualThread-compatible scheduler implementation.
- Parameters:
threadCap - maximum number of underlying threads to create
queuedTaskCap - maximum number of tasks to enqueue when no more threads can be created. Can be Integer.MAX_VALUE for unbounded enqueueing.
name - Thread prefix
- Returns:
- a new Scheduler that dynamically creates workers with an upper bound to the number of backing threads and, after that, on the number of enqueued tasks, that reuses threads and evicts idle ones
-
newBoundedElastic
public static Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, String name, int ttlSeconds)
Scheduler that dynamically creates a bounded number of ExecutorService-based Workers, reusing them once the Workers have been shut down. The underlying (user) threads can be evicted if idle for more than the provided ttlSeconds.
The maximum number of created threads is bounded by the provided threadCap. The maximum number of task submissions that can be enqueued and deferred on each of these backing threads is bounded by the provided queuedTaskCap. Past that point, a RejectedExecutionException is thrown.
By order of preference, threads backing a new Scheduler.Worker are picked from the idle pool, created anew or reused from the busy pool. In the latter case, a best-effort attempt at picking the thread backing the least number of workers is made. Note that if a thread is backing a low number of workers, but these workers submit a lot of pending tasks, a second worker could end up being backed by the same thread and see tasks rejected. The picking of the backing thread is also done once and for all at worker creation, so tasks could be delayed due to two workers sharing the same backing thread and submitting long-running tasks, despite another backing thread becoming idle in the meantime.
Threads backing this scheduler are user threads, so they will prevent the JVM from exiting until their worker has been disposed AND they've been evicted by TTL, or the whole scheduler has been disposed.
Please note, this implementation is not designed to run tasks on VirtualThread. Please see Schedulers.Factory.newThreadPerTaskBoundedElastic(int, int, ThreadFactory) if you need a VirtualThread-compatible scheduler implementation.
- Parameters:
threadCap - maximum number of underlying threads to create
queuedTaskCap - maximum number of tasks to enqueue when no more threads can be created. Can be Integer.MAX_VALUE for unbounded enqueueing.
name - Thread prefix
ttlSeconds - Time-to-live for an idle Scheduler.Worker
- Returns:
- a new Scheduler that dynamically creates workers with an upper bound to the number of backing threads and, after that, on the number of enqueued tasks, that reuses threads and evicts idle ones
-
newBoundedElastic
public static Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, String name, int ttlSeconds, boolean daemon)
Scheduler that dynamically creates a bounded number of ExecutorService-based Workers, reusing them once the Workers have been shut down. The underlying (user or daemon) threads can be evicted if idle for more than the provided ttlSeconds.
The maximum number of created threads is bounded by the provided threadCap. The maximum number of task submissions that can be enqueued and deferred on each of these backing threads is bounded by the provided queuedTaskCap. Past that point, a RejectedExecutionException is thrown.
By order of preference, threads backing a new Scheduler.Worker are picked from the idle pool, created anew or reused from the busy pool. In the latter case, a best-effort attempt at picking the thread backing the least number of workers is made. Note that if a thread is backing a low number of workers, but these workers submit a lot of pending tasks, a second worker could end up being backed by the same thread and see tasks rejected. The picking of the backing thread is also done once and for all at worker creation, so tasks could be delayed due to two workers sharing the same backing thread and submitting long-running tasks, despite another backing thread becoming idle in the meantime.
Depending on the daemon parameter, threads backing this scheduler can be user threads or daemon threads. Note that user threads will prevent the JVM from exiting until their worker has been disposed AND they've been evicted by TTL, or the whole scheduler has been disposed.
Please note, this implementation is not designed to run tasks on VirtualThread. Please see Schedulers.Factory.newThreadPerTaskBoundedElastic(int, int, ThreadFactory) if you need a VirtualThread-compatible scheduler implementation.
- Parameters:
threadCap - maximum number of underlying threads to create
queuedTaskCap - maximum number of tasks to enqueue when no more threads can be created. Can be Integer.MAX_VALUE for unbounded enqueueing.
name - Thread prefix
ttlSeconds - Time-to-live for an idle Scheduler.Worker
daemon - are backing threads daemon threads
- Returns:
- a new Scheduler that dynamically creates workers with an upper bound to the number of backing threads and, after that, on the number of enqueued tasks, that reuses threads and evicts idle ones
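The daemon flag and name prefix map directly onto how a ThreadFactory would configure the backing threads: daemon threads do not keep the JVM alive, user threads do. A hedged sketch (the factory class and naming scheme are illustrative, not Reactor's internals):

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative ThreadFactory showing what the name prefix and daemon flag
// control: daemon threads will not prevent JVM exit, user threads will.
public class PrefixedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final boolean daemon;
    private final AtomicLong counter = new AtomicLong();

    public PrefixedThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable task) {
        Thread t = new Thread(task, prefix + "-" + counter.incrementAndGet());
        t.setDaemon(daemon);
        return t;
    }
}
```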
-
newBoundedElastic
public static Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, ThreadFactory threadFactory, int ttlSeconds)
Scheduler that dynamically creates a bounded number of ExecutorService-based Workers, reusing them once the Workers have been shut down. The underlying (user) threads can be evicted if idle for more than the provided ttlSeconds.
The maximum number of created threads is bounded by the provided threadCap. The maximum number of task submissions that can be enqueued and deferred on each of these backing threads is bounded by the provided queuedTaskCap. Past that point, a RejectedExecutionException is thrown.
By order of preference, threads backing a new Scheduler.Worker are picked from the idle pool, created anew or reused from the busy pool. In the latter case, a best-effort attempt at picking the thread backing the least number of workers is made. Note that if a thread is backing a low number of workers, but these workers submit a lot of pending tasks, a second worker could end up being backed by the same thread and see tasks rejected. The picking of the backing thread is also done once and for all at worker creation, so tasks could be delayed due to two workers sharing the same backing thread and submitting long-running tasks, despite another backing thread becoming idle in the meantime.
Threads backing this scheduler are created by the provided ThreadFactory, which can decide whether to create user threads or daemon threads. Note that user threads will prevent the JVM from exiting until their worker has been disposed AND they've been evicted by TTL, or the whole scheduler has been disposed.
Please note, this implementation is not designed to run tasks on VirtualThread. Please see Schedulers.Factory.newThreadPerTaskBoundedElastic(int, int, ThreadFactory) if you need a VirtualThread-compatible scheduler implementation.
- Parameters:
threadCap - maximum number of underlying threads to create
queuedTaskCap - maximum number of tasks to enqueue when no more threads can be created. Can be Integer.MAX_VALUE for unbounded enqueueing.
threadFactory - a ThreadFactory to use for each thread initialization
ttlSeconds - Time-to-live for an idle Scheduler.Worker
- Returns:
- a new Scheduler that dynamically creates workers with an upper bound to the number of backing threads and, after that, on the number of enqueued tasks, that reuses threads and evicts idle ones
-
newParallel
Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work. This type of Scheduler detects and rejects usage of blocking Reactor APIs.
- Parameters:
name - Thread prefix
- Returns:
- a new Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
-
newParallel
Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work. This type of Scheduler detects and rejects usage of blocking Reactor APIs.
- Parameters:
name - Thread prefix
parallelism - Number of pooled workers.
- Returns:
- a new Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
-
newParallel
Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work. This type of Scheduler detects and rejects usage of blocking Reactor APIs.
- Parameters:
name - Thread prefix
parallelism - Number of pooled workers.
daemon - false if the Scheduler requires an explicit Scheduler.dispose() to exit the VM.
- Returns:
- a new Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
-
newParallel
Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work.
- Parameters:
parallelism - Number of pooled workers.
threadFactory - a ThreadFactory to use for the fixed initialized number of Threads
- Returns:
- a new Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
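"A fixed pool of single-threaded workers" is different from one shared multi-threaded pool: each worker owns exactly one thread, so tasks given to the same worker are serialized while distinct workers run in parallel. A simplified pure-Java model of that shape (class and method names are illustrative, not Reactor's API):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of a parallel scheduler: N independent single-threaded
// executors; a worker is obtained round-robin and serializes its own tasks.
public class ParallelModel {
    private final ExecutorService[] workers;
    private final AtomicInteger next = new AtomicInteger();

    public ParallelModel(int parallelism) {
        workers = new ExecutorService[parallelism];
        for (int i = 0; i < parallelism; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    public ExecutorService worker() {
        // round-robin pick, like handing out single-threaded workers
        return workers[Math.floorMod(next.getAndIncrement(), workers.length)];
    }

    public void shutdown() {
        for (ExecutorService w : workers) w.shutdown();
    }
}
```

Since every worker is single-threaded, no more than `parallelism` distinct threads can ever be observed, regardless of how many tasks are submitted.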
-
newSingle
Scheduler that hosts a single-threaded ExecutorService-based worker. This type of Scheduler detects and rejects usage of blocking Reactor APIs.
- Parameters:
name - Component and thread name prefix
- Returns:
- a new Scheduler that hosts a single-threaded ExecutorService-based worker
-
newSingle
Scheduler that hosts a single-threaded ExecutorService-based worker. This type of Scheduler detects and rejects usage of blocking Reactor APIs.
- Parameters:
name - Component and thread name prefix
daemon - false if the Scheduler requires an explicit Scheduler.dispose() to exit the VM.
- Returns:
- a new Scheduler that hosts a single-threaded ExecutorService-based worker
-
newSingle
Scheduler that hosts a single-threaded ExecutorService-based worker.
- Parameters:
threadFactory - a ThreadFactory to use for the unique thread of the Scheduler
- Returns:
- a new Scheduler that hosts a single-threaded ExecutorService-based worker
-
onHandleError
Define a hook anonymous part that is executed alongside keyed parts when a Scheduler has handled an error. Note that it is executed after the error has been passed to the thread's uncaughtErrorHandler, which is not the case when a fatal error occurs (see Exceptions.throwIfJvmFatal(Throwable)).
This variant uses an internal private key, which allows the method to be additive with onHandleError(String, BiConsumer). Prefer adding and removing handler parts for keys that you own via onHandleError(String, BiConsumer) nonetheless.
- Parameters:
subHook - the new BiConsumer to set as the hook's anonymous part.
- See Also:
-
onHandleError
Define a keyed hook part that is executed alongside other parts when a Scheduler has handled an error. Note that it is executed after the error has been passed to the thread's uncaughtErrorHandler, which is not the case when a fatal error occurs (see Exceptions.throwIfJvmFatal(Throwable)).
Calling this method twice with the same key replaces the old hook part of the same key. Calling this method twice with two different keys is otherwise additive. Note that onHandleError(BiConsumer) also defines an anonymous part which effectively uses a private internal key, making it also additive with this method.
- Parameters:
key - the String key identifying the hook part to set/replace.
subHook - the new hook part to set for the given key.
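The keyed/anonymous split described above can be modeled as a map of BiConsumer parts where the anonymous variant hides behind a private key, and invoking the hook runs every part. This is a sketch under that assumption; class and key names are illustrative, not Reactor's internals:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Illustrative model of a composite, keyed error hook: each key owns one
// part, the anonymous part uses a private internal key, and all parts run.
public class KeyedErrorHook {
    private static final String ANONYMOUS_KEY = "reactor.anonymous"; // hypothetical private key
    private final Map<String, BiConsumer<Thread, Throwable>> parts = new ConcurrentHashMap<>();

    public void onHandleError(String key, BiConsumer<Thread, Throwable> part) {
        parts.put(key, part); // same key replaces, different keys are additive
    }

    public void onHandleError(BiConsumer<Thread, Throwable> part) {
        parts.put(ANONYMOUS_KEY, part);
    }

    public void resetOnHandleError(String key) {
        parts.remove(key);
    }

    public void handle(Thread thread, Throwable error) {
        parts.values().forEach(part -> part.accept(thread, error));
    }
}
```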
-
isInNonBlockingThread
public static boolean isInNonBlockingThread()
Check if calling a Reactor blocking API in the current Thread is forbidden or not. This method returns true and will forbid the Reactor blocking API if any of the following conditions is met:
- the thread implements NonBlocking; or
- any of the Predicates registered via registerNonBlockingThreadPredicate(Predicate) returns true.
- Returns:
- true if blocking is forbidden in this thread, false otherwise
-
isNonBlockingThread
Check if calling a Reactor blocking API in the given Thread is forbidden or not. This method returns true and will forbid the Reactor blocking API if any of the following conditions is met:
- the thread implements NonBlocking; or
- any of the Predicates registered via registerNonBlockingThreadPredicate(Predicate) returns true.
- Returns:
- true if blocking is forbidden in that thread, false otherwise
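The two conditions above (marker interface OR registered predicate) compose naturally as a predicate chain. A pure-Java sketch of that detection logic, with NonBlocking modeled as a local marker interface rather than Reactor's real one:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;

// Illustrative model of non-blocking thread detection: a thread is
// non-blocking if it implements a marker interface OR any registered
// predicate matches it.
public class NonBlockingDetector {
    interface NonBlocking { } // stand-in for Reactor's marker interface

    private final List<Predicate<Thread>> predicates = new CopyOnWriteArrayList<>();

    public void registerNonBlockingThreadPredicate(Predicate<Thread> predicate) {
        predicates.add(predicate);
    }

    public void resetNonBlockingThreadPredicate() {
        predicates.clear();
    }

    public boolean isNonBlockingThread(Thread t) {
        if (t instanceof NonBlocking) return true;
        return predicates.stream().anyMatch(p -> p.test(t));
    }
}
```

Registering a predicate is how integrations (e.g. an event-loop library whose threads must never block) can opt their threads into the same protection.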
-
registerNonBlockingThreadPredicate
Register a Predicate contributing to the detection of Threads in which blocking Reactor APIs are forbidden (see isInNonBlockingThread() and isNonBlockingThread(Thread)).
-
resetNonBlockingThreadPredicate
public static void resetNonBlockingThreadPredicate()
Unregisters all the Predicates registered so far via registerNonBlockingThreadPredicate(Predicate).
-
enableMetrics
Deprecated. Prefer using Micrometer#timedScheduler from the reactor-core-micrometer module. To be removed at the earliest in 3.6.0.
If Micrometer is available, sets up a decorator that will instrument any ExecutorService that backs a Scheduler. No-op if Micrometer isn't available.
The MeterRegistry used by Reactor can be configured via Metrics.MicrometerConfiguration.useRegistry(MeterRegistry) prior to using this method, the default being Metrics.globalRegistry.
- Implementation Note:
- Note that this is added as a decorator via Schedulers when enabling metrics for schedulers, which doesn't change the Factory.
-
disableMetrics
Deprecated. Prefer using Micrometer#timedScheduler from the reactor-core-micrometer module. To be removed at the earliest in 3.6.0.
If enableMetrics() has been previously called, removes the decorator. No-op if enableMetrics() hasn't been called.
-
resetFactory
public static void resetFactory()
Re-apply the default factory to Schedulers.
-
setFactoryWithSnapshot
Replace Schedulers factories (newParallel, newSingle and newBoundedElastic). Unlike setFactory(Factory), this doesn't shut down previous Schedulers but captures them in a Schedulers.Snapshot that can be later restored via resetFrom(Snapshot).
This method should be called safely and with caution, typically on app startup.
- Parameters:
newFactory - an arbitrary Schedulers.Factory instance
- Returns:
- a Schedulers.Snapshot representing a restorable snapshot of Schedulers
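The snapshot-then-restore contract (swap the factory without disposing the old state, hand back a token that restores it) can be sketched in plain Java. The Factory here is reduced to a single executor supplier for brevity, and all names are illustrative, not Reactor's API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Illustrative model of setFactoryWithSnapshot/resetFrom: swapping the
// factory returns a snapshot of the previous state instead of disposing it.
public class FactoryHolder {
    public record Snapshot(Supplier<ExecutorService> factory) { }

    private Supplier<ExecutorService> factory = Executors::newSingleThreadExecutor;

    public Snapshot setFactoryWithSnapshot(Supplier<ExecutorService> newFactory) {
        Snapshot snapshot = new Snapshot(factory); // capture, don't dispose
        factory = newFactory;
        return snapshot;
    }

    public void resetFrom(Snapshot snapshot) {
        factory = snapshot.factory();
    }

    public Supplier<ExecutorService> currentFactory() {
        return factory;
    }
}
```

This pattern is what makes the API useful in tests: install an instrumented factory, run the test, then restore the snapshot so other tests see the original schedulers.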
-
resetFrom
Replace the current Factory and shared Schedulers with the ones saved in a previously captured snapshot. Passing null re-applies the default factory.
-
resetOnHandleError
public static void resetOnHandleError()
Reset the onHandleError(BiConsumer) hook to the default no-op behavior, erasing all sub-hooks that might have been individually added via onHandleError(String, BiConsumer) or the whole hook set via onHandleError(BiConsumer).
- See Also:
-
resetOnHandleError
Reset a specific onHandleError hook part keyed to the provided String, removing that sub-hook if it has previously been defined via onHandleError(String, BiConsumer).
-
setFactory
Replace Schedulers factories (newParallel, newSingle and newBoundedElastic). Also shuts down Schedulers from the cached factories (like single()) in order to also use these replacements, re-creating the shared schedulers from the new factory upon next use.
This method should be called safely and with caution, typically on app startup.
- Parameters:
factoryInstance - an arbitrary Schedulers.Factory instance.
-
addExecutorServiceDecorator
public static boolean addExecutorServiceDecorator(String key, BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> decorator)
Set up an additional ScheduledExecutorService decorator for a given key only if that key is not already present.
The decorator is a BiFunction taking the Scheduler and the backing ScheduledExecutorService as second argument. It returns the decorated ScheduledExecutorService.
- Parameters:
key - the key under which to set up the decorator
decorator - the executor service decorator to add, if key not already present.
- Returns:
- true if the decorator was added, false if a decorator was already present for this key.
- See Also:
-
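The decorator registration and removal described above can be sketched as follows (assuming reactor-core 3.x on the classpath; the key "myapp.logging" is an arbitrary example name, and a real decorator would typically wrap the service for instrumentation instead of returning it unchanged):

```java
import java.util.concurrent.ScheduledExecutorService;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class DecoratorExample {
    public static void main(String[] args) {
        // Register a decorator once under a unique key; returns false if the key is taken.
        boolean added = Schedulers.addExecutorServiceDecorator(
                "myapp.logging",
                (Scheduler scheduler, ScheduledExecutorService service) -> {
                    System.out.println("decorating executor for " + scheduler);
                    return service; // wrap `service` here for real instrumentation
                });
        System.out.println("decorator added: " + added);

        // Clean up once the decorator is no longer needed.
        Schedulers.removeExecutorServiceDecorator("myapp.logging");
    }
}
```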
setExecutorServiceDecorator
public static void setExecutorServiceDecorator(String key, BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> decorator) Set up an additional ScheduledExecutorService decorator for a given key, even if that key is already present.
The decorator is a BiFunction taking the Scheduler and the backing ScheduledExecutorService as second argument. It returns the decorated ScheduledExecutorService.
- Parameters:
key - the key under which to set up the decorator
decorator - the executor service decorator to set, even if the key is already present.
- See Also:
-
removeExecutorServiceDecorator
public static BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> removeExecutorServiceDecorator(String key) Remove an existing ScheduledExecutorService decorator if it has been set up via addExecutorServiceDecorator(String, BiFunction).
In case the decorator implements Disposable, it is also disposed.
- Parameters:
key - the key for the executor service decorator to remove
- Returns:
- the removed decorator, or null if none was set for that key
- See Also:
-
decorateExecutorService
public static ScheduledExecutorService decorateExecutorService(Scheduler owner, ScheduledExecutorService original) This method is aimed at Scheduler implementors, enabling custom implementations that are backed by a ScheduledExecutorService to also have said executors decorated (i.e. for instrumentation purposes).
It applies the decorators added via addExecutorServiceDecorator(String, BiFunction), so this method itself shouldn't be added as a decorator. Note also that decorators are not guaranteed to be idempotent, so this method should be called only once per executor.
- Parameters:
owner - a Scheduler that owns the ScheduledExecutorService
original - the ScheduledExecutorService that the Scheduler wants to use originally
- Returns:
- the decorated ScheduledExecutorService, or the original if no decorator is set up
- See Also:
-
onScheduleHook
Add or replace a named scheduling decorator. With subsequent calls to this method, the onScheduleHook hook can be a composite of several sub-hooks, each with a different key.
The sub-hook is a Function taking the scheduled Runnable. It returns the decorated Runnable.
- Parameters:
key - the key under which to set up the onScheduleHook sub-hook
decorator - the Runnable decorator to add (or replace, if the key is already present)
- See Also:
-
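A common use of onScheduleHook is propagating thread-local state across scheduler threads. The sketch below (assuming reactor-core 3.x on the classpath; the CONTEXT thread-local and the "context-propagation" key are hypothetical illustration names) decorates every scheduled Runnable so it restores the submitter's context:

```java
import reactor.core.scheduler.Schedulers;

public class ScheduleHookExample {
    // Hypothetical thread-local state that should follow tasks across threads.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    public static void main(String[] args) {
        Schedulers.onScheduleHook("context-propagation", task -> {
            // Runs on the submitting thread: capture its context now.
            String captured = CONTEXT.get();
            return () -> {
                // Runs on the scheduler thread: restore, run, then undo.
                String previous = CONTEXT.get();
                CONTEXT.set(captured);
                try {
                    task.run();
                } finally {
                    CONTEXT.set(previous);
                }
            };
        });

        // Remove the sub-hook when it is no longer needed.
        Schedulers.resetOnScheduleHook("context-propagation");
    }
}
```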
resetOnScheduleHook
Reset a specific onScheduleHook sub-hook if it has been set up via onScheduleHook(String, Function).
- Parameters:
key - the key for the onScheduleHook sub-hook to remove
- See Also:
-
resetOnScheduleHooks
public static void resetOnScheduleHooks() Remove all onScheduleHook sub-hooks. -
onSchedule
Applies the hooks registered with onScheduleHook(String, Function). -
shutdownNow
public static void shutdownNow() Clear any cached Schedulers and call dispose on them. -
single
The common single instance, a Scheduler that hosts a single-threaded ExecutorService-based worker.
Only one instance of this common scheduler will be created on the first call and is cached. The same instance is returned on subsequent calls until it is disposed.
One cannot directly dispose the common instances, as they are cached and shared between callers. They can however all be shut down together, or replaced by a change in Factory.
- Returns:
- the common single instance, a Scheduler that hosts a single-threaded ExecutorService-based worker
-
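The shared single() instance is typically handed to publishOn or subscribeOn. A minimal sketch, assuming reactor-core 3.x on the classpath:

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SingleExample {
    public static void main(String[] args) {
        Flux.range(1, 3)
            .publishOn(Schedulers.single()) // hop to the shared single-threaded worker
            .map(i -> i + " on " + Thread.currentThread().getName())
            .doOnNext(System.out::println)
            .blockLast();
        // Note: do not dispose Schedulers.single() directly; it is cached and
        // shared between callers (see shutdownNow() / setFactory(Factory)).
    }
}
```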
single
Wraps a single Scheduler.Worker from some other Scheduler and provides Scheduler.Worker services on top of it. Unlike with other factory methods in this class, the delegate is assumed to be initialized and won't be implicitly initialized by this method.
Use Scheduler.dispose() to release the wrapped worker.
- Parameters:
original - a Scheduler to call upon to get the single Scheduler.Worker
- Returns:
- a wrapping Scheduler consistently returning the same worker from a source Scheduler
-
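The wrapping behavior described above can be used to pin all work to one worker borrowed from a multi-threaded Scheduler. A sketch, assuming reactor-core 3.x on the classpath:

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SingleWorkerExample {
    public static void main(String[] args) {
        // Pin all work to a single worker taken from the shared parallel Scheduler.
        Scheduler pinned = Schedulers.single(Schedulers.parallel());
        try {
            Mono.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(pinned)
                .doOnNext(name -> System.out.println("ran on " + name))
                .block();
        } finally {
            // Releases the wrapped worker; the source parallel Scheduler stays alive.
            pinned.dispose();
        }
    }
}
```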