1 | use super::{Container, Rb, RbBase, RbRead, RbWrite, SharedStorage}; |
2 | use crate::{consumer::Consumer, producer::Producer}; |
3 | use core::{ |
4 | mem::{ManuallyDrop, MaybeUninit}, |
5 | num::NonZeroUsize, |
6 | ptr, |
7 | sync::atomic::{AtomicUsize, Ordering}, |
8 | }; |
9 | use crossbeam_utils::CachePadded; |
10 | |
11 | #[cfg (feature = "alloc" )] |
12 | use alloc::sync::Arc; |
13 | |
14 | /// Ring buffer that could be shared between threads. |
15 | /// |
16 | /// Implements [`Sync`] *if `T` implements [`Send`]*. And therefore its [`Producer`] and [`Consumer`] implement [`Send`]. |
17 | /// |
18 | /// Note that there is no explicit requirement of `T: Send`. Instead [`SharedRb`] will work just fine even with `T: !Send` |
19 | /// until you try to send its [`Producer`] or [`Consumer`] to another thread. |
20 | #[cfg_attr ( |
21 | feature = "std" , |
22 | doc = r##" |
23 | ``` |
24 | use std::{thread, vec::Vec}; |
25 | use ringbuf::SharedRb; |
26 | |
27 | let (mut prod, mut cons) = SharedRb::<i32, Vec<_>>::new(256).split(); |
28 | thread::spawn(move || { |
29 | prod.push(123).unwrap(); |
30 | }) |
31 | .join(); |
32 | thread::spawn(move || { |
33 | assert_eq!(cons.pop().unwrap(), 123); |
34 | }) |
35 | .join(); |
36 | ``` |
37 | "## |
38 | )] |
39 | pub struct SharedRb<T, C: Container<T>> { |
40 | storage: SharedStorage<T, C>, |
41 | head: CachePadded<AtomicUsize>, |
42 | tail: CachePadded<AtomicUsize>, |
43 | } |
44 | |
45 | impl<T, C: Container<T>> RbBase<T> for SharedRb<T, C> { |
46 | #[inline ] |
47 | unsafe fn slices( |
48 | &self, |
49 | head: usize, |
50 | tail: usize, |
51 | ) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) { |
52 | self.storage.as_mut_slices(head, tail) |
53 | } |
54 | |
55 | #[inline ] |
56 | fn capacity_nonzero(&self) -> NonZeroUsize { |
57 | self.storage.len() |
58 | } |
59 | |
60 | #[inline ] |
61 | fn head(&self) -> usize { |
62 | self.head.load(Ordering::Acquire) |
63 | } |
64 | |
65 | #[inline ] |
66 | fn tail(&self) -> usize { |
67 | self.tail.load(Ordering::Acquire) |
68 | } |
69 | } |
70 | |
71 | impl<T, C: Container<T>> RbRead<T> for SharedRb<T, C> { |
72 | #[inline ] |
73 | unsafe fn set_head(&self, value: usize) { |
74 | self.head.store(val:value, order:Ordering::Release) |
75 | } |
76 | } |
77 | |
78 | impl<T, C: Container<T>> RbWrite<T> for SharedRb<T, C> { |
79 | #[inline ] |
80 | unsafe fn set_tail(&self, value: usize) { |
81 | self.tail.store(val:value, order:Ordering::Release) |
82 | } |
83 | } |
84 | |
85 | impl<T, C: Container<T>> Rb<T> for SharedRb<T, C> {} |
86 | |
87 | impl<T, C: Container<T>> Drop for SharedRb<T, C> { |
88 | fn drop(&mut self) { |
89 | self.clear(); |
90 | } |
91 | } |
92 | |
93 | impl<T, C: Container<T>> SharedRb<T, C> { |
94 | /// Constructs ring buffer from container and counters. |
95 | /// |
96 | /// # Safety |
97 | /// |
98 | /// The items in container inside `head..tail` range must be initialized, items outside this range must be uninitialized. |
99 | /// `head` and `tail` values must be valid (see [`RbBase`](`crate::ring_buffer::RbBase`)). |
100 | pub unsafe fn from_raw_parts(container: C, head: usize, tail: usize) -> Self { |
101 | Self { |
102 | storage: SharedStorage::new(container), |
103 | head: CachePadded::new(AtomicUsize::new(head)), |
104 | tail: CachePadded::new(AtomicUsize::new(tail)), |
105 | } |
106 | } |
107 | |
108 | /// Destructures ring buffer into underlying container and `head` and `tail` counters. |
109 | /// |
110 | /// # Safety |
111 | /// |
112 | /// Initialized contents of the container must be properly dropped. |
113 | pub unsafe fn into_raw_parts(self) -> (C, usize, usize) { |
114 | let (head, tail) = (self.head(), self.tail()); |
115 | let self_ = ManuallyDrop::new(self); |
116 | |
117 | (ptr::read(&self_.storage).into_inner(), head, tail) |
118 | } |
119 | |
120 | /// Splits ring buffer into producer and consumer. |
121 | /// |
122 | /// This method consumes the ring buffer and puts it on heap in [`Arc`]. If you don't want to use heap the see [`Self::split_ref`]. |
123 | #[cfg (feature = "alloc" )] |
124 | pub fn split(self) -> (Producer<T, Arc<Self>>, Consumer<T, Arc<Self>>) |
125 | where |
126 | Self: Sized, |
127 | { |
128 | let arc = Arc::new(self); |
129 | unsafe { (Producer::new(arc.clone()), Consumer::new(arc)) } |
130 | } |
131 | |
132 | /// Splits ring buffer into producer and consumer without using the heap. |
133 | /// |
134 | /// In this case producer and consumer stores a reference to the ring buffer, so you also need to store the buffer somewhere. |
135 | pub fn split_ref(&mut self) -> (Producer<T, &Self>, Consumer<T, &Self>) |
136 | where |
137 | Self: Sized, |
138 | { |
139 | unsafe { (Producer::new(self), Consumer::new(self)) } |
140 | } |
141 | } |
142 | |