public interface KafkaOutbound<K,V> extends Publisher<Void>
KafkaOutbound is a reactive gateway for outgoing data flows to Kafka. Each KafkaOutbound
represents a sequence of outgoing records that are sent to Kafka using send(Publisher).
Send sequences may be chained together into a longer sequence of outgoing producer records.
Like Flux and Mono, subscribing to the tail KafkaOutbound schedules all
parent sends in the declaration order. Outgoing records of each topic partition will be delivered
to Kafka in the declaration order.
The subscriber to KafkaOutbound is notified of completion and failure of its send sequence. If any
record cannot be delivered to Kafka, the outbound publisher fails with an error. Note that some
of the subsequent records already in flight may still be delivered. If SenderOptions.stopOnError()
is false, sends of all records will be attempted before the sequence is failed. No metadata is returned
for individual records on success or failure. KafkaSender.send(Publisher) may be used
to send records to Kafka when per-record completion status is required.
Example usage:
kafkaSender.createOutbound()
.send(flux1)
.send(flux2)
.send(flux3)
.subscribe();
| Modifier and Type | Method and Description |
|---|---|
KafkaOutbound<K,V> |
send(Publisher<? extends org.apache.kafka.clients.producer.ProducerRecord<K,V>> records)
Sends a sequence of producer records to Kafka.
|
KafkaOutbound<K,V> |
sendTransactionally(Publisher<? extends Publisher<? extends org.apache.kafka.clients.producer.ProducerRecord<K,V>>> records)
Sends records from each inner flux of
records within a transaction. |
default void |
subscribe(Subscriber<? super Void> subscriber)
Subscribes the specified
Void subscriber to this KafkaOutbound and triggers the send of
pending producer record sequence queued using send(Publisher) to Kafka. |
Mono<Void> |
then()
Returns a
Mono that completes when all the producer records in this outbound
sequence sent using send(Publisher) are delivered to Kafka. |
KafkaOutbound<K,V> |
then(Publisher<Void> other)
Appends a
Publisher task and returns a new KafkaOutbound to schedule further send sequences
to Kafka after pending send sequences are complete. |
KafkaOutbound<K,V> send(Publisher<? extends org.apache.kafka.clients.producer.ProducerRecord<K,V>> records)
ProducerConfig.RETRIES_CONFIG.
SenderOptions.stopOnError() can be configured to stop the send sequence on first failure
or to attempt sends of all records even if one or more records could not be delivered.
The underlying Kafka sender may continue to be used until the sender is explicitly closed using
KafkaSender.close().
Sends may be chained by sending another record sequence on the returned KafkaOutbound.
Example usage:
outbound.send(flux1)
.send(flux2)
.send(flux3)
.subscribe();
records - Outbound producer recordsKafkaOutbound<K,V> sendTransactionally(Publisher<? extends Publisher<? extends org.apache.kafka.clients.producer.ProducerRecord<K,V>>> records)
records within a transaction.
Example usage:
outbound.sendTransactionally(outboundRecords1.window(10))
.sendTransactionally(outboundRecords2.window(10))
.then();
When consuming and producing records within a single transaction, receiver offsets may be acknowledged as record is processed, so that all acknowledged offsets are committed in the transaction.If any of the publishers generates an error, the current transaction is aborted and the outbound chain is terminated.
Example usage:
outbound.sendTransactionally(receiver.receiveExactlyOnce(sender)
.doOnNext(record -> record.receiverOffset().acknowledge())
.map(record -> toProducerRecord(destTopic, record))
.window(10));
records - Outbound producer records grouped as transactions. Records from each inner publisher
are sent within a new transaction along with any receiver offsets acknowledged or committed
by that publisher.KafkaOutbound<K,V> then(Publisher<Void> other)
Publisher task and returns a new KafkaOutbound to schedule further send sequences
to Kafka after pending send sequences are complete.other - the Publisher to subscribe to when this pending outbound then(org.reactivestreams.Publisher<java.lang.Void>) is completeMono<Void> then()
Mono that completes when all the producer records in this outbound
sequence sent using send(Publisher) are delivered to Kafka. The returned
Mono fails with an error if any of the producer records in the sequence cannot be
delivered to Kafka after the configured number of retries.KafkaOutbound are delivered to Kafkadefault void subscribe(Subscriber<? super Void> subscriber)
Void subscriber to this KafkaOutbound and triggers the send of
pending producer record sequence queued using send(Publisher) to Kafka.subscribe in interface Publisher<Void>subscriber - the Subscriber to listen for send sequence completion or failure