TCP Client

Reactor Netty provides the easy-to-use and easy-to-configure TcpClient. It hides most of the Netty functionality needed to create a TCP client and adds Reactive Streams backpressure.

1. Connect and Disconnect

To connect the TCP client to a given endpoint, you must create and configure a TcpClient instance. By default, the host is localhost and the port is 12012. The following example shows how to create a TcpClient:

import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				TcpClient.create()      (1)
				         .connectNow(); (2)

		connection.onDispose()
		          .block();
	}
}
1 Creates a TcpClient instance that is ready for configuring.
2 Connects the client in a blocking fashion and waits for it to finish initializing.

The returned Connection offers a simple connection API, including disposeNow(), which shuts the client down in a blocking fashion.

1.1. Host and Port

To connect to a specific host and port, configure the TCP client as the following example shows:

import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				TcpClient.create()
				         .host("example.com") (1)
				         .port(80)            (2)
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Configures the TCP host
2 Configures the TCP port

The port can also be specified with the PORT environment variable.
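
As a rough illustration of such a fallback, the following plain-Java sketch resolves a default port from the PORT environment variable and falls back to 12012 when the variable is not set. It is not Reactor Netty's own code; the class and method names are hypothetical, and the behavior for a non-numeric value is an assumption of this sketch.

```java
import java.util.Map;

// Hypothetical helper, not part of Reactor Netty: shows one way a default
// port could be derived from the PORT environment variable.
final class DefaultPortSketch {

	// Fallback port named in the text above.
	static final int FALLBACK_PORT = 12012;

	// Returns the value of PORT when present, otherwise the fallback.
	// Assumption: a non-numeric PORT value fails with NumberFormatException.
	static int resolvePort(Map<String, String> env) {
		String port = env.get("PORT");
		return port != null ? Integer.parseInt(port) : FALLBACK_PORT;
	}

	public static void main(String[] args) {
		System.out.println("Default port: " + resolvePort(System.getenv()));
	}
}
```

An explicit .port(...) call, as in the example above, is the more visible choice when the port is known at build time.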

2. Eager Initialization

By default, the initialization of the TcpClient resources happens on demand. This means that the connect operation absorbs the extra time needed to initialize and load:

  • the event loop group

  • the host name resolver

  • the native transport libraries (when native transport is used)

  • the native libraries for security (when OpenSsl is used)

When you need to preload these resources, you can configure the TcpClient as follows:

import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

	public static void main(String[] args) {
		TcpClient tcpClient =
				TcpClient.create()
				         .host("example.com")
				         .port(80)
				         .handle((inbound, outbound) -> outbound.sendString(Mono.just("hello")));

		tcpClient.warmup() (1)
		         .block();

		Connection connection = tcpClient.connectNow(); (2)

		connection.onDispose()
		          .block();
	}
}
1 Initializes and loads the event loop group, the host name resolver, the native transport libraries, and the native libraries for security.
2 Host name resolution happens when connecting to the remote peer

3. Writing Data

To send data to a given endpoint, you must attach an I/O handler. The I/O handler has access to NettyOutbound to be able to write data.

import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				TcpClient.create()
				         .host("example.com")
				         .port(80)
				         .handle((inbound, outbound) -> outbound.sendString(Mono.just("hello"))) (1)
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Sends hello string to the endpoint.

When you need more control over the writing process, you can use Connection#outbound as an alternative to the I/O handler. With an I/O handler, the connection is closed when the provided Publisher finishes (in the case of a finite Publisher). With Connection#outbound, you must explicitly invoke Connection#dispose to close the connection.

import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				TcpClient.create()
				         .host("example.com")
				         .port(80)
				         .connectNow();

		connection.outbound()
		          .sendString(Mono.just("hello 1")) (1)
		          .then()
		          .subscribe();

		connection.outbound()
		          .sendString(Mono.just("hello 2")) (2)
		          .then()
		          .subscribe(null, null, connection::dispose); (3)

		connection.onDispose()
		          .block();
	}
}
1 Sends hello 1 string to the endpoint.
2 Sends hello 2 string to the endpoint.
3 Closes the connection once the message is sent to the endpoint.

4. Consuming Data

To receive data from a given endpoint, you must attach an I/O handler. The I/O handler has access to NettyInbound to be able to read data. The following example shows how to do so:

import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				TcpClient.create()
				         .host("example.com")
				         .port(80)
				         .handle((inbound, outbound) -> inbound.receive().then()) (1)
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Receives data from a given endpoint

When you need more control over the reading process, you can use Connection#inbound as an alternative to the I/O handler. With an I/O handler, the connection is closed when the provided Publisher finishes (in the case of a finite Publisher). With Connection#inbound, you must explicitly invoke Connection#dispose to close the connection.

import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				TcpClient.create()
				         .host("example.com")
				         .port(80)
				         .connectNow();

		connection.inbound()
		          .receive() (1)
		          .then()
		          .subscribe();

		connection.onDispose()
		          .block();
	}
}
1 Receives data from a given endpoint.

5. Lifecycle Callbacks

The following lifecycle callbacks are provided to let you extend the TcpClient.

Callback            Description

doAfterResolve      Invoked after the remote address has been resolved successfully.
doOnChannelInit     Invoked when initializing the channel.
doOnConnect         Invoked when the channel is about to connect.
doOnConnected       Invoked after the channel has been connected.
doOnDisconnected    Invoked after the channel has been disconnected.
doOnResolve         Invoked when the remote address is about to be resolved.
doOnResolveError    Invoked in case the remote address hasn’t been resolved successfully.

The following example uses the doOnConnected and doOnChannelInit callbacks:

import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
import java.util.concurrent.TimeUnit;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				TcpClient.create()
				         .host("example.com")
				         .port(80)
				         .doOnConnected(conn ->
				             conn.addHandlerFirst(new ReadTimeoutHandler(10, TimeUnit.SECONDS))) (1)
				         .doOnChannelInit((observer, channel, remoteAddress) ->
				             channel.pipeline()
				                    .addFirst(new LoggingHandler("reactor.netty.examples")))     (2)
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Netty pipeline is extended with ReadTimeoutHandler when the channel has been connected.
2 Netty pipeline is extended with LoggingHandler when initializing the channel.

6. TCP-level Configurations

This section describes three kinds of configuration that you can use at the TCP level:

  • Channel Options

  • Wire Logger

  • Event Loop Group

6.1. Channel Options

By default, the TCP client is configured with the following options:

	TcpClientConnect(ConnectionProvider provider) {
		this.config = new TcpClientConfig(
				provider,
				Collections.singletonMap(ChannelOption.AUTO_READ, false),
				() -> AddressUtils.createUnresolved(NetUtil.LOCALHOST.getHostAddress(), DEFAULT_PORT));
	}

If additional options are necessary or changes to the current options are needed, you can apply the following configuration:

import io.netty.channel.ChannelOption;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				TcpClient.create()
				         .host("example.com")
				         .port(80)
				         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}

You can find more about Netty channel options at the following links:

6.2. Wire Logger

Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected. By default, wire logging is disabled. To enable it, you must set the logger reactor.netty.tcp.TcpClient level to DEBUG and apply the following configuration:

import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				TcpClient.create()
				         .wiretap(true) (1)
				         .host("example.com")
				         .port(80)
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Enables the wire logging

6.2.1. Wire Logger formatters

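Reactor Netty supports three formatters, defined in AdvancedByteBufFormat: HEX_DUMP (the default), SIMPLE, and TEXTUAL. Their Javadoc, in that order, is as follows: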

	/**
	 * When wire logging is enabled with this format, both events and content will be logged.
	 * The content will be in hex format.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] REGISTERED
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] ACTIVE
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] READ: 145B
	 *          +-------------------------------------------------+
	 *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
	 * +--------+-------------------------------------------------+----------------+
	 * |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World|
	 * |00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 43 6f 6e 74 65 | HTTP/1.1..Conte|
	 * |00000020| 6e 74 2d 54 79 70 65 3a 20 74 65 78 74 2f 70 6c |nt-Type: text/pl|
	 * |00000030| 61 69 6e 0d 0a 75 73 65 72 2d 61 67 65 6e 74 3a |ain..user-agent:|
	 * |00000040| 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f 64 65 | ReactorNetty/de|
	 * ...
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] WRITE: 38B
	 *          +-------------------------------------------------+
	 *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
	 * +--------+-------------------------------------------------+----------------+
	 * |00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
	 * |00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
	 * |00000020| 20 30 0d 0a 0d 0a                               | 0....          |
	 * +--------+-------------------------------------------------+----------------+
	 * }
	 * </pre>
	 */
	/**
	 * When wire logging is enabled with this format, only the events will be logged.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] REGISTERED
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] ACTIVE
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] READ: 145B
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] WRITE: 38B
	 * }
	 * </pre>
	 */
	/**
	 * When wire logging is enabled with this format, both events and content will be logged.
	 * The content will be in plain text format.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] REGISTERED
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] ACTIVE
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] READ: 145B POST /test/World HTTP/1.1
	 * Content-Type: text/plain
	 * user-agent: ReactorNetty/dev
	 * ...
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] WRITE: 38B HTTP/1.1 200 OK
	 * content-length: 0
	 * }
	 * </pre>
	 */

When you need to change the default formatter, you can configure it as follows:

import io.netty.handler.logging.LogLevel;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
import reactor.netty.transport.logging.AdvancedByteBufFormat;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				TcpClient.create()
				         .wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) (1)
				         .host("example.com")
				         .port(80)
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Enables the wire logging, AdvancedByteBufFormat#TEXTUAL is used for printing the content.

6.3. Event Loop Group

By default, Reactor Netty uses an “Event Loop Group” in which the number of worker threads equals the number of processors available to the runtime on initialization (but with a minimum value of 4). This “Event Loop Group” is shared between all servers and clients in one JVM. When you need a different configuration, you can use one of the LoopResources#create methods.
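
The default worker count described above can be sketched in plain Java. This is an illustration rather than Reactor Netty source: the class and method names are hypothetical, and it assumes that a value set through the reactor.netty.ioWorkerCount system property takes precedence over the computed fallback.

```java
// Hypothetical illustration, not Reactor Netty source: computes a worker
// thread count the way the text describes the default.
final class WorkerCountSketch {

	// Number of available processors, but never fewer than 4.
	static int fallbackWorkerCount(int availableProcessors) {
		return Math.max(availableProcessors, 4);
	}

	// Assumption: the system property, when set, overrides the fallback.
	static int workerCount() {
		int fallback = fallbackWorkerCount(Runtime.getRuntime().availableProcessors());
		return Integer.getInteger("reactor.netty.ioWorkerCount", fallback);
	}

	public static void main(String[] args) {
		System.out.println("Worker threads: " + workerCount());
	}
}
```

On a machine with 2 processors this sketch yields 4 worker threads, and on a machine with 16 processors it yields 16, unless the property is set.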

The following listing shows the default configuration for the Event Loop Group:

	/**
	 * Default worker thread count, fallback to available processor
	 * (but with a minimum value of 4).
	 */
	public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
	/**
	 * Default selector thread count, fallback to -1 (no selector thread)
	 * <p><strong>Note:</strong> In most use cases using a worker thread also as a selector thread works well.
	 * A possible use case for specifying a separate selector thread might be when the worker threads are too busy
	 * and connections cannot be accepted fast enough.
	 * <p><strong>Note:</strong> Although more than 1 can be configured as a selector thread count, in reality
	 * only 1 thread will be used as a selector thread.
	 */
	public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
	/**
	 * Default worker thread count for UDP, fallback to available processor
	 * (but with a minimum value of 4).
	 */
	public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
	/**
	 * Default quiet period that guarantees that the disposal of the underlying LoopResources
	 * will not happen, fallback to 2 seconds.
	 */
	public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
	/**
	 * Default maximum amount of time to wait until the disposal of the underlying LoopResources
	 * regardless if a task was submitted during the quiet period, fallback to 15 seconds.
	 */
	public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";

	/**
	 * Default value whether the native transport (epoll, kqueue) will be preferred,
	 * fallback it will be preferred when available.
	 */

If you need changes to these settings, you can apply the following configuration:

import reactor.netty.Connection;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;

public class Application {

	public static void main(String[] args) {
		LoopResources loop = LoopResources.create("event-loop", 1, 4, true);

		Connection connection =
				TcpClient.create()
				         .host("example.com")
				         .port(80)
				         .runOn(loop)
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}

6.3.1. Disposing Event Loop Group

  • If you use the default Event Loop Group provided by Reactor Netty, invoke the TcpResources#disposeLoopsAndConnections or TcpResources#disposeLoopsAndConnectionsLater method.

    Disposing TcpResources means that every server/client that uses it can no longer use it!

  • If you use a custom LoopResources, invoke the LoopResources#dispose or LoopResources#disposeLater method.

    Disposing the custom LoopResources means that every server/client that is configured to use it can no longer use it!

7. Connection Pool

By default, TcpClient (TcpClient.create()) uses a shared ConnectionProvider. This ConnectionProvider is configured to create a “fixed” connection pool per remote host (a remote host implies the combination of a hostname and its associated port number) with:

  • 500 as the maximum number of active channels

  • 1000 as the maximum number of further channel acquisition attempts allowed to be kept in a pending state

  • The rest of the configurations are the defaults (check the system properties or the builder configurations below)

This means that the implementation creates a new channel when someone tries to acquire one, as long as fewer than 500 channels have been created and are managed by the pool. When the maximum number of channels in the pool is reached, up to 1000 new attempts to acquire a channel are delayed (pending) until a channel is closed (and thus a slot is free and a new connection can be opened). Further attempts are declined with an error.
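
The admission rules above can be modelled with two counters. The following is a toy sketch in plain Java, not Reactor Netty's pool implementation. FixedPoolModel and its methods are hypothetical names, and the model leaves out the timeout that applies to pending acquisitions (see pendingAcquireTimeout).

```java
// Hypothetical model of the admission rules described above. It tracks
// counts only and is not how Reactor Netty implements its pool.
final class FixedPoolModel {

	enum Outcome { ACQUIRED, PENDING, REJECTED }

	private final int maxConnections;
	private final int maxPending;
	private int active;
	private int pending;

	FixedPoolModel(int maxConnections, int maxPending) {
		this.maxConnections = maxConnections;
		this.maxPending = maxPending;
	}

	// A caller asks for a channel.
	synchronized Outcome acquire() {
		if (active < maxConnections) {
			active++;
			return Outcome.ACQUIRED;
		}
		if (pending < maxPending) {
			pending++;
			return Outcome.PENDING;
		}
		return Outcome.REJECTED;
	}

	// A channel is closed: its slot goes to a pending caller if there is
	// one, otherwise the slot becomes free.
	synchronized void close() {
		if (active == 0) {
			throw new IllegalStateException("no active channel to close");
		}
		if (pending > 0) {
			pending--; // the waiting caller takes over the freed slot
		}
		else {
			active--;
		}
	}

	synchronized int active() {
		return active;
	}

	synchronized int pending() {
		return pending;
	}

	public static void main(String[] args) {
		FixedPoolModel pool = new FixedPoolModel(2, 1);
		System.out.println(pool.acquire()); // ACQUIRED
		System.out.println(pool.acquire()); // ACQUIRED
		System.out.println(pool.acquire()); // PENDING
		System.out.println(pool.acquire()); // REJECTED
		pool.close();
		System.out.println(pool.active() + " active, " + pool.pending() + " pending"); // 2 active, 0 pending
	}
}
```

With the documented values (500 and 1000), the 501st caller becomes pending and the 1501st is rejected until a channel is closed.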

Connections used by the TcpClient are never returned to the pool, but closed. When a connection is closed, a slot is freed in the pool and thus a new connection can be opened when needed. This behaviour is specific to TcpClient and is intentional, because only the user/framework knows whether the actual protocol is compatible with reusing connections. This is unlike HttpClient, where the protocol is known and Reactor Netty can return the connection to the pool when possible.

	/**
	 * Default max connections. Fallback to
	 * 2 * available number of processors (but with a minimum value of 16)
	 */
	public static final String POOL_MAX_CONNECTIONS = "reactor.netty.pool.maxConnections";
	/**
	 * Default acquisition timeout (milliseconds) before error. If -1 will never wait to
	 * acquire before opening a new
	 * connection in an unbounded fashion. Fallback 45 seconds
	 */

When you need to change the default settings, you can configure the ConnectionProvider as follows:

import reactor.netty.Connection;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		ConnectionProvider provider =
				ConnectionProvider.builder("fixed")
				                  .maxConnections(50)
				                  .pendingAcquireTimeout(Duration.ofSeconds(60)) (1)
				                  .build();

		Connection connection =
				TcpClient.create(provider)
				         .host("example.com")
				         .port(80)
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Configures the maximum time for the pending acquire operation to 60 seconds.

The following listing shows the available configurations:

Configuration name Description

disposeInactivePoolsInBackground

When this option is enabled, connection pools are regularly checked in the background, and those that are empty and have been inactive for a specified time become eligible for disposal. By default, this background disposal of inactive pools is disabled.

disposeTimeout

When ConnectionProvider#dispose() or ConnectionProvider#disposeLater() is called, trigger a graceful shutdown for the connection pools, with this grace period timeout. From there on, all calls for acquiring a connection will fail fast with an exception. However, for the provided Duration, pending acquires will get a chance to be served. Note: The rejection of new acquires and the grace timer start immediately, irrespective of subscription to the Mono returned by ConnectionProvider#disposeLater(). Subsequent calls return the same Mono, effectively getting notifications from the first graceful shutdown call and ignoring subsequently provided timeouts. By default, dispose timeout is not specified.

maxConnections

The maximum number of connections (per connection pool) before acquisitions start pending. Defaults to 2 * available number of processors (but with a minimum value of 16).

metrics

Enables/disables built-in integration with Micrometer. ConnectionProvider.MeterRegistrar can be provided for integration with another metrics system. By default, metrics are not enabled.

pendingAcquireMaxCount

The maximum number of extra attempts at acquiring a connection to keep in a pending queue. If -1 is specified, the pending queue has no upper limit. Defaults to 2 * max connections.

pendingAcquireTimeout

The maximum time before which a pending acquire must complete, or a TimeoutException is thrown (resolution: ms). If -1 is specified, no such timeout is applied. Default: 45 seconds.
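
The numeric defaults listed above can be worked out as follows. This is a minimal sketch in plain Java that only reproduces the arithmetic stated in the table; the class and method names are hypothetical and are not part of the ConnectionProvider API.

```java
import java.time.Duration;

// Hypothetical helper, not part of Reactor Netty: reproduces the default
// values stated in the configuration table.
final class PoolDefaultsSketch {

	// pendingAcquireTimeout default: 45 seconds.
	static final Duration DEFAULT_PENDING_ACQUIRE_TIMEOUT = Duration.ofSeconds(45);

	// maxConnections default: 2 * available processors, but never fewer than 16.
	static int defaultMaxConnections(int availableProcessors) {
		return Math.max(2 * availableProcessors, 16);
	}

	// pendingAcquireMaxCount default: 2 * max connections.
	static int defaultPendingAcquireMaxCount(int maxConnections) {
		return 2 * maxConnections;
	}

	public static void main(String[] args) {
		int maxConnections = defaultMaxConnections(Runtime.getRuntime().availableProcessors());
		System.out.println("maxConnections=" + maxConnections);
		System.out.println("pendingAcquireMaxCount=" + defaultPendingAcquireMaxCount(maxConnections));
		System.out.println("pendingAcquireTimeout=" + DEFAULT_PENDING_ACQUIRE_TIMEOUT.toMillis() + " ms");
	}
}
```

For example, a machine with 4 processors gets 16 connections and 32 pending acquisitions per pool, while a machine with 12 processors gets 24 and 48.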

If you need to disable the connection pool, you can apply the following configuration:

import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				TcpClient.newConnection()
				         .host("example.com")
				         .port(80)
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}

7.1. Disposing Connection Pool

  • If you use the default ConnectionProvider provided by Reactor Netty, invoke the TcpResources#disposeLoopsAndConnections or TcpResources#disposeLoopsAndConnectionsLater method.

    Disposing TcpResources means that every client that uses it can no longer use it!

  • If you use a custom ConnectionProvider, invoke the ConnectionProvider#dispose, ConnectionProvider#disposeLater, or ConnectionProvider#disposeWhen method.

    Disposing the custom ConnectionProvider means that every client that is configured to use it can no longer use it!

7.2. Metrics

The pooled ConnectionProvider supports built-in integration with Micrometer. It exposes all metrics with a prefix of reactor.netty.connection.provider.

Pooled ConnectionProvider metrics

metric name type description

reactor.netty.connection.provider.total.connections

Gauge

The number of all connections, active or idle. See Total Connections

reactor.netty.connection.provider.active.connections

Gauge

The number of the connections that have been successfully acquired and are in active use. See Active Connections

reactor.netty.connection.provider.max.connections

Gauge

The maximum number of active connections that are allowed. See Max Connections

reactor.netty.connection.provider.idle.connections

Gauge

The number of the idle connections. See Idle Connections

reactor.netty.connection.provider.pending.connections

Gauge

The number of requests that are waiting for a connection. See Pending Connections

reactor.netty.connection.provider.pending.connections.time

Timer

Time spent waiting to acquire a connection from the connection pool. See Pending Connections Time

reactor.netty.connection.provider.max.pending.connections

Gauge

The maximum number of requests that will be queued while waiting for a ready connection. See Max Pending Connections

The following example enables that integration:

import reactor.netty.Connection;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;

public class Application {

	public static void main(String[] args) {
		ConnectionProvider provider =
				ConnectionProvider.builder("fixed")
				                  .maxConnections(50)
				                  .metrics(true) (1)
				                  .build();

		Connection connection =
				TcpClient.create(provider)
				         .host("example.com")
				         .port(80)
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Enables the built-in integration with Micrometer

8. SSL and TLS

When you need SSL or TLS, you can apply the following configuration. By default, if OpenSSL is available, SslProvider.OPENSSL is used. Otherwise, SslProvider.JDK is used. You can switch the provider by using SslContextBuilder or by setting -Dio.netty.handler.ssl.noOpenSsl=true.

The following example uses TcpSslContextSpec:

import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpSslContextSpec;

public class Application {

	public static void main(String[] args) {
		TcpSslContextSpec tcpSslContextSpec = TcpSslContextSpec.forClient();

		Connection connection =
				TcpClient.create()
				         .host("example.com")
				         .port(443)
				         .secure(spec -> spec.sslContext(tcpSslContextSpec))
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}

8.1. Server Name Indication

By default, the TCP client sends the remote host name as the SNI server name. When you need to change this default setting, you can configure the TCP client as follows:

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

import javax.net.ssl.SNIHostName;

public class Application {

	public static void main(String[] args) throws Exception {
		SslContext sslContext = SslContextBuilder.forClient().build();

		Connection connection =
				TcpClient.create()
				         .host("127.0.0.1")
				         .port(8080)
				         .secure(spec -> spec.sslContext(sslContext)
				                             .serverNames(new SNIHostName("test.com")))
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}
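
Because serverNames takes JDK SNIHostName values, a name can be checked before it is handed to the client. The following sketch uses only JDK classes; the SniHostNameSketch class and its isValidSniHostName method are hypothetical helpers for illustration.

```java
import javax.net.ssl.SNIHostName;
import javax.net.ssl.StandardConstants;

// Hypothetical helper: checks whether the JDK accepts a value as an SNI host name.
final class SniHostNameSketch {

	// The SNIHostName constructor throws IllegalArgumentException for values
	// it rejects, such as an empty name or a name with a trailing dot.
	static boolean isValidSniHostName(String hostname) {
		try {
			new SNIHostName(hostname);
			return true;
		}
		catch (IllegalArgumentException e) {
			return false;
		}
	}

	public static void main(String[] args) {
		SNIHostName name = new SNIHostName("test.com");
		System.out.println(name.getAsciiName());
		System.out.println(name.getType() == StandardConstants.SNI_HOST_NAME);
		System.out.println(isValidSniHostName(""));
	}
}
```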

9. Proxy Support

Reactor Netty supports the proxy functionality provided by Netty and lets you specify non-proxy hosts through the ProxyProvider builder.

Netty’s HTTP proxy support always uses the CONNECT method to establish a tunnel to the specified proxy, regardless of whether the scheme is http or https. (More information: Netty enforce HTTP proxy to support HTTP CONNECT method). Some proxies might not support the CONNECT method when the scheme is http, or might need additional configuration to support it. This can be the reason for failing to connect to the proxy. Check the proxy documentation to see whether it supports the CONNECT method or needs additional configuration to do so.
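
To make the tunnel setup concrete, the request that an HTTP proxy receives for a CONNECT tunnel can be sketched in plain Java. The sketch only builds the request text in its minimal HTTP/1.1 form; it is not Netty's proxy handler, and the class and method names are hypothetical.

```java
// Hypothetical illustration: builds the text of a minimal HTTP/1.1 CONNECT request.
final class ConnectRequestSketch {

	// Asks a proxy to open a tunnel to targetHost:targetPort.
	static String connectRequest(String targetHost, int targetPort) {
		String authority = targetHost + ":" + targetPort;
		return "CONNECT " + authority + " HTTP/1.1\r\n"
				+ "Host: " + authority + "\r\n"
				+ "\r\n";
	}

	public static void main(String[] args) {
		System.out.print(connectRequest("example.com", 80));
	}
}
```

A proxy that accepts the tunnel answers with a 2xx status and then relays bytes in both directions. A proxy that does not allow CONNECT for the target answers with an error status instead, which shows up on the client side as a failed connection.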

The following example uses ProxyProvider:

import reactor.netty.Connection;
import reactor.netty.transport.ProxyProvider;
import reactor.netty.tcp.TcpClient;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				TcpClient.create()
				         .host("example.com")
				         .port(80)
				         .proxy(spec -> spec.type(ProxyProvider.Proxy.SOCKS4)
				                            .host("proxy")
				                            .port(8080)
				                            .nonProxyHosts("localhost")
				                            .connectTimeoutMillis(20_000)) (1)
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Configures the connection establishment timeout to 20 seconds.

10. Metrics

The TCP client supports built-in integration with Micrometer. It exposes all metrics with a prefix of reactor.netty.tcp.client.

The following table provides information for the TCP client metrics:

metric name type description

reactor.netty.tcp.client.data.received

DistributionSummary

Amount of the data received, in bytes. See Data Received

reactor.netty.tcp.client.data.sent

DistributionSummary

Amount of the data sent, in bytes. See Data Sent

reactor.netty.tcp.client.errors

Counter

Number of errors that occurred. See Errors Count

reactor.netty.tcp.client.tls.handshake.time

Timer

Time spent for TLS handshake. See Tls Handshake Time

reactor.netty.tcp.client.connect.time

Timer

Time spent for connecting to the remote address. See Connect Time

reactor.netty.tcp.client.address.resolver

Timer

Time spent for resolving the address. See Hostname Resolution Time

These additional metrics are also available:

Pooled ConnectionProvider metrics: the same metrics as listed in the Metrics subsection of the Connection Pool section.

ByteBufAllocator metrics

metric name type description

reactor.netty.bytebuf.allocator.used.heap.memory

Gauge

The number of bytes reserved by heap buffer allocator. See Used Heap Memory

reactor.netty.bytebuf.allocator.used.direct.memory

Gauge

The number of bytes reserved by direct buffer allocator. See Used Direct Memory

reactor.netty.bytebuf.allocator.heap.arenas

Gauge

The number of heap arenas (when PooledByteBufAllocator). See Heap Arenas

reactor.netty.bytebuf.allocator.direct.arenas

Gauge

The number of direct arenas (when PooledByteBufAllocator). See Direct Arenas

reactor.netty.bytebuf.allocator.threadlocal.caches

Gauge

The number of thread local caches (when PooledByteBufAllocator). See Thread Local Caches

reactor.netty.bytebuf.allocator.small.cache.size

Gauge

The size of the small cache (when PooledByteBufAllocator). See Small Cache Size

reactor.netty.bytebuf.allocator.normal.cache.size

Gauge

The size of the normal cache (when PooledByteBufAllocator). See Normal Cache Size

reactor.netty.bytebuf.allocator.chunk.size

Gauge

The chunk size for an arena (when PooledByteBufAllocator). See Chunk Size

reactor.netty.bytebuf.allocator.active.heap.memory

Gauge

The actual bytes consumed by in-use buffers allocated from heap buffer pools (when PooledByteBufAllocator). See Active Heap Memory

reactor.netty.bytebuf.allocator.active.direct.memory

Gauge

The actual bytes consumed by in-use buffers allocated from direct buffer pools (when PooledByteBufAllocator). See Active Direct Memory

EventLoop metrics

metric name type description

reactor.netty.eventloop.pending.tasks

Gauge

The number of tasks that are pending for processing on an event loop. See Pending Tasks

The following example enables that integration:

import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				TcpClient.create()
				         .host("example.com")
				         .port(80)
				         .metrics(true) (1)
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Enables the built-in integration with Micrometer

When TCP client metrics are needed for an integration with a system other than Micrometer or you want to provide your own integration with Micrometer, you can provide your own metrics recorder, as follows:

import reactor.netty.Connection;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.tcp.TcpClient;

import java.net.SocketAddress;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				TcpClient.create()
				         .host("example.com")
				         .port(80)
				         .metrics(true, CustomChannelMetricsRecorder::new) (1)
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Enables TCP client metrics and provides a ChannelMetricsRecorder implementation. CustomChannelMetricsRecorder is a class that you supply; it is not shown in this listing.
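One possible shape for the CustomChannelMetricsRecorder referenced above is sketched below. It is an illustration only: it keeps in-memory totals and prints timings instead of forwarding them to a real monitoring system. The accessor methods (totalBytesSent and so on) are hypothetical additions and are not part of the ChannelMetricsRecorder interface.

```java
import reactor.netty.channel.ChannelMetricsRecorder;

import java.net.SocketAddress;
import java.time.Duration;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical recorder: accumulates totals in memory instead of publishing to a metrics backend.
public class CustomChannelMetricsRecorder implements ChannelMetricsRecorder {

	private final LongAdder bytesReceived = new LongAdder();
	private final LongAdder bytesSent = new LongAdder();
	private final LongAdder errors = new LongAdder();

	@Override
	public void recordDataReceived(SocketAddress remoteAddress, long bytes) {
		bytesReceived.add(bytes);
	}

	@Override
	public void recordDataSent(SocketAddress remoteAddress, long bytes) {
		bytesSent.add(bytes);
	}

	@Override
	public void incrementErrorsCount(SocketAddress remoteAddress) {
		errors.increment();
	}

	@Override
	public void recordTlsHandshakeTime(SocketAddress remoteAddress, Duration time, String status) {
		System.out.println("TLS handshake with " + remoteAddress + ": " + time + " (" + status + ")");
	}

	@Override
	public void recordConnectTime(SocketAddress remoteAddress, Duration time, String status) {
		System.out.println("Connect to " + remoteAddress + ": " + time + " (" + status + ")");
	}

	@Override
	public void recordResolveAddressTime(SocketAddress remoteAddress, Duration time, String status) {
		System.out.println("Resolve " + remoteAddress + ": " + time + " (" + status + ")");
	}

	// Hypothetical accessors, added only so that the totals can be inspected.
	public long totalBytesReceived() {
		return bytesReceived.sum();
	}

	public long totalBytesSent() {
		return bytesSent.sum();
	}

	public long totalErrors() {
		return errors.sum();
	}
}
```

Because the class has a no-argument constructor, it can be passed as CustomChannelMetricsRecorder::new. LongAdder is used here because the callbacks may be invoked from several event loop threads.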

11. Tracing

The TCP client supports built-in integration with Micrometer Tracing.

The following table provides information for the TCP client spans:

contextual name description

hostname resolution

Information and time spent for resolving the address. See Hostname Resolution Span.

connect

Information and time spent for connecting to the remote address. See Connect Span.

tls handshake

Information and time spent for TLS handshake. See Tls Handshake Span.

The following example enables that integration. This concrete example uses Brave and reports the information to Zipkin. See the Micrometer Tracing documentation for OpenTelemetry setup.

import brave.Tracing;
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext;
import io.micrometer.tracing.brave.bridge.BraveTracer;
import reactor.netty.Connection;
import reactor.netty.observability.ReactorNettyTracingObservationHandler;
import reactor.netty.tcp.TcpClient;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.reporter.urlconnection.URLConnectionSender;

import static reactor.netty.Metrics.OBSERVATION_REGISTRY;

public class Application {

	public static void main(String[] args) {
		init(); (1)

		Connection connection =
				TcpClient.create()
				         .host("example.com")
				         .port(80)
				         .metrics(true) (2)
				         .connectNow();

		connection.onDispose()
		          .block();
	}

	/**
	 * This setup is based on
	 * <a href="https://micrometer.io/docs/tracing#_micrometer_tracing_brave_setup">Micrometer Tracing Brave Setup</a>.
	 */
	static void init() {
		AsyncZipkinSpanHandler spanHandler = AsyncZipkinSpanHandler
				.create(URLConnectionSender.create("http://localhost:9411/api/v2/spans"));

		StrictCurrentTraceContext braveCurrentTraceContext = StrictCurrentTraceContext.create();

		CurrentTraceContext bridgeContext = new BraveCurrentTraceContext(braveCurrentTraceContext);

		Tracing tracing =
				Tracing.newBuilder()
				       .currentTraceContext(braveCurrentTraceContext)
				       .supportsJoin(false)
				       .traceId128Bit(true)
				       .sampler(Sampler.ALWAYS_SAMPLE)
				       .addSpanHandler(spanHandler)
				       .localServiceName("reactor-netty-examples")
				       .build();

		brave.Tracer braveTracer = tracing.tracer();

		Tracer tracer = new BraveTracer(braveTracer, bridgeContext, new BraveBaggageManager());

		OBSERVATION_REGISTRY.observationConfig()
		                    .observationHandler(new ReactorNettyTracingObservationHandler(tracer));
	}
}
1 Initializes Brave, Zipkin, and the Observation registry.
2 Enables the built-in integration with Micrometer.

The result in Zipkin looks like the following:

[Figure: TCP client tracing in Zipkin]

11.1. Access Current Observation

Project Micrometer provides a library that assists with context propagation across different types of context mechanisms such as ThreadLocal, Reactor Context and others.

The following example shows how to use this library in a custom ChannelHandler:

import brave.Tracing;
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext;
import io.micrometer.tracing.brave.bridge.BraveTracer;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import reactor.netty.Connection;
import reactor.netty.observability.ReactorNettyTracingObservationHandler;
import reactor.netty.tcp.TcpClient;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.reporter.urlconnection.URLConnectionSender;

import java.net.SocketAddress;

import static reactor.netty.Metrics.OBSERVATION_REGISTRY;

public class Application {

	public static void main(String[] args) {
		init(); (1)

		Connection connection =
				TcpClient.create()
				         .host("example.com")
				         .port(80)
				         .metrics(true) (2)
				         .doOnChannelInit((observer, channel, address) -> channel.pipeline().addFirst(
				                 "custom-channel-handler", CustomChannelOutboundHandler.INSTANCE)) (3)
				         .connectNow();

		connection.onDispose()
		          .block();
	}

	static final class CustomChannelOutboundHandler extends ChannelOutboundHandlerAdapter {

		static final ChannelHandler INSTANCE = new CustomChannelOutboundHandler();

		@Override
		public boolean isSharable() {
			return true;
		}

		@Override
		public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
			try (ContextSnapshot.Scope scope = ContextSnapshotFactory.builder().build().setThreadLocalsFrom(ctx.channel())) {
				System.out.println("Current Observation in Scope: " + OBSERVATION_REGISTRY.getCurrentObservation());
				// The returned future is deliberately ignored ("FutureReturnValueIgnored")
				ctx.connect(remoteAddress, localAddress, promise);
			}
			System.out.println("Current Observation: " + OBSERVATION_REGISTRY.getCurrentObservation());
		}
	}

	// init() is the same as in the previous example and is omitted here.
}
1 Initializes Brave, Zipkin, and the Observation registry.
2 Enables the built-in integration with Micrometer.
3 Adds a custom ChannelHandler that uses the context propagation library. This example overrides only ChannelOutboundHandlerAdapter#connect; the same logic can be applied to the other methods if needed. It also sets all ThreadLocal values for which the given Channel holds a value. If you need different behavior, for example setting only some of the ThreadLocal values, check the context propagation library API.
When you enable Reactor Netty tracing within a framework, you may need to let Reactor Netty use the ObservationRegistry created by that framework. To do so, invoke reactor.netty.Metrics#observationRegistry. You may also need to configure the Reactor Netty ObservationHandlers by using the API provided by the framework.
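The following sketch shows the registry hand-over described above. It is a minimal illustration: the ObservationRegistry is created by hand here only to keep the listing self-contained, whereas in a real application it would be the instance exposed by your framework. The class name SharedObservationRegistry and the helper useFrameworkRegistry are hypothetical.

```java
import io.micrometer.observation.ObservationRegistry;
import reactor.netty.Metrics;

public class SharedObservationRegistry {

	// Hypothetical helper: 'frameworkRegistry' stands for the registry that your framework provides.
	public static void useFrameworkRegistry(ObservationRegistry frameworkRegistry) {
		// Makes Reactor Netty report its observations to the given registry.
		Metrics.observationRegistry(frameworkRegistry);
	}

	public static void main(String[] args) {
		// Created by hand only for this sketch; normally obtained from the framework.
		ObservationRegistry frameworkRegistry = ObservationRegistry.create();

		// Call this before creating any client so that all observations use the same registry.
		useFrameworkRegistry(frameworkRegistry);

		System.out.println("Shared registry in use: " + (Metrics.OBSERVATION_REGISTRY == frameworkRegistry));
	}
}
```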

12. Unix Domain Sockets

The TCP client supports Unix Domain Sockets (UDS) when native transport is in use.

The following example shows how to use UDS support:

import io.netty.channel.unix.DomainSocketAddress;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				TcpClient.create()
				         .remoteAddress(() -> new DomainSocketAddress("/tmp/test.sock")) (1)
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Specifies the DomainSocketAddress that will be used.

13. Host Name Resolution

By default, the TcpClient uses Netty’s domain name lookup mechanism, which resolves a domain name asynchronously. This is an alternative to the JVM’s built-in blocking resolver.

When you need to change the default settings, you can configure the TcpClient as follows:

import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				TcpClient.create()
				         .host("example.com")
				         .port(80)
				         .resolver(spec -> spec.queryTimeout(Duration.ofMillis(500))) (1)
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 The timeout of each DNS query performed by this resolver will be 500ms.

The following listing shows the available configurations. Additionally, TCP fallback is enabled by default.

Configuration name Description

bindAddressSupplier

The supplier of the local address to bind to.

cacheMaxTimeToLive

The max time to live of the cached DNS resource records (resolution: seconds). If the time to live of the DNS resource record returned by the DNS server is greater than this max time to live, this resolver ignores the time to live from the DNS server and uses this max time to live. Default: Integer.MAX_VALUE.

cacheMinTimeToLive

The min time to live of the cached DNS resource records (resolution: seconds). If the time to live of the DNS resource record returned by the DNS server is less than this min time to live, this resolver ignores the time to live from the DNS server and uses this min time to live. Default: 0.

cacheNegativeTimeToLive

The time to live of the cache for the failed DNS queries (resolution: seconds). Default: 0.

completeOncePreferredResolved

When this setting is enabled, the resolver notifies as soon as all queries for the preferred address type are complete. When this setting is disabled, the resolver notifies when all possible address types are complete. This configuration is applicable for DnsNameResolver#resolveAll(String). By default, this setting is enabled.

disableOptionalRecord

Disables the automatic inclusion of an optional record that tries to give a hint to the remote DNS server about how much data the resolver can read per response. By default, this setting is enabled.

disableRecursionDesired

Specifies whether this resolver has to send a DNS query with the recursion desired (RD) flag set. By default, this setting is enabled.

dnsAddressResolverGroupProvider

Sets a custom function to create a DnsAddressResolverGroup given a DnsNameResolverBuilder.

hostsFileEntriesResolver

Sets a custom HostsFileEntriesResolver to be used for hosts file entries. Default: DefaultHostsFileEntriesResolver.

maxPayloadSize

Sets the capacity of the datagram packet buffer (in bytes). Default: 4096.

maxQueriesPerResolve

Sets the maximum allowed number of DNS queries to send when resolving a host name. Default: 16.

ndots

Sets the number of dots that must appear in a name before an initial absolute query is made. Default: -1 (to determine the value from the OS on Unix or use a value of 1 otherwise).

queryTimeout

Sets the timeout of each DNS query performed by this resolver (resolution: milliseconds). Default: 5000.

resolveCache

The cache to use to store resolved DNS entries.

resolvedAddressTypes

The list of the protocol families of the resolved address.

retryTcpOnTimeout

Specifies whether this resolver will also fall back to TCP if a timeout is detected. By default, the resolver will only try to use TCP if the response is marked as truncated.

roundRobinSelection

Enables an AddressResolverGroup of DnsNameResolver that supports random selection of destination addresses if multiple are provided by the nameserver. See RoundRobinDnsAddressResolverGroup. Default: DnsAddressResolverGroup.

runOn

Performs the communication with the DNS servers on the given LoopResources. By default, the LoopResources specified on the client level are used.

searchDomains

The list of search domains of the resolver. By default, the effective search domain list is populated by using the system DNS search domains.

trace

A specific logger and log level to be used by this resolver when generating detailed trace information in case of resolution failure.
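The interaction between cacheMinTimeToLive, cacheMaxTimeToLive, and the time to live returned by the DNS server amounts to clamping a value into a range. The following sketch illustrates that rule as described in the table above. The class DnsCacheTtl and the helper effectiveTtlSeconds are hypothetical and are not part of Reactor Netty or Netty. The sketch assumes that the minimum does not exceed the maximum.

```java
public class DnsCacheTtl {

	// Hypothetical helper: returns the time to live (in seconds) that the cache would apply.
	public static long effectiveTtlSeconds(long serverTtl, long minTtl, long maxTtl) {
		if (minTtl > maxTtl) {
			throw new IllegalArgumentException("minTtl must not exceed maxTtl");
		}
		// A server value above the max is replaced by the max, a value below the min by the min.
		return Math.max(minTtl, Math.min(maxTtl, serverTtl));
	}

	public static void main(String[] args) {
		System.out.println(effectiveTtlSeconds(300, 0, 60));    // prints 60: capped by the max
		System.out.println(effectiveTtlSeconds(5, 30, 3600));   // prints 30: raised to the min
		System.out.println(effectiveTtlSeconds(120, 30, 3600)); // prints 120: within the range
	}
}
```

With the defaults listed in the table (min 0, max Integer.MAX_VALUE), the range places no practical constraint on the server value, so the time to live returned by the DNS server is used as is.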

Sometimes, you may want to switch to the JVM built-in resolver. To do so, you can configure the TcpClient as follows:

import io.netty.resolver.DefaultAddressResolverGroup;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public class Application {

	public static void main(String[] args) {
		Connection connection =
				TcpClient.create()
				         .host("example.com")
				         .port(80)
				         .resolver(DefaultAddressResolverGroup.INSTANCE) (1)
				         .connectNow();

		connection.onDispose()
		          .block();
	}
}
1 Sets the JVM built-in resolver.