1 #![cfg(test)]
2
3 use crate::ThreadPoolBuilder;
4 use std::sync::atomic::{AtomicUsize, Ordering};
5 use std::sync::mpsc::channel;
6 use std::sync::Arc;
7 use std::{thread, time};
8
/// A blocking `broadcast` on the global pool must return the worker indices
/// in order, i.e. exactly `0..current_num_threads()`.
#[test]
fn broadcast_global() {
    let indices = crate::broadcast(|ctx| ctx.index());
    let expected = 0..crate::current_num_threads();
    assert!(indices.into_iter().eq(expected));
}
14
/// A non-blocking `spawn_broadcast` on the global pool must report every
/// index in `0..current_num_threads()` exactly once.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_global() {
    let (sender, receiver) = channel();
    crate::spawn_broadcast(move |ctx| {
        let index = ctx.index();
        sender.send(index).unwrap()
    });

    // Draining the receiver blocks until the sending side has hung up, so the
    // collected list is complete; arrival order is arbitrary, hence the sort.
    let mut seen = receiver.into_iter().collect::<Vec<_>>();
    seen.sort_unstable();
    let expected = 0..crate::current_num_threads();
    assert!(seen.into_iter().eq(expected));
}
25
/// A blocking `broadcast` on a dedicated 7-thread pool must return the
/// indices `0..7` in order.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_pool() {
    let builder = ThreadPoolBuilder::new().num_threads(7);
    let pool = builder.build().unwrap();
    let indices = pool.broadcast(|ctx| ctx.index());
    assert!(indices.into_iter().eq(0..7));
}
33
/// A non-blocking `spawn_broadcast` on a dedicated 7-thread pool must report
/// each index in `0..7` exactly once.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_pool() {
    let (sender, receiver) = channel();
    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
    pool.spawn_broadcast(move |ctx| {
        let index = ctx.index();
        sender.send(index).unwrap()
    });

    // The receiver iterator finishes only after the sending side hangs up;
    // results arrive in arbitrary order, so sort before comparing.
    let mut seen = receiver.into_iter().collect::<Vec<_>>();
    seen.sort_unstable();
    assert!(seen.into_iter().eq(0..7));
}
45
/// Calling the free function `broadcast` from inside `pool.install` must
/// yield the indices `0..7` of the 7-thread pool, in order.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_self() {
    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
    let indices = pool.install(|| {
        // Issued from within `pool`, so the result is expected to describe
        // `pool`'s own workers.
        crate::broadcast(|ctx| ctx.index())
    });
    assert!(indices.into_iter().eq(0..7));
}
53
/// Calling the free function `spawn_broadcast` from a job spawned on a
/// 7-thread pool must report each index in `0..7` exactly once.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_self() {
    let (sender, receiver) = channel();
    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
    pool.spawn(move || {
        crate::spawn_broadcast(move |ctx| sender.send(ctx.index()).unwrap())
    });

    // Blocks until the sending side hangs up; order of arrival is arbitrary.
    let mut seen = receiver.into_iter().collect::<Vec<_>>();
    seen.sort_unstable();
    assert!(seen.into_iter().eq(0..7));
}
65
/// Nested blocking broadcasts across two pools: from inside `pool1`, every
/// worker of `pool2` broadcasts back onto `pool1`. The innermost closure must
/// therefore run once per (pool2 worker, pool1 worker) pair.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_mutual() {
    const POOL1_THREADS: usize = 3;
    const POOL2_THREADS: usize = 7;

    let hits = AtomicUsize::new(0);
    let pool1 = ThreadPoolBuilder::new()
        .num_threads(POOL1_THREADS)
        .build()
        .unwrap();
    let pool2 = ThreadPoolBuilder::new()
        .num_threads(POOL2_THREADS)
        .build()
        .unwrap();

    pool1.install(|| {
        pool2.broadcast(|_| {
            pool1.broadcast(|_| {
                hits.fetch_add(1, Ordering::Relaxed);
            })
        })
    });

    assert_eq!(hits.into_inner(), POOL1_THREADS * POOL2_THREADS);
}
81
/// Nested non-blocking broadcasts across two pools: a job on `pool1` makes
/// every `pool2` worker issue a `spawn_broadcast` back onto `pool1`. One
/// message is expected per (pool2 worker, pool1 worker) pair.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual() {
    const POOL1_THREADS: usize = 3;
    const POOL2_THREADS: usize = 7;

    let (sender, receiver) = channel();
    let pool1 = Arc::new(
        ThreadPoolBuilder::new()
            .num_threads(POOL1_THREADS)
            .build()
            .unwrap(),
    );
    let pool2 = ThreadPoolBuilder::new()
        .num_threads(POOL2_THREADS)
        .build()
        .unwrap();

    // The spawned job owns its own handle to `pool1` so it can broadcast back
    // onto that pool from `pool2`'s workers.
    let pool1_handle = Arc::clone(&pool1);
    pool1.spawn(move || {
        pool2.spawn_broadcast(move |_| {
            // Each inner broadcast gets its own sender clone.
            let sender = sender.clone();
            pool1_handle.spawn_broadcast(move |_| sender.send(()).unwrap())
        })
    });

    // Counting drains the channel until every sender has been dropped.
    let received = receiver.into_iter().count();
    assert_eq!(received, POOL1_THREADS * POOL2_THREADS);
}
99
/// Nested blocking broadcasts across two pools, with a sleep before each
/// nested broadcast and inside the innermost closure. The innermost closure
/// must still run once per (pool2 worker, pool1 worker) pair.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_mutual_sleepy() {
    const POOL1_THREADS: usize = 3;
    const POOL2_THREADS: usize = 7;

    // NOTE(review): the long pauses presumably let the other workers go idle
    // before each broadcast is issued — confirm against the pool's sleep logic.
    let long_nap = time::Duration::from_secs(1);
    let short_nap = time::Duration::from_millis(100);

    let hits = AtomicUsize::new(0);
    let pool1 = ThreadPoolBuilder::new()
        .num_threads(POOL1_THREADS)
        .build()
        .unwrap();
    let pool2 = ThreadPoolBuilder::new()
        .num_threads(POOL2_THREADS)
        .build()
        .unwrap();

    pool1.install(|| {
        thread::sleep(long_nap);
        pool2.broadcast(|_| {
            thread::sleep(long_nap);
            pool1.broadcast(|_| {
                thread::sleep(short_nap);
                hits.fetch_add(1, Ordering::Relaxed);
            })
        })
    });

    assert_eq!(hits.into_inner(), POOL1_THREADS * POOL2_THREADS);
}
118
/// Nested non-blocking broadcasts across two pools, with a sleep before each
/// nested `spawn_broadcast` and inside the innermost closure. One message is
/// still expected per (pool2 worker, pool1 worker) pair.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual_sleepy() {
    const POOL1_THREADS: usize = 3;
    const POOL2_THREADS: usize = 7;

    // NOTE(review): the long pauses presumably let the other workers go idle
    // before each broadcast is issued — confirm against the pool's sleep logic.
    let long_nap = time::Duration::from_secs(1);
    let short_nap = time::Duration::from_millis(100);

    let (sender, receiver) = channel();
    let pool1 = Arc::new(
        ThreadPoolBuilder::new()
            .num_threads(POOL1_THREADS)
            .build()
            .unwrap(),
    );
    let pool2 = ThreadPoolBuilder::new()
        .num_threads(POOL2_THREADS)
        .build()
        .unwrap();

    // The spawned job owns its own handle to `pool1` so it can broadcast back
    // onto that pool from `pool2`'s workers.
    let pool1_handle = Arc::clone(&pool1);
    pool1.spawn(move || {
        thread::sleep(long_nap);
        pool2.spawn_broadcast(move |_| {
            let sender = sender.clone();
            thread::sleep(long_nap);
            pool1_handle.spawn_broadcast(move |_| {
                thread::sleep(short_nap);
                sender.send(()).unwrap();
            })
        })
    });

    // Counting drains the channel until every sender has been dropped.
    let received = receiver.into_iter().count();
    assert_eq!(received, POOL1_THREADS * POOL2_THREADS);
}
141
/// When one worker (index 3) panics during a blocking `broadcast`, the
/// closure must still have run on all 7 workers and the panic must reach
/// the caller.
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn broadcast_panic_one() {
    let calls = AtomicUsize::new(0);
    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();

    // Catch the propagated panic so the assertions below can still run.
    let outcome = crate::unwind::halt_unwinding(|| {
        pool.broadcast(|ctx| {
            calls.fetch_add(1, Ordering::Relaxed);
            let is_victim = ctx.index() == 3;
            if is_victim {
                panic!("Hello, world!");
            }
        })
    });

    assert_eq!(calls.into_inner(), 7);
    assert!(outcome.is_err(), "broadcast panic should propagate!");
}
158
/// When one worker (index 3) panics during a `spawn_broadcast`, the closure
/// must still have run on all 7 workers and the pool's panic handler must
/// receive exactly one payload.
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_one() {
    let (sender, receiver) = channel();
    let (panic_sender, panic_receiver) = channel();

    let builder = ThreadPoolBuilder::new()
        .num_threads(7)
        .panic_handler(move |payload| panic_sender.send(payload).unwrap());
    let pool = builder.build().unwrap();

    pool.spawn_broadcast(move |ctx| {
        sender.send(()).unwrap();
        let is_victim = ctx.index() == 3;
        if is_victim {
            panic!("Hello, world!");
        }
    });

    // Dropping the pool also releases `panic_sender` (owned by the handler),
    // which is what lets the `panic_receiver` drain below terminate.
    drop(pool);

    let runs = receiver.into_iter().count();
    assert_eq!(runs, 7);
    let panics = panic_receiver.into_iter().count();
    assert_eq!(panics, 1);
}
179
/// When every even-indexed worker panics during a blocking `broadcast`, the
/// closure must still have run on all 7 workers and a panic must reach the
/// caller.
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn broadcast_panic_many() {
    let calls = AtomicUsize::new(0);
    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();

    // Catch the propagated panic so the assertions below can still run.
    let outcome = crate::unwind::halt_unwinding(|| {
        pool.broadcast(|ctx| {
            calls.fetch_add(1, Ordering::Relaxed);
            let is_even = ctx.index() % 2 == 0;
            if is_even {
                panic!("Hello, world!");
            }
        })
    });

    assert_eq!(calls.into_inner(), 7);
    assert!(outcome.is_err(), "broadcast panic should propagate!");
}
196
/// When every even-indexed worker panics during a `spawn_broadcast`, the
/// closure must still have run on all 7 workers and the pool's panic handler
/// must receive one payload per panicking worker (indices 0, 2, 4, 6).
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_many() {
    let (sender, receiver) = channel();
    let (panic_sender, panic_receiver) = channel();

    let builder = ThreadPoolBuilder::new()
        .num_threads(7)
        .panic_handler(move |payload| panic_sender.send(payload).unwrap());
    let pool = builder.build().unwrap();

    pool.spawn_broadcast(move |ctx| {
        sender.send(()).unwrap();
        let is_even = ctx.index() % 2 == 0;
        if is_even {
            panic!("Hello, world!");
        }
    });

    // Dropping the pool also releases `panic_sender` (owned by the handler),
    // which is what lets the `panic_receiver` drain below terminate.
    drop(pool);

    let runs = receiver.into_iter().count();
    assert_eq!(runs, 7);
    let panics = panic_receiver.into_iter().count();
    assert_eq!(panics, 4);
}
217
/// Stress test: issues blocking broadcasts back-to-back for one second on a
/// 7-thread pool. It has no assertions; it passes by completing.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_sleep_race() {
    let test_duration = time::Duration::from_secs(1);
    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
    let deadline = time::Instant::now() + test_duration;
    while time::Instant::now() < deadline {
        pool.broadcast(|ctx| {
            // Staggering the sleeps by worker index makes it more likely that
            // some thread races with the pool's idle sleep afterward.
            let micros = ctx.index() as u64;
            thread::sleep(time::Duration::from_micros(micros));
        });
    }
}
232
/// Ordering check on the global pool: a blocking `broadcast` issued after a
/// `spawn_broadcast` must not return before the earlier one has run on every
/// thread.
#[test]
fn broadcast_after_spawn_broadcast() {
    let (sender, receiver) = channel();

    // First, enqueue a fire-and-forget broadcast that reports each index.
    crate::spawn_broadcast(move |ctx| {
        let index = ctx.index();
        sender.send(index).unwrap()
    });

    // Then run a blocking broadcast, which is ordered after all earlier ones.
    crate::broadcast(|_| {});

    // Every index **must** already be in the channel, so a non-blocking drain
    // is enough to observe all of them.
    let mut seen = receiver.try_iter().collect::<Vec<_>>();
    seen.sort_unstable();
    let expected = 0..crate::current_num_threads();
    assert!(seen.into_iter().eq(expected));
}
248
/// Ordering check on the global pool: a blocking `broadcast` must not return
/// before a job previously spawned from inside a worker has run.
#[test]
fn broadcast_after_spawn() {
    let (sender, receiver) = channel();

    // Spawn from inside a worker so the job lands on a thread-local deque.
    crate::registry::in_worker(move |_, _| {
        crate::spawn(move || sender.send(22).unwrap());
    });

    // The broadcast only runs once the local deque has been emptied.
    crate::broadcast(|_| {});

    // The spawned job **must** have delivered its value by now, so a
    // non-blocking receive has to succeed.
    let received = receiver.try_recv().unwrap();
    assert_eq!(22, received);
}
264