feat(net): pluggable GroupSource for deep out-of-RAM replay window#1971

Closed
iameli wants to merge 2 commits into moq-dev:main from streamplace:eli/group-source

iameli commented Jun 30, 2026

@kixelated So this is what Claude came up with for implementing atmoq's 72-hour replay window. It works, but let me know if this is completely wrongheaded; I'm happy to scrap it and do a FETCH-based backfill implementation instead. That said, this behavior is kinda neat: it lets library consumers bring their own backing store for use cases where you want to backfill older content right up to live.

iameli and others added 2 commits June 30, 2026 14:11
Make the hardcoded 5s MAX_GROUP_AGE overridable per track, addressing the
existing "TODO: Replace with a configurable cache size". Adds an
Option<Duration> to the track State (default None -> the 5s const, so behavior is
unchanged), a TrackProducer::set_max_group_age setter, and a unit test. Lets a
publisher trade memory for a deeper late-join / replay window.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

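Editorial illustration of the retention override described in the first commit message. This is a minimal sketch, not the PR's code: the `State` struct is heavily simplified, and `effective_max_group_age` and `is_expired` are hypothetical helper names. Only `MAX_GROUP_AGE`, the `Option<Duration>` field, and `set_max_group_age` are named in the commit message.

```rust
use std::time::Duration;

/// Stand-in for the crate's hardcoded default; the commit message says 5s.
const MAX_GROUP_AGE: Duration = Duration::from_secs(5);

/// Hypothetical, simplified track state; the real `State` holds much more.
#[derive(Default)]
struct State {
    /// `None` means "use the default", so existing behavior is unchanged.
    max_group_age: Option<Duration>,
}

impl State {
    /// Mirrors the setter named in the commit message (signature assumed).
    fn set_max_group_age(&mut self, age: Duration) {
        self.max_group_age = Some(age);
    }

    /// The retention window that eviction should honor for this track.
    fn effective_max_group_age(&self) -> Duration {
        self.max_group_age.unwrap_or(MAX_GROUP_AGE)
    }

    /// A group becomes evictable once it is older than the effective window.
    fn is_expired(&self, group_age: Duration) -> bool {
        group_age > self.effective_max_group_age()
    }
}
```

With no override set, a 6-second-old group is evictable; after `set_max_group_age(Duration::from_secs(60))` it is retained, which is the memory-for-replay-depth trade the commit describes.
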
Add a GroupSource trait so a publisher can serve groups older than the
in-RAM track cache from a pluggable (e.g. disk-backed) source, giving a
replay window deeper than memory allows without holding it all in RAM.

- GroupSource: { group(seq) -> Option<Vec<Bytes>>, oldest() } in
  model/track.rs, plus State.group_source and
  TrackProducer::set_group_source.
- TrackConsumer gains oldest() (the in-RAM cache floor) and
  group_source().
- lite/publisher.rs run_track: when a subscriber's SUBSCRIBE(start_group=G)
  predates the cache floor, serve [max(G, source.oldest())..cache_floor)
  from the source one open uni-stream at a time (so the subscriber's flow
  control paces the backfill), re-reading the floor each iteration so it
  converges on the eviction boundary and hands off to the live loop
  without a gap.

None (no source set) preserves existing behavior exactly: a start_group
below the cache simply jumps forward to the oldest cached group. The
trait is sync and only called on the backfill task, never the hot live
loop. Stacked on the configurable-retention change.

Motivation + end-to-end validation (a disk store serving a 72h resume
window) in streamplace/atmoq.
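
Editorial illustration of the backfill loop described in the second commit message. This is a sketch under stated assumptions, not the code in `lite/publisher.rs`: frames are `Vec<u8>` instead of `bytes::Bytes` to stay std-only, `MapSource` and `backfill` are hypothetical names, the cache floor is modeled as a closure, and stream writing and flow control are left out entirely.

```rust
use std::collections::BTreeMap;

/// Stand-in for `bytes::Bytes`, to keep the sketch std-only.
type Frame = Vec<u8>;

/// Shape taken from the commit message: look up a group, report the oldest.
trait GroupSource: Send + Sync {
    fn group(&self, sequence: u64) -> Option<Vec<Frame>>;
    fn oldest(&self) -> Option<u64>;
}

/// Hypothetical in-memory source standing in for a disk-backed store.
struct MapSource(BTreeMap<u64, Vec<Frame>>);

impl GroupSource for MapSource {
    fn group(&self, sequence: u64) -> Option<Vec<Frame>> {
        self.0.get(&sequence).cloned()
    }

    fn oldest(&self) -> Option<u64> {
        self.0.keys().next().copied()
    }
}

/// Serve `[max(requested, source.oldest())..cache_floor)` from the source.
/// The floor is re-read on every iteration because eviction can raise it
/// while the backfill runs. Returns the sequences served and the sequence
/// at which the live loop would take over.
fn backfill(
    source: &dyn GroupSource,
    requested: u64,
    mut cache_floor: impl FnMut() -> Option<u64>,
) -> (Vec<u64>, u64) {
    let mut next = match source.oldest() {
        Some(oldest) => requested.max(oldest),
        None => requested,
    };
    let mut served = Vec::new();
    loop {
        // Stop once the cache covers `next`. Treating an empty cache as
        // "stop" is an assumption of this sketch.
        match cache_floor() {
            Some(floor) if next < floor => {}
            _ => break,
        }
        // The real publisher would write the frames to a uni-stream here;
        // a group missing from the source is simply skipped.
        if source.group(next).is_some() {
            served.push(next);
        }
        next += 1;
    }
    (served, next)
}
```

Because the floor is sampled inside the loop, a floor that moves from 15 to 17 mid-backfill still ends with the handoff at 17 rather than leaving groups 15 and 16 unserved.
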

sourcery-ai Bot left a comment

Sorry @iameli, you have reached your weekly rate limit of 500000 diff characters.

Please try again later or upgrade to continue using Sourcery

iameli commented Jun 30, 2026
Author

@sourcery-ai well now we gotta box 🦾

coderabbitai Bot commented Jun 30, 2026
Contributor

Walkthrough

This PR adds a GroupSource trait to rs/moq-net/src/model/track.rs providing historical group frame retrieval and oldest-sequence reporting, plus a per-track max_group_age retention override. TrackProducer gains setters for these fields; TrackConsumer gains oldest() and group_source() accessors. Eviction logic now honors the per-track override. In rs/moq-net/src/lite/publisher.rs, run_track adds a deep-replay path that, when a group_source is configured, replays historical groups from the source up to the in-RAM cache floor before continuing into the existing live group-serving loop. Tests cover retention override behavior and source plumbing.

🚥 Pre-merge checks | ✅ 5 passed

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title clearly summarizes the new pluggable GroupSource-backed deep replay window. |
| Description check | ✅ Passed | The description is related to the deep replay/backfill window and external backing-store approach in the changeset. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

coderabbitai Bot left a comment
Contributor

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/moq-net/src/lite/publisher.rs`:
- Around line 563-566: The resolved subscribe start in run_subscribe is being
clamped too late, after SUBSCRIBE_OK has already been encoded, which can make
the acknowledged start differ from the first replayed group. Move the
source.oldest() clamp into the resolved-start calculation in run_subscribe, then
pass that resolved value through to run_track so both the advertised start and
the served replay start use the same group. Use the existing run_subscribe and
run_track flow, along with the requested and source.oldest() values, to keep the
start selection consistent before sending SUBSCRIBE_OK.
- Around line 589-598: After `Self::serve_group(...)` returns in `publisher.rs`,
update `last_sequence` to the replayed group’s sequence so historical replay is
reflected in the state used by `SUBSCRIBE_END`. The fix should be applied in the
replay-serving path around `serve_group`, using the same sequence value that was
just processed so the final end notification reports the last replayed sequence
instead of falling back to `start_group`.

In `@rs/moq-net/src/model/track.rs`:
- Around line 36-46: The GroupSource trait is exposing a blocking, infallible
read contract that is being called inline from run_track via oldest() and
group(), which can stall async work and hide read errors as None. Update
GroupSource to use a fallible, non-blocking polling/streaming-style API (or
clearly require non-blocking adapters) and adjust the publisher path
accordingly, including the GroupSource methods and any callers in run_track that
currently assume Option-based reads.
- Around line 730-741: The cache handoff logic in `Track::oldest()` is using the
minimum live sequence, which can incorrectly skip over tombstoned gaps when the
publisher resumes from cache. Update the handoff calculation in `oldest()` and
the related resume/start logic around `start_group` to be gap-aware, so it
returns the first sequence that will actually be emitted rather than just
`min(sequence)`. Make sure the cache boundary advances past any tombstones and
that the `GroupSource` fallback is used whenever the requested resume point is
not fully covered by contiguous cached entries.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c6f4cd25-485f-4b87-92ef-9f92e05a191f

📥 Commits

Reviewing files that changed from the base of the PR and between 8458b5a and d7433fc.

📒 Files selected for processing (2)
  • rs/moq-net/src/lite/publisher.rs
  • rs/moq-net/src/model/track.rs

Comment on lines +563 to +566
```rust
let mut next = match source.oldest() {
	Some(oldest) => requested.max(oldest),
	None => requested,
};
```

Contributor

🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

Clamp the resolved start before sending SUBSCRIBE_OK.

This clamp happens after SUBSCRIBE_OK was already encoded, so a request older than source.oldest() can be acknowledged as starting at requested while the first replayed group starts at source.oldest(). Move this clamp into run_subscribe’s resolved-start calculation and pass that resolved value into run_track so the advertised and served start groups match.
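
Editorial illustration of the ordering this comment asks for: resolve the start once, then use the same value for both the acknowledgement and the replay. This is a sketch, assuming a hypothetical `resolve_start` helper and modeling the source's oldest sequence and the cache floor as plain `Option<u64>` values. It is not the actual `run_subscribe` code.

```rust
/// Resolve the group a subscription will actually start at. Intended to run
/// once, before SUBSCRIBE_OK is encoded, so the acknowledged start and the
/// first served group are the same value.
fn resolve_start(requested: u64, source_oldest: Option<u64>, cache_floor: Option<u64>) -> u64 {
    match (source_oldest, cache_floor) {
        // A non-empty source bounds how far back the replay can reach.
        (Some(oldest), _) => requested.max(oldest),
        // No source: per the PR description, jump forward to the oldest
        // cached group.
        (None, Some(floor)) => requested.max(floor),
        // Nothing stored anywhere: keep the request as-is (assumption).
        (None, None) => requested,
    }
}

/// Hypothetical pairing of the two values that must agree.
fn acknowledge_and_plan(
    requested: u64,
    source_oldest: Option<u64>,
    cache_floor: Option<u64>,
) -> (u64, u64) {
    let resolved = resolve_start(requested, source_oldest, cache_floor);
    // (start advertised in SUBSCRIBE_OK, start handed to the replay path)
    (resolved, resolved)
}
```
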

Comment on lines +589 to +598
```rust
Self::serve_group(
	session.clone(),
	msg,
	handle,
	producer.consume(),
	track_stats.clone(),
	track_priority.clone(),
	version,
)
.await?;
```

Contributor

🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

Update last_sequence after serving replayed groups.

Historical groups served here are not reflected in last_sequence, so if replay succeeds and the live loop ends without serving a newer group, SUBSCRIBE_END can report start_group instead of the last replayed sequence.

🐛 Proposed fix
```diff
 						Self::serve_group(
 							session.clone(),
 							msg,
 							handle,
 							producer.consume(),
 							track_stats.clone(),
 							track_priority.clone(),
 							version,
 						)
 						.await?;
+						last_sequence = last_sequence.max(Some(next));
 					}
```
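
Editorial note on why the one-line fix is safe before any group has been served: `Option<u64>` is ordered with `None` below every `Some(_)`, so `max` never needs a special case for the initial state. A minimal sketch, with `record_served` as a hypothetical helper name:

```rust
/// Track the highest sequence served so far. `Option<u64>` orders `None`
/// below every `Some(_)`, so `max` also works while nothing has been served.
fn record_served(last_sequence: Option<u64>, served: u64) -> Option<u64> {
    last_sequence.max(Some(served))
}
```

Using `max` rather than plain assignment also keeps the value from moving backwards if a lower sequence is recorded after a higher one.
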

Comment on lines +36 to +46
```rust
/// implementations must be `Send + Sync`. [`Self::group`] may block briefly on
/// I/O; the publisher only calls it on the backfill path, never in the hot live
/// loop.
pub trait GroupSource: Send + Sync {
	/// Return the frames of the group with this sequence in order, or `None` if
	/// it is not stored (never produced, or already evicted from the deep store).
	fn group(&self, sequence: u64) -> Option<Vec<bytes::Bytes>>;

	/// The oldest sequence the source can still serve, or `None` if it is empty.
	/// Used to clamp a subscriber's requested `start_group` to what's available.
	fn oldest(&self) -> Option<u64>;
```

Contributor

🩺 Stability & Availability | 🟠 Major | 🏗️ Heavy lift

Make GroupSource fallible and non-blocking before publishing the trait.

The publisher calls source.oldest() / source.group(next) inline from async run_track; this contract permits blocking I/O and collapses read failures into None, so a disk-backed source can stall the runtime or silently create replay gaps. Prefer a fallible poll/streaming API, or require non-blocking adapters, before this becomes public. As per coding guidelines, rs/**/*.rs: Prefer kio::Producer/Consumer and poll_* plumbing over tokio::sync primitives like channels or watch.

Source: Coding guidelines
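
Editorial illustration of the "fallible" half of this comment: separating "group not stored" from "read failed" so a backfill loop can skip the former and abort on the latter. This is a sketch only. `FallibleGroupSource`, `Step`, `step`, and `FlakySource` are hypothetical names, frames are `Vec<u8>` rather than `bytes::Bytes`, and the blocking concern is not addressed here (that would need an async or poll-based API, or offloading reads to a blocking thread pool).

```rust
use std::io;

/// Stand-in for `bytes::Bytes`, to keep the sketch std-only.
type Frame = Vec<u8>;

/// Hypothetical fallible variant of the trait: `Ok(None)` means the group is
/// not stored, `Err(_)` means the read itself failed.
trait FallibleGroupSource: Send + Sync {
    fn group(&self, sequence: u64) -> io::Result<Option<Vec<Frame>>>;
    fn oldest(&self) -> io::Result<Option<u64>>;
}

/// What a backfill loop would do with the result of one lookup.
#[derive(Debug, PartialEq)]
enum Step {
    Serve(Vec<Frame>),
    Skip,
    Abort(io::ErrorKind),
}

fn step(source: &dyn FallibleGroupSource, sequence: u64) -> Step {
    match source.group(sequence) {
        Ok(Some(frames)) => Step::Serve(frames),
        // A genuine hole in the store: move on to the next sequence.
        Ok(None) => Step::Skip,
        // A failed read is surfaced instead of being mistaken for a hole.
        Err(err) => Step::Abort(err.kind()),
    }
}

/// Toy source: sequence 1 is missing, sequence 2 fails to read.
struct FlakySource;

impl FallibleGroupSource for FlakySource {
    fn group(&self, sequence: u64) -> io::Result<Option<Vec<Frame>>> {
        match sequence {
            1 => Ok(None),
            2 => Err(io::Error::new(io::ErrorKind::Other, "disk read failed")),
            n => Ok(Some(vec![vec![n as u8]])),
        }
    }

    fn oldest(&self) -> io::Result<Option<u64>> {
        Ok(Some(0))
    }
}
```
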

Comment on lines +730 to +741
```rust
/// Return the oldest sequence still live in the in-RAM cache, or `None` if
/// the cache is empty. This is the floor below which a resuming subscriber
/// must be served from the [`GroupSource`] instead of the cache.
pub fn oldest(&self) -> Option<u64> {
	self.state
		.read()
		.groups
		.iter()
		.flatten()
		.map(|(group, _)| group.sequence)
		.min()
}
```

Contributor

🎯 Functional Correctness | 🟠 Major | 🏗️ Heavy lift

Don’t use the minimum live sequence as a cache handoff boundary.

oldest() returns min(sequence), but eviction can leave tombstoned holes above that minimum. The publisher uses this as if the cache serves [oldest..]; with a cache like [Some(5), None(3), Some(2)], start_group = 3 skips GroupSource and jumps to seq 5. Expose a gap-aware handoff, or replay until the next cached sequence that will actually be emitted and advance the live start accordingly.

Also applies to: 1431-1444
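
Editorial illustration of a gap-aware handoff using the cache from this comment, `[Some(5), None, Some(2)]` with the tombstone standing in for sequence 3. This is a sketch: the slice-of-`Option<u64>` representation and both function names are hypothetical, and returning an empty range when nothing is cached at or after `start` is an assumption.

```rust
use std::ops::Range;

/// `Some(seq)` is a live cached group, `None` is a tombstone left by eviction.
/// Returns the first cached sequence that would actually be emitted for a
/// subscriber starting at `start`.
fn first_cached_at_or_after(cache: &[Option<u64>], start: u64) -> Option<u64> {
    cache
        .iter()
        .flatten()
        .copied()
        .filter(|&seq| seq >= start)
        .min()
}

/// Sequences that must come from the `GroupSource` before the cache takes over.
fn source_range(cache: &[Option<u64>], start: u64) -> Range<u64> {
    match first_cached_at_or_after(cache, start) {
        Some(next_cached) => start..next_cached,
        // Nothing cached at or after `start`: no upper bound is known here,
        // so this sketch returns an empty range.
        None => start..start,
    }
}
```

For `start_group = 3`, the plain minimum is 2, which suggests the cache covers the request; the gap-aware lookup instead reports 5, so sequences 3 and 4 are routed to the source.
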

```rust
/// implementations must be `Send + Sync`. [`Self::group`] may block briefly on
/// I/O; the publisher only calls it on the backfill path, never in the hot live
/// loop.
pub trait GroupSource: Send + Sync {
```

Collaborator

Yeah I'm not a fan of "callback" traits like this. This one in particular requires all history to be synchronous (oof if you're using S3).

On dev there's a dynamic() method that allows you to receive requests for old groups.

kixelated closed this Jul 1, 2026