KafkaOutbound is a reactive gateway for outgoing data flows to Kafka. Each KafkaOutbound represents a sequence of outgoing records that are sent to Kafka using send(Publisher). Send sequences may be chained together into a longer sequence of outgoing producer records. Like Mono, subscribing to the tail KafkaOutbound schedules all parent sends in the declaration order. Outgoing records of each topic partition will be delivered to Kafka in the declaration order.
The subscriber to KafkaOutbound is notified of completion and failure of its send sequence. If any record cannot be delivered to Kafka, the outbound publisher fails with an error. Note that some of the subsequent records already in flight may still be delivered. If SenderOptions.stopOnError() is false, sends of all records will be attempted before the sequence is failed. No metadata is returned for individual records on success or failure. KafkaSender.send(Publisher) may be used to send records to Kafka when per-record completion status is required.
    kafkaSender.createOutbound()
               .send(flux1)
               .send(flux2)
               .send(flux3)
               .then()
               .subscribe();
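The deferred, declaration-ordered chaining described above can be pictured with a small toy model. This is only a sketch in plain Java, not the reactor-kafka implementation: the class LazyOutboundSketch, its String records, and the in-memory list standing in for a topic are all hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a chained outbound; NOT the reactor-kafka implementation.
class LazyOutboundSketch {
    private final List<List<String>> pending; // queued send sequences, in declaration order
    private final List<String> log;           // hypothetical stand-in for a Kafka topic partition

    LazyOutboundSketch(List<String> log) {
        this(new ArrayList<>(), log);
    }

    private LazyOutboundSketch(List<List<String>> pending, List<String> log) {
        this.pending = pending;
        this.log = log;
    }

    // Queues one more sequence and returns a new tail; nothing is sent yet.
    LazyOutboundSketch send(List<String> records) {
        List<List<String>> next = new ArrayList<>(pending);
        next.add(records);
        return new LazyOutboundSketch(next, log);
    }

    // Subscribing to the tail runs every parent send in declaration order.
    void subscribe() {
        for (List<String> sequence : pending) {
            log.addAll(sequence);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        LazyOutboundSketch tail = new LazyOutboundSketch(log)
                .send(List.of("a1", "a2"))
                .send(List.of("b1"))
                .send(List.of("c1", "c2"));
        System.out.println(log); // prints [] because the tail has not been subscribed
        tail.subscribe();
        System.out.println(log); // prints [a1, a2, b1, c1, c2]
    }
}
```

The model is synchronous and single-threaded, so it only shows the ordering and laziness of the chain; it says nothing about batching, retries, or records in flight.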
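|Modifier and Type|Method and Description|
|KafkaOutbound<K,V>|send(Publisher<? extends ProducerRecord<K,V>> records): Sends a sequence of producer records to Kafka.|
|KafkaOutbound<K,V>|sendTransactionally(Publisher<? extends Publisher<? extends ProducerRecord<K,V>>> records): Sends records from each inner flux of records within a transaction.|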
KafkaOutbound<K,V> send(Publisher<? extends ProducerRecord<K,V>> records)
SenderOptions.stopOnError() can be configured to stop the send sequence on first failure or to attempt sends of all records even if one or more records could not be delivered. The underlying Kafka sender may continue to be used until the sender is explicitly closed.
Sends may be chained by sending another record sequence on the returned KafkaOutbound.
    outbound.send(flux1)
            .send(flux2)
            .send(flux3)
            .then()
            .subscribe();
records - Outbound producer records
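The two failure modes selected by SenderOptions.stopOnError() can be contrasted with a toy model. The sketch below is plain Java and does not call reactor-kafka; StopOnErrorSketch, Outcome, and the undeliverable predicate are hypothetical names, and the loop is synchronous, so it does not reproduce the case where records already in flight are still delivered after a failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model of the two failure modes; NOT the reactor-kafka implementation.
class StopOnErrorSketch {

    // Outcome of one send sequence: what was attempted, what arrived, and the first error.
    static final class Outcome {
        final List<String> attempted = new ArrayList<>();
        final List<String> delivered = new ArrayList<>();
        String firstError; // stays null when every record was delivered

        boolean failed() {
            return firstError != null;
        }
    }

    // Sends records one by one; `undeliverable` is a hypothetical stand-in for a delivery failure.
    static Outcome sendAll(List<String> records, Predicate<String> undeliverable, boolean stopOnError) {
        Outcome outcome = new Outcome();
        for (String record : records) {
            outcome.attempted.add(record);
            if (undeliverable.test(record)) {
                if (outcome.firstError == null) {
                    outcome.firstError = "could not deliver " + record;
                }
                if (stopOnError) {
                    break; // stop the sequence on the first failure
                }
            } else {
                outcome.delivered.add(record);
            }
        }
        return outcome; // in both modes the sequence is reported as failed if firstError is set
    }

    public static void main(String[] args) {
        List<String> records = List.of("r1", "r2", "r3", "r4");

        Outcome stop = sendAll(records, "r2"::equals, true);
        System.out.println(stop.attempted + " " + stop.delivered + " " + stop.failed());
        // prints [r1, r2] [r1] true

        Outcome attemptAll = sendAll(records, "r2"::equals, false);
        System.out.println(attemptAll.attempted + " " + attemptAll.delivered + " " + attemptAll.failed());
        // prints [r1, r2, r3, r4] [r1, r3, r4] true
    }
}
```

In both modes the sequence ends in failure; the setting only decides whether records after the failed one are still attempted.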
KafkaOutbound<K,V> sendTransactionally(Publisher<? extends Publisher<? extends ProducerRecord<K,V>>> records)
Sends records from each inner flux of records within a transaction.
    outbound.sendTransactionally(outboundRecords1.window(10))
            .sendTransactionally(outboundRecords2.window(10))
            .then();
When consuming and producing records within a single transaction, receiver offsets may be acknowledged as each record is processed, so that all acknowledged offsets are committed in the transaction. If any of the publishers generates an error, the current transaction is aborted and the outbound chain is terminated.
Example usage:
    outbound.sendTransactionally(receiver.receiveExactlyOnce(sender)
                                         .doOnNext(record -> record.receiverOffset().acknowledge())
                                         .map(record -> toProducerRecord(destTopic, record))
                                         .window(10));
records - Outbound producer records grouped as transactions. Records from each inner publisher are sent within a new transaction along with any receiver offsets acknowledged or committed by that publisher.
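The grouping of records into one transaction per inner publisher can be sketched with a toy model. The code below is plain Java and does not use reactor-kafka or a Kafka broker; TransactionalWindowSketch, its window helper, and the undeliverable predicate are hypothetical. It assumes that a record that cannot be delivered is what triggers the error, and that an error aborts the current batch and ends the chain, as described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model of per-window transactions; NOT the reactor-kafka implementation.
class TransactionalWindowSketch {

    // Splits records into consecutive batches of at most `size`, similar to window(size) on a finite flux.
    static List<List<String>> window(List<String> records, int size) {
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < records.size(); i += size) {
            batches.add(new ArrayList<>(records.subList(i, Math.min(i + size, records.size()))));
        }
        return batches;
    }

    // Returns the records that end up committed: only those from batches that fully succeeded.
    static List<String> sendTransactionally(List<List<String>> batches, Predicate<String> undeliverable) {
        List<String> committed = new ArrayList<>();
        for (List<String> batch : batches) {
            List<String> inTransaction = new ArrayList<>(); // begin a new transaction for this batch
            boolean aborted = false;
            for (String record : batch) {
                if (undeliverable.test(record)) {
                    aborted = true; // assumption: a delivery failure is the error that aborts
                    break;
                }
                inTransaction.add(record);
            }
            if (aborted) {
                return committed; // abort: discard this batch and terminate the chain
            }
            committed.addAll(inTransaction); // commit
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> records = List.of("r1", "r2", "r3", "r4", "r5", "r6", "r7");
        List<List<String>> batches = window(records, 3);
        System.out.println(batches);
        // prints [[r1, r2, r3], [r4, r5, r6], [r7]]
        System.out.println(sendTransactionally(batches, "r5"::equals));
        // prints [r1, r2, r3] because the second batch is aborted and the chain stops
    }
}
```

Note that r4 is dropped along with r5 even though it was sent successfully: the transaction is the unit of visibility, not the individual record.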
KafkaOutbound<K,V> then(Publisher<Void> other)
Appends a Publisher task and returns a new KafkaOutbound to schedule further send sequences to Kafka after pending send sequences are complete.
other - the Publisher to subscribe to when this pending outbound completes
Mono<Void> then()
Returns a Mono that completes when all the producer records in this outbound sequence sent using send(Publisher) are delivered to Kafka. The returned Mono fails with an error if any of the producer records in the sequence cannot be delivered to Kafka after the configured number of retries.
Returns: a Mono that completes when the producer records from this KafkaOutbound are delivered to Kafka
default void subscribe(Subscriber<? super Void> subscriber)
Subscribes the specified Void subscriber to this KafkaOutbound and triggers the send of the pending producer record sequence queued using send(Publisher).