TCP Client
Reactor Netty provides the easy-to-use and easy-to-configure
TcpClient
.
It hides most of the Netty functionality that is needed in order to create a TCP
client
and adds Reactive Streams backpressure.
1. Connect and Disconnect
To connect the TCP
client to a given endpoint, you must create and configure a
TcpClient
instance.
By default, the host
is localhost
and the port
is 12012
.
The following example shows how to create a TcpClient
:
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create() (1)
.connectNow(); (2)
connection.onDispose()
.block();
}
}
1 | Creates a TcpClient
instance that is ready for configuring. |
2 | Connects the client in a blocking fashion and waits for it to finish initializing. |
The returned Connection
offers a simple connection API, including disposeNow()
,
which shuts the client down in a blocking fashion.
1.1. Host and Port
To connect to a specific host
and port
, you can apply the following configuration to the TCP
client.
The following example shows how to do so:
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com") (1)
.port(80) (2)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Configures the TCP host |
2 | Configures the TCP port |
The port can be specified also with PORT environment variable. |
2. Eager Initialization
By default, the initialization of the TcpClient
resources happens on demand. This means that the connect
operation
absorbs the extra time needed to initialize and load:
-
the event loop group
-
the host name resolver
-
the native transport libraries (when native transport is used)
-
the native libraries for the security (in case of
OpenSsl
)
When you need to preload these resources, you can configure the TcpClient
as follows:
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
TcpClient tcpClient =
TcpClient.create()
.host("example.com")
.port(80)
.handle((inbound, outbound) -> outbound.sendString(Mono.just("hello")));
tcpClient.warmup() (1)
.block();
Connection connection = tcpClient.connectNow(); (2)
connection.onDispose()
.block();
}
}
1 | Initialize and load the event loop group, the host name resolver, the native transport libraries and the native libraries for the security |
2 | Host name resolution happens when connecting to the remote peer |
3. Writing Data
To send data to a given endpoint, you must attach an I/O handler.
The I/O handler has access to NettyOutbound
to be able to write data.
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.handle((inbound, outbound) -> outbound.sendString(Mono.just("hello"))) (1)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Sends hello string to the endpoint. |
When you need more control over the writing process, as an alternative for I/O handler you may use
Connection#outbound
. As opposed to I/O handler
where the connection is closed when the provided Publisher
finishes (in case of finite Publisher
),
when using Connection#outbound
, you have to invoke explicitly
Connection#dispose
in order to close the connection.
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.connectNow();
connection.outbound()
.sendString(Mono.just("hello 1")) (1)
.then()
.subscribe();
connection.outbound()
.sendString(Mono.just("hello 2")) (2)
.then()
.subscribe(null, null, connection::dispose); (3)
connection.onDispose()
.block();
}
}
1 | Sends hello 1 string to the endpoint. |
2 | Sends hello 2 string to the endpoint. |
3 | Closes the connection once the message is sent to the endpoint. |
4. Consuming Data
To receive data from a given endpoint, you must attach an I/O handler.
The I/O handler has access to NettyInbound
to be able to read data. The following example shows how to do so:
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.handle((inbound, outbound) -> inbound.receive().then()) (1)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Receives data from a given endpoint |
When you need more control over the reading process, as an alternative for I/O handler you may use
Connection#inbound
. As opposed to I/O handler
where the connection is closed when the provided Publisher
finishes (in case of finite Publisher
),
when using Connection#inbound
, you have to invoke explicitly
Connection#dispose
in order to close the connection.
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.connectNow();
connection.inbound()
.receive() (1)
.then()
.subscribe();
connection.onDispose()
.block();
}
}
1 | Receives data from a given endpoint. |
5. Lifecycle Callbacks
The following lifecycle callbacks are provided to let you extend the TcpClient
.
Callback | Description |
---|---|
|
Invoked after the remote address has been resolved successfully. |
|
Invoked when initializing the channel. |
|
Invoked when the channel is about to connect. |
|
Invoked after the channel has been connected. |
|
Invoked after the channel has been disconnected. |
|
Invoked when the remote address is about to be resolved. |
|
Invoked in case the remote address hasn’t been resolved successfully. |
The following example uses the doOnConnected
and doOnChannelInit
callbacks:
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
import java.util.concurrent.TimeUnit;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.doOnConnected(conn ->
conn.addHandlerFirst(new ReadTimeoutHandler(10, TimeUnit.SECONDS))) (1)
.doOnChannelInit((observer, channel, remoteAddress) ->
channel.pipeline()
.addFirst(new LoggingHandler("reactor.netty.examples"))) (2)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Netty pipeline is extended with ReadTimeoutHandler when the channel has been connected. |
2 | Netty pipeline is extended with LoggingHandler when initializing the channel. |
6. TCP-level Configurations
This section describes three kinds of configuration that you can use at the TCP level:
6.1. Channel Options
By default, the TCP
client is configured with the following options:
TcpClientConnect(ConnectionProvider provider) {
this.config = new TcpClientConfig(
provider,
Collections.singletonMap(ChannelOption.AUTO_READ, false),
() -> AddressUtils.createUnresolved(NetUtil.LOCALHOST.getHostAddress(), DEFAULT_PORT));
}
If additional options are necessary or changes to the current options are needed, you can apply the following configuration:
import io.netty.channel.ChannelOption;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.connectNow();
connection.onDispose()
.block();
}
}
You can find more about Netty
channel options at the following links:
6.2. Wire Logger
Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected.
By default, wire logging is disabled.
To enable it, you must set the logger reactor.netty.tcp.TcpClient
level to DEBUG
and apply the following configuration:
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.wiretap(true) (1)
.host("example.com")
.port(80)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Enables the wire logging |
6.2.1. Wire Logger formatters
Reactor Netty supports 3 different formatters:
-
AdvancedByteBufFormat#HEX_DUMP - the default
/**
* When wire logging is enabled with this format, both events and content will be logged.
* The content will be in hex format.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] REGISTERED
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] ACTIVE
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] READ: 145B
* +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World|
* |00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 43 6f 6e 74 65 | HTTP/1.1..Conte|
* |00000020| 6e 74 2d 54 79 70 65 3a 20 74 65 78 74 2f 70 6c |nt-Type: text/pl|
* |00000030| 61 69 6e 0d 0a 75 73 65 72 2d 61 67 65 6e 74 3a |ain..user-agent:|
* |00000040| 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f 64 65 | ReactorNetty/de|
* ...
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] WRITE: 38B
* +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
* |00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
* |00000020| 20 30 0d 0a 0d 0a | 0.... |
* +--------+-------------------------------------------------+----------------+
* }
* </pre>
*/
/**
* When wire logging is enabled with this format, only the events will be logged.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] REGISTERED
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] ACTIVE
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] READ: 145B
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] WRITE: 38B
* }
* </pre>
*/
/**
* When wire logging is enabled with this format, both events and content will be logged.
* The content will be in plain text format.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] REGISTERED
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] ACTIVE
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] READ: 145B POST /test/World HTTP/1.1
* Content-Type: text/plain
* user-agent: ReactorNetty/dev
* ...
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] WRITE: 38B HTTP/1.1 200 OK
* content-length: 0
* }
* </pre>
*/
When you need to change the default formatter you can configure it as follows:
import io.netty.handler.logging.LogLevel;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
import reactor.netty.transport.logging.AdvancedByteBufFormat;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) (1)
.host("example.com")
.port(80)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Enables the wire logging, AdvancedByteBufFormat#TEXTUAL is used for printing the content. |
6.3. Event Loop Group
By default Reactor Netty
uses an “Event Loop Group”, where the number of the worker threads equals the number of
processors available to the runtime on initialization (but with a minimum value of 4). This “Event Loop Group” is shared between all servers and clients in one JVM.
When you need a different configuration, you can use one of the LoopResources
#create
methods.
The following listing shows the default configuration for the Event Loop Group:
/**
* Default worker thread count, fallback to available processor
* (but with a minimum value of 4).
*/
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
* Default selector thread count, fallback to -1 (no selector thread)
* <p><strong>Note:</strong> In most use cases using a worker thread also as a selector thread works well.
* A possible use case for specifying a separate selector thread might be when the worker threads are too busy
* and connections cannot be accepted fast enough.
* <p><strong>Note:</strong> Although more than 1 can be configured as a selector thread count, in reality
* only 1 thread will be used as a selector thread.
*/
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
* Default worker thread count for UDP, fallback to available processor
* (but with a minimum value of 4).
*/
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
/**
* Default quiet period that guarantees that the disposal of the underlying LoopResources
* will not happen, fallback to 2 seconds.
*/
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/**
* Default maximum amount of time to wait until the disposal of the underlying LoopResources
* regardless if a task was submitted during the quiet period, fallback to 15 seconds.
*/
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";
/**
* Default value whether the native transport (epoll, kqueue) will be preferred,
* fallback it will be preferred when available.
*/
public static final String NATIVE = "reactor.netty.native";
If you need changes to these settings, you can apply the following configuration:
import reactor.netty.Connection;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
LoopResources loop = LoopResources.create("event-loop", 1, 4, true);
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.runOn(loop)
.connectNow();
connection.onDispose()
.block();
}
}
6.3.1. Disposing Event Loop Group
-
If you use the default
Event Loop Group
provided by Reactor Netty, invokeHttpResources
#disposeLoopsAndConnections
/#disposeLoopsAndConnectionsLater
method.
Disposing HttpResources means that every server/client that is using it, will not be able to use it anymore!
|
-
If you use custom
LoopResources
, invokeLoopResources
#dispose
/#disposeLater
method.
Disposing the custom LoopResources means that every server/client that is configured to use it, will not be able to use it anymore!
|
7. Connection Pool
By default, TcpClient
(TcpClient.create()
) uses a shared ConnectionProvider
. This ConnectionProvider
is configured to create
a “fixed” connection pool per remote host (a remote host implies the combination of a hostname and its associated port number) with:
-
500
as the maximum number of active channels -
1000
as the maximum number of further channel acquisition attempts allowed to be kept in a pending state -
The rest of the configurations are the defaults (check the system properties or the builder configurations below)
This means that the implementation creates a new channel if someone tries to acquire a channel
as long as less than 500
have been created and are managed by the pool.
When the maximum number of channels in the pool is reached, up to 1000
new attempts to
acquire a channel are delayed (pending) until a channel is closed (and thus a slot is free and a new connection can be opened),
and further attempts are declined with an error.
Connections used by the TcpClient are never returned to the pool, but closed. When a connection is closed, a slot is freed
in the pool and thus a new connection can be opened when needed. This behaviour is specific only for TcpClient
and is intentional because only the user/framework knows if the actual protocol is compatible with reusing connections.
(opposed to HttpClient where the protocol is known and Reactor Netty can return the connection to the pool when this is possible)
|
/**
* Default max connections. Fallback to
* 2 * available number of processors (but with a minimum value of 16)
*/
public static final String POOL_MAX_CONNECTIONS = "reactor.netty.pool.maxConnections";
/**
* Default acquisition timeout (milliseconds) before error. If -1 will never wait to
* acquire before opening a new
* connection in an unbounded fashion. Fallback 45 seconds
*/
public static final String POOL_ACQUIRE_TIMEOUT = "reactor.netty.pool.acquireTimeout";
When you need to change the default settings, you can configure the ConnectionProvider
as follows:
import reactor.netty.Connection;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
ConnectionProvider provider =
ConnectionProvider.builder("fixed")
.maxConnections(50)
.pendingAcquireTimeout(Duration.ofSeconds(60)) (1)
.build();
Connection connection =
TcpClient.create(provider)
.host("example.com")
.port(80)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Configures the maximum time for the pending acquire operation to 60 seconds. |
The following listing shows the available configurations:
Configuration name | Description |
---|---|
|
When this option is enabled, connection pools are regularly checked in the background, and those that are empty and been inactive for a specified time become eligible for disposal. By default, this background disposal of inactive pools is disabled. |
|
When |
|
The maximum number of connections (per connection pool) before start pending. Default to 2 * available number of processors (but with a minimum value of 16). |
|
Enables/disables built-in integration with Micrometer. |
|
The maximum number of extra attempts at acquiring a connection to keep in a pending queue. If -1 is specified, the pending queue does not have upper limit. Default to 2 * max connections. |
|
The maximum time before which a pending acquire must complete, or a TimeoutException is thrown (resolution: ms). If -1 is specified, no such timeout is applied. Default: 45 seconds. |
If you need to disable the connection pool, you can apply the following configuration:
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.newConnection()
.host("example.com")
.port(80)
.connectNow();
connection.onDispose()
.block();
}
}
7.1. Disposing Connection Pool
-
If you use the default
ConnectionProvider
provided by Reactor Netty, invokeHttpResources
#disposeLoopsAndConnections
/#disposeLoopsAndConnectionsLater
method.
Disposing HttpResources means that every client that is using it, will not be able to use it anymore!
|
-
If you use custom
ConnectionProvider
, invokeConnectionProvider
#dispose
/#disposeLater
/#disposeWhen
method.
Disposing the custom ConnectionProvider means that every client that is configured to use it, will not be able to use it anymore!
|
7.2. Metrics
The pooled ConnectionProvider
supports built-in integration with Micrometer
.
It exposes all metrics with a prefix of reactor.netty.connection.provider
.
Pooled ConnectionProvider
metrics
metric name | type | description |
---|---|---|
reactor.netty.connection.provider.total.connections |
Gauge |
The number of all connections, active or idle. See Total Connections |
reactor.netty.connection.provider.active.connections |
Gauge |
The number of the connections that have been successfully acquired and are in active use. See Active Connections |
reactor.netty.connection.provider.max.connections |
Gauge |
The maximum number of active connections that are allowed. See Max Connections |
reactor.netty.connection.provider.idle.connections |
Gauge |
The number of the idle connections. See Idle Connections |
reactor.netty.connection.provider.pending.connections |
Gauge |
The number of requests that are waiting for a connection. See Pending Connections |
reactor.netty.connection.provider.pending.connections.time |
Timer |
Time spent in pending acquire a connection from the connection pool. See Pending Connections Time |
reactor.netty.connection.provider.max.pending.connections |
Gauge |
The maximum number of requests that will be queued while waiting for a ready connection. See Max Pending Connections |
The following example enables that integration:
import reactor.netty.Connection;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
ConnectionProvider provider =
ConnectionProvider.builder("fixed")
.maxConnections(50)
.metrics(true) (1)
.build();
Connection connection =
TcpClient.create(provider)
.host("example.com")
.port(80)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Enables the built-in integration with Micrometer |
8. SSL and TLS
When you need SSL or TLS, you can apply the following configuration.
By default, if OpenSSL
is available, the
SslProvider.OPENSSL
provider is used as a provider. Otherwise, the provider is
SslProvider.JDK.
You can switch the provider by using
SslContextBuilder
or by setting -Dio.netty.handler.ssl.noOpenSsl=true
.
The following example uses SslContextBuilder
:
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpSslContextSpec;
public class Application {
public static void main(String[] args) {
TcpSslContextSpec tcpSslContextSpec = TcpSslContextSpec.forClient();
Connection connection =
TcpClient.create()
.host("example.com")
.port(443)
.secure(spec -> spec.sslContext(tcpSslContextSpec))
.connectNow();
connection.onDispose()
.block();
}
}
8.1. Server Name Indication
By default, the TCP
client sends the remote host name as SNI
server name.
When you need to change this default setting, you can configure the TCP
client as follows:
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
import javax.net.ssl.SNIHostName;
public class Application {
public static void main(String[] args) throws Exception {
SslContext sslContext = SslContextBuilder.forClient().build();
Connection connection =
TcpClient.create()
.host("127.0.0.1")
.port(8080)
.secure(spec -> spec.sslContext(sslContext)
.serverNames(new SNIHostName("test.com")))
.connectNow();
connection.onDispose()
.block();
}
}
9. Proxy Support
Reactor Netty supports the proxy functionality provided by Netty and provides a way
to specify non proxy hosts
through the ProxyProvider
builder.
Netty’s HTTP proxy support always uses CONNECT
method in order to establish a tunnel to the specified proxy regardless of the scheme that is used http
or https
.
(More information: Netty enforce HTTP proxy to support HTTP CONNECT method).
Some proxies might not support CONNECT
method when the scheme is http
or might need to be configured in order to support this way of communication.
Sometimes this might be the reason for not being able to connect to the proxy. Consider checking the proxy documentation
whether it supports or needs an additional configuration in order to support CONNECT
method.
The following example uses ProxyProvider
:
import reactor.netty.Connection;
import reactor.netty.transport.ProxyProvider;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.proxy(spec -> spec.type(ProxyProvider.Proxy.SOCKS4)
.host("proxy")
.port(8080)
.nonProxyHosts("localhost")
.connectTimeoutMillis(20_000)) (1)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Configures the connection establishment timeout to 20 seconds. |
10. Metrics
The TCP client supports built-in integration with Micrometer
.
It exposes all metrics with a prefix of reactor.netty.tcp.client
.
The following table provides information for the TCP client metrics:
metric name | type | description |
---|---|---|
reactor.netty.tcp.client.data.received |
DistributionSummary |
Amount of the data received, in bytes. See Data Received |
reactor.netty.tcp.client.data.sent |
DistributionSummary |
Amount of the data sent, in bytes. See Data Sent |
reactor.netty.tcp.client.errors |
Counter |
Number of errors that occurred. See Errors Count |
reactor.netty.tcp.client.tls.handshake.time |
Timer |
Time spent for TLS handshake. See Tls Handshake Time |
reactor.netty.tcp.client.connect.time |
Timer |
Time spent for connecting to the remote address. See Connect Time |
reactor.netty.tcp.client.address.resolver |
Timer |
Time spent for resolving the address. See Hostname Resolution Time |
These additional metrics are also available:
Pooled ConnectionProvider
metrics
metric name | type | description |
---|---|---|
reactor.netty.connection.provider.total.connections |
Gauge |
The number of all connections, active or idle. See Total Connections |
reactor.netty.connection.provider.active.connections |
Gauge |
The number of the connections that have been successfully acquired and are in active use. See Active Connections |
reactor.netty.connection.provider.max.connections |
Gauge |
The maximum number of active connections that are allowed. See Max Connections |
reactor.netty.connection.provider.idle.connections |
Gauge |
The number of the idle connections. See Idle Connections |
reactor.netty.connection.provider.pending.connections |
Gauge |
The number of requests that are waiting for a connection. See Pending Connections |
reactor.netty.connection.provider.pending.connections.time |
Timer |
Time spent in pending acquire a connection from the connection pool. See Pending Connections Time |
reactor.netty.connection.provider.max.pending.connections |
Gauge |
The maximum number of requests that will be queued while waiting for a ready connection. See Max Pending Connections |
ByteBufAllocator
metrics
metric name | type | description |
---|---|---|
reactor.netty.bytebuf.allocator.used.heap.memory |
Gauge |
The number of bytes reserved by heap buffer allocator. See Used Heap Memory |
reactor.netty.bytebuf.allocator.used.direct.memory |
Gauge |
The number of bytes reserved by direct buffer allocator. See Used Direct Memory |
reactor.netty.bytebuf.allocator.heap.arenas |
Gauge |
The number of heap arenas (when |
reactor.netty.bytebuf.allocator.direct.arenas |
Gauge |
The number of direct arenas (when |
reactor.netty.bytebuf.allocator.threadlocal.caches |
Gauge |
The number of thread local caches (when |
reactor.netty.bytebuf.allocator.small.cache.size |
Gauge |
The size of the small cache (when |
reactor.netty.bytebuf.allocator.normal.cache.size |
Gauge |
The size of the normal cache (when |
reactor.netty.bytebuf.allocator.chunk.size |
Gauge |
The chunk size for an arena (when |
reactor.netty.bytebuf.allocator.active.heap.memory |
Gauge |
The actual bytes consumed by in-use buffers allocated from heap buffer pools (when |
reactor.netty.bytebuf.allocator.active.direct.memory |
Gauge |
The actual bytes consumed by in-use buffers allocated from direct buffer pools (when |
EventLoop
metrics
metric name | type | description |
---|---|---|
reactor.netty.eventloop.pending.tasks |
Gauge |
The number of tasks that are pending for processing on an event loop. See Pending Tasks |
The following example enables that integration:
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.metrics(true) (1)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Enables the built-in integration with Micrometer |
When TCP client metrics are needed for an integration with a system other than Micrometer
or you want
to provide your own integration with Micrometer
, you can provide your own metrics recorder, as follows:
import reactor.netty.Connection;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.tcp.TcpClient;
import java.net.SocketAddress;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.metrics(true, CustomChannelMetricsRecorder::new) (1)
.connectNow();
connection.onDispose()
.block();
}
1 | Enables TCP client metrics and provides ChannelMetricsRecorder implementation. |
11. Tracing
The TCP client supports built-in integration with Micrometer Tracing
.
The following table provides information for the TCP client spans:
contextual name | description |
---|---|
hostname resolution |
Information and time spent for resolving the address. See Hostname Resolution Span. |
connect |
Information and time spent for connecting to the remote address. See Connect Span. |
tls handshake |
Information and time spent for TLS handshake. See Tls Handshake Span. |
The following example enables that integration. This concrete example uses Brave
and reports the information to Zipkin
.
See the Micrometer Tracing
documentation for OpenTelemetry
setup.
import brave.Tracing;
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext;
import io.micrometer.tracing.brave.bridge.BraveTracer;
import reactor.netty.Connection;
import reactor.netty.observability.ReactorNettyTracingObservationHandler;
import reactor.netty.tcp.TcpClient;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.reporter.urlconnection.URLConnectionSender;
import static reactor.netty.Metrics.OBSERVATION_REGISTRY;
public class Application {
public static void main(String[] args) {
init(); (1)
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.metrics(true) (2)
.connectNow();
connection.onDispose()
.block();
}
/**
* This setup is based on
* <a href="https://micrometer.io/docs/tracing#_micrometer_tracing_brave_setup">Micrometer Tracing Brave Setup</a>.
*/
static void init() {
AsyncZipkinSpanHandler spanHandler = AsyncZipkinSpanHandler
.create(URLConnectionSender.create("http://localhost:9411/api/v2/spans"));
StrictCurrentTraceContext braveCurrentTraceContext = StrictCurrentTraceContext.create();
CurrentTraceContext bridgeContext = new BraveCurrentTraceContext(braveCurrentTraceContext);
Tracing tracing =
Tracing.newBuilder()
.currentTraceContext(braveCurrentTraceContext)
.supportsJoin(false)
.traceId128Bit(true)
.sampler(Sampler.ALWAYS_SAMPLE)
.addSpanHandler(spanHandler)
.localServiceName("reactor-netty-examples")
.build();
brave.Tracer braveTracer = tracing.tracer();
Tracer tracer = new BraveTracer(braveTracer, bridgeContext, new BraveBaggageManager());
OBSERVATION_REGISTRY.observationConfig()
.observationHandler(new ReactorNettyTracingObservationHandler(tracer));
}
}
1 | Initializes Brave, Zipkin, and the Observation registry. |
2 | Enables the built-in integration with Micrometer. |
The result in Zipkin
looks like:
11.1. Access Current Observation
Project Micrometer provides a library
that assists with context propagation across
different types of context mechanisms such as ThreadLocal
, Reactor Context
and others.
The following example shows how to use this library in a custom ChannelHandler
:
import brave.Tracing;
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext;
import io.micrometer.tracing.brave.bridge.BraveTracer;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import reactor.netty.Connection;
import reactor.netty.observability.ReactorNettyTracingObservationHandler;
import reactor.netty.tcp.TcpClient;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.reporter.urlconnection.URLConnectionSender;
import java.net.SocketAddress;
import static reactor.netty.Metrics.OBSERVATION_REGISTRY;
public class Application {
public static void main(String[] args) {
init(); (1)
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.metrics(true) (2)
.doOnChannelInit((observer, channel, address) -> channel.pipeline().addFirst(
"custom-channel-handler", CustomChannelOutboundHandler.INSTANCE)) (3)
.connectNow();
connection.onDispose()
.block();
}
static final class CustomChannelOutboundHandler extends ChannelOutboundHandlerAdapter {
static final ChannelHandler INSTANCE = new CustomChannelOutboundHandler();
@Override
public boolean isSharable() {
return true;
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
try (ContextSnapshot.Scope scope = ContextSnapshotFactory.builder().build().setThreadLocalsFrom(ctx.channel())) {
System.out.println("Current Observation in Scope: " + OBSERVATION_REGISTRY.getCurrentObservation());
//"FutureReturnValueIgnored" this is deliberate
ctx.connect(remoteAddress, localAddress, promise);
}
System.out.println("Current Observation: " + OBSERVATION_REGISTRY.getCurrentObservation());
}
}
1 | Initializes Brave, Zipkin, and the Observation registry. |
2 | Enables the built-in integration with Micrometer. |
3 | Custom ChannelHandler that uses context propagation library. This concrete example overrides only
ChannelOutboundHandlerAdapter#connect , if it is needed, the same logic can be used for the rest of the methods.
Also, this concrete example sets all ThreadLocal values for which there is a value in the given Channel ,
if another behaviour is needed please check context propagation library API .
For example, you may want to set only some of the ThreadLocal values. |
When you enable Reactor Netty tracing within a framework, you may need to let Reactor Netty use the ObservationRegistry created by this framework.
For this purpose you need to invoke reactor.netty.Metrics#observationRegistry .
You may also need to configure the Reactor Netty ObservationHandlers using the API provided by the framework.
|
12. Unix Domain Sockets
The TCP
client supports Unix Domain Sockets (UDS) when native transport is in use for all java versions
and when NIO transport is in use for java 17 and above.
The following example shows how to use UDS support:
import io.netty.channel.unix.DomainSocketAddress;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
//import java.net.UnixDomainSocketAddress;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
// The configuration below is available only when Epoll/KQueue transport is used
.remoteAddress(() -> new DomainSocketAddress("/tmp/test.sock")) (1)
// The configuration below is available only when NIO transport is used with Java 17+
//.remoteAddress(() -> UnixDomainSocketAddress.of("/tmp/test.sock"))
.connectNow();
connection.onDispose()
.block();
}
}
1 | Specifies DomainSocketAddress that will be used |
13. Host Name Resolution
By default, the TcpClient
uses Netty’s domain name lookup mechanism that resolves a domain name asynchronously.
This is as an alternative of the JVM’s built-in blocking resolver.
When you need to change the default settings, you can configure the TcpClient
as follows:
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.resolver(spec -> spec.queryTimeout(Duration.ofMillis(500))) (1)
.connectNow();
connection.onDispose()
.block();
}
}
1 | The timeout of each DNS query performed by this resolver will be 500ms. |
The following listing shows the available configurations.
Additionally, TCP fallback
is enabled by default.
Configuration name | Description |
---|---|
|
The supplier of the local address to bind to. |
|
The max time to live of the cached DNS resource records (resolution: seconds).
If the time to live of the DNS resource record returned by the DNS server is greater
than this max time to live, this resolver ignores the time to live from
the DNS server and uses use this max time to live.
Default to |
|
The min time to live of the cached DNS resource records (resolution: seconds). If the time to live of the DNS resource record returned by the DNS server is less than this min time to live, this resolver ignores the time to live from the DNS server and uses this min time to live. Default: 0. |
|
The time to live of the cache for the failed DNS queries (resolution: seconds). Default: 0. |
|
When this setting is enabled, the resolver notifies as soon as all queries for the preferred address type are complete.
When this setting is disabled, the resolver notifies when all possible address types are complete.
This configuration is applicable for |
|
Disables the automatic inclusion of an optional record that tries to give a hint to the remote DNS server about how much data the resolver can read per response. By default, this setting is enabled. |
|
Specifies whether this resolver has to send a DNS query with the recursion desired (RD) flag set. By default, this setting is enabled. |
|
Sets a custom function to create a |
|
Sets a custom |
|
Sets the capacity of the datagram packet buffer (in bytes). Default: 4096. |
|
Sets the maximum allowed number of DNS queries to send when resolving a host name. Default: 16. |
|
Sets the number of dots that must appear in a name before an initial absolute query is made. Default: -1 (to determine the value from the OS on Unix or use a value of 1 otherwise). |
|
Sets the timeout of each DNS query performed by this resolver (resolution: milliseconds). Default: 5000. |
|
The cache to use to store resolved DNS entries. |
|
The list of the protocol families of the resolved address. |
|
Specifies whether this resolver will also fallback to TCP if a timeout is detected. By default, the resolver will only try to use TCP if the response is marked as truncated. |
|
Enables an
|
|
Performs the communication with the DNS servers on the given
|
|
The list of search domains of the resolver. By default, the effective search domain list is populated by using the system DNS search domains. |
|
A specific logger and log level to be used by this resolver when generating detailed trace information in case of resolution failure. |
Sometimes, you may want to switch to the JVM built-in resolver. To do so, you can configure the TcpClient
as follows:
import io.netty.resolver.DefaultAddressResolverGroup;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
public class Application {
public static void main(String[] args) {
Connection connection =
TcpClient.create()
.host("example.com")
.port(80)
.resolver(DefaultAddressResolverGroup.INSTANCE) (1)
.connectNow();
connection.onDispose()
.block();
}
}
1 | Sets the JVM built-in resolver. |