QUIC Client

Reactor Netty provides the easy-to-use and easy-to-configure QuicClient. It hides most of the Netty functionality that is required to create a QUIC client and adds Reactive Streams backpressure.

1. Connecting and Disconnecting

To connect the QUIC client to a given endpoint, you must create and configure a QuicClient instance. By default, the host is configured for localhost and the port is 12012. The following example shows how to create and connect a QUIC client:

import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				QuicClient.create()                            (1)
				          .connectNow(Duration.ofSeconds(30)); (2)

		connection.onDispose()
		          .block();
	}
}
1 Creates a QuicClient instance that is ready for configuring.
2 Connects the client in a blocking fashion and waits for it to finish initializing.

The returned QuicConnection offers a simple connection API, including disposeNow(), which shuts the client down in a blocking fashion.

1.1. Host and Port

To connect to a specific host and port, you can apply the following configuration to the QUIC client:

import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;

import java.net.InetSocketAddress;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				QuicClient.create()
				          .remoteAddress(() -> new InetSocketAddress("127.0.0.1", 8080)) (1)
				          .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Configures the host and port to which this client should connect

You can also specify the port with the QUIC_PORT environment variable.
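The port defaults described above can be sketched in plain Java. This is only an illustration of the documented behavior (default port 12012, overridable through QUIC_PORT), not Reactor Netty's own code. The class name QuicPortResolver, the method resolvePort, and the choice to fall back to the default on an unparsable or out-of-range value are assumptions made for this sketch.

```java
import java.util.Map;

final class QuicPortResolver {

	// Default port stated in this document.
	static final int DEFAULT_PORT = 12012;

	// Returns the port from QUIC_PORT when it holds a valid port number,
	// otherwise the default. Falling back on invalid input is an assumption
	// of this sketch, not documented library behavior.
	static int resolvePort(Map<String, String> env) {
		String value = env.get("QUIC_PORT");
		if (value == null || value.trim().isEmpty()) {
			return DEFAULT_PORT;
		}
		try {
			int port = Integer.parseInt(value.trim());
			return (port >= 0 && port <= 65535) ? port : DEFAULT_PORT;
		}
		catch (NumberFormatException e) {
			return DEFAULT_PORT;
		}
	}

	public static void main(String[] args) {
		System.out.println("Resolved port: " + resolvePort(System.getenv()));
	}
}
```

Passing the environment as a Map keeps the lookup easy to exercise without changing the real process environment.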

2. Secure Connection

To configure the QUIC client with SSL, you must apply the following configuration:

import io.netty.handler.codec.quic.QuicSslContext;
import io.netty.handler.codec.quic.QuicSslContextBuilder;
import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;

public class Application {

	public static void main(String[] args) {
		QuicSslContext clientCtx =
				QuicSslContextBuilder.forClient()
				                     .applicationProtocols("http/1.1")
				                     .build();

		Connection connection =
				QuicClient.create()
				          .secure(clientCtx) (1)
				          .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Configures the SSL context to use for the secure connection

3. Initial Settings

QUIC allows configuring various settings for the connection. You can apply the following configuration:

import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				QuicClient.create()
				          .initialSettings(spec ->                           (1)
				              spec.maxData(10000000)                         (2)
				                  .maxStreamDataBidirectionalLocal(1000000)  (3)
				                  .maxStreamDataBidirectionalRemote(1000000) (4)
				                  .maxStreamDataUnidirectional(1000000)      (5)
				                  .maxStreamsBidirectional(100)              (6)
				                  .maxStreamsUnidirectional(100))            (7)
				          .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Configures the initial settings for the QUIC connection
2 Sets the maximum amount of data that can be sent on the connection
3 Sets the maximum amount of data that can be sent on a bidirectional stream initiated by the local endpoint
4 Sets the maximum amount of data that can be sent on a bidirectional stream initiated by the remote endpoint
5 Sets the maximum amount of data that can be sent on a unidirectional stream
6 Sets the maximum number of concurrent bidirectional streams
7 Sets the maximum number of concurrent unidirectional streams
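
To see how the connection-level and stream-level limits interact, the following plain Java sketch computes how much data can be outstanding across several streams before any limit is raised by a flow-control update. It is arithmetic only and does not use the Reactor Netty API. The class name QuicFlowControlBudget and its methods are hypothetical, and the sketch assumes that every stream uses the same stream-level limit.

```java
final class QuicFlowControlBudget {

	// Upper bound on outstanding bytes across `openStreams` streams:
	// each stream is capped by its own limit, and the sum over all
	// streams is capped by the connection-level limit.
	static long maxInFlightBytes(long maxData, long maxStreamData, int openStreams) {
		if (maxData < 0 || maxStreamData < 0 || openStreams < 0) {
			throw new IllegalArgumentException("limits and stream count must not be negative");
		}
		long perStreamTotal = Math.multiplyExact(maxStreamData, (long) openStreams);
		return Math.min(maxData, perStreamTotal);
	}

	// Number of streams that can each fill their stream-level limit
	// before the connection-level limit becomes the bottleneck.
	static long streamsAtFullWindow(long maxData, long maxStreamData) {
		if (maxData < 0 || maxStreamData <= 0) {
			throw new IllegalArgumentException("maxData must be >= 0 and maxStreamData must be > 0");
		}
		return maxData / maxStreamData;
	}

	public static void main(String[] args) {
		long maxData = 10_000_000L;      // value passed to maxData(...) above
		long maxStreamData = 1_000_000L; // value passed to the maxStreamData*(...) settings above

		System.out.println(maxInFlightBytes(maxData, maxStreamData, 4));   // prints 4000000
		System.out.println(maxInFlightBytes(maxData, maxStreamData, 100)); // prints 10000000
		System.out.println(streamsAtFullWindow(maxData, maxStreamData));   // prints 10
	}
}
```

With the values from the example, 100 streams may be open concurrently, but only 10 of them can fill their stream-level limit at the same time, because the connection-level limit of 10000000 bytes is reached first.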

4. Opening Streams

To open a stream to the remote peer, you can use the following approach:

import io.netty.handler.codec.quic.QuicSslContext;
import io.netty.handler.codec.quic.QuicSslContextBuilder;
import reactor.core.publisher.Mono;
import reactor.netty.quic.QuicClient;
import reactor.netty.quic.QuicConnection;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;

public class Application {

	public static void main(String[] args) throws InterruptedException {
		QuicSslContext clientCtx =
				QuicSslContextBuilder.forClient()
				                     .applicationProtocols("http/1.1")
				                     .build();

		QuicConnection connection =
				QuicClient.create()
				          .bindAddress(() -> new InetSocketAddress(0))
				          .remoteAddress(() -> new InetSocketAddress("127.0.0.1", 8080))
				          .secure(clientCtx)
				          .idleTimeout(Duration.ofSeconds(5))
				          .initialSettings(spec ->
				              spec.maxData(10000000)
				                  .maxStreamDataBidirectionalLocal(1000000))
				          .connectNow();

		CountDownLatch latch = new CountDownLatch(1);
		connection.createStream((in, out) ->             (1)
		              out.sendString(Mono.just("Hello")) (2)
		                 .then(in.receive()              (3)
		                         .asString()
		                         .doOnNext(s -> {
		                             System.out.println("CLIENT RECEIVED: " + s);
		                             latch.countDown();
		                         })
		                         .then()))
		          .subscribe();

		latch.await();

		connection.onDispose()
		          .block();
	}
}
1 Creates a new bidirectional stream
2 Sends data to the remote peer
3 Receives data from the remote peer

5. Lifecycle Callbacks

The following lifecycle callbacks are provided to let you extend the QuicClient:

Callback            Description

doOnBind            Invoked when the client is about to bind.
doOnBound           Invoked when the client has been bound.
doOnChannelInit     Invoked when initializing the channel.
doOnConnect         Invoked when the client is about to connect to the remote address.
doOnConnected       Invoked after the client has been connected.
doOnDisconnected    Invoked after the client has been disconnected.
doOnUnbound         Invoked when the client has been unbound.

The following example uses the doOnConnected callback:

import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;

public class Application {

	public static void main(String[] args) throws InterruptedException {
		Connection connection =
				QuicClient.create()
				          .doOnConnected(quicConnection -> System.out.println("Connected!")) (1)
				          .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Invoked after the client has been connected

6. Connection Configuration

This section describes three kinds of configuration that you can use at the QUIC client level:

6.1. Channel Options

If you need additional options or need to change the current options, you can apply the following configuration:

import io.netty.channel.ChannelOption;
import io.netty.handler.codec.quic.QLogConfiguration;
import io.netty.handler.codec.quic.QuicChannelOption;
import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;

public class Application {

	public static void main(String[] args) throws InterruptedException {
		Connection connection =
				QuicClient.create()
				          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)             (1)
				          .option(QuicChannelOption.QLOG,
				              new QLogConfiguration("path", "logTitle", "logDescription")) (2)
				          .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Configures the connect timeout
2 Enables QLOG for the QUIC connection

You can find more about the available options in the Netty API documentation for ChannelOption and QuicChannelOption.

6.2. Wire Logger

Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected. By default, wire logging is disabled. To enable it, you must set the logger reactor.netty.quic.QuicClient level to DEBUG and apply the following configuration:

import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				QuicClient.create()
				          .wiretap(true) (1)
				          .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Enables the wire logging

6.2.1. Wire Logger formatters

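Reactor Netty supports three formatters, defined in AdvancedByteBufFormat: HEX_DUMP (the default), SIMPLE, and TEXTUAL. Their Javadoc follows, in that order: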

	/**
	 * When wire logging is enabled with this format, both events and content will be logged.
	 * The content will be in hex format.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] REGISTERED
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] ACTIVE
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] READ: 145B
	 *          +-------------------------------------------------+
	 *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
	 * +--------+-------------------------------------------------+----------------+
	 * |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World|
	 * |00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 43 6f 6e 74 65 | HTTP/1.1..Conte|
	 * |00000020| 6e 74 2d 54 79 70 65 3a 20 74 65 78 74 2f 70 6c |nt-Type: text/pl|
	 * |00000030| 61 69 6e 0d 0a 75 73 65 72 2d 61 67 65 6e 74 3a |ain..user-agent:|
	 * |00000040| 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f 64 65 | ReactorNetty/de|
	 * ...
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] WRITE: 38B
	 *          +-------------------------------------------------+
	 *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
	 * +--------+-------------------------------------------------+----------------+
	 * |00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
	 * |00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
	 * |00000020| 20 30 0d 0a 0d 0a                               | 0....          |
	 * +--------+-------------------------------------------------+----------------+
	 * }
	 * </pre>
	 */
	/**
	 * When wire logging is enabled with this format, only the events will be logged.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] REGISTERED
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] ACTIVE
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] READ: 145B
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] WRITE: 38B
	 * }
	 * </pre>
	 */
	/**
	 * When wire logging is enabled with this format, both events and content will be logged.
	 * The content will be in plain text format.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] REGISTERED
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] ACTIVE
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] READ: 145B POST /test/World HTTP/1.1
	 * Content-Type: text/plain
	 * user-agent: ReactorNetty/dev
	 * ...
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] WRITE: 38B HTTP/1.1 200 OK
	 * content-length: 0
	 * }
	 * </pre>
	 */
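
To make the hex layout shown above easier to read, the following plain Java sketch formats one 16-byte row in the same shape: an offset, the bytes in hex, and a printable ASCII column. It is a simplified illustration, not the formatter that Reactor Netty or Netty actually uses. The class name HexDumpRowSketch and the method formatRow are hypothetical.

```java
import java.nio.charset.StandardCharsets;

final class HexDumpRowSketch {

	// Formats up to 16 bytes starting at `offset` as "|offset| hex bytes |ascii|".
	static String formatRow(byte[] data, int offset) {
		StringBuilder hex = new StringBuilder();
		StringBuilder text = new StringBuilder();
		for (int i = 0; i < 16; i++) {
			int index = offset + i;
			if (index < data.length) {
				int b = data[index] & 0xff;
				hex.append(String.format("%02x ", b));
				// Printable ASCII is shown as-is; everything else becomes '.'
				text.append(b >= 0x20 && b < 0x7f ? (char) b : '.');
			}
			else {
				// Pad short rows so that the columns stay aligned
				hex.append("   ");
				text.append(' ');
			}
		}
		return String.format("|%08x| %s|%s|", offset, hex, text);
	}

	public static void main(String[] args) {
		byte[] response = "HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n"
				.getBytes(StandardCharsets.US_ASCII);
		for (int offset = 0; offset < response.length; offset += 16) {
			System.out.println(formatRow(response, offset));
		}
	}
}
```

Run against the 38-byte response from the WRITE event above, the first row reads `|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|`. The carriage return (0d) appears as a dot in the text column because it is not printable.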

When you need to change the default formatter, you can configure it as follows:

import io.netty.handler.logging.LogLevel;
import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;
import reactor.netty.transport.logging.AdvancedByteBufFormat;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				QuicClient.create()
				          .wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) (1)
				          .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Enables the wire logging. AdvancedByteBufFormat#TEXTUAL is used for printing the content.

6.3. Event Loop Group

By default, Reactor Netty uses an “Event Loop Group”, where the number of worker threads equals the number of processors available to the runtime on initialization (but with a minimum value of 4). This “Event Loop Group” is shared between all servers and clients in one JVM. When you need a different configuration, you can use one of the LoopResources#create methods.

The following listing shows the default configuration for the Event Loop Group:

	/**
	 * Default worker thread count, fallback to available processor
	 * (but with a minimum value of 4).
	 */
	public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
	/**
	 * Default selector thread count, fallback to -1 (no selector thread)
	 * <p><strong>Note:</strong> In most use cases using a worker thread also as a selector thread works well.
	 * A possible use case for specifying a separate selector thread might be when the worker threads are too busy
	 * and connections cannot be accepted fast enough.
	 * <p><strong>Note:</strong> Although more than 1 can be configured as a selector thread count, in reality
	 * only 1 thread will be used as a selector thread.
	 */
	public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
	/**
	 * Default worker thread count for UDP, fallback to available processor
	 * (but with a minimum value of 4).
	 */
	public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
	/**
	 * Default quiet period that guarantees that the disposal of the underlying LoopResources
	 * will not happen, fallback to 2 seconds.
	 */
	public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
	/**
	 * Default maximum amount of time to wait until the disposal of the underlying LoopResources
	 * regardless if a task was submitted during the quiet period, fallback to 15 seconds.
	 */
	public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";

	/**
	 * Default value whether the native transport (epoll, kqueue) will be preferred,
	 * fallback it will be preferred when available.
	 */
	public static final String NATIVE = "reactor.netty.native";
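
The worker-count fallback described for reactor.netty.ioWorkerCount can be sketched in plain Java as follows. This illustrates the documented rule (use the system property when it is set, otherwise the number of available processors with a minimum of 4). The class name DefaultWorkerCountSketch and the method resolveWorkerCount are hypothetical and are not part of Reactor Netty.

```java
final class DefaultWorkerCountSketch {

	// Property name taken from the listing above.
	static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";

	// Uses the configured value when present; otherwise falls back to the
	// number of available processors, but never fewer than 4.
	static int resolveWorkerCount(String configured, int availableProcessors) {
		if (configured != null && !configured.trim().isEmpty()) {
			return Integer.parseInt(configured.trim());
		}
		return Math.max(availableProcessors, 4);
	}

	public static void main(String[] args) {
		int workers = resolveWorkerCount(
				System.getProperty(IO_WORKER_COUNT),
				Runtime.getRuntime().availableProcessors());
		System.out.println("Worker threads: " + workers);
	}
}
```

For example, starting the JVM with -Dreactor.netty.ioWorkerCount=8 makes this sketch print 8 regardless of the processor count, while a two-core machine without the property yields 4.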

If you need changes to these settings, you can apply the following configuration:

import reactor.netty.Connection;
import reactor.netty.quic.QuicClient;
import reactor.netty.resources.LoopResources;

public class Application {

	public static void main(String[] args) {
		LoopResources loop = LoopResources.create("event-loop", 1, 4, true);
		Connection connection =
				QuicClient.create()
				          .runOn(loop)
				          .connectNow();

		connection.onDispose()
		          .block();
	}
}

6.3.1. Disposing Event Loop Group

  • If you use the default Event Loop Group provided by Reactor Netty, invoke the HttpResources#disposeLoopsAndConnections or HttpResources#disposeLoopsAndConnectionsLater method.

    Disposing HttpResources means that every server/client that uses it will no longer be able to use it!

  • If you use custom LoopResources, invoke the LoopResources#dispose or LoopResources#disposeLater method.

    Disposing the custom LoopResources means that every server/client that is configured to use it will no longer be able to use it!

7. Metrics

When QUIC client metrics are needed for an integration with a system such as Micrometer, you can provide your own metrics recorder, as follows:

import reactor.netty.Connection;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.quic.QuicClient;

import java.net.SocketAddress;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				QuicClient.create()
				          .metrics(true, CustomChannelMetricsRecorder::new) (1)
				          .connectNow();

		connection.onDispose()
		          .block();
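	}
}
1 Enables QUIC client metrics and provides a ChannelMetricsRecorder implementation. CustomChannelMetricsRecorder stands for your own implementation of that interface and is not shown here.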