1 use rayon::prelude::*; 2 use rayon::ThreadPoolBuilder; 3 use std::ops::Range; 4 use std::panic::{self, UnwindSafe}; 5 use std::sync::atomic::{AtomicUsize, Ordering}; 6 7 const ITER: Range<i32> = 0..0x1_0000; 8 const PANIC: i32 = 0xC000; 9 check(&i: &i32)10fn check(&i: &i32) { 11 if i == PANIC { 12 panic!("boom") 13 } 14 } 15 16 #[test] 17 #[should_panic(expected = "boom")] iter_panic()18fn iter_panic() { 19 ITER.into_par_iter().for_each(|i| check(&i)); 20 } 21 22 #[test] 23 #[cfg_attr(not(panic = "unwind"), ignore)] iter_panic_fuse()24fn iter_panic_fuse() { 25 // We only use a single thread in order to make the behavior 26 // of 'panic_fuse' deterministic 27 let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); 28 29 pool.install(|| { 30 fn count(iter: impl ParallelIterator + UnwindSafe) -> usize { 31 let count = AtomicUsize::new(0); 32 let result = panic::catch_unwind(|| { 33 iter.for_each(|_| { 34 count.fetch_add(1, Ordering::Relaxed); 35 }); 36 }); 37 assert!(result.is_err()); 38 count.into_inner() 39 } 40 41 // Without `panic_fuse()`, we'll reach every item except the panicking one. 42 let expected = ITER.len() - 1; 43 let iter = ITER.into_par_iter().with_max_len(1); 44 assert_eq!(count(iter.clone().inspect(check)), expected); 45 46 // With `panic_fuse()` anywhere in the chain, we'll reach fewer items. 47 assert!(count(iter.clone().inspect(check).panic_fuse()) < expected); 48 assert!(count(iter.clone().panic_fuse().inspect(check)) < expected); 49 50 // Try in reverse to be sure we hit the producer case. 51 assert!(count(iter.panic_fuse().inspect(check).rev()) < expected); 52 }); 53 } 54