TCP Server

Reactor Netty provides an easy-to-use and easy-to-configure TcpServer. It hides most of the Netty functionality that is needed to create a TCP server and adds Reactive Streams backpressure.

1. Starting and Stopping

To start a TCP server, you must create and configure a TcpServer instance. By default, the host is configured for any local address, and the system picks up an ephemeral port when the bind operation is invoked. The following example shows how to create and configure a TcpServer instance:

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				TcpServer.create()   (1)
				         .bindNow(); (2)

		server.onDispose()
		      .block();
	}
}
1 Creates a TcpServer instance that is ready for configuring.
2 Starts the server in a blocking fashion and waits for it to finish initializing.

The returned DisposableServer offers a simple server API, including disposeNow(), which shuts the server down in a blocking fashion.
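
The following is an additional sketch (not one of the standard examples; the 30-second timeout is an arbitrary choice) showing how a server started with bindNow() might later be shut down with disposeNow():

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

import java.time.Duration;

public class ShutdownApplication {

	public static void main(String[] args) {
		DisposableServer server =
				TcpServer.create()
				         .bindNow();

		// ... the server handles traffic here ...

		// Shuts the server down in a blocking fashion,
		// waiting at most 30 seconds for the shutdown to complete
		server.disposeNow(Duration.ofSeconds(30));
	}
}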

1.1. Host and Port

To serve on a specific host and port, you can apply the following configuration to the TCP server:

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				TcpServer.create()
				         .host("localhost") (1)
				         .port(8080)        (2)
				         .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Configures the TCP server host
2 Configures the TCP server port

To serve on multiple addresses, after having configured the TcpServer you can bind it multiple times to obtain separate DisposableServer instances. All created servers share resources such as LoopResources, because they use the same configuration instance under the hood.

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class MultiAddressApplication {

	public static void main(String[] args) {
		TcpServer tcpServer = TcpServer.create();
		DisposableServer server1 = tcpServer
				.host("localhost") (1)
				.port(8080)        (2)
				.bindNow();

		DisposableServer server2 = tcpServer
				.host("0.0.0.0") (3)
				.port(8081)      (4)
				.bindNow();

		Mono.when(server1.onDispose(), server2.onDispose())
				.block();
	}
}
1 Configures the first TCP server host
2 Configures the first TCP server port
3 Configures the second TCP server host
4 Configures the second TCP server port

2. Eager Initialization

By default, the initialization of the TcpServer resources happens on demand. This means that the bind operation absorbs the extra time needed to initialize and load:

  • the event loop groups

  • the native transport libraries (when native transport is used)

  • the native libraries for security (in case of OpenSsl)

When you need to preload these resources, you can configure the TcpServer as follows:

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

	public static void main(String[] args) {
		TcpServer tcpServer =
				TcpServer.create()
				         .handle((inbound, outbound) -> inbound.receive().then());

		tcpServer.warmup() (1)
		         .block();

		DisposableServer server = tcpServer.bindNow();

		server.onDispose()
		      .block();
	}
}
1 Initializes and loads the event loop groups, the native transport libraries, and the native libraries for security

3. Writing Data

In order to send data to a connected client, you must attach an I/O handler. The I/O handler has access to NettyOutbound to be able to write data. The following example shows how to attach an I/O handler:

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				TcpServer.create()
				         .handle((inbound, outbound) -> outbound.sendString(Mono.just("hello"))) (1)
				         .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Sends a hello string to the connected clients

4. Consuming Data

In order to receive data from a connected client, you must attach an I/O handler. The I/O handler has access to NettyInbound to be able to read data. The following example shows how to use it:

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				TcpServer.create()
				         .handle((inbound, outbound) -> inbound.receive().then()) (1)
				         .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Receives data from the connected clients
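
The two handlers can also be combined. The following additional sketch echoes whatever a client sends back to that client by using the inbound stream as the outbound publisher:

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class EchoApplication {

	public static void main(String[] args) {
		DisposableServer server =
				TcpServer.create()
				         // Echoes the received bytes back to the connected client;
				         // retain() keeps the received buffers alive so they can be written back
				         .handle((inbound, outbound) -> outbound.send(inbound.receive().retain()))
				         .bindNow();

		server.onDispose()
		      .block();
	}
}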

5. Lifecycle Callbacks

The following lifecycle callbacks are provided to let you extend the TcpServer:

Callback          Description

doOnBind          Invoked when the server channel is about to bind.
doOnBound         Invoked when the server channel is bound.
doOnChannelInit   Invoked when initializing the channel.
doOnConnection    Invoked when a remote client is connected.
doOnUnbound       Invoked when the server channel is unbound.

The following example uses the doOnConnection and doOnChannelInit callbacks:

import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.util.concurrent.TimeUnit;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				TcpServer.create()
				         .doOnConnection(conn ->
				             conn.addHandlerFirst(new ReadTimeoutHandler(10, TimeUnit.SECONDS))) (1)
				         .doOnChannelInit((observer, channel, remoteAddress) ->
				             channel.pipeline()
				                    .addFirst(new LoggingHandler("reactor.netty.examples")))     (2)
				         .bindNow();

		server.onDispose()
		      .block();
	}
}
1 The Netty pipeline is extended with ReadTimeoutHandler when a remote client is connected.
2 The Netty pipeline is extended with LoggingHandler when initializing the channel.
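
The other callbacks follow the same pattern. As a rough sketch (the printed messages are illustrative), doOnBound and doOnUnbound can be used to observe when the server channel is bound and unbound:

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class LifecycleApplication {

	public static void main(String[] args) {
		DisposableServer server =
				TcpServer.create()
				         // Invoked when the server channel is bound
				         .doOnBound(disposableServer ->
				             System.out.println("Server bound on " + disposableServer.address()))
				         // Invoked when the server channel is unbound
				         .doOnUnbound(disposableServer ->
				             System.out.println("Server channel unbound"))
				         .bindNow();

		server.onDispose()
		      .block();
	}
}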

6. TCP-level Configurations

This section describes three kinds of configuration that you can use at the TCP level: channel options, wire logging, and the event loop group.

6.1. Setting Channel Options

By default, the TCP server is configured with the following options:

	TcpServerBind() {
		Map<ChannelOption<?>, Boolean> childOptions = new HashMap<>(MapUtils.calculateInitialCapacity(2));
		childOptions.put(ChannelOption.AUTO_READ, false);
		childOptions.put(ChannelOption.TCP_NODELAY, true);
		this.config = new TcpServerConfig(
				Collections.singletonMap(ChannelOption.SO_REUSEADDR, true),
				childOptions,
				() -> new InetSocketAddress(DEFAULT_PORT));
	}

If additional options are necessary or changes to the current options are needed, you can apply the following configuration:

import io.netty.channel.ChannelOption;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				TcpServer.create()
				         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
				         .bindNow();

		server.onDispose()
		      .block();
	}
}

You can find more about Netty channel options in the ChannelOption Javadoc and the Netty reference documentation.
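
Note that option(...) configures the server (parent) channel, as with SO_REUSEADDR in the defaults listed above, while AUTO_READ and TCP_NODELAY are child options applied to the accepted client connections. To adjust the latter kind, you can use childOption(...). The following is a minimal sketch; the chosen options and values are only illustrative:

import io.netty.channel.ChannelOption;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class ChildOptionApplication {

	public static void main(String[] args) {
		DisposableServer server =
				TcpServer.create()
				         // Applied to the server (parent) channel
				         .option(ChannelOption.SO_BACKLOG, 1024)
				         // Applied to every accepted client (child) channel
				         .childOption(ChannelOption.SO_KEEPALIVE, true)
				         .bindNow();

		server.onDispose()
		      .block();
	}
}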

6.2. Wire Logger

Reactor Netty provides wire logging for cases when the traffic between the peers needs to be inspected. By default, wire logging is disabled. To enable it, you must set the logger reactor.netty.tcp.TcpServer level to DEBUG and apply the following configuration:

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				TcpServer.create()
				         .wiretap(true) (1)
				         .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Enables the wire logging

6.2.1. Wire Logger formatters

Reactor Netty supports three different formatters:

  • AdvancedByteBufFormat#HEX_DUMP (the default) logs both the events and the content, with the content shown as a hex dump

  • AdvancedByteBufFormat#SIMPLE logs only the events

  • AdvancedByteBufFormat#TEXTUAL logs both the events and the content, with the content shown as plain text

The following Javadoc excerpts show sample output for each format, in that order:

	/**
	 * When wire logging is enabled with this format, both events and content will be logged.
	 * The content will be in hex format.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] REGISTERED
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] ACTIVE
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] READ: 145B
	 *          +-------------------------------------------------+
	 *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
	 * +--------+-------------------------------------------------+----------------+
	 * |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World|
	 * |00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 43 6f 6e 74 65 | HTTP/1.1..Conte|
	 * |00000020| 6e 74 2d 54 79 70 65 3a 20 74 65 78 74 2f 70 6c |nt-Type: text/pl|
	 * |00000030| 61 69 6e 0d 0a 75 73 65 72 2d 61 67 65 6e 74 3a |ain..user-agent:|
	 * |00000040| 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f 64 65 | ReactorNetty/de|
	 * ...
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] WRITE: 38B
	 *          +-------------------------------------------------+
	 *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
	 * +--------+-------------------------------------------------+----------------+
	 * |00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
	 * |00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
	 * |00000020| 20 30 0d 0a 0d 0a                               | 0....          |
	 * +--------+-------------------------------------------------+----------------+
	 * }
	 * </pre>
	 */
	/**
	 * When wire logging is enabled with this format, only the events will be logged.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] REGISTERED
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] ACTIVE
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] READ: 145B
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] WRITE: 38B
	 * }
	 * </pre>
	 */
	/**
	 * When wire logging is enabled with this format, both events and content will be logged.
	 * The content will be in plain text format.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] REGISTERED
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] ACTIVE
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] READ: 145B POST /test/World HTTP/1.1
	 * Content-Type: text/plain
	 * user-agent: ReactorNetty/dev
	 * ...
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] WRITE: 38B HTTP/1.1 200 OK
	 * content-length: 0
	 * }
	 * </pre>
	 */

When you need to change the default formatter, you can configure it as follows:

import io.netty.handler.logging.LogLevel;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import reactor.netty.transport.logging.AdvancedByteBufFormat;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				TcpServer.create()
				         .wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) (1)
				         .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Enables the wire logging; AdvancedByteBufFormat#TEXTUAL is used for printing the content.

6.3. Event Loop Group

By default, Reactor Netty uses an “Event Loop Group” where the number of worker threads equals the number of processors available to the runtime on initialization (but with a minimum value of 4). This “Event Loop Group” is shared between all servers and clients in one JVM. When you need a different configuration, you can use one of the LoopResources#create methods.

The following listing shows the default configuration for the Event Loop Group:

	/**
	 * Default worker thread count, fallback to available processor
	 * (but with a minimum value of 4).
	 */
	public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
	/**
	 * Default selector thread count, fallback to -1 (no selector thread)
	 * <p><strong>Note:</strong> In most use cases using a worker thread also as a selector thread works well.
	 * A possible use case for specifying a separate selector thread might be when the worker threads are too busy
	 * and connections cannot be accepted fast enough.
	 * <p><strong>Note:</strong> Although more than 1 can be configured as a selector thread count, in reality
	 * only 1 thread will be used as a selector thread.
	 */
	public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
	/**
	 * Default worker thread count for UDP, fallback to available processor
	 * (but with a minimum value of 4).
	 */
	public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
	/**
	 * Default quiet period that guarantees that the disposal of the underlying LoopResources
	 * will not happen, fallback to 2 seconds.
	 */
	public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
	/**
	 * Default maximum amount of time to wait until the disposal of the underlying LoopResources
	 * regardless if a task was submitted during the quiet period, fallback to 15 seconds.
	 */
	public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";

	/**
	 * Default value whether the native transport (epoll, kqueue) will be preferred,
	 * fallback it will be preferred when available.
	 */
	public static final String NATIVE = "reactor.netty.native";
If you need changes to these settings, you can apply the following configuration:

import reactor.netty.DisposableServer;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpServer;

public class Application {

	public static void main(String[] args) {
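		// Creates custom LoopResources: thread name prefix "event-loop",
		// 1 selector thread, 4 worker threads, and daemon threads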
		LoopResources loop = LoopResources.create("event-loop", 1, 4, true);

		DisposableServer server =
				TcpServer.create()
				         .runOn(loop)
				         .bindNow();

		server.onDispose()
		      .block();
	}
}

6.3.1. Disposing Event Loop Group

  • If you use the default Event Loop Group provided by Reactor Netty, invoke the TcpResources#disposeLoopsAndConnections/#disposeLoopsAndConnectionsLater method.

    Disposing TcpResources means that every server/client that is using it will not be able to use it anymore!

  • If you use custom LoopResources, invoke the LoopResources#dispose/#disposeLater method, as shown in the sketch after this list.

    Disposing the custom LoopResources means that every server/client that is configured to use it will not be able to use it anymore!
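
For the second case, the disposal sequence might look like the following sketch, which reuses the custom LoopResources from the earlier example:

import reactor.netty.DisposableServer;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpServer;

public class DisposeLoopApplication {

	public static void main(String[] args) {
		LoopResources loop = LoopResources.create("event-loop", 1, 4, true);

		DisposableServer server =
				TcpServer.create()
				         .runOn(loop)
				         .bindNow();

		server.disposeNow();

		// Disposes the custom LoopResources;
		// block() waits for the disposal to complete
		loop.disposeLater()
		    .block();
	}
}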

7. SSL and TLS

When you need SSL or TLS, you can apply the configuration shown in the next listing. By default, if OpenSSL is available, the SslProvider.OPENSSL provider is used. Otherwise, SslProvider.JDK is used. You can switch the provider through SslContextBuilder or by setting -Dio.netty.handler.ssl.noOpenSsl=true.

The following example uses SslContextBuilder:

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import reactor.netty.tcp.TcpSslContextSpec;

import java.io.File;

public class Application {

	public static void main(String[] args) {
		File cert = new File("certificate.crt");
		File key = new File("private.key");

		TcpSslContextSpec tcpSslContextSpec = TcpSslContextSpec.forServer(cert, key);

		DisposableServer server =
				TcpServer.create()
				         .secure(spec -> spec.sslContext(tcpSslContextSpec))
				         .bindNow();

		server.onDispose()
		      .block();
	}
}

7.1. Server Name Indication

You can configure the TCP server with multiple SslContext instances, each mapped to a specific domain. An exact domain name or a domain name containing a wildcard can be used when configuring the SNI mapping.

The following example uses a domain name containing a wildcard:

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

import java.io.File;

public class Application {

	public static void main(String[] args) throws Exception {
		File defaultCert = new File("default_certificate.crt");
		File defaultKey = new File("default_private.key");

		File testDomainCert = new File("test_domain_certificate.crt");
		File testDomainKey = new File("test_domain_private.key");

		SslContext defaultSslContext = SslContextBuilder.forServer(defaultCert, defaultKey).build();
		SslContext testDomainSslContext = SslContextBuilder.forServer(testDomainCert, testDomainKey).build();

		DisposableServer server =
				TcpServer.create()
				         .secure(spec -> spec.sslContext(defaultSslContext)
				                             .addSniMapping("*.test.com",
				                                     testDomainSpec -> testDomainSpec.sslContext(testDomainSslContext)))
				         .bindNow();

		server.onDispose()
		      .block();
	}
}

8. Metrics

The TCP server supports built-in integration with Micrometer. It exposes all metrics with a prefix of reactor.netty.tcp.server.

The following table provides information for the TCP server metrics:

reactor.netty.tcp.server.connections.total (Gauge)
    The number of all opened connections. See Connections Total.

reactor.netty.tcp.server.data.received (DistributionSummary)
    Amount of the data received, in bytes. See Data Received.

reactor.netty.tcp.server.data.sent (DistributionSummary)
    Amount of the data sent, in bytes. See Data Sent.

reactor.netty.tcp.server.errors (Counter)
    Number of errors that occurred. See Errors Count.

reactor.netty.tcp.server.tls.handshake.time (Timer)
    Time spent for TLS handshake. See Tls Handshake Time.

These additional metrics are also available:

ByteBufAllocator metrics

reactor.netty.bytebuf.allocator.used.heap.memory (Gauge)
    The number of bytes reserved by the heap buffer allocator. See Used Heap Memory.

reactor.netty.bytebuf.allocator.used.direct.memory (Gauge)
    The number of bytes reserved by the direct buffer allocator. See Used Direct Memory.

reactor.netty.bytebuf.allocator.heap.arenas (Gauge)
    The number of heap arenas (when PooledByteBufAllocator). See Heap Arenas.

reactor.netty.bytebuf.allocator.direct.arenas (Gauge)
    The number of direct arenas (when PooledByteBufAllocator). See Direct Arenas.

reactor.netty.bytebuf.allocator.threadlocal.caches (Gauge)
    The number of thread local caches (when PooledByteBufAllocator). See Thread Local Caches.

reactor.netty.bytebuf.allocator.small.cache.size (Gauge)
    The size of the small cache (when PooledByteBufAllocator). See Small Cache Size.

reactor.netty.bytebuf.allocator.normal.cache.size (Gauge)
    The size of the normal cache (when PooledByteBufAllocator). See Normal Cache Size.

reactor.netty.bytebuf.allocator.chunk.size (Gauge)
    The chunk size for an arena (when PooledByteBufAllocator). See Chunk Size.

reactor.netty.bytebuf.allocator.active.heap.memory (Gauge)
    The actual bytes consumed by in-use buffers allocated from heap buffer pools (when PooledByteBufAllocator). See Active Heap Memory.

reactor.netty.bytebuf.allocator.active.direct.memory (Gauge)
    The actual bytes consumed by in-use buffers allocated from direct buffer pools (when PooledByteBufAllocator). See Active Direct Memory.

EventLoop metrics

reactor.netty.eventloop.pending.tasks (Gauge)
    The number of tasks that are pending for processing on an event loop. See Pending Tasks.

The following example enables that integration:

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				TcpServer.create()
				         .metrics(true) (1)
				         .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Enables the built-in integration with Micrometer

When TCP server metrics are needed for an integration with a system other than Micrometer or you want to provide your own integration with Micrometer, you can provide your own metrics recorder, as follows:

import reactor.netty.DisposableServer;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.tcp.TcpServer;

import java.net.SocketAddress;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				TcpServer.create()
				         .metrics(true, CustomChannelMetricsRecorder::new) (1)
				         .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Enables TCP server metrics and provides a ChannelMetricsRecorder implementation.
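
The CustomChannelMetricsRecorder class itself is not shown above. A bare-bones sketch of it, written as a nested class inside Application, might look like the following; the method set reflects the core ChannelMetricsRecorder contract (newer Reactor Netty versions may add further default methods), and the empty bodies are placeholders for the actual integration:

	static final class CustomChannelMetricsRecorder implements ChannelMetricsRecorder {

		@Override
		public void recordDataReceived(SocketAddress remoteAddress, long bytes) {
			// Record the amount of data received, in bytes
		}

		@Override
		public void recordDataSent(SocketAddress remoteAddress, long bytes) {
			// Record the amount of data sent, in bytes
		}

		@Override
		public void incrementErrorsCount(SocketAddress remoteAddress) {
			// Record the errors that occurred
		}

		@Override
		public void recordTlsHandshakeTime(SocketAddress remoteAddress, Duration time, String status) {
			// Record the time spent for the TLS handshake
		}

		@Override
		public void recordConnectTime(SocketAddress remoteAddress, Duration time, String status) {
			// Record the connect time (relevant on the client side)
		}

		@Override
		public void recordResolveAddressTime(SocketAddress remoteAddress, Duration time, String status) {
			// Record the address resolution time (relevant on the client side)
		}
	}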

9. Tracing

The TCP server supports built-in integration with Micrometer Tracing.

The following table provides information for the TCP server spans:

tls handshake
    Information and time spent for TLS handshake. See Tls Handshake Span.

The following example enables that integration. This concrete example uses Brave and reports the information to Zipkin. See the Micrometer Tracing documentation for OpenTelemetry setup.

import brave.Tracing;
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext;
import io.micrometer.tracing.brave.bridge.BraveTracer;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.observability.ReactorNettyTracingObservationHandler;
import reactor.netty.tcp.TcpServer;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.reporter.urlconnection.URLConnectionSender;

import static reactor.netty.Metrics.OBSERVATION_REGISTRY;

public class Application {

	public static void main(String[] args) {
		init(); (1)

		DisposableServer server =
				TcpServer.create()
				         .metrics(true) (2)
				         .handle((inbound, outbound) -> outbound.sendString(Mono.just("hello")))
				         .bindNow();

		server.onDispose()
		      .block();
	}

	/**
	 * This setup is based on
	 * <a href="https://micrometer.io/docs/tracing#_micrometer_tracing_brave_setup">Micrometer Tracing Brave Setup</a>.
	 */
	static void init() {
		AsyncZipkinSpanHandler spanHandler = AsyncZipkinSpanHandler
				.create(URLConnectionSender.create("http://localhost:9411/api/v2/spans"));

		StrictCurrentTraceContext braveCurrentTraceContext = StrictCurrentTraceContext.create();

		CurrentTraceContext bridgeContext = new BraveCurrentTraceContext(braveCurrentTraceContext);

		Tracing tracing =
				Tracing.newBuilder()
				       .currentTraceContext(braveCurrentTraceContext)
				       .supportsJoin(false)
				       .traceId128Bit(true)
				       .sampler(Sampler.ALWAYS_SAMPLE)
				       .addSpanHandler(spanHandler)
				       .localServiceName("reactor-netty-examples")
				       .build();

		brave.Tracer braveTracer = tracing.tracer();

		Tracer tracer = new BraveTracer(braveTracer, bridgeContext, new BraveBaggageManager());

		OBSERVATION_REGISTRY.observationConfig()
		                    .observationHandler(new ReactorNettyTracingObservationHandler(tracer));
	}
}
1 Initializes Brave, Zipkin, and the Observation registry.
2 Enables the built-in integration with Micrometer.

The result in Zipkin looks like the following:

(Image: tcp server tracing)

9.1. Access Current Observation

Project Micrometer provides a library that assists with context propagation across different types of context mechanisms such as ThreadLocal, Reactor Context and others.

The following example shows how to use this library in a custom ChannelHandler:

import brave.Tracing;
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext;
import io.micrometer.tracing.brave.bridge.BraveTracer;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.NettyPipeline;
import reactor.netty.observability.ReactorNettyTracingObservationHandler;
import reactor.netty.tcp.TcpServer;
import reactor.netty.tcp.TcpSslContextSpec;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.reporter.urlconnection.URLConnectionSender;

import java.io.File;

import static reactor.netty.Metrics.OBSERVATION_REGISTRY;

public class Application {

	public static void main(String[] args) {
		init(); (1)

		File cert = new File("certificate.crt");
		File key = new File("private.key");

		TcpSslContextSpec tcpSslContextSpec = TcpSslContextSpec.forServer(cert, key);

		DisposableServer server =
				TcpServer.create()
				         .metrics(true) (2)
				         .doOnChannelInit((observer, channel, address) -> channel.pipeline().addAfter(
				                 NettyPipeline.SslHandler, "custom-channel-handler", CustomChannelInboundHandler.INSTANCE)) (3)
				         .secure(spec -> spec.sslContext(tcpSslContextSpec))
				         .handle((inbound, outbound) -> outbound.sendString(Mono.just("hello")))
				         .bindNow();

		server.onDispose()
		      .block();
	}

	static final class CustomChannelInboundHandler extends ChannelInboundHandlerAdapter {

		static final ChannelHandler INSTANCE = new CustomChannelInboundHandler();

		@Override
		public void channelActive(ChannelHandlerContext ctx) {
			try (ContextSnapshot.Scope scope = ContextSnapshotFactory.builder().build().setThreadLocalsFrom(ctx.channel())) {
				System.out.println("Current Observation in Scope: " + OBSERVATION_REGISTRY.getCurrentObservation());
				ctx.fireChannelActive();
			}
			System.out.println("Current Observation: " + OBSERVATION_REGISTRY.getCurrentObservation());
		}

		@Override
		public boolean isSharable() {
			return true;
		}
	}
}
1 Initializes Brave, Zipkin, and the Observation registry.
2 Enables the built-in integration with Micrometer.
3 Custom ChannelHandler that uses the context propagation library. This concrete example overrides only ChannelInboundHandlerAdapter#channelActive; if needed, the same logic can be used for the rest of the methods. Also, this concrete example sets all ThreadLocal values for which there is a value in the given Channel; if another behaviour is needed, check the context propagation library API. For example, you may want to set only some of the ThreadLocal values.

When you enable Reactor Netty tracing within a framework, you may need to let Reactor Netty use the ObservationRegistry created by this framework. For this purpose, you need to invoke reactor.netty.Metrics#observationRegistry. You may also need to configure the Reactor Netty ObservationHandlers by using the API provided by the framework.
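
A minimal sketch of that wiring, assuming the framework hands you its ObservationRegistry (here a freshly created registry stands in for it, and the method is assumed to accept an ObservationRegistry argument):

import io.micrometer.observation.ObservationRegistry;
import reactor.netty.Metrics;

public class ObservationRegistryConfiguration {

	public static void main(String[] args) {
		// In a real application this registry comes from the framework;
		// ObservationRegistry.create() only stands in for it here
		ObservationRegistry frameworkRegistry = ObservationRegistry.create();

		// Lets Reactor Netty use the framework-provided ObservationRegistry
		Metrics.observationRegistry(frameworkRegistry);
	}
}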

10. Unix Domain Sockets

The TCP server supports Unix Domain Sockets (UDS) when native transport is in use.

The following example shows how to use UDS support:

import io.netty.channel.unix.DomainSocketAddress;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				TcpServer.create()
				         .bindAddress(() -> new DomainSocketAddress("/tmp/test.sock")) (1)
				         .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Specifies the DomainSocketAddress that will be used