1 //! Tests for the `select!` macro.
2
3 #![forbid(unsafe_code)] // select! is safe.
4 #![allow(clippy::match_single_binding)]
5
6 use std::any::Any;
7 use std::cell::Cell;
8 use std::ops::Deref;
9 use std::thread;
10 use std::time::{Duration, Instant};
11
12 use crossbeam_channel::{after, bounded, never, select, select_biased, tick, unbounded};
13 use crossbeam_channel::{Receiver, RecvError, SendError, Sender, TryRecvError};
14 use crossbeam_utils::thread::scope;
15
/// Builds a [`Duration`] from a whole number of milliseconds.
///
/// Short alias used for sleeps and `select!` timeouts in this file.
fn ms(ms: u64) -> Duration {
    let millis = ms;
    Duration::from_millis(millis)
}
19
/// With one message queued on exactly one of two channels, `select!` must run
/// the `recv` arm of that channel and yield the queued value.
#[test]
fn smoke1() {
    let (tx_a, rx_a) = unbounded::<usize>();
    let (tx_b, rx_b) = unbounded::<usize>();

    // Only channel A holds a message.
    tx_a.send(1).unwrap();

    select! {
        recv(rx_a) -> got => assert_eq!(got, Ok(1)),
        recv(rx_b) -> _ => panic!(),
    }

    // Channel A was drained above; now only channel B holds a message.
    tx_b.send(2).unwrap();

    select! {
        recv(rx_a) -> _ => panic!(),
        recv(rx_b) -> got => assert_eq!(got, Ok(2)),
    }
}
39
/// Offers five receivers to `select!`; only the last one has a message
/// queued, so only its arm may run.
#[test]
fn smoke2() {
    // The first four senders are kept alive but never used.
    let (_tx1, rx1) = unbounded::<i32>();
    let (_tx2, rx2) = unbounded::<i32>();
    let (_tx3, rx3) = unbounded::<i32>();
    let (_tx4, rx4) = unbounded::<i32>();
    let (tx5, rx5) = unbounded::<i32>();

    tx5.send(5).unwrap();

    select! {
        recv(rx1) -> _ => panic!(),
        recv(rx2) -> _ => panic!(),
        recv(rx3) -> _ => panic!(),
        recv(rx4) -> _ => panic!(),
        recv(rx5) -> got => assert_eq!(got, Ok(5)),
    }
}
58
/// Checks that a `recv` arm on a channel whose sender has been dropped
/// completes with an error rather than letting the 1000 ms `default` arm run.
#[test]
fn disconnected() {
    let (s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    scope(|scope| {
        scope.spawn(|_| {
            // Drop `s1` right away, then send on `s2` only after 500 ms.
            drop(s1);
            thread::sleep(ms(500));
            s2.send(5).unwrap();
        });

        // The `r1` arm is the one expected to complete, and with an error.
        select! {
            recv(r1) -> v => assert!(v.is_err()),
            recv(r2) -> _ => panic!(),
            default(ms(1000)) => panic!(),
        }

        // Take the value the spawned thread sends on `s2`, leaving `r2` empty.
        r2.recv().unwrap();
    })
    .unwrap();

    // Same check again now that the spawned thread has been joined.
    select! {
        recv(r1) -> v => assert!(v.is_err()),
        recv(r2) -> _ => panic!(),
        default(ms(1000)) => panic!(),
    }

    scope(|scope| {
        scope.spawn(|_| {
            // Drop the second sender while the `select!` below is in progress.
            thread::sleep(ms(500));
            drop(s2);
        });

        select! {
            recv(r2) -> v => assert!(v.is_err()),
            default(ms(1000)) => panic!(),
        }
    })
    .unwrap();
}
100
/// Walks the non-blocking `default` arm through several channel states:
/// empty, disconnected, holding a message, drained, and no operations at all.
#[test]
fn default() {
    let (tx_a, rx_a) = unbounded::<i32>();
    let (tx_b, rx_b) = unbounded::<i32>();

    // Both channels are empty and still connected: `default` is expected.
    select! {
        recv(rx_a) -> _ => panic!(),
        recv(rx_b) -> _ => panic!(),
        default => {}
    }

    drop(tx_a);

    // With its sender gone, the `rx_a` arm is expected to yield an error.
    select! {
        recv(rx_a) -> got => assert!(got.is_err()),
        recv(rx_b) -> _ => panic!(),
        default => panic!(),
    }

    tx_b.send(2).unwrap();

    select! {
        recv(rx_b) -> got => assert_eq!(got, Ok(2)),
        default => panic!(),
    }

    // The message was consumed above, so `default` is expected again.
    select! {
        recv(rx_b) -> _ => panic!(),
        default => {},
    }

    // A `select!` containing nothing but `default`.
    select! {
        default => {},
    }
}
136
/// Exercises `default(timeout)`: the timeout arm runs when nothing arrives in
/// time, and a `recv` arm wins when a message shows up before the deadline.
#[test]
fn timeout() {
    let (_s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    scope(|scope| {
        scope.spawn(|_| {
            // The message is sent 1500 ms in: after the first 1000 ms timeout
            // below has elapsed, but inside the second one.
            thread::sleep(ms(1500));
            s2.send(2).unwrap();
        });

        // Nothing is sent during the first second, so the timeout arm runs.
        select! {
            recv(r1) -> _ => panic!(),
            recv(r2) -> _ => panic!(),
            default(ms(1000)) => {},
        }

        // The value 2 is expected to arrive before this timeout expires.
        select! {
            recv(r1) -> _ => panic!(),
            recv(r2) -> v => assert_eq!(v, Ok(2)),
            default(ms(1000)) => panic!(),
        }
    })
    .unwrap();

    scope(|scope| {
        let (s, r) = unbounded::<i32>();

        scope.spawn(move |_| {
            thread::sleep(ms(500));
            drop(s);
        });

        // A `select!` with only a timeout arm, used here as a 1000 ms wait.
        select! {
            default(ms(1000)) => {
                // The sender was dropped after 500 ms, so the non-blocking
                // receive is expected to report an error instead of `default`.
                select! {
                    recv(r) -> v => assert!(v.is_err()),
                    default => panic!(),
                }
            }
        }
    })
    .unwrap();
}
181
/// When the peer handle is already gone, the `recv`/`send` arm must complete
/// with an error instead of the `default` arm running, both for the plain
/// `default` and for `default(timeout)`.
#[test]
fn default_when_disconnected() {
    // The `_` pattern does not bind, so the sender is dropped immediately.
    let (_, rx) = unbounded::<i32>();

    select! {
        recv(rx) -> outcome => assert!(outcome.is_err()),
        default => panic!(),
    }

    let (_, rx) = unbounded::<i32>();

    select! {
        recv(rx) -> outcome => assert!(outcome.is_err()),
        default(ms(1000)) => panic!(),
    }

    // Same on the sending side: the receiver is dropped immediately.
    let (tx, _) = bounded::<i32>(0);

    select! {
        send(tx, 0) -> outcome => assert!(outcome.is_err()),
        default => panic!(),
    }

    let (tx, _) = bounded::<i32>(0);

    select! {
        send(tx, 0) -> outcome => assert!(outcome.is_err()),
        default(ms(1000)) => panic!(),
    }
}
212
/// Times a `select!` that has no channel operations: a bare `default` must
/// finish within 50 ms, and `default(ms(500))` must take between 450 ms and
/// 550 ms.
#[test]
fn default_only() {
    let began = Instant::now();
    select! {
        default => {}
    }
    let elapsed = began.elapsed();
    assert!(elapsed <= ms(50));

    let began = Instant::now();
    select! {
        default(ms(500)) => {}
    }
    let elapsed = began.elapsed();
    assert!(elapsed >= ms(450));
    assert!(elapsed <= ms(550));
}
230
/// A `select!` that starts with no ready operation must complete once another
/// thread provides the matching operation, before the 1000 ms `default`
/// timeout expires.
#[test]
fn unblocks() {
    let (s1, r1) = bounded::<i32>(0);
    let (s2, r2) = bounded::<i32>(0);

    scope(|scope| {
        scope.spawn(|_| {
            // A sender appears on the second channel after 500 ms.
            thread::sleep(ms(500));
            s2.send(2).unwrap();
        });

        select! {
            recv(r1) -> _ => panic!(),
            recv(r2) -> v => assert_eq!(v, Ok(2)),
            default(ms(1000)) => panic!(),
        }
    })
    .unwrap();

    scope(|scope| {
        scope.spawn(|_| {
            // A receiver appears on the first channel after 500 ms.
            thread::sleep(ms(500));
            assert_eq!(r1.recv().unwrap(), 1);
        });

        select! {
            send(s1, 1) -> _ => {},
            send(s2, 2) -> _ => panic!(),
            default(ms(1000)) => panic!(),
        }
    })
    .unwrap();
}
264
/// Runs a two-arm `select!` (one `recv`, one `send`) twice against a peer
/// thread that first sends on `s1` and then receives from `r2`.
#[test]
fn both_ready() {
    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            s1.send(1).unwrap();
            assert_eq!(r2.recv().unwrap(), 2);
        });

        // Two rounds, one for each operation the peer thread performs.
        for _ in 0..2 {
            select! {
                recv(r1) -> v => assert_eq!(v, Ok(1)),
                send(s2, 2) -> _ => {},
            }
        }
    })
    .unwrap();
}
286
/// Repeats `RUNS` times: two threads poll in a loop with non-blocking
/// operations while a third thread runs a `select!` that must pair up with one
/// of them within its 500 ms timeout.
#[test]
fn loop_try() {
    const RUNS: usize = 20;

    for _ in 0..RUNS {
        let (s1, r1) = bounded::<i32>(0);
        let (s2, r2) = bounded::<i32>(0);
        let (s_end, r_end) = bounded::<()>(0);

        scope(|scope| {
            // Poller 1: keeps trying a non-blocking send of 1 on `s1`, and
            // stops once that succeeds or the `recv(r_end)` arm completes.
            scope.spawn(|_| loop {
                select! {
                    send(s1, 1) -> _ => break,
                    default => {}
                }

                select! {
                    recv(r_end) -> _ => break,
                    default => {}
                }
            });

            // Poller 2: keeps trying a non-blocking receive on `r2`, and stops
            // once it gets the 2 or the `recv(r_end)` arm completes.
            scope.spawn(|_| loop {
                if let Ok(x) = r2.try_recv() {
                    assert_eq!(x, 2);
                    break;
                }

                select! {
                    recv(r_end) -> _ => break,
                    default => {}
                }
            });

            // Driver: after 500 ms, must complete one of the two operations
            // before its timeout, then drops `s_end` so the pollers can stop.
            scope.spawn(|_| {
                thread::sleep(ms(500));

                select! {
                    recv(r1) -> v => assert_eq!(v, Ok(1)),
                    send(s2, 2) -> _ => {},
                    default(ms(500)) => panic!(),
                }

                drop(s_end);
            });
        })
        .unwrap();
    }
}
336
/// Creates and immediately drops a clone of a sender in another thread while
/// the main thread is heading into a `select!` on the matching receiver, then
/// sends a real message so the `select!` can complete.
#[test]
fn cloning1() {
    scope(|scope| {
        let (s1, r1) = unbounded::<i32>();
        let (_s2, r2) = unbounded::<i32>();
        // `s3`/`r3` carry the two handshake signals from the main thread.
        let (s3, r3) = unbounded::<()>();

        scope.spawn(move |_| {
            // Wait for the first signal.
            r3.recv().unwrap();
            drop(s1.clone());
            // The second signal is only sent after the main thread's `select!`
            // completes, so none may have arrived yet.
            assert_eq!(r3.try_recv(), Err(TryRecvError::Empty));
            s1.send(1).unwrap();
            // Wait for the second signal.
            r3.recv().unwrap();
        });

        s3.send(()).unwrap();

        // Either arm is accepted; the result is not inspected.
        select! {
            recv(r1) -> _ => {},
            recv(r2) -> _ => {},
        }

        s3.send(()).unwrap();
    })
    .unwrap();
}
363
/// While a thread is inside `select!`, cloning and dropping the first sender
/// must not make the first `recv` arm run; the later send on the second
/// channel is what completes the `select!`.
#[test]
fn cloning2() {
    let (tx_a, rx_a) = unbounded::<()>();
    let (tx_b, rx_b) = unbounded::<()>();
    let (_tx_c, _rx_c) = unbounded::<()>();

    scope(|scope| {
        scope.spawn(move |_| {
            select! {
                recv(rx_a) -> _ => panic!(),
                recv(rx_b) -> _ => {},
            }
        });

        // Let 500 ms pass before touching the senders.
        thread::sleep(ms(500));
        drop(tx_a.clone());
        tx_b.send(()).unwrap();
    })
    .unwrap();
}
384
/// A single-arm `select!` on a channel that already holds a message.
#[test]
fn preflight1() {
    let (tx, rx) = unbounded();
    tx.send(()).unwrap();

    select! {
        recv(rx) -> _ => {}
    }
}
394
/// A message sent before every sender was dropped must still be delivered
/// through `select!`; only afterwards does the channel report disconnection.
#[test]
fn preflight2() {
    let (tx, rx) = unbounded();
    drop(tx.clone());
    tx.send(()).unwrap();
    drop(tx);

    select! {
        recv(rx) -> got => assert!(got.is_ok()),
    }
    assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
}
407
/// Once the only message has been received and every sender is gone, a
/// single-arm `select!` must complete with an error.
#[test]
fn preflight3() {
    let (tx, rx) = unbounded();
    drop(tx.clone());
    tx.send(()).unwrap();
    drop(tx);
    // Drain the one queued message before selecting.
    rx.recv().unwrap();

    select! {
        recv(rx) -> got => assert!(got.is_err())
    }
}
420
/// Lists the same receiver twice and the same sender twice in one `select!`
/// and loops until each of the four arms has run at least once.
#[test]
fn duplicate_operations() {
    let (s, r) = unbounded::<i32>();
    // One flag per arm, set when that arm runs.
    let mut hit = [false; 4];

    while hit.iter().any(|hit| !hit) {
        select! {
            recv(r) -> _ => hit[0] = true,
            recv(r) -> _ => hit[1] = true,
            send(s, 0) -> _ => hit[2] = true,
            send(s, 0) -> _ => hit[3] = true,
        }
    }
}
435
/// Nests `select!` invocations four levels deep inside arm bodies, alternating
/// send and receive on one unbounded channel, and checks that 0 and then 1
/// come back out.
#[test]
fn nesting() {
    let (s, r) = unbounded::<i32>();

    select! {
        send(s, 0) -> _ => {
            select! {
                recv(r) -> v => {
                    assert_eq!(v, Ok(0));
                    select! {
                        send(s, 1) -> _ => {
                            select! {
                                recv(r) -> v => {
                                    assert_eq!(v, Ok(1));
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}
459
/// A panic raised while evaluating the sender expression of a `send` arm must
/// surface with its own message.
///
/// The message expression is itself a `panic!()` with no message, so the
/// `expected = "send panicked"` check passes only if the panic from `get()`
/// is the one that propagates.
#[test]
#[should_panic(expected = "send panicked")]
fn panic_sender() {
    fn get() -> Sender<i32> {
        panic!("send panicked")
    }

    // Both expressions in the `send` arm diverge, so the compiler would
    // otherwise warn about unreachable code.
    #[allow(unreachable_code)]
    {
        select! {
            send(get(), panic!()) -> _ => {}
        }
    }
}
474
/// A panic raised while evaluating the receiver expression of a `recv` arm
/// must propagate out of `select!` with its message ("recv panicked").
#[test]
#[should_panic(expected = "recv panicked")]
fn panic_receiver() {
    fn get() -> Receiver<i32> {
        panic!("recv panicked")
    }

    select! {
        recv(get()) -> _ => {}
    }
}
486
/// Stress test for receiving: a producer alternates between an unbounded and
/// a bounded channel, waiting for an acknowledgement after each send, while
/// the main thread receives through a two-arm `select!` and acknowledges.
#[test]
fn stress_recv() {
    // Far fewer iterations under Miri.
    const COUNT: usize = if cfg!(miri) { 50 } else { 10_000 };

    let (data_a_tx, data_a_rx) = unbounded();
    let (data_b_tx, data_b_rx) = bounded(5);
    let (ack_tx, ack_rx) = bounded(100);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                data_a_tx.send(i).unwrap();
                ack_rx.recv().unwrap();

                data_b_tx.send(i).unwrap();
                ack_rx.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            // Two receives per `i`: one from each data channel.
            for _ in 0..2 {
                select! {
                    recv(data_a_rx) -> got => assert_eq!(got, Ok(i)),
                    recv(data_b_rx) -> got => assert_eq!(got, Ok(i)),
                }

                ack_tx.send(()).unwrap();
            }
        }
    })
    .unwrap();
}
522
/// Stress test for sending: the main thread pushes each `i` through a two-arm
/// `send` `select!` twice, while a consumer receives `i` from both channels in
/// a fixed order and then waits for the end-of-round signal.
#[test]
fn stress_send() {
    // Far fewer iterations under Miri.
    const COUNT: usize = if cfg!(miri) { 100 } else { 10_000 };

    let (data_a_tx, data_a_rx) = bounded(0);
    let (data_b_tx, data_b_rx) = bounded(0);
    let (round_tx, round_rx) = bounded(100);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                assert_eq!(data_a_rx.recv().unwrap(), i);
                assert_eq!(data_b_rx.recv().unwrap(), i);
                round_rx.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            for _ in 0..2 {
                select! {
                    send(data_a_tx, i) -> _ => {},
                    send(data_b_tx, i) -> _ => {},
                }
            }
            round_tx.send(()).unwrap();
        }
    })
    .unwrap();
}
555
/// Stress test mixing directions: each round the peer sends `i` on one channel
/// and receives `i` from the other, while the main thread serves both through
/// a `select!` with one `recv` arm and one `send` arm.
#[test]
fn stress_mixed() {
    // Far fewer iterations under Miri.
    const COUNT: usize = if cfg!(miri) { 100 } else { 10_000 };

    let (inbound_tx, inbound_rx) = bounded(0);
    let (outbound_tx, outbound_rx) = bounded(0);
    let (round_tx, round_rx) = bounded(100);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                inbound_tx.send(i).unwrap();
                assert_eq!(outbound_rx.recv().unwrap(), i);
                round_rx.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            for _ in 0..2 {
                select! {
                    recv(inbound_rx) -> got => assert_eq!(got, Ok(i)),
                    send(outbound_tx, i) -> _ => {},
                }
            }
            round_tx.send(()).unwrap();
        }
    })
    .unwrap();
}
588
/// Two threads exchange `COUNT` values over a capacity-2 channel, each
/// retrying its operation in a loop with a 100 ms `default` timeout and
/// pausing 500 ms before every even-numbered item.
#[test]
fn stress_timeout_two_threads() {
    const COUNT: usize = 20;

    let (s, r) = bounded(2);

    scope(|scope| {
        // Sender side.
        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                // Retry until the send goes through.
                loop {
                    select! {
                        send(s, i) -> _ => break,
                        default(ms(100)) => {}
                    }
                }
            }
        });

        // Receiver side: values must arrive in order.
        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                // Retry until a value is received.
                loop {
                    select! {
                        recv(r) -> v => {
                            assert_eq!(v, Ok(i));
                            break;
                        }
                        default(ms(100)) => {}
                    }
                }
            }
        });
    })
    .unwrap();
}
631
/// Puts both ends of one channel into the same `select!`.
///
/// For the `bounded(0)` channel neither arm may run and the 500 ms timeout is
/// expected; for the unbounded channel the `send` arm is expected.
#[test]
fn send_recv_same_channel() {
    let (tx, rx) = bounded::<i32>(0);
    select! {
        send(tx, 0) -> _ => panic!(),
        recv(rx) -> _ => panic!(),
        default(ms(500)) => {}
    }

    let (tx, rx) = unbounded::<i32>();
    select! {
        send(tx, 0) -> _ => {},
        recv(rx) -> _ => panic!(),
        default(ms(500)) => panic!(),
    }
}
648
/// Spawns an even number of threads that each either send their own index or
/// receive someone else's over one `bounded(0)` channel.
///
/// A thread must never receive its own index, and nothing may be left in the
/// channel afterwards.
#[test]
fn matching() {
    const THREADS: usize = 44;

    let channel = bounded::<usize>(0);
    let (tx, rx) = (&channel.0, &channel.1);

    scope(|scope| {
        for id in 0..THREADS {
            scope.spawn(move |_| {
                select! {
                    recv(rx) -> peer => assert_ne!(peer.unwrap(), id),
                    send(tx, id) -> _ => {},
                }
            });
        }
    })
    .unwrap();

    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
669
/// Like `matching`, but with an odd number of threads; the main thread sends
/// one extra value (`!0`) so that the odd thread out has a partner.
#[test]
fn matching_with_leftover() {
    const THREADS: usize = 55;

    let channel = bounded::<usize>(0);
    let (tx, rx) = (&channel.0, &channel.1);

    scope(|scope| {
        for id in 0..THREADS {
            scope.spawn(move |_| {
                select! {
                    recv(rx) -> peer => assert_ne!(peer.unwrap(), id),
                    send(tx, id) -> _ => {},
                }
            });
        }
        // `!0` is not a valid thread index, so any receiver accepts it.
        tx.send(!0).unwrap();
    })
    .unwrap();

    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
691
/// Sends receivers through channels: for each capacity 0, 1 and 2, every
/// message is a boxed receiver for the next channel in a chain of `COUNT`
/// channels, moved with single-arm `select!` invocations.
#[test]
fn channel_through_channel() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 1000;

    // Type-erased payload, so a channel can carry a receiver of its own type.
    type T = Box<dyn Any + Send>;

    for cap in 0..3 {
        let (s, r) = bounded::<T>(cap);

        scope(|scope| {
            scope.spawn(move |_| {
                let mut s = s;

                for _ in 0..COUNT {
                    // Build the next channel and box its receiver, wrapped in
                    // an `Option` so the other side can `take()` it.
                    let (new_s, new_r) = bounded(cap);
                    let new_r: T = Box::new(Some(new_r));

                    select! {
                        send(s, new_r) -> _ => {}
                    }

                    // Continue on the channel just created.
                    s = new_s;
                }
            });

            scope.spawn(move |_| {
                let mut r = r;

                for _ in 0..COUNT {
                    // Receive the boxed receiver, downcast it back to its
                    // concrete type and switch over to it.
                    r = select! {
                        recv(r) -> msg => {
                            msg.unwrap()
                                .downcast_mut::<Option<Receiver<T>>>()
                                .unwrap()
                                .take()
                                .unwrap()
                        }
                    }
                }
            });
        })
        .unwrap();
    }
}
739
/// Checks that a non-blocking `select!` never takes the `default` arm while
/// at least one of its channels must hold a message.
///
/// Runs once with capacity-1 bounded channels and once with unbounded ones.
#[test]
fn linearizable_default() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 100_000;

    for step in 0..2 {
        // `start_*` and `end_*` mark the start and end of each round.
        let (start_s, start_r) = bounded::<()>(0);
        let (end_s, end_r) = bounded::<()>(0);

        let ((s1, r1), (s2, r2)) = if step == 0 {
            (bounded::<i32>(1), bounded::<i32>(1))
        } else {
            (unbounded::<i32>(), unbounded::<i32>())
        };

        scope(|scope| {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    start_s.send(()).unwrap();

                    // The main thread sends on `s2` before it tries to take
                    // from `r1`, so after this send at least one of `r1`/`r2`
                    // holds a message and `default` must not run.
                    s1.send(1).unwrap();
                    select! {
                        recv(r1) -> _ => {}
                        recv(r2) -> _ => {}
                        default => unreachable!()
                    }

                    end_s.send(()).unwrap();
                    // Clear any leftover message before the next round.
                    let _ = r2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_r.recv().unwrap();

                s2.send(1).unwrap();
                // Competes with the spawned thread's `select!` for `r1`.
                let _ = r1.try_recv();

                end_r.recv().unwrap();
            }
        })
        .unwrap();
    }
}
786
/// Same scenario as the non-blocking linearizability check, but using a
/// zero-length timeout (`default(ms(0))`) in place of the plain `default` arm.
///
/// Runs once with capacity-1 bounded channels and once with unbounded ones.
#[test]
fn linearizable_timeout() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 100_000;

    for step in 0..2 {
        // `start_*` and `end_*` mark the start and end of each round.
        let (start_s, start_r) = bounded::<()>(0);
        let (end_s, end_r) = bounded::<()>(0);

        let ((s1, r1), (s2, r2)) = if step == 0 {
            (bounded::<i32>(1), bounded::<i32>(1))
        } else {
            (unbounded::<i32>(), unbounded::<i32>())
        };

        scope(|scope| {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    start_s.send(()).unwrap();

                    // The main thread sends on `s2` before it tries to take
                    // from `r1`, so after this send at least one of `r1`/`r2`
                    // holds a message and the timeout arm must not run.
                    s1.send(1).unwrap();
                    select! {
                        recv(r1) -> _ => {}
                        recv(r2) -> _ => {}
                        default(ms(0)) => unreachable!()
                    }

                    end_s.send(()).unwrap();
                    // Clear any leftover message before the next round.
                    let _ = r2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_r.recv().unwrap();

                s2.send(1).unwrap();
                // Competes with the spawned thread's `select!` for `r1`.
                let _ = r1.try_recv();

                end_r.recv().unwrap();
            }
        })
        .unwrap();
    }
}
833
/// Fairness across four `recv` arms: a pre-filled bounded channel, a
/// pre-filled unbounded channel, `after(ms(0))` and `tick(ms(0))`.
///
/// Each arm must win at least half of an even share of the rounds.
#[test]
fn fairness1() {
    // Far fewer iterations under Miri.
    const COUNT: usize = if cfg!(miri) { 100 } else { 10_000 };

    let (tx_bounded, rx_bounded) = bounded::<()>(COUNT);
    let (tx_unbounded, rx_unbounded) = unbounded::<()>();

    // Queue enough messages that neither channel can run dry.
    for _ in 0..COUNT {
        tx_bounded.send(()).unwrap();
        tx_unbounded.send(()).unwrap();
    }

    let mut wins = [0usize; 4];
    for _ in 0..COUNT {
        select! {
            recv(rx_bounded) -> _ => wins[0] += 1,
            recv(rx_unbounded) -> _ => wins[1] += 1,
            recv(after(ms(0))) -> _ => wins[2] += 1,
            recv(tick(ms(0))) -> _ => wins[3] += 1,
        }
    }

    let floor = COUNT / wins.len() / 2;
    assert!(wins.iter().all(|&n| n >= floor));
}
860
/// Fairness across three channel flavours (unbounded, capacity 1, capacity 0)
/// with a concurrent producer: every `recv` arm must win at least
/// `COUNT / 3 / 50` of the rounds.
#[cfg_attr(crossbeam_sanitize, ignore)] // TODO: flaky: https://github.com/crossbeam-rs/crossbeam/issues/1094
#[test]
fn fairness2() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = bounded::<()>(1);
    let (s3, r3) = bounded::<()>(0);

    scope(|scope| {
        scope.spawn(|_| {
            // Stand-in sender substituted for `s1`/`s2` while they still hold
            // a message. Nothing in this test receives from `_r`, so a send on
            // `hole` presumably never becomes ready.
            let (hole, _r) = bounded(0);

            for _ in 0..COUNT {
                // Only offer to send on a channel that is currently empty.
                let s1 = if s1.is_empty() { &s1 } else { &hole };
                let s2 = if s2.is_empty() { &s2 } else { &hole };

                select! {
                    send(s1, ()) -> res => assert!(res.is_ok()),
                    send(s2, ()) -> res => assert!(res.is_ok()),
                    send(s3, ()) -> res => assert!(res.is_ok()),
                }
            }
        });

        // One counter per `recv` arm.
        let hits = vec![Cell::new(0usize); 3];
        for _ in 0..COUNT {
            select! {
                recv(r1) -> _ => hits[0].set(hits[0].get() + 1),
                recv(r2) -> _ => hits[1].set(hits[1].get() + 1),
                recv(r3) -> _ => hits[2].set(hits[2].get() + 1),
            }
        }
        assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 50));
    })
    .unwrap();
}
901
/// Fairness between two pre-filled channels on the receiving side: over
/// `COUNT` receives, each arm must win at least a quarter of them.
#[test]
fn fairness_recv() {
    // Far fewer iterations under Miri.
    const COUNT: usize = if cfg!(miri) { 100 } else { 10_000 };

    let (tx_bounded, rx_bounded) = bounded::<()>(COUNT);
    let (tx_unbounded, rx_unbounded) = unbounded::<()>();

    for _ in 0..COUNT {
        tx_bounded.send(()).unwrap();
        tx_unbounded.send(()).unwrap();
    }

    let mut wins = [0usize; 2];
    while wins.iter().sum::<usize>() < COUNT {
        select! {
            recv(rx_bounded) -> _ => wins[0] += 1,
            recv(rx_unbounded) -> _ => wins[1] += 1,
        }
    }

    let floor = COUNT / 4;
    assert!(wins.iter().all(|&n| n >= floor));
}
926
/// Fairness between two channels on the sending side: over `COUNT` sends,
/// each arm must win at least a quarter of them.
#[test]
fn fairness_send() {
    // Far fewer iterations under Miri.
    const COUNT: usize = if cfg!(miri) { 100 } else { 10_000 };

    // The receivers are kept alive but never read from.
    let (tx_bounded, _rx_bounded) = bounded::<()>(COUNT);
    let (tx_unbounded, _rx_unbounded) = unbounded::<()>();

    let mut wins = [0usize; 2];
    for _ in 0..COUNT {
        select! {
            send(tx_bounded, ()) -> _ => wins[0] += 1,
            send(tx_unbounded, ()) -> _ => wins[1] += 1,
        }
    }

    let floor = COUNT / 4;
    assert!(wins.iter().all(|&n| n >= floor));
}
946
/// `select_biased!` must prefer arms in the order written: the first arm wins
/// every round while its channel has messages, then the second arm takes
/// over, and the third arm never runs.
#[test]
fn unfairness() {
    // Far fewer iterations under Miri.
    const COUNT: usize = if cfg!(miri) { 100 } else { 10_000 };

    let (tx_first, rx_first) = unbounded::<()>();
    let (tx_second, rx_second) = unbounded::<()>();
    let (tx_third, rx_third) = unbounded::<()>();

    // `COUNT` messages on each of the first two channels, one on the third.
    for _ in 0..COUNT {
        tx_first.send(()).unwrap();
        tx_second.send(()).unwrap();
    }
    tx_third.send(()).unwrap();

    let mut wins = [0usize; 3];
    for _ in 0..COUNT {
        select_biased! {
            recv(rx_first) -> _ => wins[0] += 1,
            recv(rx_second) -> _ => wins[1] += 1,
            recv(rx_third) -> _ => wins[2] += 1,
        }
    }
    assert_eq!(wins, [COUNT, 0, 0]);

    // The first channel is drained now, so the second arm takes every round.
    for _ in 0..COUNT {
        select_biased! {
            recv(rx_first) -> _ => wins[0] += 1,
            recv(rx_second) -> _ => wins[1] += 1,
            recv(rx_third) -> _ => wins[2] += 1,
        }
    }
    assert_eq!(wins, [COUNT, COUNT, 0]);
}
983
/// Checks `select_biased!` arm priority with an added `default(ms(1000))`
/// timeout arm, which must never run because a message is always available.
#[test]
fn unfairness_timeout() {
    // Far fewer iterations under Miri.
    const COUNT: usize = if cfg!(miri) { 100 } else { 10_000 };

    let (tx_first, rx_first) = unbounded::<()>();
    let (tx_second, rx_second) = unbounded::<()>();
    let (tx_third, rx_third) = unbounded::<()>();

    // `COUNT` messages on each of the first two channels, one on the third.
    for _ in 0..COUNT {
        tx_first.send(()).unwrap();
        tx_second.send(()).unwrap();
    }
    tx_third.send(()).unwrap();

    let mut wins = [0usize; 3];
    for _ in 0..COUNT {
        select_biased! {
            recv(rx_first) -> _ => wins[0] += 1,
            recv(rx_second) -> _ => wins[1] += 1,
            recv(rx_third) -> _ => wins[2] += 1,
            default(ms(1000)) => unreachable!(),
        }
    }
    assert_eq!(wins, [COUNT, 0, 0]);

    // The first channel is drained now, so the second arm takes every round.
    for _ in 0..COUNT {
        select_biased! {
            recv(rx_first) -> _ => wins[0] += 1,
            recv(rx_second) -> _ => wins[1] += 1,
            recv(rx_third) -> _ => wins[2] += 1,
            default(ms(1000)) => unreachable!(),
        }
    }
    assert_eq!(wins, [COUNT, COUNT, 0]);
}
1022
/// Checks `select_biased!` arm priority with an added non-blocking
/// `default()` arm, which must never run because a message is always
/// available.
#[test]
fn unfairness_try() {
    // Far fewer iterations under Miri.
    const COUNT: usize = if cfg!(miri) { 100 } else { 10_000 };

    let (tx_first, rx_first) = unbounded::<()>();
    let (tx_second, rx_second) = unbounded::<()>();
    let (tx_third, rx_third) = unbounded::<()>();

    // `COUNT` messages on each of the first two channels, one on the third.
    for _ in 0..COUNT {
        tx_first.send(()).unwrap();
        tx_second.send(()).unwrap();
    }
    tx_third.send(()).unwrap();

    let mut wins = [0usize; 3];
    for _ in 0..COUNT {
        select_biased! {
            recv(rx_first) -> _ => wins[0] += 1,
            recv(rx_second) -> _ => wins[1] += 1,
            recv(rx_third) -> _ => wins[2] += 1,
            default() => unreachable!(),
        }
    }
    assert_eq!(wins, [COUNT, 0, 0]);

    // The first channel is drained now, so the second arm takes every round.
    for _ in 0..COUNT {
        select_biased! {
            recv(rx_first) -> _ => wins[0] += 1,
            recv(rx_second) -> _ => wins[1] += 1,
            recv(rx_third) -> _ => wins[2] += 1,
            default() => unreachable!(),
        }
    }
    assert_eq!(wins, [COUNT, COUNT, 0]);
}
1061
/// Compile-level check of the handle expressions `select!` accepts: plain
/// names, multiply-nested references, and expressions producing either a
/// borrowed or an owned receiver (including a temporary `never()` channel).
#[allow(clippy::or_fun_call, clippy::unnecessary_literal_unwrap)] // This is intentional.
#[test]
fn references() {
    let (s, r) = unbounded::<i32>();
    // Handles named directly.
    select! {
        send(s, 0) -> _ => {}
        recv(r) -> _ => {}
    }
    // Handles behind several layers of references.
    select! {
        send(&&&&s, 0) -> _ => {}
        recv(&&&&r) -> _ => {}
    }
    // An expression yielding `&Receiver`, with a borrowed temporary fallback.
    select! {
        recv(Some(&r).unwrap_or(&never())) -> _ => {},
        default => {}
    }
    // An expression yielding an owned `Receiver`; this moves `r`.
    select! {
        recv(Some(r).unwrap_or(never())) -> _ => {},
        default => {}
    }
}
1083
/// Compile-level check that arm bodies may be arbitrary expressions, not just
/// blocks: a literal, a `loop`, a `match`, and an `if` without `else`.
#[test]
fn case_blocks() {
    let (s, r) = unbounded::<i32>();

    // Nothing has been sent, so `default` is the arm expected to run; the
    // other bodies only need to type-check against it.
    select! {
        recv(r) -> _ => 3.0,
        recv(r) -> _ => loop {
            unreachable!()
        },
        recv(r) -> _ => match 7 + 3 {
            _ => unreachable!()
        },
        default => 7.
    };

    select! {
        recv(r) -> msg => if msg.is_ok() {
            unreachable!()
        },
        default => ()
    }

    // The sender is kept alive until here.
    drop(s);
}
1108
/// Compile-level check that handle expressions may move their channel ends:
/// each handle is produced by calling a `move` closure that returns it.
#[allow(clippy::redundant_closure_call)] // This is intentional.
#[test]
fn move_handles() {
    let (s, r) = unbounded::<i32>();
    select! {
        recv((move || r)()) -> _ => {}
        send((move || s)(), 0) -> _ => {}
    }
}
1118
/// Compile-level check that the channel's message type can be inferred when
/// the channel is created without a turbofish and used inside `select!`.
#[test]
fn infer_types() {
    let (s, r) = unbounded();
    select! {
        recv(r) -> _ => {}
        default => {}
    }
    // The message type is pinned down only here, after the `select!`.
    s.send(()).unwrap();

    let (s, r) = unbounded();
    // Here the message type comes from the `send` arm itself.
    select! {
        send(s, ()) -> _ => {}
    }
    r.recv().unwrap();
}
1134
/// Checks that the non-blocking default arm can be written both as `default`
/// and as `default()`, with and without other arms.
#[test]
fn default_syntax() {
    let (s, r) = bounded::<i32>(0);

    select! {
        recv(r) -> _ => panic!(),
        default => {}
    }
    select! {
        send(s, 0) -> _ => panic!(),
        default() => {}
    }
    select! {
        default => {}
    }
    select! {
        default() => {}
    }
}
1154
/// Checks that the result binding of an arm may reuse the name of the
/// receiver it reads from (`recv(r) -> r`).
#[test]
fn same_variable_name() {
    // The `_` pattern drops the sender at once, so the receive yields an error.
    let (_, r) = unbounded::<i32>();
    select! {
        recv(r) -> r => assert!(r.is_err()),
    }
}
1162
/// Compile-level check that boxed handles can be used by dereferencing them
/// in place (`*s`, `*r`) without moving them out of their boxes.
#[test]
fn handles_on_heap() {
    let (s, r) = unbounded::<i32>();
    let (s, r) = (Box::new(s), Box::new(r));

    select! {
        send(*s, 0) -> _ => {}
        recv(*r) -> _ => {}
        default => {}
    }

    // Both boxes are still owned here, so they can be dropped explicitly.
    drop(s);
    drop(r);
}
1177
/// Compile-level check that arm bodies may consume (move) values from the
/// enclosing scope, with different arms of one `select!` each consuming a
/// different value.
#[test]
fn once_blocks() {
    let (s, r) = unbounded::<i32>();

    // Single `send` arm consuming a box.
    let once = Box::new(());
    select! {
        send(s, 0) -> _ => drop(once),
    }

    // Single `recv` arm consuming a box.
    let once = Box::new(());
    select! {
        recv(r) -> _ => drop(once),
    }

    // `send` arm plus `default`, each consuming its own box.
    let once1 = Box::new(());
    let once2 = Box::new(());
    select! {
        send(s, 0) -> _ => drop(once1),
        default => drop(once2),
    }

    // `recv` arm plus `default`.
    let once1 = Box::new(());
    let once2 = Box::new(());
    select! {
        recv(r) -> _ => drop(once1),
        default => drop(once2),
    }

    // `recv` arm plus `send` arm.
    let once1 = Box::new(());
    let once2 = Box::new(());
    select! {
        recv(r) -> _ => drop(once1),
        send(s, 0) -> _ => drop(once2),
    }
}
1213
/// Compile-level check that a receiver expression may be a call to a closure
/// that consumes its captures, i.e. one that can only be called once.
#[test]
fn once_receiver() {
    let (_, r) = unbounded::<i32>();

    let once = Box::new(());
    // Dropping the captured box and returning `r` by value both consume
    // captures, so this closure can be invoked a single time.
    let get = move || {
        drop(once);
        r
    };

    select! {
        recv(get()) -> _ => {}
    }
}
1228
/// Compile-level check that a sender expression may be a call to a closure
/// that consumes its captures, i.e. one that can only be called once.
#[test]
fn once_sender() {
    let (s, _) = unbounded::<i32>();

    let once = Box::new(());
    // Dropping the captured box and returning `s` by value both consume
    // captures, so this closure can be invoked a single time.
    let get = move || {
        drop(once);
        s
    };

    select! {
        send(get(), 5) -> _ => {}
    }
}
1243
/// Parsing check: `select!` invocations nested inside arm bodies, each level
/// with two `recv` arms on the same receiver, ending in a `default`-only
/// `select!`.
#[test]
fn parse_nesting() {
    // The `_` pattern drops the sender at once.
    let (_, r) = unbounded::<i32>();

    select! {
        recv(r) -> _ => {}
        recv(r) -> _ => {
            select! {
                recv(r) -> _ => {}
                recv(r) -> _ => {
                    select! {
                        recv(r) -> _ => {}
                        recv(r) -> _ => {
                            select! {
                                default => {}
                            }
                        }
                    }
                }
            }
        }
    }
}
1267
/// Checks that `select!` is an expression whose value is that of the arm that
/// ran, and that arms may produce the common type through different
/// expressions (`.into()`, `.to_owned()`, `.to_string()`).
#[test]
fn evaluate() {
    let (s, r) = unbounded::<i32>();

    // The channel is empty, so the `send` arm is the one expected to run.
    let v = select! {
        recv(r) -> _ => "foo".into(),
        send(s, 0) -> _ => "bar".to_owned(),
        default => "baz".to_string(),
    };
    assert_eq!(v, "bar");

    // The 0 sent above is waiting, so the `recv` arm is expected.
    let v = select! {
        recv(r) -> _ => "foo".into(),
        default => "baz".to_string(),
    };
    assert_eq!(v, "foo");

    // The channel is empty again, so `default` is expected.
    let v = select! {
        recv(r) -> _ => "foo".into(),
        default => "baz".to_string(),
    };
    assert_eq!(v, "baz");
}
1291
/// Checks that `select!` accepts wrapper types that implement `Deref` to the
/// crossbeam sender/receiver in place of the handles themselves.
#[test]
fn deref() {
    use crossbeam_channel as cc;

    // Local newtypes; inside this function they shadow the imported
    // `Sender` and `Receiver`.
    struct Sender<T>(cc::Sender<T>);
    struct Receiver<T>(cc::Receiver<T>);

    impl<T> Deref for Receiver<T> {
        type Target = cc::Receiver<T>;

        fn deref(&self) -> &Self::Target {
            &self.0
        }
    }

    impl<T> Deref for Sender<T> {
        type Target = cc::Sender<T>;

        fn deref(&self) -> &Self::Target {
            &self.0
        }
    }

    let (s, r) = bounded::<i32>(0);
    let (s, r) = (Sender(s), Receiver(r));

    // Neither `bounded(0)` operation is expected to complete here, so only
    // `default` may run.
    select! {
        send(s, 0) -> _ => panic!(),
        recv(r) -> _ => panic!(),
        default => {}
    }
}
1324
/// Pins the result types bound by arms: `recv` yields
/// `Result<T, RecvError>` and `send` yields `Result<(), SendError<T>>`,
/// whether or not a `default` / `default(timeout)` arm is present.
#[test]
fn result_types() {
    // The peer of each handle is dropped at once by the `_` pattern.
    let (s, _) = bounded::<i32>(0);
    let (_, r) = bounded::<i32>(0);

    select! {
        recv(r) -> res => { let _: Result<i32, RecvError> = res; },
    }
    select! {
        recv(r) -> res => { let _: Result<i32, RecvError> = res; },
        default => {}
    }
    select! {
        recv(r) -> res => { let _: Result<i32, RecvError> = res; },
        default(ms(0)) => {}
    }

    select! {
        send(s, 0) -> res => { let _: Result<(), SendError<i32>> = res; },
    }
    select! {
        send(s, 0) -> res => { let _: Result<(), SendError<i32>> = res; },
        default => {}
    }
    select! {
        send(s, 0) -> res => { let _: Result<(), SendError<i32>> = res; },
        default(ms(0)) => {}
    }

    select! {
        send(s, 0) -> res => { let _: Result<(), SendError<i32>> = res; },
        recv(r) -> res => { let _: Result<i32, RecvError> = res; },
    }
}
1359
/// Non-blocking receive via `select!` with a `default` arm, at three points:
/// before a sender shows up, while a sender is waiting, and after the sender
/// is gone.
#[test]
fn try_recv() {
    let (s, r) = bounded(0);

    scope(|scope| {
        scope.spawn(move |_| {
            // No sender is waiting yet.
            select! {
                recv(r) -> _ => panic!(),
                default => {}
            }
            thread::sleep(ms(1500));
            // The other thread started its send of 7 at about 1000 ms.
            select! {
                recv(r) -> v => assert_eq!(v, Ok(7)),
                default => panic!(),
            }
            thread::sleep(ms(500));
            // By now the sending thread is expected to have finished and
            // dropped `s`.
            select! {
                recv(r) -> v => assert_eq!(v, Err(RecvError)),
                default => panic!(),
            }
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            select! {
                send(s, 7) -> res => res.unwrap(),
            }
        });
    })
    .unwrap();
}
1390
/// Blocking receive via single-arm `select!`: three values must arrive in
/// order, followed by an error once the sender is gone.
#[test]
fn recv() {
    let (s, r) = bounded(0);

    scope(|scope| {
        scope.spawn(move |_| {
            // Waits for the sender, which starts only after 1500 ms.
            select! {
                recv(r) -> v => assert_eq!(v, Ok(7)),
            }
            thread::sleep(ms(1000));
            select! {
                recv(r) -> v => assert_eq!(v, Ok(8)),
            }
            thread::sleep(ms(1000));
            select! {
                recv(r) -> v => assert_eq!(v, Ok(9)),
            }
            // The sending thread ends after its third send, dropping `s`.
            select! {
                recv(r) -> v => assert_eq!(v, Err(RecvError)),
            }
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1500));
            select! {
                send(s, 7) -> res => res.unwrap(),
            }
            select! {
                send(s, 8) -> res => res.unwrap(),
            }
            select! {
                send(s, 9) -> res => res.unwrap(),
            }
        });
    })
    .unwrap();
}
1427
/// Receive with a 1000 ms timeout via `select!`: first the timeout elapses,
/// then a value arrives in time, then the channel reports disconnection.
#[test]
fn recv_timeout() {
    let (s, r) = bounded::<i32>(0);

    scope(|scope| {
        scope.spawn(move |_| {
            // The sender only starts after 1500 ms, so this one times out.
            select! {
                recv(r) -> _ => panic!(),
                default(ms(1000)) => {}
            }
            // The send of 7 falls inside this second window.
            select! {
                recv(r) -> v => assert_eq!(v, Ok(7)),
                default(ms(1000)) => panic!(),
            }
            // The sending thread ends after its send, dropping `s`.
            select! {
                recv(r) -> v => assert_eq!(v, Err(RecvError)),
                default(ms(1000)) => panic!(),
            }
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1500));
            select! {
                send(s, 7) -> res => res.unwrap(),
            }
        });
    })
    .unwrap();
}
1456
/// Non-blocking send via `select!` with a `default` arm, at three points:
/// before a receiver shows up, while a receiver is waiting, and after the
/// receiver is gone.
#[test]
fn try_send() {
    let (s, r) = bounded(0);

    scope(|scope| {
        scope.spawn(move |_| {
            // No receiver is waiting yet.
            select! {
                send(s, 7) -> _ => panic!(),
                default => {}
            }
            thread::sleep(ms(1500));
            // The other thread started its receive at about 1000 ms.
            select! {
                send(s, 8) -> res => res.unwrap(),
                default => panic!(),
            }
            thread::sleep(ms(500));
            // By now the receiving thread is expected to have finished and
            // dropped `r`, so the unsent value comes back in the error.
            select! {
                send(s, 8) -> res => assert_eq!(res, Err(SendError(8))),
                default => panic!(),
            }
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            select! {
                recv(r) -> v => assert_eq!(v, Ok(8)),
            }
        });
    })
    .unwrap();
}
1487
/// Blocking send via single-arm `select!`: three values are sent with pauses
/// in between and must be received in order.
#[test]
fn send() {
    let (s, r) = bounded(0);

    scope(|scope| {
        scope.spawn(move |_| {
            // Waits for the receiver, which starts only after 1500 ms.
            select! {
                send(s, 7) -> res => res.unwrap(),
            }
            thread::sleep(ms(1000));
            select! {
                send(s, 8) -> res => res.unwrap(),
            }
            thread::sleep(ms(1000));
            select! {
                send(s, 9) -> res => res.unwrap(),
            }
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1500));
            select! {
                recv(r) -> v => assert_eq!(v, Ok(7)),
            }
            select! {
                recv(r) -> v => assert_eq!(v, Ok(8)),
            }
            select! {
                recv(r) -> v => assert_eq!(v, Ok(9)),
            }
        });
    })
    .unwrap();
}
1521
/// Send with a 1000 ms timeout via `select!`: first the timeout elapses, then
/// a send succeeds in time, then the channel reports disconnection.
#[test]
fn send_timeout() {
    let (s, r) = bounded(0);

    scope(|scope| {
        scope.spawn(move |_| {
            // The receiver only starts after 1500 ms, so this one times out
            // and the 7 is never delivered.
            select! {
                send(s, 7) -> _ => panic!(),
                default(ms(1000)) => {}
            }
            // The receive falls inside this second window.
            select! {
                send(s, 8) -> res => res.unwrap(),
                default(ms(1000)) => panic!(),
            }
            // The receiving thread ends after its receive, dropping `r`; the
            // unsent value comes back in the error.
            select! {
                send(s, 9) -> res => assert_eq!(res, Err(SendError(9))),
                default(ms(1000)) => panic!(),
            }
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1500));
            select! {
                recv(r) -> v => assert_eq!(v, Ok(8)),
            }
        });
    })
    .unwrap();
}
1550
/// A `send` inside `select!` must complete with `SendError` once the receiver
/// is dropped by another thread one second later.
#[test]
fn disconnect_wakes_sender() {
    let (tx, rx) = bounded(0);

    scope(|scope| {
        scope.spawn(move |_| {
            select! {
                send(tx, ()) -> outcome => assert_eq!(outcome, Err(SendError(()))),
            }
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            drop(rx);
        });
    })
    .unwrap();
}
1568
/// A `recv` inside `select!` must complete with `RecvError` once the sender
/// is dropped by another thread one second later.
#[test]
fn disconnect_wakes_receiver() {
    let (tx, rx) = bounded::<()>(0);

    scope(|scope| {
        scope.spawn(move |_| {
            select! {
                recv(rx) -> outcome => assert_eq!(outcome, Err(RecvError)),
            }
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            drop(tx);
        });
    })
    .unwrap();
}
1586
/// Syntax check: a trailing comma is accepted inside the argument lists of
/// `send(...)`, `recv(...)` and `default(...)`.
#[test]
fn trailing_comma() {
    let (s, r) = unbounded::<usize>();

    select! {
        send(s, 1,) -> _ => {},
        recv(r,) -> _ => {},
        default(ms(1000),) => {},
    }
}
1597