Skip to content

Avoid substream subscription timeout for empty prefixAndTail tails#2991

Draft
He-Pin wants to merge 3 commits into
apache:mainfrom
He-Pin:fix-prefixAndTail-empty-tail
Draft

Avoid substream subscription timeout for empty prefixAndTail tails#2991
He-Pin wants to merge 3 commits into
apache:mainfrom
He-Pin:fix-prefixAndTail-empty-tail

Conversation

@He-Pin

@He-Pin He-Pin commented May 25, 2026

Copy link
Copy Markdown
Member

Motivation

prefixAndTail emits (prefix, tailSource) and then waits for the tailSource to be materialized before pulling upstream again. When a downstream consumer intentionally discards the tail Source — a common pattern in protocol parsers using splitWhen + prefixAndTail + mapAsync where a known-empty substream is skipped — the stage holds the substream open until the configured pekko.stream.materializer.subscription-timeout fires (default 5 s), silently stalling the rest of the pipeline.

Reproduces the long-standing akka/akka#20008:

Source(List("2", "a", "b", "0", "3", "a", "b", "c", "0", "1", "a"))
  .splitWhen(_.matches("\\d+"))
  .prefixAndTail(1)
  .mapAsync(1) {
    case (Seq("0"), _)             => Future.successful("")   // intentionally drop tail
    case (Seq(_),   elements)      => elements.runWith(Sink.seq).map(_.mkString)
  }
  .concatSubstreams
  .runWith(Sink.seq)

Without the fix every discarded empty tail adds a 5 s wait before the next group is emitted.

Modification

PrefixAndTail now issues a single speculative pull(in) from openSubstream() as soon as the tail Source has been emitted:

  • If upstream finishes before any further element arrives, the stage completes immediately instead of waiting for the substream subscription timer.
  • If upstream pushes an element before the tail Source has been materialized, the element is buffered in a one-element slot and delivered as the first element on the first downstream pull of the tail; upstream finish that arrives while the buffer is occupied is deferred and replayed together with the buffered element when the tail finally subscribes.
  • Subscription-timeout behaviour for tails that are not subscribed but still have unread upstream elements is unchanged.

Operator docs updated to describe the new pre-pull / one-element-buffer semantics.

Result

Discarding a tail Source from prefixAndTail no longer blocks the parent flow for the subscription-timeout window when the tail is empty. The directional test for the issue scenario now completes in ~60 ms instead of ~15 s.

Tests

  • sbt \"stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowPrefixAndTailSpec\" – 18/18 passed locally (incl. new "complete promptly on empty tail without waiting for subscription timeout (akka #20008)" and "deliver buffered tail element when tail is materialized after upstream emitted one").
  • Verified the new directional test fails against the pre-fix implementation by stashing the impl change and re-running the same testOnly.
  • scalafmt --list – clean for the changed files.
  • Full stream-tests/Test/test and +mimaReportBinaryIssues – Not run locally, deferred to CI (Check / Binary Compatibility should cover MiMa).

References

Refs akka/akka#20008

@He-Pin He-Pin marked this pull request as draft May 25, 2026 08:11
He-Pin added a commit to He-Pin/incubator-pekko that referenced this pull request Jun 15, 2026
Motivation:
The synchronous pull(in) inside openSubstream() cascades to the
upstream operator when called inside a push() handler. When
PrefixAndTail is used inside a Split operator with single-element
substreams, the cascade triggers Split.onPull() which tries to push
while the current push is still in progress, causing
"Cannot push port (Split.out) twice".

Modification:
Wrap the speculative pre-pull in getAsyncCallback to defer it through
the interpreter's async event loop, preventing synchronous cascade.

Result:
FlowSplitAfterSpec "work with single elem splits" passes.

Tests:
- sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.FlowSplitAfterSpec" (planned)

References:
Fixes apache#2991
He-Pin and others added 3 commits June 15, 2026 12:04
Motivation:
prefixAndTail emits (prefix, tailSource) and then waits for the tailSource to
be materialized before pulling upstream again. When a downstream consumer
intentionally discards the tail (typical for protocol-style splitWhen +
prefixAndTail + mapAsync flows where a known-empty substream is ignored), the
stage holds the substream open until the configured stream-subscription-timeout
fires (default 5 seconds), silently stalling the rest of the pipeline.

Modification:
Issue a single speculative pull from upstream as soon as openSubstream() has
emitted the tail Source. If upstream finishes before any further element
arrives, the stage now completes immediately instead of waiting for the
substream subscription timer. If upstream pushes an element before the tail
Source has been materialized, it is buffered in a one-element slot and
delivered on the first downstream pull of the tail, preserving normal
back-pressure semantics for late materialization.

Result:
Discarding a tail Source from prefixAndTail no longer blocks the parent flow
for the subscription timeout window when the tail happens to be empty.
Existing subscription-timeout behaviour for non-empty unsubscribed tails is
unchanged.

Tests:
- sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowPrefixAndTailSpec" - 18/18 passed
- Verified directional test "complete promptly on empty tail without waiting for subscription timeout (akka #20008)" fails against the pre-fix implementation
- Full stream-tests/Test/test and +mimaReportBinaryIssues - Not run locally - deferred to CI

References:
Refs akka/akka-core#20008
Motivation:
The synchronous pull(in) inside openSubstream() cascades to the
upstream operator when called inside a push() handler. When
PrefixAndTail is used inside a Split operator with single-element
substreams, the cascade triggers Split.onPull() which tries to push
while the current push is still in progress, causing
"Cannot push port (Split.out) twice".

Modification:
Wrap the speculative pre-pull in getAsyncCallback to defer it through
the interpreter's async event loop, preventing synchronous cascade.

Result:
FlowSplitAfterSpec "work with single elem splits" passes.

Tests:
- sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.FlowSplitAfterSpec" (planned)

References:
Fixes apache#2991
@He-Pin He-Pin force-pushed the fix-prefixAndTail-empty-tail branch from ffc2857 to 6f707d6 Compare June 15, 2026 04:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant