public interface KafkaSender<K,V>

Type Parameters:
K - outgoing record key type
V - outgoing record value type

A reactive sender that appends records to Kafka topic partitions. It is recommended that a single KafkaSender is shared for each record type in a client application.
Modifier and Type | Method and Description |
---|---|
void | close() Closes this sender and the underlying Kafka producer and releases all resources allocated to it. |
static <K,V> KafkaSender<K,V> | create(ProducerFactory factory, SenderOptions<K,V> options) Creates a Kafka sender that appends records to Kafka topic partitions. |
static <K,V> KafkaSender<K,V> | create(SenderOptions<K,V> options) Creates a Kafka sender that appends records to Kafka topic partitions. |
KafkaOutbound<K,V> | createOutbound() Creates a reactive gateway for outgoing Kafka records. |
<T> Mono<T> | doOnProducer(Function<org.apache.kafka.clients.producer.Producer<K,V>,? extends T> function) Invokes the specified function on the Kafka Producer associated with this KafkaSender. |
<T> Flux<SenderResult<T>> | send(Publisher<? extends SenderRecord<K,V,T>> records) Sends a sequence of records to Kafka and returns a Flux of response record metadata including partition and offset of each record. |
<T> Flux<Flux<SenderResult<T>>> | sendTransactionally(Publisher<? extends Publisher<? extends SenderRecord<K,V,T>>> records) Sends records from each inner publisher of records within a transaction. |
TransactionManager | transactionManager() Returns the TransactionManager instance associated with this sender, which may be used for fine-grained control over transaction states. |
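A minimal end-to-end sketch of the interface above: configuring SenderOptions, creating a KafkaSender, sending a Flux of SenderRecords with correlation metadata, and closing the sender. The bootstrap server, topic name, and serializer classes are illustrative only.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

public class KafkaSenderSketch {
    public static void main(String[] args) {
        // Standard producer properties; values are illustrative.
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        SenderOptions<Integer, String> options = SenderOptions.create(props);
        KafkaSender<Integer, String> sender = KafkaSender.create(options);

        // The record index i is carried as correlation metadata and comes back in each SenderResult.
        Flux<SenderRecord<Integer, String, Integer>> outbound = Flux.range(1, 10)
            .map(i -> SenderRecord.create(new ProducerRecord<Integer, String>("demo-topic", i, "message-" + i), i));

        sender.send(outbound)
              .doOnNext(r -> System.out.println("#" + r.correlationMetadata() + " -> " + r.recordMetadata()))
              .blockLast();

        sender.close();
    }
}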
static <K,V> KafkaSender<K,V> create(SenderOptions<K,V> options)

Parameters:
options - Configuration options of this sender. Changes made to the options after the sender is created will not be used by the sender.

static <K,V> KafkaSender<K,V> create(ProducerFactory factory, SenderOptions<K,V> options)

Parameters:
factory - A custom producer factory other than the default.
options - Configuration options of this sender. Changes made to the options after the sender is created will not be used by the sender.
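The caveat above (options are captured when the sender is created) can be seen in a short sketch, reusing the props map from the earlier example; the builder-style SenderOptions mutators shown (producerProperty, maxInFlight) belong to SenderOptions.

import org.apache.kafka.clients.producer.ProducerConfig;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

SenderOptions<Integer, String> options = SenderOptions.<Integer, String>create(props)
        .maxInFlight(256)                                        // limit in-flight records
        .producerProperty(ProducerConfig.ACKS_CONFIG, "all");    // wait for full acknowledgement

KafkaSender<Integer, String> sender = KafkaSender.create(options);

// Per the description above, changes made to the options after creation are not
// used by this sender; create a new sender to pick up new settings.
options.producerProperty(ProducerConfig.ACKS_CONFIG, "0");       // has no effect on `sender`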
<T> Flux<SenderResult<T>> send(Publisher<? extends SenderRecord<K,V,T>> records)

Sends a sequence of records to Kafka and returns a Flux of response record metadata including partition and offset of each record. Responses are ordered for each partition in the absence of retries, but responses from different partitions may be interleaved in a different order from the requests. Additional correlation metadata may be passed through in the SenderRecord that is not sent to Kafka, but is included in the response Flux to match responses to requests.

Results are published when the send is acknowledged based on the acknowledgement mode configured using the option ProducerConfig.ACKS_CONFIG. If acks=0, records are acknowledged after the requests are buffered without waiting for any server acknowledgements. In this case the requests are not retried and the offset returned in SenderResult will be -1. For other ack modes, requests are retried up to the configured ProducerConfig.RETRIES_CONFIG times. If the request does not succeed after these attempts, the request fails and an exception indicating the reason for failure is returned in SenderResult.exception(). SenderOptions.stopOnError(boolean) can be configured to stop the send sequence on the first failure or to attempt all sends even if one or more records could not be delivered.

Example usage:

source = Flux.range(1, count)
             .map(i -> SenderRecord.create(topic, partition, null, key(i), message(i), i));
sender.send(source)
      .doOnNext(r -> System.out.println("Message #" + r.correlationMetadata() + " metadata=" + r.recordMetadata()));

Parameters:
records - Outbound records along with additional correlation metadata to be included in response
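A short sketch of the delivery-semantics options mentioned above (ProducerConfig.ACKS_CONFIG, ProducerConfig.RETRIES_CONFIG and SenderOptions.stopOnError), reusing props and outbound from the earlier example; the values are illustrative, not recommendations.

SenderOptions<Integer, String> options = SenderOptions.<Integer, String>create(props)
        .producerProperty(ProducerConfig.ACKS_CONFIG, "all")   // emit results only after broker acknowledgement
        .producerProperty(ProducerConfig.RETRIES_CONFIG, 3)    // retry before reporting SenderResult.exception()
        .stopOnError(false);                                   // keep sending remaining records on failure

KafkaSender<Integer, String> sender = KafkaSender.create(options);

sender.send(outbound)
      .doOnNext(result -> {
          if (result.exception() != null) {
              System.err.println("Failed #" + result.correlationMetadata() + ": " + result.exception());
          }
      })
      .subscribe();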
<T> Flux<Flux<SenderResult<T>>> sendTransactionally(Publisher<? extends Publisher<? extends SenderRecord<K,V,T>>> records)

Sends records from each inner publisher of records within a transaction. Each transaction is committed if all the records are successfully delivered to Kafka and aborted if any of the records in that batch could not be delivered.

Example usage:

sender.sendTransactionally(outboundRecords.window(10));

Parameters:
records - Outbound producer records along with correlation metadata to match results returned. Records from each inner publisher are sent within a new transaction.

Throws:
IllegalStateException - if the sender was created without setting a non-empty "transactional.id" in SenderOptions
TransactionManager transactionManager()

Returns the TransactionManager instance associated with this sender, which may be used for fine-grained control over transaction states. The sender must have been created with a non-empty transactional id by setting "transactional.id" in SenderOptions.

Threading model for the transactional sender: sends may be scheduled from multiple threads, as with non-transactional senders, but transaction control operations and offset commits on the TransactionManager must be serialized, and no sends may be performed while one of the transaction control operations is in progress.

Returns:
the TransactionManager associated with this sender

Throws:
IllegalStateException - if the sender was created without setting a non-empty "transactional.id" in SenderOptions
KafkaOutbound<K,V> createOutbound()

Creates a reactive gateway for outgoing Kafka records. Outgoing sends can be chained using KafkaOutbound.send(Publisher) or sendTransactionally(Publisher). Like Flux and Mono, subscribing to the tail KafkaOutbound will schedule all parent sends in the declaration order.

Example usage:

kafkaSender.createOutbound()
           .send(flux1)
           .send(flux2)
           .send(flux3)
           .then()
           .subscribe();
<T> Mono<T> doOnProducer(Function<org.apache.kafka.clients.producer.Producer<K,V>,? extends T> function)

Invokes the specified function on the Kafka Producer associated with this KafkaSender. The function is invoked when the returned Mono is subscribed to.

Example usage:

sender.doOnProducer(producer -> producer.partitionsFor(topic))
      .doOnSuccess(partitions -> System.out.println("Partitions " + partitions));

Functions that are directly supported on the reactive KafkaSender interface (e.g. send) should not be invoked from function. The methods supported by doOnProducer are:

- Producer.sendOffsetsToTransaction(Map, String)
- Producer.partitionsFor(String)
- Producer.metrics()
- Producer.flush()

Parameters:
function - A function that takes the Kafka Producer as parameter

Returns:
Mono that completes with the value returned by function
void close()

Closes this sender and the underlying Kafka producer and releases all resources allocated to it.
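A short sketch of releasing the sender once an outbound stream terminates; doFinally is simply one convenient place to call close().

sender.send(outbound)
      .doOnError(e -> System.err.println("Send failed: " + e))
      // Close the sender (and the underlying Kafka producer) whether the
      // stream completes, errors or is cancelled.
      .doFinally(signal -> sender.close())
      .subscribe();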