Three Sorts of Batching

When you have lots of elements and you want to separate them into batches, you have three broad solutions in Reactor: grouping, windowing, and buffering. These three are conceptually close, because they redistribute a Flux<T> into an aggregate. Grouping and windowing create a Flux<Flux<T>>, while buffering aggregates into a Collection<T>.

1. Grouping with Flux<GroupedFlux<T>>

Grouping is the act of splitting the source Flux<T> into multiple batches, each of which matches a key.

The associated operator is groupBy.

Each group is represented as a GroupedFlux<T>, which lets you retrieve the key by calling its key() method.

There is no necessary continuity in the content of the groups. Once a source element produces a new key, the group for this key is opened and elements that match the key end up in the group (several groups could be open at the same time).

This means that groups:

  1. Are always disjoint (a source element belongs to one and only one group).

  2. Can contain elements from different places in the original sequence.

  3. Are never empty.

The following example groups values by whether they are even or odd:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.groupBy(i -> i % 2 == 0 ? "even" : "odd")
		.concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them
				.map(String::valueOf) //map to string
				.startWith(g.key())) //start with the group's key
	)
	.expectNext("odd", "1", "3", "5", "11", "13")
	.expectNext("even", "2", "4", "6", "12")
	.verifyComplete();

Grouping is best suited to cases where you have a medium to low number of groups. The groups must also be consumed (such as by a flatMap) so that groupBy continues fetching data from upstream and feeding more groups. Sometimes, these two constraints multiply and lead to hangs, such as when you have a high cardinality and the concurrency of the flatMap consuming the groups is too low.
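
For instance, the following sketch (not taken from the reference tests) consumes every group through flatMap, reducing each group to a count keyed by the group's key, which is enough to keep groupBy draining its source:

Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
	.groupBy(i -> i % 2 == 0 ? "even" : "odd")
	.flatMap(g -> g.count() //fully consumes each group, reducing it to its element count
			.map(count -> g.key() + ": " + count))
	.subscribe(System.out::println); //prints "odd: 5" and "even: 4" (relative order not guaranteed)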

To better understand the risks of hangs when combining groupBy with inappropriate operators, let’s consider an example.

The following snippet groups strings by their first character:

public static Flux<String> createGroupedFlux() {
	List<String> data = List.of("alpha", "air", "aim", "beta", "cat", "ball", "apple", "bat", "dog", "ace");
	return Flux.fromIterable(data)
			.groupBy(d -> d.charAt(0), 5) //group by first character, with a prefetch (buffer size) of 5
			.concatMap(g -> g.map(String::valueOf)
					.startWith(String.valueOf(g.key())) //emit the group's key before its elements
					.map(o -> {
						System.out.println(o); //print each value, to observe where the flow stops
						return o;
					})
			);
}

@Test
public void testGroupBy() {
	StepVerifier.create(createGroupedFlux())
			.expectNext("a", "alpha", "air", "aim", "apple", "ace")
			.expectNext("b", "beta", "ball", "bat")
			.expectNext("c", "cat", "d", "dog")
			.verifyComplete();
}

In the above:

  • The cardinality of groups is 4 ("a", "b", "c", and "d" being the group keys).

  • The concurrency of concatMap is 1.

  • The buffer size of groupBy is 5 (we defined the prefetch as 5; by default it is 256).

a
alpha
air
aim
apple

The test gets stuck after printing these elements. Let’s consider what happened.

  1. Initially, groupBy requests 5 elements. They end up in a buffer: "alpha", "air", "aim", "beta", "cat".

  2. concatMap has a concurrency of 1. Therefore, the group with key "a" is the only group that gets subscribed to. Out of the initial items, "alpha", "air", and "aim" are consumed by concatMap, while "beta" and "cat" remain in the buffer.

  3. Next, groupBy requests 3 additional items (2 items are already buffered). The buffer now contains "beta", "cat", "ball", "apple", "bat". Out of these, "apple" is consumed and the rest remain in the buffer.

  4. Next, groupBy requests 1 additional item (4 of the 5 slots in the buffer are already occupied). The buffered items are now "beta", "cat", "ball", "bat", "dog".

  5. Now, nothing in the buffer belongs to group "a", so concatMap consumes nothing further and the first group's Flux never completes. groupBy cannot request more data from the publisher, since its buffer is full. The publisher faces backpressure and cannot publish the remaining items. The result is a deadlock.

In the same example, if the data were published in a slightly different order, for example:

"alpha", "air", "aim", "beta", "cat", "ball", "apple", "dog", "ace", "bat"

The same test would pass: because the source can now be drained completely before the buffer fills up with items from other groups, group "a" receives its completion signal, so concatMap can complete that group, subscribe to the next group, and so on. Hence, when the order of the published data is arbitrary, groupBy is prone to deadlock whenever the consumption pattern does not match the capacity of its buffer.
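
One way to avoid the hang is to consume the groups with enough concurrency. The following variant is a sketch, not part of the original test (createGroupedFluxFixed is a hypothetical name): it replaces concatMap with a flatMap whose concurrency is at least the number of expected keys, so every group is subscribed to as soon as it opens and the groupBy buffer keeps draining:

public static Flux<String> createGroupedFluxFixed() {
	List<String> data = List.of("alpha", "air", "aim", "beta", "cat", "ball", "apple", "bat", "dog", "ace");
	return Flux.fromIterable(data)
			.groupBy(d -> d.charAt(0), 5)
			//a concurrency of 4 matches the cardinality of the keys, so no group is left unsubscribed
			.flatMap(g -> g.startWith(String.valueOf(g.key())), 4);
}

Note that with flatMap, elements of different groups may interleave in the output. Alternatively, increasing the groupBy prefetch (or simply leaving it at its default of 256) would let this small data set fit entirely in the buffer, so the source completes, every group completes, and the original concatMap can move from one group to the next.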

2. Windowing with Flux<Flux<T>>

Windowing is the act of splitting the source Flux<T> into windows, by criteria of size, time, boundary-defining predicates, or boundary-defining Publisher.

The associated operators are window, windowTimeout, windowUntil, windowWhile, and windowWhen.

Contrary to groupBy, where groups can overlap arbitrarily depending on the incoming keys, windows are (most of the time) opened sequentially.
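
For instance, the size-based window(int) variant opens a new window each time the previous one has received maxSize elements, one after the other. A minimal sketch in the style of the examples below (collecting each window into a list to make the boundaries visible):

StepVerifier.create(
	Flux.range(1, 10)
		.window(3) //sequential, non-overlapping windows of at most 3 elements
		.concatMap(Flux::collectList) //materialize each window as a List
	)
	.expectNext(Arrays.asList(1, 2, 3))
	.expectNext(Arrays.asList(4, 5, 6))
	.expectNext(Arrays.asList(7, 8, 9))
	.expectNext(Collections.singletonList(10))
	.verifyComplete();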

Some variants can still overlap, though. For instance, in window(int maxSize, int skip) the maxSize parameter is the number of elements after which a window closes, and the skip parameter is the number of elements in the source after which a new window is opened. So if maxSize > skip, a new window opens before the previous one closes and the two windows overlap.

The following example shows overlapping windows:

StepVerifier.create(
	Flux.range(1, 10)
		.window(5, 3) //overlapping windows
		.concatMap(g -> g.defaultIfEmpty(-1)) //show empty windows as -1
	)
		.expectNext(1, 2, 3, 4, 5)
		.expectNext(4, 5, 6, 7, 8)
		.expectNext(7, 8, 9, 10)
		.expectNext(10)
		.verifyComplete();

With the reverse configuration (maxSize < skip), some elements from the source are dropped and are not part of any window.
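
A sketch of this dropping configuration, reusing the same range as above (here, elements 4, 5, 9, and 10 do not belong to any window):

StepVerifier.create(
	Flux.range(1, 10)
		.window(3, 5) //dropping windows: maxSize < skip
		.concatMap(g -> g.defaultIfEmpty(-1))
	)
	.expectNext(1, 2, 3)
	.expectNext(6, 7, 8)
	.verifyComplete();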

In the case of predicate-based windowing through windowUntil and windowWhile, having subsequent source elements that do not match the predicate can also lead to empty windows, as demonstrated in the following example:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.windowWhile(i -> i % 2 == 0)
		.concatMap(g -> g.defaultIfEmpty(-1))
	)
		.expectNext(-1, -1, -1) //respectively triggered by odd 1 3 5
		.expectNext(2, 4, 6) // triggered by 11
		.expectNext(12) // triggered by 13
		// however, no empty completion window is emitted (would contain extra matching elements)
		.verifyComplete();

3. Buffering with Flux<List<T>>

Buffering is similar to windowing, with the following twist: instead of emitting windows (each of which is a Flux<T>), it emits buffers (which are Collection<T>; by default, List<T>).

The operators for buffering mirror those for windowing: buffer, bufferTimeout, bufferUntil, bufferWhile, and bufferWhen.

Where the corresponding windowing operator opens a window, a buffering operator creates a new collection and starts adding elements to it. Where a window closes, the buffering operator emits the collection.

Buffering can also lead to dropping source elements or having overlapping buffers, as the following example shows:

StepVerifier.create(
	Flux.range(1, 10)
		.buffer(5, 3) //overlapping buffers
	)
		.expectNext(Arrays.asList(1, 2, 3, 4, 5))
		.expectNext(Arrays.asList(4, 5, 6, 7, 8))
		.expectNext(Arrays.asList(7, 8, 9, 10))
		.expectNext(Collections.singletonList(10))
		.verifyComplete();
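
Conversely, a buffer configuration with maxSize < skip drops the elements in between, mirroring the dropping-window case. A sketch under the same range:

StepVerifier.create(
	Flux.range(1, 10)
		.buffer(3, 5) //dropping buffers: maxSize < skip
	)
	.expectNext(Arrays.asList(1, 2, 3))
	.expectNext(Arrays.asList(6, 7, 8))
	.verifyComplete();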

Unlike in windowing, bufferUntil and bufferWhile do not emit an empty buffer, as the following example shows:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.bufferWhile(i -> i % 2 == 0)
	)
	.expectNext(Arrays.asList(2, 4, 6)) // triggered by 11
	.expectNext(Collections.singletonList(12)) // triggered by 13
	.verifyComplete();
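
For comparison, bufferUntil closes the buffer on the element that matches the predicate and, by default, includes that element in the emitted buffer; whatever remains when the source completes is emitted as a final buffer. A sketch with the same data, under that reading of the operator:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.bufferUntil(i -> i % 2 == 0) //even numbers close the buffer and are included in it
	)
	.expectNext(Arrays.asList(1, 3, 5, 2))
	.expectNext(Collections.singletonList(4))
	.expectNext(Collections.singletonList(6))
	.expectNext(Arrays.asList(11, 12))
	.expectNext(Collections.singletonList(13)) //leftover emitted on completion
	.verifyComplete();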