1 | //! A zero-copy queue for sending values between asynchronous tasks. |
2 | //! |
3 | //! It can be used concurrently by a producer (sender) and a |
4 | //! consumer (receiver), i.e. it is an "SPSC channel". |
5 | //! |
6 | //! This queue takes a Mutex type so that various |
7 | //! targets can be attained. For example, a ThreadModeMutex can be used |
8 | //! for single-core Cortex-M targets where messages are only passed |
9 | //! between tasks running in thread mode. Similarly, a CriticalSectionMutex |
10 | //! can also be used for single-core targets where messages are to be |
11 | //! passed from exception mode e.g. out of an interrupt handler. |
12 | //! |
13 | //! This module provides a bounded channel that has a limit on the number of |
14 | //! messages that it can store, and if this limit is reached, trying to send |
15 | //! another message will result in an error being returned. |
16 | |
17 | use core::cell::RefCell; |
18 | use core::future::{poll_fn, Future}; |
19 | use core::marker::PhantomData; |
20 | use core::task::{Context, Poll}; |
21 | |
22 | use crate::blocking_mutex::raw::RawMutex; |
23 | use crate::blocking_mutex::Mutex; |
24 | use crate::waitqueue::WakerRegistration; |
25 | |
26 | /// A bounded zero-copy channel for communicating between asynchronous tasks |
27 | /// with backpressure. |
28 | /// |
29 | /// The channel will buffer up to the provided number of messages. Once the |
30 | /// buffer is full, attempts to `send` new messages will wait until a message is |
31 | /// received from the channel. |
32 | /// |
33 | /// All data sent will become available in the same order as it was sent. |
34 | /// |
35 | /// The channel requires a buffer of recyclable elements. Writing to the channel is done through |
36 | /// an `&mut T`. |
37 | pub struct Channel<'a, M: RawMutex, T> { |
38 | buf: *mut T, |
39 | phantom: PhantomData<&'a mut T>, |
40 | state: Mutex<M, RefCell<State>>, |
41 | } |
42 | |
43 | impl<'a, M: RawMutex, T> Channel<'a, M, T> { |
44 | /// Initialize a new [`Channel`]. |
45 | /// |
46 | /// The provided buffer will be used and reused by the channel's logic, and thus dictates the |
47 | /// channel's capacity. |
48 | pub fn new(buf: &'a mut [T]) -> Self { |
49 | let len = buf.len(); |
50 | assert!(len != 0); |
51 | |
52 | Self { |
53 | buf: buf.as_mut_ptr(), |
54 | phantom: PhantomData, |
55 | state: Mutex::new(RefCell::new(State { |
56 | capacity: len, |
57 | front: 0, |
58 | back: 0, |
59 | full: false, |
60 | send_waker: WakerRegistration::new(), |
61 | receive_waker: WakerRegistration::new(), |
62 | })), |
63 | } |
64 | } |
65 | |
66 | /// Creates a [`Sender`] and [`Receiver`] from an existing channel. |
67 | /// |
68 | /// Further Senders and Receivers can be created through [`Sender::borrow`] and |
69 | /// [`Receiver::borrow`] respectively. |
70 | pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { |
71 | (Sender { channel: self }, Receiver { channel: self }) |
72 | } |
73 | |
74 | /// Clears all elements in the channel. |
75 | pub fn clear(&mut self) { |
76 | self.state.lock(|s| { |
77 | s.borrow_mut().clear(); |
78 | }); |
79 | } |
80 | |
81 | /// Returns the number of elements currently in the channel. |
82 | pub fn len(&self) -> usize { |
83 | self.state.lock(|s| s.borrow().len()) |
84 | } |
85 | |
86 | /// Returns whether the channel is empty. |
87 | pub fn is_empty(&self) -> bool { |
88 | self.state.lock(|s| s.borrow().is_empty()) |
89 | } |
90 | |
91 | /// Returns whether the channel is full. |
92 | pub fn is_full(&self) -> bool { |
93 | self.state.lock(|s| s.borrow().is_full()) |
94 | } |
95 | } |
96 | |
97 | /// Send-only access to a [`Channel`]. |
98 | pub struct Sender<'a, M: RawMutex, T> { |
99 | channel: &'a Channel<'a, M, T>, |
100 | } |
101 | |
102 | impl<'a, M: RawMutex, T> Sender<'a, M, T> { |
103 | /// Creates one further [`Sender`] over the same channel. |
104 | pub fn borrow(&mut self) -> Sender<'_, M, T> { |
105 | Sender { channel: self.channel } |
106 | } |
107 | |
108 | /// Attempts to send a value over the channel. |
109 | pub fn try_send(&mut self) -> Option<&mut T> { |
110 | self.channel.state.lock(|s| { |
111 | let s = &mut *s.borrow_mut(); |
112 | match s.push_index() { |
113 | Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), |
114 | None => None, |
115 | } |
116 | }) |
117 | } |
118 | |
119 | /// Attempts to send a value over the channel. |
120 | pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> { |
121 | self.channel.state.lock(|s| { |
122 | let s = &mut *s.borrow_mut(); |
123 | match s.push_index() { |
124 | Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), |
125 | None => { |
126 | s.receive_waker.register(cx.waker()); |
127 | Poll::Pending |
128 | } |
129 | } |
130 | }) |
131 | } |
132 | |
133 | /// Asynchronously send a value over the channel. |
134 | pub fn send(&mut self) -> impl Future<Output = &mut T> { |
135 | poll_fn(|cx| { |
136 | self.channel.state.lock(|s| { |
137 | let s = &mut *s.borrow_mut(); |
138 | match s.push_index() { |
139 | Some(i) => { |
140 | let r = unsafe { &mut *self.channel.buf.add(i) }; |
141 | Poll::Ready(r) |
142 | } |
143 | None => { |
144 | s.receive_waker.register(cx.waker()); |
145 | Poll::Pending |
146 | } |
147 | } |
148 | }) |
149 | }) |
150 | } |
151 | |
152 | /// Notify the channel that the sending of the value has been finalized. |
153 | pub fn send_done(&mut self) { |
154 | self.channel.state.lock(|s| s.borrow_mut().push_done()) |
155 | } |
156 | |
157 | /// Clears all elements in the channel. |
158 | pub fn clear(&mut self) { |
159 | self.channel.state.lock(|s| { |
160 | s.borrow_mut().clear(); |
161 | }); |
162 | } |
163 | |
164 | /// Returns the number of elements currently in the channel. |
165 | pub fn len(&self) -> usize { |
166 | self.channel.state.lock(|s| s.borrow().len()) |
167 | } |
168 | |
169 | /// Returns whether the channel is empty. |
170 | pub fn is_empty(&self) -> bool { |
171 | self.channel.state.lock(|s| s.borrow().is_empty()) |
172 | } |
173 | |
174 | /// Returns whether the channel is full. |
175 | pub fn is_full(&self) -> bool { |
176 | self.channel.state.lock(|s| s.borrow().is_full()) |
177 | } |
178 | } |
179 | |
180 | /// Receive-only access to a [`Channel`]. |
181 | pub struct Receiver<'a, M: RawMutex, T> { |
182 | channel: &'a Channel<'a, M, T>, |
183 | } |
184 | |
185 | impl<'a, M: RawMutex, T> Receiver<'a, M, T> { |
186 | /// Creates one further [`Sender`] over the same channel. |
187 | pub fn borrow(&mut self) -> Receiver<'_, M, T> { |
188 | Receiver { channel: self.channel } |
189 | } |
190 | |
191 | /// Attempts to receive a value over the channel. |
192 | pub fn try_receive(&mut self) -> Option<&mut T> { |
193 | self.channel.state.lock(|s| { |
194 | let s = &mut *s.borrow_mut(); |
195 | match s.pop_index() { |
196 | Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), |
197 | None => None, |
198 | } |
199 | }) |
200 | } |
201 | |
202 | /// Attempts to asynchronously receive a value over the channel. |
203 | pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> { |
204 | self.channel.state.lock(|s| { |
205 | let s = &mut *s.borrow_mut(); |
206 | match s.pop_index() { |
207 | Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), |
208 | None => { |
209 | s.send_waker.register(cx.waker()); |
210 | Poll::Pending |
211 | } |
212 | } |
213 | }) |
214 | } |
215 | |
216 | /// Asynchronously receive a value over the channel. |
217 | pub fn receive(&mut self) -> impl Future<Output = &mut T> { |
218 | poll_fn(|cx| { |
219 | self.channel.state.lock(|s| { |
220 | let s = &mut *s.borrow_mut(); |
221 | match s.pop_index() { |
222 | Some(i) => { |
223 | let r = unsafe { &mut *self.channel.buf.add(i) }; |
224 | Poll::Ready(r) |
225 | } |
226 | None => { |
227 | s.send_waker.register(cx.waker()); |
228 | Poll::Pending |
229 | } |
230 | } |
231 | }) |
232 | }) |
233 | } |
234 | |
235 | /// Notify the channel that the receiving of the value has been finalized. |
236 | pub fn receive_done(&mut self) { |
237 | self.channel.state.lock(|s| s.borrow_mut().pop_done()) |
238 | } |
239 | |
240 | /// Clears all elements in the channel. |
241 | pub fn clear(&mut self) { |
242 | self.channel.state.lock(|s| { |
243 | s.borrow_mut().clear(); |
244 | }); |
245 | } |
246 | |
247 | /// Returns the number of elements currently in the channel. |
248 | pub fn len(&self) -> usize { |
249 | self.channel.state.lock(|s| s.borrow().len()) |
250 | } |
251 | |
252 | /// Returns whether the channel is empty. |
253 | pub fn is_empty(&self) -> bool { |
254 | self.channel.state.lock(|s| s.borrow().is_empty()) |
255 | } |
256 | |
257 | /// Returns whether the channel is full. |
258 | pub fn is_full(&self) -> bool { |
259 | self.channel.state.lock(|s| s.borrow().is_full()) |
260 | } |
261 | } |
262 | |
263 | struct State { |
264 | /// Maximum number of elements the channel can hold. |
265 | capacity: usize, |
266 | |
267 | /// Front index. Always 0..=(N-1) |
268 | front: usize, |
269 | /// Back index. Always 0..=(N-1). |
270 | back: usize, |
271 | |
272 | /// Used to distinguish "empty" and "full" cases when `front == back`. |
273 | /// May only be `true` if `front == back`, always `false` otherwise. |
274 | full: bool, |
275 | |
276 | send_waker: WakerRegistration, |
277 | receive_waker: WakerRegistration, |
278 | } |
279 | |
280 | impl State { |
281 | fn increment(&self, i: usize) -> usize { |
282 | if i + 1 == self.capacity { |
283 | 0 |
284 | } else { |
285 | i + 1 |
286 | } |
287 | } |
288 | |
289 | fn clear(&mut self) { |
290 | self.front = 0; |
291 | self.back = 0; |
292 | self.full = false; |
293 | } |
294 | |
295 | fn len(&self) -> usize { |
296 | if !self.full { |
297 | if self.back >= self.front { |
298 | self.back - self.front |
299 | } else { |
300 | self.capacity + self.back - self.front |
301 | } |
302 | } else { |
303 | self.capacity |
304 | } |
305 | } |
306 | |
307 | fn is_full(&self) -> bool { |
308 | self.full |
309 | } |
310 | |
311 | fn is_empty(&self) -> bool { |
312 | self.front == self.back && !self.full |
313 | } |
314 | |
315 | fn push_index(&mut self) -> Option<usize> { |
316 | match self.is_full() { |
317 | true => None, |
318 | false => Some(self.back), |
319 | } |
320 | } |
321 | |
322 | fn push_done(&mut self) { |
323 | assert!(!self.is_full()); |
324 | self.back = self.increment(self.back); |
325 | if self.back == self.front { |
326 | self.full = true; |
327 | } |
328 | self.send_waker.wake(); |
329 | } |
330 | |
331 | fn pop_index(&mut self) -> Option<usize> { |
332 | match self.is_empty() { |
333 | true => None, |
334 | false => Some(self.front), |
335 | } |
336 | } |
337 | |
338 | fn pop_done(&mut self) { |
339 | assert!(!self.is_empty()); |
340 | self.front = self.increment(self.front); |
341 | self.full = false; |
342 | self.receive_waker.wake(); |
343 | } |
344 | } |
345 | |