Exposing Reactor metrics

Project Reactor is a library designed for performance and better utilization of resources. But to truly understand the performance of a system, it is best to be able to monitor its various components.

This is why Reactor provides a built-in integration with Micrometer via the reactor-core-micrometer module. Introduced in the 2022.0 BOM release, the module declares an explicit dependency on Micrometer, which allows it to offer fine-tuned APIs for metrics and observations.

Up to Reactor-Core 3.5.0, metrics were implemented as operators that would be no-op if Micrometer wasn’t on the classpath.

The reactor-core-micrometer APIs require the user to provide a form of registry explicitly instead of relying on a hardcoded global registry. When applying instrumentation to classes that have a native notion of naming or tags, these APIs will attempt to discover such elements in the reactive chain. Otherwise, the API will expect that a prefix for naming meters is provided alongside the registry.

1. Scheduler metrics

Every async operation in Reactor is done via the Scheduler abstraction described in Threading and Schedulers. This is why it is important to monitor your schedulers, watch out for key metrics that start to look suspicious and react accordingly.

The reactor-core-micrometer module offers a "timed" Scheduler wrapper that performs measurements around tasks submitted through it, which can be used as follows:

Scheduler originalScheduler = Schedulers.newParallel("test", 4);

Scheduler schedulerWithMetrics = Micrometer.timedScheduler(
	originalScheduler, (1)
	applicationDefinedMeterRegistry, (2)
	"testingMetrics", (3)
	Tags.of(Tag.of("additionalTag", "yes")) (4)
);
1 the Scheduler to wrap
2 the MeterRegistry in which to publish metrics
3 the prefix to use in naming meters. This would for example lead to a testingMetrics.scheduler.tasks.completed meter being created.
4 optional tags to add to all the meters created for that wrapping Scheduler
When wrapping a common Scheduler (e.g. Schedulers.single()) or a Scheduler that is used in multiple places, only the Runnable tasks that are submitted through the wrapper instance returned by Micrometer#timedScheduler are going to be instrumented.

See Micrometer.timedScheduler() for produced meters and associated default tags.
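
To check what the wrapper publishes, the meters can be read back from the registry. The following is a minimal sketch, assuming an in-memory SimpleMeterRegistry stands in for the application-defined registry. The class and method names (TimedSchedulerSketch, countDirectSubmissions) are hypothetical. It reads the submitted-tasks counter rather than the completed timer, so the result does not depend on the tasks having finished.

```java
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class TimedSchedulerSketch {

    // Submits taskCount no-op tasks through the timed wrapper and returns the
    // value of the counter tagged submission.type=direct.
    // taskCount is expected to be at least 1 in this sketch.
    static double countDirectSubmissions(int taskCount) {
        // Assumption: an in-memory registry replaces the application registry.
        SimpleMeterRegistry registry = new SimpleMeterRegistry();
        Scheduler original = Schedulers.newParallel("test", 4);
        Scheduler timed = Micrometer.timedScheduler(
                original,
                registry,
                "testingMetrics",
                Tags.of(Tag.of("additionalTag", "yes")));
        try {
            for (int i = 0; i < taskCount; i++) {
                // Only tasks submitted through the wrapper are instrumented.
                timed.schedule(() -> { });
            }
            return registry.get("testingMetrics.scheduler.tasks.submitted")
                    .tag("submission.type", "direct")
                    .counter()
                    .count();
        } finally {
            timed.dispose();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct submissions: " + countDirectSubmissions(3));
    }
}
```

Submitting the same tasks through originalScheduler directly would leave this counter untouched, which is a quick way to confirm that the instrumented instance is the one actually in use.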

2. Publisher metrics

Sometimes it is useful to be able to record metrics at some stage in your reactive pipeline.

One way to do it would be to manually push the values to your metrics backend of choice from a custom SignalListener provided to the tap operator.
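
As a rough illustration of that manual approach, the sketch below counts onNext signals with a listener passed to tap. It assumes a LongAdder as a stand-in for a real metrics backend client and extends DefaultSignalListener so that only the relevant callback needs to be overridden. The class and method names (ManualTapSketch, countOnNext) are hypothetical.

```java
import java.util.concurrent.atomic.LongAdder;

import reactor.core.observability.DefaultSignalListener;
import reactor.core.publisher.Flux;

public class ManualTapSketch {

    // Runs a small Flux to completion and returns how many onNext signals
    // the custom listener observed.
    static long countOnNext(int elements) {
        // Assumption: this adder plays the role of a metrics backend client.
        LongAdder onNextCount = new LongAdder();

        Flux.range(1, elements)
            // A new listener is created for each subscription.
            .tap(() -> new DefaultSignalListener<Integer>() {
                @Override
                public void doOnNext(Integer value) {
                    onNextCount.increment();
                }
            })
            .blockLast();

        return onNextCount.sum();
    }

    public static void main(String[] args) {
        System.out.println("onNext signals seen: " + countOnNext(5));
    }
}
```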

An out-of-the-box implementation is actually provided by the reactor-core-micrometer module, via Micrometer#metrics APIs. Consider the following pipeline:

listenToEvents()
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();

To enable the metrics for this source Flux (returned from listenToEvents()), we need to turn on the metrics collection:

listenToEvents()
    .name("events") (1)
    .tap(Micrometer.metrics( (2)
        applicationDefinedMeterRegistry (3)
    ))
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1 Every metric at this stage of the reactive pipeline will use "events" as a naming prefix (optional, defaults to reactor prefix).
2 We use the tap operator combined with a SignalListener implementation provided in reactor-core-micrometer for metrics collection.
3 As with other APIs in that module, the MeterRegistry into which to publish metrics needs to be explicitly provided.

The detail of the exposed metrics is available in Micrometer.metrics().
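
To see the effect of the name prefix, one of the resulting meters can be read back from the registry. This is a minimal sketch, assuming an in-memory SimpleMeterRegistry and a fixed Flux.just source in place of listenToEvents(). The class and method names (PublisherMetricsSketch, subscribedCount) are hypothetical.

```java
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux;

public class PublisherMetricsSketch {

    // Subscribes the instrumented Flux the given number of times (at least
    // once) and returns the value of the "events.subscribed" counter.
    static double subscribedCount(int subscriptions) {
        // Assumption: an in-memory registry replaces the application registry.
        SimpleMeterRegistry registry = new SimpleMeterRegistry();

        Flux<String> events = Flux.just("a", "b", "c") // stand-in for listenToEvents()
                .name("events")
                .tap(Micrometer.metrics(registry));

        for (int i = 0; i < subscriptions; i++) {
            events.blockLast();
        }

        // The "events" name replaces the default "reactor" prefix.
        return registry.get("events.subscribed").counter().count();
    }

    public static void main(String[] args) {
        System.out.println("subscriptions recorded: " + subscribedCount(2));
    }
}
```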

2.1. Tags

In addition to the common tags described in Micrometer.metrics(), users can add custom tags to their reactive chains via the tag operator:

listenToEvents()
    .name("events") (1)
    .tag("source", "kafka") (2)
    .tap(Micrometer.metrics(applicationDefinedMeterRegistry)) (3)
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1 Every metric at this stage will be identified with the "events" prefix.
2 Set a custom tag "source" to value "kafka".
3 All reported metrics will have source=kafka tag assigned in addition to the common tags.

Note that, depending on the monitoring system you use, setting a name can be effectively mandatory when using tags, since two default-named sequences would otherwise report different sets of tags under the same meter name. Some systems, like Prometheus, might also require the exact same set of tags for every metric with the same name.
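
The custom tag can be verified by looking at the identifier of a published meter. The sketch below assumes an in-memory SimpleMeterRegistry and a single-element source. The class and method names (TagSketch, sourceTagValue) are hypothetical.

```java
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux;

public class TagSketch {

    // Returns the value of the "source" tag carried by the
    // "events.subscribed" counter after one subscription.
    static String sourceTagValue() {
        // Assumption: an in-memory registry replaces the application registry.
        SimpleMeterRegistry registry = new SimpleMeterRegistry();

        Flux.just("event")
            .name("events")
            .tag("source", "kafka")
            .tap(Micrometer.metrics(registry))
            .blockLast();

        return registry.get("events.subscribed")
                .counter()
                .getId()
                .getTag("source");
    }

    public static void main(String[] args) {
        System.out.println("source tag: " + sourceTagValue());
    }
}
```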

2.2. Observation

In addition to full metrics, the reactor-core-micrometer module offers an alternative based on Micrometer’s Observation. Depending on the configuration and runtime classpath, an Observation could translate to timers, spans, logging statements or any combination.

A reactive chain can be observed via the tap operator and Micrometer.observation utility, as follows:

listenToEvents()
    .name("events") (1)
    .tap(Micrometer.observation( (2)
        applicationDefinedRegistry)) (3)
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1 The Observation for this pipeline will be identified with the "events" prefix.
2 We use the tap operator with the observation utility.
3 A registry must be provided into which to publish the observation results. Note this is an ObservationRegistry.

The detail of the observation and its tags is provided in Micrometer.observation().
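
Since an Observation only produces output through the handlers registered on the ObservationRegistry, a complete setup needs at least one handler. The sketch below is one possible wiring, assuming Micrometer's DefaultMeterObservationHandler is used to turn observations into meters in an in-memory SimpleMeterRegistry. The class and method names (ObservationSketch, observedMeterNames) are hypothetical, and the sketch lists whatever meter names result rather than assuming them.

```java
import java.util.List;
import java.util.stream.Collectors;

import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micrometer.observation.ObservationRegistry;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux;

public class ObservationSketch {

    // Observes a small Flux and returns the names of the meters that the
    // handler created as a result.
    static List<String> observedMeterNames() {
        SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();

        // Without a handler, observations are not turned into any output.
        ObservationRegistry observationRegistry = ObservationRegistry.create();
        observationRegistry.observationConfig()
                .observationHandler(new DefaultMeterObservationHandler(meterRegistry));

        Flux.just("a", "b") // stand-in for listenToEvents()
            .name("events")
            .tap(Micrometer.observation(observationRegistry))
            .blockLast();

        return meterRegistry.getMeters().stream()
                .map(meter -> meter.getId().getName())
                .distinct()
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        observedMeterNames().forEach(System.out::println);
    }
}
```

Adding a tracing handler to the same ObservationRegistry would be the way to obtain spans from the same pipeline, without changing the reactive chain itself.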

You can also fully customize Micrometer’s Observation via Micrometer.observation(ObservationRegistry registry, Function<ObservationRegistry, Observation> observationSupplier) with your own Observation supplier, as follows:

listenToEvents()
    .name("events") (1)
    .tap(Micrometer.observation( (2)
        applicationDefinedRegistry, (3)
        registry -> Observation.createNotStarted( (4)
            myConvention, (5)
            myContextSupplier, (6)
            registry)))
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1 The Observation for this pipeline will be identified with the "events" prefix.
2 We use the tap operator with the observation utility.
3 A registry must be provided into which to publish the observation results. Note this is an ObservationRegistry.
4 We provide our own function to create the Observation
5 with a custom ObservationConvention
6 and a custom Supplier<Context>.

3. Meters and tags for Reactor-Core-Micrometer module

3.1. Micrometer.metrics()

Below is the list of meters used by the metrics tap listener feature, as exposed via Micrometer.metrics(MeterRegistry meterRegistry).

Please note that metrics below use a dynamic %s prefix. When applied on a Flux or Mono that uses the name(String n) operator, this is replaced with n. Otherwise, this is replaced by the default value of "reactor".
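
The substitution can be pictured with plain string formatting. The sketch below is only an illustration of how the names listed in this section resolve for a given prefix. It is not the module's own code, and the class and method names (MeterNameSketch, resolve) are hypothetical. It does not tell you which of these meters are actually registered for a particular sequence.

```java
import java.util.ArrayList;
import java.util.List;

public class MeterNameSketch {

    // Name templates as listed in this section of the reference.
    static final String[] TEMPLATES = {
        "%s.flow.duration",
        "%s.malformed.source",
        "%s.onNext.delay",
        "%s.requested",
        "%s.subscribed"
    };

    // Replaces the dynamic %s prefix with the given sequence name
    // (or with "reactor" when the caller passes the default).
    static List<String> resolve(String prefix) {
        List<String> names = new ArrayList<>();
        for (String template : TEMPLATES) {
            names.add(String.format(template, prefix));
        }
        return names;
    }

    public static void main(String[] args) {
        resolve("events").forEach(System.out::println);
        resolve("reactor").forEach(System.out::println);
    }
}
```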

3.1.1. Flow Duration

Times the duration elapsed between a subscription and the termination or cancellation of the sequence. A TerminationTags#STATUS tag is added to specify what event caused the timer to end ("completed", "completedEmpty", "error" or "cancelled").

Metric name %s.flow.duration - since it contains %s, the name is dynamic and will be resolved at runtime. Type distribution summary.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.

Table 1. Low cardinality Keys

Name | Description
exception (required) | Tag used by FLOW_DURATION when STATUS is "error", to store the exception that occurred.
status (required) | The termination status: "completed" for a sequence that terminates with an onComplete, with onNext(s); "completedEmpty" for a sequence that terminates without any onNext before the onComplete; "error" for a sequence that terminates with an onError; "cancelled" for a sequence that has cancelled its subscription.
type (required) | The type of the sequence ("Flux" or "Mono").

3.1.2. Malformed Source Events

Counts the number of events received from a malformed source (i.e. an onNext after an onComplete).

Metric name %s.malformed.source - since it contains %s, the name is dynamic and will be resolved at runtime. Type counter.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.

Table 2. Low cardinality Keys

Name | Description
type (required) | The type of the sequence ("Flux" or "Mono").

3.1.3. On Next Delay

Measures the delay between each onNext (or between the first onNext and the onSubscribe event).

Metric name %s.onNext.delay - since it contains %s, the name is dynamic and will be resolved at runtime. Type timer and base unit nanoseconds.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the base unit. However, each backend determines the actual base unit (e.g. Prometheus uses seconds).

Table 3. Low cardinality Keys

Name | Description
type (required) | The type of the sequence ("Flux" or "Mono").

3.1.4. Requested Amount

Counts the amount requested to a named sequence (e.g. Flux.name(String)) by all subscribers, until at least one requests an unbounded amount.

Metric name %s.requested - since it contains %s, the name is dynamic and will be resolved at runtime. Type distribution summary.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.

Table 4. Low cardinality Keys

Name | Description
type (required) | The type of the sequence ("Flux" or "Mono").

3.1.5. Subscribed

Counts the number of subscriptions to a sequence.

Metric name %s.subscribed - since it contains %s, the name is dynamic and will be resolved at runtime. Type counter.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.

Table 5. Low cardinality Keys

Name | Description
type (required) | The type of the sequence ("Flux" or "Mono").

3.2. Micrometer.timedScheduler()

Below is the list of meters used by the TimedScheduler feature, as exposed via Micrometer.timedScheduler(Scheduler original, MeterRegistry meterRegistry, String metricsPrefix).

Please note that metrics below use a dynamic %s prefix. This is replaced with the provided metricsPrefix in practice.

3.2.1. Tasks Active

LongTaskTimer reflecting tasks currently running. Note that this reflects all types of active tasks, including tasks scheduled with a delay or periodically (each iteration being considered an active task).

Metric name %s.scheduler.tasks.active - since it contains %s, the name is dynamic and will be resolved at runtime. Type long task timer.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the base unit. However, each backend determines the actual base unit (e.g. Prometheus uses seconds).

3.2.2. Tasks Completed

Timer reflecting tasks that have finished execution. Note that this reflects all types of tasks, including tasks scheduled with a delay or periodically (each iteration being considered a separate completed task).

Metric name %s.scheduler.tasks.completed - since it contains %s, the name is dynamic and will be resolved at runtime. Type timer.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the base unit. However, each backend determines the actual base unit (e.g. Prometheus uses seconds).

3.2.3. Tasks Pending

LongTaskTimer reflecting tasks that were submitted for immediate execution but couldn’t be started immediately because the scheduler is already at max capacity. Note that only immediate submissions via Scheduler#schedule(Runnable) and Scheduler.Worker#schedule(Runnable) are considered.

Metric name %s.scheduler.tasks.pending - since it contains %s, the name is dynamic and will be resolved at runtime. Type long task timer.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the base unit. However, each backend determines the actual base unit (e.g. Prometheus uses seconds).

3.2.4. Tasks Submitted

Counter that increments by one each time a task is submitted (via any of the schedule methods on both Scheduler and Scheduler.Worker).

Note that there are actually 4 counters, which can be differentiated by the SubmittedTags#SUBMISSION tag. The sum of all these can thus be compared with the count recorded by the TASKS_COMPLETED timer.

Metric name %s.scheduler.tasks.submitted - since it contains %s, the name is dynamic and will be resolved at runtime. Type counter.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.

Table 6. Low cardinality Keys

Name | Description
submission.type (required) | The type of submission: "direct" for Scheduler#schedule(Runnable); "delayed" for Scheduler#schedule(Runnable,long,TimeUnit); "periodic_initial" for Scheduler#schedulePeriodically(Runnable,long,long,TimeUnit) after the initial delay; "periodic_iteration" for Scheduler#schedulePeriodically(Runnable,long,long,TimeUnit) further periodic iterations.

3.3. Micrometer.observation()

Below is the list of meters used by the observation tap listener feature, as exposed via Micrometer.observation(ObservationRegistry registry).

This is the ANONYMOUS observation, but you can create a similar Observation with a custom name by using the name(String) operator.

You can also fully customize Micrometer’s Observation via Micrometer.observation(ObservationRegistry registry, Function<ObservationRegistry, Observation> observationSupplier) with your own Observation supplier, which lets you configure its attributes (name, contextual name, low and high cardinality keys, …).

3.3.1. Anonymous

Anonymous version of the Micrometer.observation(), when the sequence hasn’t been explicitly named via e.g. Flux#name(String) operator.

Metric name reactor.observation. Type timer.

Metric name reactor.observation.active. Type long task timer.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the base unit. However, each backend determines the actual base unit (e.g. Prometheus uses seconds).

Table 7. Low cardinality Keys

Name | Description
reactor.status (required) | The status of the sequence, which indicates how it terminated ("completed", "completedEmpty", "error" or "cancelled").
reactor.type (required) | The type of the sequence, i.e. "Flux" or "Mono".