//! This module contains the parallel iterator types for double-ended queues
//! (`VecDeque<T>`). You will rarely need to interact with it directly
//! unless you need to name one of the iterator types.
4 | |
5 | use std::collections::VecDeque; |
6 | use std::ops::{Range, RangeBounds}; |
7 | |
8 | use crate::iter::plumbing::*; |
9 | use crate::iter::*; |
10 | use crate::math::simplify_range; |
11 | |
12 | use crate::slice; |
13 | use crate::vec; |
14 | |
15 | /// Parallel iterator over a double-ended queue |
16 | #[derive (Debug, Clone)] |
17 | pub struct IntoIter<T: Send> { |
18 | inner: vec::IntoIter<T>, |
19 | } |
20 | |
21 | impl<T: Send> IntoParallelIterator for VecDeque<T> { |
22 | type Item = T; |
23 | type Iter = IntoIter<T>; |
24 | |
25 | fn into_par_iter(self) -> Self::Iter { |
26 | // NOTE: requires data movement if the deque doesn't start at offset 0. |
27 | let inner: IntoIter = Vec::from(self).into_par_iter(); |
28 | IntoIter { inner } |
29 | } |
30 | } |
31 | |
32 | delegate_indexed_iterator! { |
33 | IntoIter<T> => T, |
34 | impl<T: Send> |
35 | } |
36 | |
37 | /// Parallel iterator over an immutable reference to a double-ended queue |
38 | #[derive (Debug)] |
39 | pub struct Iter<'a, T: Sync> { |
40 | inner: Chain<slice::Iter<'a, T>, slice::Iter<'a, T>>, |
41 | } |
42 | |
43 | impl<'a, T: Sync> Clone for Iter<'a, T> { |
44 | fn clone(&self) -> Self { |
45 | Iter { |
46 | inner: self.inner.clone(), |
47 | } |
48 | } |
49 | } |
50 | |
51 | impl<'a, T: Sync> IntoParallelIterator for &'a VecDeque<T> { |
52 | type Item = &'a T; |
53 | type Iter = Iter<'a, T>; |
54 | |
55 | fn into_par_iter(self) -> Self::Iter { |
56 | let (a: &[T], b: &[T]) = self.as_slices(); |
57 | Iter { |
58 | inner: a.into_par_iter().chain(b), |
59 | } |
60 | } |
61 | } |
62 | |
63 | delegate_indexed_iterator! { |
64 | Iter<'a, T> => &'a T, |
65 | impl<'a, T: Sync + 'a> |
66 | } |
67 | |
68 | /// Parallel iterator over a mutable reference to a double-ended queue |
69 | #[derive (Debug)] |
70 | pub struct IterMut<'a, T: Send> { |
71 | inner: Chain<slice::IterMut<'a, T>, slice::IterMut<'a, T>>, |
72 | } |
73 | |
74 | impl<'a, T: Send> IntoParallelIterator for &'a mut VecDeque<T> { |
75 | type Item = &'a mut T; |
76 | type Iter = IterMut<'a, T>; |
77 | |
78 | fn into_par_iter(self) -> Self::Iter { |
79 | let (a: &mut [T], b: &mut [T]) = self.as_mut_slices(); |
80 | IterMut { |
81 | inner: a.into_par_iter().chain(b), |
82 | } |
83 | } |
84 | } |
85 | |
86 | delegate_indexed_iterator! { |
87 | IterMut<'a, T> => &'a mut T, |
88 | impl<'a, T: Send + 'a> |
89 | } |
90 | |
/// Parallel draining iterator: moves a range of elements out of a
/// double-ended queue while the queue keeps its total capacity.
#[derive(Debug)]
pub struct Drain<'a, T>
where
    T: Send,
{
    /// The deque that elements are drained from.
    deque: &'a mut VecDeque<T>,
    /// Concrete index range of the elements to remove.
    range: Range<usize>,
    /// Length of the deque at the time the drain was created.
    orig_len: usize,
}
99 | |
100 | impl<'a, T: Send> ParallelDrainRange<usize> for &'a mut VecDeque<T> { |
101 | type Iter = Drain<'a, T>; |
102 | type Item = T; |
103 | |
104 | fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter { |
105 | Drain { |
106 | orig_len: self.len(), |
107 | range: simplify_range(range, self.len()), |
108 | deque: self, |
109 | } |
110 | } |
111 | } |
112 | |
113 | impl<'a, T: Send> ParallelIterator for Drain<'a, T> { |
114 | type Item = T; |
115 | |
116 | fn drive_unindexed<C>(self, consumer: C) -> C::Result |
117 | where |
118 | C: UnindexedConsumer<Self::Item>, |
119 | { |
120 | bridge(self, consumer) |
121 | } |
122 | |
123 | fn opt_len(&self) -> Option<usize> { |
124 | Some(self.len()) |
125 | } |
126 | } |
127 | |
128 | impl<'a, T: Send> IndexedParallelIterator for Drain<'a, T> { |
129 | fn drive<C>(self, consumer: C) -> C::Result |
130 | where |
131 | C: Consumer<Self::Item>, |
132 | { |
133 | bridge(self, consumer) |
134 | } |
135 | |
136 | fn len(&self) -> usize { |
137 | self.range.len() |
138 | } |
139 | |
140 | fn with_producer<CB>(self, callback: CB) -> CB::Output |
141 | where |
142 | CB: ProducerCallback<Self::Item>, |
143 | { |
144 | // NOTE: requires data movement if the deque doesn't start at offset 0. |
145 | superDrain<'_, T>::DrainGuard::new(self.deque) |
146 | .par_drain(self.range.clone()) |
147 | .with_producer(callback) |
148 | } |
149 | } |
150 | |
151 | impl<'a, T: Send> Drop for Drain<'a, T> { |
152 | fn drop(&mut self) { |
153 | if self.deque.len() != self.orig_len - self.range.len() { |
154 | // We must not have produced, so just call a normal drain to remove the items. |
155 | assert_eq!(self.deque.len(), self.orig_len); |
156 | self.deque.drain(self.range.clone()); |
157 | } |
158 | } |
159 | } |
160 | |