1use super::{Container, Rb, RbBase, RbRead, RbWrite, SharedStorage};
2use crate::{consumer::Consumer, producer::Producer};
3use core::{
4 mem::{ManuallyDrop, MaybeUninit},
5 num::NonZeroUsize,
6 ptr,
7 sync::atomic::{AtomicUsize, Ordering},
8};
9use crossbeam_utils::CachePadded;
10
11#[cfg(feature = "alloc")]
12use alloc::sync::Arc;
13
14/// Ring buffer that could be shared between threads.
15///
16/// Implements [`Sync`] *if `T` implements [`Send`]*. And therefore its [`Producer`] and [`Consumer`] implement [`Send`].
17///
18/// Note that there is no explicit requirement of `T: Send`. Instead [`SharedRb`] will work just fine even with `T: !Send`
19/// until you try to send its [`Producer`] or [`Consumer`] to another thread.
20#[cfg_attr(
21 feature = "std",
22 doc = r##"
23```
24use std::{thread, vec::Vec};
25use ringbuf::SharedRb;
26
27let (mut prod, mut cons) = SharedRb::<i32, Vec<_>>::new(256).split();
28thread::spawn(move || {
29 prod.push(123).unwrap();
30})
31.join();
32thread::spawn(move || {
33 assert_eq!(cons.pop().unwrap(), 123);
34})
35.join();
36```
37"##
38)]
39pub struct SharedRb<T, C: Container<T>> {
40 storage: SharedStorage<T, C>,
41 head: CachePadded<AtomicUsize>,
42 tail: CachePadded<AtomicUsize>,
43}
44
45impl<T, C: Container<T>> RbBase<T> for SharedRb<T, C> {
46 #[inline]
47 unsafe fn slices(
48 &self,
49 head: usize,
50 tail: usize,
51 ) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) {
52 self.storage.as_mut_slices(head, tail)
53 }
54
55 #[inline]
56 fn capacity_nonzero(&self) -> NonZeroUsize {
57 self.storage.len()
58 }
59
60 #[inline]
61 fn head(&self) -> usize {
62 self.head.load(Ordering::Acquire)
63 }
64
65 #[inline]
66 fn tail(&self) -> usize {
67 self.tail.load(Ordering::Acquire)
68 }
69}
70
71impl<T, C: Container<T>> RbRead<T> for SharedRb<T, C> {
72 #[inline]
73 unsafe fn set_head(&self, value: usize) {
74 self.head.store(val:value, order:Ordering::Release)
75 }
76}
77
78impl<T, C: Container<T>> RbWrite<T> for SharedRb<T, C> {
79 #[inline]
80 unsafe fn set_tail(&self, value: usize) {
81 self.tail.store(val:value, order:Ordering::Release)
82 }
83}
84
85impl<T, C: Container<T>> Rb<T> for SharedRb<T, C> {}
86
87impl<T, C: Container<T>> Drop for SharedRb<T, C> {
88 fn drop(&mut self) {
89 self.clear();
90 }
91}
92
93impl<T, C: Container<T>> SharedRb<T, C> {
94 /// Constructs ring buffer from container and counters.
95 ///
96 /// # Safety
97 ///
98 /// The items in container inside `head..tail` range must be initialized, items outside this range must be uninitialized.
99 /// `head` and `tail` values must be valid (see [`RbBase`](`crate::ring_buffer::RbBase`)).
100 pub unsafe fn from_raw_parts(container: C, head: usize, tail: usize) -> Self {
101 Self {
102 storage: SharedStorage::new(container),
103 head: CachePadded::new(AtomicUsize::new(head)),
104 tail: CachePadded::new(AtomicUsize::new(tail)),
105 }
106 }
107
108 /// Destructures ring buffer into underlying container and `head` and `tail` counters.
109 ///
110 /// # Safety
111 ///
112 /// Initialized contents of the container must be properly dropped.
113 pub unsafe fn into_raw_parts(self) -> (C, usize, usize) {
114 let (head, tail) = (self.head(), self.tail());
115 let self_ = ManuallyDrop::new(self);
116
117 (ptr::read(&self_.storage).into_inner(), head, tail)
118 }
119
120 /// Splits ring buffer into producer and consumer.
121 ///
122 /// This method consumes the ring buffer and puts it on heap in [`Arc`]. If you don't want to use heap the see [`Self::split_ref`].
123 #[cfg(feature = "alloc")]
124 pub fn split(self) -> (Producer<T, Arc<Self>>, Consumer<T, Arc<Self>>)
125 where
126 Self: Sized,
127 {
128 let arc = Arc::new(self);
129 unsafe { (Producer::new(arc.clone()), Consumer::new(arc)) }
130 }
131
132 /// Splits ring buffer into producer and consumer without using the heap.
133 ///
134 /// In this case producer and consumer stores a reference to the ring buffer, so you also need to store the buffer somewhere.
135 pub fn split_ref(&mut self) -> (Producer<T, &Self>, Consumer<T, &Self>)
136 where
137 Self: Sized,
138 {
139 unsafe { (Producer::new(self), Consumer::new(self)) }
140 }
141}
142