• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::scope;
2 use std::any::Any;
3 use std::sync::mpsc::channel;
4 use std::sync::Mutex;
5 
6 use super::{spawn, spawn_fifo};
7 use crate::ThreadPoolBuilder;
8 
/// Calls `spawn` from inside a `scope` closure and then waits on the
/// calling thread for the value the spawned job sends back.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_then_join_in_worker() {
    let (sender, receiver) = channel();
    scope(move |_| {
        spawn(move || {
            sender.send(22).unwrap();
        });
    });
    // `recv` blocks until the spawned job has delivered its value.
    let received = receiver.recv().unwrap();
    assert_eq!(22, received);
}
18 
/// Calls `spawn` directly from the test thread (no enclosing `scope`)
/// and waits for the value the spawned job sends back.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_then_join_outside_worker() {
    let (sender, receiver) = channel();
    spawn(move || {
        sender.send(22).unwrap();
    });
    let received = receiver.recv().unwrap();
    assert_eq!(22, received);
}
26 
/// Installs a panic handler on a freshly built pool and checks that a
/// panic raised inside a spawned job is forwarded to it.
///
/// The handler classifies the payload and reports a code on a channel:
/// `1` for the `&str` `"Hello, world!"`, `2` for any other `&str`, and
/// `3` for a payload that is not a `&str`. The test expects `1`.
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_fwd() {
    let (sender, receiver) = channel();

    // The sender is wrapped in a mutex before being moved into the handler.
    let shared_sender = Mutex::new(sender);
    let handler = move |payload: Box<dyn Any + Send>| {
        let guard = shared_sender.lock().unwrap();
        let code = match payload.downcast_ref::<&str>() {
            Some(&text) if text == "Hello, world!" => 1,
            Some(_) => 2,
            None => 3,
        };
        guard.send(code).unwrap();
    };

    // The pool is a temporary: it is dropped as soon as the job has been
    // handed over.
    ThreadPoolBuilder::new()
        .panic_handler(handler)
        .build()
        .unwrap()
        .spawn(move || panic!("Hello, world!"));

    let reported = receiver.recv().unwrap();
    assert_eq!(1, reported);
}
55 
/// Checks what happens when the `ThreadPool` handle is dropped while a
/// job it spawned is still pending. The pool is expected to keep
/// running until that work, including a further job spawned from
/// inside it, has completed.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn termination_while_things_are_executing() {
    let (start_tx, start_rx) = channel();
    let (result_tx, result_rx) = channel();

    // Build a pool, queue one job on it, then give up our handle to the
    // pool before that job is allowed to proceed.
    let thread_pool = ThreadPoolBuilder::new().build().unwrap();
    thread_pool.spawn(move || {
        let payload = start_rx.recv().unwrap();

        // The start signal is only sent after the handle has been
        // dropped, so the "main" reference to the `ThreadPool` is gone
        // here while this job is still active. Launch one more.
        spawn(move || {
            result_tx.send(payload).unwrap();
        });
    });
    drop(thread_pool);

    start_tx.send(22).unwrap();
    let echoed = result_rx.recv().unwrap();
    assert_eq!(echoed, 22);
}
85 
/// Registers a panic handler that forwards the raw panic payload over a
/// channel, spawns a job that panics, and checks that the forwarded
/// payload is the `&str` `"Hello, world!"`.
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn custom_panic_handler_and_spawn() {
    let (sender, receiver) = channel();

    // The handler may run in parallel with itself, so the sender is
    // guarded by a mutex.
    let shared_sender = Mutex::new(sender);
    let handler = move |payload: Box<dyn Any + Send>| {
        let guard = shared_sender.lock().unwrap();
        guard.send(payload).unwrap();
    };

    // Run an async job that panics.
    ThreadPoolBuilder::new()
        .panic_handler(handler)
        .build()
        .unwrap()
        .spawn(move || {
            panic!("Hello, world!");
        });

    // The handler should have forwarded exactly the payload we panicked with.
    let payload = receiver.recv().unwrap();
    match payload.downcast_ref::<&str>() {
        Some(&text) => assert_eq!(text, "Hello, world!"),
        None => panic!("did not receive a string from panic handler"),
    }
}
113 
/// Registers a panic handler that forwards panic payloads over a
/// channel, then spawns a job that itself spawns `PANICS` panicking
/// jobs. Every nested panic is expected to reach the same handler.
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn custom_panic_handler_and_nested_spawn() {
    // Number of nested jobs that panic.
    const PANICS: usize = 3;

    let (sender, receiver) = channel();

    // The handler may run in parallel with itself, so the sender is
    // guarded by a mutex.
    let shared_sender = Mutex::new(sender);
    let handler = move |payload: Box<dyn Any + Send>| {
        let guard = shared_sender.lock().unwrap();
        guard.send(payload).unwrap();
    };

    // Run an async job that will (eventually) cause panics.
    ThreadPoolBuilder::new()
        .panic_handler(handler)
        .build()
        .unwrap()
        .spawn(move || {
            // Nested spawns should land in the same thread-pool and
            // hence inherit the same panic handler.
            (0..PANICS).for_each(|_| {
                spawn(move || {
                    panic!("Hello, world!");
                });
            });
        });

    // One payload per nested panic, each carrying the expected message.
    for _ in 0..PANICS {
        let payload = receiver.recv().unwrap();
        match payload.downcast_ref::<&str>() {
            Some(&text) => assert_eq!(text, "Hello, world!"),
            None => panic!("did not receive a string from panic handler"),
        }
    }
}
150 
/// Runs a two-level spawn experiment on a pool built with
/// `num_threads(1)` and evaluates to the observed completion order as a
/// `Vec<i32>`.
///
/// `$outer_spawn` is invoked 10 times (for `i` in `0..10`); each outer
/// job invokes `$inner_spawn` 10 times (for `j` in `0..10`), and every
/// inner job sends `i * 10 + j` on a shared channel.
macro_rules! test_order {
    ($outer_spawn:ident, $inner_spawn:ident) => {{
        let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
        let (sender, receiver) = channel();
        pool.install(move || {
            for outer in 0..10 {
                let outer_sender = sender.clone();
                $outer_spawn(move || {
                    for inner in 0..10 {
                        let inner_sender = outer_sender.clone();
                        $inner_spawn(move || {
                            inner_sender.send(outer * 10 + inner).unwrap();
                        });
                    }
                });
            }
        });
        // Iteration ends once every sender clone has been dropped.
        let observed: Vec<i32> = receiver.iter().collect();
        observed
    }};
}
172 
/// In the absence of stealing, `spawn()` jobs on a thread run in LIFO
/// order, so the values are expected back fully reversed.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn lifo_order() {
    let observed = test_order!(spawn, spawn);
    // LIFO -> 99 down to 0.
    let expected = (0..100).rev().collect::<Vec<i32>>();
    assert_eq!(observed, expected);
}
181 
/// In the absence of stealing, `spawn_fifo()` jobs on a thread run in
/// FIFO order, so the values are expected back in natural order.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn fifo_order() {
    let observed = test_order!(spawn_fifo, spawn_fifo);
    // FIFO -> 0 up to 99.
    let expected = (0..100).collect::<Vec<i32>>();
    assert_eq!(observed, expected);
}
190 
/// LIFO on the outside, FIFO on the inside: outer groups are expected
/// in reverse order, with each group's values in ascending order.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn lifo_fifo_order() {
    let observed = test_order!(spawn, spawn_fifo);

    let mut expected: Vec<i32> = Vec::new();
    for outer in (0..10).rev() {
        for inner in 0..10 {
            expected.push(outer * 10 + inner);
        }
    }

    assert_eq!(observed, expected);
}
202 
/// FIFO on the outside, LIFO on the inside: outer groups are expected
/// in ascending order, with each group's values in reverse order.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn fifo_lifo_order() {
    let observed = test_order!(spawn_fifo, spawn);

    let mut expected: Vec<i32> = Vec::new();
    for outer in 0..10 {
        for inner in (0..10).rev() {
            expected.push(outer * 10 + inner);
        }
    }

    assert_eq!(observed, expected);
}
213 
/// Clones the sender `$tx` and hands the clone to a job started with
/// `$spawn`; the job sends the value `$i` and unwraps the result.
macro_rules! spawn_send {
    ($spawn:ident, $tx:ident, $i:expr) => {{
        let sender = $tx.clone();
        $spawn(move || {
            sender.send($i).unwrap();
        });
    }};
}
220 
/// Test mixed spawns pushing a series of numbers, interleaved such
/// that non-negative values use the first kind of spawn (`$pos_spawn`)
/// and negative values use the second kind (`$neg_spawn`).
///
/// The jobs are queued from inside a pool built with `num_threads(1)`;
/// the macro evaluates to the order in which the values arrived, as a
/// `Vec<i32>`.
macro_rules! test_mixed_order {
    ($pos_spawn:ident, $neg_spawn:ident) => {{
        let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
        let (sender, receiver) = channel();
        pool.install(move || {
            spawn_send!($pos_spawn, sender, 0);
            spawn_send!($neg_spawn, sender, -1);
            spawn_send!($pos_spawn, sender, 1);
            spawn_send!($neg_spawn, sender, -2);
            spawn_send!($pos_spawn, sender, 2);
            spawn_send!($neg_spawn, sender, -3);
            spawn_send!($pos_spawn, sender, 3);
        });
        // Iteration ends once every sender clone has been dropped.
        let observed: Vec<i32> = receiver.iter().collect();
        observed
    }};
}
240 
/// Interleaves `spawn` (non-negative values) with `spawn_fifo`
/// (negative values) and checks the exact arrival order.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_lifo_fifo_order() {
    let observed = test_mixed_order!(spawn, spawn_fifo);
    let expected: Vec<i32> = vec![3, -1, 2, -2, 1, -3, 0];
    assert_eq!(observed, expected);
}
248 
/// Interleaves `spawn_fifo` (non-negative values) with `spawn`
/// (negative values) and checks the exact arrival order.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_fifo_lifo_order() {
    let observed = test_mixed_order!(spawn_fifo, spawn);
    let expected: Vec<i32> = vec![0, -3, 1, -2, 2, -1, 3];
    assert_eq!(observed, expected);
}
256