1 | use crate::{ |
2 | ring_buffer::{RbBase, RbRead, RbReadCache, RbRef, RbWrap}, |
3 | utils::{slice_assume_init_mut, slice_assume_init_ref, write_uninit_slice}, |
4 | }; |
5 | use core::{cmp, iter::ExactSizeIterator, marker::PhantomData, mem::MaybeUninit}; |
6 | |
7 | #[cfg (feature = "std" )] |
8 | use std::io::{self, Read, Write}; |
9 | |
10 | /// Consumer part of ring buffer. |
11 | /// |
12 | /// # Mode |
13 | /// |
14 | /// It can operate in immediate (by default) or postponed mode. |
15 | /// Mode could be switched using [`Self::postponed`]/[`Self::into_postponed`] and [`Self::into_immediate`] methods. |
16 | /// |
17 | /// + In immediate mode removed and inserted items are automatically synchronized with the other end. |
18 | /// + In postponed mode synchronization occurs only when [`Self::sync`] or [`Self::into_immediate`] is called or when `Self` is dropped. |
19 | /// The reason to use postponed mode is that multiple subsequent operations are performed faster due to less frequent cache synchronization. |
pub struct Consumer<T, R: RbRef>
where
    R::Rb: RbRead<T>,
{
    // Reference to the underlying ring buffer (shared with the producer side).
    target: R,
    // Marks logical ownership of `T` items for drop-check/variance purposes.
    _phantom: PhantomData<T>,
}
27 | |
impl<T, R: RbRef> Consumer<T, R>
where
    R::Rb: RbRead<T>,
{
    /// Creates consumer from the ring buffer reference.
    ///
    /// # Safety
    ///
    /// There must be only one consumer containing the same ring buffer reference.
    pub unsafe fn new(target: R) -> Self {
        Self {
            target,
            _phantom: PhantomData,
        }
    }

    /// Returns reference to the underlying ring buffer.
    #[inline]
    pub fn rb(&self) -> &R::Rb {
        &self.target
    }

    /// Consumes `self` and returns underlying ring buffer reference.
    pub fn into_rb_ref(self) -> R {
        self.target
    }

    /// Returns postponed consumer that borrows [`Self`].
    pub fn postponed(&mut self) -> Consumer<T, RbWrap<RbReadCache<T, &R::Rb>>> {
        // SAFETY: `self` is mutably borrowed for the lifetime of the returned
        // consumer, so it remains the only active consumer of the buffer.
        unsafe { Consumer::new(RbWrap(RbReadCache::new(&self.target))) }
    }

    /// Transforms [`Self`] into postponed consumer.
    pub fn into_postponed(self) -> Consumer<T, RbWrap<RbReadCache<T, R>>> {
        // SAFETY: `self` is consumed, so the new consumer stays unique.
        unsafe { Consumer::new(RbWrap(RbReadCache::new(self.target))) }
    }

    /// Returns capacity of the ring buffer.
    ///
    /// The capacity of the buffer is constant.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.target.capacity_nonzero().get()
    }

    /// Checks if the ring buffer is empty.
    ///
    /// *The result may become irrelevant at any time because of concurring producer activity.*
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.target.is_empty()
    }

    /// Checks if the ring buffer is full.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.target.is_full()
    }

    /// The number of items stored in the buffer.
    ///
    /// *Actual number may be greater than the returned value because of concurring producer activity.*
    #[inline]
    pub fn len(&self) -> usize {
        self.target.occupied_len()
    }

    /// The number of remaining free places in the buffer.
    ///
    /// *Actual number may be less than the returned value because of concurring producer activity.*
    #[inline]
    pub fn free_len(&self) -> usize {
        self.target.vacant_len()
    }

    /// Provides a direct access to the ring buffer occupied memory.
    /// The difference from [`Self::as_slices`] is that this method provides slices of [`MaybeUninit<T>`], so items may be moved out of slices.
    ///
    /// Returns a pair of slices of stored items, the second one may be empty.
    /// Elements with lower indices in slice are older. First slice contains older items than second one.
    ///
    /// # Safety
    ///
    /// All items are initialized. Elements must be removed starting from the beginning of first slice.
    /// When all items are removed from the first slice then items must be removed from the beginning of the second slice.
    ///
    /// *This method must be followed by [`Self::advance`] call with the number of items being removed previously as argument.*
    /// *No other mutating calls allowed before that.*
    #[inline]
    pub unsafe fn as_uninit_slices(&self) -> (&[MaybeUninit<T>], &[MaybeUninit<T>]) {
        let (left, right) = self.target.occupied_slices();
        // Downgrade the mutable slices to shared ones for read-only access.
        (left as &[_], right as &[_])
    }

    /// Provides a direct mutable access to the ring buffer occupied memory.
    ///
    /// Same as [`Self::as_uninit_slices`].
    ///
    /// # Safety
    ///
    /// See [`Self::as_uninit_slices`].
    #[inline]
    pub unsafe fn as_mut_uninit_slices(&self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) {
        self.target.occupied_slices()
    }

    /// Moves `head` target by `count` places.
    ///
    /// # Safety
    ///
    /// First `count` items in occupied memory must be moved out or dropped.
    #[inline]
    pub unsafe fn advance(&mut self, count: usize) {
        self.target.advance_head(count);
    }

    /// Returns a pair of slices which contain, in order, the contents of the ring buffer.
    #[inline]
    pub fn as_slices(&self) -> (&[T], &[T]) {
        // SAFETY: occupied memory is always initialized, and we only expose
        // shared references, so no items can be moved out.
        unsafe {
            let (left, right) = self.as_uninit_slices();
            (slice_assume_init_ref(left), slice_assume_init_ref(right))
        }
    }

    /// Returns a pair of mutable slices which contain, in order, the contents of the ring buffer.
    #[inline]
    pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
        // SAFETY: occupied memory is always initialized; callers may mutate
        // items in place but cannot move them out through `&mut [T]`.
        unsafe {
            let (left, right) = self.as_mut_uninit_slices();
            (slice_assume_init_mut(left), slice_assume_init_mut(right))
        }
    }

    /// Removes the eldest item from the ring buffer and returns it.
    ///
    /// Returns `None` if the ring buffer is empty.
    pub fn pop(&mut self) -> Option<T> {
        if !self.is_empty() {
            // SAFETY: the buffer is non-empty, so the first slice has at least
            // one element, and element 0 is initialized.
            let elem = unsafe {
                self.as_uninit_slices()
                    .0
                    .get_unchecked(0)
                    .assume_init_read()
            };
            // SAFETY: exactly one item was moved out above.
            unsafe { self.advance(1) };
            Some(elem)
        } else {
            None
        }
    }

    /// Returns an iterator that removes items one by one from the ring buffer.
    ///
    /// Iterator provides only items that are available for consumer at the moment of `pop_iter` call, it will not contain new items added after it was created.
    ///
    /// *Information about removed items is committed to the buffer only when iterator is destroyed.*
    pub fn pop_iter(&mut self) -> PopIterator<'_, T, R> {
        PopIterator::new(&self.target)
    }

    /// Returns a front-to-back iterator containing references to items in the ring buffer.
    ///
    /// This iterator does not remove items out of the ring buffer.
    pub fn iter(&self) -> impl Iterator<Item = &T> + '_ {
        let (left, right) = self.as_slices();
        left.iter().chain(right.iter())
    }

    /// Returns a front-to-back iterator that returns mutable references to items in the ring buffer.
    ///
    /// This iterator does not remove items out of the ring buffer.
    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> + '_ {
        let (left, right) = self.as_mut_slices();
        left.iter_mut().chain(right.iter_mut())
    }

    /// Removes at most `n` and at least `min(n, Self::len())` items from the buffer and safely drops them.
    ///
    /// If there is no concurring producer activity then exactly `min(n, Self::len())` items are removed.
    ///
    /// Returns the number of deleted items.
    ///
    #[cfg_attr(
        feature = "alloc",
        doc = r##"
```rust
# extern crate ringbuf;
# use ringbuf::HeapRb;
# fn main() {
let target = HeapRb::<i32>::new(8);
let (mut prod, mut cons) = target.split();

assert_eq!(prod.push_iter(&mut (0..8)), 8);

assert_eq!(cons.skip(4), 4);
assert_eq!(cons.skip(8), 4);
assert_eq!(cons.skip(8), 0);
# }
```
"##
    )]
    pub fn skip(&mut self, count: usize) -> usize {
        // Clamp to the currently visible length so `skip_internal` is asked to
        // drop only items that are guaranteed to be present.
        let count = cmp::min(count, self.len());
        // SAFETY: at most `len()` occupied items are dropped.
        assert_eq!(unsafe { self.target.skip_internal(Some(count)) }, count);
        count
    }

    /// Removes all items from the buffer and safely drops them.
    ///
    /// Returns the number of deleted items.
    pub fn clear(&mut self) -> usize {
        // SAFETY: `None` instructs the ring buffer to drop all occupied items.
        unsafe { self.target.skip_internal(None) }
    }
}
243 | |
244 | /// An iterator that removes items from the ring buffer. |
/// An iterator that removes items from the ring buffer.
pub struct PopIterator<'a, T, R: RbRef + ?Sized>
where
    R::Rb: RbRead<T>,
{
    // Ring buffer reference; `head` is advanced against it only on drop.
    target: &'a R,
    // Remaining unread parts of the occupied-memory snapshot taken at creation.
    slices: (&'a [MaybeUninit<T>], &'a [MaybeUninit<T>]),
    // Total item count at creation; `initial_len - len()` items were consumed.
    initial_len: usize,
}
253 | |
254 | impl<'a, T, R: RbRef + ?Sized> PopIterator<'a, T, R> |
255 | where |
256 | R::Rb: RbRead<T>, |
257 | { |
258 | pub(crate) fn new(target: &'a R) -> Self { |
259 | let slices: (&mut [MaybeUninit], &mut …) = unsafe { target.occupied_slices() }; |
260 | Self { |
261 | target, |
262 | initial_len: slices.0.len() + slices.1.len(), |
263 | slices: (slices.0, slices.1), |
264 | } |
265 | } |
266 | } |
267 | |
268 | impl<'a, T, R: RbRef + ?Sized> Iterator for PopIterator<'a, T, R> |
269 | where |
270 | R::Rb: RbRead<T>, |
271 | { |
272 | type Item = T; |
273 | #[inline ] |
274 | fn next(&mut self) -> Option<T> { |
275 | match self.slices.0.len() { |
276 | 0 => None, |
277 | n: usize => { |
278 | let item: T = unsafe { self.slices.0.get_unchecked(index:0).assume_init_read() }; |
279 | if n == 1 { |
280 | (self.slices.0, self.slices.1) = (self.slices.1, &[]); |
281 | } else { |
282 | self.slices.0 = unsafe { self.slices.0.get_unchecked(index:1..n) }; |
283 | } |
284 | Some(item) |
285 | } |
286 | } |
287 | } |
288 | #[inline ] |
289 | fn size_hint(&self) -> (usize, Option<usize>) { |
290 | (self.len(), Some(self.len())) |
291 | } |
292 | } |
293 | |
294 | impl<'a, T, R: RbRef + ?Sized> ExactSizeIterator for PopIterator<'a, T, R> |
295 | where |
296 | R::Rb: RbRead<T>, |
297 | { |
298 | fn len(&self) -> usize { |
299 | self.slices.0.len() + self.slices.1.len() |
300 | } |
301 | } |
302 | |
impl<'a, T, R: RbRef + ?Sized> Drop for PopIterator<'a, T, R>
where
    R::Rb: RbRead<T>,
{
    // Commits the removals: the buffer head advances only here, so a partially
    // consumed iterator releases exactly the items it yielded.
    fn drop(&mut self) {
        // SAFETY: `initial_len - len()` items were moved out by `next`, so
        // advancing the head past them is sound.
        unsafe { self.target.advance_head(self.initial_len - self.len()) };
    }
}
311 | |
impl<T: Copy, R: RbRef> Consumer<T, R>
where
    R::Rb: RbRead<T>,
{
    /// Removes first items from the ring buffer and writes them into a slice.
    /// Elements must be [`Copy`].
    ///
    /// Returns count of items been removed from the ring buffer.
    pub fn pop_slice(&mut self, elems: &mut [T]) -> usize {
        // SAFETY: occupied memory is initialized; items are copied (T: Copy),
        // then released in order via the single `advance` below.
        let (left, right) = unsafe { self.as_uninit_slices() };
        let count = if elems.len() < left.len() {
            // Destination fits entirely in the first occupied slice.
            unsafe { write_uninit_slice(elems, &left[..elems.len()]) };
            elems.len()
        } else {
            // Copy the whole first slice, then take as much of the second
            // slice as the remaining destination can hold.
            let (left_elems, elems) = elems.split_at_mut(left.len());
            unsafe { write_uninit_slice(left_elems, left) };
            left.len()
                + if elems.len() < right.len() {
                    unsafe { write_uninit_slice(elems, &right[..elems.len()]) };
                    elems.len()
                } else {
                    unsafe { write_uninit_slice(&mut elems[..right.len()], right) };
                    right.len()
                }
        };
        // SAFETY: exactly `count` items have been copied out of the buffer.
        unsafe { self.advance(count) };
        count
    }
}
341 | |
/// Postponed consumer.
///
/// A [`Consumer`] whose ring buffer reference is wrapped in a read cache;
/// removals become visible to the producer only after synchronization
/// (see [`PostponedConsumer::sync`]).
pub type PostponedConsumer<T, R> = Consumer<T, RbWrap<RbReadCache<T, R>>>;
344 | |
345 | impl<T, R: RbRef> PostponedConsumer<T, R> |
346 | where |
347 | R::Rb: RbRead<T>, |
348 | { |
349 | /// Create new postponed consumer. |
350 | /// |
351 | /// # Safety |
352 | /// |
353 | /// There must be only one consumer containing the same ring buffer reference. |
354 | pub unsafe fn new_postponed(target: R) -> Self { |
355 | Consumer::new(target:RbWrap(RbReadCache::new(rb_ref:target))) |
356 | } |
357 | |
358 | /// Synchronize changes with the ring buffer. |
359 | /// |
360 | /// Postponed consumer requires manual synchronization to make freed space visible for the producer. |
361 | pub fn sync(&mut self) { |
362 | self.target.0.sync(); |
363 | } |
364 | |
365 | /// Synchronize and transform back to immediate consumer. |
366 | pub fn into_immediate(self) -> Consumer<T, R> { |
367 | unsafe { Consumer::new(self.target.0.release()) } |
368 | } |
369 | } |
370 | |
#[cfg(feature = "std")]
impl<R: RbRef> Consumer<u8, R>
where
    R::Rb: RbRead<u8>,
{
    /// Removes at most first `count` bytes from the ring buffer and writes them into a [`Write`] instance.
    /// If `count` is `None` then as much as possible bytes will be written.
    ///
    /// Returns `Ok(n)` if `write` succeeded. `n` is number of bytes been written.
    /// `n == 0` means that either `write` returned zero or ring buffer is empty.
    ///
    /// If `write` is failed then original error is returned. In this case it is guaranteed that no items was written to the writer.
    /// To achieve this we write only one contiguous slice at once. So this call may write less than `len` items even if the writer is ready to get more.
    pub fn write_into<P: Write>(
        &mut self,
        writer: &mut P,
        count: Option<usize>,
    ) -> io::Result<usize> {
        // Only the first (contiguous) occupied slice is used, so a failed
        // `write` removes nothing from the buffer.
        let (left, _) = unsafe { self.as_uninit_slices() };
        let count = cmp::min(count.unwrap_or(left.len()), left.len());
        // SAFETY: occupied memory is initialized and `count <= left.len()`.
        let left_init = unsafe { slice_assume_init_ref(&left[..count]) };

        let write_count = writer.write(left_init)?;
        // A well-behaved writer never reports more bytes than it was given.
        assert!(write_count <= count);
        // SAFETY: exactly `write_count` bytes were copied out to the writer.
        unsafe { self.advance(write_count) };
        Ok(write_count)
    }
}
399 | |
#[cfg(feature = "std")]
impl<R: RbRef> Read for Consumer<u8, R>
where
    R::Rb: RbRead<u8>,
{
    /// Pops at most `buffer.len()` bytes from the ring buffer into `buffer`.
    ///
    /// Returns [`io::ErrorKind::WouldBlock`] when the ring buffer is empty
    /// but `buffer` is not, so callers can distinguish "no data yet" from
    /// end-of-stream.
    fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
        let n = self.pop_slice(buffer);
        if n == 0 && !buffer.is_empty() {
            Err(io::ErrorKind::WouldBlock.into())
        } else {
            Ok(n)
        }
    }
}
414 | |