Exposing Reactor metrics
Project Reactor is a library designed for performance and better utilization of resources. But to truly understand the performance of a system, it is best to be able to monitor its various components.
This is why Reactor provides a built-in integration with Micrometer via the reactor-core-micrometer module.
Introduced in the 2022.0 BOM release, the module provides an explicit dependency to Micrometer, which allows it to offer fine-tuned APIs for metrics and observations.
Up to Reactor-Core 3.5.0, metrics were implemented as operators that would be no-op if Micrometer wasn’t on the classpath.
The reactor-core-micrometer APIs require the user to provide a form of registry explicitly instead of relying on a hardcoded global registry.
When applying instrumentation to classes that have a native notion of naming or tags, these APIs will attempt to discover such elements in the reactive chain.
Otherwise, the API will expect that a prefix for naming meters is provided alongside the registry.
1. Scheduler metrics
Every async operation in Reactor is done via the Scheduler abstraction described in Threading and Schedulers. This is why it is important to monitor your schedulers, watch out for key metrics that start to look suspicious and react accordingly.
The reactor-core-micrometer module offers a "timed" Scheduler wrapper that performs measurements around tasks submitted through it, which can be used as follows:
Scheduler originalScheduler = Schedulers.newParallel("test", 4);
Scheduler schedulerWithMetrics = Micrometer.timedScheduler(
    originalScheduler, (1)
    applicationDefinedMeterRegistry, (2)
    "testingMetrics", (3)
    Tags.of(Tag.of("additionalTag", "yes")) (4)
);
1 | the Scheduler to wrap |
2 | the MeterRegistry in which to publish metrics |
3 | the prefix to use in naming meters. This would for example lead to a testingMetrics.scheduler.tasks.completed meter being created. |
4 | optional tags to add to all the meters created for that wrapping Scheduler |
When wrapping a common Scheduler (e.g. Schedulers.single()) or a Scheduler that is used in multiple places, only the Runnable tasks that are submitted through the wrapper instance returned by Micrometer#timedScheduler are going to be instrumented. Tasks submitted directly to the original Scheduler are not measured.
See Micrometer.timedScheduler() for produced meters and associated default tags.
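As a rough illustration of how the wrapper's meters can be read back, the following sketch wraps a single-threaded Scheduler, submits one task through the wrapper and sums the `submitted` counters. It assumes reactor-core, reactor-core-micrometer and micrometer-core are on the classpath; the class name, the "demo" prefix and the use of SimpleMeterRegistry are choices made for this example only.

```java
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TimedSchedulerExample {

    // Submits one task through a timed wrapper and returns the sum of all
    // "<prefix>.scheduler.tasks.submitted" counters found in the registry.
    static double runAndCountSubmissions() {
        MeterRegistry registry = new SimpleMeterRegistry(); // in-memory registry, for the example only
        Scheduler original = Schedulers.newSingle("demo");
        Scheduler timed = Micrometer.timedScheduler(original, registry, "demo");
        try {
            CountDownLatch done = new CountDownLatch(1);
            timed.schedule(done::countDown);       // instrumented: goes through the wrapper
            done.await(5, TimeUnit.SECONDS);       // wait until the task has actually run

            // The submitted meter is split into several counters (one per submission type),
            // so the sketch adds them up instead of looking up a single one.
            return registry.find("demo.scheduler.tasks.submitted")
                    .counters()
                    .stream()
                    .mapToDouble(Counter::count)
                    .sum();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            original.dispose();                    // release the worker thread
        }
    }

    public static void main(String[] args) {
        System.out.println("submitted tasks: " + runAndCountSubmissions());
    }
}
```

The sketch reads the submitted counters rather than the completed timer because the counter is incremented at submission time, whereas the timer is only recorded once the task has fully returned.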
2. Publisher metrics
Sometimes it is useful to be able to record metrics at some stage in your reactive pipeline.
One way to do it would be to manually push the values to your metrics backend of choice from a custom SignalListener provided to the tap operator.
An out-of-the-box implementation is actually provided by the reactor-core-micrometer module, via the Micrometer#metrics APIs.
Consider the following pipeline:
listenToEvents()
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
To enable the metrics for this source Flux (returned from listenToEvents()), we need to turn on the metrics collection:
listenToEvents()
    .name("events") (1)
    .tap(Micrometer.metrics( (2)
        applicationDefinedMeterRegistry (3)
    ))
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1 | Every metric at this stage of the reactive pipeline will use "events" as a naming prefix (optional, defaults to the "reactor" prefix). |
2 | We use the tap operator combined with a SignalListener implementation provided in reactor-core-micrometer for metrics collection. |
3 | As with other APIs in that module, the MeterRegistry into which to publish metrics needs to be explicitly provided. |
The detail of the exposed metrics is available in Micrometer.metrics().
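To make the effect of name and tap more concrete, here is a minimal sketch that instruments a small Flux and reads back the `events.subscribed` counter. It assumes reactor-core, reactor-core-micrometer and micrometer-core are available; Flux.range stands in for listenToEvents(), and the class name and SimpleMeterRegistry are illustrative choices.

```java
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux;

public class PublisherMetricsExample {

    // Subscribes once to an instrumented Flux and returns the value of the
    // "<name>.subscribed" counter published to the registry.
    static double subscriptionsRecorded() {
        MeterRegistry registry = new SimpleMeterRegistry(); // in-memory registry, for the example only

        Flux<Integer> source = Flux.range(1, 3)             // stand-in for listenToEvents()
                .name("events")                             // becomes the meter name prefix
                .tap(Micrometer.metrics(registry));         // metrics listener, explicit registry

        source.blockLast();                                 // one subscription, runs to completion

        return registry.get("events.subscribed").counter().count();
    }

    public static void main(String[] args) {
        System.out.println("events.subscribed = " + subscriptionsRecorded());
    }
}
```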
2.1. Tags
In addition to the common tags described in Micrometer.metrics(), users can add custom tags to their reactive chains via the tag operator:
listenToEvents()
    .name("events") (1)
    .tag("source", "kafka") (2)
    .tap(Micrometer.metrics(applicationDefinedMeterRegistry)) (3)
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1 | Every metric at this stage will be identified with the "events" prefix. |
2 | Set a custom tag "source" to value "kafka". |
3 | All reported metrics will have source=kafka tag assigned in addition to the common tags. |
Please note that, depending on the monitoring system you’re using, naming the sequence can be effectively mandatory when using tags: two default-named sequences would otherwise share the same meter names while carrying different sets of tags. Some systems, like Prometheus, might also require every metric with the same name to have exactly the same set of tag keys.
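The following sketch shows how a custom tag separates two sequences that share the same name. It assumes the same dependencies as the rest of this section (reactor-core, reactor-core-micrometer, micrometer-core); the "rabbit" tag value, the class name and the Flux.range sources are made up for the example.

```java
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux;

public class TaggedMetricsExample {

    // Returns { subscriptions tagged source=kafka, subscriptions tagged source=rabbit }.
    static double[] subscriptionsBySource() {
        MeterRegistry registry = new SimpleMeterRegistry(); // in-memory registry, for the example only

        Flux<Integer> kafka = Flux.range(1, 3)
                .name("events")
                .tag("source", "kafka")
                .tap(Micrometer.metrics(registry));

        Flux<Integer> rabbit = Flux.range(1, 3)
                .name("events")                             // same name, same tag key
                .tag("source", "rabbit")                    // different tag value
                .tap(Micrometer.metrics(registry));

        kafka.blockLast();                                  // one subscription
        rabbit.blockLast();                                 // two subscriptions
        rabbit.blockLast();

        double kafkaCount = registry.get("events.subscribed")
                .tag("source", "kafka").counter().count();
        double rabbitCount = registry.get("events.subscribed")
                .tag("source", "rabbit").counter().count();
        return new double[] { kafkaCount, rabbitCount };
    }

    public static void main(String[] args) {
        double[] counts = subscriptionsBySource();
        System.out.println("kafka=" + counts[0] + ", rabbit=" + counts[1]);
    }
}
```

Both sequences use the same name and the same tag key, which keeps the tag set consistent for backends that require it.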
2.2. Observation
In addition to full metrics, the reactor-core-micrometer module offers an alternative based on Micrometer’s Observation.
Depending on the configuration and runtime classpath, an Observation could translate to timers, spans, logging statements or any combination.
A reactive chain can be observed via the tap operator and Micrometer.observation utility, as follows:
listenToEvents()
    .name("events") (1)
    .tap(Micrometer.observation( (2)
        applicationDefinedRegistry)) (3)
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1 | The Observation for this pipeline will be identified with the "events" prefix. |
2 | We use the tap operator with the observation utility. |
3 | A registry must be provided into which to publish the observation results. Note this is an ObservationRegistry. |
The detail of the observation and its tags is provided in Micrometer.observation().
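Since an Observation only produces output through the handlers registered on the ObservationRegistry, a small sketch can help show the lifecycle. The example below registers a hypothetical RecordingHandler that simply notes when the observation starts and stops. It assumes reactor-core, reactor-core-micrometer and micrometer-observation are on the classpath; the handler and class names are invented for this illustration.

```java
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationHandler;
import io.micrometer.observation.ObservationRegistry;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ObservationExample {

    // Hypothetical handler that records lifecycle callbacks instead of
    // producing timers or spans.
    static final class RecordingHandler implements ObservationHandler<Observation.Context> {
        final List<String> events = new CopyOnWriteArrayList<>();

        @Override
        public void onStart(Observation.Context context) {
            events.add("start");
        }

        @Override
        public void onStop(Observation.Context context) {
            events.add("stop");
        }

        @Override
        public boolean supportsContext(Observation.Context context) {
            return true; // accept every observation in this sketch
        }
    }

    // Observes one subscription to a small Flux and returns the recorded callbacks.
    static List<String> observeOnce() {
        ObservationRegistry registry = ObservationRegistry.create();
        RecordingHandler handler = new RecordingHandler();
        registry.observationConfig().observationHandler(handler);

        Flux.range(1, 3)                                   // stand-in for listenToEvents()
                .name("events")
                .tap(Micrometer.observation(registry))
                .blockLast();

        return handler.events;
    }

    public static void main(String[] args) {
        System.out.println(observeOnce());
    }
}
```

In a real application the handler would typically be one supplied by Micrometer or a tracing bridge rather than a hand-written one.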
You can also fully customize Micrometer’s Observation via Micrometer.observation(ObservationRegistry registry, Function<ObservationRegistry, Observation> observationSupplier) with your own Observation supplier, as follows:
listenToEvents()
    .name("events") (1)
    .tap(Micrometer.observation( (2)
        applicationDefinedRegistry, (3)
        registry -> Observation.createNotStarted( (4)
            myConvention, (5)
            myContextSupplier, (6)
            registry)))
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1 | The Observation for this pipeline will be identified with the "events" prefix. |
2 | We use the tap operator with the observation utility. |
3 | A registry must be provided into which to publish the observation results. Note this is an ObservationRegistry. |
4 | We provide our own function to create the Observation |
5 | with a custom ObservationConvention |
6 | and a custom Supplier<Context>. |
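A simpler variant of the supplier, sketched below, creates the Observation from a plain name instead of a convention and context supplier. This assumes a Reactor version that includes the two-argument Micrometer.observation overload shown above, plus micrometer-observation on the classpath; the observation name "my.custom.observation", the "source" key value and the NameRecordingHandler are hypothetical and exist only for this example.

```java
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationHandler;
import io.micrometer.observation.ObservationRegistry;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CustomObservationExample {

    // Hypothetical handler that records the name of each started observation.
    static final class NameRecordingHandler implements ObservationHandler<Observation.Context> {
        final List<String> startedNames = new CopyOnWriteArrayList<>();

        @Override
        public void onStart(Observation.Context context) {
            startedNames.add(context.getName());
        }

        @Override
        public boolean supportsContext(Observation.Context context) {
            return true;
        }
    }

    // Runs one observed subscription using a custom Observation supplier.
    static List<String> observeWithCustomSupplier() {
        ObservationRegistry registry = ObservationRegistry.create();
        NameRecordingHandler handler = new NameRecordingHandler();
        registry.observationConfig().observationHandler(handler);

        Flux.range(1, 3)                                   // stand-in for listenToEvents()
                .tap(Micrometer.observation(
                        registry,
                        // custom supplier: we decide the name and extra key values
                        r -> Observation.createNotStarted("my.custom.observation", r)
                                .lowCardinalityKeyValue("source", "kafka")))
                .blockLast();

        return handler.startedNames;
    }

    public static void main(String[] args) {
        System.out.println(observeWithCustomSupplier());
    }
}
```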
3. Meters and tags for Reactor-Core-Micrometer module
3.1. Micrometer.metrics()
Below is the list of meters used by the metrics tap listener feature, as exposed via Micrometer.metrics(MeterRegistry meterRegistry).
Please note that metrics below use a dynamic %s prefix.
When applied on a Flux or Mono that uses the name(String n) operator, this is replaced with n.
Otherwise, this is replaced by the default value of "reactor".
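The %s substitution described above behaves like ordinary string formatting. The following plain-Java sketch is not Reactor's own code; it only illustrates the rule with a hypothetical resolve helper, falling back to "reactor" when no name is set.

```java
public class MeterNameExample {

    // Illustrative helper (not part of Reactor): fills the %s placeholder of a
    // documented meter name with the sequence name, or "reactor" by default.
    static String resolve(String template, String sequenceName) {
        String prefix = (sequenceName == null || sequenceName.isEmpty())
                ? "reactor"
                : sequenceName;
        return String.format(template, prefix);
    }

    public static void main(String[] args) {
        System.out.println(resolve("%s.flow.duration", "events")); // named sequence
        System.out.println(resolve("%s.subscribed", null));        // unnamed sequence
    }
}
```

With a sequence named "events", the flow duration meter is therefore looked up as events.flow.duration, while an unnamed sequence publishes reactor.subscribed.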
3.1.1. Flow Duration
Times the duration elapsed between a subscription and the termination or cancellation of the sequence. A TerminationTags#STATUS tag is added to specify what event caused the timer to end ("completed", "completedEmpty", "error" or "cancelled").
Metric name %s.flow.duration - since it contains %s, the name is dynamic and will be resolved at runtime. Type distribution summary.
KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Name |
Description |
|
Tag used by FLOW_DURATION when STATUS is |
|
The termination status:
|
|
The type of the sequence ( |
3.1.2. Malformed Source Events
Counts the number of events received from a malformed source (i.e. an onNext after an onComplete).
Metric name %s.malformed.source - since it contains %s, the name is dynamic and will be resolved at runtime. Type counter.
KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Name |
Description |
|
The type of the sequence ( |
3.1.3. On Next Delay
Measures the delay between each onNext (or between the first onNext and the onSubscribe event).
Metric name %s.onNext.delay - since it contains %s, the name is dynamic and will be resolved at runtime. Type timer and base unit nanoseconds.
KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the base unit. However, each backend determines the actual base unit (for example, Prometheus uses seconds).
Name |
Description |
|
The type of the sequence ( |
3.1.4. Requested Amount
Counts the amount requested to a named sequence (e.g. Flux.name(String)) by all subscribers, until at least one requests an unbounded amount.
Metric name %s.requested - since it contains %s, the name is dynamic and will be resolved at runtime. Type distribution summary.
KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Name |
Description |
|
The type of the sequence ( |
3.1.5. Subscribed
Counts the number of subscriptions to a sequence.
Metric name %s.subscribed - since it contains %s, the name is dynamic and will be resolved at runtime. Type counter.
KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Name |
Description |
|
The type of the sequence ( |
3.2. Micrometer.timedScheduler()
Below is the list of meters used by the TimedScheduler feature, as exposed via Micrometer.timedScheduler(Scheduler original, MeterRegistry meterRegistry, String metricsPrefix).
Please note that metrics below use a dynamic %s prefix. This is replaced with the provided metricsPrefix in practice.
3.2.1. Tasks Active
LongTaskTimer reflecting tasks currently running. Note that this reflects all types of active tasks, including tasks scheduled with a delay or periodically (each iteration being considered an active task).
Metric name %s.scheduler.tasks.active - since it contains %s, the name is dynamic and will be resolved at runtime. Type long task timer.
KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the base unit. However, each backend determines the actual base unit (for example, Prometheus uses seconds).
3.2.2. Tasks Completed
Timer reflecting tasks that have finished execution. Note that this reflects all types of tasks, including tasks scheduled with a delay or periodically (each iteration being considered a separate completed task).
Metric name %s.scheduler.tasks.completed - since it contains %s, the name is dynamic and will be resolved at runtime. Type timer.
KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the base unit. However, each backend determines the actual base unit (for example, Prometheus uses seconds).
3.2.3. Tasks Pending
LongTaskTimer reflecting tasks that were submitted for immediate execution but couldn’t be started immediately because the scheduler is already at max capacity. Note that only immediate submissions via Scheduler#schedule(Runnable) and Scheduler.Worker#schedule(Runnable) are considered.
Metric name %s.scheduler.tasks.pending - since it contains %s, the name is dynamic and will be resolved at runtime. Type long task timer.
KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the base unit. However, each backend determines the actual base unit (for example, Prometheus uses seconds).
3.2.4. Tasks Submitted
Counter that increments by one each time a task is submitted (via any of the schedule methods on both Scheduler and Scheduler.Worker).
Note that there are actually 4 counters, which can be differentiated by the SubmittedTags#SUBMISSION tag. The sum of all these can thus be compared with the TASKS_COMPLETED counter.
Metric name %s.scheduler.tasks.submitted - since it contains %s, the name is dynamic and will be resolved at runtime. Type counter.
KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Name |
Description |
|
The type of submission:
|
3.3. Micrometer.observation()
Below is the list of meters used by the observation tap listener feature, as exposed via Micrometer.observation(ObservationRegistry registry).
This is the ANONYMOUS observation, but you can create a similar Observation with a custom name by using the name(String) operator.
You can also fully customize Micrometer’s Observation via Micrometer.observation(ObservationRegistry registry, Function<ObservationRegistry, Observation> observationSupplier) with your own Observation supplier, which allows you to configure its attributes (name, contextual name, low and high cardinality keys, …).
3.3.1. Anonymous
Anonymous version of the Micrometer.observation(), when the sequence hasn’t been explicitly named via e.g. Flux#name(String) operator.
Metric name reactor.observation. Type timer.
Metric name reactor.observation.active. Type long task timer.
KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the base unit. However, each backend determines the actual base unit (for example, Prometheus uses seconds).
Name |
Description |
|
The status of the sequence, which indicates how it terminated ( |
|
The type of the sequence, i.e. |