1 //! Tests for the array channel flavor.
2
3 use std::any::Any;
4 use std::sync::atomic::AtomicUsize;
5 use std::sync::atomic::Ordering;
6 use std::thread;
7 use std::time::Duration;
8
9 use crossbeam_channel::{bounded, select, Receiver};
10 use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
11 use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
12 use crossbeam_utils::thread::scope;
13 use rand::{thread_rng, Rng};
14
/// Shorthand that turns a millisecond count into a `Duration`, used by the timing-based tests.
fn ms(ms: u64) -> Duration {
    let millis = ms;
    Duration::from_millis(millis)
}
18
/// Basic single-threaded round trip through a one-slot channel.
#[test]
fn smoke() {
    let (tx, rx) = bounded(1);

    // Non-blocking receive of a message that is already queued.
    tx.send(7).unwrap();
    assert_eq!(rx.try_recv(), Ok(7));

    // Blocking receive of a message that is already queued.
    tx.send(8).unwrap();
    assert_eq!(rx.recv(), Ok(8));

    // The channel is drained but `tx` is still alive, so the receiver is expected to
    // report "empty" / "timed out" rather than a disconnect.
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    assert_eq!(rx.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
}
31
/// Both halves of a bounded channel report the capacity it was created with.
#[test]
fn capacity() {
    (1..10).for_each(|cap| {
        let (tx, rx) = bounded::<()>(cap);
        assert_eq!(tx.capacity(), Some(cap));
        assert_eq!(rx.capacity(), Some(cap));
    });
}
40
/// Checks `len`, `is_empty` and `is_full` on both halves of a two-slot channel while it is
/// filled to capacity and then drained by one message.
#[test]
fn len_empty_full() {
    let (s, r) = bounded(2);

    // Freshly created: nothing queued.
    assert_eq!(s.len(), 0);
    assert!(s.is_empty());
    assert!(!s.is_full());
    assert_eq!(r.len(), 0);
    assert!(r.is_empty());
    assert!(!r.is_full());

    s.send(()).unwrap();

    // One of two slots used: neither empty nor full.
    assert_eq!(s.len(), 1);
    assert!(!s.is_empty());
    assert!(!s.is_full());
    assert_eq!(r.len(), 1);
    assert!(!r.is_empty());
    assert!(!r.is_full());

    s.send(()).unwrap();

    // Both slots used: the channel is at capacity.
    assert_eq!(s.len(), 2);
    assert!(!s.is_empty());
    assert!(s.is_full());
    assert_eq!(r.len(), 2);
    assert!(!r.is_empty());
    assert!(r.is_full());

    r.recv().unwrap();

    // One message taken out again: back to the half-filled state.
    assert_eq!(s.len(), 1);
    assert!(!s.is_empty());
    assert!(!s.is_full());
    assert_eq!(r.len(), 1);
    assert!(!r.is_empty());
    assert!(!r.is_full());
}
79
/// Exercises `try_recv` before a message arrives, after it arrives, and after the sender
/// is gone. The two threads are sequenced purely by sleeps.
#[test]
fn try_recv() {
    let (tx, rx) = bounded(100);

    scope(|sc| {
        // Receiving side: polls immediately, then about 1500 ms and 2000 ms in.
        sc.spawn(move |_| {
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
            thread::sleep(ms(1500));
            assert_eq!(rx.try_recv(), Ok(7));
            thread::sleep(ms(500));
            assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
        });
        // Sending side: sends one message about 1000 ms in; `tx` was moved into this
        // closure, so it is dropped when the closure returns.
        sc.spawn(move |_| {
            thread::sleep(ms(1000));
            tx.send(7).unwrap();
        });
    })
    .unwrap();
}
99
/// Exercises blocking `recv`: it waits for the first message, later picks up messages that
/// are already queued, and finally reports an error once the sender is gone.
#[test]
fn recv() {
    let (tx, rx) = bounded(100);

    scope(|sc| {
        // Receiving side: the first `recv` is issued before anything has been sent.
        sc.spawn(move |_| {
            assert_eq!(rx.recv(), Ok(7));
            thread::sleep(ms(1000));
            assert_eq!(rx.recv(), Ok(8));
            thread::sleep(ms(1000));
            assert_eq!(rx.recv(), Ok(9));
            assert_eq!(rx.recv(), Err(RecvError));
        });
        // Sending side: after about 1500 ms, sends three messages back to back; `tx` is
        // dropped when this closure returns.
        sc.spawn(move |_| {
            thread::sleep(ms(1500));
            tx.send(7).unwrap();
            tx.send(8).unwrap();
            tx.send(9).unwrap();
        });
    })
    .unwrap();
}
122
/// Exercises all three outcomes of `recv_timeout`: timeout, success, and disconnect.
#[test]
fn recv_timeout() {
    let (tx, rx) = bounded::<i32>(100);

    scope(|sc| {
        // Receiving side: three consecutive waits of up to 1000 ms each.
        sc.spawn(move |_| {
            let first = rx.recv_timeout(ms(1000));
            assert_eq!(first, Err(RecvTimeoutError::Timeout));

            let second = rx.recv_timeout(ms(1000));
            assert_eq!(second, Ok(7));

            let third = rx.recv_timeout(ms(1000));
            assert_eq!(third, Err(RecvTimeoutError::Disconnected));
        });
        // Sending side: sends a single message about 1500 ms in; `tx` is dropped when
        // this closure returns.
        sc.spawn(move |_| {
            thread::sleep(ms(1500));
            tx.send(7).unwrap();
        });
    })
    .unwrap();
}
143
/// Exercises `try_send` on a one-slot channel: success, full, success after the slot is
/// freed, and disconnect after the receiver is gone.
#[test]
fn try_send() {
    let (tx, rx) = bounded(1);

    scope(|sc| {
        // Sending side.
        sc.spawn(move |_| {
            assert_eq!(tx.try_send(1), Ok(()));
            // The only slot is occupied, so the rejected value comes back in the error.
            assert_eq!(tx.try_send(2), Err(TrySendError::Full(2)));
            thread::sleep(ms(1500));
            assert_eq!(tx.try_send(3), Ok(()));
            thread::sleep(ms(500));
            assert_eq!(tx.try_send(4), Err(TrySendError::Disconnected(4)));
        });
        // Receiving side: starts draining about 1000 ms in; `rx` is dropped when this
        // closure returns.
        sc.spawn(move |_| {
            thread::sleep(ms(1000));
            assert_eq!(rx.try_recv(), Ok(1));
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
            assert_eq!(rx.recv(), Ok(3));
        });
    })
    .unwrap();
}
166
/// Exercises blocking `send` on a one-slot channel with a receiver that starts late.
#[test]
fn send() {
    let (tx, rx) = bounded(1);

    scope(|sc| {
        // Sending side: four sends separated by 1000 ms pauses. The closures borrow the
        // endpoints (no `move`), so neither half is dropped when a thread finishes.
        sc.spawn(|_| {
            tx.send(7).unwrap();
            thread::sleep(ms(1000));
            tx.send(8).unwrap();
            thread::sleep(ms(1000));
            tx.send(9).unwrap();
            thread::sleep(ms(1000));
            tx.send(10).unwrap();
        });
        // Receiving side: starts about 1500 ms in and takes only the first three messages.
        sc.spawn(|_| {
            thread::sleep(ms(1500));
            assert_eq!(rx.recv(), Ok(7));
            assert_eq!(rx.recv(), Ok(8));
            assert_eq!(rx.recv(), Ok(9));
        });
    })
    .unwrap();
}
190
/// Exercises `send_timeout` on a two-slot channel: success, timeout while full, success
/// after a slot is freed, and finally a failed `send` once the receiver is gone.
#[test]
fn send_timeout() {
    let (tx, rx) = bounded(2);

    scope(|sc| {
        // Sending side.
        sc.spawn(move |_| {
            assert_eq!(tx.send_timeout(1, ms(1000)), Ok(()));
            assert_eq!(tx.send_timeout(2, ms(1000)), Ok(()));

            // Both slots are taken; the value is handed back inside the timeout error.
            let overflow = tx.send_timeout(3, ms(500));
            assert_eq!(overflow, Err(SendTimeoutError::Timeout(3)));

            thread::sleep(ms(1000));
            assert_eq!(tx.send_timeout(4, ms(1000)), Ok(()));
            thread::sleep(ms(1000));
            assert_eq!(tx.send(5), Err(SendError(5)));
        });
        // Receiving side: takes one message about 1000 ms in and the remaining two about
        // 2000 ms in; `rx` is dropped when this closure returns.
        sc.spawn(move |_| {
            thread::sleep(ms(1000));
            assert_eq!(rx.recv(), Ok(1));
            thread::sleep(ms(1000));
            assert_eq!(rx.recv(), Ok(2));
            assert_eq!(rx.recv(), Ok(4));
        });
    })
    .unwrap();
}
218
/// After the receiver is dropped, every flavor of send fails and returns the message.
#[test]
fn send_after_disconnect() {
    let (tx, rx) = bounded(100);

    // Queue a few messages first; the channel has plenty of spare capacity.
    for msg in 1..=3 {
        tx.send(msg).unwrap();
    }

    drop(rx);

    assert_eq!(tx.send(4), Err(SendError(4)));
    assert_eq!(tx.try_send(5), Err(TrySendError::Disconnected(5)));

    let timed = tx.send_timeout(6, ms(500));
    assert_eq!(timed, Err(SendTimeoutError::Disconnected(6)));
}
236
/// After the sender is dropped, already-queued messages are still delivered in order and
/// only then does `recv` report an error.
#[test]
fn recv_after_disconnect() {
    let (tx, rx) = bounded(100);

    for msg in 1..=3 {
        tx.send(msg).unwrap();
    }

    drop(tx);

    for expected in 1..=3 {
        assert_eq!(rx.recv(), Ok(expected));
    }
    assert_eq!(rx.recv(), Err(RecvError));
}
252
/// Tracks `len` through repeated partial fills, one complete fill, and a concurrent
/// producer/consumer run.
#[test]
fn len() {
    const COUNT: usize = 25_000;
    const CAP: usize = 1000;

    let (tx, rx) = bounded(CAP);

    assert_eq!(tx.len(), 0);
    assert_eq!(rx.len(), 0);

    // Repeatedly push 50 messages and pop them again, checking the length at every step.
    for _ in 0..CAP / 10 {
        for sent in 0..50 {
            tx.send(sent).unwrap();
            assert_eq!(tx.len(), sent + 1);
        }

        for remaining in (0..50).rev() {
            rx.recv().unwrap();
            assert_eq!(rx.len(), remaining);
        }
    }

    assert_eq!(tx.len(), 0);
    assert_eq!(rx.len(), 0);

    // Fill the channel up to its capacity, then drain it completely.
    for sent in 0..CAP {
        tx.send(sent).unwrap();
        assert_eq!(tx.len(), sent + 1);
    }

    for _ in 0..CAP {
        rx.recv().unwrap();
    }

    assert_eq!(tx.len(), 0);
    assert_eq!(rx.len(), 0);

    // With both sides running concurrently the exact length is not predictable here, so
    // only the upper bound is checked.
    scope(|sc| {
        sc.spawn(|_| {
            for expected in 0..COUNT {
                assert_eq!(rx.recv(), Ok(expected));
                assert!(rx.len() <= CAP);
            }
        });

        sc.spawn(|_| {
            for msg in 0..COUNT {
                tx.send(msg).unwrap();
                assert!(tx.len() <= CAP);
            }
        });
    })
    .unwrap();

    assert_eq!(tx.len(), 0);
    assert_eq!(rx.len(), 0);
}
312
/// A `send` issued while the one-slot channel is full is expected to fail once the
/// receiver is dropped by another thread.
#[test]
fn disconnect_wakes_sender() {
    let (tx, rx) = bounded(1);

    scope(|sc| {
        sc.spawn(move |_| {
            // The first message takes the only slot; nobody ever receives it.
            assert_eq!(tx.send(()), Ok(()));
            assert_eq!(tx.send(()), Err(SendError(())));
        });
        sc.spawn(move |_| {
            // Drop the receiver about 1000 ms in.
            thread::sleep(ms(1000));
            drop(rx);
        });
    })
    .unwrap();
}
329
/// A `recv` issued on an empty channel is expected to fail once the sender is dropped by
/// another thread.
#[test]
fn disconnect_wakes_receiver() {
    let (tx, rx) = bounded::<()>(1);

    scope(|sc| {
        sc.spawn(move |_| {
            // Nothing is ever sent, so the only way out is the disconnect.
            assert_eq!(rx.recv(), Err(RecvError));
        });
        sc.spawn(move |_| {
            // Drop the sender about 1000 ms in.
            thread::sleep(ms(1000));
            drop(tx);
        });
    })
    .unwrap();
}
345
/// One producer and one consumer push `COUNT` messages through a three-slot channel; the
/// consumer checks that they arrive in order, followed by a disconnect.
#[test]
fn spsc() {
    const COUNT: usize = 100_000;

    let (tx, rx) = bounded(3);

    scope(|sc| {
        // Consumer.
        sc.spawn(move |_| {
            (0..COUNT).for_each(|expected| assert_eq!(rx.recv(), Ok(expected)));
            assert_eq!(rx.recv(), Err(RecvError));
        });
        // Producer; `tx` is dropped when this closure returns.
        sc.spawn(move |_| {
            (0..COUNT).for_each(|msg| tx.send(msg).unwrap());
        });
    })
    .unwrap();
}
367
/// `THREADS` producers each send every value in `0..COUNT` while `THREADS` consumers each
/// receive `COUNT` messages; afterwards every value must have been seen `THREADS` times.
#[test]
fn mpmc() {
    const COUNT: usize = 25_000;
    const THREADS: usize = 4;

    let (tx, rx) = bounded::<usize>(3);
    // One counter per possible message value.
    let seen: Vec<AtomicUsize> = (0..COUNT).map(|_| AtomicUsize::new(0)).collect();

    scope(|sc| {
        // Consumers.
        for _ in 0..THREADS {
            sc.spawn(|_| {
                for _ in 0..COUNT {
                    let value = rx.recv().unwrap();
                    seen[value].fetch_add(1, Ordering::SeqCst);
                }
            });
        }
        // Producers.
        for _ in 0..THREADS {
            sc.spawn(|_| {
                for value in 0..COUNT {
                    tx.send(value).unwrap();
                }
            });
        }
    })
    .unwrap();

    for counter in seen {
        assert_eq!(counter.load(Ordering::SeqCst), THREADS);
    }
}
399
/// Creates many short-lived one-slot channels, each carrying exactly one message between
/// two freshly spawned threads.
#[test]
fn stress_oneshot() {
    const COUNT: usize = 10_000;

    for _ in 0..COUNT {
        let (tx, rx) = bounded(1);

        scope(|sc| {
            sc.spawn(|_| rx.recv().unwrap());
            sc.spawn(|_| tx.send(0).unwrap());
        })
        .unwrap();
    }
}
414
/// Request/response ping-pong over two one-slot channels, driven through `try_iter` on the
/// worker thread and `iter` on the main thread.
#[test]
fn stress_iter() {
    const COUNT: usize = 100_000;

    let (req_tx, req_rx) = bounded(1);
    let (resp_tx, resp_rx) = bounded(1);

    scope(|sc| {
        // Worker: sums up the responses currently available, asking for more until the
        // total reaches COUNT. Its channel halves are dropped when the closure returns.
        sc.spawn(move |_| {
            let mut total = 0;
            loop {
                for value in resp_rx.try_iter() {
                    total += value;
                    if total == COUNT {
                        return;
                    }
                }
                req_tx.send(()).unwrap();
            }
        });

        // Main thread: answers each request with a `1`, stopping when the request stream
        // ends or a response can no longer be delivered.
        for _ in req_rx.iter() {
            let delivered = resp_tx.send(1);
            if delivered.is_err() {
                break;
            }
        }
    })
    .unwrap();
}
444
/// Two threads exchange `COUNT` messages using short timeouts that are retried until they
/// succeed, with each side pausing before every even-numbered message.
#[test]
fn stress_timeout_two_threads() {
    const COUNT: usize = 100;

    let (tx, rx) = bounded(2);

    scope(|sc| {
        // Producer.
        sc.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(50));
                }
                // Retry until the message gets through.
                loop {
                    match tx.send_timeout(i, ms(10)) {
                        Ok(()) => break,
                        Err(_) => continue,
                    }
                }
            }
        });

        // Consumer.
        sc.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(50));
                }
                // Retry until a message arrives, then check that it is the expected one.
                let received = loop {
                    if let Ok(value) = rx.recv_timeout(ms(10)) {
                        break value;
                    }
                };
                assert_eq!(received, i);
            }
        });
    })
    .unwrap();
}
481
/// Verifies that every message is dropped exactly once: received messages when they are
/// received, and messages still queued when both channel halves are dropped.
#[test]
fn drops() {
    const RUNS: usize = 100;

    // Number of `DropCounter` values dropped during the current run.
    static DROPS: AtomicUsize = AtomicUsize::new(0);

    #[derive(Debug, PartialEq)]
    struct DropCounter;

    impl Drop for DropCounter {
        fn drop(&mut self) {
            DROPS.fetch_add(1, Ordering::SeqCst);
        }
    }

    let mut rng = thread_rng();

    for _ in 0..RUNS {
        // `exchanged` messages are sent and received; `leftover` ones are only sent.
        let exchanged = rng.gen_range(0, 10_000);
        let leftover = rng.gen_range(0, 50);

        DROPS.store(0, Ordering::SeqCst);
        let (tx, rx) = bounded::<DropCounter>(50);

        scope(|sc| {
            sc.spawn(|_| {
                for _ in 0..exchanged {
                    rx.recv().unwrap();
                }
            });

            sc.spawn(|_| {
                for _ in 0..exchanged {
                    tx.send(DropCounter).unwrap();
                }
            });
        })
        .unwrap();

        for _ in 0..leftover {
            tx.send(DropCounter).unwrap();
        }

        // Only the received messages have been dropped at this point.
        assert_eq!(DROPS.load(Ordering::SeqCst), exchanged);
        drop(tx);
        drop(rx);
        // With both halves gone, the messages still queued have been dropped as well.
        assert_eq!(DROPS.load(Ordering::SeqCst), exchanged + leftover);
    }
}
531
/// Each thread sends a message and then immediately calls `try_recv`, which must succeed.
/// The capacity equals the thread count, so every thread can have one message in flight.
#[test]
fn linearizable() {
    const COUNT: usize = 25_000;
    const THREADS: usize = 4;

    let (tx, rx) = bounded(THREADS);

    scope(|sc| {
        for _ in 0..THREADS {
            sc.spawn(|_| {
                for _ in 0..COUNT {
                    tx.send(0).unwrap();
                    rx.try_recv().unwrap();
                }
            });
        }
    })
    .unwrap();
}
551
/// With two receivers that are both always ready, `select!` must not starve either one:
/// each arm has to win at least half of its even share of the selections.
#[test]
fn fairness() {
    const COUNT: usize = 10_000;

    let (tx1, rx1) = bounded::<()>(COUNT);
    let (tx2, rx2) = bounded::<()>(COUNT);

    // Pre-fill both channels so that every selection below has two ready arms.
    for _ in 0..COUNT {
        tx1.send(()).unwrap();
        tx2.send(()).unwrap();
    }

    let mut hits = [0usize; 2];
    for _ in 0..COUNT {
        select! {
            recv(rx1) -> _ => hits[0] += 1,
            recv(rx2) -> _ => hits[1] += 1,
        }
    }

    let minimum = COUNT / hits.len() / 2;
    assert!(hits.iter().all(|&count| count >= minimum));
}
573
/// Same fairness requirement as for distinct channels, but with one receiver listed in
/// five `select!` arms: each arm has to win at least half of its even share.
#[test]
fn fairness_duplicates() {
    const COUNT: usize = 10_000;

    let (tx, rx) = bounded::<()>(COUNT);

    // Pre-fill the channel so that every selection below finds a message.
    for _ in 0..COUNT {
        tx.send(()).unwrap();
    }

    let mut hits = [0usize; 5];
    for _ in 0..COUNT {
        select! {
            recv(rx) -> _ => hits[0] += 1,
            recv(rx) -> _ => hits[1] += 1,
            recv(rx) -> _ => hits[2] += 1,
            recv(rx) -> _ => hits[3] += 1,
            recv(rx) -> _ => hits[4] += 1,
        }
    }

    let minimum = COUNT / hits.len() / 2;
    assert!(hits.iter().all(|&count| count >= minimum));
}
596
/// Checks when the message expression of a `send` arm in `select!` gets evaluated.
#[test]
fn recv_in_send() {
    let (full_tx, _full_rx) = bounded(1);
    full_tx.send(()).unwrap();

    // The only slot is occupied, so the test expects the `default` arm to run. Both the
    // message expression and the arm body panic, so reaching either fails the test.
    #[allow(unreachable_code)]
    {
        select! {
            send(full_tx, panic!()) -> _ => panic!(),
            default => {}
        }
    }

    let (tx, rx) = bounded(2);
    tx.send(()).unwrap();

    // One slot is free here. The message expression receives from the same channel and
    // its `()` result is what gets sent.
    select! {
        send(tx, assert_eq!(rx.recv(), Ok(()))) -> _ => {}
    }
}
617
/// Sends the receiving half of a fresh channel through the current channel, `COUNT` times
/// in a row, with both threads switching over to the new channel after every hop.
#[test]
fn channel_through_channel() {
    const COUNT: usize = 1000;

    // Type-erased payload, so that a channel's message type can contain its own receiver.
    type T = Box<dyn Any + Send>;

    let (tx, rx) = bounded::<T>(1);

    scope(|sc| {
        // Producer: creates the next channel, ships its receiver, then continues on it.
        sc.spawn(move |_| {
            let mut current_tx = tx;

            for _ in 0..COUNT {
                let (next_tx, next_rx) = bounded(1);
                let payload: T = Box::new(Some(next_rx));

                current_tx.send(payload).unwrap();
                current_tx = next_tx;
            }
        });

        // Consumer: unpacks the next receiver out of each message and continues on it.
        sc.spawn(move |_| {
            let mut current_rx = rx;

            for _ in 0..COUNT {
                let mut payload = current_rx.recv().unwrap();
                let slot = payload.downcast_mut::<Option<Receiver<T>>>().unwrap();
                current_rx = slot.take().unwrap();
            }
        });
    })
    .unwrap();
}
655