| 1 | //! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers. |
| 2 | |
| 3 | #![deny (missing_docs)] |
| 4 | |
| 5 | use core::cell::RefCell; |
| 6 | use core::fmt::Debug; |
| 7 | use core::task::{Context, Poll}; |
| 8 | |
| 9 | use heapless::Deque; |
| 10 | |
| 11 | use self::publisher::{ImmediatePub, Pub}; |
| 12 | use self::subscriber::Sub; |
| 13 | use crate::blocking_mutex::raw::RawMutex; |
| 14 | use crate::blocking_mutex::Mutex; |
| 15 | use crate::waitqueue::MultiWakerRegistration; |
| 16 | |
| 17 | pub mod publisher; |
| 18 | pub mod subscriber; |
| 19 | |
| 20 | pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher}; |
| 21 | pub use subscriber::{DynSubscriber, Subscriber}; |
| 22 | |
| 23 | /// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers |
| 24 | /// |
| 25 | /// Any published message can be read by all subscribers. |
| 26 | /// A publisher can choose how it sends its message. |
| 27 | /// |
| 28 | /// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue. |
| 29 | /// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message |
| 30 | /// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive |
| 31 | /// an error to indicate that it has lagged. |
| 32 | /// |
| 33 | /// ## Example |
| 34 | /// |
| 35 | /// ``` |
| 36 | /// # use embassy_sync::blocking_mutex::raw::NoopRawMutex; |
| 37 | /// # use embassy_sync::pubsub::WaitResult; |
| 38 | /// # use embassy_sync::pubsub::PubSubChannel; |
| 39 | /// # use futures_executor::block_on; |
| 40 | /// # let test = async { |
| 41 | /// // Create the channel. This can be static as well |
| 42 | /// let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); |
| 43 | /// |
| 44 | /// // This is a generic subscriber with a direct reference to the channel |
| 45 | /// let mut sub0 = channel.subscriber().unwrap(); |
| 46 | /// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel |
| 47 | /// let mut sub1 = channel.dyn_subscriber().unwrap(); |
| 48 | /// |
| 49 | /// let pub0 = channel.publisher().unwrap(); |
| 50 | /// |
| 51 | /// // Publish a message, but wait if the queue is full |
| 52 | /// pub0.publish(42).await; |
| 53 | /// |
| 54 | /// // Publish a message, but if the queue is full, just kick out the oldest message. |
| 55 | /// // This may cause some subscribers to miss a message |
| 56 | /// pub0.publish_immediate(43); |
| 57 | /// |
| 58 | /// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result |
| 59 | /// assert_eq!(sub0.next_message().await, WaitResult::Message(42)); |
| 60 | /// assert_eq!(sub1.next_message().await, WaitResult::Message(42)); |
| 61 | /// |
| 62 | /// // Wait again, but this time ignore any Lag results |
| 63 | /// assert_eq!(sub0.next_message_pure().await, 43); |
| 64 | /// assert_eq!(sub1.next_message_pure().await, 43); |
| 65 | /// |
| 66 | /// // There's also a polling interface |
| 67 | /// assert_eq!(sub0.try_next_message(), None); |
| 68 | /// assert_eq!(sub1.try_next_message(), None); |
| 69 | /// # }; |
| 70 | /// # |
| 71 | /// # block_on(test); |
| 72 | /// ``` |
| 73 | /// |
| 74 | pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { |
| 75 | inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>, |
| 76 | } |
| 77 | |
| 78 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> |
| 79 | PubSubChannel<M, T, CAP, SUBS, PUBS> |
| 80 | { |
| 81 | /// Create a new channel |
| 82 | pub const fn new() -> Self { |
| 83 | Self { |
| 84 | inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())), |
| 85 | } |
| 86 | } |
| 87 | |
| 88 | /// Create a new subscriber. It will only receive messages that are published after its creation. |
| 89 | /// |
| 90 | /// If there are no subscriber slots left, an error will be returned. |
| 91 | pub fn subscriber(&self) -> Result<Subscriber<M, T, CAP, SUBS, PUBS>, Error> { |
| 92 | self.inner.lock(|inner| { |
| 93 | let mut s = inner.borrow_mut(); |
| 94 | |
| 95 | if s.subscriber_count >= SUBS { |
| 96 | Err(Error::MaximumSubscribersReached) |
| 97 | } else { |
| 98 | s.subscriber_count += 1; |
| 99 | Ok(Subscriber(Sub::new(s.next_message_id, self))) |
| 100 | } |
| 101 | }) |
| 102 | } |
| 103 | |
| 104 | /// Create a new subscriber. It will only receive messages that are published after its creation. |
| 105 | /// |
| 106 | /// If there are no subscriber slots left, an error will be returned. |
| 107 | pub fn dyn_subscriber(&self) -> Result<DynSubscriber<'_, T>, Error> { |
| 108 | self.inner.lock(|inner| { |
| 109 | let mut s = inner.borrow_mut(); |
| 110 | |
| 111 | if s.subscriber_count >= SUBS { |
| 112 | Err(Error::MaximumSubscribersReached) |
| 113 | } else { |
| 114 | s.subscriber_count += 1; |
| 115 | Ok(DynSubscriber(Sub::new(s.next_message_id, self))) |
| 116 | } |
| 117 | }) |
| 118 | } |
| 119 | |
| 120 | /// Create a new publisher |
| 121 | /// |
| 122 | /// If there are no publisher slots left, an error will be returned. |
| 123 | pub fn publisher(&self) -> Result<Publisher<M, T, CAP, SUBS, PUBS>, Error> { |
| 124 | self.inner.lock(|inner| { |
| 125 | let mut s = inner.borrow_mut(); |
| 126 | |
| 127 | if s.publisher_count >= PUBS { |
| 128 | Err(Error::MaximumPublishersReached) |
| 129 | } else { |
| 130 | s.publisher_count += 1; |
| 131 | Ok(Publisher(Pub::new(self))) |
| 132 | } |
| 133 | }) |
| 134 | } |
| 135 | |
| 136 | /// Create a new publisher |
| 137 | /// |
| 138 | /// If there are no publisher slots left, an error will be returned. |
| 139 | pub fn dyn_publisher(&self) -> Result<DynPublisher<'_, T>, Error> { |
| 140 | self.inner.lock(|inner| { |
| 141 | let mut s = inner.borrow_mut(); |
| 142 | |
| 143 | if s.publisher_count >= PUBS { |
| 144 | Err(Error::MaximumPublishersReached) |
| 145 | } else { |
| 146 | s.publisher_count += 1; |
| 147 | Ok(DynPublisher(Pub::new(self))) |
| 148 | } |
| 149 | }) |
| 150 | } |
| 151 | |
| 152 | /// Create a new publisher that can only send immediate messages. |
| 153 | /// This kind of publisher does not take up a publisher slot. |
| 154 | pub fn immediate_publisher(&self) -> ImmediatePublisher<M, T, CAP, SUBS, PUBS> { |
| 155 | ImmediatePublisher(ImmediatePub::new(self)) |
| 156 | } |
| 157 | |
| 158 | /// Create a new publisher that can only send immediate messages. |
| 159 | /// This kind of publisher does not take up a publisher slot. |
| 160 | pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> { |
| 161 | DynImmediatePublisher(ImmediatePub::new(self)) |
| 162 | } |
| 163 | |
| 164 | /// Returns the maximum number of elements the channel can hold. |
| 165 | pub const fn capacity(&self) -> usize { |
| 166 | CAP |
| 167 | } |
| 168 | |
| 169 | /// Returns the free capacity of the channel. |
| 170 | /// |
| 171 | /// This is equivalent to `capacity() - len()` |
| 172 | pub fn free_capacity(&self) -> usize { |
| 173 | CAP - self.len() |
| 174 | } |
| 175 | |
| 176 | /// Clears all elements in the channel. |
| 177 | pub fn clear(&self) { |
| 178 | self.inner.lock(|inner| inner.borrow_mut().clear()); |
| 179 | } |
| 180 | |
| 181 | /// Returns the number of elements currently in the channel. |
| 182 | pub fn len(&self) -> usize { |
| 183 | self.inner.lock(|inner| inner.borrow().len()) |
| 184 | } |
| 185 | |
| 186 | /// Returns whether the channel is empty. |
| 187 | pub fn is_empty(&self) -> bool { |
| 188 | self.inner.lock(|inner| inner.borrow().is_empty()) |
| 189 | } |
| 190 | |
| 191 | /// Returns whether the channel is full. |
| 192 | pub fn is_full(&self) -> bool { |
| 193 | self.inner.lock(|inner| inner.borrow().is_full()) |
| 194 | } |
| 195 | } |
| 196 | |
| 197 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> crate::pubsub::PubSubBehavior<T> |
| 198 | for PubSubChannel<M, T, CAP, SUBS, PUBS> |
| 199 | { |
| 200 | fn publish_immediate(&self, message: T) { |
| 201 | self.inner.lock(|s: &RefCell>| { |
| 202 | let mut s: RefMut<'_, PubSubState> = s.borrow_mut(); |
| 203 | s.publish_immediate(message) |
| 204 | }) |
| 205 | } |
| 206 | |
| 207 | fn capacity(&self) -> usize { |
| 208 | self.capacity() |
| 209 | } |
| 210 | |
| 211 | fn is_full(&self) -> bool { |
| 212 | self.is_full() |
| 213 | } |
| 214 | } |
| 215 | |
| 216 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T> |
| 217 | for PubSubChannel<M, T, CAP, SUBS, PUBS> |
| 218 | { |
| 219 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> { |
| 220 | self.inner.lock(|s| { |
| 221 | let mut s = s.borrow_mut(); |
| 222 | |
| 223 | // Check if we can read a message |
| 224 | match s.get_message(*next_message_id) { |
| 225 | // Yes, so we are done polling |
| 226 | Some(WaitResult::Message(message)) => { |
| 227 | *next_message_id += 1; |
| 228 | Poll::Ready(WaitResult::Message(message)) |
| 229 | } |
| 230 | // No, so we need to reregister our waker and sleep again |
| 231 | None => { |
| 232 | if let Some(cx) = cx { |
| 233 | s.subscriber_wakers.register(cx.waker()); |
| 234 | } |
| 235 | Poll::Pending |
| 236 | } |
| 237 | // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged |
| 238 | Some(WaitResult::Lagged(amount)) => { |
| 239 | *next_message_id += amount; |
| 240 | Poll::Ready(WaitResult::Lagged(amount)) |
| 241 | } |
| 242 | } |
| 243 | }) |
| 244 | } |
| 245 | |
| 246 | fn available(&self, next_message_id: u64) -> u64 { |
| 247 | self.inner.lock(|s| s.borrow().next_message_id - next_message_id) |
| 248 | } |
| 249 | |
| 250 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { |
| 251 | self.inner.lock(|s| { |
| 252 | let mut s = s.borrow_mut(); |
| 253 | // Try to publish the message |
| 254 | match s.try_publish(message) { |
| 255 | // We did it, we are ready |
| 256 | Ok(()) => Ok(()), |
| 257 | // The queue is full, so we need to reregister our waker and go to sleep |
| 258 | Err(message) => { |
| 259 | if let Some(cx) = cx { |
| 260 | s.publisher_wakers.register(cx.waker()); |
| 261 | } |
| 262 | Err(message) |
| 263 | } |
| 264 | } |
| 265 | }) |
| 266 | } |
| 267 | |
| 268 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { |
| 269 | self.inner.lock(|s| { |
| 270 | let mut s = s.borrow_mut(); |
| 271 | s.unregister_subscriber(subscriber_next_message_id) |
| 272 | }) |
| 273 | } |
| 274 | |
| 275 | fn unregister_publisher(&self) { |
| 276 | self.inner.lock(|s| { |
| 277 | let mut s = s.borrow_mut(); |
| 278 | s.unregister_publisher() |
| 279 | }) |
| 280 | } |
| 281 | |
| 282 | fn free_capacity(&self) -> usize { |
| 283 | self.free_capacity() |
| 284 | } |
| 285 | |
| 286 | fn clear(&self) { |
| 287 | self.clear(); |
| 288 | } |
| 289 | |
| 290 | fn len(&self) -> usize { |
| 291 | self.len() |
| 292 | } |
| 293 | |
| 294 | fn is_empty(&self) -> bool { |
| 295 | self.is_empty() |
| 296 | } |
| 297 | } |
| 298 | |
| 299 | /// Internal state for the PubSub channel |
| 300 | struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { |
| 301 | /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it |
| 302 | queue: Deque<(T, usize), CAP>, |
| 303 | /// Every message has an id. |
| 304 | /// Don't worry, we won't run out. |
| 305 | /// If a million messages were published every second, then the ID's would run out in about 584942 years. |
| 306 | next_message_id: u64, |
| 307 | /// Collection of wakers for Subscribers that are waiting. |
| 308 | subscriber_wakers: MultiWakerRegistration<SUBS>, |
| 309 | /// Collection of wakers for Publishers that are waiting. |
| 310 | publisher_wakers: MultiWakerRegistration<PUBS>, |
| 311 | /// The amount of subscribers that are active |
| 312 | subscriber_count: usize, |
| 313 | /// The amount of publishers that are active |
| 314 | publisher_count: usize, |
| 315 | } |
| 316 | |
| 317 | impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> { |
| 318 | /// Create a new internal channel state |
| 319 | const fn new() -> Self { |
| 320 | Self { |
| 321 | queue: Deque::new(), |
| 322 | next_message_id: 0, |
| 323 | subscriber_wakers: MultiWakerRegistration::new(), |
| 324 | publisher_wakers: MultiWakerRegistration::new(), |
| 325 | subscriber_count: 0, |
| 326 | publisher_count: 0, |
| 327 | } |
| 328 | } |
| 329 | |
| 330 | fn try_publish(&mut self, message: T) -> Result<(), T> { |
| 331 | if self.subscriber_count == 0 { |
| 332 | // We don't need to publish anything because there is no one to receive it |
| 333 | return Ok(()); |
| 334 | } |
| 335 | |
| 336 | if self.queue.is_full() { |
| 337 | return Err(message); |
| 338 | } |
| 339 | // We just did a check for this |
| 340 | self.queue.push_back((message, self.subscriber_count)).ok().unwrap(); |
| 341 | |
| 342 | self.next_message_id += 1; |
| 343 | |
| 344 | // Wake all of the subscribers |
| 345 | self.subscriber_wakers.wake(); |
| 346 | |
| 347 | Ok(()) |
| 348 | } |
| 349 | |
| 350 | fn publish_immediate(&mut self, message: T) { |
| 351 | // Make space in the queue if required |
| 352 | if self.queue.is_full() { |
| 353 | self.queue.pop_front(); |
| 354 | } |
| 355 | |
| 356 | // This will succeed because we made sure there is space |
| 357 | self.try_publish(message).ok().unwrap(); |
| 358 | } |
| 359 | |
| 360 | fn get_message(&mut self, message_id: u64) -> Option<WaitResult<T>> { |
| 361 | let start_id = self.next_message_id - self.queue.len() as u64; |
| 362 | |
| 363 | if message_id < start_id { |
| 364 | return Some(WaitResult::Lagged(start_id - message_id)); |
| 365 | } |
| 366 | |
| 367 | let current_message_index = (message_id - start_id) as usize; |
| 368 | |
| 369 | if current_message_index >= self.queue.len() { |
| 370 | return None; |
| 371 | } |
| 372 | |
| 373 | // We've checked that the index is valid |
| 374 | let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap(); |
| 375 | |
| 376 | // We're reading this item, so decrement the counter |
| 377 | queue_item.1 -= 1; |
| 378 | |
| 379 | let message = if current_message_index == 0 && queue_item.1 == 0 { |
| 380 | let (message, _) = self.queue.pop_front().unwrap(); |
| 381 | self.publisher_wakers.wake(); |
| 382 | // Return pop'd message without clone |
| 383 | message |
| 384 | } else { |
| 385 | queue_item.0.clone() |
| 386 | }; |
| 387 | |
| 388 | Some(WaitResult::Message(message)) |
| 389 | } |
| 390 | |
| 391 | fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { |
| 392 | self.subscriber_count -= 1; |
| 393 | |
| 394 | // All messages that haven't been read yet by this subscriber must have their counter decremented |
| 395 | let start_id = self.next_message_id - self.queue.len() as u64; |
| 396 | if subscriber_next_message_id >= start_id { |
| 397 | let current_message_index = (subscriber_next_message_id - start_id) as usize; |
| 398 | self.queue |
| 399 | .iter_mut() |
| 400 | .skip(current_message_index) |
| 401 | .for_each(|(_, counter)| *counter -= 1); |
| 402 | |
| 403 | let mut wake_publishers = false; |
| 404 | while let Some((_, count)) = self.queue.front() { |
| 405 | if *count == 0 { |
| 406 | self.queue.pop_front().unwrap(); |
| 407 | wake_publishers = true; |
| 408 | } else { |
| 409 | break; |
| 410 | } |
| 411 | } |
| 412 | |
| 413 | if wake_publishers { |
| 414 | self.publisher_wakers.wake(); |
| 415 | } |
| 416 | } |
| 417 | } |
| 418 | |
| 419 | fn unregister_publisher(&mut self) { |
| 420 | self.publisher_count -= 1; |
| 421 | } |
| 422 | |
| 423 | fn clear(&mut self) { |
| 424 | self.queue.clear(); |
| 425 | } |
| 426 | |
| 427 | fn len(&self) -> usize { |
| 428 | self.queue.len() |
| 429 | } |
| 430 | |
| 431 | fn is_empty(&self) -> bool { |
| 432 | self.queue.is_empty() |
| 433 | } |
| 434 | |
| 435 | fn is_full(&self) -> bool { |
| 436 | self.queue.is_full() |
| 437 | } |
| 438 | } |
| 439 | |
/// Errors returned when registering with a [PubSubChannel]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum Error {
    /// Every subscriber slot is taken. Drop an existing subscriber or use a channel with more
    /// subscriber slots to add another one.
    MaximumSubscribersReached,
    /// Every publisher slot is taken. Drop an existing publisher or use a channel with more
    /// publisher slots to add another one.
    MaximumPublishersReached,
}
| 451 | |
| 452 | trait SealedPubSubBehavior<T> { |
| 453 | /// Try to get a message from the queue with the given message id. |
| 454 | /// |
| 455 | /// If the message is not yet present and a context is given, then its waker is registered in the subscriber wakers. |
| 456 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; |
| 457 | |
| 458 | /// Get the amount of messages that are between the given the next_message_id and the most recent message. |
| 459 | /// This is not necessarily the amount of messages a subscriber can still received as it may have lagged. |
| 460 | fn available(&self, next_message_id: u64) -> u64; |
| 461 | |
| 462 | /// Try to publish a message to the queue. |
| 463 | /// |
| 464 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. |
| 465 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; |
| 466 | |
| 467 | /// Returns the free capacity of the channel. |
| 468 | /// |
| 469 | /// This is equivalent to `capacity() - len()` |
| 470 | fn free_capacity(&self) -> usize; |
| 471 | |
| 472 | /// Clears all elements in the channel. |
| 473 | fn clear(&self); |
| 474 | |
| 475 | /// Returns the number of elements currently in the channel. |
| 476 | fn len(&self) -> usize; |
| 477 | |
| 478 | /// Returns whether the channel is empty. |
| 479 | fn is_empty(&self) -> bool; |
| 480 | |
| 481 | /// Let the channel know that a subscriber has dropped |
| 482 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); |
| 483 | |
| 484 | /// Let the channel know that a publisher has dropped |
| 485 | fn unregister_publisher(&self); |
| 486 | } |
| 487 | |
| 488 | /// 'Middle level' behaviour of the pubsub channel. |
| 489 | /// This trait is used so that Sub and Pub can be generic over the channel. |
| 490 | #[allow (private_bounds)] |
| 491 | pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> { |
| 492 | /// Publish a message immediately |
| 493 | fn publish_immediate(&self, message: T); |
| 494 | |
| 495 | /// Returns the maximum number of elements the channel can hold. |
| 496 | fn capacity(&self) -> usize; |
| 497 | |
| 498 | /// Returns whether the channel is full. |
| 499 | fn is_full(&self) -> bool; |
| 500 | } |
| 501 | |
/// What a subscriber gets back from waiting on the channel
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum WaitResult<T> {
    /// The subscriber was too slow and missed messages.
    /// The value is the number of messages that were missed.
    Lagged(u64),
    /// The next message was received
    Message(T),
}
| 512 | |
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blocking_mutex::raw::NoopRawMutex;

    /// Dynamic publishers and subscribers deliver one message to every subscriber.
    #[futures_test::test]
    async fn dyn_pub_sub_works() {
        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        let mut first_sub = channel.dyn_subscriber().unwrap();
        let mut second_sub = channel.dyn_subscriber().unwrap();
        let publisher = channel.dyn_publisher().unwrap();

        publisher.publish(42).await;

        assert_eq!(first_sub.next_message().await, WaitResult::Message(42));
        assert_eq!(second_sub.next_message().await, WaitResult::Message(42));

        // Nothing else was published
        assert_eq!(first_sub.try_next_message(), None);
        assert_eq!(second_sub.try_next_message(), None);
    }

    /// Generic publishers and subscribers deliver one message to every subscriber.
    #[futures_test::test]
    async fn all_subscribers_receive() {
        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        let mut first_sub = channel.subscriber().unwrap();
        let mut second_sub = channel.subscriber().unwrap();
        let publisher = channel.publisher().unwrap();

        publisher.publish(42).await;

        assert_eq!(first_sub.next_message().await, WaitResult::Message(42));
        assert_eq!(second_sub.next_message().await, WaitResult::Message(42));

        // Nothing else was published
        assert_eq!(first_sub.try_next_message(), None);
        assert_eq!(second_sub.try_next_message(), None);
    }

    /// Overfilling the queue with immediate publishes makes the subscriber lag.
    #[futures_test::test]
    async fn lag_when_queue_full_on_immediate_publish() {
        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        let mut subscriber = channel.subscriber().unwrap();
        let publisher = channel.publisher().unwrap();

        // Six messages into a queue of four: the two oldest get kicked out
        for value in 42..=47 {
            publisher.publish_immediate(value);
        }

        assert_eq!(subscriber.try_next_message(), Some(WaitResult::Lagged(2)));
        for expected in 44..=47 {
            assert_eq!(subscriber.next_message().await, WaitResult::Message(expected));
        }
        assert_eq!(subscriber.try_next_message(), None);
    }

    /// Subscriber and publisher slots are limited, and dropping a handle frees its slot.
    #[test]
    fn limited_subs_and_pubs() {
        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        // subscribers

        let sub_a = channel.subscriber();
        let sub_b = channel.subscriber();
        let sub_c = channel.subscriber();
        let sub_d = channel.subscriber();
        let sub_overflow = channel.subscriber();

        assert!(sub_a.is_ok());
        assert!(sub_b.is_ok());
        assert!(sub_c.is_ok());
        assert!(sub_d.is_ok());
        assert!(matches!(sub_overflow, Err(Error::MaximumSubscribersReached)));

        drop(sub_a);

        let sub_replacement = channel.subscriber();
        assert!(sub_replacement.is_ok());

        // publishers

        let pub_a = channel.publisher();
        let pub_b = channel.publisher();
        let pub_c = channel.publisher();
        let pub_d = channel.publisher();
        let pub_overflow = channel.publisher();

        assert!(pub_a.is_ok());
        assert!(pub_b.is_ok());
        assert!(pub_c.is_ok());
        assert!(pub_d.is_ok());
        assert!(matches!(pub_overflow, Err(Error::MaximumPublishersReached)));

        drop(pub_a);

        let pub_replacement = channel.publisher();
        assert!(pub_replacement.is_ok());
    }

    /// `try_publish` only fails once a subscriber exists and the queue has filled up.
    #[test]
    fn publisher_wait_on_full_queue() {
        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        let publisher = channel.publisher().unwrap();

        // There are no subscribers, so the queue will never be full
        for _ in 0..5 {
            assert_eq!(publisher.try_publish(0), Ok(()));
        }

        let subscriber = channel.subscriber().unwrap();

        // With a subscriber present, exactly four messages fit
        for _ in 0..4 {
            assert_eq!(publisher.try_publish(0), Ok(()));
        }
        assert!(publisher.is_full());
        assert_eq!(publisher.try_publish(0), Err(0));

        drop(subscriber);
    }

    /// `available` tracks, per subscriber, how many messages are waiting.
    #[futures_test::test]
    async fn correct_available() {
        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        let idle_sub = channel.subscriber().unwrap();
        let mut reading_sub = channel.subscriber().unwrap();
        let publisher = channel.publisher().unwrap();

        assert_eq!(idle_sub.available(), 0);
        assert_eq!(reading_sub.available(), 0);

        publisher.publish(42).await;

        assert_eq!(idle_sub.available(), 1);
        assert_eq!(reading_sub.available(), 1);

        reading_sub.next_message().await;

        assert_eq!(reading_sub.available(), 0);

        publisher.publish(42).await;

        assert_eq!(idle_sub.available(), 2);
        assert_eq!(reading_sub.available(), 1);
    }

    /// `len` and `free_capacity` only change once every subscriber has read a message.
    #[futures_test::test]
    async fn correct_len() {
        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        let mut first_sub = channel.subscriber().unwrap();
        let mut second_sub = channel.subscriber().unwrap();
        let publisher = channel.publisher().unwrap();

        assert!(first_sub.is_empty());
        assert!(second_sub.is_empty());
        assert!(publisher.is_empty());
        assert_eq!(publisher.free_capacity(), 4);
        assert_eq!(publisher.len(), 0);

        publisher.publish(42).await;

        assert_eq!(publisher.free_capacity(), 3);
        assert_eq!(publisher.len(), 1);

        publisher.publish(42).await;

        assert_eq!(publisher.free_capacity(), 2);
        assert_eq!(publisher.len(), 2);

        // The first subscriber reads everything, but the second one still has to
        first_sub.next_message().await;
        first_sub.next_message().await;

        assert_eq!(publisher.free_capacity(), 2);
        assert_eq!(publisher.len(), 2);

        // Each read by the last outstanding subscriber frees a slot
        second_sub.next_message().await;
        assert_eq!(publisher.free_capacity(), 3);
        assert_eq!(publisher.len(), 1);

        second_sub.next_message().await;
        assert_eq!(publisher.free_capacity(), 4);
        assert_eq!(publisher.len(), 0);
    }

    /// Dropping the last subscriber that still had messages to read empties the channel.
    #[futures_test::test]
    async fn empty_channel_when_last_subscriber_is_dropped() {
        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        let publisher = channel.publisher().unwrap();
        let mut first_sub = channel.subscriber().unwrap();
        let mut second_sub = channel.subscriber().unwrap();

        assert_eq!(publisher.free_capacity(), 4);

        publisher.publish(1).await;
        publisher.publish(2).await;

        assert_eq!(channel.free_capacity(), 2);

        assert_eq!(first_sub.try_next_message_pure().unwrap(), 1);
        assert_eq!(first_sub.try_next_message_pure().unwrap(), 2);

        assert_eq!(channel.free_capacity(), 2);

        drop(first_sub);

        assert_eq!(channel.free_capacity(), 2);

        assert_eq!(second_sub.try_next_message_pure().unwrap(), 1);

        assert_eq!(channel.free_capacity(), 3);

        drop(second_sub);

        assert_eq!(channel.free_capacity(), 4);
    }

    /// Message type whose payload counts how many times it has been cloned.
    struct CloneCallCounter(usize);

    impl Clone for CloneCallCounter {
        fn clone(&self) -> Self {
            let clones_so_far = self.0;
            Self(clones_so_far + 1)
        }
    }

    /// The last subscriber to read a message gets the original instead of a clone.
    #[futures_test::test]
    async fn skip_clone_for_last_message() {
        let channel = PubSubChannel::<NoopRawMutex, CloneCallCounter, 1, 2, 1>::new();
        let publisher = channel.publisher().unwrap();
        let mut first_sub = channel.subscriber().unwrap();
        let mut second_sub = channel.subscriber().unwrap();

        publisher.publish(CloneCallCounter(0)).await;

        // First reader gets a clone, the final reader gets the untouched original
        let first_copy = first_sub.try_next_message_pure().unwrap();
        assert_eq!(first_copy.0, 1);
        let second_copy = second_sub.try_next_message_pure().unwrap();
        assert_eq!(second_copy.0, 0);
    }

    /// A publisher can be driven through the `Sink` interface.
    #[futures_test::test]
    async fn publisher_sink() {
        use futures_util::{SinkExt, StreamExt};

        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        let mut subscriber = channel.subscriber().unwrap();

        let publisher = channel.publisher().unwrap();
        let mut sink = publisher.sink();

        for value in 0..2 {
            sink.send(value).await.unwrap();
            assert_eq!(subscriber.try_next_message_pure().unwrap(), value);
        }

        let mut batch = futures_util::stream::iter(0..4).map(Ok);
        sink.send_all(&mut batch).await.unwrap();
        for expected in 0..4 {
            assert_eq!(subscriber.try_next_message_pure().unwrap(), expected);
        }
    }
}
| 785 | |