Skip to content

Data tracks UniFFI#1034

Draft
ladvoc wants to merge 58 commits into
mainfrom
ladvoc/data-tracks-uniffi
Draft

Data tracks UniFFI#1034
ladvoc wants to merge 58 commits into
mainfrom
ladvoc/data-tracks-uniffi

Conversation

@ladvoc

@ladvoc ladvoc commented Apr 22, 2026

Copy link
Copy Markdown
Contributor

Summary of changes:

  • Exposes data tracks core functionality through livekit-uniffi
    • This will eventually enable the following clients to share the Rust implementation: Swift, Kotlin, React Native, Flutter
  • Minor changes to the data tracks crate to support this

Resolves CLT-2472

}

/// Unpublishes the track.
pub fn unpublish(&self) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the new APIs a lot, the only asymmetry for me is that you cannot surface the fact that unpublished track is "invalid" just via language constructs ("consuming", scopes, etc.).

I'm thinking of sth like:

  public func withDataTrack<T>(
      name: String,
      body: (LocalDataTrack) async throws -> T
  ) async throws -> T {
      let track = try await publishDataTrack(name: name)
      defer { track.unpublish() }
      return try await body(track)
  }

So you can pipe streams e.g.

  // Instead of handing back an object that can go stale
  try await localParticipant.withDataTrack(name: "game-state") { track in
      // track is guaranteed valid here
      for await state in gameStates {
          track.tryPush(frame: state.toFrame())
      }
  }
  // automatically unpublished when scope exits (cancellation, throw, or return)

Or even:

  // Publishing: AsyncSequence → DataTrack (sink)
  try await localParticipant.publishDataTrack(name: "game", sending:
      gameLoop.map { $0.toFrame() }
  )

  // Subscribing: DataTrack → AsyncSequence (source)
  for await frame in stream {
      process(frame)
  }

It's purely additive, so take it with a grain of 🧂 but IMO provides a great DX on top of data tracks if you stick to certain stream-like constructs.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally makes sense to me! Though IMO since this is somewhat swift specific, maybe this makes sense to expose in the swift specific code above this rather than in the uniffi definition. Also because I don't think sending a closure over uniffi is possible.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather than in the uniffi definition

yes, I'm talking purely about wrapping here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something to be aware of is the client doesn't have sole control over the publish state of a data track (the SFU reserves the right to unpublish at any time without the client initiating it) which is why I didn't do something similar in Rust (e.g., modeling with type state).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah so if someone "unpublished it for me" it will just throw and break the loop?

#[uniffi::export]
impl DataTrackStream {
/// Returns the next received frame or `None` if the subscription has ended.
pub async fn next(&self) -> Option<DataTrackFrame> {

@pblazej pblazej Apr 23, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be mapped in a generic way; the impl is trivial in Swift, basically using AsyncStream(unfolding: next), as we do for logs.


use bytes::Bytes;

uniffi::custom_type!(Bytes, Vec<u8>, { remote });

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General: do we plan to introduce some helpers for encoding/decoding into that or leaving that to experienced users?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This macro handles the conversion from Bytes to Vec<u8> automatically (under the hood it implements impl uniffi::FfiConverter<crate::UniFfiTag> for Bytes), so Swift can just pass Data as expected wherever the API on the Rust side accepts Bytes.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I just mean "something higher-level than Data" like Encodable types (JSON etc.)

Comment thread livekit-datatrack/src/frame.rs Outdated
pub(crate) payload: Bytes,
pub(crate) user_timestamp: Option<u64>,
pub payload: Bytes,
pub user_timestamp: Option<u64>,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: shall we expose duration_since_timestamp e.g. for benchmarks?

@pblazej pblazej Apr 23, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is not an uniffi::Object we won't get withTimestampNow etc. for free, right? As it basically maps to raw struct:

public struct DataTrackFrame: Equatable, Hashable, Sendable {
  public var payload: Data
  public var userTimestamp: UInt64?
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applies to other "DTOs" as well, so good to discuss that now.

@ladvoc ladvoc Apr 29, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this type of "DTO" is more naturally modeled as a value type on the Swift side. I don't think you can currently have associated functions on a uniffi::Record (even though this makes sense in a Swift context), so I see two options:

  1. Use uniffi::Object
  2. Export standalone helper functions (e.g., fn with_user_timestamp(frame: DataTrackFrame) → DataTrackFrame) and define an extension DataTrackFrame on the Swift side to make it an associated function

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good news: UniFFI v0.31.0 added support for methods on records and enums!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup let's avoid handwritten extensions and just try bumping UniFFI.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edit: tried that locally, and it does not work for #[uniffi::remote(Record)] e.g. DataTrackFrame

Comment thread livekit-uniffi/src/data_track/remote.rs Outdated
/// Returns the next received frame or `None` if the subscription has ended.
pub async fn next(&self) -> Option<DataTrackFrame> {
// TODO: avoid mutex?
self.0.lock().await.next().await

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @reenboog talking about async 😄

@pblazej pblazej left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't see any real blockers here, would be nice to discuss some common cases like errors and records now.

@pblazej

pblazej commented Apr 24, 2026

Copy link
Copy Markdown
Contributor

Hm, I see one more gap in the runtime after introducing 90cf95a

UniFFI polls async futures from Swift's thread via continuation callbacks. That thread has no tokio runtime context. When the future hits tokio::time::timeout (line 453 in manager.rs), tokio tries to register a timer with a runtime that doesn't exist on that thread → panic.

The naive claude solution was:

pub(crate) fn contextualize<F: std::future::Future>(
    future: F,
) -> impl std::future::Future<Output = F::Output> {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    struct Contextualized<F> {
        inner: Pin<Box<F>>,
    }

    impl<F: std::future::Future> std::future::Future for Contextualized<F> {
        type Output = F::Output;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let _enter = runtime().enter();
            self.inner.as_mut().poll(cx)
        }
    }

    Contextualized { inner: Box::pin(future) }
}

then e.g.

crate::runtime::contextualize(self.input.publish_track(options))
            .await
            .map(LocalDataTrack)

@reenboog

reenboog commented Apr 26, 2026

Copy link
Copy Markdown
Contributor

It's even more generic: at some point, a reactor might be expected (timers, network, any io), and none found. So, I think we should wrap all publicly exportable functions with something like this:

// a cancelable version could be introduced as well
pub async fn on_runtime<F, Res>(future: F) -> Res
where
    F: Future<Output = Res> + Send + 'static,
    Res: Send + 'static,
{
    get_runtime()
        .spawn(future)
        .await
        .expect("A background Tokio task panicked")
}

...

// when exporting, wrap with on_runtime
#[uniffi::export]
pub async fn ffi_do_work() -> Result<MyFfiType, FfiErr> {
    on_runtime(async {
        let res = core::do_work(params).await?;
        Ok(res)
    })
    .await
}

I already started drafting this in my work related to signaling.

#[uniffi(flat_error)]
pub enum HandleSignalResponseError {
#[error("Response decoding failed: {0}")]
Decode(prost::DecodeError),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: It is worth making a unique error type here rather than using prost::DecodeError so that the internal protobuf implementation type doesn't leak through the interface?

@ladvoc ladvoc Apr 29, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this uses the #[uniffi(flat_error)] macro (docs), the associated value for each case gets converted to a string via display for the purposes of crossing the FFI boundary so none of the inner error types are exposed—but we still get the enum cases.

Comment thread livekit-uniffi/src/data_track/e2ee.rs Outdated
}

/// Adapts [`DataTrackEncryptionProvider`] to implement [`EncryptionProvider`].
pub(super) struct FfiEncryptionProvider(pub(super) Arc<dyn DataTrackEncryptionProvider>);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: Do you need a uniffi attr macro of some sort here and on FfiDecryptionProvider?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, since this type never crosses the FFI boundary. The current approach for exporting the E2EE traits is a bit awkward; I had to introduce FFI-only wrapper traits since nothing like #[uniffi::remote(Trait)] exists. However, once we move the UniFFI macros to the livekit-datatrack crate, this should no longer be necessary—#[uniffi::export(with_foreign)] can be applied directly to the trait definitions then.

}

/// Unpublishes the track.
pub fn unpublish(&self) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally makes sense to me! Though IMO since this is somewhat swift specific, maybe this makes sense to expose in the swift specific code above this rather than in the uniffi definition. Also because I don't think sending a closure over uniffi is possible.

Comment on lines +125 to +126
// TODO: in a follow-up PR, refactor manager to work with cancellation tokens directly, eliminating the
// need for this additional task.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Just a note that the web implementation has quite a bit of of logic dealing with this, so if nothing else there's a lot of good test cases to pull in)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will map nicely; CancellationToken is roughly equivalent to AbortSignal on web.

Comment thread livekit-uniffi/src/data_track/local.rs Outdated
///
/// If a signal response type not listed above is provided, the result is an error.
///
pub fn handle_signal_response(&self, res: &[u8]) -> Result<(), HandleSignalResponseError> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given each client implementation pulls in the protobufs today anyway and that isn't likely to change, I think I'd vote for 2 (dispatch - client deserializes and knows what to do, passes rust_handle_specific_message down). This was generally what I did on the web (I realize it's not exactly the same set of problems though) and it worked quite well. If there's a desire to eventually do all protobuf serialization / deserialization at the manager level, then I think it's fairly reasonable to then move to 1 or 3 once we're more confident which pattern makes the most sense.

.manager
.encrypt_data(payload.into(), &self.sender_identity, key_index)
.map_err(|_| dt::EncryptionError)?;
.map_err(|_| dt::EncryptionError::Failed)?;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Is this a breaking api change to existing rust sdk users?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, since this type is never exposed in the public API in the livekit crate.

@ladvoc

ladvoc commented Apr 29, 2026

Copy link
Copy Markdown
Contributor Author

It's even more generic: at some point, a reactor might be expected (timers, network, any io), and none found. So, I think we should wrap all publicly exportable functions with something like this [...]

@reenboog, is this something you would want to add in the livekit_runtime crate?

@reenboog

reenboog commented Apr 30, 2026

Copy link
Copy Markdown
Contributor

is this something you would want to add in the livekit_runtime crate?
@ladvoc, yes, the idea is all async public functions, exported with uniffi, should be wrapped with that on_runtime I described earlier, to make it actually run on a runtime instead of piggybacking on whatever each platform provides. No manual enter is required, nor manual polling. I'm wrapping up a small PR to expose a runtime, so that both, uniffi-based modules and the legacy (protobuf-based) ones could use that global runtime.

@pblazej pblazej requested review from davidliu and removed request for reenboog May 21, 2026 14:41
@pblazej

pblazej commented May 21, 2026

Copy link
Copy Markdown
Contributor

@davidliu tagging you as it would be great to get some approval before the actual "0.1" release.

Let's make sure the DTOs etc. look fine at least on 2 platforms, the async things work, etc.

@davidliu

Copy link
Copy Markdown
Contributor

@pblazej looks good, got it compiled for Android and played around with the API. Seems like everything would integrate fairly seamlessly into the Android SDK.

@ladvoc I did have to add the "macros" feature to this line in the Cargo.toml to get it building:

tokio = { workspace = true, default-features = false, features = ["macros", "sync"] }

I'm not sure if this is android specific or affects other builds as well.

@ladvoc ladvoc force-pushed the ladvoc/data-tracks-uniffi branch from 6c06943 to de6198f Compare June 23, 2026 23:38
pblazej and others added 3 commits June 25, 2026 13:24
0.31.2's Swift backend emits `nonisolated(unsafe) static let` for callback-interface
vtables (Swift 6 strict-concurrency clean) and supports methods on records/enums.
0.30.0 required consumers to compile the generated bindings in Swift 5 language mode.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
UniFFI drives exported async functions from the foreign bindings' executor, on a thread
with no tokio runtime. Anything that reaches a reactor — `publish_track` via
`tokio::time::timeout`, the receive path (`subscribe`/`DataTrackStream::next`) via
`livekit_runtime::timeout` in the depacketizer — then panics "there is no reactor running".

Annotate the data track impl blocks with `#[uniffi::export(async_runtime = "tokio")]`, which
wraps each future in `async_compat::Compat` so it is polled within a tokio runtime context.
Applied uniformly, since which exports reach a reactor is not locally auditable.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
cargo-swift mispackages a UniFFI library with more than one component (livekit-uniffi +
livekit-datatrack): wrong xcframework name and unimportable framework modules. Pin the
swift-xcframework task to the fork branch that fixes this
(https://github.com/livekit/cargo-swift/tree/fix/multi-crate-framework), and drop
`--debug-symbols` since that branch is based on main without the dSYM-embedding support.

Add a swift-workarounds task that drops the duplicate `Bytes` FfiConverter from
livekit_datatrack.swift — the `Bytes` custom type is registered in both crates, so UniFFI
emits it in both component files and they collide ("invalid redeclaration of 'Bytes'") when
compiled into one Swift module.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants