Rajini Sivaram, Mark Pollack, Oleh Dokuka, Gary Russell
1.4.0-SNAPSHOT
Introduction
1. Overview
1.1. Apache Kafka
Kafka is a scalable, high-performance distributed messaging engine. Low-latency, high-throughput messaging combined with fault-tolerance has made Kafka a popular messaging service as well as a powerful streaming platform for processing real-time streams of events.
Apache Kafka provides three main APIs:
- Producer/Consumer API to publish messages to Kafka topics and consume messages from Kafka topics
- Connector API to pull data from existing data storage systems to Kafka or push data from Kafka topics to other data systems
- Streams API for transforming and analyzing real-time streams of events published to Kafka
1.2. Project Reactor
Reactor is a highly optimized reactive library for building efficient, non-blocking applications on the JVM based on the Reactive Streams Specification. Reactor based applications can sustain very high throughput message rates and operate with a very low memory footprint, making Reactor suitable for building efficient event-driven applications using the microservices architecture.
1.3. Reactive API for Kafka
Reactor Kafka is a reactive API for Kafka based on Reactor and the Kafka Producer/Consumer API. The Reactor Kafka API enables messages to be published to Kafka and consumed from Kafka using functional APIs with non-blocking back-pressure and very low overheads. This enables applications using Reactor to use Kafka as a message bus or streaming platform and integrate with other systems to provide an end-to-end reactive pipeline.
2. Motivation
2.1. Functional interface for Kafka
Reactor Kafka is a functional Java API for Kafka. For applications that are written in functional style, this API enables Kafka interactions to be integrated easily without requiring non-functional asynchronous produce or consume APIs to be incorporated into the application logic.
2.2. Non-blocking Back-pressure
The Reactor Kafka API benefits from non-blocking back-pressure provided by Reactor. For example, in a pipeline, where messages received from an external source (e.g. an HTTP proxy) are published to Kafka, back-pressure can be applied easily to the whole pipeline, limiting the number of messages in-flight and controlling memory usage. Messages flow through the pipeline as they are available, with Reactor taking care of limiting the flow rate to avoid overflow, keeping application logic simple.
2.3. End-to-end Reactive Pipeline
The value proposition for Reactor Kafka is the efficient utilization of resources in applications with multiple external interactions where Kafka is one of the external systems. End-to-end reactive pipelines benefit from non-blocking back-pressure and efficient use of threads, enabling a large number of concurrent requests to be processed efficiently. The optimizations provided by Project Reactor enable development of reactive applications with very low overheads and predictable capacity planning to deliver low-latency, high-throughput pipelines.
2.4. Comparisons with other Kafka APIs
Reactor Kafka is not intended to replace any of the existing Kafka APIs. Instead, it is aimed at providing an alternative API for reactive event-driven applications.
2.4.1. Kafka Producer and Consumer APIs
For non-reactive applications, Kafka’s Producer/Consumer API provides a low latency interface to publish messages to Kafka and consume messages from Kafka.
Applications that use this API with Kafka as a message bus may consider switching to Reactor Kafka if the application is implemented in a functional style.
2.4.2. Kafka Connect API
Kafka Connect provides a simple interface to migrate messages from an external data system (e.g. a database) to one or more Kafka topics. Using existing connectors, this migration can be performed without writing any new code.
Applications using the connector API may consider using Reactor Kafka if a reactive API is available for the external data system and transformations are required for the data. When transformations involve other I/O (e.g. to obtain additional information from another database), a reactive pipeline benefits from end-to-end non-blocking back-pressure provided by Reactor. Messages from/to different Kafka partitions can be processed in parallel, improving throughput by avoiding blocking for I/O. The pull model in Reactor controls the pace of messages flowing through the pipeline, enabling efficient use of threads and memory without the need for overflow handling in the application.
2.4.3. Kafka Streams API
Kafka Streams provides lightweight APIs to build stream processing applications that process data stored in Kafka using standard streaming concepts and transformation primitives. Using a simple threading model, the streams API avoids the need for back-pressure. This model works well in cases where transformations do not involve external interactions.
Reactor Kafka is useful for streams applications which process data from Kafka and use external interactions (e.g. get additional data for records from a database) for transformations. In this case, Reactor can provide end-to-end non-blocking back-pressure combined with better utilization of resources if all external interactions use the reactive model.
3. Getting Started
3.1. Requirements
You need Java JRE installed (Java 8 or later).
You need Apache Kafka installed (1.0.0 or later). Kafka can be downloaded from kafka.apache.org/downloads.html. Note that the Apache Kafka client library used with Reactor Kafka should be 2.0.0 or later and the broker version should be 1.0.0 or higher.
3.2. Quick Start
This quick start tutorial sets up a single node Zookeeper and Kafka and runs the sample reactive producer and consumer. Instructions to set up multi-broker clusters are available here.
3.2.1. Start Kafka
If you haven’t yet downloaded Kafka, download Kafka version 2.0.0 or higher.
Unzip the release and set KAFKA_DIR to the installation directory. For example,
> tar -zxf kafka_2.11-2.0.0.tgz -C /opt
> export KAFKA_DIR=/opt/kafka_2.11-2.0.0
Start a single-node Zookeeper instance using the Zookeeper installation included in the Kafka download:
> $KAFKA_DIR/bin/zookeeper-server-start.sh $KAFKA_DIR/config/zookeeper.properties > /tmp/zookeeper.log &
Start a single-node Kafka instance:
> $KAFKA_DIR/bin/kafka-server-start.sh $KAFKA_DIR/config/server.properties > /tmp/kafka.log &
Create a Kafka topic:
> $KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 2 --topic demo-topic
Created topic "demo-topic".
Check that Kafka topic was created successfully:
> $KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 --describe
Topic: demo-topic PartitionCount:2 ReplicationFactor:1 Configs:
Topic: demo-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: demo-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
3.2.2. Run Reactor Kafka Samples
Download and build Reactor Kafka from github.com/reactor/reactor-kafka/.
> git clone https://github.com/reactor/reactor-kafka
> cd reactor-kafka
> ./gradlew jar
Set CLASSPATH for running reactor-kafka samples. CLASSPATH can be obtained using the classpath task of the samples sub-project.
> export CLASSPATH=`./gradlew -q :reactor-kafka-samples:classpath`
Sample Producer
See github.com/reactor/reactor-kafka/blob/main/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleProducer.java for sample producer code.
Run sample producer:
> $KAFKA_DIR/bin/kafka-run-class.sh reactor.kafka.samples.SampleProducer
Message 2 sent successfully, topic-partition=demo-topic-1 offset=0 timestamp=13:33:16:716 GMT 30 Nov 2016
Message 3 sent successfully, topic-partition=demo-topic-1 offset=1 timestamp=13:33:16:716 GMT 30 Nov 2016
Message 4 sent successfully, topic-partition=demo-topic-1 offset=2 timestamp=13:33:16:716 GMT 30 Nov 2016
Message 6 sent successfully, topic-partition=demo-topic-1 offset=3 timestamp=13:33:16:716 GMT 30 Nov 2016
Message 7 sent successfully, topic-partition=demo-topic-1 offset=4 timestamp=13:33:16:716 GMT 30 Nov 2016
Message 10 sent successfully, topic-partition=demo-topic-1 offset=5 timestamp=13:33:16:716 GMT 30 Nov 2016
Message 11 sent successfully, topic-partition=demo-topic-1 offset=6 timestamp=13:33:16:716 GMT 30 Nov 2016
Message 12 sent successfully, topic-partition=demo-topic-1 offset=7 timestamp=13:33:16:717 GMT 30 Nov 2016
Message 13 sent successfully, topic-partition=demo-topic-1 offset=8 timestamp=13:33:16:717 GMT 30 Nov 2016
Message 14 sent successfully, topic-partition=demo-topic-1 offset=9 timestamp=13:33:16:717 GMT 30 Nov 2016
Message 16 sent successfully, topic-partition=demo-topic-1 offset=10 timestamp=13:33:16:717 GMT 30 Nov 2016
Message 17 sent successfully, topic-partition=demo-topic-1 offset=11 timestamp=13:33:16:717 GMT 30 Nov 2016
Message 20 sent successfully, topic-partition=demo-topic-1 offset=12 timestamp=13:33:16:717 GMT 30 Nov 2016
Message 1 sent successfully, topic-partition=demo-topic-0 offset=0 timestamp=13:33:16:712 GMT 30 Nov 2016
Message 5 sent successfully, topic-partition=demo-topic-0 offset=1 timestamp=13:33:16:716 GMT 30 Nov 2016
Message 8 sent successfully, topic-partition=demo-topic-0 offset=2 timestamp=13:33:16:716 GMT 30 Nov 2016
Message 9 sent successfully, topic-partition=demo-topic-0 offset=3 timestamp=13:33:16:716 GMT 30 Nov 2016
Message 15 sent successfully, topic-partition=demo-topic-0 offset=4 timestamp=13:33:16:717 GMT 30 Nov 2016
Message 18 sent successfully, topic-partition=demo-topic-0 offset=5 timestamp=13:33:16:717 GMT 30 Nov 2016
Message 19 sent successfully, topic-partition=demo-topic-0 offset=6 timestamp=13:33:16:717 GMT 30 Nov 2016
The sample producer sends 20 messages to the Kafka topic demo-topic using the default partitioner. The partition and offset of each published message are output to the console. As shown in the sample output above, the order of results may differ from the order in which messages were published. Results are delivered in order for each partition, but results from different partitions may be interleaved. In the sample, the message index is included as correlation metadata to match each result to its corresponding message.
Sample Consumer
See github.com/reactor/reactor-kafka/blob/main/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleConsumer.java for sample consumer code.
Run sample consumer:
> $KAFKA_DIR/bin/kafka-run-class.sh reactor.kafka.samples.SampleConsumer
Received message: topic-partition=demo-topic-1 offset=0 timestamp=13:33:16:716 GMT 30 Nov 2016 key=2 value=Message_2
Received message: topic-partition=demo-topic-1 offset=1 timestamp=13:33:16:716 GMT 30 Nov 2016 key=3 value=Message_3
Received message: topic-partition=demo-topic-1 offset=2 timestamp=13:33:16:716 GMT 30 Nov 2016 key=4 value=Message_4
Received message: topic-partition=demo-topic-1 offset=3 timestamp=13:33:16:716 GMT 30 Nov 2016 key=6 value=Message_6
Received message: topic-partition=demo-topic-1 offset=4 timestamp=13:33:16:716 GMT 30 Nov 2016 key=7 value=Message_7
Received message: topic-partition=demo-topic-1 offset=5 timestamp=13:33:16:716 GMT 30 Nov 2016 key=10 value=Message_10
Received message: topic-partition=demo-topic-1 offset=6 timestamp=13:33:16:716 GMT 30 Nov 2016 key=11 value=Message_11
Received message: topic-partition=demo-topic-1 offset=7 timestamp=13:33:16:717 GMT 30 Nov 2016 key=12 value=Message_12
Received message: topic-partition=demo-topic-1 offset=8 timestamp=13:33:16:717 GMT 30 Nov 2016 key=13 value=Message_13
Received message: topic-partition=demo-topic-1 offset=9 timestamp=13:33:16:717 GMT 30 Nov 2016 key=14 value=Message_14
Received message: topic-partition=demo-topic-1 offset=10 timestamp=13:33:16:717 GMT 30 Nov 2016 key=16 value=Message_16
Received message: topic-partition=demo-topic-1 offset=11 timestamp=13:33:16:717 GMT 30 Nov 2016 key=17 value=Message_17
Received message: topic-partition=demo-topic-1 offset=12 timestamp=13:33:16:717 GMT 30 Nov 2016 key=20 value=Message_20
Received message: topic-partition=demo-topic-0 offset=0 timestamp=13:33:16:712 GMT 30 Nov 2016 key=1 value=Message_1
Received message: topic-partition=demo-topic-0 offset=1 timestamp=13:33:16:716 GMT 30 Nov 2016 key=5 value=Message_5
Received message: topic-partition=demo-topic-0 offset=2 timestamp=13:33:16:716 GMT 30 Nov 2016 key=8 value=Message_8
Received message: topic-partition=demo-topic-0 offset=3 timestamp=13:33:16:716 GMT 30 Nov 2016 key=9 value=Message_9
Received message: topic-partition=demo-topic-0 offset=4 timestamp=13:33:16:717 GMT 30 Nov 2016 key=15 value=Message_15
Received message: topic-partition=demo-topic-0 offset=5 timestamp=13:33:16:717 GMT 30 Nov 2016 key=18 value=Message_18
Received message: topic-partition=demo-topic-0 offset=6 timestamp=13:33:16:717 GMT 30 Nov 2016 key=19 value=Message_19
The sample consumer consumes messages from the topic demo-topic and outputs the messages to the console. The 20 messages published by the sample producer should appear on the console. As shown in the output above, messages are consumed in order for each partition, but messages from different partitions may be interleaved.
3.2.3. Building Reactor Kafka Applications
To build your own application using the Reactor Kafka API, you need to include a dependency on Reactor Kafka.
For gradle:
dependencies {
compile "io.projectreactor.kafka:reactor-kafka:1.4.0-SNAPSHOT"
}
For maven:
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.4.0-SNAPSHOT</version>
</dependency>
4. Additional Resources
4.1. Getting help
If you are having trouble with Reactor Kafka, we’d like to help.
Report bugs in Reactor Kafka at github.com/reactor/reactor-kafka/issues.
Reactor Kafka is open source and the code and documentation are available at github.com/reactor/reactor-kafka.
Reference Documentation
5. Reactor Kafka API
5.1. Overview
This section describes the reactive API for producing and consuming messages using Apache Kafka. There are two main interfaces in Reactor Kafka:
- reactor.kafka.sender.KafkaSender for publishing messages to Kafka
- reactor.kafka.receiver.KafkaReceiver for consuming messages from Kafka
The full API for Reactor Kafka is available in the javadocs.
The project uses Reactor Core to expose a "Reactive Streams" API.
5.2. Reactive Kafka Sender
Outbound messages are sent to Kafka using reactor.kafka.sender.KafkaSender. Senders are thread-safe and can be shared across multiple threads to improve throughput. A KafkaSender is associated with one KafkaProducer that is used to transport messages to Kafka.
A KafkaSender is created with an instance of sender configuration options reactor.kafka.sender.SenderOptions. Changes made to SenderOptions after the creation of the KafkaSender will not be used by the KafkaSender. The properties of SenderOptions, such as the list of bootstrap Kafka brokers and the serializers, are passed down to the underlying KafkaProducer. The properties may be configured on the SenderOptions instance at creation time or by using the setter SenderOptions#producerProperty. Other configuration options for the reactive KafkaSender, such as the maximum number of in-flight messages, can also be configured before the KafkaSender instance is created.
The generic types of SenderOptions<K, V> and KafkaSender<K, V> are the key and value types of the producer records published using the KafkaSender. The corresponding serializers must be set on the SenderOptions instance before the KafkaSender is created.
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<Integer, String> senderOptions =
SenderOptions.<Integer, String>create(producerProps) (1)
.maxInFlight(1024); (2)
1 | Specify properties for underlying KafkaProducer |
2 | Configure options for reactive KafkaSender |
Once the required options have been configured on the options instance, a new KafkaSender instance can be created with the options already configured in senderOptions.
KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);
The KafkaSender is now ready to send messages to Kafka. The underlying KafkaProducer instance is created lazily when the first message is ready to be sent. At this point, a KafkaSender instance has been created, but no connections to Kafka have been made yet.
Let’s now create a sequence of messages to send to Kafka. Each outbound message to be sent to Kafka is represented as a SenderRecord. A SenderRecord is a Kafka ProducerRecord with additional correlation metadata for matching send results to records. A ProducerRecord consists of a key/value pair to send to Kafka and the name of the Kafka topic to send the message to. Producer records may also optionally specify a partition to send the message to, or use the configured partitioner to choose a partition. A timestamp may also optionally be specified in the record; if not specified, the current timestamp is assigned by the producer. The additional correlation metadata included in SenderRecord is not sent to Kafka, but is included in the SenderResult generated for the record when the send operation completes or fails. Since results of sends to different partitions may be interleaved, the correlation metadata enables results to be matched to their corresponding records.
A Flux<SenderRecord> of records is created for sending to Kafka. For beginners, Lite Rx API Hands-on provides a hands-on tutorial on using the Reactor classes Flux and Mono.
Flux<SenderRecord<Integer, String, Integer>> outboundFlux =
Flux.range(1, 10)
.map(i -> SenderRecord.create(topic, partition, timestamp, i, "Message_" + i, i));
The code segment above creates a sequence of messages to send to Kafka, using the message index as correlation metadata in each SenderRecord. The outbound Flux can now be sent to Kafka using the KafkaSender created earlier.
The code segment below sends the records to Kafka and prints out the response metadata received from Kafka and the correlation metadata for each record. The final subscribe() in the code block requests upstream to send the records to Kafka, and the response metadata received from Kafka flows downstream. As each result is received, the record metadata from Kafka along with the correlation metadata identifying the record is printed to the console by the onNext handler. The response from Kafka includes the partition to which the record was sent as well as the offset at which the record was appended, if available. When records are sent to multiple partitions, responses arrive in order for each partition, but responses from different partitions may be interleaved.
sender.send(outboundFlux) (1)
.doOnError(e-> log.error("Send failed", e)) (2)
.doOnNext(r -> System.out.printf("Message #%d send response: %s\n", r.correlationMetadata(), r.recordMetadata())) (3)
.subscribe(); (4)
1 | Reactive send operation for the outbound Flux |
2 | If Kafka send fails, log an error |
3 | Print metadata returned by Kafka and the message index in correlationMetadata() |
4 | Subscribe to trigger the actual flow of records from outboundFlux to Kafka. |
See github.com/reactor/reactor-kafka/blob/main/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleProducer.java for the full code listing of a sample producer.
5.2.1. Error handling
public SenderOptions<K, V> stopOnError(boolean stopOnError);
SenderOptions#stopOnError() specifies whether each send sequence should fail immediately if one record could not be delivered to Kafka after the configured number of retries, or wait until all records have been processed. This can be used along with ProducerConfig#ACKS_CONFIG and ProducerConfig#RETRIES_CONFIG to configure the required quality of service.
<T> Flux<SenderResult<T>> send(Publisher<SenderRecord<K, V, T>> outboundRecords);
If stopOnError is false, a success or error response is returned for each outgoing record. For error responses, the exception from Kafka indicating the reason for the send failure is set on SenderResult and can be retrieved using SenderResult#exception(). The Flux fails with an error after attempting to send all records published on outboundRecords. If outboundRecords is a non-terminating Flux, the sender continues to send records published on this Flux until the result Flux is explicitly cancelled by the user.
If stopOnError is true, a response is returned for the first failed send and the result Flux is terminated immediately with an error. Since multiple outbound messages may be in-flight at any time, it is possible that some messages are delivered successfully to Kafka after the first failure is detected. The SenderOptions#maxInFlight() option may be configured to limit the number of messages in-flight at any time.
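For example, the options below (a sketch with illustrative values) configure a sender that waits for acknowledgement from all in-sync replicas, retries transient failures, and stops the send sequence on the first record that cannot be delivered:
SenderOptions<Integer, String> senderOptions =
    SenderOptions.<Integer, String>create(producerProps)
        .producerProperty(ProducerConfig.ACKS_CONFIG, "all") // wait for all in-sync replicas to acknowledge
        .producerProperty(ProducerConfig.RETRIES_CONFIG, 3)  // retry transient broker failures a few times
        .stopOnError(true)                                   // terminate the send sequence on the first undeliverable record
        .maxInFlight(256);                                   // bound the number of in-flight sends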
5.2.2. Send without result metadata
If individual results are not required for each send request, ProducerRecord instances can be sent to Kafka without providing correlation metadata using the KafkaOutbound interface. KafkaOutbound is a fluent interface that enables sends to be chained together.
KafkaOutbound<K, V> send(Publisher<? extends ProducerRecord<K, V>> outboundRecords);
The send sequence is initiated by subscribing to the Mono obtained from KafkaOutbound#then(). The returned Mono completes successfully if all the outbound records are delivered successfully. The Mono terminates with an error on the first send failure. If outboundRecords is a non-terminating Flux, records continue to be sent to Kafka unless a send fails or the returned Mono is cancelled.
sender.createOutbound()
.send(Flux.range(1, 10)
.map(i -> new ProducerRecord<Integer, String>(topic, i, "Message_" + i))) (1)
.then() (2)
.doOnError(e -> e.printStackTrace()) (3)
.doOnSuccess(s -> System.out.println("Sends succeeded")) (4)
.subscribe(); (5)
1 | Create ProducerRecord Flux. Records are not wrapped in SenderRecord |
2 | Get the Mono to subscribe to for starting the message flow |
3 | Error indicates failure to send one or more records |
4 | Success indicates all records were published, individual partitions or offsets not returned |
5 | Subscribe to request the actual sends |
Multiple sends can be chained together using a sequence of sends on KafkaOutbound. When the Mono returned from KafkaOutbound#then() is subscribed to, the sends are invoked in sequence in the declaration order. The sequence is cancelled if any of the sends fail after the configured number of retries.
sender.createOutbound()
.send(flux1) (1)
.send(flux2)
.send(flux3)
.then() (2)
.doOnError(e -> e.printStackTrace()) (3)
.doOnSuccess(s -> System.out.println("Sends succeeded")) (4)
.subscribe(); (5)
1 | Sends flux1 , flux2 and flux3 in order |
2 | Get the Mono to subscribe to for starting the message flow sequence |
3 | Error indicates failure to send one or more records from any of the sends in the chain |
4 | Success indicates successful send of all records from the whole chain |
5 | Subscribe to initiate the sequence of sends in the chain |
Note that in all cases the retries configured for the KafkaProducer are attempted, and failures returned by the reactive KafkaSender indicate a failure to send after the configured number of retry attempts. Retries can result in messages being delivered out of order. The producer property ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION may be set to one to avoid re-ordering.
5.2.3. Threading model
KafkaProducer uses a separate network thread for sending requests and processing responses. To ensure that the producer network thread is never blocked by applications while processing results, KafkaSender delivers responses to applications on a separate scheduler. By default, this is a single-threaded pooled scheduler that is freed when no longer required. The scheduler can be overridden if required, for instance, to use a parallel scheduler when the Kafka sends are part of a larger pipeline. This is done on the SenderOptions instance before the KafkaSender instance is created using:
public SenderOptions<K, V> scheduler(Scheduler scheduler);
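For instance, a sender that is part of a larger pipeline could deliver its results on Reactor's shared parallel scheduler (a minimal sketch; the scheduler choice is purely illustrative):
senderOptions = senderOptions
    .scheduler(Schedulers.parallel()); // reactor.core.scheduler.Schedulers; deliver send results on the parallel scheduler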
5.2.4. Non-blocking back-pressure
The number of in-flight sends can be controlled using the maxInFlight option. Requests for more elements from upstream are limited by the configured maxInFlight to ensure that the total number of requests for which responses are pending is limited at any time. Along with the buffer.memory and max.block.ms options on KafkaProducer, maxInFlight enables control of memory and thread usage when KafkaSender is used in a reactive pipeline. This option can be configured on SenderOptions before the KafkaSender is created. The default value is 256. For small messages, a higher value will improve throughput.
public SenderOptions<K, V> maxInFlight(int maxInFlight);
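For example, maxInFlight can be tuned together with the producer buffer settings; the values below are illustrative only, not recommendations:
senderOptions = senderOptions
    .maxInFlight(1024)                                                        // allow more pending sends for small messages
    .producerProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024L) // producer buffer size in bytes
    .producerProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60_000);            // how long send() may block when the buffer is full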
5.2.5. Closing the KafkaSender
When the KafkaSender is no longer required, the KafkaSender instance can be closed. The underlying KafkaProducer is closed, closing all client connections and freeing all memory used by the producer.
sender.close();
5.2.6. Access to the underlying KafkaProducer
Reactive applications may sometimes require access to the underlying producer instance to perform actions that are not exposed by the KafkaSender interface. For example, an application might need to know the number of partitions in a topic in order to choose the partition to send a record to. Operations that are not provided directly by KafkaSender (as send is) can be run on the underlying KafkaProducer using KafkaSender#doOnProducer.
sender.doOnProducer(producer -> producer.partitionsFor(topic))
.doOnSuccess(partitions -> System.out.println("Partitions " + partitions))
.subscribe();
User-provided methods are executed asynchronously. doOnProducer returns a Mono that completes with the value returned by the user-provided function.
5.3. Reactive Kafka Receiver
Messages stored in Kafka topics are consumed using the reactive receiver reactor.kafka.receiver.KafkaReceiver. Each instance of KafkaReceiver is associated with a single instance of KafkaConsumer. KafkaReceiver is not thread-safe since the underlying KafkaConsumer cannot be accessed concurrently by multiple threads.
A receiver is created with an instance of receiver configuration options reactor.kafka.receiver.ReceiverOptions. Changes made to ReceiverOptions after the creation of the receiver instance will not be used by the KafkaReceiver. The properties of ReceiverOptions, such as the list of bootstrap Kafka brokers and the de-serializers, are passed down to the underlying KafkaConsumer. These properties may be configured on the ReceiverOptions instance at creation time or by using the setter ReceiverOptions#consumerProperty. Other configuration options for the reactive KafkaReceiver, including subscription topics, must be added to the options before the KafkaReceiver instance is created.
The generic types of ReceiverOptions<K, V> and KafkaReceiver<K, V> are the key and value types of the consumer records consumed using the receiver. The corresponding de-serializers must be set on the ReceiverOptions instance before the KafkaReceiver is created.
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
ReceiverOptions<Integer, String> receiverOptions =
ReceiverOptions.<Integer, String>create(consumerProps) (1)
.subscription(Collections.singleton(topic)); (2)
1 | Specify properties to be provided to KafkaConsumer |
2 | Topics to subscribe to |
Once the required configuration options have been configured on the options instance, a new KafkaReceiver instance can be created with these options to consume inbound messages.
The code block below creates a receiver instance and creates an inbound Flux for the receiver. The underlying KafkaConsumer instance is created lazily when the inbound Flux is subscribed to.
Flux<ReceiverRecord<Integer, String>> inboundFlux =
KafkaReceiver.create(receiverOptions)
.receive();
The inbound Kafka Flux is ready to be consumed. Each inbound message delivered by the Flux is represented as a ReceiverRecord. Each receiver record is a ConsumerRecord returned by KafkaConsumer along with a committable ReceiverOffset instance. The offset must be acknowledged after the message is processed since unacknowledged offsets will not be committed. If a commit interval or commit batch size is configured, acknowledged offsets are committed periodically. Offsets may also be committed manually using ReceiverOffset#commit() if finer-grained control of commit operations is required.
inboundFlux.subscribe(r -> {
System.out.printf("Received message: %s\n", r); (1)
r.receiverOffset().acknowledge(); (2)
});
1 | Prints each consumer record from Kafka |
2 | Acknowledges that the record has been processed so that the offset may be committed |
5.3.1. Error handling
Since in reactive streams an error represents a terminal signal, any error signal emitted in the inbound Flux will cause the subscription to be cancelled and effectively cause the consumer to shut down. This can be mitigated by using the retry() operator (or retryWhen for finer-grained control), which will ensure that a new consumer is created:
Flux<ReceiverRecord<Integer, String>> inboundFlux =
KafkaReceiver.create(receiverOptions)
.receive()
.retryWhen(Retry.backoff(3, Duration.of(10L, ChronoUnit.SECONDS)));
Any errors related to the event processing, rather than to the KafkaConsumer itself, should be handled as close to the source as possible and should ideally be prevented from propagating up to the inbound Flux. This ensures that the KafkaConsumer doesn’t get restarted unnecessarily due to unrelated application errors.
5.3.2. Subscribing to wildcard patterns
The example above subscribed to a single Kafka topic. The same API can be used to subscribe to more than one topic by specifying multiple topics in the collection provided to ReceiverOptions#subscription(). A subscription can also be made to a wildcard pattern by specifying a pattern to subscribe to. Group management in KafkaConsumer dynamically updates the topic assignment when topics matching the pattern are created or deleted, and assigns partitions of matching topics to available consumer instances.
receiverOptions = receiverOptions.subscription(Pattern.compile("demo.*")); (1)
1 | Consume records from all topics starting with "demo" |
Changes to ReceiverOptions must be made before the receiver instance is created. Altering the subscription deletes any existing subscriptions on the options instance.
5.3.3. Manual assignment of topic partitions
Partitions may be manually assigned to the receiver without using Kafka consumer group management.
receiverOptions = receiverOptions.assignment(Collections.singleton(new TopicPartition(topic, 0))); (1)
1 | Consume from partition 0 of specified topic |
Existing subscriptions and assignments on the options instance are deleted when a new assignment is specified. Every receiver created from this options instance with manual assignment consumes messages from all the specified partitions.
5.3.4. Controlling commit frequency
Commit frequency can be controlled using a combination of commit interval and commit batch size. A commit is performed whenever either the interval or the batch size is reached. One or both of these options may be set on ReceiverOptions before the receiver instance is created. If a commit interval is configured, at least one commit is scheduled within that interval if any records were consumed. If a commit batch size is configured, a commit is scheduled when the configured number of records have been consumed and acknowledged.
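For example (illustrative values), the options below commit acknowledged offsets at least every five seconds, or sooner once 100 acknowledged records have accumulated:
receiverOptions = receiverOptions
    .commitInterval(Duration.ofSeconds(5)) // schedule a commit at least every 5 seconds if records were consumed
    .commitBatchSize(100);                 // or as soon as 100 acknowledged records are pending commit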
Manual acknowledgement of consumed records after processing, combined with automatic commits based on the configured commit frequency, provides at-least-once delivery semantics. Messages are re-delivered if the consuming application crashes after a message was dispatched but before it was processed and acknowledged. Only offsets explicitly acknowledged using ReceiverOffset#acknowledge() are committed. Note that acknowledging an offset acknowledges all previous offsets on the same partition. All acknowledged offsets are committed when partitions are revoked during a rebalance and when the receive Flux is terminated.
Applications which require fine-grained control over the timing of commit operations can disable periodic commits and explicitly invoke ReceiverOffset#commit() when required to trigger a commit. This commit is asynchronous by default, but the application may invoke Mono#block() on the returned Mono to implement synchronous commits. Applications may batch commits by acknowledging messages as they are consumed and invoking commit() periodically to commit acknowledged offsets.
receiver.receive()
.doOnNext(r -> {
process(r);
r.receiverOffset().commit().block();
});
Note that committing an offset acknowledges and commits all previous offsets on that partition. All acknowledged offsets are committed when partitions are revoked during rebalance and when the receive Flux is terminated.
Starting with version 1.3.12, when a rebalance occurs due to group member changes, the rebalance is delayed until records received from the previous poll have been processed. This is controlled by two ReceiverOptions properties: maxDelayRebalance (default 60s) and commitIntervalDuringDelay (default 100ms). While the delay is in progress, any offsets available for committing are committed every commitIntervalDuringDelay milliseconds. This allows orderly completion of processing of the records that have already been received. maxDelayRebalance should be less than max.poll.interval.ms to avoid a forced rebalance due to a non-responsive consumer.
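A sketch of configuring these options is shown below; the exact parameter types (Duration versus milliseconds) are assumptions here and should be checked against the ReceiverOptions javadocs for the version in use:
receiverOptions = receiverOptions
    .maxDelayRebalance(Duration.ofSeconds(30)) // delay the rebalance up to 30s while already-received records are processed
    .commitIntervalDuringDelay(200);           // commit available offsets every 200 ms while the rebalance is delayed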
5.3.5. Out of Order Commits
Starting with version 1.3.8, commits can be performed out of order and the framework will defer the commits as needed, until any "gaps" are filled. This removes the need for applications to keep track of offsets and commit them in the right order. Deferring commits increases the likelihood of duplicate deliveries if the application crashes while deferred commits are present. To enable this feature, set the maxDeferredCommits property of ReceiverOptions. If the number of deferred offset commits exceeds this value, the consumer is paused (via pause()) until the number of deferred commits is reduced by the application acknowledging or committing some of the "missing" offsets.
ReceiverOptions<Object, Object> options = ReceiverOptions.create()
.maxDeferredCommits(100)
.subscription(Collections.singletonList("someTopic"));
The number is an aggregate of deferred commits across all the assigned topics/partitions. Leaving the property at its default of 0 disables the feature, and commits are performed whenever they are invoked.
5.3.6. Auto-acknowledgement of batches of records
KafkaReceiver#receiveAutoAck returns a Flux of batches of records returned by each KafkaConsumer#poll(). The records in each batch are automatically acknowledged when the Flux corresponding to the batch terminates.
KafkaReceiver.create(receiverOptions)
.receiveAutoAck()
.concatMap(r -> r) (1)
.subscribe(r -> System.out.println("Received: " + r)); (2)
1 | Concatenate in order |
2 | Print out each consumer record received, no explicit ack required |
The maximum number of records in each batch can be controlled using the KafkaConsumer property MAX_POLL_RECORDS. This is used together with the fetch size and wait times configured on the KafkaConsumer to control the amount of data fetched from Kafka brokers in each poll. Each batch is returned as a Flux that is acknowledged after the Flux terminates. Acknowledged records are committed periodically based on the configured commit interval and batch size. This mode is simple to use since applications do not need to perform any acknowledge or commit actions. It is efficient as well, but cannot be used for at-least-once delivery of messages.
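For example, the batch size can be capped through the standard consumer property (the value is illustrative):
receiverOptions = receiverOptions
    .consumerProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 250); // at most 250 records per batch returned by receiveAutoAck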
5.3.7. Manual acknowledgement of batches of records
KafkaReceiver#receiveBatch returns a Flux of batches of records returned by each KafkaConsumer#poll(). The records in each batch should be manually acknowledged or committed.
KafkaReceiver.create(receiverOptions)
.receiveBatch()
.concatMap(b -> b) (1)
.subscribe(r -> {
System.out.println("Received message: " + r); (2)
r.receiverOffset().acknowledge(); (3)
});
1 | Concatenate in order |
2 | Print out each consumer record received |
3 | Explicit ack for each message |
As with the KafkaReceiver#receiveAutoAck method, the maximum number of records in each batch can be controlled using the KafkaConsumer property MAX_POLL_RECORDS. This is used together with the fetch size and wait times configured on the KafkaConsumer to control the amount of data fetched from Kafka brokers in each poll. But unlike KafkaReceiver#receiveAutoAck, each batch is returned as a Flux that should be acknowledged or committed using ReceiverOffset.
As with the KafkaReceiver#receive method, each message in the batch is represented as a ReceiverRecord which has a committable ReceiverOffset instance.
KafkaReceiver#receiveBatch combines the batch consumption mode of KafkaReceiver#receiveAutoAck with the manual acknowledgement/commit mode of KafkaReceiver#receive. This batching mode is efficient and is easy to use for at-least-once delivery of messages.
5.3.8. Disabling automatic commits
Applications which don’t require offset commits to Kafka may disable automatic commits by not acknowledging any records consumed using KafkaReceiver#receive().
receiverOptions = ReceiverOptions.<Integer, String>create()
.commitInterval(Duration.ZERO) (1)
.commitBatchSize(0); (2)
KafkaReceiver.create(receiverOptions)
.receive()
.subscribe(r -> process(r)); (3)
1 | Disable periodic commits |
2 | Disable commits based on batch size |
3 | Process records, but don’t acknowledge |
5.3.9. At-most-once delivery
Applications may disable automatic commits to avoid re-delivery of records. ConsumerConfig#AUTO_OFFSET_RESET_CONFIG can be configured to "latest" to consume only new records. But this could mean that an unpredictable number of records are not consumed if an application fails and restarts.
KafkaReceiver#receiveAtmostOnce can be used to consume records with at-most-once semantics, with a configurable number of records per partition that may be lost if the application fails or crashes. Offsets are committed synchronously before the corresponding record is dispatched. Records are guaranteed not to be re-delivered even if the consuming application fails, but some records may not be processed if an application fails after the commit but before the records could be processed.
This mode is expensive since each record is committed individually and records are not delivered until the commit operation succeeds. ReceiverOptions#atmostOnceCommitAheadSize may be configured to reduce the cost of commits and avoid blocking before dispatch if the offset of the record has already been committed. By default, commit-ahead is disabled and at most one record is lost per partition if an application crashes. If commit-ahead is configured, the maximum number of records that may be lost per partition is ReceiverOptions#atmostOnceCommitAheadSize + 1.
KafkaReceiver.create(receiverOptions)
.receiveAtmostOnce()
.subscribe(r -> System.out.println("Received: " + r)); (1)
1 | Process each consumer record, this record is not re-delivered if the processing fails |
5.3.10. Partition assignment and revocation listeners
Applications can enable assignment and revocation listeners to perform any actions when partitions are assigned or revoked from a consumer.
When group management is used, assignment listeners are invoked whenever partitions are assigned to the consumer after a rebalance operation. When manual assignment is used, assignment listeners are invoked when the consumer is started. Assignment listeners can be used to seek to particular offsets in the assigned partitions so that messages are consumed from the specified offset.
When a user pauses topics/partitions before rebalancing, the behavior depends on the value of pauseAllAfterRebalance. If it is set to false, the paused topics/partitions remain paused after the rebalance. However, if it is set to true, all assigned topics/partitions are paused after the rebalance.
When group management is used, revocation listeners are invoked whenever partitions are revoked from a consumer after a rebalance operation. When manual assignment is used, revocation listeners are invoked before the consumer is closed. Revocation listeners can be used to commit processed offsets when manual commits are used. Acknowledged offsets are automatically committed on revocation if automatic commits are enabled.
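A minimal sketch of registering both listeners on ReceiverOptions is shown below; the actions inside the listeners are illustrative placeholders for seek or commit logic:
receiverOptions = receiverOptions
    .addAssignListener(partitions ->
        partitions.forEach(p -> System.out.println("Assigned: " + p.topicPartition())))  // e.g. seek to a stored offset here
    .addRevokeListener(partitions ->
        partitions.forEach(p -> System.out.println("Revoked: " + p.topicPartition())))   // e.g. commit processed offsets here
    .subscription(Collections.singleton(topic));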
5.3.11. Controlling start offsets for consuming records
By default, receivers start consuming records from the last committed offset of each assigned partition. If a committed offset is not available, the offset reset strategy ConsumerConfig#AUTO_OFFSET_RESET_CONFIG configured for the KafkaConsumer is used to set the start offset to the earliest or latest offset on the partition. Applications can override offsets by seeking to new offsets in an assignment listener. Methods are provided on ReceiverPartition to seek to the earliest offset, the latest offset, a specific offset in the partition, or to the record with a timestamp later than a given point in time.
void seekToBeginning();
void seekToEnd();
void seek(long offset);
void seekToTimestamp(long timestamp);
For example, the following code block starts consuming messages from the latest offset.
receiverOptions = receiverOptions
.addAssignListener(partitions -> partitions.forEach(p -> p.seekToEnd())) (1)
.subscription(Collections.singleton(topic));
KafkaReceiver.create(receiverOptions).receive().subscribe();
1 | Seek to the last offset in each assigned partition |
Other methods are available on ReceiverPartition to determine the current position, the beginning offset, and the end offset at the time the partition is assigned.
long position();
Long beginningOffset();
Long endOffset();
5.3.12. Consumer lifecycle
Each KafkaReceiver instance is associated with a KafkaConsumer that is created when the inbound Flux returned by one of the receive methods in KafkaReceiver is subscribed to. The consumer is kept alive until the Flux completes. When the Flux completes, all acknowledged offsets are committed and the underlying consumer is closed.
Only one receive operation may be active in a KafkaReceiver at any one time. Any of the receive methods can be invoked after the receive Flux corresponding to the previous receive is terminated.
5.4. Micrometer Metrics
To enable Micrometer metrics for the underlying Kafka consumers and producers, add a MicrometerConsumerListener to the ReceiverOptions or a MicrometerProducerListener to the SenderOptions, respectively.
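A minimal sketch is shown below; it assumes the listeners are registered through the consumerListener()/producerListener() setters and constructed with a Micrometer MeterRegistry, so check the exact constructor and setter signatures for the Reactor Kafka version in use:
MeterRegistry meterRegistry = new SimpleMeterRegistry();               // any Micrometer registry (assumed available)
receiverOptions = receiverOptions
    .consumerListener(new MicrometerConsumerListener(meterRegistry));  // bind Kafka consumer metrics to the registry
senderOptions = senderOptions
    .producerListener(new MicrometerProducerListener(meterRegistry));  // bind Kafka producer metrics to the registry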
5.5. Micrometer Observation
To enable Micrometer observation for produced and consumed records, add an ObservationRegistry to the SenderOptions and ReceiverOptions using the withObservation() API.
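For example (a sketch; observationRegistry is assumed to be an ObservationRegistry already available in the application):
senderOptions = senderOptions.withObservation(observationRegistry);     // observe produced records
receiverOptions = receiverOptions.withObservation(observationRegistry); // observe consumed records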
A custom KafkaSenderObservationConvention (and KafkaReceiverObservationConvention) can also be set. See their default implementations in KafkaSenderObservation and KafkaReceiverObservation, respectively.
The DefaultKafkaSenderObservationConvention exposes two low-cardinality tags: reactor.kafka.type = sender and reactor.kafka.client.id, set from the ProducerConfig.CLIENT_ID_CONFIG option or from the identity hash code of the DefaultKafkaSender instance prefixed with reactor-kafka-sender-.
The DefaultKafkaReceiverObservationConvention exposes two low-cardinality tags: reactor.kafka.type = receiver and reactor.kafka.client.id, set from the ConsumerConfig.CLIENT_ID_CONFIG option or from the identity hash code of the DefaultKafkaReceiver instance prefixed with reactor-kafka-receiver-.
If a PropagatingSenderTracingObservationHandler is configured on the ObservationRegistry, the tracing information from the context around a producer record is stored into its headers before the record is published to the Kafka topic. If a PropagatingReceiverTracingObservationHandler is configured on the ObservationRegistry, the tracing information from the mentioned Kafka record headers is restored into the context on the receiver side with a child span.
Because of the reverse-order nature of the Reactor context, the observation functionality on the KafkaReceiver is limited to a single trace logging message for each received record. Restored tracing information is correlated into logs if the logging system is configured to do so. If there is a requirement to continue an observation on the consumer side, the KafkaReceiverObservation.RECEIVER_OBSERVATION API must be used manually in the record processing operator:
KafkaReceiver.create(receiverOptions.subscription(List.of(topic)))
.receive()
.flatMap(record -> {
Observation receiverObservation =
KafkaReceiverObservation.RECEIVER_OBSERVATION.start(null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() ->
new KafkaRecordReceiverContext(
record, "user.receiver", receiverOptions.bootstrapServers()),
observationRegistry);
return Mono.just(record)
.flatMap(TARGET_RECORD_HANDLER)
.doOnTerminate(receiverObservation::stop)
.doOnError(receiverObservation::error)
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
})
.subscribe();
6. Sample Scenarios
This section shows sample code segments for typical scenarios where the Reactor Kafka API may be used. Full code listings for these scenarios are included in the samples sub-project.
6.1. Sending records to Kafka
See KafkaSender API for details on the KafkaSender API for sending outbound records to Kafka. The following code segment creates a simple pipeline that sends records to Kafka and processes the responses. The outbound flow is triggered when the returned Flux is subscribed to.
KafkaSender.create(SenderOptions.<Integer, String>create(producerProps).maxInFlight(512)) (1)
.send(outbound.map(r -> senderRecord(r))) (2)
.doOnNext(result -> processResponse(result)) (3)
.doOnError(e -> processError(e));
1 | Create a sender with maximum 512 messages in-flight |
2 | Send a sequence of sender records |
3 | Process send result when onNext is triggered |
6.2. Replaying records from Kafka topics
See KafkaReceiver API for details on the KafkaReceiver API for consuming records from Kafka topics. The following code segment creates a Flux that replays all records on a topic and commits offsets after processing the messages. Manual acknowledgement provides at-least-once delivery semantics.
ReceiverOptions<Integer, String> options =
ReceiverOptions.<Integer, String>create(consumerProps)
.consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") (1)
.commitBatchSize(10) (2)
.subscription(Collections.singleton("demo-topic")); (3)
KafkaReceiver.create(options)
.receive()
.doOnNext(r -> {
processRecord(r); (4)
r.receiverOffset().acknowledge(); (5)
})
.subscribe();
1 | Start consuming from first available offset on each partition if committed offsets are not available |
2 | Commit every 10 acknowledged messages |
3 | Topics to consume from |
4 | Process consumer record from Kafka |
5 | Acknowledge that record has been consumed |
6.3. Reactive pipeline with Kafka sink
The code segment below consumes messages from an external source, performs some transformation and stores the output records in Kafka. A large number of retry attempts is configured on the Kafka producer so that transient failures don’t impact the pipeline. Source commits are performed only after records are successfully written to Kafka.
senderOptions = senderOptions
.producerProperty(ProducerConfig.ACKS_CONFIG, "all") (1)
.producerProperty(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE) (2)
.maxInFlight(128); (3)
KafkaSender.create(senderOptions)
.send(source.flux().map(r -> transform(r))) (4)
.doOnError(e-> log.error("Send failed, terminating.", e)) (5)
.doOnNext(r -> source.commit(r.correlationMetadata())) (6)
.retryWhen(Retry.backoff(3, Duration.of(10L, ChronoUnit.SECONDS)));
1 | Send is acknowledged by Kafka for acks=all after message is delivered to all in-sync replicas |
2 | Large number of retries in the producer to cope with transient failures in brokers |
3 | Low in-flight count to avoid filling up producer buffer and blocking the pipeline, default stopOnError=true |
4 | Receive from external source, transform and send to Kafka |
5 | If a send fails, it indicates catastrophic error, fail the whole pipeline |
6 | Use correlation metadata in the sender record to commit source record |
6.4. Reactive pipeline with Kafka source
The code segment below consumes records from Kafka topics, transforms each record and sends the output to an external sink. Kafka consumer offsets are committed after records are successfully output to the sink.
receiverOptions = receiverOptions
.commitInterval(Duration.ZERO) (1)
.commitBatchSize(0) (2)
.subscription(Pattern.compile(topics)); (3)
KafkaReceiver.create(receiverOptions)
.receive()
.publishOn(aBoundedElasticScheduler) (4)
.concatMap(m -> sink.store(transform(m)) (5)
.doOnSuccess(r -> m.receiverOffset().commit().block())) (6)
.retryWhen(Retry.backoff(3, Duration.of(10L, ChronoUnit.SECONDS)));
1 | Disable periodic commits |
2 | Disable commits by batch size |
3 | Wildcard subscription |
4 | Cannot block the receiver thread |
5 | Transform Kafka record and store in external sink |
6 | Synchronous commit after record is successfully delivered to sink |
6.5. Reactive pipeline with Kafka source and sink
The code segment below consumes messages from a Kafka topic, performs some transformation on the incoming messages and stores the results in other Kafka topics. Manual acknowledgement mode provides at-least-once semantics, with messages acknowledged after the output records are delivered to Kafka. Acknowledged offsets are committed periodically based on the configured commit interval.
receiverOptions = receiverOptions
.commitInterval(Duration.ofSeconds(10)) (1)
.subscription(Pattern.compile(topics));
sender.send(KafkaReceiver.create(receiverOptions)
.receive()
.map(m -> SenderRecord.create(transform(m.value()), m.receiverOffset()))) (2)
.doOnNext(m -> m.correlationMetadata().acknowledge()); (3)
1 | Configure interval for automatic commits |
2 | Transform incoming record and create outbound record with transformed data in the payload and inbound offset as correlation metadata |
3 | Acknowledge the inbound offset using the offset instance in correlation metadata after outbound record is delivered to Kafka |
6.6. At-most-once delivery
The code segment below demonstrates a flow with at-most-once delivery. The producer does not wait for acks and does not perform any retries. Messages that cannot be delivered to Kafka on the first attempt are dropped. KafkaReceiver commits offsets before delivery to the application to ensure that if the consumer restarts, messages are not redelivered. With a replication factor of 1 for topic partitions, this code can be used for at-most-once delivery.
senderOptions = senderOptions
.producerProperty(ProducerConfig.ACKS_CONFIG, "0") (1)
.producerProperty(ProducerConfig.RETRIES_CONFIG, "0") (2)
.stopOnError(false); (3)
receiverOptions = receiverOptions
.subscription(Collections.singleton(sourceTopic));
KafkaSender.create(senderOptions)
.send(KafkaReceiver.create(receiverOptions)
.receiveAtmostOnce() (4)
.map(cr -> SenderRecord.create(transform(cr.value()), cr.offset())));
1 | Send with acks=0 completes when message is buffered locally, before it is delivered to Kafka broker |
2 | No retries in producer |
3 | Ignore any error and continue to send remaining records |
4 | At-most-once receive |
6.7. Fan-out with Multiple Streams
The code segment below demonstrates fan-out with the same records processed in multiple independent streams. Each stream is processed on a different thread, which transforms the input record and stores the output in a Kafka topic.
Reactor’s EmitterProcessor is used to broadcast the input records from Kafka to multiple subscribers.
EmitterProcessor<Person> processor = EmitterProcessor.create(); (1)
BlockingSink<Person> incoming = processor.connectSink(); (2)
inputRecords = KafkaReceiver.create(receiverOptions)
.receive()
.doOnNext(m -> incoming.emit(m.value())); (3)
outputRecords1 = processor.publishOn(scheduler1).map(p -> process1(p)); (4)
outputRecords2 = processor.publishOn(scheduler2).map(p -> process2(p)); (5)
Flux.merge(sender.send(outputRecords1), sender.send(outputRecords2))
.doOnSubscribe(s -> inputRecords.subscribe())
.subscribe(); (6)
1 | Create publish/subscribe EmitterProcessor for fan-out of Kafka inbound records |
2 | Create BlockingSink to which records are emitted |
3 | Receive from Kafka and emit to BlockingSink |
4 | Consume records on a scheduler, process and generate output records to send to Kafka |
5 | Add another processor for the same input data on a different scheduler |
6 | Merge the streams and subscribe to start the flow |
6.8. Concurrent Processing with Partition-Based Ordering
The code segment below demonstrates a flow where messages are consumed from a Kafka topic, processed by multiple threads and the results stored in another Kafka topic. Messages are grouped by partition to guarantee ordering in message processing and commit operations. Messages from each partition are processed on a single thread.
Scheduler scheduler = Schedulers.newElastic("sample", 60, true);
KafkaReceiver.create(receiverOptions)
.receive()
.groupBy(m -> m.receiverOffset().topicPartition()) (1)
.flatMap(partitionFlux ->
partitionFlux.publishOn(scheduler)
.map(r -> processRecord(partitionFlux.key(), r))
.sample(Duration.ofMillis(5000)) (2)
.concatMap(offset -> offset.commit())); (3)
1 | Group by partition to guarantee ordering |
2 | Commit periodically |
3 | Commit in sequence using concatMap |
6.9. Transactional send
The code segment below consumes messages from an external source, performs some transformation and stores multiple transformed records in different Kafka topics within a transaction.
senderOptions = senderOptions
.producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "SampleTxn"); (1)
KafkaSender.create(senderOptions)
.sendTransactionally(source.map(r -> Flux.fromIterable(transform(r)))) (2)
.concatMap(r -> r)
.doOnError(e-> log.error("Send failed, terminating.", e))
.doOnNext(r -> log.debug("Send completed {}", r.correlationMetadata())); (3)
1 | Configure transactional id for producer |
2 | Send multiple records generated from each source record within a transaction |
6.10. Exactly-once delivery
The code segment below demonstrates a flow with exactly-once delivery. Source records received from a Kafka topic are transformed and sent to Kafka. Each batch of records is delivered to the application in a new transaction. Offsets of the source records of each batch are automatically committed within its transaction. Each transaction is committed by the application after the transformed records of the batch are successfully delivered to the destination topic. The next batch of records is delivered to the application in a new transaction after the current transaction is committed.
senderOptions = senderOptions
.producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "SampleTxn"); (1)
receiverOptions = receiverOptions
.consumerProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") (2)
.subscription(Collections.singleton(sourceTopic));
sender = KafkaSender.create(senderOptions);
transactionManager = sender.transactionManager();
receiver.receiveExactlyOnce(transactionManager) (3)
.concatMap(f -> sender.send(f.map(r -> transform(r))) (4)
.concatWith(transactionManager.commit())) (5)
.onErrorResume(e -> transactionManager.abort().then(Mono.error(e))); (6)
1 | Configure transactional id for producer |
2 | Consume only committed messages |
3 | Receive exactly once within transactions, offsets are auto-committed when transaction is committed |
4 | Send transformed records within the same transaction as source record offsets |
5 | Commit transaction after sends complete successfully |
6 | Abort transaction if send fails and propagate error |