Sinks

In Reactor, a sink is a class that allows safe manual triggering of signals in a standalone fashion, creating a Publisher-like structure capable of dealing with multiple Subscribers (with the exception of unicast() flavors).

Before 3.5.0, there was also a set of Processor implementations, which have been phased out.

1. Safely Produce from Multiple Threads by Using Sinks.One and Sinks.Many

Default flavors of Sinks exposed by reactor-core ensure that multi-threaded usage is detected and cannot lead to spec violations or undefined behavior from the perspective of downstream subscribers. When using the tryEmit* API, parallel calls fail fast. When using the emit* API, the provided EmitFailureHandler may allow retrying on contention (e.g., by busy looping); otherwise, the sink terminates with an error.

This is an improvement over Processor.onNext, which must be synchronized externally; without that synchronization, it leads to undefined behavior from the perspective of the downstream subscribers.
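The result-based style of the tryEmit* API can be observed even without threads. The following is a minimal sketch: the class and method names are made up for illustration, and it uses a terminated sink (rather than real contention) as an easy-to-reproduce failure case.

```java
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;

class TryEmitExample {

    // Hypothetical helper: emits, completes, then tries to emit again.
    static EmitResult emitAfterCompletion() {
        Sinks.Many<Integer> sink = Sinks.many().replay().all();

        // tryEmit* never throws on rejection; it reports the outcome instead.
        EmitResult first = sink.tryEmitNext(1);
        if (first.isFailure()) {
            throw new IllegalStateException("unexpected failure: " + first);
        }

        // orThrow() converts a failed result into an exception when that is preferred.
        sink.tryEmitComplete().orThrow();

        // The sink is terminated, so this emission is rejected rather than
        // producing an onNext after onComplete.
        return sink.tryEmitNext(2);
    }
}
```

Here the last call is rejected with EmitResult.FAIL_TERMINATED. Under actual concurrent use, the same API reports contention as FAIL_NON_SERIALIZED.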

Processors are a special kind of Publisher that are also a Subscriber. They were originally intended as a possible representation of an intermediate step that could then be shared between Reactive Streams implementations. In Reactor, however, such steps are instead represented by operators that are Publishers.

A common mistake when coming across a Processor for the first time is the temptation to directly call the exposed onNext, onComplete and onError methods from the Subscriber interface.

Such manual calls should be made with care, especially regarding external synchronization of calls with respect to the Reactive Streams specification. Processors are of marginal use, unless one comes across a Reactive Streams based API that requires a Subscriber to be passed, rather than exposing a Publisher.

Sinks are usually a better alternative.

The Sinks builder provides a guided API to the main supported producer types. You will recognize some of the behavior found in Flux, such as onBackpressureBuffer.

Sinks.Many<Integer> replaySink = Sinks.many().replay().all();

Multiple producer threads may concurrently generate data on the sink by doing the following:

//thread1
replaySink.emitNext(1, EmitFailureHandler.FAIL_FAST);

//thread2, later
replaySink.emitNext(2, EmitFailureHandler.FAIL_FAST);

//thread3, concurrently with thread 2
//would retry emitting for 2 seconds and fail with EmissionException if unsuccessful
replaySink.emitNext(3, EmitFailureHandler.busyLooping(Duration.ofSeconds(2)));

//thread3, concurrently with thread 2
//would return FAIL_NON_SERIALIZED
EmitResult result = replaySink.tryEmitNext(4);

When using busyLooping, be aware that returned instances of EmitFailureHandler cannot be reused: there should be one call of busyLooping per emitNext. Also, it is recommended to use a timeout above 100ms, since smaller values don’t make practical sense.
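As a sketch of the "one busyLooping per emitNext" rule, the following example has several producer threads emit into the same sink. The class name, method name, thread pool, and timeouts are assumptions chosen for illustration, not part of the Reactor API.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;

class ConcurrentEmitExample {

    // Hypothetical helper: each thread emits perThread values into one shared sink.
    static List<Integer> emitFromThreads(int threads, int perThread) {
        Sinks.Many<Integer> sink = Sinks.many().replay().all();
        ExecutorService pool = Executors.newFixedThreadPool(threads);

        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    // A fresh handler for every call: a handler instance is not reusable.
                    sink.emitNext(i, EmitFailureHandler.busyLooping(Duration.ofSeconds(2)));
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("producers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // All producers are done, so no contention is possible here.
        sink.emitComplete(EmitFailureHandler.FAIL_FAST);

        // replay().all() hands the full history to this late subscriber.
        return sink.asFlux().collectList().block();
    }
}
```

The relative order of values from different threads is not deterministic, but every value is delivered as long as each contended call succeeds within its two-second window.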

A Sinks.Many can be presented to downstream consumers as a Flux, as in the following example:

Flux<Integer> fluxView = replaySink.asFlux();
fluxView
	.takeWhile(i -> i < 10)
	.log()
	.blockLast();

Similarly, the Sinks.Empty and Sinks.One flavors can be viewed as a Mono with the asMono() method.

The Sinks categories are:

  1. many().multicast(): a sink that will transmit only newly pushed data to its subscribers, honoring their backpressure (newly pushed as in "after the subscriber’s subscription").

  2. many().unicast(): same as above, with the twist that data pushed before the first subscriber registers is buffered.

  3. many().replay(): a sink that will replay a specified history size of pushed data to new subscribers then continue pushing new data live.

  4. one(): a sink that will play a single element to its subscribers.

  5. empty(): a sink that will play a terminal signal only to its subscribers (error or complete), but can still be viewed as a Mono<T> (notice the generic type <T>).

2. Overview of Available Sinks

2.1. Sinks.many().unicast().onBackpressureBuffer(args?)

A unicast Sinks.Many can deal with backpressure by using an internal buffer. The trade-off is that it can have at most one Subscriber.

The basic unicast sink is created via Sinks.many().unicast().onBackpressureBuffer(). A few additional unicast static factory methods in Sinks.many().unicast() allow finer tuning.

For instance, by default, it is unbounded: if you push any amount of data through it while its Subscriber has not yet requested data, it buffers all of the data. You can change this by providing a custom Queue implementation for the internal buffering in the Sinks.many().unicast().onBackpressureBuffer(Queue) factory method. If that queue is bounded, the sink could reject the push of a value when the buffer is full and not enough requests from downstream have been received.
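The two unicast constraints described above (a bounded buffer that can fill up, and a single allowed Subscriber) can be sketched as follows. The class and method names are hypothetical, and ArrayBlockingQueue is just one convenient bounded Queue picked for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;

class UnicastSinkExample {

    // Fills a two-slot buffer while the only subscriber requests nothing.
    static EmitResult overflowBoundedBuffer() {
        Sinks.Many<Integer> sink =
            Sinks.many().unicast().onBackpressureBuffer(new ArrayBlockingQueue<Integer>(2));

        sink.asFlux().subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // Intentionally request nothing, so pushed values stay in the buffer.
            }
        });

        sink.tryEmitNext(1).orThrow();
        sink.tryEmitNext(2).orThrow();
        // The queue is full and there is no downstream demand: the push is rejected.
        return sink.tryEmitNext(3);
    }

    // Subscribes twice and returns the error signalled to the second subscriber.
    static Throwable secondSubscriberError() {
        Sinks.Many<Integer> sink = Sinks.many().unicast().onBackpressureBuffer();
        AtomicReference<Throwable> error = new AtomicReference<>();

        sink.asFlux().subscribe();                      // first (and only allowed) subscriber
        sink.asFlux().subscribe(v -> { }, error::set);  // second subscriber is refused

        return error.get();
    }
}
```

In the first method, the rejected push shows up as a failed EmitResult rather than as an exception. In the second, the extra subscriber receives an onError instead of data.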

2.2. Sinks.many().multicast().onBackpressureBuffer(args?)

A multicast Sinks.Many can emit to several subscribers while honoring backpressure for each of its subscribers. Subscribers receive only the signals pushed through the sink after they have subscribed.

The basic multicast sink is created via Sinks.many().multicast().onBackpressureBuffer().

By default, if all of its subscribers are cancelled (which basically means they have all unsubscribed), it clears its internal buffer and stops accepting new subscribers. You can tune this by using the autoCancel parameter in the multicast static factory methods under Sinks.many().multicast().
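A small sketch of both points: late subscribers only see what is pushed after they subscribe, and autoCancel set to false keeps the sink usable after its subscribers are gone. The class and method names are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import reactor.core.Disposable;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;

class MulticastSinkExample {

    // Returns what an early and a late subscriber each received, in that order.
    static List<List<Integer>> earlyAndLate() {
        Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer();
        List<Integer> early = new ArrayList<>();
        List<Integer> late = new ArrayList<>();

        sink.asFlux().subscribe(early::add);
        sink.tryEmitNext(1).orThrow();       // only the early subscriber is present
        sink.asFlux().subscribe(late::add);
        sink.tryEmitNext(2).orThrow();       // both subscribers are present
        sink.tryEmitComplete().orThrow();

        return Arrays.asList(early, late);
    }

    // With autoCancel disabled, the sink survives the cancellation of its last subscriber.
    static List<Integer> survivesCancellation() {
        Sinks.Many<Integer> sink =
            Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);

        Disposable first = sink.asFlux().subscribe();
        first.dispose();                     // the sink now has no subscriber left

        List<Integer> second = new ArrayList<>();
        sink.asFlux().subscribe(second::add);
        sink.tryEmitNext(5).orThrow();
        return second;
    }
}
```

In earlyAndLate(), the early subscriber receives 1 and 2 while the late one receives only 2.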

2.3. Sinks.many().multicast().directAllOrNothing()

A multicast Sinks.Many with a simplistic handling of backpressure: if any of the subscribers is too slow (has zero demand), the onNext is dropped for all subscribers.

However, the slow subscribers are not terminated and once the slow subscribers have started requesting again, all will resume receiving elements pushed from there on.

Once the Sinks.Many has terminated (usually through its emitError(Throwable) or emitComplete() methods being called), it lets more subscribers subscribe but replays the termination signal to them immediately.

2.4. Sinks.many().multicast().directBestEffort()

A multicast Sinks.Many with a best effort handling of backpressure: if a subscriber is too slow (has zero demand), the onNext is dropped for this slow subscriber only.

However, the slow subscribers are not terminated and once they have started requesting again they will resume receiving newly pushed elements.

Once the Sinks.Many has terminated (usually through its emitError(Throwable) or emitComplete() methods being called), it lets more subscribers subscribe but replays the termination signal to them immediately.
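The difference between directAllOrNothing() and directBestEffort() can be sketched with one subscriber that has unbounded demand and one that never requests anything. The class and method names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Sinks;

class DirectSinkExample {

    // Attaches a fast and a stalled subscriber, pushes one value,
    // and returns what the fast subscriber received.
    static List<Integer> emitWithStalledSubscriber(Sinks.Many<Integer> sink) {
        List<Integer> fast = new ArrayList<>();
        sink.asFlux().subscribe(fast::add);   // unbounded demand

        sink.asFlux().subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // Never request: this subscriber has zero demand.
            }
        });

        sink.tryEmitNext(1);                  // result deliberately ignored in this sketch
        return fast;
    }

    static List<Integer> allOrNothing() {
        Sinks.Many<Integer> sink = Sinks.many().multicast().directAllOrNothing();
        return emitWithStalledSubscriber(sink);
    }

    static List<Integer> bestEffort() {
        Sinks.Many<Integer> sink = Sinks.many().multicast().directBestEffort();
        return emitWithStalledSubscriber(sink);
    }
}
```

With directAllOrNothing(), the stalled subscriber causes the value to be dropped for everyone, so the fast subscriber receives nothing. With directBestEffort(), the fast subscriber still receives the value.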

2.5. Sinks.many().replay()

A replay Sinks.Many caches emitted elements and replays them to late subscribers.

It can be created in multiple configurations:

  • Caching a limited history (Sinks.many().replay().limit(int)) or an unbounded history (Sinks.many().replay().all()).

  • Caching a time-based replay window (Sinks.many().replay().limit(Duration)).

  • Caching a combination of history size and time window (Sinks.many().replay().limit(int, Duration)).

Additional overloads for fine tuning of the above can also be found under Sinks.many().replay(), as well as a variant that allows caching of a single element (latest() and latestOrDefault(T)).
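As an illustration of the size-based configurations, the following sketch pushes three values and only then subscribes. The class and method names are assumptions made for the example.

```java
import java.util.List;

import reactor.core.publisher.Sinks;

class ReplaySinkExample {

    // Pushes 1, 2, 3, completes, and returns what a late subscriber gets replayed.
    static List<Integer> lateSubscriber(Sinks.Many<Integer> sink) {
        for (int i = 1; i <= 3; i++) {
            sink.tryEmitNext(i).orThrow();
        }
        sink.tryEmitComplete().orThrow();
        return sink.asFlux().collectList().block();
    }

    static List<Integer> replayAll() {
        Sinks.Many<Integer> sink = Sinks.many().replay().all();
        return lateSubscriber(sink);   // full history
    }

    static List<Integer> replayLastTwo() {
        Sinks.Many<Integer> sink = Sinks.many().replay().limit(2);
        return lateSubscriber(sink);   // only the two most recent values
    }

    static List<Integer> replayLatest() {
        Sinks.Many<Integer> sink = Sinks.many().replay().latest();
        return lateSubscriber(sink);   // only the most recent value
    }
}
```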

2.6. Sinks.unsafe().many()

Advanced users and operator builders might want to consider using Sinks.unsafe().many(), which provides the same Sinks.Many factories without the extra producer thread safety. As a result, there is less overhead per sink, since thread-safe sinks have to detect multi-threaded access.

Library developers should not expose unsafe sinks but can use them internally in a controlled calling environment where they can ensure external synchronization of the calls that lead to onNext, onComplete and onError signals, in accordance with the Reactive Streams specification.
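A minimal sketch of such a controlled environment is a sink that is only ever touched by one thread. The class and method names are hypothetical, and the unicast flavor is an arbitrary choice for the example.

```java
import java.util.List;

import reactor.core.publisher.Sinks;

class UnsafeSinkExample {

    // All emissions happen on the calling thread, so no serialization guard is needed.
    static List<Integer> emitFromSingleThread() {
        // Same factory chain as Sinks.many(), without the multi-threaded access detection.
        Sinks.Many<Integer> sink = Sinks.unsafe().many().unicast().onBackpressureBuffer();

        sink.tryEmitNext(1).orThrow();
        sink.tryEmitNext(2).orThrow();
        sink.tryEmitComplete().orThrow();

        // The unicast sink buffered the values until this first subscriber arrived.
        return sink.asFlux().collectList().block();
    }
}
```

If such a sink were shared between threads, the caller would be responsible for serializing every emission.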

2.7. Sinks.one()

This method directly constructs a simple instance of Sinks.One<T>. This flavor of Sinks is viewable as a Mono (through its asMono() view method), and has slightly different emit methods to better convey these Mono-like semantics:

  • emitValue(T value) generates an onNext(value) signal and, in most implementations, also triggers an implicit onComplete().

  • emitEmpty() generates an isolated onComplete() signal, intended as generating the equivalent of an empty Mono.

  • emitError(Throwable t) generates an onError(t) signal.

Sinks.one() accepts one call of any of these methods, effectively generating a Mono that either completed with a value, completed empty or failed.
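The "one call only" behavior can be sketched with the tryEmit* counterparts of these methods (tryEmitValue and tryEmitEmpty), which return an EmitResult. The class and method names are illustrative assumptions.

```java
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;

class SinkOneExample {

    // Emits a value and reads it back through the Mono view.
    static String emittedValue() {
        Sinks.One<String> sink = Sinks.one();
        Mono<String> mono = sink.asMono();
        sink.tryEmitValue("hello").orThrow();
        return mono.block();
    }

    // A second emission on an already resolved sink is rejected.
    static EmitResult secondEmission() {
        Sinks.One<String> sink = Sinks.one();
        sink.tryEmitValue("hello").orThrow();
        return sink.tryEmitValue("again");
    }

    // Completing without a value yields an empty Mono, which block() reports as null.
    static String completedEmpty() {
        Sinks.One<String> sink = Sinks.one();
        sink.tryEmitEmpty().orThrow();
        return sink.asMono().block();
    }
}
```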

2.8. Sinks.empty()

This method directly constructs a simple instance of Sinks.Empty<T>. This flavor of Sinks is like Sinks.One<T>, except it doesn’t offer the emitValue method.

As a result, it can only generate a Mono that completes empty or fails.

The sink is still typed with a generic <T> despite being unable to trigger an onNext, because it allows easy composition and inclusion in chains of operators that require a specific type.
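The following sketch shows why the generic type is useful: the Mono<String> view composes directly with operators that expect a String. The class and method names, as well as the fallback value, are assumptions for illustration.

```java
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

class SinkEmptyExample {

    // The typed view lets a String-producing operator be chained on a sink that never emits a value.
    static String completeEmpty() {
        Sinks.Empty<String> sink = Sinks.empty();
        Mono<String> result = sink.asMono().defaultIfEmpty("no value");
        sink.tryEmitEmpty().orThrow();
        return result.block();
    }

    // Failing the sink surfaces the error to whoever consumes the Mono view.
    static Throwable fail() {
        Sinks.Empty<String> sink = Sinks.empty();
        sink.tryEmitError(new IllegalStateException("boom")).orThrow();
        try {
            sink.asMono().block();
            return null;
        } catch (RuntimeException e) {
            return e;
        }
    }
}
```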