public interface SenderOptions<K,V>
Modifier and Type | Interface and Description |
---|---|
static interface | SenderOptions.ProducerListener - Called whenever a producer is added or removed. |
Modifier and Type | Method and Description |
---|---|
default String | bootstrapServers() - Returns the bootstrap servers from the provided ProducerConfig. |
default String | clientId() - Returns the client id provided by the ProducerConfig. |
Duration | closeTimeout() - Returns the timeout for graceful shutdown of this sender. |
SenderOptions<K,V> | closeTimeout(Duration timeout) - Configures the timeout for graceful shutdown of this sender. |
static <K,V> SenderOptions<K,V> | create() - Creates a sender options instance with default properties. |
static <K,V> SenderOptions<K,V> | create(Map<String,Object> configProperties) - Creates a sender options instance with the specified config overrides for the underlying Kafka Producer. |
static <K,V> SenderOptions<K,V> | create(Properties configProperties) - Creates a sender options instance with the specified config overrides for the underlying Kafka Producer. |
default boolean | fatalException(Throwable t) |
default boolean | isTransactional() - Senders created from these options will be transactional if a transactional id is configured using ProducerConfig.TRANSACTIONAL_ID_CONFIG. |
org.apache.kafka.common.serialization.Serializer<K> | keySerializer() - Optionally returns a serializer which is used by KafkaProducer for key serialization. |
int | maxInFlight() - Returns the maximum number of in-flight records that are fetched from the outbound record publisher while acknowledgements are pending. |
SenderOptions<K,V> | maxInFlight(int maxInFlight) - Configures the maximum number of in-flight records that are fetched from the outbound record publisher while acknowledgements are pending. |
KafkaSenderObservationConvention | observationConvention() - Returns a KafkaSenderObservationConvention to support a publishing operation observation. |
io.micrometer.observation.ObservationRegistry | observationRegistry() - Returns an ObservationRegistry to observe the Kafka record publishing operation. |
default SenderOptions.ProducerListener | producerListener() - Returns the listener that will be applied after a producer is added or removed. |
default SenderOptions<K,V> | producerListener(SenderOptions.ProducerListener listener) - Sets a listener that will be applied after a producer is added or removed. |
Map<String,Object> | producerProperties() - Returns the configuration properties for the underlying Kafka Producer. |
Object | producerProperty(String name) - Returns the Kafka Producer configuration property value for the specified option name. |
SenderOptions<K,V> | producerProperty(String name, Object value) - Sets the Kafka Producer configuration property to the specified value. |
Scheduler | scheduler() - Returns the scheduler used for publishing send results. |
SenderOptions<K,V> | scheduler(Scheduler scheduler) - Sets the scheduler used for publishing send results. |
boolean | stopOnError() - Returns the stopOnError configuration, which indicates whether a send operation should be terminated when an error is encountered. |
SenderOptions<K,V> | stopOnError(boolean stopOnError) - Configures error handling behaviour for KafkaSender.send(org.reactivestreams.Publisher). |
default String | transactionalId() - Returns the configured transactional id. |
org.apache.kafka.common.serialization.Serializer<V> | valueSerializer() - Optionally returns a serializer which is used by KafkaProducer for value serialization. |
SenderOptions<K,V> | withKeySerializer(org.apache.kafka.common.serialization.Serializer<K> keySerializer) - Sets a concrete serializer instance to be used by the KafkaProducer for keys. |
default SenderOptions<K,V> | withObservation(io.micrometer.observation.ObservationRegistry observationRegistry) - Configures an ObservationRegistry to observe the Kafka record publishing operation. |
SenderOptions<K,V> | withObservation(io.micrometer.observation.ObservationRegistry observationRegistry, KafkaSenderObservationConvention observationConvention) - Configures an ObservationRegistry to observe the Kafka record publishing operation. |
SenderOptions<K,V> | withValueSerializer(org.apache.kafka.common.serialization.Serializer<V> valueSerializer) - Sets a concrete serializer instance to be used by the KafkaProducer for values. |
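
The create(Map<String,Object> configProperties) factory listed above takes plain Kafka producer properties. The following is a minimal sketch of assembling such a map with only the JDK; the class name SenderConfigSketch, the helper producerConfig, and the sample host and client id are hypothetical, and the string keys are the standard Kafka property names that the ProducerConfig constants stand for.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class for illustration; not part of reactor-kafka.
final class SenderConfigSketch {

    // Builds config overrides of the shape that create(Map<String,Object>) accepts.
    static Map<String, Object> producerConfig(String bootstrapServers, String clientId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers); // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("client.id", clientId);                 // ProducerConfig.CLIENT_ID_CONFIG
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");                         // ProducerConfig.ACKS_CONFIG
        props.put("retries", 3);                          // ProducerConfig.RETRIES_CONFIG
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = producerConfig("localhost:9092", "sample-sender");
        // With reactor-kafka on the classpath, the map would then be passed on, e.g.:
        //   SenderOptions<String, String> options = SenderOptions.<String, String>create(props)
        //           .maxInFlight(256)
        //           .stopOnError(true)
        //           .closeTimeout(Duration.ofSeconds(10));
        System.out.println(props.get("bootstrap.servers"));
    }
}
```

The fluent calls in the comment use only methods from the summary table; the values 256 and 10 seconds are arbitrary placeholders, not recommended settings.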
@NonNull static <K,V> SenderOptions<K,V> create()
Creates a sender options instance with default properties.

@NonNull static <K,V> SenderOptions<K,V> create(@NonNull Map<String,Object> configProperties)
Creates a sender options instance with the specified config overrides for the underlying Kafka Producer.

@NonNull static <K,V> SenderOptions<K,V> create(@NonNull Properties configProperties)
Creates a sender options instance with the specified config overrides for the underlying Kafka Producer.

@NonNull Map<String,Object> producerProperties()
Returns the configuration properties for the underlying Kafka Producer.

@Nullable Object producerProperty(@NonNull String name)
Returns the Kafka Producer configuration property value for the specified option name.

@NonNull SenderOptions<K,V> producerProperty(@NonNull String name, @NonNull Object value)
Sets the Kafka Producer configuration property to the specified value.

@Nullable org.apache.kafka.common.serialization.Serializer<K> keySerializer()
Optionally returns a serializer which is used by KafkaProducer for key serialization.

@NonNull SenderOptions<K,V> withKeySerializer(@NonNull org.apache.kafka.common.serialization.Serializer<K> keySerializer)
Sets a concrete serializer instance to be used by the KafkaProducer for keys. Overrides any setting of the ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG property.
keySerializer - key serializer to use in the producer

@Nullable org.apache.kafka.common.serialization.Serializer<V> valueSerializer()
Optionally returns a serializer which is used by KafkaProducer for value serialization.

@NonNull SenderOptions<K,V> withValueSerializer(@NonNull org.apache.kafka.common.serialization.Serializer<V> valueSerializer)
Sets a concrete serializer instance to be used by the KafkaProducer for values. Overrides any setting of the ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG property.
valueSerializer - value serializer to use in the producer

@NonNull Scheduler scheduler()
Returns the scheduler used for publishing send results.

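@NonNull SenderOptions<K,V> scheduler(@NonNull Scheduler scheduler)
Sets the scheduler used for publishing send results.

@NonNull int maxInFlight()
Returns the maximum number of in-flight records that are fetched from the outbound record publisher while acknowledgements are pending.

@NonNull SenderOptions<K,V> maxInFlight(@NonNull int maxInFlight)
Configures the maximum number of in-flight records that are fetched from the outbound record publisher while acknowledgements are pending. Configure this limit along with ProducerConfig.BUFFER_MEMORY_CONFIG to control memory usage and to avoid blocking the reactive pipeline.

@NonNull boolean stopOnError()
Returns the stopOnError configuration, which indicates whether a send operation should be terminated when an error is encountered.
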
@NonNull SenderOptions<K,V> stopOnError(@NonNull boolean stopOnError)
Configures error handling behaviour for KafkaSender.send(org.reactivestreams.Publisher). If set to true, send fails when an error is encountered and only records that are already in transit may be delivered after the first error. If set to false, an attempt is made to send each record to Kafka, even if one or more records cannot be delivered after the configured number of retries due to a non-fatal exception. This flag should be set along with ProducerConfig.RETRIES_CONFIG and ProducerConfig.ACKS_CONFIG to configure the required quality-of-service. By default, stopOnError is true.
stopOnError - true to stop each send sequence on first failure

@NonNull Duration closeTimeout()
Returns the timeout for graceful shutdown of this sender.

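
The stopOnError contract described above can be illustrated with a toy model. This is only a sketch under simplifying assumptions: it uses plain collections instead of reactor-kafka, sends records one at a time (so it does not model records already in transit or producer retries), and the names StopOnErrorSketch, attempted, and delivers are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model only: imitates the documented stopOnError contract without Kafka.
final class StopOnErrorSketch {

    // Returns the records for which a send was attempted.
    // 'delivers' is a hypothetical stand-in for a send that succeeds (true) or fails (false).
    static List<String> attempted(List<String> records, Predicate<String> delivers, boolean stopOnError) {
        List<String> attempts = new ArrayList<>();
        for (String record : records) {
            attempts.add(record);
            boolean failed = !delivers.test(record);
            if (failed && stopOnError) {
                break; // the first failure terminates the send sequence
            }
            // with stopOnError == false the loop moves on to the next record
        }
        return attempts;
    }
}
```

With records "a", "b", "c" and a delivers predicate that rejects only "b", the model attempts "a" and "b" when stopOnError is true, and all three records when it is false.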
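@NonNull SenderOptions<K,V> closeTimeout(@NonNull Duration timeout)
Configures the timeout for graceful shutdown of this sender.

@Nullable default SenderOptions.ProducerListener producerListener()
Returns the listener that will be applied after a producer is added or removed.

default SenderOptions<K,V> producerListener(@Nullable SenderOptions.ProducerListener listener)
Sets a listener that will be applied after a producer is added or removed.
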
@NonNull default SenderOptions<K,V> withObservation(@NonNull io.micrometer.observation.ObservationRegistry observationRegistry)
Configures an ObservationRegistry to observe the Kafka record publishing operation.
observationRegistry - ObservationRegistry to use.

@NonNull SenderOptions<K,V> withObservation(@NonNull io.micrometer.observation.ObservationRegistry observationRegistry, @Nullable KafkaSenderObservationConvention observationConvention)
Configures an ObservationRegistry to observe the Kafka record publishing operation.
observationRegistry - ObservationRegistry to use.
observationConvention - the KafkaSenderObservationConvention to use.

@NonNull io.micrometer.observation.ObservationRegistry observationRegistry()
Returns an ObservationRegistry to observe the Kafka record publishing operation.

@Nullable KafkaSenderObservationConvention observationConvention()
Returns a KafkaSenderObservationConvention to support a publishing operation observation.

@NonNull default boolean isTransactional()
Senders created from these options will be transactional if a transactional id is configured using ProducerConfig.TRANSACTIONAL_ID_CONFIG. If transactional, KafkaProducer.initTransactions() is invoked on the producer to initialize transactions before any operations are performed on the sender. If the scheduler is overridden using scheduler(Scheduler), the configured scheduler must be single-threaded. Otherwise, the behaviour is undefined and may result in unexpected exceptions.

@Nullable default String clientId()
Returns the client id provided by the ProducerConfig.

@Nullable default String transactionalId()
Returns the configured transactional id.

@NonNull default String bootstrapServers()
Returns the bootstrap servers from the provided ProducerConfig.
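
The relationship between producerProperty, transactionalId() and isTransactional() can be sketched with a small stand-in class. This is not the library's source: TransactionalSketch is a hypothetical class, "transactional.id" is the Kafka property name behind ProducerConfig.TRANSACTIONAL_ID_CONFIG, and treating an empty id as non-transactional is an assumption made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an options object; it only mirrors the documented behaviour.
final class TransactionalSketch {
    private final Map<String, Object> producerProperties = new HashMap<>();

    // Fluent setter, shaped like producerProperty(String name, Object value).
    TransactionalSketch producerProperty(String name, Object value) {
        producerProperties.put(name, value);
        return this;
    }

    // Getter, shaped like producerProperty(String name); returns null when unset.
    Object producerProperty(String name) {
        return producerProperties.get(name);
    }

    // Reads the transactional id from the producer properties.
    String transactionalId() {
        return (String) producerProperty("transactional.id");
    }

    // Transactional when an id is configured (assumption: an empty id does not count).
    boolean isTransactional() {
        String id = transactionalId();
        return id != null && !id.isEmpty();
    }
}
```

For instance, a fresh TransactionalSketch reports isTransactional() as false, and it reports true after producerProperty("transactional.id", "tx-1") has been called; the id "tx-1" is an arbitrary placeholder.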