K - incoming record key type
V - incoming record value type
public interface KafkaReceiver<K,V>
Modifier and Type | Method and Description |
---|---|
static <K,V> KafkaReceiver<K,V> | create(ConsumerFactory factory, ReceiverOptions<K,V> options): Creates a reactive Kafka receiver with the specified configuration options. |
static <K,V> KafkaReceiver<K,V> | create(ReceiverOptions<K,V> options): Creates a reactive Kafka receiver with the specified configuration options. |
<T> Mono<T> | doOnConsumer(Function<org.apache.kafka.clients.consumer.Consumer<K,V>,? extends T> function): Invokes the specified function on the Kafka Consumer associated with this KafkaReceiver. |
default Flux<ReceiverRecord<K,V>> | receive(): Starts a Kafka consumer that consumes records from the subscriptions or partition assignments configured for this receiver. |
Flux<ReceiverRecord<K,V>> | receive(Integer prefetch): Starts a Kafka consumer that consumes records from the subscriptions or partition assignments configured for this receiver. |
default Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> | receiveAtmostOnce(): Returns a Flux of consumer records that are committed before the record is dispatched to provide at-most-once delivery semantics. |
Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> | receiveAtmostOnce(Integer prefetch): Returns a Flux of consumer records that are committed before the record is dispatched to provide at-most-once delivery semantics. |
default Flux<Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> | receiveAutoAck(): Returns a Flux containing each batch of consumer records returned by Consumer.poll(long). |
Flux<Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> | receiveAutoAck(Integer prefetch): Returns a Flux containing each batch of consumer records returned by Consumer.poll(long). |
default Flux<Flux<ReceiverRecord<K,V>>> | receiveBatch(): Returns a Flux containing each batch of consumer records returned by Consumer.poll(long). |
Flux<Flux<ReceiverRecord<K,V>>> | receiveBatch(Integer prefetch): Returns a Flux containing each batch of consumer records returned by Consumer.poll(long). |
default Flux<Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> | receiveExactlyOnce(TransactionManager transactionManager): Returns a Flux of consumer record batches that may be used for exactly once delivery semantics. |
Flux<Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> | receiveExactlyOnce(TransactionManager transactionManager, Integer prefetch): Returns a Flux of consumer record batches that may be used for exactly once delivery semantics. |
static <K,V> KafkaReceiver<K,V> create(ReceiverOptions<K,V> options)
Creates a reactive Kafka receiver with the specified configuration options.
options - Configuration options of this receiver. Changes made to the options after the receiver is created will not be used by the receiver. A subscription using group management or a manual assignment of topic partitions must be set on the options instance prior to creating this receiver.
static <K,V> KafkaReceiver<K,V> create(ConsumerFactory factory, ReceiverOptions<K,V> options)
Creates a reactive Kafka receiver with the specified configuration options.
factory - A custom consumer factory other than the default.
options - Configuration options of this receiver. Changes made to the options after the receiver is created will not be used by the receiver. A subscription using group management or a manual assignment of topic partitions must be set on the options instance prior to creating this receiver.
Flux<ReceiverRecord<K,V>> receive(Integer prefetch)
Starts a Kafka consumer that consumes records from the subscriptions or partition assignments configured for this receiver.
Every record must be acknowledged using ReceiverOffset.acknowledge() in order to commit the offset corresponding to the record. Acknowledged records are committed based on the configured commit interval and commit batch size in ReceiverOptions. Records may also be committed manually using ReceiverOffset.commit().
prefetch - amount of prefetched batches
default Flux<ReceiverRecord<K,V>> receive()
Starts a Kafka consumer that consumes records from the subscriptions or partition assignments configured for this receiver.
Every record must be acknowledged using ReceiverOffset.acknowledge() in order to commit the offset corresponding to the record. Acknowledged records are committed based on the configured commit interval and commit batch size in ReceiverOptions. Records may also be committed manually using ReceiverOffset.commit().
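The relationship between acknowledging and committing can be sketched with a small plain-JDK model. This is not reactor-kafka code: the class `AckCommitModel` and its methods are hypothetical, it tracks a single partition, it ignores the commit interval, and it only assumes that a commit is triggered once the number of pending acknowledgements reaches the commit batch size.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-JDK model of acknowledge-then-commit bookkeeping; not reactor-kafka code. */
public class AckCommitModel {
    private final int commitBatchSize;   // hypothetical stand-in for the configured commit batch size
    private int pendingAcks = 0;         // acknowledgements not yet covered by a commit
    private long highestAcked = -1;      // highest acknowledged offset so far
    private final List<Long> commits = new ArrayList<>();

    public AckCommitModel(int commitBatchSize) {
        this.commitBatchSize = commitBatchSize;
    }

    /** Models acknowledge(): the offset becomes eligible for the next commit. */
    public void acknowledge(long offset) {
        highestAcked = Math.max(highestAcked, offset);
        pendingAcks++;
        if (pendingAcks >= commitBatchSize) {
            flush();
        }
    }

    /** Models a manual commit(): acknowledge the offset and commit right away. */
    public void commit(long offset) {
        highestAcked = Math.max(highestAcked, offset);
        pendingAcks++;
        flush();
    }

    private void flush() {
        // Kafka's committed offset is the position of the next record to read.
        commits.add(highestAcked + 1);
        pendingAcks = 0;
    }

    public List<Long> commits() {
        return commits;
    }

    public static void main(String[] args) {
        AckCommitModel model = new AckCommitModel(3);
        for (long offset = 0; offset <= 6; offset++) {
            model.acknowledge(offset);
        }
        model.commit(7);
        System.out.println(model.commits()); // prints [3, 6, 8]
    }
}
```

In the model, offset 6 is acknowledged but stays uncommitted until the manual commit of offset 7, which mirrors why unacknowledged or uncommitted records can be redelivered after a restart.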
Flux<Flux<ReceiverRecord<K,V>>> receiveBatch(Integer prefetch)
Returns a Flux containing each batch of consumer records returned by Consumer.poll(long). The maximum number of records returned in each batch can be configured on ReceiverOptions by setting the consumer property ConsumerConfig.MAX_POLL_RECORDS_CONFIG. Each batch is returned as one Flux.
Every record must be acknowledged using ReceiverOffset.acknowledge() in order to commit the offset corresponding to the record. Acknowledged records are committed based on the configured commit interval and commit batch size in ReceiverOptions. Records may also be committed manually using ReceiverOffset.commit().
prefetch - amount of prefetched batches
default Flux<Flux<ReceiverRecord<K,V>>> receiveBatch()
Returns a Flux containing each batch of consumer records returned by Consumer.poll(long). The maximum number of records returned in each batch can be configured on ReceiverOptions by setting the consumer property ConsumerConfig.MAX_POLL_RECORDS_CONFIG. Each batch is returned as one Flux.
Every record must be acknowledged using ReceiverOffset.acknowledge() in order to commit the offset corresponding to the record. Acknowledged records are committed based on the configured commit interval and commit batch size in ReceiverOptions. Records may also be committed manually using ReceiverOffset.commit().
Flux<Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> receiveAutoAck(Integer prefetch)
Returns a Flux containing each batch of consumer records returned by Consumer.poll(long). The maximum number of records returned in each batch can be configured on ReceiverOptions by setting the consumer property ConsumerConfig.MAX_POLL_RECORDS_CONFIG. Each batch is returned as one Flux.
All the records in a batch are acknowledged automatically when its Flux terminates. Acknowledged records are committed periodically based on the configured commit interval and commit batch size of this receiver's ReceiverOptions.
prefetch - amount of prefetched batches
default Flux<Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> receiveAutoAck()
Returns a Flux containing each batch of consumer records returned by Consumer.poll(long). The maximum number of records returned in each batch can be configured on ReceiverOptions by setting the consumer property ConsumerConfig.MAX_POLL_RECORDS_CONFIG. Each batch is returned as one Flux.
All the records in a batch are acknowledged automatically when its Flux terminates. Acknowledged records are committed periodically based on the configured commit interval and commit batch size of this receiver's ReceiverOptions.
Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> receiveAtmostOnce(Integer prefetch)
Returns a Flux of consumer records that are committed before the record is dispatched to provide at-most-once delivery semantics. The offset of each record dispatched on the returned Flux is committed synchronously to ensure that the record is not re-delivered if the application fails.
This mode is expensive since each record is committed individually and records are not delivered until the commit operation succeeds. The cost of commits may be reduced by configuring ReceiverOptions.atmostOnceCommitAheadSize(). The maximum number of records that may be lost on each partition if the consuming application crashes is commitAheadSize + 1.
prefetch - amount of prefetched batches
default Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> receiveAtmostOnce()
Returns a Flux of consumer records that are committed before the record is dispatched to provide at-most-once delivery semantics. The offset of each record dispatched on the returned Flux is committed synchronously to ensure that the record is not re-delivered if the application fails.
This mode is expensive since each record is committed individually and records are not delivered until the commit operation succeeds. The cost of commits may be reduced by configuring ReceiverOptions.atmostOnceCommitAheadSize(). The maximum number of records that may be lost on each partition if the consuming application crashes is commitAheadSize + 1.
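The commitAheadSize + 1 bound can be checked with a small plain-JDK simulation. This is a sketch under stated assumptions, not reactor-kafka code: the class `AtMostOnceModel` is hypothetical, it covers a single partition, and it assumes that a commit made before dispatching the record at offset o moves the committed position to o + 1 + commitAheadSize.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-JDK model of commit-ahead at-most-once delivery; not reactor-kafka code. */
public class AtMostOnceModel {

    /**
     * Simulates a consumer that crashes once, just before dispatching the
     * record at crashOffset, and then restarts from the committed position.
     * Returns the offsets that are never delivered.
     */
    public static List<Long> lostOffsets(long totalRecords, int commitAheadSize, long crashOffset) {
        List<Long> delivered = new ArrayList<>();
        long committed = 0;      // position a restarted consumer would resume from
        boolean crashed = false;
        long offset = 0;
        while (offset < totalRecords) {
            if (offset >= committed) {
                // Assumed rule: commit ahead before the record is dispatched.
                committed = offset + 1 + commitAheadSize;
            }
            if (!crashed && offset == crashOffset) {
                crashed = true;
                offset = committed;  // restart: everything below 'committed' is skipped
                continue;
            }
            delivered.add(offset);
            offset++;
        }
        List<Long> lost = new ArrayList<>();
        for (long o = 0; o < totalRecords; o++) {
            if (!delivered.contains(o)) {
                lost.add(o);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        // commitAheadSize = 2, crash right after the commit made for offset 6
        System.out.println(lostOffsets(20, 2, 6)); // prints [6, 7, 8]
    }
}
```

The worst case occurs when the crash follows a commit immediately, which loses commitAheadSize + 1 records; a crash later in the same commit window loses fewer, and a commitAheadSize of 0 loses at most one record.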
default Flux<Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> receiveExactlyOnce(TransactionManager transactionManager)
Returns a Flux of consumer record batches that may be used for exactly once delivery semantics. A new transaction is started for each inner Flux and it is the responsibility of the consuming application to commit or abort the transaction using TransactionManager.commit() or TransactionManager.abort() after processing the Flux. The next batch of consumer records will be delivered only after the previous Flux terminates. Offsets of records dispatched on each inner Flux are committed using the provided transactionManager within the transaction started for that Flux.
See KafkaSender.transactionManager() for details on configuring a transactional sender and the threading model required for transactional/exactly-once semantics.
receiver.receiveExactlyOnce(transactionManager)
    .concatMap(f -> sender.send(f.map(r -> toSenderRecord(r))).then(transactionManager.commit()))
    .onErrorResume(e -> transactionManager.abort().then(Mono.error(e)));
transactionManager - Transaction manager used to begin a new transaction for each inner Flux and commit offsets within that transaction
Flux<Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> receiveExactlyOnce(TransactionManager transactionManager, Integer prefetch)
Returns a Flux of consumer record batches that may be used for exactly once delivery semantics. A new transaction is started for each inner Flux and it is the responsibility of the consuming application to commit or abort the transaction using TransactionManager.commit() or TransactionManager.abort() after processing the Flux. The next batch of consumer records will be delivered only after the previous Flux terminates. Offsets of records dispatched on each inner Flux are committed using the provided transactionManager within the transaction started for that Flux.
See KafkaSender.transactionManager() for details on configuring a transactional sender and the threading model required for transactional/exactly-once semantics.
receiver.receiveExactlyOnce(transactionManager)
    .concatMap(f -> sender.send(f.map(r -> toSenderRecord(r))).then(transactionManager.commit()))
    .onErrorResume(e -> transactionManager.abort().then(Mono.error(e)));
transactionManager - Transaction manager used to begin a new transaction for each inner Flux and commit offsets within that transaction
prefetch - amount of prefetched batches
<T> Mono<T> doOnConsumer(Function<org.apache.kafka.clients.consumer.Consumer<K,V>,? extends T> function)
Invokes the specified function on the Kafka Consumer associated with this KafkaReceiver.
The function is scheduled when the returned Mono is subscribed to. The function is executed on the thread used for other consumer operations to ensure that Consumer is never accessed concurrently from multiple threads.
Example usage:
receiver.doOnConsumer(consumer -> consumer.partitionsFor(topic))
.doOnSuccess(partitions -> System.out.println("Partitions " + partitions));
Functions that are directly supported through the reactive KafkaReceiver interface, like poll and commit, should not be invoked from function.
The methods supported by doOnConsumer are:
Consumer.assignment()
Consumer.subscription()
Consumer.seek(org.apache.kafka.common.TopicPartition, long)
Consumer.seekToBeginning(java.util.Collection)
Consumer.seekToEnd(java.util.Collection)
Consumer.position(org.apache.kafka.common.TopicPartition)
Consumer.committed(org.apache.kafka.common.TopicPartition)
Consumer.metrics()
Consumer.partitionsFor(String)
Consumer.listTopics()
Consumer.paused()
Consumer.pause(java.util.Collection)
Consumer.resume(java.util.Collection)
Consumer.offsetsForTimes(java.util.Map)
Consumer.beginningOffsets(java.util.Collection)
Consumer.endOffsets(java.util.Collection)
function - A function that takes Kafka Consumer as parameter
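The thread-confinement idea behind doOnConsumer can be illustrated with a plain-JDK sketch. It is not reactor-kafka code: the class `ConsumerThreadModel` and the thread name `consumer-thread` are hypothetical, and it uses an eagerly started CompletableFuture where the real method returns a Mono that schedules the function only on subscription.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Plain-JDK model of confining a non-thread-safe object to one thread. */
public class ConsumerThreadModel<C> implements AutoCloseable {
    private final C consumer;  // stands in for the non-thread-safe Kafka Consumer
    private final ExecutorService consumerThread =
        Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "consumer-thread");
            t.setDaemon(true);  // do not keep the JVM alive in this sketch
            return t;
        });

    public ConsumerThreadModel(C consumer) {
        this.consumer = consumer;
    }

    /** Runs the function on the single consumer thread and exposes its result. */
    public <T> CompletableFuture<T> doOnConsumer(Function<? super C, ? extends T> function) {
        return CompletableFuture.<T>supplyAsync(() -> function.apply(consumer), consumerThread);
    }

    @Override
    public void close() {
        consumerThread.shutdown();
    }

    public static void main(String[] args) {
        try (ConsumerThreadModel<StringBuilder> model =
                 new ConsumerThreadModel<>(new StringBuilder("abc"))) {
            // Every function sees the wrapped object from the same thread.
            String thread = model.doOnConsumer(c -> Thread.currentThread().getName()).join();
            int length = model.doOnConsumer(StringBuilder::length).join();
            System.out.println(thread + " " + length); // prints consumer-thread 3
        }
    }
}
```

Because every function is queued on the same single-threaded executor, calls from different application threads are serialized rather than run concurrently against the wrapped object.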