|
| 1 | +//! Wrapper for broadcast channel functionality |
| 2 | +//! |
| 3 | +//! This implementation provides a stable Rust API for broadcast channels, |
| 4 | +//! independent of upstream deno_web API changes. |
| 5 | +//! |
| 6 | +//! # Important Note |
| 7 | +//! |
| 8 | +//! Due to upstream changes in `deno_web` (the broadcast channel methods are now private), |
| 9 | +//! this wrapper is designed for **Rust-to-Rust communication only**. It does not share |
| 10 | +//! the same underlying channel as the JavaScript `BroadcastChannel` API. |
| 11 | +//! |
| 12 | +//! For JavaScript-to-JavaScript communication, use the built-in `BroadcastChannel` API directly: |
| 13 | +//! |
| 14 | +//! ```javascript |
| 15 | +//! const channel = new BroadcastChannel("my-channel"); |
| 16 | +//! channel.postMessage({ data: "hello" }); |
| 17 | +//! channel.onmessage = (event) => console.log(event.data); |
| 18 | +//! ``` |
| 19 | +//! |
| 20 | +//! # Usage |
| 21 | +//! |
| 22 | +//! ```rust,ignore |
| 23 | +//! use rustyscript::{BroadcastChannel, Runtime, RuntimeOptions}; |
| 24 | +//! |
| 25 | +//! // Create a shared channel |
| 26 | +//! let channel = BroadcastChannel::new(); |
| 27 | +//! |
| 28 | +//! // Create subscriptions for different runtimes |
| 29 | +//! let mut runtime1 = Runtime::new(RuntimeOptions::default())?; |
| 30 | +//! let mut runtime2 = Runtime::new(RuntimeOptions::default())?; |
| 31 | +//! |
| 32 | +//! let sub1 = channel.subscribe("my_channel")?; |
| 33 | +//! let sub2 = channel.subscribe("my_channel")?; |
| 34 | +//! |
| 35 | +//! // Send from one subscription |
| 36 | +//! sub1.send_sync(&mut runtime1, "hello")?; |
| 37 | +//! |
| 38 | +//! // Receive from another |
| 39 | +//! let msg: String = sub2.recv_sync(&mut runtime2, None)?.unwrap(); |
| 40 | +//! ``` |
| 41 | +
|
| 42 | +use std::sync::Arc; |
| 43 | +use std::time::Duration; |
| 44 | + |
| 45 | +use deno_core::parking_lot::Mutex; |
| 46 | +use serde::{de::DeserializeOwned, Serialize}; |
| 47 | +use tokio::sync::broadcast; |
| 48 | +use tokio::sync::mpsc; |
| 49 | +use uuid::Uuid; |
| 50 | + |
| 51 | +use crate::{big_json_args, Error, Runtime}; |
| 52 | + |
| 53 | +/// Message type for internal broadcast channel communication |
| 54 | +#[derive(Clone, Debug)] |
| 55 | +struct ChannelMessage { |
| 56 | + name: Arc<String>, |
| 57 | + data: Arc<Vec<u8>>, |
| 58 | + sender_id: Uuid, |
| 59 | +} |
| 60 | + |
| 61 | +/// A broadcast channel that can be shared across multiple runtimes |
| 62 | +/// |
| 63 | +/// This is the backing storage for broadcast channel communication. |
| 64 | +/// Clone this to share the channel between multiple wrappers. |
| 65 | +#[derive(Clone)] |
| 66 | +pub struct BroadcastChannel { |
| 67 | + sender: Arc<Mutex<broadcast::Sender<ChannelMessage>>>, |
| 68 | +} |
| 69 | + |
| 70 | +impl Default for BroadcastChannel { |
| 71 | + fn default() -> Self { |
| 72 | + Self::new() |
| 73 | + } |
| 74 | +} |
| 75 | + |
| 76 | +impl BroadcastChannel { |
| 77 | + /// Create a new broadcast channel |
| 78 | + #[must_use] |
| 79 | + pub fn new() -> Self { |
| 80 | + let (sender, _) = broadcast::channel(256); |
| 81 | + Self { |
| 82 | + sender: Arc::new(Mutex::new(sender)), |
| 83 | + } |
| 84 | + } |
| 85 | + |
| 86 | + /// Subscribe to this channel, creating a wrapper for sending/receiving messages |
| 87 | + /// |
| 88 | + /// # Errors |
| 89 | + /// Will return an error if the subscription cannot be created |
| 90 | + pub fn subscribe(&self, name: impl ToString) -> Result<BroadcastChannelWrapper, Error> { |
| 91 | + BroadcastChannelWrapper::new(self, name) |
| 92 | + } |
| 93 | +} |
| 94 | + |
| 95 | +/// Helper struct to wrap a broadcast channel subscription |
| 96 | +/// |
| 97 | +/// Takes care of some of the boilerplate for serialization/deserialization. |
| 98 | +/// Messages are serialized through the JavaScript runtime to ensure compatibility |
| 99 | +/// with the JavaScript BroadcastChannel API. |
| 100 | +pub struct BroadcastChannelWrapper { |
| 101 | + channel: BroadcastChannel, |
| 102 | + receiver: tokio::sync::Mutex<( |
| 103 | + broadcast::Receiver<ChannelMessage>, |
| 104 | + mpsc::UnboundedReceiver<()>, |
| 105 | + )>, |
| 106 | + cancel_tx: mpsc::UnboundedSender<()>, |
| 107 | + name: String, |
| 108 | + uuid: Uuid, |
| 109 | +} |
| 110 | + |
| 111 | +impl BroadcastChannelWrapper { |
| 112 | + /// Create a new broadcast channel wrapper and subscribe to the channel |
| 113 | + /// |
| 114 | + /// Unsubscribe is called when the wrapper is dropped |
| 115 | + /// |
| 116 | + /// # Errors |
| 117 | + /// Will return an error if the channel cannot be subscribed to |
| 118 | + pub fn new(channel: &BroadcastChannel, name: impl ToString) -> Result<Self, Error> { |
| 119 | + let (cancel_tx, cancel_rx) = mpsc::unbounded_channel(); |
| 120 | + let broadcast_rx = channel.sender.lock().subscribe(); |
| 121 | + let receiver = tokio::sync::Mutex::new((broadcast_rx, cancel_rx)); |
| 122 | + let uuid = Uuid::new_v4(); |
| 123 | + let name = name.to_string(); |
| 124 | + |
| 125 | + Ok(Self { |
| 126 | + channel: channel.clone(), |
| 127 | + receiver, |
| 128 | + cancel_tx, |
| 129 | + name, |
| 130 | + uuid, |
| 131 | + }) |
| 132 | + } |
| 133 | + |
| 134 | + /// Get the name of this channel |
| 135 | + #[must_use] |
| 136 | + pub fn name(&self) -> &str { |
| 137 | + &self.name |
| 138 | + } |
| 139 | + |
| 140 | + /// Send a message to the channel, blocking until the message is sent |
| 141 | + /// |
| 142 | + /// # Errors |
| 143 | + /// Will return an error if the message cannot be serialized or sent |
| 144 | + pub fn send_sync<T: Serialize>(&self, runtime: &mut Runtime, data: T) -> Result<(), Error> { |
| 145 | + let tokio_rt = runtime.tokio_runtime(); |
| 146 | + tokio_rt.block_on(self.send(runtime, data)) |
| 147 | + } |
| 148 | + |
| 149 | + /// Send a message to the channel |
| 150 | + /// |
| 151 | + /// # Errors |
| 152 | + /// Will return an error if the message cannot be serialized or sent |
| 153 | + pub async fn send<T: Serialize>(&self, runtime: &mut Runtime, data: T) -> Result<(), Error> { |
| 154 | + // Serialize through JavaScript for compatibility |
| 155 | + let data: Vec<u8> = runtime |
| 156 | + .call_function_async(None, "broadcast_serialize", &data) |
| 157 | + .await?; |
| 158 | + |
| 159 | + let message = ChannelMessage { |
| 160 | + name: Arc::new(self.name.clone()), |
| 161 | + data: Arc::new(data), |
| 162 | + sender_id: self.uuid, |
| 163 | + }; |
| 164 | + |
| 165 | + self.channel |
| 166 | + .sender |
| 167 | + .lock() |
| 168 | + .send(message) |
| 169 | + .map_err(|e| Error::Runtime(format!("Failed to send broadcast message: {e}")))?; |
| 170 | + |
| 171 | + Ok(()) |
| 172 | + } |
| 173 | + |
| 174 | + /// Receive a message from the channel, waiting for a message to arrive, |
| 175 | + /// or until the timeout is reached |
| 176 | + /// |
| 177 | + /// Returns `None` if the timeout is reached or the channel is closed |
| 178 | + /// |
| 179 | + /// # Errors |
| 180 | + /// Will return an error if the message cannot be deserialized |
| 181 | + /// or if receiving the message fails |
| 182 | + pub async fn recv<T: DeserializeOwned>( |
| 183 | + &self, |
| 184 | + runtime: &mut Runtime, |
| 185 | + timeout: Option<Duration>, |
| 186 | + ) -> Result<Option<T>, Error> { |
| 187 | + let mut guard = self.receiver.lock().await; |
| 188 | + let (broadcast_rx, cancel_rx) = &mut *guard; |
| 189 | + |
| 190 | + loop { |
| 191 | + let result = if let Some(timeout) = timeout { |
| 192 | + tokio::select! { |
| 193 | + r = broadcast_rx.recv() => r, |
| 194 | + () = tokio::time::sleep(timeout) => return Ok(None), |
| 195 | + _ = cancel_rx.recv() => return Ok(None), |
| 196 | + } |
| 197 | + } else { |
| 198 | + tokio::select! { |
| 199 | + r = broadcast_rx.recv() => r, |
| 200 | + _ = cancel_rx.recv() => return Ok(None), |
| 201 | + } |
| 202 | + }; |
| 203 | + |
| 204 | + use tokio::sync::broadcast::error::RecvError::*; |
| 205 | + match result { |
| 206 | + Err(Closed) => return Ok(None), |
| 207 | + Err(Lagged(_)) => continue, // Backlogged, messages dropped - try again |
| 208 | + Ok(message) if message.sender_id == self.uuid => continue, // Self-send, skip |
| 209 | + Ok(message) if *message.name != self.name => continue, // Different channel name |
| 210 | + Ok(message) => { |
| 211 | + // Deserialize through JavaScript for compatibility |
| 212 | + let data: T = runtime |
| 213 | + .call_function_async( |
| 214 | + None, |
| 215 | + "broadcast_deserialize", |
| 216 | + big_json_args!(Vec::clone(&message.data)), |
| 217 | + ) |
| 218 | + .await?; |
| 219 | + return Ok(Some(data)); |
| 220 | + } |
| 221 | + } |
| 222 | + } |
| 223 | + } |
| 224 | + |
| 225 | + /// Receive a message from the channel, blocking until a message arrives, |
| 226 | + /// or until the timeout is reached |
| 227 | + /// |
| 228 | + /// Returns `None` if the timeout is reached or the channel is closed |
| 229 | + /// |
| 230 | + /// # Errors |
| 231 | + /// Will return an error if the message cannot be deserialized |
| 232 | + /// or if receiving the message fails |
| 233 | + pub fn recv_sync<T: DeserializeOwned>( |
| 234 | + &self, |
| 235 | + runtime: &mut Runtime, |
| 236 | + timeout: Option<Duration>, |
| 237 | + ) -> Result<Option<T>, Error> { |
| 238 | + let tokio_rt = runtime.tokio_runtime(); |
| 239 | + tokio_rt.block_on(self.recv(runtime, timeout)) |
| 240 | + } |
| 241 | + |
| 242 | + /// Close this subscription |
| 243 | + /// |
| 244 | + /// After calling this, `recv` will return `None` |
| 245 | + pub fn close(&self) { |
| 246 | + let _ = self.cancel_tx.send(()); |
| 247 | + } |
| 248 | +} |
| 249 | + |
| 250 | +impl Drop for BroadcastChannelWrapper { |
| 251 | + fn drop(&mut self) { |
| 252 | + self.close(); |
| 253 | + } |
| 254 | +} |
| 255 | + |
| 256 | +#[cfg(test)] |
| 257 | +mod test { |
| 258 | + use crate::{BroadcastChannel, Runtime, RuntimeOptions}; |
| 259 | + |
| 260 | + #[test] |
| 261 | + fn test_broadcast_channel_send_recv() { |
| 262 | + // This test demonstrates Rust-to-Rust communication via the BroadcastChannel. |
| 263 | + // Note: This wrapper is for Rust-side communication only. |
| 264 | + // For JavaScript BroadcastChannel, use the JavaScript API directly. |
| 265 | + |
| 266 | + let channel = BroadcastChannel::new(); |
| 267 | + |
| 268 | + // Create a runtime for serialization |
| 269 | + let mut runtime = Runtime::new(RuntimeOptions::default()).unwrap(); |
| 270 | + |
| 271 | + // Create two subscriptions on the same channel |
| 272 | + let wrapper1 = channel.subscribe("test_channel").unwrap(); |
| 273 | + let wrapper2 = channel.subscribe("test_channel").unwrap(); |
| 274 | + |
| 275 | + // Use async to send and receive |
| 276 | + let tokio_rt = runtime.tokio_runtime(); |
| 277 | + tokio_rt.block_on(async { |
| 278 | + // Send from wrapper1 |
| 279 | + wrapper1.send(&mut runtime, "hello from rust").await.unwrap(); |
| 280 | + |
| 281 | + // Receive from wrapper2 |
| 282 | + let received: String = wrapper2 |
| 283 | + .recv(&mut runtime, Some(std::time::Duration::from_secs(1))) |
| 284 | + .await |
| 285 | + .unwrap() |
| 286 | + .unwrap(); |
| 287 | + |
| 288 | + assert_eq!(received, "hello from rust"); |
| 289 | + }); |
| 290 | + } |
| 291 | + |
| 292 | + #[test] |
| 293 | + fn test_broadcast_channel_timeout() { |
| 294 | + let channel = BroadcastChannel::new(); |
| 295 | + let mut runtime = Runtime::new(RuntimeOptions::default()).unwrap(); |
| 296 | + let wrapper = channel.subscribe("timeout_test").unwrap(); |
| 297 | + |
| 298 | + // Try to receive with a short timeout - should return None |
| 299 | + let result = wrapper |
| 300 | + .recv_sync::<String>(&mut runtime, Some(std::time::Duration::from_millis(100))) |
| 301 | + .unwrap(); |
| 302 | + |
| 303 | + assert!(result.is_none()); |
| 304 | + } |
| 305 | + |
| 306 | + #[test] |
| 307 | + fn test_broadcast_channel_different_names() { |
| 308 | + // Messages should only be received by subscriptions with matching names |
| 309 | + let channel = BroadcastChannel::new(); |
| 310 | + let mut runtime = Runtime::new(RuntimeOptions::default()).unwrap(); |
| 311 | + |
| 312 | + let wrapper_a = channel.subscribe("channel_a").unwrap(); |
| 313 | + let wrapper_b = channel.subscribe("channel_b").unwrap(); |
| 314 | + |
| 315 | + let tokio_rt = runtime.tokio_runtime(); |
| 316 | + tokio_rt.block_on(async { |
| 317 | + // Send to channel_a |
| 318 | + wrapper_a.send(&mut runtime, "message for a").await.unwrap(); |
| 319 | + |
| 320 | + // wrapper_b should not receive this message (different channel name) |
| 321 | + let result: Option<String> = wrapper_b |
| 322 | + .recv(&mut runtime, Some(std::time::Duration::from_millis(100))) |
| 323 | + .await |
| 324 | + .unwrap(); |
| 325 | + |
| 326 | + assert!(result.is_none()); |
| 327 | + }); |
| 328 | + } |
| 329 | +} |
0 commit comments