UDP Client

Reactor Netty provides the easy-to-use and easy-to-configure UdpClient. It hides most of the Netty functionality that is required to create a UDP client and adds Reactive Streams backpressure.

1. Connecting and Disconnecting

To connect the UDP client to a given endpoint, you must create and configure a UdpClient instance. By default, the host is configured for localhost and the port is 12012. The following example shows how to create and connect a UDP client:

import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				UdpClient.create()                            (1)
				         .connectNow(Duration.ofSeconds(30)); (2)

		connection.onDispose()
		          .block();
	}
}
1 Creates a UdpClient instance that is ready for configuring.
2 Connects the client in a blocking fashion and waits for it to finish initializing.

The returned Connection offers a simple connection API, including disposeNow(), which shuts the client down in a blocking fashion.

1.1. Host and Port

To connect to a specific host and port, you can apply the following configuration to the UDP client:

import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				UdpClient.create()
				         .host("example.com") (1)
				         .port(80)            (2)
				         .connectNow(Duration.ofSeconds(30));

		connection.onDispose()
		          .block();
	}
}
1 Configures the host to which this client should connect
2 Configures the port to which this client should connect
The port can be specified also with PORT environment variable.

2. Eager Initialization

By default, the initialization of the UdpClient resources happens on demand. This means that the connect operation absorbs the extra time needed to initialize and load:

  • the event loop group

  • the host name resolver

  • the native transport libraries (when native transport is used)

When you need to preload these resources, you can configure the UdpClient as follows:

import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		UdpClient udpClient = UdpClient.create()
		                               .host("example.com")
		                               .port(80)
		                               .handle((udpInbound, udpOutbound) -> udpOutbound.sendString(Mono.just("hello")));

		udpClient.warmup() (1)
		         .block();

		Connection connection = udpClient.connectNow(Duration.ofSeconds(30)); (2)

		connection.onDispose()
		          .block();
	}
}
1 Initialize and load the event loop group, the host name resolver, and the native transport libraries
2 Host name resolution happens when connecting to the remote peer

3. Writing Data

To send data to a given peer, you must attach an I/O handler. The I/O handler has access to UdpOutbound, to be able to write data.

The following example shows how to send hello:

import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				UdpClient.create()
				         .host("example.com")
				         .port(80)
				         .handle((udpInbound, udpOutbound) -> udpOutbound.sendString(Mono.just("hello"))) (1)
				         .connectNow(Duration.ofSeconds(30));

		connection.onDispose()
		          .block();
	}
}
1 Sends hello string to the remote peer.

4. Consuming Data

To receive data from a given peer, you must attach an I/O handler. The I/O handler has access to UdpInbound, to be able to read data. The following example shows how to consume data:

import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				UdpClient.create()
				         .host("example.com")
				         .port(80)
				         .handle((udpInbound, udpOutbound) -> udpInbound.receive().then()) (1)
				         .connectNow(Duration.ofSeconds(30));

		connection.onDispose()
		          .block();
	}
}
1 Receives data from a given peer

5. Lifecycle Callbacks

The following lifecycle callbacks are provided to let you extend the UdpClient:

Callback Description

doAfterResolve

Invoked after the remote address has been resolved successfully.

doOnChannelInit

Invoked when initializing the channel.

doOnConnect

Invoked when the channel is about to connect.

doOnConnected

Invoked after the channel has been connected.

doOnDisconnected

Invoked after the channel has been disconnected.

doOnResolve

Invoked when the remote address is about to be resolved.

doOnResolveError

Invoked in case the remote address hasn’t been resolved successfully.

The following example uses the doOnConnected and doOnChannelInit callbacks:

import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				UdpClient.create()
				         .host("example.com")
				         .port(80)
				         .doOnConnected(conn -> conn.addHandlerLast(new LineBasedFrameDecoder(8192))) (1)
				         .doOnChannelInit((observer, channel, remoteAddress) ->
				             channel.pipeline()
				                    .addFirst(new LoggingHandler("reactor.netty.examples")))           (2)
				         .connectNow(Duration.ofSeconds(30));

		connection.onDispose()
		          .block();
	}
}
1 Netty pipeline is extended with LineBasedFrameDecoder when the channel has been connected.
2 Netty pipeline is extended with LoggingHandler when initializing the channel.

6. Connection Configuration

This section describes three kinds of configuration that you can use at the UDP level:

6.1. Channel Options

By default, the UDP client is configured with the following options:

	UdpClientConnect() {
		this.config = new UdpClientConfig(
				ConnectionProvider.newConnection(),
				Collections.singletonMap(ChannelOption.AUTO_READ, false),
				() -> new InetSocketAddress(NetUtil.LOCALHOST, DEFAULT_PORT));
	}

If you need additional options or need to change the current options, you can apply the following configuration:

import io.netty.channel.ChannelOption;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				UdpClient.create()
				         .host("example.com")
				         .port(80)
				         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
				         .connectNow(Duration.ofSeconds(30));

		connection.onDispose()
		          .block();
	}
}

You can find more about Netty channel options at the following links:

6.2. Wire Logger

Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected. By default, wire logging is disabled. To enable it, you must set the logger reactor.netty.udp.UdpClient level to DEBUG and apply the following configuration:

import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				UdpClient.create()
				         .host("example.com")
				         .port(80)
				         .wiretap(true) (1)
				         .connectNow(Duration.ofSeconds(30));

		connection.onDispose()
		          .block();
	}
}
1 Enables the wire logging

6.2.1. Wire Logger formatters

Reactor Netty supports 3 different formatters:

	/**
	 * When wire logging is enabled with this format, both events and content will be logged.
	 * The content will be in hex format.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] REGISTERED
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] ACTIVE
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] READ: 145B
	 *          +-------------------------------------------------+
	 *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
	 * +--------+-------------------------------------------------+----------------+
	 * |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World|
	 * |00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 43 6f 6e 74 65 | HTTP/1.1..Conte|
	 * |00000020| 6e 74 2d 54 79 70 65 3a 20 74 65 78 74 2f 70 6c |nt-Type: text/pl|
	 * |00000030| 61 69 6e 0d 0a 75 73 65 72 2d 61 67 65 6e 74 3a |ain..user-agent:|
	 * |00000040| 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f 64 65 | ReactorNetty/de|
	 * ...
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] WRITE: 38B
	 *          +-------------------------------------------------+
	 *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
	 * +--------+-------------------------------------------------+----------------+
	 * |00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
	 * |00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
	 * |00000020| 20 30 0d 0a 0d 0a                               | 0....          |
	 * +--------+-------------------------------------------------+----------------+
	 * }
	 * </pre>
	 */
	/**
	 * When wire logging is enabled with this format, only the events will be logged.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] REGISTERED
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] ACTIVE
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] READ: 145B
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] WRITE: 38B
	 * }
	 * </pre>
	 */
	/**
	 * When wire logging is enabled with this format, both events and content will be logged.
	 * The content will be in plain text format.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] REGISTERED
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] ACTIVE
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] READ: 145B POST /test/World HTTP/1.1
	 * Content-Type: text/plain
	 * user-agent: ReactorNetty/dev
	 * ...
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] WRITE: 38B HTTP/1.1 200 OK
	 * content-length: 0
	 * }
	 * </pre>
	 */

When you need to change the default formatter you can configure it as follows:

import io.netty.handler.logging.LogLevel;
import reactor.netty.Connection;
import reactor.netty.transport.logging.AdvancedByteBufFormat;
import reactor.netty.udp.UdpClient;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				UdpClient.create()
				         .host("example.com")
				         .port(80)
				         .wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) (1)
				         .connectNow(Duration.ofSeconds(30));

		connection.onDispose()
		          .block();
	}
}
1 Enables the wire logging, AdvancedByteBufFormat#TEXTUAL is used for printing the content.

6.3. Event Loop Group

By default Reactor Netty uses an “Event Loop Group”, where the number of the worker threads equals the number of processors available to the runtime on initialization (but with a minimum value of 4). When you need a different configuration, you can use one of the LoopResources#create methods.

The following listing shows the default configuration for the Event Loop Group:

	/**
	 * Default worker thread count, fallback to available processor
	 * (but with a minimum value of 4).
	 */
	public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
	/**
	 * Default selector thread count, fallback to -1 (no selector thread)
	 * <p><strong>Note:</strong> In most use cases using a worker thread also as a selector thread works well.
	 * A possible use case for specifying a separate selector thread might be when the worker threads are too busy
	 * and connections cannot be accepted fast enough.
	 * <p><strong>Note:</strong> Although more than 1 can be configured as a selector thread count, in reality
	 * only 1 thread will be used as a selector thread.
	 */
	public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
	/**
	 * Default worker thread count for UDP, fallback to available processor
	 * (but with a minimum value of 4).
	 */
	public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
	/**
	 * Default quiet period that guarantees that the disposal of the underlying LoopResources
	 * will not happen, fallback to 2 seconds.
	 */
	public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
	/**
	 * Default maximum amount of time to wait until the disposal of the underlying LoopResources
	 * regardless if a task was submitted during the quiet period, fallback to 15 seconds.
	 */
	public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";

	/**
	 * Default value whether the native transport (epoll, kqueue) will be preferred,
	 * fallback it will be preferred when available.
	 */
	public static final String NATIVE = "reactor.netty.native";

If you need changes to these settings, you can apply the following configuration:

import reactor.netty.Connection;
import reactor.netty.resources.LoopResources;
import reactor.netty.udp.UdpClient;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		LoopResources loop = LoopResources.create("event-loop", 1, 4, true);

		Connection connection =
				UdpClient.create()
				         .host("example.com")
				         .port(80)
				         .runOn(loop)
				         .connectNow(Duration.ofSeconds(30));

		connection.onDispose()
		          .block();
	}
}

6.3.1. Disposing Event Loop Group

  • If you use the default Event Loop Group provided by Reactor Netty, invoke HttpResources#disposeLoopsAndConnections/#disposeLoopsAndConnectionsLater method.

Disposing HttpResources means that every server/client that is using it, will not be able to use it anymore!
  • If you use custom LoopResources, invoke LoopResources#dispose/#disposeLater method.

Disposing the custom LoopResources means that every server/client that is configured to use it, will not be able to use it anymore!

7. Metrics

The UDP client supports built-in integration with Micrometer. It exposes all metrics with a prefix of reactor.netty.udp.client.

The following table provides information for the UDP client metrics:

metric name type description

reactor.netty.udp.client.data.received

DistributionSummary

Amount of the data received, in bytes. See Data Received

reactor.netty.udp.client.data.sent

DistributionSummary

Amount of the data sent, in bytes. See Data Sent

reactor.netty.udp.client.errors

Counter

Number of errors that occurred. See Errors Count

reactor.netty.udp.client.connect.time

Timer

Time spent for connecting to the remote address. See Connect Time

reactor.netty.udp.client.address.resolver

Timer

Time spent for resolving the address. See Hostname Resolution Time

These additional metrics are also available:

ByteBufAllocator metrics

metric name type description

reactor.netty.bytebuf.allocator.used.heap.memory

Gauge

The number of bytes reserved by heap buffer allocator. See Used Heap Memory

reactor.netty.bytebuf.allocator.used.direct.memory

Gauge

The number of bytes reserved by direct buffer allocator. See Used Direct Memory

reactor.netty.bytebuf.allocator.heap.arenas

Gauge

The number of heap arenas (when PooledByteBufAllocator). See Heap Arenas

reactor.netty.bytebuf.allocator.direct.arenas

Gauge

The number of direct arenas (when PooledByteBufAllocator). See Direct Arenas

reactor.netty.bytebuf.allocator.threadlocal.caches

Gauge

The number of thread local caches (when PooledByteBufAllocator). See Thread Local Caches

reactor.netty.bytebuf.allocator.small.cache.size

Gauge

The size of the small cache (when PooledByteBufAllocator). See Small Cache Size

reactor.netty.bytebuf.allocator.normal.cache.size

Gauge

The size of the normal cache (when PooledByteBufAllocator). See Normal Cache Size

reactor.netty.bytebuf.allocator.chunk.size

Gauge

The chunk size for an arena (when PooledByteBufAllocator). See Chunk Size

reactor.netty.bytebuf.allocator.active.heap.memory

Gauge

The actual bytes consumed by in-use buffers allocated from heap buffer pools (when PooledByteBufAllocator). See Active Heap Memory

reactor.netty.bytebuf.allocator.active.direct.memory

Gauge

The actual bytes consumed by in-use buffers allocated from direct buffer pools (when PooledByteBufAllocator). See Active Direct Memory

EventLoop metrics

metric name type description

reactor.netty.eventloop.pending.tasks

Gauge

The number of tasks that are pending for processing on an event loop. See Pending Tasks

The following example enables that integration:

import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				UdpClient.create()
				         .host("example.com")
				         .port(80)
				         .metrics(true) (1)
				         .connectNow(Duration.ofSeconds(30));

		connection.onDispose()
		          .block();
	}
}
1 Enables the built-in integration with Micrometer

When UDP client metrics are needed for an integration with a system other than Micrometer or you want to provide your own integration with Micrometer, you can provide your own metrics recorder, as follows:

import reactor.netty.Connection;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.udp.UdpClient;

import java.net.SocketAddress;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				UdpClient.create()
				         .host("example.com")
				         .port(80)
				         .metrics(true, CustomChannelMetricsRecorder::new) (1)
				         .connectNow(Duration.ofSeconds(30));

		connection.onDispose()
		          .block();
	}
1 Enables UDP client metrics and provides ChannelMetricsRecorder implementation.

8. Unix Domain Sockets

The UdpClient supports Unix Domain Datagram Sockets (UDS) when native transport is in use.

The following example shows how to use UDS support:

import io.netty.channel.unix.DomainSocketAddress;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;

import java.io.File;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				UdpClient.create()
				         .bindAddress(Application::newDomainSocketAddress)
				         .remoteAddress(() -> new DomainSocketAddress("/tmp/test-server.sock")) (1)
				         .handle((in, out) ->
				             out.sendString(Mono.just("hello"))
				                .then(in.receive()
				                        .asString()
				                        .doOnNext(System.out::println)
				                        .then()))
				         .connectNow();

		connection.onDispose()
		          .block();
	}
1 Specifies DomainSocketAddress that will be used