| 1 | use crate::raw::Bucket; |
| 2 | use crate::raw::{Allocator, Global, RawIter, RawIterRange, RawTable}; |
| 3 | use crate::scopeguard::guard; |
| 4 | use core::marker::PhantomData; |
| 5 | use core::mem; |
| 6 | use core::ptr::NonNull; |
| 7 | use rayon::iter::{ |
| 8 | plumbing::{self, Folder, UnindexedConsumer, UnindexedProducer}, |
| 9 | ParallelIterator, |
| 10 | }; |
| 11 | |
| 12 | /// Parallel iterator which returns a raw pointer to every full bucket in the table. |
| 13 | pub struct RawParIter<T> { |
| 14 | iter: RawIterRange<T>, |
| 15 | } |
| 16 | |
| 17 | impl<T> RawParIter<T> { |
| 18 | #[cfg_attr (feature = "inline-more" , inline)] |
| 19 | pub(super) unsafe fn iter(&self) -> RawIterRange<T> { |
| 20 | self.iter.clone() |
| 21 | } |
| 22 | } |
| 23 | |
| 24 | impl<T> Clone for RawParIter<T> { |
| 25 | #[cfg_attr (feature = "inline-more" , inline)] |
| 26 | fn clone(&self) -> Self { |
| 27 | Self { |
| 28 | iter: self.iter.clone(), |
| 29 | } |
| 30 | } |
| 31 | } |
| 32 | |
| 33 | impl<T> From<RawIter<T>> for RawParIter<T> { |
| 34 | fn from(it: RawIter<T>) -> Self { |
| 35 | RawParIter { iter: it.iter } |
| 36 | } |
| 37 | } |
| 38 | |
| 39 | impl<T> ParallelIterator for RawParIter<T> { |
| 40 | type Item = Bucket<T>; |
| 41 | |
| 42 | #[cfg_attr (feature = "inline-more" , inline)] |
| 43 | fn drive_unindexed<C>(self, consumer: C) -> C::Result |
| 44 | where |
| 45 | C: UnindexedConsumer<Self::Item>, |
| 46 | { |
| 47 | let producer: ParIterProducer = ParIterProducer { iter: self.iter }; |
| 48 | plumbing::bridge_unindexed(producer, consumer) |
| 49 | } |
| 50 | } |
| 51 | |
| 52 | /// Producer which returns a `Bucket<T>` for every element. |
| 53 | struct ParIterProducer<T> { |
| 54 | iter: RawIterRange<T>, |
| 55 | } |
| 56 | |
| 57 | impl<T> UnindexedProducer for ParIterProducer<T> { |
| 58 | type Item = Bucket<T>; |
| 59 | |
| 60 | #[cfg_attr (feature = "inline-more" , inline)] |
| 61 | fn split(self) -> (Self, Option<Self>) { |
| 62 | let (left: RawIterRange, right: Option>) = self.iter.split(); |
| 63 | let left: ParIterProducer = ParIterProducer { iter: left }; |
| 64 | let right: Option> = right.map(|right: RawIterRange| ParIterProducer { iter: right }); |
| 65 | (left, right) |
| 66 | } |
| 67 | |
| 68 | #[cfg_attr (feature = "inline-more" , inline)] |
| 69 | fn fold_with<F>(self, folder: F) -> F |
| 70 | where |
| 71 | F: Folder<Self::Item>, |
| 72 | { |
| 73 | folder.consume_iter(self.iter) |
| 74 | } |
| 75 | } |
| 76 | |
| 77 | /// Parallel iterator which consumes a table and returns elements. |
| 78 | pub struct RawIntoParIter<T, A: Allocator = Global> { |
| 79 | table: RawTable<T, A>, |
| 80 | } |
| 81 | |
| 82 | impl<T, A: Allocator> RawIntoParIter<T, A> { |
| 83 | #[cfg_attr (feature = "inline-more" , inline)] |
| 84 | pub(super) unsafe fn par_iter(&self) -> RawParIter<T> { |
| 85 | self.table.par_iter() |
| 86 | } |
| 87 | } |
| 88 | |
| 89 | impl<T: Send, A: Allocator + Send> ParallelIterator for RawIntoParIter<T, A> { |
| 90 | type Item = T; |
| 91 | |
| 92 | #[cfg_attr (feature = "inline-more" , inline)] |
| 93 | fn drive_unindexed<C>(self, consumer: C) -> C::Result |
| 94 | where |
| 95 | C: UnindexedConsumer<Self::Item>, |
| 96 | { |
| 97 | let iter: RawIterRange = unsafe { self.table.iter().iter }; |
| 98 | let _guard: ScopeGuard = guard(self.table.into_allocation(), |alloc: &mut Option<(NonNull, Layout, …)>| { |
| 99 | if let Some((ptr: NonNull, layout: Layout, ref alloc: &A)) = *alloc { |
| 100 | unsafe { |
| 101 | alloc.deallocate(ptr, layout); |
| 102 | } |
| 103 | } |
| 104 | }); |
| 105 | let producer: ParDrainProducer = ParDrainProducer { iter }; |
| 106 | plumbing::bridge_unindexed(producer, consumer) |
| 107 | } |
| 108 | } |
| 109 | |
| 110 | /// Parallel iterator which consumes elements without freeing the table storage. |
| 111 | pub struct RawParDrain<'a, T, A: Allocator = Global> { |
| 112 | // We don't use a &'a mut RawTable<T> because we want RawParDrain to be |
| 113 | // covariant over T. |
| 114 | table: NonNull<RawTable<T, A>>, |
| 115 | marker: PhantomData<&'a RawTable<T, A>>, |
| 116 | } |
| 117 | |
| 118 | unsafe impl<T: Send, A: Allocator> Send for RawParDrain<'_, T, A> {} |
| 119 | |
| 120 | impl<T, A: Allocator> RawParDrain<'_, T, A> { |
| 121 | #[cfg_attr (feature = "inline-more" , inline)] |
| 122 | pub(super) unsafe fn par_iter(&self) -> RawParIter<T> { |
| 123 | self.table.as_ref().par_iter() |
| 124 | } |
| 125 | } |
| 126 | |
| 127 | impl<T: Send, A: Allocator> ParallelIterator for RawParDrain<'_, T, A> { |
| 128 | type Item = T; |
| 129 | |
| 130 | #[cfg_attr (feature = "inline-more" , inline)] |
| 131 | fn drive_unindexed<C>(self, consumer: C) -> C::Result |
| 132 | where |
| 133 | C: UnindexedConsumer<Self::Item>, |
| 134 | { |
| 135 | let _guard: ScopeGuard>, …> = guard(self.table, |table: &mut NonNull>| unsafe { |
| 136 | table.as_mut().clear_no_drop(); |
| 137 | }); |
| 138 | let iter: RawIterRange = unsafe { self.table.as_ref().iter().iter }; |
| 139 | mem::forget(self); |
| 140 | let producer: ParDrainProducer = ParDrainProducer { iter }; |
| 141 | plumbing::bridge_unindexed(producer, consumer) |
| 142 | } |
| 143 | } |
| 144 | |
| 145 | impl<T, A: Allocator> Drop for RawParDrain<'_, T, A> { |
| 146 | fn drop(&mut self) { |
| 147 | // If drive_unindexed is not called then simply clear the table. |
| 148 | unsafe { |
| 149 | self.table.as_mut().clear(); |
| 150 | } |
| 151 | } |
| 152 | } |
| 153 | |
| 154 | /// Producer which will consume all elements in the range, even if it is dropped |
| 155 | /// halfway through. |
| 156 | struct ParDrainProducer<T> { |
| 157 | iter: RawIterRange<T>, |
| 158 | } |
| 159 | |
| 160 | impl<T: Send> UnindexedProducer for ParDrainProducer<T> { |
| 161 | type Item = T; |
| 162 | |
| 163 | #[cfg_attr (feature = "inline-more" , inline)] |
| 164 | fn split(self) -> (Self, Option<Self>) { |
| 165 | let (left, right) = self.iter.clone().split(); |
| 166 | mem::forget(self); |
| 167 | let left = ParDrainProducer { iter: left }; |
| 168 | let right = right.map(|right| ParDrainProducer { iter: right }); |
| 169 | (left, right) |
| 170 | } |
| 171 | |
| 172 | #[cfg_attr (feature = "inline-more" , inline)] |
| 173 | fn fold_with<F>(mut self, mut folder: F) -> F |
| 174 | where |
| 175 | F: Folder<Self::Item>, |
| 176 | { |
| 177 | // Make sure to modify the iterator in-place so that any remaining |
| 178 | // elements are processed in our Drop impl. |
| 179 | for item in &mut self.iter { |
| 180 | folder = folder.consume(unsafe { item.read() }); |
| 181 | if folder.full() { |
| 182 | return folder; |
| 183 | } |
| 184 | } |
| 185 | |
| 186 | // If we processed all elements then we don't need to run the drop. |
| 187 | mem::forget(self); |
| 188 | folder |
| 189 | } |
| 190 | } |
| 191 | |
| 192 | impl<T> Drop for ParDrainProducer<T> { |
| 193 | #[cfg_attr (feature = "inline-more" , inline)] |
| 194 | fn drop(&mut self) { |
| 195 | // Drop all remaining elements |
| 196 | if mem::needs_drop::<T>() { |
| 197 | for item: Bucket in &mut self.iter { |
| 198 | unsafe { |
| 199 | item.drop(); |
| 200 | } |
| 201 | } |
| 202 | } |
| 203 | } |
| 204 | } |
| 205 | |
| 206 | impl<T, A: Allocator> RawTable<T, A> { |
| 207 | /// Returns a parallel iterator over the elements in a `RawTable`. |
| 208 | #[cfg_attr (feature = "inline-more" , inline)] |
| 209 | pub unsafe fn par_iter(&self) -> RawParIter<T> { |
| 210 | RawParIter { |
| 211 | iter: self.iter().iter, |
| 212 | } |
| 213 | } |
| 214 | |
| 215 | /// Returns a parallel iterator over the elements in a `RawTable`. |
| 216 | #[cfg_attr (feature = "inline-more" , inline)] |
| 217 | pub fn into_par_iter(self) -> RawIntoParIter<T, A> { |
| 218 | RawIntoParIter { table: self } |
| 219 | } |
| 220 | |
| 221 | /// Returns a parallel iterator which consumes all elements of a `RawTable` |
| 222 | /// without freeing its memory allocation. |
| 223 | #[cfg_attr (feature = "inline-more" , inline)] |
| 224 | pub fn par_drain(&mut self) -> RawParDrain<'_, T, A> { |
| 225 | RawParDrain { |
| 226 | table: NonNull::from(self), |
| 227 | marker: PhantomData, |
| 228 | } |
| 229 | } |
| 230 | } |
| 231 | |