fix: release references to completed stage logics in GraphInterpreter #3030
Open
He-Pin wants to merge 1 commit into
Pull request overview
This PR attempts to reduce GraphInterpreter memory retention by clearing stage logic references after finalization and adapting snapshots/tests for released logics.
Changes:
- Clears entries from `GraphInterpreter.logics` during `finish()`.
- Adds null-aware logic snapshot handling.
- Adds tests asserting the `logics` array is nulled after `finish()`.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala` | Updates finalization and snapshot behavior for null stage logic entries. |
| `stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpec.scala` | Adds regression tests for releasing stage logic references. |
Comments suppressed due to low confidence (1)
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala:757
`toSnapshot` still dereferences `logicIndexes(connection.inOwner)`/`outOwner`, but `logicIndexes` now excludes every released (null) logic. After `finish()` nulls `logics`, any snapshot of a shell that still has its `connections` array will throw `NoSuchElementException` instead of producing the `<completed>` snapshots added above. Use the owners' stable `stageId` to index `logicSnapshots` so released logics are handled.

    val logicIndexes = logics.zipWithIndex.collect { case (stage, idx) if stage ne null => stage -> idx }.toMap
    val connectionSnapshots = connections.filter(_ ne null).map { connection =>
      ConnectionSnapshotImpl(
        connection.id,
        logicSnapshots(logicIndexes(connection.inOwner)),
        logicSnapshots(logicIndexes(connection.outOwner)),

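The `stageId`-based indexing suggested in this comment could look roughly like the sketch below. It is not the Pekko implementation: `Logic`, `Connection`, `LogicSnapshot`, and `ConnectionSnapshot` are hypothetical stand-ins, and the sketch assumes that a logic's `stageId` equals its slot in `logics`.

```scala
object SnapshotSketch {
  // Hypothetical stand-ins for the interpreter's types; these are not the Pekko classes.
  final class Logic(val stageId: Int, val name: String)
  final case class Connection(id: Int, inOwner: Logic, outOwner: Logic)
  final case class LogicSnapshot(index: Int, label: String)
  final case class ConnectionSnapshot(id: Int, in: LogicSnapshot, out: LogicSnapshot)

  def snapshot(logics: Array[Logic], connections: Array[Connection]): Vector[ConnectionSnapshot] = {
    // One snapshot per slot, with a placeholder for released (null) entries.
    val logicSnapshots = logics.zipWithIndex.map { case (logic, idx) =>
      if (logic eq null) LogicSnapshot(idx, "<completed>")
      else LogicSnapshot(idx, logic.name)
    }
    // Index by the owner's stageId (assumed to match its slot) instead of an
    // identity map, so a released logic still resolves to its placeholder.
    connections.toVector.filter(_ ne null).map { c =>
      ConnectionSnapshot(c.id, logicSnapshots(c.inOwner.stageId), logicSnapshots(c.outOwner.stageId))
    }
  }
}
```

Because the lookup is positional, it does not depend on whether the slot in `logics` has been nulled, only on the connection still knowing its owners.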
Comment on lines +322 to +324

    builder(GraphStages.identity[Int])
      .connect(source, GraphStages.identity[Int].in)
      .connect(GraphStages.identity[Int].out, sink)

Comment on lines +349 to +351

    // Release reference to the stage logic so it can be garbage collected
    // even if the GraphInterpreter is still alive due to other references
    logics(i) = null

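For context, a finalization loop that nulls each slot after cleanup might be shaped like the following minimal sketch. `Logic` and `Interpreter` here are hypothetical stand-ins, not the Pekko classes, and the error handling is simplified.

```scala
object FinishSketch {
  // Hypothetical stand-in for a stage logic; it only records whether cleanup ran.
  final class Logic(val name: String) {
    var finalized = false
    def postStop(): Unit = { finalized = true }
  }

  final class Interpreter(val logics: Array[Logic]) {
    def finish(): Unit = {
      var i = 0
      while (i < logics.length) {
        val logic = logics(i)
        // Skip slots that were already released by an earlier pass.
        if (logic ne null) {
          try logic.postStop()
          catch { case scala.util.control.NonFatal(_) => () } // a real interpreter would log this
          // Drop the slot so the logic is no longer reachable through this array.
          logics(i) = null
        }
        i += 1
      }
    }
  }
}
```

The null check makes `finish()` safe to call more than once, which matters once slots can be released before `finish()` runs.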
Motivation:
GraphInterpreter retains strong references to completed stage logics in the logics array, preventing garbage collection even after stages have finished. This is especially problematic for long-running fused interpreters that host subfused shells, where completed initial stages remain referenced for the lifetime of the actor. Additionally, Connection objects retain strong references to handler objects (inHandler/outHandler), further preventing GC of completed stage logics.

Modification:
- afterStageHasRun now scans ALL logics for completed stages instead of only checking the single activeStage. This ensures stages that complete without being activeStage (e.g., boundary stages calling complete(), cascading completions) are properly finalized and released.
- finish() and afterStageHasRun null connection handlers (inHandler/outHandler) for finalized stages, breaking handler-to-logic reference chains to enable garbage collection.
- toSnapshot handles null logics gracefully with <completed> placeholder and uses connection.inOwner.stageId for positional indexing instead of identity-based map lookups that fail with NPE on null logics.
- inLogicName/outLogicName debug methods guard against null logics.
- finish() sets logics(i) = null after finalizing each stage.
- Fix GraphStageLogicSpec "not double-terminate" test to save logic references before execute() since afterStageHasRun now releases completed logics.

Result:
Completed stage logics are properly finalized and released for GC during normal stream execution (not just in finish()), reducing memory retention in long-running fused interpreters.

Tests:
- stream-tests/testOnly GraphInterpreterSpec: 13/13 passed (includes 2 new regression tests)
- stream-tests/testOnly GraphStageLogicSpec: 17/17 passed
- stream-tests/testOnly InterpreterSpec+LifecycleInterpreterSpec+InterpreterSupervisionSpec+GraphInterpreterFailureModesSpec: 54/54 passed

References:
None - memory leak fix
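The scan-and-release step described in the commit message can be illustrated with the toy model below. This is a sketch under stated assumptions, not the code in this PR: `Handler`, `Logic`, `Connection`, and `Interpreter` are hypothetical, and completion is modeled as a plain `completed` flag.

```scala
object ReleaseSketch {
  trait Handler

  // Hypothetical logic that acts as its own handler.
  final class Logic(val stageId: Int) extends Handler {
    var completed = false
    var finalized = false
  }

  // The owners are kept in this sketch (a snapshot could still read their stageId);
  // only the handler fields are cleared.
  final class Connection(val inOwner: Logic, val outOwner: Logic) {
    var inHandler: Handler = inOwner
    var outHandler: Handler = outOwner
  }

  final class Interpreter(val logics: Array[Logic], val connections: Array[Connection]) {
    // Scan every slot rather than a single active stage, so logics that
    // completed as a side effect of another stage's run are also released.
    def afterStageHasRun(): Unit = {
      var i = 0
      while (i < logics.length) {
        val logic = logics(i)
        if ((logic ne null) && logic.completed) {
          logic.finalized = true
          for (c <- connections) {
            if (c.inOwner eq logic) c.inHandler = null
            if (c.outOwner eq logic) c.outHandler = null
          }
          logics(i) = null
        }
        i += 1
      }
    }
  }
}
```

In this model the scan is linear in the number of logics on every call, and the `inOwner`/`outOwner` fields still reference the released logic, so whether it actually becomes unreachable depends on what else holds those connections.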
c58c59e to d29add0
Motivation
When multiple stages are fused, a single live stage keeps the GraphInterpreter alive, and with it the references to all already-completed GraphStage logics, which may retain a significant amount of memory.
This issue was originally reported in akka/akka-core#23439.
Modification
Result
GraphInterpreter no longer keeps references to completed stage logics, allowing them to be garbage collected even if the interpreter is still alive due to other fused stages.
Tests
References