1 | //! A queue for sending values between asynchronous tasks. |
2 | //! |
3 | //! It can be used concurrently by multiple producers (senders) and multiple |
4 | //! consumers (receivers), i.e. it is an "MPMC channel". |
5 | //! |
6 | //! Receivers are competing for messages. So a message that is received by |
7 | //! one receiver is not received by any other. |
8 | //! |
9 | //! This queue takes a Mutex type so that various |
10 | //! targets can be attained. For example, a ThreadModeMutex can be used |
11 | //! for single-core Cortex-M targets where messages are only passed |
12 | //! between tasks running in thread mode. Similarly, a CriticalSectionMutex |
13 | //! can also be used for single-core targets where messages are to be |
14 | //! passed from exception mode e.g. out of an interrupt handler. |
15 | //! |
//! This module provides a bounded channel that has a limit on the number of
//! messages that it can store. Once this limit is reached, `try_send` returns
//! an error that hands the message back, while `send` waits until a slot is free.
19 | //! |
20 | |
21 | use core::cell::RefCell; |
22 | use core::future::Future; |
23 | use core::pin::Pin; |
24 | use core::task::{Context, Poll}; |
25 | |
26 | use heapless::Deque; |
27 | |
28 | use crate::blocking_mutex::raw::RawMutex; |
29 | use crate::blocking_mutex::Mutex; |
30 | use crate::waitqueue::WakerRegistration; |
31 | |
32 | /// Send-only access to a [`Channel`]. |
33 | pub struct Sender<'ch, M, T, const N: usize> |
34 | where |
35 | M: RawMutex, |
36 | { |
37 | channel: &'ch Channel<M, T, N>, |
38 | } |
39 | |
40 | impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> |
41 | where |
42 | M: RawMutex, |
43 | { |
44 | fn clone(&self) -> Self { |
45 | *self |
46 | } |
47 | } |
48 | |
49 | impl<'ch, M, T, const N: usize> Copy for Sender<'ch, M, T, N> where M: RawMutex {} |
50 | |
51 | impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> |
52 | where |
53 | M: RawMutex, |
54 | { |
55 | /// Sends a value. |
56 | /// |
57 | /// See [`Channel::send()`] |
58 | pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { |
59 | self.channel.send(message) |
60 | } |
61 | |
62 | /// Attempt to immediately send a message. |
63 | /// |
64 | /// See [`Channel::send()`] |
65 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { |
66 | self.channel.try_send(message) |
67 | } |
68 | |
69 | /// Allows a poll_fn to poll until the channel is ready to send |
70 | /// |
71 | /// See [`Channel::poll_ready_to_send()`] |
72 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { |
73 | self.channel.poll_ready_to_send(cx) |
74 | } |
75 | |
76 | /// Returns the maximum number of elements the channel can hold. |
77 | /// |
78 | /// See [`Channel::capacity()`] |
79 | pub const fn capacity(&self) -> usize { |
80 | self.channel.capacity() |
81 | } |
82 | |
83 | /// Returns the free capacity of the channel. |
84 | /// |
85 | /// See [`Channel::free_capacity()`] |
86 | pub fn free_capacity(&self) -> usize { |
87 | self.channel.free_capacity() |
88 | } |
89 | |
90 | /// Clears all elements in the channel. |
91 | /// |
92 | /// See [`Channel::clear()`] |
93 | pub fn clear(&self) { |
94 | self.channel.clear(); |
95 | } |
96 | |
97 | /// Returns the number of elements currently in the channel. |
98 | /// |
99 | /// See [`Channel::len()`] |
100 | pub fn len(&self) -> usize { |
101 | self.channel.len() |
102 | } |
103 | |
104 | /// Returns whether the channel is empty. |
105 | /// |
106 | /// See [`Channel::is_empty()`] |
107 | pub fn is_empty(&self) -> bool { |
108 | self.channel.is_empty() |
109 | } |
110 | |
111 | /// Returns whether the channel is full. |
112 | /// |
113 | /// See [`Channel::is_full()`] |
114 | pub fn is_full(&self) -> bool { |
115 | self.channel.is_full() |
116 | } |
117 | } |
118 | |
/// Send-only access to a [`Channel`] without knowing channel size.
///
/// The channel is held as a `dyn DynamicChannel<T>` trait object, so this type
/// carries neither the mutex type nor the capacity as generic parameters.
pub struct DynamicSender<'ch, T> {
    // Type-erased reference to the channel this sender pushes into.
    pub(crate) channel: &'ch dyn DynamicChannel<T>,
}
123 | |
124 | impl<'ch, T> Clone for DynamicSender<'ch, T> { |
125 | fn clone(&self) -> Self { |
126 | *self |
127 | } |
128 | } |
129 | |
130 | impl<'ch, T> Copy for DynamicSender<'ch, T> {} |
131 | |
132 | impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T> |
133 | where |
134 | M: RawMutex, |
135 | { |
136 | fn from(s: Sender<'ch, M, T, N>) -> Self { |
137 | Self { channel: s.channel } |
138 | } |
139 | } |
140 | |
141 | impl<'ch, T> DynamicSender<'ch, T> { |
142 | /// Sends a value. |
143 | /// |
144 | /// See [`Channel::send()`] |
145 | pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> { |
146 | DynamicSendFuture { |
147 | channel: self.channel, |
148 | message: Some(message), |
149 | } |
150 | } |
151 | |
152 | /// Attempt to immediately send a message. |
153 | /// |
154 | /// See [`Channel::send()`] |
155 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { |
156 | self.channel.try_send_with_context(message, None) |
157 | } |
158 | |
159 | /// Allows a poll_fn to poll until the channel is ready to send |
160 | /// |
161 | /// See [`Channel::poll_ready_to_send()`] |
162 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { |
163 | self.channel.poll_ready_to_send(cx) |
164 | } |
165 | } |
166 | |
167 | /// Receive-only access to a [`Channel`]. |
168 | pub struct Receiver<'ch, M, T, const N: usize> |
169 | where |
170 | M: RawMutex, |
171 | { |
172 | channel: &'ch Channel<M, T, N>, |
173 | } |
174 | |
175 | impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N> |
176 | where |
177 | M: RawMutex, |
178 | { |
179 | fn clone(&self) -> Self { |
180 | *self |
181 | } |
182 | } |
183 | |
184 | impl<'ch, M, T, const N: usize> Copy for Receiver<'ch, M, T, N> where M: RawMutex {} |
185 | |
186 | impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> |
187 | where |
188 | M: RawMutex, |
189 | { |
190 | /// Receive the next value. |
191 | /// |
192 | /// See [`Channel::receive()`]. |
193 | pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> { |
194 | self.channel.receive() |
195 | } |
196 | |
197 | /// Is a value ready to be received in the channel |
198 | /// |
199 | /// See [`Channel::ready_to_receive()`]. |
200 | pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> { |
201 | self.channel.ready_to_receive() |
202 | } |
203 | |
204 | /// Attempt to immediately receive the next value. |
205 | /// |
206 | /// See [`Channel::try_receive()`] |
207 | pub fn try_receive(&self) -> Result<T, TryReceiveError> { |
208 | self.channel.try_receive() |
209 | } |
210 | |
211 | /// Allows a poll_fn to poll until the channel is ready to receive |
212 | /// |
213 | /// See [`Channel::poll_ready_to_receive()`] |
214 | pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { |
215 | self.channel.poll_ready_to_receive(cx) |
216 | } |
217 | |
218 | /// Poll the channel for the next item |
219 | /// |
220 | /// See [`Channel::poll_receive()`] |
221 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { |
222 | self.channel.poll_receive(cx) |
223 | } |
224 | |
225 | /// Returns the maximum number of elements the channel can hold. |
226 | /// |
227 | /// See [`Channel::capacity()`] |
228 | pub const fn capacity(&self) -> usize { |
229 | self.channel.capacity() |
230 | } |
231 | |
232 | /// Returns the free capacity of the channel. |
233 | /// |
234 | /// See [`Channel::free_capacity()`] |
235 | pub fn free_capacity(&self) -> usize { |
236 | self.channel.free_capacity() |
237 | } |
238 | |
239 | /// Clears all elements in the channel. |
240 | /// |
241 | /// See [`Channel::clear()`] |
242 | pub fn clear(&self) { |
243 | self.channel.clear(); |
244 | } |
245 | |
246 | /// Returns the number of elements currently in the channel. |
247 | /// |
248 | /// See [`Channel::len()`] |
249 | pub fn len(&self) -> usize { |
250 | self.channel.len() |
251 | } |
252 | |
253 | /// Returns whether the channel is empty. |
254 | /// |
255 | /// See [`Channel::is_empty()`] |
256 | pub fn is_empty(&self) -> bool { |
257 | self.channel.is_empty() |
258 | } |
259 | |
260 | /// Returns whether the channel is full. |
261 | /// |
262 | /// See [`Channel::is_full()`] |
263 | pub fn is_full(&self) -> bool { |
264 | self.channel.is_full() |
265 | } |
266 | } |
267 | |
/// Receive-only access to a [`Channel`] without knowing channel size.
///
/// The channel is held as a `dyn DynamicChannel<T>` trait object, so this type
/// carries neither the mutex type nor the capacity as generic parameters.
pub struct DynamicReceiver<'ch, T> {
    // Type-erased reference to the channel this receiver pops from.
    pub(crate) channel: &'ch dyn DynamicChannel<T>,
}
272 | |
273 | impl<'ch, T> Clone for DynamicReceiver<'ch, T> { |
274 | fn clone(&self) -> Self { |
275 | *self |
276 | } |
277 | } |
278 | |
279 | impl<'ch, T> Copy for DynamicReceiver<'ch, T> {} |
280 | |
281 | impl<'ch, T> DynamicReceiver<'ch, T> { |
282 | /// Receive the next value. |
283 | /// |
284 | /// See [`Channel::receive()`]. |
285 | pub fn receive(&self) -> DynamicReceiveFuture<'_, T> { |
286 | DynamicReceiveFuture { channel: self.channel } |
287 | } |
288 | |
289 | /// Attempt to immediately receive the next value. |
290 | /// |
291 | /// See [`Channel::try_receive()`] |
292 | pub fn try_receive(&self) -> Result<T, TryReceiveError> { |
293 | self.channel.try_receive_with_context(None) |
294 | } |
295 | |
296 | /// Allows a poll_fn to poll until the channel is ready to receive |
297 | /// |
298 | /// See [`Channel::poll_ready_to_receive()`] |
299 | pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { |
300 | self.channel.poll_ready_to_receive(cx) |
301 | } |
302 | |
303 | /// Poll the channel for the next item |
304 | /// |
305 | /// See [`Channel::poll_receive()`] |
306 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { |
307 | self.channel.poll_receive(cx) |
308 | } |
309 | } |
310 | |
311 | impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T> |
312 | where |
313 | M: RawMutex, |
314 | { |
315 | fn from(s: Receiver<'ch, M, T, N>) -> Self { |
316 | Self { channel: s.channel } |
317 | } |
318 | } |
319 | |
320 | /// Future returned by [`Channel::receive`] and [`Receiver::receive`]. |
321 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
322 | pub struct ReceiveFuture<'ch, M, T, const N: usize> |
323 | where |
324 | M: RawMutex, |
325 | { |
326 | channel: &'ch Channel<M, T, N>, |
327 | } |
328 | |
329 | impl<'ch, M, T, const N: usize> Future for ReceiveFuture<'ch, M, T, N> |
330 | where |
331 | M: RawMutex, |
332 | { |
333 | type Output = T; |
334 | |
335 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { |
336 | self.channel.poll_receive(cx) |
337 | } |
338 | } |
339 | |
340 | /// Future returned by [`Channel::ready_to_receive`] and [`Receiver::ready_to_receive`]. |
341 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
342 | pub struct ReceiveReadyFuture<'ch, M, T, const N: usize> |
343 | where |
344 | M: RawMutex, |
345 | { |
346 | channel: &'ch Channel<M, T, N>, |
347 | } |
348 | |
349 | impl<'ch, M, T, const N: usize> Future for ReceiveReadyFuture<'ch, M, T, N> |
350 | where |
351 | M: RawMutex, |
352 | { |
353 | type Output = (); |
354 | |
355 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
356 | self.channel.poll_ready_to_receive(cx) |
357 | } |
358 | } |
359 | |
/// Future returned by [`DynamicReceiver::receive`].
///
/// It resolves to the next message taken from the type-erased channel.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct DynamicReceiveFuture<'ch, T> {
    // Type-erased reference to the channel being received from.
    channel: &'ch dyn DynamicChannel<T>,
}
365 | |
366 | impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> { |
367 | type Output = T; |
368 | |
369 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { |
370 | match self.channel.try_receive_with_context(cx:Some(cx)) { |
371 | Ok(v: T) => Poll::Ready(v), |
372 | Err(TryReceiveError::Empty) => Poll::Pending, |
373 | } |
374 | } |
375 | } |
376 | |
377 | impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for DynamicReceiveFuture<'ch, T> { |
378 | fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self { |
379 | Self { channel: value.channel } |
380 | } |
381 | } |
382 | |
383 | /// Future returned by [`Channel::send`] and [`Sender::send`]. |
384 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
385 | pub struct SendFuture<'ch, M, T, const N: usize> |
386 | where |
387 | M: RawMutex, |
388 | { |
389 | channel: &'ch Channel<M, T, N>, |
390 | message: Option<T>, |
391 | } |
392 | |
393 | impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> |
394 | where |
395 | M: RawMutex, |
396 | { |
397 | type Output = (); |
398 | |
399 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
400 | match self.message.take() { |
401 | Some(m: T) => match self.channel.try_send_with_context(m, cx:Some(cx)) { |
402 | Ok(..) => Poll::Ready(()), |
403 | Err(TrySendError::Full(m: T)) => { |
404 | self.message = Some(m); |
405 | Poll::Pending |
406 | } |
407 | }, |
408 | None => panic!("Message cannot be None" ), |
409 | } |
410 | } |
411 | } |
412 | |
413 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} |
414 | |
/// Future returned by [`DynamicSender::send`].
///
/// The message is kept in `message` until the channel accepts it.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct DynamicSendFuture<'ch, T> {
    // Type-erased reference to the channel being sent into.
    channel: &'ch dyn DynamicChannel<T>,
    // `Some` until the channel has accepted the message.
    message: Option<T>,
}
421 | |
422 | impl<'ch, T> Future for DynamicSendFuture<'ch, T> { |
423 | type Output = (); |
424 | |
425 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
426 | match self.message.take() { |
427 | Some(m: T) => match self.channel.try_send_with_context(message:m, cx:Some(cx)) { |
428 | Ok(..) => Poll::Ready(()), |
429 | Err(TrySendError::Full(m: T)) => { |
430 | self.message = Some(m); |
431 | Poll::Pending |
432 | } |
433 | }, |
434 | None => panic!("Message cannot be None" ), |
435 | } |
436 | } |
437 | } |
438 | |
439 | impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} |
440 | |
441 | impl<'ch, M: RawMutex, T, const N: usize> From<SendFuture<'ch, M, T, N>> for DynamicSendFuture<'ch, T> { |
442 | fn from(value: SendFuture<'ch, M, T, N>) -> Self { |
443 | Self { |
444 | channel: value.channel, |
445 | message: value.message, |
446 | } |
447 | } |
448 | } |
449 | |
/// Object-safe view of a channel, used by the `Dynamic*` handles and futures
/// so they do not have to name the mutex type or the capacity.
pub(crate) trait DynamicChannel<T> {
    /// Tries to queue `message`; when a context is supplied it is passed along
    /// for waker registration.
    fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;

    /// Tries to dequeue a message; when a context is supplied it is passed
    /// along for waker registration.
    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;

    /// Polls until the channel is ready to accept a message.
    fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
    /// Polls until the channel has a message ready to be received.
    fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;

    /// Polls the channel for the next message.
    fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T>;
}
460 | |
/// Error returned by [`try_receive`](Channel::try_receive).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TryReceiveError {
    /// The channel held no message at the time of the call.
    Empty,
}
468 | |
/// Error returned by [`try_send`](Channel::try_send).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TrySendError<T> {
    /// The channel was full, so the message could not be queued without
    /// waiting. The rejected message is handed back in the payload.
    Full(T),
}
477 | |
/// The data a [`Channel`] keeps behind its mutex: the message queue plus one
/// waker registration for each direction.
struct ChannelState<T, const N: usize> {
    // Buffered messages, pushed at the back and popped from the front.
    queue: Deque<T, N>,
    // Woken after a message has been pushed.
    receiver_waker: WakerRegistration,
    // Woken when a receive is attempted on a full queue.
    senders_waker: WakerRegistration,
}
483 | |
484 | impl<T, const N: usize> ChannelState<T, N> { |
485 | const fn new() -> Self { |
486 | ChannelState { |
487 | queue: Deque::new(), |
488 | receiver_waker: WakerRegistration::new(), |
489 | senders_waker: WakerRegistration::new(), |
490 | } |
491 | } |
492 | |
493 | fn try_receive(&mut self) -> Result<T, TryReceiveError> { |
494 | self.try_receive_with_context(None) |
495 | } |
496 | |
497 | fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { |
498 | if self.queue.is_full() { |
499 | self.senders_waker.wake(); |
500 | } |
501 | |
502 | if let Some(message) = self.queue.pop_front() { |
503 | Ok(message) |
504 | } else { |
505 | if let Some(cx) = cx { |
506 | self.receiver_waker.register(cx.waker()); |
507 | } |
508 | Err(TryReceiveError::Empty) |
509 | } |
510 | } |
511 | |
512 | fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> { |
513 | if self.queue.is_full() { |
514 | self.senders_waker.wake(); |
515 | } |
516 | |
517 | if let Some(message) = self.queue.pop_front() { |
518 | Poll::Ready(message) |
519 | } else { |
520 | self.receiver_waker.register(cx.waker()); |
521 | Poll::Pending |
522 | } |
523 | } |
524 | |
525 | fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
526 | self.receiver_waker.register(cx.waker()); |
527 | |
528 | if !self.queue.is_empty() { |
529 | Poll::Ready(()) |
530 | } else { |
531 | Poll::Pending |
532 | } |
533 | } |
534 | |
535 | fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> { |
536 | self.try_send_with_context(message, None) |
537 | } |
538 | |
539 | fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> { |
540 | match self.queue.push_back(message) { |
541 | Ok(()) => { |
542 | self.receiver_waker.wake(); |
543 | Ok(()) |
544 | } |
545 | Err(message) => { |
546 | if let Some(cx) = cx { |
547 | self.senders_waker.register(cx.waker()); |
548 | } |
549 | Err(TrySendError::Full(message)) |
550 | } |
551 | } |
552 | } |
553 | |
554 | fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
555 | self.senders_waker.register(cx.waker()); |
556 | |
557 | if !self.queue.is_full() { |
558 | Poll::Ready(()) |
559 | } else { |
560 | Poll::Pending |
561 | } |
562 | } |
563 | |
564 | fn clear(&mut self) { |
565 | self.queue.clear(); |
566 | } |
567 | |
568 | fn len(&self) -> usize { |
569 | self.queue.len() |
570 | } |
571 | |
572 | fn is_empty(&self) -> bool { |
573 | self.queue.is_empty() |
574 | } |
575 | |
576 | fn is_full(&self) -> bool { |
577 | self.queue.is_full() |
578 | } |
579 | } |
580 | |
581 | /// A bounded channel for communicating between asynchronous tasks |
582 | /// with backpressure. |
583 | /// |
584 | /// The channel will buffer up to the provided number of messages. Once the |
585 | /// buffer is full, attempts to `send` new messages will wait until a message is |
586 | /// received from the channel. |
587 | /// |
588 | /// All data sent will become available in the same order as it was sent. |
589 | pub struct Channel<M, T, const N: usize> |
590 | where |
591 | M: RawMutex, |
592 | { |
593 | inner: Mutex<M, RefCell<ChannelState<T, N>>>, |
594 | } |
595 | |
596 | impl<M, T, const N: usize> Channel<M, T, N> |
597 | where |
598 | M: RawMutex, |
599 | { |
600 | /// Establish a new bounded channel. For example, to create one with a NoopMutex: |
601 | /// |
602 | /// ``` |
603 | /// use embassy_sync::channel::Channel; |
604 | /// use embassy_sync::blocking_mutex::raw::NoopRawMutex; |
605 | /// |
606 | /// // Declare a bounded channel of 3 u32s. |
607 | /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new(); |
608 | /// ``` |
609 | pub const fn new() -> Self { |
610 | Self { |
611 | inner: Mutex::new(RefCell::new(ChannelState::new())), |
612 | } |
613 | } |
614 | |
615 | fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R { |
616 | self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut()))) |
617 | } |
618 | |
619 | fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { |
620 | self.lock(|c| c.try_receive_with_context(cx)) |
621 | } |
622 | |
623 | /// Poll the channel for the next message |
624 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { |
625 | self.lock(|c| c.poll_receive(cx)) |
626 | } |
627 | |
628 | fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> { |
629 | self.lock(|c| c.try_send_with_context(m, cx)) |
630 | } |
631 | |
632 | /// Allows a poll_fn to poll until the channel is ready to receive |
633 | pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { |
634 | self.lock(|c| c.poll_ready_to_receive(cx)) |
635 | } |
636 | |
637 | /// Allows a poll_fn to poll until the channel is ready to send |
638 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { |
639 | self.lock(|c| c.poll_ready_to_send(cx)) |
640 | } |
641 | |
642 | /// Get a sender for this channel. |
643 | pub fn sender(&self) -> Sender<'_, M, T, N> { |
644 | Sender { channel: self } |
645 | } |
646 | |
647 | /// Get a receiver for this channel. |
648 | pub fn receiver(&self) -> Receiver<'_, M, T, N> { |
649 | Receiver { channel: self } |
650 | } |
651 | |
652 | /// Get a sender for this channel using dynamic dispatch. |
653 | pub fn dyn_sender(&self) -> DynamicSender<'_, T> { |
654 | DynamicSender { channel: self } |
655 | } |
656 | |
657 | /// Get a receiver for this channel using dynamic dispatch. |
658 | pub fn dyn_receiver(&self) -> DynamicReceiver<'_, T> { |
659 | DynamicReceiver { channel: self } |
660 | } |
661 | |
662 | /// Send a value, waiting until there is capacity. |
663 | /// |
664 | /// Sending completes when the value has been pushed to the channel's queue. |
665 | /// This doesn't mean the value has been received yet. |
666 | pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> { |
667 | SendFuture { |
668 | channel: self, |
669 | message: Some(message), |
670 | } |
671 | } |
672 | |
673 | /// Attempt to immediately send a message. |
674 | /// |
675 | /// This method differs from [`send`](Channel::send) by returning immediately if the channel's |
676 | /// buffer is full, instead of waiting. |
677 | /// |
678 | /// # Errors |
679 | /// |
680 | /// If the channel capacity has been reached, i.e., the channel has `n` |
681 | /// buffered values where `n` is the argument passed to [`Channel`], then an |
682 | /// error is returned. |
683 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { |
684 | self.lock(|c| c.try_send(message)) |
685 | } |
686 | |
687 | /// Receive the next value. |
688 | /// |
689 | /// If there are no messages in the channel's buffer, this method will |
690 | /// wait until a message is sent. |
691 | pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> { |
692 | ReceiveFuture { channel: self } |
693 | } |
694 | |
695 | /// Is a value ready to be received in the channel |
696 | /// |
697 | /// If there are no messages in the channel's buffer, this method will |
698 | /// wait until there is at least one |
699 | pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> { |
700 | ReceiveReadyFuture { channel: self } |
701 | } |
702 | |
703 | /// Attempt to immediately receive a message. |
704 | /// |
705 | /// This method will either receive a message from the channel immediately or return an error |
706 | /// if the channel is empty. |
707 | pub fn try_receive(&self) -> Result<T, TryReceiveError> { |
708 | self.lock(|c| c.try_receive()) |
709 | } |
710 | |
711 | /// Returns the maximum number of elements the channel can hold. |
712 | pub const fn capacity(&self) -> usize { |
713 | N |
714 | } |
715 | |
716 | /// Returns the free capacity of the channel. |
717 | /// |
718 | /// This is equivalent to `capacity() - len()` |
719 | pub fn free_capacity(&self) -> usize { |
720 | N - self.len() |
721 | } |
722 | |
723 | /// Clears all elements in the channel. |
724 | pub fn clear(&self) { |
725 | self.lock(|c| c.clear()); |
726 | } |
727 | |
728 | /// Returns the number of elements currently in the channel. |
729 | pub fn len(&self) -> usize { |
730 | self.lock(|c| c.len()) |
731 | } |
732 | |
733 | /// Returns whether the channel is empty. |
734 | pub fn is_empty(&self) -> bool { |
735 | self.lock(|c| c.is_empty()) |
736 | } |
737 | |
738 | /// Returns whether the channel is full. |
739 | pub fn is_full(&self) -> bool { |
740 | self.lock(|c| c.is_full()) |
741 | } |
742 | } |
743 | |
/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
/// tradeoff cost of dynamic dispatch.
///
/// Every method forwards to the method of the same name on [`Channel`]; the
/// calls are written with an explicit `Channel::` path.
impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
where
    M: RawMutex,
{
    // Forwards to `Channel::try_send_with_context`.
    fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
        Channel::try_send_with_context(self, m, cx)
    }

    // Forwards to `Channel::try_receive_with_context`.
    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
        Channel::try_receive_with_context(self, cx)
    }

    // Forwards to `Channel::poll_ready_to_send`.
    fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
        Channel::poll_ready_to_send(self, cx)
    }

    // Forwards to `Channel::poll_ready_to_receive`.
    fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
        Channel::poll_ready_to_receive(self, cx)
    }

    // Forwards to `Channel::poll_receive`.
    fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
        Channel::poll_receive(self, cx)
    }
}
770 | |
#[cfg(test)]
mod tests {
    use core::time::Duration;

    use futures_executor::ThreadPool;
    use futures_timer::Delay;
    use futures_util::task::SpawnExt;
    use static_cell::StaticCell;

    use super::*;
    use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};

    /// Number of free slots left in the state's queue.
    fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
        c.queue.capacity() - c.queue.len()
    }

    #[test]
    fn sending_once() {
        let mut c = ChannelState::<u32, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(capacity(&c), 2);
    }

    #[test]
    fn sending_when_full() {
        let mut c = ChannelState::<u32, 3>::new();
        let _ = c.try_send(1);
        let _ = c.try_send(1);
        let _ = c.try_send(1);
        // The rejected message must come back inside the error.
        assert_eq!(c.try_send(2), Err(TrySendError::Full(2)));
        assert_eq!(capacity(&c), 0);
    }

    #[test]
    fn receiving_once_with_one_send() {
        let mut c = ChannelState::<u32, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(c.try_receive().unwrap(), 1);
        assert_eq!(capacity(&c), 3);
    }

    #[test]
    fn receiving_when_empty() {
        let mut c = ChannelState::<u32, 3>::new();
        assert_eq!(c.try_receive(), Err(TryReceiveError::Empty));
        assert_eq!(capacity(&c), 3);
    }

    #[test]
    fn clearing_frees_all_capacity() {
        let mut c = ChannelState::<u32, 3>::new();
        let _ = c.try_send(1);
        let _ = c.try_send(2);
        let _ = c.try_send(3);
        c.clear();
        assert_eq!(capacity(&c), 3);
        assert_eq!(c.try_receive(), Err(TryReceiveError::Empty));
    }

    #[test]
    fn simple_send_and_receive() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(c.try_receive().unwrap(), 1);
    }

    #[test]
    fn cloning() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        let r1 = c.receiver();
        let s1 = c.sender();

        let _ = r1.clone();
        let _ = s1.clone();
    }

    #[test]
    fn dynamic_dispatch_into() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        let s: DynamicSender<'_, u32> = c.sender().into();
        let r: DynamicReceiver<'_, u32> = c.receiver().into();

        assert!(s.try_send(1).is_ok());
        assert_eq!(r.try_receive().unwrap(), 1);
    }

    #[test]
    fn dynamic_dispatch_constructor() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        let s = c.dyn_sender();
        let r = c.dyn_receiver();

        assert!(s.try_send(1).is_ok());
        assert_eq!(r.try_receive().unwrap(), 1);
    }

    #[futures_test::test]
    async fn receiver_receives_given_try_send_async() {
        let executor = ThreadPool::new().unwrap();

        static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new();
        let c = &*CHANNEL.init(Channel::new());
        let c2 = c;
        assert!(executor
            .spawn(async move {
                assert!(c2.try_send(1).is_ok());
            })
            .is_ok());
        assert_eq!(c.receive().await, 1);
    }

    #[futures_test::test]
    async fn sender_send_completes_if_capacity() {
        let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
        c.send(1).await;
        assert_eq!(c.receive().await, 1);
    }

    #[futures_test::test]
    async fn senders_sends_wait_until_capacity() {
        let executor = ThreadPool::new().unwrap();

        static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 1>> = StaticCell::new();
        let c = &*CHANNEL.init(Channel::new());
        assert!(c.try_send(1).is_ok());

        let c2 = c;
        let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
        let c2 = c;
        let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
        // Wish I could think of a means of determining that the async send is waiting instead.
        // However, I've used the debugger to observe that the send does indeed wait.
        Delay::new(Duration::from_millis(500)).await;
        assert_eq!(c.receive().await, 1);
        assert!(executor
            .spawn(async move {
                loop {
                    c.receive().await;
                }
            })
            .is_ok());
        send_task_1.unwrap().await;
        send_task_2.unwrap().await;
    }
}
911 | |