feat(moq-cli): add --pace to emit TS output, following the live edge#1973
feat(moq-cli): add --pace to emit TS output, following the live edge#1973kixelated wants to merge 11 commits into
Conversation
`moq-cli subscribe --format ts` writes a retained broadcast as fast as it can be read. Add `--pace` to instead emit frames on the media clock (anchored to the first frame), like ffmpeg's `-re`, which is what a downstream player or re-publish expects. The pacing itself is a reusable `moq_mux::container::Pacer` building block: it maps each frame's presentation timestamp onto the wall clock and returns the Instant the frame is due, leaving the clock read and the sleep to the caller so it stays runtime-free. The anchor never moves, so a burst-read retained stream still drains at its media rate; reordered B-frames map into the past and emit immediately. (This is the opposite policy from moq-srt's live-edge stamper, which re-anchors to the edge because the SRT receiver owns the jitter buffer.) Only TS exposes per-frame timestamps today, so `--pace` with any other format is rejected by `SubscribeArgs::validate()` before connecting to the relay. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
There was a problem hiding this comment.
Sorry @kixelated, you have reached your weekly rate limit of 500000 diff characters.
Please try again later or upgrade to continue using Sourcery
`due` now reads the clock itself, and only on the first frame: the anchor is the only place `Instant::now()` is needed, so `get_or_insert_with` skips the call on every later frame (which map off the anchor). The first call returns the anchor instant, so the unit tests stay deterministic by reading it back rather than stubbing the clock, keeping the public surface free of a tokio time type. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Warning Review limit reached
Next review available in: 18 minutes Enable usage-based reviews in Billing to review now. Otherwise, wait until the next included review is available. How can I continue?After more reviews become available, a review can be triggered using the To avoid repeated limits, reduce automatic review volume by pausing incremental auto-reviews earlier, using label-based review opt-in, excluding WIP or generated PR titles, or requesting reviews manually when the PR is ready. If your team needs uninterrupted high-volume reviews, an organization admin can enable usage-based reviews. How do review limits work?CodeRabbit enforces per-developer PR review limits for each organization. Most developers receive the normal plan review availability. For paid Pro and Pro+ PR reviews, CodeRabbit uses adaptive limits for sustained high-volume activity. When a developer's recent PR review activity reaches the 95th percentile or higher among CodeRabbit users, additional reviews become available more gradually as earlier reviews age out of the rolling window. Please refer docs for additional details. Review details⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (7)
WalkthroughThis change adds real-time pacing support for MPEG-TS subscriptions. A new 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✨ Finishing Touches✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
rs/moq-cli/src/subscribe.rs (1)
73-81: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winConsider a unit test for
validate().A quick inline test (
pace=true, format=Ts→ Ok;pace=true, format!=Ts→ Err) would lock in this contract cheaply, consistent with the existing pattern of inline#[cfg(test)]tests elsewhere in the crate.✅ Example test
#[cfg(test)] mod tests { use super::*; #[test] fn pace_requires_ts_format() { let mut args = SubscribeArgs { format: SubscribeFormat::Fmp4, max_latency: Duration::from_millis(500), pace: true, fragment_duration: None, catalog: None, }; assert!(args.validate().is_err()); args.format = SubscribeFormat::Ts; assert!(args.validate().is_ok()); } }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-cli/src/subscribe.rs` around lines 73 - 81, Add a small unit test for SubscribeArgs::validate to lock in the pace/format contract: verify that pace=true with SubscribeFormat::Ts returns Ok, and pace=true with any non-Ts format returns Err. Place it in an inline #[cfg(test)] mod next to validate(), following the crate’s existing test style, and reference SubscribeArgs, SubscribeFormat, and validate() so the behavior stays covered.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@rs/moq-cli/src/subscribe.rs`:
- Around line 73-81: Add a small unit test for SubscribeArgs::validate to lock
in the pace/format contract: verify that pace=true with SubscribeFormat::Ts
returns Ok, and pace=true with any non-Ts format returns Err. Place it in an
inline #[cfg(test)] mod next to validate(), following the crate’s existing test
style, and reference SubscribeArgs, SubscribeFormat, and validate() so the
behavior stays covered.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: f8c656ba-f455-4c1f-9525-65595a4316d3
📒 Files selected for processing (5)
doc/bin/cli.mdrs/moq-cli/src/main.rsrs/moq-cli/src/subscribe.rsrs/moq-mux/src/container/mod.rsrs/moq-mux/src/container/pace.rs
Lock in that SubscribeArgs::validate accepts --pace only with --format ts. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Pacing on the raw frame PTS was wrong for reordered (B-frame) streams: the muxer emits in decode order, where the PTS is non-monotonic, so the per-frame schedule came out bursty within the reorder window (bounded by the catalog `jitter`). ts::Export now paces on its internal decode clock (the authored DTS, seeded from the catalog jitter reserve), which is monotonic. It does not sleep; it hands back the due wall-clock instant on a new `ts::Output` struct (replacing the bare container::Frame return) so the caller decides whether to honor it. This keeps moq-mux runtime-free (no tokio time dependency). moq-cli `--pace` now just sleeps until `frame.pace`. The standalone container::Pacer is demoted to pub(crate) (Export owns its use). moq-srt's Subscriber keeps its own live-edge stamper (its re-anchor policy differs) and only needed the return-type rename. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
rs/moq-mux/src/container/ts/export.rs (1)
246-246: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winDocument
poll_next.
poll_nextis public and newly changed here, but it has no rustdoc contract likenext. Add a short doc comment or reduce the visibility if it is not meant to be exported. As per coding guidelines,**/*.{rs,ts,tsx,js,jsx}: Document every exported public API symbol.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-mux/src/container/ts/export.rs` at line 246, The public API method poll_next on Export needs rustdoc because it is exported and newly changed; add a short doc comment that matches the contract style used for next, or reduce its visibility if it is not intended to be public. Update the documentation directly on Export::poll_next so the exported symbol is covered by the project’s public API docs requirement.Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/moq-mux/src/container/ts/export.rs`:
- Around line 236-246: `Export::next` and `Export::poll_next` are making a
breaking public API change by switching their return type from `Option<Frame>`
to `Option<Output>`. Restore the existing `Frame`-based signatures for the
current public methods, and introduce a separate paced-output API (using
`Output` and its pacing fields) so callers can opt in without breaking
`rs/moq-srt/src/ts.rs` and other downstream users. Keep the new pacing behavior
additive by routing it through a new method or type while preserving the
existing `Export::next`/`poll_next` contract.
In `@rs/moq-srt/src/ts.rs`:
- Line 90: The public Subscriber::next API is leaking
moq_mux::container::ts::Output through moq-srt’s boundary, so update
ts::Subscriber::next to return the local moq-srt Output type instead. Keep the
conversion from the muxer’s ts::Output to moq-srt::Output inside this module,
and adjust any internal call sites or helper logic so the dependency type
remains encapsulated behind the moq-srt layer.
---
Nitpick comments:
In `@rs/moq-mux/src/container/ts/export.rs`:
- Line 246: The public API method poll_next on Export needs rustdoc because it
is exported and newly changed; add a short doc comment that matches the contract
style used for next, or reduce its visibility if it is not intended to be
public. Update the documentation directly on Export::poll_next so the exported
symbol is covered by the project’s public API docs requirement.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 78c89b1a-c103-482a-bf8b-812bb60006ec
📒 Files selected for processing (5)
rs/moq-cli/src/subscribe.rsrs/moq-mux/src/container/mod.rsrs/moq-mux/src/container/pace.rsrs/moq-mux/src/container/ts/export.rsrs/moq-srt/src/ts.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- rs/moq-mux/src/container/mod.rs
…ency budget Rework the pacer into a jitter-buffer model that holds at most `lead` of buffer ahead of the live edge, re-anchoring past that so a tune-in burst or a faster-than-real source never accrues unbounded latency. This is the moq-srt egress pacer generalized: `lead = 0` is "never lead now" (the SRT receiver owns the buffer), `lead > 0` lets a sleeping caller hold that much buffer itself. ts::Export feeds the pacer its `--latency` budget, so `Output.pace`: - streams a retained broadcast at media rate (like `-re`) when the caller sleeps in lockstep, the cap never trips; - keeps a live or bursty source within `latency` of the edge instead of playing the initial GOP burst out slowly as permanent latency. moq-srt now shares this pacer: its Export already runs at latency 0, so it just stamps each SRT payload with `frame.pace` and its bespoke `pace()` + `Paced` + the standalone re-anchor test are deleted (the logic moved to the moq-mux Pacer, which also paces on the decode clock, a small improvement over moq-srt's PTS). moq-cli `--pace` documents that `--max-latency` is the pace buffer. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The pace buffer and the read/skip budget were both driven by the muxer's single `latency`, so moq-srt could only get its required pace lead of 0 by leaving the read budget at 0 too. Decouple them: `ts::Export` gains `with_pace_lead`, which overrides the pace buffer independently of `with_latency`. moq-srt now sets the read/skip budget to the connection's negotiated SRT latency (`socket.settings().send_tsbpd_latency`, the max of our floor and the caller's `?latency`), so a track tolerates the same jitter the receiver's TSBPD does, while pinning the pace lead to zero. Pace lead must stay 0 for SRT: the receiver owns the jitter buffer, and a muxer lead would just stack on top of TSBPD and double the end-to-end latency. moq-cli is unchanged: it leaves the pace lead following `--max-latency`. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
There was a problem hiding this comment.
🧹 Nitpick comments (2)
rs/moq-mux/src/container/ts/export.rs (1)
141-166: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winMake
Outputnon-exhaustive before publishing it.
Outputis a newly exposed return type; leaving it exhaustive makes any future field addition another breaking change. Since callers receive it fromExport::next,#[non_exhaustive]keeps field access while preventing exhaustive construction/matching outside the crate.♻️ Proposed API hardening
+#[non_exhaustive] pub struct Output {As per coding guidelines, “Before exposing a new public type, function, field, or enum variant, prefer smaller, more future-proof API shapes such as ... non-exhaustive public types when appropriate.”
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-mux/src/container/ts/export.rs` around lines 141 - 166, Mark the public Output type in export::Export::next as non-exhaustive so future fields can be added without breaking callers; update the Output definition to use #[non_exhaustive] while keeping its existing public fields (payload, timestamp, keyframe, pace) accessible for read-only use.Source: Coding guidelines
rs/moq-srt/src/ts.rs (1)
97-99: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winUpdate
nextdocs to mention the pacing instant.The returned
ts::Outputnow carriespace; the current text still describes only TS bytes plus media timestamp.📝 Proposed doc update
- /// Pull the next muxed frame (TS bytes + media timestamp), or `None` once the - /// broadcast ends. + /// Pull the next muxed frame (TS bytes, media timestamp, and pacing instant), + /// or `None` once the broadcast ends.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-srt/src/ts.rs` around lines 97 - 99, The doc comment for `ts::Muxer::next` is outdated because `ts::Output` now includes a pacing instant in `pace` in addition to TS bytes and media timestamp. Update the `next` documentation to mention that the returned `ts::Output` carries pacing information, and keep the wording aligned with the `ts::Output` struct’s current fields so callers understand all values they receive.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@rs/moq-mux/src/container/ts/export.rs`:
- Around line 141-166: Mark the public Output type in export::Export::next as
non-exhaustive so future fields can be added without breaking callers; update
the Output definition to use #[non_exhaustive] while keeping its existing public
fields (payload, timestamp, keyframe, pace) accessible for read-only use.
In `@rs/moq-srt/src/ts.rs`:
- Around line 97-99: The doc comment for `ts::Muxer::next` is outdated because
`ts::Output` now includes a pacing instant in `pace` in addition to TS bytes and
media timestamp. Update the `next` documentation to mention that the returned
`ts::Output` carries pacing information, and keep the wording aligned with the
`ts::Output` struct’s current fields so callers understand all values they
receive.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4799c6cf-4def-412b-92ba-b12c03fca476
📒 Files selected for processing (3)
rs/moq-mux/src/container/ts/export.rsrs/moq-srt/src/server.rsrs/moq-srt/src/ts.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- rs/moq-srt/src/server.rs
The egress read/skip budget was derived from the per-connection negotiated SRT latency (`socket.settings().send_tsbpd_latency`), which rises with a caller's `?latency`. But the gateway's latency is a fixed policy, not a per-caller knob, so use the configured value: thread `Server`'s latency through to the egress muxer instead of reading it off the socket. Rename the flag `--srt-latency` -> `--latency` to match (the `srt-` prefix is redundant on the moq-srt binary, and it's a fixed value, not a per-call max). `--srt-latency` stays a hidden alias for back-compat. A caller's `?latency` still raises its own SRT receive buffer as before; it just no longer moves the gateway-side read budget. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Keep future fields on the new public ts::Output additive, and mention the pacing instant in Subscriber::next's docs. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Unlike --latency (free), --listen collides with moq-native's --server-bind, which aliases --listen and is flattened into the same subcommand. Keep the srt- prefix on the listener flags to disambiguate, and note why in the help text. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
`moq-native`'s `--server-bind` aliased the bare `--listen`, which forced the SRT listener to stay `--srt-listen`. Rename that clap alias to `--server-listen` (in keeping with the `--server-*` namespace) so `--listen` is free. The TOML `listen` key is untouched (kept as a serde alias), so relay configs keep working. With `--listen` free, drop the redundant `srt-` prefix from the moq-srt listener flags: `--srt-listen` -> `--listen`, `--srt-prefix` -> `--prefix` (both keep the old names as hidden aliases). `--latency` was already de-prefixed. Updated the one CLI site that used the old alias (demo/pub/justfile: `serve --listen` -> `--server-listen`) and the moq-srt README examples. moq-rtc/moq-cli `--listen` are their own separate flags and are unaffected. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Summary
moq-cli subscribe --format tswrites a broadcast to stdout as fast as it can be read. This adds--paceto emit it at its real-time rate instead (like ffmpeg's-re), driven by a jitter-buffer pacer ints::Exportthat follows the live edge with a bounded latency budget.The pacer
Pacermaps a frame's decode timestamp to the wall-clock instant it's due, holding at mostleadof buffer ahead of the live edge. When the source outruns that (a tune-in burst delivers a whole GOP at once, or it drifts ahead of wall-clock), it re-anchors to the edge so latency never grows pastlead. A frame that merely trails the edge (jitter, a reordered B-frame) keeps its earlier instant.This is the moq-srt egress pacer generalized:
lead = 0→ never lead now (the SRT receiver owns the jitter buffer). moq-srt's case.lead > 0→ a sleeping caller holds that much buffer itself. moq-cli's case (lead = --max-latency).So
Output.pace:-re); and--max-latencyof the edge, instead of anchoring on the first (oldest) frame and playing the tune-in burst out slowly as permanent latency.It paces on the decode clock (DTS), which the muxer already authors from the catalog
jitterreserve, so reordered B-frame streams pace evenly rather than bursting on their non-monotonic PTS.Exportonly computes the instant (no sleep), so moq-mux stays runtime-free; the caller owns the delay.What changed
ts::Export::next()returnsts::Output({ payload, timestamp, keyframe, pace }) instead of a barecontainer::Frame.--latencydoubles as the pace buffer.--pace:sleep_until(frame.pace). Rejected (before connecting, viaSubscribeArgs::validate()) for any non-tsformat. Docs note--max-latencyis the buffer.Exportalready runs at latency 0, soserve_subscribejust stamps each SRT payload withframe.pace. Its bespokepace()+Paced+ the standalone re-anchor test are deleted (logic moved to the moq-muxPacer, which also upgrades it from PTS to decode-clock pacing).container::Pacerispub(crate)—Exportowns it; not public surface.Includes a breaking moq-mux API change (
ts::Export::next()return typecontainer::Frame→ts::Output). Per the repo rules that normally targets dev; there's precedent for landingts::Export::next()signature changes onmain(the earlierBytes→Frame), so I've left the base asmain— happy to retarget todev.Notes for reviewers
pub struct ts::Output;ts::Export::{next, poll_next}return type changed (breaking);container::Pacerdemotedpub→pub(crate); newpub fn SubscribeArgs::validate(); new--paceflag.Test plan
cargo build --workspacecleancargo test -p moq-mux -p moq-srt -p moq-cli(340 / 6 / 1; Pacer covers media-rate, re-anchor-past-budget, trailing-frame, zero-lead)cargo clippy ... -D warnings+RUSTDOCFLAGS=-D warnings cargo doc --all-featuresclean (nix)--pace --format fmp4fails fast before connecting(Written by Claude Opus 4.8)