use super::{Container, Rb, RbBase, RbRead, RbWrite, SharedStorage};
use crate::{consumer::Consumer, producer::Producer};
use core::{
    mem::{ManuallyDrop, MaybeUninit},
    num::NonZeroUsize,
    ptr,
    sync::atomic::{AtomicUsize, Ordering},
};
use crossbeam_utils::CachePadded;

#[cfg(feature = "alloc")]
use alloc::sync::Arc;

/// Ring buffer that can be shared between threads.
///
/// Implements [`Sync`] *if `T` implements [`Send`]*, and therefore its [`Producer`] and [`Consumer`] implement [`Send`].
///
/// Note that there is no explicit `T: Send` requirement. Instead, [`SharedRb`] works just fine even with `T: !Send`
/// until you try to send its [`Producer`] or [`Consumer`] to another thread.
#[cfg_attr(
    feature = "std",
    doc = r##"
```
use std::{thread, vec::Vec};
use ringbuf::SharedRb;

let (mut prod, mut cons) = SharedRb::<i32, Vec<_>>::new(256).split();
thread::spawn(move || {
    prod.push(123).unwrap();
})
.join();
thread::spawn(move || {
    assert_eq!(cons.pop().unwrap(), 123);
})
.join();
```
"##
)]
pub struct SharedRb<T, C: Container<T>> {
    storage: SharedStorage<T, C>,
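    /// Consumer-side counter: position of the oldest initialized item.
    /// `CachePadded` keeps the two counters on separate cache lines,
    /// avoiding false sharing between producer and consumer threads.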
    head: CachePadded<AtomicUsize>,
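    /// Producer-side counter: position just past the newest initialized item.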
    tail: CachePadded<AtomicUsize>,
}

impl<T, C: Container<T>> RbBase<T> for SharedRb<T, C> {
    #[inline]
    unsafe fn slices(
        &self,
        head: usize,
        tail: usize,
    ) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) {
        self.storage.as_mut_slices(head, tail)
    }

    #[inline]
    fn capacity_nonzero(&self) -> NonZeroUsize {
        self.storage.len()
    }

    #[inline]
    fn head(&self) -> usize {
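        // `Acquire` pairs with the `Release` store in `set_head`.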
        self.head.load(Ordering::Acquire)
    }

    #[inline]
    fn tail(&self) -> usize {
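        // `Acquire` pairs with the `Release` store in `set_tail`.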
        self.tail.load(Ordering::Acquire)
    }
}

impl<T, C: Container<T>> RbRead<T> for SharedRb<T, C> {
    #[inline]
    unsafe fn set_head(&self, value: usize) {
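        // `Release`: the consumer's removal of items must become visible
        // before the producer (which `Acquire`-loads `head`) reuses the slots.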
        self.head.store(value, Ordering::Release)
    }
}

impl<T, C: Container<T>> RbWrite<T> for SharedRb<T, C> {
    #[inline]
    unsafe fn set_tail(&self, value: usize) {
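        // `Release`: the producer's writes of new items must become visible
        // before the consumer (which `Acquire`-loads `tail`) reads them.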
        self.tail.store(value, Ordering::Release)
    }
}

impl<T, C: Container<T>> Rb<T> for SharedRb<T, C> {}

impl<T, C: Container<T>> Drop for SharedRb<T, C> {
    fn drop(&mut self) {
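        // Items are stored as `MaybeUninit<T>` and thus are not dropped
        // automatically; explicitly drop the initialized ones.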
        self.clear();
    }
}

impl<T, C: Container<T>> SharedRb<T, C> {
    /// Constructs a ring buffer from a container and counters.
    ///
    /// # Safety
    ///
    /// The items in the container inside the `head..tail` range must be initialized; items outside this range must be uninitialized.
    /// The `head` and `tail` values must be valid (see [`RbBase`](`crate::ring_buffer::RbBase`)).
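    ///
    /// A minimal sketch: an all-uninitialized, array-backed container together
    /// with `head == tail == 0` is a valid empty state.
    ///
    /// ```
    /// use core::mem::MaybeUninit;
    /// use ringbuf::SharedRb;
    ///
    /// // No item is initialized, which matches `head == tail` (an empty buffer).
    /// let container: [MaybeUninit<i32>; 4] = unsafe { MaybeUninit::uninit().assume_init() };
    /// let mut rb = unsafe { SharedRb::from_raw_parts(container, 0, 0) };
    /// let (mut prod, mut cons) = rb.split_ref();
    /// prod.push(7).unwrap();
    /// assert_eq!(cons.pop(), Some(7));
    /// ```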
    pub unsafe fn from_raw_parts(container: C, head: usize, tail: usize) -> Self {
        Self {
            storage: SharedStorage::new(container),
            head: CachePadded::new(AtomicUsize::new(head)),
            tail: CachePadded::new(AtomicUsize::new(tail)),
        }
    }

    /// Destructures the ring buffer into the underlying container and the `head` and `tail` counters.
    ///
    /// # Safety
    ///
    /// The initialized contents of the container must be properly dropped by the caller.
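    ///
    /// A minimal sketch: after draining the buffer no initialized items remain,
    /// so the returned container can simply be dropped.
    ///
    /// ```
    /// use ringbuf::SharedRb;
    ///
    /// let mut rb = SharedRb::<i32, Vec<_>>::new(2);
    /// {
    ///     let (mut prod, mut cons) = rb.split_ref();
    ///     prod.push(1).unwrap();
    ///     assert_eq!(cons.pop(), Some(1));
    /// }
    /// let (container, head, tail) = unsafe { rb.into_raw_parts() };
    /// assert_eq!(head, tail); // drained: nothing left to drop
    /// drop(container);
    /// ```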
    pub unsafe fn into_raw_parts(self) -> (C, usize, usize) {
        let (head, tail) = (self.head(), self.tail());
        let self_ = ManuallyDrop::new(self);

        (ptr::read(&self_.storage).into_inner(), head, tail)
    }

    /// Splits the ring buffer into a producer and a consumer.
    ///
    /// This method consumes the ring buffer and puts it on the heap in an [`Arc`]. If you don't want to use the heap, see [`Self::split_ref`].
    #[cfg(feature = "alloc")]
    pub fn split(self) -> (Producer<T, Arc<Self>>, Consumer<T, Arc<Self>>)
    where
        Self: Sized,
    {
        let arc = Arc::new(self);
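        // Safety: `split` consumes the buffer, so this is the only producer
        // and the only consumer referring to it.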
        unsafe { (Producer::new(arc.clone()), Consumer::new(arc)) }
    }

    /// Splits the ring buffer into a producer and a consumer without using the heap.
    ///
    /// In this case the producer and consumer each store a reference to the ring buffer, so the buffer itself must be stored somewhere.
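    ///
    /// A minimal sketch using a stack-allocated buffer, here via the
    /// array-backed `StaticRb` alias, so no heap allocation is involved:
    ///
    /// ```
    /// use ringbuf::StaticRb;
    ///
    /// // `StaticRb` is the array-backed alias of `SharedRb`.
    /// let mut rb = StaticRb::<i32, 4>::default();
    /// let (mut prod, mut cons) = rb.split_ref();
    /// prod.push(42).unwrap();
    /// assert_eq!(cons.pop(), Some(42));
    /// ```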
    pub fn split_ref(&mut self) -> (Producer<T, &Self>, Consumer<T, &Self>)
    where
        Self: Sized,
    {
        unsafe { (Producer::new(self), Consumer::new(self)) }
    }
}