1 | //! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers. |
2 | |
3 | #![deny (missing_docs)] |
4 | |
5 | use core::cell::RefCell; |
6 | use core::fmt::Debug; |
7 | use core::task::{Context, Poll}; |
8 | |
9 | use heapless::Deque; |
10 | |
11 | use self::publisher::{ImmediatePub, Pub}; |
12 | use self::subscriber::Sub; |
13 | use crate::blocking_mutex::raw::RawMutex; |
14 | use crate::blocking_mutex::Mutex; |
15 | use crate::waitqueue::MultiWakerRegistration; |
16 | |
17 | pub mod publisher; |
18 | pub mod subscriber; |
19 | |
20 | pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher}; |
21 | pub use subscriber::{DynSubscriber, Subscriber}; |
22 | |
23 | /// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers |
24 | /// |
25 | /// Any published message can be read by all subscribers. |
26 | /// A publisher can choose how it sends its message. |
27 | /// |
28 | /// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue. |
29 | /// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message |
30 | /// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive |
31 | /// an error to indicate that it has lagged. |
32 | /// |
33 | /// ## Example |
34 | /// |
35 | /// ``` |
36 | /// # use embassy_sync::blocking_mutex::raw::NoopRawMutex; |
37 | /// # use embassy_sync::pubsub::WaitResult; |
38 | /// # use embassy_sync::pubsub::PubSubChannel; |
39 | /// # use futures_executor::block_on; |
40 | /// # let test = async { |
41 | /// // Create the channel. This can be static as well |
42 | /// let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); |
43 | /// |
44 | /// // This is a generic subscriber with a direct reference to the channel |
45 | /// let mut sub0 = channel.subscriber().unwrap(); |
46 | /// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel |
47 | /// let mut sub1 = channel.dyn_subscriber().unwrap(); |
48 | /// |
49 | /// let pub0 = channel.publisher().unwrap(); |
50 | /// |
51 | /// // Publish a message, but wait if the queue is full |
52 | /// pub0.publish(42).await; |
53 | /// |
54 | /// // Publish a message, but if the queue is full, just kick out the oldest message. |
55 | /// // This may cause some subscribers to miss a message |
56 | /// pub0.publish_immediate(43); |
57 | /// |
58 | /// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result |
59 | /// assert_eq!(sub0.next_message().await, WaitResult::Message(42)); |
60 | /// assert_eq!(sub1.next_message().await, WaitResult::Message(42)); |
61 | /// |
62 | /// // Wait again, but this time ignore any Lag results |
63 | /// assert_eq!(sub0.next_message_pure().await, 43); |
64 | /// assert_eq!(sub1.next_message_pure().await, 43); |
65 | /// |
66 | /// // There's also a polling interface |
67 | /// assert_eq!(sub0.try_next_message(), None); |
68 | /// assert_eq!(sub1.try_next_message(), None); |
69 | /// # }; |
70 | /// # |
71 | /// # block_on(test); |
72 | /// ``` |
73 | /// |
74 | pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { |
75 | inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>, |
76 | } |
77 | |
78 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> |
79 | PubSubChannel<M, T, CAP, SUBS, PUBS> |
80 | { |
81 | /// Create a new channel |
82 | pub const fn new() -> Self { |
83 | Self { |
84 | inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())), |
85 | } |
86 | } |
87 | |
88 | /// Create a new subscriber. It will only receive messages that are published after its creation. |
89 | /// |
90 | /// If there are no subscriber slots left, an error will be returned. |
91 | pub fn subscriber(&self) -> Result<Subscriber<M, T, CAP, SUBS, PUBS>, Error> { |
92 | self.inner.lock(|inner| { |
93 | let mut s = inner.borrow_mut(); |
94 | |
95 | if s.subscriber_count >= SUBS { |
96 | Err(Error::MaximumSubscribersReached) |
97 | } else { |
98 | s.subscriber_count += 1; |
99 | Ok(Subscriber(Sub::new(s.next_message_id, self))) |
100 | } |
101 | }) |
102 | } |
103 | |
104 | /// Create a new subscriber. It will only receive messages that are published after its creation. |
105 | /// |
106 | /// If there are no subscriber slots left, an error will be returned. |
107 | pub fn dyn_subscriber(&self) -> Result<DynSubscriber<'_, T>, Error> { |
108 | self.inner.lock(|inner| { |
109 | let mut s = inner.borrow_mut(); |
110 | |
111 | if s.subscriber_count >= SUBS { |
112 | Err(Error::MaximumSubscribersReached) |
113 | } else { |
114 | s.subscriber_count += 1; |
115 | Ok(DynSubscriber(Sub::new(s.next_message_id, self))) |
116 | } |
117 | }) |
118 | } |
119 | |
120 | /// Create a new publisher |
121 | /// |
122 | /// If there are no publisher slots left, an error will be returned. |
123 | pub fn publisher(&self) -> Result<Publisher<M, T, CAP, SUBS, PUBS>, Error> { |
124 | self.inner.lock(|inner| { |
125 | let mut s = inner.borrow_mut(); |
126 | |
127 | if s.publisher_count >= PUBS { |
128 | Err(Error::MaximumPublishersReached) |
129 | } else { |
130 | s.publisher_count += 1; |
131 | Ok(Publisher(Pub::new(self))) |
132 | } |
133 | }) |
134 | } |
135 | |
136 | /// Create a new publisher |
137 | /// |
138 | /// If there are no publisher slots left, an error will be returned. |
139 | pub fn dyn_publisher(&self) -> Result<DynPublisher<'_, T>, Error> { |
140 | self.inner.lock(|inner| { |
141 | let mut s = inner.borrow_mut(); |
142 | |
143 | if s.publisher_count >= PUBS { |
144 | Err(Error::MaximumPublishersReached) |
145 | } else { |
146 | s.publisher_count += 1; |
147 | Ok(DynPublisher(Pub::new(self))) |
148 | } |
149 | }) |
150 | } |
151 | |
152 | /// Create a new publisher that can only send immediate messages. |
153 | /// This kind of publisher does not take up a publisher slot. |
154 | pub fn immediate_publisher(&self) -> ImmediatePublisher<M, T, CAP, SUBS, PUBS> { |
155 | ImmediatePublisher(ImmediatePub::new(self)) |
156 | } |
157 | |
158 | /// Create a new publisher that can only send immediate messages. |
159 | /// This kind of publisher does not take up a publisher slot. |
160 | pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> { |
161 | DynImmediatePublisher(ImmediatePub::new(self)) |
162 | } |
163 | |
164 | /// Returns the maximum number of elements the channel can hold. |
165 | pub const fn capacity(&self) -> usize { |
166 | CAP |
167 | } |
168 | |
169 | /// Returns the free capacity of the channel. |
170 | /// |
171 | /// This is equivalent to `capacity() - len()` |
172 | pub fn free_capacity(&self) -> usize { |
173 | CAP - self.len() |
174 | } |
175 | |
176 | /// Clears all elements in the channel. |
177 | pub fn clear(&self) { |
178 | self.inner.lock(|inner| inner.borrow_mut().clear()); |
179 | } |
180 | |
181 | /// Returns the number of elements currently in the channel. |
182 | pub fn len(&self) -> usize { |
183 | self.inner.lock(|inner| inner.borrow().len()) |
184 | } |
185 | |
186 | /// Returns whether the channel is empty. |
187 | pub fn is_empty(&self) -> bool { |
188 | self.inner.lock(|inner| inner.borrow().is_empty()) |
189 | } |
190 | |
191 | /// Returns whether the channel is full. |
192 | pub fn is_full(&self) -> bool { |
193 | self.inner.lock(|inner| inner.borrow().is_full()) |
194 | } |
195 | } |
196 | |
197 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> crate::pubsub::PubSubBehavior<T> |
198 | for PubSubChannel<M, T, CAP, SUBS, PUBS> |
199 | { |
200 | fn publish_immediate(&self, message: T) { |
201 | self.inner.lock(|s: &RefCell>| { |
202 | let mut s: RefMut<'_, PubSubState> = s.borrow_mut(); |
203 | s.publish_immediate(message) |
204 | }) |
205 | } |
206 | |
207 | fn capacity(&self) -> usize { |
208 | self.capacity() |
209 | } |
210 | |
211 | fn is_full(&self) -> bool { |
212 | self.is_full() |
213 | } |
214 | } |
215 | |
216 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T> |
217 | for PubSubChannel<M, T, CAP, SUBS, PUBS> |
218 | { |
219 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> { |
220 | self.inner.lock(|s| { |
221 | let mut s = s.borrow_mut(); |
222 | |
223 | // Check if we can read a message |
224 | match s.get_message(*next_message_id) { |
225 | // Yes, so we are done polling |
226 | Some(WaitResult::Message(message)) => { |
227 | *next_message_id += 1; |
228 | Poll::Ready(WaitResult::Message(message)) |
229 | } |
230 | // No, so we need to reregister our waker and sleep again |
231 | None => { |
232 | if let Some(cx) = cx { |
233 | s.subscriber_wakers.register(cx.waker()); |
234 | } |
235 | Poll::Pending |
236 | } |
237 | // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged |
238 | Some(WaitResult::Lagged(amount)) => { |
239 | *next_message_id += amount; |
240 | Poll::Ready(WaitResult::Lagged(amount)) |
241 | } |
242 | } |
243 | }) |
244 | } |
245 | |
246 | fn available(&self, next_message_id: u64) -> u64 { |
247 | self.inner.lock(|s| s.borrow().next_message_id - next_message_id) |
248 | } |
249 | |
250 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { |
251 | self.inner.lock(|s| { |
252 | let mut s = s.borrow_mut(); |
253 | // Try to publish the message |
254 | match s.try_publish(message) { |
255 | // We did it, we are ready |
256 | Ok(()) => Ok(()), |
257 | // The queue is full, so we need to reregister our waker and go to sleep |
258 | Err(message) => { |
259 | if let Some(cx) = cx { |
260 | s.publisher_wakers.register(cx.waker()); |
261 | } |
262 | Err(message) |
263 | } |
264 | } |
265 | }) |
266 | } |
267 | |
268 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { |
269 | self.inner.lock(|s| { |
270 | let mut s = s.borrow_mut(); |
271 | s.unregister_subscriber(subscriber_next_message_id) |
272 | }) |
273 | } |
274 | |
275 | fn unregister_publisher(&self) { |
276 | self.inner.lock(|s| { |
277 | let mut s = s.borrow_mut(); |
278 | s.unregister_publisher() |
279 | }) |
280 | } |
281 | |
282 | fn free_capacity(&self) -> usize { |
283 | self.free_capacity() |
284 | } |
285 | |
286 | fn clear(&self) { |
287 | self.clear(); |
288 | } |
289 | |
290 | fn len(&self) -> usize { |
291 | self.len() |
292 | } |
293 | |
294 | fn is_empty(&self) -> bool { |
295 | self.is_empty() |
296 | } |
297 | } |
298 | |
299 | /// Internal state for the PubSub channel |
300 | struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { |
301 | /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it |
302 | queue: Deque<(T, usize), CAP>, |
303 | /// Every message has an id. |
304 | /// Don't worry, we won't run out. |
305 | /// If a million messages were published every second, then the ID's would run out in about 584942 years. |
306 | next_message_id: u64, |
307 | /// Collection of wakers for Subscribers that are waiting. |
308 | subscriber_wakers: MultiWakerRegistration<SUBS>, |
309 | /// Collection of wakers for Publishers that are waiting. |
310 | publisher_wakers: MultiWakerRegistration<PUBS>, |
311 | /// The amount of subscribers that are active |
312 | subscriber_count: usize, |
313 | /// The amount of publishers that are active |
314 | publisher_count: usize, |
315 | } |
316 | |
317 | impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> { |
318 | /// Create a new internal channel state |
319 | const fn new() -> Self { |
320 | Self { |
321 | queue: Deque::new(), |
322 | next_message_id: 0, |
323 | subscriber_wakers: MultiWakerRegistration::new(), |
324 | publisher_wakers: MultiWakerRegistration::new(), |
325 | subscriber_count: 0, |
326 | publisher_count: 0, |
327 | } |
328 | } |
329 | |
330 | fn try_publish(&mut self, message: T) -> Result<(), T> { |
331 | if self.subscriber_count == 0 { |
332 | // We don't need to publish anything because there is no one to receive it |
333 | return Ok(()); |
334 | } |
335 | |
336 | if self.queue.is_full() { |
337 | return Err(message); |
338 | } |
339 | // We just did a check for this |
340 | self.queue.push_back((message, self.subscriber_count)).ok().unwrap(); |
341 | |
342 | self.next_message_id += 1; |
343 | |
344 | // Wake all of the subscribers |
345 | self.subscriber_wakers.wake(); |
346 | |
347 | Ok(()) |
348 | } |
349 | |
350 | fn publish_immediate(&mut self, message: T) { |
351 | // Make space in the queue if required |
352 | if self.queue.is_full() { |
353 | self.queue.pop_front(); |
354 | } |
355 | |
356 | // This will succeed because we made sure there is space |
357 | self.try_publish(message).ok().unwrap(); |
358 | } |
359 | |
360 | fn get_message(&mut self, message_id: u64) -> Option<WaitResult<T>> { |
361 | let start_id = self.next_message_id - self.queue.len() as u64; |
362 | |
363 | if message_id < start_id { |
364 | return Some(WaitResult::Lagged(start_id - message_id)); |
365 | } |
366 | |
367 | let current_message_index = (message_id - start_id) as usize; |
368 | |
369 | if current_message_index >= self.queue.len() { |
370 | return None; |
371 | } |
372 | |
373 | // We've checked that the index is valid |
374 | let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap(); |
375 | |
376 | // We're reading this item, so decrement the counter |
377 | queue_item.1 -= 1; |
378 | |
379 | let message = if current_message_index == 0 && queue_item.1 == 0 { |
380 | let (message, _) = self.queue.pop_front().unwrap(); |
381 | self.publisher_wakers.wake(); |
382 | // Return pop'd message without clone |
383 | message |
384 | } else { |
385 | queue_item.0.clone() |
386 | }; |
387 | |
388 | Some(WaitResult::Message(message)) |
389 | } |
390 | |
391 | fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { |
392 | self.subscriber_count -= 1; |
393 | |
394 | // All messages that haven't been read yet by this subscriber must have their counter decremented |
395 | let start_id = self.next_message_id - self.queue.len() as u64; |
396 | if subscriber_next_message_id >= start_id { |
397 | let current_message_index = (subscriber_next_message_id - start_id) as usize; |
398 | self.queue |
399 | .iter_mut() |
400 | .skip(current_message_index) |
401 | .for_each(|(_, counter)| *counter -= 1); |
402 | |
403 | let mut wake_publishers = false; |
404 | while let Some((_, count)) = self.queue.front() { |
405 | if *count == 0 { |
406 | self.queue.pop_front().unwrap(); |
407 | wake_publishers = true; |
408 | } else { |
409 | break; |
410 | } |
411 | } |
412 | |
413 | if wake_publishers { |
414 | self.publisher_wakers.wake(); |
415 | } |
416 | } |
417 | } |
418 | |
419 | fn unregister_publisher(&mut self) { |
420 | self.publisher_count -= 1; |
421 | } |
422 | |
423 | fn clear(&mut self) { |
424 | self.queue.clear(); |
425 | } |
426 | |
427 | fn len(&self) -> usize { |
428 | self.queue.len() |
429 | } |
430 | |
431 | fn is_empty(&self) -> bool { |
432 | self.queue.is_empty() |
433 | } |
434 | |
435 | fn is_full(&self) -> bool { |
436 | self.queue.is_full() |
437 | } |
438 | } |
439 | |
440 | /// Error type for the [PubSubChannel] |
441 | #[derive (Debug, PartialEq, Eq, Clone, Copy)] |
442 | #[cfg_attr (feature = "defmt" , derive(defmt::Format))] |
443 | pub enum Error { |
444 | /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or |
445 | /// the capacity of the channels must be increased. |
446 | MaximumSubscribersReached, |
447 | /// All publisher slots are used. To add another publisher, first another publisher must be dropped or |
448 | /// the capacity of the channels must be increased. |
449 | MaximumPublishersReached, |
450 | } |
451 | |
452 | trait SealedPubSubBehavior<T> { |
453 | /// Try to get a message from the queue with the given message id. |
454 | /// |
455 | /// If the message is not yet present and a context is given, then its waker is registered in the subscriber wakers. |
456 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; |
457 | |
458 | /// Get the amount of messages that are between the given the next_message_id and the most recent message. |
459 | /// This is not necessarily the amount of messages a subscriber can still received as it may have lagged. |
460 | fn available(&self, next_message_id: u64) -> u64; |
461 | |
462 | /// Try to publish a message to the queue. |
463 | /// |
464 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. |
465 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; |
466 | |
467 | /// Returns the free capacity of the channel. |
468 | /// |
469 | /// This is equivalent to `capacity() - len()` |
470 | fn free_capacity(&self) -> usize; |
471 | |
472 | /// Clears all elements in the channel. |
473 | fn clear(&self); |
474 | |
475 | /// Returns the number of elements currently in the channel. |
476 | fn len(&self) -> usize; |
477 | |
478 | /// Returns whether the channel is empty. |
479 | fn is_empty(&self) -> bool; |
480 | |
481 | /// Let the channel know that a subscriber has dropped |
482 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); |
483 | |
484 | /// Let the channel know that a publisher has dropped |
485 | fn unregister_publisher(&self); |
486 | } |
487 | |
488 | /// 'Middle level' behaviour of the pubsub channel. |
489 | /// This trait is used so that Sub and Pub can be generic over the channel. |
490 | #[allow (private_bounds)] |
491 | pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> { |
492 | /// Publish a message immediately |
493 | fn publish_immediate(&self, message: T); |
494 | |
495 | /// Returns the maximum number of elements the channel can hold. |
496 | fn capacity(&self) -> usize; |
497 | |
498 | /// Returns whether the channel is full. |
499 | fn is_full(&self) -> bool; |
500 | } |
501 | |
502 | /// The result of the subscriber wait procedure |
503 | #[derive (Debug, Clone, PartialEq, Eq)] |
504 | #[cfg_attr (feature = "defmt" , derive(defmt::Format))] |
505 | pub enum WaitResult<T> { |
506 | /// The subscriber did not receive all messages and lagged by the given amount of messages. |
507 | /// (This is the amount of messages that were missed) |
508 | Lagged(u64), |
509 | /// A message was received |
510 | Message(T), |
511 | } |
512 | |
513 | #[cfg (test)] |
514 | mod tests { |
515 | use super::*; |
516 | use crate::blocking_mutex::raw::NoopRawMutex; |
517 | |
518 | #[futures_test::test] |
519 | async fn dyn_pub_sub_works() { |
520 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); |
521 | |
522 | let mut sub0 = channel.dyn_subscriber().unwrap(); |
523 | let mut sub1 = channel.dyn_subscriber().unwrap(); |
524 | let pub0 = channel.dyn_publisher().unwrap(); |
525 | |
526 | pub0.publish(42).await; |
527 | |
528 | assert_eq!(sub0.next_message().await, WaitResult::Message(42)); |
529 | assert_eq!(sub1.next_message().await, WaitResult::Message(42)); |
530 | |
531 | assert_eq!(sub0.try_next_message(), None); |
532 | assert_eq!(sub1.try_next_message(), None); |
533 | } |
534 | |
535 | #[futures_test::test] |
536 | async fn all_subscribers_receive() { |
537 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); |
538 | |
539 | let mut sub0 = channel.subscriber().unwrap(); |
540 | let mut sub1 = channel.subscriber().unwrap(); |
541 | let pub0 = channel.publisher().unwrap(); |
542 | |
543 | pub0.publish(42).await; |
544 | |
545 | assert_eq!(sub0.next_message().await, WaitResult::Message(42)); |
546 | assert_eq!(sub1.next_message().await, WaitResult::Message(42)); |
547 | |
548 | assert_eq!(sub0.try_next_message(), None); |
549 | assert_eq!(sub1.try_next_message(), None); |
550 | } |
551 | |
552 | #[futures_test::test] |
553 | async fn lag_when_queue_full_on_immediate_publish() { |
554 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); |
555 | |
556 | let mut sub0 = channel.subscriber().unwrap(); |
557 | let pub0 = channel.publisher().unwrap(); |
558 | |
559 | pub0.publish_immediate(42); |
560 | pub0.publish_immediate(43); |
561 | pub0.publish_immediate(44); |
562 | pub0.publish_immediate(45); |
563 | pub0.publish_immediate(46); |
564 | pub0.publish_immediate(47); |
565 | |
566 | assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2))); |
567 | assert_eq!(sub0.next_message().await, WaitResult::Message(44)); |
568 | assert_eq!(sub0.next_message().await, WaitResult::Message(45)); |
569 | assert_eq!(sub0.next_message().await, WaitResult::Message(46)); |
570 | assert_eq!(sub0.next_message().await, WaitResult::Message(47)); |
571 | assert_eq!(sub0.try_next_message(), None); |
572 | } |
573 | |
574 | #[test ] |
575 | fn limited_subs_and_pubs() { |
576 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); |
577 | |
578 | let sub0 = channel.subscriber(); |
579 | let sub1 = channel.subscriber(); |
580 | let sub2 = channel.subscriber(); |
581 | let sub3 = channel.subscriber(); |
582 | let sub4 = channel.subscriber(); |
583 | |
584 | assert!(sub0.is_ok()); |
585 | assert!(sub1.is_ok()); |
586 | assert!(sub2.is_ok()); |
587 | assert!(sub3.is_ok()); |
588 | assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached); |
589 | |
590 | drop(sub0); |
591 | |
592 | let sub5 = channel.subscriber(); |
593 | assert!(sub5.is_ok()); |
594 | |
595 | // publishers |
596 | |
597 | let pub0 = channel.publisher(); |
598 | let pub1 = channel.publisher(); |
599 | let pub2 = channel.publisher(); |
600 | let pub3 = channel.publisher(); |
601 | let pub4 = channel.publisher(); |
602 | |
603 | assert!(pub0.is_ok()); |
604 | assert!(pub1.is_ok()); |
605 | assert!(pub2.is_ok()); |
606 | assert!(pub3.is_ok()); |
607 | assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached); |
608 | |
609 | drop(pub0); |
610 | |
611 | let pub5 = channel.publisher(); |
612 | assert!(pub5.is_ok()); |
613 | } |
614 | |
615 | #[test ] |
616 | fn publisher_wait_on_full_queue() { |
617 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); |
618 | |
619 | let pub0 = channel.publisher().unwrap(); |
620 | |
621 | // There are no subscribers, so the queue will never be full |
622 | assert_eq!(pub0.try_publish(0), Ok(())); |
623 | assert_eq!(pub0.try_publish(0), Ok(())); |
624 | assert_eq!(pub0.try_publish(0), Ok(())); |
625 | assert_eq!(pub0.try_publish(0), Ok(())); |
626 | assert_eq!(pub0.try_publish(0), Ok(())); |
627 | |
628 | let sub0 = channel.subscriber().unwrap(); |
629 | |
630 | assert_eq!(pub0.try_publish(0), Ok(())); |
631 | assert_eq!(pub0.try_publish(0), Ok(())); |
632 | assert_eq!(pub0.try_publish(0), Ok(())); |
633 | assert_eq!(pub0.try_publish(0), Ok(())); |
634 | assert!(pub0.is_full()); |
635 | assert_eq!(pub0.try_publish(0), Err(0)); |
636 | |
637 | drop(sub0); |
638 | } |
639 | |
640 | #[futures_test::test] |
641 | async fn correct_available() { |
642 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); |
643 | |
644 | let sub0 = channel.subscriber().unwrap(); |
645 | let mut sub1 = channel.subscriber().unwrap(); |
646 | let pub0 = channel.publisher().unwrap(); |
647 | |
648 | assert_eq!(sub0.available(), 0); |
649 | assert_eq!(sub1.available(), 0); |
650 | |
651 | pub0.publish(42).await; |
652 | |
653 | assert_eq!(sub0.available(), 1); |
654 | assert_eq!(sub1.available(), 1); |
655 | |
656 | sub1.next_message().await; |
657 | |
658 | assert_eq!(sub1.available(), 0); |
659 | |
660 | pub0.publish(42).await; |
661 | |
662 | assert_eq!(sub0.available(), 2); |
663 | assert_eq!(sub1.available(), 1); |
664 | } |
665 | |
666 | #[futures_test::test] |
667 | async fn correct_len() { |
668 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); |
669 | |
670 | let mut sub0 = channel.subscriber().unwrap(); |
671 | let mut sub1 = channel.subscriber().unwrap(); |
672 | let pub0 = channel.publisher().unwrap(); |
673 | |
674 | assert!(sub0.is_empty()); |
675 | assert!(sub1.is_empty()); |
676 | assert!(pub0.is_empty()); |
677 | assert_eq!(pub0.free_capacity(), 4); |
678 | assert_eq!(pub0.len(), 0); |
679 | |
680 | pub0.publish(42).await; |
681 | |
682 | assert_eq!(pub0.free_capacity(), 3); |
683 | assert_eq!(pub0.len(), 1); |
684 | |
685 | pub0.publish(42).await; |
686 | |
687 | assert_eq!(pub0.free_capacity(), 2); |
688 | assert_eq!(pub0.len(), 2); |
689 | |
690 | sub0.next_message().await; |
691 | sub0.next_message().await; |
692 | |
693 | assert_eq!(pub0.free_capacity(), 2); |
694 | assert_eq!(pub0.len(), 2); |
695 | |
696 | sub1.next_message().await; |
697 | assert_eq!(pub0.free_capacity(), 3); |
698 | assert_eq!(pub0.len(), 1); |
699 | |
700 | sub1.next_message().await; |
701 | assert_eq!(pub0.free_capacity(), 4); |
702 | assert_eq!(pub0.len(), 0); |
703 | } |
704 | |
705 | #[futures_test::test] |
706 | async fn empty_channel_when_last_subscriber_is_dropped() { |
707 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); |
708 | |
709 | let pub0 = channel.publisher().unwrap(); |
710 | let mut sub0 = channel.subscriber().unwrap(); |
711 | let mut sub1 = channel.subscriber().unwrap(); |
712 | |
713 | assert_eq!(4, pub0.free_capacity()); |
714 | |
715 | pub0.publish(1).await; |
716 | pub0.publish(2).await; |
717 | |
718 | assert_eq!(2, channel.free_capacity()); |
719 | |
720 | assert_eq!(1, sub0.try_next_message_pure().unwrap()); |
721 | assert_eq!(2, sub0.try_next_message_pure().unwrap()); |
722 | |
723 | assert_eq!(2, channel.free_capacity()); |
724 | |
725 | drop(sub0); |
726 | |
727 | assert_eq!(2, channel.free_capacity()); |
728 | |
729 | assert_eq!(1, sub1.try_next_message_pure().unwrap()); |
730 | |
731 | assert_eq!(3, channel.free_capacity()); |
732 | |
733 | drop(sub1); |
734 | |
735 | assert_eq!(4, channel.free_capacity()); |
736 | } |
737 | |
738 | struct CloneCallCounter(usize); |
739 | |
740 | impl Clone for CloneCallCounter { |
741 | fn clone(&self) -> Self { |
742 | Self(self.0 + 1) |
743 | } |
744 | } |
745 | |
746 | #[futures_test::test] |
747 | async fn skip_clone_for_last_message() { |
748 | let channel = PubSubChannel::<NoopRawMutex, CloneCallCounter, 1, 2, 1>::new(); |
749 | let pub0 = channel.publisher().unwrap(); |
750 | let mut sub0 = channel.subscriber().unwrap(); |
751 | let mut sub1 = channel.subscriber().unwrap(); |
752 | |
753 | pub0.publish(CloneCallCounter(0)).await; |
754 | |
755 | assert_eq!(1, sub0.try_next_message_pure().unwrap().0); |
756 | assert_eq!(0, sub1.try_next_message_pure().unwrap().0); |
757 | } |
758 | |
759 | #[futures_test::test] |
760 | async fn publisher_sink() { |
761 | use futures_util::{SinkExt, StreamExt}; |
762 | |
763 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); |
764 | |
765 | let mut sub = channel.subscriber().unwrap(); |
766 | |
767 | let publ = channel.publisher().unwrap(); |
768 | let mut sink = publ.sink(); |
769 | |
770 | sink.send(0).await.unwrap(); |
771 | assert_eq!(0, sub.try_next_message_pure().unwrap()); |
772 | |
773 | sink.send(1).await.unwrap(); |
774 | assert_eq!(1, sub.try_next_message_pure().unwrap()); |
775 | |
776 | sink.send_all(&mut futures_util::stream::iter(0..4).map(Ok)) |
777 | .await |
778 | .unwrap(); |
779 | assert_eq!(0, sub.try_next_message_pure().unwrap()); |
780 | assert_eq!(1, sub.try_next_message_pure().unwrap()); |
781 | assert_eq!(2, sub.try_next_message_pure().unwrap()); |
782 | assert_eq!(3, sub.try_next_message_pure().unwrap()); |
783 | } |
784 | } |
785 | |