Threading and Schedulers

Reactor, like RxJava, can be considered to be concurrency-agnostic. That is, it does not enforce a concurrency model. Rather, it leaves you, the developer, in command. However, that does not prevent the library from helping you with concurrency.

Obtaining a Flux or a Mono does not necessarily mean that it runs in a dedicated Thread. Instead, most operators continue working in the Thread on which the previous operator executed. Unless specified, the topmost operator (the source) itself runs on the Thread in which the subscribe() call was made. The following example runs a Mono in a new thread:

public static void main(String[] args) throws InterruptedException {
  final Mono<String> mono = Mono.just("hello "); (1)

  Thread t = new Thread(() -> mono
      .map(msg -> msg + "thread ")
      .subscribe(v -> (2)
          System.out.println(v + Thread.currentThread().getName()) (3)

1 The Mono<String> is assembled in thread main.
2 However, it is subscribed to in thread Thread-0.
3 As a consequence, both the map and the onNext callback actually run in Thread-0

The preceding code produces the following output:

hello thread Thread-0

In Reactor, the execution model and where the execution happens is determined by the Scheduler that is used. A Scheduler has scheduling responsibilities similar to an ExecutorService, but having a dedicated abstraction lets it do more, notably acting as a clock and enabling a wider range of implementations (virtual time for tests, trampolining or immediate scheduling, and so on).

The Schedulers class has static methods that give access to the following execution contexts:

  • No execution context (Schedulers.immediate()): at processing time, the submitted Runnable will be directly executed, effectively running them on the current Thread (can be seen as a "null object" or no-op Scheduler).

  • A single, reusable thread (Schedulers.single()). Note that this method reuses the same thread for all callers, until the Scheduler is disposed. If you want a per-call dedicated thread, use Schedulers.newSingle() for each call.

  • An unbounded elastic thread pool (Schedulers.elastic()). This one is no longer preferred with the introduction of Schedulers.boundedElastic(), as it has a tendency to hide backpressure problems and lead to too many threads (see below).

  • A bounded elastic thread pool (Schedulers.boundedElastic()). This is a handy way to give a blocking process its own thread so that it does not tie up other resources. This is a better choice for I/O blocking work. See How Do I Wrap a Synchronous, Blocking Call?, but doesn’t pressure the system too much with new threads. Starting from 3.6.0 this can offer two different implementations depending on the setup:

    • ExecutorService-based, which reuses platform threads between tasks. This implementation, like its predecessor elastic(), creates new worker pools as needed and reuses idle ones. Worker pools that stay idle for too long (the default is 60s) are also disposed. Unlike its elastic() predecessor, it has a cap on the number of backing threads it can create (default is number of CPU cores x 10). Up to 100 000 tasks submitted after the cap has been reached are enqueued and will be re-scheduled when a thread becomes available (when scheduling with a delay, the delay starts when the thread becomes available).

    • Thread-per-task-based, designed to run on VirtualThread instances. To embrace that functionality, the application should run in Java 21+ environment and set the reactor.schedulers.defaultBoundedElasticOnVirtualThreads system property to true. Once the above is set, the shared Schedulers.boundedElastic() return a specific implementation of BoundedElasticScheduler tailored to run every task on a new instance of the VirtualThread class. This implementation is similar in terms of the behavior to the ExecutorService-based one but does not have idle pool and creates a new VirtualThread for each task.

  • A fixed pool of workers that is tuned for parallel work (Schedulers.parallel()). It creates as many workers as you have CPU cores.

Additionally, you can create a Scheduler out of any pre-existing ExecutorService by using Schedulers.fromExecutorService(ExecutorService). (You can also create one from an Executor, although doing so is discouraged.)

You can also create new instances of the various scheduler types by using the newXXX methods. For example, Schedulers.newParallel(yourScheduleName) creates a new parallel scheduler named yourScheduleName.

While boundedElastic is made to help with legacy blocking code if it cannot be avoided, single and parallel are not. As a consequence, the use of Reactor blocking APIs (block(), blockFirst(), blockLast() (as well as iterating over toIterable() or toStream()) inside the default single and parallel schedulers) results in an IllegalStateException being thrown.

Custom Schedulers can also be marked as "non blocking only" by creating instances of Thread that implement the NonBlocking marker interface.

Some operators use a specific scheduler from Schedulers by default (and usually give you the option of providing a different one). For instance, calling the Flux.interval(Duration.ofMillis(300)) factory method produces a Flux<Long> that ticks every 300ms. By default, this is enabled by Schedulers.parallel(). The following line changes the Scheduler to a new instance similar to Schedulers.single():

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

Reactor offers two means of switching the execution context (or Scheduler) in a reactive chain: publishOn and subscribeOn. Both take a Scheduler and let you switch the execution context to that scheduler. But the placement of publishOn in the chain matters, while the placement of subscribeOn does not. To understand that difference, you first have to remember that nothing happens until you subscribe .

In Reactor, when you chain operators, you can wrap as many Flux and Mono implementations inside one another as you need. Once you subscribe, a chain of Subscriber objects is created, backward (up the chain) to the first publisher. This is effectively hidden from you. All you can see is the outer layer of Flux (or Mono) and Subscription, but these intermediate operator-specific subscribers are where the real work happens.

With that knowledge, we can have a closer look at the publishOn and subscribeOn operators:

1. The publishOn Method

publishOn applies in the same way as any other operator, in the middle of the subscriber chain. It takes signals from upstream and replays them downstream while executing the callback on a worker from the associated Scheduler. Consequently, it affects where the subsequent operators execute (until another publishOn is chained in), as follows:

  • Changes the execution context to one Thread picked by the Scheduler

  • as per the specification, onNext calls happen in sequence, so this uses up a single thread

  • unless they work on a specific Scheduler, operators after publishOn continue execution on that same thread

The following example uses the publishOn method:

Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); (1)

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)  (2)
    .publishOn(s)  (3)
    .map(i -> "value " + i);  (4)

new Thread(() -> flux.subscribe(System.out::println));  (5)
1 Creates a new Scheduler backed by four Thread instances.
2 The first map runs on the anonymous thread in <5>.
3 The publishOn switches the whole sequence on a Thread picked from <1>.
4 The second map runs on the Thread from <1>.
5 This anonymous Thread is the one where the subscription happens. The print happens on the latest execution context, which is the one from publishOn.

2. The subscribeOn Method

subscribeOn applies to the subscription process, when the backward chain is being constructed. It is usually recommended to place it immediately after the source of data, as intermediate operators can affect the context of the execution.

However, this does not affect the behavior of subsequent calls to publishOn — they still switch the execution context for the part of the chain after them.

  • Changes the Thread from which the whole chain of operators subscribes

  • Picks one thread from the Scheduler

Only the closest subscribeOn call in the downstream chain effectively schedules subscription and request signals to the source or operators that can intercept them (doFirst, doOnRequest). Using multiple subscribeOn calls will introduce unnecessary Thread switches that have no value.

The following example uses the subscribeOn method:

Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); (1)

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)  (2)
    .subscribeOn(s)  (3)
    .map(i -> "value " + i);  (4)

new Thread(() -> flux.subscribe(System.out::println));  (5)
1 Creates a new Scheduler backed by four Thread.
2 The first map runs on one of these four threads…​
3 …​because subscribeOn switches the whole sequence right from subscription time (<5>).
4 The second map also runs on same thread.
5 This anonymous Thread is the one where the subscription initially happens, but subscribeOn immediately shifts it to one of the four scheduler threads.