1 | use super::plumbing::*; |
2 | use super::*; |
3 | use std::sync::atomic::{AtomicBool, Ordering}; |
4 | use std::thread; |
5 | |
6 | /// `PanicFuse` is an adaptor that wraps an iterator with a fuse in case |
7 | /// of panics, to halt all threads as soon as possible. |
8 | /// |
9 | /// This struct is created by the [`panic_fuse()`] method on [`ParallelIterator`] |
10 | /// |
11 | /// [`panic_fuse()`]: trait.ParallelIterator.html#method.panic_fuse |
12 | /// [`ParallelIterator`]: trait.ParallelIterator.html |
13 | #[must_use = "iterator adaptors are lazy and do nothing unless consumed" ] |
14 | #[derive (Debug, Clone)] |
15 | pub struct PanicFuse<I: ParallelIterator> { |
16 | base: I, |
17 | } |
18 | |
/// Helper that sets a bool to `true` if dropped while unwinding.
///
/// Every clone refers to the same `AtomicBool`, so a panic observed by any
/// one copy becomes visible to all of them through [`Fuse::panicked`].
#[derive(Clone)]
struct Fuse<'a>(&'a AtomicBool);

impl<'a> Drop for Fuse<'a> {
    /// Trips the shared flag when this value is dropped during a panic
    /// unwind; an ordinary drop leaves the flag untouched.
    #[inline]
    fn drop(&mut self) {
        if thread::panicking() {
            self.0.store(true, Ordering::Relaxed);
        }
    }
}

impl<'a> Fuse<'a> {
    /// Returns `true` once the shared flag has been set.
    #[inline]
    fn panicked(&self) -> bool {
        self.0.load(Ordering::Relaxed)
    }
}
38 | |
39 | impl<I> PanicFuse<I> |
40 | where |
41 | I: ParallelIterator, |
42 | { |
43 | /// Creates a new `PanicFuse` iterator. |
44 | pub(super) fn new(base: I) -> PanicFuse<I> { |
45 | PanicFuse { base } |
46 | } |
47 | } |
48 | |
49 | impl<I> ParallelIterator for PanicFuse<I> |
50 | where |
51 | I: ParallelIterator, |
52 | { |
53 | type Item = I::Item; |
54 | |
55 | fn drive_unindexed<C>(self, consumer: C) -> C::Result |
56 | where |
57 | C: UnindexedConsumer<Self::Item>, |
58 | { |
59 | let panicked: AtomicBool = AtomicBool::new(false); |
60 | let consumer1: PanicFuseConsumer<'_, C> = PanicFuseConsumer { |
61 | base: consumer, |
62 | fuse: Fuse(&panicked), |
63 | }; |
64 | self.base.drive_unindexed(consumer:consumer1) |
65 | } |
66 | |
67 | fn opt_len(&self) -> Option<usize> { |
68 | self.base.opt_len() |
69 | } |
70 | } |
71 | |
72 | impl<I> IndexedParallelIterator for PanicFuse<I> |
73 | where |
74 | I: IndexedParallelIterator, |
75 | { |
76 | fn drive<C>(self, consumer: C) -> C::Result |
77 | where |
78 | C: Consumer<Self::Item>, |
79 | { |
80 | let panicked = AtomicBool::new(false); |
81 | let consumer1 = PanicFuseConsumer { |
82 | base: consumer, |
83 | fuse: Fuse(&panicked), |
84 | }; |
85 | self.base.drive(consumer1) |
86 | } |
87 | |
88 | fn len(&self) -> usize { |
89 | self.base.len() |
90 | } |
91 | |
92 | fn with_producer<CB>(self, callback: CB) -> CB::Output |
93 | where |
94 | CB: ProducerCallback<Self::Item>, |
95 | { |
96 | return self.base.with_producer(Callback { callback }); |
97 | |
98 | struct Callback<CB> { |
99 | callback: CB, |
100 | } |
101 | |
102 | impl<T, CB> ProducerCallback<T> for Callback<CB> |
103 | where |
104 | CB: ProducerCallback<T>, |
105 | { |
106 | type Output = CB::Output; |
107 | |
108 | fn callback<P>(self, base: P) -> CB::Output |
109 | where |
110 | P: Producer<Item = T>, |
111 | { |
112 | let panicked = AtomicBool::new(false); |
113 | let producer = PanicFuseProducer { |
114 | base, |
115 | fuse: Fuse(&panicked), |
116 | }; |
117 | self.callback.callback(producer) |
118 | } |
119 | } |
120 | } |
121 | } |
122 | |
123 | /// //////////////////////////////////////////////////////////////////////// |
124 | /// Producer implementation |
125 | |
126 | struct PanicFuseProducer<'a, P> { |
127 | base: P, |
128 | fuse: Fuse<'a>, |
129 | } |
130 | |
131 | impl<'a, P> Producer for PanicFuseProducer<'a, P> |
132 | where |
133 | P: Producer, |
134 | { |
135 | type Item = P::Item; |
136 | type IntoIter = PanicFuseIter<'a, P::IntoIter>; |
137 | |
138 | fn into_iter(self) -> Self::IntoIter { |
139 | PanicFuseIter { |
140 | base: self.base.into_iter(), |
141 | fuse: self.fuse, |
142 | } |
143 | } |
144 | |
145 | fn min_len(&self) -> usize { |
146 | self.base.min_len() |
147 | } |
148 | fn max_len(&self) -> usize { |
149 | self.base.max_len() |
150 | } |
151 | |
152 | fn split_at(self, index: usize) -> (Self, Self) { |
153 | let (left, right) = self.base.split_at(index); |
154 | ( |
155 | PanicFuseProducer { |
156 | base: left, |
157 | fuse: self.fuse.clone(), |
158 | }, |
159 | PanicFuseProducer { |
160 | base: right, |
161 | fuse: self.fuse, |
162 | }, |
163 | ) |
164 | } |
165 | |
166 | fn fold_with<G>(self, folder: G) -> G |
167 | where |
168 | G: Folder<Self::Item>, |
169 | { |
170 | let folder1 = PanicFuseFolder { |
171 | base: folder, |
172 | fuse: self.fuse, |
173 | }; |
174 | self.base.fold_with(folder1).base |
175 | } |
176 | } |
177 | |
178 | struct PanicFuseIter<'a, I> { |
179 | base: I, |
180 | fuse: Fuse<'a>, |
181 | } |
182 | |
183 | impl<'a, I> Iterator for PanicFuseIter<'a, I> |
184 | where |
185 | I: Iterator, |
186 | { |
187 | type Item = I::Item; |
188 | |
189 | fn next(&mut self) -> Option<Self::Item> { |
190 | if self.fuse.panicked() { |
191 | None |
192 | } else { |
193 | self.base.next() |
194 | } |
195 | } |
196 | |
197 | fn size_hint(&self) -> (usize, Option<usize>) { |
198 | self.base.size_hint() |
199 | } |
200 | } |
201 | |
202 | impl<'a, I> DoubleEndedIterator for PanicFuseIter<'a, I> |
203 | where |
204 | I: DoubleEndedIterator, |
205 | { |
206 | fn next_back(&mut self) -> Option<Self::Item> { |
207 | if self.fuse.panicked() { |
208 | None |
209 | } else { |
210 | self.base.next_back() |
211 | } |
212 | } |
213 | } |
214 | |
215 | impl<'a, I> ExactSizeIterator for PanicFuseIter<'a, I> |
216 | where |
217 | I: ExactSizeIterator, |
218 | { |
219 | fn len(&self) -> usize { |
220 | self.base.len() |
221 | } |
222 | } |
223 | |
224 | /// //////////////////////////////////////////////////////////////////////// |
225 | /// Consumer implementation |
226 | |
227 | struct PanicFuseConsumer<'a, C> { |
228 | base: C, |
229 | fuse: Fuse<'a>, |
230 | } |
231 | |
232 | impl<'a, T, C> Consumer<T> for PanicFuseConsumer<'a, C> |
233 | where |
234 | C: Consumer<T>, |
235 | { |
236 | type Folder = PanicFuseFolder<'a, C::Folder>; |
237 | type Reducer = PanicFuseReducer<'a, C::Reducer>; |
238 | type Result = C::Result; |
239 | |
240 | fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { |
241 | let (left, right, reducer) = self.base.split_at(index); |
242 | ( |
243 | PanicFuseConsumer { |
244 | base: left, |
245 | fuse: self.fuse.clone(), |
246 | }, |
247 | PanicFuseConsumer { |
248 | base: right, |
249 | fuse: self.fuse.clone(), |
250 | }, |
251 | PanicFuseReducer { |
252 | base: reducer, |
253 | _fuse: self.fuse, |
254 | }, |
255 | ) |
256 | } |
257 | |
258 | fn into_folder(self) -> Self::Folder { |
259 | PanicFuseFolder { |
260 | base: self.base.into_folder(), |
261 | fuse: self.fuse, |
262 | } |
263 | } |
264 | |
265 | fn full(&self) -> bool { |
266 | self.fuse.panicked() || self.base.full() |
267 | } |
268 | } |
269 | |
270 | impl<'a, T, C> UnindexedConsumer<T> for PanicFuseConsumer<'a, C> |
271 | where |
272 | C: UnindexedConsumer<T>, |
273 | { |
274 | fn split_off_left(&self) -> Self { |
275 | PanicFuseConsumer { |
276 | base: self.base.split_off_left(), |
277 | fuse: self.fuse.clone(), |
278 | } |
279 | } |
280 | |
281 | fn to_reducer(&self) -> Self::Reducer { |
282 | PanicFuseReducer { |
283 | base: self.base.to_reducer(), |
284 | _fuse: self.fuse.clone(), |
285 | } |
286 | } |
287 | } |
288 | |
289 | struct PanicFuseFolder<'a, C> { |
290 | base: C, |
291 | fuse: Fuse<'a>, |
292 | } |
293 | |
294 | impl<'a, T, C> Folder<T> for PanicFuseFolder<'a, C> |
295 | where |
296 | C: Folder<T>, |
297 | { |
298 | type Result = C::Result; |
299 | |
300 | fn consume(mut self, item: T) -> Self { |
301 | self.base = self.base.consume(item); |
302 | self |
303 | } |
304 | |
305 | fn consume_iter<I>(mut self, iter: I) -> Self |
306 | where |
307 | I: IntoIterator<Item = T>, |
308 | { |
309 | fn cool<'a, T>(fuse: &'a Fuse<'_>) -> impl Fn(&T) -> bool + 'a { |
310 | move |_| !fuse.panicked() |
311 | } |
312 | |
313 | self.base = { |
314 | let fuse = &self.fuse; |
315 | let iter = iter.into_iter().take_while(cool(fuse)); |
316 | self.base.consume_iter(iter) |
317 | }; |
318 | self |
319 | } |
320 | |
321 | fn complete(self) -> C::Result { |
322 | self.base.complete() |
323 | } |
324 | |
325 | fn full(&self) -> bool { |
326 | self.fuse.panicked() || self.base.full() |
327 | } |
328 | } |
329 | |
330 | struct PanicFuseReducer<'a, C> { |
331 | base: C, |
332 | _fuse: Fuse<'a>, |
333 | } |
334 | |
335 | impl<'a, T, C> Reducer<T> for PanicFuseReducer<'a, C> |
336 | where |
337 | C: Reducer<T>, |
338 | { |
339 | fn reduce(self, left: T, right: T) -> T { |
340 | self.base.reduce(left, right) |
341 | } |
342 | } |
343 | |