1 | use rayon::prelude::*; |
2 | use rayon::ThreadPoolBuilder; |
3 | use std::ops::Range; |
4 | use std::panic::{self, UnwindSafe}; |
5 | use std::sync::atomic::{AtomicUsize, Ordering}; |
6 | |
/// Half-open range of items fed to the parallel iterators under test (2^16 values).
const ITER: Range<i32> = 0..(1 << 16);

/// The one item inside `ITER` on which `check` panics (equal to `0xC000`).
const PANIC: i32 = 3 << 14;
9 | |
10 | fn check(&i: &i32) { |
11 | if i == PANIC { |
12 | panic!("boom" ) |
13 | } |
14 | } |
15 | |
/// A panic raised inside a parallel `for_each` must reach the caller with its
/// original `"boom"` message.
#[test]
#[should_panic(expected = "boom")]
fn iter_panic() {
    let items = ITER.into_par_iter();
    items.for_each(|item| check(&item));
}
21 | |
/// Checks that `panic_fuse()` makes a panicking parallel iterator visit fewer items
/// than the same iterator without it.
///
/// Ignored when the target does not unwind on panic; the helper below observes the
/// panic through `panic::catch_unwind`.
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn iter_panic_fuse() {
    /// Drives `iter` to completion, asserts that doing so panicked, and returns how
    /// many items reached the `for_each` body.
    fn visited(iter: impl ParallelIterator + UnwindSafe) -> usize {
        let seen = AtomicUsize::new(0);
        let outcome = panic::catch_unwind(|| {
            iter.for_each(|_| {
                seen.fetch_add(1, Ordering::Relaxed);
            })
        });
        assert!(outcome.is_err());
        seen.into_inner()
    }

    // A one-thread pool is used in order to make the behavior of `panic_fuse`
    // deterministic.
    let single_threaded = ThreadPoolBuilder::new().num_threads(1).build().unwrap();

    single_threaded.install(|| {
        let source = ITER.into_par_iter().with_max_len(1);

        // Without `panic_fuse()`, every item except the panicking one is reached.
        let all_but_one = ITER.len() - 1;
        let unfused = source.clone().inspect(check);
        assert_eq!(visited(unfused), all_but_one);

        // With `panic_fuse()` anywhere in the chain, fewer items are reached.
        let fused_last = source.clone().inspect(check).panic_fuse();
        assert!(visited(fused_last) < all_but_one);
        let fused_first = source.clone().panic_fuse().inspect(check);
        assert!(visited(fused_first) < all_but_one);

        // Try in reverse as well, to be sure the producer case is hit.
        let reversed = source.panic_fuse().inspect(check).rev();
        assert!(visited(reversed) < all_but_one);
    });
}
54 | |