1use crate::{
2 ring_buffer::{RbBase, RbRead, RbReadCache, RbRef, RbWrap},
3 utils::{slice_assume_init_mut, slice_assume_init_ref, write_uninit_slice},
4};
5use core::{cmp, iter::ExactSizeIterator, marker::PhantomData, mem::MaybeUninit};
6
7#[cfg(feature = "std")]
8use std::io::{self, Read, Write};
9
10/// Consumer part of ring buffer.
11///
12/// # Mode
13///
14/// It can operate in immediate (by default) or postponed mode.
15/// Mode could be switched using [`Self::postponed`]/[`Self::into_postponed`] and [`Self::into_immediate`] methods.
16///
17/// + In immediate mode removed and inserted items are automatically synchronized with the other end.
18/// + In postponed mode synchronization occurs only when [`Self::sync`] or [`Self::into_immediate`] is called or when `Self` is dropped.
19/// The reason to use postponed mode is that multiple subsequent operations are performed faster due to less frequent cache synchronization.
20pub struct Consumer<T, R: RbRef>
21where
22 R::Rb: RbRead<T>,
23{
24 target: R,
25 _phantom: PhantomData<T>,
26}
27
28impl<T, R: RbRef> Consumer<T, R>
29where
30 R::Rb: RbRead<T>,
31{
32 /// Creates consumer from the ring buffer reference.
33 ///
34 /// # Safety
35 ///
36 /// There must be only one consumer containing the same ring buffer reference.
37 pub unsafe fn new(target: R) -> Self {
38 Self {
39 target,
40 _phantom: PhantomData,
41 }
42 }
43
44 /// Returns reference to the underlying ring buffer.
45 #[inline]
46 pub fn rb(&self) -> &R::Rb {
47 &self.target
48 }
49
50 /// Consumes `self` and returns underlying ring buffer reference.
51 pub fn into_rb_ref(self) -> R {
52 self.target
53 }
54
55 /// Returns postponed consumer that borrows [`Self`].
56 pub fn postponed(&mut self) -> Consumer<T, RbWrap<RbReadCache<T, &R::Rb>>> {
57 unsafe { Consumer::new(RbWrap(RbReadCache::new(&self.target))) }
58 }
59
60 /// Transforms [`Self`] into postponed consumer.
61 pub fn into_postponed(self) -> Consumer<T, RbWrap<RbReadCache<T, R>>> {
62 unsafe { Consumer::new(RbWrap(RbReadCache::new(self.target))) }
63 }
64
65 /// Returns capacity of the ring buffer.
66 ///
67 /// The capacity of the buffer is constant.
68 #[inline]
69 pub fn capacity(&self) -> usize {
70 self.target.capacity_nonzero().get()
71 }
72
73 /// Checks if the ring buffer is empty.
74 ///
75 /// *The result may become irrelevant at any time because of concurring producer activity.*
76 #[inline]
77 pub fn is_empty(&self) -> bool {
78 self.target.is_empty()
79 }
80
81 /// Checks if the ring buffer is full.
82 #[inline]
83 pub fn is_full(&self) -> bool {
84 self.target.is_full()
85 }
86
87 /// The number of items stored in the buffer.
88 ///
89 /// *Actual number may be greater than the returned value because of concurring producer activity.*
90 #[inline]
91 pub fn len(&self) -> usize {
92 self.target.occupied_len()
93 }
94
95 /// The number of remaining free places in the buffer.
96 ///
97 /// *Actual number may be less than the returned value because of concurring producer activity.*
98 #[inline]
99 pub fn free_len(&self) -> usize {
100 self.target.vacant_len()
101 }
102
103 /// Provides a direct access to the ring buffer occupied memory.
104 /// The difference from [`Self::as_slices`] is that this method provides slices of [`MaybeUninit<T>`], so items may be moved out of slices.
105 ///
106 /// Returns a pair of slices of stored items, the second one may be empty.
107 /// Elements with lower indices in slice are older. First slice contains older items that second one.
108 ///
109 /// # Safety
110 ///
111 /// All items are initialized. Elements must be removed starting from the beginning of first slice.
112 /// When all items are removed from the first slice then items must be removed from the beginning of the second slice.
113 ///
114 /// *This method must be followed by [`Self::advance`] call with the number of items being removed previously as argument.*
115 /// *No other mutating calls allowed before that.*
116 #[inline]
117 pub unsafe fn as_uninit_slices(&self) -> (&[MaybeUninit<T>], &[MaybeUninit<T>]) {
118 let (left, right) = self.target.occupied_slices();
119 (left as &[_], right as &[_])
120 }
121
122 /// Provides a direct mutable access to the ring buffer occupied memory.
123 ///
124 /// Same as [`Self::as_uninit_slices`].
125 ///
126 /// # Safety
127 ///
128 /// See [`Self::as_uninit_slices`].
129 #[inline]
130 pub unsafe fn as_mut_uninit_slices(&self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) {
131 self.target.occupied_slices()
132 }
133
134 /// Moves `head` target by `count` places.
135 ///
136 /// # Safety
137 ///
138 /// First `count` items in occupied memory must be moved out or dropped.
139 #[inline]
140 pub unsafe fn advance(&mut self, count: usize) {
141 self.target.advance_head(count);
142 }
143
144 /// Returns a pair of slices which contain, in order, the contents of the ring buffer.
145 #[inline]
146 pub fn as_slices(&self) -> (&[T], &[T]) {
147 unsafe {
148 let (left, right) = self.as_uninit_slices();
149 (slice_assume_init_ref(left), slice_assume_init_ref(right))
150 }
151 }
152
153 /// Returns a pair of mutable slices which contain, in order, the contents of the ring buffer.
154 #[inline]
155 pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
156 unsafe {
157 let (left, right) = self.as_mut_uninit_slices();
158 (slice_assume_init_mut(left), slice_assume_init_mut(right))
159 }
160 }
161
162 /// Removes latest item from the ring buffer and returns it.
163 ///
164 /// Returns `None` if the ring buffer is empty.
165 pub fn pop(&mut self) -> Option<T> {
166 if !self.is_empty() {
167 let elem = unsafe {
168 self.as_uninit_slices()
169 .0
170 .get_unchecked(0)
171 .assume_init_read()
172 };
173 unsafe { self.advance(1) };
174 Some(elem)
175 } else {
176 None
177 }
178 }
179
180 /// Returns an iterator that removes items one by one from the ring buffer.
181 ///
182 /// Iterator provides only items that are available for consumer at the moment of `pop_iter` call, it will not contain new items added after it was created.
183 ///
184 /// *Information about removed items is commited to the buffer only when iterator is destroyed.*
185 pub fn pop_iter(&mut self) -> PopIterator<'_, T, R> {
186 PopIterator::new(&self.target)
187 }
188
189 /// Returns a front-to-back iterator containing references to items in the ring buffer.
190 ///
191 /// This iterator does not remove items out of the ring buffer.
192 pub fn iter(&self) -> impl Iterator<Item = &T> + '_ {
193 let (left, right) = self.as_slices();
194 left.iter().chain(right.iter())
195 }
196
197 /// Returns a front-to-back iterator that returns mutable references to items in the ring buffer.
198 ///
199 /// This iterator does not remove items out of the ring buffer.
200 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> + '_ {
201 let (left, right) = self.as_mut_slices();
202 left.iter_mut().chain(right.iter_mut())
203 }
204
205 /// Removes at most `n` and at least `min(n, Self::len())` items from the buffer and safely drops them.
206 ///
207 /// If there is no concurring producer activity then exactly `min(n, Self::len())` items are removed.
208 ///
209 /// Returns the number of deleted items.
210 ///
211 #[cfg_attr(
212 feature = "alloc",
213 doc = r##"
214```rust
215# extern crate ringbuf;
216# use ringbuf::HeapRb;
217# fn main() {
218let target = HeapRb::<i32>::new(8);
219let (mut prod, mut cons) = target.split();
220
221assert_eq!(prod.push_iter(&mut (0..8)), 8);
222
223assert_eq!(cons.skip(4), 4);
224assert_eq!(cons.skip(8), 4);
225assert_eq!(cons.skip(8), 0);
226# }
227```
228"##
229 )]
230 pub fn skip(&mut self, count: usize) -> usize {
231 let count = cmp::min(count, self.len());
232 assert_eq!(unsafe { self.target.skip_internal(Some(count)) }, count);
233 count
234 }
235
236 /// Removes all items from the buffer and safely drops them.
237 ///
238 /// Returns the number of deleted items.
239 pub fn clear(&mut self) -> usize {
240 unsafe { self.target.skip_internal(None) }
241 }
242}
243
244/// An iterator that removes items from the ring buffer.
245pub struct PopIterator<'a, T, R: RbRef + ?Sized>
246where
247 R::Rb: RbRead<T>,
248{
249 target: &'a R,
250 slices: (&'a [MaybeUninit<T>], &'a [MaybeUninit<T>]),
251 initial_len: usize,
252}
253
254impl<'a, T, R: RbRef + ?Sized> PopIterator<'a, T, R>
255where
256 R::Rb: RbRead<T>,
257{
258 pub(crate) fn new(target: &'a R) -> Self {
259 let slices: (&mut [MaybeUninit], &mut …) = unsafe { target.occupied_slices() };
260 Self {
261 target,
262 initial_len: slices.0.len() + slices.1.len(),
263 slices: (slices.0, slices.1),
264 }
265 }
266}
267
268impl<'a, T, R: RbRef + ?Sized> Iterator for PopIterator<'a, T, R>
269where
270 R::Rb: RbRead<T>,
271{
272 type Item = T;
273 #[inline]
274 fn next(&mut self) -> Option<T> {
275 match self.slices.0.len() {
276 0 => None,
277 n: usize => {
278 let item: T = unsafe { self.slices.0.get_unchecked(index:0).assume_init_read() };
279 if n == 1 {
280 (self.slices.0, self.slices.1) = (self.slices.1, &[]);
281 } else {
282 self.slices.0 = unsafe { self.slices.0.get_unchecked(index:1..n) };
283 }
284 Some(item)
285 }
286 }
287 }
288 #[inline]
289 fn size_hint(&self) -> (usize, Option<usize>) {
290 (self.len(), Some(self.len()))
291 }
292}
293
294impl<'a, T, R: RbRef + ?Sized> ExactSizeIterator for PopIterator<'a, T, R>
295where
296 R::Rb: RbRead<T>,
297{
298 fn len(&self) -> usize {
299 self.slices.0.len() + self.slices.1.len()
300 }
301}
302
303impl<'a, T, R: RbRef + ?Sized> Drop for PopIterator<'a, T, R>
304where
305 R::Rb: RbRead<T>,
306{
307 fn drop(&mut self) {
308 unsafe { self.target.advance_head(self.initial_len - self.len()) };
309 }
310}
311
312impl<T: Copy, R: RbRef> Consumer<T, R>
313where
314 R::Rb: RbRead<T>,
315{
316 /// Removes first items from the ring buffer and writes them into a slice.
317 /// Elements must be [`Copy`].
318 ///
319 /// Returns count of items been removed from the ring buffer.
320 pub fn pop_slice(&mut self, elems: &mut [T]) -> usize {
321 let (left, right) = unsafe { self.as_uninit_slices() };
322 let count = if elems.len() < left.len() {
323 unsafe { write_uninit_slice(elems, &left[..elems.len()]) };
324 elems.len()
325 } else {
326 let (left_elems, elems) = elems.split_at_mut(left.len());
327 unsafe { write_uninit_slice(left_elems, left) };
328 left.len()
329 + if elems.len() < right.len() {
330 unsafe { write_uninit_slice(elems, &right[..elems.len()]) };
331 elems.len()
332 } else {
333 unsafe { write_uninit_slice(&mut elems[..right.len()], right) };
334 right.len()
335 }
336 };
337 unsafe { self.advance(count) };
338 count
339 }
340}
341
342/// Postponed consumer.
343pub type PostponedConsumer<T, R> = Consumer<T, RbWrap<RbReadCache<T, R>>>;
344
345impl<T, R: RbRef> PostponedConsumer<T, R>
346where
347 R::Rb: RbRead<T>,
348{
349 /// Create new postponed consumer.
350 ///
351 /// # Safety
352 ///
353 /// There must be only one consumer containing the same ring buffer reference.
354 pub unsafe fn new_postponed(target: R) -> Self {
355 Consumer::new(target:RbWrap(RbReadCache::new(rb_ref:target)))
356 }
357
358 /// Synchronize changes with the ring buffer.
359 ///
360 /// Postponed consumer requires manual synchronization to make freed space visible for the producer.
361 pub fn sync(&mut self) {
362 self.target.0.sync();
363 }
364
365 /// Synchronize and transform back to immediate consumer.
366 pub fn into_immediate(self) -> Consumer<T, R> {
367 unsafe { Consumer::new(self.target.0.release()) }
368 }
369}
370
371#[cfg(feature = "std")]
372impl<R: RbRef> Consumer<u8, R>
373where
374 R::Rb: RbRead<u8>,
375{
376 /// Removes at most first `count` bytes from the ring buffer and writes them into a [`Write`] instance.
377 /// If `count` is `None` then as much as possible bytes will be written.
378 ///
379 /// Returns `Ok(n)` if `write` succeeded. `n` is number of bytes been written.
380 /// `n == 0` means that either `write` returned zero or ring buffer is empty.
381 ///
382 /// If `write` is failed then original error is returned. In this case it is guaranteed that no items was written to the writer.
383 /// To achieve this we write only one contiguous slice at once. So this call may write less than `len` items even if the writer is ready to get more.
384 pub fn write_into<P: Write>(
385 &mut self,
386 writer: &mut P,
387 count: Option<usize>,
388 ) -> io::Result<usize> {
389 let (left: &[MaybeUninit], _) = unsafe { self.as_uninit_slices() };
390 let count: usize = cmp::min(v1:count.unwrap_or(left.len()), v2:left.len());
391 let left_init: &[u8] = unsafe { slice_assume_init_ref(&left[..count]) };
392
393 let write_count: usize = writer.write(buf:left_init)?;
394 assert!(write_count <= count);
395 unsafe { self.advance(write_count) };
396 Ok(write_count)
397 }
398}
399
400#[cfg(feature = "std")]
401impl<R: RbRef> Read for Consumer<u8, R>
402where
403 R::Rb: RbRead<u8>,
404{
405 fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
406 let n: usize = self.pop_slice(elems:buffer);
407 if n == 0 && !buffer.is_empty() {
408 Err(io::ErrorKind::WouldBlock.into())
409 } else {
410 Ok(n)
411 }
412 }
413}
414