Dealing with Objects that Need Cleanup
In very specific cases, your application may deal with types that necessitate some form of cleanup once they are no longer in use.
This is an advanced scenario: for example, when you have reference-counted objects or when you deal with off-heap objects.
Netty’s ByteBuf is a prime example of both.
In order to ensure proper cleanup of such objects, you need to account for it on a Flux-by-Flux basis, as well as in several of the global hooks (see Using Global Hooks):
- The doOnDiscard Flux/Mono operator
- The onOperatorError hook
- The onNextDropped hook
- Operator-specific handlers
This is needed because each hook is made with a specific subset of cleanup in mind, and users might want (for example) to implement specific error-handling logic in addition to cleanup logic within onOperatorError.
Note that some operators are less adapted to dealing with objects that need cleanup.
For example, bufferWhen can introduce overlapping buffers, which means that the discard “local hook” we used earlier might see a first buffer as being discarded and clean up an element in it that also belongs to a second buffer, where it is still valid.
For the purpose of cleaning up, all these hooks MUST be IDEMPOTENT.
They might on some occasions get applied several times to the same object.
Unlike the doOnDiscard operator, which performs a class-level instanceof check, the global hooks also deal with instances that can be any Object. It is up to the user’s implementation to distinguish which instances need cleanup and which do not.
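The two requirements above (idempotent cleanup and a type check on arbitrary Object instances) can be sketched with plain Java functional types. This is a minimal illustration, not Reactor's own code: TrackedResource and CleanupHandlers are hypothetical names invented for the example. A Consumer<Object> of this shape is the kind of callback that Hooks.onNextDropped accepts.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for a type that needs cleanup (for example, an off-heap buffer).
final class TrackedResource {
    private final AtomicBoolean released = new AtomicBoolean();
    private final AtomicInteger effectiveReleases = new AtomicInteger();

    // Idempotent: only the first call has an effect, later calls are no-ops.
    void release() {
        if (released.compareAndSet(false, true)) {
            effectiveReleases.incrementAndGet();
        }
    }

    int effectiveReleases() {
        return effectiveReleases.get();
    }
}

final class CleanupHandlers {
    // Accepts any Object, as a global hook would, and ignores unrelated types.
    static final Consumer<Object> RELEASE_IF_TRACKED = candidate -> {
        if (candidate instanceof TrackedResource) {
            ((TrackedResource) candidate).release();
        }
    };

    public static void main(String[] args) {
        TrackedResource resource = new TrackedResource();
        RELEASE_IF_TRACKED.accept(resource);
        RELEASE_IF_TRACKED.accept(resource);        // applied twice to the same object: no-op
        RELEASE_IF_TRACKED.accept("plain string");  // not a TrackedResource: ignored
        System.out.println("effective releases: " + resource.effectiveReleases());
    }
}
```

Putting the idempotency guard inside the resource itself (here, a compare-and-set on a flag) means every hook that ends up seeing the same instance can call release() safely, whichever one runs first.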
1. The doOnDiscard Operator or Local Hook
This hook has been specifically put in place for cleanup of objects that would otherwise never be exposed to user code.
It is intended as a cleanup hook for flows that operate under normal circumstances (not malformed sources that push too many items, which is covered by onNextDropped).
It is local, in the sense that it is activated through an operator and applies only to a given Flux or Mono.
Obvious cases include operators that filter elements from upstream.
These elements never reach the next operator (or final subscriber), but this is part of the normal path of execution.
As such, they are passed to the doOnDiscard hook.
Examples of when you might use the doOnDiscard hook include the following:
- filter: Items that do not match the filter are considered to be “discarded.”
- skip: Skipped items are discarded.
- buffer(maxSize, skip) with maxSize < skip: A “dropping buffer”; items in between buffers are discarded.
But doOnDiscard is not limited to filtering operators, and is also used by operators that internally queue data for backpressure purposes.
More specifically, this matters mostly during cancellation. An operator that prefetches data from its source and later drains to its subscriber upon demand could hold un-emitted data when it gets cancelled.
Such operators use the doOnDiscard hook during cancellation to clear up their internal backpressure Queue.
Each call to doOnDiscard(Class, Consumer) is additive with the others, to the extent that it is visible to and used by only the operators upstream of it.
2. The onOperatorError Hook
The onOperatorError hook is intended to modify errors in a transverse manner (similar to an AOP catch-and-rethrow).
When the error happens during the processing of an onNext signal, the element that was being emitted is passed to onOperatorError.
If that type of element needs cleanup, you need to implement it in the onOperatorError hook, possibly on top of error-rewriting code.
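Combining cleanup with error rewriting might look like the following sketch. It uses only java.util.function.BiFunction, with the (error, element) to error shape that Hooks.onOperatorError accepts. PooledBuffer and OperatorErrorCleanup are hypothetical names, and wrapping the error in an IllegalStateException is just one arbitrary choice of rewriting.

```java
import java.util.function.BiFunction;

// Hypothetical type that needs cleanup; release() is safe to call more than once.
final class PooledBuffer {
    private boolean released;

    synchronized void release() {
        released = true;
    }

    synchronized boolean isReleased() {
        return released;
    }
}

final class OperatorErrorCleanup {
    // Cleans up the in-flight element (if any) and then rewrites the error.
    static final BiFunction<Throwable, Object, Throwable> CLEAN_AND_WRAP = (error, element) -> {
        // The element can be null when the error did not occur while processing
        // an onNext signal; instanceof is false for null, so no extra check is needed.
        if (element instanceof PooledBuffer) {
            ((PooledBuffer) element).release();
        }
        // Illustrative error rewriting: keep the original error as the cause.
        return new IllegalStateException("Operator failed", error);
    };

    public static void main(String[] args) {
        PooledBuffer buffer = new PooledBuffer();
        Throwable rewritten = CLEAN_AND_WRAP.apply(new RuntimeException("boom"), buffer);
        System.out.println(buffer.isReleased() + " / " + rewritten.getCause().getMessage());
    }
}
```

Doing the cleanup before building the replacement error keeps the release path independent of whatever rewriting logic follows it.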
3. The onNextDropped Hook
With malformed Publishers, there could be cases where an operator receives an element when it expected none (typically, after having received the onError or onComplete signals).
In such cases, the unexpected element is “dropped” (that is, passed to the onNextDropped hook).
If you have types that need cleanup, you must detect these in the onNextDropped hook and implement cleanup code there as well.
4. Operator-specific Handlers
Some operators that deal with buffers or collect values as part of their operations have specific handlers for cases where collected data is not propagated downstream. If you use such operators with the type(s) that need cleanup, you need to perform cleanup in these handlers.
For example, distinct has such a callback that is invoked when the operator terminates (or is cancelled) in order to clear the collection it uses to judge whether an element is distinct or not.
By default, the collection is a HashSet, and the cleanup callback is HashSet::clear.
However, if you deal with reference-counted objects, you might want to change that to a more involved handler that would release each element in the set before calling clear() on it.
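A "more involved" handler of that kind could be sketched as follows. It is a plain Consumer over the collection, which is the shape taken by the cleanup argument of the distinct overload that also accepts a key selector, a store supplier, and a distinct predicate. CountedItem and DistinctCleanup are hypothetical names, and the sketch assumes the store holds the elements themselves rather than derived keys.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical reference-counted element; release() never drops below zero.
final class CountedItem {
    private int refCount = 1;

    synchronized void release() {
        if (refCount > 0) {
            refCount--;
        }
    }

    synchronized int refCount() {
        return refCount;
    }
}

final class DistinctCleanup {
    // Replacement for HashSet::clear: release every retained element, then clear.
    static final Consumer<Set<CountedItem>> RELEASE_THEN_CLEAR = store -> {
        for (CountedItem item : store) {
            item.release();
        }
        store.clear();
    };

    public static void main(String[] args) {
        Set<CountedItem> store = new HashSet<>();
        CountedItem first = new CountedItem();
        store.add(first);
        store.add(new CountedItem());
        RELEASE_THEN_CLEAR.accept(store);
        System.out.println(store.isEmpty() + " / " + first.refCount());
    }
}
```

Clearing the set after releasing means that a second invocation of the handler finds nothing left to release, which keeps the handler itself idempotent.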