Description
The OnWindowExpiry callback is a special timer-based callback that the runner triggers before garbage collecting a window's state and side inputs. This allows user code to use or clean up the state instead of losing it.
The feature is notably used in the GroupIntoBatches transforms for the Java and Python SDKs.
Garbage collection currently happens in a single phase, when watermarks are being updated.
- We need to indicate that a WindowExpiry timer needs to be sent for the stage. That will probably be similar to the existing stateful and aggregate bools on stageState and their setter methods (see https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L1052). However, we'll also need the SDK-provided "name" for the callback, so it will be a string field in stageState.
- We need to know when we may send the WindowExpiry timer messages, and for which windows. In principle this would be one timer per user key, but we only want to set up and send those per-key timers at the last moment, to avoid unnecessary duplication.
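The two needs above can be sketched as follows. This is a minimal illustration under stated assumptions, not Prism's code: `stageState` is the struct named in this issue, but the fields `onWindowExpiryName` and `keysByWindow`, the setter `setOnWindowExpiry`, the helper `expiryTimersFor`, and the simplified `window` and `expiryTimer` types are all hypothetical. The point is that the stage stores only the callback name, and per-key timers are expanded from the keys holding state only at the moment they are about to be sent.

```go
package main

import "sort"

// window is a simplified, hypothetical stand-in for a Beam window,
// identified here only by its max timestamp.
type window struct {
	maxTimestamp int64
}

// expiryTimer is a hypothetical synthetic timer element for one
// (user key, window) pair.
type expiryTimer struct {
	family string // SDK-provided callback name
	key    string
	win    window
}

// stageState shows only the fields this sketch needs (hypothetical);
// the real Prism struct has many more.
type stageState struct {
	stateful           bool   // existing-style flag, shown for comparison
	onWindowExpiryName string // empty means the stage has no OnWindowExpiry callback
	// keysByWindow records which user keys currently hold state in each window.
	keysByWindow map[window]map[string]bool
}

// setOnWindowExpiry is a hypothetical setter, analogous to the setters
// for the stateful and aggregate flags.
func (ss *stageState) setOnWindowExpiry(name string) {
	ss.onWindowExpiryName = name
}

// expiryTimersFor expands an expired window into one timer per user key.
// It is only called when the timers are about to be sent, so no per-key
// timers are stored ahead of time.
func (ss *stageState) expiryTimersFor(w window) []expiryTimer {
	if ss.onWindowExpiryName == "" {
		return nil
	}
	keys := make([]string, 0, len(ss.keysByWindow[w]))
	for k := range ss.keysByWindow[w] {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order; Go map iteration order is random
	timers := make([]expiryTimer, 0, len(keys))
	for _, k := range keys {
		timers = append(timers, expiryTimer{family: ss.onWindowExpiryName, key: k, win: w})
	}
	return timers
}
```

Keeping only a string on the stage means stages without the callback pay nothing, and the per-key expansion cost is paid once, when the window actually expires.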
In principle, this means the expiry timers should be produced in startEventTimeBundle, after indicating that the state should be garbage collected. But we need to indicate windows which are ready for
So it is sort of like the mark-and-sweep algorithm used in general garbage collection.
The place where we currently garbage collect is probably fine for both. We must "mark" the windows that have expired if the stage has an OnWindowExpiry callback, and afterwards do the current garbage collection "sweep" with the set of collectable windows. But then the sweeps would need to be aware of both the window and the key, which is a bit heavier on memory.
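The mark phase on watermark refresh might look roughly like this. It is a sketch only: the fields `state` and `pendingExpiry` and the method `gcOnWatermarkRefresh` are hypothetical, windows are reduced to their max timestamp, and a window is treated as expired once the watermark passes its max timestamp plus the allowed lateness. Stages without the callback are swept immediately, as today; stages with it only get the window marked, so its state survives until the expiry timers are sent.

```go
package main

import "sort"

// window is a simplified, hypothetical stand-in for a Beam window.
type window struct{ maxTimestamp int64 }

// stageState holds the hypothetical fields used by this sketch.
type stageState struct {
	onWindowExpiryName string
	// state maps window -> user key -> opaque state bytes.
	state map[window]map[string][]byte
	// pendingExpiry is the "mark" set: expired windows whose state must
	// survive until their OnWindowExpiry timers are sent.
	pendingExpiry map[window]bool
}

// gcOnWatermarkRefresh runs when the watermark advances. Expired windows are
// marked if the stage has an OnWindowExpiry callback, otherwise swept now.
// It returns the windows that were swept immediately.
func (ss *stageState) gcOnWatermarkRefresh(watermark, allowedLateness int64) []window {
	var swept []window
	for w := range ss.state {
		if w.maxTimestamp+allowedLateness >= watermark {
			continue // not expired yet
		}
		if ss.onWindowExpiryName != "" {
			ss.pendingExpiry[w] = true // mark: sweep later, after the timers fire
			continue
		}
		delete(ss.state, w) // sweep right away (deleting during range is safe in Go)
		swept = append(swept, w)
	}
	sort.Slice(swept, func(i, j int) bool { return swept[i].maxTimestamp < swept[j].maxTimestamp })
	return swept
}
```

The mark set is keyed by window only; the per-key information stays in the state map until the sweep, which is where the extra memory mentioned above comes from.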
Ideally we want to be able to clean up in PersistBundle, where the cleanup is just done by not storing the state back for the window of the stage in question.
But then how do we communicate that intent back and avoid too much lock contention?
What I've got so far:
- Indicate in stageState that the stage has an OnWindowExpiry callback.
- Mark in stageState that window X needs to fire an expiry timer during garbage collection on watermark refresh. Otherwise, garbage collect the state as we do now.
- In startEventTimeBundle (and, technically, readyBundle), produce the synthetic timer elements for the marked windows, using the callback name stored in the stageState field. Remove the state from stageState, then remove the mark. This prevents the mark from re-appearing.
- Include a per-bundle map of the state (window + user key) to garbage collect.
- In PersistBundle, do not persist the stage state that was garbage collected for that bundle.
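The plan above can be sketched end to end. This is a hedged illustration, not Prism's implementation: `startExpiryBundle` stands in for the expiry handling in startEventTimeBundle, `persistBundle` for PersistBundle, and the `bundle`, `stateKey`, and `gcByBundle` names are hypothetical. The assumption is that a bundle works on its own copy of the state, so the state can be moved out of stageState when the timers are produced, and garbage collection then happens simply by not writing the GC'd entries back.

```go
package main

import "sort"

type window struct{ maxTimestamp int64 }

// stateKey identifies one piece of user state by window and user key.
type stateKey struct {
	win window
	key string
}

type expiryTimer struct {
	family string // SDK-provided callback name
	key    string
	win    window
}

// bundle is a hypothetical in-flight bundle with its own copy of state.
type bundle struct {
	id     string
	timers []expiryTimer
	state  map[stateKey][]byte
}

type stageState struct {
	onWindowExpiryName string
	state              map[stateKey][]byte
	pendingExpiry      map[window]bool // the "marks"
	// gcByBundle maps a bundle ID to the state that must not be persisted back.
	gcByBundle map[string]map[stateKey]bool
}

// startExpiryBundle produces one expiry timer per (window, key) for marked
// windows, moves that state into the bundle, clears the marks, and records
// what this bundle garbage collects.
func (ss *stageState) startExpiryBundle(id string) bundle {
	b := bundle{id: id, state: map[stateKey][]byte{}}
	toGC := map[stateKey]bool{}
	for sk, v := range ss.state {
		if !ss.pendingExpiry[sk.win] {
			continue
		}
		b.timers = append(b.timers, expiryTimer{family: ss.onWindowExpiryName, key: sk.key, win: sk.win})
		b.state[sk] = v      // the callback can still read the state
		delete(ss.state, sk) // remove the state from stageState
		toGC[sk] = true
	}
	// Remove the marks so the same windows are not expired again.
	for w := range ss.pendingExpiry {
		delete(ss.pendingExpiry, w)
	}
	ss.gcByBundle[id] = toGC
	// Deterministic timer order: by window, then by key.
	sort.Slice(b.timers, func(i, j int) bool {
		ti, tj := b.timers[i], b.timers[j]
		if ti.win.maxTimestamp != tj.win.maxTimestamp {
			return ti.win.maxTimestamp < tj.win.maxTimestamp
		}
		return ti.key < tj.key
	})
	return b
}

// persistBundle writes the bundle's state back, skipping anything this
// bundle garbage collected; skipping the write is the cleanup.
func (ss *stageState) persistBundle(b bundle) {
	gc := ss.gcByBundle[b.id]
	for sk, v := range b.state {
		if gc[sk] {
			continue
		}
		ss.state[sk] = v
	}
	delete(ss.gcByBundle, b.id)
}
```

Because the GC set travels with the bundle, PersistBundle needs no separate cleanup request, which is the contention benefit described below.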
That's not too bad.
This ties in nicely with making state cleanup a direct part of the bundle, instead of requesting it after the fact. That also avoids certain read/write contention.