1use super::plumbing::*;
2use super::*;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::thread;
5
6/// `PanicFuse` is an adaptor that wraps an iterator with a fuse in case
7/// of panics, to halt all threads as soon as possible.
8///
9/// This struct is created by the [`panic_fuse()`] method on [`ParallelIterator`]
10///
11/// [`panic_fuse()`]: trait.ParallelIterator.html#method.panic_fuse
12/// [`ParallelIterator`]: trait.ParallelIterator.html
13#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
14#[derive(Debug, Clone)]
15pub struct PanicFuse<I: ParallelIterator> {
16 base: I,
17}
18
19/// Helper that sets a bool to `true` if dropped while unwinding.
20#[derive(Clone)]
21struct Fuse<'a>(&'a AtomicBool);
22
23impl<'a> Drop for Fuse<'a> {
24 #[inline]
25 fn drop(&mut self) {
26 if thread::panicking() {
27 self.0.store(val:true, order:Ordering::Relaxed);
28 }
29 }
30}
31
32impl<'a> Fuse<'a> {
33 #[inline]
34 fn panicked(&self) -> bool {
35 self.0.load(order:Ordering::Relaxed)
36 }
37}
38
39impl<I> PanicFuse<I>
40where
41 I: ParallelIterator,
42{
43 /// Creates a new `PanicFuse` iterator.
44 pub(super) fn new(base: I) -> PanicFuse<I> {
45 PanicFuse { base }
46 }
47}
48
49impl<I> ParallelIterator for PanicFuse<I>
50where
51 I: ParallelIterator,
52{
53 type Item = I::Item;
54
55 fn drive_unindexed<C>(self, consumer: C) -> C::Result
56 where
57 C: UnindexedConsumer<Self::Item>,
58 {
59 let panicked: AtomicBool = AtomicBool::new(false);
60 let consumer1: PanicFuseConsumer<'_, C> = PanicFuseConsumer {
61 base: consumer,
62 fuse: Fuse(&panicked),
63 };
64 self.base.drive_unindexed(consumer:consumer1)
65 }
66
67 fn opt_len(&self) -> Option<usize> {
68 self.base.opt_len()
69 }
70}
71
72impl<I> IndexedParallelIterator for PanicFuse<I>
73where
74 I: IndexedParallelIterator,
75{
76 fn drive<C>(self, consumer: C) -> C::Result
77 where
78 C: Consumer<Self::Item>,
79 {
80 let panicked = AtomicBool::new(false);
81 let consumer1 = PanicFuseConsumer {
82 base: consumer,
83 fuse: Fuse(&panicked),
84 };
85 self.base.drive(consumer1)
86 }
87
88 fn len(&self) -> usize {
89 self.base.len()
90 }
91
92 fn with_producer<CB>(self, callback: CB) -> CB::Output
93 where
94 CB: ProducerCallback<Self::Item>,
95 {
96 return self.base.with_producer(Callback { callback });
97
98 struct Callback<CB> {
99 callback: CB,
100 }
101
102 impl<T, CB> ProducerCallback<T> for Callback<CB>
103 where
104 CB: ProducerCallback<T>,
105 {
106 type Output = CB::Output;
107
108 fn callback<P>(self, base: P) -> CB::Output
109 where
110 P: Producer<Item = T>,
111 {
112 let panicked = AtomicBool::new(false);
113 let producer = PanicFuseProducer {
114 base,
115 fuse: Fuse(&panicked),
116 };
117 self.callback.callback(producer)
118 }
119 }
120 }
121}
122
123/// ////////////////////////////////////////////////////////////////////////
124/// Producer implementation
125
126struct PanicFuseProducer<'a, P> {
127 base: P,
128 fuse: Fuse<'a>,
129}
130
131impl<'a, P> Producer for PanicFuseProducer<'a, P>
132where
133 P: Producer,
134{
135 type Item = P::Item;
136 type IntoIter = PanicFuseIter<'a, P::IntoIter>;
137
138 fn into_iter(self) -> Self::IntoIter {
139 PanicFuseIter {
140 base: self.base.into_iter(),
141 fuse: self.fuse,
142 }
143 }
144
145 fn min_len(&self) -> usize {
146 self.base.min_len()
147 }
148 fn max_len(&self) -> usize {
149 self.base.max_len()
150 }
151
152 fn split_at(self, index: usize) -> (Self, Self) {
153 let (left, right) = self.base.split_at(index);
154 (
155 PanicFuseProducer {
156 base: left,
157 fuse: self.fuse.clone(),
158 },
159 PanicFuseProducer {
160 base: right,
161 fuse: self.fuse,
162 },
163 )
164 }
165
166 fn fold_with<G>(self, folder: G) -> G
167 where
168 G: Folder<Self::Item>,
169 {
170 let folder1 = PanicFuseFolder {
171 base: folder,
172 fuse: self.fuse,
173 };
174 self.base.fold_with(folder1).base
175 }
176}
177
178struct PanicFuseIter<'a, I> {
179 base: I,
180 fuse: Fuse<'a>,
181}
182
183impl<'a, I> Iterator for PanicFuseIter<'a, I>
184where
185 I: Iterator,
186{
187 type Item = I::Item;
188
189 fn next(&mut self) -> Option<Self::Item> {
190 if self.fuse.panicked() {
191 None
192 } else {
193 self.base.next()
194 }
195 }
196
197 fn size_hint(&self) -> (usize, Option<usize>) {
198 self.base.size_hint()
199 }
200}
201
202impl<'a, I> DoubleEndedIterator for PanicFuseIter<'a, I>
203where
204 I: DoubleEndedIterator,
205{
206 fn next_back(&mut self) -> Option<Self::Item> {
207 if self.fuse.panicked() {
208 None
209 } else {
210 self.base.next_back()
211 }
212 }
213}
214
215impl<'a, I> ExactSizeIterator for PanicFuseIter<'a, I>
216where
217 I: ExactSizeIterator,
218{
219 fn len(&self) -> usize {
220 self.base.len()
221 }
222}
223
224/// ////////////////////////////////////////////////////////////////////////
225/// Consumer implementation
226
227struct PanicFuseConsumer<'a, C> {
228 base: C,
229 fuse: Fuse<'a>,
230}
231
232impl<'a, T, C> Consumer<T> for PanicFuseConsumer<'a, C>
233where
234 C: Consumer<T>,
235{
236 type Folder = PanicFuseFolder<'a, C::Folder>;
237 type Reducer = PanicFuseReducer<'a, C::Reducer>;
238 type Result = C::Result;
239
240 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
241 let (left, right, reducer) = self.base.split_at(index);
242 (
243 PanicFuseConsumer {
244 base: left,
245 fuse: self.fuse.clone(),
246 },
247 PanicFuseConsumer {
248 base: right,
249 fuse: self.fuse.clone(),
250 },
251 PanicFuseReducer {
252 base: reducer,
253 _fuse: self.fuse,
254 },
255 )
256 }
257
258 fn into_folder(self) -> Self::Folder {
259 PanicFuseFolder {
260 base: self.base.into_folder(),
261 fuse: self.fuse,
262 }
263 }
264
265 fn full(&self) -> bool {
266 self.fuse.panicked() || self.base.full()
267 }
268}
269
270impl<'a, T, C> UnindexedConsumer<T> for PanicFuseConsumer<'a, C>
271where
272 C: UnindexedConsumer<T>,
273{
274 fn split_off_left(&self) -> Self {
275 PanicFuseConsumer {
276 base: self.base.split_off_left(),
277 fuse: self.fuse.clone(),
278 }
279 }
280
281 fn to_reducer(&self) -> Self::Reducer {
282 PanicFuseReducer {
283 base: self.base.to_reducer(),
284 _fuse: self.fuse.clone(),
285 }
286 }
287}
288
289struct PanicFuseFolder<'a, C> {
290 base: C,
291 fuse: Fuse<'a>,
292}
293
294impl<'a, T, C> Folder<T> for PanicFuseFolder<'a, C>
295where
296 C: Folder<T>,
297{
298 type Result = C::Result;
299
300 fn consume(mut self, item: T) -> Self {
301 self.base = self.base.consume(item);
302 self
303 }
304
305 fn consume_iter<I>(mut self, iter: I) -> Self
306 where
307 I: IntoIterator<Item = T>,
308 {
309 fn cool<'a, T>(fuse: &'a Fuse<'_>) -> impl Fn(&T) -> bool + 'a {
310 move |_| !fuse.panicked()
311 }
312
313 self.base = {
314 let fuse = &self.fuse;
315 let iter = iter.into_iter().take_while(cool(fuse));
316 self.base.consume_iter(iter)
317 };
318 self
319 }
320
321 fn complete(self) -> C::Result {
322 self.base.complete()
323 }
324
325 fn full(&self) -> bool {
326 self.fuse.panicked() || self.base.full()
327 }
328}
329
330struct PanicFuseReducer<'a, C> {
331 base: C,
332 _fuse: Fuse<'a>,
333}
334
335impl<'a, T, C> Reducer<T> for PanicFuseReducer<'a, C>
336where
337 C: Reducer<T>,
338{
339 fn reduce(self, left: T, right: T) -> T {
340 self.base.reduce(left, right)
341 }
342}
343