Introduction

1. Overview

1.1. RabbitMQ

RabbitMQ is the most widely deployed open source message broker.

With more than 35,000 production deployments of RabbitMQ world-wide at small startups and large enterprises, RabbitMQ is the most popular open source message broker.

RabbitMQ is lightweight and easy to deploy on premises and in the cloud. It supports multiple messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.

1.2. Project Reactor

Reactor is a highly optimized reactive library for building efficient, non-blocking applications on the JVM based on the Reactive Streams Specification. Reactor based applications can sustain very high throughput message rates and operate with a very low memory footprint, making it suitable for building efficient event-driven applications using the microservices architecture.

Reactor implements two publishers Flux<T> and Mono<T>, both of which support non-blocking back-pressure. This enables exchange of data between threads with well-defined memory usage, avoiding unnecessary intermediate buffering or blocking.

1.3. Reactive API for RabbitMQ

Reactor RabbitMQ is a reactive API for RabbitMQ based on Reactor and RabbitMQ Java Client. Reactor RabbitMQ API enables messages to be published to RabbitMQ and consumed from RabbitMQ using functional APIs with non-blocking back-pressure and very low overheads. This enables applications using Reactor to use RabbitMQ as a message bus or streaming platform and integrate with other systems to provide an end-to-end reactive pipeline.

2. Motivation

2.1. Functional interface for RabbitMQ

Reactor RabbitMQ is a functional Java API for RabbitMQ. For applications that are written in functional style, this API enables RabbitMQ interactions to be integrated easily without requiring non-functional produce or consume APIs to be incorporated into the application logic.

2.2. Non-blocking Back-pressure

The Reactor RabbitMQ API benefits from non-blocking back-pressure provided by Reactor. For example, in a pipeline, where messages received from an external source (e.g. an HTTP proxy) are published to RabbitMQ, back-pressure can be applied easily to the whole pipeline, limiting the number of messages in-flight and controlling memory usage. Messages flow through the pipeline as they are available, with Reactor taking care of limiting the flow rate to avoid overflow, keeping application logic simple.

2.3. End-to-end Reactive Pipeline

The value proposition for Reactor RabbitMQ is the efficient utilization of resources in applications with multiple external interactions where RabbitMQ is one of the external systems. End-to-end reactive pipelines benefit from non-blocking back-pressure and efficient use of threads, enabling a large number of concurrent requests to be processed efficiently. The optimizations provided by Project Reactor enable development of reactive applications with very low overheads and predictable capacity planning to deliver low-latency, high-throughput pipelines.

2.4. Comparisons with other RabbitMQ Java libraries

Reactor RabbitMQ is not intended to replace any of the existing Java libraries. Instead, it is aimed at providing an alternative API for reactive event-driven applications.

2.4.1. RabbitMQ Java Client

For non-reactive applications, RabbitMQ Java Client provides the most complete API to manage resources, publish messages to and consume messages from RabbitMQ. Note Reactor RabbitMQ is based on RabbitMQ Java Client.

Applications using RabbitMQ as a message bus using this API may consider switching to Reactor RabbitMQ if the application is implemented in a functional style.

2.4.2. Spring AMQP

Spring AMQP applies core Spring Framework concepts to the development of AMQP-based messaging solutions. It provides a "template" as a high-level abstraction for sending and receiving messages. It also provides support for Message-driven POJOs with a "listener container". These libraries facilitate management of AMQP resources while promoting the use of dependency injection and declarative configuration. Spring AMQP is based on RabbitMQ Java Client.

3. Getting Started

3.1. Requirements

You need Java JRE installed (Java 8 or later).

You also need to install RabbitMQ. Follow the instructions from the website. Note you should use RabbitMQ 3.6.x or later.

3.2. Quick Start

This quick start tutorial sets up a single node RabbitMQ and runs the sample reactive sender and consumer.

3.2.1. Start RabbitMQ

Start RabbitMQ on your local machine with all the defaults (e.g. AMQP port is 5672).

3.2.2. Run Reactor RabbitMQ Samples

Download Reactor RabbitMQ from github.com/reactor/reactor-rabbitmq/.

> git clone https://github.com/reactor/reactor-rabbitmq
> cd reactor-rabbitmq
Sample Sender

The SampleSender code is on GitHub.

Run the sample sender:

> ./gradlew -q sender
10:20:12.590 INFO r.rabbitmq.samples.SampleSender - Message Message_1 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_2 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_3 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_4 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_5 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_6 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_7 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_8 sent successfully
10:20:12.596 INFO r.rabbitmq.samples.SampleSender - Message Message_9 sent successfully
10:20:12.597 INFO r.rabbitmq.samples.SampleSender - Message Message_10 sent successfully
10:20:12.597 INFO r.rabbitmq.samples.SampleSender - Message Message_11 sent successfully
10:20:12.597 INFO r.rabbitmq.samples.SampleSender - Message Message_12 sent successfully
10:20:12.599 INFO r.rabbitmq.samples.SampleSender - Message Message_13 sent successfully
10:20:12.600 INFO r.rabbitmq.samples.SampleSender - Message Message_14 sent successfully
10:20:12.600 INFO r.rabbitmq.samples.SampleSender - Message Message_15 sent successfully
10:20:12.600 INFO r.rabbitmq.samples.SampleSender - Message Message_16 sent successfully
10:20:12.600 INFO r.rabbitmq.samples.SampleSender - Message Message_17 sent successfully
10:20:12.600 INFO r.rabbitmq.samples.SampleSender - Message Message_18 sent successfully
10:20:12.601 INFO r.rabbitmq.samples.SampleSender - Message Message_19 sent successfully
10:20:12.601 INFO r.rabbitmq.samples.SampleSender - Message Message_20 sent successfully

The SampleSender sends 20 messages to the demo-queue queue, with publisher confirms enabled. The log line for a given message is printed to the console when the publisher confirmation is received from the broker.

Sample Receiver

The SampleReceiver code is on GitHub.

Run the sample receiver:

> ./gradlew -q receiver
10:22:43.568 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_1
10:22:43.575 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_2
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_3
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_4
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_5
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_6
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_7
10:22:43.576 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_8
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_9
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_10
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_11
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_12
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_13
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_14
10:22:43.577 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_15
10:22:43.578 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_16
10:22:43.578 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_17
10:22:43.578 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_18
10:22:43.578 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_19
10:22:43.578 INFO r.rabbitmq.samples.SampleReceiver - Received message Message_20

The SampleReceiver consumes messages from the demo-queue queue and logs the message content in the console.

3.2.3. Building Reactor RabbitMQ Applications

To build your own application using the Reactor RabbitMQ API, you need to include a dependency to Reactor RabbitMQ.

For gradle:

dependencies {
    compile "io.projectreactor.rabbitmq:reactor-rabbitmq:1.0.0.M1"
}

For maven:

<dependency>
    <groupId>io.projectreactor.rabbitmq</groupId>
    <artifactId>reactor-rabbitmq</artifactId>
    <version>1.0.0.M1</version>
</dependency>

When using a milestone or a release candidate, you need to add the Spring IO milestone repository.

For gradle:

repositories {
  maven { url 'http://repo.spring.io/milestone' }
}

For maven:

<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>http://repo.spring.io/milestone</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>

4. Additional Resources

4.1. Getting help

If you are having trouble with Reactor RabbitMQ, you can ask for help on RabbitMQ community mailing list.

Report bugs in Reactor RabbitMQ at github.com/reactor/reactor-rabbitmq/issues.

Reactor Rabbitmq is open source and the code and documentation are available at github.com/reactor/reactor-rabbitmq.

5. New & Noteworthy

5.1. What’s new in Reactor RabbitMQ 1.0

  • 1.0.0.M1

    • Introduction of the Sender and Receiver API

Reference Documentation

6. Reactor RabbitMQ API

6.1. Overview

This section describes the reactive API for producing and consuming messages using RabbitMQ. There are two main classes in Reactor RabbitMQ:

  1. reactor.rabbitmq.Sender for publishing messages to RabbitMQ

  2. reactor.rabbitmq.Receiver for consuming messages from RabbitMQ

Full API for Reactor RabbitMQ is available in the javadocs.

The project uses Reactor Core to expose a "Reactive Streams" API.

6.2. Reactive RabbitMQ Sender

Outbound messages are sent to RabbitMQ using reactor.rabbitmq.Sender. A Sender is associated with one RabbitMQ Connection that is used to transport messages to the broker. A Sender can also manage resources (exchanges, queues, bindings).

A Sender is created with an instance of sender configuration options reactor.rabbitmq.SenderOptions. The properties of SenderOptions contains the ConnectionFactory that creates connections to the broker and some Reactor Scheduler used by the Sender.

SenderOptions senderOptions =  new SenderOptions()
    .connectionFactory(new ConnectionFactory())                 (1)
    .resourceCreationScheduler(Schedulers.elastic());           (2)
1 Specify connection factory
2 Specify scheduler for resource creation

Once the required options have been configured on the options instance, a new Sender instance can be created with the options already configured in senderOptions.

Sender sender = ReactorRabbitMq.sender(senderOptions);

The Sender is now ready to send messages to RabbitMQ. At this point, a Sender instance has been created, but no connections to RabbitMQ have been made yet. The underlying Connection instance is created lazily when a first call is made to create a resource or to send messages.

Let’s now create a sequence of messages to send to RabbitMQ. Each outbound message to be sent to RabbitMQ is represented as a OutboundMessage. An OutboundMessage contains routing information (exchange to send to and routing key) as well as the message itself (properties and body).

A Flux<OutboundMessage> of messages is created for sending to RabbitMQ. For beginners, Lite Rx API Hands-on provides a hands-on tutorial on using the Reactor classes Flux and Mono.

Flux<OutboundMessage> outboundFlux  =
    Flux.range(1, 10)
        .map(i -> new OutboundMessage("amq.direct", "routing.key", ("Message " + i).getBytes());

The code segment above creates a sequence of messages to send to RabbitMQ. The outbound Flux can now be sent to RabbitMQ using the Sender created earlier.

The code segment below sends the messages to RabbitMQ. The final subscribe() in the code block requests upstream to send the messages to RabbitMQ.

sender.send(outboundFlux)                           (1)
      .doOnError(e -> log.error("Send failed", e))  (2)
      .subscribe();                                 (3)
1 Reactive send operation for the outbound Flux
2 If the sending fails, log an error
3 Subscribe to trigger the actual flow of records from outboundFlux to RabbitMQ.

See SampleSender for a full code listing for a Sender.

6.2.1. Managing resources (exchanges, queues, and bindings)

The Sender is also able to declare and delete AMQP resources the reactive way. You can learn more about the AMQP model on RabbitMQ website.

Sender has a declare* method for each type of resource (exchange, binding, and queue) and there’s also a respective *Specification class to describe each creation.

Mono<AMQP.Exchange.DeclareOk> exchange = sender.declareExchange(
  ExchangeSpecification.exchange("my.exchange")
);
Mono<AMQP.Queue.DeclareOk> queue = sender.declareQueue(
  QueueSpecification.queue("my.queue")
);
Mono<AMQP.Queue.BindOk> binding = sender.bind(
  BindingSpecification.binding().exchange("my.exchange").queue("my.queue").routingKey("a.b")
);

Note the Sender#declare* methods return their respective AMQP results wrapped into a Mono.

One can also use the ResourcesSpecification factory class with a static import to reduce boilerplate code. Combined with Mono chaining and Sender#declare shortcuts, it allows for condensed syntax:

import static reactor.rabbitmq.ResourcesSpecification.*;
...
sender.declare(exchange("my.exchange"))
    .then(sender.declare(queue("my.queue")))
    .then(sender.bind(binding("my.exchange", "a.b", "my.queue")))
    .subscribe(r -> System.out.println("Exchange and queue declared and bound"));

Sender has delete* and delete methods as well. Here is an example with the short method forms:

import static reactor.rabbitmq.ResourcesSpecification.*;
...
sender.unbind(binding("my.exchange", "a.b", "my.queue"))
    .then(sender.delete(exchange(my.exchange)))
    .then(sender.delete(queue("my.queue")))
    .subscribe(r -> System.out.println("Exchange and queue unbound and deleted"));

6.2.2. Reliable publishing with publisher confirms

Sender offers also the sendWithPublishConfirms method to send messages and receive publisher confirms to make sure the broker has taken into account the outbound messages.

Flux<OutboundMessage> outboundFlux  = ...;
sender.sendWithPublishConfirms(outboundFlux)
      .subscribe(outboundMessageResult -> {
        // outbound message has reached the broker
      });

Sender#sendWithPublishConfirms returns a Flux<OutboundMessageResult> that can be subscribed to to know that outbound messages have successfully reached the broker.

6.2.3. Threading model

Reactor RabbitMQ configure by default the Java Client to use NIO, i.e. there’s only one thread that deals with IO. This can be changed by specifying a ConnectionFactory in the SenderOptions.

The Sender uses 2 Reactor’s Scheduler: one for the subscription when creating the connection and another one for resources management. The Sender defaults to 2 elastic schedulers, this can be overriden in the SenderOptions. The Sender takes care of disposing the default schedulers when closing. If not using the default schedulers, it’s developer’s job to dispose schedulers they passed in to the SenderOptions.

6.2.4. Closing the Sender

When the Sender is no longer required, the instance can be closed. The underlying Connection is closed, as well as the default schedulers if none has been explicitly provided.

sender.close();

6.3. Reactive RabbitMQ Receiver

Messages stored in RabbitMQ queues are consumed using the reactive receiver reactor.rabbitmq.Receiver. Each instance of Receiver is associated with a single instance of Connection created by the options-provided ConnectionFactory.

A receiver is created with an instance of receiver configuration options reactor.rabbitmq.ReceiverOptions. The properties of SenderOptions contains the ConnectionFactory that creates connections to the broker and a Reactor Scheduler used for the connection creation.

ReceiverOptions receiverOptions =  new ReceiverOptions()
    .connectionFactory(new ConnectionFactory())                 (1)
    .connectionSubscriptionScheduler(Schedulers.elastic());     (2)
1 Specify connection factory
2 Specify scheduler for connection creation

Once the required configuration options have been configured on the options instance, a new Receiver instance can be created with these options to consume inbound messages. The code snippet below creates a receiver instance and an inbound Flux for the receiver. The underlying Connection and Consumer instances are created lazily later when the inbound Flux is subscribed to.

Flux<Delivery> inboundFlux =
    ReactorRabbitMq.receiver(receiverOptions)
                   .consumeNoAck("reactive.queue");

The inbound RabbitMQ Flux is ready to be consumed. Each inbound message delivered by the Flux is represented as a Delivery.

See SampleReceiver for a full code listing for a Receiver.

6.3.1. Consuming options

The Receiver class has different flavors of the receive* method and each of them can accept a ConsumeOptions instance. Here are the different options:

  • overflowStrategy: the OverflowStrategy used when creating the Flux of messages. Default is BUFFER.

  • qos: the prefetch count used when message acknowledgment is enabled. Default is 250.

  • hookBeforeEmitBiFunction: a BiFunction<Long, ? super Delivery, Boolean> to decide whether a message should be emitted downstream or not. Default is to always emit.

  • stopConsumingBiFunction: a BiFunction<Long, ? super Delivery, Boolean> to decide whether the flux should be completed after the emission of the message. Default is to never complete.

6.3.2. Acknowledgment

Receiver has several receive* methods that differ on the way consumer are acknowledged back to the broker. Acknowledgment mode can have profound impacts on performance and memory consumption.

  • consumeNoAck: the broker forgets about a message as soon as it has sent it to the consumer. Use this mode if downstream subscribers are very fast, at least faster than the flow of inbound messages. Messages will pile up in the JVM process memory if subscribers are not able to cope with the flow of messages, leading to out-of-memory errors. Note this mode uses the auto-acknowledgment mode when registering the RabbitMQ Consumer.

  • consumeAutoAck: with this mode, messages are acknowledged right after their arrival, in the Flux#doOnNext callback. This can help to cope with the flow of messages, avoiding the downstream subscribers to be overwhelmed. Note this mode does not use the auto-acknowledgment mode when registering the RabbitMQ Consumer. In this case, consumeAutoAck means messages are automatically acknowledged by the library in one the Flux hooks.

  • consumeManualAck: this method returns a Flux<AcknowledgableDelivery> and messages must be manually acknowledged or rejected downstream with AcknowledgableDelivery#ack or AcknowledgableDelivery#nack, respectively. This mode lets the developer acknowledge messages in the most efficient way, e.g. by acknowledging several messages at the same time with AcknowledgableDelivery#ack(true) and letting Reactor control the batch size with one of the Flux#buffer methods.

To learn more on how the ConsumeOptions#qos setting can impact the behavior of Receiver#consumeAutoAck and Receiver#consumeManualAck, have a look at this post about queuing theory.

6.3.3. Closing the Receiver

When the Receiver is no longer required, the instance can be closed. The underlying Connection is closed, as well as the default scheduler if none has been explicitly provided.

receiver.close();