TCP Server
Reactor Netty provides an easy-to-use and easy-to-configure TcpServer.
It hides most of the Netty functionality that is needed to create a TCP server and adds Reactive Streams backpressure.
1. Starting and Stopping
To start a TCP server, you must create and configure a TcpServer instance.
By default, the host is configured for any local address, and the system picks an ephemeral port when the bind operation is invoked. The following example shows how to create and configure a TcpServer instance:
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create() (1)
.bindNow(); (2)
server.onDispose()
.block();
}
}
1 | Creates a TcpServer instance that is ready for configuring. |
2 | Starts the server in a blocking fashion and waits for it to finish initializing. |
The returned DisposableServer offers a simple server API, including disposeNow(), which shuts the server down in a blocking fashion.
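To make the defaults described above concrete, the following sketch shows what "any local address" and "ephemeral port" mean at the socket level. It uses only the JDK, not Reactor Netty, and the class and method names (EphemeralPortSketch, bindToEphemeralPort) are made up for illustration. It binds a java.net.ServerSocket to the wildcard address with port 0 and returns the port that the operating system assigned.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical illustration class; not part of Reactor Netty.
class EphemeralPortSketch {

    // Binds to the wildcard ("any local") address with port 0 and returns
    // the port that the operating system picked for the socket.
    static int bindToEphemeralPort() {
        try (ServerSocket socket = new ServerSocket()) {
            // No host given: wildcard address. Port 0: let the system choose.
            InetSocketAddress anyLocal = new InetSocketAddress(0);
            socket.bind(anyLocal);
            return socket.getLocalPort();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Bound to ephemeral port " + bindToEphemeralPort());
    }
}
```

Because the port is only known after binding, code that needs it has to ask the bound server for its address rather than rely on a fixed number.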
1.1. Host and Port
To serve on a specific host and port, you can apply the following configuration to the TCP server:
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.host("localhost") (1)
.port(8080) (2)
.bindNow();
server.onDispose()
.block();
}
}
1 | Configures the TCP server host |
2 | Configures the TCP server port |
To serve on multiple addresses, after having configured the TcpServer, you can bind it multiple times to obtain separate DisposableServer instances.
All created servers share resources, such as LoopResources, because they use the same configuration instance under the hood.
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
public class MultiAddressApplication {
public static void main(String[] args) {
TcpServer tcpServer = TcpServer.create();
DisposableServer server1 = tcpServer
.host("localhost") (1)
.port(8080) (2)
.bindNow();
DisposableServer server2 = tcpServer
.host("0.0.0.0") (3)
.port(8081) (4)
.bindNow();
Mono.when(server1.onDispose(), server2.onDispose())
.block();
}
}
1 | Configures the first TCP server host |
2 | Configures the first TCP server port |
3 | Configures the second TCP server host |
4 | Configures the second TCP server port |
2. Eager Initialization
By default, the initialization of the TcpServer resources happens on demand. This means that the bind operation absorbs the extra time needed to initialize and load:
- the event loop groups
- the native transport libraries (when native transport is used)
- the native libraries for the security (in case of OpenSsl)
When you need to preload these resources, you can configure the TcpServer as follows:
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
TcpServer tcpServer =
TcpServer.create()
.handle((inbound, outbound) -> inbound.receive().then());
tcpServer.warmup() (1)
.block();
DisposableServer server = tcpServer.bindNow();
server.onDispose()
.block();
}
}
1 | Initializes and loads the event loop groups, the native transport libraries, and the native libraries for the security |
3. Writing Data
In order to send data to a connected client, you must attach an I/O handler.
The I/O handler has access to NettyOutbound to be able to write data. The following example shows how to attach an I/O handler:
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.handle((inbound, outbound) -> outbound.sendString(Mono.just("hello"))) (1)
.bindNow();
server.onDispose()
.block();
}
}
1 | Sends hello string to the connected clients |
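To see the greeting arrive, you need a client. The following is a minimal sketch that uses only the JDK (java.net.Socket), not Reactor Netty. The class GreetingClientSketch and its methods are hypothetical names. So that the sketch runs on its own, demoAgainstStandInServer starts a throwaway JDK server that writes hello. To try it against the TcpServer above, call readGreeting with the host and port that your server is actually bound to.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not part of Reactor Netty.
class GreetingClientSketch {

    // Connects to host:port and reads up to maxBytes bytes (or until the peer
    // closes the connection), returning them as UTF-8 text.
    static String readGreeting(String host, int port, int maxBytes) {
        try (Socket socket = new Socket(host, port);
             InputStream in = socket.getInputStream()) {
            byte[] data = new byte[maxBytes];
            int read = 0;
            while (read < maxBytes) {
                int n = in.read(data, read, maxBytes - read);
                if (n < 0) {
                    break; // the peer closed the connection
                }
                read += n;
            }
            return new String(data, 0, read, StandardCharsets.UTF_8);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Stand-in for the TcpServer above: accepts one connection on an
    // ephemeral loopback port and writes "hello" to it.
    static String demoAgainstStandInServer() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            Thread acceptor = new Thread(() -> {
                try (Socket client = server.accept();
                     OutputStream out = client.getOutputStream()) {
                    out.write("hello".getBytes(StandardCharsets.UTF_8));
                }
                catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            acceptor.start();
            String greeting = readGreeting(loopback.getHostAddress(), server.getLocalPort(), 5);
            acceptor.join();
            return greeting;
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demoAgainstStandInServer());
    }
}
```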
4. Consuming Data
In order to receive data from a connected client, you must attach an I/O handler.
The I/O handler has access to NettyInbound to be able to read data. The following example shows how to use it:
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.handle((inbound, outbound) -> inbound.receive().then()) (1)
.bindNow();
server.onDispose()
.block();
}
}
1 | Receives data from the connected clients |
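To exercise a receiving handler, something has to send bytes. The sketch below is a JDK-only sender (it does not use Reactor Netty), and SendingClientSketch with its methods is a hypothetical name. The stand-in server in demoAgainstStandInServer only exists so that the sketch can run by itself. Against the TcpServer above you would call sendText with the server's real host and port.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not part of Reactor Netty.
class SendingClientSketch {

    // Connects to host:port, writes the text as UTF-8, and closes the connection.
    static void sendText(String host, int port, String text) {
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Stand-in for the TcpServer above: listens on an ephemeral loopback port,
    // drains one connection until the client closes it, and returns the text.
    static String demoAgainstStandInServer(String text) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            // The connection waits in the accept backlog, so the client can
            // finish sending this small payload before accept() is called.
            sendText(loopback.getHostAddress(), server.getLocalPort(), text);
            try (Socket client = server.accept();
                 InputStream in = client.getInputStream()) {
                ByteArrayOutputStream received = new ByteArrayOutputStream();
                byte[] buffer = new byte[1024];
                int n;
                while ((n = in.read(buffer)) >= 0) {
                    received.write(buffer, 0, n);
                }
                return new String(received.toByteArray(), StandardCharsets.UTF_8);
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Server received: " + demoAgainstStandInServer("ping"));
    }
}
```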
5. Lifecycle Callbacks
The following lifecycle callbacks are provided to let you extend the TcpServer:
Callback | Description |
---|---|
doOnBind | Invoked when the server channel is about to bind. |
doOnBound | Invoked when the server channel is bound. |
doOnChannelInit | Invoked when initializing the channel. |
doOnConnection | Invoked when a remote client is connected. |
doOnUnbound | Invoked when the server channel is unbound. |
The following example uses the doOnConnection and doOnChannelInit callbacks:
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.util.concurrent.TimeUnit;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.doOnConnection(conn ->
conn.addHandlerFirst(new ReadTimeoutHandler(10, TimeUnit.SECONDS))) (1)
.doOnChannelInit((observer, channel, remoteAddress) ->
channel.pipeline()
.addFirst(new LoggingHandler("reactor.netty.examples"))) (2)
.bindNow();
server.onDispose()
.block();
}
}
1 | Netty pipeline is extended with ReadTimeoutHandler when a remote client is connected. |
2 | Netty pipeline is extended with LoggingHandler when initializing the channel. |
6. TCP-level Configurations
This section describes three kinds of configuration that you can use at the TCP level:
- Setting Channel Options
- Wire Logger
- Event Loop Group
6.1. Setting Channel Options
By default, the TCP server is configured with the following options:
TcpServerBind() {
Map<ChannelOption<?>, Boolean> childOptions = new HashMap<>(MapUtils.calculateInitialCapacity(2));
childOptions.put(ChannelOption.AUTO_READ, false);
childOptions.put(ChannelOption.TCP_NODELAY, true);
this.config = new TcpServerConfig(
Collections.singletonMap(ChannelOption.SO_REUSEADDR, true),
childOptions,
() -> new InetSocketAddress(DEFAULT_PORT));
}
If additional options are necessary or changes to the current options are needed, you can apply the following configuration:
import io.netty.channel.ChannelOption;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.bindNow();
server.onDispose()
.block();
}
}
You can find more about Netty channel options in the Netty ChannelOption API documentation.
6.2. Wire Logger
Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected.
By default, wire logging is disabled.
To enable it, you must set the logger reactor.netty.tcp.TcpServer level to DEBUG and apply the following configuration:
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.wiretap(true) (1)
.bindNow();
server.onDispose()
.block();
}
}
1 | Enables the wire logging |
6.2.1. Wire Logger formatters
Reactor Netty supports 3 different formatters:
- AdvancedByteBufFormat#HEX_DUMP - the default
/**
* When wire logging is enabled with this format, both events and content will be logged.
* The content will be in hex format.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] REGISTERED
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] ACTIVE
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] READ: 145B
* +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World|
* |00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 43 6f 6e 74 65 | HTTP/1.1..Conte|
* |00000020| 6e 74 2d 54 79 70 65 3a 20 74 65 78 74 2f 70 6c |nt-Type: text/pl|
* |00000030| 61 69 6e 0d 0a 75 73 65 72 2d 61 67 65 6e 74 3a |ain..user-agent:|
* |00000040| 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f 64 65 | ReactorNetty/de|
* ...
* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] WRITE: 38B
* +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
* |00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
* |00000020| 20 30 0d 0a 0d 0a | 0.... |
* +--------+-------------------------------------------------+----------------+
* }
* </pre>
*/
- AdvancedByteBufFormat#SIMPLE
/**
* When wire logging is enabled with this format, only the events will be logged.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] REGISTERED
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] ACTIVE
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] READ: 145B
* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] WRITE: 38B
* }
* </pre>
*/
- AdvancedByteBufFormat#TEXTUAL
/**
* When wire logging is enabled with this format, both events and content will be logged.
* The content will be in plain text format.
* <p>Examples:</p>
* <pre>
* {@code
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] REGISTERED
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] ACTIVE
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] READ: 145B POST /test/World HTTP/1.1
* Content-Type: text/plain
* user-agent: ReactorNetty/dev
* ...
* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] WRITE: 38B HTTP/1.1 200 OK
* content-length: 0
* }
* </pre>
*/
When you need to change the default formatter, you can configure it as follows:
import io.netty.handler.logging.LogLevel;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import reactor.netty.transport.logging.AdvancedByteBufFormat;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) (1)
.bindNow();
server.onDispose()
.block();
}
}
1 | Enables the wire logging, AdvancedByteBufFormat#TEXTUAL is used for printing the content. |
6.3. Event Loop Group
By default, Reactor Netty uses an “Event Loop Group”, where the number of worker threads equals the number of processors available to the runtime on initialization (but with a minimum value of 4). This “Event Loop Group” is shared between all servers and clients in one JVM.
When you need a different configuration, you can use one of the LoopResources#create methods.
The following listing shows the default configuration for the Event Loop Group:
/**
* Default worker thread count, fallback to available processor
* (but with a minimum value of 4).
*/
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
* Default selector thread count, fallback to -1 (no selector thread)
* <p><strong>Note:</strong> In most use cases using a worker thread also as a selector thread works well.
* A possible use case for specifying a separate selector thread might be when the worker threads are too busy
* and connections cannot be accepted fast enough.
* <p><strong>Note:</strong> Although more than 1 can be configured as a selector thread count, in reality
* only 1 thread will be used as a selector thread.
*/
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
* Default worker thread count for UDP, fallback to available processor
* (but with a minimum value of 4).
*/
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
/**
* Default quiet period that guarantees that the disposal of the underlying LoopResources
* will not happen, fallback to 2 seconds.
*/
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/**
* Default maximum amount of time to wait until the disposal of the underlying LoopResources
* regardless if a task was submitted during the quiet period, fallback to 15 seconds.
*/
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";
/**
* Default value whether the native transport (epoll, kqueue) will be preferred,
* fallback it will be preferred when available.
*/
public static final String NATIVE = "reactor.netty.native";
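The fallback rules in the listing above can be read as a small computation. The following JDK-only sketch mirrors the documented rules for the worker and selector thread counts. It is an illustration under that reading, not Reactor Netty's own code, and the class and method names (EventLoopDefaultsSketch, resolveWorkerCount, resolveSelectCount) are hypothetical. Only the two property names are taken from the listing.

```java
// Hypothetical illustration class; not part of Reactor Netty.
class EventLoopDefaultsSketch {

    // Worker thread count: the "reactor.netty.ioWorkerCount" system property
    // if it is set, otherwise the available processors with a minimum of 4.
    static int resolveWorkerCount(int availableProcessors) {
        String fallback = String.valueOf(Math.max(availableProcessors, 4));
        return Integer.parseInt(System.getProperty("reactor.netty.ioWorkerCount", fallback));
    }

    // Selector thread count: the "reactor.netty.ioSelectCount" system property
    // if it is set, otherwise -1 (no dedicated selector thread).
    static int resolveSelectCount() {
        return Integer.parseInt(System.getProperty("reactor.netty.ioSelectCount", "-1"));
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("worker threads: " + resolveWorkerCount(processors));
        System.out.println("selector threads: " + resolveSelectCount());
    }
}
```

With no properties set, a machine that reports 2 processors resolves to 4 worker threads, and a machine that reports 16 resolves to 16. Passing a JVM argument such as -Dreactor.netty.ioWorkerCount=8 overrides the fallback.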
If you need changes to these settings, you can apply the following configuration:
import reactor.netty.DisposableServer;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
LoopResources loop = LoopResources.create("event-loop", 1, 4, true);
DisposableServer server =
TcpServer.create()
.runOn(loop)
.bindNow();
server.onDispose()
.block();
}
}
6.3.1. Disposing Event Loop Group
- If you use the default Event Loop Group provided by Reactor Netty, invoke the HttpResources#disposeLoopsAndConnections/#disposeLoopsAndConnectionsLater method.
Disposing HttpResources means that every server/client that is using it will not be able to use it anymore!
- If you use custom LoopResources, invoke the LoopResources#dispose/#disposeLater method.
Disposing the custom LoopResources means that every server/client that is configured to use it will not be able to use it anymore!
7. SSL and TLS
When you need SSL or TLS, you can apply the configuration shown in the next listing.
By default, if OpenSSL is available, the SslProvider.OPENSSL provider is used. Otherwise, the SslProvider.JDK provider is used.
You can switch the provider through SslContextBuilder or by setting -Dio.netty.handler.ssl.noOpenSsl=true.
The following example configures TLS through TcpSslContextSpec, which builds on SslContextBuilder:
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import reactor.netty.tcp.TcpSslContextSpec;
import java.io.File;
public class Application {
public static void main(String[] args) {
File cert = new File("certificate.crt");
File key = new File("private.key");
TcpSslContextSpec tcpSslContextSpec = TcpSslContextSpec.forServer(cert, key);
DisposableServer server =
TcpServer.create()
.secure(spec -> spec.sslContext(tcpSslContextSpec))
.bindNow();
server.onDispose()
.block();
}
}
7.1. Server Name Indication
You can configure the TCP server with multiple SslContext instances, each mapped to a specific domain.
An exact domain name or a domain name containing a wildcard can be used when configuring the SNI mapping.
The following example uses a domain name containing a wildcard:
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.io.File;
public class Application {
public static void main(String[] args) throws Exception {
File defaultCert = new File("default_certificate.crt");
File defaultKey = new File("default_private.key");
File testDomainCert = new File("test_domain_certificate.crt");
File testDomainKey = new File("test_domain_private.key");
SslContext defaultSslContext = SslContextBuilder.forServer(defaultCert, defaultKey).build();
SslContext testDomainSslContext = SslContextBuilder.forServer(testDomainCert, testDomainKey).build();
DisposableServer server =
TcpServer.create()
.secure(spec -> spec.sslContext(defaultSslContext)
.addSniMapping("*.test.com",
testDomainSpec -> testDomainSpec.sslContext(testDomainSslContext)))
.bindNow();
server.onDispose()
.block();
}
}
8. Metrics
The TCP server supports built-in integration with Micrometer.
It exposes all metrics with a prefix of reactor.netty.tcp.server.
The following table provides information for the TCP server metrics:
metric name | type | description |
---|---|---|
reactor.netty.tcp.server.connections.total | Gauge | The number of all opened connections. See Connections Total |
reactor.netty.tcp.server.data.received | DistributionSummary | Amount of the data received, in bytes. See Data Received |
reactor.netty.tcp.server.data.sent | DistributionSummary | Amount of the data sent, in bytes. See Data Sent |
reactor.netty.tcp.server.errors | Counter | Number of errors that occurred. See Errors Count |
reactor.netty.tcp.server.tls.handshake.time | Timer | Time spent for TLS handshake. See Tls Handshake Time |
These additional metrics are also available:
ByteBufAllocator metrics
metric name | type | description |
---|---|---|
reactor.netty.bytebuf.allocator.used.heap.memory | Gauge | The number of bytes reserved by heap buffer allocator. See Used Heap Memory |
reactor.netty.bytebuf.allocator.used.direct.memory | Gauge | The number of bytes reserved by direct buffer allocator. See Used Direct Memory |
reactor.netty.bytebuf.allocator.heap.arenas | Gauge | The number of heap arenas (when PooledByteBufAllocator is used) |
reactor.netty.bytebuf.allocator.direct.arenas | Gauge | The number of direct arenas (when PooledByteBufAllocator is used) |
reactor.netty.bytebuf.allocator.threadlocal.caches | Gauge | The number of thread local caches (when PooledByteBufAllocator is used) |
reactor.netty.bytebuf.allocator.small.cache.size | Gauge | The size of the small cache (when PooledByteBufAllocator is used) |
reactor.netty.bytebuf.allocator.normal.cache.size | Gauge | The size of the normal cache (when PooledByteBufAllocator is used) |
reactor.netty.bytebuf.allocator.chunk.size | Gauge | The chunk size for an arena (when PooledByteBufAllocator is used) |
reactor.netty.bytebuf.allocator.active.heap.memory | Gauge | The actual bytes consumed by in-use buffers allocated from heap buffer pools (when PooledByteBufAllocator is used) |
reactor.netty.bytebuf.allocator.active.direct.memory | Gauge | The actual bytes consumed by in-use buffers allocated from direct buffer pools (when PooledByteBufAllocator is used) |
EventLoop metrics
metric name | type | description |
---|---|---|
reactor.netty.eventloop.pending.tasks | Gauge | The number of tasks that are pending for processing on an event loop. See Pending Tasks |
The following example enables that integration:
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
.metrics(true) (1)
.bindNow();
server.onDispose()
.block();
}
}
1 | Enables the built-in integration with Micrometer |
When TCP server metrics are needed for an integration with a system other than Micrometer, or you want to provide your own integration with Micrometer, you can provide your own metrics recorder, as follows:
import reactor.netty.DisposableServer;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.tcp.TcpServer;
import java.net.SocketAddress;
import java.time.Duration;
public class Application {
    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         // CustomChannelMetricsRecorder is your own ChannelMetricsRecorder implementation (not shown here)
                         .metrics(true, CustomChannelMetricsRecorder::new) (1)
                         .bindNow();
        server.onDispose()
              .block();
    }
}
1 | Enables TCP server metrics and provides ChannelMetricsRecorder implementation. |
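The example above refers to a CustomChannelMetricsRecorder without showing it. A recorder can be called from several event loop threads, so its bookkeeping should be thread-safe. The following JDK-only sketch shows one way such bookkeeping could look. PerAddressByteCounter is a hypothetical helper that a recorder implementation might delegate to. It does not implement ChannelMetricsRecorder itself, and the method name and signature used here are assumptions, so check the ChannelMetricsRecorder Javadoc for the exact methods to implement.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper; not part of Reactor Netty.
class PerAddressByteCounter {

    // One adder per remote address; both structures are safe for concurrent use.
    private final ConcurrentMap<SocketAddress, LongAdder> received = new ConcurrentHashMap<>();

    // Adds the given number of bytes to the running total for the remote address.
    void recordDataReceived(SocketAddress remoteAddress, long bytes) {
        received.computeIfAbsent(remoteAddress, key -> new LongAdder()).add(bytes);
    }

    // Returns the total recorded so far for the remote address, or 0 if none.
    long totalReceived(SocketAddress remoteAddress) {
        LongAdder adder = received.get(remoteAddress);
        return adder == null ? 0L : adder.sum();
    }

    public static void main(String[] args) {
        PerAddressByteCounter counter = new PerAddressByteCounter();
        // An unresolved address avoids a DNS lookup; the host name is a placeholder.
        SocketAddress client = InetSocketAddress.createUnresolved("client.example", 50000);
        counter.recordDataReceived(client, 145);
        counter.recordDataReceived(client, 38);
        System.out.println("bytes received: " + counter.totalReceived(client));
    }
}
```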
9. Tracing
The TCP server supports built-in integration with Micrometer Tracing.
The following table provides information for the TCP server spans:
contextual name | description |
---|---|
tls handshake | Information and time spent for TLS handshake. See Tls Handshake Span. |
The following example enables that integration. This concrete example uses Brave and reports the information to Zipkin.
See the Micrometer Tracing documentation for OpenTelemetry setup.
import brave.Tracing;
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext;
import io.micrometer.tracing.brave.bridge.BraveTracer;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.observability.ReactorNettyTracingObservationHandler;
import reactor.netty.tcp.TcpServer;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.reporter.urlconnection.URLConnectionSender;
import static reactor.netty.Metrics.OBSERVATION_REGISTRY;
public class Application {
public static void main(String[] args) {
init(); (1)
DisposableServer server =
TcpServer.create()
.metrics(true) (2)
.handle((inbound, outbound) -> outbound.sendString(Mono.just("hello")))
.bindNow();
server.onDispose()
.block();
}
/**
* This setup is based on
* <a href="https://micrometer.io/docs/tracing#_micrometer_tracing_brave_setup">Micrometer Tracing Brave Setup</a>.
*/
static void init() {
AsyncZipkinSpanHandler spanHandler = AsyncZipkinSpanHandler
.create(URLConnectionSender.create("http://localhost:9411/api/v2/spans"));
StrictCurrentTraceContext braveCurrentTraceContext = StrictCurrentTraceContext.create();
CurrentTraceContext bridgeContext = new BraveCurrentTraceContext(braveCurrentTraceContext);
Tracing tracing =
Tracing.newBuilder()
.currentTraceContext(braveCurrentTraceContext)
.supportsJoin(false)
.traceId128Bit(true)
.sampler(Sampler.ALWAYS_SAMPLE)
.addSpanHandler(spanHandler)
.localServiceName("reactor-netty-examples")
.build();
brave.Tracer braveTracer = tracing.tracer();
Tracer tracer = new BraveTracer(braveTracer, bridgeContext, new BraveBaggageManager());
OBSERVATION_REGISTRY.observationConfig()
.observationHandler(new ReactorNettyTracingObservationHandler(tracer));
}
}
1 | Initializes Brave, Zipkin, and the Observation registry. |
2 | Enables the built-in integration with Micrometer. |
The result in Zipkin looks like: (screenshot not reproduced here)
9.1. Access Current Observation
Project Micrometer provides a library that assists with context propagation across different types of context mechanisms such as ThreadLocal, Reactor Context and others.
The following example shows how to use this library in a custom ChannelHandler:
import brave.Tracing;
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext;
import io.micrometer.tracing.brave.bridge.BraveTracer;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.NettyPipeline;
import reactor.netty.observability.ReactorNettyTracingObservationHandler;
import reactor.netty.tcp.TcpServer;
import reactor.netty.tcp.TcpSslContextSpec;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.reporter.urlconnection.URLConnectionSender;
import java.io.File;
import static reactor.netty.Metrics.OBSERVATION_REGISTRY;
public class Application {
public static void main(String[] args) {
init(); (1)
File cert = new File("certificate.crt");
File key = new File("private.key");
TcpSslContextSpec tcpSslContextSpec = TcpSslContextSpec.forServer(cert, key);
DisposableServer server =
TcpServer.create()
.metrics(true) (2)
.doOnChannelInit((observer, channel, address) -> channel.pipeline().addAfter(
NettyPipeline.SslHandler, "custom-channel-handler", CustomChannelInboundHandler.INSTANCE)) (3)
.secure(spec -> spec.sslContext(tcpSslContextSpec))
.handle((inbound, outbound) -> outbound.sendString(Mono.just("hello")))
.bindNow();
server.onDispose()
.block();
}
static final class CustomChannelInboundHandler extends ChannelInboundHandlerAdapter {
static final ChannelHandler INSTANCE = new CustomChannelInboundHandler();
@Override
public void channelActive(ChannelHandlerContext ctx) {
try (ContextSnapshot.Scope scope = ContextSnapshotFactory.builder().build().setThreadLocalsFrom(ctx.channel())) {
System.out.println("Current Observation in Scope: " + OBSERVATION_REGISTRY.getCurrentObservation());
ctx.fireChannelActive();
}
System.out.println("Current Observation: " + OBSERVATION_REGISTRY.getCurrentObservation());
}
@Override
public boolean isSharable() {
return true;
}
}
// The init() method is the same as in the previous tracing example.
}
1 | Initializes Brave, Zipkin, and the Observation registry. |
2 | Enables the built-in integration with Micrometer. |
3 | Custom ChannelHandler that uses the context propagation library. This concrete example overrides only ChannelInboundHandlerAdapter#channelActive; if needed, the same logic can be used for the rest of the methods.
Also, this concrete example sets all ThreadLocal values for which there is a value in the given Channel; if another behaviour is needed, please check the context propagation library API.
For example, you may want to set only some of the ThreadLocal values. |
When you enable Reactor Netty tracing within a framework, you may need to let Reactor Netty use the ObservationRegistry created by this framework.
For this purpose, you need to invoke reactor.netty.Metrics#observationRegistry.
You may also need to configure the Reactor Netty ObservationHandlers using the API provided by the framework.
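The handler in the example above prints the current observation twice: once inside the try-with-resources scope and once after it. The following JDK-only sketch illustrates the general scope pattern behind that difference. It is not the context propagation library's implementation, and ThreadLocalScopeSketch, Scope, and CURRENT are hypothetical names. A value is placed in a ThreadLocal when the scope opens, and the previous state is restored when it closes.

```java
// Hypothetical illustration class; not part of Micrometer or Reactor Netty.
class ThreadLocalScopeSketch {

    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Sets CURRENT for the lifetime of the scope and restores the previous
    // value (or clears it) when the scope is closed.
    static final class Scope implements AutoCloseable {

        private final String previous;

        Scope(String value) {
            this.previous = CURRENT.get();
            CURRENT.set(value);
        }

        @Override
        public void close() {
            if (previous == null) {
                CURRENT.remove();
            }
            else {
                CURRENT.set(previous);
            }
        }
    }

    // Returns the value visible inside the scope, followed by the value
    // visible on the same thread after the scope has been closed.
    static String[] insideAndAfter(String value) {
        String inside;
        try (Scope scope = new Scope(value)) {
            inside = CURRENT.get();
        }
        return new String[] {inside, CURRENT.get()};
    }

    public static void main(String[] args) {
        String[] values = insideAndAfter("tls handshake");
        System.out.println("In scope: " + values[0]);
        System.out.println("After scope: " + values[1]);
    }
}
```

Restoring the previous value instead of always clearing it keeps nested scopes on the same thread from overwriting each other.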
10. Unix Domain Sockets
The TCP server supports Unix Domain Sockets (UDS) for all Java versions when native transport is in use, and for Java 17 and above when NIO transport is in use.
The following example shows how to use UDS support:
import io.netty.channel.unix.DomainSocketAddress;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
//import java.net.UnixDomainSocketAddress;
public class Application {
public static void main(String[] args) {
DisposableServer server =
TcpServer.create()
// The configuration below is available only when Epoll/KQueue transport is used
.bindAddress(() -> new DomainSocketAddress("/tmp/test.sock")) (1)
// The configuration below is available only when NIO transport is used with Java 17+
//.bindAddress(() -> UnixDomainSocketAddress.of("/tmp/test.sock"))
.bindNow();
server.onDispose()
.block();
}
}
1 | Specifies DomainSocketAddress that will be used |