1 | //! Atomic reusable ringbuffer. |
2 | use core::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; |
3 | use core::{ptr, slice}; |
4 | |
5 | /// Atomic reusable ringbuffer |
6 | /// |
7 | /// This ringbuffer implementation is designed to be stored in a `static`, |
8 | /// therefore all methods take `&self` and not `&mut self`. |
9 | /// |
10 | /// It is "reusable": when created it has no backing buffer, you can give it |
11 | /// one with `init` and take it back with `deinit`, and init it again in the |
12 | /// future if needed. This is very non-idiomatic, but helps a lot when storing |
13 | /// it in a `static`. |
14 | /// |
15 | /// One concurrent writer and one concurrent reader are supported, even at |
16 | /// different execution priorities (like main and irq). |
17 | pub struct RingBuffer { |
18 | #[doc (hidden)] |
19 | pub buf: AtomicPtr<u8>, |
20 | len: AtomicUsize, |
21 | |
22 | // start and end wrap at len*2, not at len. |
23 | // This allows distinguishing "full" and "empty". |
24 | // full is when start+len == end (modulo len*2) |
25 | // empty is when start == end |
26 | // |
27 | // This avoids having to consider the ringbuffer "full" at len-1 instead of len. |
28 | // The usual solution is adding a "full" flag, but that can't be made atomic |
29 | #[doc (hidden)] |
30 | pub start: AtomicUsize, |
31 | #[doc (hidden)] |
32 | pub end: AtomicUsize, |
33 | } |
34 | |
35 | /// A type which can only read from a ring buffer. |
36 | pub struct Reader<'a>(&'a RingBuffer); |
37 | |
38 | /// A type which can only write to a ring buffer. |
39 | pub struct Writer<'a>(&'a RingBuffer); |
40 | |
41 | impl RingBuffer { |
42 | /// Create a new empty ringbuffer. |
43 | pub const fn new() -> Self { |
44 | Self { |
45 | buf: AtomicPtr::new(core::ptr::null_mut()), |
46 | len: AtomicUsize::new(0), |
47 | start: AtomicUsize::new(0), |
48 | end: AtomicUsize::new(0), |
49 | } |
50 | } |
51 | |
52 | /// Initialize the ring buffer with a buffer. |
53 | /// |
54 | /// # Safety |
55 | /// - The buffer (`buf .. buf+len`) must be valid memory until `deinit` is called. |
56 | /// - Must not be called concurrently with any other methods. |
57 | pub unsafe fn init(&self, buf: *mut u8, len: usize) { |
58 | // Ordering: it's OK to use `Relaxed` because this is not called |
59 | // concurrently with other methods. |
60 | self.buf.store(buf, Ordering::Relaxed); |
61 | self.len.store(len, Ordering::Relaxed); |
62 | self.start.store(0, Ordering::Relaxed); |
63 | self.end.store(0, Ordering::Relaxed); |
64 | } |
65 | |
66 | /// Deinitialize the ringbuffer. |
67 | /// |
68 | /// After calling this, the ringbuffer becomes empty, as if it was |
69 | /// just created with `new()`. |
70 | /// |
71 | /// # Safety |
72 | /// - Must not be called concurrently with any other methods. |
73 | pub unsafe fn deinit(&self) { |
74 | // Ordering: it's OK to use `Relaxed` because this is not called |
75 | // concurrently with other methods. |
76 | self.buf.store(ptr::null_mut(), Ordering::Relaxed); |
77 | self.len.store(0, Ordering::Relaxed); |
78 | self.start.store(0, Ordering::Relaxed); |
79 | self.end.store(0, Ordering::Relaxed); |
80 | } |
81 | |
82 | /// Create a reader. |
83 | /// |
84 | /// # Safety |
85 | /// |
86 | /// - Only one reader can exist at a time. |
87 | /// - Ringbuffer must be initialized. |
88 | pub unsafe fn reader(&self) -> Reader<'_> { |
89 | Reader(self) |
90 | } |
91 | |
92 | /// Try creating a reader, fails if not initialized. |
93 | /// |
94 | /// # Safety |
95 | /// |
96 | /// Only one reader can exist at a time. |
97 | pub unsafe fn try_reader(&self) -> Option<Reader<'_>> { |
98 | if self.buf.load(Ordering::Relaxed).is_null() { |
99 | return None; |
100 | } |
101 | Some(Reader(self)) |
102 | } |
103 | |
104 | /// Create a writer. |
105 | /// |
106 | /// # Safety |
107 | /// |
108 | /// - Only one writer can exist at a time. |
109 | /// - Ringbuffer must be initialized. |
110 | pub unsafe fn writer(&self) -> Writer<'_> { |
111 | Writer(self) |
112 | } |
113 | |
114 | /// Try creating a writer, fails if not initialized. |
115 | /// |
116 | /// # Safety |
117 | /// |
118 | /// Only one writer can exist at a time. |
119 | pub unsafe fn try_writer(&self) -> Option<Writer<'_>> { |
120 | if self.buf.load(Ordering::Relaxed).is_null() { |
121 | return None; |
122 | } |
123 | Some(Writer(self)) |
124 | } |
125 | |
126 | /// Return if buffer is available. |
127 | pub fn is_available(&self) -> bool { |
128 | !self.buf.load(Ordering::Relaxed).is_null() && self.len.load(Ordering::Relaxed) != 0 |
129 | } |
130 | |
131 | /// Return length of buffer. |
132 | pub fn len(&self) -> usize { |
133 | self.len.load(Ordering::Relaxed) |
134 | } |
135 | |
136 | /// Check if buffer is full. |
137 | pub fn is_full(&self) -> bool { |
138 | let len = self.len.load(Ordering::Relaxed); |
139 | let start = self.start.load(Ordering::Relaxed); |
140 | let end = self.end.load(Ordering::Relaxed); |
141 | |
142 | self.wrap(start + len) == end |
143 | } |
144 | |
145 | /// Check if buffer is empty. |
146 | pub fn is_empty(&self) -> bool { |
147 | let start = self.start.load(Ordering::Relaxed); |
148 | let end = self.end.load(Ordering::Relaxed); |
149 | |
150 | start == end |
151 | } |
152 | |
153 | fn wrap(&self, mut n: usize) -> usize { |
154 | let len = self.len.load(Ordering::Relaxed); |
155 | |
156 | if n >= len * 2 { |
157 | n -= len * 2 |
158 | } |
159 | n |
160 | } |
161 | } |
162 | |
163 | impl<'a> Writer<'a> { |
164 | /// Push data into the buffer in-place. |
165 | /// |
166 | /// The closure `f` is called with a free part of the buffer, it must write |
167 | /// some data to it and return the amount of bytes written. |
168 | pub fn push(&mut self, f: impl FnOnce(&mut [u8]) -> usize) -> usize { |
169 | let (p, n) = self.push_buf(); |
170 | let buf = unsafe { slice::from_raw_parts_mut(p, n) }; |
171 | let n = f(buf); |
172 | self.push_done(n); |
173 | n |
174 | } |
175 | |
176 | /// Push one data byte. |
177 | /// |
178 | /// Returns true if pushed successfully. |
179 | pub fn push_one(&mut self, val: u8) -> bool { |
180 | let n = self.push(|f| match f { |
181 | [] => 0, |
182 | [x, ..] => { |
183 | *x = val; |
184 | 1 |
185 | } |
186 | }); |
187 | n != 0 |
188 | } |
189 | |
190 | /// Get a buffer where data can be pushed to. |
191 | /// |
192 | /// Equivalent to [`Self::push_buf`] but returns a slice. |
193 | pub fn push_slice(&mut self) -> &mut [u8] { |
194 | let (data, len) = self.push_buf(); |
195 | unsafe { slice::from_raw_parts_mut(data, len) } |
196 | } |
197 | |
198 | /// Get up to two buffers where data can be pushed to. |
199 | /// |
200 | /// Equivalent to [`Self::push_bufs`] but returns slices. |
201 | pub fn push_slices(&mut self) -> [&mut [u8]; 2] { |
202 | let [(d0, l0), (d1, l1)] = self.push_bufs(); |
203 | unsafe { [slice::from_raw_parts_mut(d0, l0), slice::from_raw_parts_mut(d1, l1)] } |
204 | } |
205 | |
206 | /// Get a buffer where data can be pushed to. |
207 | /// |
208 | /// Write data to the start of the buffer, then call `push_done` with |
209 | /// however many bytes you've pushed. |
210 | /// |
211 | /// The buffer is suitable to DMA to. |
212 | /// |
213 | /// If the ringbuf is full, size=0 will be returned. |
214 | /// |
215 | /// The buffer stays valid as long as no other `Writer` method is called |
216 | /// and `init`/`deinit` aren't called on the ringbuf. |
217 | pub fn push_buf(&mut self) -> (*mut u8, usize) { |
218 | // Ordering: popping writes `start` last, so we read `start` first. |
219 | // Read it with Acquire ordering, so that the next accesses can't be reordered up past it. |
220 | let mut start = self.0.start.load(Ordering::Acquire); |
221 | let buf = self.0.buf.load(Ordering::Relaxed); |
222 | let len = self.0.len.load(Ordering::Relaxed); |
223 | let mut end = self.0.end.load(Ordering::Relaxed); |
224 | |
225 | let empty = start == end; |
226 | |
227 | if start >= len { |
228 | start -= len |
229 | } |
230 | if end >= len { |
231 | end -= len |
232 | } |
233 | |
234 | if start == end && !empty { |
235 | // full |
236 | return (buf, 0); |
237 | } |
238 | let n = if start > end { start - end } else { len - end }; |
239 | |
240 | trace!(" ringbuf: push_buf {:?}..{:?}" , end, end + n); |
241 | (unsafe { buf.add(end) }, n) |
242 | } |
243 | |
244 | /// Get up to two buffers where data can be pushed to. |
245 | /// |
246 | /// Write data starting at the beginning of the first buffer, then call |
247 | /// `push_done` with however many bytes you've pushed. |
248 | /// |
249 | /// The buffers are suitable to DMA to. |
250 | /// |
251 | /// If the ringbuf is full, both buffers will be zero length. |
252 | /// If there is only area available, the second buffer will be zero length. |
253 | /// |
254 | /// The buffer stays valid as long as no other `Writer` method is called |
255 | /// and `init`/`deinit` aren't called on the ringbuf. |
256 | pub fn push_bufs(&mut self) -> [(*mut u8, usize); 2] { |
257 | // Ordering: as per push_buf() |
258 | let mut start = self.0.start.load(Ordering::Acquire); |
259 | let buf = self.0.buf.load(Ordering::Relaxed); |
260 | let len = self.0.len.load(Ordering::Relaxed); |
261 | let mut end = self.0.end.load(Ordering::Relaxed); |
262 | |
263 | let empty = start == end; |
264 | |
265 | if start >= len { |
266 | start -= len |
267 | } |
268 | if end >= len { |
269 | end -= len |
270 | } |
271 | |
272 | if start == end && !empty { |
273 | // full |
274 | return [(buf, 0), (buf, 0)]; |
275 | } |
276 | let n0 = if start > end { start - end } else { len - end }; |
277 | let n1 = if start <= end { start } else { 0 }; |
278 | |
279 | trace!(" ringbuf: push_bufs [{:?}..{:?}, {:?}..{:?}]" , end, end + n0, 0, n1); |
280 | [(unsafe { buf.add(end) }, n0), (buf, n1)] |
281 | } |
282 | |
283 | /// Mark n bytes as written and advance the write index. |
284 | pub fn push_done(&mut self, n: usize) { |
285 | trace!(" ringbuf: push {:?}" , n); |
286 | let end = self.0.end.load(Ordering::Relaxed); |
287 | |
288 | // Ordering: write `end` last, with Release ordering. |
289 | // The ordering ensures no preceding memory accesses (such as writing |
290 | // the actual data in the buffer) can be reordered down past it, which |
291 | // will guarantee the reader sees them after reading from `end`. |
292 | self.0.end.store(self.0.wrap(end + n), Ordering::Release); |
293 | } |
294 | } |
295 | |
296 | impl<'a> Reader<'a> { |
297 | /// Pop data from the buffer in-place. |
298 | /// |
299 | /// The closure `f` is called with the next data, it must process |
300 | /// some data from it and return the amount of bytes processed. |
301 | pub fn pop(&mut self, f: impl FnOnce(&[u8]) -> usize) -> usize { |
302 | let (p, n) = self.pop_buf(); |
303 | let buf = unsafe { slice::from_raw_parts(p, n) }; |
304 | let n = f(buf); |
305 | self.pop_done(n); |
306 | n |
307 | } |
308 | |
309 | /// Pop one data byte. |
310 | /// |
311 | /// Returns true if popped successfully. |
312 | pub fn pop_one(&mut self) -> Option<u8> { |
313 | let mut res = None; |
314 | self.pop(|f| match f { |
315 | &[] => 0, |
316 | &[x, ..] => { |
317 | res = Some(x); |
318 | 1 |
319 | } |
320 | }); |
321 | res |
322 | } |
323 | |
324 | /// Get a buffer where data can be popped from. |
325 | /// |
326 | /// Equivalent to [`Self::pop_buf`] but returns a slice. |
327 | pub fn pop_slice(&mut self) -> &mut [u8] { |
328 | let (data, len) = self.pop_buf(); |
329 | unsafe { slice::from_raw_parts_mut(data, len) } |
330 | } |
331 | |
332 | /// Get a buffer where data can be popped from. |
333 | /// |
334 | /// Read data from the start of the buffer, then call `pop_done` with |
335 | /// however many bytes you've processed. |
336 | /// |
337 | /// The buffer is suitable to DMA from. |
338 | /// |
339 | /// If the ringbuf is empty, size=0 will be returned. |
340 | /// |
341 | /// The buffer stays valid as long as no other `Reader` method is called |
342 | /// and `init`/`deinit` aren't called on the ringbuf. |
343 | pub fn pop_buf(&mut self) -> (*mut u8, usize) { |
344 | // Ordering: pushing writes `end` last, so we read `end` first. |
345 | // Read it with Acquire ordering, so that the next accesses can't be reordered up past it. |
346 | // This is needed to guarantee we "see" the data written by the writer. |
347 | let mut end = self.0.end.load(Ordering::Acquire); |
348 | let buf = self.0.buf.load(Ordering::Relaxed); |
349 | let len = self.0.len.load(Ordering::Relaxed); |
350 | let mut start = self.0.start.load(Ordering::Relaxed); |
351 | |
352 | if start == end { |
353 | return (buf, 0); |
354 | } |
355 | |
356 | if start >= len { |
357 | start -= len |
358 | } |
359 | if end >= len { |
360 | end -= len |
361 | } |
362 | |
363 | let n = if end > start { end - start } else { len - start }; |
364 | |
365 | trace!(" ringbuf: pop_buf {:?}..{:?}" , start, start + n); |
366 | (unsafe { buf.add(start) }, n) |
367 | } |
368 | |
369 | /// Mark n bytes as read and allow advance the read index. |
370 | pub fn pop_done(&mut self, n: usize) { |
371 | trace!(" ringbuf: pop {:?}" , n); |
372 | |
373 | let start = self.0.start.load(Ordering::Relaxed); |
374 | |
375 | // Ordering: write `start` last, with Release ordering. |
376 | // The ordering ensures no preceding memory accesses (such as reading |
377 | // the actual data) can be reordered down past it. This is necessary |
378 | // because writing to `start` is effectively freeing the read part of the |
379 | // buffer, which "gives permission" to the writer to write to it again. |
380 | // Therefore, all buffer accesses must be completed before this. |
381 | self.0.start.store(self.0.wrap(start + n), Ordering::Release); |
382 | } |
383 | } |
384 | |
385 | #[cfg (test)] |
386 | mod tests { |
387 | use super::*; |
388 | |
389 | #[test ] |
390 | fn push_pop() { |
391 | let mut b = [0; 4]; |
392 | let rb = RingBuffer::new(); |
393 | unsafe { |
394 | rb.init(b.as_mut_ptr(), 4); |
395 | |
396 | assert_eq!(rb.is_empty(), true); |
397 | assert_eq!(rb.is_full(), false); |
398 | |
399 | rb.writer().push(|buf| { |
400 | assert_eq!(4, buf.len()); |
401 | buf[0] = 1; |
402 | buf[1] = 2; |
403 | buf[2] = 3; |
404 | buf[3] = 4; |
405 | 4 |
406 | }); |
407 | |
408 | assert_eq!(rb.is_empty(), false); |
409 | assert_eq!(rb.is_full(), true); |
410 | |
411 | rb.writer().push(|buf| { |
412 | // If it's full, we can push 0 bytes. |
413 | assert_eq!(0, buf.len()); |
414 | 0 |
415 | }); |
416 | |
417 | assert_eq!(rb.is_empty(), false); |
418 | assert_eq!(rb.is_full(), true); |
419 | |
420 | rb.reader().pop(|buf| { |
421 | assert_eq!(4, buf.len()); |
422 | assert_eq!(1, buf[0]); |
423 | 1 |
424 | }); |
425 | |
426 | assert_eq!(rb.is_empty(), false); |
427 | assert_eq!(rb.is_full(), false); |
428 | |
429 | rb.reader().pop(|buf| { |
430 | assert_eq!(3, buf.len()); |
431 | 0 |
432 | }); |
433 | |
434 | assert_eq!(rb.is_empty(), false); |
435 | assert_eq!(rb.is_full(), false); |
436 | |
437 | rb.reader().pop(|buf| { |
438 | assert_eq!(3, buf.len()); |
439 | assert_eq!(2, buf[0]); |
440 | assert_eq!(3, buf[1]); |
441 | 2 |
442 | }); |
443 | rb.reader().pop(|buf| { |
444 | assert_eq!(1, buf.len()); |
445 | assert_eq!(4, buf[0]); |
446 | 1 |
447 | }); |
448 | |
449 | assert_eq!(rb.is_empty(), true); |
450 | assert_eq!(rb.is_full(), false); |
451 | |
452 | rb.reader().pop(|buf| { |
453 | assert_eq!(0, buf.len()); |
454 | 0 |
455 | }); |
456 | |
457 | rb.writer().push(|buf| { |
458 | assert_eq!(4, buf.len()); |
459 | buf[0] = 10; |
460 | 1 |
461 | }); |
462 | |
463 | rb.writer().push(|buf| { |
464 | assert_eq!(3, buf.len()); |
465 | buf[0] = 11; |
466 | buf[1] = 12; |
467 | 2 |
468 | }); |
469 | |
470 | assert_eq!(rb.is_empty(), false); |
471 | assert_eq!(rb.is_full(), false); |
472 | |
473 | rb.writer().push(|buf| { |
474 | assert_eq!(1, buf.len()); |
475 | buf[0] = 13; |
476 | 1 |
477 | }); |
478 | |
479 | assert_eq!(rb.is_empty(), false); |
480 | assert_eq!(rb.is_full(), true); |
481 | } |
482 | } |
483 | |
484 | #[test ] |
485 | fn zero_len() { |
486 | let mut b = [0; 0]; |
487 | |
488 | let rb = RingBuffer::new(); |
489 | unsafe { |
490 | rb.init(b.as_mut_ptr(), b.len()); |
491 | |
492 | assert_eq!(rb.is_empty(), true); |
493 | assert_eq!(rb.is_full(), true); |
494 | |
495 | rb.writer().push(|buf| { |
496 | assert_eq!(0, buf.len()); |
497 | 0 |
498 | }); |
499 | |
500 | rb.reader().pop(|buf| { |
501 | assert_eq!(0, buf.len()); |
502 | 0 |
503 | }); |
504 | } |
505 | } |
506 | |
507 | #[test ] |
508 | fn push_slices() { |
509 | let mut b = [0; 4]; |
510 | let rb = RingBuffer::new(); |
511 | unsafe { |
512 | rb.init(b.as_mut_ptr(), 4); |
513 | |
514 | /* push 3 -> [1 2 3 x] */ |
515 | let mut w = rb.writer(); |
516 | let ps = w.push_slices(); |
517 | assert_eq!(4, ps[0].len()); |
518 | assert_eq!(0, ps[1].len()); |
519 | ps[0][0] = 1; |
520 | ps[0][1] = 2; |
521 | ps[0][2] = 3; |
522 | w.push_done(3); |
523 | drop(w); |
524 | |
525 | /* pop 2 -> [x x 3 x] */ |
526 | rb.reader().pop(|buf| { |
527 | assert_eq!(3, buf.len()); |
528 | assert_eq!(1, buf[0]); |
529 | assert_eq!(2, buf[1]); |
530 | assert_eq!(3, buf[2]); |
531 | 2 |
532 | }); |
533 | |
534 | /* push 3 -> [5 6 3 4] */ |
535 | let mut w = rb.writer(); |
536 | let ps = w.push_slices(); |
537 | assert_eq!(1, ps[0].len()); |
538 | assert_eq!(2, ps[1].len()); |
539 | ps[0][0] = 4; |
540 | ps[1][0] = 5; |
541 | ps[1][1] = 6; |
542 | w.push_done(3); |
543 | drop(w); |
544 | |
545 | /* buf is now full */ |
546 | let mut w = rb.writer(); |
547 | let ps = w.push_slices(); |
548 | assert_eq!(0, ps[0].len()); |
549 | assert_eq!(0, ps[1].len()); |
550 | |
551 | /* pop 2 -> [5 6 x x] */ |
552 | rb.reader().pop(|buf| { |
553 | assert_eq!(2, buf.len()); |
554 | assert_eq!(3, buf[0]); |
555 | assert_eq!(4, buf[1]); |
556 | 2 |
557 | }); |
558 | |
559 | /* should now have one push slice again */ |
560 | let mut w = rb.writer(); |
561 | let ps = w.push_slices(); |
562 | assert_eq!(2, ps[0].len()); |
563 | assert_eq!(0, ps[1].len()); |
564 | drop(w); |
565 | |
566 | /* pop 2 -> [x x x x] */ |
567 | rb.reader().pop(|buf| { |
568 | assert_eq!(2, buf.len()); |
569 | assert_eq!(5, buf[0]); |
570 | assert_eq!(6, buf[1]); |
571 | 2 |
572 | }); |
573 | |
574 | /* should now have two push slices */ |
575 | let mut w = rb.writer(); |
576 | let ps = w.push_slices(); |
577 | assert_eq!(2, ps[0].len()); |
578 | assert_eq!(2, ps[1].len()); |
579 | drop(w); |
580 | |
581 | /* make sure we exercise all wrap around cases properly */ |
582 | for _ in 0..10 { |
583 | /* should be empty, push 1 */ |
584 | let mut w = rb.writer(); |
585 | let ps = w.push_slices(); |
586 | assert_eq!(4, ps[0].len() + ps[1].len()); |
587 | w.push_done(1); |
588 | drop(w); |
589 | |
590 | /* should have 1 element */ |
591 | let mut w = rb.writer(); |
592 | let ps = w.push_slices(); |
593 | assert_eq!(3, ps[0].len() + ps[1].len()); |
594 | drop(w); |
595 | |
596 | /* pop 1 */ |
597 | rb.reader().pop(|buf| { |
598 | assert_eq!(1, buf.len()); |
599 | 1 |
600 | }); |
601 | } |
602 | } |
603 | } |
604 | } |
605 | |