Data tracks UniFFI#1034
Conversation
| } | ||
|
|
||
| /// Unpublishes the track. | ||
| pub fn unpublish(&self) { |
There was a problem hiding this comment.
I like the new APIs a lot, the only asymmetry for me is that you cannot surface the fact that unpublished track is "invalid" just via language constructs ("consuming", scopes, etc.).
I'm thinking of sth like:
public func withDataTrack<T>(
name: String,
body: (LocalDataTrack) async throws -> T
) async throws -> T {
let track = try await publishDataTrack(name: name)
defer { track.unpublish() }
return try await body(track)
}So you can pipe streams e.g.
// Instead of handing back an object that can go stale
try await localParticipant.withDataTrack(name: "game-state") { track in
// track is guaranteed valid here
for await state in gameStates {
track.tryPush(frame: state.toFrame())
}
}
// automatically unpublished when scope exits (cancellation, throw, or return)Or even:
// Publishing: AsyncSequence → DataTrack (sink)
try await localParticipant.publishDataTrack(name: "game", sending:
gameLoop.map { $0.toFrame() }
)
// Subscribing: DataTrack → AsyncSequence (source)
for await frame in stream {
process(frame)
}It's purely additive, so take it with a grain of 🧂 but IMO provides a great DX on top of data tracks if you stick to certain stream-like constructs.
There was a problem hiding this comment.
Generally makes sense to me! Though IMO since this is somewhat swift specific, maybe this makes sense to expose in the swift specific code above this rather than in the uniffi definition. Also because I don't think sending a closure over uniffi is possible.
There was a problem hiding this comment.
rather than in the uniffi definition
yes, I'm talking purely about wrapping here.
There was a problem hiding this comment.
Something to be aware of is the client doesn't have sole control over the publish state of a data track (the SFU reserves the right to unpublish at any time without the client initiating it) which is why I didn't do something similar in Rust (e.g., modeling with type state).
There was a problem hiding this comment.
Ah so if someone "unpublished it for me" it will just throw and break the loop?
| #[uniffi::export] | ||
| impl DataTrackStream { | ||
| /// Returns the next received frame or `None` if the subscription has ended. | ||
| pub async fn next(&self) -> Option<DataTrackFrame> { |
There was a problem hiding this comment.
This could be mapped in a generic way; the impl is trivial in Swift, basically using AsyncStream(unfolding: next), as we do for logs.
|
|
||
| use bytes::Bytes; | ||
|
|
||
| uniffi::custom_type!(Bytes, Vec<u8>, { remote }); |
There was a problem hiding this comment.
General: do we plan to introduce some helpers for encoding/decoding into that or leaving that to experienced users?
There was a problem hiding this comment.
This macro handles the conversion from Bytes to Vec<u8> automatically (under the hood it implements impl uniffi::FfiConverter<crate::UniFfiTag> for Bytes), so Swift can just pass Data as expected wherever the API on the Rust side accepts Bytes.
There was a problem hiding this comment.
Yeah, I just mean "something higher-level than Data" like Encodable types (JSON etc.)
| pub(crate) payload: Bytes, | ||
| pub(crate) user_timestamp: Option<u64>, | ||
| pub payload: Bytes, | ||
| pub user_timestamp: Option<u64>, |
There was a problem hiding this comment.
Nit: shall we expose duration_since_timestamp e.g. for benchmarks?
There was a problem hiding this comment.
If this is not an uniffi::Object we won't get withTimestampNow etc. for free, right? As it basically maps to raw struct:
public struct DataTrackFrame: Equatable, Hashable, Sendable {
public var payload: Data
public var userTimestamp: UInt64?
}There was a problem hiding this comment.
Applies to other "DTOs" as well, so good to discuss that now.
There was a problem hiding this comment.
It seems this type of "DTO" is more naturally modeled as a value type on the Swift side. I don't think you can currently have associated functions on a uniffi::Record (even though this makes sense in a Swift context), so I see two options:
- Use
uniffi::Object - Export standalone helper functions (e.g.,
fn with_user_timestamp(frame: DataTrackFrame) → DataTrackFrame) and define anextension DataTrackFrameon the Swift side to make it an associated function
There was a problem hiding this comment.
Good news: UniFFI v0.31.0 added support for methods on records and enums!
There was a problem hiding this comment.
Yup let's avoid handwritten extensions and just try bumping UniFFI.
There was a problem hiding this comment.
Edit: tried that locally, and it does not work for #[uniffi::remote(Record)] e.g. DataTrackFrame
| /// Returns the next received frame or `None` if the subscription has ended. | ||
| pub async fn next(&self) -> Option<DataTrackFrame> { | ||
| // TODO: avoid mutex? | ||
| self.0.lock().await.next().await |
pblazej
left a comment
There was a problem hiding this comment.
I still don't see any real blockers here, would be nice to discuss some common cases like errors and records now.
|
Hm, I see one more gap in the runtime after introducing 90cf95a UniFFI polls async futures from Swift's thread via continuation callbacks. That thread has no tokio runtime context. When the future hits The naive claude solution was: pub(crate) fn contextualize<F: std::future::Future>(
future: F,
) -> impl std::future::Future<Output = F::Output> {
use std::pin::Pin;
use std::task::{Context, Poll};
struct Contextualized<F> {
inner: Pin<Box<F>>,
}
impl<F: std::future::Future> std::future::Future for Contextualized<F> {
type Output = F::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let _enter = runtime().enter();
self.inner.as_mut().poll(cx)
}
}
Contextualized { inner: Box::pin(future) }
}then e.g. crate::runtime::contextualize(self.input.publish_track(options))
.await
.map(LocalDataTrack) |
|
It's even more generic: at some point, a reactor might be expected (timers, network, any io), and none found. So, I think we should wrap all publicly exportable functions with something like this: // a cancelable version could be introduced as well
pub async fn on_runtime<F, Res>(future: F) -> Res
where
F: Future<Output = Res> + Send + 'static,
Res: Send + 'static,
{
get_runtime()
.spawn(future)
.await
.expect("A background Tokio task panicked")
}
...
// when exporting, wrap with on_runtime
#[uniffi::export]
pub async fn ffi_do_work() -> Result<MyFfiType, FfiErr> {
on_runtime(async {
let res = core::do_work(params).await?;
Ok(res)
})
.await
}I already started drafting this in my work related to signaling. |
| #[uniffi(flat_error)] | ||
| pub enum HandleSignalResponseError { | ||
| #[error("Response decoding failed: {0}")] | ||
| Decode(prost::DecodeError), |
There was a problem hiding this comment.
thought: It is worth making a unique error type here rather than using prost::DecodeError so that the internal protobuf implementation type doesn't leak through the interface?
There was a problem hiding this comment.
Since this uses the #[uniffi(flat_error)] macro (docs), the associated value for each case gets converted to a string via display for the purposes of crossing the FFI boundary so none of the inner error types are exposed—but we still get the enum cases.
| } | ||
|
|
||
| /// Adapts [`DataTrackEncryptionProvider`] to implement [`EncryptionProvider`]. | ||
| pub(super) struct FfiEncryptionProvider(pub(super) Arc<dyn DataTrackEncryptionProvider>); |
There was a problem hiding this comment.
thought: Do you need a uniffi attr macro of some sort here and on FfiDecryptionProvider?
There was a problem hiding this comment.
No, since this type never crosses the FFI boundary. The current approach for exporting the E2EE traits is a bit awkward; I had to introduce FFI-only wrapper traits since nothing like #[uniffi::remote(Trait)] exists. However, once we move the UniFFI macros to the livekit-datatrack crate, this should no longer be necessary—#[uniffi::export(with_foreign)] can be applied directly to the trait definitions then.
| } | ||
|
|
||
| /// Unpublishes the track. | ||
| pub fn unpublish(&self) { |
There was a problem hiding this comment.
Generally makes sense to me! Though IMO since this is somewhat swift specific, maybe this makes sense to expose in the swift specific code above this rather than in the uniffi definition. Also because I don't think sending a closure over uniffi is possible.
| // TODO: in a follow-up PR, refactor manager to work with cancellation tokens directly, eliminating the | ||
| // need for this additional task. |
There was a problem hiding this comment.
(Just a note that the web implementation has quite a bit of of logic dealing with this, so if nothing else there's a lot of good test cases to pull in)
There was a problem hiding this comment.
I think this will map nicely; CancellationToken is roughly equivalent to AbortSignal on web.
| /// | ||
| /// If a signal response type not listed above is provided, the result is an error. | ||
| /// | ||
| pub fn handle_signal_response(&self, res: &[u8]) -> Result<(), HandleSignalResponseError> { |
There was a problem hiding this comment.
Given each client implementation pulls in the protobufs today anyway and that isn't likely to change, I think I'd vote for 2 (dispatch - client deserializes and knows what to do, passes rust_handle_specific_message down). This was generally what I did on the web (I realize it's not exactly the same set of problems though) and it worked quite well. If there's a desire to eventually do all protobuf serialization / deserialization at the manager level, then I think it's fairly reasonable to then move to 1 or 3 once we're more confident which pattern makes the most sense.
| .manager | ||
| .encrypt_data(payload.into(), &self.sender_identity, key_index) | ||
| .map_err(|_| dt::EncryptionError)?; | ||
| .map_err(|_| dt::EncryptionError::Failed)?; |
There was a problem hiding this comment.
question: Is this a breaking api change to existing rust sdk users?
There was a problem hiding this comment.
No, since this type is never exposed in the public API in the livekit crate.
@reenboog, is this something you would want to add in the livekit_runtime crate? |
|
|
@davidliu tagging you as it would be great to get some approval before the actual "0.1" release. Let's make sure the DTOs etc. look fine at least on 2 platforms, the async things work, etc. |
|
@pblazej looks good, got it compiled for Android and played around with the API. Seems like everything would integrate fairly seamlessly into the Android SDK. @ladvoc I did have to add the "macros" feature to this line in the Cargo.toml to get it building: I'm not sure if this is android specific or affects other builds as well. |
Previously made builder fields public to allow direct export as remote type.
6c06943 to
de6198f
Compare
0.31.2's Swift backend emits `nonisolated(unsafe) static let` for callback-interface vtables (Swift 6 strict-concurrency clean) and supports methods on records/enums. 0.30.0 required consumers to compile the generated bindings in Swift 5 language mode. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
UniFFI drives exported async functions from the foreign bindings' executor, on a thread with no tokio runtime. Anything that reaches a reactor — `publish_track` via `tokio::time::timeout`, the receive path (`subscribe`/`DataTrackStream::next`) via `livekit_runtime::timeout` in the depacketizer — then panics "there is no reactor running". Annotate the data track impl blocks with `#[uniffi::export(async_runtime = "tokio")]`, which wraps each future in `async_compat::Compat` so it is polled within a tokio runtime context. Applied uniformly, since which exports reach a reactor is not locally auditable. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
cargo-swift mispackages a UniFFI library with more than one component (livekit-uniffi + livekit-datatrack): wrong xcframework name and unimportable framework modules. Pin the swift-xcframework task to the fork branch that fixes this (https://github.com/livekit/cargo-swift/tree/fix/multi-crate-framework), and drop `--debug-symbols` since that branch is based on main without the dSYM-embedding support. Add a swift-workarounds task that drops the duplicate `Bytes` FfiConverter from livekit_datatrack.swift — the `Bytes` custom type is registered in both crates, so UniFFI emits it in both component files and they collide ("invalid redeclaration of 'Bytes'") when compiled into one Swift module. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Summary of changes:
livekit-uniffiResolves CLT-2472