1 | //! A synchronization primitive for passing the latest value to **multiple** receivers. |
2 | |
3 | use core::cell::RefCell; |
4 | use core::future::{poll_fn, Future}; |
5 | use core::marker::PhantomData; |
6 | use core::ops::{Deref, DerefMut}; |
7 | use core::task::{Context, Poll}; |
8 | |
9 | use crate::blocking_mutex::raw::RawMutex; |
10 | use crate::blocking_mutex::Mutex; |
11 | use crate::waitqueue::MultiWakerRegistration; |
12 | |
13 | /// The `Watch` is a single-slot signaling primitive that allows multiple receivers to concurrently await |
14 | /// changes to the value. Unlike a [`Signal`](crate::signal::Signal), `Watch` supports multiple receivers, |
15 | /// and unlike a [`PubSubChannel`](crate::pubsub::PubSubChannel), `Watch` immediately overwrites the previous |
16 | /// value when a new one is sent, without waiting for all receivers to read the previous value. |
17 | /// |
18 | /// This makes `Watch` particularly useful when a single task updates a value or "state", and multiple other tasks |
19 | /// need to be notified about changes to this value asynchronously. Receivers may "lose" stale values, as they are |
20 | /// always provided with the latest value. |
21 | /// |
22 | /// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`] |
23 | /// (or [`DynSender`] and/or [`DynReceiver`]) are obtained where relevant. An [`AnonReceiver`] |
24 | /// and [`DynAnonReceiver`] are also available, which do not increase the receiver count for the |
25 | /// channel, and unwrapping is therefore not required, but it is not possible to `.await` the channel. |
26 | /// ``` |
27 | /// |
28 | /// use futures_executor::block_on; |
29 | /// use embassy_sync::watch::Watch; |
30 | /// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; |
31 | /// |
32 | /// let f = async { |
33 | /// |
34 | /// static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); |
35 | /// |
36 | /// // Obtain receivers and sender |
37 | /// let mut rcv0 = WATCH.receiver().unwrap(); |
38 | /// let mut rcv1 = WATCH.dyn_receiver().unwrap(); |
39 | /// let mut snd = WATCH.sender(); |
40 | /// |
41 | /// // No more receivers, and no update |
42 | /// assert!(WATCH.receiver().is_none()); |
43 | /// assert_eq!(rcv1.try_changed(), None); |
44 | /// |
45 | /// snd.send(10); |
46 | /// |
47 | /// // Receive the new value (async or try) |
48 | /// assert_eq!(rcv0.changed().await, 10); |
49 | /// assert_eq!(rcv1.try_changed(), Some(10)); |
50 | /// |
51 | /// // No update |
52 | /// assert_eq!(rcv0.try_changed(), None); |
53 | /// assert_eq!(rcv1.try_changed(), None); |
54 | /// |
55 | /// snd.send(20); |
56 | /// |
57 | /// // Using `get` marks the value as seen |
58 | /// assert_eq!(rcv1.get().await, 20); |
59 | /// assert_eq!(rcv1.try_changed(), None); |
60 | /// |
61 | /// // But `get` also returns when unchanged |
62 | /// assert_eq!(rcv1.get().await, 20); |
63 | /// assert_eq!(rcv1.get().await, 20); |
64 | /// |
65 | /// }; |
66 | /// block_on(f); |
67 | /// ``` |
pub struct Watch<M: RawMutex, T: Clone, const N: usize> {
    /// Shared channel state. Every access locks the blocking mutex and then borrows the `RefCell`.
    mutex: Mutex<M, RefCell<WatchState<T, N>>>,
}
71 | |
/// Internal state of a [`Watch`], stored behind its mutex.
struct WatchState<T: Clone, const N: usize> {
    /// The latest value, or `None` if the channel was created empty or has been cleared.
    data: Option<T>,
    /// Message counter, incremented on every send. Receivers compare it against
    /// their own id to decide whether the value has changed since they last saw it.
    current_id: u64,
    /// Wakers registered by pending receiver polls; woken whenever a value is sent.
    wakers: MultiWakerRegistration<N>,
    /// Number of currently existing counted receivers (never allowed above `N`).
    receiver_count: usize,
}
78 | |
/// Private half of the `Watch` behavior: polling, change tracking and sending.
///
/// In every method taking `id`, `id` is the caller's "last seen" message id; it is
/// overwritten with the current message id whenever a value is handed out.
trait SealedWatchBehavior<T> {
    /// Poll the `Watch` for the current value, marking it as seen.
    fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;

    /// Poll the `Watch` for the value if it matches the predicate function
    /// `f`, marking it as seen.
    fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;

    /// Poll the `Watch` for a changed value (one newer than `id`), marking it as seen.
    fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;

    /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen.
    fn try_changed(&self, id: &mut u64) -> Option<T>;

    /// Poll the `Watch` for a changed value that matches the predicate function
    /// `f`, marking it as seen.
    fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;

    /// Tries to retrieve the value of the `Watch` if it has changed and matches the
    /// predicate function `f`, marking it as seen.
    fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>;

    /// Used when a receiver is dropped to decrement the receiver count.
    ///
    /// ## This method should not be called by the user.
    fn drop_receiver(&self);

    /// Clears the value of the `Watch`.
    fn clear(&self);

    /// Sends a new value to the `Watch`.
    fn send(&self, val: T);

    /// Modify the value of the `Watch` in place using a closure. The closure receives
    /// the stored `Option<T>`, so it also runs when no value is present yet.
    /// Receivers are always notified afterwards.
    fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>));

    /// Modify the value of the `Watch` in place using a closure. Receivers are
    /// notified only if the closure returns `true`.
    fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool);
}
120 | |
/// A trait representing the 'inner' behavior of the `Watch`.
// The private `SealedWatchBehavior` supertrait is intentional: it keeps the
// polling/sending methods out of the public API, hence the lint is silenced.
#[allow(private_bounds)]
pub trait WatchBehavior<T: Clone>: SealedWatchBehavior<T> {
    /// Tries to get the value of the `Watch`, marking it as seen, if an id is given.
    fn try_get(&self, id: Option<&mut u64>) -> Option<T>;

    /// Tries to get the value of the `Watch` if it matches the predicate function
    /// `f`, marking it as seen, if an id is given.
    fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T>;

    /// Checks if the `Watch` currently holds a value.
    fn contains_value(&self) -> bool;
}
134 | |
135 | impl<M: RawMutex, T: Clone, const N: usize> SealedWatchBehavior<T> for Watch<M, T, N> { |
136 | fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { |
137 | self.mutex.lock(|state| { |
138 | let mut s = state.borrow_mut(); |
139 | match &s.data { |
140 | Some(data) => { |
141 | *id = s.current_id; |
142 | Poll::Ready(data.clone()) |
143 | } |
144 | None => { |
145 | s.wakers.register(cx.waker()); |
146 | Poll::Pending |
147 | } |
148 | } |
149 | }) |
150 | } |
151 | |
152 | fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> { |
153 | self.mutex.lock(|state| { |
154 | let mut s = state.borrow_mut(); |
155 | match s.data { |
156 | Some(ref data) if f(data) => { |
157 | *id = s.current_id; |
158 | Poll::Ready(data.clone()) |
159 | } |
160 | _ => { |
161 | s.wakers.register(cx.waker()); |
162 | Poll::Pending |
163 | } |
164 | } |
165 | }) |
166 | } |
167 | |
168 | fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { |
169 | self.mutex.lock(|state| { |
170 | let mut s = state.borrow_mut(); |
171 | match (&s.data, s.current_id > *id) { |
172 | (Some(data), true) => { |
173 | *id = s.current_id; |
174 | Poll::Ready(data.clone()) |
175 | } |
176 | _ => { |
177 | s.wakers.register(cx.waker()); |
178 | Poll::Pending |
179 | } |
180 | } |
181 | }) |
182 | } |
183 | |
184 | fn try_changed(&self, id: &mut u64) -> Option<T> { |
185 | self.mutex.lock(|state| { |
186 | let s = state.borrow(); |
187 | match s.current_id > *id { |
188 | true => { |
189 | *id = s.current_id; |
190 | s.data.clone() |
191 | } |
192 | false => None, |
193 | } |
194 | }) |
195 | } |
196 | |
197 | fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> { |
198 | self.mutex.lock(|state| { |
199 | let mut s = state.borrow_mut(); |
200 | match (&s.data, s.current_id > *id) { |
201 | (Some(data), true) if f(data) => { |
202 | *id = s.current_id; |
203 | Poll::Ready(data.clone()) |
204 | } |
205 | _ => { |
206 | s.wakers.register(cx.waker()); |
207 | Poll::Pending |
208 | } |
209 | } |
210 | }) |
211 | } |
212 | |
213 | fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> { |
214 | self.mutex.lock(|state| { |
215 | let s = state.borrow(); |
216 | match (&s.data, s.current_id > *id) { |
217 | (Some(data), true) if f(data) => { |
218 | *id = s.current_id; |
219 | s.data.clone() |
220 | } |
221 | _ => None, |
222 | } |
223 | }) |
224 | } |
225 | |
226 | fn drop_receiver(&self) { |
227 | self.mutex.lock(|state| { |
228 | let mut s = state.borrow_mut(); |
229 | s.receiver_count -= 1; |
230 | }) |
231 | } |
232 | |
233 | fn clear(&self) { |
234 | self.mutex.lock(|state| { |
235 | let mut s = state.borrow_mut(); |
236 | s.data = None; |
237 | }) |
238 | } |
239 | |
240 | fn send(&self, val: T) { |
241 | self.mutex.lock(|state| { |
242 | let mut s = state.borrow_mut(); |
243 | s.data = Some(val); |
244 | s.current_id += 1; |
245 | s.wakers.wake(); |
246 | }) |
247 | } |
248 | |
249 | fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)) { |
250 | self.mutex.lock(|state| { |
251 | let mut s = state.borrow_mut(); |
252 | f(&mut s.data); |
253 | s.current_id += 1; |
254 | s.wakers.wake(); |
255 | }) |
256 | } |
257 | |
258 | fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool) { |
259 | self.mutex.lock(|state| { |
260 | let mut s = state.borrow_mut(); |
261 | if f(&mut s.data) { |
262 | s.current_id += 1; |
263 | s.wakers.wake(); |
264 | } |
265 | }) |
266 | } |
267 | } |
268 | |
269 | impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> { |
270 | fn try_get(&self, id: Option<&mut u64>) -> Option<T> { |
271 | self.mutex.lock(|state| { |
272 | let s = state.borrow(); |
273 | if let Some(id) = id { |
274 | *id = s.current_id; |
275 | } |
276 | s.data.clone() |
277 | }) |
278 | } |
279 | |
280 | fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T> { |
281 | self.mutex.lock(|state| { |
282 | let s = state.borrow(); |
283 | match s.data { |
284 | Some(ref data) if f(data) => { |
285 | if let Some(id) = id { |
286 | *id = s.current_id; |
287 | } |
288 | Some(data.clone()) |
289 | } |
290 | _ => None, |
291 | } |
292 | }) |
293 | } |
294 | |
295 | fn contains_value(&self) -> bool { |
296 | self.mutex.lock(|state| state.borrow().data.is_some()) |
297 | } |
298 | } |
299 | |
300 | impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> { |
301 | /// Create a new `Watch` channel. |
302 | pub const fn new() -> Self { |
303 | Self { |
304 | mutex: Mutex::new(RefCell::new(WatchState { |
305 | data: None, |
306 | current_id: 0, |
307 | wakers: MultiWakerRegistration::new(), |
308 | receiver_count: 0, |
309 | })), |
310 | } |
311 | } |
312 | |
313 | /// Create a new `Watch` channel with default data. |
314 | pub const fn new_with(data: T) -> Self { |
315 | Self { |
316 | mutex: Mutex::new(RefCell::new(WatchState { |
317 | data: Some(data), |
318 | current_id: 0, |
319 | wakers: MultiWakerRegistration::new(), |
320 | receiver_count: 0, |
321 | })), |
322 | } |
323 | } |
324 | |
325 | /// Create a new [`Sender`] for the `Watch`. |
326 | pub fn sender(&self) -> Sender<'_, M, T, N> { |
327 | Sender(Snd::new(self)) |
328 | } |
329 | |
330 | /// Create a new [`DynSender`] for the `Watch`. |
331 | pub fn dyn_sender(&self) -> DynSender<'_, T> { |
332 | DynSender(Snd::new(self)) |
333 | } |
334 | |
335 | /// Try to create a new [`Receiver`] for the `Watch`. If the |
336 | /// maximum number of receivers has been reached, `None` is returned. |
337 | pub fn receiver(&self) -> Option<Receiver<'_, M, T, N>> { |
338 | self.mutex.lock(|state| { |
339 | let mut s = state.borrow_mut(); |
340 | if s.receiver_count < N { |
341 | s.receiver_count += 1; |
342 | Some(Receiver(Rcv::new(self, 0))) |
343 | } else { |
344 | None |
345 | } |
346 | }) |
347 | } |
348 | |
349 | /// Try to create a new [`DynReceiver`] for the `Watch`. If the |
350 | /// maximum number of receivers has been reached, `None` is returned. |
351 | pub fn dyn_receiver(&self) -> Option<DynReceiver<'_, T>> { |
352 | self.mutex.lock(|state| { |
353 | let mut s = state.borrow_mut(); |
354 | if s.receiver_count < N { |
355 | s.receiver_count += 1; |
356 | Some(DynReceiver(Rcv::new(self, 0))) |
357 | } else { |
358 | None |
359 | } |
360 | }) |
361 | } |
362 | |
363 | /// Try to create a new [`AnonReceiver`] for the `Watch`. |
364 | pub fn anon_receiver(&self) -> AnonReceiver<'_, M, T, N> { |
365 | AnonReceiver(AnonRcv::new(self, 0)) |
366 | } |
367 | |
368 | /// Try to create a new [`DynAnonReceiver`] for the `Watch`. |
369 | pub fn dyn_anon_receiver(&self) -> DynAnonReceiver<'_, T> { |
370 | DynAnonReceiver(AnonRcv::new(self, 0)) |
371 | } |
372 | |
373 | /// Returns the message ID of the latest message sent to the `Watch`. |
374 | /// |
375 | /// This counter is monotonic, and is incremented every time a new message is sent. |
376 | pub fn get_msg_id(&self) -> u64 { |
377 | self.mutex.lock(|state| state.borrow().current_id) |
378 | } |
379 | |
380 | /// Tries to get the value of the `Watch`. |
381 | pub fn try_get(&self) -> Option<T> { |
382 | WatchBehavior::try_get(self, None) |
383 | } |
384 | |
385 | /// Tries to get the value of the `Watch` if it matches the predicate function `f`. |
386 | pub fn try_get_and<F>(&self, mut f: F) -> Option<T> |
387 | where |
388 | F: Fn(&T) -> bool, |
389 | { |
390 | WatchBehavior::try_get_and(self, None, &mut f) |
391 | } |
392 | } |
393 | |
/// The shared sender implementation: it can send, modify, clear and peek the `Watch` value.
///
/// Wrapped by [`Sender`] and [`DynSender`], which dereference to it.
pub struct Snd<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
    /// The `Watch` (or trait object) this sender writes to.
    watch: &'a W,
    /// Ties the otherwise unused value type `T` to the struct.
    _phantom: PhantomData<T>,
}
399 | |
400 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Clone for Snd<'a, T, W> { |
401 | fn clone(&self) -> Self { |
402 | Self { |
403 | watch: self.watch, |
404 | _phantom: PhantomData, |
405 | } |
406 | } |
407 | } |
408 | |
409 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> { |
410 | /// Creates a new `Receiver` with a reference to the `Watch`. |
411 | fn new(watch: &'a W) -> Self { |
412 | Self { |
413 | watch, |
414 | _phantom: PhantomData, |
415 | } |
416 | } |
417 | |
418 | /// Sends a new value to the `Watch`. |
419 | pub fn send(&self, val: T) { |
420 | self.watch.send(val) |
421 | } |
422 | |
423 | /// Clears the value of the `Watch`. |
424 | /// This will cause calls to [`Rcv::get`] to be pending. |
425 | pub fn clear(&self) { |
426 | self.watch.clear() |
427 | } |
428 | |
429 | /// Tries to retrieve the value of the `Watch`. |
430 | pub fn try_get(&self) -> Option<T> { |
431 | self.watch.try_get(None) |
432 | } |
433 | |
434 | /// Tries to peek the current value of the `Watch` if it matches the predicate |
435 | /// function `f`. |
436 | pub fn try_get_and<F>(&self, mut f: F) -> Option<T> |
437 | where |
438 | F: Fn(&T) -> bool, |
439 | { |
440 | self.watch.try_get_and(None, &mut f) |
441 | } |
442 | |
443 | /// Returns true if the `Watch` contains a value. |
444 | pub fn contains_value(&self) -> bool { |
445 | self.watch.contains_value() |
446 | } |
447 | |
448 | /// Modify the value of the `Watch` using a closure. |
449 | pub fn send_modify<F>(&self, mut f: F) |
450 | where |
451 | F: Fn(&mut Option<T>), |
452 | { |
453 | self.watch.send_modify(&mut f) |
454 | } |
455 | |
456 | /// Modify the value of the `Watch` using a closure. The closure must return |
457 | /// `true` if the value was modified, which notifies all receivers. |
458 | pub fn send_if_modified<F>(&self, mut f: F) |
459 | where |
460 | F: Fn(&mut Option<T>) -> bool, |
461 | { |
462 | self.watch.send_if_modified(&mut f) |
463 | } |
464 | } |
465 | |
/// A sender of a `Watch` channel.
///
/// Dereferences to [`Snd`], which provides the actual sending methods.
///
/// For a simpler type definition, consider [`DynSender`] at the expense of
/// some runtime performance due to dynamic dispatch.
pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch<M, T, N>>);
471 | |
472 | impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> { |
473 | fn clone(&self) -> Self { |
474 | Self(self.0.clone()) |
475 | } |
476 | } |
477 | |
478 | impl<'a, M: RawMutex, T: Clone, const N: usize> Sender<'a, M, T, N> { |
479 | /// Converts the `Sender` into a [`DynSender`]. |
480 | pub fn as_dyn(self) -> DynSender<'a, T> { |
481 | DynSender(Snd::new(self.watch)) |
482 | } |
483 | } |
484 | |
485 | impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynSender<'a, T>> for Sender<'a, M, T, N> { |
486 | fn into(self) -> DynSender<'a, T> { |
487 | self.as_dyn() |
488 | } |
489 | } |
490 | |
491 | impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Sender<'a, M, T, N> { |
492 | type Target = Snd<'a, T, Watch<M, T, N>>; |
493 | |
494 | fn deref(&self) -> &Self::Target { |
495 | &self.0 |
496 | } |
497 | } |
498 | |
499 | impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Sender<'a, M, T, N> { |
500 | fn deref_mut(&mut self) -> &mut Self::Target { |
501 | &mut self.0 |
502 | } |
503 | } |
504 | |
/// A sender which holds a **dynamic** reference to a `Watch` channel.
///
/// This is an alternative to [`Sender`] with a simpler type definition, at the expense of
/// some runtime performance due to dynamic dispatch.
pub struct DynSender<'a, T: Clone>(Snd<'a, T, dyn WatchBehavior<T> + 'a>);
509 | |
510 | impl<'a, T: Clone> Clone for DynSender<'a, T> { |
511 | fn clone(&self) -> Self { |
512 | Self(self.0.clone()) |
513 | } |
514 | } |
515 | |
516 | impl<'a, T: Clone> Deref for DynSender<'a, T> { |
517 | type Target = Snd<'a, T, dyn WatchBehavior<T> + 'a>; |
518 | |
519 | fn deref(&self) -> &Self::Target { |
520 | &self.0 |
521 | } |
522 | } |
523 | |
524 | impl<'a, T: Clone> DerefMut for DynSender<'a, T> { |
525 | fn deref_mut(&mut self) -> &mut Self::Target { |
526 | &mut self.0 |
527 | } |
528 | } |
529 | |
/// A receiver can `.await` a change in the `Watch` value.
///
/// This is the shared implementation behind [`Receiver`] and [`DynReceiver`].
/// Dropping it releases its receiver slot on the `Watch`.
pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
    /// The `Watch` (or trait object) this receiver reads from.
    watch: &'a W,
    /// Message id of the last value this receiver has seen.
    at_id: u64,
    /// Ties the otherwise unused value type `T` to the struct.
    _phantom: PhantomData<T>,
}
536 | |
537 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> { |
538 | /// Creates a new `Receiver` with a reference to the `Watch`. |
539 | fn new(watch: &'a W, at_id: u64) -> Self { |
540 | Self { |
541 | watch, |
542 | at_id, |
543 | _phantom: PhantomData, |
544 | } |
545 | } |
546 | |
547 | /// Returns the current value of the `Watch` once it is initialized, marking it as seen. |
548 | /// |
549 | /// **Note**: Futures do nothing unless you `.await` or poll them. |
550 | pub fn get(&mut self) -> impl Future<Output = T> + '_ { |
551 | poll_fn(|cx| self.watch.poll_get(&mut self.at_id, cx)) |
552 | } |
553 | |
554 | /// Tries to get the current value of the `Watch` without waiting, marking it as seen. |
555 | pub fn try_get(&mut self) -> Option<T> { |
556 | self.watch.try_get(Some(&mut self.at_id)) |
557 | } |
558 | |
559 | /// Returns the value of the `Watch` if it matches the predicate function `f`, |
560 | /// or waits for it to match, marking it as seen. |
561 | /// |
562 | /// **Note**: Futures do nothing unless you `.await` or poll them. |
563 | pub async fn get_and<F>(&mut self, mut f: F) -> T |
564 | where |
565 | F: Fn(&T) -> bool, |
566 | { |
567 | poll_fn(|cx| self.watch.poll_get_and(&mut self.at_id, &mut f, cx)).await |
568 | } |
569 | |
570 | /// Tries to get the current value of the `Watch` if it matches the predicate |
571 | /// function `f` without waiting, marking it as seen. |
572 | pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T> |
573 | where |
574 | F: Fn(&T) -> bool, |
575 | { |
576 | self.watch.try_get_and(Some(&mut self.at_id), &mut f) |
577 | } |
578 | |
579 | /// Waits for the `Watch` to change and returns the new value, marking it as seen. |
580 | /// |
581 | /// **Note**: Futures do nothing unless you `.await` or poll them. |
582 | pub async fn changed(&mut self) -> T { |
583 | poll_fn(|cx| self.watch.poll_changed(&mut self.at_id, cx)).await |
584 | } |
585 | |
586 | /// Tries to get the new value of the watch without waiting, marking it as seen. |
587 | pub fn try_changed(&mut self) -> Option<T> { |
588 | self.watch.try_changed(&mut self.at_id) |
589 | } |
590 | |
591 | /// Waits for the `Watch` to change to a value which satisfies the predicate |
592 | /// function `f` and returns the new value, marking it as seen. |
593 | /// |
594 | /// **Note**: Futures do nothing unless you `.await` or poll them. |
595 | pub async fn changed_and<F>(&mut self, mut f: F) -> T |
596 | where |
597 | F: Fn(&T) -> bool, |
598 | { |
599 | poll_fn(|cx| self.watch.poll_changed_and(&mut self.at_id, &mut f, cx)).await |
600 | } |
601 | |
602 | /// Tries to get the new value of the watch which satisfies the predicate |
603 | /// function `f` and returns the new value without waiting, marking it as seen. |
604 | pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T> |
605 | where |
606 | F: Fn(&T) -> bool, |
607 | { |
608 | self.watch.try_changed_and(&mut self.at_id, &mut f) |
609 | } |
610 | |
611 | /// Checks if the `Watch` contains a value. If this returns true, |
612 | /// then awaiting [`Rcv::get`] will return immediately. |
613 | pub fn contains_value(&self) -> bool { |
614 | self.watch.contains_value() |
615 | } |
616 | } |
617 | |
618 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> { |
619 | fn drop(&mut self) { |
620 | self.watch.drop_receiver(); |
621 | } |
622 | } |
623 | |
/// An anonymous receiver can NOT `.await` a change in the `Watch` value.
///
/// This is the shared implementation behind [`AnonReceiver`] and [`DynAnonReceiver`];
/// it only offers the non-blocking `try_*` methods.
pub struct AnonRcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
    /// The `Watch` (or trait object) this receiver reads from.
    watch: &'a W,
    /// Message id of the last value this receiver has seen.
    at_id: u64,
    /// Ties the otherwise unused value type `T` to the struct.
    _phantom: PhantomData<T>,
}
630 | |
631 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> AnonRcv<'a, T, W> { |
632 | /// Creates a new `Receiver` with a reference to the `Watch`. |
633 | fn new(watch: &'a W, at_id: u64) -> Self { |
634 | Self { |
635 | watch, |
636 | at_id, |
637 | _phantom: PhantomData, |
638 | } |
639 | } |
640 | |
641 | /// Tries to get the current value of the `Watch` without waiting, marking it as seen. |
642 | pub fn try_get(&mut self) -> Option<T> { |
643 | self.watch.try_get(Some(&mut self.at_id)) |
644 | } |
645 | |
646 | /// Tries to get the current value of the `Watch` if it matches the predicate |
647 | /// function `f` without waiting, marking it as seen. |
648 | pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T> |
649 | where |
650 | F: Fn(&T) -> bool, |
651 | { |
652 | self.watch.try_get_and(Some(&mut self.at_id), &mut f) |
653 | } |
654 | |
655 | /// Tries to get the new value of the watch without waiting, marking it as seen. |
656 | pub fn try_changed(&mut self) -> Option<T> { |
657 | self.watch.try_changed(&mut self.at_id) |
658 | } |
659 | |
660 | /// Tries to get the new value of the watch which satisfies the predicate |
661 | /// function `f` and returns the new value without waiting, marking it as seen. |
662 | pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T> |
663 | where |
664 | F: Fn(&T) -> bool, |
665 | { |
666 | self.watch.try_changed_and(&mut self.at_id, &mut f) |
667 | } |
668 | |
669 | /// Checks if the `Watch` contains a value. If this returns true, |
670 | /// then awaiting [`Rcv::get`] will return immediately. |
671 | pub fn contains_value(&self) -> bool { |
672 | self.watch.contains_value() |
673 | } |
674 | } |
675 | |
/// A receiver of a `Watch` channel.
///
/// Dereferences to [`Rcv`], which provides the actual receiving methods.
/// Each `Receiver` occupies one of the `N` receiver slots of the `Watch`.
pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>);
678 | |
679 | impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> { |
680 | /// Converts the `Receiver` into a [`DynReceiver`]. |
681 | pub fn as_dyn(self) -> DynReceiver<'a, T> { |
682 | let rcv: DynReceiver<'_, T> = DynReceiver(Rcv::new(self.0.watch, self.at_id)); |
683 | core::mem::forget(self); // Ensures the destructor is not called |
684 | rcv |
685 | } |
686 | } |
687 | |
688 | impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynReceiver<'a, T>> for Receiver<'a, M, T, N> { |
689 | fn into(self) -> DynReceiver<'a, T> { |
690 | self.as_dyn() |
691 | } |
692 | } |
693 | |
694 | impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { |
695 | type Target = Rcv<'a, T, Watch<M, T, N>>; |
696 | |
697 | fn deref(&self) -> &Self::Target { |
698 | &self.0 |
699 | } |
700 | } |
701 | |
702 | impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> { |
703 | fn deref_mut(&mut self) -> &mut Self::Target { |
704 | &mut self.0 |
705 | } |
706 | } |
707 | |
/// A receiver which holds a **dynamic** reference to a `Watch` channel.
///
/// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of
/// some runtime performance due to dynamic dispatch.
///
/// Dereferences to [`Rcv`], which provides the actual receiving methods.
pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>);
713 | |
714 | impl<'a, T: Clone> Deref for DynReceiver<'a, T> { |
715 | type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>; |
716 | |
717 | fn deref(&self) -> &Self::Target { |
718 | &self.0 |
719 | } |
720 | } |
721 | |
722 | impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> { |
723 | fn deref_mut(&mut self) -> &mut Self::Target { |
724 | &mut self.0 |
725 | } |
726 | } |
727 | |
/// A receiver of a `Watch` channel that cannot `.await` values.
///
/// Dereferences to [`AnonRcv`], which provides the non-blocking `try_*` methods.
pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch<M, T, N>>);
730 | |
731 | impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> { |
732 | /// Converts the `Receiver` into a [`DynReceiver`]. |
733 | pub fn as_dyn(self) -> DynAnonReceiver<'a, T> { |
734 | let rcv: DynAnonReceiver<'_, T> = DynAnonReceiver(AnonRcv::new(self.0.watch, self.at_id)); |
735 | core::mem::forget(self); // Ensures the destructor is not called |
736 | rcv |
737 | } |
738 | } |
739 | |
740 | impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynAnonReceiver<'a, T>> for AnonReceiver<'a, M, T, N> { |
741 | fn into(self) -> DynAnonReceiver<'a, T> { |
742 | self.as_dyn() |
743 | } |
744 | } |
745 | |
746 | impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for AnonReceiver<'a, M, T, N> { |
747 | type Target = AnonRcv<'a, T, Watch<M, T, N>>; |
748 | |
749 | fn deref(&self) -> &Self::Target { |
750 | &self.0 |
751 | } |
752 | } |
753 | |
754 | impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for AnonReceiver<'a, M, T, N> { |
755 | fn deref_mut(&mut self) -> &mut Self::Target { |
756 | &mut self.0 |
757 | } |
758 | } |
759 | |
/// A receiver that cannot `.await` values, which holds a **dynamic** reference to a `Watch` channel.
///
/// This is an alternative to [`AnonReceiver`] with a simpler type definition, at the expense of
/// some runtime performance due to dynamic dispatch.
///
/// Dereferences to [`AnonRcv`], which provides the non-blocking `try_*` methods.
pub struct DynAnonReceiver<'a, T: Clone>(AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>);
765 | |
766 | impl<'a, T: Clone> Deref for DynAnonReceiver<'a, T> { |
767 | type Target = AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>; |
768 | |
769 | fn deref(&self) -> &Self::Target { |
770 | &self.0 |
771 | } |
772 | } |
773 | |
774 | impl<'a, T: Clone> DerefMut for DynAnonReceiver<'a, T> { |
775 | fn deref_mut(&mut self) -> &mut Self::Target { |
776 | &mut self.0 |
777 | } |
778 | } |
779 | |
780 | #[cfg (test)] |
781 | mod tests { |
782 | use futures_executor::block_on; |
783 | |
784 | use super::Watch; |
785 | use crate::blocking_mutex::raw::CriticalSectionRawMutex; |
786 | |
/// Each sent value is observed exactly once, through both `changed` and `try_changed`.
#[test]
fn multiple_sends() {
    let f = async {
        static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();

        // Obtain receiver and sender
        let mut rcv = WATCH.receiver().unwrap();
        let snd = WATCH.sender();

        // Not initialized
        assert_eq!(rcv.try_changed(), None);

        // Receive the new value
        snd.send(10);
        assert_eq!(rcv.changed().await, 10);

        // Receive another value
        snd.send(20);
        assert_eq!(rcv.try_changed(), Some(20));

        // No update: the value 20 has already been marked as seen
        assert_eq!(rcv.try_changed(), None);
    };
    block_on(f);
}
812 | |
/// `try_get` and `try_get_and` agree across the `Watch`, its receiver and its sender.
#[test]
fn all_try_get() {
    let f = async {
        static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();

        // Obtain receiver and sender
        let mut rcv = WATCH.receiver().unwrap();
        let snd = WATCH.sender();

        // Not initialized
        assert_eq!(WATCH.try_get(), None);
        assert_eq!(rcv.try_get(), None);
        assert_eq!(snd.try_get(), None);

        // Receive the new value
        snd.send(10);
        assert_eq!(WATCH.try_get(), Some(10));
        assert_eq!(rcv.try_get(), Some(10));
        assert_eq!(snd.try_get(), Some(10));

        // Predicate accepts the value
        assert_eq!(WATCH.try_get_and(|x| x > &5), Some(10));
        assert_eq!(rcv.try_get_and(|x| x > &5), Some(10));
        assert_eq!(snd.try_get_and(|x| x > &5), Some(10));

        // Predicate rejects the value
        assert_eq!(WATCH.try_get_and(|x| x < &5), None);
        assert_eq!(rcv.try_get_and(|x| x < &5), None);
        assert_eq!(snd.try_get_and(|x| x < &5), None);
    };
    block_on(f);
}
843 | |
/// A `Watch` of `&'static` references hands out the very references that were sent.
#[test]
fn once_lock_like() {
    let f = async {
        static CONFIG0: u8 = 10;
        static CONFIG1: u8 = 20;

        static WATCH: Watch<CriticalSectionRawMutex, &'static u8, 1> = Watch::new();

        // Obtain receiver and sender
        let mut rcv = WATCH.receiver().unwrap();
        let snd = WATCH.sender();

        // Not initialized
        assert_eq!(rcv.try_changed(), None);

        // Receive the new value
        snd.send(&CONFIG0);
        let rcv0 = rcv.changed().await;
        assert_eq!(rcv0, &10);

        // Receive another value
        snd.send(&CONFIG1);
        let rcv1 = rcv.try_changed();
        assert_eq!(rcv1, Some(&20));

        // No update
        assert_eq!(rcv.try_changed(), None);

        // Ensure similarity with original static
        assert_eq!(rcv0, &CONFIG0);
        assert_eq!(rcv1, Some(&CONFIG1));
    };
    block_on(f);
}
878 | |
/// `send_modify` edits the stored value in place and counts as a change for receivers.
#[test]
fn sender_modify() {
    let f = async {
        static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();

        // Obtain receiver and sender
        let mut rcv = WATCH.receiver().unwrap();
        let snd = WATCH.sender();

        // Receive the new value
        snd.send(10);
        assert_eq!(rcv.try_changed(), Some(10));

        // Modify the value inplace
        snd.send_modify(|opt| {
            if let Some(inner) = opt {
                *inner += 5;
            }
        });

        // Get the modified value, exactly once
        assert_eq!(rcv.try_changed(), Some(15));
        assert_eq!(rcv.try_changed(), None);
    };
    block_on(f);
}
905 | |
/// Exercises the predicate variants: `try_get_and`, `try_changed_and`, `changed_and`, `get_and`.
#[test]
fn predicate_fn() {
    let f = async {
        static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();

        // Obtain receiver and sender
        let mut rcv = WATCH.receiver().unwrap();
        let snd = WATCH.sender();

        // A matching `try_get_and` marks the value as seen
        snd.send(15);
        assert_eq!(rcv.try_get_and(|x| x > &5), Some(15));
        assert_eq!(rcv.try_get_and(|x| x < &5), None);
        assert!(rcv.try_changed().is_none());

        // A matching `try_changed_and` returns the value only once
        snd.send(20);
        assert_eq!(rcv.try_changed_and(|x| x > &5), Some(20));
        assert_eq!(rcv.try_changed_and(|x| x > &5), None);

        // A rejected value stays unseen and is still reported by `try_changed`
        snd.send(25);
        assert_eq!(rcv.try_changed_and(|x| x < &5), None);
        assert_eq!(rcv.try_changed(), Some(25));

        // Async variants; `get_and` also returns an already seen value
        snd.send(30);
        assert_eq!(rcv.changed_and(|x| x > &5).await, 30);
        assert_eq!(rcv.get_and(|x| x > &5).await, 30);
    };
    block_on(f);
}
934 | |
/// Checks that a receiver created after a value was sent still reports that
/// value as a change.
#[test]
fn receive_after_create() {
    block_on(async {
        static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();

        // Send before any receiver exists
        let sender = WATCH.sender();
        sender.send(10);

        // The late receiver picks up the earlier value
        let mut late_receiver = WATCH.receiver().unwrap();
        assert_eq!(late_receiver.try_changed(), Some(10));
    });
}
950 | |
/// Checks that a `Watch<_, _, 2>` hands out at most two receivers at a time and
/// that dropping one allows another to be created.
#[test]
fn max_receivers_drop() {
    block_on(async {
        static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

        // The first two requests succeed. The handles stay bound so that they
        // keep existing while the later requests are made.
        let first = WATCH.receiver();
        assert!(first.is_some());
        let second = WATCH.receiver();
        assert!(second.is_some());

        // A third request while both are alive is refused
        let third = WATCH.receiver();
        assert!(third.is_none());

        // Release the first receiver
        drop(first);

        // A new receiver can now be created
        let replacement = WATCH.receiver();
        assert!(replacement.is_some());
    });
}
975 | |
/// Checks that a regular receiver and an anonymous receiver on the same watch
/// both start without a change and both observe a value once it is sent.
#[test]
fn multiple_receivers() {
    block_on(async {
        static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

        // One regular receiver, one anonymous receiver, and a sender
        let mut counted = WATCH.receiver().unwrap();
        let mut anonymous = WATCH.anon_receiver();
        let sender = WATCH.sender();

        // Nothing has been sent yet, so neither sees a change
        assert_eq!(counted.try_changed(), None);
        assert_eq!(anonymous.try_changed(), None);

        // Publish a value
        sender.send(0);

        // Each receiver observes it
        assert_eq!(counted.try_changed(), Some(0));
        assert_eq!(anonymous.try_changed(), Some(0));
    });
}
999 | |
/// Checks that a cloned sender publishes to the same watch as the sender it was
/// cloned from.
#[test]
fn clone_senders() {
    block_on(async {
        // Two sender handles: the original and its clone
        static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
        let original = WATCH.sender();
        let cloned = original.clone();

        // A receiver, converted to its dynamic form
        let mut receiver = WATCH.receiver().unwrap().as_dyn();

        // A value sent through the original handle arrives
        original.send(10);
        assert_eq!(receiver.try_changed(), Some(10));

        // A value sent through the clone arrives as well
        cloned.send(20);
        assert_eq!(receiver.try_changed(), Some(20));
    });
}
1021 | |
/// Checks that the `dyn_*` constructors on `Watch` yield a sender and receivers
/// that pass a value just like the tests above do with the non-dynamic handles.
#[test]
fn use_dynamics() {
    block_on(async {
        static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

        // Dynamic handles obtained directly from the watch
        let mut anonymous = WATCH.dyn_anon_receiver();
        let mut receiver = WATCH.dyn_receiver().unwrap();
        let sender = WATCH.dyn_sender();

        // Publish a value
        sender.send(10);

        // Both dynamic receivers see it; the regular one sees it only once
        assert_eq!(anonymous.try_changed(), Some(10));
        assert_eq!(receiver.try_changed(), Some(10));
        assert_eq!(receiver.try_changed(), None);
    });
}
1042 | |
/// Checks that handles converted with `as_dyn` keep working: a value sent through
/// the converted sender is observed by both converted receivers.
#[test]
fn convert_to_dyn() {
    block_on(async {
        static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

        // Start from the statically typed handles
        let static_anon = WATCH.anon_receiver();
        let static_receiver = WATCH.receiver().unwrap();
        let static_sender = WATCH.sender();

        // Turn each of them into its dynamic counterpart
        let mut anonymous = static_anon.as_dyn();
        let mut receiver = static_receiver.as_dyn();
        let sender = static_sender.as_dyn();

        // Publish a value through the converted sender
        sender.send(10);

        // Both converted receivers see it; the regular one sees it only once
        assert_eq!(anonymous.try_changed(), Some(10));
        assert_eq!(receiver.try_changed(), Some(10));
        assert_eq!(receiver.try_changed(), None);
    });
}
1068 | |
/// Checks that dropping a receiver after converting it with `as_dyn` frees
/// exactly one slot: one new receiver can then be created, a second cannot.
#[test]
fn dynamic_receiver_count() {
    block_on(async {
        static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

        // Request three receivers from a watch declared with 2 as its last parameter
        let first = WATCH.receiver();
        let second = WATCH.receiver();
        let third = WATCH.receiver();

        // Only the first two requests succeed
        assert!(first.is_some());
        assert!(second.is_some());
        assert!(third.is_none());

        // Convert the first receiver to its dynamic form and drop it right away
        drop(first.unwrap().as_dyn());

        // Exactly one more receiver can be created; the request after it fails
        let refill = WATCH.receiver();
        let overflow = WATCH.receiver();
        assert!(refill.is_some());
        assert!(overflow.is_none());
    });
}
1098 | |
/// Checks that `contains_value` reports `false` on both the receiver and the
/// sender before anything is sent, and `true` on both after a value is sent.
#[test]
fn contains_value() {
    let f = async {
        static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

        // Obtain receiver and sender
        let rcv = WATCH.receiver().unwrap();
        let snd = WATCH.sender();

        // Nothing has been sent yet, so neither handle reports a value.
        // Booleans are asserted directly rather than compared against literals.
        assert!(!rcv.contains_value());
        assert!(!snd.contains_value());

        // Send a value
        snd.send(10);

        // Both handles now report that the watch contains a value
        assert!(rcv.contains_value());
        assert!(snd.contains_value());
    };
    block_on(f);
}
1121 | } |
1122 | |