Package reactor.pool

Class PoolBuilder<T,CONF extends PoolConfig<T>>

java.lang.Object
reactor.pool.PoolBuilder<T,CONF>
Type Parameters:
T - the type of elements in the produced Pool
CONF - the PoolConfig flavor this builder will provide to the created Pool

public class PoolBuilder<T,CONF extends PoolConfig<T>> extends Object
A builder for Pool implementations, whose tuning methods map to a PoolConfig subclass, CONF.
Author:
Simon Baslé
  • Method Details

    • from

      public static <T> PoolBuilder<T,PoolConfig<T>> from(Publisher<? extends T> allocator)
      Start building a Pool by describing how new objects are to be asynchronously allocated. Note that the Publisher allocator is subscribed to each time a new resource is needed and will be cancelled past the first received element (unless it is already a Mono).

      Adapting from blocking code is only acceptable if the work is offloaded to another Scheduler (e.g. a constructor wrapped via Mono.fromCallable(Callable) should be augmented with Mono.subscribeOn(Scheduler)).

      Type Parameters:
      T - the type of resource created and recycled by the Pool
      Parameters:
      allocator - the asynchronous creator of poolable resources, subscribed each time a new resource needs to be created.
      Returns:
      a builder of Pool
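
      As an illustration of the note on blocking code, here is a minimal sketch. The Connection class, the pool bounds and the choice of Schedulers.boundedElastic() are assumptions made for the example. Pool.withPoolable and Pool.disposeLater are not described on this page and are assumed to be available from the Pool interface.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.pool.InstrumentedPool;
import reactor.pool.PoolBuilder;

final class BlockingAllocatorExample {

    // Hypothetical resource whose constructor is assumed to block (e.g. opening a socket).
    static final class Connection {
        final String id;
        Connection(String id) { this.id = id; }
    }

    static InstrumentedPool<Connection> newPool() {
        // The blocking constructor is wrapped in fromCallable and moved off the
        // subscribing thread with subscribeOn, as the description above recommends.
        Mono<Connection> allocator = Mono
                .fromCallable(() -> new Connection("conn"))
                .subscribeOn(Schedulers.boundedElastic());

        return PoolBuilder.from(allocator)
                .sizeBetween(0, 4) // arbitrary bounds for the example
                .buildPool();
    }

    // Acquires a resource, reads its id, releases it, then shuts the pool down.
    static String useOnce() {
        InstrumentedPool<Connection> pool = newPool();
        try {
            return pool.withPoolable(conn -> Mono.just(conn.id))
                       .blockFirst(Duration.ofSeconds(5));
        } finally {
            pool.disposeLater().block(Duration.ofSeconds(5));
        }
    }

    public static void main(String[] args) {
        System.out.println(useOnce());
    }
}
```

      The allocator Mono is subscribed once per needed resource, so any per-resource setup belongs inside the Callable rather than outside it.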
    • acquisitionScheduler

      public PoolBuilder<T,CONF> acquisitionScheduler(Scheduler acquisitionScheduler)
      Provide a Scheduler that can optionally be used by a Pool to deliver its resources in a more deterministic (albeit potentially less efficient) way, thread-wise. Other implementations MAY completely ignore this parameter.

      Defaults to Schedulers.immediate().

      Parameters:
      acquisitionScheduler - the Scheduler on which to deliver acquired resources
      Returns:
      this Pool builder
    • pendingAcquireTimer

      public PoolBuilder<T,CONF> pendingAcquireTimer(BiFunction<Runnable,Duration,Disposable> pendingAcquireTimer)
      Define how timeouts are scheduled when a Pool.acquire(Duration) call is made and the acquisition is pending, i.e. there is no idle resource and no new resource can currently be created, so a timeout is scheduled using the provided function.

      By default, the Schedulers.parallel() scheduler is used.

      Parameters:
      pendingAcquireTimer - the function to apply when scheduling timers for acquisitions that are added to the pending queue.
      Returns:
      this Pool builder
    • allocationStrategy

      public PoolBuilder<T,CONF> allocationStrategy(AllocationStrategy allocationStrategy)
      Limits on how many resources can be allocated and managed by the Pool are driven by the provided AllocationStrategy. This is a customization escape hatch that replaces the last configured strategy, but most cases should be covered by the sizeBetween(int, int) or sizeUnbounded() pre-made strategies.

      Without a call to any of these 3 methods, the builder defaults to unbounded creation of resources, although this is not recommended.

      Parameters:
      allocationStrategy - the AllocationStrategy to use
      Returns:
      this Pool builder
    • destroyHandler

      public PoolBuilder<T,CONF> destroyHandler(Function<T,? extends Publisher<Void>> destroyHandler)
      Provide a handler that will derive a destroy Publisher whenever a resource isn't fit for usage anymore (either through eviction, manual invalidation, or because something went wrong with it). The destroy procedure is applied asynchronously and errors are swallowed.

      Defaults to recognizing Disposable and Closeable elements and disposing them.

      Parameters:
      destroyHandler - the Function supplying the destroy Publisher
      Returns:
      this Pool builder
    • evictionIdle

      public PoolBuilder<T,CONF> evictionIdle(Duration maxIdleTime)
      Set the eviction predicate to cause eviction (i.e. return true) of resources that have been idle (i.e. released and available in the Pool) for at least the maxIdleTime Duration (inclusive). Such a predicate can be used to evict objects that have been idle for too long when they are next encountered by a Pool.acquire().

      This replaces any evictionPredicate(BiPredicate) previously set. If you need to combine the idle predicate with more custom logic, prefer directly providing a BiPredicate. Note that the idle predicate from this method is written as (poolable, meta) -> meta.idleTime() >= maxIdleTime.toMillis().

      Parameters:
      maxIdleTime - the Duration after which an object should not be passed to a borrower, but destroyed (resolution: ms)
      Returns:
      this Pool builder
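
      The inclusive comparison and the advice to fold custom logic into a single BiPredicate can be sketched with plain JDK types. This is a model, not the library's code: Meta is a hypothetical stand-in for PooledRefMetadata that only exposes idleTime(), and the "empty string is unfit" rule is an arbitrary custom condition.

```java
import java.time.Duration;
import java.util.function.BiPredicate;

final class IdleEvictionSketch {

    // Hypothetical stand-in for PooledRefMetadata; only idleTime() (in ms) is modeled.
    static final class Meta {
        private final long idleMillis;
        Meta(long idleMillis) { this.idleMillis = idleMillis; }
        long idleTime() { return idleMillis; }
    }

    // Same shape as the documented idle predicate: inclusive, millisecond resolution.
    static <T> BiPredicate<T, Meta> idleAtLeast(Duration maxIdleTime) {
        long maxIdleMillis = maxIdleTime.toMillis();
        return (poolable, meta) -> meta.idleTime() >= maxIdleMillis;
    }

    // Idle check combined with custom logic in one predicate (here: empty strings are unfit).
    static BiPredicate<String, Meta> idleOrEmpty(Duration maxIdleTime) {
        BiPredicate<String, Meta> idle = idleAtLeast(maxIdleTime);
        return idle.or((poolable, meta) -> poolable.isEmpty());
    }

    // Convenience entry point taking plain milliseconds.
    static boolean evicts(String poolable, long idleMillis, long maxIdleMillis) {
        return idleOrEmpty(Duration.ofMillis(maxIdleMillis))
                .test(poolable, new Meta(idleMillis));
    }

    public static void main(String[] args) {
        System.out.println(evicts("conn", 1000, 1000)); // idle exactly maxIdleTime
        System.out.println(evicts("conn", 999, 1000));  // idle just under maxIdleTime
        System.out.println(evicts("", 0, 1000));        // custom condition only
    }
}
```

      With the real builder, a predicate built this way would be passed to evictionPredicate(BiPredicate) instead of calling evictionIdle(Duration), since each call replaces the other.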
    • evictionPredicate

      public PoolBuilder<T,CONF> evictionPredicate(BiPredicate<T,PooledRefMetadata> evictionPredicate)
      Provide an eviction BiPredicate that decides whether a resource is fit for being placed in the Pool. This can happen whenever a resource is released back to the Pool (after it was processed by the releaseHandler(Function)), but also when being acquired from the pool (triggering a second pass if the object is found to be unfit, e.g. it has been idle for too long). Finally, some pool implementations MAY implement a reaper thread mechanism that detects idle resources through this predicate and destroys them.

      Defaults to never evicting (a BiPredicate that always returns false).

      In case some asynchronous verification of the health of a resource is needed, one possible approach is to rely on Pool.acquire() and PooledRef.invalidate() instead of the evictionPredicate: perform the check in a flatMap after having acquired the resource, then chain an invalidate call if the resource is not healthy. The acquisition should then be retried, and a good way of doing so is to continue the invalidate call with a Mono.error(Throwable) carrying a custom exception, which triggers an outer Mono.retryWhen(Retry).

      Parameters:
      evictionPredicate - a BiPredicate that returns true if the resource is unfit for the pool and should be destroyed, false if it should be put back into the pool
      Returns:
      this Pool builder
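
      The asynchronous health-check approach described above could look like the following sketch. Conn, its ping() method and UnhealthyResourceException are hypothetical names introduced for the example, and the retry budget of 3 is arbitrary. In this toy setup the first allocated resource is deliberately unhealthy so that one retry takes place.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import reactor.pool.InstrumentedPool;
import reactor.pool.Pool;
import reactor.pool.PoolBuilder;
import reactor.pool.PooledRef;
import reactor.util.retry.Retry;

final class HealthCheckedAcquire {

    // Hypothetical marker exception used only to trigger the outer retry.
    static final class UnhealthyResourceException extends RuntimeException { }

    // Hypothetical resource with an asynchronous health check.
    static final class Conn {
        final int id;
        Conn(int id) { this.id = id; }
        // Assumption for the demo: only the very first connection is unhealthy.
        Mono<Boolean> ping() { return Mono.just(id > 1); }
    }

    static Mono<PooledRef<Conn>> acquireHealthy(Pool<Conn> pool) {
        return pool.acquire()
                .flatMap(ref -> ref.poolable().ping().flatMap(healthy -> {
                    if (healthy) {
                        return Mono.just(ref);
                    }
                    // Unhealthy: invalidate, then signal an error so the acquire is retried.
                    return ref.invalidate()
                              .then(Mono.<PooledRef<Conn>>error(new UnhealthyResourceException()));
                }))
                .retryWhen(Retry.max(3).filter(e -> e instanceof UnhealthyResourceException));
    }

    // Returns the id of the first healthy connection handed out by the pool.
    static int demo() {
        AtomicInteger ids = new AtomicInteger();
        Mono<Conn> allocator = Mono.fromCallable(() -> new Conn(ids.incrementAndGet()));
        InstrumentedPool<Conn> pool = PoolBuilder.from(allocator)
                .sizeBetween(0, 4)
                .buildPool();
        try {
            PooledRef<Conn> ref = acquireHealthy(pool).block(Duration.ofSeconds(5));
            int id = ref.poolable().id;
            ref.release().block(Duration.ofSeconds(5));
            return id;
        } finally {
            pool.disposeLater().block(Duration.ofSeconds(5));
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

      Filtering the retry on the custom exception keeps unrelated failures (for example allocation errors) from being retried by accident.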
    • evictInBackgroundDisabled

      public PoolBuilder<T,CONF> evictInBackgroundDisabled()
      Disable background eviction entirely, so that evictionPredicate is only checked upon acquire and release (ie only when there is pool activity).
      Returns:
      this Pool builder
    • evictInBackground

      public PoolBuilder<T,CONF> evictInBackground(Duration evictionInterval)
      Enable background eviction so that evictionPredicate is regularly applied to elements that are idle in the pool when there is no pool activity (i.e. acquire and release).

      Providing an evictionInterval of zero is similar to disabling background eviction.

      Background eviction support is optional: although all vanilla reactor-pool implementations DO support it, other implementations MAY ignore it. The background eviction process can be implemented in a best effort fashion, backing off if it detects any pool activity.

      Returns:
      this Pool builder
    • evictInBackground

      public PoolBuilder<T,CONF> evictInBackground(Duration evictionInterval, Scheduler reaperTaskScheduler)
      Enable background eviction so that evictionPredicate is regularly applied to elements that are idle in the pool when there is no pool activity (i.e. acquire and release).

      Providing an evictionInterval of zero is similar to disabling background eviction.

      Background eviction support is optional: although all vanilla reactor-pool implementations DO support it, other implementations MAY ignore it. The background eviction process can be implemented in a best effort fashion, backing off if it detects any pool activity.

      Returns:
      this Pool builder
    • maxLifeTime

      public PoolBuilder<T,CONF> maxLifeTime(Duration maxLifeTime)
      Set the maximum lifetime for pooled resources. Resources that have been alive for longer than this duration will be evicted on acquire, release, or during background eviction.

      When configured, this is composed with any evictionPredicate(BiPredicate) using OR logic — a resource is evicted if either condition triggers.

      Defaults to Duration.ZERO (disabled).

      Parameters:
      maxLifeTime - maximum resource lifetime (resolution: ms), or Duration.ZERO to disable
      Returns:
      this Pool builder
      Since:
      1.2.4
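
      The OR composition with a custom eviction predicate can be modeled with plain JDK types. This is a sketch of the described semantics, not the library's implementation: Meta is a hypothetical stand-in for PooledRefMetadata, and the strict "longer than" comparison at the exact boundary is an assumption.

```java
import java.time.Duration;
import java.util.function.BiPredicate;

final class MaxLifeTimeSketch {

    // Hypothetical stand-in for PooledRefMetadata; only lifeTime() (in ms) is modeled.
    static final class Meta {
        private final long lifeMillis;
        Meta(long lifeMillis) { this.lifeMillis = lifeMillis; }
        long lifeTime() { return lifeMillis; }
    }

    // Composes a custom eviction predicate with a max-lifetime check using OR logic.
    static <T> BiPredicate<T, Meta> compose(BiPredicate<T, Meta> evictionPredicate,
                                            Duration maxLifeTime) {
        if (maxLifeTime.isZero()) {
            return evictionPredicate; // Duration.ZERO disables the lifetime check
        }
        long maxLifeMillis = maxLifeTime.toMillis();
        // Assumption: "longer than" is read as a strict comparison.
        return evictionPredicate.or((poolable, meta) -> meta.lifeTime() > maxLifeMillis);
    }

    // Convenience entry point: customVerdict is what the custom predicate would return.
    static boolean evicts(boolean customVerdict, long lifeMillis, long maxLifeMillis) {
        BiPredicate<String, Meta> custom = (poolable, meta) -> customVerdict;
        return compose(custom, Duration.ofMillis(maxLifeMillis))
                .test("resource", new Meta(lifeMillis));
    }

    public static void main(String[] args) {
        System.out.println(evicts(false, 61_000, 60_000)); // too old
        System.out.println(evicts(true, 1_000, 60_000));   // custom predicate triggers
        System.out.println(evicts(false, 61_000, 0));      // lifetime check disabled
    }
}
```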
    • maxLifeTimeVariance

      public PoolBuilder<T,CONF> maxLifeTimeVariance(double variancePercent)
      Set a variance percentage for maxLifeTime(Duration), introducing per-resource jitter to prevent simultaneous expiry of resources created around the same time.

      The effective max lifetime for each resource will be in the range: [maxLifeTime * (1 - variance/100), maxLifeTime], spreading resource renewal over a window instead of a single point in time.

      For example, with a maxLifeTime of 60 seconds and a variance of 2.5%, resources would expire between 58.5s and 60s.

      Only meaningful when maxLifeTime(Duration) is configured (non-zero). Defaults to 0 (no variance — all resources expire at exactly maxLifeTime).

      Parameters:
      variancePercent - percentage of maxLifeTime to use as the variance range (0–100)
      Returns:
      this Pool builder
      Since:
      1.2.4
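
      The range formula above can be checked with a few lines of arithmetic. The sketch below only reproduces the documented bounds. How the library actually draws a value inside the range is not stated here, so the uniform draw is an assumption.

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

final class LifeTimeVarianceSketch {

    // Lower bound of the documented range: maxLifeTime * (1 - variance/100), in ms.
    static long minEffectiveMillis(Duration maxLifeTime, double variancePercent) {
        return Math.round(maxLifeTime.toMillis() * (1.0 - variancePercent / 100.0));
    }

    // Assumption: a uniform draw within [min, maxLifeTime]; the real distribution may differ.
    static long randomEffectiveMillis(Duration maxLifeTime, double variancePercent) {
        long max = maxLifeTime.toMillis();
        long min = minEffectiveMillis(maxLifeTime, variancePercent);
        return ThreadLocalRandom.current().nextLong(min, max + 1);
    }

    public static void main(String[] args) {
        Duration maxLifeTime = Duration.ofSeconds(60);
        // 60 s with 2.5% variance gives a lower bound of 58500 ms (58.5 s).
        System.out.println(minEffectiveMillis(maxLifeTime, 2.5));
        System.out.println(randomEffectiveMillis(maxLifeTime, 2.5));
    }
}
```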
    • maxPendingAcquire

      public PoolBuilder<T,CONF> maxPendingAcquire(int maxPending)
      Set the maximum number of subscribed Pool.acquire() Monos that can be in a pending state (i.e. they wait for a resource to be released, as no idle resource was immediately available, and the pool had already allocated the maximum permitted amount). Set to 0 to immediately fail all such acquisition attempts. Set to -1 to deactivate (or prefer using the more explicit maxPendingAcquireUnbounded()).

      Defaults to -1.

      Parameters:
      maxPending - the maximum number of registered acquire monos to keep in a pending queue
      Returns:
      a builder of Pool with a maximum pending queue size.
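
      The three regimes of maxPending (-1, 0, positive) can be summarized as a small decision function. This is a plain-JDK model of the description, not the pool's actual queueing code. The Decision enum is hypothetical, and treating every negative value like -1 is an assumption.

```java
final class PendingAcquireSketch {

    // Hypothetical outcome for an acquire that cannot be served immediately.
    enum Decision { ENQUEUE, REJECT }

    // maxPending: the configured cap; currentlyPending: acquires already waiting.
    static Decision onPending(int maxPending, int currentlyPending) {
        if (maxPending < 0) {
            return Decision.ENQUEUE; // -1: cap deactivated, the queue is unbounded
        }
        // With maxPending == 0 this is always REJECT: such acquires fail immediately.
        return currentlyPending < maxPending ? Decision.ENQUEUE : Decision.REJECT;
    }

    public static void main(String[] args) {
        System.out.println(onPending(-1, 1_000_000));
        System.out.println(onPending(0, 0));
        System.out.println(onPending(2, 1));
        System.out.println(onPending(2, 2));
    }
}
```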
    • maxPendingAcquireUnbounded

      public PoolBuilder<T,CONF> maxPendingAcquireUnbounded()
      Uncap the number of subscribed Pool.acquire() Monos that can be in a pending state (i.e. they wait for a resource to be released, as no idle resource was immediately available, and the pool had already allocated the maximum permitted amount).

      This is the default.

      Returns:
      a builder of Pool with no maximum pending queue size
    • clock

      public PoolBuilder<T,CONF> clock(Clock clock)
      Set the Clock to use for timestamps, notably marking the times at which a resource is allocated, released and acquired. The Clock.millis() method is used for this purpose, which produces timestamps and durations in milliseconds for eg. the evictionPredicate(BiPredicate).

      These durations can also be exported as metrics, through the metricsRecorder, but having a separate Clock separates the concerns and allows metrics to be disregarded without crippling eviction.

      Parameters:
      clock - the Clock to use to measure timestamps and durations
      Returns:
      this Pool builder
    • metricsRecorder

      public PoolBuilder<T,CONF> metricsRecorder(PoolMetricsRecorder recorder)
      Set up the optional PoolMetricsRecorder for Pool to use for instrumentation purposes.
      Parameters:
      recorder - the PoolMetricsRecorder
      Returns:
      this Pool builder
    • releaseHandler

      public PoolBuilder<T,CONF> releaseHandler(Function<T,? extends Publisher<Void>> releaseHandler)
      Provide a handler that will derive a reset Publisher whenever a resource is released. The reset procedure is applied asynchronously before vetting the object through evictionPredicate. If the reset Publisher couldn't put the resource back in a usable state, it will be destroyed.

      Defaults to not resetting anything.

      Parameters:
      releaseHandler - the Function supplying the state-resetting Publisher
      Returns:
      this Pool builder
    • sizeBetween

      public PoolBuilder<T,CONF> sizeBetween(int min, int max)
      Replace the AllocationStrategy with one that lets the Pool allocate between min and max resources. When acquiring and there is no available resource, the pool should strive to warm up enough resources to reach min live resources before serving the acquire with (one of) the newly created resource(s). At the same time it MUST NOT allocate any resource if that would bring the number of live resources over the max, rejecting further allocations until some resources have been released.

      Pre-allocation of warmed-up resources, if any, will be performed sequentially by subscribing to the allocator one at a time. The process waits for a resource to be created before subscribing again to the allocator. This sequence continues until all pre-allocated resources have been successfully created.

      Parameters:
      min - the minimum number of live resources to keep in the pool (can be best effort)
      max - the maximum number of live resources to keep in the pool. Use Integer.MAX_VALUE when you only need a minimum and no upper bound
      Returns:
      this Pool builder
    • sizeBetween

      public PoolBuilder<T,CONF> sizeBetween(int min, int max, int warmupParallelism)
      Replace the AllocationStrategy with one that lets the Pool allocate between min and max resources. When acquiring and there is no available resource, the pool should strive to warm up enough resources to reach min live resources before serving the acquire with (one of) the newly created resource(s). At the same time it MUST NOT allocate any resource if that would bring the number of live resources over the max, rejecting further allocations until some resources have been released.
      Parameters:
      min - the minimum number of live resources to keep in the pool (can be best effort)
      max - the maximum number of live resources to keep in the pool. Use Integer.MAX_VALUE when you only need a minimum and no upper bound
      warmupParallelism - Specifies the concurrency level used when the allocator is subscribed to during the warmup phase, if any. During warmup, resources that can be pre-allocated will be created eagerly, but at most warmupParallelism resources are subscribed to at the same time. A warmupParallelism of 1 means that pre-allocation of resources is achieved by sequentially subscribing to the allocator, waiting for a resource to be created before subscribing to the allocator again, and so on until the last allocation completes.
      Returns:
      this Pool builder
      Since:
      1.0.1
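
      A possible way to observe the effect of min and warmupParallelism is sketched below. It relies on Pool.warmup(), InstrumentedPool.metrics() and Pool.disposeLater(), which are not described on this page and are assumed to behave as in the default reactor-pool implementation (warmup() emitting the number of resources it created). The Integer resources and the bounds are arbitrary.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import reactor.pool.InstrumentedPool;
import reactor.pool.PoolBuilder;

final class WarmupSketch {

    // Returns { resources created by warmup, idle resources after warmup }.
    static int[] warmupAndCount(int min, int max, int warmupParallelism) {
        AtomicInteger allocations = new AtomicInteger();
        // Toy allocator: each subscription produces the next integer.
        Mono<Integer> allocator = Mono.fromCallable(() -> allocations.incrementAndGet());

        InstrumentedPool<Integer> pool = PoolBuilder.from(allocator)
                .sizeBetween(min, max, warmupParallelism)
                .buildPool();
        try {
            // Assumption: warmup() eagerly allocates up to min resources.
            Integer warmed = pool.warmup().block(Duration.ofSeconds(5));
            return new int[] { warmed, pool.metrics().idleSize() };
        } finally {
            pool.disposeLater().block(Duration.ofSeconds(5));
        }
    }

    public static void main(String[] args) {
        int[] result = warmupAndCount(3, 10, 2);
        System.out.println("warmed=" + result[0] + ", idle=" + result[1]);
    }
}
```

      With a warmupParallelism of 2, at most two allocator subscriptions are in flight at once during the warmup. With a synchronous toy allocator like this one the difference is not observable, but it matters when allocation is slow or remote.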
    • sizeUnbounded

      public PoolBuilder<T,CONF> sizeUnbounded()
      Replace the AllocationStrategy with one that lets the Pool allocate new resources when no idle resource is available, without limit.

      Note this is the default, if no previous call to allocationStrategy(AllocationStrategy) or sizeBetween(int, int) has been made on this PoolBuilder.

      Returns:
      this Pool builder
    • idleResourceReuseLruOrder

      public PoolBuilder<T,CONF> idleResourceReuseLruOrder()
      Configure the pool so that if there are idle resources (ie pool is under-utilized), the next Pool.acquire() will get the Least Recently Used resource (LRU, ie. the resource that was released first among the current idle resources).
      Returns:
      this Pool builder
    • idleResourceReuseMruOrder

      public PoolBuilder<T,CONF> idleResourceReuseMruOrder()
      Configure the pool so that if there are idle resources (ie pool is under-utilized), the next Pool.acquire() will get the Most Recently Used resource (MRU, ie. the resource that was released last among the current idle resources).
      Returns:
      this Pool builder
    • idleResourceReuseOrder

      public PoolBuilder<T,CONF> idleResourceReuseOrder(boolean isLru)
      Configure the order in which idle resources are used when the next Pool.acquire() is performed (while the pool is under-utilized). Allows choosing between the Least Recently Used order when true (LRU, i.e. the resource that was released first among the current idle resources, the default) and the Most Recently Used order when false (MRU, i.e. the resource that was released last among the current idle resources).
      Parameters:
      isLru - true for LRU (the default) or false for MRU
      Returns:
      this Pool builder
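
      The difference between the two orders can be modeled with a plain Deque. This is an illustration of the described semantics only. The class and its methods are hypothetical and do not reflect how the pool stores idle resources internally.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class IdleReuseOrderSketch {

    // Idle resources kept in release order: head = released first, tail = released last.
    private final Deque<String> idle = new ArrayDeque<>();
    private final boolean isLru;

    IdleReuseOrderSketch(boolean isLru) {
        this.isLru = isLru;
    }

    void release(String resource) {
        idle.addLast(resource);
    }

    // LRU hands out the resource released first, MRU the one released last.
    // Returns null when no idle resource is available.
    String acquire() {
        return isLru ? idle.pollFirst() : idle.pollLast();
    }

    // Releases the given resources in order, then performs one acquire.
    static String firstAcquired(boolean isLru, String... releasedInOrder) {
        IdleReuseOrderSketch pool = new IdleReuseOrderSketch(isLru);
        for (String resource : releasedInOrder) {
            pool.release(resource);
        }
        return pool.acquire();
    }

    public static void main(String[] args) {
        System.out.println(firstAcquired(true, "a", "b", "c"));  // LRU
        System.out.println(firstAcquired(false, "a", "b", "c")); // MRU
    }
}
```

      LRU tends to cycle through all idle resources, while MRU keeps reusing the same few and lets the others age, which can make idle-based eviction more effective.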
    • extraConfiguration

      public <CONF2 extends PoolConfig<T>> PoolBuilder<T,CONF2> extraConfiguration(Function<? super CONF,CONF2> configModifier)
      Add implementation-specific configuration, changing the type of PoolConfig passed to the Pool factory in build(Function).
      Type Parameters:
      CONF2 - new type for the configuration
      Parameters:
      configModifier - Function to transform the type of PoolConfig created by this builder for the benefit of the pool factory, allowing for custom implementations with custom configurations
      Returns:
      a new PoolBuilder that now produces a different type of PoolConfig
    • buildPool

      public InstrumentedPool<T> buildPool()
      Construct a default reactor pool with the builder's configuration.
      Returns:
      an InstrumentedPool
    • buildPoolAndDecorateWith

      public <P extends InstrumentedPool<T>> P buildPoolAndDecorateWith(Function<? super InstrumentedPool<T>,P> decorator)
      Construct a default reactor pool with the builder's configuration, then wrap it into a decorator implementation using the provided Function.
      Type Parameters:
      P - the type of decorated pool, must extend InstrumentedPool (with same type of resource)
      Parameters:
      decorator - a decorator Function returning a decorated version of the InstrumentedPool
      Returns:
      the built-then-decorated pool
    • build

      public <POOL extends Pool<T>> POOL build(Function<? super CONF,POOL> poolFactory)
      Build a custom flavor of Pool, given a Pool factory Function that is provided with a PoolConfig copy of this builder's configuration.
      Parameters:
      poolFactory - the factory of pool implementation
      Returns:
      the Pool