1 | //! Implementation of anything directly subscriber related |
2 | |
3 | use core::future::Future; |
4 | use core::marker::PhantomData; |
5 | use core::ops::{Deref, DerefMut}; |
6 | use core::pin::Pin; |
7 | use core::task::{Context, Poll}; |
8 | |
9 | use super::{PubSubBehavior, PubSubChannel, WaitResult}; |
10 | use crate::blocking_mutex::raw::RawMutex; |
11 | |
12 | /// A subscriber to a channel |
13 | pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
14 | /// The message id of the next message we are yet to receive |
15 | next_message_id: u64, |
16 | /// The channel we are a subscriber to |
17 | channel: &'a PSB, |
18 | _phantom: PhantomData<T>, |
19 | } |
20 | |
21 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> { |
22 | pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self { |
23 | Self { |
24 | next_message_id, |
25 | channel, |
26 | _phantom: Default::default(), |
27 | } |
28 | } |
29 | |
30 | /// Wait for a published message |
31 | pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { |
32 | SubscriberWaitFuture { subscriber: self } |
33 | } |
34 | |
35 | /// Wait for a published message (ignoring lag results) |
36 | pub async fn next_message_pure(&mut self) -> T { |
37 | loop { |
38 | match self.next_message().await { |
39 | WaitResult::Lagged(_) => continue, |
40 | WaitResult::Message(message) => break message, |
41 | } |
42 | } |
43 | } |
44 | |
45 | /// Try to see if there's a published message we haven't received yet. |
46 | /// |
47 | /// This function does not peek. The message is received if there is one. |
48 | pub fn try_next_message(&mut self) -> Option<WaitResult<T>> { |
49 | match self.channel.get_message_with_context(&mut self.next_message_id, None) { |
50 | Poll::Ready(result) => Some(result), |
51 | Poll::Pending => None, |
52 | } |
53 | } |
54 | |
55 | /// Try to see if there's a published message we haven't received yet (ignoring lag results). |
56 | /// |
57 | /// This function does not peek. The message is received if there is one. |
58 | pub fn try_next_message_pure(&mut self) -> Option<T> { |
59 | loop { |
60 | match self.try_next_message() { |
61 | Some(WaitResult::Lagged(_)) => continue, |
62 | Some(WaitResult::Message(message)) => break Some(message), |
63 | None => break None, |
64 | } |
65 | } |
66 | } |
67 | |
68 | /// The amount of messages this subscriber hasn't received yet. This is like [Self::len] but specifically |
69 | /// for this subscriber. |
70 | pub fn available(&self) -> u64 { |
71 | self.channel.available(self.next_message_id) |
72 | } |
73 | |
74 | /// Returns the maximum number of elements the ***channel*** can hold. |
75 | pub fn capacity(&self) -> usize { |
76 | self.channel.capacity() |
77 | } |
78 | |
79 | /// Returns the free capacity of the ***channel***. |
80 | /// |
81 | /// This is equivalent to `capacity() - len()` |
82 | pub fn free_capacity(&self) -> usize { |
83 | self.channel.free_capacity() |
84 | } |
85 | |
86 | /// Clears all elements in the ***channel***. |
87 | pub fn clear(&self) { |
88 | self.channel.clear(); |
89 | } |
90 | |
91 | /// Returns the number of elements currently in the ***channel***. |
92 | /// See [Self::available] for how many messages are available for this subscriber. |
93 | pub fn len(&self) -> usize { |
94 | self.channel.len() |
95 | } |
96 | |
97 | /// Returns whether the ***channel*** is empty. |
98 | pub fn is_empty(&self) -> bool { |
99 | self.channel.is_empty() |
100 | } |
101 | |
102 | /// Returns whether the ***channel*** is full. |
103 | pub fn is_full(&self) -> bool { |
104 | self.channel.is_full() |
105 | } |
106 | } |
107 | |
108 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { |
109 | fn drop(&mut self) { |
110 | self.channel.unregister_subscriber(self.next_message_id) |
111 | } |
112 | } |
113 | |
114 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {} |
115 | |
116 | /// Warning: The stream implementation ignores lag results and returns all messages. |
117 | /// This might miss some messages without you knowing it. |
118 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> { |
119 | type Item = T; |
120 | |
121 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
122 | match self |
123 | .channel |
124 | .get_message_with_context(&mut self.next_message_id, cx:Some(cx)) |
125 | { |
126 | Poll::Ready(WaitResult::Message(message: T)) => Poll::Ready(Some(message)), |
127 | Poll::Ready(WaitResult::Lagged(_)) => { |
128 | cx.waker().wake_by_ref(); |
129 | Poll::Pending |
130 | } |
131 | Poll::Pending => Poll::Pending, |
132 | } |
133 | } |
134 | } |
135 | |
136 | /// A subscriber that holds a dynamic reference to the channel |
137 | pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior<T> + 'a, T>); |
138 | |
139 | impl<'a, T: Clone> Deref for DynSubscriber<'a, T> { |
140 | type Target = Sub<'a, dyn PubSubBehavior<T> + 'a, T>; |
141 | |
142 | fn deref(&self) -> &Self::Target { |
143 | &self.0 |
144 | } |
145 | } |
146 | |
147 | impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> { |
148 | fn deref_mut(&mut self) -> &mut Self::Target { |
149 | &mut self.0 |
150 | } |
151 | } |
152 | |
153 | /// A subscriber that holds a generic reference to the channel |
154 | pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( |
155 | pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, |
156 | ); |
157 | |
158 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref |
159 | for Subscriber<'a, M, T, CAP, SUBS, PUBS> |
160 | { |
161 | type Target = Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>; |
162 | |
163 | fn deref(&self) -> &Self::Target { |
164 | &self.0 |
165 | } |
166 | } |
167 | |
168 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut |
169 | for Subscriber<'a, M, T, CAP, SUBS, PUBS> |
170 | { |
171 | fn deref_mut(&mut self) -> &mut Self::Target { |
172 | &mut self.0 |
173 | } |
174 | } |
175 | |
176 | /// Future for the subscriber wait action |
177 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
178 | pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
179 | subscriber: &'s mut Sub<'a, PSB, T>, |
180 | } |
181 | |
182 | impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> { |
183 | type Output = WaitResult<T>; |
184 | |
185 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
186 | self.subscriber |
187 | .channel |
188 | .get_message_with_context(&mut self.subscriber.next_message_id, cx:Some(cx)) |
189 | } |
190 | } |
191 | |
192 | impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {} |
193 | |