diff --git a/.changeset/data_track_public_fields.md b/.changeset/data_track_public_fields.md new file mode 100644 index 000000000..512a98ced --- /dev/null +++ b/.changeset/data_track_public_fields.md @@ -0,0 +1,6 @@ +--- +livekit-datatrack: patch +livekit: patch +--- + +# Make some fields public for data track types diff --git a/.changeset/make_data_track_e2ee_errors_enums.md b/.changeset/make_data_track_e2ee_errors_enums.md new file mode 100644 index 000000000..ac8853123 --- /dev/null +++ b/.changeset/make_data_track_e2ee_errors_enums.md @@ -0,0 +1,6 @@ +--- +livekit: patch +livekit-datatrack: patch +--- + +# Make data track E2EE errors enums diff --git a/.changeset/uniffi_data_tracks.md b/.changeset/uniffi_data_tracks.md new file mode 100644 index 000000000..d3abfea92 --- /dev/null +++ b/.changeset/uniffi_data_tracks.md @@ -0,0 +1,5 @@ +--- +livekit-uniffi: minor +--- + +# Expose data tracks core functionality diff --git a/.changeset/use_concrete_type_for_data_track_manager_output_events.md b/.changeset/use_concrete_type_for_data_track_manager_output_events.md new file mode 100644 index 000000000..1edb95e10 --- /dev/null +++ b/.changeset/use_concrete_type_for_data_track_manager_output_events.md @@ -0,0 +1,7 @@ +--- +livekit-datatrack: patch +livekit: patch +livekit-ffi: patch +--- + +# Use concrete type for data track manager output events diff --git a/Cargo.lock b/Cargo.lock index 2439fa58c..06da6e39a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4433,11 +4433,17 @@ dependencies = [ name = "livekit-uniffi" version = "0.1.0" dependencies = [ + "bytes", + "futures-util", "livekit-api", + "livekit-datatrack", "livekit-protocol", "log", "once_cell", + "prost 0.12.6", + "thiserror 2.0.18", "tokio", + "tokio-util", "uniffi", ] diff --git a/livekit-datatrack/src/e2ee.rs b/livekit-datatrack/src/e2ee.rs index 1f52c40ad..701cd13fd 100644 --- a/livekit-datatrack/src/e2ee.rs +++ b/livekit-datatrack/src/e2ee.rs @@ -19,22 +19,29 @@ use thiserror::Error; // TODO: If a core module for end-to-end encryption is created in the future // (livekit-e2ee), these traits should be moved to there. +/// Twelve byte AES initialization vector (IV). +pub type InitializationVector = [u8; 12]; + /// Encrypted payload and metadata required for decryption. pub struct EncryptedPayload { pub payload: Bytes, - pub iv: [u8; 12], + pub iv: InitializationVector, pub key_index: u8, } /// An error indicating a payload could not be encrypted. #[derive(Debug, Error)] -#[error("Encryption failed")] -pub struct EncryptionError; +pub enum EncryptionError { + #[error("Encryption failed")] + Failed, +} /// An error indicating a payload could not be decrypted. #[derive(Debug, Error)] -#[error("Decryption failed")] -pub struct DecryptionError; +pub enum DecryptionError { + #[error("Decryption failed")] + Failed, +} /// Provider for encrypting payloads for E2EE. pub trait EncryptionProvider: Send + Sync + Debug { diff --git a/livekit-datatrack/src/frame.rs b/livekit-datatrack/src/frame.rs index cfa792afa..e84276d60 100644 --- a/livekit-datatrack/src/frame.rs +++ b/livekit-datatrack/src/frame.rs @@ -32,8 +32,8 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; /// #[derive(Clone, Default)] pub struct DataTrackFrame { - pub(crate) payload: Bytes, - pub(crate) user_timestamp: Option, + pub payload: Bytes, + pub user_timestamp: Option, } impl DataTrackFrame { diff --git a/livekit-datatrack/src/local/manager.rs b/livekit-datatrack/src/local/manager.rs index 17212ac0b..e0e6c2a55 100644 --- a/livekit-datatrack/src/local/manager.rs +++ b/livekit-datatrack/src/local/manager.rs @@ -25,7 +25,13 @@ use crate::{ }; use anyhow::{anyhow, Context}; use futures_core::Stream; -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + pin::Pin, + sync::Arc, + task::{Context as TaskContext, Poll}, + time::Duration, +}; use tokio::sync::{mpsc, oneshot, watch}; use tokio_stream::wrappers::ReceiverStream; @@ -58,7 +64,7 @@ impl Manager { /// - Channel for sending [`InputEvent`]s to be processed by the manager. /// - Stream for receiving [`OutputEvent`]s produced by the manager. /// - pub fn new(options: ManagerOptions) -> (Self, ManagerInput, impl Stream) { + pub fn new(options: ManagerOptions) -> (Self, ManagerInput, ManagerOutput) { let (event_in_tx, event_in_rx) = mpsc::channel(Self::EVENT_BUFFER_COUNT); let (event_out_tx, event_out_rx) = mpsc::channel(Self::EVENT_BUFFER_COUNT); @@ -72,7 +78,7 @@ impl Manager { descriptors: HashMap::new(), }; - let event_out = ReceiverStream::new(event_out_rx); + let event_out = ManagerOutput(ReceiverStream::new(event_out_rx)); (manager, event_in, event_out) } @@ -400,6 +406,18 @@ pub struct ManagerInput { _drop_guard: Arc, } +/// Stream of [`OutputEvent`]s produced by [`Manager`]. +#[derive(Debug)] +pub struct ManagerOutput(ReceiverStream); + +impl Stream for ManagerOutput { + type Item = OutputEvent; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll> { + Pin::new(&mut self.0).poll_next(cx) + } +} + /// Guard that sends shutdown event when the last reference is dropped. #[derive(Debug)] struct DropGuard { diff --git a/livekit-datatrack/src/local/mod.rs b/livekit-datatrack/src/local/mod.rs index 358547146..c47882065 100644 --- a/livekit-datatrack/src/local/mod.rs +++ b/livekit-datatrack/src/local/mod.rs @@ -152,7 +152,7 @@ impl Drop for LocalTrackInner { /// #[derive(Clone, Debug)] pub struct DataTrackOptions { - pub(crate) name: String, + pub name: String, } impl DataTrackOptions { diff --git a/livekit-datatrack/src/remote/manager.rs b/livekit-datatrack/src/remote/manager.rs index e9548d7b4..9e3e8e005 100644 --- a/livekit-datatrack/src/remote/manager.rs +++ b/livekit-datatrack/src/remote/manager.rs @@ -27,7 +27,9 @@ use bytes::Bytes; use std::{ collections::{HashMap, HashSet}, mem, + pin::Pin, sync::Arc, + task::{Context as TaskContext, Poll}, }; use tokio::sync::{broadcast, mpsc, oneshot, watch}; use tokio_stream::{wrappers::ReceiverStream, Stream}; @@ -70,7 +72,7 @@ impl Manager { /// - Channel for sending [`InputEvent`]s to be processed by the manager. /// - Stream for receiving [`OutputEvent`]s produced by the manager. /// - pub fn new(options: ManagerOptions) -> (Self, ManagerInput, impl Stream) { + pub fn new(options: ManagerOptions) -> (Self, ManagerInput, ManagerOutput) { let (event_in_tx, event_in_rx) = mpsc::channel(Self::EVENT_BUFFER_COUNT); let (event_out_tx, event_out_rx) = mpsc::channel(Self::EVENT_BUFFER_COUNT); @@ -84,7 +86,7 @@ impl Manager { sub_handles: HashMap::default(), }; - let event_out = ReceiverStream::new(event_out_rx); + let event_out = ManagerOutput(ReceiverStream::new(event_out_rx)); (manager, event_in, event_out) } @@ -443,6 +445,18 @@ pub struct ManagerInput { _drop_guard: Arc, } +/// Stream of [`OutputEvent`]s produced by [`Manager`]. +#[derive(Debug)] +pub struct ManagerOutput(ReceiverStream); + +impl Stream for ManagerOutput { + type Item = OutputEvent; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll> { + Pin::new(&mut self.0).poll_next(cx) + } +} + /// Guard that sends shutdown event when the last reference is dropped. #[derive(Debug)] struct DropGuard { diff --git a/livekit-datatrack/src/track.rs b/livekit-datatrack/src/track.rs index 8c313b2b1..3d5e3cd03 100644 --- a/livekit-datatrack/src/track.rs +++ b/livekit-datatrack/src/track.rs @@ -42,7 +42,7 @@ impl DataTrack { &self.info } - /// Whether or not the track is still published. + /// Whether or not the track is currently published. pub fn is_published(&self) -> bool { match self.inner.as_ref() { DataTrackInner::Local(inner) => inner.is_published(), diff --git a/livekit-uniffi/Cargo.toml b/livekit-uniffi/Cargo.toml index 8293512e9..4386ed85c 100644 --- a/livekit-uniffi/Cargo.toml +++ b/livekit-uniffi/Cargo.toml @@ -14,10 +14,16 @@ publish = false [dependencies] livekit-protocol = { workspace = true } livekit-api = { workspace = true } +livekit-datatrack = { workspace = true } uniffi = { version = "0.30.0", features = ["cli", "scaffolding-ffi-buffer-fns"] } log = { workspace = true } -tokio = { workspace = true, features = ["sync"] } +tokio = { workspace = true, features = ["sync", "rt-multi-thread"] } +tokio-util = "0.7.18" +prost = "0.12" +futures-util = { workspace = true, default-features = false, features = ["sink"] } +bytes = { workspace = true } once_cell = "1.21.3" +thiserror = { workspace = true } [build-dependencies] uniffi = { version = "0.30.0", features = ["build", "scaffolding-ffi-buffer-fns"] } diff --git a/livekit-uniffi/src/common.rs b/livekit-uniffi/src/common.rs new file mode 100644 index 000000000..6ff66a67e --- /dev/null +++ b/livekit-uniffi/src/common.rs @@ -0,0 +1,17 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use bytes::Bytes; + +uniffi::custom_type!(Bytes, Vec, { remote }); diff --git a/livekit-uniffi/src/data_track/common.rs b/livekit-uniffi/src/data_track/common.rs new file mode 100644 index 000000000..54c3ba813 --- /dev/null +++ b/livekit-uniffi/src/data_track/common.rs @@ -0,0 +1,67 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use bytes::Bytes; +use livekit_datatrack::api::{DataTrackFrame, DataTrackSid}; +use livekit_protocol as proto; +use prost::Message; + +uniffi::custom_type!(DataTrackSid, String, { + remote, + lower: |s| String::from(s), + try_lift: |s| DataTrackSid::try_from(s).map_err(|e| uniffi::deps::anyhow::anyhow!("{e}")), +}); + +#[uniffi::remote(Record)] +pub struct DataTrackFrame { + pub payload: Bytes, + pub user_timestamp: Option, +} + +/// Information about a published data track. +#[derive(uniffi::Record)] +pub struct DataTrackInfo { + pub sid: DataTrackSid, + pub name: String, + pub uses_e2ee: bool, +} + +impl From<&livekit_datatrack::api::DataTrackInfo> for DataTrackInfo { + fn from(info: &livekit_datatrack::api::DataTrackInfo) -> Self { + Self { sid: info.sid(), name: info.name().to_string(), uses_e2ee: info.uses_e2ee() } + } +} + +/// Signal response crossing the FFI boundary could not be processed. +#[derive(uniffi::Error, thiserror::Error, Debug)] +#[uniffi(flat_error)] +pub enum HandleSignalResponseError { + #[error("Response decoding failed: {0}")] + Decode(prost::DecodeError), + #[error("Response container has no message")] + EmptyMessage, + #[error("Unsupported response type in this context")] + UnsupportedType, + #[error(transparent)] + Internal(livekit_datatrack::api::InternalError), +} + +/// Deserializes a signal response crossing the FFI boundary, returning the message variant. +pub(crate) fn deserialize_signal_response( + res: &[u8], +) -> Result { + let res = + proto::SignalResponse::decode(res).map_err(|err| HandleSignalResponseError::Decode(err))?; + res.message.ok_or(HandleSignalResponseError::EmptyMessage) +} diff --git a/livekit-uniffi/src/data_track/e2ee.rs b/livekit-uniffi/src/data_track/e2ee.rs new file mode 100644 index 000000000..9ef35e3e6 --- /dev/null +++ b/livekit-uniffi/src/data_track/e2ee.rs @@ -0,0 +1,103 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use bytes::Bytes; +use core::fmt; +use livekit_datatrack::backend::{ + DecryptionError, DecryptionProvider, EncryptedPayload, EncryptionError, EncryptionProvider, + InitializationVector, +}; +use std::sync::Arc; + +#[uniffi::remote(Error)] +#[uniffi(flat_error)] +pub enum EncryptionError { + Failed, +} + +#[uniffi::remote(Error)] +#[uniffi(flat_error)] +pub enum DecryptionError { + Failed, +} + +#[uniffi::remote(Record)] +pub struct EncryptedPayload { + pub payload: Bytes, + pub iv: InitializationVector, + pub key_index: u8, +} + +uniffi::custom_type!(InitializationVector, Vec, { + remote, + lower: |iv| iv.to_vec(), + try_lift: |v| v.try_into() + .map_err(|_| uniffi::deps::anyhow::anyhow!("IV must be exactly 12 bytes")) +}); + +/// Provider for encrypting payloads for E2EE. +#[uniffi::export(with_foreign)] +pub trait DataTrackEncryptionProvider: Send + Sync { + /// Encrypts the given payload being sent by the local participant. + fn encrypt(&self, payload: Bytes) -> Result; +} + +/// Provider for decrypting payloads for E2EE. +#[uniffi::export(with_foreign)] +pub trait DataTrackDecryptionProvider: Send + Sync { + /// Decrypts the given payload received from a remote participant. + /// + /// Sender identity is required in order for the proper key to be used + /// for decryption. + /// + fn decrypt( + &self, + payload: EncryptedPayload, + sender_identity: String, + ) -> Result; +} + +/// Adapts [`DataTrackEncryptionProvider`] to implement [`EncryptionProvider`]. +pub(super) struct FfiEncryptionProvider(pub(super) Arc); + +impl EncryptionProvider for FfiEncryptionProvider { + fn encrypt(&self, payload: Bytes) -> Result { + self.0.encrypt(payload) + } +} + +impl fmt::Debug for FfiEncryptionProvider { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FfiEncryptionProvider").finish() + } +} + +/// Adapts [`DataTrackDecryptionProvider`] to implement [`DecryptionProvider`]. +pub(super) struct FfiDecryptionProvider(pub(super) Arc); + +impl DecryptionProvider for FfiDecryptionProvider { + fn decrypt( + &self, + payload: EncryptedPayload, + sender_identity: &str, + ) -> Result { + self.0.decrypt(payload, sender_identity.to_string()) + } +} + +impl fmt::Debug for FfiDecryptionProvider { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FfiDecryptionProvider").finish() + } +} diff --git a/livekit-uniffi/src/data_track/local.rs b/livekit-uniffi/src/data_track/local.rs new file mode 100644 index 000000000..c3fe58e1a --- /dev/null +++ b/livekit-uniffi/src/data_track/local.rs @@ -0,0 +1,238 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::{ + common::{deserialize_signal_response, DataTrackInfo, HandleSignalResponseError}, + e2ee::{DataTrackEncryptionProvider, FfiEncryptionProvider}, +}; +use bytes::Bytes; +use futures_util::StreamExt; +use livekit_datatrack::{ + api::{DataTrack, DataTrackFrame, DataTrackOptions, Local, PublishError, PushFrameErrorReason}, + backend::{local, EncryptionProvider}, +}; +use livekit_protocol as proto; +use prost::Message; +use std::sync::Arc; +use tokio_util::sync::{CancellationToken, DropGuard}; + +/// Data track published by the local participant. +#[derive(uniffi::Object)] +pub struct LocalDataTrack(DataTrack); + +#[uniffi::export] +impl LocalDataTrack { + /// Whether or not the track is currently published. + pub fn is_published(&self) -> bool { + self.0.is_published() + } + + /// Waits asynchronously until the track is unpublished. + /// + /// Use this to trigger follow-up work once the track is no longer published. + /// If the track is already unpublished, this method returns immediately. + /// + pub async fn wait_for_unpublish(&self) { + self.0.wait_for_unpublish().await + } + + /// Information about the data track. + pub fn info(&self) -> DataTrackInfo { + self.0.info().into() + } + + /// Try pushing a frame to subscribers of the track. + pub fn try_push(&self, frame: DataTrackFrame) -> Result<(), PushFrameErrorReason> { + // `PushFrameError` returns ownership of the unpublished frame to the caller; + // since this isn't applicable in an FFI context, just provide the reason. + self.0.try_push(frame).map_err(|err| err.reason()) + } + + /// Unpublishes the track. + pub fn unpublish(&self) { + self.0.unpublish(); + } +} + +#[uniffi::remote(Error)] +pub enum PushFrameErrorReason { + TrackUnpublished, + QueueFull, +} + +#[uniffi::remote(Record)] +pub struct DataTrackOptions { + pub name: String, +} + +#[uniffi::remote(Error)] +#[uniffi(flat_error)] +pub enum PublishError { + NotAllowed, + DuplicateName, + InvalidName, + Timeout, + LimitReached, + Disconnected, + Internal, +} + +/// System for managing data track publications. +#[derive(uniffi::Object)] +struct LocalDataTrackManager { + input: local::ManagerInput, + _guard: DropGuard, +} + +/// Delegate for receiving output events from [`LocalDataTrackManager`]. +#[uniffi::export(with_foreign)] +pub trait LocalDataTrackManagerDelegate: Send + Sync { + /// Encoded signal request to be forwarded to the SFU. + fn on_signal_request(&self, request: Vec); + + /// Packets available to be sent over the data channel transport. + fn on_packets_available(&self, packets: Vec); +} + +#[uniffi::export] +impl LocalDataTrackManager { + #[uniffi::constructor] + pub fn new( + delegate: Arc, + encryption_provider: Option>, + ) -> Arc { + let token = CancellationToken::new(); + + let encryption_provider = encryption_provider + .map(|p| Arc::new(FfiEncryptionProvider(p)) as Arc); + let manager_options = local::ManagerOptions { encryption_provider }; + + let (manager, input, output) = local::Manager::new(manager_options); + + let rt = crate::runtime::runtime(); + + // TODO: in a follow-up PR, refactor manager to work with cancellation tokens directly, eliminating the + // need for this additional task. + rt.spawn(shutdown_forward_task(input.clone(), token.clone())); + + let delegate_forward = DelegateForwardTask { output, delegate, token: token.clone() }; + rt.spawn(delegate_forward.run()); + + rt.spawn(manager.run()); + + Self { input, _guard: token.drop_guard() }.into() + } + + /// Publishes a data track with given options. + pub async fn publish_track( + &self, + options: DataTrackOptions, + ) -> Result { + self.input.publish_track(options).await.map(LocalDataTrack) + } + + /// Get information about all currently published tracks. + /// + /// This does not include publications that are still pending. + /// + pub async fn query_tracks(&self) -> Vec { + self.input.query_tracks().await.into_iter().map(|info| info.as_ref().into()).collect() + } + + /// Republish all tracks. + /// + /// This must be invoked after a full reconnect in order for existing publications + /// to be recognized by the SFU. Each republished track will be assigned a new SID. + /// + pub fn republish_tracks(&self) { + _ = self.input.send(local::InputEvent::RepublishTracks); + } + + /// Handles a serialized `RequestResponse` signal response from the SFU. + pub fn handle_sfu_request_response(&self, res: &[u8]) -> Result<(), HandleSignalResponseError> { + let proto::signal_response::Message::RequestResponse(msg) = + deserialize_signal_response(res)? + else { + return Err(HandleSignalResponseError::UnsupportedType); + }; + + let Some(publish_res) = local::publish_result_from_request_response(&msg) else { + // Not from data track publish request. + return Ok(()); + }; + let event: local::InputEvent = publish_res.into(); + _ = self.input.send(event); + + Ok(()) + } + + /// Handles a serialized `PublishDataTrackResponse` signal response from the SFU. + pub fn handle_sfu_publish_response(&self, res: &[u8]) -> Result<(), HandleSignalResponseError> { + let proto::signal_response::Message::PublishDataTrackResponse(msg) = + deserialize_signal_response(res)? + else { + return Err(HandleSignalResponseError::UnsupportedType); + }; + + let event: local::SfuPublishResponse = + msg.try_into().map_err(|err| HandleSignalResponseError::Internal(err))?; + _ = self.input.send(event.into()); + + Ok(()) + } +} + +/// Task for forwarding manager output events to the foreign [`LocalDataTrackManagerDelegate`]. +struct DelegateForwardTask { + output: local::ManagerOutput, + delegate: Arc, + token: CancellationToken, +} + +impl DelegateForwardTask { + async fn run(mut self) { + loop { + tokio::select! { + _ = self.token.cancelled() => break, + Some(event) = self.output.next() => self.forward_event(event) + } + } + } + + fn forward_event(&self, event: local::OutputEvent) { + match event { + local::OutputEvent::PacketsAvailable(packets) => { + self.delegate.on_packets_available(packets) + } + local::OutputEvent::SfuPublishRequest(req) => { + let req = proto::signal_request::Message::PublishDataTrackRequest(req.into()); + self.forward_signal_request(req); + } + local::OutputEvent::SfuUnpublishRequest(req) => { + let req = proto::signal_request::Message::UnpublishDataTrackRequest(req.into()); + self.forward_signal_request(req); + } + } + } + + fn forward_signal_request(&self, message: proto::signal_request::Message) { + let req = proto::SignalRequest { message: Some(message) }.encode_to_vec(); + self.delegate.on_signal_request(req); + } +} + +async fn shutdown_forward_task(input: local::ManagerInput, token: CancellationToken) { + token.cancelled().await; + _ = input.send(local::InputEvent::Shutdown); +} diff --git a/livekit-uniffi/src/data_track/mod.rs b/livekit-uniffi/src/data_track/mod.rs new file mode 100644 index 000000000..d146f2d55 --- /dev/null +++ b/livekit-uniffi/src/data_track/mod.rs @@ -0,0 +1,26 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Data tracks core functionality from the [`livekit-datatrack`] crate. +//! +//! At a high level, FFI clients integrate this by instantiating a [`local::LocalDataTrackManager`] and a +//! [`remote::RemoteDataTrackManager`] inside their implementation of `Room`, forwarding input events and handling +//! output events. Architecturally, the managers have no dependency on WebRTC or the signaling +//! client, allowing them to be wired up to the FFI client's own implementations of these components. +//! + +pub mod common; +pub mod e2ee; +pub mod local; +pub mod remote; diff --git a/livekit-uniffi/src/data_track/remote.rs b/livekit-uniffi/src/data_track/remote.rs new file mode 100644 index 000000000..f62cee824 --- /dev/null +++ b/livekit-uniffi/src/data_track/remote.rs @@ -0,0 +1,237 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::{ + common::{deserialize_signal_response, DataTrackInfo, HandleSignalResponseError}, + e2ee::{DataTrackDecryptionProvider, FfiDecryptionProvider}, +}; +use bytes::Bytes; +use futures_util::StreamExt; +use livekit_datatrack::{ + api::{DataTrack, DataTrackFrame, DataTrackSid, DataTrackSubscribeError, Remote}, + backend::{remote, DecryptionProvider}, +}; +use livekit_protocol as proto; +use prost::Message; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio_util::sync::{CancellationToken, DropGuard}; + +/// Data track published by the remote participant. +#[derive(uniffi::Object)] +pub struct RemoteDataTrack(DataTrack); + +#[uniffi::export] +impl RemoteDataTrack { + /// Whether or not the track is currently published. + pub fn is_published(&self) -> bool { + self.0.is_published() + } + + /// Waits asynchronously until the track is unpublished. + /// + /// Use this to trigger follow-up work once the track is no longer published. + /// If the track is already unpublished, this method returns immediately. + /// + pub async fn wait_for_unpublish(&self) { + self.0.wait_for_unpublish().await + } + + /// Information about the data track. + pub fn info(&self) -> DataTrackInfo { + self.0.info().into() + } + + /// Identity of the participant who published the track. + pub fn publisher_identity(&self) -> String { + self.0.publisher_identity().to_string() + } + + /// Subscribes to the data track. + pub async fn subscribe(&self) -> Result { + self.0.subscribe().await.map(|stream| DataTrackStream(Mutex::new(stream))) + } +} + +#[uniffi::remote(Error)] +#[uniffi(flat_error)] +pub enum DataTrackSubscribeError { + Unpublished, + Timeout, + Disconnected, + Internal, +} + +/// A stream of [`DataTrackFrame`]s received from a [`RemoteDataTrack`]. +#[derive(uniffi::Object)] +pub struct DataTrackStream(Mutex); + +#[uniffi::export] +impl DataTrackStream { + /// Returns the next received frame or `None` if the subscription has ended. + pub async fn next(&self) -> Option { + // TODO: avoid mutex? + self.0.lock().await.next().await + } +} + +/// System for managing data track subscriptions. +#[derive(uniffi::Object)] +struct RemoteDataTrackManager { + input: remote::ManagerInput, + _guard: DropGuard, +} + +/// Delegate for receiving output events from [`RemoteDataTrackManager`]. +#[uniffi::export(with_foreign)] +pub trait RemoteDataTrackManagerDelegate: Send + Sync { + /// Encoded signal request to be forwarded to the SFU. + fn on_signal_request(&self, request: Vec); + + /// A track has been published by a remote participant and is available to be + /// subscribed to. + /// + /// Emit a public event to deliver the track to the user, allowing them to subscribe + /// with [`RemoteDataTrack::subscribe`] if desired. + /// + fn on_track_published(&self, track: Arc); + + /// A track with the given SID has been unpublished by a remote participant. + fn on_track_unpublished(&self, sid: DataTrackSid); +} + +#[uniffi::export] +impl RemoteDataTrackManager { + #[uniffi::constructor] + pub fn new( + delegate: Arc, + decryption_provider: Option>, + ) -> Arc { + let token = CancellationToken::new(); + + let decryption_provider = decryption_provider + .map(|p| Arc::new(FfiDecryptionProvider(p)) as Arc); + let manager_options = remote::ManagerOptions { decryption_provider }; + + let (manager, input, output) = remote::Manager::new(manager_options); + + let rt = crate::runtime::runtime(); + + // TODO: in a follow-up PR, refactor manager to work with cancellation tokens directly, eliminating the + // need for this additional task. + rt.spawn(shutdown_forward_task(input.clone(), token.clone())); + + let delegate_forward = DelegateForwardTask { output, delegate, token: token.clone() }; + rt.spawn(delegate_forward.run()); + + rt.spawn(manager.run()); + + Self { input, _guard: token.drop_guard() }.into() + } + + /// Resend all subscription updates. + /// + /// This must be sent after a full reconnect to ensure the SFU knows which tracks + /// are subscribed to locally. + /// + pub fn resend_subscription_updates(&self) { + _ = self.input.send(remote::InputEvent::ResendSubscriptionUpdates); + } + + /// Handles a serialized `ParticipantUpdate` signal response from the SFU. + /// + /// Note: the local participant identity is required to exclude data tracks published by the + /// local participant from being treated as remote tracks. + /// + pub fn handle_sfu_participant_update( + &self, + res: &[u8], + local_participant_identity: String, + ) -> Result<(), HandleSignalResponseError> { + let proto::signal_response::Message::Update(mut msg) = deserialize_signal_response(res)? + else { + return Err(HandleSignalResponseError::UnsupportedType); + }; + + let event = remote::event_from_participant_update(&mut msg, &local_participant_identity) + .map_err(|err| HandleSignalResponseError::Internal(err))?; + _ = self.input.send(event.into()); + + Ok(()) + } + + /// Handles a serialized `DataTrackSubscriberHandles` signal response from the SFU. + pub fn handle_subscriber_handles(&self, res: &[u8]) -> Result<(), HandleSignalResponseError> { + let proto::signal_response::Message::DataTrackSubscriberHandles(msg) = + deserialize_signal_response(res)? + else { + return Err(HandleSignalResponseError::UnsupportedType); + }; + + let event: remote::SfuSubscriberHandles = + msg.try_into().map_err(|err| HandleSignalResponseError::Internal(err))?; + _ = self.input.send(event.into()); + + Ok(()) + } + + /// Handles a encoded packet received over the data channel. + pub fn handle_packet_received(&self, packet: Bytes) { + _ = self.input.send(remote::InputEvent::PacketReceived(packet)) + } +} + +/// Task for forwarding manager output events to the foreign [`RemoteDataTrackManagerDelegate`]. +struct DelegateForwardTask { + output: remote::ManagerOutput, + delegate: Arc, + token: CancellationToken, +} + +impl DelegateForwardTask { + async fn run(mut self) { + loop { + tokio::select! { + _ = self.token.cancelled() => break, + Some(event) = self.output.next() => self.forward_event(event) + } + } + } + + fn forward_event(&self, event: remote::OutputEvent) { + match event { + remote::OutputEvent::TrackPublished(event) => { + let track = Arc::new(RemoteDataTrack(event.track)); + self.delegate.on_track_published(track); + } + remote::OutputEvent::TrackUnpublished(event) => { + self.delegate.on_track_unpublished(event.sid) + } + remote::OutputEvent::SfuUpdateSubscription(req) => { + let req = proto::signal_request::Message::UpdateDataSubscription(req.into()); + self.forward_signal_request(req); + } + } + } + + fn forward_signal_request(&self, message: proto::signal_request::Message) { + let req = proto::SignalRequest { message: Some(message) }.encode_to_vec(); + self.delegate.on_signal_request(req); + } +} + +async fn shutdown_forward_task(input: remote::ManagerInput, token: CancellationToken) { + token.cancelled().await; + _ = input.send(remote::InputEvent::Shutdown); +} diff --git a/livekit-uniffi/src/lib.rs b/livekit-uniffi/src/lib.rs index 698bd4d0b..94f2cc0d0 100644 --- a/livekit-uniffi/src/lib.rs +++ b/livekit-uniffi/src/lib.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +/// Data tracks core from [`livekit-datatrack`]. +pub mod data_track; + /// Access token generation and verification from [`livekit-api::access_token`]. pub mod access_token; @@ -21,4 +24,10 @@ pub mod log_forward; /// Information about the build such as version. pub mod build_info; +/// Shared exports and utilities. +pub mod common; + +/// Global async runtime. +pub mod runtime; + uniffi::setup_scaffolding!(); diff --git a/livekit-uniffi/src/runtime.rs b/livekit-uniffi/src/runtime.rs new file mode 100644 index 000000000..676dc2579 --- /dev/null +++ b/livekit-uniffi/src/runtime.rs @@ -0,0 +1,28 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::OnceLock; +use tokio::runtime::Runtime; + +/// Returns the process-global Tokio runtime, initializing it on first use. +pub(crate) fn runtime() -> &'static Runtime { + static RUNTIME: OnceLock = OnceLock::new(); + RUNTIME.get_or_init(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name("livekit-uniffi") + .build() + .expect("Failed to build livekit-uniffi tokio runtime") + }) +} diff --git a/livekit/src/room/e2ee/data_track.rs b/livekit/src/room/e2ee/data_track.rs index 068819ecc..96bdb4fc6 100644 --- a/livekit/src/room/e2ee/data_track.rs +++ b/livekit/src/room/e2ee/data_track.rs @@ -37,7 +37,7 @@ impl dt::EncryptionProvider for DataTrackEncryptionProvider { let encrypted = self .manager .encrypt_data(payload.into(), &self.sender_identity, key_index) - .map_err(|_| dt::EncryptionError)?; + .map_err(|_| dt::EncryptionError::Failed)?; debug_assert_eq!( encrypted.key_index as u32, @@ -46,8 +46,8 @@ impl dt::EncryptionProvider for DataTrackEncryptionProvider { ); let payload = encrypted.data.into(); - let iv = encrypted.iv.try_into().map_err(|_| dt::EncryptionError)?; - let key_index = encrypted.key_index.try_into().map_err(|_| dt::EncryptionError)?; + let iv = encrypted.iv.try_into().map_err(|_| dt::EncryptionError::Failed)?; + let key_index = encrypted.key_index.try_into().map_err(|_| dt::EncryptionError::Failed)?; Ok(dt::EncryptedPayload { payload, iv, key_index }) } @@ -79,7 +79,7 @@ impl dt::DecryptionProvider for DataTrackDecryptionProvider { payload.key_index as u32, sender_identity, ) - .ok_or_else(|| dt::DecryptionError)?; + .ok_or_else(|| dt::DecryptionError::Failed)?; Ok(Bytes::from(decrypted)) } } diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 9473b13af..720a59b38 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -14,7 +14,7 @@ pub use crate::utils::take_cell::TakeCell; use bmrng::unbounded::UnboundedRequestReceiver; -use futures_util::{Stream, StreamExt}; +use futures_util::StreamExt; use libwebrtc::{ native::frame_cryptor::EncryptionState, prelude::{ @@ -2062,7 +2062,7 @@ impl RoomSession { /// Task for handling output events from the local data track manager. async fn local_dt_forward_task( self: Arc, - mut events: impl Stream + Unpin, + mut events: dt::local::ManagerOutput, mut close_rx: broadcast::Receiver<()>, ) { loop { @@ -2082,7 +2082,7 @@ impl RoomSession { /// Task for handling output events from the remote data track manager. async fn remote_dt_forward_task( self: Arc, - mut events: impl Stream + Unpin, + mut events: dt::remote::ManagerOutput, mut close_rx: broadcast::Receiver<()>, ) { loop {