//! Tests for the `select!` macro.
2
3 #![forbid(unsafe_code)] // select! is safe.
4 #![allow(clippy::drop_copy, clippy::match_single_binding)]
5
6 use std::any::Any;
7 use std::cell::Cell;
8 use std::ops::Deref;
9 use std::thread;
10 use std::time::{Duration, Instant};
11
12 use crossbeam_channel::{after, bounded, never, select, tick, unbounded};
13 use crossbeam_channel::{Receiver, RecvError, SendError, Sender, TryRecvError};
14 use crossbeam_utils::thread::scope;
15
/// Shorthand used throughout these tests: builds a [`Duration`] lasting `ms`
/// milliseconds.
fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}
19
/// Smoke test with two unbounded channels: the arm that runs must be the one
/// whose receiver currently holds a message.
#[test]
fn smoke1() {
    let (s1, r1) = unbounded::<usize>();
    let (s2, r2) = unbounded::<usize>();

    // Only `r1` has a message, so the `r2` arm must not be chosen.
    s1.send(1).unwrap();

    select! {
        recv(r1) -> v => assert_eq!(v, Ok(1)),
        recv(r2) -> _ => panic!(),
    }

    // `r1` was drained above; now only `r2` has a message.
    s2.send(2).unwrap();

    select! {
        recv(r1) -> _ => panic!(),
        recv(r2) -> v => assert_eq!(v, Ok(2)),
    }
}
39
/// Smoke test with five receivers where only the last one has a message; the
/// four empty (but still connected) receivers must not be selected.
#[test]
fn smoke2() {
    // The `_sN` senders are kept alive so that `r1`..`r4` stay connected.
    let (_s1, r1) = unbounded::<i32>();
    let (_s2, r2) = unbounded::<i32>();
    let (_s3, r3) = unbounded::<i32>();
    let (_s4, r4) = unbounded::<i32>();
    let (s5, r5) = unbounded::<i32>();

    s5.send(5).unwrap();

    select! {
        recv(r1) -> _ => panic!(),
        recv(r2) -> _ => panic!(),
        recv(r3) -> _ => panic!(),
        recv(r4) -> _ => panic!(),
        recv(r5) -> v => assert_eq!(v, Ok(5)),
    }
}
58
/// Checks that a `recv` arm fires with an error once its channel's sender has
/// been dropped, both while the drop happens concurrently and afterwards.
#[test]
fn disconnected() {
    let (s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    scope(|scope| {
        scope.spawn(|_| {
            // `s1` goes away immediately; the message on `s2` arrives only
            // after a 500 ms delay.
            drop(s1);
            thread::sleep(ms(500));
            s2.send(5).unwrap();
        });

        // The error on `r1` is expected to win over both the delayed message
        // on `r2` and the 1000 ms timeout.
        select! {
            recv(r1) -> v => assert!(v.is_err()),
            recv(r2) -> _ => panic!(),
            default(ms(1000)) => panic!(),
        }

        // Consume the delayed message so that `r2` is empty again.
        r2.recv().unwrap();
    })
    .unwrap();

    // `s1` is still gone and `r2` is empty: the `r1` arm must fire again.
    select! {
        recv(r1) -> v => assert!(v.is_err()),
        recv(r2) -> _ => panic!(),
        default(ms(1000)) => panic!(),
    }

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            drop(s2);
        });

        // Dropping `s2` after 500 ms must complete the select with an error
        // before the 1000 ms timeout.
        select! {
            recv(r2) -> v => assert!(v.is_err()),
            default(ms(1000)) => panic!(),
        }
    })
    .unwrap();
}
100
/// Exercises the non-blocking `default` arm: it must run exactly when no other
/// operation can complete.
#[test]
fn default() {
    let (s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    // Both channels are empty and connected: only `default` may run.
    select! {
        recv(r1) -> _ => panic!(),
        recv(r2) -> _ => panic!(),
        default => {}
    }

    drop(s1);

    // With `s1` dropped, the `r1` arm is expected to fire with an error
    // instead of falling through to `default`.
    select! {
        recv(r1) -> v => assert!(v.is_err()),
        recv(r2) -> _ => panic!(),
        default => panic!(),
    }

    s2.send(2).unwrap();

    // A pending message beats `default`.
    select! {
        recv(r2) -> v => assert_eq!(v, Ok(2)),
        default => panic!(),
    }

    // `r2` has been drained, so `default` runs again.
    select! {
        recv(r2) -> _ => panic!(),
        default => {},
    }

    // A `select!` consisting only of `default` is accepted.
    select! {
        default => {},
    }
}
136
/// Exercises the `default(timeout)` arm against a message that arrives after
/// the first timeout but before the second one would expire.
#[test]
fn timeout() {
    let (_s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(1500));
            s2.send(2).unwrap();
        });

        // Nothing is sent during the first 1000 ms, so the timeout arm runs.
        select! {
            recv(r1) -> _ => panic!(),
            recv(r2) -> _ => panic!(),
            default(ms(1000)) => {},
        }

        // The message sent at roughly 1500 ms must arrive within the second
        // 1000 ms window.
        select! {
            recv(r1) -> _ => panic!(),
            recv(r2) -> v => assert_eq!(v, Ok(2)),
            default(ms(1000)) => panic!(),
        }
    })
    .unwrap();

    scope(|scope| {
        let (s, r) = unbounded::<i32>();

        scope.spawn(move |_| {
            thread::sleep(ms(500));
            drop(s);
        });

        // A timeout-only select waits the full 1000 ms; by then `s` has been
        // dropped, so the nested non-blocking select must see the error on `r`.
        select! {
            default(ms(1000)) => {
                select! {
                    recv(r) -> v => assert!(v.is_err()),
                    default => panic!(),
                }
            }
        }
    })
    .unwrap();
}
181
/// Checks that an operation on a channel whose other half was dropped at
/// creation completes with an error rather than falling through to `default`
/// or `default(timeout)`.
#[test]
fn default_when_disconnected() {
    // Binding the sender to `_` drops it right away.
    let (_, r) = unbounded::<i32>();

    select! {
        recv(r) -> res => assert!(res.is_err()),
        default => panic!(),
    }

    let (_, r) = unbounded::<i32>();

    select! {
        recv(r) -> res => assert!(res.is_err()),
        default(ms(1000)) => panic!(),
    }

    // Same checks for the sending side of a zero-capacity channel whose
    // receiver was dropped right away.
    let (s, _) = bounded::<i32>(0);

    select! {
        send(s, 0) -> res => assert!(res.is_err()),
        default => panic!(),
    }

    let (s, _) = bounded::<i32>(0);

    select! {
        send(s, 0) -> res => assert!(res.is_err()),
        default(ms(1000)) => panic!(),
    }
}
212
/// Measures how long a `select!` with only a `default` arm takes: a bare
/// `default` must finish within 50 ms, and `default(ms(500))` must take between
/// 450 ms and 550 ms.
#[test]
fn default_only() {
    let start = Instant::now();
    select! {
        default => {}
    }
    let now = Instant::now();
    assert!(now - start <= ms(50));

    let start = Instant::now();
    select! {
        default(ms(500)) => {}
    }
    let now = Instant::now();
    // Allow 50 ms of slack on either side of the requested timeout.
    assert!(now - start >= ms(450));
    assert!(now - start <= ms(550));
}
230
/// Checks that a blocked `select!` on zero-capacity channels completes once a
/// peer thread shows up on the other side, for both `recv` and `send` arms.
#[test]
fn unblocks() {
    let (s1, r1) = bounded::<i32>(0);
    let (s2, r2) = bounded::<i32>(0);

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            s2.send(2).unwrap();
        });

        // The peer sends on `s2` after 500 ms; the `r2` arm must complete
        // before the 1000 ms timeout.
        select! {
            recv(r1) -> _ => panic!(),
            recv(r2) -> v => assert_eq!(v, Ok(2)),
            default(ms(1000)) => panic!(),
        }
    })
    .unwrap();

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            assert_eq!(r1.recv().unwrap(), 1);
        });

        // The peer receives on `r1` after 500 ms; only the send on `s1` may
        // complete.
        select! {
            send(s1, 1) -> _ => {},
            send(s2, 2) -> _ => panic!(),
            default(ms(1000)) => panic!(),
        }
    })
    .unwrap();
}
264
/// Runs a two-arm `select!` (one `recv`, one `send`) twice against a peer that
/// first sends on `s1` and then receives from `r2`.
#[test]
fn both_ready() {
    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            s1.send(1).unwrap();
            assert_eq!(r2.recv().unwrap(), 2);
        });

        // Two iterations: one per operation the peer performs.
        for _ in 0..2 {
            select! {
                recv(r1) -> v => assert_eq!(v, Ok(1)),
                send(s2, 2) -> _ => {},
            }
        }
    })
    .unwrap();
}
286
/// Repeats, `RUNS` times, a scenario in which two threads spin on non-blocking
/// operations while a third performs a blocking `select!` with a timeout that
/// must not expire.
#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn loop_try() {
    const RUNS: usize = 20;

    for _ in 0..RUNS {
        let (s1, r1) = bounded::<i32>(0);
        let (s2, r2) = bounded::<i32>(0);
        let (s_end, r_end) = bounded::<()>(0);

        scope(|scope| {
            // Spins trying to send on `s1` without blocking; stops once the
            // send succeeds or the `r_end` arm fires.
            scope.spawn(|_| loop {
                select! {
                    send(s1, 1) -> _ => break,
                    default => {}
                }

                select! {
                    recv(r_end) -> _ => break,
                    default => {}
                }
            });

            // Spins trying to receive from `r2` without blocking; stops once a
            // message arrives or the `r_end` arm fires.
            scope.spawn(|_| loop {
                if let Ok(x) = r2.try_recv() {
                    assert_eq!(x, 2);
                    break;
                }

                select! {
                    recv(r_end) -> _ => break,
                    default => {}
                }
            });

            scope.spawn(|_| {
                thread::sleep(ms(500));

                // One of the spinning threads must pair up with this select
                // within 500 ms.
                select! {
                    recv(r1) -> v => assert_eq!(v, Ok(1)),
                    send(s2, 2) -> _ => {},
                    default(ms(500)) => panic!(),
                }

                // Dropping the only `s_end` sender lets the spinning threads
                // leave their loops via the `r_end` arm.
                drop(s_end);
            });
        })
        .unwrap();
    }
}
337
/// Clones and immediately drops a sender in another thread while the main
/// thread selects on the matching receiver; `r3`/`s3` are used to order the two
/// threads.
#[test]
fn cloning1() {
    scope(|scope| {
        let (s1, r1) = unbounded::<i32>();
        let (_s2, r2) = unbounded::<i32>();
        let (s3, r3) = unbounded::<()>();

        scope.spawn(move |_| {
            // Wait for the first go-ahead from the main thread.
            r3.recv().unwrap();
            drop(s1.clone());
            // The second go-ahead is only sent after the select below has
            // completed, so `r3` must still be empty here.
            assert_eq!(r3.try_recv(), Err(TryRecvError::Empty));
            s1.send(1).unwrap();
            r3.recv().unwrap();
        });

        s3.send(()).unwrap();

        select! {
            recv(r1) -> _ => {},
            recv(r2) -> _ => {},
        }

        s3.send(()).unwrap();
    })
    .unwrap();
}
364
/// Clones and immediately drops a sender while another thread is selecting on
/// the matching receiver; the `r1` arm must not fire, and the later message on
/// `s2` must complete the select.
#[test]
fn cloning2() {
    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = unbounded::<()>();
    let (_s3, _r3) = unbounded::<()>();

    scope(|scope| {
        scope.spawn(move |_| {
            select! {
                recv(r1) -> _ => panic!(),
                recv(r2) -> _ => {},
            }
        });

        // Give the spawned thread time to start selecting.
        thread::sleep(ms(500));
        // The original `s1` stays alive, so only a clone is dropped here.
        drop(s1.clone());
        s2.send(()).unwrap();
    })
    .unwrap();
}
385
/// A single-arm `select!` on a receiver that already holds a message must
/// complete.
#[test]
fn preflight1() {
    let (s, r) = unbounded();
    s.send(()).unwrap();

    select! {
        recv(r) -> _ => {}
    }
}
395
/// A message sent before every sender was dropped must still be delivered as
/// `Ok`; only afterwards does the receiver report `Disconnected`.
#[test]
fn preflight2() {
    let (s, r) = unbounded();
    drop(s.clone());
    s.send(()).unwrap();
    drop(s);

    select! {
        recv(r) -> v => assert!(v.is_ok()),
    }
    assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
}
408
/// After every sender is dropped and the one buffered message is consumed, a
/// `recv` arm must complete with an error.
#[test]
fn preflight3() {
    let (s, r) = unbounded();
    drop(s.clone());
    s.send(()).unwrap();
    drop(s);
    // Drain the only buffered message.
    r.recv().unwrap();

    select! {
        recv(r) -> v => assert!(v.is_err())
    }
}
421
/// Lists the same receiver and the same sender twice each in one `select!` and
/// loops until every one of the four arms has been chosen at least once.
#[test]
fn duplicate_operations() {
    let (s, r) = unbounded::<i32>();
    let mut hit = [false; 4];

    // Terminates only once all four flags are set.
    while hit.iter().any(|hit| !hit) {
        select! {
            recv(r) -> _ => hit[0] = true,
            recv(r) -> _ => hit[1] = true,
            send(s, 0) -> _ => hit[2] = true,
            send(s, 0) -> _ => hit[3] = true,
        }
    }
}
436
/// Nests `select!` invocations four levels deep inside arm bodies, alternating
/// a send and a receive on the same unbounded channel and checking that each
/// value is read back.
#[test]
fn nesting() {
    let (s, r) = unbounded::<i32>();

    select! {
        send(s, 0) -> _ => {
            select! {
                recv(r) -> v => {
                    assert_eq!(v, Ok(0));
                    select! {
                        send(s, 1) -> _ => {
                            select! {
                                recv(r) -> v => {
                                    assert_eq!(v, Ok(1));
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}
460
/// The test must panic with the message raised by the sender expression
/// `get()` ("send panicked"), not with the bare `panic!()` in the message
/// position.
#[test]
#[should_panic(expected = "send panicked")]
fn panic_sender() {
    fn get() -> Sender<i32> {
        panic!("send panicked")
    }

    // Both operands diverge, so the compiler reports unreachable code here.
    #[allow(unreachable_code)]
    {
        select! {
            send(get(), panic!()) -> _ => {}
        }
    }
}
475
/// A panic raised while evaluating the receiver expression of a `recv` arm
/// must propagate out of `select!` with its message intact.
#[test]
#[should_panic(expected = "recv panicked")]
fn panic_receiver() {
    fn get() -> Receiver<i32> {
        panic!("recv panicked")
    }

    select! {
        recv(get()) -> _ => {}
    }
}
487
/// Stress test for `recv` arms: a producer alternates between an unbounded and
/// a bounded channel, and the consumer must observe each value `i` twice, once
/// per channel, in lock step.
#[test]
fn stress_recv() {
    // Fewer iterations under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded();
    let (s2, r2) = bounded(5);
    let (s3, r3) = bounded(100);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                // After each send, wait for the consumer's acknowledgement on
                // `r3` before sending on the other channel.
                s1.send(i).unwrap();
                r3.recv().unwrap();

                s2.send(i).unwrap();
                r3.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            for _ in 0..2 {
                select! {
                    recv(r1) -> v => assert_eq!(v, Ok(i)),
                    recv(r2) -> v => assert_eq!(v, Ok(i)),
                }

                // Acknowledge so the producer can continue.
                s3.send(()).unwrap();
            }
        }
    })
    .unwrap();
}
523
/// Stress test for `send` arms: the main thread selects between two
/// zero-capacity senders twice per value, while the consumer receives the value
/// from `r1` and then from `r2`.
#[test]
fn stress_send() {
    // Fewer iterations under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                assert_eq!(r1.recv().unwrap(), i);
                assert_eq!(r2.recv().unwrap(), i);
                // One token per round arrives on `r3`.
                r3.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            for _ in 0..2 {
                select! {
                    send(s1, i) -> _ => {},
                    send(s2, i) -> _ => {},
                }
            }
            s3.send(()).unwrap();
        }
    })
    .unwrap();
}
556
/// Stress test mixing a `recv` arm and a `send` arm: per round the peer sends
/// `i` on `s1` and then expects `i` from `r2`, while the main thread runs the
/// two-arm `select!` twice.
#[test]
fn stress_mixed() {
    // Fewer iterations under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                s1.send(i).unwrap();
                assert_eq!(r2.recv().unwrap(), i);
                // One token per round arrives on `r3`.
                r3.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            for _ in 0..2 {
                select! {
                    recv(r1) -> v => assert_eq!(v, Ok(i)),
                    send(s2, i) -> _ => {},
                }
            }
            s3.send(()).unwrap();
        }
    })
    .unwrap();
}
589
/// Two threads exchange `COUNT` values over a capacity-2 channel, each retrying
/// its operation with a 100 ms timeout and pausing 500 ms before every
/// even-numbered value; the values must arrive in order.
#[test]
fn stress_timeout_two_threads() {
    const COUNT: usize = 20;

    let (s, r) = bounded(2);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                // Retry until the send goes through.
                loop {
                    select! {
                        send(s, i) -> _ => break,
                        default(ms(100)) => {}
                    }
                }
            }
        });

        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                // Retry until a value arrives; it must be the next in sequence.
                loop {
                    select! {
                        recv(r) -> v => {
                            assert_eq!(v, Ok(i));
                            break;
                        }
                        default(ms(100)) => {}
                    }
                }
            }
        });
    })
    .unwrap();
}
632
/// Selects over both the send and the receive side of the same channel from a
/// single thread.
#[test]
fn send_recv_same_channel() {
    // Zero capacity: neither arm may complete, so the 500 ms timeout must run.
    let (s, r) = bounded::<i32>(0);
    select! {
        send(s, 0) -> _ => panic!(),
        recv(r) -> _ => panic!(),
        default(ms(500)) => {}
    }

    // Unbounded and empty: the send arm must be the one that completes.
    let (s, r) = unbounded::<i32>();
    select! {
        send(s, 0) -> _ => {},
        recv(r) -> _ => panic!(),
        default(ms(500)) => panic!(),
    }
}
649
/// Spawns `THREADS` threads that each either send their own index or receive
/// somebody else's over one zero-capacity channel; a thread must never receive
/// its own index, and the channel must be empty afterwards.
#[test]
fn matching() {
    // NOTE(review): the count is even, presumably so that every thread can
    // pair off with another one.
    const THREADS: usize = 44;

    // Borrow both halves so the `move` closures below copy references only.
    let (s, r) = &bounded::<usize>(0);

    scope(|scope| {
        for i in 0..THREADS {
            scope.spawn(move |_| {
                select! {
                    recv(r) -> v => assert_ne!(v.unwrap(), i),
                    send(s, i) -> _ => {},
                }
            });
        }
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
670
/// Same pairing scenario as a symmetric send-or-receive over one zero-capacity
/// channel, but with `THREADS` spawned threads plus one extra send of `!0` from
/// the main thread; the channel must be empty afterwards.
#[test]
fn matching_with_leftover() {
    // NOTE(review): the count is odd, presumably so that one thread is left
    // over to take the main thread's `!0` message.
    const THREADS: usize = 55;

    // Borrow both halves so the `move` closures below copy references only.
    let (s, r) = &bounded::<usize>(0);

    scope(|scope| {
        for i in 0..THREADS {
            scope.spawn(move |_| {
                select! {
                    recv(r) -> v => assert_ne!(v.unwrap(), i),
                    send(s, i) -> _ => {},
                }
            });
        }
        // `!0` is `usize::MAX`, which differs from every thread index.
        s.send(!0).unwrap();
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
692
/// Sends a fresh receiver through the current channel `COUNT` times, for
/// capacities 0, 1 and 2; after each hop the sender thread switches to the new
/// sender and the receiver thread to the receiver it just got.
#[test]
fn channel_through_channel() {
    // Fewer iterations under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 1000;

    // Messages are type-erased boxes; the payload is downcast on receipt.
    type T = Box<dyn Any + Send>;

    for cap in 0..3 {
        let (s, r) = bounded::<T>(cap);

        scope(|scope| {
            scope.spawn(move |_| {
                let mut s = s;

                for _ in 0..COUNT {
                    let (new_s, new_r) = bounded(cap);
                    // Wrapped in `Option` so the peer can `take()` the
                    // receiver out of the box.
                    let new_r: T = Box::new(Some(new_r));

                    select! {
                        send(s, new_r) -> _ => {}
                    }

                    s = new_s;
                }
            });

            scope.spawn(move |_| {
                let mut r = r;

                for _ in 0..COUNT {
                    // The value of the `select!` expression is the receiver
                    // extracted from the message.
                    r = select! {
                        recv(r) -> msg => {
                            msg.unwrap()
                                .downcast_mut::<Option<Receiver<T>>>()
                                .unwrap()
                                .take()
                                .unwrap()
                        }
                    }
                }
            });
        })
        .unwrap();
    }
}
740
/// Runs two racing threads `COUNT` times, first over capacity-1 channels and
/// then over unbounded ones, and requires that the `default` arm of the
/// spawned thread's `select!` is never taken.
///
/// The spawned thread sends on `s1` just before selecting, and the main thread
/// sends on `s2` before it tries to take the message from `r1`, so in every
/// interleaving one of the two receivers should hold a message.
#[test]
fn linearizable_default() {
    // Fewer iterations under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 100_000;

    for step in 0..2 {
        // Zero-capacity channels that mark the start and end of each round.
        let (start_s, start_r) = bounded::<()>(0);
        let (end_s, end_r) = bounded::<()>(0);

        let ((s1, r1), (s2, r2)) = if step == 0 {
            (bounded::<i32>(1), bounded::<i32>(1))
        } else {
            (unbounded::<i32>(), unbounded::<i32>())
        };

        scope(|scope| {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    start_s.send(()).unwrap();

                    s1.send(1).unwrap();
                    select! {
                        recv(r1) -> _ => {}
                        recv(r2) -> _ => {}
                        default => unreachable!()
                    }

                    end_s.send(()).unwrap();
                    // Clear any leftover message before the next round.
                    let _ = r2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_r.recv().unwrap();

                s2.send(1).unwrap();
                let _ = r1.try_recv();

                end_r.recv().unwrap();
            }
        })
        .unwrap();
    }
}
787
/// Runs two racing threads `COUNT` times, first over capacity-1 channels and
/// then over unbounded ones, and requires that the zero-length timeout arm
/// `default(ms(0))` of the spawned thread's `select!` is never taken.
///
/// The spawned thread sends on `s1` just before selecting, and the main thread
/// sends on `s2` before it tries to take the message from `r1`, so in every
/// interleaving one of the two receivers should hold a message.
#[test]
fn linearizable_timeout() {
    // Fewer iterations under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 100_000;

    for step in 0..2 {
        // Zero-capacity channels that mark the start and end of each round.
        let (start_s, start_r) = bounded::<()>(0);
        let (end_s, end_r) = bounded::<()>(0);

        let ((s1, r1), (s2, r2)) = if step == 0 {
            (bounded::<i32>(1), bounded::<i32>(1))
        } else {
            (unbounded::<i32>(), unbounded::<i32>())
        };

        scope(|scope| {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    start_s.send(()).unwrap();

                    s1.send(1).unwrap();
                    select! {
                        recv(r1) -> _ => {}
                        recv(r2) -> _ => {}
                        default(ms(0)) => unreachable!()
                    }

                    end_s.send(()).unwrap();
                    // Clear any leftover message before the next round.
                    let _ = r2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_r.recv().unwrap();

                s2.send(1).unwrap();
                let _ = r1.try_recv();

                end_r.recv().unwrap();
            }
        })
        .unwrap();
    }
}
834
/// Fairness across four `recv` arms: two pre-filled channels plus fresh
/// `after(ms(0))` and `tick(ms(0))` receivers created on every iteration. Each
/// arm must be chosen at least half of an even share of `COUNT` selections.
#[test]
fn fairness1() {
    // Fewer iterations under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded::<()>(COUNT);
    let (s2, r2) = unbounded::<()>();

    // Pre-fill both channels so neither runs dry during the loop below.
    for _ in 0..COUNT {
        s1.send(()).unwrap();
        s2.send(()).unwrap();
    }

    let mut hits = [0usize; 4];
    for _ in 0..COUNT {
        select! {
            recv(r1) -> _ => hits[0] += 1,
            recv(r2) -> _ => hits[1] += 1,
            recv(after(ms(0))) -> _ => hits[2] += 1,
            recv(tick(ms(0))) -> _ => hits[3] += 1,
        }
    }
    // Threshold: half of `COUNT / 4`.
    assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
}
861
/// Fairness across three channels of different flavours (unbounded,
/// capacity 1, capacity 0) with a concurrent sender; each `recv` arm must be
/// chosen at least 1/50 of an even share of `COUNT` selections.
#[test]
fn fairness2() {
    // Fewer iterations under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = bounded::<()>(1);
    let (s3, r3) = bounded::<()>(0);

    scope(|scope| {
        scope.spawn(|_| {
            // Zero-capacity channel whose receiver is kept alive but never
            // read from in this test.
            let (hole, _r) = bounded(0);

            for _ in 0..COUNT {
                // Offer a send on `s1`/`s2` only while that channel is empty;
                // otherwise substitute `hole`.
                let s1 = if s1.is_empty() { &s1 } else { &hole };
                let s2 = if s2.is_empty() { &s2 } else { &hole };

                select! {
                    send(s1, ()) -> res => assert!(res.is_ok()),
                    send(s2, ()) -> res => assert!(res.is_ok()),
                    send(s3, ()) -> res => assert!(res.is_ok()),
                }
            }
        });

        let hits = vec![Cell::new(0usize); 3];
        for _ in 0..COUNT {
            select! {
                recv(r1) -> _ => hits[0].set(hits[0].get() + 1),
                recv(r2) -> _ => hits[1].set(hits[1].get() + 1),
                recv(r3) -> _ => hits[2].set(hits[2].get() + 1),
            }
        }
        // Threshold: `COUNT / 3 / 50`.
        assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 50));
    })
    .unwrap();
}
901
/// Fairness between two pre-filled receivers: over `COUNT` selections each arm
/// must be chosen at least `COUNT / 4` times.
#[test]
fn fairness_recv() {
    // Fewer iterations under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded::<()>(COUNT);
    let (s2, r2) = unbounded::<()>();

    // Pre-fill both channels with `COUNT` messages each.
    for _ in 0..COUNT {
        s1.send(()).unwrap();
        s2.send(()).unwrap();
    }

    let mut hits = [0usize; 2];
    while hits[0] + hits[1] < COUNT {
        select! {
            recv(r1) -> _ => hits[0] += 1,
            recv(r2) -> _ => hits[1] += 1,
        }
    }
    assert!(hits.iter().all(|x| *x >= COUNT / 4));
}
926
/// Fairness between two senders with room to spare: over `COUNT` selections
/// each `send` arm must be chosen at least `COUNT / 4` times.
#[test]
fn fairness_send() {
    // Fewer iterations under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    // The receivers are kept alive but never read; `s1` has capacity `COUNT`
    // and `s2` is unbounded.
    let (s1, _r1) = bounded::<()>(COUNT);
    let (s2, _r2) = unbounded::<()>();

    let mut hits = [0usize; 2];
    for _ in 0..COUNT {
        select! {
            send(s1, ()) -> _ => hits[0] += 1,
            send(s2, ()) -> _ => hits[1] += 1,
        }
    }
    assert!(hits.iter().all(|x| *x >= COUNT / 4));
}
946
/// Syntax test: channel handles may be passed to `select!` by value
/// expression, behind several layers of references, or as the result of an
/// `Option` fallback to `never()`.
#[allow(clippy::or_fun_call)] // This is intentional.
#[test]
fn references() {
    let (s, r) = unbounded::<i32>();
    select! {
        send(s, 0) -> _ => {}
        recv(r) -> _ => {}
    }
    select! {
        send(&&&&s, 0) -> _ => {}
        recv(&&&&r) -> _ => {}
    }
    // Borrowed receiver with a borrowed temporary as the fallback.
    select! {
        recv(Some(&r).unwrap_or(&never())) -> _ => {},
        default => {}
    }
    // Owned receiver with an owned fallback; this moves `r`.
    select! {
        recv(Some(r).unwrap_or(never())) -> _ => {},
        default => {}
    }
}
968
/// Syntax test: arm bodies may be bare expressions (`3.0`, `loop`, `match`,
/// `if`) instead of brace-delimited blocks. The channel is empty and connected
/// throughout, so only the `default` arms run.
#[test]
fn case_blocks() {
    let (s, r) = unbounded::<i32>();

    select! {
        recv(r) -> _ => 3.0,
        recv(r) -> _ => loop {
            unreachable!()
        },
        recv(r) -> _ => match 7 + 3 {
            _ => unreachable!()
        },
        default => 7.
    };

    select! {
        recv(r) -> msg => if msg.is_ok() {
            unreachable!()
        },
        default => ()
    }

    // The sender is dropped only here, after both selects have run.
    drop(s);
}
993
/// Syntax test: the handle expressions may be immediately-invoked `move`
/// closures that hand over ownership of the sender and the receiver.
#[allow(clippy::redundant_closure_call)] // This is intentional.
#[test]
fn move_handles() {
    let (s, r) = unbounded::<i32>();
    select! {
        recv((move || r)()) -> _ => {}
        send((move || s)(), 0) -> _ => {}
    }
}
1003
/// Type-inference test: the channels are created without a type annotation,
/// and the message type is fixed by a later use (`s.send(())` in the first
/// case, the `send(s, ())` arm in the second).
#[test]
fn infer_types() {
    let (s, r) = unbounded();
    select! {
        recv(r) -> _ => {}
        default => {}
    }
    s.send(()).unwrap();

    let (s, r) = unbounded();
    select! {
        send(s, ()) -> _ => {}
    }
    r.recv().unwrap();
}
1019
/// Syntax test: the default arm may be written as `default` or as `default()`,
/// either alongside other arms or on its own. The zero-capacity channel has no
/// peer, so the other arms must not run.
#[test]
fn default_syntax() {
    let (s, r) = bounded::<i32>(0);

    select! {
        recv(r) -> _ => panic!(),
        default => {}
    }
    select! {
        send(s, 0) -> _ => panic!(),
        default() => {}
    }
    select! {
        default => {}
    }
    select! {
        default() => {}
    }
}
1039
/// Syntax test: the result binding of a `recv` arm may reuse the receiver's
/// own name (`r`). The sender is dropped at creation, so an error is expected.
#[test]
fn same_variable_name() {
    let (_, r) = unbounded::<i32>();
    select! {
        recv(r) -> r => assert!(r.is_err()),
    }
}
1047
/// Syntax test: boxed handles may be passed to `select!` through an explicit
/// dereference (`*s`, `*r`) and must remain usable afterwards.
#[test]
fn handles_on_heap() {
    let (s, r) = unbounded::<i32>();
    let (s, r) = (Box::new(s), Box::new(r));

    select! {
        send(*s, 0) -> _ => {}
        recv(*r) -> _ => {}
        default => {}
    }

    // Both boxes are still owned here, so the select did not move out of them.
    drop(s);
    drop(r);
}
1062
/// Compile-time property test: each arm body may move a distinct non-`Copy`
/// value (a `Box`) out of the enclosing scope, for several combinations of
/// `send`, `recv` and `default` arms.
#[test]
fn once_blocks() {
    let (s, r) = unbounded::<i32>();

    let once = Box::new(());
    select! {
        send(s, 0) -> _ => drop(once),
    }

    let once = Box::new(());
    select! {
        recv(r) -> _ => drop(once),
    }

    let once1 = Box::new(());
    let once2 = Box::new(());
    select! {
        send(s, 0) -> _ => drop(once1),
        default => drop(once2),
    }

    let once1 = Box::new(());
    let once2 = Box::new(());
    select! {
        recv(r) -> _ => drop(once1),
        default => drop(once2),
    }

    let once1 = Box::new(());
    let once2 = Box::new(());
    select! {
        recv(r) -> _ => drop(once1),
        send(s, 0) -> _ => drop(once2),
    }
}
1098
/// Compile-time property test: the receiver expression may be a call to a
/// closure that consumes its captures (it drops a `Box` and returns the
/// receiver by value), so it can be called only once.
#[test]
fn once_receiver() {
    let (_, r) = unbounded::<i32>();

    let once = Box::new(());
    let get = move || {
        drop(once);
        r
    };

    select! {
        recv(get()) -> _ => {}
    }
}
1113
/// Compile-time property test: the sender expression may be a call to a
/// closure that consumes its captures (it drops a `Box` and returns the sender
/// by value), so it can be called only once.
#[test]
fn once_sender() {
    let (s, _) = unbounded::<i32>();

    let once = Box::new(());
    let get = move || {
        drop(once);
        s
    };

    select! {
        send(get(), 5) -> _ => {}
    }
}
1128
/// Parser test: `select!` invocations nested four levels deep inside arm
/// bodies, with the same receiver listed twice at each level.
#[test]
fn parse_nesting() {
    let (_, r) = unbounded::<i32>();

    select! {
        recv(r) -> _ => {}
        recv(r) -> _ => {
            select! {
                recv(r) -> _ => {}
                recv(r) -> _ => {
                    select! {
                        recv(r) -> _ => {}
                        recv(r) -> _ => {
                            select! {
                                default => {}
                            }
                        }
                    }
                }
            }
        }
    }
}
1152
/// Checks that `select!` is an expression whose value is that of the arm that
/// ran, with the arms producing the same `String` type via `into()`,
/// `to_owned()` and `to_string()`.
#[test]
fn evaluate() {
    let (s, r) = unbounded::<i32>();

    // The channel is empty, so the send arm is the one that completes.
    let v = select! {
        recv(r) -> _ => "foo".into(),
        send(s, 0) -> _ => "bar".to_owned(),
        default => "baz".to_string(),
    };
    assert_eq!(v, "bar");

    // The message sent above is now available to the receive arm.
    let v = select! {
        recv(r) -> _ => "foo".into(),
        default => "baz".to_string(),
    };
    assert_eq!(v, "foo");

    // The channel is empty again, so `default` runs.
    let v = select! {
        recv(r) -> _ => "foo".into(),
        default => "baz".to_string(),
    };
    assert_eq!(v, "baz");
}
1176
/// Checks that `select!` accepts newtype wrappers that `Deref` to the real
/// `crossbeam_channel` sender and receiver types.
#[test]
fn deref() {
    use crossbeam_channel as cc;

    // Local wrappers that shadow the imported `Sender`/`Receiver` names within
    // this function.
    struct Sender<T>(cc::Sender<T>);
    struct Receiver<T>(cc::Receiver<T>);

    impl<T> Deref for Receiver<T> {
        type Target = cc::Receiver<T>;

        fn deref(&self) -> &Self::Target {
            &self.0
        }
    }

    impl<T> Deref for Sender<T> {
        type Target = cc::Sender<T>;

        fn deref(&self) -> &Self::Target {
            &self.0
        }
    }

    let (s, r) = bounded::<i32>(0);
    let (s, r) = (Sender(s), Receiver(r));

    // Zero capacity with no peer thread: only `default` may run.
    select! {
        send(s, 0) -> _ => panic!(),
        recv(r) -> _ => panic!(),
        default => {}
    }
}
1209
/// Pins the static types of the arm results: `recv` yields
/// `Result<i32, RecvError>` and `send` yields `Result<(), SendError<i32>>`,
/// with no default arm, with `default`, and with `default(timeout)`.
#[test]
fn result_types() {
    // The other half of each channel is dropped at creation.
    let (s, _) = bounded::<i32>(0);
    let (_, r) = bounded::<i32>(0);

    // The turbofish on `drop` makes a wrong result type a compile error.
    select! {
        recv(r) -> res => drop::<Result<i32, RecvError>>(res),
    }
    select! {
        recv(r) -> res => drop::<Result<i32, RecvError>>(res),
        default => {}
    }
    select! {
        recv(r) -> res => drop::<Result<i32, RecvError>>(res),
        default(ms(0)) => {}
    }

    select! {
        send(s, 0) -> res => drop::<Result<(), SendError<i32>>>(res),
    }
    select! {
        send(s, 0) -> res => drop::<Result<(), SendError<i32>>>(res),
        default => {}
    }
    select! {
        send(s, 0) -> res => drop::<Result<(), SendError<i32>>>(res),
        default(ms(0)) => {}
    }

    select! {
        send(s, 0) -> res => drop::<Result<(), SendError<i32>>>(res),
        recv(r) -> res => drop::<Result<i32, RecvError>>(res),
    }
}
1244
/// Non-blocking receive via `select!` with a bare `default`, on a
/// zero-capacity channel: nothing at the start, `Ok(7)` once the sender is
/// waiting, and `Err(RecvError)` after the sender thread has finished.
#[test]
fn try_recv() {
    let (s, r) = bounded(0);

    scope(|scope| {
        scope.spawn(move |_| {
            // At the start no sender is waiting yet.
            select! {
                recv(r) -> _ => panic!(),
                default => {}
            }
            // By 1500 ms the peer has been blocked in its send since 1000 ms.
            thread::sleep(ms(1500));
            select! {
                recv(r) -> v => assert_eq!(v, Ok(7)),
                default => panic!(),
            }
            // The peer's closure owns `s`, so `s` is dropped when it returns.
            thread::sleep(ms(500));
            select! {
                recv(r) -> v => assert_eq!(v, Err(RecvError)),
                default => panic!(),
            }
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            select! {
                send(s, 7) -> res => res.unwrap(),
            }
        });
    })
    .unwrap();
}
1275
/// Blocking receive via single-arm `select!` on a zero-capacity channel: the
/// values 7, 8 and 9 must arrive in order, followed by `Err(RecvError)` once
/// the sender thread has finished.
#[test]
fn recv() {
    let (s, r) = bounded(0);

    scope(|scope| {
        scope.spawn(move |_| {
            // Blocks until the peer starts sending at roughly 1500 ms.
            select! {
                recv(r) -> v => assert_eq!(v, Ok(7)),
            }
            thread::sleep(ms(1000));
            select! {
                recv(r) -> v => assert_eq!(v, Ok(8)),
            }
            thread::sleep(ms(1000));
            select! {
                recv(r) -> v => assert_eq!(v, Ok(9)),
            }
            // The peer's closure owns `s`, so `s` is dropped when it returns.
            select! {
                recv(r) -> v => assert_eq!(v, Err(RecvError)),
            }
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1500));
            select! {
                send(s, 7) -> res => res.unwrap(),
            }
            select! {
                send(s, 8) -> res => res.unwrap(),
            }
            select! {
                send(s, 9) -> res => res.unwrap(),
            }
        });
    })
    .unwrap();
}
1312
/// Receive with a 1000 ms timeout via `select!` on a zero-capacity channel:
/// first a timeout, then `Ok(7)`, then `Err(RecvError)` once the sender thread
/// has finished.
#[test]
fn recv_timeout() {
    let (s, r) = bounded::<i32>(0);

    scope(|scope| {
        scope.spawn(move |_| {
            // The peer does not send before 1500 ms, so this must time out.
            select! {
                recv(r) -> _ => panic!(),
                default(ms(1000)) => {}
            }
            // The send at roughly 1500 ms falls inside this second window.
            select! {
                recv(r) -> v => assert_eq!(v, Ok(7)),
                default(ms(1000)) => panic!(),
            }
            // The peer's closure owns `s`, so `s` is dropped when it returns.
            select! {
                recv(r) -> v => assert_eq!(v, Err(RecvError)),
                default(ms(1000)) => panic!(),
            }
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1500));
            select! {
                send(s, 7) -> res => res.unwrap(),
            }
        });
    })
    .unwrap();
}
1341
/// Non-blocking send via `select!` with a bare `default`, on a zero-capacity
/// channel: nothing at the start, success once the receiver is waiting, and
/// `Err(SendError(8))` after the receiver thread has finished.
#[test]
fn try_send() {
    let (s, r) = bounded(0);

    scope(|scope| {
        scope.spawn(move |_| {
            // At the start no receiver is waiting yet.
            select! {
                send(s, 7) -> _ => panic!(),
                default => {}
            }
            // By 1500 ms the peer has been blocked in its receive since
            // 1000 ms.
            thread::sleep(ms(1500));
            select! {
                send(s, 8) -> res => res.unwrap(),
                default => panic!(),
            }
            // The peer's closure owns `r`, so `r` is dropped when it returns.
            thread::sleep(ms(500));
            select! {
                send(s, 8) -> res => assert_eq!(res, Err(SendError(8))),
                default => panic!(),
            }
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            select! {
                recv(r) -> v => assert_eq!(v, Ok(8)),
            }
        });
    })
    .unwrap();
}
1372
/// Blocking send via single-arm `select!` on a zero-capacity channel: the
/// values 7, 8 and 9 are sent with 1000 ms pauses and must be received in
/// order.
#[test]
fn send() {
    let (s, r) = bounded(0);

    scope(|scope| {
        scope.spawn(move |_| {
            // Blocks until the peer starts receiving at roughly 1500 ms.
            select! {
                send(s, 7) -> res => res.unwrap(),
            }
            thread::sleep(ms(1000));
            select! {
                send(s, 8) -> res => res.unwrap(),
            }
            thread::sleep(ms(1000));
            select! {
                send(s, 9) -> res => res.unwrap(),
            }
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1500));
            select! {
                recv(r) -> v => assert_eq!(v, Ok(7)),
            }
            select! {
                recv(r) -> v => assert_eq!(v, Ok(8)),
            }
            select! {
                recv(r) -> v => assert_eq!(v, Ok(9)),
            }
        });
    })
    .unwrap();
}
1406
/// Send with a 1000 ms timeout via `select!` on a zero-capacity channel: first
/// a timeout, then a successful send of 8, then `Err(SendError(9))` once the
/// receiver thread has finished.
#[test]
fn send_timeout() {
    let (s, r) = bounded(0);

    scope(|scope| {
        scope.spawn(move |_| {
            // The peer does not receive before 1500 ms, so this must time out.
            select! {
                send(s, 7) -> _ => panic!(),
                default(ms(1000)) => {}
            }
            // The receive at roughly 1500 ms falls inside this second window.
            select! {
                send(s, 8) -> res => res.unwrap(),
                default(ms(1000)) => panic!(),
            }
            // The peer's closure owns `r`, so `r` is dropped when it returns.
            select! {
                send(s, 9) -> res => assert_eq!(res, Err(SendError(9))),
                default(ms(1000)) => panic!(),
            }
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1500));
            select! {
                recv(r) -> v => assert_eq!(v, Ok(8)),
            }
        });
    })
    .unwrap();
}
1435
/// A send blocked inside `select!` on a zero-capacity channel must complete
/// with `Err(SendError(()))` when the receiver is dropped by another thread
/// after 1000 ms.
#[test]
fn disconnect_wakes_sender() {
    let (s, r) = bounded(0);

    scope(|scope| {
        scope.spawn(move |_| {
            select! {
                send(s, ()) -> res => assert_eq!(res, Err(SendError(()))),
            }
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            drop(r);
        });
    })
    .unwrap();
}
1453
/// A receive blocked inside `select!` on a zero-capacity channel must complete
/// with `Err(RecvError)` when the sender is dropped by another thread after
/// 1000 ms.
#[test]
fn disconnect_wakes_receiver() {
    let (s, r) = bounded::<()>(0);

    scope(|scope| {
        scope.spawn(move |_| {
            select! {
                recv(r) -> res => assert_eq!(res, Err(RecvError)),
            }
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            drop(s);
        });
    })
    .unwrap();
}
1471