public interface ReceiverOptions<K,V>
Modifier and Type | Interface and Description |
---|---|
`static interface` | `ReceiverOptions.ConsumerListener` Called whenever a consumer is added or removed. |

Modifier and Type | Method and Description |
---|---|
`ReceiverOptions<K,V>` | `addAssignListener(Consumer<Collection<ReceiverPartition>> onAssign)` Adds a listener for partition assignments. |
`ReceiverOptions<K,V>` | `addRevokeListener(Consumer<Collection<ReceiverPartition>> onRevoke)` Adds a listener for partition revocations. |
`List<Consumer<Collection<ReceiverPartition>>>` | `assignListeners()` Returns the list of configured partition assignment listeners. |
`Collection<org.apache.kafka.common.TopicPartition>` | `assignment()` Returns the collection of partitions to be assigned if this instance is configured for manual partition assignment. |
`ReceiverOptions<K,V>` | `assignment(Collection<org.apache.kafka.common.TopicPartition> partitions)` Sets subscription using manual assignment to the specified partitions. |
`int` | `atmostOnceCommitAheadSize()` Returns the maximum difference between the offset committed for at-most-once delivery and the offset of the last record dispatched. |
`ReceiverOptions<K,V>` | `atmostOnceCommitAheadSize(int commitAheadSize)` Configures commit ahead size per partition for at-most-once delivery. |
`default String` | `bootstrapServers()` Return the bootstrap servers from the provided `ConsumerConfig`. |
`ReceiverOptions<K,V>` | `clearAssignListeners()` Removes all partition assignment listeners. |
`ReceiverOptions<K,V>` | `clearRevokeListeners()` Removes all partition revocation listeners. |
`default String` | `clientId()` Return the client id provided by the `ConsumerConfig`. |
`Duration` | `closeTimeout()` Returns the timeout for graceful shutdown of `KafkaConsumer`. |
`ReceiverOptions<K,V>` | `closeTimeout(Duration timeout)` Sets the timeout for graceful shutdown of `KafkaConsumer`. |
`int` | `commitBatchSize()` Returns the configured commit batch size for automatic commits of acknowledged records. |
`ReceiverOptions<K,V>` | `commitBatchSize(int commitBatchSize)` Configures commit batch size for automatic commits. |
`Duration` | `commitInterval()` Returns the configured commit interval for automatic commits of acknowledged records. |
`ReceiverOptions<K,V>` | `commitInterval(Duration commitInterval)` Configures commit interval for automatic commits. |
`default long` | `commitIntervalDuringDelay()` Get how often to commit offsets, in milliseconds, while a rebalance is being delayed. |
`default ReceiverOptions<K,V>` | `commitIntervalDuringDelay(long interval)` Set how often to commit offsets, in milliseconds, while a rebalance is being delayed. |
`Duration` | `commitRetryInterval()` Returns the configured retry commit interval for commits that fail with non-fatal `RetriableCommitFailedException`s. |
`ReceiverOptions<K,V>` | `commitRetryInterval(Duration commitRetryInterval)` Configures the retry commit interval for commits that fail with non-fatal `RetriableCommitFailedException`. |
`default ReceiverOptions.ConsumerListener` | `consumerListener()` Returns the function that will be applied after a consumer is created but before it is subscribed. |
`default ReceiverOptions<K,V>` | `consumerListener(ReceiverOptions.ConsumerListener listener)` Set a function that will be applied after a consumer is created but before it is subscribed. |
`Map<String,Object>` | `consumerProperties()` Returns the configuration properties of the underlying `KafkaConsumer`. |
`Object` | `consumerProperty(String name)` Returns the `KafkaConsumer` configuration property value for the specified option name. |
`ReceiverOptions<K,V>` | `consumerProperty(String name, Object newValue)` Sets `KafkaConsumer` configuration property to the specified value. |
`static <K,V> ReceiverOptions<K,V>` | `create()` Creates an options instance with default properties. |
`static <K,V> ReceiverOptions<K,V>` | `create(Map<String,Object> configProperties)` Creates an options instance with the specified config overrides for `KafkaConsumer`. |
`static <K,V> ReceiverOptions<K,V>` | `create(Properties configProperties)` Creates an options instance with the specified config overrides for `KafkaConsumer`. |
`String` | `groupId()` Returns the configured Kafka consumer group id. |
`Duration` | `heartbeatInterval()` Returns the configured heartbeat interval for the Kafka consumer. |
`org.apache.kafka.common.serialization.Deserializer<K>` | `keyDeserializer()` Returns the deserializer, if one is configured, that is used by `KafkaConsumer` for key deserialization. |
`int` | `maxCommitAttempts()` Returns the maximum number of consecutive non-fatal commit failures that are tolerated. |
`ReceiverOptions<K,V>` | `maxCommitAttempts(int maxAttempts)` Configures the maximum number of consecutive non-fatal `RetriableCommitFailedException` commit failures that are tolerated. |
`default int` | `maxDeferredCommits()` When greater than 0, enables out of order commit sequencing. |
`default ReceiverOptions<K,V>` | `maxDeferredCommits(int maxDeferred)` Set to greater than 0 to enable out of order commit sequencing. |
`default Duration` | `maxDelayRebalance()` Get the maximum amount of time to delay a rebalance until existing records in the pipeline have been processed. |
`default ReceiverOptions<K,V>` | `maxDelayRebalance(Duration maxDelay)` Set the maximum amount of time to delay a rebalance until existing records in the pipeline have been processed. |
`KafkaReceiverObservationConvention` | `observationConvention()` Return a `KafkaReceiverObservationConvention` to support a receiving operation observation. |
`io.micrometer.observation.ObservationRegistry` | `observationRegistry()` Return an `ObservationRegistry` to observe the Kafka record consuming operation. |
`default boolean` | `pauseAllAfterRebalance()` When true, pause all partitions on assignment after rebalance, if any partitions were paused by the user before the rebalance. |
`default ReceiverOptions<K,V>` | `pauseAllAfterRebalance(boolean pauseAll)` When true, pause all partitions on assignment after rebalance, if any partitions were paused by the user before the rebalance. |
`Duration` | `pollTimeout()` Returns the timeout for each `KafkaConsumer.poll(long)` operation. |
`ReceiverOptions<K,V>` | `pollTimeout(Duration timeout)` Sets the timeout for each `KafkaConsumer.poll(long)` operation. |
`List<Consumer<Collection<ReceiverPartition>>>` | `revokeListeners()` Returns the list of configured partition revocation listeners. |
`Supplier<Scheduler>` | `schedulerSupplier()` Returns the Supplier for a Scheduler that records will be published on. |
`ReceiverOptions<K,V>` | `schedulerSupplier(Supplier<Scheduler> schedulerSupplier)` Configures the Supplier for a Scheduler on which records will be published. |
`default Consumer<org.apache.kafka.clients.consumer.Consumer<K,V>>` | `subscriber(org.apache.kafka.clients.consumer.ConsumerRebalanceListener listener)` Returns the `KafkaConsumer.subscribe(Collection, ConsumerRebalanceListener)`, `KafkaConsumer.subscribe(Pattern, ConsumerRebalanceListener)` or `KafkaConsumer.assign(Collection)` operation corresponding to the subscription or assignment options configured for this instance. |
`ReceiverOptions<K,V>` | `subscription(Collection<String> topics)` Sets subscription using group management to the specified collection of topics. |
`ReceiverOptions<K,V>` | `subscription(Pattern pattern)` Sets subscription using group management to the specified pattern. |
`Pattern` | `subscriptionPattern()` Returns the Pattern by which the topics should be selected. |
`Collection<String>` | `subscriptionTopics()` Returns the collection of topics to be subscribed. |
`org.apache.kafka.common.serialization.Deserializer<V>` | `valueDeserializer()` Returns the deserializer, if one is configured, that is used by `KafkaConsumer` for value deserialization. |
`ReceiverOptions<K,V>` | `withKeyDeserializer(org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer)` Set a concrete deserializer instance to be used by the `KafkaConsumer` for keys. |
`default ReceiverOptions<K,V>` | `withObservation(io.micrometer.observation.ObservationRegistry observationRegistry)` Configure an `ObservationRegistry` to observe the Kafka record consuming operation. |
`ReceiverOptions<K,V>` | `withObservation(io.micrometer.observation.ObservationRegistry observationRegistry, KafkaReceiverObservationConvention observationConvention)` Configure an `ObservationRegistry` to observe the Kafka record receiving operation. |
`ReceiverOptions<K,V>` | `withValueDeserializer(org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)` Set a concrete deserializer instance to be used by the `KafkaConsumer` for values. |

@NonNull static <K,V> ReceiverOptions<K,V> create()
Creates an options instance with default properties.

@NonNull static <K,V> ReceiverOptions<K,V> create(@NonNull Map<String,Object> configProperties)
Creates an options instance with the specified config overrides for KafkaConsumer.

@NonNull static <K,V> ReceiverOptions<K,V> create(@NonNull Properties configProperties)
Creates an options instance with the specified config overrides for KafkaConsumer.

@NonNull ReceiverOptions<K,V> consumerProperty(@NonNull String name, @NonNull Object newValue)
Sets KafkaConsumer configuration property to the specified value.

@NonNull ReceiverOptions<K,V> withKeyDeserializer(@NonNull org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer)
Set a concrete deserializer instance to be used by the KafkaConsumer for keys. Overrides any setting of the ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG property.
Parameters:
keyDeserializer - key deserializer to use in the consumer

@NonNull ReceiverOptions<K,V> withValueDeserializer(@NonNull org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
Set a concrete deserializer instance to be used by the KafkaConsumer for values. Overrides any setting of the ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG property.
Parameters:
valueDeserializer - value deserializer to use in the consumer

@NonNull ReceiverOptions<K,V> pollTimeout(@NonNull Duration timeout)
Sets the timeout for each KafkaConsumer.poll(long) operation. Since the underlying Kafka consumer is not thread-safe, long poll intervals may delay commits and other operations invoked using KafkaReceiver.doOnConsumer(java.util.function.Function). Very short timeouts may reduce batching and increase load on the broker.

@NonNull ReceiverOptions<K,V> closeTimeout(@NonNull Duration timeout)
Sets the timeout for graceful shutdown of KafkaConsumer.

@NonNull ReceiverOptions<K,V> addAssignListener(@NonNull Consumer<Collection<ReceiverPartition>> onAssign)
Adds a listener for partition assignments. The listener receives the assigned partitions as ReceiverPartition instances, which applications can use to seek to different offsets. When group management is used, assign listeners are invoked after every rebalance operation. With manual partition assignment using assignment(), assign listeners are invoked once when the receive Flux is subscribed to.

@NonNull ReceiverOptions<K,V> addRevokeListener(@NonNull Consumer<Collection<ReceiverPartition>> onRevoke)
Adds a listener for partition revocations. With manual partition assignment using assignment(), revoke listeners are invoked once when the receive Flux is terminated.

@NonNull ReceiverOptions<K,V> clearAssignListeners()
Removes all partition assignment listeners.

@NonNull ReceiverOptions<K,V> clearRevokeListeners()
Removes all partition revocation listeners.

@NonNull ReceiverOptions<K,V> assignment(Collection<org.apache.kafka.common.TopicPartition> partitions)
Sets subscription using manual assignment to the specified partitions. The assignment takes effect when the receive Flux of a KafkaReceiver using this options instance is subscribed to. Any existing subscriptions or assignments on this option are deleted.

@NonNull ReceiverOptions<K,V> subscription(Collection<String> topics)
Sets subscription using group management to the specified collection of topics. The subscription takes effect when the receive Flux of a KafkaReceiver using this options instance is subscribed to. Any existing subscriptions or assignments on this option are deleted.

@NonNull ReceiverOptions<K,V> subscription(Pattern pattern)
Sets subscription using group management to the specified pattern. The subscription takes effect when the receive Flux of a KafkaReceiver using this options instance is subscribed to. Any existing subscriptions or assignments on this option are deleted. Topics are dynamically assigned or removed when topics matching the pattern are created or deleted.

@NonNull ReceiverOptions<K,V> commitInterval(Duration commitInterval)
Configures commit interval for automatic commits. If commitInterval is zero, periodic commits based on time intervals are disabled. If commit batch size is configured, offsets are committed when the number of acknowledged offsets reaches the batch size. If commit batch size is also zero, it is the responsibility of the application to explicitly commit records using ReceiverOffset.commit() if required.
If commit interval and commit batch size are configured, a commit operation is scheduled when either the interval or batch size is reached.

@NonNull ReceiverOptions<K,V> commitBatchSize(int commitBatchSize)
Configures commit batch size for automatic commits. If commitBatchSize is 0, commits are only performed based on commit interval. If the commit interval is also zero, no automatic commits are performed and it is the responsibility of the application to commit offsets explicitly using ReceiverOffset.commit() if required.
If commit batch size and commit interval are configured, a commit operation is scheduled when either the batch size or interval is reached.
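
The interaction between commit interval and commit batch size described above can be sketched as a small decision function. This is a minimal illustrative model, not the library's implementation: the class `CommitTrigger` and its methods `commitDue` and `manualCommitOnly` are hypothetical names, and the guard that skips a commit when nothing has been acknowledged is an assumption.

```java
import java.time.Duration;

/** Hypothetical model of the documented commit trigger rules (not library code). */
final class CommitTrigger {
    private final Duration commitInterval; // zero disables time-based commits
    private final int commitBatchSize;     // zero disables size-based commits

    CommitTrigger(Duration commitInterval, int commitBatchSize) {
        this.commitInterval = commitInterval;
        this.commitBatchSize = commitBatchSize;
    }

    /** True when both triggers are disabled, so the application must commit explicitly. */
    boolean manualCommitOnly() {
        return commitInterval.isZero() && commitBatchSize == 0;
    }

    /**
     * Returns true when a commit should be scheduled: either the interval has elapsed
     * or the number of acknowledged, uncommitted offsets has reached the batch size.
     * Assumption: nothing is committed when there are no pending acknowledgements.
     */
    boolean commitDue(Duration sinceLastCommit, int pendingAcks) {
        if (pendingAcks <= 0) {
            return false;
        }
        boolean intervalReached = !commitInterval.isZero()
                && sinceLastCommit.compareTo(commitInterval) >= 0;
        boolean batchReached = commitBatchSize > 0 && pendingAcks >= commitBatchSize;
        return intervalReached || batchReached;
    }
}
```

With both values set to zero, `commitDue` never returns true, which corresponds to the case where the application commits through ReceiverOffset.commit() itself.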

@NonNull ReceiverOptions<K,V> atmostOnceCommitAheadSize(int commitAheadSize)
Configures commit ahead size per partition for at-most-once delivery. The maximum number of records that may be lost per partition if the application fails is commitAheadSize + 1. A high commit ahead size reduces the cost of commits in at-most-once delivery by reducing the number of commits and avoiding blocking before dispatch if the offset corresponding to the record was already committed.
If commitAheadSize is zero (default), offsets are committed synchronously before each record is dispatched for KafkaReceiver.receiveAtmostOnce(). Otherwise, commits are performed ahead of dispatch and record dispatch is blocked only if commits haven't completed.
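
To make the commitAheadSize + 1 bound concrete, here is a simplified, synchronous model of commit-ahead bookkeeping for a single partition. It is a sketch under stated assumptions, not the library's algorithm: `CommitAheadTracker` and its methods are hypothetical, the committed offset is modeled as the next offset a restarted consumer would read, and the asynchronous commits mentioned above are left out.

```java
/** Hypothetical single-partition model of commit-ahead for at-most-once delivery. */
final class CommitAheadTracker {
    private final int commitAheadSize;
    private long committedOffset; // next offset a restarted consumer would read
    private int commitCount;      // number of commits issued so far

    CommitAheadTracker(int commitAheadSize, long initialCommittedOffset) {
        this.commitAheadSize = commitAheadSize;
        this.committedOffset = initialCommittedOffset;
    }

    /**
     * Called before the record at recordOffset is dispatched. A commit is issued only
     * when the record is not already covered by an earlier commit-ahead.
     */
    void beforeDispatch(long recordOffset) {
        if (recordOffset >= committedOffset) {
            // Cover this record plus commitAheadSize records after it.
            committedOffset = recordOffset + 1 + commitAheadSize;
            commitCount++;
        }
    }

    /** Records that would be skipped if the application failed before dispatching recordOffset. */
    long lostOnFailureBefore(long recordOffset) {
        return Math.max(0, committedOffset - recordOffset);
    }

    long committedOffset() {
        return committedOffset;
    }

    int commitCount() {
        return commitCount;
    }
}
```

In this model, dispatching offsets 0 through 9 with a commit ahead size of 4 issues two commits instead of ten, and a failure immediately after a commit skips at most five records, which matches commitAheadSize + 1.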

@NonNull ReceiverOptions<K,V> maxCommitAttempts(int maxAttempts)
Configures the maximum number of consecutive non-fatal RetriableCommitFailedException commit failures that are tolerated. For manual commits, failure in commit after the configured number of attempts fails the commit operation. For auto commits, the receive Flux is terminated if the commit does not succeed after these attempts.
See Also:
commitRetryInterval()
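
The retry behavior governed by the maximum commit attempts and the commit retry interval can be illustrated with a plain bounded-retry loop. This is a blocking sketch for illustration only; the library is reactive and its internals are not shown in this reference. `CommitRetry`, `commitWithRetry`, and `RetriableFailure` are hypothetical names, with `RetriableFailure` standing in for RetriableCommitFailedException.

```java
import java.time.Duration;

/** Hypothetical blocking model of bounded commit retries (not library code). */
final class CommitRetry {

    /** Stand-in for a non-fatal, retriable commit failure. */
    static final class RetriableFailure extends RuntimeException {
        RetriableFailure(String message) {
            super(message);
        }
    }

    /**
     * Runs the commit action, retrying on RetriableFailure until it succeeds or
     * maxAttempts consecutive failures have occurred. Other exceptions propagate
     * immediately. Returns the number of attempts that were made.
     */
    static int commitWithRetry(Runnable commit, int maxAttempts, Duration retryInterval) {
        int attempt = 0;
        while (true) {
            attempt++;
            try {
                commit.run();
                return attempt;
            } catch (RetriableFailure e) {
                if (attempt >= maxAttempts) {
                    throw e; // attempts exhausted: the commit operation fails
                }
                sleep(retryInterval);
            }
        }
    }

    private static void sleep(Duration interval) {
        try {
            Thread.sleep(interval.toMillis());
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry commit", ie);
        }
    }
}
```

For a manual commit, the final rethrow corresponds to the commit operation failing; for automatic commits, the documentation above states that the receive Flux is terminated instead.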
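
@NonNull ReceiverOptions<K,V> commitRetryInterval(Duration commitRetryInterval)
Configures the retry commit interval for commits that fail with non-fatal RetriableCommitFailedException.

default ReceiverOptions<K,V> maxDeferredCommits(int maxDeferred)
Set to greater than 0 to enable out of order commit sequencing.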
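
default ReceiverOptions<K,V> maxDelayRebalance(Duration maxDelay)
Set the maximum amount of time to delay a rebalance until existing records in the pipeline have been processed. It should be less than max.poll.interval.ms.
Parameters:
maxDelay - the max delay.
See Also:
commitIntervalDuringDelay(long)

default ReceiverOptions<K,V> pauseAllAfterRebalance(boolean pauseAll)
When true, pause all partitions on assignment after rebalance, if any partitions were paused by the user before the rebalance.
Parameters:
pauseAll - true to pause all partitions on assignment after a rebalance.

default ReceiverOptions<K,V> commitIntervalDuringDelay(long interval)
Set how often to commit offsets, in milliseconds, while a rebalance is being delayed.
Parameters:
interval - the interval.
See Also:
maxDelayRebalance(Duration)

@NonNull ReceiverOptions<K,V> schedulerSupplier(Supplier<Scheduler> schedulerSupplier)
Configures the Supplier for a Scheduler on which records will be published.

default ReceiverOptions<K,V> consumerListener(@Nullable ReceiverOptions.ConsumerListener listener)
Set a function that will be applied after a consumer is created but before it is subscribed.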

@NonNull default ReceiverOptions<K,V> withObservation(@NonNull io.micrometer.observation.ObservationRegistry observationRegistry)
Configure an ObservationRegistry to observe the Kafka record consuming operation.
Parameters:
observationRegistry - ObservationRegistry to use.

@NonNull ReceiverOptions<K,V> withObservation(@NonNull io.micrometer.observation.ObservationRegistry observationRegistry, @Nullable KafkaReceiverObservationConvention observationConvention)
Configure an ObservationRegistry to observe the Kafka record receiving operation. This functionality makes sense only in simple use cases that need to close gaps in tracing on the consumer side: an observation is opened and closed immediately to attach the respective consumer span to the trace. For more complex cases (e.g. tracing continuation) the KafkaReceiverObservation API should be used: the tracing and parent span information is extracted from the consumer record.
Parameters:
observationRegistry - ObservationRegistry to use.
observationConvention - the KafkaReceiverObservationConvention to use.

@NonNull Map<String,Object> consumerProperties()
Returns the configuration properties of the underlying KafkaConsumer.

@Nullable Object consumerProperty(@NonNull String name)
Returns the KafkaConsumer configuration property value for the specified option name.

@Nullable org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer()
Returns the deserializer, if one is configured, that is used by KafkaConsumer for key deserialization.

@Nullable org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer()
Returns the deserializer, if one is configured, that is used by KafkaConsumer for value deserialization.

@NonNull Duration pollTimeout()
Returns the timeout for each KafkaConsumer.poll(long) operation.

@NonNull Duration closeTimeout()
Returns the timeout for graceful shutdown of KafkaConsumer.

@NonNull List<Consumer<Collection<ReceiverPartition>>> assignListeners()
Returns the list of configured partition assignment listeners.
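
@NonNull List<Consumer<Collection<ReceiverPartition>>> revokeListeners()
Returns the list of configured partition revocation listeners.

@Nullable Collection<org.apache.kafka.common.TopicPartition> assignment()
Returns the collection of partitions to be assigned if this instance is configured for manual partition assignment.

@Nullable Collection<String> subscriptionTopics()
Returns the collection of topics to be subscribed.

@Nullable Pattern subscriptionPattern()
Returns the Pattern by which the topics should be selected.

@Nullable String groupId()
Returns the configured Kafka consumer group id.

@NonNull Duration heartbeatInterval()
Returns the configured heartbeat interval for the Kafka consumer.

@NonNull Duration commitInterval()
Returns the configured commit interval for automatic commits of acknowledged records.

@NonNull int commitBatchSize()
Returns the configured commit batch size for automatic commits of acknowledged records.

@NonNull int atmostOnceCommitAheadSize()
Returns the maximum difference between the offset committed for at-most-once delivery and the offset of the last record dispatched. The maximum number of records that may be lost per partition if the application fails is commitAheadSize + 1.

@NonNull int maxCommitAttempts()
Returns the maximum number of consecutive non-fatal commit failures that are tolerated.

@NonNull Duration commitRetryInterval()
Returns the configured retry commit interval for commits that fail with non-fatal RetriableCommitFailedExceptions.
See Also:
maxCommitAttempts()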
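
default int maxDeferredCommits()
When greater than 0, enables out of order commit sequencing.

default Duration maxDelayRebalance()
Get the maximum amount of time to delay a rebalance until existing records in the pipeline have been processed.
See Also:
commitIntervalDuringDelay()

default boolean pauseAllAfterRebalance()
When true, pause all partitions on assignment after rebalance, if any partitions were paused by the user before the rebalance.

default long commitIntervalDuringDelay()
Get how often to commit offsets, in milliseconds, while a rebalance is being delayed.
See Also:
maxDelayRebalance()

@NonNull Supplier<Scheduler> schedulerSupplier()
Returns the Supplier for a Scheduler that records will be published on.

@Nullable default ReceiverOptions.ConsumerListener consumerListener()
Returns the function that will be applied after a consumer is created but before it is subscribed.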

@NonNull io.micrometer.observation.ObservationRegistry observationRegistry()
Return an ObservationRegistry to observe the Kafka record consuming operation.
Returns:
the ObservationRegistry.

@Nullable KafkaReceiverObservationConvention observationConvention()
Return a KafkaReceiverObservationConvention to support a receiving operation observation.
Returns:
the KafkaReceiverObservationConvention.

@Nullable default String clientId()
Return the client id provided by the ConsumerConfig.

@NonNull default String bootstrapServers()
Return the bootstrap servers from the provided ConsumerConfig.

@NonNull default Consumer<org.apache.kafka.clients.consumer.Consumer<K,V>> subscriber(@NonNull org.apache.kafka.clients.consumer.ConsumerRebalanceListener listener)
Returns the KafkaConsumer.subscribe(Collection, ConsumerRebalanceListener), KafkaConsumer.subscribe(Pattern, ConsumerRebalanceListener) or KafkaConsumer.assign(Collection) operation corresponding to the subscription or assignment options configured for this instance.