Three Sorts of Batching

When you have many elements and want to separate them into batches, Reactor offers three broad solutions: grouping, windowing, and buffering. These three are conceptually close, because they all redistribute a Flux<T> into an aggregate. Grouping and windowing create a Flux<Flux<T>>, while buffering aggregates elements into a Collection<T> and emits those collections.

1. Grouping with Flux<GroupedFlux<K, T>>

Grouping is the act of splitting the source Flux<T> into multiple batches, each of which matches a key.

The associated operator is groupBy.

Each group is represented as a GroupedFlux<K, T>, which lets you retrieve the key (of type K) by calling its key() method.

The content of a group does not need to be contiguous in the source. When a source element produces a key that has not been seen yet, the group for this key is opened with that element, and subsequent elements that match the key end up in the same group. Several groups can therefore be open at the same time.

This means that groups:

  1. Are always disjoint (a source element belongs to one and only one group).

  2. Can contain elements from different places in the original sequence.

  3. Are never empty.
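To make these three properties concrete, the following sketch models groupBy's partitioning with plain Java collections over a finite list. It is an illustration, not Reactor's implementation: GroupingModel and groupByModel are hypothetical names, and the model ignores asynchrony, backpressure, and the fact that groupBy emits each group as soon as its key first appears.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical plain-Java model of groupBy's partitioning (not Reactor code).
final class GroupingModel {
    static <K, T> Map<K, List<T>> groupByModel(List<T> source, Function<T, K> keyMapper) {
        // LinkedHashMap keeps groups in the order their keys first appeared,
        // mirroring the order in which groupBy opens new groups.
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T element : source) {
            // Each element goes into exactly one bucket (groups are disjoint),
            // and a bucket is only created when an element produces its key,
            // so no group can ever be empty.
            groups.computeIfAbsent(keyMapper.apply(element), k -> new ArrayList<>())
                  .add(element);
        }
        return groups;
    }
}
```

Applied to 1, 3, 5, 2, 4, 6, 11, 12, 13 with an even/odd key, the model yields odd = [1, 3, 5, 11, 13] and even = [2, 4, 6, 12]: every element lands in exactly one list, the odd list mixes elements from the start and the end of the source, and neither list is empty.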

The following example groups values by whether they are even or odd:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.groupBy(i -> i % 2 == 0 ? "even" : "odd")
		.concatMap(g -> g.defaultIfEmpty(-1) //would show empty groups as -1 (none occur)
				.map(String::valueOf) //map to string
				.startWith(g.key())) //start with the group's key
	)
	.expectNext("odd", "1", "3", "5", "11", "13")
	.expectNext("even", "2", "4", "6", "12")
	.verifyComplete();

Grouping is best suited to cases with a low to medium number of groups. The groups must also be consumed (for example, by a flatMap) so that groupBy keeps fetching data from upstream and feeding more groups. These two constraints can combine and lead to hangs, for instance when there are many distinct keys (high cardinality) and the concurrency of the flatMap consuming the groups is too low.

2. Windowing with Flux<Flux<T>>

Windowing is the act of splitting the source Flux<T> into windows, by criteria of size, time, boundary-defining predicates, or boundary-defining Publisher.

The associated operators are window, windowTimeout, windowUntil, windowWhile, and windowWhen.

Unlike groupBy, whose groups can be open concurrently and interleave according to the incoming keys, windows are (most of the time) opened sequentially.

Some variants can still overlap, though. For instance, in window(int maxSize, int skip), maxSize is the number of elements after which a window closes, and skip is the number of source elements after which a new window is opened. So, if maxSize > skip, a new window opens before the previous one closes, and the two windows overlap.

The following example shows overlapping windows:

StepVerifier.create(
	Flux.range(1, 10)
		.window(5, 3) //overlapping windows
		.concatMap(g -> g.defaultIfEmpty(-1)) //show empty windows as -1
	)
		.expectNext(1, 2, 3, 4, 5)
		.expectNext(4, 5, 6, 7, 8)
		.expectNext(7, 8, 9, 10)
		.expectNext(10)
		.verifyComplete();

With the reverse configuration (maxSize < skip), some elements from the source are dropped and are not part of any window.
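The following sketch models how the maxSize and skip parameters slice a finite source, which applies equally to window(int maxSize, int skip) and buffer(int maxSize, int skip). It uses plain Java lists rather than Reactor (SkipModel and slices are hypothetical names) and only covers which elements land in which slice, not timing or backpressure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the maxSize/skip slicing (not Reactor code).
final class SkipModel {
    static <T> List<List<T>> slices(List<T> source, int maxSize, int skip) {
        if (maxSize <= 0 || skip <= 0) {
            throw new IllegalArgumentException("maxSize and skip must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        // A new slice opens every `skip` source elements...
        for (int start = 0; start < source.size(); start += skip) {
            // ...and closes after `maxSize` elements or when the source ends.
            int end = Math.min(start + maxSize, source.size());
            result.add(new ArrayList<>(source.subList(start, end)));
        }
        return result;
    }
}
```

With maxSize = 5 and skip = 3 over 1..10, the model reproduces the four overlapping windows of the example above: [1..5], [4..8], [7..10], and [10]. With maxSize = 2 and skip = 3, it yields [1, 2], [4, 5], [7, 8], and [10], so 3, 6, and 9 are not part of any slice.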

With predicate-based windowing (windowUntil and windowWhile), consecutive source elements that do not match the predicate can also lead to empty windows, as the following example shows:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.windowWhile(i -> i % 2 == 0)
		.concatMap(g -> g.defaultIfEmpty(-1))
	)
		.expectNext(-1, -1, -1) //respectively triggered by odd 1 3 5
		.expectNext(2, 4, 6) // triggered by 11
		.expectNext(12) // triggered by 13
		// however, no empty completion window is emitted (would contain extra matching elements)
		.verifyComplete();
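The windowWhile rules at play in this example can be sketched with plain Java lists, each inner list standing for one window. This model (WindowWhileModel and windowWhileModel are hypothetical names) is written to reproduce the documented output above; it is not Reactor's implementation, and edge cases such as an empty source are not claimed to match Reactor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical plain-Java model of the windowWhile example (not Reactor code).
final class WindowWhileModel {
    static <T> List<List<T>> windowWhileModel(List<T> source, Predicate<T> predicate) {
        List<List<T>> windows = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T element : source) {
            if (predicate.test(element)) {
                current.add(element);        // matching elements go into the open window
            } else {
                windows.add(current);        // a non-matching element closes the window, even if empty,
                current = new ArrayList<>(); // opens a new one, and is itself dropped
            }
        }
        if (!current.isEmpty()) {
            windows.add(current);            // no empty window is emitted on completion
        }
        return windows;
    }
}
```

For the source above and i -> i % 2 == 0, the model returns [], [], [], [2, 4, 6], [12], which lines up with the -1, -1, -1, then 2, 4, 6, then 12 expected by the StepVerifier.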

3. Buffering with Flux<List<T>>

Buffering is similar to windowing, with the following twist: instead of emitting windows (each of which is a Flux<T>), it emits buffers (each of which is a Collection<T>, by default a List<T>).

The operators for buffering mirror those for windowing: buffer, bufferTimeout, bufferUntil, bufferWhile, and bufferWhen.

Where the corresponding windowing operator opens a window, a buffering operator creates a new collection and starts adding elements to it. Where a window closes, the buffering operator emits the collection.

Buffering can also lead to overlapping buffers or to dropped source elements. The following example shows overlapping buffers:

StepVerifier.create(
	Flux.range(1, 10)
		.buffer(5, 3) //overlapping buffers
	)
		.expectNext(Arrays.asList(1, 2, 3, 4, 5))
		.expectNext(Arrays.asList(4, 5, 6, 7, 8))
		.expectNext(Arrays.asList(7, 8, 9, 10))
		.expectNext(Collections.singletonList(10))
		.verifyComplete();

Unlike windowUntil and windowWhile, bufferUntil and bufferWhile do not emit empty buffers, as the following example shows:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.bufferWhile(i -> i % 2 == 0)
	)
	.expectNext(Arrays.asList(2, 4, 6)) // triggered by 11
	.expectNext(Collections.singletonList(12)) // triggered by 13
	.verifyComplete();
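Buffers here behave like the windows of the windowWhile example, minus the empty ones. The following plain-Java sketch models bufferWhile and, for contrast, bufferUntil, where the element for which the predicate returns true is kept as the last element of the buffer it closes. PredicateBufferModel and its methods are hypothetical names, and the sketch ignores asynchrony and backpressure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical plain-Java models of bufferWhile and bufferUntil (not Reactor code).
final class PredicateBufferModel {
    // bufferWhile: matching elements accumulate; a non-matching element closes
    // the current buffer (emitted only if non-empty) and is itself dropped.
    static <T> List<List<T>> bufferWhileModel(List<T> source, Predicate<T> predicate) {
        List<List<T>> buffers = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T element : source) {
            if (predicate.test(element)) {
                current.add(element);
            } else if (!current.isEmpty()) {
                buffers.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            buffers.add(current);
        }
        return buffers;
    }

    // bufferUntil: every element is added; a matching element closes the
    // buffer it was just added to.
    static <T> List<List<T>> bufferUntilModel(List<T> source, Predicate<T> predicate) {
        List<List<T>> buffers = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T element : source) {
            current.add(element);
            if (predicate.test(element)) {
                buffers.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            buffers.add(current); // leftover elements are emitted on completion
        }
        return buffers;
    }
}
```

On 1, 3, 5, 2, 4, 6, 11, 12, 13 with i -> i % 2 == 0, bufferWhileModel returns [2, 4, 6] and [12], matching the example above, while bufferUntilModel returns [1, 3, 5, 2], [4], [6], [11, 12], and [13].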