UDP Server

Reactor Netty provides the easy-to-use and easy-to-configure UdpServer. It hides most of the Netty functionality that is required to create a UDP server and adds Reactive Streams backpressure.

1. Starting and Stopping

To start a UDP server, a UdpServer instance has to be created and configured. By default, the host is configured to be localhost and the port is 12012. The following example shows how to create and start a UDP server:

import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection server =
				UdpServer.create()                         (1)
				         .bindNow(Duration.ofSeconds(30)); (2)

		server.onDispose()
		      .block();
	}
}
1 Creates a UdpServer instance that is ready for configuring.
2 Starts the server in a blocking fashion and waits for it to finish initializing.

The returned Connection offers a simple server API, including disposeNow(), which shuts the server down in a blocking fashion.

1.1. Host and Port

In order to serve on a specific host and port, you can apply the following configuration to the UDP server:

import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection server =
				UdpServer.create()
				         .host("localhost") (1)
				         .port(8080)        (2)
				         .bindNow(Duration.ofSeconds(30));

		server.onDispose()
		      .block();
	}
}
1 Configures the UDP server host
2 Configures the UDP server port
The port can be specified also with PORT environment variable.

2. Eager Initialization

By default, the initialization of the UdpServer resources happens on demand. This means that the bind operation absorbs the extra time needed to initialize and load:

  • the event loop group

  • the native transport libraries (when native transport is used)

When you need to preload these resources, you can configure the UdpServer as follows:

import io.netty.channel.socket.DatagramPacket;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		UdpServer udpServer =
				UdpServer.create()
				         .handle((in, out) ->
				             out.sendObject(
				                 in.receiveObject()
				                   .map(o -> {
				                       if (o instanceof DatagramPacket) {
				                           DatagramPacket p = (DatagramPacket) o;
				                           return new DatagramPacket(p.content().retain(), p.sender());
				                       }
				                       else {
				                           return Mono.error(new Exception("Unexpected type of the message: " + o));
				                       }
				                   })));

		udpServer.warmup() (1)
		         .block();

		Connection server = udpServer.bindNow(Duration.ofSeconds(30));

		server.onDispose()
		      .block();
	}
}
1 Initialize and load the event loop group and the native transport libraries

3. Writing Data

To send data to the remote peer, you must attach an I/O handler. The I/O handler has access to UdpOutbound, to be able to write data. The following example shows how to send hello:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection server =
				UdpServer.create()
				         .handle((in, out) ->
				             out.sendObject(
				                 in.receiveObject()
				                   .map(o -> {
				                       if (o instanceof DatagramPacket) {
				                           DatagramPacket p = (DatagramPacket) o;
				                           ByteBuf buf = Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8);
				                           return new DatagramPacket(buf, p.sender()); (1)
				                       }
				                       else {
				                           return Mono.error(new Exception("Unexpected type of the message: " + o));
				                       }
				                   })))
				         .bindNow(Duration.ofSeconds(30));

		server.onDispose()
		      .block();
	}
}
1 Sends a hello string to the remote peer

4. Consuming Data

To receive data from a remote peer, you must attach an I/O handler. The I/O handler has access to UdpInbound, to be able to read data. The following example shows how to consume data:

import io.netty.channel.socket.DatagramPacket;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection server =
				UdpServer.create()
				         .handle((in, out) ->
				             out.sendObject(
				                 in.receiveObject()
				                   .map(o -> {
				                       if (o instanceof DatagramPacket) {
				                           DatagramPacket p = (DatagramPacket) o;
				                           return new DatagramPacket(p.content().retain(), p.sender()); (1)
				                       }
				                       else {
				                           return Mono.error(new Exception("Unexpected type of the message: " + o));
				                       }
				                   })))
				         .bindNow(Duration.ofSeconds(30));

		server.onDispose()
		      .block();
	}
}
1 Receives data from the remote peer

5. Lifecycle Callbacks

The following lifecycle callbacks are provided to let you extend the UdpServer:

Callback Description

doOnBind

Invoked when the server channel is about to bind.

doOnBound

Invoked when the server channel is bound.

doOnChannelInit

Invoked when initializing the channel.

doOnUnbound

Invoked when the server channel is unbound.

The following example uses the doOnBound and doOnChannelInit callbacks:

import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection server =
				UdpServer.create()
				         .doOnBound(conn -> conn.addHandlerLast(new LineBasedFrameDecoder(8192))) (1)
				         .doOnChannelInit((observer, channel, remoteAddress) ->
				             channel.pipeline()
				                    .addFirst(new LoggingHandler("reactor.netty.examples")))      (2)
				         .bindNow(Duration.ofSeconds(30));

		server.onDispose()
		      .block();
	}
}
1 Netty pipeline is extended with LineBasedFrameDecoder when the server channel is bound.
2 Netty pipeline is extended with LoggingHandler when initializing the channel.

6. Connection Configuration

This section describes three kinds of configuration that you can use at the UDP level:

6.1. Channel Options

By default, the UDP server is configured with the following options:

	UdpServerBind() {
		this.config = new UdpServerConfig(
				Collections.singletonMap(ChannelOption.AUTO_READ, false),
				() -> new InetSocketAddress(NetUtil.LOCALHOST, DEFAULT_PORT));
	}

If you need additional options or need to change the current options, you can apply the following configuration:

import io.netty.channel.ChannelOption;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection server =
				UdpServer.create()
				         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
				         .bindNow(Duration.ofSeconds(30));

		server.onDispose()
		      .block();
	}
}

For more information about Netty channel options, see the following links:

6.2. Wire Logger

Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected. By default, wire logging is disabled. To enable it, you must set the logger reactor.netty.udp.UdpServer level to DEBUG and apply the following configuration:

import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection server =
				UdpServer.create()
				         .wiretap(true) (1)
				         .bindNow(Duration.ofSeconds(30));

		server.onDispose()
		      .block();
	}
}
1 Enables the wire logging

6.2.1. Wire Logger formatters

Reactor Netty supports 3 different formatters:

	/**
	 * When wire logging is enabled with this format, both events and content will be logged.
	 * The content will be in hex format.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] REGISTERED
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] ACTIVE
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] READ: 145B
	 *          +-------------------------------------------------+
	 *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
	 * +--------+-------------------------------------------------+----------------+
	 * |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World|
	 * |00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 43 6f 6e 74 65 | HTTP/1.1..Conte|
	 * |00000020| 6e 74 2d 54 79 70 65 3a 20 74 65 78 74 2f 70 6c |nt-Type: text/pl|
	 * |00000030| 61 69 6e 0d 0a 75 73 65 72 2d 61 67 65 6e 74 3a |ain..user-agent:|
	 * |00000040| 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f 64 65 | ReactorNetty/de|
	 * ...
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] WRITE: 38B
	 *          +-------------------------------------------------+
	 *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
	 * +--------+-------------------------------------------------+----------------+
	 * |00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
	 * |00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
	 * |00000020| 20 30 0d 0a 0d 0a                               | 0....          |
	 * +--------+-------------------------------------------------+----------------+
	 * }
	 * </pre>
	 */
	/**
	 * When wire logging is enabled with this format, only the events will be logged.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] REGISTERED
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] ACTIVE
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] READ: 145B
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] WRITE: 38B
	 * }
	 * </pre>
	 */
	/**
	 * When wire logging is enabled with this format, both events and content will be logged.
	 * The content will be in plain text format.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] REGISTERED
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] ACTIVE
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] READ: 145B POST /test/World HTTP/1.1
	 * Content-Type: text/plain
	 * user-agent: ReactorNetty/dev
	 * ...
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] WRITE: 38B HTTP/1.1 200 OK
	 * content-length: 0
	 * }
	 * </pre>
	 */

When you need to change the default formatter you can configure it as follows:

import io.netty.handler.logging.LogLevel;
import reactor.netty.Connection;
import reactor.netty.transport.logging.AdvancedByteBufFormat;
import reactor.netty.udp.UdpServer;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection server =
				UdpServer.create()
				         .wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) (1)
				         .bindNow(Duration.ofSeconds(30));

		server.onDispose()
		      .block();
	}
}
1 Enables the wire logging, AdvancedByteBufFormat#TEXTUAL is used for printing the content.

6.3. Event Loop Group

By default Reactor Netty uses an “Event Loop Group”, where the number of the worker threads equals the number of processors available to the runtime on initialization (but with a minimum value of 4). When you need a different configuration, you can use one of the LoopResources#create methods.

The following listing shows the default configuration for the Event Loop Group:

	/**
	 * Default worker thread count, fallback to available processor
	 * (but with a minimum value of 4).
	 */
	public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
	/**
	 * Default selector thread count, fallback to -1 (no selector thread)
	 * <p><strong>Note:</strong> In most use cases using a worker thread also as a selector thread works well.
	 * A possible use case for specifying a separate selector thread might be when the worker threads are too busy
	 * and connections cannot be accepted fast enough.
	 * <p><strong>Note:</strong> Although more than 1 can be configured as a selector thread count, in reality
	 * only 1 thread will be used as a selector thread.
	 */
	public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
	/**
	 * Default worker thread count for UDP, fallback to available processor
	 * (but with a minimum value of 4).
	 */
	public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
	/**
	 * Default quiet period that guarantees that the disposal of the underlying LoopResources
	 * will not happen, fallback to 2 seconds.
	 */
	public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
	/**
	 * Default maximum amount of time to wait until the disposal of the underlying LoopResources
	 * regardless if a task was submitted during the quiet period, fallback to 15 seconds.
	 */
	public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";

	/**
	 * Default value whether the native transport (epoll, kqueue) will be preferred,
	 * fallback it will be preferred when available.
	 */
	public static final String NATIVE = "reactor.netty.native";

If you need changes to these settings, you can apply the following configuration:

import reactor.netty.Connection;
import reactor.netty.resources.LoopResources;
import reactor.netty.udp.UdpServer;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		LoopResources loop = LoopResources.create("event-loop", 1, 4, true);

		Connection server =
				UdpServer.create()
				         .runOn(loop)
				         .bindNow(Duration.ofSeconds(30));

		server.onDispose()
		      .block();
	}
}

6.3.1. Disposing Event Loop Group

  • If you use the default Event Loop Group provided by Reactor Netty, invoke HttpResources#disposeLoopsAndConnections/#disposeLoopsAndConnectionsLater method.

Disposing HttpResources means that every server/client that is using it, will not be able to use it anymore!
  • If you use custom LoopResources, invoke LoopResources#dispose/#disposeLater method.

Disposing the custom LoopResources means that every server/client that is configured to use it, will not be able to use it anymore!

7. Metrics

The UDP server supports built-in integration with Micrometer. It exposes all metrics with a prefix of reactor.netty.udp.server.

The following table provides information for the UDP server metrics:

metric name type description

reactor.netty.udp.server.data.received

DistributionSummary

Amount of the data received, in bytes. See Data Received

reactor.netty.udp.server.data.sent

DistributionSummary

Amount of the data sent, in bytes. See Data Sent

reactor.netty.udp.server.errors

Counter

Number of errors that occurred. See Errors Count

These additional metrics are also available:

ByteBufAllocator metrics

metric name type description

reactor.netty.bytebuf.allocator.used.heap.memory

Gauge

The number of bytes reserved by heap buffer allocator. See Used Heap Memory

reactor.netty.bytebuf.allocator.used.direct.memory

Gauge

The number of bytes reserved by direct buffer allocator. See Used Direct Memory

reactor.netty.bytebuf.allocator.heap.arenas

Gauge

The number of heap arenas (when PooledByteBufAllocator). See Heap Arenas

reactor.netty.bytebuf.allocator.direct.arenas

Gauge

The number of direct arenas (when PooledByteBufAllocator). See Direct Arenas

reactor.netty.bytebuf.allocator.threadlocal.caches

Gauge

The number of thread local caches (when PooledByteBufAllocator). See Thread Local Caches

reactor.netty.bytebuf.allocator.small.cache.size

Gauge

The size of the small cache (when PooledByteBufAllocator). See Small Cache Size

reactor.netty.bytebuf.allocator.normal.cache.size

Gauge

The size of the normal cache (when PooledByteBufAllocator). See Normal Cache Size

reactor.netty.bytebuf.allocator.chunk.size

Gauge

The chunk size for an arena (when PooledByteBufAllocator). See Chunk Size

reactor.netty.bytebuf.allocator.active.heap.memory

Gauge

The actual bytes consumed by in-use buffers allocated from heap buffer pools (when PooledByteBufAllocator). See Active Heap Memory

reactor.netty.bytebuf.allocator.active.direct.memory

Gauge

The actual bytes consumed by in-use buffers allocated from direct buffer pools (when PooledByteBufAllocator). See Active Direct Memory

EventLoop metrics

metric name type description

reactor.netty.eventloop.pending.tasks

Gauge

The number of tasks that are pending for processing on an event loop. See Pending Tasks

The following example enables that integration:

import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection server =
				UdpServer.create()
				         .metrics(true) (1)
				         .bindNow(Duration.ofSeconds(30));

		server.onDispose()
		      .block();
	}
}
1 Enables the built-in integration with Micrometer

When UDP server metrics are needed for an integration with a system other than Micrometer or you want to provide your own integration with Micrometer, you can provide your own metrics recorder, as follows:

import reactor.netty.Connection;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.udp.UdpServer;

import java.net.SocketAddress;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection server =
				UdpServer.create()
				         .metrics(true, CustomChannelMetricsRecorder::new) (1)
				         .bindNow(Duration.ofSeconds(30));

		server.onDispose()
		      .block();
	}
1 Enables UDP server metrics and provides ChannelMetricsRecorder implementation.

8. Unix Domain Sockets

The UdpServer supports Unix Domain Datagram Sockets (UDS) when native transport is in use.

The following example shows how to use UDS support:

import io.netty.channel.unix.DomainDatagramPacket;
import io.netty.channel.unix.DomainSocketAddress;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;

import java.io.File;

public class Application {

	public static void main(String[] args) {
		Connection server =
				UdpServer.create()
				         .bindAddress(Application::newDomainSocketAddress) (1)
				         .handle((in, out) ->
				             out.sendObject(
				                 in.receiveObject()
				                   .map(o -> {
				                       if (o instanceof DomainDatagramPacket) {
				                           DomainDatagramPacket p = (DomainDatagramPacket) o;
				                           return new DomainDatagramPacket(p.content().retain(), p.sender());
				                       }
				                       else {
				                           return Mono.error(new Exception("Unexpected type of the message: " + o));
				                       }
				                   })))
				         .bindNow();

		server.onDispose()
		      .block();
	}
1 Specifies DomainSocketAddress that will be used