Three Sorts of Batching
When you have lots of elements and you want to separate them into batches, you have three
broad solutions in Reactor: grouping, windowing, and buffering. These three are
conceptually close, because they redistribute a Flux<T> into an aggregate. Grouping and
windowing create a Flux<Flux<T>>, while buffering aggregates into a Collection<T>.
1. Grouping with Flux<GroupedFlux<T>>
Grouping is the act of splitting the source Flux<T> into multiple batches, each of which
matches a key.
The associated operator is groupBy.
Each group is represented as a GroupedFlux<T>, which lets you retrieve the key by calling its
key() method.
There is no necessary continuity in the content of the groups. Once a source element produces a new key, the group for this key is opened and elements that match the key end up in the group (several groups could be open at the same time).
This means that groups:
- Are always disjoint (a source element belongs to one and only one group).
- Can contain elements from different places in the original sequence.
- Are never empty.
The following example groups values by whether they are even or odd:
StepVerifier.create(
        Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
            .groupBy(i -> i % 2 == 0 ? "even" : "odd")
            .concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them
                .map(String::valueOf) //map to string
                .startWith(g.key())) //start with the group's key
    )
    .expectNext("odd", "1", "3", "5", "11", "13")
    .expectNext("even", "2", "4", "6", "12")
    .verifyComplete();
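The three properties of groups listed earlier can also be checked without Reactor. The following is a minimal plain-JDK sketch that only models the grouping semantics over a finite list; the class name GroupingModel and the method group are hypothetical and are not part of the Reactor API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class GroupingModel {

    // Plain-JDK model of grouping (hypothetical helper, not Reactor code):
    // a group is created only when the first element carrying its key
    // arrives, so a group can never be empty.
    static <T, K> Map<K, List<T>> group(List<T> source, Function<T, K> keyOf) {
        Map<K, List<T>> groups = new LinkedHashMap<>(); // keeps key arrival order
        for (T element : source) {
            // each element lands in exactly one group: the one for its key
            groups.computeIfAbsent(keyOf.apply(element), k -> new ArrayList<>())
                  .add(element);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 3, 5, 2, 4, 6, 11, 12, 13);
        Map<String, List<Integer>> groups =
                group(source, i -> i % 2 == 0 ? "even" : "odd");
        System.out.println(groups);
    }
}
```

Running main prints {odd=[1, 3, 5, 11, 13], even=[2, 4, 6, 12]}: the "odd" group gathers elements from both ends of the source, every element appears in exactly one group, and no group is empty.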
Grouping is best suited for when you have a medium to low number of groups. The
groups must also be consumed (for example, by a flatMap) so that groupBy
continues fetching data from upstream and feeding more groups. Sometimes these two
constraints compound and lead to hangs, such as when you have a high cardinality and the
concurrency of the flatMap consuming the groups is too low.
To better understand the risks of hangs when combining groupBy with inappropriate
operators, let’s consider an example.
The following snippet groups strings by their first character:
public static Flux<String> createGroupedFlux() {
    List<String> data = List.of("alpha", "air", "aim", "beta", "cat", "ball", "apple", "bat", "dog", "ace");
    return Flux.fromIterable(data)
        .groupBy(d -> d.charAt(0), 5)
        .concatMap(g -> g.map(String::valueOf)
            .startWith(String.valueOf(g.key()))
            .map(o -> {
                System.out.println(o);
                return o;
            })
        );
}

@Test
public void testGroupBy() {
    StepVerifier.create(createGroupedFlux())
        .expectNext("a", "alpha", "air", "aim", "apple", "ace")
        .expectNext("b", "beta", "ball", "bat")
        .expectNext("c", "cat", "d", "dog")
        .verifyComplete();
}
In the above:
- The cardinality of groups is 4 ("a", "b", "c", and "d" being the group keys).
- The concurrency of concatMap is 1.
- The buffer size of groupBy is 5 (since we defined prefetch as 5; by default it’s 256).
a alpha air aim apple
The test gets stuck after printing these elements. Let’s consider what happened.
- Initially, groupBy requests 5 elements. They end up in a buffer: "alpha", "air", "aim", "beta", "cat".
- concatMap has a concurrency of 1. Therefore, the group with key "a" is the only group that gets subscribed. Out of the initial items, "alpha", "air", "aim" are consumed by concatMap and "beta", "cat" remain in the buffer.
- Next, groupBy requests 3 additional items (2 items are already buffered). The buffer now contains "beta", "cat", "ball", "apple", "bat". Out of these, "apple" is consumed and the rest remain in the buffer.
- Next, groupBy requests 1 additional item (4 slots are already occupied in the buffer). The buffered items are "beta", "cat", "ball", "bat", "dog".
- Now, nothing in the buffer belongs to group "a", hence no more consumption happens by concatMap and the first Flux remains not completed. groupBy is unable to request more data from the publisher since its buffer is full. The publisher faces backpressure and is not able to publish the remaining items. This results in the deadlock.
In the same example, if the data were in a slightly different order, for example:
"alpha", "air", "aim", "beta", "cat", "ball", "apple", "dog", "ace", "bat"
The same test would pass successfully: the same concatMap would be able to receive a
complete signal, complete one group, then subscribe to the next group and so on.
Hence, when the pattern of data published is arbitrary, groupBy is likely to face a
deadlock when the consumption pattern doesn’t match the capacity of the groupBy buffer.
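The walkthrough above can be replayed with a small plain-JDK simulation. This is only a sketch of the simplified model described in the steps, not Reactor's implementation: it assumes groupBy tops up its buffer whenever there is room, and that a concatMap-like consumer drains one group at a time and moves on only once the source has completed. The class name GroupByStallModel and the method run are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class GroupByStallModel {

    // Returns the elements delivered before the modelled pipeline either
    // completes or stalls. Keys are the first character of each string.
    static List<String> run(List<String> source, int prefetch) {
        List<String> consumed = new ArrayList<>();
        List<String> buffer = new ArrayList<>();   // items held by groupBy
        Iterator<String> upstream = source.iterator();
        Character active = null;                   // key of the subscribed group

        while (true) {
            // groupBy tops its buffer up to `prefetch` items
            while (buffer.size() < prefetch && upstream.hasNext()) {
                buffer.add(upstream.next());
            }
            if (active == null) {
                if (buffer.isEmpty()) {
                    return consumed;               // nothing left to deliver
                }
                active = buffer.get(0).charAt(0);  // subscribe to the next group
            }
            // the consumer takes every buffered item of the active group
            boolean progressed = false;
            for (Iterator<String> it = buffer.iterator(); it.hasNext(); ) {
                String item = it.next();
                if (item.charAt(0) == active) {
                    consumed.add(item);
                    it.remove();
                    progressed = true;
                }
            }
            if (progressed) {
                continue;
            }
            if (upstream.hasNext()) {
                return consumed;                   // buffer full of other groups: stalled
            }
            active = null;                         // source completed, so the group completes
        }
    }

    public static void main(String[] args) {
        List<String> stalls = List.of("alpha", "air", "aim", "beta", "cat",
                "ball", "apple", "bat", "dog", "ace");
        List<String> passes = List.of("alpha", "air", "aim", "beta", "cat",
                "ball", "apple", "dog", "ace", "bat");
        System.out.println(run(stalls, 5));
        System.out.println(run(passes, 5));
    }
}
```

In this model, the first ordering delivers only [alpha, air, aim, apple] before stalling, while the second ordering delivers all ten elements. Calling run with the first ordering and a prefetch of 10 also delivers everything, which illustrates how the buffer capacity and the arrival order interact.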
2. Windowing with Flux<Flux<T>>
Windowing is the act of splitting the source Flux<T> into windows, by criteria of
size, time, boundary-defining predicates, or boundary-defining Publisher.
The associated operators are window, windowTimeout, windowUntil, windowWhile, and
windowWhen.
Contrary to groupBy, whose groups overlap arbitrarily according to incoming keys,
windows are (most of the time) opened sequentially.
Some variants can still overlap, though. For instance, in window(int maxSize, int skip)
the maxSize parameter is the number of elements after which a window
closes, and the skip parameter is the number of elements in the source after which a
new window is opened. So if maxSize > skip, a new window opens before the previous one
closes and the two windows overlap.
The following example shows overlapping windows:
StepVerifier.create(
        Flux.range(1, 10)
            .window(5, 3) //overlapping windows
            .concatMap(g -> g.defaultIfEmpty(-1)) //show empty windows as -1
    )
    .expectNext(1, 2, 3, 4, 5)
    .expectNext(4, 5, 6, 7, 8)
    .expectNext(7, 8, 9, 10)
    .expectNext(10)
    .verifyComplete();
With the reverse configuration (maxSize < skip), some elements from
the source are dropped and are not part of any window.
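The relationship between maxSize and skip can be worked through on a finite list. The following plain-JDK sketch models the slicing only: a window opens every skip elements and holds up to maxSize elements. The class name WindowSkipModel and the method windows are hypothetical, and the model makes no claim about Reactor edge cases such as trailing empty windows.

```java
import java.util.ArrayList;
import java.util.List;

class WindowSkipModel {

    // Plain-JDK model of window(maxSize, skip) over a finite list
    // (hypothetical helper, not Reactor code).
    static <T> List<List<T>> windows(List<T> source, int maxSize, int skip) {
        if (maxSize <= 0 || skip <= 0) {
            throw new IllegalArgumentException("maxSize and skip must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        // a new window opens every `skip` elements of the source
        for (int start = 0; start < source.size(); start += skip) {
            // and closes after `maxSize` elements, or when the source ends
            int end = Math.min(start + maxSize, source.size());
            result.add(new ArrayList<>(source.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(windows(source, 5, 3)); // maxSize > skip: overlap
        System.out.println(windows(source, 3, 5)); // maxSize < skip: gaps
    }
}
```

With maxSize 5 and skip 3, the model yields [[1, 2, 3, 4, 5], [4, 5, 6, 7, 8], [7, 8, 9, 10], [10]]. With maxSize 3 and skip 5, it yields [[1, 2, 3], [6, 7, 8]], so 4, 5, 9, and 10 belong to no window.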
In the case of predicate-based windowing through windowUntil and windowWhile,
having subsequent source elements that do not match the predicate can also lead
to empty windows, as demonstrated in the following example:
StepVerifier.create(
        Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
            .windowWhile(i -> i % 2 == 0)
            .concatMap(g -> g.defaultIfEmpty(-1))
    )
    .expectNext(-1, -1, -1) //respectively triggered by odd 1 3 5
    .expectNext(2, 4, 6) // triggered by 11
    .expectNext(12) // triggered by 13
    // however, no empty completion window is emitted (would contain extra matching elements)
    .verifyComplete();
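To see where the empty windows come from, the boundaries can be traced with a plain-JDK model. This sketch assumes the behavior shown in the example above: each non-matching element closes the current window (even an empty one) and is itself discarded, and no extra empty window is emitted when the source ends. The class name WindowWhileModel and the method windowWhile are hypothetical stand-ins, not Reactor's operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class WindowWhileModel {

    // Plain-JDK model of predicate-based windowing over a finite list
    // (hypothetical helper, not Reactor code).
    static <T> List<List<T>> windowWhile(List<T> source, Predicate<T> inWindow) {
        List<List<T>> windows = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T element : source) {
            if (inWindow.test(element)) {
                current.add(element);      // matching element joins the open window
            } else {
                windows.add(current);      // non-matching element closes it, even if empty
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            windows.add(current);          // trailing matches still form a window
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 3, 5, 2, 4, 6, 11, 12, 13);
        System.out.println(windowWhile(source, i -> i % 2 == 0));
    }
}
```

Running main prints [[], [], [], [2, 4, 6], [12]]: the three leading odd values each close an empty window, 11 closes [2, 4, 6], and 13 closes [12].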
3. Buffering with Flux<List<T>>
Buffering is similar to windowing, with the following twist: Instead of emitting
windows (each of which is a Flux<T>), it emits buffers (which are Collection<T>,
by default List<T>).
The operators for buffering mirror those for windowing: buffer, bufferTimeout,
bufferUntil, bufferWhile, and bufferWhen.
Where the corresponding windowing operator opens a window, a buffering operator creates a new collection and starts adding elements to it. Where a window closes, the buffering operator emits the collection.
Buffering can also lead to dropping source elements or having overlapping buffers, as the following example shows:
StepVerifier.create(
        Flux.range(1, 10)
            .buffer(5, 3) //overlapping buffers
    )
    .expectNext(Arrays.asList(1, 2, 3, 4, 5))
    .expectNext(Arrays.asList(4, 5, 6, 7, 8))
    .expectNext(Arrays.asList(7, 8, 9, 10))
    .expectNext(Collections.singletonList(10))
    .verifyComplete();
Unlike in windowing, bufferUntil and bufferWhile do not emit an empty buffer, as
the following example shows:
StepVerifier.create(
        Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
            .bufferWhile(i -> i % 2 == 0)
    )
    .expectNext(Arrays.asList(2, 4, 6)) // triggered by 11
    .expectNext(Collections.singletonList(12)) // triggered by 13
    .verifyComplete();