UDP Client
Reactor Netty provides the easy-to-use and easy-to-configure
UdpClient
.
It hides most of the Netty functionality that is required to create a UDP
client
and adds Reactive Streams backpressure.
1. Connecting and Disconnecting
To connect the UDP client to a given endpoint, you must create and configure a
UdpClient instance.
By default, the host is configured for localhost
and the port is 12012
.
The following example shows how to create and connect a UDP client:
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create() (1)
.connectNow(Duration.ofSeconds(30)); (2)
connection.onDispose()
.block();
}
}
1 | Creates a UdpClient
instance that is ready for configuring. |
2 | Connects the client in a blocking fashion and waits for it to finish initializing. |
The returned Connection
offers a simple connection API, including disposeNow()
,
which shuts the client down in a blocking fashion.
1.1. Host and Port
To connect to a specific host
and port
, you can apply the following configuration to the UDP
client:
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.host("example.com") (1)
.port(80) (2)
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
1 | Configures the host to which this client should connect |
2 | Configures the port to which this client should connect |
The port can be specified also with PORT environment variable. |
2. Eager Initialization
By default, the initialization of the UdpClient
resources happens on demand. This means that the connect
operation
absorbs the extra time needed to initialize and load:
-
the event loop group
-
the host name resolver
-
the native transport libraries (when native transport is used)
When you need to preload these resources, you can configure the UdpClient
as follows:
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
UdpClient udpClient = UdpClient.create()
.host("example.com")
.port(80)
.handle((udpInbound, udpOutbound) -> udpOutbound.sendString(Mono.just("hello")));
udpClient.warmup() (1)
.block();
Connection connection = udpClient.connectNow(Duration.ofSeconds(30)); (2)
connection.onDispose()
.block();
}
}
1 | Initialize and load the event loop group, the host name resolver, and the native transport libraries |
2 | Host name resolution happens when connecting to the remote peer |
3. Writing Data
To send data to a given peer, you must attach an I/O handler.
The I/O handler has access to UdpOutbound
,
to be able to write data.
The following example shows how to send hello
:
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.handle((udpInbound, udpOutbound) -> udpOutbound.sendString(Mono.just("hello"))) (1)
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
1 | Sends hello string to the remote peer. |
4. Consuming Data
To receive data from a given peer, you must attach an I/O handler.
The I/O handler has access to UdpInbound
,
to be able to read data.
The following example shows how to consume data:
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.handle((udpInbound, udpOutbound) -> udpInbound.receive().then()) (1)
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
1 | Receives data from a given peer |
5. Lifecycle Callbacks
The following lifecycle callbacks are provided to let you extend the UdpClient
:
Callback | Description |
---|---|
|
Invoked after the remote address has been resolved successfully. |
|
Invoked when initializing the channel. |
|
Invoked when the channel is about to connect. |
|
Invoked after the channel has been connected. |
|
Invoked after the channel has been disconnected. |
|
Invoked when the remote address is about to be resolved. |
|
Invoked in case the remote address hasn’t been resolved successfully. |
The following example uses the doOnConnected
and doOnChannelInit
callbacks:
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.doOnConnected(conn -> conn.addHandlerLast(new LineBasedFrameDecoder(8192))) (1)
.doOnChannelInit((observer, channel, remoteAddress) ->
channel.pipeline()
.addFirst(new LoggingHandler("reactor.netty.examples"))) (2)
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
1 | Netty pipeline is extended with LineBasedFrameDecoder when the channel has been connected. |
2 | Netty pipeline is extended with LoggingHandler when initializing the channel. |
6. Connection Configuration
This section describes three kinds of configuration that you can use at the UDP level:
6.1. Channel Options
By default, the UDP
client is configured with the following options:
UdpClientConnect() {
this.config = new UdpClientConfig(
ConnectionProvider.newConnection(),
Collections.singletonMap(ChannelOption.AUTO_READ, false),
() -> new InetSocketAddress(NetUtil.LOCALHOST, DEFAULT_PORT));
}
If you need additional options or need to change the current options, you can apply the following configuration:
import io.netty.channel.ChannelOption;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
You can find more about Netty channel options at the following links:
6.2. Wire Logger
Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected.
By default, wire logging is disabled.
To enable it, you must set the logger reactor.netty.udp.UdpClient
level to DEBUG
and apply the following configuration:
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.wiretap(true) (1)
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
1 | Enables the wire logging |
6.2.1. Wire Logger formatters
Reactor Netty supports 3 different formatters:
-
AdvancedByteBufFormat#HEX_DUMP - the default
/**
* When wire logging is enabled with this format, both events and content will be logged.
* The content will be in hex format.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] REGISTERED
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] ACTIVE
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] READ: 145B
* +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World|
* |00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 43 6f 6e 74 65 | HTTP/1.1..Conte|
* |00000020| 6e 74 2d 54 79 70 65 3a 20 74 65 78 74 2f 70 6c |nt-Type: text/pl|
* |00000030| 61 69 6e 0d 0a 75 73 65 72 2d 61 67 65 6e 74 3a |ain..user-agent:|
* |00000040| 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f 64 65 | ReactorNetty/de|
* ...
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] WRITE: 38B
* +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
* |00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
* |00000020| 20 30 0d 0a 0d 0a | 0.... |
* +--------+-------------------------------------------------+----------------+
* }
* </pre>
*/
/**
* When wire logging is enabled with this format, only the events will be logged.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] REGISTERED
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] ACTIVE
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] READ: 145B
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] WRITE: 38B
* }
* </pre>
*/
/**
* When wire logging is enabled with this format, both events and content will be logged.
* The content will be in plain text format.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] REGISTERED
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] ACTIVE
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] READ: 145B POST /test/World HTTP/1.1
* Content-Type: text/plain
* user-agent: ReactorNetty/dev
* ...
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] WRITE: 38B HTTP/1.1 200 OK
* content-length: 0
* }
* </pre>
*/
When you need to change the default formatter you can configure it as follows:
import io.netty.handler.logging.LogLevel;
import reactor.netty.Connection;
import reactor.netty.transport.logging.AdvancedByteBufFormat;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) (1)
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
1 | Enables the wire logging, AdvancedByteBufFormat#TEXTUAL is used for printing the content. |
6.3. Event Loop Group
By default Reactor Netty
uses an “Event Loop Group”, where the number of the worker threads equals the number of
processors available to the runtime on initialization (but with a minimum value of 4). When you need a different configuration,
you can use one of the LoopResources
#create
methods.
The following listing shows the default configuration for the Event Loop Group:
/**
* Default worker thread count, fallback to available processor
* (but with a minimum value of 4).
*/
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
* Default selector thread count, fallback to -1 (no selector thread)
* <p><strong>Note:</strong> In most use cases using a worker thread also as a selector thread works well.
* A possible use case for specifying a separate selector thread might be when the worker threads are too busy
* and connections cannot be accepted fast enough.
* <p><strong>Note:</strong> Although more than 1 can be configured as a selector thread count, in reality
* only 1 thread will be used as a selector thread.
*/
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
* Default worker thread count for UDP, fallback to available processor
* (but with a minimum value of 4).
*/
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
/**
* Default quiet period that guarantees that the disposal of the underlying LoopResources
* will not happen, fallback to 2 seconds.
*/
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/**
* Default maximum amount of time to wait until the disposal of the underlying LoopResources
* regardless if a task was submitted during the quiet period, fallback to 15 seconds.
*/
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";
/**
* Default value whether the native transport (epoll, kqueue) will be preferred,
* fallback it will be preferred when available.
*/
public static final String NATIVE = "reactor.netty.native";
If you need changes to these settings, you can apply the following configuration:
import reactor.netty.Connection;
import reactor.netty.resources.LoopResources;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
LoopResources loop = LoopResources.create("event-loop", 1, 4, true);
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.runOn(loop)
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
6.3.1. Disposing Event Loop Group
-
If you use the default
Event Loop Group
provided by Reactor Netty, invokeHttpResources
#disposeLoopsAndConnections
/#disposeLoopsAndConnectionsLater
method.
Disposing HttpResources means that every server/client that is using it, will not be able to use it anymore!
|
-
If you use custom
LoopResources
, invokeLoopResources
#dispose
/#disposeLater
method.
Disposing the custom LoopResources means that every server/client that is configured to use it, will not be able to use it anymore!
|
7. Metrics
The UDP client supports built-in integration with Micrometer
.
It exposes all metrics with a prefix of reactor.netty.udp.client
.
The following table provides information for the UDP client metrics:
metric name | type | description |
---|---|---|
reactor.netty.udp.client.data.received |
DistributionSummary |
Amount of the data received, in bytes. See Data Received |
reactor.netty.udp.client.data.sent |
DistributionSummary |
Amount of the data sent, in bytes. See Data Sent |
reactor.netty.udp.client.errors |
Counter |
Number of errors that occurred. See Errors Count |
reactor.netty.udp.client.connect.time |
Timer |
Time spent for connecting to the remote address. See Connect Time |
reactor.netty.udp.client.address.resolver |
Timer |
Time spent for resolving the address. See Hostname Resolution Time |
These additional metrics are also available:
ByteBufAllocator
metrics
metric name | type | description |
---|---|---|
reactor.netty.bytebuf.allocator.used.heap.memory |
Gauge |
The number of bytes reserved by heap buffer allocator. See Used Heap Memory |
reactor.netty.bytebuf.allocator.used.direct.memory |
Gauge |
The number of bytes reserved by direct buffer allocator. See Used Direct Memory |
reactor.netty.bytebuf.allocator.heap.arenas |
Gauge |
The number of heap arenas (when |
reactor.netty.bytebuf.allocator.direct.arenas |
Gauge |
The number of direct arenas (when |
reactor.netty.bytebuf.allocator.threadlocal.caches |
Gauge |
The number of thread local caches (when |
reactor.netty.bytebuf.allocator.small.cache.size |
Gauge |
The size of the small cache (when |
reactor.netty.bytebuf.allocator.normal.cache.size |
Gauge |
The size of the normal cache (when |
reactor.netty.bytebuf.allocator.chunk.size |
Gauge |
The chunk size for an arena (when |
reactor.netty.bytebuf.allocator.active.heap.memory |
Gauge |
The actual bytes consumed by in-use buffers allocated from heap buffer pools (when |
reactor.netty.bytebuf.allocator.active.direct.memory |
Gauge |
The actual bytes consumed by in-use buffers allocated from direct buffer pools (when |
EventLoop
metrics
metric name | type | description |
---|---|---|
reactor.netty.eventloop.pending.tasks |
Gauge |
The number of tasks that are pending for processing on an event loop. See Pending Tasks |
The following example enables that integration:
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.metrics(true) (1)
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
}
1 | Enables the built-in integration with Micrometer |
When UDP client metrics are needed for an integration with a system other than Micrometer
or you want
to provide your own integration with Micrometer
, you can provide your own metrics recorder, as follows:
import reactor.netty.Connection;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.udp.UdpClient;
import java.net.SocketAddress;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.host("example.com")
.port(80)
.metrics(true, CustomChannelMetricsRecorder::new) (1)
.connectNow(Duration.ofSeconds(30));
connection.onDispose()
.block();
}
1 | Enables UDP client metrics and provides ChannelMetricsRecorder implementation. |
8. Unix Domain Sockets
The UdpClient
supports Unix Domain Datagram Sockets (UDS) when native transport is in use.
The following example shows how to use UDS support:
import io.netty.channel.unix.DomainSocketAddress;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.io.File;
public class Application {
public static void main(String[] args) {
Connection connection =
UdpClient.create()
.bindAddress(Application::newDomainSocketAddress)
.remoteAddress(() -> new DomainSocketAddress("/tmp/test-server.sock")) (1)
.handle((in, out) ->
out.sendString(Mono.just("hello"))
.then(in.receive()
.asString()
.doOnNext(System.out::println)
.then()))
.connectNow();
connection.onDispose()
.block();
}
1 | Specifies DomainSocketAddress that will be used |