1 //! Tests for the zero channel flavor.
2
3 use std::any::Any;
4 use std::sync::atomic::AtomicUsize;
5 use std::sync::atomic::Ordering;
6 use std::thread;
7 use std::time::Duration;
8
9 use crossbeam_channel::{bounded, select, Receiver};
10 use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
11 use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
12 use crossbeam_utils::thread::scope;
13 use rand::{thread_rng, Rng};
14
/// Shorthand used throughout these tests: a [`Duration`] lasting `ms` milliseconds.
fn ms(ms: u64) -> Duration {
    let millis = ms;
    Duration::from_millis(millis)
}
18
/// With no peer operation in progress, the non-blocking calls on a `bounded(0)`
/// channel report `Full` (send side) and `Empty` (receive side).
#[test]
fn smoke() {
    let (tx, rx) = bounded(0);

    // The rejected message is handed back inside the error.
    let sent = tx.try_send(7);
    assert_eq!(sent, Err(TrySendError::Full(7)));

    let received = rx.try_recv();
    assert_eq!(received, Err(TryRecvError::Empty));
}
25
/// Both halves of a `bounded(0)` channel report a capacity of `Some(0)`.
#[test]
fn capacity() {
    let (tx, rx) = bounded::<()>(0);

    let sender_cap = tx.capacity();
    let receiver_cap = rx.capacity();

    assert_eq!(sender_cap, Some(0));
    assert_eq!(receiver_cap, Some(0));
}
32
/// `len`, `is_empty` and `is_full` give the same answers on both halves before and
/// after a single message has been handed across the channel.
#[test]
fn len_empty_full() {
    let (tx, rx) = bounded(0);

    // Expected state of both handles: length zero, simultaneously empty and full.
    let assert_idle = || {
        assert_eq!(tx.len(), 0);
        assert!(tx.is_empty());
        assert!(tx.is_full());
        assert_eq!(rx.len(), 0);
        assert!(rx.is_empty());
        assert!(rx.is_full());
    };

    assert_idle();

    // Pass one message using two scoped threads; the scope joins both before returning.
    scope(|sc| {
        sc.spawn(|_| tx.send(0).unwrap());
        sc.spawn(|_| rx.recv().unwrap());
    })
    .unwrap();

    assert_idle();
}
57
/// `try_recv` reports `Empty` before the sender starts, yields the value while the
/// sender is blocked in `send`, and reports `Disconnected` once the sender is dropped.
#[test]
fn try_recv() {
    let (tx, rx) = bounded(0);

    scope(|sc| {
        // Receiver: polls at roughly t = 0 ms, 1500 ms and 2000 ms.
        sc.spawn(move |_| {
            let before_send = rx.try_recv();
            assert_eq!(before_send, Err(TryRecvError::Empty));

            thread::sleep(ms(1500));
            let during_send = rx.try_recv();
            assert_eq!(during_send, Ok(7));

            thread::sleep(ms(500));
            let after_sender_gone = rx.try_recv();
            assert_eq!(after_sender_gone, Err(TryRecvError::Disconnected));
        });

        // Sender: starts a blocking send at roughly t = 1000 ms; `tx` was moved into
        // this closure, so it is dropped when the closure returns.
        sc.spawn(move |_| {
            thread::sleep(ms(1000));
            tx.send(7).unwrap();
        });
    })
    .unwrap();
}
77
/// Blocking `recv` delivers 7, 8, 9 in order and then fails with `RecvError` once
/// the sender has been dropped.
#[test]
fn recv() {
    let (tx, rx) = bounded(0);

    scope(|sc| {
        // Receiver: pauses for one second after each of the first two messages.
        sc.spawn(move |_| {
            for expected in 7..=8 {
                assert_eq!(rx.recv(), Ok(expected));
                thread::sleep(ms(1000));
            }
            assert_eq!(rx.recv(), Ok(9));
            assert_eq!(rx.recv(), Err(RecvError));
        });

        // Sender: waits 1500 ms, then sends the three values back to back.
        sc.spawn(move |_| {
            thread::sleep(ms(1500));
            for msg in 7..=9 {
                tx.send(msg).unwrap();
            }
        });
    })
    .unwrap();
}
100
/// Three consecutive `recv_timeout` calls of one second each are expected to end in
/// `Timeout`, `Ok(7)` and `Disconnected`, with the sender starting at about 1500 ms.
#[test]
fn recv_timeout() {
    let (tx, rx) = bounded::<i32>(0);

    scope(|sc| {
        sc.spawn(move |_| {
            let wait = ms(1000);

            let first = rx.recv_timeout(wait);
            assert_eq!(first, Err(RecvTimeoutError::Timeout));

            let second = rx.recv_timeout(wait);
            assert_eq!(second, Ok(7));

            let third = rx.recv_timeout(wait);
            assert_eq!(third, Err(RecvTimeoutError::Disconnected));
        });

        // `tx` is moved into this closure and dropped when it returns.
        sc.spawn(move |_| {
            thread::sleep(ms(1500));
            tx.send(7).unwrap();
        });
    })
    .unwrap();
}
121
/// `try_send` reports `Full` before the receiver starts, succeeds while the receiver
/// is blocked in `recv`, and reports `Disconnected` once the receiver is dropped.
#[test]
fn try_send() {
    let (tx, rx) = bounded(0);

    scope(|sc| {
        // Sender: attempts at roughly t = 0 ms, 1500 ms and 2000 ms.
        sc.spawn(move |_| {
            let before_recv = tx.try_send(7);
            assert_eq!(before_recv, Err(TrySendError::Full(7)));

            thread::sleep(ms(1500));
            let during_recv = tx.try_send(8);
            assert_eq!(during_recv, Ok(()));

            thread::sleep(ms(500));
            let after_receiver_gone = tx.try_send(9);
            assert_eq!(after_receiver_gone, Err(TrySendError::Disconnected(9)));
        });

        // Receiver: starts a blocking receive at roughly t = 1000 ms; `rx` is dropped
        // when this closure returns.
        sc.spawn(move |_| {
            thread::sleep(ms(1000));
            assert_eq!(rx.recv(), Ok(8));
        });
    })
    .unwrap();
}
141
/// Blocking `send` hands 7, 8, 9 to a receiver that starts late; the values arrive
/// in the order they were sent.
#[test]
fn send() {
    let (tx, rx) = bounded(0);

    scope(|sc| {
        // Sender: pauses for one second after each of the first two sends.
        sc.spawn(move |_| {
            for msg in 7..=8 {
                tx.send(msg).unwrap();
                thread::sleep(ms(1000));
            }
            tx.send(9).unwrap();
        });

        // Receiver: waits 1500 ms, then takes the three values back to back.
        sc.spawn(move |_| {
            thread::sleep(ms(1500));
            for expected in 7..=9 {
                assert_eq!(rx.recv(), Ok(expected));
            }
        });
    })
    .unwrap();
}
163
/// Three consecutive `send_timeout` calls of one second each are expected to end in
/// `Timeout(7)`, `Ok(())` and `Disconnected(9)`, with the receiver starting at about
/// 1500 ms.
#[test]
fn send_timeout() {
    let (tx, rx) = bounded(0);

    scope(|sc| {
        sc.spawn(move |_| {
            let wait = ms(1000);

            let first = tx.send_timeout(7, wait);
            assert_eq!(first, Err(SendTimeoutError::Timeout(7)));

            let second = tx.send_timeout(8, wait);
            assert_eq!(second, Ok(()));

            let third = tx.send_timeout(9, wait);
            assert_eq!(third, Err(SendTimeoutError::Disconnected(9)));
        });

        // `rx` is moved into this closure and dropped when it returns.
        sc.spawn(move |_| {
            thread::sleep(ms(1500));
            assert_eq!(rx.recv(), Ok(8));
        });
    })
    .unwrap();
}
187
/// `len()` reads zero on both halves before, during and after a run of `COUNT`
/// hand-offs between one sender thread and one receiver thread.
#[test]
fn len() {
    // Fewer iterations under Miri.
    const COUNT: usize = if cfg!(miri) { 100 } else { 25_000 };

    let (tx, rx) = bounded(0);

    assert_eq!(tx.len(), 0);
    assert_eq!(rx.len(), 0);

    scope(|sc| {
        // Receiver: checks the length after every received value.
        sc.spawn(|_| {
            (0..COUNT).for_each(|expected| {
                assert_eq!(rx.recv(), Ok(expected));
                assert_eq!(rx.len(), 0);
            });
        });

        // Sender: checks the length after every completed send.
        sc.spawn(|_| {
            (0..COUNT).for_each(|value| {
                tx.send(value).unwrap();
                assert_eq!(tx.len(), 0);
            });
        });
    })
    .unwrap();

    assert_eq!(tx.len(), 0);
    assert_eq!(rx.len(), 0);
}
220
/// A sender blocked in `send` gets `SendError` back (carrying the unsent message)
/// when the receiver is dropped about one second later.
#[test]
fn disconnect_wakes_sender() {
    let (tx, rx) = bounded(0);

    scope(|sc| {
        sc.spawn(move |_| {
            let outcome = tx.send(());
            assert_eq!(outcome, Err(SendError(())));
        });

        sc.spawn(move |_| {
            thread::sleep(ms(1000));
            drop(rx);
        });
    })
    .unwrap();
}
236
/// A receiver blocked in `recv` gets `RecvError` when the sender is dropped about
/// one second later.
#[test]
fn disconnect_wakes_receiver() {
    let (tx, rx) = bounded::<()>(0);

    scope(|sc| {
        sc.spawn(move |_| {
            let outcome = rx.recv();
            assert_eq!(outcome, Err(RecvError));
        });

        sc.spawn(move |_| {
            thread::sleep(ms(1000));
            drop(tx);
        });
    })
    .unwrap();
}
252
/// Single producer, single consumer: `COUNT` values arrive in the order they were
/// sent, and the receive after the last one reports disconnection.
#[test]
fn spsc() {
    // Fewer iterations under Miri.
    const COUNT: usize = if cfg!(miri) { 100 } else { 100_000 };

    let (tx, rx) = bounded(0);

    scope(|sc| {
        // Consumer: expects 0, 1, 2, ... and then a disconnect.
        sc.spawn(move |_| {
            for expected in 0..COUNT {
                assert_eq!(rx.recv(), Ok(expected));
            }
            assert_eq!(rx.recv(), Err(RecvError));
        });

        // Producer: `tx` is dropped when this closure returns.
        sc.spawn(move |_| {
            (0..COUNT).for_each(|value| tx.send(value).unwrap());
        });
    })
    .unwrap();
}
277
/// Multiple producers, multiple consumers: each of `THREADS` producers sends every
/// index in `0..COUNT` once, so every index must be received exactly `THREADS` times.
#[test]
fn mpmc() {
    // Fewer iterations under Miri.
    const COUNT: usize = if cfg!(miri) { 100 } else { 25_000 };
    const THREADS: usize = 4;

    let (tx, rx) = bounded::<usize>(0);

    // One counter per index, bumped by whichever consumer receives that index.
    let seen: Vec<AtomicUsize> = (0..COUNT).map(|_| AtomicUsize::new(0)).collect();

    scope(|sc| {
        // Consumers: each takes COUNT messages.
        for _ in 0..THREADS {
            sc.spawn(|_| {
                for _ in 0..COUNT {
                    let idx = rx.recv().unwrap();
                    seen[idx].fetch_add(1, Ordering::SeqCst);
                }
            });
        }

        // Producers: each sends the full index range.
        for _ in 0..THREADS {
            sc.spawn(|_| {
                for idx in 0..COUNT {
                    tx.send(idx).unwrap();
                }
            });
        }
    })
    .unwrap();

    for counter in &seen {
        assert_eq!(counter.load(Ordering::SeqCst), THREADS);
    }
}
312
/// Stress test: repeatedly creates a fresh zero-capacity channel and passes exactly
/// one message through it, with the receiver and sender on separate scoped threads.
#[test]
fn stress_oneshot() {
    // Fewer iterations under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    for _ in 0..COUNT {
        // Capacity 0 keeps this test on the zero flavor like every other test in
        // this file; a capacity of 1 would create a buffered channel instead.
        let (s, r) = bounded(0);

        // The scope joins both threads, so each iteration finishes its hand-off
        // before the channel is dropped.
        scope(|scope| {
            scope.spawn(|_| r.recv().unwrap());
            scope.spawn(|_| s.send(0).unwrap());
        })
        .unwrap();
    }
}
330
/// Request/response ping-pong over two zero-capacity channels: a worker thread drains
/// responses without blocking and pokes the request channel, while the scope's own
/// thread answers every request with a `1` until `COUNT` responses have been summed.
#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn stress_iter() {
    const COUNT: usize = 1000;

    let (request_s, request_r) = bounded(0);
    let (response_s, response_r) = bounded(0);

    scope(|sc| {
        // Worker: owns `response_r` and `request_s`; both are dropped when it returns.
        sc.spawn(move |_| {
            let mut total = 0;
            loop {
                // Non-blocking drain of whatever responses are ready right now.
                while let Ok(x) = response_r.try_recv() {
                    total += x;
                    if total == COUNT {
                        return;
                    }
                }
                // Best-effort request; a failed attempt is simply retried next round.
                let _ = request_s.try_send(());
            }
        });

        // Responder: blocks for requests until the request channel disconnects, and
        // stops early if the response cannot be delivered.
        while request_r.recv().is_ok() {
            if response_s.send(1).is_err() {
                break;
            }
        }
    })
    .unwrap();
}
361
/// Two threads exchange `COUNT` values using only the timeout variants, retrying with
/// a 10 ms timeout until each operation succeeds. Both sides sleep 50 ms before every
/// even-numbered item.
#[test]
fn stress_timeout_two_threads() {
    const COUNT: usize = 100;

    let (tx, rx) = bounded(0);

    scope(|sc| {
        // Sender: retries `send_timeout` until the value is accepted.
        sc.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(50));
                }
                while tx.send_timeout(i, ms(10)).is_err() {}
            }
        });

        // Receiver: retries `recv_timeout` until a value arrives, then checks order.
        sc.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(50));
                }
                let got = loop {
                    if let Ok(x) = rx.recv_timeout(ms(10)) {
                        break x;
                    }
                };
                assert_eq!(got, i);
            }
        });
    })
    .unwrap();
}
398
/// Every message passed through the channel is dropped exactly once: after `steps`
/// hand-offs the drop counter equals `steps`, and dropping both channel halves
/// afterwards does not change it.
#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn drops() {
    const RUNS: usize = 100;

    // Number of `DropCounter` values dropped in the current run.
    static DROPS: AtomicUsize = AtomicUsize::new(0);

    #[derive(Debug, PartialEq)]
    struct DropCounter;

    impl Drop for DropCounter {
        fn drop(&mut self) {
            DROPS.fetch_add(1, Ordering::SeqCst);
        }
    }

    let mut rng = thread_rng();

    for _ in 0..RUNS {
        // Random number of hand-offs for this run.
        let steps = rng.gen_range(0..3_000);

        DROPS.store(0, Ordering::SeqCst);
        let (tx, rx) = bounded::<DropCounter>(0);

        scope(|sc| {
            // Receiver: each received value is dropped right away.
            sc.spawn(|_| {
                (0..steps).for_each(|_| drop(rx.recv().unwrap()));
            });

            sc.spawn(|_| {
                (0..steps).for_each(|_| tx.send(DropCounter).unwrap());
            });
        })
        .unwrap();

        let after_transfer = DROPS.load(Ordering::SeqCst);
        assert_eq!(after_transfer, steps);

        drop(tx);
        drop(rx);

        let after_teardown = DROPS.load(Ordering::SeqCst);
        assert_eq!(after_teardown, steps);
    }
}
444
/// `select!` over two zero-capacity channels must not starve either one: on both the
/// receiving and the sending side, each arm has to fire at least
/// `COUNT / 2 / 2` times out of `COUNT` selections.
#[test]
fn fairness() {
    // Fewer iterations under Miri.
    const COUNT: usize = if cfg!(miri) { 100 } else { 10_000 };

    let (s1, r1) = bounded::<()>(0);
    let (s2, r2) = bounded::<()>(0);

    scope(|sc| {
        // Receiving side runs on a spawned thread.
        sc.spawn(|_| {
            let mut recv_hits = [0usize; 2];
            for _ in 0..COUNT {
                select! {
                    recv(r1) -> _ => recv_hits[0] += 1,
                    recv(r2) -> _ => recv_hits[1] += 1,
                }
            }
            let floor = COUNT / recv_hits.len() / 2;
            assert!(recv_hits.iter().all(|&n| n >= floor));
        });

        // Sending side runs on the thread that owns the scope.
        let mut send_hits = [0usize; 2];
        for _ in 0..COUNT {
            select! {
                send(s1, ()) -> _ => send_hits[0] += 1,
                send(s2, ()) -> _ => send_hits[1] += 1,
            }
        }
        let floor = COUNT / send_hits.len() / 2;
        assert!(send_hits.iter().all(|&n| n >= floor));
    })
    .unwrap();
}
478
/// `select!` with five arms that all name the same channel must still spread its
/// choices: each arm has to fire at least `COUNT / 5 / 2` times out of `COUNT`
/// selections, on both the receiving and the sending side.
#[test]
fn fairness_duplicates() {
    // Fewer iterations under Miri.
    const COUNT: usize = if cfg!(miri) { 100 } else { 10_000 };

    let (tx, rx) = bounded::<()>(0);

    scope(|sc| {
        // Receiving side runs on a spawned thread.
        sc.spawn(|_| {
            let mut recv_hits = [0usize; 5];
            for _ in 0..COUNT {
                select! {
                    recv(rx) -> _ => recv_hits[0] += 1,
                    recv(rx) -> _ => recv_hits[1] += 1,
                    recv(rx) -> _ => recv_hits[2] += 1,
                    recv(rx) -> _ => recv_hits[3] += 1,
                    recv(rx) -> _ => recv_hits[4] += 1,
                }
            }
            let floor = COUNT / recv_hits.len() / 2;
            assert!(recv_hits.iter().all(|&n| n >= floor));
        });

        // Sending side runs on the thread that owns the scope.
        let mut send_hits = [0usize; 5];
        for _ in 0..COUNT {
            select! {
                send(tx, ()) -> _ => send_hits[0] += 1,
                send(tx, ()) -> _ => send_hits[1] += 1,
                send(tx, ()) -> _ => send_hits[2] += 1,
                send(tx, ()) -> _ => send_hits[3] += 1,
                send(tx, ()) -> _ => send_hits[4] += 1,
            }
        }
        let floor = COUNT / send_hits.len() / 2;
        assert!(send_hits.iter().all(|&n| n >= floor));
    })
    .unwrap();
}
517
/// A `select!` send arm whose message expression is itself a blocking `recv` on the
/// same channel. Two helper threads take part: one receives after 100 ms, the other
/// sends `()` after 500 ms. The test passes if the scope completes without a panic.
#[test]
fn recv_in_send() {
    let (tx, rx) = bounded(0);

    scope(|sc| {
        // Helper receiver; the `Result` from `recv` is the closure's return value and
        // is not inspected.
        sc.spawn(|_| {
            thread::sleep(ms(100));
            rx.recv()
        });

        // Helper sender.
        sc.spawn(|_| {
            thread::sleep(ms(500));
            tx.send(()).unwrap();
        });

        // The value to send is produced by receiving from the very same channel.
        select! {
            send(tx, rx.recv().unwrap()) -> _ => {}
        }
    })
    .unwrap();
}
539
/// Sends channels through channels: on each of `COUNT` rounds the producer creates a
/// new zero-capacity channel, ships its receiver (type-erased in a `Box<dyn Any>`)
/// over the current channel, and switches to the new sender; the consumer unpacks
/// the receiver and switches to it.
#[test]
fn channel_through_channel() {
    // Fewer iterations under Miri.
    const COUNT: usize = if cfg!(miri) { 100 } else { 1000 };

    type T = Box<dyn Any + Send>;

    let (s, r) = bounded::<T>(0);

    scope(|sc| {
        // Producer.
        sc.spawn(move |_| {
            let mut current_tx = s;

            for _ in 0..COUNT {
                let (next_tx, next_rx) = bounded(0);
                // Wrapped in `Option` so the consumer can `take()` it out of the box.
                let payload: T = Box::new(Some(next_rx));

                current_tx.send(payload).unwrap();
                current_tx = next_tx;
            }
        });

        // Consumer.
        sc.spawn(move |_| {
            let mut current_rx = r;

            for _ in 0..COUNT {
                let mut payload = current_rx.recv().unwrap();
                let slot = payload.downcast_mut::<Option<Receiver<T>>>().unwrap();
                current_rx = slot.take().unwrap();
            }
        });
    })
    .unwrap();
}
580