Introduction

1. Overview

1.1. RabbitMQ

With more than 35,000 production deployments world-wide at small startups and large enterprises, RabbitMQ is the most popular open source message broker.

RabbitMQ is lightweight and easy to deploy on premises and in the cloud. It supports multiple messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.

1.2. Project Reactor

Reactor is a highly optimized reactive library for building efficient, non-blocking applications on the JVM based on the Reactive Streams Specification. Reactor-based applications can sustain very high message throughput and operate with a very low memory footprint, which makes Reactor suitable for building efficient event-driven applications using the microservices architecture.

Reactor implements two publishers, Flux<T> and Mono<T>, both of which support non-blocking back-pressure. This enables exchange of data between threads with well-defined memory usage, avoiding unnecessary intermediate buffering or blocking.

1.3. Reactive API for RabbitMQ

Reactor RabbitMQ is a reactive API for RabbitMQ based on Reactor and RabbitMQ Java Client. Reactor RabbitMQ API enables messages to be published to RabbitMQ and consumed from RabbitMQ using functional APIs with non-blocking back-pressure and very low overheads. This enables applications using Reactor to use RabbitMQ as a message bus or streaming platform and integrate with other systems to provide an end-to-end reactive pipeline.

2. Motivation

2.1. Functional interface for RabbitMQ

Reactor RabbitMQ is a functional Java API for RabbitMQ. For applications that are written in functional style, this API enables RabbitMQ interactions to be integrated easily without requiring non-functional produce or consume APIs to be incorporated into the application logic.

2.2. Non-blocking Back-pressure

The Reactor RabbitMQ API benefits from non-blocking back-pressure provided by Reactor. For example, in a pipeline, where messages received from an external source (e.g. an HTTP proxy) are published to RabbitMQ, back-pressure can be applied easily to the whole pipeline, limiting the number of messages in-flight and controlling memory usage. Messages flow through the pipeline as they are available, with Reactor taking care of limiting the flow rate to avoid overflow, keeping application logic simple.
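The demand-driven flow described above can be illustrated without RabbitMQ or Reactor. The following is a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces (which require Java 9 or later). RangePublisher and BoundedSubscriber are hypothetical names, and the code simplifies the Reactive Streams rules (single-threaded, no error signalling). It only shows how a subscriber that requests a few items at a time bounds the number of messages in flight.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackPressureSketch {

    /** Emits 1..count, but never more items than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand;
                private boolean draining;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done || n <= 0) {
                        return; // simplification: invalid requests are ignored
                    }
                    demand += n;
                    if (draining) {
                        return; // re-entrant call from onNext: the running loop sees the new demand
                    }
                    draining = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    draining = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Requests batchSize items at a time and asks for more only once they are all handled. */
    static final class BoundedSubscriber implements Flow.Subscriber<Integer> {
        private final int batchSize;
        private Flow.Subscription subscription;
        private int outstanding;
        final List<Integer> received = new ArrayList<>();
        int maxOutstanding;
        boolean completed;

        BoundedSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            requestBatch();
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (--outstanding == 0) {
                requestBatch();
            }
        }

        @Override
        public void onError(Throwable throwable) {
            completed = true;
        }

        @Override
        public void onComplete() {
            completed = true;
        }

        private void requestBatch() {
            outstanding = batchSize;
            maxOutstanding = Math.max(maxOutstanding, outstanding);
            subscription.request(batchSize);
        }
    }
}
```

Subscribing a BoundedSubscriber with a batch size of 3 to a RangePublisher of 10 items delivers all 10 items, yet the publisher is never allowed to have more than 3 items outstanding at any time.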

2.3. End-to-end Reactive Pipeline

The value proposition for Reactor RabbitMQ is the efficient utilization of resources in applications with multiple external interactions where RabbitMQ is one of the external systems. End-to-end reactive pipelines benefit from non-blocking back-pressure and efficient use of threads, enabling a large number of concurrent requests to be processed efficiently. The optimizations provided by Project Reactor enable development of reactive applications with very low overheads and predictable capacity planning to deliver low-latency, high-throughput pipelines.

2.4. Comparisons with other RabbitMQ Java libraries

Reactor RabbitMQ is not intended to replace any of the existing Java libraries. Instead, it is aimed at providing an alternative API for reactive event-driven applications.

2.4.1. RabbitMQ Java Client

For non-reactive applications, RabbitMQ Java Client provides the most complete API to manage resources, publish messages to and consume messages from RabbitMQ. Note that Reactor RabbitMQ is based on RabbitMQ Java Client.

Applications that use this API with RabbitMQ as a message bus may consider switching to Reactor RabbitMQ if the application is implemented in a functional style.

2.4.2. Spring AMQP

Spring AMQP applies core Spring Framework concepts to the development of AMQP-based messaging solutions. It provides a "template" as a high-level abstraction for sending and receiving messages. It also provides support for Message-driven POJOs with a "listener container". These libraries facilitate management of AMQP resources while promoting the use of dependency injection and declarative configuration. Spring AMQP is based on RabbitMQ Java Client.

3. Getting Started

3.1. Requirements

You need Java JRE installed (Java 8 or later).

You also need to install RabbitMQ. Follow the instructions from the website. Note you should use RabbitMQ 3.6.x or later.

3.2. Quick Start

This quick start tutorial sets up a single node RabbitMQ and runs the sample reactive sender and consumer.

3.2.1. Start RabbitMQ

Start RabbitMQ on your local machine with all the defaults (e.g. AMQP port is 5672).

3.2.2. Run Reactor RabbitMQ Samples

Download Reactor RabbitMQ from github.com/reactor/reactor-rabbitmq/.

> git clone https://github.com/reactor/reactor-rabbitmq
> cd reactor-rabbitmq

Sample Sender

The SampleSender code is on GitHub.

Run the sample sender:

> ./gradlew -q sender
10:20:12.590 INFO r.rabbitmq.samples.SampleSender - Message Message_1 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_2 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_3 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_4 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_5 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_6 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_7 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_8 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_9 sent successfully
10:20:12.597 INFO r.rabbitmq.samples.SampleSender - Message Message_10 sent successfully
10:20:12.597 INFO r.rabbitmq.samples.SampleSender - Message Message_11 sent successfully
10:20:12.597 INFO r.rabbitmq.samples.SampleSender - Message Message_12 sent successfully
10:20:12.599 INFO r.rabbitmq.samples.SampleSender - Message Message_13 sent successfully
10:20:12.600 INFO r.rabbitmq.samples.SampleSender - Message Message_14 sent successfully
10:20:12.600 INFO r.rabbitmq.samples.SampleSender - Message Message_15 sent successfully
10:20:12.600 INFO r.rabbitmq.samples.SampleSender - Message Message_16 sent successfully
10:20:12.600 INFO r.rabbitmq.samples.SampleSender - Message Message_17 sent successfully
10:20:12.600 INFO r.rabbitmq.samples.SampleSender - Message Message_18 sent successfully
10:20:12.601 INFO r.rabbitmq.samples.SampleSender - Message Message_19 sent successfully
10:20:12.601 INFO r.rabbitmq.samples.SampleSender - Message Message_20 sent successfully

The SampleSender sends 20 messages to the demo-queue queue, with publisher confirms enabled. The log line for a given message is printed to the console when the publisher confirmation is received from the broker.

Sample Receiver

The SampleReceiver code is on GitHub.

Run the sample receiver:

> ./gradlew -q receiver
10:22:43.568 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_1
10:22:43.575 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_2
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_3
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_4
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_5
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_6
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_7
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_8
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_9
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_10
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_11
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_12
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_13
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_14
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_15
10:22:43.578 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_16
10:22:43.578 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_17
10:22:43.578 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_18
10:22:43.578 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_19
10:22:43.578 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_20

The SampleReceiver consumes messages from the demo-queue queue and logs the message content in the console.

3.2.3. Building Reactor RabbitMQ Applications

To build your own application using the Reactor RabbitMQ API, you need to include a dependency on Reactor RabbitMQ.

For Gradle:

dependencies {
    compile "io.projectreactor.rabbitmq:reactor-rabbitmq:1.0.0.RELEASE"
}

For Maven:

<dependency>
    <groupId>io.projectreactor.rabbitmq</groupId>
    <artifactId>reactor-rabbitmq</artifactId>
    <version>1.0.0.RELEASE</version>
</dependency>

When using a milestone or a release candidate, you need to add the Spring IO milestone repository.

For Gradle:

repositories {
  maven { url 'http://repo.spring.io/milestone' }
}

For Maven:

<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>http://repo.spring.io/milestone</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>

4. Additional Resources

4.1. Getting help

If you are having trouble with Reactor RabbitMQ, you can ask for help on the RabbitMQ community mailing list.

Report bugs in Reactor RabbitMQ at github.com/reactor/reactor-rabbitmq/issues.

Reactor RabbitMQ is open source, and the code and documentation are available at github.com/reactor/reactor-rabbitmq.

5. New & Noteworthy

5.1. What’s new in Reactor RabbitMQ 1.0

  • 1.0.0.M1

    • Introduction of the Sender and Receiver API

  • 1.0.0.M2

    • Support for request/reply

    • Exception handling

  • 1.0.0.M3

    • Bump Reactor to 3.2.0.RELEASE

    • Let user provide Mono<Channel> for resource management

  • 1.0.0.RC1

    • Bump Reactor to 3.2.1.RELEASE

    • Bump RabbitMQ Java client to 5.5.0

  • 1.0.0.RC2

    • Bump Reactor to 3.2.3.RELEASE

    • Bump RabbitMQ Java client to 5.5.1 for better topology recovery support

    • Complete receiving flux on channel termination

    • Handle error signal of connectionMono subscription to enable proper error handling

    • Rename ReactorRabbitMq to RabbitFlux and ReactorRabbitMqException to RabbitFluxException

Reference Documentation

Reactor RabbitMQ API

5.2. Overview

This section describes the reactive API for producing and consuming messages using RabbitMQ. There are two main classes in Reactor RabbitMQ:

  1. reactor.rabbitmq.Sender for publishing messages to RabbitMQ

  2. reactor.rabbitmq.Receiver for consuming messages from RabbitMQ

Full API for Reactor RabbitMQ is available in the javadocs.

The project uses Reactor Core to expose a "Reactive Streams" API.

5.3. Reactive RabbitMQ Sender

Outbound messages are sent to RabbitMQ using reactor.rabbitmq.Sender. A Sender is associated with one RabbitMQ Connection that is used to transport messages to the broker. A Sender can also manage resources (exchanges, queues, bindings).

A Sender is created with an instance of sender configuration options reactor.rabbitmq.SenderOptions. The properties of SenderOptions contain the ConnectionFactory that creates connections to the broker and the Reactor Schedulers used by the Sender.

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();

SenderOptions senderOptions =  new SenderOptions()
    .connectionFactory(connectionFactory)                         (1)
    .resourceManagementScheduler(Schedulers.elastic());           (2)
1 Specify connection factory
2 Specify scheduler for resource management

Note you can control the creation of the Connection thanks to the connectionSupplier(ConnectionFactory) method:

SenderOptions senderOptions =  new SenderOptions()
    .connectionFactory(connectionFactory)
    .connectionSupplier(cf -> cf.newConnection(                                  (1)
        new Address[] {new Address("192.168.0.1"), new Address("192.168.0.2")},
        "reactive-sender"))
    .resourceManagementScheduler(Schedulers.elastic());
1 Specify array of addresses and connection name

In the snippet above the connection can be created from 2 different nodes (useful for failover) and the connection name is set up.

Once the required options have been configured on the options instance, a new Sender instance can be created with the options already configured in senderOptions.

Sender sender = RabbitFlux.createSender(senderOptions);

The Sender is now ready to send messages to RabbitMQ. At this point, a Sender instance has been created, but no connections to RabbitMQ have been made yet. The underlying Connection instance is created lazily when a first call is made to create a resource or to send messages.

Let’s now create a sequence of messages to send to RabbitMQ. Each outbound message to be sent to RabbitMQ is represented as an OutboundMessage. An OutboundMessage contains routing information (exchange to send to and routing key) as well as the message itself (properties and body).

A Flux<OutboundMessage> of messages is created for sending to RabbitMQ. For beginners, Lite Rx API Hands-on provides a hands-on tutorial on using the Reactor classes Flux and Mono.

Flux<OutboundMessage> outboundFlux  =
    Flux.range(1, 10)
        .map(i -> new OutboundMessage(
            "amq.direct",
            "routing.key", ("Message " + i).getBytes()
        ));

The code segment above creates a sequence of messages to send to RabbitMQ. The outbound Flux can now be sent to RabbitMQ using the Sender created earlier.

The code segment below sends the messages to RabbitMQ. The final subscribe() in the code block requests upstream to send the messages to RabbitMQ.

sender.send(outboundFlux)                         (1)
    .doOnError(e -> log.error("Send failed", e))  (2)
    .subscribe();                                 (3)
1 Reactive send operation for the outbound Flux
2 If the sending fails, log an error
3 Subscribe to trigger the actual flow of records from outboundFlux to RabbitMQ.

See SampleSender for a full code listing for a Sender.

5.3.1. Managing resources (exchanges, queues, and bindings)

The Sender is also able to declare and delete AMQP resources the reactive way. You can learn more about the AMQP model on the RabbitMQ website.

Sender has a declare* method for each type of resource (exchange, binding, and queue) and there’s also a respective *Specification class to describe each creation.

Mono<AMQP.Exchange.DeclareOk> exchange = sender.declareExchange(
    ExchangeSpecification.exchange("my.exchange")
);
Mono<AMQP.Queue.DeclareOk> queue = sender.declareQueue(
    QueueSpecification.queue("my.queue")
);
Mono<AMQP.Queue.BindOk> binding = sender.bind(
    BindingSpecification.binding().exchange("my.exchange")
        .queue("my.queue").routingKey("a.b")
);

Note the Sender#declare* methods return their respective AMQP results wrapped into a Mono.

One can also use the ResourcesSpecification factory class with a static import to reduce boilerplate code. Combined with Mono chaining and Sender#declare shortcuts, it allows for condensed syntax:

import static reactor.rabbitmq.ResourcesSpecification.*;
...
sender.declare(exchange("my.exchange"))
    .then(sender.declare(queue("my.queue")))
    .then(sender.bind(binding("my.exchange", "a.b", "my.queue")))
    .subscribe(r -> System.out.println("Exchange and queue declared and bound"));

Sender has delete* and delete methods as well. Here is an example with the short method forms:

import static reactor.rabbitmq.ResourcesSpecification.*;
...
sender.unbind(binding("my.exchange", "a.b", "my.queue"))
    .then(sender.delete(exchange("my.exchange")))
    .then(sender.delete(queue("my.queue")))
    .subscribe(r -> System.out.println("Exchange and queue unbound and deleted"));

5.3.2. Reliable publishing with publisher confirms

Sender also offers the sendWithPublishConfirms method to send messages and receive publisher confirms, which make sure the broker has taken the outbound messages into account.

Flux<OutboundMessage> outboundFlux  = Flux.range(1, 10)
    .map(i -> new OutboundMessage(
        "amq.direct",
        "routing.key", "hello".getBytes()
    ));
sender.sendWithPublishConfirms(outboundFlux)
    .subscribe(outboundMessageResult -> {
        // outbound message has reached the broker
    });

Sender#sendWithPublishConfirms returns a Flux<OutboundMessageResult>. Subscribe to it to know when outbound messages have successfully reached the broker.
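To make the confirm mechanism more concrete, here is a minimal sketch of how a publisher can keep track of unconfirmed messages. It models the broker-side protocol only (each published message gets an increasing sequence number, and a confirmation may cover a single message or every message up to a sequence number) and is not how Reactor RabbitMQ is implemented. ConfirmTrackerSketch and its methods are hypothetical names.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class ConfirmTrackerSketch {

    // Messages published but not yet confirmed, ordered by sequence number.
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
    private long nextSequenceNumber = 1;

    /** Records a message as published and returns its sequence number. */
    synchronized long published(String body) {
        long sequenceNumber = nextSequenceNumber++;
        outstanding.put(sequenceNumber, body);
        return sequenceNumber;
    }

    /**
     * Handles a confirmation from the broker. With multiple=true, every message
     * up to and including sequenceNumber is confirmed at once.
     */
    void confirmed(long sequenceNumber, boolean multiple) {
        if (multiple) {
            outstanding.headMap(sequenceNumber, true).clear();
        } else {
            outstanding.remove(sequenceNumber);
        }
    }

    int unconfirmedCount() {
        return outstanding.size();
    }

    boolean isUnconfirmed(long sequenceNumber) {
        return outstanding.containsKey(sequenceNumber);
    }
}
```

With this bookkeeping, a message can be reported as successfully sent only once its sequence number has left the outstanding map, which matches the behavior of the sample sender that logs a message only after its confirmation arrives.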

5.3.3. Threading model

Reactor RabbitMQ configures the Java Client to use NIO by default, i.e. there’s only one thread that deals with IO. This can be changed by specifying a ConnectionFactory in the SenderOptions.

The Sender uses two Reactor Schedulers: one for the subscription when creating the connection and another one for resource management. The Sender defaults to two elastic schedulers; this can be overridden in the SenderOptions. The Sender takes care of disposing the default schedulers when closing. If you do not use the default schedulers, it is your job to dispose of the schedulers you passed in to the SenderOptions.

5.3.4. Closing the Sender

When the Sender is no longer required, the instance can be closed. The underlying Connection is closed, as well as the default schedulers if none has been explicitly provided.

sender.close();

5.3.5. Error handling during publishing

The send and sendWithPublishConfirms methods can take an additional SendOptions parameter to specify the behavior to adopt if the publishing of a message fails. The default behavior is to retry every 200 milliseconds for 10 seconds in case of connection failure. As automatic connection recovery is enabled by default, the connection is likely to be re-opened after a network glitch and the flux of outbound messages should stall only during connection recovery before restarting automatically. This default behavior tries to find a trade-off between reactivity and robustness.

You can customize the retry by setting your own instance of RetrySendingExceptionHandler in the SendOptions, e.g. to retry for 20 seconds every 500 milliseconds:

Sender sender = RabbitFlux.createSender();
sender.send(outboundFlux, new SendOptions().exceptionHandler(
   new ExceptionHandlers.RetrySendingExceptionHandler(
       Duration.ofSeconds(20), Duration.ofMillis(500), ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE
   )
));

The RetrySendingExceptionHandler uses a Predicate<Throwable> to decide whether an exception should trigger a retry or not. If the exception isn’t retryable, the exception handler wraps the exception in a RabbitFluxException and throws it.

For consistency’s sake, the retry exception handler used with ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE (the default) will trigger retry attempts under the same conditions that trigger connection recovery. This means that if connection recovery has kicked in, publishing will be retried at least for the retry timeout configured (10 seconds by default).
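The retry policy described in this section (retry at a fixed interval until a timeout elapses, and only for exceptions accepted by a predicate) can be sketched in plain Java. This is an illustration of the policy, not the code of RetrySendingExceptionHandler: RetrySketch is a hypothetical name, the loop blocks the calling thread, and IllegalStateException stands in for RabbitFluxException.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.function.Predicate;

class RetrySketch {

    /**
     * Calls the operation until it succeeds, waiting waitingTime between attempts.
     * Gives up when the exception is not retryable or when the timeout has elapsed.
     */
    static <T> T retry(Callable<T> operation, Duration timeout, Duration waitingTime,
                       Predicate<Throwable> retryable) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return operation.call();
            } catch (Exception e) {
                if (!retryable.test(e)) {
                    throw new IllegalStateException("Not retryable", e);
                }
                if (System.nanoTime() - deadline >= 0) {
                    throw new IllegalStateException("Retry timed out", e);
                }
                sleep(waitingTime);
            }
        }
    }

    private static void sleep(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

In this sketch the default behavior described above would correspond to a timeout of 10 seconds, a waiting time of 200 milliseconds, and a predicate that accepts only connection-related failures.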

Note that the exception handler is a BiConsumer<Sender.SendContext, Exception>, where Sender.SendContext is a class providing access to the OutboundMessage and the underlying AMQP Channel. This makes it easy to customize the default behavior: logging and then retrying (by chaining handlers with BiConsumer#andThen), only logging, trying to send the message somewhere else, etc.
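Because the handler is a plain BiConsumer, handlers compose with the standard BiConsumer#andThen method. The sketch below shows the "log, then do something else" composition using only the JDK. Context is a hypothetical stand-in for Sender.SendContext that carries just a message body, and the second handler is whatever the caller passes in (for instance a retrying handler).

```java
import java.util.List;
import java.util.function.BiConsumer;

class ExceptionHandlerChainSketch {

    /** Hypothetical stand-in for Sender.SendContext: only carries the message body. */
    static final class Context {
        final String message;

        Context(String message) {
            this.message = message;
        }
    }

    /** Builds a handler that records the failure first and then delegates to the next handler. */
    static BiConsumer<Context, Exception> loggingThen(List<String> log,
                                                      BiConsumer<Context, Exception> next) {
        BiConsumer<Context, Exception> logging =
            (context, e) -> log.add("Failed to send " + context.message + ": " + e.getMessage());
        // andThen runs "logging" first and "next" second, with the same arguments
        return logging.andThen(next);
    }
}
```

If the first handler throws, the second one is not invoked, so a logging handler placed first should not throw.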

5.3.6. Request/reply

Reactor RabbitMQ supports reactive request/reply. From RabbitMQ documentation:

RPC (request/reply) is a popular pattern to implement with a messaging broker like RabbitMQ. […​] The typical way to do this is for RPC clients to send requests that are routed to a long lived (known) server queue. The RPC server(s) consume requests from this queue and then send replies to each client using the queue named by the client in the reply-to header.

For performance reasons, Reactor RabbitMQ builds on top of direct reply-to. The next snippet shows the usage of the RpcClient class:

String queue = "rpc.server.queue";
Sender sender = RabbitFlux.createSender();
RpcClient rpcClient = sender.rpcClient("", queue);  (1)
Mono<Delivery> reply = rpcClient.rpc(Mono.just(
    new RpcClient.RpcRequest("hello".getBytes())    (2)
));
rpcClient.close();                                  (3)
1 Create RpcClient instance from a Sender
2 Send request and get reply
3 Close RpcClient when done

In the example above, a consumer waits on the rpc.server.queue to process requests. An RpcClient is created from a Sender; it sends requests to a given exchange with a given routing key. The RpcClient handles the machinery to send the request and to wait on a reply queue for the result produced by the server, wrapping everything up in a reactive API. Note that an RPC client isn’t meant to be used for only one request: it can be a long-lived object handling different requests, as long as they’re directed to the same destination (defined by the exchange and the routing key passed in when the RpcClient is created).

An RpcClient uses a sequence of Long for correlation, but this can be changed by passing in a Supplier<String> when creating the RpcClient:

String queue = "rpc.server.queue";
Supplier<String> correlationIdSupplier = () -> UUID.randomUUID().toString(); (1)
Sender sender = RabbitFlux.createSender();
RpcClient rpcClient = sender.rpcClient(
    "", queue, correlationIdSupplier                                         (2)
);
Mono<Delivery> reply = rpcClient.rpc(Mono.just(
    new RpcClient.RpcRequest("hello".getBytes())
));
rpcClient.close();
1 Use random UUID correlation ID supplier
2 Pass in supplier on RpcClient creation

This can be useful e.g. when the RPC server can make sense of the correlation ID.
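The role of the correlation ID can be shown with a small JDK-only sketch: each request is registered under an ID produced by a Supplier<String>, and an incoming reply completes the pending request that carries the same ID. This is an assumption-laden illustration of the general request/reply pattern, not the internals of RpcClient. CorrelationSketch and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

class CorrelationSketch {

    private final Supplier<String> correlationIdSupplier;
    // Requests waiting for a reply, keyed by correlation ID.
    private final Map<String, CompletableFuture<byte[]>> pending = new ConcurrentHashMap<>();

    CorrelationSketch(Supplier<String> correlationIdSupplier) {
        this.correlationIdSupplier = correlationIdSupplier;
    }

    /** A supplier that produces "1", "2", "3", and so on. */
    static Supplier<String> sequence() {
        AtomicLong counter = new AtomicLong();
        return () -> String.valueOf(counter.incrementAndGet());
    }

    /** Registers a request and returns the correlation ID to send along with it. */
    String register(CompletableFuture<byte[]> reply) {
        String correlationId = correlationIdSupplier.get();
        pending.put(correlationId, reply);
        return correlationId;
    }

    /** Completes the matching pending request; returns false when the ID is unknown. */
    boolean onReply(String correlationId, byte[] body) {
        CompletableFuture<byte[]> reply = pending.remove(correlationId);
        if (reply == null) {
            return false;
        }
        reply.complete(body);
        return true;
    }
}
```

Passing () -> UUID.randomUUID().toString() instead of CorrelationSketch.sequence() changes the shape of the IDs without touching the matching logic, which is the same substitution the snippet above performs on the RpcClient.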

5.4. Reactive RabbitMQ Receiver

Messages stored in RabbitMQ queues are consumed using the reactive receiver reactor.rabbitmq.Receiver. Each instance of Receiver is associated with a single instance of Connection created by the options-provided ConnectionFactory.

A receiver is created with an instance of receiver configuration options reactor.rabbitmq.ReceiverOptions. The properties of ReceiverOptions contain the ConnectionFactory that creates connections to the broker and a Reactor Scheduler used for the connection creation.

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();

ReceiverOptions receiverOptions =  new ReceiverOptions()
    .connectionFactory(connectionFactory)                       (1)
    .connectionSubscriptionScheduler(Schedulers.elastic());     (2)
1 Specify connection factory
2 Specify scheduler for connection creation

Note you can control the creation of the Connection thanks to the connectionSupplier(ConnectionFactory) method:

ReceiverOptions receiverOptions =  new ReceiverOptions()
    .connectionFactory(connectionFactory)
    .connectionSupplier(cf -> cf.newConnection(                                  (1)
        new Address[] {new Address("192.168.0.1"), new Address("192.168.0.2")},
        "reactive-receiver"))
    .connectionSubscriptionScheduler(Schedulers.elastic());
1 Specify array of addresses and connection name

In the snippet above the connection can be created from 2 different nodes (useful for failover) and the connection name is set up.

Once the required configuration options have been configured on the options instance, a new Receiver instance can be created with these options to consume inbound messages. The code snippet below creates a receiver instance and an inbound Flux for the receiver. The underlying Connection and Consumer instances are created lazily later when the inbound Flux is subscribed to.

Flux<Delivery> inboundFlux = RabbitFlux.createReceiver(receiverOptions)
        .consumeNoAck("reactive.queue");

The inbound RabbitMQ Flux is ready to be consumed. Each inbound message delivered by the Flux is represented as a Delivery.

See SampleReceiver for a full code listing for a Receiver.

5.4.1. Consuming options

The Receiver class has different flavors of the consume* method, and each of them can accept a ConsumeOptions instance. Here are the different options:

  • overflowStrategy: the OverflowStrategy used when creating the Flux of messages. Default is BUFFER.

  • qos: the prefetch count used when message acknowledgment is enabled. Default is 250.

  • hookBeforeEmitBiFunction: a BiFunction<Long, ? super Delivery, Boolean> to decide whether a message should be emitted downstream or not. Default is to always emit.

  • stopConsumingBiFunction: a BiFunction<Long, ? super Delivery, Boolean> to decide whether the flux should be completed after the emission of the message. Default is to never complete.
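The interplay of the last two options can be sketched with plain Java collections. In this sketch a String stands in for a Delivery, and the Long argument is treated as the number of items currently requested by the downstream subscriber, which is an assumption since the text above does not define it. ConsumeHooksSketch is a hypothetical name and the loop only models the order of the two decisions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

class ConsumeHooksSketch {

    /**
     * Walks through the deliveries in order. A delivery is emitted only if hookBeforeEmit
     * returns true, and consumption stops once stopConsuming returns true for a delivery.
     */
    static List<String> consume(List<String> deliveries, long requestedFromDownstream,
                                BiFunction<Long, ? super String, Boolean> hookBeforeEmit,
                                BiFunction<Long, ? super String, Boolean> stopConsuming) {
        List<String> emitted = new ArrayList<>();
        for (String delivery : deliveries) {
            if (hookBeforeEmit.apply(requestedFromDownstream, delivery)) {
                emitted.add(delivery);
            }
            // evaluated after the emission decision for the same delivery
            if (stopConsuming.apply(requestedFromDownstream, delivery)) {
                break;
            }
        }
        return emitted;
    }
}
```

The defaults described above correspond to a hook that always returns true and a stop function that always returns false.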

5.4.2. Acknowledgment

Receiver has several consume* methods that differ in the way messages are acknowledged back to the broker. The acknowledgment mode can have a profound impact on performance and memory consumption.

  • consumeNoAck: the broker forgets about a message as soon as it has sent it to the consumer. Use this mode if downstream subscribers are very fast, at least faster than the flow of inbound messages. Messages will pile up in the JVM process memory if subscribers are not able to cope with the flow of messages, leading to out-of-memory errors. Note this mode uses the auto-acknowledgment mode when registering the RabbitMQ Consumer.

  • consumeAutoAck: with this mode, messages are acknowledged right after their arrival, in the Flux#doOnNext callback. This can help to cope with the flow of messages, preventing downstream subscribers from being overwhelmed. Note this mode does not use the auto-acknowledgment mode when registering the RabbitMQ Consumer. In this case, consumeAutoAck means messages are automatically acknowledged by the library in one of the Flux hooks.

  • consumeManualAck: this method returns a Flux<AcknowledgableDelivery> and messages must be manually acknowledged or rejected downstream with AcknowledgableDelivery#ack or AcknowledgableDelivery#nack, respectively. This mode lets the developer acknowledge messages in the most efficient way, e.g. by acknowledging several messages at the same time with AcknowledgableDelivery#ack(true) and letting Reactor control the batch size with one of the Flux#buffer methods.
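The batching idea mentioned for consumeManualAck can be sketched independently of the library. When deliveries are grouped into batches of a fixed size (as Flux#buffer with a size argument does, with a possibly smaller final batch), only the last delivery of each batch needs to be acknowledged with the multiple flag set, because such an acknowledgment also covers all earlier unacknowledged deliveries on the channel. BatchAckSketch is a hypothetical name, and delivery tags are modelled as plain Long values.

```java
import java.util.ArrayList;
import java.util.List;

class BatchAckSketch {

    /**
     * Returns the delivery tags that would be acknowledged with multiple=true
     * when acknowledging once per batch of batchSize deliveries.
     */
    static List<Long> tagsToAck(List<Long> deliveryTags, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Long> acks = new ArrayList<>();
        for (int i = 0; i < deliveryTags.size(); i++) {
            boolean endOfBatch = (i + 1) % batchSize == 0;
            boolean lastDelivery = i == deliveryTags.size() - 1;
            if (endOfBatch || lastDelivery) {
                acks.add(deliveryTags.get(i));
            }
        }
        return acks;
    }
}
```

For 10 deliveries and a batch size of 4, this produces three acknowledgments (for tags 4, 8 and 10) instead of ten.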

To learn more on how the ConsumeOptions#qos setting can impact the behavior of Receiver#consumeAutoAck and Receiver#consumeManualAck, have a look at this post about queuing theory.

5.4.3. Closing the Receiver

When the Receiver is no longer required, the instance can be closed. The underlying Connection is closed, as well as the default scheduler if none has been explicitly provided.

receiver.close();

5.4.4. Connection failure

Network connection between the broker and the client can fail. This is transparent for consumers thanks to RabbitMQ Java client automatic connection recovery. Connection failures affect sending though, and acknowledgment is a sending operation.

When using Receiver#consumeAutoAck, acknowledgments are retried for 10 seconds every 200 milliseconds in case of connection failure. This can be changed by setting the BiConsumer<Receiver.AcknowledgmentContext, Exception> exceptionHandler in the ConsumeOptions, e.g. to retry for 20 seconds every 500 milliseconds:

Flux<Delivery> inboundFlux = RabbitFlux
    .createReceiver()
    .consumeAutoAck("reactive.queue", new ConsumeOptions()
        .exceptionHandler(new ExceptionHandlers.RetryAcknowledgmentExceptionHandler(
            Duration.ofSeconds(20), Duration.ofMillis(500), (1)
            ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE
        ))
    );
1 Retry acknowledgment for 20 seconds every 500 milliseconds on connection failure

When using Receiver#consumeManualAck, acknowledgment is handled by the developer, who can do pretty much anything they want on acknowledgment failure. It is possible to benefit from Reactor RabbitMQ retry support when acknowledging a message:

Receiver receiver = RabbitFlux.createReceiver();
BiConsumer<Receiver.AcknowledgmentContext, Exception> exceptionHandler =
    new ExceptionHandlers.RetryAcknowledgmentExceptionHandler(                  (1)
        Duration.ofSeconds(20), Duration.ofMillis(500),
        ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE
);
receiver.consumeManualAck("queue")
    .subscribe(msg -> {
        // ...                                                                   (2)
        try {
            msg.ack();                                                           (3)
        } catch (Exception e) {
            exceptionHandler.accept(new Receiver.AcknowledgmentContext(msg), e); (4)
        }
    });
1 Configure retry logic when exception occurs
2 Process message
3 Send acknowledgment after business processing
4 Execute retry on acknowledgment failure

Note that the exception handler is a BiConsumer<Receiver.AcknowledgmentContext, Exception>. This means an acknowledgment failure can be handled in any way; here we choose to retry the acknowledgment. Note also that by using ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE, we choose to retry only on unexpected connection failures and rely on the RabbitMQ Java client to automatically re-create a new connection in the background. The decision to retry on a given exception can be customized by providing a Predicate<Throwable> in place of ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE.

5.5. Advanced features

This section covers advanced uses of the Reactor RabbitMQ API.

5.5.1. Creating a connection with a custom Mono

It is possible to specify only a ConnectionFactory for Sender/ReceiverOptions and let Reactor RabbitMQ create connections from this ConnectionFactory. If you want more control over the creation of connections, you can use Sender/ReceiverOptions#connectionSupplier(ConnectionFactory). This is fine for most cases and doesn’t use any reactive API. Both Sender and Receiver internally use a Mono<Connection> to open the connection only when needed. It is possible to provide this Mono<Connection> through the appropriate *Options class:

ConnectionFactory connectionFactory = new ConnectionFactory();                (1)
connectionFactory.useNio();

Sender sender = RabbitFlux.createSender(new SenderOptions()
    .connectionMono(
        Mono.fromCallable(() -> connectionFactory.newConnection("sender")))   (2)
);
Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions()
    .connectionMono(
        Mono.fromCallable(() -> connectionFactory.newConnection("receiver"))) (3)
);
1 Create and configure connection factory
2 Create Mono that creates connection with a name
3 Create Mono that creates connection with a name

Providing your own Mono<Connection> lets you take advantage of all the Reactor API (e.g. for caching).

5.5.2. Sharing the same connection between Sender and Receiver

Sender and Receiver instances create their own Connection but it’s possible to use only one or a few Connection instances to be able to use exclusive resources between a Sender and a Receiver or simply to control the number of created connections.

Both SenderOptions and ReceiverOptions have a connectionMono method that can encapsulate any logic to create the Mono<Connection> the Sender or Receiver will end up using. Reactor RabbitMQ provides a way to share the exact same connection instance from a Mono<Connection>:

ConnectionFactory connectionFactory = new ConnectionFactory();           (1)
connectionFactory.useNio();
Mono<? extends Connection> connectionMono = Utils.singleConnectionMono(  (2)
    connectionFactory, cf -> cf.newConnection()
);

Sender sender = RabbitFlux.createSender(
    new SenderOptions().connectionMono(connectionMono)                   (3)
);
Receiver receiver = RabbitFlux.createReceiver(
    new ReceiverOptions().connectionMono(connectionMono)                 (4)
);
1 Create and configure connection factory
2 Create Mono that re-uses the same connection instance
3 Create sender with connection Mono
4 Create receiver with connection Mono

Be aware that closing the first Sender or Receiver will close the underlying AMQP connection for all the others.
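The sharing behavior can be pictured with a small JDK-only sketch of a lazily created, shared instance: the factory runs at most once, on first use, and every caller then receives the same object. This is a conceptual illustration and not the implementation of Utils.singleConnectionMono. SharedConnectionSketch is a hypothetical name and a plain Supplier stands in for the Mono<Connection>.

```java
import java.util.function.Supplier;

class SharedConnectionSketch {

    /** Wraps a factory so that it runs at most once and every caller gets the same instance. */
    static <T> Supplier<T> single(Supplier<T> factory) {
        return new Supplier<T>() {
            private T instance;

            @Override
            public synchronized T get() {
                if (instance == null) {
                    instance = factory.get(); // created lazily, on first use
                }
                return instance;
            }
        };
    }
}
```

Since both users hold a reference to the very same object, closing it from one side necessarily closes it for the other, which is the caveat stated above.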

5.5.3. Threading considerations for resource management

A Sender instance maintains a Mono<Channel> to manage resources and by default the underlying Channel is cached. A new Channel is also automatically created in case of error. Channel creation is not a cheap operation, so this default behavior fits most use cases. Each resource management method provides a counterpart method with an additional ResourceManagementOptions argument. This makes it possible to provide a custom Mono<Channel> for a given resource operation. This can be useful when multiple threads are using the same Sender instance, to avoid using the same Channel from multiple threads.

Mono<Channel> channelMono = connectionMono.map(c -> {
    try {
        return c.createChannel();
    } catch (Exception e) {
        throw new RabbitFluxException(e);
    }
}).cache();                                                                (1)

ResourceManagementOptions options = new ResourceManagementOptions()
    .channelMono(channelMono);                                             (2)

sender.declare(exchange("my.exchange"), options)                           (3)
    .then(sender.declare(queue("my.queue"), options))                      (3)
    .then(sender.bind(binding("my.exchange", "a.b", "my.queue"), options)) (3)
    .subscribe(r -> System.out.println("Exchange and queue declared and bound"));
1 Create Channel and cache it
2 Use the Mono<Channel> in ResourceManagementOptions
3 Use Mono<Channel> for each operation

In the example above, each operation will use the same Channel as it is cached. This way these operations won’t interfere with any other thread using the default resource management Mono<Channel> in the Sender instance.