Skip to content

feat(moq-cli): add --pace to emit TS output, following the live edge#1973

Open
kixelated wants to merge 11 commits into
mainfrom
claude/elastic-bell-28b97c
Open

feat(moq-cli): add --pace to emit TS output, following the live edge#1973
kixelated wants to merge 11 commits into
mainfrom
claude/elastic-bell-28b97c

Conversation

@kixelated

@kixelated kixelated commented Jun 30, 2026

Copy link
Copy Markdown
Collaborator

Summary

moq-cli subscribe --format ts writes a broadcast to stdout as fast as it can be read. This adds --pace to emit it at its real-time rate instead (like ffmpeg's -re), driven by a jitter-buffer pacer in ts::Export that follows the live edge with a bounded latency budget.

The pacer

Pacer maps a frame's decode timestamp to the wall-clock instant it's due, holding at most lead of buffer ahead of the live edge. When the source outruns that (a tune-in burst delivers a whole GOP at once, or it drifts ahead of wall-clock), it re-anchors to the edge so latency never grows past lead. A frame that merely trails the edge (jitter, a reordered B-frame) keeps its earlier instant.

This is the moq-srt egress pacer generalized:

  • lead = 0 → never lead now (the SRT receiver owns the jitter buffer). moq-srt's case.
  • lead > 0 → a sleeping caller holds that much buffer itself. moq-cli's case (lead = --max-latency).

So Output.pace:

  • streams a retained broadcast at media rate (the caller sleeps in lockstep, so the cap never trips → -re); and
  • keeps a live/bursty source within --max-latency of the edge, instead of anchoring on the first (oldest) frame and playing the tune-in burst out slowly as permanent latency.

It paces on the decode clock (DTS), which the muxer already authors from the catalog jitter reserve, so reordered B-frame streams pace evenly rather than bursting on their non-monotonic PTS. Export only computes the instant (no sleep), so moq-mux stays runtime-free; the caller owns the delay.

What changed

  • ts::Export::next() returns ts::Output ({ payload, timestamp, keyframe, pace }) instead of a bare container::Frame. --latency doubles as the pace buffer.
  • moq-cli --pace: sleep_until(frame.pace). Rejected (before connecting, via SubscribeArgs::validate()) for any non-ts format. Docs note --max-latency is the buffer.
  • moq-srt unified onto the shared pacer: its Export already runs at latency 0, so serve_subscribe just stamps each SRT payload with frame.pace. Its bespoke pace() + Paced + the standalone re-anchor test are deleted (logic moved to the moq-mux Pacer, which also upgrades it from PTS to decode-clock pacing).
  • container::Pacer is pub(crate)Export owns it; not public surface.

⚠️ Branch targeting

Includes a breaking moq-mux API change (ts::Export::next() return type container::Framets::Output). Per the repo rules that normally targets dev; there's precedent for landing ts::Export::next() signature changes on main (the earlier BytesFrame), so I've left the base as main — happy to retarget to dev.

Notes for reviewers

  • Public API: new pub struct ts::Output; ts::Export::{next, poll_next} return type changed (breaking); container::Pacer demoted pubpub(crate); new pub fn SubscribeArgs::validate(); new --pace flag.
  • moq-srt egress timing changes from PTS to decode-clock pacing (behaviorally equivalent re-anchor + a small correctness gain). Not re-verified against a live SRT receiver this session.

Test plan

  • cargo build --workspace clean
  • cargo test -p moq-mux -p moq-srt -p moq-cli (340 / 6 / 1; Pacer covers media-rate, re-anchor-past-budget, trailing-frame, zero-lead)
  • cargo clippy ... -D warnings + RUSTDOCFLAGS=-D warnings cargo doc --all-features clean (nix)
  • --pace --format fmp4 fails fast before connecting
  • Not exercised end-to-end against a live relay / SRT receiver this session

(Written by Claude Opus 4.8)

`moq-cli subscribe --format ts` writes a retained broadcast as fast as it can
be read. Add `--pace` to instead emit frames on the media clock (anchored to the
first frame), like ffmpeg's `-re`, which is what a downstream player or
re-publish expects.

The pacing itself is a reusable `moq_mux::container::Pacer` building block: it
maps each frame's presentation timestamp onto the wall clock and returns the
Instant the frame is due, leaving the clock read and the sleep to the caller so
it stays runtime-free. The anchor never moves, so a burst-read retained stream
still drains at its media rate; reordered B-frames map into the past and emit
immediately. (This is the opposite policy from moq-srt's live-edge stamper,
which re-anchors to the edge because the SRT receiver owns the jitter buffer.)

Only TS exposes per-frame timestamps today, so `--pace` with any other format is
rejected by `SubscribeArgs::validate()` before connecting to the relay.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@sourcery-ai sourcery-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry @kixelated, you have reached your weekly rate limit of 500000 diff characters.

Please try again later or upgrade to continue using Sourcery

`due` now reads the clock itself, and only on the first frame: the anchor is the
only place `Instant::now()` is needed, so `get_or_insert_with` skips the call on
every later frame (which map off the anchor). The first call returns the anchor
instant, so the unit tests stay deterministic by reading it back rather than
stubbing the clock, keeping the public surface free of a tokio time type.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 30, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Warning

Review limit reached

@kixelated, you've reached your PR review limit, so we couldn't start this review.

Next review available in: 18 minutes

Enable usage-based reviews in Billing to review now. Otherwise, wait until the next included review is available.
You're only billed for reviews past your plan's rate limits ($0.25/file).

How can I continue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

To avoid repeated limits, reduce automatic review volume by pausing incremental auto-reviews earlier, using label-based review opt-in, excluding WIP or generated PR titles, or requesting reviews manually when the PR is ready. If your team needs uninterrupted high-volume reviews, an organization admin can enable usage-based reviews.

How do review limits work?

CodeRabbit enforces per-developer PR review limits for each organization. Most developers receive the normal plan review availability.

For paid Pro and Pro+ PR reviews, CodeRabbit uses adaptive limits for sustained high-volume activity. When a developer's recent PR review activity reaches the 95th percentile or higher among CodeRabbit users, additional reviews become available more gradually as earlier reviews age out of the rolling window.

Please refer docs for additional details.

Review details
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e8d69a8a-2a7c-4e7b-9867-5efba7310073

📥 Commits

Reviewing files that changed from the base of the PR and between bb50f99 and 84fc0dc.

📒 Files selected for processing (7)
  • demo/pub/justfile
  • rs/moq-mux/src/container/ts/export.rs
  • rs/moq-native/src/server.rs
  • rs/moq-srt/README.md
  • rs/moq-srt/bin/moq-srt.rs
  • rs/moq-srt/src/server.rs
  • rs/moq-srt/src/ts.rs

Walkthrough

This change adds real-time pacing support for MPEG-TS subscriptions. A new Pacer type in moq-mux maps media timestamps to wall-clock instants, and TS export now returns Output values that include a pacing instant. In moq-cli, SubscribeArgs gains a --pace flag with validation that restricts it to TS format; run_subscribe validates arguments and run_ts sleeps until each frame's pacing instant when enabled. moq-srt now consumes ts::Output and uses its pacing instant, and the documentation adds retained-broadcast pacing behavior plus an FFmpeg/UDP example.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly matches the main change: adding --pace for TS output and following the live edge.
Description check ✅ Passed The description is detailed and directly describes the pacing changes, CLI flag, docs, and related API updates.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
✨ Finishing Touches
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch claude/elastic-bell-28b97c

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
rs/moq-cli/src/subscribe.rs (1)

73-81: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Consider a unit test for validate().

A quick inline test (pace=true, format=Ts → Ok; pace=true, format!=Ts → Err) would lock in this contract cheaply, consistent with the existing pattern of inline #[cfg(test)] tests elsewhere in the crate.

✅ Example test
#[cfg(test)]
mod tests {
	use super::*;

	#[test]
	fn pace_requires_ts_format() {
		let mut args = SubscribeArgs {
			format: SubscribeFormat::Fmp4,
			max_latency: Duration::from_millis(500),
			pace: true,
			fragment_duration: None,
			catalog: None,
		};
		assert!(args.validate().is_err());

		args.format = SubscribeFormat::Ts;
		assert!(args.validate().is_ok());
	}
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-cli/src/subscribe.rs` around lines 73 - 81, Add a small unit test for
SubscribeArgs::validate to lock in the pace/format contract: verify that
pace=true with SubscribeFormat::Ts returns Ok, and pace=true with any non-Ts
format returns Err. Place it in an inline #[cfg(test)] mod next to validate(),
following the crate’s existing test style, and reference SubscribeArgs,
SubscribeFormat, and validate() so the behavior stays covered.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@rs/moq-cli/src/subscribe.rs`:
- Around line 73-81: Add a small unit test for SubscribeArgs::validate to lock
in the pace/format contract: verify that pace=true with SubscribeFormat::Ts
returns Ok, and pace=true with any non-Ts format returns Err. Place it in an
inline #[cfg(test)] mod next to validate(), following the crate’s existing test
style, and reference SubscribeArgs, SubscribeFormat, and validate() so the
behavior stays covered.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f8c656ba-f455-4c1f-9525-65595a4316d3

📥 Commits

Reviewing files that changed from the base of the PR and between e334822 and ffc5c1b.

📒 Files selected for processing (5)
  • doc/bin/cli.md
  • rs/moq-cli/src/main.rs
  • rs/moq-cli/src/subscribe.rs
  • rs/moq-mux/src/container/mod.rs
  • rs/moq-mux/src/container/pace.rs

kixelated and others added 2 commits June 30, 2026 15:55
Lock in that SubscribeArgs::validate accepts --pace only with --format ts.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Pacing on the raw frame PTS was wrong for reordered (B-frame) streams: the
muxer emits in decode order, where the PTS is non-monotonic, so the per-frame
schedule came out bursty within the reorder window (bounded by the catalog
`jitter`).

ts::Export now paces on its internal decode clock (the authored DTS, seeded
from the catalog jitter reserve), which is monotonic. It does not sleep; it
hands back the due wall-clock instant on a new `ts::Output` struct (replacing
the bare container::Frame return) so the caller decides whether to honor it.
This keeps moq-mux runtime-free (no tokio time dependency).

moq-cli `--pace` now just sleeps until `frame.pace`. The standalone
container::Pacer is demoted to pub(crate) (Export owns its use). moq-srt's
Subscriber keeps its own live-edge stamper (its re-anchor policy differs) and
only needed the return-type rename.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
rs/moq-mux/src/container/ts/export.rs (1)

246-246: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Document poll_next.

poll_next is public and newly changed here, but it has no rustdoc contract like next. Add a short doc comment or reduce the visibility if it is not meant to be exported. As per coding guidelines, **/*.{rs,ts,tsx,js,jsx}: Document every exported public API symbol.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-mux/src/container/ts/export.rs` at line 246, The public API method
poll_next on Export needs rustdoc because it is exported and newly changed; add
a short doc comment that matches the contract style used for next, or reduce its
visibility if it is not intended to be public. Update the documentation directly
on Export::poll_next so the exported symbol is covered by the project’s public
API docs requirement.

Source: Coding guidelines

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/moq-mux/src/container/ts/export.rs`:
- Around line 236-246: `Export::next` and `Export::poll_next` are making a
breaking public API change by switching their return type from `Option<Frame>`
to `Option<Output>`. Restore the existing `Frame`-based signatures for the
current public methods, and introduce a separate paced-output API (using
`Output` and its pacing fields) so callers can opt in without breaking
`rs/moq-srt/src/ts.rs` and other downstream users. Keep the new pacing behavior
additive by routing it through a new method or type while preserving the
existing `Export::next`/`poll_next` contract.

In `@rs/moq-srt/src/ts.rs`:
- Line 90: The public Subscriber::next API is leaking
moq_mux::container::ts::Output through moq-srt’s boundary, so update
ts::Subscriber::next to return the local moq-srt Output type instead. Keep the
conversion from the muxer’s ts::Output to moq-srt::Output inside this module,
and adjust any internal call sites or helper logic so the dependency type
remains encapsulated behind the moq-srt layer.

---

Nitpick comments:
In `@rs/moq-mux/src/container/ts/export.rs`:
- Line 246: The public API method poll_next on Export needs rustdoc because it
is exported and newly changed; add a short doc comment that matches the contract
style used for next, or reduce its visibility if it is not intended to be
public. Update the documentation directly on Export::poll_next so the exported
symbol is covered by the project’s public API docs requirement.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 78c89b1a-c103-482a-bf8b-812bb60006ec

📥 Commits

Reviewing files that changed from the base of the PR and between ffc5c1b and 72b229c.

📒 Files selected for processing (5)
  • rs/moq-cli/src/subscribe.rs
  • rs/moq-mux/src/container/mod.rs
  • rs/moq-mux/src/container/pace.rs
  • rs/moq-mux/src/container/ts/export.rs
  • rs/moq-srt/src/ts.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • rs/moq-mux/src/container/mod.rs

Comment thread rs/moq-mux/src/container/ts/export.rs
Comment thread rs/moq-srt/src/ts.rs
…ency budget

Rework the pacer into a jitter-buffer model that holds at most `lead` of buffer
ahead of the live edge, re-anchoring past that so a tune-in burst or a
faster-than-real source never accrues unbounded latency. This is the moq-srt
egress pacer generalized: `lead = 0` is "never lead now" (the SRT receiver owns
the buffer), `lead > 0` lets a sleeping caller hold that much buffer itself.

ts::Export feeds the pacer its `--latency` budget, so `Output.pace`:
- streams a retained broadcast at media rate (like `-re`) when the caller sleeps
  in lockstep, the cap never trips;
- keeps a live or bursty source within `latency` of the edge instead of playing
  the initial GOP burst out slowly as permanent latency.

moq-srt now shares this pacer: its Export already runs at latency 0, so it just
stamps each SRT payload with `frame.pace` and its bespoke `pace()` + `Paced` +
the standalone re-anchor test are deleted (the logic moved to the moq-mux Pacer,
which also paces on the decode clock, a small improvement over moq-srt's PTS).

moq-cli `--pace` documents that `--max-latency` is the pace buffer.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@kixelated kixelated changed the title feat(moq-cli): add --pace to emit TS output at real-time media rate feat(moq-cli): add --pace to emit TS output, following the live edge Jun 30, 2026
kixelated and others added 2 commits June 30, 2026 16:41
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The pace buffer and the read/skip budget were both driven by the muxer's single
`latency`, so moq-srt could only get its required pace lead of 0 by leaving the
read budget at 0 too. Decouple them: `ts::Export` gains `with_pace_lead`, which
overrides the pace buffer independently of `with_latency`.

moq-srt now sets the read/skip budget to the connection's negotiated SRT latency
(`socket.settings().send_tsbpd_latency`, the max of our floor and the caller's
`?latency`), so a track tolerates the same jitter the receiver's TSBPD does,
while pinning the pace lead to zero. Pace lead must stay 0 for SRT: the receiver
owns the jitter buffer, and a muxer lead would just stack on top of TSBPD and
double the end-to-end latency.

moq-cli is unchanged: it leaves the pace lead following `--max-latency`.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
rs/moq-mux/src/container/ts/export.rs (1)

141-166: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Make Output non-exhaustive before publishing it.

Output is a newly exposed return type; leaving it exhaustive makes any future field addition another breaking change. Since callers receive it from Export::next, #[non_exhaustive] keeps field access while preventing exhaustive construction/matching outside the crate.

♻️ Proposed API hardening
+#[non_exhaustive]
 pub struct Output {

As per coding guidelines, “Before exposing a new public type, function, field, or enum variant, prefer smaller, more future-proof API shapes such as ... non-exhaustive public types when appropriate.”

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-mux/src/container/ts/export.rs` around lines 141 - 166, Mark the
public Output type in export::Export::next as non-exhaustive so future fields
can be added without breaking callers; update the Output definition to use
#[non_exhaustive] while keeping its existing public fields (payload, timestamp,
keyframe, pace) accessible for read-only use.

Source: Coding guidelines

rs/moq-srt/src/ts.rs (1)

97-99: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Update next docs to mention the pacing instant.

The returned ts::Output now carries pace; the current text still describes only TS bytes plus media timestamp.

📝 Proposed doc update
-	/// Pull the next muxed frame (TS bytes + media timestamp), or `None` once the
-	/// broadcast ends.
+	/// Pull the next muxed frame (TS bytes, media timestamp, and pacing instant),
+	/// or `None` once the broadcast ends.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-srt/src/ts.rs` around lines 97 - 99, The doc comment for
`ts::Muxer::next` is outdated because `ts::Output` now includes a pacing instant
in `pace` in addition to TS bytes and media timestamp. Update the `next`
documentation to mention that the returned `ts::Output` carries pacing
information, and keep the wording aligned with the `ts::Output` struct’s current
fields so callers understand all values they receive.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@rs/moq-mux/src/container/ts/export.rs`:
- Around line 141-166: Mark the public Output type in export::Export::next as
non-exhaustive so future fields can be added without breaking callers; update
the Output definition to use #[non_exhaustive] while keeping its existing public
fields (payload, timestamp, keyframe, pace) accessible for read-only use.

In `@rs/moq-srt/src/ts.rs`:
- Around line 97-99: The doc comment for `ts::Muxer::next` is outdated because
`ts::Output` now includes a pacing instant in `pace` in addition to TS bytes and
media timestamp. Update the `next` documentation to mention that the returned
`ts::Output` carries pacing information, and keep the wording aligned with the
`ts::Output` struct’s current fields so callers understand all values they
receive.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 4799c6cf-4def-412b-92ba-b12c03fca476

📥 Commits

Reviewing files that changed from the base of the PR and between 5cfa011 and bb50f99.

📒 Files selected for processing (3)
  • rs/moq-mux/src/container/ts/export.rs
  • rs/moq-srt/src/server.rs
  • rs/moq-srt/src/ts.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • rs/moq-srt/src/server.rs

kixelated and others added 4 commits June 30, 2026 19:47
The egress read/skip budget was derived from the per-connection negotiated SRT
latency (`socket.settings().send_tsbpd_latency`), which rises with a caller's
`?latency`. But the gateway's latency is a fixed policy, not a per-caller knob,
so use the configured value: thread `Server`'s latency through to the egress
muxer instead of reading it off the socket.

Rename the flag `--srt-latency` -> `--latency` to match (the `srt-` prefix is
redundant on the moq-srt binary, and it's a fixed value, not a per-call max).
`--srt-latency` stays a hidden alias for back-compat.

A caller's `?latency` still raises its own SRT receive buffer as before; it just
no longer moves the gateway-side read budget.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Keep future fields on the new public ts::Output additive, and mention the pacing
instant in Subscriber::next's docs.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Unlike --latency (free), --listen collides with moq-native's --server-bind, which
aliases --listen and is flattened into the same subcommand. Keep the srt- prefix
on the listener flags to disambiguate, and note why in the help text.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
`moq-native`'s `--server-bind` aliased the bare `--listen`, which forced the SRT
listener to stay `--srt-listen`. Rename that clap alias to `--server-listen` (in
keeping with the `--server-*` namespace) so `--listen` is free. The TOML
`listen` key is untouched (kept as a serde alias), so relay configs keep working.

With `--listen` free, drop the redundant `srt-` prefix from the moq-srt listener
flags: `--srt-listen` -> `--listen`, `--srt-prefix` -> `--prefix` (both keep the
old names as hidden aliases). `--latency` was already de-prefixed.

Updated the one CLI site that used the old alias (demo/pub/justfile: `serve
--listen` -> `--server-listen`) and the moq-srt README examples. moq-rtc/moq-cli
`--listen` are their own separate flags and are unaffected.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant