feat(net): pluggable GroupSource for deep out-of-RAM replay window #1971
iameli wants to merge 2 commits
Make the hardcoded 5s MAX_GROUP_AGE overridable per track, addressing the existing "TODO: Replace with a configurable cache size". Adds an Option<Duration> to the track State (default None -> the 5s const, so behavior is unchanged), a TrackProducer::set_max_group_age setter, and a unit test. Lets a publisher trade memory for a deeper late-join / replay window.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
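A minimal sketch of the override described in this commit message, for readers skimming the thread. The `State` and `TrackProducer` types here are stripped-down, hypothetical stand-ins, and the setter's real signature in the PR may differ (for example it may take `&self` or an `Option`). The only part taken from the description is the shape: an `Option<Duration>` whose `None` falls back to the 5s constant.

```rust
use std::time::Duration;

/// Default retention, mirroring the 5s constant named in the commit message.
const MAX_GROUP_AGE: Duration = Duration::from_secs(5);

/// Hypothetical, stripped-down stand-in for the track state.
#[derive(Default)]
struct State {
    /// `None` means "use MAX_GROUP_AGE", so existing behavior is unchanged.
    max_group_age: Option<Duration>,
}

impl State {
    /// Effective retention: the override if one was set, else the default.
    fn max_group_age(&self) -> Duration {
        self.max_group_age.unwrap_or(MAX_GROUP_AGE)
    }
}

/// Hypothetical stand-in for the producer handle.
#[derive(Default)]
struct TrackProducer {
    state: State,
}

impl TrackProducer {
    /// Assumed signature; the PR only names the method.
    fn set_max_group_age(&mut self, age: Duration) {
        self.state.max_group_age = Some(age);
    }
}
```

With this shape, a publisher that never calls the setter keeps the 5s window, and one that calls it trades memory for a deeper late-join window.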
Add a GroupSource trait so a publisher can serve groups older than the
in-RAM track cache from a pluggable (e.g. disk-backed) source, giving a
replay window deeper than memory allows without holding it all in RAM.
- GroupSource: { group(seq) -> Option<Vec<Bytes>>, oldest() } in
  model/track.rs, plus State.group_source and
  TrackProducer::set_group_source.
- TrackConsumer gains oldest() (the in-RAM cache floor) and
  group_source().
- lite/publisher.rs run_track: when a subscriber's SUBSCRIBE(start_group=G)
  predates the cache floor, serve [max(G, source.oldest())..cache_floor)
  from the source one open uni-stream at a time (so the subscriber's flow
  control paces the backfill), re-reading the floor each iteration so it
  converges on the eviction boundary and hands off to the live loop
  without a gap.
None (no source set) preserves existing behavior exactly: a start_group
below the cache simply jumps forward to the oldest cached group. The
trait is sync and only called on the backfill task, never the hot live
loop. Stacked on the configurable-retention change.
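The backfill range described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's code: frames are plain `Vec<u8>` instead of `bytes::Bytes`, `MapSource` and `backfill` are hypothetical names, the cache floor is passed in as a closure so it can move between iterations, and "serving" a group is reduced to recording its sequence instead of opening a uni-stream. An empty cache (floor `None`) simply ends the backfill here, which is an assumption.

```rust
use std::collections::BTreeMap;

/// Stand-in for `bytes::Bytes` so the sketch has no dependencies.
type Frame = Vec<u8>;

/// Same shape as the trait in the PR description.
trait GroupSource: Send + Sync {
    fn group(&self, sequence: u64) -> Option<Vec<Frame>>;
    fn oldest(&self) -> Option<u64>;
}

/// Hypothetical in-memory source; a real one might read from disk.
struct MapSource(BTreeMap<u64, Vec<Frame>>);

impl GroupSource for MapSource {
    fn group(&self, sequence: u64) -> Option<Vec<Frame>> {
        self.0.get(&sequence).cloned()
    }

    fn oldest(&self) -> Option<u64> {
        self.0.keys().next().copied()
    }
}

/// Walk [max(requested, source.oldest()) .. cache_floor), re-reading the
/// floor on every pass so the walk converges on the eviction boundary.
/// Returns the sequences served from the source and the sequence at which
/// the live loop should take over.
fn backfill(
    source: &dyn GroupSource,
    requested: u64,
    mut cache_floor: impl FnMut() -> Option<u64>,
) -> (Vec<u64>, u64) {
    let mut next = match source.oldest() {
        Some(oldest) => requested.max(oldest),
        None => requested,
    };
    let mut served = Vec::new();
    while let Some(floor) = cache_floor() {
        if next >= floor {
            break; // the cache covers everything from here on
        }
        if source.group(next).is_some() {
            served.push(next); // the real code would stream the frames here
        }
        next += 1;
    }
    (served, next)
}
```

If the floor moves from 15 to 17 while the backfill is running, the walk keeps going until it reaches 17, so the handoff to the live loop happens at the current floor and not at a stale one.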
Motivation + end-to-end validation (a disk store serving a 72h resume
window) in streamplace/atmoq.
@sourcery-ai well now we gotta box 🦾
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/moq-net/src/lite/publisher.rs`:
- Around line 563-566: The resolved subscribe start in run_subscribe is being
clamped too late, after SUBSCRIBE_OK has already been encoded, which can make
the acknowledged start differ from the first replayed group. Move the
source.oldest() clamp into the resolved-start calculation in run_subscribe, then
pass that resolved value through to run_track so both the advertised start and
the served replay start use the same group. Use the existing run_subscribe and
run_track flow, along with the requested and source.oldest() values, to keep the
start selection consistent before sending SUBSCRIBE_OK.
- Around line 589-598: After `Self::serve_group(...)` returns in `publisher.rs`,
update `last_sequence` to the replayed group’s sequence so historical replay is
reflected in the state used by `SUBSCRIBE_END`. The fix should be applied in the
replay-serving path around `serve_group`, using the same sequence value that was
just processed so the final end notification reports the last replayed sequence
instead of falling back to `start_group`.
In `@rs/moq-net/src/model/track.rs`:
- Around line 36-46: The GroupSource trait is exposing a blocking, infallible
read contract that is being called inline from run_track via oldest() and
group(), which can stall async work and hide read errors as None. Update
GroupSource to use a fallible, non-blocking polling/streaming-style API (or
clearly require non-blocking adapters) and adjust the publisher path
accordingly, including the GroupSource methods and any callers in run_track that
currently assume Option-based reads.
- Around line 730-741: The cache handoff logic in `Track::oldest()` is using the
minimum live sequence, which can incorrectly skip over tombstoned gaps when the
publisher resumes from cache. Update the handoff calculation in `oldest()` and
the related resume/start logic around `start_group` to be gap-aware, so it
returns the first sequence that will actually be emitted rather than just
`min(sequence)`. Make sure the cache boundary advances past any tombstones and
that the `GroupSource` fallback is used whenever the requested resume point is
not fully covered by contiguous cached entries.
📒 Files selected for processing (2)
rs/moq-net/src/lite/publisher.rs
rs/moq-net/src/model/track.rs
let mut next = match source.oldest() {
    Some(oldest) => requested.max(oldest),
    None => requested,
};
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Clamp the resolved start before sending SUBSCRIBE_OK.
This clamp happens after SUBSCRIBE_OK was already encoded, so a request older than source.oldest() can be acknowledged as starting at requested while the first replayed group starts at source.oldest(). Move this clamp into run_subscribe’s resolved-start calculation and pass that resolved value into run_track so the advertised and served start groups match.
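One way to read this suggestion is to compute the start once and hand the same value to both the acknowledgement and the replay path. The sketch below assumes a hypothetical `resolve_start` helper and a hypothetical `SubscribeOk` struct with a `start_group` field; neither name is taken from the PR, and the real message has other fields. The no-source fallback to the cache floor follows the behavior the PR description gives for a start below the cache.

```rust
/// Hypothetical helper: clamp the requested start to what can be served.
/// With a non-empty source, the floor is `source.oldest()`; otherwise it
/// is the in-RAM cache floor, matching the pre-existing jump-forward rule.
fn resolve_start(requested: u64, source_oldest: Option<u64>, cache_floor: Option<u64>) -> u64 {
    match source_oldest.or(cache_floor) {
        Some(floor) => requested.max(floor),
        None => requested,
    }
}

/// Hypothetical, reduced acknowledgement message.
struct SubscribeOk {
    start_group: u64,
}

/// Resolve once, then use the same value for the acknowledgement and for
/// the first group the replay path will serve.
fn plan_subscribe(
    requested: u64,
    source_oldest: Option<u64>,
    cache_floor: Option<u64>,
) -> (SubscribeOk, u64) {
    let start = resolve_start(requested, source_oldest, cache_floor);
    (SubscribeOk { start_group: start }, start)
}
```

Because both values come from one call, a request older than the source's oldest group is acknowledged at the group that will actually be replayed first.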
Self::serve_group(
    session.clone(),
    msg,
    handle,
    producer.consume(),
    track_stats.clone(),
    track_priority.clone(),
    version,
)
.await?;
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Update last_sequence after serving replayed groups.
Historical groups served here are not reflected in last_sequence, so if replay succeeds and the live loop ends without serving a newer group, SUBSCRIBE_END can report start_group instead of the last replayed sequence.
🐛 Proposed fix
  Self::serve_group(
      session.clone(),
      msg,
      handle,
      producer.consume(),
      track_stats.clone(),
      track_priority.clone(),
      version,
  )
  .await?;
+ last_sequence = last_sequence.max(Some(next));
  }
/// implementations must be `Send + Sync`. [`Self::group`] may block briefly on
/// I/O; the publisher only calls it on the backfill path, never in the hot live
/// loop.
pub trait GroupSource: Send + Sync {
    /// Return the frames of the group with this sequence in order, or `None` if
    /// it is not stored (never produced, or already evicted from the deep store).
    fn group(&self, sequence: u64) -> Option<Vec<bytes::Bytes>>;

    /// The oldest sequence the source can still serve, or `None` if it is empty.
    /// Used to clamp a subscriber's requested `start_group` to what's available.
    fn oldest(&self) -> Option<u64>;
🩺 Stability & Availability | 🟠 Major | 🏗️ Heavy lift
Make GroupSource fallible and non-blocking before publishing the trait.
The publisher calls source.oldest() / source.group(next) inline from async run_track; this contract permits blocking I/O and collapses read failures into None, so a disk-backed source can stall the runtime or silently create replay gaps. Prefer a fallible poll/streaming API, or require non-blocking adapters, before this becomes public. As per coding guidelines, rs/**/*.rs: Prefer kio::Producer/Consumer and poll_* plumbing over tokio::sync primitives like channels or watch.
Source: Coding guidelines
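To illustrate only the "fallible" half of this comment, here is a sketch of a trait variant that separates "group not stored" from "read failed". `FallibleGroupSource`, `Replay`, `replay_one`, and `FlakySource` are hypothetical names, frames are `Vec<u8>` instead of `bytes::Bytes`, and the methods are still synchronous. The non-blocking half (a poll or streaming API, or an adapter that moves I/O off the async task) is not addressed here.

```rust
use std::io;

/// Stand-in for `bytes::Bytes` so the sketch has no dependencies.
type Frame = Vec<u8>;

/// Hypothetical fallible variant: `Ok(None)` means "not stored",
/// `Err(_)` means the read itself failed.
trait FallibleGroupSource: Send + Sync {
    fn group(&self, sequence: u64) -> io::Result<Option<Vec<Frame>>>;
    fn oldest(&self) -> io::Result<Option<u64>>;
}

/// Outcome of one backfill step when the read succeeded.
#[derive(Debug, PartialEq)]
enum Replay {
    /// Group found; carries the number of frames.
    Served(usize),
    /// Group legitimately absent; the caller may skip it.
    Missing,
}

/// One backfill step. A read error propagates with `?` instead of being
/// silently treated as a gap in the replay.
fn replay_one(source: &dyn FallibleGroupSource, sequence: u64) -> io::Result<Replay> {
    Ok(match source.group(sequence)? {
        Some(frames) => Replay::Served(frames.len()),
        None => Replay::Missing,
    })
}

/// Test double: odd sequences exist, even ones are absent, `bad` fails.
struct FlakySource {
    bad: u64,
}

impl FallibleGroupSource for FlakySource {
    fn group(&self, sequence: u64) -> io::Result<Option<Vec<Frame>>> {
        if sequence == self.bad {
            return Err(io::Error::new(io::ErrorKind::Other, "simulated read failure"));
        }
        Ok(if sequence % 2 == 1 {
            Some(vec![vec![0u8], vec![1u8]])
        } else {
            None
        })
    }

    fn oldest(&self) -> io::Result<Option<u64>> {
        Ok(Some(1))
    }
}
```

With this split, the publisher can decide per case: skip a missing group, but end or retry the subscription when the store reports an error.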
/// Return the oldest sequence still live in the in-RAM cache, or `None` if
/// the cache is empty. This is the floor below which a resuming subscriber
/// must be served from the [`GroupSource`] instead of the cache.
pub fn oldest(&self) -> Option<u64> {
    self.state
        .read()
        .groups
        .iter()
        .flatten()
        .map(|(group, _)| group.sequence)
        .min()
}
🎯 Functional Correctness | 🟠 Major | 🏗️ Heavy lift
Don’t use the minimum live sequence as a cache handoff boundary.
oldest() returns min(sequence), but eviction can leave tombstoned holes above that minimum. The publisher uses this as if the cache serves [oldest..]; with a cache like [Some(5), None(3), Some(2)], start_group = 3 skips GroupSource and jumps to seq 5. Expose a gap-aware handoff, or replay until the next cached sequence that will actually be emitted and advance the live start accordingly.
Also applies to: 1431-1444
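A sketch of one possible gap-aware boundary, using the cache from the example above. It models the cache as a set of live sequences (tombstones are simply absent) and treats any missing sequence below the newest group as a gap, which is an assumption: the real cache may be able to tell "tombstoned" apart from "never produced". `contiguous_floor` and `needs_source` are hypothetical names.

```rust
use std::collections::BTreeSet;

/// Lowest sequence `s` such that every sequence in `[s, newest]` is live.
/// Returns `None` for an empty cache.
fn contiguous_floor(live: &BTreeSet<u64>) -> Option<u64> {
    let mut newest_first = live.iter().rev();
    let mut floor = *newest_first.next()?;
    for &sequence in newest_first {
        if sequence + 1 == floor {
            floor = sequence; // still contiguous, extend downward
        } else {
            break; // hole found: everything below must come from the source
        }
    }
    Some(floor)
}

/// Whether a resume at `start_group` needs the `GroupSource` because the
/// cache does not contiguously cover it. An empty cache counts as "yes".
fn needs_source(start_group: u64, live: &BTreeSet<u64>) -> bool {
    contiguous_floor(live).map_or(true, |floor| start_group < floor)
}
```

For live sequences {2, 5}, the plain minimum is 2 but the contiguous floor is 5, so a resume at `start_group = 3` would go to the source for [3..5) instead of jumping straight to group 5.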
/// implementations must be `Send + Sync`. [`Self::group`] may block briefly on
/// I/O; the publisher only calls it on the backfill path, never in the hot live
/// loop.
pub trait GroupSource: Send + Sync {
Yeah I'm not a fan of "callback" traits like this. This one in particular requires all history to be synchronous (oof if you're using S3).
On dev there's a dynamic() method that allows you to receive requests for old groups.
@kixelated So this is what Claude came up with for implementing atmoq's 72-hour replay window. Works, but let me know if this is completely wrongheaded and I'm happy to scrap it and do a FETCH-based backfill implementation instead. That said, this behavior is kinda neat, allows library consumers to bring their own backing store for use cases where you want to backfill older content right up to live.