public interface KafkaOutbound<K,V> extends Publisher<Void>
KafkaOutbound
is a reactive gateway for outgoing data flows to Kafka. Each KafkaOutbound
represents a sequence of outgoing records that are sent to Kafka using send(Publisher)
.
Send sequences may be chained together into a longer sequence of outgoing producer records.
Like Flux
and Mono
, subscribing to the tail KafkaOutbound
schedules all
parent sends in the declaration order. Outgoing records of each topic partition will be delivered
to Kafka in the declaration order.
The subscriber to KafkaOutbound is notified of completion and failure of its send sequence. If any
record cannot be delivered to Kafka, the outbound publisher fails with an error. Note that some
of the subsequent records already in flight may still be delivered. If SenderOptions.stopOnError()
is false, sends of all records will be attempted before the sequence is failed. No metadata is returned
for individual records on success or failure. KafkaSender.send(Publisher)
may be used
to send records to Kafka when per-record completion status is required.
Example usage:
kafkaSender.createOutbound()
.send(flux1)
.send(flux2)
.send(flux3)
.subscribe();
Modifier and Type | Method and Description |
---|---|
KafkaOutbound<K,V> |
send(Publisher<? extends org.apache.kafka.clients.producer.ProducerRecord<K,V>> records)
Sends a sequence of producer records to Kafka.
|
KafkaOutbound<K,V> |
sendTransactionally(Publisher<? extends Publisher<? extends org.apache.kafka.clients.producer.ProducerRecord<K,V>>> records)
Sends records from each inner flux of
records within a transaction. |
default void |
subscribe(Subscriber<? super Void> subscriber)
Subscribes the specified
Void subscriber to this KafkaOutbound and triggers the send of
pending producer record sequence queued using send(Publisher) to Kafka. |
Mono<Void> |
then()
Returns a
Mono that completes when all the producer records in this outbound
sequence sent using send(Publisher) are delivered to Kafka. |
KafkaOutbound<K,V> |
then(Publisher<Void> other)
Appends a
Publisher task and returns a new KafkaOutbound to schedule further send sequences
to Kafka after pending send sequences are complete. |
KafkaOutbound<K,V> send(Publisher<? extends org.apache.kafka.clients.producer.ProducerRecord<K,V>> records)
ProducerConfig.RETRIES_CONFIG
.
SenderOptions.stopOnError()
can be configured to stop the send sequence on first failure
or to attempt sends of all records even if one or more records could not be delivered.
The underlying Kafka sender may continue to be used until the sender is explicitly closed using
KafkaSender.close()
.
Sends may be chained by sending another record sequence on the returned KafkaOutbound
.
Example usage:
outbound.send(flux1)
.send(flux2)
.send(flux3)
.subscribe();
records
- Outbound producer recordsKafkaOutbound<K,V> sendTransactionally(Publisher<? extends Publisher<? extends org.apache.kafka.clients.producer.ProducerRecord<K,V>>> records)
records
within a transaction.
Example usage:
outbound.sendTransactionally(outboundRecords1.window(10))
.sendTransactionally(outboundRecords2.window(10))
.then();
When consuming and producing records within a single transaction, receiver offsets may be acknowledged as record is processed, so that all acknowledged offsets are committed in the transaction.If any of the publishers generates an error, the current transaction is aborted and the outbound chain is terminated.
Example usage:
outbound.sendTransactionally(receiver.receiveExactlyOnce(sender)
.doOnNext(record -> record.receiverOffset().acknowledge())
.map(record -> toProducerRecord(destTopic, record))
.window(10));
records
- Outbound producer records grouped as transactions. Records from each inner publisher
are sent within a new transaction along with any receiver offsets acknowledged or committed
by that publisher.KafkaOutbound<K,V> then(Publisher<Void> other)
Publisher
task and returns a new KafkaOutbound
to schedule further send sequences
to Kafka after pending send sequences are complete.other
- the Publisher
to subscribe to when this pending outbound then(org.reactivestreams.Publisher<java.lang.Void>)
is completeMono<Void> then()
Mono
that completes when all the producer records in this outbound
sequence sent using send(Publisher)
are delivered to Kafka. The returned
Mono fails with an error if any of the producer records in the sequence cannot be
delivered to Kafka after the configured number of retries.KafkaOutbound
are delivered to Kafkadefault void subscribe(Subscriber<? super Void> subscriber)
Void
subscriber to this KafkaOutbound
and triggers the send of
pending producer record sequence queued using send(Publisher)
to Kafka.subscribe
in interface Publisher<Void>
subscriber
- the Subscriber
to listen for send sequence completion or failure