Broadcasting to Multiple Subscribers with ConnectableFlux
Sometimes, you may want not only to defer some processing to the subscription time of one subscriber, but also to have several subscribers rendezvous and only then trigger the subscription and data generation.
This is what ConnectableFlux is made for. Two main patterns are covered in the Flux API that return a ConnectableFlux: publish and replay.
- publish dynamically tries to respect the demand from its various subscribers, in terms of backpressure, by forwarding these requests to the source. Most notably, if any subscriber has a pending demand of 0, publish pauses its requesting to the source.
- replay buffers data seen through the first subscription, up to configurable limits (in time and buffer size). It replays the data to subsequent subscribers.
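As a rough illustration of the replay behavior described above, the following sketch bounds the buffer to two elements with replay(2) and attaches a second subscriber only after the connection has been made. The class name ReplayDemo and the events list are hypothetical names introduced for this example, and the code assumes reactor-core is on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
public class ReplayDemo {

    // Records what each subscriber observes, in order.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        ConnectableFlux<Integer> co = Flux.range(1, 3)
            .doOnSubscribe(s -> events.add("subscribed to source"))
            .replay(2); // keep at most the last 2 elements for late subscribers

        // Subscribing alone does not start the source.
        co.subscribe(i -> events.add("first: " + i));

        // connect() subscribes to the source; the first subscriber sees every element.
        co.connect();

        // A late subscriber only receives what is still in the replay buffer.
        co.subscribe(i -> events.add("late: " + i));

        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the first subscriber should observe all three values, while the late subscriber should only observe the two most recent ones, because the size limit has already evicted the first element.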
A ConnectableFlux offers additional methods to manage subscriptions downstream versus subscriptions to the original source. These additional methods include the following:
- connect() can be called manually once you reach enough subscriptions to the Flux. That triggers the subscription to the upstream source.
- autoConnect(n) can do the same job automatically once n subscriptions have been made.
- refCount(n) not only automatically tracks incoming subscriptions but also detects when these subscriptions are cancelled. If not enough subscribers are tracked, the source is “disconnected”, causing a new subscription to the source later if additional subscribers appear.
- refCount(int, Duration) adds a “grace period.” Once the number of tracked subscribers becomes too low, it waits for the Duration before disconnecting the source, potentially allowing for enough new subscribers to come in and cross the connection threshold again.
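The connect and disconnect cycle of refCount(n) can be sketched as follows. This is only an illustration under stated assumptions: the class name RefCountDemo and the events list are hypothetical, Flux.never() stands in for a long-lived source so that no timing is involved, and reactor-core is assumed to be on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
public class RefCountDemo {

    // Records when the upstream source is connected and disconnected.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        Flux<Integer> shared = Flux.<Integer>never() // stand-in for a long-lived source
            .doOnSubscribe(s -> events.add("connected"))
            .doOnCancel(() -> events.add("disconnected"))
            .publish()
            .refCount(2); // connect once 2 subscribers are present

        Disposable a = shared.subscribe(v -> {}, e -> {});
        events.add("one subscriber");       // threshold not reached yet
        Disposable b = shared.subscribe(v -> {}, e -> {});
        events.add("two subscribers");      // threshold reached, source connected

        // Cancelling the tracked subscribers disconnects the source.
        a.dispose();
        b.dispose();
        events.add("all cancelled");

        // New subscribers crossing the threshold trigger a fresh subscription.
        Disposable c = shared.subscribe(v -> {}, e -> {});
        Disposable d = shared.subscribe(v -> {}, e -> {});

        // Clean up so nothing stays connected after the demo.
        c.dispose();
        d.dispose();
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The recorded events should show the source being subscribed to twice, once per group of two subscribers. With refCount(int, Duration), the disconnection step would instead be delayed by the grace period.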
Consider the following example:
Flux<Integer> source = Flux.range(1, 3)
    .doOnSubscribe(s -> System.out.println("subscribed to source"));

// publish() returns a ConnectableFlux; subscribing to it does not start the source
ConnectableFlux<Integer> co = source.publish();

co.subscribe(System.out::println, e -> {}, () -> {});
co.subscribe(System.out::println, e -> {}, () -> {});

System.out.println("done subscribing");
Thread.sleep(500); // nothing is emitted while waiting
System.out.println("will now connect");

// connect() triggers the single subscription to the source
co.connect();
The preceding code produces the following output:
done subscribing
will now connect
subscribed to source
1
1
2
2
3
3
The following code uses autoConnect:
Flux<Integer> source = Flux.range(1, 3)
    .doOnSubscribe(s -> System.out.println("subscribed to source"));

// autoConnect(2) connects to the source once two subscribers have subscribed
Flux<Integer> autoCo = source.publish().autoConnect(2);

autoCo.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("subscribed first");
Thread.sleep(500); // still waiting for the second subscriber
System.out.println("subscribing second");
autoCo.subscribe(System.out::println, e -> {}, () -> {});
The preceding code produces the following output:
subscribed first
subscribing second
subscribed to source
1
1
2
2
3
3