1 | //! Implementation of anything directly publisher related |
2 | |
3 | use core::future::Future; |
4 | use core::marker::PhantomData; |
5 | use core::ops::{Deref, DerefMut}; |
6 | use core::pin::Pin; |
7 | use core::task::{Context, Poll}; |
8 | |
9 | use super::{PubSubBehavior, PubSubChannel}; |
10 | use crate::blocking_mutex::raw::RawMutex; |
11 | |
12 | /// A publisher to a channel |
13 | pub struct Pub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
14 | /// The channel we are a publisher for |
15 | channel: &'a PSB, |
16 | _phantom: PhantomData<T>, |
17 | } |
18 | |
19 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> { |
20 | pub(super) fn new(channel: &'a PSB) -> Self { |
21 | Self { |
22 | channel, |
23 | _phantom: Default::default(), |
24 | } |
25 | } |
26 | |
27 | /// Publish a message right now even when the queue is full. |
28 | /// This may cause a subscriber to miss an older message. |
29 | pub fn publish_immediate(&self, message: T) { |
30 | self.channel.publish_immediate(message) |
31 | } |
32 | |
33 | /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message |
34 | pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> { |
35 | PublisherWaitFuture { |
36 | message: Some(message), |
37 | publisher: self, |
38 | } |
39 | } |
40 | |
41 | /// Publish a message if there is space in the message queue |
42 | pub fn try_publish(&self, message: T) -> Result<(), T> { |
43 | self.channel.publish_with_context(message, None) |
44 | } |
45 | |
46 | /// Returns the maximum number of elements the ***channel*** can hold. |
47 | pub fn capacity(&self) -> usize { |
48 | self.channel.capacity() |
49 | } |
50 | |
51 | /// Returns the free capacity of the ***channel***. |
52 | /// |
53 | /// This is equivalent to `capacity() - len()` |
54 | pub fn free_capacity(&self) -> usize { |
55 | self.channel.free_capacity() |
56 | } |
57 | |
58 | /// Clears all elements in the ***channel***. |
59 | pub fn clear(&self) { |
60 | self.channel.clear(); |
61 | } |
62 | |
63 | /// Returns the number of elements currently in the ***channel***. |
64 | pub fn len(&self) -> usize { |
65 | self.channel.len() |
66 | } |
67 | |
68 | /// Returns whether the ***channel*** is empty. |
69 | pub fn is_empty(&self) -> bool { |
70 | self.channel.is_empty() |
71 | } |
72 | |
73 | /// Returns whether the ***channel*** is full. |
74 | pub fn is_full(&self) -> bool { |
75 | self.channel.is_full() |
76 | } |
77 | |
78 | /// Create a [`futures::Sink`] adapter for this publisher. |
79 | #[inline ] |
80 | pub const fn sink(&self) -> PubSink<'a, '_, PSB, T> { |
81 | PubSink { publ: self, fut: None } |
82 | } |
83 | } |
84 | |
85 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { |
86 | fn drop(&mut self) { |
87 | self.channel.unregister_publisher() |
88 | } |
89 | } |
90 | |
91 | /// A publisher that holds a dynamic reference to the channel |
92 | pub struct DynPublisher<'a, T: Clone>(pub(super) Pub<'a, dyn PubSubBehavior<T> + 'a, T>); |
93 | |
94 | impl<'a, T: Clone> Deref for DynPublisher<'a, T> { |
95 | type Target = Pub<'a, dyn PubSubBehavior<T> + 'a, T>; |
96 | |
97 | fn deref(&self) -> &Self::Target { |
98 | &self.0 |
99 | } |
100 | } |
101 | |
102 | impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> { |
103 | fn deref_mut(&mut self) -> &mut Self::Target { |
104 | &mut self.0 |
105 | } |
106 | } |
107 | |
108 | /// A publisher that holds a generic reference to the channel |
109 | pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( |
110 | pub(super) Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, |
111 | ); |
112 | |
113 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref |
114 | for Publisher<'a, M, T, CAP, SUBS, PUBS> |
115 | { |
116 | type Target = Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>; |
117 | |
118 | fn deref(&self) -> &Self::Target { |
119 | &self.0 |
120 | } |
121 | } |
122 | |
123 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut |
124 | for Publisher<'a, M, T, CAP, SUBS, PUBS> |
125 | { |
126 | fn deref_mut(&mut self) -> &mut Self::Target { |
127 | &mut self.0 |
128 | } |
129 | } |
130 | |
131 | /// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. |
132 | /// (So an infinite amount is possible) |
133 | pub struct ImmediatePub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
134 | /// The channel we are a publisher for |
135 | channel: &'a PSB, |
136 | _phantom: PhantomData<T>, |
137 | } |
138 | |
139 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { |
140 | pub(super) fn new(channel: &'a PSB) -> Self { |
141 | Self { |
142 | channel, |
143 | _phantom: Default::default(), |
144 | } |
145 | } |
146 | /// Publish the message right now even when the queue is full. |
147 | /// This may cause a subscriber to miss an older message. |
148 | pub fn publish_immediate(&self, message: T) { |
149 | self.channel.publish_immediate(message) |
150 | } |
151 | |
152 | /// Publish a message if there is space in the message queue |
153 | pub fn try_publish(&self, message: T) -> Result<(), T> { |
154 | self.channel.publish_with_context(message, None) |
155 | } |
156 | |
157 | /// Returns the maximum number of elements the ***channel*** can hold. |
158 | pub fn capacity(&self) -> usize { |
159 | self.channel.capacity() |
160 | } |
161 | |
162 | /// Returns the free capacity of the ***channel***. |
163 | /// |
164 | /// This is equivalent to `capacity() - len()` |
165 | pub fn free_capacity(&self) -> usize { |
166 | self.channel.free_capacity() |
167 | } |
168 | |
169 | /// Clears all elements in the ***channel***. |
170 | pub fn clear(&self) { |
171 | self.channel.clear(); |
172 | } |
173 | |
174 | /// Returns the number of elements currently in the ***channel***. |
175 | pub fn len(&self) -> usize { |
176 | self.channel.len() |
177 | } |
178 | |
179 | /// Returns whether the ***channel*** is empty. |
180 | pub fn is_empty(&self) -> bool { |
181 | self.channel.is_empty() |
182 | } |
183 | |
184 | /// Returns whether the ***channel*** is full. |
185 | pub fn is_full(&self) -> bool { |
186 | self.channel.is_full() |
187 | } |
188 | } |
189 | |
190 | /// An immediate publisher that holds a dynamic reference to the channel |
191 | pub struct DynImmediatePublisher<'a, T: Clone>(pub(super) ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>); |
192 | |
193 | impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> { |
194 | type Target = ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>; |
195 | |
196 | fn deref(&self) -> &Self::Target { |
197 | &self.0 |
198 | } |
199 | } |
200 | |
201 | impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> { |
202 | fn deref_mut(&mut self) -> &mut Self::Target { |
203 | &mut self.0 |
204 | } |
205 | } |
206 | |
207 | /// An immediate publisher that holds a generic reference to the channel |
208 | pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( |
209 | pub(super) ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, |
210 | ); |
211 | |
212 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref |
213 | for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> |
214 | { |
215 | type Target = ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>; |
216 | |
217 | fn deref(&self) -> &Self::Target { |
218 | &self.0 |
219 | } |
220 | } |
221 | |
222 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut |
223 | for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> |
224 | { |
225 | fn deref_mut(&mut self) -> &mut Self::Target { |
226 | &mut self.0 |
227 | } |
228 | } |
229 | |
230 | #[must_use = "Sinks do nothing unless polled" ] |
231 | /// [`futures_sink::Sink`] adapter for [`Pub`]. |
232 | pub struct PubSink<'a, 'p, PSB, T> |
233 | where |
234 | T: Clone, |
235 | PSB: PubSubBehavior<T> + ?Sized, |
236 | { |
237 | publ: &'p Pub<'a, PSB, T>, |
238 | fut: Option<PublisherWaitFuture<'p, 'a, PSB, T>>, |
239 | } |
240 | |
241 | impl<'a, 'p, PSB, T> PubSink<'a, 'p, PSB, T> |
242 | where |
243 | PSB: PubSubBehavior<T> + ?Sized, |
244 | T: Clone, |
245 | { |
246 | /// Try to make progress on the pending future if we have one. |
247 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
248 | let Some(mut fut: PublisherWaitFuture<'p, 'a, …, …>) = self.fut.take() else { |
249 | return Poll::Ready(()); |
250 | }; |
251 | |
252 | if Pin::new(&mut fut).poll(cx).is_pending() { |
253 | self.fut = Some(fut); |
254 | return Poll::Pending; |
255 | } |
256 | |
257 | Poll::Ready(()) |
258 | } |
259 | } |
260 | |
261 | impl<'a, 'p, PSB, T> futures_sink::Sink<T> for PubSink<'a, 'p, PSB, T> |
262 | where |
263 | PSB: PubSubBehavior<T> + ?Sized, |
264 | T: Clone, |
265 | { |
266 | type Error = core::convert::Infallible; |
267 | |
268 | #[inline ] |
269 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
270 | self.poll(cx).map(Ok) |
271 | } |
272 | |
273 | #[inline ] |
274 | fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { |
275 | self.fut = Some(self.publ.publish(item)); |
276 | |
277 | Ok(()) |
278 | } |
279 | |
280 | #[inline ] |
281 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
282 | self.poll(cx).map(Ok) |
283 | } |
284 | |
285 | #[inline ] |
286 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
287 | self.poll(cx).map(Ok) |
288 | } |
289 | } |
290 | |
291 | /// Future for the publisher wait action |
292 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
293 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
294 | /// The message we need to publish |
295 | message: Option<T>, |
296 | publisher: &'s Pub<'a, PSB, T>, |
297 | } |
298 | |
299 | impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> { |
300 | type Output = (); |
301 | |
302 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
303 | let message: T = self.message.take().unwrap(); |
304 | match self.publisher.channel.publish_with_context(message, cx:Some(cx)) { |
305 | Ok(()) => Poll::Ready(()), |
306 | Err(message: T) => { |
307 | self.message = Some(message); |
308 | Poll::Pending |
309 | } |
310 | } |
311 | } |
312 | } |
313 | |
314 | impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {} |
315 | |