Sinks
- 1. Safely Produce from Multiple Threads by Using Sinks.One and Sinks.Many
- 2. Overview of Available Sinks
- 2.1. Sinks.many().unicast().onBackpressureBuffer(args?)
- 2.2. Sinks.many().multicast().onBackpressureBuffer(args?)
- 2.3. Sinks.many().multicast().directAllOrNothing()
- 2.4. Sinks.many().multicast().directBestEffort()
- 2.5. Sinks.many().replay()
- 2.6. Sinks.unsafe().many()
- 2.7. Sinks.one()
- 2.8. Sinks.empty()
In Reactor, a sink is a class that allows safe manual triggering of signals in a standalone fashion, creating a Publisher-like structure capable of dealing with multiple Subscribers (with the exception of the unicast() flavors).
Before 3.5.0, there was also a set of Processor implementations, which have been phased out.
1. Safely Produce from Multiple Threads by Using Sinks.One and Sinks.Many
The default flavors of Sinks exposed by reactor-core ensure that multi-threaded usage is detected and cannot lead to spec violations or undefined behavior from the perspective of downstream subscribers.
When using the tryEmit* API, parallel calls fail fast.
When using the emit* API, the provided EmitFailureHandler may allow retrying on contention (e.g., busy looping); otherwise the sink terminates with an error.
This is an improvement over Processor.onNext, which must be synchronized externally or else leads to undefined behavior from the perspective of the downstream subscribers.
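As a rough illustration of the emit* path described above, the sketch below lets several threads push into one sink and retries whenever a call loses the race. The class name, the RETRY_ON_CONTENTION handler, and the thread counts are assumptions made for this example; the handler spins without a time limit, which the built-in busyLooping(Duration) handler avoids.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.EmitResult;

final class ConcurrentEmitSketch {

    // Hypothetical handler: retry only while another thread is emitting.
    // Unlike EmitFailureHandler.busyLooping(Duration), it has no time limit.
    static final EmitFailureHandler RETRY_ON_CONTENTION =
            (signalType, emitResult) -> emitResult == EmitResult.FAIL_NON_SERIALIZED;

    /** Emits from several threads, then returns how many elements the sink recorded. */
    static long emitFromThreads(int threads, int perThread) {
        Sinks.Many<Integer> sink = Sinks.many().replay().all();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    // Contended calls are retried by the handler instead of failing.
                    sink.emitNext(i, RETRY_ON_CONTENTION);
                }
            });
            workers.add(worker);
            worker.start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        sink.emitComplete(EmitFailureHandler.FAIL_FAST);
        // The replay sink kept every element, so a late subscriber can count them.
        return sink.asFlux().count().block();
    }

    public static void main(String[] args) {
        System.out.println(emitFromThreads(4, 500));
    }
}
```

Because every contended call is retried until it succeeds, no element is lost: four threads emitting 500 elements each should yield a count of 2000.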
Processors are a special kind of Publisher that is also a Subscriber.
A common mistake when coming across a Processor is to call its onNext, onComplete, and onError methods directly.
Such manual calls should be made with care, especially regarding external synchronization of calls with respect to the Reactive Streams specification.
Processors are probably only marginally useful, unless one comes across a Reactive Streams based API that requires a Subscriber to be passed rather than exposing a Publisher.
Sinks are usually a better alternative.
The Sinks builders provide a guided API to the main supported producer types.
You will recognize some of the behavior found in Flux, such as onBackpressureBuffer.
Sinks.Many<Integer> replaySink = Sinks.many().replay().all();
Multiple producer threads may concurrently generate data on the sink by doing the following:
//thread1
replaySink.emitNext(1, EmitFailureHandler.FAIL_FAST);
//thread2, later
replaySink.emitNext(2, EmitFailureHandler.FAIL_FAST);
//thread3, concurrently with thread 2
//would retry emitting for 2 seconds and fail with EmissionException if unsuccessful
replaySink.emitNext(3, EmitFailureHandler.busyLooping(Duration.ofSeconds(2)));
//thread4, concurrently with thread 2
//would return FAIL_NON_SERIALIZED
EmitResult result = replaySink.tryEmitNext(4);
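With the tryEmit* API the caller has to inspect the returned EmitResult itself. The following single-threaded sketch shows one way to do that. The class and method names are made up for the example, and the messages in describe are only illustrative.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;

final class TryEmitSketch {

    /** Maps a result to a human-readable hint (wording is illustrative only). */
    static String describe(EmitResult result) {
        switch (result) {
            case OK:
                return "emitted";
            case FAIL_NON_SERIALIZED:
                return "another thread is emitting, retry later";
            case FAIL_TERMINATED:
                return "sink already terminated";
            default:
                return "failed: " + result;
        }
    }

    /** Emits a value, completes the sink, then tries to emit again. */
    static List<EmitResult> emitThenTerminate() {
        Sinks.Many<Integer> sink = Sinks.many().replay().all();
        List<EmitResult> results = new ArrayList<>();
        results.add(sink.tryEmitNext(1));      // accepted
        results.add(sink.tryEmitComplete());   // terminates the sink
        results.add(sink.tryEmitNext(2));      // rejected: the sink is done
        return results;
    }

    public static void main(String[] args) {
        for (EmitResult result : emitThenTerminate()) {
            System.out.println(result + " -> " + describe(result));
        }
    }
}
```

EmitResult also offers isSuccess() and isFailure() for callers that do not need to distinguish the individual failure reasons.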
A Sinks.Many can be presented to downstream consumers as a Flux, as in the following example:
Flux<Integer> fluxView = replaySink.asFlux();
fluxView
.takeWhile(i -> i < 10)
.log()
.blockLast();
Similarly, the Sinks.Empty and Sinks.One flavors can be viewed as a Mono with the asMono() method.
The Sinks categories are:
- many().multicast(): a sink that transmits only newly pushed data to its subscribers, honoring their backpressure (newly pushed as in "after the subscriber’s subscription").
- many().unicast(): same as above, with the twist that data pushed before the first subscriber registers is buffered.
- many().replay(): a sink that replays a specified history size of pushed data to new subscribers, then continues pushing new data live.
- one(): a sink that plays a single element to its subscribers.
- empty(): a sink that plays only a terminal signal to its subscribers (error or complete), but can still be viewed as a Mono<T> (notice the generic type <T>).
2. Overview of Available Sinks
2.1. Sinks.many().unicast().onBackpressureBuffer(args?)
A unicast Sinks.Many can deal with backpressure by using an internal buffer.
The trade-off is that it can have at most one Subscriber.
The basic unicast sink is created via Sinks.many().unicast().onBackpressureBuffer().
There are a few additional factory methods in Sinks.many().unicast() that allow finer tuning.
For instance, by default the buffer is unbounded: if you push any amount of data through the sink while its Subscriber has not yet requested data, it buffers all of the data.
You can change this by providing a custom Queue implementation for the internal buffering in the Sinks.many().unicast().onBackpressureBuffer(Queue) factory method.
If that queue is bounded, the sink can reject the push of a value when the buffer is full and not enough requests from downstream have been received.
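To make the bounded-queue behavior concrete, here is a minimal sketch that passes a java.util.concurrent.ArrayBlockingQueue to the factory method. The capacity of 2 and the class name are arbitrary choices for illustration.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;

final class BoundedUnicastSketch {

    static String run() {
        // Capacity of 2 is an arbitrary value chosen for this example.
        Sinks.Many<Integer> sink = Sinks.many().unicast()
                .onBackpressureBuffer(new ArrayBlockingQueue<Integer>(2));

        sink.tryEmitNext(1);                    // buffered
        sink.tryEmitNext(2);                    // buffered, queue is now full
        EmitResult third = sink.tryEmitNext(3); // no room left in the queue
        sink.tryEmitComplete();

        // The single allowed subscriber drains what was buffered before it arrived.
        List<Integer> delivered = sink.asFlux().collectList().block();
        return "rejected=" + third.isFailure() + " delivered=" + delivered;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The third push is reported as a failure rather than silently dropped, so the producer can decide whether to retry, log, or discard the value.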
2.2. Sinks.many().multicast().onBackpressureBuffer(args?)
A multicast Sinks.Many can emit to several subscribers while honoring backpressure for each of its subscribers.
Subscribers receive only the signals pushed through the sink after they have subscribed.
The basic multicast sink is created via Sinks.many().multicast().onBackpressureBuffer().
By default, if all of its subscribers are cancelled (which basically means they have all unsubscribed), it clears its internal buffer and stops accepting new subscribers.
You can tune this by using the autoCancel parameter in the multicast factory methods under Sinks.many().multicast().
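The "only signals pushed after subscription" rule can be observed with two subscribers that join at different times. This is a small sketch with made-up class and variable names; both subscribers use unbounded demand so that delivery happens synchronously on the emitting thread.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Sinks;

final class MulticastSketch {

    static String run() {
        Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer();
        List<Integer> early = new ArrayList<>();
        List<Integer> late = new ArrayList<>();

        sink.asFlux().subscribe(early::add); // subscribed before any emission
        sink.tryEmitNext(1);
        sink.asFlux().subscribe(late::add);  // subscribed after element 1 was pushed
        sink.tryEmitNext(2);
        sink.tryEmitComplete();

        return "early=" + early + " late=" + late;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The early subscriber sees both elements, while the late one only sees the element pushed after it subscribed.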
2.3. Sinks.many().multicast().directAllOrNothing()
A multicast Sinks.Many with a simplistic handling of backpressure: if any of the subscribers is too slow (has zero demand), the onNext is dropped for all subscribers.
However, the slow subscribers are not terminated, and once they start requesting again, all subscribers resume receiving elements pushed from then on.
Once the Sinks.Many has terminated (usually through its emitError(Throwable) or emitComplete() methods being called), it lets more subscribers subscribe but replays the termination signal to them immediately.
2.4. Sinks.many().multicast().directBestEffort()
A multicast Sinks.Many with a best effort handling of backpressure: if a subscriber is too slow (has zero demand), the onNext is dropped for this slow subscriber only.
However, the slow subscribers are not terminated, and once they have started requesting again they resume receiving newly pushed elements.
Once the Sinks.Many has terminated (usually through its emitError(Throwable) or emitComplete() methods being called), it lets more subscribers subscribe but replays the termination signal to them immediately.
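The difference between directBestEffort() and directAllOrNothing() is easiest to see from the point of view of a fast subscriber that shares the sink with a slow one. The sketch below is an illustration under assumed names (DirectSinksSketch, fastSubscriberView); the slow subscriber is a BaseSubscriber that deliberately requests nothing at first.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Sinks;

final class DirectSinksSketch {

    /** Returns what the fast subscriber received while a slow one had no demand for element 1. */
    static List<Integer> fastSubscriberView(Sinks.Many<Integer> sink) {
        List<Integer> fast = new ArrayList<>();
        sink.asFlux().subscribe(fast::add); // unbounded demand

        BaseSubscriber<Integer> slow = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // Deliberately request nothing: this subscriber starts with zero demand.
            }
        };
        sink.asFlux().subscribe(slow);

        sink.tryEmitNext(1); // slow has zero demand at this point
        slow.request(1);     // slow starts requesting again
        sink.tryEmitNext(2); // both subscribers have demand
        return fast;
    }

    public static void main(String[] args) {
        System.out.println("bestEffort:   "
                + fastSubscriberView(Sinks.many().multicast().directBestEffort()));
        System.out.println("allOrNothing: "
                + fastSubscriberView(Sinks.many().multicast().directAllOrNothing()));
    }
}
```

With directBestEffort() the fast subscriber still receives element 1, whereas with directAllOrNothing() element 1 is dropped for everyone and the fast subscriber only receives element 2.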
2.5. Sinks.many().replay()
A replay Sinks.Many caches emitted elements and replays them to late subscribers.
It can be created in multiple configurations:
- Caching a limited history (Sinks.many().replay().limit(int)) or an unbounded history (Sinks.many().replay().all()).
- Caching a time-based replay window (Sinks.many().replay().limit(Duration)).
- Caching a combination of history size and time window (Sinks.many().replay().limit(int, Duration)).
Additional overloads for fine tuning of the above can also be found under Sinks.many().replay(), as well as variants that allow caching of a single element (latest() and latestOrDefault(T)).
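The size-based configurations listed above can be compared by pushing the same three elements into each sink and then attaching a late subscriber. This is only a sketch; the class and method names are invented for the example, and the time-based variants are left out to keep it deterministic.

```java
import java.util.List;

import reactor.core.publisher.Sinks;

final class ReplaySketch {

    /** Pushes 1, 2, 3, completes the sink, and returns what a late subscriber receives. */
    static List<Integer> lateSubscriberView(Sinks.Many<Integer> sink) {
        sink.tryEmitNext(1);
        sink.tryEmitNext(2);
        sink.tryEmitNext(3);
        sink.tryEmitComplete();
        // The subscription happens only now, after all signals were pushed.
        return sink.asFlux().collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("all():    " + lateSubscriberView(Sinks.many().replay().all()));
        System.out.println("limit(2): " + lateSubscriberView(Sinks.many().replay().limit(2)));
        System.out.println("latest(): " + lateSubscriberView(Sinks.many().replay().latest()));
    }
}
```

A late subscriber gets the full history with all(), the last two elements with limit(2), and only the most recent element with latest().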
2.6. Sinks.unsafe().many()
Advanced users and operator builders might want to consider using Sinks.unsafe().many(), which provides the same Sinks.Many factories without the extra producer thread safety.
As a result, there is less overhead per sink, since thread-safe sinks have to detect multi-threaded access.
Library developers should not expose unsafe sinks but can use them internally in a controlled calling environment where they can ensure external synchronization of the calls that lead to onNext, onComplete, and onError signals, in accordance with the Reactive Streams specification.
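One possible shape of such a controlled calling environment is sketched below: the unsafe sink stays private to a method, and every signal goes through a single lock. The lock object, the class name, and the use of two producer threads are assumptions for illustration, not a recommended pattern from the Reactor team.

```java
import reactor.core.publisher.Sinks;

final class UnsafeSinkSketch {

    static long emitWithExternalLock(int perThread) {
        // This sink does not detect concurrent producers on its own.
        Sinks.Many<Integer> sink = Sinks.unsafe().many().replay().all();
        Object lock = new Object(); // hypothetical lock: every signal is emitted while holding it

        Runnable producer = () -> {
            for (int i = 0; i < perThread; i++) {
                synchronized (lock) {
                    sink.tryEmitNext(i);
                }
            }
        };
        Thread first = new Thread(producer);
        Thread second = new Thread(producer);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        synchronized (lock) {
            sink.tryEmitComplete();
        }
        // Count what the replay sink recorded.
        return sink.asFlux().count().block();
    }

    public static void main(String[] args) {
        System.out.println(emitWithExternalLock(1000));
    }
}
```

Since the lock serializes all calls, the signals reach the sink one at a time, which is the guarantee the safe flavors would otherwise check for on every call.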
2.7. Sinks.one()
This method directly constructs a simple instance of Sinks.One<T>.
This flavor of Sinks is viewable as a Mono (through its asMono() view method), and has slightly different emit methods to better convey these Mono-like semantics:
- emitValue(T value) generates an onNext(value) signal and, in most implementations, also triggers an implicit onComplete().
- emitEmpty() generates an isolated onComplete() signal, intended as generating the equivalent of an empty Mono.
- emitError(Throwable t) generates an onError(t) signal.
Sinks.one() accepts one call of any of these methods, effectively generating a Mono that either completed with a value, completed empty, or failed.
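The "one call only" rule can be checked with the tryEmit* counterpart of emitValue, which reports the outcome instead of throwing. The sketch below uses invented class and method names and assumes the tryEmitValue method of Sinks.One.

```java
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;

final class SinkOneSketch {

    static String run() {
        Sinks.One<String> sink = Sinks.one();

        EmitResult first = sink.tryEmitValue("hello");    // accepted, also completes the Mono
        EmitResult second = sink.tryEmitValue("ignored"); // the sink is already terminated

        // A subscriber arriving after the emission still receives the value.
        String value = sink.asMono().block();
        return first + " " + second + " " + value;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only the first value is ever observed through asMono(); the second attempt is rejected rather than replacing it.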
2.8. Sinks.empty()
This method directly constructs a simple instance of Sinks.Empty<T>.
This flavor of Sinks is like Sinks.One<T>, except it doesn’t offer the emitValue method.
As a result, it can only generate a Mono that completes empty or fails.
The sink is still typed with a generic <T> despite being unable to trigger an onNext, because it allows easy composition and inclusion in chains of operators that require a specific type.
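The benefit of the generic type shows up as soon as the Mono view is combined with typed operators. In this small sketch (class name and fallback string are illustrative), the sink is typed as String so that defaultIfEmpty can supply a String when the sink completes empty.

```java
import reactor.core.publisher.Sinks;

final class SinkEmptySketch {

    static String run() {
        Sinks.Empty<String> sink = Sinks.empty();
        sink.tryEmitEmpty(); // completes without ever producing a value

        // The <String> type lets the empty Mono take part in a typed operator chain.
        return sink.asMono()
                .defaultIfEmpty("no value")
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Had the sink been terminated with tryEmitError instead, block() would rethrow that error and the fallback would not apply.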