Broadcasting to Multiple Subscribers with ConnectableFlux

Sometimes, you may want to not defer only some processing to the subscription time of one subscriber, but you might actually want for several of them to rendezvous and then trigger the subscription and data generation.

This is what ConnectableFlux is made for. Two main patterns are covered in the Flux API that return a ConnectableFlux: publish and replay.

  • publish dynamically tries to respect the demand from its various subscribers, in terms of backpressure, by forwarding these requests to the source. Most notably, if any subscriber has a pending demand of 0, publish pauses its requesting to the source.

  • replay buffers data seen through the first subscription, up to configurable limits (in time and buffer size). It replays the data to subsequent subscribers.

A ConnectableFlux offers additional methods to manage subscriptions downstream versus subscriptions to the original source. These additional methods include the following:

  • connect() can be called manually once you reach enough subscriptions to the Flux. That triggers the subscription to the upstream source.

  • autoConnect(n) can do the same job automatically once n subscriptions have been made.

  • refCount(n) not only automatically tracks incoming subscriptions but also detects when these subscriptions are cancelled. If not enough subscribers are tracked, the source is “disconnected”, causing a new subscription to the source later if additional subscribers appear.

  • refCount(int, Duration) adds a “grace period.” Once the number of tracked subscribers becomes too low, it waits for the Duration before disconnecting the source, potentially allowing for enough new subscribers to come in and cross the connection threshold again.

Consider the following example:

Flux<Integer> source = Flux.range(1, 3)
                           .doOnSubscribe(s -> System.out.println("subscribed to source"));

ConnectableFlux<Integer> co = source.publish();

co.subscribe(System.out::println, e -> {}, () -> {});
co.subscribe(System.out::println, e -> {}, () -> {});

System.out.println("done subscribing");
Thread.sleep(500);
System.out.println("will now connect");

co.connect();

The preceding code produces the following output:

done subscribing
will now connect
subscribed to source
1
1
2
2
3
3

The following code uses autoConnect:

Flux<Integer> source = Flux.range(1, 3)
                           .doOnSubscribe(s -> System.out.println("subscribed to source"));

Flux<Integer> autoCo = source.publish().autoConnect(2);

autoCo.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("subscribed first");
Thread.sleep(500);
System.out.println("subscribing second");
autoCo.subscribe(System.out::println, e -> {}, () -> {});

The preceding code produces the following output:

subscribed first
subscribing second
subscribed to source
1
1
2
2
3
3