UDP Server
Reactor Netty provides an easy-to-use and easy-to-configure UdpServer.
It hides most of the Netty functionality that is required to create a UDP server
and adds Reactive Streams backpressure.
1. Starting and Stopping
To start a UDP server, you must create and configure a UdpServer instance.
By default, the host is localhost and the port is 12012.
The following example shows how to create and start a UDP server:
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create() (1)
.bindNow(Duration.ofSeconds(30)); (2)
server.onDispose()
.block();
}
}
1 | Creates a UdpServer instance that is ready for configuring. |
2 | Starts the server in a blocking fashion and waits for it to finish initializing. |
The returned Connection offers a simple server API, including disposeNow(),
which shuts the server down in a blocking fashion.
1.1. Host and Port
To serve on a specific host and port, apply the following configuration to the UDP server:
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.host("localhost") (1)
.port(8080) (2)
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
1 | Configures the UDP server host |
2 | Configures the UDP server port |
The port can also be specified with the PORT environment variable.
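If an application wants to read the PORT variable itself and pass the result to port(...), a small helper along the following lines could be used. This is a minimal sketch using only the JDK: PortResolver and resolvePort are hypothetical names, and falling back to the default on a missing or invalid value is an assumption, not a description of how Reactor Netty reads the variable internally.

```java
public class PortResolver {

    /** Default UDP server port named in this section. */
    public static final int DEFAULT_PORT = 12012;

    /**
     * Returns the port parsed from rawValue (for example the value of the
     * PORT environment variable), or the fallback when the value is absent,
     * not a number, or outside the valid port range.
     */
    public static int resolvePort(String rawValue, int fallback) {
        if (rawValue == null || rawValue.trim().isEmpty()) {
            return fallback;
        }
        try {
            int port = Integer.parseInt(rawValue.trim());
            return (port >= 0 && port <= 65535) ? port : fallback;
        }
        catch (NumberFormatException e) {
            return fallback; // assumption: invalid input falls back silently
        }
    }

    public static void main(String[] args) {
        int port = resolvePort(System.getenv("PORT"), DEFAULT_PORT);
        System.out.println("UDP server port: " + port);
    }
}
```

The resolved value can then be handed to the port(...) call shown in the example above.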
2. Eager Initialization
By default, the initialization of the UdpServer resources happens on demand.
This means that the bind operation absorbs the extra time needed to initialize and load:
- the event loop group
- the native transport libraries (when native transport is used)
When you need to preload these resources, you can configure the UdpServer as follows:
import io.netty.channel.socket.DatagramPacket;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
UdpServer udpServer =
UdpServer.create()
.handle((in, out) ->
out.sendObject(
in.receiveObject()
.map(o -> {
if (o instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) o;
return new DatagramPacket(p.content().retain(), p.sender());
}
else {
return Mono.error(new Exception("Unexpected type of the message: " + o));
}
})));
udpServer.warmup() (1)
.block();
Connection server = udpServer.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
1 | Initialize and load the event loop group and the native transport libraries |
3. Writing Data
To send data to the remote peer, you must attach an I/O handler.
The I/O handler has access to UdpOutbound, to be able to write data.
The following example shows how to send hello:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.handle((in, out) ->
out.sendObject(
in.receiveObject()
.map(o -> {
if (o instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) o;
ByteBuf buf = Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8);
return new DatagramPacket(buf, p.sender()); (1)
}
else {
return Mono.error(new Exception("Unexpected type of the message: " + o));
}
})))
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
1 | Sends a hello string to the remote peer |
4. Consuming Data
To receive data from a remote peer, you must attach an I/O handler.
The I/O handler has access to UdpInbound, to be able to read data.
The following example shows how to consume data:
import io.netty.channel.socket.DatagramPacket;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.handle((in, out) ->
out.sendObject(
in.receiveObject()
.map(o -> {
if (o instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) o;
return new DatagramPacket(p.content().retain(), p.sender()); (1)
}
else {
return Mono.error(new Exception("Unexpected type of the message: " + o));
}
})))
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
1 | Receives data from the remote peer |
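To try the echo handler from the outside, a plain JDK client can send one datagram and wait for the reply. The sketch below is not part of Reactor Netty: UdpEchoProbe, roundTrip, and startLoopbackEcho are hypothetical names, and the small loopback echo thread exists only so that the sketch can run on its own without the server above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

public class UdpEchoProbe {

    /** Sends one datagram to the loopback address and returns the reply payload. */
    public static String roundTrip(int port, String message, int timeoutMillis) {
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.setSoTimeout(timeoutMillis); // fail instead of waiting forever
            byte[] payload = message.getBytes(StandardCharsets.UTF_8);
            InetAddress host = InetAddress.getLoopbackAddress();
            socket.send(new DatagramPacket(payload, payload.length, host, port));

            byte[] buffer = new byte[2048];
            DatagramPacket reply = new DatagramPacket(buffer, buffer.length);
            socket.receive(reply);
            return new String(reply.getData(), reply.getOffset(), reply.getLength(),
                    StandardCharsets.UTF_8);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** JDK-only stand-in for an echo server, bound to an ephemeral loopback port. */
    public static DatagramSocket startLoopbackEcho() {
        try {
            DatagramSocket server = new DatagramSocket(0, InetAddress.getLoopbackAddress());
            Thread worker = new Thread(() -> {
                byte[] buffer = new byte[2048];
                try {
                    while (!server.isClosed()) {
                        DatagramPacket in = new DatagramPacket(buffer, buffer.length);
                        server.receive(in);
                        // Echo the received bytes back to the sender.
                        server.send(new DatagramPacket(in.getData(), in.getLength(),
                                in.getSocketAddress()));
                    }
                }
                catch (IOException e) {
                    // The socket was closed: stop echoing.
                }
            });
            worker.setDaemon(true);
            worker.start();
            return server;
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        try (DatagramSocket server = startLoopbackEcho()) {
            System.out.println(roundTrip(server.getLocalPort(), "hello", 2000));
        }
    }
}
```

Against the UdpServer example above, which uses the default host and port, the call would presumably be roundTrip(12012, "hello", 2000).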
5. Lifecycle Callbacks
The following lifecycle callbacks are provided to let you extend the UdpServer:
Callback | Description |
---|---|
doOnBind | Invoked when the server channel is about to bind. |
doOnBound | Invoked when the server channel is bound. |
doOnChannelInit | Invoked when initializing the channel. |
doOnUnbound | Invoked when the server channel is unbound. |
The following example uses the doOnBound and doOnChannelInit callbacks:
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.doOnBound(conn -> conn.addHandlerLast(new LineBasedFrameDecoder(8192))) (1)
.doOnChannelInit((observer, channel, remoteAddress) ->
channel.pipeline()
.addFirst(new LoggingHandler("reactor.netty.examples"))) (2)
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
1 | Netty pipeline is extended with LineBasedFrameDecoder when the server channel is bound. |
2 | Netty pipeline is extended with LoggingHandler when initializing the channel. |
6. Connection Configuration
This section describes three kinds of configuration that you can use at the UDP level:
- Channel Options
- Wire Logger
- Event Loop Group
6.1. Channel Options
By default, the UDP server is configured with the following options:
UdpServerBind() {
this.config = new UdpServerConfig(
Collections.singletonMap(ChannelOption.AUTO_READ, false),
() -> new InetSocketAddress(NetUtil.LOCALHOST, DEFAULT_PORT));
}
If you need additional options or need to change the current options, you can apply the following configuration:
import io.netty.channel.ChannelOption;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
For more information about Netty channel options, see the Netty ChannelOption Javadoc.
6.2. Wire Logger
Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected.
By default, wire logging is disabled.
To enable it, you must set the logger reactor.netty.udp.UdpServer level to DEBUG
and apply the following configuration:
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.wiretap(true) (1)
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
1 | Enables the wire logging |
6.2.1. Wire Logger formatters
Reactor Netty supports three different formatters:
- AdvancedByteBufFormat#HEX_DUMP (the default)
/**
* When wire logging is enabled with this format, both events and content will be logged.
* The content will be in hex format.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] REGISTERED
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] ACTIVE
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] READ: 145B
* +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World|
* |00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 43 6f 6e 74 65 | HTTP/1.1..Conte|
* |00000020| 6e 74 2d 54 79 70 65 3a 20 74 65 78 74 2f 70 6c |nt-Type: text/pl|
* |00000030| 61 69 6e 0d 0a 75 73 65 72 2d 61 67 65 6e 74 3a |ain..user-agent:|
* |00000040| 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f 64 65 | ReactorNetty/de|
* ...
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] WRITE: 38B
* +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
* |00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
* |00000020| 20 30 0d 0a 0d 0a | 0.... |
* +--------+-------------------------------------------------+----------------+
* }
* </pre>
*/
- AdvancedByteBufFormat#SIMPLE
/**
* When wire logging is enabled with this format, only the events will be logged.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] REGISTERED
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] ACTIVE
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] READ: 145B
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] WRITE: 38B
* }
* </pre>
*/
- AdvancedByteBufFormat#TEXTUAL
/**
* When wire logging is enabled with this format, both events and content will be logged.
* The content will be in plain text format.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] REGISTERED
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] ACTIVE
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] READ: 145B POST /test/World HTTP/1.1
* Content-Type: text/plain
* user-agent: ReactorNetty/dev
* ...
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] WRITE: 38B HTTP/1.1 200 OK
* content-length: 0
* }
* </pre>
*/
When you need to change the default formatter, you can configure it as follows:
import io.netty.handler.logging.LogLevel;
import reactor.netty.Connection;
import reactor.netty.transport.logging.AdvancedByteBufFormat;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) (1)
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
1 | Enables the wire logging, AdvancedByteBufFormat#TEXTUAL is used for printing the content. |
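To make the layout of the HEX_DUMP rows shown earlier easier to read, the following sketch formats up to 16 bytes as one row: an 8-digit hexadecimal offset, the bytes in hex, and the printable ASCII view with other bytes shown as dots. It illustrates the output format only and is not Netty's or Reactor Netty's implementation. HexDumpRow is a hypothetical name, and the space padding of short rows is an assumption, because the listing above does not preserve whitespace.

```java
public class HexDumpRow {

    /** Formats up to 16 bytes as one row: offset, hex columns, printable ASCII. */
    public static String format(int offset, byte[] bytes) {
        int count = Math.min(bytes.length, 16);
        StringBuilder hex = new StringBuilder();
        StringBuilder text = new StringBuilder();
        for (int i = 0; i < 16; i++) {
            if (i < count) {
                int b = bytes[i] & 0xff;
                hex.append(String.format("%02x ", b));
                // Non-printable bytes (for example CR and LF) are shown as '.'
                text.append(b >= 0x20 && b < 0x7f ? (char) b : '.');
            }
            else {
                hex.append("   "); // assumption: short rows are padded with spaces
                text.append(' ');
            }
        }
        return String.format("|%08x| %s|%s|", offset, hex, text);
    }

    public static void main(String[] args) {
        byte[] data = "POST /test/World".getBytes(java.nio.charset.StandardCharsets.US_ASCII);
        System.out.println(format(0, data));
    }
}
```

For the 16 bytes of "POST /test/World", the row matches the first content line of the READ example in the HEX_DUMP listing.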
6.3. Event Loop Group
By default, Reactor Netty uses an “Event Loop Group”, where the number of worker threads equals the number of
processors available to the runtime on initialization (but with a minimum value of 4). This “Event Loop Group” is shared between all servers and clients in one JVM.
When you need a different configuration, you can use one of the LoopResources#create methods.
The following listing shows the default configuration for the Event Loop Group:
/**
* Default worker thread count, fallback to available processor
* (but with a minimum value of 4).
*/
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
* Default selector thread count, fallback to -1 (no selector thread)
* <p><strong>Note:</strong> In most use cases using a worker thread also as a selector thread works well.
* A possible use case for specifying a separate selector thread might be when the worker threads are too busy
* and connections cannot be accepted fast enough.
* <p><strong>Note:</strong> Although more than 1 can be configured as a selector thread count, in reality
* only 1 thread will be used as a selector thread.
*/
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
* Default worker thread count for UDP, fallback to available processor
* (but with a minimum value of 4).
*/
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
/**
* Default quiet period that guarantees that the disposal of the underlying LoopResources
* will not happen, fallback to 2 seconds.
*/
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/**
* Default maximum amount of time to wait until the disposal of the underlying LoopResources
* regardless if a task was submitted during the quiet period, fallback to 15 seconds.
*/
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";
/**
* Default value whether the native transport (epoll, kqueue) will be preferred,
* fallback it will be preferred when available.
*/
public static final String NATIVE = "reactor.netty.native";
If you need changes to these settings, you can apply the following configuration:
import reactor.netty.Connection;
import reactor.netty.resources.LoopResources;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
LoopResources loop = LoopResources.create("event-loop", 1, 4, true);
Connection server =
UdpServer.create()
.runOn(loop)
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
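The worker-count fallback described in this section (the number of available processors, with a minimum of 4, unless reactor.netty.ioWorkerCount is set) can be illustrated with a few lines of plain Java. This is a sketch of the documented rule, not Reactor Netty's code: EventLoopDefaults and workerCount are hypothetical names, and ignoring non-numeric or non-positive values is an assumption.

```java
public class EventLoopDefaults {

    /**
     * Returns the explicitly configured worker count when it is a positive
     * number, otherwise max(availableProcessors, 4).
     */
    public static int workerCount(String propertyValue, int availableProcessors) {
        int fallback = Math.max(availableProcessors, 4);
        if (propertyValue == null) {
            return fallback;
        }
        try {
            int configured = Integer.parseInt(propertyValue.trim());
            return configured > 0 ? configured : fallback;
        }
        catch (NumberFormatException e) {
            return fallback; // assumption: invalid values are ignored
        }
    }

    public static void main(String[] args) {
        int workers = workerCount(
                System.getProperty("reactor.netty.ioWorkerCount"),
                Runtime.getRuntime().availableProcessors());
        System.out.println("worker threads: " + workers);
    }
}
```

On a machine with two processors and no property set, this yields 4. Starting the JVM with -Dreactor.netty.ioWorkerCount=8 would yield 8.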
6.3.1. Disposing Event Loop Group
- If you use the default Event Loop Group provided by Reactor Netty, invoke the HttpResources#disposeLoopsAndConnections/#disposeLoopsAndConnectionsLater method.
Disposing HttpResources means that every server/client that is using it will not be able to use it anymore!
- If you use custom LoopResources, invoke the LoopResources#dispose/#disposeLater method.
Disposing the custom LoopResources means that every server/client that is configured to use it will not be able to use it anymore!
7. Metrics
The UDP server supports built-in integration with Micrometer.
It exposes all metrics with a prefix of reactor.netty.udp.server.
The following table provides information for the UDP server metrics:
metric name | type | description |
---|---|---|
reactor.netty.udp.server.data.received | DistributionSummary | Amount of the data received, in bytes. See Data Received |
reactor.netty.udp.server.data.sent | DistributionSummary | Amount of the data sent, in bytes. See Data Sent |
reactor.netty.udp.server.errors | Counter | Number of errors that occurred. See Errors Count |
These additional metrics are also available:
ByteBufAllocator metrics
metric name | type | description |
---|---|---|
reactor.netty.bytebuf.allocator.used.heap.memory | Gauge | The number of bytes reserved by heap buffer allocator. See Used Heap Memory |
reactor.netty.bytebuf.allocator.used.direct.memory | Gauge | The number of bytes reserved by direct buffer allocator. See Used Direct Memory |
reactor.netty.bytebuf.allocator.heap.arenas | Gauge | The number of heap arenas (when |
reactor.netty.bytebuf.allocator.direct.arenas | Gauge | The number of direct arenas (when |
reactor.netty.bytebuf.allocator.threadlocal.caches | Gauge | The number of thread local caches (when |
reactor.netty.bytebuf.allocator.small.cache.size | Gauge | The size of the small cache (when |
reactor.netty.bytebuf.allocator.normal.cache.size | Gauge | The size of the normal cache (when |
reactor.netty.bytebuf.allocator.chunk.size | Gauge | The chunk size for an arena (when |
reactor.netty.bytebuf.allocator.active.heap.memory | Gauge | The actual bytes consumed by in-use buffers allocated from heap buffer pools (when |
reactor.netty.bytebuf.allocator.active.direct.memory | Gauge | The actual bytes consumed by in-use buffers allocated from direct buffer pools (when |
EventLoop metrics
metric name | type | description |
---|---|---|
reactor.netty.eventloop.pending.tasks | Gauge | The number of tasks that are pending for processing on an event loop. See Pending Tasks |
The following example enables that integration:
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.metrics(true) (1)
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
1 | Enables the built-in integration with Micrometer |
When UDP server metrics are needed for an integration with a system other than Micrometer,
or you want to provide your own integration with Micrometer, you can provide your own metrics recorder, as follows:
import reactor.netty.Connection;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.udp.UdpServer;
import java.net.SocketAddress;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.metrics(true, CustomChannelMetricsRecorder::new) (1)
.bindNow(Duration.ofSeconds(30));
server.onDispose()
.block();
}
}
1 | Enables UDP server metrics and provides a ChannelMetricsRecorder implementation. |
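The listing above references CustomChannelMetricsRecorder without showing it. Whatever form such a recorder takes, it needs thread-safe storage for the values in the UDP server metrics table (bytes received, bytes sent, error count), because callbacks may arrive from several event loop threads. The following JDK-only sketch shows one way to hold those values. UdpMetricsAccumulator and its method names are hypothetical, and the class deliberately does not implement ChannelMetricsRecorder, so it is a building block a recorder could delegate to rather than a drop-in replacement.

```java
import java.net.SocketAddress;
import java.util.concurrent.atomic.LongAdder;

public class UdpMetricsAccumulator {

    // LongAdder keeps contention low when many threads update the same counter.
    private final LongAdder bytesReceived = new LongAdder();
    private final LongAdder bytesSent = new LongAdder();
    private final LongAdder errors = new LongAdder();

    /** Adds the size of a received datagram; the address is ignored in this sketch. */
    public void recordDataReceived(SocketAddress remoteAddress, long bytes) {
        bytesReceived.add(bytes);
    }

    /** Adds the size of a sent datagram; the address is ignored in this sketch. */
    public void recordDataSent(SocketAddress remoteAddress, long bytes) {
        bytesSent.add(bytes);
    }

    /** Counts one error; the address is ignored in this sketch. */
    public void incrementErrorsCount(SocketAddress remoteAddress) {
        errors.increment();
    }

    public long totalBytesReceived() {
        return bytesReceived.sum();
    }

    public long totalBytesSent() {
        return bytesSent.sum();
    }

    public long totalErrors() {
        return errors.sum();
    }

    public static void main(String[] args) {
        UdpMetricsAccumulator metrics = new UdpMetricsAccumulator();
        metrics.recordDataReceived(null, 5);
        metrics.recordDataSent(null, 5);
        System.out.println(metrics.totalBytesReceived() + " bytes in, "
                + metrics.totalBytesSent() + " bytes out");
    }
}
```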
8. Unix Domain Sockets
The UdpServer supports Unix Domain Datagram Sockets (UDS) when native transport is in use.
The following example shows how to use UDS support:
import io.netty.channel.unix.DomainDatagramPacket;
import io.netty.channel.unix.DomainSocketAddress;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.io.File;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create()
.bindAddress(Application::newDomainSocketAddress) (1)
.handle((in, out) ->
out.sendObject(
in.receiveObject()
.map(o -> {
if (o instanceof DomainDatagramPacket) {
DomainDatagramPacket p = (DomainDatagramPacket) o;
return new DomainDatagramPacket(p.content().retain(), p.sender());
}
else {
return Mono.error(new Exception("Unexpected type of the message: " + o));
}
})))
.bindNow();
server.onDispose()
.block();
}
}
1 | Specifies the DomainSocketAddress that will be used. |
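The listing refers to Application::newDomainSocketAddress, which is not shown. Any such helper has to pick a socket file path, and binding a Unix domain socket fails when a file already exists at that path (for example, one left over from a previous run). The sketch below shows one way to prepare a clean path with the JDK only. SocketPaths, freshSocketPath, and the choice of the temporary directory are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class SocketPaths {

    /**
     * Returns a path for a socket file in the JVM's temporary directory and
     * removes any file already present at that location.
     */
    public static Path freshSocketPath(String fileName) {
        Path path = Paths.get(System.getProperty("java.io.tmpdir"), fileName);
        try {
            // A stale socket file would make the bind operation fail.
            Files.deleteIfExists(path);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return path;
    }

    public static void main(String[] args) {
        System.out.println(freshSocketPath("udp-server.sock"));
    }
}
```

The resulting path could then be turned into a DomainSocketAddress, for example through its constructor that takes a File (path.toFile()).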