1 use rayon::prelude::*; 2 use rayon::ThreadPoolBuilder; 3 use std::ops::Range; 4 use std::panic::{self, UnwindSafe}; 5 use std::sync::atomic::{AtomicUsize, Ordering}; 6 7 const ITER: Range<i32> = 0..0x1_0000; 8 const PANIC: i32 = 0xC000; 9 check(&i: &i32)10fn check(&i: &i32) { 11 if i == PANIC { 12 panic!("boom") 13 } 14 } 15 16 #[test] 17 #[should_panic(expected = "boom")] iter_panic()18fn iter_panic() { 19 ITER.into_par_iter().for_each(|i| check(&i)); 20 } 21 22 #[test] iter_panic_fuse()23fn iter_panic_fuse() { 24 // We only use a single thread in order to make the behavior 25 // of 'panic_fuse' deterministic 26 let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); 27 28 pool.install(|| { 29 fn count(iter: impl ParallelIterator + UnwindSafe) -> usize { 30 let count = AtomicUsize::new(0); 31 let result = panic::catch_unwind(|| { 32 iter.for_each(|_| { 33 count.fetch_add(1, Ordering::Relaxed); 34 }); 35 }); 36 assert!(result.is_err()); 37 count.into_inner() 38 } 39 40 // Without `panic_fuse()`, we'll reach every item except the panicking one. 41 let expected = ITER.len() - 1; 42 let iter = ITER.into_par_iter().with_max_len(1); 43 assert_eq!(count(iter.clone().inspect(check)), expected); 44 45 // With `panic_fuse()` anywhere in the chain, we'll reach fewer items. 46 assert!(count(iter.clone().inspect(check).panic_fuse()) < expected); 47 assert!(count(iter.clone().panic_fuse().inspect(check)) < expected); 48 49 // Try in reverse to be sure we hit the producer case. 50 assert!(count(iter.clone().panic_fuse().inspect(check).rev()) < expected); 51 }); 52 } 53