Introduction
1. Overview
1.1. RabbitMQ
With more than 35,000 production deployments world-wide at small startups and large enterprises, RabbitMQ is the most popular open source message broker.
RabbitMQ is lightweight and easy to deploy on premises and in the cloud. It supports multiple messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.
1.2. Project Reactor
Reactor is a highly optimized reactive library for building efficient, non-blocking applications on the JVM based on the Reactive Streams Specification. Reactor based applications can sustain very high throughput message rates and operate with a very low memory footprint, making it suitable for building efficient event-driven applications using the microservices architecture.
1.3. Reactive API for RabbitMQ
Reactor RabbitMQ is a reactive API for RabbitMQ based on Reactor and RabbitMQ Java Client. Reactor RabbitMQ API enables messages to be published to RabbitMQ and consumed from RabbitMQ using functional APIs with non-blocking back-pressure and very low overheads. This enables applications using Reactor to use RabbitMQ as a message bus or streaming platform and integrate with other systems to provide an end-to-end reactive pipeline.
2. Motivation
2.1. Functional interface for RabbitMQ
Reactor RabbitMQ is a functional Java API for RabbitMQ. For applications that are written in functional style, this API enables RabbitMQ interactions to be integrated easily without requiring non-functional produce or consume APIs to be incorporated into the application logic.
2.2. Non-blocking Back-pressure
The Reactor RabbitMQ API benefits from non-blocking back-pressure provided by Reactor. For example, in a pipeline, where messages received from an external source (e.g. an HTTP proxy) are published to RabbitMQ, back-pressure can be applied easily to the whole pipeline, limiting the number of messages in-flight and controlling memory usage. Messages flow through the pipeline as they are available, with Reactor taking care of limiting the flow rate to avoid overflow, keeping application logic simple.
2.3. End-to-end Reactive Pipeline
The value proposition for Reactor RabbitMQ is the efficient utilization of resources in applications with multiple external interactions where RabbitMQ is one of the external systems. End-to-end reactive pipelines benefit from non-blocking back-pressure and efficient use of threads, enabling a large number of concurrent requests to be processed efficiently. The optimizations provided by Project Reactor enable development of reactive applications with very low overheads and predictable capacity planning to deliver low-latency, high-throughput pipelines.
2.4. Comparisons with other RabbitMQ Java libraries
Reactor RabbitMQ is not intended to replace any of the existing Java libraries. Instead, it is aimed at providing an alternative API for reactive event-driven applications.
2.4.1. RabbitMQ Java Client
For non-reactive applications, RabbitMQ Java Client provides the most complete API to manage resources, publish messages to and consume messages from RabbitMQ. Note Reactor RabbitMQ is based on RabbitMQ Java Client.
Applications using RabbitMQ as a message bus using this API may consider switching to Reactor RabbitMQ if the application is implemented in a functional style.
2.4.2. Spring AMQP
Spring AMQP applies core Spring Framework concepts to the development of AMQP-based messaging solutions. It provides a "template" as a high-level abstraction for sending and receiving messages. It also provides support for Message-driven POJOs with a "listener container". These libraries facilitate management of AMQP resources while promoting the use of dependency injection and declarative configuration. Spring AMQP is based on RabbitMQ Java Client.
3. Getting Started
3.1. Requirements
You need Java JRE installed (Java 8 or later).
You also need to install RabbitMQ. Follow the instructions from the website. Note you should use RabbitMQ 3.6.x or later.
3.2. Quick Start
This quick start tutorial sets up a single node RabbitMQ and runs the sample reactive sender and consumer.
3.2.1. Start RabbitMQ
Start RabbitMQ on your local machine with all the defaults (e.g. AMQP port is 5672).
3.2.2. Run Reactor RabbitMQ Samples
Download Reactor RabbitMQ from github.com/reactor/reactor-rabbitmq/.
> git clone https://github.com/reactor/reactor-rabbitmq
> cd reactor-rabbitmq
Sample Sender
The SampleSender
code is on GitHub.
Run the sample sender:
> ./gradlew -q sender
10:20:12.590 INFO r.rabbitmq.samples.SampleSender - Message Message_1 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_2 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_3 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_4 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_5 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_6 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_7 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_8 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_9 sent successfully
10:20:12.597 INFO r.rabbitmq.samples.SampleSender - Message Message_10 sent successfully
10:20:12.597 INFO r.rabbitmq.samples.SampleSender - Message Message_11 sent successfully
10:20:12.597 INFO r.rabbitmq.samples.SampleSender - Message Message_12 sent successfully
10:20:12.599 INFO r.rabbitmq.samples.SampleSender - Message Message_13 sent successfully
10:20:12.600 INFO r.rabbitmq.samples.SampleSender - Message Message_14 sent successfully
10:20:12.600 INFO r.rabbitmq.samples.SampleSender - Message Message_15 sent successfully
10:20:12.600 INFO r.rabbitmq.samples.SampleSender - Message Message_16 sent successfully
10:20:12.600 INFO r.rabbitmq.samples.SampleSender - Message Message_17 sent successfully
10:20:12.600 INFO r.rabbitmq.samples.SampleSender - Message Message_18 sent successfully
10:20:12.601 INFO r.rabbitmq.samples.SampleSender - Message Message_19 sent successfully
10:20:12.601 INFO r.rabbitmq.samples.SampleSender - Message Message_20 sent successfully
The SampleSender
sends 20 messages to the demo-queue
queue, with publisher
confirms enabled. The log line for a given message is printed to the console
when the publisher confirmation is received from the broker.
Sample Receiver
The SampleReceiver
code is on GitHub.
Run the sample receiver:
> ./gradlew -q receiver
10:22:43.568 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_1
10:22:43.575 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_2
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_3
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_4
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_5
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_6
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_7
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_8
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_9
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_10
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_11
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_12
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_13
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_14
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_15
10:22:43.578 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_16
10:22:43.578 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_17
10:22:43.578 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_18
10:22:43.578 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_19
10:22:43.578 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_20
The SampleReceiver
consumes messages from the demo-queue
queue and logs
the message content in the console.
Sample Spring Boot Application
The SpringBootSample
code is on GitHub.
Run the sample Spring Boot application:
> ./gradlew -q springboot
...
11:47:43.837 INFO r.rabbitmq.samples.SpringBootSample - Sending messages...
11:47:43.846 INFO r.rabbitmq.samples.SpringBootSample - Received message Message_1
11:47:43.849 INFO r.rabbitmq.samples.SpringBootSample - Received message Message_2
11:47:43.850 INFO r.rabbitmq.samples.SpringBootSample - Received message Message_3
11:47:43.850 INFO r.rabbitmq.samples.SpringBootSample - Received message Message_4
11:47:43.851 INFO r.rabbitmq.samples.SpringBootSample - Received message Message_5
11:47:43.851 INFO r.rabbitmq.samples.SpringBootSample - Received message Message_6
11:47:43.851 INFO r.rabbitmq.samples.SpringBootSample - Received message Message_7
11:47:43.851 INFO r.rabbitmq.samples.SpringBootSample - Received message Message_8
11:47:43.851 INFO r.rabbitmq.samples.SpringBootSample - Received message Message_9
11:47:43.851 INFO r.rabbitmq.samples.SpringBootSample - Received message Message_10
The Spring Boot sample publishes messages with a Sender
and consumes them with
a Receiver
. This application illustrates how to configure Reactor RabbitMQ in a Spring
Boot environment.
3.2.3. Building Reactor RabbitMQ Applications
To build your own application using the Reactor RabbitMQ API, you need to include a dependency to Reactor RabbitMQ.
For Gradle:
dependencies {
compile "io.projectreactor.rabbitmq:reactor-rabbitmq:1.5.6"
}
For Maven:
<dependency>
<groupId>io.projectreactor.rabbitmq</groupId>
<artifactId>reactor-rabbitmq</artifactId>
<version>1.5.6</version>
</dependency>
When using a milestone or a release candidate, you need to add the Spring IO milestone repository.
For Gradle:
repositories {
maven { url 'https://repo.spring.io/milestone' }
mavenCentral()
}
For Maven:
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
When using a snapshot, you need to add the Spring IO snapshots repository.
For Gradle:
repositories {
maven { url 'https://repo.spring.io/libs-snapshot' }
mavenCentral()
}
For Maven:
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
3.2.4. Versioning
Reactor RabbitMQ used semantic versioning from version 1.0 to version 1.4, but switched to another scheme for consistency with Reactor Core and the other Reactor libraries.
Starting from 1.4, Reactor RabbitMQ uses a GENERATION.MAJOR.MINOR
scheme, whereby an increment in:
-
GENERATION
marks a change of library generation. Expect improvements, new features, bug fixes, and incompatible API changes. -
MAJOR
marks a significant release. Expect new features, bug fixes, and small incompatible API changes. -
MINOR
marks a maintenance release. Expect new features and bug fixes, but no incompatible API changes.
4. Additional Resources
4.1. Getting help
If you are having trouble with Reactor RabbitMQ, you can ask for help on RabbitMQ community mailing list.
Report bugs in Reactor RabbitMQ at github.com/reactor/reactor-rabbitmq/issues.
Reactor Rabbitmq is open source and the code and documentation are available at github.com/reactor/reactor-rabbitmq.
5. New & Noteworthy
5.1. What’s new in Reactor RabbitMQ 1.4.1
-
Support exchange-to-exchange binding and unbinding
-
Change in versioning scheme: Reactor RabbitMQ does not follow semantic versioning anymore, it uses now a
GENERATION.MAJOR.MINOR
scheme, for consistency with the other Reactor libraries
5.2. What’s new in Reactor RabbitMQ 1.4
-
Add
@NonNullApi
and@Nullable
annotations -
Add
Sender#sendWithTypedPublishConfirms
to be able to publishOutboundMessage
instances of a custom class and get them back in the flux ofOutboundMessageResult
-
Use Reactor 3.3.1.RELEASE
5.3. What’s new in Reactor RabbitMQ 1.3
-
Use Reactor 3.3.0.RELEASE
-
Allow passive exchange and queue declaration
-
Emit exception on server-initiated channel closing in publish confirms flow
-
Add support for handling returned (undeliverable) messages in
Sender
-
Add hook to configure
Mono<Connection>
-
Support client-generated consumer tags
-
Cache connections and channels only on success
-
Use Java client 5.7.3
5.4. What’s new in Reactor RabbitMQ 1.2
-
Limit in-flight records in publisher confirms if requested
-
Implement back pressure in publisher confirms support
-
Use Reactor 3.2.8.RELEASE
5.5. What’s new in Reactor RabbitMQ 1.1
-
Let user provide
Mono<Channel>
for sending messages -
Add optional channel pooling for sending messages
-
Automatically retry on ack and nack
-
Use Reactor 3.2.5.RELEASE
-
Use Java client 5.6.0
5.6. What’s new in Reactor RabbitMQ 1.0
-
Introduction of the
Sender
andReceiver
API -
Support for request/reply
-
Exception handling
-
Let user provide
Mono<Channel>
for resource management -
Complete receiving flux on channel termination
-
Handle error signal of
connectionMono
subscription to enable proper error handling -
Use Reactor 3.2.3.RELEASE
-
Use Java client 5.5.1
Reference Documentation
6. Reactor RabbitMQ API
6.1. Overview
This section describes the reactive API for producing and consuming messages using RabbitMQ. There are two main classes in Reactor RabbitMQ:
-
reactor.rabbitmq.Sender
for publishing messages to RabbitMQ -
reactor.rabbitmq.Receiver
for consuming messages from RabbitMQ
Full API for Reactor RabbitMQ is available in the javadocs.
The project uses Reactor Core to expose a "Reactive Streams" API.
6.2. Reactive RabbitMQ Sender
Outbound messages are sent to RabbitMQ using reactor.rabbitmq.Sender
.
A Sender
is associated with one RabbitMQ Connection
that is used
to transport messages to the broker. A Sender
can also manage resources
(exchanges, queues, bindings).
A Sender
is created with an instance of sender configuration options
reactor.rabbitmq.SenderOptions
.
The properties of SenderOptions
contains the ConnectionFactory
that creates
connections to the broker and a Reactor Scheduler
used by the Sender
.
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();
SenderOptions senderOptions = new SenderOptions()
.connectionFactory(connectionFactory) (1)
.resourceManagementScheduler(Schedulers.boundedElastic()); (2)
1 | Specify connection factory |
2 | Specify scheduler for resource management |
Note you can control the creation of the Connection
thanks to the
connectionSupplier(ConnectionFactory)
method:
SenderOptions senderOptions = new SenderOptions()
.connectionFactory(connectionFactory)
.connectionSupplier(cf -> cf.newConnection( (1)
new Address[] {new Address("192.168.0.1"), new Address("192.168.0.2")},
"reactive-sender"))
.resourceManagementScheduler(Schedulers.boundedElastic());
1 | Specify array of addresses and connection name |
In the snippet above the connection can be created from 2 different nodes (useful for failover) and the connection name is set up.
If TLS is required, it must be configured with the Java client’s ConnectionFactory
,
see the TLS section in the client documentation
and the TLS guide
for more information.
Once the required options have been configured on the options instance,
a new Sender
instance can be created with the options already
configured in senderOptions
.
Sender sender = RabbitFlux.createSender(senderOptions);
The Sender
is now ready to send messages to RabbitMQ.
At this point, a Sender
instance has been created,
but no connections to RabbitMQ have been made yet.
The underlying Connection
instance is created lazily
when a first call is made to create a resource or to send messages.
Let’s now create a sequence of messages to send to RabbitMQ.
Each outbound message to be sent to RabbitMQ is represented as a OutboundMessage
.
An OutboundMessage
contains routing information (exchange to send to and routing key)
as well as the message itself (properties and body).
A Flux<OutboundMessage>
of messages is created for sending to RabbitMQ.
For beginners, Lite Rx API Hands-on
provides a hands-on tutorial on using the Reactor classes Flux
and Mono
.
Flux<OutboundMessage> outboundFlux =
Flux.range(1, 10)
.map(i -> new OutboundMessage(
"amq.direct",
"routing.key", ("Message " + i).getBytes()
));
The code segment above creates a sequence of messages to send to RabbitMQ.
The outbound Flux can now be sent to RabbitMQ using the
Sender
created earlier.
The code segment below sends the messages to RabbitMQ. The final subscribe()
in the code block
requests upstream to send the messages to RabbitMQ.
sender.send(outboundFlux) (1)
.doOnError(e -> log.error("Send failed", e)) (2)
.subscribe(); (3)
1 | Reactive send operation for the outbound Flux |
2 | If the sending fails, log an error |
3 | Subscribe to trigger the actual flow of records from outboundFlux to RabbitMQ. |
See SampleSender
for a full code listing for a Sender
.
6.2.1. Managing resources (exchanges, queues, and bindings)
The Sender
is also able to declare and delete AMQP resources the reactive way.
You can learn more about the AMQP
model on RabbitMQ website.
Sender
has a declare*
method for each type of resource
(exchange, binding, and queue) and there’s also a respective *Specification
class to describe each creation.
Mono<AMQP.Exchange.DeclareOk> exchange = sender.declareExchange(
ExchangeSpecification.exchange("my.exchange")
);
Mono<AMQP.Queue.DeclareOk> queue = sender.declareQueue(
QueueSpecification.queue("my.queue")
);
Mono<AMQP.Queue.BindOk> binding = sender.bind(
BindingSpecification.binding().exchange("my.exchange")
.queue("my.queue").routingKey("a.b")
);
Note the Sender#declare*
methods return their respective AMQP results
wrapped into a Mono
.
For queue creation, note that if a queue specification has a
null name, the queue to be created will have a server-generated name
and will be non-durable, exclusive, and auto-delete. If you want
a queue to have a server-generated name but other parameters,
specify an empty name |
One can also use the ResourcesSpecification
factory class
with a static import to reduce boilerplate code. Combined with
Mono
chaining and Sender#declare
shortcuts, it allows for condensed syntax:
import static reactor.rabbitmq.ResourcesSpecification.*;
...
sender.declare(exchange("my.exchange"))
.then(sender.declare(queue("my.queue")))
.then(sender.bind(binding("my.exchange", "a.b", "my.queue")))
.subscribe(r -> System.out.println("Exchange and queue declared and bound"));
Sender
has delete*
and delete
methods as well. Here is an example with
the short method forms:
import static reactor.rabbitmq.ResourcesSpecification.*;
...
sender.unbind(binding("my.exchange", "a.b", "my.queue"))
.then(sender.delete(exchange("my.exchange")))
.then(sender.delete(queue("my.queue")))
.subscribe(r -> System.out.println("Exchange and queue unbound and deleted"));
Warning: These methods relies on RPCs. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock is being used to prevent concurrent RPCs, making this publisher potentially blocking. |
6.2.2. Reliable publishing with publisher confirms
Sender
offers also the sendWithPublishConfirms
method to send
messages and receive publisher
confirms to make sure the broker has taken into account the outbound messages.
Flux<OutboundMessage> outboundFlux = Flux.range(1, 10)
.map(i -> new OutboundMessage(
"amq.direct",
"routing.key", "hello".getBytes()
));
sender.sendWithPublishConfirms(outboundFlux)
.subscribe(outboundMessageResult -> {
if (outboundMessageResult.isAck()) {
(1)
}
});
1 | Outbound message has reached the broker |
Sender#sendWithPublishConfirms
returns a Flux<OutboundMessageResult>
that can be subscribed to to know that outbound messages
have successfully reached the broker.
It is also possible to know about unroutable messages,
that is messages not routed to any queue because they do not match any routing rule. Tracking
of unroutable messages is disabled by default, it can be enabled by
using the Sender#sendWithPublishConfirms(Publisher<OutboundMessage>, SendOptions)
method and
set the trackReturned
flag on SendOptions
:
Flux<OutboundMessage> outboundFlux = Flux.range(1, 10)
.map(i -> new OutboundMessage(
"amq.direct",
"routing.key", "hello".getBytes()
));
sender.sendWithPublishConfirms(outboundFlux, new SendOptions().trackReturned(true)) (1)
.subscribe(outboundMessageResult -> {
if (outboundMessageResult.isReturned()) {
(2)
}
});
1 | Track unroutable messages |
2 | Outbound message was not routed to any queue |
The OutboundMessageResult#isReturned
method then tells whether a message has been routed somewhere or not.
This method always returns false
if unroutable messages are not tracked. Note the
OutboundMessageResult#isAck
method returns true
for unroutable messages, because the
broker considered they have been taken care of (i.e. confirmed). So if you are interested
in unroutable messages, the returned status should always be checked before the confirmed status.
Note it is possible to publish OutboundMessage
instances of a custom class and get them
back in the flux of OutboundMessageResult
. To do so, use the sendWithTypedPublishConfirms
method:
Flux<CorrelableOutboundMessage<Integer>> outboundFlux = Flux.range(1, 10)
.map(i -> new CorrelableOutboundMessage<>( (1)
"amq.direct",
"routing.key", "hello".getBytes(),
i (2)
));
sender.sendWithTypedPublishConfirms(outboundFlux)
.subscribe(confirmation -> {
CorrelableOutboundMessage<Integer> message = confirmation.getOutboundMessage(); (3)
Integer confirmedBusinessData = message.getCorrelationMetadata(); (4)
});
1 | Use a subclass of OutboundMessage |
2 | Provide some extra information in the outbound message |
3 | Get the original message |
4 | Retrieve the extra information |
The previous sample uses the provided CorrelableOutboundMessage
class, but it could
be any subclass of OutboundMessage
. The OutboundMessageResult
instances
of the confirmation flux are typed with the original message class, so the extra information
is available. This allows to perform any useful processing on this extra information
when the outbound message is confirmed.
6.2.3. Threading model
Reactor RabbitMQ configure by default the Java Client to use NIO, i.e. there’s only
one thread that deals with IO. This can be changed by specifying a ConnectionFactory
in the SenderOptions
.
The Sender
uses 2 Reactor’s Scheduler
: one for the subscription when creating the
connection and another one for resources management. The Sender
defaults
to 2 bounded elastic schedulers, this can be overriden in the SenderOptions
. The Sender
takes care of disposing the default schedulers when closing. If not using the default
schedulers, it’s developer’s job to dispose schedulers they passed in to the
SenderOptions
.
6.2.4. Closing the Sender
When the Sender
is no longer required, the instance can be closed.
The underlying Connection
is closed, as well as the default
schedulers if none has been explicitly provided.
sender.close();
6.2.5. Error handling during publishing
The send
and sendWithPublishConfirms
methods can take an additional
SendOptions
parameter to specify the behavior to adopt if the publishing of a message
fails. The default behavior is to retry every 200 milliseconds for 10 seconds
in case of connection failure. As automatic connection recovery
is enabled by default,
the connection is likely to be re-opened after a network glitch and the flux of
outbound messages should stall only during connection recovery before restarting automatically.
This default behavior tries to find a trade-off between reactivity and robustness.
You can customize the retry by settings your own instance of RetrySendingExceptionHandler
in the SendOptions
, e.g. to retry for 20 seconds every 500 milliseconds:
Sender sender = RabbitFlux.createSender();
sender.send(outboundFlux, new SendOptions().exceptionHandler(
new ExceptionHandlers.RetrySendingExceptionHandler(
Duration.ofSeconds(20), Duration.ofMillis(500),
ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE
)
));
The RetrySendingExceptionHandler
uses a Predicate<Throwable>
to decide whether
an exception should trigger a retry or not. If the exception isn’t retryable, the exception
handler wraps the exception in a RabbitFluxException
and throws it.
For consistency sake, the retry exception handler used with ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE
(the default) will trigger retry attempts for the same conditions as connection recovery triggering.
This means that if connection recovery has kicked in, publishing will be retried at least for the retry
timeout configured (10 seconds by default).
Note the exception handler is a BiConsumer<Sender.SendContext, Exception>
, where Sender.SendContext
is a class providing access to the OutboundMessage
and the underlying AMQP Channel
. This makes it
easy to customize the default behavior: logging BiConsumer#andThen
retrying, only logging, trying to
send the message somewhere else, etc.
6.2.6. Request/reply
Reactor RabbitMQ supports reactive request/reply. From RabbitMQ documentation:
RPC (request/reply) is a popular pattern to implement with a messaging broker like RabbitMQ. […] The typical way to do this is for RPC clients to send requests that are routed to a long lived (known) server queue. The RPC server(s) consume requests from this queue and then send replies to each client using the queue named by the client in the reply-to header.
For performance reason, Reactor RabbitMQ builds on top
direct reply-to. The next
snippet shows the usage of the RpcClient
class:
String queue = "rpc.server.queue";
Sender sender = RabbitFlux.createSender();
RpcClient rpcClient = sender.rpcClient("", queue); (1)
Mono<Delivery> reply = rpcClient.rpc(Mono.just(
new RpcClient.RpcRequest("hello".getBytes()) (2)
));
rpcClient.close(); (3)
1 | Create RpcClient instance from a Sender |
2 | Send request and get reply |
3 | Close RpcClient when done |
In the example above, a consumer waits on the rpc.server.queue
to
process requests. A RpcClient
is created from a Sender
, it will
send requests to a given exchange with a given routing key. The RpcClient
handles the machinery to send the request and wait on a reply queue the
result processed on the server queue, wrapping everything up with reactive API.
Note a RPC client isn’t meant to be used for only 1 request, it can be a long-lived object
handling different requests, as long as they’re directed to the same destination (defined
by the exchange and the routing key passed in when the RpcClient
is created).
A RpcClient
uses a sequence of Long
for correlation, but this can be changed
by passing in a Supplier<String>
when creating the RpcClient
:
String queue = "rpc.server.queue";
Supplier<String> correlationIdSupplier = () -> UUID.randomUUID().toString(); (1)
Sender sender = RabbitFlux.createSender();
RpcClient rpcClient = sender.rpcClient(
"", queue, correlationIdSupplier (2)
);
Mono<Delivery> reply = rpcClient.rpc(Mono.just(
new RpcClient.RpcRequest("hello".getBytes())
));
rpcClient.close();
1 | Use random UUID correlation ID supplier |
2 | Pass in supplier on RpcClient creation |
This can be useful e.g. when the RPC server can make sense of the correlation ID.
|
6.3. Reactive RabbitMQ Receiver
Messages stored in RabbitMQ queues are consumed using the reactive
receiver reactor.rabbitmq.Receiver
.
Each instance of Receiver
is associated with a single instance
of Connection
created by the options-provided ConnectionFactory
.
A receiver is created with an instance of receiver configuration options
reactor.rabbitmq.ReceiverOptions
. The properties of ReceiverOptions
contains the ConnectionFactory
that creates connections to the broker
and a Reactor Scheduler
used for the connection creation.
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();
ReceiverOptions receiverOptions = new ReceiverOptions()
.connectionFactory(connectionFactory) (1)
.connectionSubscriptionScheduler(Schedulers.boundedElastic()); (2)
1 | Specify connection factory |
2 | Specify scheduler for connection creation |
Note you can control the creation of the Connection
thanks to the
connectionSupplier(ConnectionFactory)
method:
SenderOptions senderOptions = new SenderOptions()
.connectionFactory(connectionFactory)
.connectionSupplier(cf -> cf.newConnection( (1)
new Address[] {new Address("192.168.0.1"), new Address("192.168.0.2")},
"reactive-sender"))
.resourceManagementScheduler(Schedulers.boundedElastic());
1 | Specify array of addresses and connection name |
In the snippet above the connection can be created from 2 different nodes (useful for failover) and the connection name is set up.
If TLS is required, it must be configured with the Java client’s ConnectionFactory
,
see the TLS section in the client documentation
and the TLS guide
for more information.
Once the required configuration options have been configured on the options instance,
a new Receiver
instance can be created with these options to consume inbound messages.
The code snippet below creates a receiver instance and an inbound Flux
for the receiver.
The underlying Connection
and Consumer
instances are created lazily
later when the inbound Flux
is subscribed to.
Flux<Delivery> inboundFlux = RabbitFlux.createReceiver(receiverOptions)
.consumeNoAck("reactive.queue");
The inbound RabbitMQ Flux
is ready to be consumed.
Each inbound message delivered by the Flux is represented as a
Delivery
.
See SampleReceiver
for a full code listing for a Receiver
.
6.3.1. Consuming options
The Receiver
class has different flavors of the receive*
method and each of them
can accept a ConsumeOptions
instance. Here are the different options:
-
overflowStrategy
: theOverflowStrategy
used when creating theFlux
of messages. Default isBUFFER
. -
qos
: the prefetch count used when message acknowledgment is enabled. Default is 250. -
consumerTag
: Consumer tag used to register the consumer. Default is server-generated identifier. -
hookBeforeEmitBiFunction
: aBiFunction<Long, ? super Delivery, Boolean>
to decide whether a message should be emitted downstream or not. Default is to always emit. -
stopConsumingBiFunction
: aBiFunction<Long, ? super Delivery, Boolean>
to decide whether the flux should be completed after the emission of the message. Default is to never complete.
6.3.2. Acknowledgment
Receiver
has several receive*
methods that differ on the way consumer are acknowledged
back to the broker. Acknowledgment mode can have profound impacts on performance and memory
consumption.
-
consumeNoAck
: the broker forgets about a message as soon as it has sent it to the consumer. Use this mode if downstream subscribers are very fast, at least faster than the flow of inbound messages. Messages will pile up in the JVM process memory if subscribers are not able to cope with the flow of messages, leading to out-of-memory errors. Note this mode uses the auto-acknowledgment mode when registering the RabbitMQConsumer
. -
consumeAutoAck
: with this mode, messages are acknowledged right after their arrival, in theFlux#doOnNext
callback. This can help to cope with the flow of messages, avoiding the downstream subscribers to be overwhelmed. Note this mode does not use the auto-acknowledgment mode when registering the RabbitMQConsumer
. In this case,consumeAutoAck
means messages are automatically acknowledged by the library in one theFlux
hooks. -
consumeManualAck
: this method returns aFlux<AcknowledgableDelivery>
and messages must be manually acknowledged or rejected downstream withAcknowledgableDelivery#ack
orAcknowledgableDelivery#nack
, respectively. This mode lets the developer acknowledge messages in the most efficient way, e.g. by acknowledging several messages at the same time withAcknowledgableDelivery#ack(true)
and letting Reactor control the batch size with one of theFlux#buffer
methods.
To learn more on how the ConsumeOptions#qos
setting can impact the behavior
of Receiver#consumeAutoAck
and Receiver#consumeManualAck
, have a look at
this
post about queuing theory.
6.3.3. Closing the Receiver
When the Receiver
is no longer required, the instance can be closed.
The underlying Connection
is closed, as well as the default scheduler
if none has been explicitly provided.
receiver.close();
6.3.4. Connection failure
Network connection between the broker and the client can fail. This is transparent for consumers thanks to RabbitMQ Java client automatic connection recovery. Connection failures affect sending though, and acknowledgment is a sending operation.
When using Receiver#consumeAutoAck
, acknowledgments are retried for 10 seconds every 200 milliseconds
in case of connection failure. This can be changed by setting the
BiConsumer<Receiver.AcknowledgmentContext, Exception> exceptionHandler
in the
ConsumeOptions
, e.g. to retry for 20 seconds every 500 milliseconds:
Flux<Delivery> inboundFlux = RabbitFlux
.createReceiver()
.consumeAutoAck("reactive.queue", new ConsumeOptions()
.exceptionHandler(new ExceptionHandlers.RetryAcknowledgmentExceptionHandler(
Duration.ofSeconds(20), Duration.ofMillis(500), (1)
ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE
))
);
1 | Retry acknowledgment for 20 seconds every 500 milliseconds on connection failure |
When using Receiver#consumeManualAck
, acknowledgment is handled by the developer, who
can do pretty anything they want on acknowledgment failure.
AcknowledgableDelivery#ack
and AcknowledgableDelivery#nack
methods handle retry internally
based on BiConsumer<Receiver.AcknowledgmentContext, Exception> exceptionHandler
in the ConsumeOptions
.
Developer does not have to execute retry explicitly on acknowledgment failure and benefits from Reactor RabbitMQ
retry support when acknowledging a message:
Receiver receiver = RabbitFlux.createReceiver();
BiConsumer<Receiver.AcknowledgmentContext, Exception> exceptionHandler =
new ExceptionHandlers.RetryAcknowledgmentExceptionHandler( (1)
Duration.ofSeconds(20), Duration.ofMillis(500),
ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE
);
receiver.consumeManualAck("queue",
new ConsumeOptions().exceptionHandler(exceptionHandler))
.subscribe(msg -> {
// ... (2)
msg.ack(); (3)
});
1 | Configure retry logic when exception occurs |
2 | Process message |
3 | Send acknowledgment after business processing |
Note the exception handler is a BiConsumer<Receiver.AcknowledgmentContext, Exception>
. This means
acknowledgment failure can be handled in any way, here we choose to retry the acknowledgment.
Note also that by using ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE
, we choose
to retry only on unexpected connection failures and rely on the AMQP Java client to automatically re-create
a new connection in the background. The decision to retry on a given
exception can be customized by providing a Predicate<Throwable>
in place of
ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE
.
6.4. Advanced features
This section covers advanced uses of the Reactor RabbitMQ API.
6.4.1. Customizing connection creation
It is possible to specify only a ConnectionFactory
for Sender/ReceiverOptions
and
let Reactor RabbitMQ create connection from this ConnectionFactory
. Internally, Reactor
RabbitMQ will create a Mono<Connection>
to perform its operations and the connection
will be created only when needed.
When the developer
lets Reactor RabbitMQ create a Mono<Connection>
, the library will take responsibility
for the following actions for each instance of Sender
and Receiver
:
-
using a cache to avoid creating several connections (by using
Mono#cache()
) -
making the
Mono<Connection>
register onconnectionSubscriptionScheduler
(withMono#subscribeOn
) -
closing the connection when
Sender/Receiver#close()
is closed
Reactor RabbitMQ provides 2 ways to have more control over the connection creation, e.g. to provide a name or to connect to different nodes:
-
using a connection supplier (simplest option, no Reactive API involved)
-
using a custom
Mono<Connection>
(implies Reactive API but provides more control)
Creating a connection with a supplier
The following snippet shows how to create connections with a custom name:
ConnectionFactory connectionFactory = new ConnectionFactory(); (1)
connectionFactory.useNio();
Sender sender = RabbitFlux.createSender(new SenderOptions()
.connectionFactory(connectionFactory)
.connectionSupplier(cf -> cf.newConnection("sender")) (2)
);
Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions()
.connectionFactory(connectionFactory)
.connectionSupplier(cf -> cf.newConnection("receiver")) (3)
);
1 | Create and configure connection factory |
2 | Create supplier that creates connection with a name |
3 | Create supplier that creates connection with a name |
When using a connection supplier, Reactor RabbitMQ will create a Mono<Connection>
and will take care
of the operations mentioned above (caching, registering on a scheduler, and closing).
Creating a connection with a custom Mono
The following snippet shows how to provide custom Mono<Connection>
:
ConnectionFactory connectionFactory = new ConnectionFactory(); (1)
connectionFactory.useNio();
Sender sender = RabbitFlux.createSender(new SenderOptions()
.connectionMono(
Mono.fromCallable(() -> connectionFactory.newConnection("sender")).cache()) (2)
);
Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions()
.connectionMono(
Mono.fromCallable(() -> connectionFactory.newConnection("receiver")).cache()) (3)
);
1 | Create and configure connection factory |
2 | Create Mono that creates connection with a name |
3 | Create Mono that creates connection with a name |
Providing your own Mono<Connection>
lets you take advantage of all the Reactor API
(e.g. for caching) but has some caveats: Reactor RabbitMQ will not cache the provided Mono<Connection>
,
will not use it on a scheduler, and will not close it automatically. This is developer’s responsibility
to take care of these actions if they make sense in their context.
6.4.2. Sharing the same connection between Sender
and Receiver
Sender
and Receiver
instances create their own Connection
but it’s possible to use
only one or a few Connection
instances to be able to use exclusive resources between a Sender
and a Receiver
or simply to control the number of created connections.
Both SenderOptions
and ReceiverOptions
have a connectionSupplier
method that can encapsulate
any logic to create the Connection
the Sender
or Receiver
will end up using through
a Mono<Connection>
. Reactor
RabbitMQ provides a way to share the exact same connection instance between some Sender
and Receiver
instances:
ConnectionFactory connectionFactory = new ConnectionFactory(); (1)
connectionFactory.useNio();
Utils.ExceptionFunction<ConnectionFactory, ? extends Connection> connectionsupplier =
Utils.singleConnectionSupplier( (2)
connectionFactory, cf -> cf.newConnection()
);
Sender sender = RabbitFlux.createSender(
new SenderOptions().connectionSupplier(connectionsupplier) (3)
);
Receiver receiver = RabbitFlux.createReceiver(
new ReceiverOptions().connectionSupplier(connectionsupplier) (4)
);
1 | Create and configure connection factory |
2 | Create supplier that re-uses the same connection instance |
3 | Create sender with connection supplier |
4 | Create receiver with connection supplier |
Be aware that closing the first Sender
or Receiver
will close the underlying
AMQP connection for all the others.
6.4.3. Creating channels with a custom Mono
in Sender
SenderOptions
provides a channelMono
property that is called when creating the Channel
used
in sending methods. This is a convenient way to provide any custom logic when
creating the Channel
, e.g. retry logic.
6.4.4. Threading considerations for resource management
A Sender
instance maintains a Mono<Channel>
to manage resources and by default
the underlying Channel
is cached. A new Channel
is also automatically created in case of error.
Channel creation is not a cheap operation, so this default behavior fits most use
cases. Each resource management method provides a counterpart method with an additional
ResourceManagementOptions
argument. This allows to provide a custom Mono<Channel>
for a given resource operation. This can be useful when multiple threads are using
the same Sender
instance, to avoid using the same Channel
from multiple threads.
Mono<Channel> channelMono = connectionMono.map(c -> {
try {
return c.createChannel();
} catch (Exception e) {
throw new RabbitFluxException(e);
}
}).cache(); (1)
ResourceManagementOptions options = new ResourceManagementOptions()
.channelMono(channelMono); (2)
sender.declare(exchange("my.exchange"), options) (3)
.then(sender.declare(queue("my.queue"), options)) (3)
.then(sender.bind(binding("my.exchange", "a.b", "my.queue"), options)) (3)
.subscribe(r -> System.out.println("Exchange and queue declared and bound"));
1 | Create Channel and cache it |
2 | Use the Mono<Channel> in ResourceManagementOptions |
3 | Use Mono<Channel> for each operation |
In the example above, each operation will use the same Channel
as it is cached.
This way these operations won’t interfer with any other thread using the default
resource management Mono<Channel>
in the Sender
instance.
6.4.5. Channel pooling in Sender
By default, Sender#send*
methods open a new Channel
for every call. This is
OK for long-running calls, e.g. when the flux of outbound messages is
infinite. For workloads whereby Sender#send*
is called often for finite,
short flux of messages, opening a new Channel
every time may not be optimal.
It is possible to use a pool of channels as part of the SendOptions
when
sending outbound messages with Sender
, as illustrated in the following snippet:
ChannelPool channelPool = ChannelPoolFactory.createChannelPool( (1)
connectionMono,
new ChannelPoolOptions().maxCacheSize(5) (2)
);
sender.send(outboundFlux, new SendOptions().channelPool(channelPool)); (3)
// ...
channelPool.close(); (4)
1 | Create ChannelPool with factory |
2 | Set the maximum size to 5 channels |
3 | Use a channel from the pool to send messages |
4 | Close the pool when no longer needed |
Note it is developer’s responsibility to close the pool when it is no longer necessary, typically at application shutdown.
Micro-benchmarks revealed channel pooling performs much better for sending short sequence of messages (1 to 10) repeatedly, without publisher confirms. With longer sequence of messages (100 or more), channel pooling can perform worse than without pooling at all. According to the same micro-benchmarks, channel pooling does not make sending with publisher confirms perform better, it appears to perform even worse. Don’t take these conclusions for granted, you should always make your own benchmarks depending on your workloads.
6.4.6. Retry configuration on connection opening
Some applications may want to fail fast and throw an exception when the broker is unavailable. Other applications may want to retry connection opening when it fails.
Both SenderOptions
and ReceiverOptions
provide a Function<Mono<? extends Connection>, Mono<? extends Connection>>
connectionMonoConfigurator
, which is a hook in the Mono<Connection>
creation of the Sender
or
Receiver
instance. This is a good place to customize the Mono<Connection>
to configure a retry policy.
The following snippet shows how to configure retry with an inline connectionMonoConfigurator
for a Sender
:
Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions()
.connectionMonoConfigurator(
cm -> cm.retryWhen(RetrySpec.backoff(3, Duration.ofSeconds(5))) (1)
));
1 | Set up retry on connection opening |
Please read the Reactor Core documentation for more information about retry, retry with exponential backoff, and retry support in Reactor-Extra.
As |