QUIC Client
Reactor Netty provides the easy-to-use and easy-to-configure
QuicClient
.
It hides most of the Netty functionality that is required to create a QUIC
client
and add Reactive Streams backpressure.
1. Connecting and Disconnecting
To connect the QUIC
client to a given endpoint, you must create and configure a
QuicClient instance.
By default, the host is configured for localhost
and the port is 12012
.
The following example shows how to create and connect a QUIC
client:
import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
QuicClient.create() (1)
.connectNow(Duration.ofSeconds(30)); (2)
connection.onDispose()
.block();
}
}
1 | Creates a QuicClient
instance that is ready for configuring. |
2 | Connects the client in a blocking fashion and waits for it to finish initializing. |
The returned QuicConnection
offers a simple connection API, including disposeNow()
,
which shuts the client down in a blocking fashion.
1.1. Host and Port
To connect to a specific host
and port
, you can apply the following configuration to the QUIC
client:
import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;
import java.net.InetSocketAddress;
public class Application {
public static void main(String[] args) {
Connection connection =
QuicClient.create()
.remoteAddress(() -> new InetSocketAddress("127.0.0.1", 8080)) (1)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Configures the host and port to which this client should connect |
The port can be specified also with QUIC_PORT environment variable. |
2. Secure Connection
To configure the QUIC
client with SSL, you must apply the following configuration:
import io.netty.handler.codec.quic.QuicSslContext;
import io.netty.handler.codec.quic.QuicSslContextBuilder;
import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;
public class Application {
public static void main(String[] args) {
QuicSslContext clientCtx =
QuicSslContextBuilder.forClient()
.applicationProtocols("http/1.1")
.build();
Connection connection =
QuicClient.create()
.secure(clientCtx) (1)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Configures the SSL context to use for the secure connection |
3. Initial Settings
QUIC
allows configuring various settings for the connection. You can apply the following configuration:
import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;
public class Application {
public static void main(String[] args) {
Connection connection =
QuicClient.create()
.initialSettings(spec -> (1)
spec.maxData(10000000) (2)
.maxStreamDataBidirectionalLocal(1000000) (3)
.maxStreamDataBidirectionalRemote(1000000) (4)
.maxStreamDataUnidirectional(1000000) (5)
.maxStreamsBidirectional(100) (6)
.maxStreamsUnidirectional(100)) (7)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Configures the initial settings for the QUIC connection |
2 | Sets the maximum amount of data that can be sent on the connection |
3 | Sets the maximum amount of data that can be sent on a bidirectional stream initiated by the local endpoint |
4 | Sets the maximum amount of data that can be sent on a bidirectional stream initiated by the remote endpoint |
5 | Sets the maximum amount of data that can be sent on a unidirectional stream |
6 | Sets the maximum number of concurrent bidirectional streams |
7 | Sets the maximum number of concurrent unidirectional streams |
4. Opening Streams
To open a stream to the remote peer, you can use the following approach:
import io.netty.handler.codec.quic.QuicSslContext;
import io.netty.handler.codec.quic.QuicSslContextBuilder;
import reactor.core.publisher.Mono;
import reactor.netty.quic.QuicClient;
import reactor.netty.quic.QuicConnection;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
public class Application {
public static void main(String[] args) throws InterruptedException {
QuicSslContext clientCtx =
QuicSslContextBuilder.forClient()
.applicationProtocols("http/1.1")
.build();
QuicConnection connection =
QuicClient.create()
.bindAddress(() -> new InetSocketAddress(0))
.remoteAddress(() -> new InetSocketAddress("127.0.0.1", 8080))
.secure(clientCtx)
.idleTimeout(Duration.ofSeconds(5))
.initialSettings(spec ->
spec.maxData(10000000)
.maxStreamDataBidirectionalLocal(1000000))
.connectNow();
CountDownLatch latch = new CountDownLatch(1);
connection.createStream((in, out) -> (1)
out.sendString(Mono.just("Hello")) (2)
.then(in.receive() (3)
.asString()
.doOnNext(s -> {
System.out.println("CLIENT RECEIVED: " + s);
latch.countDown();
})
.then()))
.subscribe();
latch.await();
connection.onDispose()
.block();
}
}
1 | Creates a new bidirectional stream |
2 | Sends data to the remote peer |
3 | Receives data from the remote peer |
5. Lifecycle Callbacks
The following lifecycle callbacks are provided to let you extend the QuicClient
:
Callback | Description |
---|---|
|
Invoked when the client is about to bind. |
|
Invoked when the client has been bound. |
|
Invoked when initializing the channel. |
|
Invoked when the client is about to connect to the remote address. |
|
Invoked after the client has been connected. |
|
Invoked after the client has been disconnected. |
|
Invoked when the client has been unbound. |
The following example uses the doOnConnected
callback:
import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;
public class Application {
public static void main(String[] args) throws InterruptedException {
Connection connection =
QuicClient.create()
.doOnConnected(quicConnection -> System.out.println("Connected!")) (1)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Invoked after the client has been connected |
6. Connection Configuration
This section describes three kinds of configuration that you can use at the QUIC
client level:
6.1. Channel Options
If you need additional options or need to change the current options, you can apply the following configuration:
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.quic.QLogConfiguration;
import io.netty.handler.codec.quic.QuicChannelOption;
import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;
public class Application {
public static void main(String[] args) throws InterruptedException {
Connection connection =
QuicClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) (1)
.option(QuicChannelOption.QLOG,
new QLogConfiguration("path", "logTitle", "logDescription")) (2)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Configures the connect timeout |
2 | Enables QLOG for the QUIC connection |
You can find more about Netty channel options at the following links:
6.2. Wire Logger
Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected.
By default, wire logging is disabled.
To enable it, you must set the logger reactor.netty.quic.QuicClient
level to DEBUG
and apply the following configuration:
import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;
public class Application {
public static void main(String[] args) {
Connection connection =
QuicClient.create()
.wiretap(true) (1)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Enables the wire logging |
6.2.1. Wire Logger formatters
Reactor Netty supports 3 different formatters:
-
AdvancedByteBufFormat#HEX_DUMP - the default
/**
* When wire logging is enabled with this format, both events and content will be logged.
* The content will be in hex format.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] REGISTERED
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] ACTIVE
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] READ: 145B
* +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World|
* |00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 43 6f 6e 74 65 | HTTP/1.1..Conte|
* |00000020| 6e 74 2d 54 79 70 65 3a 20 74 65 78 74 2f 70 6c |nt-Type: text/pl|
* |00000030| 61 69 6e 0d 0a 75 73 65 72 2d 61 67 65 6e 74 3a |ain..user-agent:|
* |00000040| 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f 64 65 | ReactorNetty/de|
* ...
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] WRITE: 38B
* +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
* |00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
* |00000020| 20 30 0d 0a 0d 0a | 0.... |
* +--------+-------------------------------------------------+----------------+
* }
* </pre>
*/
/**
* When wire logging is enabled with this format, only the events will be logged.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] REGISTERED
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] ACTIVE
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] READ: 145B
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] WRITE: 38B
* }
* </pre>
*/
/**
* When wire logging is enabled with this format, both events and content will be logged.
* The content will be in plain text format.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] REGISTERED
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] ACTIVE
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] READ: 145B POST /test/World HTTP/1.1
* Content-Type: text/plain
* user-agent: ReactorNetty/dev
* ...
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] WRITE: 38B HTTP/1.1 200 OK
* content-length: 0
* }
* </pre>
*/
When you need to change the default formatter you can configure it as follows:
import io.netty.handler.logging.LogLevel;
import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;
import reactor.netty.transport.logging.AdvancedByteBufFormat;
public class Application {
public static void main(String[] args) {
Connection connection =
QuicClient.create()
.wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) (1)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Enables the wire logging, AdvancedByteBufFormat#TEXTUAL is used for printing the content. |
6.3. Event Loop Group
By default Reactor Netty
uses an “Event Loop Group”, where the number of the worker threads equals the number of
processors available to the runtime on initialization (but with a minimum value of 4). This “Event Loop Group” is shared between all servers and clients in one JVM.
When you need a different configuration, you can use one of the LoopResources
#create
methods.
The following listing shows the default configuration for the Event Loop Group:
/**
* Default worker thread count, fallback to available processor
* (but with a minimum value of 4).
*/
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
* Default selector thread count, fallback to -1 (no selector thread)
* <p><strong>Note:</strong> In most use cases using a worker thread also as a selector thread works well.
* A possible use case for specifying a separate selector thread might be when the worker threads are too busy
* and connections cannot be accepted fast enough.
* <p><strong>Note:</strong> Although more than 1 can be configured as a selector thread count, in reality
* only 1 thread will be used as a selector thread.
*/
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
* Default worker thread count for UDP, fallback to available processor
* (but with a minimum value of 4).
*/
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
/**
* Default quiet period that guarantees that the disposal of the underlying LoopResources
* will not happen, fallback to 2 seconds.
*/
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/**
* Default maximum amount of time to wait until the disposal of the underlying LoopResources
* regardless if a task was submitted during the quiet period, fallback to 15 seconds.
*/
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";
/**
* Default value whether the native transport (epoll, kqueue) will be preferred,
* fallback it will be preferred when available.
*/
public static final String NATIVE = "reactor.netty.native";
If you need changes to these settings, you can apply the following configuration:
import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;
import reactor.netty.resources.LoopResources;
public class Application {
public static void main(String[] args) {
LoopResources loop = LoopResources.create("event-loop", 1, 4, true);
Connection connection =
QuicClient.create()
.runOn(loop)
.connectNow();
connection.onDispose()
.block();
}
}
6.3.1. Disposing Event Loop Group
-
If you use the default
Event Loop Group
provided by Reactor Netty, invokeHttpResources
#disposeLoopsAndConnections
/#disposeLoopsAndConnectionsLater
method.
Disposing HttpResources means that every server/client that is using it, will not be able to use it anymore!
|
-
If you use custom
LoopResources
, invokeLoopResources
#dispose
/#disposeLater
method.
Disposing the custom LoopResources means that every server/client that is configured to use it, will not be able to use it anymore!
|
7. Metrics
When QUIC
client metrics are needed for an integration with a system as Micrometer
,
you can provide your own metrics recorder, as follows:
import reactor.netty.Connection;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.quic.QuicClient;
import java.net.SocketAddress;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
QuicClient.create()
.metrics(true, CustomChannelMetricsRecorder::new) (1)
.connectNow();
connection.onDispose()
.block();
}
1 | Enables QUIC client metrics and provides ChannelMetricsRecorder implementation. |