QUIC Server
Reactor Netty provides the easy-to-use and easy-to-configure QuicServer.
It hides most of the Netty functionality that is required to create a QUIC server and adds Reactive Streams backpressure.
1. Starting and Stopping
To start a QUIC server, you must create and configure a QuicServer instance.
By default, the host is configured for any local address, and the system picks up an ephemeral port when the bind operation is invoked.
The following example shows how to create and start a QUIC server:
import reactor.netty.Connection;
import reactor.netty.quic.QuicServer;

import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        Connection server =
                QuicServer.create()                         // (1)
                          .bindNow(Duration.ofSeconds(30)); // (2)

        server.onDispose()
              .block();
    }
}
1 | Creates a QuicServer instance that is ready for configuring. |
2 | Starts the server in a blocking fashion and waits for it to finish initializing. |
The returned Connection offers a simple server API, including disposeNow(), which shuts the server down in a blocking fashion.
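QUIC runs over UDP, so the default described above (any local address, ephemeral port) corresponds to binding a UDP socket to port 0. The following minimal sketch illustrates that operating-system behavior with the JDK's DatagramSocket rather than Reactor Netty. The class name EphemeralPortDemo and its method are hypothetical and exist only for this illustration.

```java
import java.io.UncheckedIOException;
import java.net.DatagramSocket;
import java.net.SocketException;

// Illustration only: uses the JDK's DatagramSocket, not Reactor Netty.
final class EphemeralPortDemo {

    // Binds a UDP socket to port 0 so that the operating system picks a free port,
    // reads the port that was assigned, and closes the socket again.
    static int bindToEphemeralPort() {
        try (DatagramSocket socket = new DatagramSocket(0)) {
            return socket.getLocalPort();
        }
        catch (SocketException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("OS-assigned UDP port: " + bindToEphemeralPort());
    }
}
```

The port differs from run to run, which is why a fixed port is usually configured for anything clients must find by a well-known address.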
1.1. Host and Port
To bind to a specific host and port, you can apply the following configuration to the QUIC server:
import reactor.netty.Connection;
import reactor.netty.quic.QuicServer;

public class Application {

    public static void main(String[] args) {
        Connection server =
                QuicServer.create()
                          .host("localhost") // (1)
                          .port(8080)        // (2)
                          .bindNow();

        server.onDispose()
              .block();
    }
}
1 | Configures the host to which this server should bind |
2 | Configures the port to which this server should bind |
2. Secure Connection
To configure the QUIC server with SSL, you must apply the following configuration:
import io.netty.handler.codec.quic.InsecureQuicTokenHandler;
import io.netty.handler.codec.quic.QuicSslContext;
import io.netty.handler.codec.quic.QuicSslContextBuilder;
import reactor.netty.Connection;
import reactor.netty.quic.QuicServer;

import java.io.File;

public class Application {

    public static void main(String[] args) {
        File cert = new File("an X.509 certificate chain file in PEM format");
        File key = new File("a PKCS#8 private key file in PEM format");

        // The second argument is the key password; null means the key is not password-protected.
        QuicSslContext serverCtx =
                QuicSslContextBuilder.forServer(key, null, cert)
                                     .applicationProtocols("http/1.1")
                                     .build();

        Connection server =
                QuicServer.create()
                          // Basic token handling without cryptography; not intended for production.
                          .tokenHandler(InsecureQuicTokenHandler.INSTANCE)
                          .secure(serverCtx) // (1)
                          .bindNow();

        server.onDispose()
              .block();
    }
}
1 | Configures the SSL context to use for the secure connection |
3. Initial Settings
QUIC allows configuring various settings for the connection. You can apply the following configuration:
import reactor.netty.Connection;
import reactor.netty.quic.QuicServer;

public class Application {

    public static void main(String[] args) {
        Connection server =
                QuicServer.create()
                          .initialSettings(spec ->                         // (1)
                              spec.maxData(10000000)                       // (2)
                                  .maxStreamDataBidirectionalLocal(1000000)  // (3)
                                  .maxStreamDataBidirectionalRemote(1000000) // (4)
                                  .maxStreamDataUnidirectional(1000000)    // (5)
                                  .maxStreamsBidirectional(100)            // (6)
                                  .maxStreamsUnidirectional(100))          // (7)
                          .bindNow();

        server.onDispose()
              .block();
    }
}
1 | Configures the initial settings for the QUIC connection |
2 | Sets the maximum amount of data that can be sent on the connection |
3 | Sets the maximum amount of data that can be sent on a bidirectional stream initiated by the local endpoint |
4 | Sets the maximum amount of data that can be sent on a bidirectional stream initiated by the remote endpoint |
5 | Sets the maximum amount of data that can be sent on a unidirectional stream |
6 | Sets the maximum number of concurrent bidirectional streams |
7 | Sets the maximum number of concurrent unidirectional streams |
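One way to reason about how these limits interact: the connection-level limit caps the total of what the peer may send across all of its streams, so the effective initial bound is the smaller of maxData and the per-stream limit multiplied by the stream count. The sketch below is only an illustration of that arithmetic. FlowControlBudget and maxInFlightBytes are hypothetical names, not part of the Reactor Netty API, and the calculation ignores any flow-control credit that is granted after the connection is established.

```java
// Hypothetical helper for illustration only; it is not part of Reactor Netty.
final class FlowControlBudget {

    // Initial upper bound on the bytes a peer may send across all streams it opens:
    // the connection-level limit caps the sum of the stream-level limits.
    static long maxInFlightBytes(long maxData, long maxStreamData, long maxStreams) {
        long streamTotal = Math.multiplyExact(maxStreamData, maxStreams);
        return Math.min(maxData, streamTotal);
    }

    public static void main(String[] args) {
        // Values taken from the configuration example above.
        long bound = maxInFlightBytes(10_000_000L, 1_000_000L, 100L);
        System.out.println(bound); // prints 10000000
    }
}
```

With the values from the example, 100 streams at 1,000,000 bytes each would allow 100,000,000 bytes, so the 10,000,000-byte connection limit is the binding constraint.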
4. Handling Streams
To handle incoming streams, you can use the following approach:
import io.netty.handler.codec.quic.InsecureQuicTokenHandler;
import io.netty.handler.codec.quic.QuicSslContext;
import io.netty.handler.codec.quic.QuicSslContextBuilder;
import reactor.netty.Connection;
import reactor.netty.quic.QuicServer;

import java.io.File;
import java.time.Duration;

public class Application {

    public static void main(String[] args) throws Exception {
        File cert = new File("an X.509 certificate chain file in PEM format");
        File key = new File("a PKCS#8 private key file in PEM format");

        QuicSslContext serverCtx =
                QuicSslContextBuilder.forServer(key, null, cert)
                                     .applicationProtocols("http/1.1")
                                     .build();

        Connection server =
                QuicServer.create()
                          .host("127.0.0.1")
                          .port(8080)
                          .tokenHandler(InsecureQuicTokenHandler.INSTANCE)
                          .secure(serverCtx)
                          .idleTimeout(Duration.ofSeconds(5))
                          .initialSettings(spec ->
                              spec.maxData(10000000)
                                  .maxStreamDataBidirectionalRemote(1000000)
                                  .maxStreamsBidirectional(100))
                          .handleStream((in, out) ->               // (1)
                              out.send(in.receive().retain()))     // (2)
                          .bindNow();

        server.onDispose()
              .block();
    }
}
1 | Registers a handler that is invoked when a new QUIC stream is opened |
2 | Echoes the received data back on the same stream; retain() keeps the buffers alive until they are written |
5. Lifecycle Callbacks
The following lifecycle callbacks are provided to let you extend the QuicServer:
Callback | Description |
---|---|
doOnBind | Invoked when the server is about to bind. |
doOnBound | Invoked when the server has been bound. |
doOnChannelInit | Invoked when initializing the channel. |
doOnConnection | Invoked when a new QUIC connection has been established. |
doOnUnbound | Invoked when the server has been unbound. |
The following example uses the doOnBound and doOnConnection callbacks:
import reactor.netty.Connection;
import reactor.netty.quic.QuicServer;

public class Application {

    public static void main(String[] args) throws Exception {
        Connection server =
                QuicServer.create()
                          .doOnBound(connection -> System.out.println("Bound!"))                   // (1)
                          .doOnConnection(quicConnection -> System.out.println("New connection!")) // (2)
                          .bindNow();

        server.onDispose()
              .block();
    }
}
1 | Invoked when the server has been bound |
2 | Invoked when a new QUIC connection has been established |
6. Connection Configuration
This section describes three kinds of configuration that you can use at the QUIC server level:
6.1. Channel Options
If you need additional options or need to change the current options, you can apply the following configuration:
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.quic.QLogConfiguration;
import io.netty.handler.codec.quic.QuicChannelOption;
import reactor.netty.Connection;
import reactor.netty.quic.QuicServer;

public class Application {

    public static void main(String[] args) throws Exception {
        Connection server =
                QuicServer.create()
                          .option(ChannelOption.SO_BACKLOG, 1024)                                // (1)
                          .option(QuicChannelOption.QLOG,
                              new QLogConfiguration("path", "logTitle", "logDescription"))       // (2)
                          .bindNow();

        server.onDispose()
              .block();
    }
}
1 | Configures the backlog size |
2 | Enables QLOG for the QUIC connection |
You can find more about Netty channel options at the following links:
6.2. Wire Logger
Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected.
By default, wire logging is disabled.
To enable it, you must set the logger reactor.netty.quic.QuicServer level to DEBUG and apply the following configuration:
import reactor.netty.Connection;
import reactor.netty.quic.QuicServer;

public class Application {

    public static void main(String[] args) throws Exception {
        Connection server =
                QuicServer.create()
                          .wiretap(true) // (1)
                          .bindNow();

        server.onDispose()
              .block();
    }
}
1 | Enables the wire logging |
6.2.1. Wire Logger formatters
Reactor Netty supports 3 different formatters:
- AdvancedByteBufFormat#HEX_DUMP - the default
/**
* When wire logging is enabled with this format, both events and content will be logged.
* The content will be in hex format.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] REGISTERED
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] ACTIVE
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] READ: 145B
* +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World|
* |00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 43 6f 6e 74 65 | HTTP/1.1..Conte|
* |00000020| 6e 74 2d 54 79 70 65 3a 20 74 65 78 74 2f 70 6c |nt-Type: text/pl|
* |00000030| 61 69 6e 0d 0a 75 73 65 72 2d 61 67 65 6e 74 3a |ain..user-agent:|
* |00000040| 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f 64 65 | ReactorNetty/de|
* ...
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] WRITE: 38B
* +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
* |00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
* |00000020| 20 30 0d 0a 0d 0a | 0.... |
* +--------+-------------------------------------------------+----------------+
* }
* </pre>
*/
- AdvancedByteBufFormat#SIMPLE
/**
* When wire logging is enabled with this format, only the events will be logged.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] REGISTERED
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] ACTIVE
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] READ: 145B
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] WRITE: 38B
* }
* </pre>
*/
- AdvancedByteBufFormat#TEXTUAL
/**
* When wire logging is enabled with this format, both events and content will be logged.
* The content will be in plain text format.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] REGISTERED
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] ACTIVE
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] READ: 145B POST /test/World HTTP/1.1
* Content-Type: text/plain
* user-agent: ReactorNetty/dev
* ...
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] WRITE: 38B HTTP/1.1 200 OK
* content-length: 0
* }
* </pre>
*/
When you need to change the default formatter, you can configure it as follows:
import io.netty.handler.logging.LogLevel;
import reactor.netty.Connection;
import reactor.netty.quic.QuicServer;
import reactor.netty.transport.logging.AdvancedByteBufFormat;

public class Application {

    public static void main(String[] args) throws Exception {
        Connection server =
                QuicServer.create()
                          .wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) // (1)
                          .bindNow();

        server.onDispose()
              .block();
    }
}
1 | Enables the wire logging, AdvancedByteBufFormat#TEXTUAL is used for printing the content. |
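To make the row layout of the HEX_DUMP sample output easier to read, the following sketch reproduces a single row: an 8-digit hexadecimal offset, up to 16 bytes in hex, and the same bytes as printable ASCII. It only illustrates the layout and is not the formatter that Netty or Reactor Netty uses. HexDumpRow and its format method are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

// Illustration of the row layout only; this is not Netty's or Reactor Netty's formatter.
final class HexDumpRow {

    // Formats up to 16 bytes as: |offset| hex bytes |printable ASCII|
    static String format(int offset, byte[] chunk) {
        StringBuilder hex = new StringBuilder();
        StringBuilder text = new StringBuilder();
        for (int i = 0; i < 16; i++) {
            if (i < chunk.length) {
                int b = chunk[i] & 0xff;
                hex.append(String.format("%02x ", b));
                // Bytes outside the printable ASCII range are shown as '.'.
                text.append(b >= 0x20 && b < 0x7f ? (char) b : '.');
            }
            else {
                // Pad short rows so that the columns stay aligned.
                hex.append("   ");
                text.append(' ');
            }
        }
        return String.format("|%08x| %s|%s|", offset, hex, text);
    }

    public static void main(String[] args) {
        byte[] data = "POST /test/World".getBytes(StandardCharsets.US_ASCII);
        System.out.println(format(0, data));
        // prints |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World|
    }
}
```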
6.3. Event Loop Group
By default, Reactor Netty uses an “Event Loop Group”, where the number of the worker threads equals the number of processors available to the runtime on initialization (but with a minimum value of 4). This “Event Loop Group” is shared between all servers and clients in one JVM.
When you need a different configuration, you can use one of the LoopResources#create methods.
The following listing shows the default configuration for the Event Loop Group:
/**
* Default worker thread count, fallback to available processor
* (but with a minimum value of 4).
*/
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
* Default selector thread count, fallback to -1 (no selector thread)
* <p><strong>Note:</strong> In most use cases using a worker thread also as a selector thread works well.
* A possible use case for specifying a separate selector thread might be when the worker threads are too busy
* and connections cannot be accepted fast enough.
* <p><strong>Note:</strong> Although more than 1 can be configured as a selector thread count, in reality
* only 1 thread will be used as a selector thread.
*/
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
* Default worker thread count for UDP, fallback to available processor
* (but with a minimum value of 4).
*/
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
/**
* Default quiet period that guarantees that the disposal of the underlying LoopResources
* will not happen, fallback to 2 seconds.
*/
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/**
* Default maximum amount of time to wait until the disposal of the underlying LoopResources
* regardless if a task was submitted during the quiet period, fallback to 15 seconds.
*/
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";
/**
* Default value whether the native transport (epoll, kqueue) will be preferred,
* fallback it will be preferred when available.
*/
public static final String NATIVE = "reactor.netty.native";
If you need changes to these settings, you can apply the following configuration:
import reactor.netty.Connection;
import reactor.netty.quic.QuicServer;
import reactor.netty.resources.LoopResources;

public class Application {

    public static void main(String[] args) throws Exception {
        // Arguments: thread name prefix, selector thread count, worker thread count, daemon threads
        LoopResources loop = LoopResources.create("event-loop", 1, 4, true);

        Connection server =
                QuicServer.create()
                          .runOn(loop)
                          .bindNow();

        server.onDispose()
              .block();
    }
}
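The fallbacks documented for reactor.netty.ioWorkerCount and reactor.netty.ioSelectCount can be sketched with plain JDK calls. This is only an illustration of the documented rule (system property first, then the fallback), not the code Reactor Netty runs internally. EventLoopDefaults and its methods are hypothetical names.

```java
// Hypothetical helper for illustration only; it is not part of Reactor Netty.
final class EventLoopDefaults {

    // reactor.netty.ioWorkerCount: falls back to max(available processors, 4).
    static int ioWorkerCount() {
        int fallback = Math.max(Runtime.getRuntime().availableProcessors(), 4);
        return Integer.getInteger("reactor.netty.ioWorkerCount", fallback);
    }

    // reactor.netty.ioSelectCount: falls back to -1 (no dedicated selector thread).
    static int ioSelectCount() {
        return Integer.getInteger("reactor.netty.ioSelectCount", -1);
    }

    public static void main(String[] args) {
        System.out.println("workers=" + ioWorkerCount() + ", selectors=" + ioSelectCount());
    }
}
```

Running the sketch with -Dreactor.netty.ioWorkerCount=8 shows the override taking effect. For a real application, such properties are typically passed as JVM options so that they are in place before the library reads them.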
6.3.1. Disposing Event Loop Group
- If you use the default Event Loop Group provided by Reactor Netty, invoke the HttpResources#disposeLoopsAndConnections/#disposeLoopsAndConnectionsLater method.
Disposing HttpResources means that every server/client that is using it will not be able to use it anymore!
- If you use custom LoopResources, invoke the LoopResources#dispose/#disposeLater method.
Disposing the custom LoopResources means that every server/client that is configured to use it will not be able to use it anymore!
7. Metrics
When QUIC server metrics are needed for an integration with a system such as Micrometer, you can provide your own metrics recorder, as follows:
import reactor.netty.Connection;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.quic.QuicServer;

import java.net.SocketAddress;
import java.time.Duration;

public class Application {

    public static void main(String[] args) throws Exception {
        Connection server =
                QuicServer.create()
                          // CustomChannelMetricsRecorder is a user-supplied ChannelMetricsRecorder
                          // implementation that is not shown in this listing.
                          .metrics(true, CustomChannelMetricsRecorder::new) // (1)
                          .bindNow();

        server.onDispose()
              .block();
    }
}
1 | Enables QUIC server metrics and provides ChannelMetricsRecorder implementation. |