HTTP Client

Reactor Netty provides the easy-to-use and easy-to-configure HttpClient. It hides most of the Netty functionality that is required to create an HTTP client and adds Reactive Streams backpressure.

1. Connect

To connect the HTTP client to a given HTTP endpoint, you must create and configure an HttpClient instance. By default, the host is localhost and the port is 80. The following example shows how to do so:

import reactor.netty.http.client.HttpClient;

public class Application {

	public static void main(String[] args) {
		HttpClient client = HttpClient.create();  (1)

		client.get()                       (2)
		      .uri("https://example.com/") (3)
		      .response()                  (4)
		      .block();
	}
}
1 Creates an HttpClient instance ready for configuring.
2 Specifies that the GET method will be used.
3 Specifies the URI of the request.
4 Obtains the response as an HttpClientResponse.

The following example uses WebSocket:

import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import reactor.core.publisher.Flux;
import reactor.netty.http.client.HttpClient;

public class Application {

	public static void main(String[] args) {
		HttpClient client = HttpClient.create();

		client.websocket()
		      .uri("wss://echo.websocket.org")
		      .handle((inbound, outbound) -> {
		          inbound.receive()
		                 .asString()
		                 .take(1)
		                 .subscribe(System.out::println);

		          final byte[] msgBytes = "hello".getBytes(CharsetUtil.ISO_8859_1);
		          return outbound.send(Flux.just(Unpooled.wrappedBuffer(msgBytes), Unpooled.wrappedBuffer(msgBytes)))
		                         .neverComplete();
		      })
		      .blockLast();
	}
}

1.1. Host and Port

To connect to a specific host and port, you can apply the following configuration to the HTTP client:

import reactor.netty.http.client.HttpClient;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .host("example.com") (1)
				          .port(80);           (2)

		client.get()
		      .uri("/")
		      .response()
		      .block();
	}
}
1 Configures the HTTP host
2 Configures the HTTP port
The port can also be specified with the PORT environment variable.

2. Eager Initialization

By default, the initialization of the HttpClient resources happens on demand. This means that the first request absorbs the extra time needed to initialize and load:

  • the event loop group

  • the host name resolver

  • the native transport libraries (when native transport is used)

  • the native libraries for security (when OpenSSL is used)

When you need to preload these resources, you can configure the HttpClient as follows:

import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClient;

public class Application {

	public static void main(String[] args) {
		HttpClient client = HttpClient.create();

		client.warmup() (1)
		      .block();

		client.post()
		      .uri("https://example.com/")
		      .send(ByteBufFlux.fromString(Mono.just("hello")))
		      .response()
		      .block(); (2)
	}
}
1 Initializes and loads the event loop group, the host name resolver, the native transport libraries, and the native libraries for security.
2 Host name resolution happens with the first request. In this example, a connection pool is used, so the first request establishes the connection to the URL and subsequent requests to the same URL reuse connections from the pool.

3. Writing Data

To send data to a given HTTP endpoint, you can provide a Publisher by using the send(Publisher) method. By default, Transfer-Encoding: chunked is applied for those HTTP methods for which a request body is expected. If necessary, providing Content-Length through the request headers disables Transfer-Encoding: chunked. The following example sends hello:

import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClient;

public class Application {

	public static void main(String[] args) {
		HttpClient client = HttpClient.create();

		client.post()
		      .uri("https://example.com/")
		      .send(ByteBufFlux.fromString(Mono.just("hello"))) (1)
		      .response()
		      .block();
	}
}
1 Sends a hello string to the given HTTP endpoint

3.1. Adding Headers and Other Metadata

When sending data to a given HTTP endpoint, you may need to send additional headers, cookies and other metadata. You can use the following configuration to do so:

import io.netty.handler.codec.http.HttpHeaderNames;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClient;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .headers(h -> h.set(HttpHeaderNames.CONTENT_LENGTH, 5)); (1)

		client.post()
		      .uri("https://example.com/")
		      .send(ByteBufFlux.fromString(Mono.just("hello")))
		      .response()
		      .block();
	}
}
1 Disables Transfer-Encoding: chunked and provides Content-Length header.
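
The value 5 in the example above is the length of hello in bytes. The following is a minimal sketch, using only the JDK, of deriving that value from the payload instead of hard-coding it. The class and method names are hypothetical and not part of Reactor Netty, and UTF-8 is an assumed encoding:

```java
import java.nio.charset.StandardCharsets;

final class ContentLengthSketch {

	// Hypothetical helper: Content-Length counts encoded bytes, not characters.
	// UTF-8 is an assumption of this sketch.
	static int contentLength(String body) {
		return body.getBytes(StandardCharsets.UTF_8).length;
	}

	public static void main(String[] args) {
		// ASCII text: one byte per character.
		System.out.println(contentLength("hello"));
		// U+00E9 (e with acute accent) takes two bytes in UTF-8.
		System.out.println(contentLength("h\u00e9llo"));
	}
}
```

A mismatch between the declared length and the bytes actually sent typically leads to a truncated or stalled request, so computing the value from the same bytes that are written is the safer choice.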

3.1.1. Compression

You can enable compression on the HTTP client, which means the request header Accept-Encoding is added to the request headers. The following example shows how to do so:

import reactor.netty.http.client.HttpClient;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .compress(true);

		client.get()
		      .uri("https://example.com/")
		      .response()
		      .block();
	}
}

3.1.2. Auto-Redirect Support

You can configure the HTTP client to enable auto-redirect support.

Reactor Netty provides two different strategies for auto-redirect support:

  • followRedirect(boolean): Specifies whether HTTP auto-redirect support is enabled for statuses 301|302|303|307|308. For a 303 status code, the GET method is used for the redirect.

  • followRedirect(BiPredicate<HttpClientRequest, HttpClientResponse>): Enables auto-redirect support if the supplied predicate matches.

The following example uses followRedirect(true):

import reactor.netty.http.client.HttpClient;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .followRedirect(true);

		client.get()
		      .uri("https://example.com/")
		      .response()
		      .block();
	}
}
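
The rule that followRedirect(true) applies can be sketched in plain Java. This is an illustration of the statuses and the 303 behavior listed above, not Reactor Netty's implementation. The class and method names are hypothetical, and keeping the original method for the other statuses is a simplifying assumption of the sketch:

```java
import java.util.Set;

final class RedirectRuleSketch {

	// Status codes listed for followRedirect(true).
	private static final Set<Integer> REDIRECT_STATUSES = Set.of(301, 302, 303, 307, 308);

	static boolean isRedirect(int status) {
		return REDIRECT_STATUSES.contains(status);
	}

	// Only the 303 rule is stated in the text; for the other statuses this
	// sketch assumes the original method is kept.
	static String methodForRedirect(String originalMethod, int status) {
		return status == 303 ? "GET" : originalMethod;
	}

	public static void main(String[] args) {
		System.out.println(isRedirect(302));
		System.out.println(isRedirect(304));
		System.out.println(methodForRedirect("POST", 303));
		System.out.println(methodForRedirect("POST", 307));
	}
}
```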

4. Consuming Data

To receive data from a given HTTP endpoint, you can use one of the methods from HttpClient.ResponseReceiver. The following example uses the responseContent method:

import reactor.netty.http.client.HttpClient;

public class Application {

	public static void main(String[] args) {
		HttpClient client = HttpClient.create();

		client.get()
		      .uri("https://example.com/")
		      .responseContent() (1)
		      .aggregate()       (2)
		      .asString()        (3)
		      .block();
	}
}
1 Receives data from a given HTTP endpoint
2 Aggregates the data
3 Transforms the data as string

4.1. Reading Headers and Other Metadata

When receiving data from a given HTTP endpoint, you can check response headers, status code, and other metadata. You can obtain this additional metadata by using HttpClientResponse. The following example shows how to do so.

import reactor.netty.http.client.HttpClient;

public class Application {

	public static void main(String[] args) {
		HttpClient client = HttpClient.create();

		client.get()
		      .uri("https://example.com/")
		      .responseSingle((resp, bytes) -> {
		          System.out.println(resp.status()); (1)
		          return bytes.asString();
		      })
		      .block();
	}
}
1 Obtains the status code.

4.2. HTTP Response Decoder

By default, Netty configures some restrictions for the incoming responses, such as:

  • The maximum length of the initial line.

  • The maximum length of all headers.

  • The maximum length of the content or each chunk.

For more information, see HttpResponseDecoder

By default, the HTTP client is configured with the following settings:

	public static final int DEFAULT_MAX_INITIAL_LINE_LENGTH             = 4096;
	public static final int DEFAULT_MAX_HEADER_SIZE                     = 8192;
	/**
	 * Default max chunk size.
	 *
	 * @deprecated as of 1.1.0. This will be removed in 2.0.0 as Netty 5 does not support this configuration.
	 */
	@Deprecated
	public static final int DEFAULT_MAX_CHUNK_SIZE                      = 8192;
	public static final boolean DEFAULT_VALIDATE_HEADERS                = true;
	public static final int DEFAULT_INITIAL_BUFFER_SIZE                 = 128;
	public static final boolean DEFAULT_ALLOW_DUPLICATE_CONTENT_LENGTHS = false;
	/**
	 * The maximum length of the content of the HTTP/2.0 clear-text upgrade request.
	 * By default, the client will allow an upgrade request with up to 65536 as
	 * the maximum length of the aggregated content.
	 */
	public static final int DEFAULT_H2C_MAX_CONTENT_LENGTH = 65536;

	boolean failOnMissingResponse        = DEFAULT_FAIL_ON_MISSING_RESPONSE;

When you need to change these default settings, you can configure the HTTP client as follows:

import reactor.netty.http.client.HttpClient;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .httpResponseDecoder(spec -> spec.maxHeaderSize(16384)); (1)

		client.get()
		      .uri("https://example.com/")
		      .responseContent()
		      .aggregate()
		      .asString()
		      .block();
	}
}
1 The maximum length of all headers will be 16384. When this value is exceeded, a TooLongFrameException is raised.

5. Lifecycle Callbacks

The following lifecycle callbacks are provided to let you extend the HttpClient.

Callback                  Description

doAfterRequest            Invoked when the request has been sent.
doAfterResolve            Invoked after the remote address has been resolved successfully.
doAfterResponseSuccess    Invoked after the response has been fully received.
doOnChannelInit           Invoked when initializing the channel.
doOnConnect               Invoked when the channel is about to connect.
doOnConnected             Invoked after the channel has been connected.
doOnDisconnected          Invoked after the channel has been disconnected.
doOnError                 Invoked when the request has not been sent and when the response has not been fully received.
doOnRedirect              Invoked when the response headers have been received, and the request is about to be redirected.
doOnRequest               Invoked when the request is about to be sent.
doOnRequestError          Invoked when the request has not been sent.
doOnResolve               Invoked when the remote address is about to be resolved.
doOnResolveError          Invoked in case the remote address hasn’t been resolved successfully.
doOnResponse              Invoked after the response headers have been received.
doOnResponseError         Invoked when the response has not been fully received.

The following example uses the doOnConnected and doOnChannelInit callbacks:

import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import reactor.netty.http.client.HttpClient;
import java.util.concurrent.TimeUnit;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .doOnConnected(conn ->
				              conn.addHandlerFirst(new ReadTimeoutHandler(10, TimeUnit.SECONDS)))   (1)
				          .doOnChannelInit((observer, channel, remoteAddress) ->
				              channel.pipeline()
				                     .addFirst(new LoggingHandler("reactor.netty.examples")));      (2)

		client.get()
		      .uri("https://example.com/")
		      .response()
		      .block();
	}
}
1 The Netty pipeline is extended with ReadTimeoutHandler when the channel has been connected.
2 The Netty pipeline is extended with LoggingHandler when initializing the channel.

6. TCP-level Configuration

When you need configuration at the TCP level, you can use the following snippet to extend the default TCP client configuration (add an option, a bind address, and so on):

import io.netty.channel.ChannelOption;
import io.netty.channel.epoll.EpollChannelOption;
//import io.netty.channel.socket.nio.NioChannelOption;
//import jdk.net.ExtendedSocketOptions;
import reactor.netty.http.client.HttpClient;
import java.net.InetSocketAddress;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .bindAddress(() -> new InetSocketAddress("host", 1234))
				          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) (1)
				          .option(ChannelOption.SO_KEEPALIVE, true)            (2)
				          // The options below are available only when NIO transport (Java 11) is used
				          // on Mac or Linux (Java does not currently support these extended options on Windows)
				          // https://bugs.openjdk.java.net/browse/JDK-8194298
				          //.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPIDLE), 300)
				          //.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPINTERVAL), 60)
				          //.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPCOUNT), 8);
				          // The options below are available only when Epoll transport is used
				          .option(EpollChannelOption.TCP_KEEPIDLE, 300)        (3)
				          .option(EpollChannelOption.TCP_KEEPINTVL, 60)        (4)
				          .option(EpollChannelOption.TCP_KEEPCNT, 8);          (5)

		String response =
				client.get()
				      .uri("https://example.com/")
				      .responseContent()
				      .aggregate()
				      .asString()
				      .block();

		System.out.println("Response " + response);
	}
}
1 Configures the connection establishment timeout to 10 seconds.
2 Enables TCP keepalive. This means that TCP starts sending keepalive probes when a connection is idle for some time.
3 The connection needs to remain idle for 5 minutes before TCP starts sending keepalive probes.
4 Configures the time between individual keepalive probes to 1 minute.
5 Configures the maximum number of TCP keepalive probes to 8.
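
Taken together, the three keepalive options bound how long an unresponsive peer can go unnoticed. The following sketch works through that arithmetic for the values above. It assumes Linux-style keepalive semantics, where the connection is dropped once the configured number of probes goes unanswered. The class and method names are hypothetical:

```java
import java.time.Duration;

final class KeepAliveTimingSketch {

	// Idle time before the first probe, plus one interval per unanswered probe.
	// Assumes the connection is dropped after the last probe's interval elapses.
	static Duration timeToDetectDeadPeer(Duration idle, Duration interval, int probes) {
		return idle.plus(interval.multipliedBy(probes));
	}

	public static void main(String[] args) {
		Duration total = timeToDetectDeadPeer(
				Duration.ofSeconds(300), // TCP_KEEPIDLE
				Duration.ofSeconds(60),  // TCP_KEEPINTVL
				8);                      // TCP_KEEPCNT
		System.out.println(total.getSeconds() + " seconds"); // prints "780 seconds"
	}
}
```

With these values, an idle connection to a peer that has silently gone away is detected after roughly 13 minutes.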

See TCP Client for more about TCP level configurations.

6.1. Wire Logger

Reactor Netty provides wire logging for when the traffic between the peers needs to be inspected. By default, wire logging is disabled. To enable it, you must set the logger reactor.netty.http.client.HttpClient level to DEBUG and apply the following configuration:

import reactor.netty.http.client.HttpClient;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .wiretap(true); (1)

		client.get()
		      .uri("https://example.com/")
		      .response()
		      .block();
	}
}
1 Enables the wire logging

6.1.1. Wire Logger formatters

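Reactor Netty supports three formatters, defined by the AdvancedByteBufFormat enum: HEX_DUMP (the default), SIMPLE, and TEXTUAL. Their Javadoc follows, in that order: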

	/**
	 * When wire logging is enabled with this format, both events and content will be logged.
	 * The content will be in hex format.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] REGISTERED
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] ACTIVE
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] READ: 145B
	 *          +-------------------------------------------------+
	 *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
	 * +--------+-------------------------------------------------+----------------+
	 * |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World|
	 * |00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 43 6f 6e 74 65 | HTTP/1.1..Conte|
	 * |00000020| 6e 74 2d 54 79 70 65 3a 20 74 65 78 74 2f 70 6c |nt-Type: text/pl|
	 * |00000030| 61 69 6e 0d 0a 75 73 65 72 2d 61 67 65 6e 74 3a |ain..user-agent:|
	 * |00000040| 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f 64 65 | ReactorNetty/de|
	 * ...
	 * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] WRITE: 38B
	 *          +-------------------------------------------------+
	 *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
	 * +--------+-------------------------------------------------+----------------+
	 * |00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
	 * |00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
	 * |00000020| 20 30 0d 0a 0d 0a                               | 0....          |
	 * +--------+-------------------------------------------------+----------------+
	 * }
	 * </pre>
	 */
	/**
	 * When wire logging is enabled with this format, only the events will be logged.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] REGISTERED
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] ACTIVE
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] READ: 145B
	 * reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] WRITE: 38B
	 * }
	 * </pre>
	 */
	/**
	 * When wire logging is enabled with this format, both events and content will be logged.
	 * The content will be in plain text format.
	 * <p>Examples:</p>
	 * <pre>
	 * {@code
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] REGISTERED
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] ACTIVE
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] READ: 145B POST /test/World HTTP/1.1
	 * Content-Type: text/plain
	 * user-agent: ReactorNetty/dev
	 * ...
	 * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] WRITE: 38B HTTP/1.1 200 OK
	 * content-length: 0
	 * }
	 * </pre>
	 */
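
To make the hex dump format easier to read, the following JDK-only sketch rebuilds a single row of it: an 8-digit hexadecimal offset, 16 bytes in hex, and the same bytes as text, with non-printable bytes shown as a dot. It is an illustration of the layout shown above, not the formatter Reactor Netty uses, and the class and method names are hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class HexDumpRowSketch {

	// Formats up to 16 bytes as one row: |offset| hex bytes |text|
	static String row(int offset, byte[] chunk) {
		StringBuilder hex = new StringBuilder();
		StringBuilder text = new StringBuilder();
		for (int i = 0; i < 16; i++) {
			if (i < chunk.length) {
				int b = chunk[i] & 0xff;
				hex.append(String.format(" %02x", b));
				// Printable ASCII is shown as-is, everything else as '.'
				text.append(b >= 0x20 && b < 0x7f ? (char) b : '.');
			}
			else {
				// Pad short rows so the columns stay aligned.
				hex.append("   ");
				text.append(' ');
			}
		}
		return String.format("|%08x|%s |%s|", offset, hex, text);
	}

	public static void main(String[] args) {
		byte[] data = "POST /test/World HTTP/1.1\r\n".getBytes(StandardCharsets.US_ASCII);
		for (int offset = 0; offset < data.length; offset += 16) {
			byte[] chunk = Arrays.copyOfRange(data, offset, Math.min(offset + 16, data.length));
			System.out.println(row(offset, chunk));
		}
	}
}
```

The first printed row matches the first row of the READ event in the hex dump example: the carriage return and line feed bytes (0d 0a) appear as dots in the text column.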

When you need to change the default formatter, you can configure it as follows:

import io.netty.handler.logging.LogLevel;
import reactor.netty.http.client.HttpClient;
import reactor.netty.transport.logging.AdvancedByteBufFormat;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL); (1)

		client.get()
		      .uri("https://example.com/")
		      .response()
		      .block();
	}
}
1 Enables wire logging and uses AdvancedByteBufFormat#TEXTUAL to print the content.

6.2. Event Loop Group

By default, Reactor Netty uses an “Event Loop Group”, where the number of worker threads equals the number of processors available to the runtime on initialization (but with a minimum value of 4). When you need a different configuration, you can use one of the LoopResources#create methods.

The following listing shows the default configuration for the Event Loop Group:

	/**
	 * Default worker thread count, fallback to available processor
	 * (but with a minimum value of 4).
	 */
	public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
	/**
	 * Default selector thread count, fallback to -1 (no selector thread)
	 * <p><strong>Note:</strong> In most use cases using a worker thread also as a selector thread works well.
	 * A possible use case for specifying a separate selector thread might be when the worker threads are too busy
	 * and connections cannot be accepted fast enough.
	 * <p><strong>Note:</strong> Although more than 1 can be configured as a selector thread count, in reality
	 * only 1 thread will be used as a selector thread.
	 */
	public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
	/**
	 * Default worker thread count for UDP, fallback to available processor
	 * (but with a minimum value of 4).
	 */
	public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
	/**
	 * Default quiet period that guarantees that the disposal of the underlying LoopResources
	 * will not happen, fallback to 2 seconds.
	 */
	public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
	/**
	 * Default maximum amount of time to wait until the disposal of the underlying LoopResources
	 * regardless if a task was submitted during the quiet period, fallback to 15 seconds.
	 */
	public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";

	/**
	 * Default value whether the native transport (epoll, kqueue) will be preferred,
	 * fallback it will be preferred when available.
	 */
	public static final String NATIVE = "reactor.netty.native";
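
The fallback described for reactor.netty.ioWorkerCount can be sketched as follows. This is a JDK-only illustration of the documented rule (use the property when it is set, otherwise the processor count with a floor of 4). The class and method names are hypothetical and the code is not taken from Reactor Netty:

```java
final class WorkerCountSketch {

	// Hypothetical helper mirroring the documented fallback.
	static int resolveWorkerCount(String propertyValue, int availableProcessors) {
		if (propertyValue != null) {
			return Integer.parseInt(propertyValue);
		}
		return Math.max(availableProcessors, 4);
	}

	public static void main(String[] args) {
		int workers = resolveWorkerCount(
				System.getProperty("reactor.netty.ioWorkerCount"),
				Runtime.getRuntime().availableProcessors());
		System.out.println("worker threads: " + workers);
	}
}
```

Running it with -Dreactor.netty.ioWorkerCount=6 prints the overridden value. Without the property, a machine with two processors still gets four worker threads.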

If you need changes to these settings, you can apply the following configuration:

import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.LoopResources;

public class Application {

	public static void main(String[] args) {
		LoopResources loop = LoopResources.create("event-loop", 1, 4, true);
		HttpClient client =
				HttpClient.create()
				          .runOn(loop);

		client.get()
		      .uri("https://example.com/")
		      .responseContent()
		      .aggregate()
		      .asString()
		      .block();
	}
}

6.2.1. Disposing Event Loop Group

  • If you use the default Event Loop Group provided by Reactor Netty, invoke the HttpResources#disposeLoopsAndConnections or HttpResources#disposeLoopsAndConnectionsLater method.

Disposing HttpResources means that every server or client that is using it will not be able to use it anymore!

  • If you use custom LoopResources, invoke the LoopResources#dispose or LoopResources#disposeLater method.

Disposing the custom LoopResources means that every server or client that is configured to use it will not be able to use it anymore!

7. Connection Pool

By default, HttpClient uses a “fixed” connection pool with 500 as the maximum number of active channels and 1000 as the maximum number of further channel acquisition attempts allowed to be kept in a pending state (for the rest of the configurations, check the system properties or the builder configurations below). This means that the implementation creates a new channel when someone tries to acquire one, as long as fewer than 500 have been created and are managed by the pool. When the maximum number of channels in the pool is reached, up to 1000 new attempts to acquire a channel are delayed (pending) until a channel is returned to the pool, and further attempts are declined with an error.

	/**
	 * Default max connections. Fallback to
	 * 2 * available number of processors (but with a minimum value of 16)
	 */
	public static final String POOL_MAX_CONNECTIONS = "reactor.netty.pool.maxConnections";
	/**
	 * Default acquisition timeout (milliseconds) before error. If -1 will never wait to
	 * acquire before opening a new
	 * connection in an unbounded fashion. Fallback 45 seconds
	 */
	public static final String POOL_ACQUIRE_TIMEOUT = "reactor.netty.pool.acquireTimeout";
	/**
	 * Default max idle time, fallback - max idle time is not specified.
	 * <p><strong>Note:</strong> This configuration is not applicable for {@link reactor.netty.tcp.TcpClient}.
	 * A TCP connection is always closed and never returned to the pool.
	 */
	public static final String POOL_MAX_IDLE_TIME = "reactor.netty.pool.maxIdleTime";
	/**
	 * Default max life time, fallback - max life time is not specified.
	 * <p><strong>Note:</strong> This configuration is not applicable for {@link reactor.netty.tcp.TcpClient}.
	 * A TCP connection is always closed and never returned to the pool.
	 */
	public static final String POOL_MAX_LIFE_TIME = "reactor.netty.pool.maxLifeTime";
	/**
	 * Default leasing strategy (fifo, lifo), fallback to fifo.
	 * <ul>
	 *     <li>fifo - The connection selection is first in, first out</li>
	 *     <li>lifo - The connection selection is last in, first out</li>
	 * </ul>
	 * <p><strong>Note:</strong> This configuration is not applicable for {@link reactor.netty.tcp.TcpClient}.
	 * A TCP connection is always closed and never returned to the pool.
	 */
	public static final String POOL_LEASING_STRATEGY = "reactor.netty.pool.leasingStrategy";
	/**
	 * Default {@code getPermitsSamplingRate} (between 0d and 1d (percentage))
	 * to be used with a {@link SamplingAllocationStrategy}.
	 * This strategy wraps a {@link PoolBuilder#sizeBetween(int, int) sizeBetween} {@link AllocationStrategy}
	 * and samples calls to {@link AllocationStrategy#getPermits(int)}.
	 * Fallback - sampling is not enabled.
	 */
	public static final String POOL_GET_PERMITS_SAMPLING_RATE = "reactor.netty.pool.getPermitsSamplingRate";
	/**
	 * Default {@code returnPermitsSamplingRate} (between 0d and 1d (percentage))
	 * to be used with a {@link SamplingAllocationStrategy}.
	 * This strategy wraps a {@link PoolBuilder#sizeBetween(int, int) sizeBetween} {@link AllocationStrategy}
	 * and samples calls to {@link AllocationStrategy#returnPermits(int)}.
	 * Fallback - sampling is not enabled.
	 */
	public static final String POOL_RETURN_PERMITS_SAMPLING_RATE = "reactor.netty.pool.returnPermitsSamplingRate";
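
The acquisition behavior of the fixed pool (create channels up to the maximum, then queue a bounded number of pending attempts, then reject) can be modeled in a few lines. The following is a deliberately simplified, JDK-only model with hypothetical names. It ignores idle channel reuse, timeouts, and threading, and is not how Reactor Netty implements its pool:

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class PoolModelSketch {

	enum Outcome { ACQUIRED, PENDING, REJECTED }

	private final int maxConnections;
	private final int maxPending;
	private final Queue<String> pending = new ArrayDeque<>();
	private int active;

	PoolModelSketch(int maxConnections, int maxPending) {
		this.maxConnections = maxConnections;
		this.maxPending = maxPending;
	}

	Outcome acquire(String caller) {
		if (active < maxConnections) {
			active++;                  // room left: a channel is handed out
			return Outcome.ACQUIRED;
		}
		if (pending.size() < maxPending) {
			pending.add(caller);       // pool is full: wait for a release
			return Outcome.PENDING;
		}
		return Outcome.REJECTED;       // pending queue is full too
	}

	// Returns the waiting caller that takes over the released channel,
	// or null if nobody was waiting.
	String release() {
		if (active == 0) {
			throw new IllegalStateException("nothing to release");
		}
		String next = pending.poll();
		if (next == null) {
			active--;
		}
		return next;
	}

	public static void main(String[] args) {
		PoolModelSketch pool = new PoolModelSketch(2, 1);
		System.out.println(pool.acquire("a"));
		System.out.println(pool.acquire("b"));
		System.out.println(pool.acquire("c"));
		System.out.println(pool.acquire("d"));
		System.out.println("released to: " + pool.release());
	}
}
```

With a maximum of two channels and one pending slot, the third caller waits, the fourth is rejected, and the first release hands the channel to the waiting caller.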

When you need to change the default settings, you can configure the ConnectionProvider as follows:

import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		ConnectionProvider provider =
				ConnectionProvider.builder("custom")
				                  .maxConnections(50)
				                  .maxIdleTime(Duration.ofSeconds(20))           (1)
				                  .maxLifeTime(Duration.ofSeconds(60))           (2)
				                  .pendingAcquireTimeout(Duration.ofSeconds(60)) (3)
				                  .evictInBackground(Duration.ofSeconds(120))    (4)
				                  .build();

		HttpClient client = HttpClient.create(provider);

		String response =
				client.get()
				      .uri("https://example.com/")
				      .responseContent()
				      .aggregate()
				      .asString()
				      .block();

		System.out.println("Response " + response);

		provider.disposeLater()
		        .block();
	}
}
1 Configures the maximum time for a connection to stay idle to 20 seconds.
2 Configures the maximum time for a connection to stay alive to 60 seconds.
3 Configures the maximum time for the pending acquire operation to 60 seconds.
4 Every two minutes, the connection pool is checked for connections that are eligible for removal.
Notice that only the default HttpClient (HttpClient.create()) uses 500 as the maximum number of active channels. In the example above, when instantiating a custom ConnectionProvider, we change this value to 50 by using maxConnections. If you do not set this parameter, the default maxConnections is used (2 * available number of processors, but with a minimum value of 16).
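
The builder defaults documented in this section (max connections of 2 * available processors with a minimum of 16, and a pending queue of 2 * max connections) can be worked through as follows. The sketch uses only the JDK, and the class and method names are hypothetical:

```java
final class PoolDefaultsSketch {

	// Documented fallback: 2 * available processors, but at least 16.
	static int defaultMaxConnections(int availableProcessors) {
		return Math.max(2 * availableProcessors, 16);
	}

	// Documented default for the pending queue: 2 * max connections.
	static int defaultPendingAcquireMaxCount(int maxConnections) {
		return 2 * maxConnections;
	}

	public static void main(String[] args) {
		int max = defaultMaxConnections(Runtime.getRuntime().availableProcessors());
		System.out.println("maxConnections: " + max);
		System.out.println("pendingAcquireMaxCount: " + defaultPendingAcquireMaxCount(max));
	}
}
```

The same ratio explains the default HttpClient values: 500 active channels correspond to 1000 pending acquisition attempts.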

The following listing shows the available configurations:

Configuration name Description

disposeInactivePoolsInBackground

When this option is enabled, connection pools are regularly checked in the background, and those that are empty and have been inactive for a specified time become eligible for disposal. A connection pool is considered empty when there are no active connections, idle connections, or pending acquisitions. By default, this background disposal of inactive pools is disabled.

disposeTimeout

When ConnectionProvider#dispose() or ConnectionProvider#disposeLater() is called, trigger a graceful shutdown for the connection pools, with this grace period timeout. From there on, all calls for acquiring a connection will fail fast with an exception. However, for the provided Duration, pending acquires will get a chance to be served. Note: The rejection of new acquires and the grace timer start immediately, irrespective of subscription to the Mono returned by ConnectionProvider#disposeLater(). Subsequent calls return the same Mono, effectively getting notifications from the first graceful shutdown call and ignoring subsequently provided timeouts. By default, dispose timeout is not specified.

evictInBackground

When this option is enabled, each connection pool regularly checks for connections that are eligible for removal according to eviction criteria like maxIdleTime. By default, this background eviction is disabled.

fifo

Configure the connection pool so that if there are idle connections (i.e. pool is under-utilized), the next acquire operation will get the Least Recently Used connection (LRU, i.e. the connection that was released first among the current idle connections). Default leasing strategy.

lifo

Configure the connection pool so that if there are idle connections (i.e. pool is under-utilized), the next acquire operation will get the Most Recently Used connection (MRU, i.e. the connection that was released last among the current idle connections).

maxConnections

The maximum number of connections (per connection pool) before acquisition attempts start pending. Defaults to 2 * available number of processors (but with a minimum value of 16).

maxIdleTime

The time after which the channel is eligible to be closed when idle (resolution: ms). Default: max idle time is not specified.

maxLifeTime

The total life time after which the channel is eligible to be closed (resolution: ms). Default: max life time is not specified.

metrics

Enables/disables built-in integration with Micrometer. ConnectionProvider.MeterRegistrar can be provided for integration with another metrics system. By default, metrics are not enabled.

pendingAcquireMaxCount

The maximum number of extra attempts at acquiring a connection to keep in a pending queue. If -1 is specified, the pending queue does not have an upper limit. Defaults to 2 * max connections.

pendingAcquireTimeout

The maximum time before which a pending acquire must complete, or a TimeoutException is thrown (resolution: ms). If -1 is specified, no such timeout is applied. Default: 45 seconds.
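
The difference between the fifo and lifo leasing strategies comes down to which end of the idle queue is served. The following JDK-only sketch illustrates the selection rule described above. The class and method names and the connection labels are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

final class LeasingStrategySketch {

	// idleInReleaseOrder lists idle connections from first released to last released.
	static String pick(List<String> idleInReleaseOrder, boolean lifo) {
		Deque<String> idle = new ArrayDeque<>(idleInReleaseOrder);
		// fifo: least recently used (released first); lifo: most recently used (released last).
		return lifo ? idle.peekLast() : idle.peekFirst();
	}

	public static void main(String[] args) {
		List<String> idle = List.of("conn-1", "conn-2", "conn-3");
		System.out.println("fifo picks " + pick(idle, false)); // prints "fifo picks conn-1"
		System.out.println("lifo picks " + pick(idle, true));  // prints "lifo picks conn-3"
	}
}
```

With lifo, connections at the old end of the queue stay idle longer, which can let maxIdleTime eviction shrink an under-utilized pool. With fifo, use is spread across all idle connections.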

When you expect a high load, be cautious with a connection pool with a very high value for maximum connections. You might experience a reactor.netty.http.client.PrematureCloseException with a root cause of "Connect Timeout" due to too many concurrent connections being opened or acquired.

If you need to disable the connection pool, you can apply the following configuration:

import reactor.netty.http.client.HttpClient;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.newConnection()
				          .doOnConnected(conn -> System.out.println("Connection " + conn.channel()));

		String response =
				// A new connection is established for every request
				client.get()
				      .uri("https://httpbin.org/get")
				      .responseContent()
				      .aggregate()
				      .asString()
				      .block();

		System.out.println("Response " + response);

		response =
				// A new connection is established for every request
				client.post()
				      .uri("https://httpbin.org/post")
				      .responseContent()
				      .aggregate()
				      .asString()
				      .block();

		System.out.println("Response " + response);
	}
}

7.1. Disposing Connection Pool

  • If you use the default ConnectionProvider provided by Reactor Netty, invoke the HttpResources#disposeLoopsAndConnections or HttpResources#disposeLoopsAndConnectionsLater method.

Disposing HttpResources means that every client that is using it will not be able to use it anymore!

  • If you use a custom ConnectionProvider, invoke the ConnectionProvider#dispose, ConnectionProvider#disposeLater, or ConnectionProvider#disposeWhen method.

Disposing the custom ConnectionProvider means that every client that is configured to use it will not be able to use it anymore!

7.2. Metrics

The pooled ConnectionProvider supports built-in integration with Micrometer. It exposes all metrics with a prefix of reactor.netty.connection.provider.

Pooled ConnectionProvider metrics

metric name | type | description
reactor.netty.connection.provider.total.connections | Gauge | The number of all connections, active or idle. See Total Connections
reactor.netty.connection.provider.active.connections | Gauge | The number of the connections that have been successfully acquired and are in active use. See Active Connections
reactor.netty.connection.provider.max.connections | Gauge | The maximum number of active connections that are allowed. See Max Connections
reactor.netty.connection.provider.idle.connections | Gauge | The number of the idle connections. See Idle Connections
reactor.netty.connection.provider.pending.connections | Gauge | The number of requests that are waiting for a connection. See Pending Connections
reactor.netty.connection.provider.pending.connections.time | Timer | Time spent waiting to acquire a connection from the connection pool. See Pending Connections Time
reactor.netty.connection.provider.max.pending.connections | Gauge | The maximum number of requests that will be queued while waiting for a ready connection. See Max Pending Connections

The following table provides information for the HTTP client metrics when it is configured to serve HTTP/2 traffic:

metric name | type | description
reactor.netty.connection.provider.active.streams | Gauge | The number of the active HTTP/2 streams. See Active Streams
reactor.netty.connection.provider.pending.streams | Gauge | The number of requests that are waiting to open an HTTP/2 stream. See Pending Streams

The following example enables that integration:

import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

public class Application {

	public static void main(String[] args) {
		ConnectionProvider provider =
				ConnectionProvider.builder("custom")
				                  .maxConnections(50)
				                  .metrics(true) (1)
				                  .build();

		HttpClient client = HttpClient.create(provider);

		String response =
				client.get()
				      .uri("https://example.com/")
				      .responseContent()
				      .aggregate()
				      .asString()
				      .block();

		System.out.println("Response " + response);

		provider.disposeLater()
		        .block();
	}
}
1 Enables the built-in integration with Micrometer

8. SSL and TLS

When you need SSL or TLS, you can apply the configuration shown in the next example. By default, if OpenSSL is available, the SslProvider.OPENSSL provider is used. Otherwise, the SslProvider.JDK provider is used. You can switch the provider by using SslContextBuilder or by setting -Dio.netty.handler.ssl.noOpenSsl=true. The following example uses Http11SslContextSpec:

import reactor.netty.http.Http11SslContextSpec;
import reactor.netty.http.client.HttpClient;

public class Application {

	public static void main(String[] args) {
		Http11SslContextSpec http11SslContextSpec = Http11SslContextSpec.forClient();

		HttpClient client =
				HttpClient.create()
				          .secure(spec -> spec.sslContext(http11SslContextSpec));

		client.get()
		      .uri("https://example.com/")
		      .response()
		      .block();
	}
}

8.1. Server Name Indication

By default, the HTTP client sends the remote host name as the SNI server name. When you need to change this default setting, you can configure the HTTP client as follows:

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.http.client.HttpClient;

import javax.net.ssl.SNIHostName;

public class Application {

	public static void main(String[] args) throws Exception {
		SslContext sslContext = SslContextBuilder.forClient().build();

		HttpClient client =
				HttpClient.create()
				          .secure(spec -> spec.sslContext(sslContext)
				                              .serverNames(new SNIHostName("test.com")));

		client.get()
		      .uri("https://127.0.0.1:8080/")
		      .response()
		      .block();
	}
}

9. Retry Strategies

By default, the HTTP client retries the request once if it was aborted on the TCP level.
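
To make this default concrete, the following plain-Java sketch models what "retry once" means: one retry gives at most two attempts in total. It only illustrates the semantics and is not Reactor Netty's implementation. The class and method names are hypothetical, and treating any IOException as a request "aborted on the TCP level" is an assumption made for the example.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

final class RetryOnceSketch {

	// Hypothetical helper: runs the request and, if it fails with an IOException
	// (standing in for "aborted on the TCP level"), runs it exactly one more time.
	static <T> T callWithOneRetry(Callable<T> request) throws Exception {
		try {
			return request.call();
		}
		catch (IOException firstFailure) {
			// Second and last attempt; a failure here propagates to the caller.
			return request.call();
		}
	}

	public static void main(String[] args) throws Exception {
		int[] attempts = {0};

		// The first attempt fails, the second one succeeds.
		String result = callWithOneRetry(() -> {
			if (++attempts[0] == 1) {
				throw new IOException("connection reset");
			}
			return "ok";
		});

		System.out.println(result + " after " + attempts[0] + " attempts");
	}
}
```

If this behaviour is not wanted, for example for requests that are not safe to repeat, HttpClient#disableRetry(true) can be used to turn it off.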

10. HTTP/2

By default, the HTTP client supports HTTP/1.1. If you need HTTP/2, you can get it through configuration. In addition to the protocol configuration, if you need H2 but not H2C (cleartext), you must also configure SSL.

As Application-Layer Protocol Negotiation (ALPN) is not supported “out-of-the-box” by JDK8 (although some vendors backported ALPN to JDK8), you need an additional dependency on a native library that supports it, for example, netty-tcnative-boringssl-static.

The following listing presents a simple H2 example:

import io.netty.handler.codec.http.HttpHeaders;
import reactor.core.publisher.Mono;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.HttpClient;
import reactor.util.function.Tuple2;

public class H2Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .protocol(HttpProtocol.H2) (1)
				          .secure();                 (2)

		Tuple2<String, HttpHeaders> response =
				client.get()
				      .uri("https://example.com/")
				      .responseSingle((res, bytes) -> bytes.asString()
				                                           .zipWith(Mono.just(res.responseHeaders())))
				      .block();

		System.out.println("Used stream ID: " + response.getT2().get("x-http2-stream-id"));
		System.out.println("Response: " + response.getT1());
	}
}
1 Configures the client to support only HTTP/2
2 Configures SSL

The following listing presents a simple H2C example:

import io.netty.handler.codec.http.HttpHeaders;
import reactor.core.publisher.Mono;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.HttpClient;
import reactor.util.function.Tuple2;

public class H2CApplication {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .protocol(HttpProtocol.H2C);

		Tuple2<String, HttpHeaders> response =
				client.get()
				      .uri("http://localhost:8080/")
				      .responseSingle((res, bytes) -> bytes.asString()
				                                           .zipWith(Mono.just(res.responseHeaders())))
				      .block();

		System.out.println("Used stream ID: " + response.getT2().get("x-http2-stream-id"));
		System.out.println("Response: " + response.getT1());
	}
}

10.1. Protocol Selection

The HttpProtocol enum lists the protocols that can be configured:

/**
 * @author Stephane Maldini
 */
public enum HttpProtocol {

	/**
	 * The default supported HTTP protocol by HttpServer and HttpClient.
	 */
	HTTP11,

	/**
	 * HTTP/2.0 support with TLS
	 * <p>If used along with HTTP/1.1 protocol, HTTP/2.0 will be the preferred protocol.
	 * While negotiating the application level protocol, HTTP/2.0 or HTTP/1.1 can be chosen.
	 * <p>If used without HTTP/1.1 protocol, HTTP/2.0 will always be offered as a protocol
	 * for communication with no fallback to HTTP/1.1.
	 */
	H2,

	/**
	 * HTTP/2.0 support with clear-text.
	 * <p>If used along with HTTP/1.1 protocol, will support H2C "upgrade":
	 * Request or consume requests as HTTP/1.1 first, looking for HTTP/2.0 headers
	 * and {@literal Connection: Upgrade}. A server will typically reply a successful
	 * 101 status if upgrade is successful or a fallback HTTP/1.1 response. When
	 * successful the client will start sending HTTP/2.0 traffic.
	 * <p>If used without HTTP/1.1 protocol, will support H2C "prior-knowledge": Doesn't
	 * require {@literal Connection: Upgrade} handshake between a client and server but
	 * fallback to HTTP/1.1 will not be supported.
	 */
	H2C,

	/**
	 * HTTP/3.0 support.
	 * @since 1.2.0
	 */
	HTTP3
}

11. Proxy Support

Reactor Netty supports the proxy functionality provided by Netty and provides a way to specify non-proxy hosts through the ProxyProvider builder.

Netty’s HTTP proxy support always uses the CONNECT method to establish a tunnel to the specified proxy, regardless of whether the scheme is http or https. (More information: Netty enforce HTTP proxy to support HTTP CONNECT method.) Some proxies might not support the CONNECT method when the scheme is http, or might need additional configuration to support it. This can be the reason for not being able to connect to the proxy. Check the proxy documentation to see whether it supports the CONNECT method or needs additional configuration to do so.
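
For illustration, a tunnel request sent to an HTTP proxy has roughly the shape built by the following plain-Java sketch. It can help when checking proxy logs or proxy configuration. The class and method names are hypothetical, and the exact set of headers that Netty sends may differ.

```java
final class ConnectRequestSketch {

	// Builds the head of an HTTP CONNECT request. The request target is the
	// destination in host:port form, not a path or a full URL.
	static String connectRequest(String host, int port) {
		String authority = host + ":" + port;
		return "CONNECT " + authority + " HTTP/1.1\r\n" +
				"Host: " + authority + "\r\n" +
				"\r\n";
	}

	public static void main(String[] args) {
		// The same kind of request is issued for http (port 80) and https (port 443) targets.
		System.out.print(connectRequest("example.com", 80));
		System.out.print(connectRequest("example.com", 443));
	}
}
```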

The following example uses ProxyProvider:

import reactor.netty.http.client.HttpClient;
import reactor.netty.transport.ProxyProvider;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .proxy(spec -> spec.type(ProxyProvider.Proxy.HTTP)
				                             .host("proxy")
				                             .port(8080)
				                             .nonProxyHosts("localhost")
				                             .connectTimeoutMillis(20_000)); (1)

		String response =
				client.get()
				      .uri("https://example.com/")
				      .responseContent()
				      .aggregate()
				      .asString()
				      .block();

		System.out.println("Response " + response);
	}
}
1 Configures the connection establishment timeout to 20 seconds.

12. Metrics

The HTTP client supports built-in integration with Micrometer. It exposes all metrics with a prefix of reactor.netty.http.client.

The following table provides information for the HTTP client metrics:

metric name | type | description
reactor.netty.http.client.data.received | DistributionSummary | Amount of the data received, in bytes. See Data Received
reactor.netty.http.client.data.sent | DistributionSummary | Amount of the data sent, in bytes. See Data Sent
reactor.netty.http.client.errors | Counter | Number of errors that occurred. See Errors Count
reactor.netty.http.client.tls.handshake.time | Timer | Time spent for TLS handshake. See Tls Handshake Time
reactor.netty.http.client.connect.time | Timer | Time spent for connecting to the remote address. See Connect Time
reactor.netty.http.client.address.resolver | Timer | Time spent for resolving the address. See Hostname Resolution Time
reactor.netty.http.client.data.received.time | Timer | Time spent in consuming incoming data. See Http Client Data Received Time
reactor.netty.http.client.data.sent.time | Timer | Time spent in sending outgoing data. See Http Client Data Sent Time
reactor.netty.http.client.response.time | Timer | Total time for the request/response. See Http Client Response Time

These additional metrics are also available:

Pooled ConnectionProvider metrics

metric name | type | description
reactor.netty.connection.provider.total.connections | Gauge | The number of all connections, active or idle. See Total Connections
reactor.netty.connection.provider.active.connections | Gauge | The number of the connections that have been successfully acquired and are in active use. See Active Connections
reactor.netty.connection.provider.max.connections | Gauge | The maximum number of active connections that are allowed. See Max Connections
reactor.netty.connection.provider.idle.connections | Gauge | The number of the idle connections. See Idle Connections
reactor.netty.connection.provider.pending.connections | Gauge | The number of requests that are waiting for a connection. See Pending Connections
reactor.netty.connection.provider.pending.connections.time | Timer | Time spent waiting to acquire a connection from the connection pool. See Pending Connections Time
reactor.netty.connection.provider.max.pending.connections | Gauge | The maximum number of requests that will be queued while waiting for a ready connection. See Max Pending Connections

The following table provides information for the HTTP client metrics when it is configured to serve HTTP/2 traffic:

metric name | type | description
reactor.netty.connection.provider.active.streams | Gauge | The number of the active HTTP/2 streams. See Active Streams
reactor.netty.connection.provider.pending.streams | Gauge | The number of requests that are waiting to open an HTTP/2 stream. See Pending Streams

ByteBufAllocator metrics

metric name | type | description
reactor.netty.bytebuf.allocator.used.heap.memory | Gauge | The number of bytes reserved by heap buffer allocator. See Used Heap Memory
reactor.netty.bytebuf.allocator.used.direct.memory | Gauge | The number of bytes reserved by direct buffer allocator. See Used Direct Memory
reactor.netty.bytebuf.allocator.heap.arenas | Gauge | The number of heap arenas (when PooledByteBufAllocator). See Heap Arenas
reactor.netty.bytebuf.allocator.direct.arenas | Gauge | The number of direct arenas (when PooledByteBufAllocator). See Direct Arenas
reactor.netty.bytebuf.allocator.threadlocal.caches | Gauge | The number of thread local caches (when PooledByteBufAllocator). See Thread Local Caches
reactor.netty.bytebuf.allocator.small.cache.size | Gauge | The size of the small cache (when PooledByteBufAllocator). See Small Cache Size
reactor.netty.bytebuf.allocator.normal.cache.size | Gauge | The size of the normal cache (when PooledByteBufAllocator). See Normal Cache Size
reactor.netty.bytebuf.allocator.chunk.size | Gauge | The chunk size for an arena (when PooledByteBufAllocator). See Chunk Size
reactor.netty.bytebuf.allocator.active.heap.memory | Gauge | The actual bytes consumed by in-use buffers allocated from heap buffer pools (when PooledByteBufAllocator). See Active Heap Memory
reactor.netty.bytebuf.allocator.active.direct.memory | Gauge | The actual bytes consumed by in-use buffers allocated from direct buffer pools (when PooledByteBufAllocator). See Active Direct Memory

EventLoop metrics

metric name | type | description
reactor.netty.eventloop.pending.tasks | Gauge | The number of tasks that are pending for processing on an event loop. See Pending Tasks

The following example enables that integration:

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.config.MeterFilter;
import reactor.netty.http.client.HttpClient;

public class Application {

	public static void main(String[] args) {
		Metrics.globalRegistry (1)
		       .config()
		       .meterFilter(MeterFilter.maximumAllowableTags("reactor.netty.http.client", "URI", 100, MeterFilter.deny()));

		HttpClient client =
				HttpClient.create()
				          .metrics(true, s -> {
				              if (s.startsWith("/stream/")) { (2)
				                  return "/stream/{n}";
				              }
				              else if (s.startsWith("/bytes/")) {
				                  return "/bytes/{n}";
				              }
				              return s;
				          }); (3)

		client.get()
		      .uri("https://httpbin.org/stream/2")
		      .responseContent()
		      .blockLast();

		client.get()
		      .uri("https://httpbin.org/bytes/1024")
		      .responseContent()
		      .blockLast();
	}
}
1 Applies upper limit for the meters with URI tag
2 Templated URIs will be used as a URI tag value when possible
3 Enables the built-in integration with Micrometer

To avoid memory and CPU overhead from the enabled metrics, it is important to convert the real URIs to templated URIs when possible. Without a conversion to a template-like form, each distinct URI leads to the creation of a distinct tag, which takes a lot of memory for the metrics.

Always apply an upper limit for the meters with URI tags. Configuring an upper limit on the number of meters can help in cases when the real URIs cannot be templated. You can find more information at maximumAllowableTags.
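
When many paths differ only in a numeric segment, the conversion can be expressed once as a generic function instead of one startsWith check per path. The following sketch is one possible way to do that in plain Java. The class name, the constant name, and the rule "every all-digit path segment becomes {n}" are assumptions for illustration and are not part of Reactor Netty.

```java
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class UriTagTemplates {

	// A path segment that consists only of digits, for example "/2" in "/stream/2".
	private static final Pattern NUMERIC_SEGMENT = Pattern.compile("/\\d+(?=[/?]|$)");

	// Hypothetical helper: replaces every all-digit path segment with "/{n}".
	static final Function<String, String> NUMERIC_TO_TEMPLATE =
			uri -> NUMERIC_SEGMENT.matcher(uri).replaceAll(Matcher.quoteReplacement("/{n}"));

	public static void main(String[] args) {
		System.out.println(NUMERIC_TO_TEMPLATE.apply("/stream/2"));
		System.out.println(NUMERIC_TO_TEMPLATE.apply("/bytes/1024"));
		System.out.println(NUMERIC_TO_TEMPLATE.apply("/get"));
	}
}
```

A function like this could be passed as the second argument of metrics(true, ...). It does not handle variable segments that are not numeric, so the upper limit on the number of meters is still needed.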

When HTTP client metrics are needed for an integration with a system other than Micrometer or you want to provide your own integration with Micrometer, you can provide your own metrics recorder, as follows:

import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientMetricsRecorder;

import java.net.SocketAddress;
import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .metrics(true, CustomHttpClientMetricsRecorder::new); (1)

		client.get()
		      .uri("https://httpbin.org/stream/2")
		      .response()
		      .block();
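	}

	// CustomHttpClientMetricsRecorder is your own implementation of
	// HttpClientMetricsRecorder; its body is omitted from this listing.
}
1 Enables HTTP client metrics and provides an HttpClientMetricsRecorder implementation.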

13. Tracing

The HTTP client supports built-in integration with Micrometer Tracing.

The following table provides information for the HTTP client spans:

contextual name | description
HTTP <HTTP METHOD> | Information and total time for the request. See Http Client Response Span.
hostname resolution | Information and time spent for resolving the address. See Hostname Resolution Span.
connect | Information and time spent for connecting to the remote address. See Connect Span.
tls handshake | Information and time spent for TLS handshake. See Tls Handshake Span.

The following example enables that integration. This concrete example uses Brave and reports the information to Zipkin. See the Micrometer Tracing documentation for OpenTelemetry setup.

import brave.Tracing;
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext;
import io.micrometer.tracing.brave.bridge.BravePropagator;
import io.micrometer.tracing.brave.bridge.BraveTracer;
import io.micrometer.tracing.propagation.Propagator;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.observability.ReactorNettyPropagatingSenderTracingObservationHandler;
import reactor.netty.observability.ReactorNettyTracingObservationHandler;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.reporter.urlconnection.URLConnectionSender;

import static reactor.netty.Metrics.OBSERVATION_REGISTRY;

public class Application {

	public static void main(String[] args) {
		init(); (1)

		HttpClient client =
				HttpClient.create()
				          .metrics(true, s -> {
				              if (s.startsWith("/stream/")) { (2)
				                  return "/stream/{n}";
				              }
				              return s;
				          }); (3)

		client.get()
		      .uri("https://httpbin.org/stream/3")
		      .responseContent()
		      .blockLast();
	}

	/**
	 * This setup is based on
	 * <a href="https://micrometer.io/docs/tracing#_micrometer_tracing_brave_setup">Micrometer Tracing Brave Setup</a>.
	 */
	static void init() {
		AsyncZipkinSpanHandler spanHandler = AsyncZipkinSpanHandler
				.create(URLConnectionSender.create("http://localhost:9411/api/v2/spans"));

		StrictCurrentTraceContext braveCurrentTraceContext = StrictCurrentTraceContext.create();

		CurrentTraceContext bridgeContext = new BraveCurrentTraceContext(braveCurrentTraceContext);

		Tracing tracing =
				Tracing.newBuilder()
				       .currentTraceContext(braveCurrentTraceContext)
				       .supportsJoin(false)
				       .traceId128Bit(true)
				       .sampler(Sampler.ALWAYS_SAMPLE)
				       .addSpanHandler(spanHandler)
				       .localServiceName("reactor-netty-examples")
				       .build();

		brave.Tracer braveTracer = tracing.tracer();

		Tracer tracer = new BraveTracer(braveTracer, bridgeContext, new BraveBaggageManager());

		Propagator propagator = new BravePropagator(tracing);

		OBSERVATION_REGISTRY.observationConfig()
		                    .observationHandler(new ReactorNettyPropagatingSenderTracingObservationHandler(tracer, propagator))
		                    .observationHandler(new ReactorNettyTracingObservationHandler(tracer));
	}
}
1 Initializes Brave, Zipkin, and the Observation registry.
2 Templated URIs are used as a URI tag value when possible.
3 Enables the built-in integration with Micrometer.

The result in Zipkin looks like:

[Figure: HTTP client tracing]

13.1. Access Current Observation

Project Micrometer provides a library that assists with context propagation across different types of context mechanisms such as ThreadLocal, Reactor Context and others.

The following example shows how to use this library in a custom ChannelHandler:

import brave.Tracing;
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext;
import io.micrometer.tracing.brave.bridge.BravePropagator;
import io.micrometer.tracing.brave.bridge.BraveTracer;
import io.micrometer.tracing.propagation.Propagator;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import reactor.netty.NettyPipeline;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.observability.ReactorNettyPropagatingSenderTracingObservationHandler;
import reactor.netty.observability.ReactorNettyTracingObservationHandler;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.reporter.urlconnection.URLConnectionSender;

import static reactor.netty.Metrics.OBSERVATION_REGISTRY;

public class Application {

	public static void main(String[] args) {
		init(); (1)

		HttpClient client =
				HttpClient.create()
				          .metrics(true, s -> {
				              if (s.startsWith("/stream/")) { (2)
				                  return "/stream/{n}";
				              }
				              return s;
				          }) (3)
				          .doOnConnected(conn -> conn.channel().pipeline().addAfter(NettyPipeline.HttpCodec,
				                  "custom-channel-handler", CustomChannelOutboundHandler.INSTANCE)); (4)

		client.get()
		      .uri("https://httpbin.org/stream/3")
		      .responseContent()
		      .blockLast();
	}

	static final class CustomChannelOutboundHandler extends ChannelOutboundHandlerAdapter {

		static final ChannelHandler INSTANCE = new CustomChannelOutboundHandler();

		@Override
		public boolean isSharable() {
			return true;
		}

		@Override
		public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
			try (ContextSnapshot.Scope scope = ContextSnapshotFactory.builder().build().setThreadLocalsFrom(ctx.channel())) {
				System.out.println("Current Observation in Scope: " + OBSERVATION_REGISTRY.getCurrentObservation());
				//"FutureReturnValueIgnored" this is deliberate
				ctx.write(msg, promise);
			}
			System.out.println("Current Observation: " + OBSERVATION_REGISTRY.getCurrentObservation());
		}
	}

	// The init() method is omitted from this listing; see init() in the previous example.
}
1 Initializes Brave, Zipkin, and the Observation registry.
2 Templated URIs are used as a URI tag value when possible.
3 Enables the built-in integration with Micrometer.
4 Custom ChannelHandler that uses the context propagation library. This concrete example overrides only ChannelOutboundHandlerAdapter#write; if needed, the same logic can be used for the rest of the methods. Also, this example sets all ThreadLocal values for which there is a value in the given Channel. If you need different behaviour, for example setting only some of the ThreadLocal values, check the context propagation library API.

When you enable Reactor Netty tracing within a framework, you may need to let Reactor Netty use the ObservationRegistry created by this framework. For this purpose you need to invoke reactor.netty.Metrics#observationRegistry. You may also need to configure the Reactor Netty ObservationHandlers using the API provided by the framework.

14. Unix Domain Sockets

The HTTP client supports Unix Domain Sockets (UDS) when native transport is in use.

The following example shows how to use UDS support:

import io.netty.channel.unix.DomainSocketAddress;
import reactor.netty.http.client.HttpClient;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .remoteAddress(() -> new DomainSocketAddress("/tmp/test.sock")); (1)

		client.get()
		      .uri("/")
		      .response()
		      .block();
	}
}
1 Specifies DomainSocketAddress that will be used

15. Host Name Resolution

By default, the HttpClient uses Netty’s domain name lookup mechanism, which resolves a domain name asynchronously. This is an alternative to the JVM’s built-in blocking resolver.

When you need to change the default settings, you can configure the HttpClient as follows:

import reactor.netty.http.client.HttpClient;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .resolver(spec -> spec.queryTimeout(Duration.ofMillis(500))); (1)

		String response =
				client.get()
				      .uri("https://example.com/")
				      .responseContent()
				      .aggregate()
				      .asString()
				      .block();

		System.out.println("Response " + response);
	}
}
1 The timeout of each DNS query performed by this resolver will be 500ms.

The following table lists the available configurations. Additionally, TCP fallback is enabled by default.

Configuration name | Description
bindAddressSupplier | The supplier of the local address to bind to.
cacheMaxTimeToLive | The max time to live of the cached DNS resource records (resolution: seconds). If the time to live of the DNS resource record returned by the DNS server is greater than this max time to live, this resolver ignores the time to live from the DNS server and uses this max time to live. Default: Integer.MAX_VALUE.
cacheMinTimeToLive | The min time to live of the cached DNS resource records (resolution: seconds). If the time to live of the DNS resource record returned by the DNS server is less than this min time to live, this resolver ignores the time to live from the DNS server and uses this min time to live. Default: 0.
cacheNegativeTimeToLive | The time to live of the cache for the failed DNS queries (resolution: seconds). Default: 0.
completeOncePreferredResolved | When this setting is enabled, the resolver notifies as soon as all queries for the preferred address type are complete. When this setting is disabled, the resolver notifies when all possible address types are complete. This configuration is applicable for DnsNameResolver#resolveAll(String). By default, this setting is enabled.
disableOptionalRecord | Disables the automatic inclusion of an optional record that tries to give a hint to the remote DNS server about how much data the resolver can read per response. By default, this setting is enabled.
disableRecursionDesired | Specifies whether this resolver has to send a DNS query with the recursion desired (RD) flag set. By default, this setting is enabled.
dnsAddressResolverGroupProvider | Sets a custom function to create a DnsAddressResolverGroup given a DnsNameResolverBuilder.
hostsFileEntriesResolver | Sets a custom HostsFileEntriesResolver to be used for hosts file entries. Default: DefaultHostsFileEntriesResolver.
maxPayloadSize | Sets the capacity of the datagram packet buffer (in bytes). Default: 4096.
maxQueriesPerResolve | Sets the maximum allowed number of DNS queries to send when resolving a host name. Default: 16.
ndots | Sets the number of dots that must appear in a name before an initial absolute query is made. Default: -1 (to determine the value from the OS on Unix or use a value of 1 otherwise).
queryTimeout | Sets the timeout of each DNS query performed by this resolver (resolution: milliseconds). Default: 5000.
resolveCache | The cache to use to store resolved DNS entries.
resolvedAddressTypes | The list of the protocol families of the resolved address.
retryTcpOnTimeout | Specifies whether this resolver will also fall back to TCP if a timeout is detected. By default, the resolver will only try to use TCP if the response is marked as truncated.
roundRobinSelection | Enables an AddressResolverGroup of DnsNameResolver that supports random selection of destination addresses if multiple are provided by the nameserver. See RoundRobinDnsAddressResolverGroup. Default: DnsAddressResolverGroup.
runOn | Performs the communication with the DNS servers on the given LoopResources. By default, the LoopResources specified on the client level are used.
searchDomains | The list of search domains of the resolver. By default, the effective search domain list is populated by using the system DNS search domains.
trace | A specific logger and log level to be used by this resolver when generating detailed trace information in case of resolution failure.
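
The interaction between cacheMinTimeToLive and cacheMaxTimeToLive described in the table amounts to limiting the time to live returned by the DNS server to a range. The following plain-Java sketch models that rule. It is not the resolver's actual code, and the class and method names are hypothetical.

```java
final class DnsTtlSketch {

	// Returns the TTL (in seconds) under which a record would be cached:
	// the server-provided TTL, limited to the range [minTtl, maxTtl].
	static long effectiveTtl(long serverTtl, long minTtl, long maxTtl) {
		return Math.max(minTtl, Math.min(maxTtl, serverTtl));
	}

	public static void main(String[] args) {
		System.out.println(effectiveTtl(5, 30, 300));    // below the minimum, so the minimum is used
		System.out.println(effectiveTtl(3600, 30, 300)); // above the maximum, so the maximum is used
		System.out.println(effectiveTtl(120, 30, 300));  // within the range, so the server value is used
	}
}
```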

Sometimes, you may want to switch to the JVM built-in resolver. To do so, you can configure the HttpClient as follows:

import io.netty.resolver.DefaultAddressResolverGroup;
import reactor.netty.http.client.HttpClient;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .resolver(DefaultAddressResolverGroup.INSTANCE); (1)

		String response =
				client.get()
				      .uri("https://example.com/")
				      .responseContent()
				      .aggregate()
				      .asString()
				      .block();

		System.out.println("Response " + response);
	}
}
1 Sets the JVM built-in resolver.

16. Timeout Configuration

This section describes various timeout configuration options that can be used in HttpClient. Configuring a proper timeout may improve or solve issues in the communication process. The configuration options can be grouped as follows:

16.1. Connection Pool Timeout

By default, HttpClient uses a connection pool. When a request is completed successfully and if the connection is not scheduled for closing, the connection is returned to the connection pool and can thus be reused for processing another request. The connection may be reused immediately for another request or may stay idle in the connection pool for some time.

The following list describes the available timeout configuration options:

  • maxIdleTime - The maximum time (resolution: ms) that this connection stays idle in the connection pool. By default, maxIdleTime is not specified.

    When you configure maxIdleTime, you should consider the idle timeout configuration on the target server. Choose a configuration that is equal to or less than the one on the target server. By doing so, you can reduce the I/O issues caused by a connection closed by the target server.

  • maxLifeTime - The maximum time (resolution: ms) that this connection stays alive. By default, maxLifeTime is not specified.

  • pendingAcquireTimeout - The maximum time (resolution: ms) within which a pending acquire operation must complete; otherwise, a PoolAcquireTimeoutException is thrown. Default: 45s.

By default, these timeouts are checked on connection release or acquire operations and, if some timeout is reached, the connection is closed and removed from the connection pool. However, you can also configure the connection pool, by setting evictInBackground, to perform periodic checks on connections.
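
As a rough mental model of these checks, the following plain-Java sketch decides whether a connection should be removed from the pool based on maxIdleTime and maxLifeTime. It is a simplification for illustration only and not Reactor Netty's implementation. The class and method names are hypothetical, and a null limit stands for "not specified".

```java
import java.time.Duration;

final class PoolEvictionSketch {

	// Returns true when the connection has reached a configured limit.
	// A null limit means that the corresponding timeout is not specified.
	static boolean shouldEvict(Duration idleTime, Duration lifeTime,
			Duration maxIdleTime, Duration maxLifeTime) {
		boolean idleTooLong = maxIdleTime != null && idleTime.compareTo(maxIdleTime) >= 0;
		boolean aliveTooLong = maxLifeTime != null && lifeTime.compareTo(maxLifeTime) >= 0;
		return idleTooLong || aliveTooLong;
	}

	public static void main(String[] args) {
		Duration maxIdle = Duration.ofSeconds(20);
		Duration maxLife = Duration.ofSeconds(60);

		// Idle for 25s with a 20s idle limit.
		System.out.println(shouldEvict(Duration.ofSeconds(25), Duration.ofSeconds(30), maxIdle, maxLife));
		// Idle for 5s and alive for 30s, both within the limits.
		System.out.println(shouldEvict(Duration.ofSeconds(5), Duration.ofSeconds(30), maxIdle, maxLife));
		// No limits specified, which matches the defaults described above.
		System.out.println(shouldEvict(Duration.ofSeconds(500), Duration.ofSeconds(900), null, null));
	}
}
```

In this model, a check happens only when shouldEvict is called, which corresponds to the release or acquire operations; evictInBackground adds calls at a fixed interval.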

To customize the default settings, you can configure HttpClient as follows:

import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		ConnectionProvider provider =
				ConnectionProvider.builder("custom")
				                  .maxConnections(50)
				                  .maxIdleTime(Duration.ofSeconds(20))           (1)
				                  .maxLifeTime(Duration.ofSeconds(60))           (2)
				                  .pendingAcquireTimeout(Duration.ofSeconds(60)) (3)
				                  .evictInBackground(Duration.ofSeconds(120))    (4)
				                  .build();

		HttpClient client = HttpClient.create(provider);

		String response =
				client.get()
				      .uri("https://example.com/")
				      .responseContent()
				      .aggregate()
				      .asString()
				      .block();

		System.out.println("Response " + response);

		provider.disposeLater()
		        .block();
	}
}
1 Configures the maximum time for a connection to stay idle to 20 seconds.
2 Configures the maximum time for a connection to stay alive to 60 seconds.
3 Configures the maximum time for the pending acquire operation to 60 seconds.
4 Checks the connection pool every two minutes for connections that are eligible for removal.
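
The check that runs on acquire, on release, or in the background can be pictured as a simple predicate over a connection's idle time and age. The following is a minimal sketch of that idea, not Reactor Netty's actual implementation: the EvictionSketch class and the shouldEvict method are hypothetical names, and a null limit is assumed to mean "not specified".

```java
import java.time.Duration;

// Hypothetical helper for illustration only; it is not part of the Reactor Netty API.
class EvictionSketch {

	// Returns true when the connection has reached either configured limit.
	// A null limit is treated as "not specified" and is never reached.
	static boolean shouldEvict(Duration idleFor, Duration aliveFor,
			Duration maxIdleTime, Duration maxLifeTime) {
		boolean idleTooLong = maxIdleTime != null && idleFor.compareTo(maxIdleTime) >= 0;
		boolean aliveTooLong = maxLifeTime != null && aliveFor.compareTo(maxLifeTime) >= 0;
		return idleTooLong || aliveTooLong;
	}

	public static void main(String[] args) {
		Duration maxIdleTime = Duration.ofSeconds(20);
		Duration maxLifeTime = Duration.ofSeconds(60);

		// Idle for 5s, alive for 30s: both limits respected.
		System.out.println(shouldEvict(Duration.ofSeconds(5), Duration.ofSeconds(30),
				maxIdleTime, maxLifeTime)); // prints "false"

		// Idle for 25s: the idle limit of 20s is reached.
		System.out.println(shouldEvict(Duration.ofSeconds(25), Duration.ofSeconds(30),
				maxIdleTime, maxLifeTime)); // prints "true"

		// Alive for 61s: the lifetime limit of 60s is reached.
		System.out.println(shouldEvict(Duration.ofSeconds(5), Duration.ofSeconds(61),
				maxIdleTime, maxLifeTime)); // prints "true"
	}
}
```

Without evictInBackground, a predicate like this runs only when a connection is acquired or released, so an expired connection can stay in the pool until the next such operation.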

16.2. HttpClient Timeout

This section describes the various timeout configuration options at the HttpClient level.

Reactor Netty uses Reactor Core as its Reactive Streams implementation, and you may want to use the timeout operator that Mono and Flux provide. Keep in mind, however, that it is better to use the more specific timeout configuration options available in Reactor Netty, since they provide more control for a specific purpose and use case. By contrast, the timeout operator can only apply to the operation as a whole, from establishing the connection to the remote peer to receiving the response.

16.2.1. Response Timeout

HttpClient provides an API for configuring a default response timeout for all requests. You can override this default for a specific request through a request-level API. By default, responseTimeout is not specified.

It is always a good practice to configure a response timeout.

To customize the default settings, you can configure HttpClient as follows:

import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .responseTimeout(Duration.ofSeconds(1));    (1)

		String response1 =
				client.post()
				      .uri("https://example.com/")
				      .send((req, out) -> {
				          req.responseTimeout(Duration.ofSeconds(2)); (2)
				          return out.sendString(Mono.just("body1"));
				      })
				      .responseContent()
				      .aggregate()
				      .asString()
				      .block();

		System.out.println("Response " + response1);

		String response2 =
				client.post()
				      .uri("https://example.com/")
				      .send((req, out) -> out.sendString(Mono.just("body2")))
				      .responseContent()
				      .aggregate()
				      .asString()
				      .block();

		System.out.println("Response " + response2);
	}
}
1 Configures the default response timeout to 1 second.
2 Configures a response timeout for a specific request to 2 seconds.

16.2.2. Connection Timeout

The following listing shows all available connection timeout configuration options, but some of them may apply only to a specific transport.

  • CONNECT_TIMEOUT_MILLIS - If the connection establishment attempt to the remote peer does not finish within the configured connect timeout (resolution: ms), the connection establishment attempt fails. Default: 30s.

  • SO_KEEPALIVE - When the connection stays idle for some time (the time is implementation dependent, but the default is typically two hours), TCP automatically sends a keepalive probe to the remote peer. By default, SO_KEEPALIVE is not enabled. When you run with the Epoll or NIO (since Java 11 on Mac or Linux) transport, you can also configure:

    • TCP_KEEPIDLE - The maximum time (resolution: seconds) that this connection stays idle before TCP starts sending keepalive probes, if SO_KEEPALIVE has been set. The maximum time is implementation dependent, but the default is typically two hours.

    • TCP_KEEPINTVL (Epoll)/TCP_KEEPINTERVAL (NIO) - The time (resolution: seconds) between individual keepalive probes.

    • TCP_KEEPCNT (Epoll)/TCP_KEEPCOUNT (NIO) - The maximum number of keepalive probes TCP should send before dropping the connection.

Sometimes a network component between the client and the server silently drops idle connections without sending a response. From the Reactor Netty point of view, the remote peer simply does not respond. To handle this use case, consider configuring SO_KEEPALIVE.

To customize the default settings, you can configure HttpClient as follows:

import io.netty.channel.ChannelOption;
import io.netty.channel.epoll.EpollChannelOption;
//import io.netty.channel.socket.nio.NioChannelOption;
//import jdk.net.ExtendedSocketOptions;
import reactor.netty.http.client.HttpClient;
import java.net.InetSocketAddress;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .bindAddress(() -> new InetSocketAddress("host", 1234))
				          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) (1)
				          .option(ChannelOption.SO_KEEPALIVE, true)            (2)
				          // The options below are available only when NIO transport (Java 11) is used
				          // on Mac or Linux (Java does not currently support these extended options on Windows)
				          // https://bugs.openjdk.java.net/browse/JDK-8194298
				          //.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPIDLE), 300)
				          //.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPINTERVAL), 60)
				          //.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPCOUNT), 8);
				          // The options below are available only when Epoll transport is used
				          .option(EpollChannelOption.TCP_KEEPIDLE, 300)        (3)
				          .option(EpollChannelOption.TCP_KEEPINTVL, 60)        (4)
				          .option(EpollChannelOption.TCP_KEEPCNT, 8);          (5)

		String response =
				client.get()
				      .uri("https://example.com/")
				      .responseContent()
				      .aggregate()
				      .asString()
				      .block();

		System.out.println("Response " + response);
	}
}
1 Configures the connection establishment timeout to 10 seconds.
2 Enables TCP keepalive. This means that TCP starts sending keepalive probes when a connection is idle for some time.
3 The connection needs to remain idle for 5 minutes before TCP starts sending keepalive probes.
4 Configures the time between individual keepalive probes to 1 minute.
5 Configures the maximum number of TCP keepalive probes to 8.
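
Taken together, the three keepalive options bound how long an unresponsive peer can go unnoticed: roughly the idle time plus one probe interval for each unanswered probe. The following sketch works through that arithmetic for the values used above. The KeepAliveSketch class and its method are hypothetical names, and the exact timing depends on the operating system's TCP implementation.

```java
import java.time.Duration;

// Hypothetical helper for illustration only; it is not part of Reactor Netty or Netty.
class KeepAliveSketch {

	// Approximate time before TCP drops a connection to an unresponsive peer:
	// the idle period, followed by keepCount probes sent keepIntervalSeconds apart.
	static Duration timeToDetectDeadPeer(int keepIdleSeconds, int keepIntervalSeconds, int keepCount) {
		return Duration.ofSeconds(keepIdleSeconds + (long) keepIntervalSeconds * keepCount);
	}

	public static void main(String[] args) {
		// Values from the example: TCP_KEEPIDLE=300, TCP_KEEPINTVL=60, TCP_KEEPCNT=8
		Duration detection = timeToDetectDeadPeer(300, 60, 8);
		System.out.println(detection.getSeconds() + " seconds"); // prints "780 seconds"
	}
}
```

With these settings, a silently dropped connection is detected after about 13 minutes. Lower values shorten that window at the cost of more probe traffic.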

16.2.3. SSL/TLS Timeout

HttpClient supports the SSL/TLS functionality provided by Netty.

The following list describes the available timeout configuration options:

  • handshakeTimeout - Use this option to configure the SSL handshake timeout (resolution: ms). Default: 10s.

You should consider increasing the SSL handshake timeout when expecting slow network connections.

  • closeNotifyFlushTimeout - Use this option to configure the SSL close_notify flush timeout (resolution: ms). Default: 3s.

  • closeNotifyReadTimeout - Use this option to configure the SSL close_notify read timeout (resolution: ms). Default: 0s.

To customize the default settings, you can configure HttpClient as follows:

import reactor.netty.http.Http11SslContextSpec;
import reactor.netty.http.client.HttpClient;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		Http11SslContextSpec http11SslContextSpec = Http11SslContextSpec.forClient();

		HttpClient client =
				HttpClient.create()
				          .secure(spec -> spec.sslContext(http11SslContextSpec)
				                              .handshakeTimeout(Duration.ofSeconds(30))         (1)
				                              .closeNotifyFlushTimeout(Duration.ofSeconds(10))  (2)
				                              .closeNotifyReadTimeout(Duration.ofSeconds(10))); (3)

		String response =
				client.get()
				      .uri("https://example.com/")
				      .responseContent()
				      .aggregate()
				      .asString()
				      .block();

		System.out.println("Response " + response);
	}
}
1 Configures the SSL handshake timeout to 30 seconds.
2 Configures the SSL close_notify flush timeout to 10 seconds.
3 Configures the SSL close_notify read timeout to 10 seconds.

16.2.4. Proxy Timeout

HttpClient supports the proxy functionality provided by Netty and provides a way to specify the connection establishment timeout. If the connection establishment attempt to the remote peer does not finish within the timeout, the connection establishment attempt fails. Default: 10s.

To customize the default settings, you can configure HttpClient as follows:

import reactor.netty.http.client.HttpClient;
import reactor.netty.transport.ProxyProvider;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .proxy(spec -> spec.type(ProxyProvider.Proxy.HTTP)
				                             .host("proxy")
				                             .port(8080)
				                             .nonProxyHosts("localhost")
				                             .connectTimeoutMillis(20_000)); (1)

		String response =
				client.get()
				      .uri("https://example.com/")
				      .responseContent()
				      .aggregate()
				      .asString()
				      .block();

		System.out.println("Response " + response);
	}
}
1 Configures the connection establishment timeout to 20 seconds.

16.2.5. Host Name Resolution Timeout

By default, the HttpClient uses Netty’s domain name lookup mechanism to resolve a domain name asynchronously.

The following list describes the available timeout configuration options:

  • cacheMaxTimeToLive - The maximum time to live of the cached DNS resource records (resolution: seconds). If the time to live of the DNS resource record returned by the DNS server is greater than this maximum time to live, this resolver ignores the time to live from the DNS server and uses this maximum time to live. Default: Integer.MAX_VALUE.

  • cacheMinTimeToLive - The minimum time to live of the cached DNS resource records (resolution: seconds). If the time to live of the DNS resource record returned by the DNS server is less than this minimum time to live, this resolver ignores the time to live from the DNS server and uses this minimum time to live. Default: 0s.

  • cacheNegativeTimeToLive - The time to live of the cache for the failed DNS queries (resolution: seconds). Default: 0s.

  • queryTimeout - Sets the timeout of each DNS query performed by this resolver (resolution: milliseconds). Default: 5s.
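
The cacheMinTimeToLive and cacheMaxTimeToLive descriptions above amount to clamping the time to live returned by the DNS server into a configured range. The following sketch shows that rule in isolation. The DnsTtlSketch class and the effectiveTtlSeconds method are hypothetical names used for illustration, not the resolver's actual code.

```java
// Hypothetical helper for illustration only; it is not part of Reactor Netty or Netty.
class DnsTtlSketch {

	// Clamps the TTL reported by the DNS server into [cacheMinTtl, cacheMaxTtl].
	// All values are in seconds.
	static long effectiveTtlSeconds(long serverTtl, long cacheMinTtl, long cacheMaxTtl) {
		return Math.max(cacheMinTtl, Math.min(cacheMaxTtl, serverTtl));
	}

	public static void main(String[] args) {
		long min = 60;
		long max = 300;

		// Server TTL below the minimum: the minimum is used.
		System.out.println(effectiveTtlSeconds(30, min, max));   // prints "60"

		// Server TTL above the maximum: the maximum is used.
		System.out.println(effectiveTtlSeconds(3600, min, max)); // prints "300"

		// Server TTL inside the range: the server value is kept.
		System.out.println(effectiveTtlSeconds(120, min, max));  // prints "120"
	}
}
```

With the documented defaults (minimum 0, maximum Integer.MAX_VALUE), the clamp leaves the server-provided time to live unchanged.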

To customize the default settings, you can configure HttpClient as follows:

import reactor.netty.http.client.HttpClient;

import java.time.Duration;

public class Application {

	public static void main(String[] args) {
		HttpClient client =
				HttpClient.create()
				          .resolver(spec -> spec.queryTimeout(Duration.ofMillis(500))); (1)

		String response =
				client.get()
				      .uri("https://example.com/")
				      .responseContent()
				      .aggregate()
				      .asString()
				      .block();

		System.out.println("Response " + response);
	}
}
1 Configures the timeout of each DNS query performed by this resolver to 500ms.