Three Sorts of Batching
When you have lots of elements and you want to separate them into batches, you have three
broad solutions in Reactor: grouping, windowing, and buffering. These three are
conceptually close, because they redistribute a Flux<T> into an aggregate. Grouping and
windowing create a Flux<Flux<T>>, while buffering aggregates into a Collection<T>.
1. Grouping with Flux<GroupedFlux<T>>
Grouping is the act of splitting the source Flux<T> into multiple batches, each of which
matches a key.
The associated operator is groupBy.
Each group is represented as a GroupedFlux<T>, which lets you retrieve the key by calling its
key() method.
There is no necessary continuity in the content of the groups. Once a source element produces a new key, the group for this key is opened and elements that match the key end up in the group (several groups could be open at the same time).
This means that groups:
- Are always disjoint (a source element belongs to one and only one group).
- Can contain elements from different places in the original sequence.
- Are never empty.
The following example groups values by whether they are even or odd:
StepVerifier.create(
    Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
        .groupBy(i -> i % 2 == 0 ? "even" : "odd")
        .concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them
            .map(String::valueOf) //map to string
            .startWith(g.key())) //start with the group's key
    )
    .expectNext("odd", "1", "3", "5", "11", "13")
    .expectNext("even", "2", "4", "6", "12")
    .verifyComplete();
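The three properties listed above can also be seen in a small plain-Java model of the routing logic. This is only a sketch of the semantics using collections. It is not how Reactor implements groupBy, and the GroupingModel class and its group method are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of grouping semantics (hypothetical helper, NOT Reactor's implementation).
final class GroupingModel {

    // Routes each element to the group for its key, opening the group when the key is first seen.
    static <T, K> Map<K, List<T>> group(List<T> source, Function<T, K> keySelector) {
        Map<K, List<T>> groups = new LinkedHashMap<>(); // remembers the order in which groups open
        for (T element : source) {
            K key = keySelector.apply(element);
            // A group is created only when its first element arrives, so it is never empty.
            // Each element is added to exactly one group, so groups are disjoint.
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> groups = group(
            Arrays.asList(1, 3, 5, 2, 4, 6, 11, 12, 13),
            i -> i % 2 == 0 ? "even" : "odd");
        // The "odd" group holds 11 and 13 even though they arrive after the even values.
        System.out.println(groups); // {odd=[1, 3, 5, 11, 13], even=[2, 4, 6, 12]}
    }
}
```

The model collects everything eagerly, whereas a GroupedFlux<T> emits its elements as they arrive. It is meant only to show why groups are disjoint, non-contiguous, and non-empty.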
Grouping is best suited for when you have a medium to low number of groups. The
groups must also be consumed (such as by a flatMap) so that groupBy
continues fetching data from upstream and feeding more groups. Sometimes, these two
constraints compound and lead to hangs, such as when you have a high cardinality and the
concurrency of the flatMap consuming the groups is too low.
2. Windowing with Flux<Flux<T>>
Windowing is the act of splitting the source Flux<T> into windows, by criteria of
size, time, boundary-defining predicates, or boundary-defining Publisher.
The associated operators are window, windowTimeout, windowUntil, windowWhile, and
windowWhen.
Unlike groupBy, whose groups interleave arbitrarily according to incoming keys,
windows are (most of the time) opened sequentially.
Some variants can still overlap, though. For instance, in window(int maxSize, int skip)
the maxSize parameter is the number of elements after which a window
closes, and the skip parameter is the number of elements in the source after which a
new window is opened. So if maxSize > skip, a new window opens before the previous one
closes and the two windows overlap.
The following example shows overlapping windows:
StepVerifier.create(
    Flux.range(1, 10)
        .window(5, 3) //overlapping windows
        .concatMap(g -> g.defaultIfEmpty(-1)) //show empty windows as -1
    )
    .expectNext(1, 2, 3, 4, 5)
    .expectNext(4, 5, 6, 7, 8)
    .expectNext(7, 8, 9, 10)
    .expectNext(10)
    .verifyComplete();
With the reverse configuration (maxSize < skip), some elements from
the source are dropped and are not part of any window.
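To make the interplay of maxSize and skip concrete, the following plain-Java sketch models which source elements land in which batch. It is an illustration of the index arithmetic only, not Reactor's implementation, and the SkipBatchModel class and its batches method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the (maxSize, skip) arithmetic (hypothetical helper, NOT Reactor's implementation).
final class SkipBatchModel {

    // Opens a batch every `skip` elements and fills it with up to `maxSize` elements.
    static <T> List<List<T>> batches(List<T> source, int maxSize, int skip) {
        if (maxSize <= 0 || skip <= 0) {
            throw new IllegalArgumentException("maxSize and skip must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            // The last batches may be shorter because the source ends before maxSize is reached.
            int end = Math.min(start + maxSize, source.size());
            result.add(new ArrayList<>(source.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // maxSize > skip: consecutive batches share elements (overlap).
        System.out.println(batches(source, 5, 3)); // [[1, 2, 3, 4, 5], [4, 5, 6, 7, 8], [7, 8, 9, 10], [10]]
        // maxSize < skip: 3, 6, and 9 fall between batches and are dropped.
        System.out.println(batches(source, 2, 3)); // [[1, 2], [4, 5], [7, 8], [10]]
    }
}
```

In this model, the first call reproduces the overlapping windows of the example above, and the second shows the gaps that appear when skip exceeds maxSize.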
In the case of predicate-based windowing through windowUntil and windowWhile,
having subsequent source elements that do not match the predicate can also lead
to empty windows, as demonstrated in the following example:
StepVerifier.create(
    Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
        .windowWhile(i -> i % 2 == 0)
        .concatMap(g -> g.defaultIfEmpty(-1))
    )
    .expectNext(-1, -1, -1) //respectively triggered by odd 1 3 5
    .expectNext(2, 4, 6) // triggered by 11
    .expectNext(12) // triggered by 13
    // however, no empty completion window is emitted (would contain extra matching elements)
    .verifyComplete();
3. Buffering with Flux<List<T>>
Buffering is similar to windowing, with the following twist: Instead of emitting
windows (each of which is a Flux<T>), it emits buffers (which are Collection<T>,
List<T> by default).
The operators for buffering mirror those for windowing: buffer, bufferTimeout,
bufferUntil, bufferWhile, and bufferWhen.
Where the corresponding windowing operator opens a window, a buffering operator creates a new collection and starts adding elements to it. Where a window closes, the buffering operator emits the collection.
Buffering can also lead to dropping source elements or having overlapping buffers, as the following example shows:
StepVerifier.create(
    Flux.range(1, 10)
        .buffer(5, 3) //overlapping buffers
    )
    .expectNext(Arrays.asList(1, 2, 3, 4, 5))
    .expectNext(Arrays.asList(4, 5, 6, 7, 8))
    .expectNext(Arrays.asList(7, 8, 9, 10))
    .expectNext(Collections.singletonList(10))
    .verifyComplete();
Unlike in windowing, bufferUntil and bufferWhile do not emit an empty buffer, as
the following example shows:
StepVerifier.create(
    Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
        .bufferWhile(i -> i % 2 == 0)
    )
    .expectNext(Arrays.asList(2, 4, 6)) // triggered by 11
    .expectNext(Collections.singletonList(12)) // triggered by 13
    .verifyComplete();
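The difference between the windowWhile and bufferWhile results can be sketched with one plain-Java model that takes a flag for whether empty batches are kept. This is an assumption-laden illustration of the behavior described in this section, not Reactor's implementation. The WhileSplitModel class, the splitWhile method, and the keepEmpty flag are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of "while"-style splitting (hypothetical helper, NOT Reactor's implementation).
final class WhileSplitModel {

    // keepEmpty = true models the windowWhile result, keepEmpty = false models the bufferWhile result.
    static <T> List<List<T>> splitWhile(List<T> source, Predicate<T> predicate, boolean keepEmpty) {
        List<List<T>> result = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T element : source) {
            if (predicate.test(element)) {
                current.add(element); // matching elements accumulate in the open batch
            } else {
                // A non-matching element closes the open batch and is itself discarded.
                if (keepEmpty || !current.isEmpty()) {
                    result.add(current);
                }
                current = new ArrayList<>();
            }
        }
        // On completion, the open batch is emitted only if it still holds elements.
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 3, 5, 2, 4, 6, 11, 12, 13);
        Predicate<Integer> isEven = i -> i % 2 == 0;
        System.out.println(splitWhile(source, isEven, true));  // [[], [], [], [2, 4, 6], [12]]
        System.out.println(splitWhile(source, isEven, false)); // [[2, 4, 6], [12]]
    }
}
```

With keepEmpty set to true, the three leading empty lists correspond to the -1 placeholders in the windowWhile example. With it set to false, the output matches the bufferWhile example above.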