Skip to content

fix: release reference to initial interpreter shell after it is shut down#3031

Open
He-Pin wants to merge 1 commit into
mainfrom
fix/actor-graph-interpreter-initial-shell-leak
Open

fix: release reference to initial interpreter shell after it is shut down#3031
He-Pin wants to merge 1 commit into
mainfrom
fix/actor-graph-interpreter-initial-shell-leak

Conversation

@He-Pin

@He-Pin He-Pin commented May 31, 2026

Copy link
Copy Markdown
Member

Motivation

An ActorGraphInterpreter starts with an initial shell but can host additional ones created using the subfusing materializer. The lifetime of ActorGraphInterpreter is not tied to the lifetime of the initial interpreter. However, a reference to the initial GraphInterpreterShell is kept around in the _initial constructor parameter, which can lead to unexpected memory usage because the initial GraphInterpreter and all its logics might be kept referenced much longer than expected.

This issue was originally reported in akka/akka-core#23535.

Modification

  • Changed _initial from val to private var
  • Set _initial to null after initialization in preStart()
  • Fixed debug logging to use shell parameter instead of _initial

Result

ActorGraphInterpreter no longer keeps a reference to the initial interpreter shell after it has been shut down, allowing it and its logics to be garbage collected even when the actor is still alive hosting other subfused interpreters.

Tests

  • Added test to verify initial shell reference is released after completion
  • Added test to verify actor can host additional interpreters after initial shell completes

References

@He-Pin He-Pin requested a review from Copilot May 31, 2026 20:29
@He-Pin He-Pin added this to the 2.0.0-M4 milestone May 31, 2026
@He-Pin He-Pin added the t:stream Pekko Streams label May 31, 2026

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Releases the ActorGraphInterpreter's reference to its initial GraphInterpreterShell after preStart finishes initializing it, so that when the actor outlives the initial shell (e.g. while hosting other subfused interpreters), the initial shell and its logics can be garbage collected. Also fixes a debug log line that was printing the initial shell instead of the shell being registered.

Changes:

  • Convert _initial from a constructor val to a private var and null it out at the end of preStart after tryInit.
  • Update the tryInit debug println to reference the shell parameter rather than _initial.
  • Add two tests in ActorGraphInterpreterSpec intended to cover the initial-shell-release behavior.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala Makes _initial a mutable field, clears it after init, and fixes the registration debug log to use the registered shell.
stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala Adds two specs aiming to verify the initial-shell reference release and continued operation with additional interpreters.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +439 to +480
"release reference to initial interpreter shell after it completes" in {
val mat = Materializer(system)
val initialShellCompleted = TestLatch(1)

// Create a stage that tracks when it completes
val trackingStage = new SimpleLinearGraphStage[String] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandler(in,
new InHandler {
override def onPush(): Unit = push(out, grab(in))
override def onUpstreamFinish(): Unit = {
complete(out)
initialShellCompleted.countDown()
}
})
setHandler(out,
new OutHandler {
override def onPull(): Unit = pull(in)
})
}
}

val upstream = TestPublisher.probe[String]()
val downstream = TestSubscriber.probe[String]()

Source.fromPublisher(upstream).via(trackingStage).to(Sink.fromSubscriber(downstream)).run()(mat)

downstream.request(1)
upstream.sendNext("test")
downstream.expectNext("test")

// Complete the upstream to trigger stage completion
upstream.sendComplete()
downstream.expectComplete()

// Wait for the initial shell to complete
Await.ready(initialShellCompleted, remainingOrDefault)

// The actor should have stopped since there's only one interpreter shell
// and it has completed
// This verifies that the initial shell reference is released and the actor can stop
}
Comment on lines +482 to +509
"not keep initial shell alive when actor hosts additional subfused interpreters" in {
val mat = Materializer(system)
val firstStreamDone = Promise[Done]()
val secondStreamDone = Promise[Done]()

// Create a flow that we'll use twice through the same materializer
val identityFlow = Flow[Int].map(identity)

// Run first stream and complete it
val sub1 = TestSubscriber.probe[Int]()
Source(1 to 10).via(identityFlow).to(Sink.fromSubscriber(sub1)).run()(mat)
sub1.request(10)
sub1.expectNextN(1 to 10)
sub1.expectComplete()
firstStreamDone.success(Done)

// Run second stream - this should work even though the first stream's
// initial shell has been completed
val sub2 = TestSubscriber.probe[Int]()
Source(20 to 30).via(identityFlow).to(Sink.fromSubscriber(sub2)).run()(mat)
sub2.request(11)
sub2.expectNextN(20 to 30)
sub2.expectComplete()
secondStreamDone.success(Done)

Await.result(firstStreamDone.future, remainingOrDefault)
Await.result(secondStreamDone.future, remainingOrDefault)
}
@He-Pin He-Pin requested a review from pjfanning June 16, 2026 10:01
…down

Motivation:
ActorGraphInterpreter retains a strong reference to its initial
GraphInterpreterShell via the _initial constructor val. When the actor
outlives the initial shell (e.g., while hosting subfused interpreters
registered via registerShell), the initial shell and all its logics
cannot be garbage collected for the lifetime of the actor.

Modification:
- Convert _initial from a constructor val to a private var
- Set _initial = null at the end of preStart() after tryInit completes
- Fix debug println in tryInit to reference the shell parameter instead
  of _initial (which may be null after preStart)
- Add behavioral test using flatMapConcat which triggers subfusing via
  SubFusingActorMaterializerImpl, verifying the actor continues
  processing subfused shells after the initial shell is released

Result:
The initial GraphInterpreterShell and its stage logics become eligible
for garbage collection once the initial shell shuts down, reducing
memory retention in long-lived ActorGraphInterpreter actors.

Tests:
- stream-tests/testOnly ActorGraphInterpreterSpec: 12/12 passed

References:
None - memory leak fix
@He-Pin He-Pin force-pushed the fix/actor-graph-interpreter-initial-shell-leak branch from bc40a16 to 6bc0142 Compare June 16, 2026 12:15
@He-Pin He-Pin removed this from the 2.0.0-M4 milestone Jun 16, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

t:stream Pekko Streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants