Threading and Schedulers
Reactor, like RxJava, can be considered to be concurrency-agnostic. That is, it does not enforce a concurrency model. Rather, it leaves you, the developer, in command. However, that does not prevent the library from helping you with concurrency.
Obtaining a Flux or a Mono does not necessarily mean that it runs in a dedicated Thread. Instead, most operators continue working in the Thread on which the previous operator executed. Unless specified, the topmost operator (the source) itself runs on the Thread in which the subscribe() call was made. The following example runs a Mono in a new thread:
public static void main(String[] args) throws InterruptedException {
    final Mono<String> mono = Mono.just("hello "); // (1)

    Thread t = new Thread(() -> mono
        .map(msg -> msg + "thread ")
        .subscribe(v -> // (2)
            System.out.println(v + Thread.currentThread().getName()) // (3)
        )
    );
    t.start();
    t.join();
}
(1) The Mono<String> is assembled in thread main.
(2) However, it is subscribed to in thread Thread-0.
(3) As a consequence, both the map and the onNext callback actually run in Thread-0.
The preceding code produces the following output:
hello thread Thread-0
In Reactor, the execution model and where the execution happens are determined by the Scheduler that is used. A Scheduler has scheduling responsibilities similar to an ExecutorService, but having a dedicated abstraction lets it do more, notably acting as a clock and enabling a wider range of implementations (virtual time for tests, trampolining or immediate scheduling, and so on).
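To make the ExecutorService comparison concrete, the following minimal sketch submits a Runnable directly to a Scheduler and then reads the Scheduler's clock. The class name SchedulerBasics, the method names, and the thread name "demo" are illustrative assumptions, not part of Reactor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Illustrative class; not part of Reactor.
final class SchedulerBasics {

    // Submits one task to a dedicated Scheduler and returns the name of
    // the thread that executed it.
    static String runTask() {
        Scheduler scheduler = Schedulers.newSingle("demo"); // "demo" is an arbitrary name
        AtomicReference<String> threadName = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            // schedule(Runnable) plays a role similar to ExecutorService.submit(Runnable).
            scheduler.schedule(() -> {
                threadName.set(Thread.currentThread().getName());
                done.countDown();
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run in time");
            }
            return threadName.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.dispose(); // release the dedicated thread
        }
    }

    // A Scheduler also acts as a clock.
    static long clockMillis() {
        return Schedulers.immediate().now(TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println("task ran on: " + runTask());
        System.out.println("scheduler clock (ms): " + clockMillis());
    }
}
```

In everyday code you rarely call schedule yourself. Operators that accept a Scheduler do it for you.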
The Schedulers class has static methods that give access to the following execution contexts:
- No execution context (Schedulers.immediate()): at processing time, the submitted Runnable is executed directly, effectively running it on the current Thread (can be seen as a "null object" or no-op Scheduler).
- A single, reusable thread (Schedulers.single()). Note that this method reuses the same thread for all callers, until the Scheduler is disposed. If you want a per-call dedicated thread, use Schedulers.newSingle() for each call.
- An unbounded elastic thread pool (Schedulers.elastic()). This one is no longer preferred with the introduction of Schedulers.boundedElastic(), as it has a tendency to hide backpressure problems and lead to too many threads (see below).
- A bounded elastic thread pool (Schedulers.boundedElastic()). This is a handy way to give a blocking process its own thread so that it does not tie up other resources. It is a better choice for blocking I/O work because it does not pressure the system too much with new threads. See How Do I Wrap a Synchronous, Blocking Call? for details. Starting from 3.6.0, this can offer two different implementations depending on the setup:
  - ExecutorService-based, which reuses platform threads between tasks. This implementation, like its predecessor elastic(), creates new worker pools as needed and reuses idle ones. Worker pools that stay idle for too long (the default is 60s) are also disposed. Unlike its elastic() predecessor, it has a cap on the number of backing threads it can create (default is number of CPU cores x 10). Up to 100 000 tasks submitted after the cap has been reached are enqueued and will be re-scheduled when a thread becomes available (when scheduling with a delay, the delay starts when the thread becomes available).
  - Thread-per-task-based, designed to run on VirtualThread instances. To embrace that functionality, the application should run in a Java 21+ environment and set the reactor.schedulers.defaultBoundedElasticOnVirtualThreads system property to true. Once the above is set, the shared Schedulers.boundedElastic() returns a specific implementation of BoundedElasticScheduler tailored to run every task on a new instance of the VirtualThread class. This implementation behaves similarly to the ExecutorService-based one but does not have an idle pool and creates a new VirtualThread for each task.
- A fixed pool of workers that is tuned for parallel work (Schedulers.parallel()). It creates as many workers as you have CPU cores.
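The following sketch shows one way to observe which thread each shared Scheduler from the list above uses, and how a blocking call might be moved onto Schedulers.boundedElastic(). The class SchedulerThreads and the blockingLookup method are hypothetical stand-ins. The thread names printed are an implementation detail of the default configuration and may differ in your setup.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Illustrative class; not part of Reactor.
final class SchedulerThreads {

    // Runs a trivial task on the given Scheduler and reports the executing thread.
    static String threadNameOn(Scheduler scheduler) {
        return Mono.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(scheduler)
                .block(Duration.ofSeconds(5));
    }

    // Maps a label for each shared Scheduler to the thread it executed on.
    static Map<String, String> threadNames() {
        Map<String, String> names = new LinkedHashMap<>();
        names.put("immediate", threadNameOn(Schedulers.immediate()));
        names.put("single", threadNameOn(Schedulers.single()));
        names.put("boundedElastic", threadNameOn(Schedulers.boundedElastic()));
        names.put("parallel", threadNameOn(Schedulers.parallel()));
        return names;
    }

    // Hypothetical stand-in for a legacy blocking call (JDBC, file I/O, ...).
    static String blockingLookup() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "looked up on " + Thread.currentThread().getName();
    }

    // Defers the blocking call and runs it on the bounded elastic pool.
    static String wrapBlockingCall() {
        return Mono.fromCallable(SchedulerThreads::blockingLookup)
                .subscribeOn(Schedulers.boundedElastic())
                .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        threadNames().forEach((label, thread) -> System.out.println(label + " -> " + thread));
        System.out.println(wrapBlockingCall());
    }
}
```

The final block() calls are there only to collect results for the demonstration. In a reactive application you would usually return the Mono instead of blocking on it.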
Additionally, you can create a Scheduler out of any pre-existing ExecutorService by using Schedulers.fromExecutorService(ExecutorService). (You can also create one from an Executor, although doing so is discouraged.)
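As a minimal sketch of that bridge, the following example wraps a fixed thread pool and runs a task on it through Reactor. The class name ExecutorBridge and the thread name "legacy-pool" are assumptions made for the illustration.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Illustrative class; not part of Reactor.
final class ExecutorBridge {

    // Runs a task on a pre-existing pool and returns the executing thread's name.
    static String runOnExistingPool() {
        // Stand-in for a pool your application already owns.
        ExecutorService pool = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "legacy-pool");
            thread.setDaemon(true);
            return thread;
        });
        Scheduler scheduler = Schedulers.fromExecutorService(pool);
        try {
            return Mono.fromCallable(() -> Thread.currentThread().getName())
                    .subscribeOn(scheduler)
                    .block(Duration.ofSeconds(5));
        } finally {
            scheduler.dispose();
            pool.shutdown(); // explicit, and harmless if the pool is already shut down
        }
    }

    public static void main(String[] args) {
        System.out.println("ran on: " + runOnExistingPool());
    }
}
```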
You can also create new instances of the various scheduler types by using the newXXX methods. For example, Schedulers.newParallel(yourScheduleName) creates a new parallel scheduler named yourScheduleName.
Some operators use a specific scheduler from Schedulers by default (and usually give you the option of providing a different one). For instance, calling the Flux.interval(Duration.ofMillis(300)) factory method produces a Flux<Long> that ticks every 300ms. By default, the ticks are scheduled on Schedulers.parallel(). The following line changes the Scheduler to a new instance similar to Schedulers.single():
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
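To check where the ticks are emitted, you can record the thread name next to each tick. The sketch below does so with a shorter 50ms period to keep the run brief. The class name IntervalOnCustomScheduler is an assumption for the example.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Illustrative class; not part of Reactor.
final class IntervalOnCustomScheduler {

    // Collects the first three ticks together with the emitting thread's name.
    static List<String> tickThreads() {
        Scheduler timer = Schedulers.newSingle("test");
        try {
            return Flux.interval(Duration.ofMillis(50), timer)
                    .take(3)
                    .map(tick -> tick + " on " + Thread.currentThread().getName())
                    .collectList()
                    .block(Duration.ofSeconds(5));
        } finally {
            timer.dispose(); // a Scheduler you create is yours to dispose
        }
    }

    public static void main(String[] args) {
        tickThreads().forEach(System.out::println);
    }
}
```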
Reactor offers two means of switching the execution context (or Scheduler) in a reactive chain: publishOn and subscribeOn. Both take a Scheduler and let you switch the execution context to that scheduler. But the placement of publishOn in the chain matters, while the placement of subscribeOn does not. To understand that difference, you first have to remember that nothing happens until you subscribe.
In Reactor, when you chain operators, you can wrap as many Flux and Mono implementations inside one another as you need. Once you subscribe, a chain of Subscriber objects is created, backward (up the chain) to the first publisher. This is effectively hidden from you. All you can see is the outer layer of Flux (or Mono) and Subscription, but these intermediate operator-specific subscribers are where the real work happens.
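The "nothing happens until you subscribe" rule can be checked with a counter, as in the following sketch. It counts how often a map function runs before and after subscribe(). The class name LazySubscription is an assumption for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

// Illustrative class; not part of Reactor.
final class LazySubscription {

    // Returns {map invocations before subscribe, map invocations after subscribe}.
    static int[] countMapCalls() {
        AtomicInteger calls = new AtomicInteger();

        // Assembly only: this declares the pipeline but runs nothing.
        Flux<Integer> flux = Flux.range(1, 3)
                .map(i -> {
                    calls.incrementAndGet();
                    return i * 2;
                });
        int before = calls.get();

        // Subscribing builds the subscriber chain and starts the data flow.
        // Flux.range emits synchronously, so all values are processed here.
        flux.subscribe();
        int after = calls.get();

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = countMapCalls();
        System.out.println("before subscribe: " + counts[0] + ", after subscribe: " + counts[1]);
    }
}
```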
With that knowledge, we can have a closer look at the publishOn and subscribeOn operators:
1. The publishOn Method
publishOn applies in the same way as any other operator, in the middle of the subscriber chain. It takes signals from upstream and replays them downstream while executing the callback on a worker from the associated Scheduler. Consequently, it affects where the subsequent operators execute (until another publishOn is chained in), as follows:
- Changes the execution context to one Thread picked by the Scheduler
- As per the specification, onNext calls happen in sequence, so this uses up a single thread
- Unless they work on a specific Scheduler, operators after publishOn continue execution on that same thread
The following example uses the publishOn method:
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); // (1)

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i) // (2)
    .publishOn(s) // (3)
    .map(i -> "value " + i); // (4)

// The thread must be started, otherwise the subscription never happens.
new Thread(() -> flux.subscribe(System.out::println)).start(); // (5)
(1) Creates a new Scheduler backed by four Thread instances.
(2) The first map runs on the anonymous thread in (5).
(3) The publishOn switches the whole sequence on a Thread picked from (1).
(4) The second map runs on the Thread from (1).
(5) This anonymous Thread is the one where the subscription happens. The print happens on the latest execution context, which is the one from publishOn.
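To observe the switch rather than take it on faith, the following self-contained sketch records the thread seen by each map. It uses a named subscribing thread and blockLast so that the demonstration finishes deterministically. The class name PublishOnThreads and the thread name "subscriber" are assumptions, not part of the example above.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Illustrative class; not part of Reactor.
final class PublishOnThreads {

    // Logs the current thread's name and passes the value through.
    static <T> T record(List<String> log, T value) {
        log.add(Thread.currentThread().getName());
        return value;
    }

    // Returns [threads seen before publishOn, threads seen after publishOn].
    static List<List<String>> observe() {
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        List<String> before = new CopyOnWriteArrayList<>();
        List<String> after = new CopyOnWriteArrayList<>();

        Flux<String> flux = Flux.range(1, 2)
                .map(i -> record(before, 10 + i))       // runs on the subscribing thread
                .publishOn(s)                           // switches downstream operators to s
                .map(i -> record(after, "value " + i)); // runs on a thread from s

        Thread subscriber = new Thread(() -> flux.blockLast(Duration.ofSeconds(5)), "subscriber");
        subscriber.start();
        try {
            subscriber.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            s.dispose();
        }
        return Arrays.asList(before, after);
    }

    public static void main(String[] args) {
        List<List<String>> seen = observe();
        System.out.println("before publishOn: " + seen.get(0));
        System.out.println("after publishOn:  " + seen.get(1));
    }
}
```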
2. The subscribeOn Method
subscribeOn applies to the subscription process, when the backward chain is being constructed. It is usually recommended to place it immediately after the source of data, as intermediate operators can affect the context of the execution. However, this does not affect the behavior of subsequent calls to publishOn: they still switch the execution context for the part of the chain after them.
- Changes the Thread from which the whole chain of operators subscribes
- Picks one thread from the Scheduler
Only the closest subscribeOn call in the downstream chain effectively schedules subscription and request signals to the source or operators that can intercept them (doFirst, doOnRequest). Using multiple subscribeOn calls introduces unnecessary Thread switches that have no value.
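The effect of stacking subscribeOn calls can be sketched as follows: the source reports the thread it runs on, and only the subscribeOn closest to it decides that thread. The class name NestedSubscribeOn and both scheduler names are assumptions chosen for the illustration.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Illustrative class; not part of Reactor.
final class NestedSubscribeOn {

    // Returns the name of the thread on which the source callable executes.
    static String sourceThread() {
        Scheduler inner = Schedulers.newSingle("closest-to-source");
        Scheduler outer = Schedulers.newSingle("further-downstream");
        try {
            return Mono.fromCallable(() -> Thread.currentThread().getName())
                    .subscribeOn(inner) // decides where the source runs
                    .subscribeOn(outer) // only adds an extra thread hop during subscription
                    .block(Duration.ofSeconds(5));
        } finally {
            inner.dispose();
            outer.dispose();
        }
    }

    public static void main(String[] args) {
        System.out.println("source ran on: " + sourceThread());
    }
}
```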
The following example uses the subscribeOn method:
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); // (1)

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i) // (2)
    .subscribeOn(s) // (3)
    .map(i -> "value " + i); // (4)

// The thread must be started, otherwise the subscription never happens.
new Thread(() -> flux.subscribe(System.out::println)).start(); // (5)
(1) Creates a new Scheduler backed by four Thread instances.
(2) The first map runs on one of these four threads…
(3) …because subscribeOn switches the whole sequence right from subscription time (5).
(4) The second map also runs on the same thread.
(5) This anonymous Thread is the one where the subscription initially happens, but subscribeOn immediately shifts it to one of the four scheduler threads.
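The two operators can also be combined. The sketch below, which assumes the hypothetical class SubscribeOnThenPublishOn and scheduler names "subscribe-on" and "publish-on", records the thread seen at three points in a chain to show that subscribeOn governs everything up to the next publishOn, and publishOn governs what follows it.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Illustrative class; not part of Reactor.
final class SubscribeOnThenPublishOn {

    // Logs the current thread's name and passes the value through.
    static <T> T record(List<String> log, T value) {
        log.add(Thread.currentThread().getName());
        return value;
    }

    // Returns the threads seen [near the source, before publishOn, after publishOn].
    static List<List<String>> observe() {
        Scheduler subscribeScheduler = Schedulers.newParallel("subscribe-on", 4);
        Scheduler publishScheduler = Schedulers.newParallel("publish-on", 4);
        List<String> nearSource = new CopyOnWriteArrayList<>();
        List<String> middle = new CopyOnWriteArrayList<>();
        List<String> last = new CopyOnWriteArrayList<>();
        try {
            Flux.range(1, 2)
                    .map(i -> record(nearSource, 10 + i))  // thread from subscribeScheduler
                    .subscribeOn(subscribeScheduler)
                    .map(i -> record(middle, i * 2))       // still the same thread
                    .publishOn(publishScheduler)
                    .map(i -> record(last, "value " + i))  // thread from publishScheduler
                    .blockLast(Duration.ofSeconds(5));
        } finally {
            subscribeScheduler.dispose();
            publishScheduler.dispose();
        }
        return Arrays.asList(nearSource, middle, last);
    }

    public static void main(String[] args) {
        List<List<String>> seen = observe();
        System.out.println("near source:      " + seen.get(0));
        System.out.println("before publishOn: " + seen.get(1));
        System.out.println("after publishOn:  " + seen.get(2));
    }
}
```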