Dealing with Objects that Need Cleanup

In very specific cases, your application may deal with types that necessitate some form of cleanup once they are no longer in use. This is an advanced scenario, for example when you have reference-counted objects or when you deal with off-heap objects. Netty’s ByteBuf is a prime example of both.

To ensure proper cleanup of such objects, you need to account for it on a Flux-by-Flux basis as well as in several of the global hooks (see Using Global Hooks):

  • The doOnDiscard Flux/Mono operator

  • The onOperatorError hook

  • The onNextDropped hook

  • Operator-specific handlers

Cleanup has to be handled in all of these places because each hook is designed with a specific subset of cleanup cases in mind, and users might want (for example) to implement specific error-handling logic in addition to cleanup logic within onOperatorError.

Note that some operators are less suited to dealing with objects that need cleanup. For example, bufferWhen can introduce overlapping buffers, which means that the doOnDiscard “local hook” might see a first buffer as being discarded and clean up an element in it that also belongs to a second buffer, where it is still valid.

For the purpose of cleaning up, all these hooks MUST be IDEMPOTENT: on some occasions, they might be applied several times to the same object. Unlike the doOnDiscard operator, which performs a class-level instanceof check, the global hooks receive instances that can be any Object. It is up to the user’s implementation to distinguish between instances that need cleanup and instances that do not.
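The two requirements above (a type check, because a global hook can receive any Object, and idempotence) can be illustrated without Reactor at all. The following is a minimal sketch, not a Reactor or Netty API: PooledResource and releaseIfNeeded are hypothetical names, and guarding the release with an AtomicBoolean is just one way to make it idempotent.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class IdempotentCleanup {

    // Hypothetical type standing in for something like a pooled or off-heap buffer.
    static final class PooledResource {
        private final AtomicBoolean released = new AtomicBoolean();
        private final AtomicInteger frees = new AtomicInteger(); // times the memory was actually freed

        // Frees the resource at most once, however many times this is called.
        void release() {
            if (released.compareAndSet(false, true)) {
                frees.incrementAndGet(); // a real type would return its memory to a pool here
            }
        }

        boolean isReleased() { return released.get(); }
        int freeCount() { return frees.get(); }
    }

    // Cleanup routine suitable for a global hook: it accepts any Object,
    // acts only on the type that needs cleanup, and is safe to call repeatedly.
    static void releaseIfNeeded(Object candidate) {
        if (candidate instanceof PooledResource) {
            ((PooledResource) candidate).release();
        }
        // Anything else (String, Integer, ...) is left alone.
    }

    public static void main(String[] args) {
        PooledResource resource = new PooledResource();
        releaseIfNeeded(resource);
        releaseIfNeeded(resource);         // same object cleaned up a second time
        releaseIfNeeded("not a resource"); // ignored
        System.out.println("freed " + resource.freeCount() + " time(s)");
    }
}
```

A routine shaped like releaseIfNeeded could then be called from each of the global hooks listed above.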

1. The doOnDiscard Operator or Local Hook

This hook has been specifically put in place for cleanup of objects that would otherwise never be exposed to user code. It is intended as a cleanup hook for flows that operate under normal circumstances (not malformed sources that push too many items, which is covered by onNextDropped).

It is local, in the sense that it is activated through an operator and applies only to a given Flux or Mono.

Obvious cases include operators that filter elements from upstream. These elements never reach the next operator (or the final subscriber), but this is part of the normal path of execution, so they are passed to the doOnDiscard hook. Operators that discard elements in this way include the following:

  • filter: Items that do not match the filter are considered to be “discarded.”

  • skip: Skipped items are discarded.

  • buffer(maxSize, skip) with maxSize < skip: A “dropping buffer” — items in between buffers are discarded.
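As a small illustration of the filter and skip cases, the following sketch (assuming reactor-core is on the classpath; the class name is made up) records every element that skip or filter discards. Plain integers stand in for objects that would actually need cleanup.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

final class FilterSkipDiscard {

    // Returns [kept elements, discarded elements].
    static List<List<Integer>> run() {
        List<Integer> discarded = new ArrayList<>();
        List<Integer> kept = Flux.range(1, 10)
                .skip(2)                                    // 1 and 2 are skipped, so they are discarded
                .filter(i -> i % 2 == 0)                    // odd values that remain are discarded
                .doOnDiscard(Integer.class, discarded::add) // applies to skip and filter, both upstream
                .collectList()
                .block();
        return List.of(kept, discarded);
    }

    public static void main(String[] args) {
        List<List<Integer>> result = run();
        System.out.println("kept=" + result.get(0) + ", discarded=" + result.get(1));
    }
}
```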

But doOnDiscard is not limited to filtering operators: it is also used by operators that internally queue data for backpressure purposes. This matters most during cancellation. An operator that prefetches data from its source and later drains it to its subscriber upon demand could still hold un-emitted data when it gets cancelled. Such operators use the doOnDiscard hook during cancellation to clear their internal backpressure Queue.
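The following sketch illustrates discarding on cancellation. It assumes that onBackpressureBuffer, an operator that queues the elements its subscriber has not yet requested, behaves as described above and hands its still-queued elements to the doOnDiscard hook when cancelled. The class name and the demand of two elements are illustrative choices.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

final class CancelDiscard {

    // Returns [received elements, discarded elements].
    static List<List<Integer>> run() {
        List<Integer> received = new ArrayList<>();
        List<Integer> discarded = new ArrayList<>();

        // Subscriber that only ever requests two elements.
        BaseSubscriber<Integer> twoOnly = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(2);
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
            }
        };

        Flux.range(1, 5)
            .onBackpressureBuffer()                     // queues elements the subscriber has not requested
            .doOnDiscard(Integer.class, discarded::add) // visible to onBackpressureBuffer upstream
            .subscribe(twoOnly);

        twoOnly.dispose(); // cancel while the remaining elements are still queued
        return List.of(received, discarded);
    }

    public static void main(String[] args) {
        List<List<Integer>> result = run();
        System.out.println("received=" + result.get(0) + ", discarded=" + result.get(1));
    }
}
```

Because Flux.range emits synchronously here, the elements beyond the requested two are still sitting in the buffer when dispose() cancels the subscription.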

Each call to doOnDiscard(Class, Consumer) is additive with the others, and each one is visible to and used only by the operators upstream of it.
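To illustrate the additive, upstream-only behavior, the following sketch registers two doOnDiscard hooks around two filter operators. The list names innerHook and outerHook are hypothetical; Object.class is used so that each hook accepts every discarded element.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

final class AdditiveDiscardHooks {

    // Returns [elements seen by the inner hook, elements seen by the outer hook].
    static List<List<Object>> run() {
        List<Object> innerHook = new ArrayList<>();
        List<Object> outerHook = new ArrayList<>();

        Flux.<Object>just("a", 1, "b", 2)
            .filter(o -> o instanceof Integer)         // discards "a" and "b"
            .doOnDiscard(Object.class, innerHook::add) // upstream of here: the first filter only
            .filter(o -> ((Integer) o) > 1)            // discards 1
            .doOnDiscard(Object.class, outerHook::add) // upstream of here: both filters
            .blockLast();

        return List.of(innerHook, outerHook);
    }

    public static void main(String[] args) {
        List<List<Object>> result = run();
        System.out.println("inner=" + result.get(0) + ", outer=" + result.get(1));
    }
}
```

The first filter sees both hooks, so "a" and "b" reach both lists, while the second filter sits downstream of the inner hook, so the 1 it discards reaches only the outer hook.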

2. The onOperatorError Hook

The onOperatorError hook is intended to modify errors in a transverse manner (similar to an AOP catch-and-rethrow).

When the error happens during the processing of an onNext signal, the element that was being emitted is passed to onOperatorError.

If that type of element needs cleanup, you need to implement that cleanup in the onOperatorError hook, possibly on top of error-rewriting code.
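A minimal sketch of such a hook follows. It assumes reactor-core's keyed registration Hooks.onOperatorError(String, BiFunction) together with Hooks.resetOnOperatorError(String); Resource is a hypothetical type standing in for something that needs cleanup. The hook releases the element and returns the error unchanged, so it adds cleanup without rewriting errors.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;

final class OperatorErrorCleanup {

    // Hypothetical type that needs explicit cleanup.
    static final class Resource {
        final String name;
        private final AtomicBoolean released = new AtomicBoolean();

        Resource(String name) { this.name = name; }

        void release() { released.set(true); } // idempotent: releasing twice is harmless
        boolean isReleased() { return released.get(); }
    }

    static Resource run() {
        Resource failing = new Resource("bad");

        // Keyed registration, so that only this hook is removed in the finally block.
        Hooks.onOperatorError("cleanup", (error, value) -> {
            if (value instanceof Resource) { // value can be any Object, or null
                ((Resource) value).release();
            }
            return error;                    // error left unchanged: cleanup only
        });
        try {
            Flux.just(failing)
                .map(r -> {
                    if (r.name.equals("bad")) {
                        throw new IllegalStateException("cannot process " + r.name);
                    }
                    return r.name;
                })
                .subscribe(v -> { }, e -> System.err.println("error: " + e.getMessage()));
        } finally {
            Hooks.resetOnOperatorError("cleanup");
        }
        return failing;
    }

    public static void main(String[] args) {
        System.out.println("released=" + run().isReleased());
    }
}
```

Registering the hook under a key and resetting it in a finally block keeps the global state from leaking into unrelated code, which matters in tests.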

3. The onNextDropped Hook

With malformed Publishers, there could be cases where an operator receives an element when it expected none (typically, after having received the onError or onComplete signals). In such cases, the unexpected element is “dropped” — that is, passed to the onNextDropped hook. If you have types that need cleanup, you must detect these in the onNextDropped hook and implement cleanup code there as well.
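The sketch below builds a deliberately malformed Publisher that emits an element after onComplete and installs an onNextDropped hook that releases it. Resource is again a hypothetical type, and the hook is removed afterwards with Hooks.resetOnNextDropped(). It assumes that map, like other operators that track whether they have terminated, routes such a late element to onNextDropped.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;

final class NextDroppedCleanup {

    // Hypothetical type that needs explicit cleanup.
    static final class Resource {
        final String name;
        private final AtomicBoolean released = new AtomicBoolean();

        Resource(String name) { this.name = name; }

        void release() { released.set(true); } // idempotent
        boolean isReleased() { return released.get(); }
    }

    static Resource run() {
        Resource late = new Resource("late");

        // Deliberately malformed Publisher: it signals onNext after onComplete.
        Publisher<Resource> malformed = subscriber -> {
            subscriber.onSubscribe(new Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            subscriber.onComplete();
            subscriber.onNext(late); // spec violation: element after a terminal signal
        };

        Hooks.onNextDropped(dropped -> {
            if (dropped instanceof Resource) { // the hook can receive any Object
                ((Resource) dropped).release();
            }
        });
        try {
            Flux.from(malformed)
                .map(r -> r.name) // map is already done, so the late element is dropped
                .subscribe();
        } finally {
            Hooks.resetOnNextDropped();
        }
        return late;
    }

    public static void main(String[] args) {
        System.out.println("released=" + run().isReleased());
    }
}
```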

4. Operator-specific Handlers

Some operators that deal with buffers or collect values as part of their operations have specific handlers for cases where collected data is not propagated downstream. If you use such operators with the type(s) that need cleanup, you need to perform cleanup in these handlers.

For example, distinct has such a callback that is invoked when the operator terminates (or is cancelled) in order to clear the collection it uses to judge whether an element is distinct or not. By default, the collection is a HashSet, and the cleanup callback is HashSet::clear. However, if you deal with reference-counted objects, you might want to change that to a more involved handler that releases each element in the set before calling clear() on it.
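As a sketch of such a handler, the following uses the four-argument distinct(keySelector, storeSupplier, distinctPredicate, cleanup) overload with a hypothetical RefCounted type loosely modeled on retain/release reference counting. One design assumption is worth stating: the predicate retains each element it stores, so the set owns its own reference, and releasing the set in the cleanup callback does not release an element a second time after the subscriber has already released it.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

final class DistinctCleanup {

    // Hypothetical reference-counted value; equality is by name, not identity.
    static final class RefCounted {
        final String name;
        private final AtomicInteger refCnt = new AtomicInteger(1); // the creator holds one reference

        RefCounted(String name) { this.name = name; }

        void retain() { refCnt.incrementAndGet(); }
        void release() { refCnt.decrementAndGet(); }
        int refCnt() { return refCnt.get(); }

        @Override
        public boolean equals(Object o) {
            return o instanceof RefCounted && ((RefCounted) o).name.equals(name);
        }

        @Override
        public int hashCode() { return name.hashCode(); }
    }

    static void run(RefCounted... values) {
        Flux.just(values)
            .distinct(
                v -> v,                                 // key: the element itself
                HashSet::new,                           // backing store, like the default
                (Set<RefCounted> store, RefCounted key) -> {
                    if (store.add(key)) {
                        key.retain();                   // the store keeps its own reference
                        return true;                    // distinct: emitted downstream
                    }
                    return false;                       // duplicate: handed to the discard hook
                },
                (Set<RefCounted> store) -> {            // runs on complete, error or cancel
                    store.forEach(RefCounted::release); // give back the store's references
                    store.clear();
                })
            .doOnDiscard(RefCounted.class, RefCounted::release) // duplicates rejected by distinct
            .subscribe(RefCounted::release);                    // subscriber releases what it receives
    }

    public static void main(String[] args) {
        RefCounted a1 = new RefCounted("a");
        RefCounted b = new RefCounted("b");
        RefCounted a2 = new RefCounted("a");
        run(a1, b, a2);
        System.out.println(a1.refCnt() + " " + b.refCnt() + " " + a2.refCnt());
    }
}
```

With this arrangement, every element should end with a reference count of zero: the duplicate is released through doOnDiscard, emitted elements by the subscriber, and the stored references by the cleanup callback.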