use crate::{
    ring_buffer::{RbBase, RbRef, RbWrap, RbWrite, RbWriteCache},
    utils::write_slice,
};
use core::{marker::PhantomData, mem::MaybeUninit};

#[cfg(feature = "std")]
use crate::utils::slice_assume_init_mut;
#[cfg(feature = "std")]
use core::cmp;
#[cfg(feature = "std")]
use std::io::{self, Read, Write};

/// Producer part of the ring buffer.
///
/// # Mode
///
/// It can operate in immediate (default) or postponed mode.
/// The mode can be switched using the [`Self::postponed`]/[`Self::into_postponed`] and [`Self::into_immediate`] methods.
///
/// + In immediate mode removed and inserted items are automatically synchronized with the other end.
/// + In postponed mode synchronization occurs only when [`Self::sync`] or [`Self::into_immediate`] is called or when `Self` is dropped.
///   The reason to use postponed mode is that multiple subsequent operations are performed faster due to less frequent cache synchronization.
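///
/// # Example
///
/// A minimal usage sketch in the default immediate mode (assuming `HeapRb` from this crate):
///
/// ```
/// use ringbuf::HeapRb;
///
/// let (mut prod, mut cons) = HeapRb::<u8>::new(4).split();
/// // In immediate mode each pushed item is at once visible to the consumer.
/// assert_eq!(prod.push(1), Ok(()));
/// assert_eq!(cons.pop(), Some(1));
/// ```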
pub struct Producer<T, R: RbRef>
where
    R::Rb: RbWrite<T>,
{
    target: R,
    _phantom: PhantomData<T>,
}

impl<T, R: RbRef> Producer<T, R>
where
    R::Rb: RbWrite<T>,
{
    /// Creates a producer from a ring buffer reference.
    ///
    /// # Safety
    ///
    /// There must be only one producer containing the same ring buffer reference.
    pub unsafe fn new(target: R) -> Self {
        Self {
            target,
            _phantom: PhantomData,
        }
    }

    /// Returns a reference to the underlying ring buffer.
    #[inline]
    pub fn rb(&self) -> &R::Rb {
        &self.target
    }

    /// Consumes `self` and returns the underlying ring buffer reference.
    pub fn into_rb_ref(self) -> R {
        self.target
    }

    /// Returns a postponed producer that borrows [`Self`].
    pub fn postponed(&mut self) -> PostponedProducer<T, &R::Rb> {
        unsafe { Producer::new(RbWrap(RbWriteCache::new(&self.target))) }
    }

    /// Transforms [`Self`] into a postponed producer.
    pub fn into_postponed(self) -> PostponedProducer<T, R> {
        unsafe { Producer::new(RbWrap(RbWriteCache::new(self.target))) }
    }

    /// Returns the capacity of the ring buffer.
    ///
    /// The capacity of the buffer is constant.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.target.capacity_nonzero().get()
    }

    /// Checks if the ring buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.target.is_empty()
    }

    /// Checks if the ring buffer is full.
    ///
    /// *The result may become irrelevant at any time because of concurrent consumer activity.*
    #[inline]
    pub fn is_full(&self) -> bool {
        self.target.is_full()
    }

    /// The number of items stored in the buffer.
    ///
    /// *The actual number may be less than the returned value because of concurrent consumer activity.*
    #[inline]
    pub fn len(&self) -> usize {
        self.target.occupied_len()
    }

    /// The number of remaining free places in the buffer.
    ///
    /// *The actual number may be greater than the returned value because of concurrent consumer activity.*
    #[inline]
    pub fn free_len(&self) -> usize {
        self.target.vacant_len()
    }

    /// Provides direct access to the ring buffer's vacant memory.
    /// Returns a pair of slices of uninitialized memory; the second one may be empty.
    ///
    /// # Safety
    ///
    /// Vacant memory is uninitialized. Initialized items must be put starting from the beginning of the first slice.
    /// When the first slice is fully filled then items must be put to the beginning of the second slice.
    ///
    /// *This method must be followed by a [`Self::advance`] call with the number of items being put previously as argument.*
    /// *No other mutating calls are allowed before that.*
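    ///
    /// # Example
    ///
    /// A sketch of the intended protocol (assuming `HeapRb` from this crate):
    ///
    /// ```
    /// use ringbuf::HeapRb;
    ///
    /// let (mut prod, mut cons) = HeapRb::<u8>::new(4).split();
    /// let count = unsafe {
    ///     let (left, _) = prod.free_space_as_slices();
    ///     left[0].write(7);
    ///     1
    /// };
    /// // Commit exactly as many items as were initialized above.
    /// unsafe { prod.advance(count) };
    /// assert_eq!(cons.pop(), Some(7));
    /// ```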
    #[inline]
    pub unsafe fn free_space_as_slices(
        &mut self,
    ) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) {
        self.target.vacant_slices()
    }

    /// Moves the `tail` counter by `count` places.
    ///
    /// # Safety
    ///
    /// The first `count` items in the free space must be initialized.
    #[inline]
    pub unsafe fn advance(&mut self, count: usize) {
        self.target.advance_tail(count)
    }

    /// Appends an item to the ring buffer.
    ///
    /// On failure returns an `Err` containing the item that hasn't been appended.
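    ///
    /// # Example
    ///
    /// A minimal sketch (assuming `HeapRb` from this crate):
    ///
    /// ```
    /// use ringbuf::HeapRb;
    ///
    /// let (mut prod, _cons) = HeapRb::<u8>::new(1).split();
    /// assert_eq!(prod.push(1), Ok(()));
    /// // The buffer is full now, so the item is returned back.
    /// assert_eq!(prod.push(2), Err(2));
    /// ```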
    pub fn push(&mut self, elem: T) -> Result<(), T> {
        if !self.is_full() {
            unsafe {
                self.free_space_as_slices()
                    .0
                    .get_unchecked_mut(0)
                    .write(elem)
            };
            unsafe { self.advance(1) };
            Ok(())
        } else {
            Err(elem)
        }
    }

    /// Appends items from an iterator to the ring buffer.
    /// Elements that haven't been added to the ring buffer remain in the iterator.
    ///
    /// Returns the count of items appended to the ring buffer.
    ///
    /// *Inserted items are committed to the ring buffer all at once in the end,*
    /// *i.e. when the buffer is full or the iterator has ended.*
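    ///
    /// # Example
    ///
    /// A sketch (assuming `HeapRb` from this crate):
    ///
    /// ```
    /// use ringbuf::HeapRb;
    ///
    /// let (mut prod, _cons) = HeapRb::<u8>::new(2).split();
    /// let mut iter = 0u8..4;
    /// assert_eq!(prod.push_iter(&mut iter), 2);
    /// // Items that did not fit remain in the iterator.
    /// assert_eq!(iter.next(), Some(2));
    /// ```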
    pub fn push_iter<I: Iterator<Item = T>>(&mut self, iter: &mut I) -> usize {
        let (left, right) = unsafe { self.free_space_as_slices() };
        let mut count = 0;
        for place in left.iter_mut().chain(right.iter_mut()) {
            match iter.next() {
                Some(elem) => unsafe { place.as_mut_ptr().write(elem) },
                None => break,
            }
            count += 1;
        }
        unsafe { self.advance(count) };
        count
    }
}

impl<T: Copy, R: RbRef> Producer<T, R>
where
    R::Rb: RbWrite<T>,
{
    /// Appends items from a slice to the ring buffer.
    /// Elements must be [`Copy`].
    ///
    /// Returns the count of items appended to the ring buffer.
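    ///
    /// # Example
    ///
    /// A sketch (assuming `HeapRb` from this crate):
    ///
    /// ```
    /// use ringbuf::HeapRb;
    ///
    /// let (mut prod, _cons) = HeapRb::<u8>::new(4).split();
    /// assert_eq!(prod.push_slice(&[1, 2, 3]), 3);
    /// // Only one free place is left, so only one item is appended.
    /// assert_eq!(prod.push_slice(&[4, 5, 6]), 1);
    /// ```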
    pub fn push_slice(&mut self, elems: &[T]) -> usize {
        let (left, right) = unsafe { self.free_space_as_slices() };
        let count = if elems.len() < left.len() {
            write_slice(&mut left[..elems.len()], elems);
            elems.len()
        } else {
            let (left_elems, elems) = elems.split_at(left.len());
            write_slice(left, left_elems);
            left.len()
                + if elems.len() < right.len() {
                    write_slice(&mut right[..elems.len()], elems);
                    elems.len()
                } else {
                    write_slice(right, &elems[..right.len()]);
                    right.len()
                }
        };
        unsafe { self.advance(count) };
        count
    }
}

/// Postponed producer.
pub type PostponedProducer<T, R> = Producer<T, RbWrap<RbWriteCache<T, R>>>;

impl<T, R: RbRef> PostponedProducer<T, R>
where
    R::Rb: RbWrite<T>,
{
    /// Creates a new postponed producer.
    ///
    /// # Safety
    ///
    /// There must be only one producer containing the same ring buffer reference.
    pub unsafe fn new_postponed(target: R) -> Self {
        Producer::new(RbWrap(RbWriteCache::new(target)))
    }

    /// Synchronizes changes with the ring buffer.
    ///
    /// A postponed producer requires manual synchronization to make pushed items visible to the consumer.
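    ///
    /// # Example
    ///
    /// A sketch (assuming `HeapRb` from this crate):
    ///
    /// ```
    /// use ringbuf::HeapRb;
    ///
    /// let (prod, cons) = HeapRb::<u8>::new(4).split();
    /// let mut prod = prod.into_postponed();
    /// prod.push(1).unwrap();
    /// // The pushed item is not yet visible to the consumer.
    /// assert!(cons.is_empty());
    /// prod.sync();
    /// assert_eq!(cons.len(), 1);
    /// ```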
    pub fn sync(&mut self) {
        self.target.0.sync();
    }

    /// Discards items inserted since the last synchronization without publishing them.
    pub fn discard(&mut self) {
        self.target.0.discard();
    }

    /// Synchronizes and transforms back into an immediate producer.
    pub fn into_immediate(self) -> Producer<T, R> {
        unsafe { Producer::new(self.target.0.release()) }
    }
}

#[cfg(feature = "std")]
impl<R: RbRef> Producer<u8, R>
where
    R::Rb: RbWrite<u8>,
{
    /// Reads at most `count` bytes from a `Read` instance and appends them to the ring buffer.
    /// If `count` is `None` then as many bytes as possible will be read.
    ///
    /// Returns `Ok(n)` if `read` succeeded, where `n` is the number of bytes read.
    /// `n == 0` means that either `read` returned zero or the ring buffer is full.
    ///
    /// If `read` fails then the original error is returned. In this case it is guaranteed that no items were read from the reader.
    /// To achieve this we read into only one contiguous slice at once, so this call may read fewer bytes than are vacant in the buffer even if the reader is ready to provide more.
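    ///
    /// # Example
    ///
    /// A sketch using a byte slice as the reader (assuming `HeapRb` from this crate):
    ///
    /// ```
    /// use ringbuf::HeapRb;
    ///
    /// let (mut prod, _cons) = HeapRb::<u8>::new(8).split();
    /// let mut src: &[u8] = &[1, 2, 3];
    /// assert_eq!(prod.read_from(&mut src, None).unwrap(), 3);
    /// assert_eq!(prod.len(), 3);
    /// ```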
    pub fn read_from<P: Read>(
        &mut self,
        reader: &mut P,
        count: Option<usize>,
    ) -> io::Result<usize> {
        // Read into the first contiguous vacant slice only, so a single
        // `read` call is made per invocation.
        let (left, _) = unsafe { self.free_space_as_slices() };
        let count = cmp::min(count.unwrap_or(left.len()), left.len());
        let left_init = unsafe { slice_assume_init_mut(&mut left[..count]) };

        let read_count = reader.read(left_init)?;
        assert!(read_count <= count);
        unsafe { self.advance(read_count) };
        Ok(read_count)
    }
}

#[cfg(feature = "std")]
impl<R: RbRef> Write for Producer<u8, R>
where
    R::Rb: RbWrite<u8>,
{
    fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
        let n = self.push_slice(buffer);
        if n == 0 && !buffer.is_empty() {
            // The buffer is full but the caller still has bytes to write.
            Err(io::ErrorKind::WouldBlock.into())
        } else {
            Ok(n)
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl<R: RbRef> core::fmt::Write for Producer<u8, R>
where
    R::Rb: RbWrite<u8>,
{
    fn write_str(&mut self, s: &str) -> core::fmt::Result {
        let n = self.push_slice(s.as_bytes());
        if n != s.len() {
            // Fail if the whole string could not be appended.
            Err(core::fmt::Error::default())
        } else {
            Ok(())
        }
    }
}