1//! A zero-copy queue for sending values between asynchronous tasks.
2//!
3//! It can be used concurrently by a producer (sender) and a
4//! consumer (receiver), i.e. it is an "SPSC channel".
5//!
6//! This queue takes a Mutex type so that various
7//! targets can be attained. For example, a ThreadModeMutex can be used
8//! for single-core Cortex-M targets where messages are only passed
9//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
10//! can also be used for single-core targets where messages are to be
11//! passed from exception mode e.g. out of an interrupt handler.
12//!
//! This module provides a bounded channel that has a limit on the number of
//! messages that it can store. Once this limit is reached, `try_send` returns
//! `None` and `send` waits until the receiver has released a slot.
16
17use core::cell::RefCell;
18use core::future::{poll_fn, Future};
19use core::marker::PhantomData;
20use core::task::{Context, Poll};
21
22use crate::blocking_mutex::raw::RawMutex;
23use crate::blocking_mutex::Mutex;
24use crate::waitqueue::WakerRegistration;
25
26/// A bounded zero-copy channel for communicating between asynchronous tasks
27/// with backpressure.
28///
29/// The channel will buffer up to the provided number of messages. Once the
30/// buffer is full, attempts to `send` new messages will wait until a message is
31/// received from the channel.
32///
33/// All data sent will become available in the same order as it was sent.
34///
35/// The channel requires a buffer of recyclable elements. Writing to the channel is done through
36/// an `&mut T`.
37pub struct Channel<'a, M: RawMutex, T> {
38 buf: *mut T,
39 phantom: PhantomData<&'a mut T>,
40 state: Mutex<M, RefCell<State>>,
41}
42
43impl<'a, M: RawMutex, T> Channel<'a, M, T> {
44 /// Initialize a new [`Channel`].
45 ///
46 /// The provided buffer will be used and reused by the channel's logic, and thus dictates the
47 /// channel's capacity.
48 pub fn new(buf: &'a mut [T]) -> Self {
49 let len = buf.len();
50 assert!(len != 0);
51
52 Self {
53 buf: buf.as_mut_ptr(),
54 phantom: PhantomData,
55 state: Mutex::new(RefCell::new(State {
56 capacity: len,
57 front: 0,
58 back: 0,
59 full: false,
60 send_waker: WakerRegistration::new(),
61 receive_waker: WakerRegistration::new(),
62 })),
63 }
64 }
65
66 /// Creates a [`Sender`] and [`Receiver`] from an existing channel.
67 ///
68 /// Further Senders and Receivers can be created through [`Sender::borrow`] and
69 /// [`Receiver::borrow`] respectively.
70 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
71 (Sender { channel: self }, Receiver { channel: self })
72 }
73
74 /// Clears all elements in the channel.
75 pub fn clear(&mut self) {
76 self.state.lock(|s| {
77 s.borrow_mut().clear();
78 });
79 }
80
81 /// Returns the number of elements currently in the channel.
82 pub fn len(&self) -> usize {
83 self.state.lock(|s| s.borrow().len())
84 }
85
86 /// Returns whether the channel is empty.
87 pub fn is_empty(&self) -> bool {
88 self.state.lock(|s| s.borrow().is_empty())
89 }
90
91 /// Returns whether the channel is full.
92 pub fn is_full(&self) -> bool {
93 self.state.lock(|s| s.borrow().is_full())
94 }
95}
96
97/// Send-only access to a [`Channel`].
98pub struct Sender<'a, M: RawMutex, T> {
99 channel: &'a Channel<'a, M, T>,
100}
101
102impl<'a, M: RawMutex, T> Sender<'a, M, T> {
103 /// Creates one further [`Sender`] over the same channel.
104 pub fn borrow(&mut self) -> Sender<'_, M, T> {
105 Sender { channel: self.channel }
106 }
107
108 /// Attempts to send a value over the channel.
109 pub fn try_send(&mut self) -> Option<&mut T> {
110 self.channel.state.lock(|s| {
111 let s = &mut *s.borrow_mut();
112 match s.push_index() {
113 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
114 None => None,
115 }
116 })
117 }
118
119 /// Attempts to send a value over the channel.
120 pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
121 self.channel.state.lock(|s| {
122 let s = &mut *s.borrow_mut();
123 match s.push_index() {
124 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
125 None => {
126 s.receive_waker.register(cx.waker());
127 Poll::Pending
128 }
129 }
130 })
131 }
132
133 /// Asynchronously send a value over the channel.
134 pub fn send(&mut self) -> impl Future<Output = &mut T> {
135 poll_fn(|cx| {
136 self.channel.state.lock(|s| {
137 let s = &mut *s.borrow_mut();
138 match s.push_index() {
139 Some(i) => {
140 let r = unsafe { &mut *self.channel.buf.add(i) };
141 Poll::Ready(r)
142 }
143 None => {
144 s.receive_waker.register(cx.waker());
145 Poll::Pending
146 }
147 }
148 })
149 })
150 }
151
152 /// Notify the channel that the sending of the value has been finalized.
153 pub fn send_done(&mut self) {
154 self.channel.state.lock(|s| s.borrow_mut().push_done())
155 }
156
157 /// Clears all elements in the channel.
158 pub fn clear(&mut self) {
159 self.channel.state.lock(|s| {
160 s.borrow_mut().clear();
161 });
162 }
163
164 /// Returns the number of elements currently in the channel.
165 pub fn len(&self) -> usize {
166 self.channel.state.lock(|s| s.borrow().len())
167 }
168
169 /// Returns whether the channel is empty.
170 pub fn is_empty(&self) -> bool {
171 self.channel.state.lock(|s| s.borrow().is_empty())
172 }
173
174 /// Returns whether the channel is full.
175 pub fn is_full(&self) -> bool {
176 self.channel.state.lock(|s| s.borrow().is_full())
177 }
178}
179
180/// Receive-only access to a [`Channel`].
181pub struct Receiver<'a, M: RawMutex, T> {
182 channel: &'a Channel<'a, M, T>,
183}
184
185impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
186 /// Creates one further [`Sender`] over the same channel.
187 pub fn borrow(&mut self) -> Receiver<'_, M, T> {
188 Receiver { channel: self.channel }
189 }
190
191 /// Attempts to receive a value over the channel.
192 pub fn try_receive(&mut self) -> Option<&mut T> {
193 self.channel.state.lock(|s| {
194 let s = &mut *s.borrow_mut();
195 match s.pop_index() {
196 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
197 None => None,
198 }
199 })
200 }
201
202 /// Attempts to asynchronously receive a value over the channel.
203 pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> {
204 self.channel.state.lock(|s| {
205 let s = &mut *s.borrow_mut();
206 match s.pop_index() {
207 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
208 None => {
209 s.send_waker.register(cx.waker());
210 Poll::Pending
211 }
212 }
213 })
214 }
215
216 /// Asynchronously receive a value over the channel.
217 pub fn receive(&mut self) -> impl Future<Output = &mut T> {
218 poll_fn(|cx| {
219 self.channel.state.lock(|s| {
220 let s = &mut *s.borrow_mut();
221 match s.pop_index() {
222 Some(i) => {
223 let r = unsafe { &mut *self.channel.buf.add(i) };
224 Poll::Ready(r)
225 }
226 None => {
227 s.send_waker.register(cx.waker());
228 Poll::Pending
229 }
230 }
231 })
232 })
233 }
234
235 /// Notify the channel that the receiving of the value has been finalized.
236 pub fn receive_done(&mut self) {
237 self.channel.state.lock(|s| s.borrow_mut().pop_done())
238 }
239
240 /// Clears all elements in the channel.
241 pub fn clear(&mut self) {
242 self.channel.state.lock(|s| {
243 s.borrow_mut().clear();
244 });
245 }
246
247 /// Returns the number of elements currently in the channel.
248 pub fn len(&self) -> usize {
249 self.channel.state.lock(|s| s.borrow().len())
250 }
251
252 /// Returns whether the channel is empty.
253 pub fn is_empty(&self) -> bool {
254 self.channel.state.lock(|s| s.borrow().is_empty())
255 }
256
257 /// Returns whether the channel is full.
258 pub fn is_full(&self) -> bool {
259 self.channel.state.lock(|s| s.borrow().is_full())
260 }
261}
262
263struct State {
264 /// Maximum number of elements the channel can hold.
265 capacity: usize,
266
267 /// Front index. Always 0..=(N-1)
268 front: usize,
269 /// Back index. Always 0..=(N-1).
270 back: usize,
271
272 /// Used to distinguish "empty" and "full" cases when `front == back`.
273 /// May only be `true` if `front == back`, always `false` otherwise.
274 full: bool,
275
276 send_waker: WakerRegistration,
277 receive_waker: WakerRegistration,
278}
279
280impl State {
281 fn increment(&self, i: usize) -> usize {
282 if i + 1 == self.capacity {
283 0
284 } else {
285 i + 1
286 }
287 }
288
289 fn clear(&mut self) {
290 self.front = 0;
291 self.back = 0;
292 self.full = false;
293 }
294
295 fn len(&self) -> usize {
296 if !self.full {
297 if self.back >= self.front {
298 self.back - self.front
299 } else {
300 self.capacity + self.back - self.front
301 }
302 } else {
303 self.capacity
304 }
305 }
306
307 fn is_full(&self) -> bool {
308 self.full
309 }
310
311 fn is_empty(&self) -> bool {
312 self.front == self.back && !self.full
313 }
314
315 fn push_index(&mut self) -> Option<usize> {
316 match self.is_full() {
317 true => None,
318 false => Some(self.back),
319 }
320 }
321
322 fn push_done(&mut self) {
323 assert!(!self.is_full());
324 self.back = self.increment(self.back);
325 if self.back == self.front {
326 self.full = true;
327 }
328 self.send_waker.wake();
329 }
330
331 fn pop_index(&mut self) -> Option<usize> {
332 match self.is_empty() {
333 true => None,
334 false => Some(self.front),
335 }
336 }
337
338 fn pop_done(&mut self) {
339 assert!(!self.is_empty());
340 self.front = self.increment(self.front);
341 self.full = false;
342 self.receive_waker.wake();
343 }
344}
345