| 1 | //! Implementation of anything directly subscriber related |
| 2 | |
| 3 | use core::future::Future; |
| 4 | use core::marker::PhantomData; |
| 5 | use core::ops::{Deref, DerefMut}; |
| 6 | use core::pin::Pin; |
| 7 | use core::task::{Context, Poll}; |
| 8 | |
| 9 | use super::{PubSubBehavior, PubSubChannel, WaitResult}; |
| 10 | use crate::blocking_mutex::raw::RawMutex; |
| 11 | |
| 12 | /// A subscriber to a channel |
| 13 | pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
| 14 | /// The message id of the next message we are yet to receive |
| 15 | next_message_id: u64, |
| 16 | /// The channel we are a subscriber to |
| 17 | channel: &'a PSB, |
| 18 | _phantom: PhantomData<T>, |
| 19 | } |
| 20 | |
| 21 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> { |
| 22 | pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self { |
| 23 | Self { |
| 24 | next_message_id, |
| 25 | channel, |
| 26 | _phantom: Default::default(), |
| 27 | } |
| 28 | } |
| 29 | |
| 30 | /// Wait for a published message |
| 31 | pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { |
| 32 | SubscriberWaitFuture { subscriber: self } |
| 33 | } |
| 34 | |
| 35 | /// Wait for a published message (ignoring lag results) |
| 36 | pub async fn next_message_pure(&mut self) -> T { |
| 37 | loop { |
| 38 | match self.next_message().await { |
| 39 | WaitResult::Lagged(_) => continue, |
| 40 | WaitResult::Message(message) => break message, |
| 41 | } |
| 42 | } |
| 43 | } |
| 44 | |
| 45 | /// Try to see if there's a published message we haven't received yet. |
| 46 | /// |
| 47 | /// This function does not peek. The message is received if there is one. |
| 48 | pub fn try_next_message(&mut self) -> Option<WaitResult<T>> { |
| 49 | match self.channel.get_message_with_context(&mut self.next_message_id, None) { |
| 50 | Poll::Ready(result) => Some(result), |
| 51 | Poll::Pending => None, |
| 52 | } |
| 53 | } |
| 54 | |
| 55 | /// Try to see if there's a published message we haven't received yet (ignoring lag results). |
| 56 | /// |
| 57 | /// This function does not peek. The message is received if there is one. |
| 58 | pub fn try_next_message_pure(&mut self) -> Option<T> { |
| 59 | loop { |
| 60 | match self.try_next_message() { |
| 61 | Some(WaitResult::Lagged(_)) => continue, |
| 62 | Some(WaitResult::Message(message)) => break Some(message), |
| 63 | None => break None, |
| 64 | } |
| 65 | } |
| 66 | } |
| 67 | |
| 68 | /// The amount of messages this subscriber hasn't received yet. This is like [Self::len] but specifically |
| 69 | /// for this subscriber. |
| 70 | pub fn available(&self) -> u64 { |
| 71 | self.channel.available(self.next_message_id) |
| 72 | } |
| 73 | |
| 74 | /// Returns the maximum number of elements the ***channel*** can hold. |
| 75 | pub fn capacity(&self) -> usize { |
| 76 | self.channel.capacity() |
| 77 | } |
| 78 | |
| 79 | /// Returns the free capacity of the ***channel***. |
| 80 | /// |
| 81 | /// This is equivalent to `capacity() - len()` |
| 82 | pub fn free_capacity(&self) -> usize { |
| 83 | self.channel.free_capacity() |
| 84 | } |
| 85 | |
| 86 | /// Clears all elements in the ***channel***. |
| 87 | pub fn clear(&self) { |
| 88 | self.channel.clear(); |
| 89 | } |
| 90 | |
| 91 | /// Returns the number of elements currently in the ***channel***. |
| 92 | /// See [Self::available] for how many messages are available for this subscriber. |
| 93 | pub fn len(&self) -> usize { |
| 94 | self.channel.len() |
| 95 | } |
| 96 | |
| 97 | /// Returns whether the ***channel*** is empty. |
| 98 | pub fn is_empty(&self) -> bool { |
| 99 | self.channel.is_empty() |
| 100 | } |
| 101 | |
| 102 | /// Returns whether the ***channel*** is full. |
| 103 | pub fn is_full(&self) -> bool { |
| 104 | self.channel.is_full() |
| 105 | } |
| 106 | } |
| 107 | |
| 108 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { |
| 109 | fn drop(&mut self) { |
| 110 | self.channel.unregister_subscriber(self.next_message_id) |
| 111 | } |
| 112 | } |
| 113 | |
| 114 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {} |
| 115 | |
| 116 | /// Warning: The stream implementation ignores lag results and returns all messages. |
| 117 | /// This might miss some messages without you knowing it. |
| 118 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> { |
| 119 | type Item = T; |
| 120 | |
| 121 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 122 | match self |
| 123 | .channel |
| 124 | .get_message_with_context(&mut self.next_message_id, cx:Some(cx)) |
| 125 | { |
| 126 | Poll::Ready(WaitResult::Message(message: T)) => Poll::Ready(Some(message)), |
| 127 | Poll::Ready(WaitResult::Lagged(_)) => { |
| 128 | cx.waker().wake_by_ref(); |
| 129 | Poll::Pending |
| 130 | } |
| 131 | Poll::Pending => Poll::Pending, |
| 132 | } |
| 133 | } |
| 134 | } |
| 135 | |
| 136 | /// A subscriber that holds a dynamic reference to the channel |
| 137 | pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior<T> + 'a, T>); |
| 138 | |
| 139 | impl<'a, T: Clone> Deref for DynSubscriber<'a, T> { |
| 140 | type Target = Sub<'a, dyn PubSubBehavior<T> + 'a, T>; |
| 141 | |
| 142 | fn deref(&self) -> &Self::Target { |
| 143 | &self.0 |
| 144 | } |
| 145 | } |
| 146 | |
| 147 | impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> { |
| 148 | fn deref_mut(&mut self) -> &mut Self::Target { |
| 149 | &mut self.0 |
| 150 | } |
| 151 | } |
| 152 | |
| 153 | /// A subscriber that holds a generic reference to the channel |
| 154 | pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( |
| 155 | pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, |
| 156 | ); |
| 157 | |
| 158 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref |
| 159 | for Subscriber<'a, M, T, CAP, SUBS, PUBS> |
| 160 | { |
| 161 | type Target = Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>; |
| 162 | |
| 163 | fn deref(&self) -> &Self::Target { |
| 164 | &self.0 |
| 165 | } |
| 166 | } |
| 167 | |
| 168 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut |
| 169 | for Subscriber<'a, M, T, CAP, SUBS, PUBS> |
| 170 | { |
| 171 | fn deref_mut(&mut self) -> &mut Self::Target { |
| 172 | &mut self.0 |
| 173 | } |
| 174 | } |
| 175 | |
| 176 | /// Future for the subscriber wait action |
| 177 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
| 178 | pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
| 179 | subscriber: &'s mut Sub<'a, PSB, T>, |
| 180 | } |
| 181 | |
| 182 | impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> { |
| 183 | type Output = WaitResult<T>; |
| 184 | |
| 185 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 186 | self.subscriber |
| 187 | .channel |
| 188 | .get_message_with_context(&mut self.subscriber.next_message_id, cx:Some(cx)) |
| 189 | } |
| 190 | } |
| 191 | |
| 192 | impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {} |
| 193 | |