HTTP Server

Reactor Netty provides the easy-to-use and easy-to-configure HttpServer class. It hides most of the Netty functionality that is needed to create an HTTP server and adds Reactive Streams backpressure.

1. Starting and Stopping

To start an HTTP server, you must create and configure an HttpServer instance. By default, the host is configured for any local address, and the system picks an ephemeral port when the bind operation is invoked. The following example shows how to create an HttpServer instance:

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()   (1)
				          .bindNow(); (2)

		server.onDispose()
		      .block();
	}
}
1 Creates an HttpServer instance ready for configuring.
2 Starts the server in a blocking fashion and waits for it to finish initializing.

The returned DisposableServer offers a simple server API, including disposeNow(), which shuts the server down in a blocking fashion.

1.1. Host and Port

To serve on a specific host and port, you can apply the following configuration to the HTTP server:

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .host("localhost") (1)
				          .port(8080)        (2)
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Configures the HTTP server host
2 Configures the HTTP server port

To serve on multiple addresses, configure the HttpServer and then bind it multiple times to obtain separate DisposableServer instances. All created servers share resources such as LoopResources because they use the same configuration instance under the hood.

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class MultiAddressApplication {
	public static void main(String[] args) {
		HttpServer httpServer = HttpServer.create();
		DisposableServer server1 = httpServer
				.host("localhost") (1)
				.port(8080)        (2)
				.bindNow();

		DisposableServer server2 = httpServer
				.host("0.0.0.0") (3)
				.port(8081)      (4)
				.bindNow();

		Mono.when(server1.onDispose(), server2.onDispose())
				.block();
	}
}
1 Configures the first HTTP server host
2 Configures the first HTTP server port
3 Configures the second HTTP server host
4 Configures the second HTTP server port

2. Eager Initialization

By default, the initialization of the HttpServer resources happens on demand. This means that the bind operation absorbs the extra time needed to initialize and load:

  • the event loop groups

  • the native transport libraries (when native transport is used)

  • the native libraries for security (when OpenSSL is used)

When you need to preload these resources, you can configure the HttpServer as follows:

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

	public static void main(String[] args) {
		HttpServer httpServer =
				HttpServer.create()
				          .handle((request, response) -> request.receive().then());

		httpServer.warmup() (1)
		          .block();

		DisposableServer server = httpServer.bindNow();

		server.onDispose()
		      .block();
	}
}
1 Initializes and loads the event loop groups, the native transport libraries, and the native libraries for security.

3. Routing HTTP

Defining routes for the HTTP server requires configuring the provided HttpServerRoutes builder. The following example shows how to do so:

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .route(routes ->
				              routes.get("/hello",        (1)
				                        (request, response) -> response.sendString(Mono.just("Hello World!")))
				                    .post("/echo",        (2)
				                        (request, response) -> response.send(request.receive().retain()))
				                    .get("/path/{param}", (3)
				                        (request, response) -> response.sendString(Mono.just(request.param("param"))))
				                    .ws("/ws",            (4)
				                        (wsInbound, wsOutbound) -> wsOutbound.send(wsInbound.receive().retain())))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Serves a GET request to /hello and returns Hello World!
2 Serves a POST request to /echo and returns the received request body as a response.
3 Serves a GET request to /path/{param} and returns the value of the path parameter.
4 Serves a WebSocket on /ws and returns the received incoming data as outgoing data.

The server routes are unique, and only the first matching route, in order of declaration, is invoked.
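
The first-match rule can be illustrated without Reactor Netty. The following plain-JDK sketch is a simplified, hypothetical route table (FirstMatchRouteTable and its methods are illustrative names, not part of the Reactor Netty API). It only shows why a more specific route declared after a broader one is never reached:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical, simplified route table. It is NOT Reactor Netty's HttpServerRoutes.
final class FirstMatchRouteTable {

	// One entry: a path predicate plus a label that stands in for the handler.
	private static final class Route {
		final Predicate<String> matcher;
		final String handlerName;

		Route(Predicate<String> matcher, String handlerName) {
			this.matcher = matcher;
			this.handlerName = handlerName;
		}
	}

	private final List<Route> routes = new ArrayList<>();

	// Registers a route. The list preserves the order of declaration.
	FirstMatchRouteTable add(Predicate<String> matcher, String handlerName) {
		routes.add(new Route(matcher, handlerName));
		return this;
	}

	// Returns the handler label of the first route whose predicate accepts the path.
	Optional<String> resolve(String path) {
		for (Route route : routes) {
			if (route.matcher.test(path)) {
				return Optional.of(route.handlerName);
			}
		}
		return Optional.empty();
	}
}
```

If a route matching every path under /path/ is added before a route matching exactly /path/fixed, resolving /path/fixed returns the first one, so declaring the more specific route first is the safer ordering.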

3.1. SSE

The following code shows how you can configure the HTTP server to serve Server-Sent Events:

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.function.BiFunction;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .route(routes -> routes.get("/sse", serveSse()))
				          .bindNow();

		server.onDispose()
		      .block();
	}

	/**
	 * Prepares SSE response.
	 * The "Content-Type" is "text/event-stream".
	 * The flushing strategy is "flush after every element" emitted by the provided Publisher.
	 */
	private static BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> serveSse() {
		Flux<Long> flux = Flux.interval(Duration.ofSeconds(10));
		return (request, response) ->
		        response.sse()
		                .send(flux.map(Application::toByteBuf), b -> true);
	}

	/**
	 * Transforms the Object to ByteBuf following the expected SSE format.
	 */
	private static ByteBuf toByteBuf(Object any) {
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		try {
			out.write("data: ".getBytes(Charset.defaultCharset()));
			MAPPER.writeValue(out, any);
			out.write("\n\n".getBytes(Charset.defaultCharset()));
		}
		catch (Exception e) {
			throw new RuntimeException(e);
		}
		return ByteBufAllocator.DEFAULT
		                       .buffer()
		                       .writeBytes(out.toByteArray());
	}

	private static final ObjectMapper MAPPER = new ObjectMapper();
}
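
The SSE framing used by toByteBuf above can be isolated from Netty and Jackson. The following plain-JDK sketch (SseFrames and dataEvent are hypothetical names) encodes a single-line payload as one event. It assumes UTF-8, which is the encoding that the text/event-stream format requires, whereas Charset.defaultCharset() in the example above may differ from UTF-8 on some JVMs:

```java
import java.nio.charset.StandardCharsets;

// Plain-JDK sketch of the SSE wire format. It does not depend on Reactor Netty or Jackson.
final class SseFrames {

	// Encodes a single-line payload as one event: "data: <payload>" followed by a blank line.
	static byte[] dataEvent(String payload) {
		if (payload.indexOf('\n') >= 0 || payload.indexOf('\r') >= 0) {
			// Multi-line payloads need one "data:" field per line; this sketch does not cover that case.
			throw new IllegalArgumentException("this sketch handles single-line payloads only");
		}
		return ("data: " + payload + "\n\n").getBytes(StandardCharsets.UTF_8);
	}
}
```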

3.2. Static Resources

The following code shows how you can configure the HTTP server to serve static resources:

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;

public class Application {

	public static void main(String[] args) throws URISyntaxException {
		Path file = Paths.get(Application.class.getResource("/logback.xml").toURI());
		DisposableServer server =
				HttpServer.create()
				          .route(routes -> routes.file("/index.html", file))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}

4. Writing Data

To send data to a connected client, you must attach an I/O handler by using either handle(…​) or route(…​). The I/O handler has access to HttpServerResponse, which lets it write data. The following example uses the handle(…​) method:

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .handle((request, response) -> response.sendString(Mono.just("hello"))) (1)
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Sends the string hello to the connected clients.

4.1. Adding Headers and Other Metadata

When you send data to the connected clients, you may need to send additional headers, cookies, status code, and other metadata. You can provide this additional metadata by using HttpServerResponse. The following example shows how to do so:

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .route(routes ->
				              routes.get("/hello",
				                  (request, response) ->
				                      response.status(HttpResponseStatus.OK)
				                              .header(HttpHeaderNames.CONTENT_LENGTH, "12")
				                              .sendString(Mono.just("Hello World!"))))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
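
A hard-coded Content-Length such as "12" is only correct while the body and its encoding stay the same. As a hedged sketch (ContentLengths is a hypothetical helper, and UTF-8 is an assumption that must match the charset actually used to write the body), the value can be derived from the encoded bytes:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Reactor Netty.
final class ContentLengths {

	// Returns the Content-Length header value for a body that is sent as UTF-8.
	// The length is counted in bytes, not in characters.
	static String forUtf8Body(String body) {
		return Integer.toString(body.getBytes(StandardCharsets.UTF_8).length);
	}
}
```

For "Hello World!" this yields "12", matching the header in the example above. A body with non-ASCII characters yields a value larger than its character count.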

4.2. Compression

You can configure the HTTP server to send a compressed response, depending on the request header Accept-Encoding.

Reactor Netty provides three different strategies for compressing the outgoing data:

  • compress(boolean): Depending on the boolean that is provided, the compression is enabled (true) or disabled (false).

  • compress(int): The compression is performed once the response size exceeds the given value (in bytes).

  • compress(BiPredicate<HttpServerRequest, HttpServerResponse>): The compression is performed if the predicate returns true.

The following example uses the compress method (set to true) to enable compression:

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;

public class Application {

	public static void main(String[] args) throws URISyntaxException {
		Path file = Paths.get(Application.class.getResource("/logback.xml").toURI());
		DisposableServer server =
				HttpServer.create()
				          .compress(true)
				          .route(routes -> routes.file("/index.html", file))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
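
To make the size-based strategy more concrete, the following plain-JDK sketch applies the same idea as compress(int) by hand: the body is gzipped only when it is larger than a minimum size. It does not reproduce Reactor Netty's implementation. ThresholdCompression is a hypothetical name, and treating the threshold as strictly "greater than" is an assumption:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPOutputStream;

// Hypothetical illustration of threshold-based compression using only the JDK.
final class ThresholdCompression {

	// Assumption: compression applies only when the body is strictly larger than minSize bytes.
	static boolean shouldCompress(int bodyLength, int minSize) {
		return bodyLength > minSize;
	}

	// Gzips the body when the threshold is exceeded; otherwise returns the same array unchanged.
	static byte[] maybeGzip(byte[] body, int minSize) {
		if (!shouldCompress(body.length, minSize)) {
			return body;
		}
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		// Closing the GZIPOutputStream writes the gzip trailer before the bytes are read back.
		try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
			gzip.write(body);
		}
		catch (IOException e) {
			throw new UncheckedIOException(e);
		}
		return out.toByteArray();
	}
}
```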

5. Consuming Data

To receive data from a connected client, you must attach an I/O handler by using either handle(…​) or route(…​). The I/O handler has access to HttpServerRequest, which lets it read data.

The following example uses the handle(…​) method:

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .handle((request, response) -> request.receive().then()) (1)
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Receives data from the connected clients

5.1. Reading Headers, URI Params, and other Metadata

When you receive data from the connected clients, you might need to check request headers, parameters, and other metadata. You can obtain this additional metadata by using HttpServerRequest. The following example shows how to do so:

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .route(routes ->
				              routes.get("/{param}",
				                  (request, response) -> {
				                      if (request.requestHeaders().contains("Some-Header")) {
				                          return response.sendString(Mono.just(request.param("param")));
				                      }
				                      return response.sendNotFound();
				                  }))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}

5.2. Reading Post Form or Multipart Data

When you receive data from the connected clients, you might want to access POST form (application/x-www-form-urlencoded) or multipart (multipart/form-data) data. You can obtain this data by using HttpServerRequest.

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .route(routes ->
				              routes.post("/multipart", (request, response) -> response.sendString(
				                      request.receiveForm() (1)
				                             .flatMap(data -> Mono.just('[' + data.getName() + ']')))))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Receives POST form/multipart data.

When you need to change the default settings, you can configure the HttpServer or you can provide a configuration per request:

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .httpFormDecoder(builder -> builder.maxInMemorySize(0))                  (1)
				          .route(routes ->
				              routes.post("/multipart", (request, response) -> response.sendString(
				                      request.receiveForm(builder -> builder.maxInMemorySize(256)) (2)
				                             .flatMap(data -> Mono.just('[' + data.getName() + ']')))))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Configuration on the HttpServer that specifies that the data is stored on disk only.
2 Configuration per request that specifies that if the data size exceeds the specified size, the data is stored on disk.

The following listing shows the available configurations:

  • baseDirectory: Configures the directory where the data is stored on disk. Defaults to a generated temp directory.

  • charset: Configures the Charset for the data. Defaults to StandardCharsets#UTF_8.

  • maxInMemorySize: Configures the maximum in-memory size per data, i.e. the data is written to disk if its size is greater than maxInMemorySize; otherwise it is kept in memory. If set to -1, the entire contents are stored in memory. If set to 0, the entire contents are stored on disk. Defaults to 16kb.

  • maxSize: Configures the maximum size per data. When the limit is reached, an exception is raised. If set to -1, there is no limit. Defaults to -1 (unlimited).

  • scheduler: Configures the scheduler to be used for offloading disk operations in the decoding phase. Defaults to Schedulers#boundedElastic().

  • streaming: When set to true, the data is streamed directly from the parsed input buffer stream, which means it is stored neither in memory nor in a file. When false, parts are backed by in-memory and/or file storage. Defaults to false. Note that with streaming enabled, the provided data might not be in a complete state, i.e. HttpData#isCompleted() has to be checked. Also note that enabling this property effectively ignores maxInMemorySize, baseDirectory, and scheduler.
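
The maxInMemorySize rule described above can be written down as a small decision function. This is a sketch of the documented rule only, not of the decoder itself; FormDataStorage and Location are hypothetical names, and streaming mode is deliberately left out:

```java
// Hypothetical illustration of the maxInMemorySize rule. It is not Reactor Netty code.
final class FormDataStorage {

	enum Location { MEMORY, DISK }

	// -1 keeps everything in memory, 0 stores everything on disk,
	// otherwise data larger than the limit goes to disk.
	static Location locate(long dataSize, long maxInMemorySize) {
		if (maxInMemorySize == -1) {
			return Location.MEMORY;
		}
		if (maxInMemorySize == 0) {
			return Location.DISK;
		}
		return dataSize > maxInMemorySize ? Location.DISK : Location.MEMORY;
	}
}
```

With the per-request value of 256 used in the earlier example, data of exactly 256 bytes stays in memory under this rule, and data of 257 bytes is written to disk.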

5.2.1. Obtaining the Remote (Client) Address

In addition to the metadata that you can obtain from the request, you can also receive the host (server) address, the remote (client) address and the scheme. Depending on the chosen factory method, you can retrieve the information directly from the channel or by using the Forwarded or X-Forwarded-* HTTP request headers. The following example shows how to do so:

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .forwarded(true) (1)
				          .route(routes ->
				              routes.get("/clientip",
				                  (request, response) ->
				                      response.sendString(Mono.just(request.remoteAddress() (2)
				                                                           .getHostString()))))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Specifies that the information about the connection is to be obtained from the Forwarded and X-Forwarded-* HTTP request headers, if possible.
2 Returns the address of the remote (client) peer.

It is also possible to customize the behavior of the Forwarded or X-Forwarded-* header handler. The following example shows how to do so:

import java.net.InetSocketAddress;

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.transport.AddressUtils;

public class CustomForwardedHeaderHandlerApplication {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .forwarded((connectionInfo, request) -> {  (1)
				              String hostHeader = request.headers().get("X-Forwarded-Host");
				              if (hostHeader != null) {
				                  String[] hosts = hostHeader.split(",", 2);
				                  InetSocketAddress hostAddress = AddressUtils.createUnresolved(
				                      hosts[hosts.length - 1].trim(),
				                      connectionInfo.getHostAddress().getPort());
				                  connectionInfo = connectionInfo.withHostAddress(hostAddress);
				              }
				              return connectionInfo;
				          })
				          .route(routes ->
				              routes.get("/clientip",
				                  (request, response) ->
				                      response.sendString(Mono.just(request.remoteAddress() (2)
				                                                           .getHostString()))))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Add a custom header handler.
2 Returns the address of the remote (client) peer.
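
The string handling inside the custom handler above is easy to misread, because split(",", 2) splits on the first comma only. The following plain-JDK sketch isolates it (ForwardedHostParsing is a hypothetical name, and InetSocketAddress.createUnresolved stands in for AddressUtils.createUnresolved):

```java
import java.net.InetSocketAddress;

// Hypothetical helper that repeats the header parsing of the custom handler with JDK types only.
final class ForwardedHostParsing {

	// Splits on the first comma only, then takes the last element and trims it.
	static String selectHost(String hostHeader) {
		String[] hosts = hostHeader.split(",", 2);
		return hosts[hosts.length - 1].trim();
	}

	// Builds an unresolved address (no DNS lookup) that reuses the given port.
	static InetSocketAddress toHostAddress(String hostHeader, int port) {
		return InetSocketAddress.createUnresolved(selectHost(hostHeader), port);
	}
}
```

With this logic, a header value of "a.example.com, b.example.com" yields b.example.com, while a header with three or more entries yields everything after the first comma, so a different split limit may be preferable when longer proxy chains are expected.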

5.3. HTTP Request Decoder

By default, Netty configures some restrictions for the incoming requests, such as:

  • The maximum length of the initial line.

  • The maximum length of all headers.

  • The maximum length of the content or each chunk.

For more information, see HttpRequestDecoder and HttpServerUpgradeHandler.

By default, the HTTP server is configured with the following settings:

	public static final int DEFAULT_MAX_INITIAL_LINE_LENGTH             = 4096;
	public static final int DEFAULT_MAX_HEADER_SIZE                     = 8192;
	/**
	 * Default max chunk size.
	 *
	 * @deprecated as of 1.1.0. This will be removed in 2.0.0 as Netty 5 does not support this configuration.
	 */
	@Deprecated
	public static final int DEFAULT_MAX_CHUNK_SIZE                      = 8192;
	public static final boolean DEFAULT_VALIDATE_HEADERS                = true;
	public static final int DEFAULT_INITIAL_BUFFER_SIZE                 = 128;
	public static final boolean DEFAULT_ALLOW_DUPLICATE_CONTENT_LENGTHS = false;
	/**
	 * The maximum length of the content of the HTTP/2.0 clear-text upgrade request.
	 * By default, the server will reject an upgrade request with non-empty content,
	 * because the upgrade request is most likely a GET request.
	 */
	public static final int DEFAULT_H2C_MAX_CONTENT_LENGTH = 0;

When you need to change these default settings, you can configure the HTTP server as follows:

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .httpRequestDecoder(spec -> spec.maxHeaderSize(16384)) (1)
				          .handle((request, response) -> response.sendString(Mono.just("hello")))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 The maximum length of all headers will be 16384. When this value is exceeded, a TooLongFrameException is raised.

6. Lifecycle Callbacks

The following lifecycle callbacks are provided to let you extend the HttpServer:

  • doOnBind: Invoked when the server channel is about to bind.

  • doOnBound: Invoked when the server channel is bound.

  • doOnChannelInit: Invoked when initializing the channel.

  • doOnConnection: Invoked when a remote client is connected.

  • doOnUnbound: Invoked when the server channel is unbound.

The following example uses the doOnConnection and doOnChannelInit callbacks:

import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import java.util.concurrent.TimeUnit;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .doOnConnection(conn ->
				              conn.addHandlerFirst(new ReadTimeoutHandler(10, TimeUnit.SECONDS))) (1)
				          .doOnChannelInit((observer, channel, remoteAddress) ->
				              channel.pipeline()
				                     .addFirst(new LoggingHandler("reactor.netty.examples")))    (2)
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 The Netty pipeline is extended with ReadTimeoutHandler when a remote client is connected.
2 The Netty pipeline is extended with LoggingHandler when initializing the channel.

7. TCP-level Configuration

When you need to change configuration on the TCP level, you can use the following snippet to extend the default TCP server configuration:

import io.netty.channel.ChannelOption;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
				          .bindNow();

		server.onDispose()
		      .block();
	}
}

See TCP Server for more detail about TCP-level configuration.

7.1. Wire Logger

Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected. By default, wire logging is disabled. To enable it, you must set the logger reactor.netty.http.server.HttpServer level to DEBUG and apply the following configuration:

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .wiretap(true) (1)
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Enables the wire logging

7.1.1. Wire Logger formatters

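Reactor Netty supports three formatters: AdvancedByteBufFormat#HEX_DUMP (the default), AdvancedByteBufFormat#SIMPLE, and AdvancedByteBufFormat#TEXTUAL. Their Javadoc, in that order, reads as follows: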

	/**
	 * When wire logging is enabled with this format, both events and content will be logged.
	 * The content will be in hex format.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] REGISTERED
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] ACTIVE
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] READ: 145B
	 *          +-------------------------------------------------+
	 *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
	 * +--------+-------------------------------------------------+----------------+
	 * |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World|
	 * |00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 43 6f 6e 74 65 | HTTP/1.1..Conte|
	 * |00000020| 6e 74 2d 54 79 70 65 3a 20 74 65 78 74 2f 70 6c |nt-Type: text/pl|
	 * |00000030| 61 69 6e 0d 0a 75 73 65 72 2d 61 67 65 6e 74 3a |ain..user-agent:|
	 * |00000040| 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f 64 65 | ReactorNetty/de|
	 * ...
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] WRITE: 38B
	 *          +-------------------------------------------------+
	 *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
	 * +--------+-------------------------------------------------+----------------+
	 * |00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
	 * |00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
	 * |00000020| 20 30 0d 0a 0d 0a                               | 0....          |
	 * +--------+-------------------------------------------------+----------------+
	 * }
	 * </pre>
	 */
	/**
	 * When wire logging is enabled with this format, only the events will be logged.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] REGISTERED
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] ACTIVE
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] READ: 145B
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] WRITE: 38B
	 * }
	 * </pre>
	 */
	/**
	 * When wire logging is enabled with this format, both events and content will be logged.
	 * The content will be in plain text format.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] REGISTERED
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] ACTIVE
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] READ: 145B POST /test/World HTTP/1.1
	 * Content-Type: text/plain
	 * user-agent: ReactorNetty/dev
	 * ...
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] WRITE: 38B HTTP/1.1 200 OK
	 * content-length: 0
	 * }
	 * </pre>
	 */

When you need to change the default formatter, you can configure it as follows:

import io.netty.handler.logging.LogLevel;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.transport.logging.AdvancedByteBufFormat;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) (1)
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Enables the wire logging, AdvancedByteBufFormat#TEXTUAL is used for printing the content.

7.2. Event Loop Group

By default, Reactor Netty uses an “Event Loop Group”, where the number of worker threads equals the number of processors available to the runtime on initialization (but with a minimum value of 4). This “Event Loop Group” is shared between all servers and clients in one JVM. When you need a different configuration, you can use one of the LoopResources#create methods.

The following listing shows the default configuration for the Event Loop Group:

	/**
	 * Default worker thread count, fallback to available processor
	 * (but with a minimum value of 4).
	 */
	public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
	/**
	 * Default selector thread count, fallback to -1 (no selector thread)
	 * <p><strong>Note:</strong> In most use cases using a worker thread also as a selector thread works well.
	 * A possible use case for specifying a separate selector thread might be when the worker threads are too busy
	 * and connections cannot be accepted fast enough.
	 * <p><strong>Note:</strong> Although more than 1 can be configured as a selector thread count, in reality
	 * only 1 thread will be used as a selector thread.
	 */
	public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
	/**
	 * Default worker thread count for UDP, fallback to available processor
	 * (but with a minimum value of 4).
	 */
	public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
	/**
	 * Default quiet period that guarantees that the disposal of the underlying LoopResources
	 * will not happen, fallback to 2 seconds.
	 */
	public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
	/**
	 * Default maximum amount of time to wait until the disposal of the underlying LoopResources
	 * regardless if a task was submitted during the quiet period, fallback to 15 seconds.
	 */
	public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";

	/**
	 * Default value whether the native transport (epoll, kqueue) will be preferred,
	 * fallback it will be preferred when available.
	 */
	public static final String NATIVE = "reactor.netty.native";

If you need changes to these settings, you can apply the following configuration:

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.LoopResources;

public class Application {

	public static void main(String[] args) {
		LoopResources loop = LoopResources.create("event-loop", 1, 4, true);

		DisposableServer server =
				HttpServer.create()
				          .runOn(loop)
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
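
The default worker count described in this section (the number of available processors, with a minimum of 4, unless reactor.netty.ioWorkerCount is set) can be sketched as follows. This is an approximation of the documented rule, not the library's source, and DefaultWorkerCount is a hypothetical name:

```java
// Hypothetical illustration of the documented default. It is not Reactor Netty code.
final class DefaultWorkerCount {

	// An explicit property value wins; otherwise max(available processors, 4) is used.
	static int resolve(String propertyValue, int availableProcessors) {
		if (propertyValue != null) {
			return Integer.parseInt(propertyValue);
		}
		return Math.max(availableProcessors, 4);
	}

	// Convenience overload that reads the real system property and processor count.
	static int resolve() {
		return resolve(System.getProperty("reactor.netty.ioWorkerCount"),
				Runtime.getRuntime().availableProcessors());
	}
}
```

On a two-core machine without the property set, this rule gives 4 worker threads. Setting -Dreactor.netty.ioWorkerCount=8 gives 8 regardless of the core count.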

7.2.1. Disposing Event Loop Group

  • If you use the default Event Loop Group provided by Reactor Netty, invoke the HttpResources#disposeLoopsAndConnections or HttpResources#disposeLoopsAndConnectionsLater method.

    Disposing HttpResources means that every server/client that is using it will not be able to use it anymore!

  • If you use custom LoopResources, invoke the LoopResources#dispose or LoopResources#disposeLater method.

    Disposing the custom LoopResources means that every server/client that is configured to use it will not be able to use it anymore!

8. SSL and TLS

When you need SSL or TLS, you can apply the configuration shown in the next example. By default, if OpenSSL is available, the SslProvider.OPENSSL provider is used. Otherwise, SslProvider.JDK is used. You can switch the provider by using SslContextBuilder or by setting -Dio.netty.handler.ssl.noOpenSsl=true.

The following example uses Http11SslContextSpec:

import reactor.netty.DisposableServer;
import reactor.netty.http.Http11SslContextSpec;
import reactor.netty.http.server.HttpServer;
import java.io.File;

public class Application {

	public static void main(String[] args) {
		File cert = new File("certificate.crt");
		File key = new File("private.key");

		Http11SslContextSpec http11SslContextSpec = Http11SslContextSpec.forServer(cert, key);

		DisposableServer server =
				HttpServer.create()
				          .secure(spec -> spec.sslContext(http11SslContextSpec))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}

8.1. Server Name Indication

You can configure the HTTP server with multiple SslContext instances, each mapped to a specific domain. An exact domain name or a domain name containing a wildcard can be used when configuring the SNI mapping.

The following example uses a domain name containing a wildcard:

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

import java.io.File;

public class Application {

	public static void main(String[] args) throws Exception {
		File defaultCert = new File("default_certificate.crt");
		File defaultKey = new File("default_private.key");

		File testDomainCert = new File("test_domain_certificate.crt");
		File testDomainKey = new File("test_domain_private.key");

		SslContext defaultSslContext = SslContextBuilder.forServer(defaultCert, defaultKey).build();
		SslContext testDomainSslContext = SslContextBuilder.forServer(testDomainCert, testDomainKey).build();

		DisposableServer server =
				HttpServer.create()
				          .secure(spec -> spec.sslContext(defaultSslContext)
				                              .addSniMapping("*.test.com",
				                                      testDomainSpec -> testDomainSpec.sslContext(testDomainSslContext)))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
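
To make the effect of a wildcard mapping more concrete, the following JDK-only sketch shows one plausible way to compare a pattern such as *.test.com with the host name that a client sends in the SNI extension. It is not the code that Reactor Netty or Netty runs: the class SniWildcardSketch and its matches helper are hypothetical, and the sketch assumes that the wildcard stands for exactly one left-most label. Check the Netty documentation for the rules that apply to the version you use.

```java
import java.util.Locale;

public class SniWildcardSketch {

	// Hypothetical helper: returns true when the host name is covered by the pattern.
	static boolean matches(String pattern, String hostname) {
		String p = pattern.toLowerCase(Locale.ROOT);
		String h = hostname.toLowerCase(Locale.ROOT);
		if (!p.startsWith("*.")) {
			// Exact domain name: compare case-insensitively.
			return p.equals(h);
		}
		// "*.test.com" -> ".test.com"
		String suffix = p.substring(1);
		if (!h.endsWith(suffix) || h.length() == suffix.length()) {
			return false;
		}
		// Assumption of this sketch: the wildcard covers a single label only.
		String label = h.substring(0, h.length() - suffix.length());
		return !label.contains(".");
	}

	public static void main(String[] args) {
		System.out.println(matches("*.test.com", "api.test.com")); // true
		System.out.println(matches("*.test.com", "a.b.test.com")); // false
		System.out.println(matches("*.test.com", "test.com"));     // false
	}
}
```

Under these assumptions, a client that asks for a host not covered by any mapping would be served with the default SslContext configured through sslContext(...).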

9. HTTP Access Log

You can enable the HTTP access log either programmatically or by configuration. By default, it is disabled.

You can use -Dreactor.netty.http.server.accessLogEnabled=true to enable the HTTP access log by configuration.

You can use the following configuration (for Logback or similar logging frameworks) to have a separate HTTP access log file:

<appender name="accessLog" class="ch.qos.logback.core.FileAppender">
    <file>access_log.log</file>
    <encoder>
        <pattern>%msg%n</pattern>
    </encoder>
</appender>
<appender name="async" class="ch.qos.logback.classic.AsyncAppender">
    <appender-ref ref="accessLog" />
</appender>

<logger name="reactor.netty.http.server.AccessLog" level="INFO" additivity="false">
    <appender-ref ref="async"/>
</logger>

The following example enables it programmatically:

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .accessLog(true)
				          .bindNow();

		server.onDispose()
		      .block();
	}
}

Calling this method takes precedence over the system property configuration.
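
The precedence rule can be pictured with a small JDK-only sketch. It is only an illustration of the rule stated above, not Reactor Netty's internal logic: the class AccessLogSwitchSketch and the effective helper are hypothetical, and a null argument is used here to model the case in which accessLog(...) was never called.

```java
public class AccessLogSwitchSketch {

	// Property name taken from the text above.
	static final String PROPERTY = "reactor.netty.http.server.accessLogEnabled";

	// programmatic == null models "accessLog(...) was never called" (an assumption of this sketch).
	static boolean effective(Boolean programmatic) {
		if (programmatic != null) {
			// The explicit call wins over the system property.
			return programmatic;
		}
		// Without an explicit call, the system property decides; the default is disabled.
		return Boolean.parseBoolean(System.getProperty(PROPERTY, "false"));
	}

	public static void main(String[] args) {
		System.setProperty(PROPERTY, "true");
		System.out.println(effective(null));          // true: the property decides
		System.out.println(effective(Boolean.FALSE)); // false: the explicit call decides
	}
}
```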

By default, the logging format is Common Log Format, but you can specify a custom one as a parameter, as in the following example:

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.logging.AccessLog;

public class CustomLogAccessFormatApplication {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .accessLog(true, x -> AccessLog.create("method={}, uri={}", x.method(), x.uri()))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
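
For readers who are not familiar with Common Log Format, the following JDK-only sketch builds one line in the classic layout (host, identity, user, timestamp, request line, status, size). The class CommonLogFormatSketch and its format helper are hypothetical, and the sample values are made up for illustration. The line that Reactor Netty actually emits may contain additional fields, so treat this only as a picture of the general shape.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class CommonLogFormatSketch {

	// Timestamp layout used by Common Log Format, for example 10/Oct/2000:13:55:36 -0700.
	static final DateTimeFormatter CLF_DATE =
			DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.US);

	// Hypothetical helper that assembles one log line; "-" stands for the unknown identity field.
	static String format(String host, String user, ZonedDateTime time,
			String method, String uri, String protocol, int status, long bytes) {
		return String.format("%s - %s [%s] \"%s %s %s\" %d %d",
				host, user, CLF_DATE.format(time), method, uri, protocol, status, bytes);
	}

	public static void main(String[] args) {
		ZonedDateTime time = ZonedDateTime.of(2000, 10, 10, 13, 55, 36, 0, ZoneOffset.ofHours(-7));
		System.out.println(format("127.0.0.1", "frank", time, "GET", "/apache_pb.gif", "HTTP/1.0", 200, 2326));
	}
}
```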

You can also filter HTTP access logs by using the AccessLogFactory#createFilter method, as in the following example:

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.logging.AccessLogFactory;

public class FilterLogAccessApplication {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .accessLog(true, AccessLogFactory.createFilter(p -> !String.valueOf(p.uri()).startsWith("/health/")))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}

Note that this method can take a custom format parameter too, as in this example:

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.logging.AccessLog;
import reactor.netty.http.server.logging.AccessLogFactory;

public class CustomFormatAndFilterAccessLogApplication {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .accessLog(true, AccessLogFactory.createFilter(p -> !String.valueOf(p.uri()).startsWith("/health/"), (1)
						          x -> AccessLog.create("method={}, uri={}", x.method(), x.uri()))) (2)
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Specifies the filter predicate to use
2 Specifies the custom format to apply
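
To see which requests the predicate from the examples above lets through, the condition can be tried out on plain strings. The following JDK-only sketch does that. The class AccessLogFilterSketch and the logged helper are hypothetical; in a real server the predicate receives the access log argument provider rather than a String.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class AccessLogFilterSketch {

	// Same condition as in the examples above, applied to a plain String.
	static final Predicate<String> LOGGABLE = uri -> !String.valueOf(uri).startsWith("/health/");

	// Returns the URIs that would still be written to the access log.
	static List<String> logged(List<String> uris) {
		return uris.stream().filter(LOGGABLE).collect(Collectors.toList());
	}

	public static void main(String[] args) {
		// Prints [/health, /orders/1]
		System.out.println(logged(List.of("/health/live", "/health", "/orders/1")));
	}
}
```

Note that /health without a trailing slash is still logged, because the prefix that is tested is /health/. Adjust the prefix if that is not what you want.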

10. HTTP/2

By default, the HTTP server supports HTTP/1.1. If you need HTTP/2, you can get it through configuration. In addition to the protocol configuration, if you need H2 but not H2C (cleartext), you must also configure SSL.

Because Application-Layer Protocol Negotiation (ALPN) is not supported “out-of-the-box” by JDK8 (although some vendors backported ALPN to JDK8), you need an additional dependency on a native library that supports it, for example, netty-tcnative-boringssl-static.

The following listing presents a simple H2 example:

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.Http2SslContextSpec;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.server.HttpServer;
import java.io.File;

public class H2Application {

	public static void main(String[] args) {
		File cert = new File("certificate.crt");
		File key = new File("private.key");

		Http2SslContextSpec http2SslContextSpec = Http2SslContextSpec.forServer(cert, key);

		DisposableServer server =
				HttpServer.create()
				          .port(8080)
				          .protocol(HttpProtocol.H2)                            (1)
				          .secure(spec -> spec.sslContext(http2SslContextSpec)) (2)
				          .handle((request, response) -> response.sendString(Mono.just("hello")))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Configures the server to support only HTTP/2
2 Configures SSL

The application should now behave as follows:

$ curl --http2 https://localhost:8080 -i
HTTP/2 200

hello

The following listing presents a simple H2C example:

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.server.HttpServer;

public class H2CApplication {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .port(8080)
				          .protocol(HttpProtocol.H2C)
				          .handle((request, response) -> response.sendString(Mono.just("hello")))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}

The application should now behave as follows:

$ curl --http2-prior-knowledge http://localhost:8080 -i
HTTP/2 200

hello

10.1. Protocol Selection

/**
 * @author Stephane Maldini
 */
public enum HttpProtocol {

	/**
	 * The default supported HTTP protocol by HttpServer and HttpClient.
	 */
	HTTP11,

	/**
	 * HTTP/2.0 support with TLS
	 * <p>If used along with HTTP/1.1 protocol, HTTP/2.0 will be the preferred protocol.
	 * While negotiating the application level protocol, HTTP/2.0 or HTTP/1.1 can be chosen.
	 * <p>If used without HTTP/1.1 protocol, HTTP/2.0 will always be offered as a protocol
	 * for communication with no fallback to HTTP/1.1.
	 */
	H2,

	/**
	 * HTTP/2.0 support with clear-text.
	 * <p>If used along with HTTP/1.1 protocol, will support H2C "upgrade":
	 * Request or consume requests as HTTP/1.1 first, looking for HTTP/2.0 headers
	 * and {@literal Connection: Upgrade}. A server will typically reply a successful
	 * 101 status if upgrade is successful or a fallback HTTP/1.1 response. When
	 * successful the client will start sending HTTP/2.0 traffic.
	 * <p>If used without HTTP/1.1 protocol, will support H2C "prior-knowledge": Doesn't
	 * require {@literal Connection: Upgrade} handshake between a client and server but
	 * fallback to HTTP/1.1 will not be supported.
	 */
	H2C,

	/**
	 * HTTP/3.0 support.
	 * @since 1.2.0
	 */
	HTTP3
}

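The combinations described in the Javadoc above can be summarized in a small lookup. The following JDK-only sketch uses a local stand-in enum instead of reactor.netty.http.HttpProtocol; the class ProtocolSelectionSketch and the describe helper are hypothetical, and the returned strings only paraphrase the Javadoc. Configurations that list both H2 and H2C are not covered by this sketch.

```java
import java.util.EnumSet;
import java.util.Set;

public class ProtocolSelectionSketch {

	// Local stand-in for the HTTP/1.1 and HTTP/2 constants of HttpProtocol.
	enum Protocol { HTTP11, H2, H2C }

	// Paraphrases the Javadoc: what a server configured with these protocols offers.
	static String describe(Set<Protocol> configured) {
		boolean http11 = configured.contains(Protocol.HTTP11);
		if (configured.contains(Protocol.H2)) {
			return http11
					? "TLS: HTTP/2 preferred, HTTP/1.1 can be negotiated"
					: "TLS: HTTP/2 only, no fallback to HTTP/1.1";
		}
		if (configured.contains(Protocol.H2C)) {
			return http11
					? "clear-text: H2C upgrade from HTTP/1.1"
					: "clear-text: H2C prior knowledge, no fallback to HTTP/1.1";
		}
		return "HTTP/1.1 only";
	}

	public static void main(String[] args) {
		System.out.println(describe(EnumSet.of(Protocol.H2, Protocol.HTTP11)));
		System.out.println(describe(EnumSet.of(Protocol.H2C)));
	}
}
```

On a real server, several protocols are passed to the same protocol(...) call, for example protocol(HttpProtocol.H2, HttpProtocol.HTTP11).
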
11. HTTP/3

By default, the HTTP server supports HTTP/1.1. If you need HTTP/3, you can get it through configuration. In addition to the protocol configuration, you need to add a dependency on io.netty.incubator:netty-incubator-codec-http3.

The following listing presents a simple HTTP/3 example:

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.Http3SslContextSpec;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.server.HttpServer;

import java.io.File;
import java.time.Duration;

public class Application {

	public static void main(String[] args) throws Exception {
		File certChainFile = new File("certificate chain file");
		File keyFile = new File("private key file");

		Http3SslContextSpec serverCtx = Http3SslContextSpec.forServer(keyFile, null, certChainFile);

		DisposableServer server =
				HttpServer.create()
				          .port(8080)
				          .protocol(HttpProtocol.HTTP3)                 (1)
				          .secure(spec -> spec.sslContext(serverCtx))   (2)
				          .http3Settings(spec -> spec.idleTimeout(Duration.ofSeconds(5)) (3)
				                                     .maxData(10000000)
				                                     .maxStreamDataBidirectionalLocal(1000000)
				                                     .maxStreamDataBidirectionalRemote(1000000)
				                                     .maxStreamsBidirectional(100))
				          .handle((request, response) -> response.header("server", "reactor-netty")
				                                                 .sendString(Mono.just("hello")))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Configures the server to support only HTTP/3
2 Configures SSL
3 Configures HTTP/3 settings

The application should now behave as follows:

$ curl --http3 https://localhost:8080 -i
HTTP/3 200
server: reactor-netty
content-length: 5

hello

12. Metrics

The HTTP server supports built-in integration with Micrometer. It exposes all metrics with a prefix of reactor.netty.http.server.

The following table provides information for the HTTP server metrics:

metric name | type | description
reactor.netty.http.server.streams.active | Gauge | The number of active HTTP/2 streams. See Streams Active
reactor.netty.http.server.connections.active | Gauge | The number of HTTP connections currently processing requests. See Connections Active
reactor.netty.http.server.connections.total | Gauge | The number of all opened connections. See Connections Total
reactor.netty.http.server.data.received | DistributionSummary | Amount of the data received, in bytes. See Data Received
reactor.netty.http.server.data.sent | DistributionSummary | Amount of the data sent, in bytes. See Data Sent
reactor.netty.http.server.errors | Counter | Number of errors that occurred. See Errors Count
reactor.netty.http.server.data.received.time | Timer | Time spent in consuming incoming data. See Http Server Data Received Time
reactor.netty.http.server.data.sent.time | Timer | Time spent in sending outgoing data. See Http Server Data Sent Time
reactor.netty.http.server.response.time | Timer | Total time for the request/response. See Http Server Response Time

These additional metrics are also available:

ByteBufAllocator metrics

metric name | type | description
reactor.netty.bytebuf.allocator.used.heap.memory | Gauge | The number of bytes reserved by heap buffer allocator. See Used Heap Memory
reactor.netty.bytebuf.allocator.used.direct.memory | Gauge | The number of bytes reserved by direct buffer allocator. See Used Direct Memory
reactor.netty.bytebuf.allocator.heap.arenas | Gauge | The number of heap arenas (when PooledByteBufAllocator). See Heap Arenas
reactor.netty.bytebuf.allocator.direct.arenas | Gauge | The number of direct arenas (when PooledByteBufAllocator). See Direct Arenas
reactor.netty.bytebuf.allocator.threadlocal.caches | Gauge | The number of thread local caches (when PooledByteBufAllocator). See Thread Local Caches
reactor.netty.bytebuf.allocator.small.cache.size | Gauge | The size of the small cache (when PooledByteBufAllocator). See Small Cache Size
reactor.netty.bytebuf.allocator.normal.cache.size | Gauge | The size of the normal cache (when PooledByteBufAllocator). See Normal Cache Size
reactor.netty.bytebuf.allocator.chunk.size | Gauge | The chunk size for an arena (when PooledByteBufAllocator). See Chunk Size
reactor.netty.bytebuf.allocator.active.heap.memory | Gauge | The actual bytes consumed by in-use buffers allocated from heap buffer pools (when PooledByteBufAllocator). See Active Heap Memory
reactor.netty.bytebuf.allocator.active.direct.memory | Gauge | The actual bytes consumed by in-use buffers allocated from direct buffer pools (when PooledByteBufAllocator). See Active Direct Memory

EventLoop metrics

metric name | type | description
reactor.netty.eventloop.pending.tasks | Gauge | The number of tasks that are pending for processing on an event loop. See Pending Tasks

The following example enables that integration:

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.config.MeterFilter;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

	public static void main(String[] args) {
		Metrics.globalRegistry (1)
		       .config()
		       .meterFilter(MeterFilter.maximumAllowableTags("reactor.netty.http.server", "URI", 100, MeterFilter.deny()));

		DisposableServer server =
				HttpServer.create()
				          .metrics(true, s -> {
				              if (s.startsWith("/stream/")) { (2)
				                  return "/stream/{n}";
				              }
				              else if (s.startsWith("/bytes/")) {
				                  return "/bytes/{n}";
				              }
				              return s;
				          }) (3)
				          .route(r ->
				              r.get("/stream/{n}",
				                   (req, res) -> res.sendString(Mono.just(req.param("n"))))
				               .get("/bytes/{n}",
				                   (req, res) -> res.sendString(Mono.just(req.param("n")))))
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Applies an upper limit for the meters with a URI tag
2 Templated URIs will be used as a URI tag value when possible
3 Enables the built-in integration with Micrometer

To avoid the memory and CPU overhead of the enabled metrics, it is important to convert the real URIs to templated URIs when possible. Without a conversion to a template-like form, each distinct URI leads to the creation of a distinct tag, which takes a lot of memory for the metrics.

Always apply an upper limit for the meters with URI tags. Configuring an upper limit on the number of meters can help in cases when the real URIs cannot be templated. You can find more information at maximumAllowableTags.
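
The effect of templating on the number of distinct tag values can be checked without a running server. The following JDK-only sketch applies the same mapping function as the example above to a handful of made-up URIs and counts the distinct results. The class UriTagSketch and the distinctTags helper are hypothetical and exist only for this illustration.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class UriTagSketch {

	// Same mapping as the function passed to metrics(true, ...) in the example above.
	static final Function<String, String> TEMPLATE = s -> {
		if (s.startsWith("/stream/")) {
			return "/stream/{n}";
		}
		else if (s.startsWith("/bytes/")) {
			return "/bytes/{n}";
		}
		return s;
	};

	// Collects the distinct tag values that the given mapping produces.
	static Set<String> distinctTags(List<String> uris, Function<String, String> mapping) {
		Set<String> tags = new LinkedHashSet<>();
		for (String uri : uris) {
			tags.add(mapping.apply(uri));
		}
		return tags;
	}

	public static void main(String[] args) {
		List<String> uris = List.of("/stream/1", "/stream/2", "/stream/3", "/bytes/10", "/bytes/20");
		System.out.println(distinctTags(uris, Function.identity()).size()); // 5 distinct tag values
		System.out.println(distinctTags(uris, TEMPLATE));                   // [/stream/{n}, /bytes/{n}]
	}
}
```

URIs that match none of the prefixes are returned unchanged, which is why the upper limit on the number of tags remains useful as a safety net.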

When HTTP server metrics are needed for an integration with a system other than Micrometer or you want to provide your own integration with Micrometer, you can provide your own metrics recorder, as follows:

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerMetricsRecorder;

import java.net.SocketAddress;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .metrics(true, CustomHttpServerMetricsRecorder::new) (1)
				          .route(r ->
				              r.get("/stream/{n}",
				                   (req, res) -> res.sendString(Mono.just(req.param("n"))))
				               .get("/bytes/{n}",
				                   (req, res) -> res.sendString(Mono.just(req.param("n")))))
				          .bindNow();

		server.onDispose()
		      .block();
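	}

	// CustomHttpServerMetricsRecorder is your own HttpServerMetricsRecorder implementation (not shown here).
}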
1 Enables HTTP server metrics and provides HttpServerMetricsRecorder implementation.

13. Tracing

The HTTP server supports built-in integration with Micrometer Tracing.

The following table provides information for the HTTP server spans:

contextual name | description
<HTTP METHOD>_<URI> | Information and total time for the request. See Http Server Response Span.

The following example enables that integration. This concrete example uses Brave and reports the information to Zipkin. See the Micrometer Tracing documentation for OpenTelemetry setup.

import brave.Tracing;
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext;
import io.micrometer.tracing.brave.bridge.BravePropagator;
import io.micrometer.tracing.brave.bridge.BraveTracer;
import io.micrometer.tracing.propagation.Propagator;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.observability.ReactorNettyPropagatingReceiverTracingObservationHandler;
import reactor.netty.http.server.HttpServer;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.reporter.urlconnection.URLConnectionSender;

import static reactor.netty.Metrics.OBSERVATION_REGISTRY;

public class Application {

	public static void main(String[] args) {
		init(); (1)

		DisposableServer server =
				HttpServer.create()
				          .metrics(true, s -> {
				              if (s.startsWith("/stream/")) { (2)
				                  return "/stream/{n}";
				              }
				              return s;
				          }) (3)
				          .route(r -> r.get("/stream/{n}",
				              (req, res) -> res.sendString(Mono.just(req.param("n")))))
				          .bindNow();

		server.onDispose()
		      .block();
	}

	/**
	 * This setup is based on
	 * <a href="https://micrometer.io/docs/tracing#_micrometer_tracing_brave_setup">Micrometer Tracing Brave Setup</a>.
	 */
	static void init() {
		AsyncZipkinSpanHandler spanHandler = AsyncZipkinSpanHandler
				.create(URLConnectionSender.create("http://localhost:9411/api/v2/spans"));

		StrictCurrentTraceContext braveCurrentTraceContext = StrictCurrentTraceContext.create();

		CurrentTraceContext bridgeContext = new BraveCurrentTraceContext(braveCurrentTraceContext);

		Tracing tracing =
				Tracing.newBuilder()
				       .currentTraceContext(braveCurrentTraceContext)
				       .supportsJoin(false)
				       .traceId128Bit(true)
				       .sampler(Sampler.ALWAYS_SAMPLE)
				       .addSpanHandler(spanHandler)
				       .localServiceName("reactor-netty-examples")
				       .build();

		brave.Tracer braveTracer = tracing.tracer();

		Tracer tracer = new BraveTracer(braveTracer, bridgeContext, new BraveBaggageManager());

		Propagator propagator = new BravePropagator(tracing);

		OBSERVATION_REGISTRY.observationConfig()
		                    .observationHandler(new ReactorNettyPropagatingReceiverTracingObservationHandler(tracer, propagator));
	}
}
1 Initializes Brave, Zipkin, and the Observation registry.
2 Templated URIs are used as a URI tag value when possible.
3 Enables the built-in integration with Micrometer.

The result in Zipkin looks like:

[Image: HTTP server tracing]

13.1. Access Current Observation

Project Micrometer provides a library that assists with context propagation across different types of context mechanisms such as ThreadLocal, Reactor Context and others.

The following example shows how to use this library in a custom ChannelHandler:

import brave.Tracing;
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext;
import io.micrometer.tracing.brave.bridge.BravePropagator;
import io.micrometer.tracing.brave.bridge.BraveTracer;
import io.micrometer.tracing.propagation.Propagator;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.observability.ReactorNettyPropagatingReceiverTracingObservationHandler;
import reactor.netty.http.server.HttpServer;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.reporter.urlconnection.URLConnectionSender;

import static reactor.netty.Metrics.OBSERVATION_REGISTRY;

public class Application {

	public static void main(String[] args) {
		init(); (1)

		DisposableServer server =
				HttpServer.create()
				          .metrics(true, s -> {
				              if (s.startsWith("/stream/")) { (2)
				                  return "/stream/{n}";
				              }
				              return s;
				          }) (3)
				          .doOnConnection(conn -> conn.addHandlerLast(CustomChannelOutboundHandler.INSTANCE)) (4)
				          .route(r -> r.get("/stream/{n}",
				              (req, res) -> res.sendString(Mono.just(req.param("n")))))
				          .bindNow();

		server.onDispose()
		      .block();
	}

	static final class CustomChannelOutboundHandler extends ChannelOutboundHandlerAdapter {

		static final ChannelHandler INSTANCE = new CustomChannelOutboundHandler();

		@Override
		public boolean isSharable() {
			return true;
		}

		@Override
		public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
			try (ContextSnapshot.Scope scope = ContextSnapshotFactory.builder().build().setThreadLocalsFrom(ctx.channel())) {
				System.out.println("Current Observation in Scope: " + OBSERVATION_REGISTRY.getCurrentObservation());
				//"FutureReturnValueIgnored" this is deliberate
				ctx.write(msg, promise);
			}
			System.out.println("Current Observation: " + OBSERVATION_REGISTRY.getCurrentObservation());
		}
	}

	// init() is omitted here; it performs the same setup as init() in the previous example.
}
1 Initializes Brave, Zipkin, and the Observation registry.
2 Templated URIs are used as a URI tag value when possible.
3 Enables the built-in integration with Micrometer.
4 Custom ChannelHandler that uses the context propagation library. This concrete example overrides only ChannelOutboundHandlerAdapter#write; if needed, the same logic can be used for the rest of the methods. Also, this concrete example sets all ThreadLocal values for which there is a value in the given Channel. If another behaviour is needed, check the context propagation library API. For example, you may want to set only some of the ThreadLocal values.

When you enable Reactor Netty tracing within a framework, you may need to let Reactor Netty use the ObservationRegistry created by that framework. For this purpose, you need to invoke reactor.netty.Metrics#observationRegistry. You may also need to configure the Reactor Netty ObservationHandlers by using the API provided by the framework.

14. Unix Domain Sockets

The HTTP server supports Unix Domain Sockets (UDS) when native transport is in use.

The following example shows how to use UDS support:

import io.netty.channel.unix.DomainSocketAddress;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .bindAddress(() -> new DomainSocketAddress("/tmp/test.sock")) (1)
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Specifies the DomainSocketAddress that will be used

15. Timeout Configuration

This section describes various timeout configuration options that can be used in HttpServer. Configuring a proper timeout may improve or solve issues in the communication process. The configuration options can be grouped as follows:

15.1. Request Timeout

The following listing shows all available request timeout configuration options.

  • readTimeout - The maximum time between each network-level read operation while reading a given request content (resolution: ms).

  • requestTimeout - The maximum time for reading a given request content (resolution: ms).

It is always a good practice to configure a read/request timeout.

To customize the default settings, you can configure HttpServer as follows:

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .readTimeout(Duration.ofSeconds(5))     (1)
				          .requestTimeout(Duration.ofSeconds(30)) (2)
				          .handle((request, response) -> request.receive().then())
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Configures the read timeout to 5 seconds.
2 Configures the request timeout to 30 seconds.
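
The difference between the two options is easiest to see on a timeline. The following JDK-only sketch simulates the arrival of request body chunks and reports which limit, if any, would be exceeded first. It is a simplified model, not Reactor Netty code: the class RequestTimeoutSketch and the outcome helper are hypothetical, both timers are assumed to start when reading of the request content starts, and the last chunk is assumed to complete the request.

```java
import java.time.Duration;
import java.util.List;

public class RequestTimeoutSketch {

	// arrivalsMs: offsets in milliseconds (since reading started) at which each chunk arrives.
	static String outcome(List<Long> arrivalsMs, Duration readTimeout, Duration requestTimeout) {
		long previous = 0;
		// requestTimeout limits the total time for reading the request content.
		long requestDeadline = requestTimeout.toMillis();
		for (long arrival : arrivalsMs) {
			// readTimeout limits the gap since the previous read.
			long readDeadline = previous + readTimeout.toMillis();
			if (arrival > Math.min(readDeadline, requestDeadline)) {
				// Whichever deadline comes first is the one that fires.
				return readDeadline <= requestDeadline ? "readTimeout" : "requestTimeout";
			}
			previous = arrival;
		}
		return "completed";
	}

	public static void main(String[] args) {
		Duration read = Duration.ofSeconds(5);
		Duration request = Duration.ofSeconds(30);
		// A 6 second pause between two chunks exceeds the read timeout.
		System.out.println(outcome(List.of(1000L, 7000L), read, request));
		// Chunks every 4 seconds never exceed the read timeout, but the whole body takes too long.
		System.out.println(outcome(List.of(4000L, 8000L, 12000L, 16000L, 20000L, 24000L, 28000L, 32000L), read, request));
	}
}
```

In this model, the first call reports readTimeout and the second reports requestTimeout, which is why configuring both options protects against stalled clients as well as slow ones.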

15.2. Connection Timeout

The following listing shows all available connection timeout configuration options.

  • idleTimeout - The maximum time (resolution: ms) that this connection stays open and waits for an HTTP request. Once the timeout is reached, the connection is closed. By default, idleTimeout is not specified. This indicates no timeout (that is, infinite), which means the connection is closed only if one of the peers decides to close it explicitly.

It is always a good practice to configure an idle timeout.

To customize the default settings, you can configure HttpServer as follows:

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .idleTimeout(Duration.ofSeconds(1)) (1)
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Configures the default idle timeout to 1 second.

15.3. SSL/TLS Timeout

HttpServer supports the SSL/TLS functionality provided by Netty.

The following list describes the available timeout configuration options:

  • handshakeTimeout - Use this option to configure the SSL handshake timeout (resolution: ms). Default: 10s.

You should consider increasing the SSL handshake timeout when expecting slow network connections.

  • closeNotifyFlushTimeout - Use this option to configure the SSL close_notify flush timeout (resolution: ms). Default: 3s.

  • closeNotifyReadTimeout - Use this option to configure the SSL close_notify read timeout (resolution: ms). Default: 0s.

To customize the default settings, you can configure HttpServer as follows:

import reactor.netty.DisposableServer;
import reactor.netty.http.Http11SslContextSpec;
import reactor.netty.http.server.HttpServer;

import java.io.File;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		File cert = new File("certificate.crt");
		File key = new File("private.key");

		Http11SslContextSpec http11SslContextSpec = Http11SslContextSpec.forServer(cert, key);

		DisposableServer server =
				HttpServer.create()
				          .secure(spec -> spec.sslContext(http11SslContextSpec)
				                              .handshakeTimeout(Duration.ofSeconds(30))         (1)
				                              .closeNotifyFlushTimeout(Duration.ofSeconds(10))  (2)
				                              .closeNotifyReadTimeout(Duration.ofSeconds(10)))  (3)
				          .bindNow();

		server.onDispose()
		      .block();
	}
}
1 Configures the SSL handshake timeout to 30 seconds.
2 Configures the SSL close_notify flush timeout to 10 seconds.
3 Configures the SSL close_notify read timeout to 10 seconds.