1 //! Tests for channel readiness using the `Select` struct.
2
3 use std::any::Any;
4 use std::cell::Cell;
5 use std::thread;
6 use std::time::{Duration, Instant};
7
8 use crossbeam_channel::{after, bounded, tick, unbounded};
9 use crossbeam_channel::{Receiver, Select, TryRecvError, TrySendError};
10 use crossbeam_utils::thread::scope;
11
ms(ms: u64) -> Duration12 fn ms(ms: u64) -> Duration {
13 Duration::from_millis(ms)
14 }
15
16 #[test]
smoke1()17 fn smoke1() {
18 let (s1, r1) = unbounded::<usize>();
19 let (s2, r2) = unbounded::<usize>();
20
21 s1.send(1).unwrap();
22
23 let mut sel = Select::new();
24 sel.recv(&r1);
25 sel.recv(&r2);
26 assert_eq!(sel.ready(), 0);
27 assert_eq!(r1.try_recv(), Ok(1));
28
29 s2.send(2).unwrap();
30
31 let mut sel = Select::new();
32 sel.recv(&r1);
33 sel.recv(&r2);
34 assert_eq!(sel.ready(), 1);
35 assert_eq!(r2.try_recv(), Ok(2));
36 }
37
38 #[test]
smoke2()39 fn smoke2() {
40 let (_s1, r1) = unbounded::<i32>();
41 let (_s2, r2) = unbounded::<i32>();
42 let (_s3, r3) = unbounded::<i32>();
43 let (_s4, r4) = unbounded::<i32>();
44 let (s5, r5) = unbounded::<i32>();
45
46 s5.send(5).unwrap();
47
48 let mut sel = Select::new();
49 sel.recv(&r1);
50 sel.recv(&r2);
51 sel.recv(&r3);
52 sel.recv(&r4);
53 sel.recv(&r5);
54 assert_eq!(sel.ready(), 4);
55 assert_eq!(r5.try_recv(), Ok(5));
56 }
57
58 #[test]
disconnected()59 fn disconnected() {
60 let (s1, r1) = unbounded::<i32>();
61 let (s2, r2) = unbounded::<i32>();
62
63 scope(|scope| {
64 scope.spawn(|_| {
65 drop(s1);
66 thread::sleep(ms(500));
67 s2.send(5).unwrap();
68 });
69
70 let mut sel = Select::new();
71 sel.recv(&r1);
72 sel.recv(&r2);
73 match sel.ready_timeout(ms(1000)) {
74 Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)),
75 _ => panic!(),
76 }
77
78 r2.recv().unwrap();
79 })
80 .unwrap();
81
82 let mut sel = Select::new();
83 sel.recv(&r1);
84 sel.recv(&r2);
85 match sel.ready_timeout(ms(1000)) {
86 Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)),
87 _ => panic!(),
88 }
89
90 scope(|scope| {
91 scope.spawn(|_| {
92 thread::sleep(ms(500));
93 drop(s2);
94 });
95
96 let mut sel = Select::new();
97 sel.recv(&r2);
98 match sel.ready_timeout(ms(1000)) {
99 Ok(0) => assert_eq!(r2.try_recv(), Err(TryRecvError::Disconnected)),
100 _ => panic!(),
101 }
102 })
103 .unwrap();
104 }
105
106 #[test]
default()107 fn default() {
108 let (s1, r1) = unbounded::<i32>();
109 let (s2, r2) = unbounded::<i32>();
110
111 let mut sel = Select::new();
112 sel.recv(&r1);
113 sel.recv(&r2);
114 assert!(sel.try_ready().is_err());
115
116 drop(s1);
117
118 let mut sel = Select::new();
119 sel.recv(&r1);
120 sel.recv(&r2);
121 match sel.try_ready() {
122 Ok(0) => assert!(r1.try_recv().is_err()),
123 _ => panic!(),
124 }
125
126 s2.send(2).unwrap();
127
128 let mut sel = Select::new();
129 sel.recv(&r2);
130 match sel.try_ready() {
131 Ok(0) => assert_eq!(r2.try_recv(), Ok(2)),
132 _ => panic!(),
133 }
134
135 let mut sel = Select::new();
136 sel.recv(&r2);
137 assert!(sel.try_ready().is_err());
138
139 let mut sel = Select::new();
140 assert!(sel.try_ready().is_err());
141 }
142
143 #[test]
timeout()144 fn timeout() {
145 let (_s1, r1) = unbounded::<i32>();
146 let (s2, r2) = unbounded::<i32>();
147
148 scope(|scope| {
149 scope.spawn(|_| {
150 thread::sleep(ms(1500));
151 s2.send(2).unwrap();
152 });
153
154 let mut sel = Select::new();
155 sel.recv(&r1);
156 sel.recv(&r2);
157 assert!(sel.ready_timeout(ms(1000)).is_err());
158
159 let mut sel = Select::new();
160 sel.recv(&r1);
161 sel.recv(&r2);
162 match sel.ready_timeout(ms(1000)) {
163 Ok(1) => assert_eq!(r2.try_recv(), Ok(2)),
164 _ => panic!(),
165 }
166 })
167 .unwrap();
168
169 scope(|scope| {
170 let (s, r) = unbounded::<i32>();
171
172 scope.spawn(move |_| {
173 thread::sleep(ms(500));
174 drop(s);
175 });
176
177 let mut sel = Select::new();
178 assert!(sel.ready_timeout(ms(1000)).is_err());
179
180 let mut sel = Select::new();
181 sel.recv(&r);
182 match sel.try_ready() {
183 Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
184 _ => panic!(),
185 }
186 })
187 .unwrap();
188 }
189
190 #[test]
default_when_disconnected()191 fn default_when_disconnected() {
192 let (_, r) = unbounded::<i32>();
193
194 let mut sel = Select::new();
195 sel.recv(&r);
196 match sel.try_ready() {
197 Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
198 _ => panic!(),
199 }
200
201 let (_, r) = unbounded::<i32>();
202
203 let mut sel = Select::new();
204 sel.recv(&r);
205 match sel.ready_timeout(ms(1000)) {
206 Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
207 _ => panic!(),
208 }
209
210 let (s, _) = bounded::<i32>(0);
211
212 let mut sel = Select::new();
213 sel.send(&s);
214 match sel.try_ready() {
215 Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))),
216 _ => panic!(),
217 }
218
219 let (s, _) = bounded::<i32>(0);
220
221 let mut sel = Select::new();
222 sel.send(&s);
223 match sel.ready_timeout(ms(1000)) {
224 Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))),
225 _ => panic!(),
226 }
227 }
228
229 #[test]
default_only()230 fn default_only() {
231 let start = Instant::now();
232
233 let mut sel = Select::new();
234 assert!(sel.try_ready().is_err());
235 let now = Instant::now();
236 assert!(now - start <= ms(50));
237
238 let start = Instant::now();
239 let mut sel = Select::new();
240 assert!(sel.ready_timeout(ms(500)).is_err());
241 let now = Instant::now();
242 assert!(now - start >= ms(450));
243 assert!(now - start <= ms(550));
244 }
245
246 #[test]
unblocks()247 fn unblocks() {
248 let (s1, r1) = bounded::<i32>(0);
249 let (s2, r2) = bounded::<i32>(0);
250
251 scope(|scope| {
252 scope.spawn(|_| {
253 thread::sleep(ms(500));
254 s2.send(2).unwrap();
255 });
256
257 let mut sel = Select::new();
258 sel.recv(&r1);
259 sel.recv(&r2);
260 match sel.ready_timeout(ms(1000)) {
261 Ok(1) => assert_eq!(r2.try_recv(), Ok(2)),
262 _ => panic!(),
263 }
264 })
265 .unwrap();
266
267 scope(|scope| {
268 scope.spawn(|_| {
269 thread::sleep(ms(500));
270 assert_eq!(r1.recv().unwrap(), 1);
271 });
272
273 let mut sel = Select::new();
274 let oper1 = sel.send(&s1);
275 let oper2 = sel.send(&s2);
276 let oper = sel.select_timeout(ms(1000));
277 match oper {
278 Err(_) => panic!(),
279 Ok(oper) => match oper.index() {
280 i if i == oper1 => oper.send(&s1, 1).unwrap(),
281 i if i == oper2 => panic!(),
282 _ => unreachable!(),
283 },
284 }
285 })
286 .unwrap();
287 }
288
289 #[test]
both_ready()290 fn both_ready() {
291 let (s1, r1) = bounded(0);
292 let (s2, r2) = bounded(0);
293
294 scope(|scope| {
295 scope.spawn(|_| {
296 thread::sleep(ms(500));
297 s1.send(1).unwrap();
298 assert_eq!(r2.recv().unwrap(), 2);
299 });
300
301 for _ in 0..2 {
302 let mut sel = Select::new();
303 sel.recv(&r1);
304 sel.send(&s2);
305 match sel.ready() {
306 0 => assert_eq!(r1.try_recv(), Ok(1)),
307 1 => s2.try_send(2).unwrap(),
308 _ => panic!(),
309 }
310 }
311 })
312 .unwrap();
313 }
314
315 #[test]
cloning1()316 fn cloning1() {
317 scope(|scope| {
318 let (s1, r1) = unbounded::<i32>();
319 let (_s2, r2) = unbounded::<i32>();
320 let (s3, r3) = unbounded::<()>();
321
322 scope.spawn(move |_| {
323 r3.recv().unwrap();
324 drop(s1.clone());
325 assert!(r3.try_recv().is_err());
326 s1.send(1).unwrap();
327 r3.recv().unwrap();
328 });
329
330 s3.send(()).unwrap();
331
332 let mut sel = Select::new();
333 sel.recv(&r1);
334 sel.recv(&r2);
335 match sel.ready() {
336 0 => drop(r1.try_recv()),
337 1 => drop(r2.try_recv()),
338 _ => panic!(),
339 }
340
341 s3.send(()).unwrap();
342 })
343 .unwrap();
344 }
345
346 #[test]
cloning2()347 fn cloning2() {
348 let (s1, r1) = unbounded::<()>();
349 let (s2, r2) = unbounded::<()>();
350 let (_s3, _r3) = unbounded::<()>();
351
352 scope(|scope| {
353 scope.spawn(move |_| {
354 let mut sel = Select::new();
355 sel.recv(&r1);
356 sel.recv(&r2);
357 match sel.ready() {
358 0 => panic!(),
359 1 => drop(r2.try_recv()),
360 _ => panic!(),
361 }
362 });
363
364 thread::sleep(ms(500));
365 drop(s1.clone());
366 s2.send(()).unwrap();
367 })
368 .unwrap();
369 }
370
371 #[test]
preflight1()372 fn preflight1() {
373 let (s, r) = unbounded();
374 s.send(()).unwrap();
375
376 let mut sel = Select::new();
377 sel.recv(&r);
378 match sel.ready() {
379 0 => drop(r.try_recv()),
380 _ => panic!(),
381 }
382 }
383
384 #[test]
preflight2()385 fn preflight2() {
386 let (s, r) = unbounded();
387 drop(s.clone());
388 s.send(()).unwrap();
389 drop(s);
390
391 let mut sel = Select::new();
392 sel.recv(&r);
393 match sel.ready() {
394 0 => assert_eq!(r.try_recv(), Ok(())),
395 _ => panic!(),
396 }
397
398 assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
399 }
400
401 #[test]
preflight3()402 fn preflight3() {
403 let (s, r) = unbounded();
404 drop(s.clone());
405 s.send(()).unwrap();
406 drop(s);
407 r.recv().unwrap();
408
409 let mut sel = Select::new();
410 sel.recv(&r);
411 match sel.ready() {
412 0 => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
413 _ => panic!(),
414 }
415 }
416
417 #[test]
duplicate_operations()418 fn duplicate_operations() {
419 let (s, r) = unbounded::<i32>();
420 let hit = vec![Cell::new(false); 4];
421
422 while hit.iter().map(|h| h.get()).any(|hit| !hit) {
423 let mut sel = Select::new();
424 sel.recv(&r);
425 sel.recv(&r);
426 sel.send(&s);
427 sel.send(&s);
428 match sel.ready() {
429 0 => {
430 assert!(r.try_recv().is_ok());
431 hit[0].set(true);
432 }
433 1 => {
434 assert!(r.try_recv().is_ok());
435 hit[1].set(true);
436 }
437 2 => {
438 assert!(s.try_send(0).is_ok());
439 hit[2].set(true);
440 }
441 3 => {
442 assert!(s.try_send(0).is_ok());
443 hit[3].set(true);
444 }
445 _ => panic!(),
446 }
447 }
448 }
449
450 #[test]
nesting()451 fn nesting() {
452 let (s, r) = unbounded::<i32>();
453
454 let mut sel = Select::new();
455 sel.send(&s);
456 match sel.ready() {
457 0 => {
458 assert!(s.try_send(0).is_ok());
459
460 let mut sel = Select::new();
461 sel.recv(&r);
462 match sel.ready() {
463 0 => {
464 assert_eq!(r.try_recv(), Ok(0));
465
466 let mut sel = Select::new();
467 sel.send(&s);
468 match sel.ready() {
469 0 => {
470 assert!(s.try_send(1).is_ok());
471
472 let mut sel = Select::new();
473 sel.recv(&r);
474 match sel.ready() {
475 0 => {
476 assert_eq!(r.try_recv(), Ok(1));
477 }
478 _ => panic!(),
479 }
480 }
481 _ => panic!(),
482 }
483 }
484 _ => panic!(),
485 }
486 }
487 _ => panic!(),
488 }
489 }
490
491 #[test]
stress_recv()492 fn stress_recv() {
493 const COUNT: usize = 10_000;
494
495 let (s1, r1) = unbounded();
496 let (s2, r2) = bounded(5);
497 let (s3, r3) = bounded(0);
498
499 scope(|scope| {
500 scope.spawn(|_| {
501 for i in 0..COUNT {
502 s1.send(i).unwrap();
503 r3.recv().unwrap();
504
505 s2.send(i).unwrap();
506 r3.recv().unwrap();
507 }
508 });
509
510 for i in 0..COUNT {
511 for _ in 0..2 {
512 let mut sel = Select::new();
513 sel.recv(&r1);
514 sel.recv(&r2);
515 match sel.ready() {
516 0 => assert_eq!(r1.try_recv(), Ok(i)),
517 1 => assert_eq!(r2.try_recv(), Ok(i)),
518 _ => panic!(),
519 }
520
521 s3.send(()).unwrap();
522 }
523 }
524 })
525 .unwrap();
526 }
527
528 #[test]
stress_send()529 fn stress_send() {
530 const COUNT: usize = 10_000;
531
532 let (s1, r1) = bounded(0);
533 let (s2, r2) = bounded(0);
534 let (s3, r3) = bounded(100);
535
536 scope(|scope| {
537 scope.spawn(|_| {
538 for i in 0..COUNT {
539 assert_eq!(r1.recv().unwrap(), i);
540 assert_eq!(r2.recv().unwrap(), i);
541 r3.recv().unwrap();
542 }
543 });
544
545 for i in 0..COUNT {
546 for _ in 0..2 {
547 let mut sel = Select::new();
548 sel.send(&s1);
549 sel.send(&s2);
550 match sel.ready() {
551 0 => assert!(s1.try_send(i).is_ok()),
552 1 => assert!(s2.try_send(i).is_ok()),
553 _ => panic!(),
554 }
555 }
556 s3.send(()).unwrap();
557 }
558 })
559 .unwrap();
560 }
561
562 #[test]
stress_mixed()563 fn stress_mixed() {
564 const COUNT: usize = 10_000;
565
566 let (s1, r1) = bounded(0);
567 let (s2, r2) = bounded(0);
568 let (s3, r3) = bounded(100);
569
570 scope(|scope| {
571 scope.spawn(|_| {
572 for i in 0..COUNT {
573 s1.send(i).unwrap();
574 assert_eq!(r2.recv().unwrap(), i);
575 r3.recv().unwrap();
576 }
577 });
578
579 for i in 0..COUNT {
580 for _ in 0..2 {
581 let mut sel = Select::new();
582 sel.recv(&r1);
583 sel.send(&s2);
584 match sel.ready() {
585 0 => assert_eq!(r1.try_recv(), Ok(i)),
586 1 => assert!(s2.try_send(i).is_ok()),
587 _ => panic!(),
588 }
589 }
590 s3.send(()).unwrap();
591 }
592 })
593 .unwrap();
594 }
595
596 #[test]
stress_timeout_two_threads()597 fn stress_timeout_two_threads() {
598 const COUNT: usize = 20;
599
600 let (s, r) = bounded(2);
601
602 scope(|scope| {
603 scope.spawn(|_| {
604 for i in 0..COUNT {
605 if i % 2 == 0 {
606 thread::sleep(ms(500));
607 }
608
609 let done = false;
610 while !done {
611 let mut sel = Select::new();
612 sel.send(&s);
613 match sel.ready_timeout(ms(100)) {
614 Err(_) => {}
615 Ok(0) => {
616 assert!(s.try_send(i).is_ok());
617 break;
618 }
619 Ok(_) => panic!(),
620 }
621 }
622 }
623 });
624
625 scope.spawn(|_| {
626 for i in 0..COUNT {
627 if i % 2 == 0 {
628 thread::sleep(ms(500));
629 }
630
631 let mut done = false;
632 while !done {
633 let mut sel = Select::new();
634 sel.recv(&r);
635 match sel.ready_timeout(ms(100)) {
636 Err(_) => {}
637 Ok(0) => {
638 assert_eq!(r.try_recv(), Ok(i));
639 done = true;
640 }
641 Ok(_) => panic!(),
642 }
643 }
644 }
645 });
646 })
647 .unwrap();
648 }
649
650 #[test]
send_recv_same_channel()651 fn send_recv_same_channel() {
652 let (s, r) = bounded::<i32>(0);
653 let mut sel = Select::new();
654 sel.send(&s);
655 sel.recv(&r);
656 assert!(sel.ready_timeout(ms(100)).is_err());
657
658 let (s, r) = unbounded::<i32>();
659 let mut sel = Select::new();
660 sel.send(&s);
661 sel.recv(&r);
662 match sel.ready_timeout(ms(100)) {
663 Err(_) => panic!(),
664 Ok(0) => assert!(s.try_send(0).is_ok()),
665 Ok(_) => panic!(),
666 }
667 }
668
669 #[test]
channel_through_channel()670 fn channel_through_channel() {
671 const COUNT: usize = 1000;
672
673 type T = Box<dyn Any + Send>;
674
675 for cap in 1..4 {
676 let (s, r) = bounded::<T>(cap);
677
678 scope(|scope| {
679 scope.spawn(move |_| {
680 let mut s = s;
681
682 for _ in 0..COUNT {
683 let (new_s, new_r) = bounded(cap);
684 let new_r: T = Box::new(Some(new_r));
685
686 {
687 let mut sel = Select::new();
688 sel.send(&s);
689 match sel.ready() {
690 0 => assert!(s.try_send(new_r).is_ok()),
691 _ => panic!(),
692 }
693 }
694
695 s = new_s;
696 }
697 });
698
699 scope.spawn(move |_| {
700 let mut r = r;
701
702 for _ in 0..COUNT {
703 let new = {
704 let mut sel = Select::new();
705 sel.recv(&r);
706 match sel.ready() {
707 0 => r
708 .try_recv()
709 .unwrap()
710 .downcast_mut::<Option<Receiver<T>>>()
711 .unwrap()
712 .take()
713 .unwrap(),
714 _ => panic!(),
715 }
716 };
717 r = new;
718 }
719 });
720 })
721 .unwrap();
722 }
723 }
724
725 #[test]
fairness1()726 fn fairness1() {
727 const COUNT: usize = 10_000;
728
729 let (s1, r1) = bounded::<()>(COUNT);
730 let (s2, r2) = unbounded::<()>();
731
732 for _ in 0..COUNT {
733 s1.send(()).unwrap();
734 s2.send(()).unwrap();
735 }
736
737 let hits = vec![Cell::new(0usize); 4];
738 for _ in 0..COUNT {
739 let after = after(ms(0));
740 let tick = tick(ms(0));
741
742 let mut sel = Select::new();
743 sel.recv(&r1);
744 sel.recv(&r2);
745 sel.recv(&after);
746 sel.recv(&tick);
747 match sel.ready() {
748 0 => {
749 r1.try_recv().unwrap();
750 hits[0].set(hits[0].get() + 1);
751 }
752 1 => {
753 r2.try_recv().unwrap();
754 hits[1].set(hits[1].get() + 1);
755 }
756 2 => {
757 after.try_recv().unwrap();
758 hits[2].set(hits[2].get() + 1);
759 }
760 3 => {
761 tick.try_recv().unwrap();
762 hits[3].set(hits[3].get() + 1);
763 }
764 _ => panic!(),
765 }
766 }
767 assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2));
768 }
769
770 #[test]
fairness2()771 fn fairness2() {
772 const COUNT: usize = 100_000;
773
774 let (s1, r1) = unbounded::<()>();
775 let (s2, r2) = bounded::<()>(1);
776 let (s3, r3) = bounded::<()>(0);
777
778 scope(|scope| {
779 scope.spawn(|_| {
780 for _ in 0..COUNT {
781 let mut sel = Select::new();
782 let mut oper1 = None;
783 let mut oper2 = None;
784 if s1.is_empty() {
785 oper1 = Some(sel.send(&s1));
786 }
787 if s2.is_empty() {
788 oper2 = Some(sel.send(&s2));
789 }
790 let oper3 = sel.send(&s3);
791 let oper = sel.select();
792 match oper.index() {
793 i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()),
794 i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()),
795 i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()),
796 _ => unreachable!(),
797 }
798 }
799 });
800
801 let hits = vec![Cell::new(0usize); 3];
802 for _ in 0..COUNT {
803 let mut sel = Select::new();
804 sel.recv(&r1);
805 sel.recv(&r2);
806 sel.recv(&r3);
807 loop {
808 match sel.ready() {
809 0 => {
810 if r1.try_recv().is_ok() {
811 hits[0].set(hits[0].get() + 1);
812 break;
813 }
814 }
815 1 => {
816 if r2.try_recv().is_ok() {
817 hits[1].set(hits[1].get() + 1);
818 break;
819 }
820 }
821 2 => {
822 if r3.try_recv().is_ok() {
823 hits[2].set(hits[2].get() + 1);
824 break;
825 }
826 }
827 _ => unreachable!(),
828 }
829 }
830 }
831 assert!(hits.iter().all(|x| x.get() > 0));
832 })
833 .unwrap();
834 }
835