• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg(test)]
2 
3 use crate::ThreadPoolBuilder;
4 use std::sync::atomic::{AtomicUsize, Ordering};
5 use std::sync::mpsc::channel;
6 use std::sync::Arc;
7 use std::{thread, time};
8 
9 #[test]
broadcast_global()10 fn broadcast_global() {
11     let v = crate::broadcast(|ctx| ctx.index());
12     assert!(v.into_iter().eq(0..crate::current_num_threads()));
13 }
14 
15 #[test]
16 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
spawn_broadcast_global()17 fn spawn_broadcast_global() {
18     let (tx, rx) = channel();
19     crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
20 
21     let mut v: Vec<_> = rx.into_iter().collect();
22     v.sort_unstable();
23     assert!(v.into_iter().eq(0..crate::current_num_threads()));
24 }
25 
26 #[test]
27 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
broadcast_pool()28 fn broadcast_pool() {
29     let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
30     let v = pool.broadcast(|ctx| ctx.index());
31     assert!(v.into_iter().eq(0..7));
32 }
33 
34 #[test]
35 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
spawn_broadcast_pool()36 fn spawn_broadcast_pool() {
37     let (tx, rx) = channel();
38     let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
39     pool.spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
40 
41     let mut v: Vec<_> = rx.into_iter().collect();
42     v.sort_unstable();
43     assert!(v.into_iter().eq(0..7));
44 }
45 
46 #[test]
47 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
broadcast_self()48 fn broadcast_self() {
49     let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
50     let v = pool.install(|| crate::broadcast(|ctx| ctx.index()));
51     assert!(v.into_iter().eq(0..7));
52 }
53 
54 #[test]
55 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
spawn_broadcast_self()56 fn spawn_broadcast_self() {
57     let (tx, rx) = channel();
58     let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
59     pool.spawn(|| crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()));
60 
61     let mut v: Vec<_> = rx.into_iter().collect();
62     v.sort_unstable();
63     assert!(v.into_iter().eq(0..7));
64 }
65 
66 #[test]
67 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
broadcast_mutual()68 fn broadcast_mutual() {
69     let count = AtomicUsize::new(0);
70     let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
71     let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
72     pool1.install(|| {
73         pool2.broadcast(|_| {
74             pool1.broadcast(|_| {
75                 count.fetch_add(1, Ordering::Relaxed);
76             })
77         })
78     });
79     assert_eq!(count.into_inner(), 3 * 7);
80 }
81 
82 #[test]
83 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
spawn_broadcast_mutual()84 fn spawn_broadcast_mutual() {
85     let (tx, rx) = channel();
86     let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
87     let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
88     pool1.spawn({
89         let pool1 = Arc::clone(&pool1);
90         move || {
91             pool2.spawn_broadcast(move |_| {
92                 let tx = tx.clone();
93                 pool1.spawn_broadcast(move |_| tx.send(()).unwrap())
94             })
95         }
96     });
97     assert_eq!(rx.into_iter().count(), 3 * 7);
98 }
99 
100 #[test]
101 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
broadcast_mutual_sleepy()102 fn broadcast_mutual_sleepy() {
103     let count = AtomicUsize::new(0);
104     let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
105     let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
106     pool1.install(|| {
107         thread::sleep(time::Duration::from_secs(1));
108         pool2.broadcast(|_| {
109             thread::sleep(time::Duration::from_secs(1));
110             pool1.broadcast(|_| {
111                 thread::sleep(time::Duration::from_millis(100));
112                 count.fetch_add(1, Ordering::Relaxed);
113             })
114         })
115     });
116     assert_eq!(count.into_inner(), 3 * 7);
117 }
118 
119 #[test]
120 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
spawn_broadcast_mutual_sleepy()121 fn spawn_broadcast_mutual_sleepy() {
122     let (tx, rx) = channel();
123     let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
124     let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
125     pool1.spawn({
126         let pool1 = Arc::clone(&pool1);
127         move || {
128             thread::sleep(time::Duration::from_secs(1));
129             pool2.spawn_broadcast(move |_| {
130                 let tx = tx.clone();
131                 thread::sleep(time::Duration::from_secs(1));
132                 pool1.spawn_broadcast(move |_| {
133                     thread::sleep(time::Duration::from_millis(100));
134                     tx.send(()).unwrap();
135                 })
136             })
137         }
138     });
139     assert_eq!(rx.into_iter().count(), 3 * 7);
140 }
141 
142 #[test]
143 #[cfg_attr(not(panic = "unwind"), ignore)]
broadcast_panic_one()144 fn broadcast_panic_one() {
145     let count = AtomicUsize::new(0);
146     let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
147     let result = crate::unwind::halt_unwinding(|| {
148         pool.broadcast(|ctx| {
149             count.fetch_add(1, Ordering::Relaxed);
150             if ctx.index() == 3 {
151                 panic!("Hello, world!");
152             }
153         })
154     });
155     assert_eq!(count.into_inner(), 7);
156     assert!(result.is_err(), "broadcast panic should propagate!");
157 }
158 
159 #[test]
160 #[cfg_attr(not(panic = "unwind"), ignore)]
spawn_broadcast_panic_one()161 fn spawn_broadcast_panic_one() {
162     let (tx, rx) = channel();
163     let (panic_tx, panic_rx) = channel();
164     let pool = ThreadPoolBuilder::new()
165         .num_threads(7)
166         .panic_handler(move |e| panic_tx.send(e).unwrap())
167         .build()
168         .unwrap();
169     pool.spawn_broadcast(move |ctx| {
170         tx.send(()).unwrap();
171         if ctx.index() == 3 {
172             panic!("Hello, world!");
173         }
174     });
175     drop(pool); // including panic_tx
176     assert_eq!(rx.into_iter().count(), 7);
177     assert_eq!(panic_rx.into_iter().count(), 1);
178 }
179 
180 #[test]
181 #[cfg_attr(not(panic = "unwind"), ignore)]
broadcast_panic_many()182 fn broadcast_panic_many() {
183     let count = AtomicUsize::new(0);
184     let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
185     let result = crate::unwind::halt_unwinding(|| {
186         pool.broadcast(|ctx| {
187             count.fetch_add(1, Ordering::Relaxed);
188             if ctx.index() % 2 == 0 {
189                 panic!("Hello, world!");
190             }
191         })
192     });
193     assert_eq!(count.into_inner(), 7);
194     assert!(result.is_err(), "broadcast panic should propagate!");
195 }
196 
197 #[test]
198 #[cfg_attr(not(panic = "unwind"), ignore)]
spawn_broadcast_panic_many()199 fn spawn_broadcast_panic_many() {
200     let (tx, rx) = channel();
201     let (panic_tx, panic_rx) = channel();
202     let pool = ThreadPoolBuilder::new()
203         .num_threads(7)
204         .panic_handler(move |e| panic_tx.send(e).unwrap())
205         .build()
206         .unwrap();
207     pool.spawn_broadcast(move |ctx| {
208         tx.send(()).unwrap();
209         if ctx.index() % 2 == 0 {
210             panic!("Hello, world!");
211         }
212     });
213     drop(pool); // including panic_tx
214     assert_eq!(rx.into_iter().count(), 7);
215     assert_eq!(panic_rx.into_iter().count(), 4);
216 }
217 
218 #[test]
219 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
broadcast_sleep_race()220 fn broadcast_sleep_race() {
221     let test_duration = time::Duration::from_secs(1);
222     let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
223     let start = time::Instant::now();
224     while start.elapsed() < test_duration {
225         pool.broadcast(|ctx| {
226             // A slight spread of sleep duration increases the chance that one
227             // of the threads will race in the pool's idle sleep afterward.
228             thread::sleep(time::Duration::from_micros(ctx.index() as u64));
229         });
230     }
231 }
232 
233 #[test]
broadcast_after_spawn_broadcast()234 fn broadcast_after_spawn_broadcast() {
235     let (tx, rx) = channel();
236 
237     // Queue a non-blocking spawn_broadcast.
238     crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
239 
240     // This blocking broadcast runs after all prior broadcasts.
241     crate::broadcast(|_| {});
242 
243     // The spawn_broadcast **must** have run by now on all threads.
244     let mut v: Vec<_> = rx.try_iter().collect();
245     v.sort_unstable();
246     assert!(v.into_iter().eq(0..crate::current_num_threads()));
247 }
248 
249 #[test]
broadcast_after_spawn()250 fn broadcast_after_spawn() {
251     let (tx, rx) = channel();
252 
253     // Queue a regular spawn on a thread-local deque.
254     crate::registry::in_worker(move |_, _| {
255         crate::spawn(move || tx.send(22).unwrap());
256     });
257 
258     // Broadcast runs after the local deque is empty.
259     crate::broadcast(|_| {});
260 
261     // The spawn **must** have run by now.
262     assert_eq!(22, rx.try_recv().unwrap());
263 }
264