• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Tests for channel readiness using the `Select` struct.
2 
3 use std::any::Any;
4 use std::cell::Cell;
5 use std::thread;
6 use std::time::{Duration, Instant};
7 
8 use crossbeam_channel::{after, bounded, tick, unbounded};
9 use crossbeam_channel::{Receiver, Select, TryRecvError, TrySendError};
10 use crossbeam_utils::thread::scope;
11 
ms(ms: u64) -> Duration12 fn ms(ms: u64) -> Duration {
13     Duration::from_millis(ms)
14 }
15 
16 #[test]
smoke1()17 fn smoke1() {
18     let (s1, r1) = unbounded::<usize>();
19     let (s2, r2) = unbounded::<usize>();
20 
21     s1.send(1).unwrap();
22 
23     let mut sel = Select::new();
24     sel.recv(&r1);
25     sel.recv(&r2);
26     assert_eq!(sel.ready(), 0);
27     assert_eq!(r1.try_recv(), Ok(1));
28 
29     s2.send(2).unwrap();
30 
31     let mut sel = Select::new();
32     sel.recv(&r1);
33     sel.recv(&r2);
34     assert_eq!(sel.ready(), 1);
35     assert_eq!(r2.try_recv(), Ok(2));
36 }
37 
38 #[test]
smoke2()39 fn smoke2() {
40     let (_s1, r1) = unbounded::<i32>();
41     let (_s2, r2) = unbounded::<i32>();
42     let (_s3, r3) = unbounded::<i32>();
43     let (_s4, r4) = unbounded::<i32>();
44     let (s5, r5) = unbounded::<i32>();
45 
46     s5.send(5).unwrap();
47 
48     let mut sel = Select::new();
49     sel.recv(&r1);
50     sel.recv(&r2);
51     sel.recv(&r3);
52     sel.recv(&r4);
53     sel.recv(&r5);
54     assert_eq!(sel.ready(), 4);
55     assert_eq!(r5.try_recv(), Ok(5));
56 }
57 
58 #[test]
disconnected()59 fn disconnected() {
60     let (s1, r1) = unbounded::<i32>();
61     let (s2, r2) = unbounded::<i32>();
62 
63     scope(|scope| {
64         scope.spawn(|_| {
65             drop(s1);
66             thread::sleep(ms(500));
67             s2.send(5).unwrap();
68         });
69 
70         let mut sel = Select::new();
71         sel.recv(&r1);
72         sel.recv(&r2);
73         match sel.ready_timeout(ms(1000)) {
74             Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)),
75             _ => panic!(),
76         }
77 
78         r2.recv().unwrap();
79     })
80     .unwrap();
81 
82     let mut sel = Select::new();
83     sel.recv(&r1);
84     sel.recv(&r2);
85     match sel.ready_timeout(ms(1000)) {
86         Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)),
87         _ => panic!(),
88     }
89 
90     scope(|scope| {
91         scope.spawn(|_| {
92             thread::sleep(ms(500));
93             drop(s2);
94         });
95 
96         let mut sel = Select::new();
97         sel.recv(&r2);
98         match sel.ready_timeout(ms(1000)) {
99             Ok(0) => assert_eq!(r2.try_recv(), Err(TryRecvError::Disconnected)),
100             _ => panic!(),
101         }
102     })
103     .unwrap();
104 }
105 
106 #[test]
default()107 fn default() {
108     let (s1, r1) = unbounded::<i32>();
109     let (s2, r2) = unbounded::<i32>();
110 
111     let mut sel = Select::new();
112     sel.recv(&r1);
113     sel.recv(&r2);
114     assert!(sel.try_ready().is_err());
115 
116     drop(s1);
117 
118     let mut sel = Select::new();
119     sel.recv(&r1);
120     sel.recv(&r2);
121     match sel.try_ready() {
122         Ok(0) => assert!(r1.try_recv().is_err()),
123         _ => panic!(),
124     }
125 
126     s2.send(2).unwrap();
127 
128     let mut sel = Select::new();
129     sel.recv(&r2);
130     match sel.try_ready() {
131         Ok(0) => assert_eq!(r2.try_recv(), Ok(2)),
132         _ => panic!(),
133     }
134 
135     let mut sel = Select::new();
136     sel.recv(&r2);
137     assert!(sel.try_ready().is_err());
138 
139     let mut sel = Select::new();
140     assert!(sel.try_ready().is_err());
141 }
142 
143 #[test]
timeout()144 fn timeout() {
145     let (_s1, r1) = unbounded::<i32>();
146     let (s2, r2) = unbounded::<i32>();
147 
148     scope(|scope| {
149         scope.spawn(|_| {
150             thread::sleep(ms(1500));
151             s2.send(2).unwrap();
152         });
153 
154         let mut sel = Select::new();
155         sel.recv(&r1);
156         sel.recv(&r2);
157         assert!(sel.ready_timeout(ms(1000)).is_err());
158 
159         let mut sel = Select::new();
160         sel.recv(&r1);
161         sel.recv(&r2);
162         match sel.ready_timeout(ms(1000)) {
163             Ok(1) => assert_eq!(r2.try_recv(), Ok(2)),
164             _ => panic!(),
165         }
166     })
167     .unwrap();
168 
169     scope(|scope| {
170         let (s, r) = unbounded::<i32>();
171 
172         scope.spawn(move |_| {
173             thread::sleep(ms(500));
174             drop(s);
175         });
176 
177         let mut sel = Select::new();
178         assert!(sel.ready_timeout(ms(1000)).is_err());
179 
180         let mut sel = Select::new();
181         sel.recv(&r);
182         match sel.try_ready() {
183             Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
184             _ => panic!(),
185         }
186     })
187     .unwrap();
188 }
189 
190 #[test]
default_when_disconnected()191 fn default_when_disconnected() {
192     let (_, r) = unbounded::<i32>();
193 
194     let mut sel = Select::new();
195     sel.recv(&r);
196     match sel.try_ready() {
197         Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
198         _ => panic!(),
199     }
200 
201     let (_, r) = unbounded::<i32>();
202 
203     let mut sel = Select::new();
204     sel.recv(&r);
205     match sel.ready_timeout(ms(1000)) {
206         Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
207         _ => panic!(),
208     }
209 
210     let (s, _) = bounded::<i32>(0);
211 
212     let mut sel = Select::new();
213     sel.send(&s);
214     match sel.try_ready() {
215         Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))),
216         _ => panic!(),
217     }
218 
219     let (s, _) = bounded::<i32>(0);
220 
221     let mut sel = Select::new();
222     sel.send(&s);
223     match sel.ready_timeout(ms(1000)) {
224         Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))),
225         _ => panic!(),
226     }
227 }
228 
229 #[test]
default_only()230 fn default_only() {
231     let start = Instant::now();
232 
233     let mut sel = Select::new();
234     assert!(sel.try_ready().is_err());
235     let now = Instant::now();
236     assert!(now - start <= ms(50));
237 
238     let start = Instant::now();
239     let mut sel = Select::new();
240     assert!(sel.ready_timeout(ms(500)).is_err());
241     let now = Instant::now();
242     assert!(now - start >= ms(450));
243     assert!(now - start <= ms(550));
244 }
245 
246 #[test]
unblocks()247 fn unblocks() {
248     let (s1, r1) = bounded::<i32>(0);
249     let (s2, r2) = bounded::<i32>(0);
250 
251     scope(|scope| {
252         scope.spawn(|_| {
253             thread::sleep(ms(500));
254             s2.send(2).unwrap();
255         });
256 
257         let mut sel = Select::new();
258         sel.recv(&r1);
259         sel.recv(&r2);
260         match sel.ready_timeout(ms(1000)) {
261             Ok(1) => assert_eq!(r2.try_recv(), Ok(2)),
262             _ => panic!(),
263         }
264     })
265     .unwrap();
266 
267     scope(|scope| {
268         scope.spawn(|_| {
269             thread::sleep(ms(500));
270             assert_eq!(r1.recv().unwrap(), 1);
271         });
272 
273         let mut sel = Select::new();
274         let oper1 = sel.send(&s1);
275         let oper2 = sel.send(&s2);
276         let oper = sel.select_timeout(ms(1000));
277         match oper {
278             Err(_) => panic!(),
279             Ok(oper) => match oper.index() {
280                 i if i == oper1 => oper.send(&s1, 1).unwrap(),
281                 i if i == oper2 => panic!(),
282                 _ => unreachable!(),
283             },
284         }
285     })
286     .unwrap();
287 }
288 
289 #[test]
both_ready()290 fn both_ready() {
291     let (s1, r1) = bounded(0);
292     let (s2, r2) = bounded(0);
293 
294     scope(|scope| {
295         scope.spawn(|_| {
296             thread::sleep(ms(500));
297             s1.send(1).unwrap();
298             assert_eq!(r2.recv().unwrap(), 2);
299         });
300 
301         for _ in 0..2 {
302             let mut sel = Select::new();
303             sel.recv(&r1);
304             sel.send(&s2);
305             match sel.ready() {
306                 0 => assert_eq!(r1.try_recv(), Ok(1)),
307                 1 => s2.try_send(2).unwrap(),
308                 _ => panic!(),
309             }
310         }
311     })
312     .unwrap();
313 }
314 
315 #[test]
cloning1()316 fn cloning1() {
317     scope(|scope| {
318         let (s1, r1) = unbounded::<i32>();
319         let (_s2, r2) = unbounded::<i32>();
320         let (s3, r3) = unbounded::<()>();
321 
322         scope.spawn(move |_| {
323             r3.recv().unwrap();
324             drop(s1.clone());
325             assert!(r3.try_recv().is_err());
326             s1.send(1).unwrap();
327             r3.recv().unwrap();
328         });
329 
330         s3.send(()).unwrap();
331 
332         let mut sel = Select::new();
333         sel.recv(&r1);
334         sel.recv(&r2);
335         match sel.ready() {
336             0 => drop(r1.try_recv()),
337             1 => drop(r2.try_recv()),
338             _ => panic!(),
339         }
340 
341         s3.send(()).unwrap();
342     })
343     .unwrap();
344 }
345 
346 #[test]
cloning2()347 fn cloning2() {
348     let (s1, r1) = unbounded::<()>();
349     let (s2, r2) = unbounded::<()>();
350     let (_s3, _r3) = unbounded::<()>();
351 
352     scope(|scope| {
353         scope.spawn(move |_| {
354             let mut sel = Select::new();
355             sel.recv(&r1);
356             sel.recv(&r2);
357             match sel.ready() {
358                 0 => panic!(),
359                 1 => drop(r2.try_recv()),
360                 _ => panic!(),
361             }
362         });
363 
364         thread::sleep(ms(500));
365         drop(s1.clone());
366         s2.send(()).unwrap();
367     })
368     .unwrap();
369 }
370 
371 #[test]
preflight1()372 fn preflight1() {
373     let (s, r) = unbounded();
374     s.send(()).unwrap();
375 
376     let mut sel = Select::new();
377     sel.recv(&r);
378     match sel.ready() {
379         0 => drop(r.try_recv()),
380         _ => panic!(),
381     }
382 }
383 
384 #[test]
preflight2()385 fn preflight2() {
386     let (s, r) = unbounded();
387     drop(s.clone());
388     s.send(()).unwrap();
389     drop(s);
390 
391     let mut sel = Select::new();
392     sel.recv(&r);
393     match sel.ready() {
394         0 => assert_eq!(r.try_recv(), Ok(())),
395         _ => panic!(),
396     }
397 
398     assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
399 }
400 
401 #[test]
preflight3()402 fn preflight3() {
403     let (s, r) = unbounded();
404     drop(s.clone());
405     s.send(()).unwrap();
406     drop(s);
407     r.recv().unwrap();
408 
409     let mut sel = Select::new();
410     sel.recv(&r);
411     match sel.ready() {
412         0 => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
413         _ => panic!(),
414     }
415 }
416 
417 #[test]
duplicate_operations()418 fn duplicate_operations() {
419     let (s, r) = unbounded::<i32>();
420     let hit = vec![Cell::new(false); 4];
421 
422     while hit.iter().map(|h| h.get()).any(|hit| !hit) {
423         let mut sel = Select::new();
424         sel.recv(&r);
425         sel.recv(&r);
426         sel.send(&s);
427         sel.send(&s);
428         match sel.ready() {
429             0 => {
430                 assert!(r.try_recv().is_ok());
431                 hit[0].set(true);
432             }
433             1 => {
434                 assert!(r.try_recv().is_ok());
435                 hit[1].set(true);
436             }
437             2 => {
438                 assert!(s.try_send(0).is_ok());
439                 hit[2].set(true);
440             }
441             3 => {
442                 assert!(s.try_send(0).is_ok());
443                 hit[3].set(true);
444             }
445             _ => panic!(),
446         }
447     }
448 }
449 
450 #[test]
nesting()451 fn nesting() {
452     let (s, r) = unbounded::<i32>();
453 
454     let mut sel = Select::new();
455     sel.send(&s);
456     match sel.ready() {
457         0 => {
458             assert!(s.try_send(0).is_ok());
459 
460             let mut sel = Select::new();
461             sel.recv(&r);
462             match sel.ready() {
463                 0 => {
464                     assert_eq!(r.try_recv(), Ok(0));
465 
466                     let mut sel = Select::new();
467                     sel.send(&s);
468                     match sel.ready() {
469                         0 => {
470                             assert!(s.try_send(1).is_ok());
471 
472                             let mut sel = Select::new();
473                             sel.recv(&r);
474                             match sel.ready() {
475                                 0 => {
476                                     assert_eq!(r.try_recv(), Ok(1));
477                                 }
478                                 _ => panic!(),
479                             }
480                         }
481                         _ => panic!(),
482                     }
483                 }
484                 _ => panic!(),
485             }
486         }
487         _ => panic!(),
488     }
489 }
490 
491 #[test]
stress_recv()492 fn stress_recv() {
493     const COUNT: usize = 10_000;
494 
495     let (s1, r1) = unbounded();
496     let (s2, r2) = bounded(5);
497     let (s3, r3) = bounded(0);
498 
499     scope(|scope| {
500         scope.spawn(|_| {
501             for i in 0..COUNT {
502                 s1.send(i).unwrap();
503                 r3.recv().unwrap();
504 
505                 s2.send(i).unwrap();
506                 r3.recv().unwrap();
507             }
508         });
509 
510         for i in 0..COUNT {
511             for _ in 0..2 {
512                 let mut sel = Select::new();
513                 sel.recv(&r1);
514                 sel.recv(&r2);
515                 match sel.ready() {
516                     0 => assert_eq!(r1.try_recv(), Ok(i)),
517                     1 => assert_eq!(r2.try_recv(), Ok(i)),
518                     _ => panic!(),
519                 }
520 
521                 s3.send(()).unwrap();
522             }
523         }
524     })
525     .unwrap();
526 }
527 
528 #[test]
stress_send()529 fn stress_send() {
530     const COUNT: usize = 10_000;
531 
532     let (s1, r1) = bounded(0);
533     let (s2, r2) = bounded(0);
534     let (s3, r3) = bounded(100);
535 
536     scope(|scope| {
537         scope.spawn(|_| {
538             for i in 0..COUNT {
539                 assert_eq!(r1.recv().unwrap(), i);
540                 assert_eq!(r2.recv().unwrap(), i);
541                 r3.recv().unwrap();
542             }
543         });
544 
545         for i in 0..COUNT {
546             for _ in 0..2 {
547                 let mut sel = Select::new();
548                 sel.send(&s1);
549                 sel.send(&s2);
550                 match sel.ready() {
551                     0 => assert!(s1.try_send(i).is_ok()),
552                     1 => assert!(s2.try_send(i).is_ok()),
553                     _ => panic!(),
554                 }
555             }
556             s3.send(()).unwrap();
557         }
558     })
559     .unwrap();
560 }
561 
562 #[test]
stress_mixed()563 fn stress_mixed() {
564     const COUNT: usize = 10_000;
565 
566     let (s1, r1) = bounded(0);
567     let (s2, r2) = bounded(0);
568     let (s3, r3) = bounded(100);
569 
570     scope(|scope| {
571         scope.spawn(|_| {
572             for i in 0..COUNT {
573                 s1.send(i).unwrap();
574                 assert_eq!(r2.recv().unwrap(), i);
575                 r3.recv().unwrap();
576             }
577         });
578 
579         for i in 0..COUNT {
580             for _ in 0..2 {
581                 let mut sel = Select::new();
582                 sel.recv(&r1);
583                 sel.send(&s2);
584                 match sel.ready() {
585                     0 => assert_eq!(r1.try_recv(), Ok(i)),
586                     1 => assert!(s2.try_send(i).is_ok()),
587                     _ => panic!(),
588                 }
589             }
590             s3.send(()).unwrap();
591         }
592     })
593     .unwrap();
594 }
595 
596 #[test]
stress_timeout_two_threads()597 fn stress_timeout_two_threads() {
598     const COUNT: usize = 20;
599 
600     let (s, r) = bounded(2);
601 
602     scope(|scope| {
603         scope.spawn(|_| {
604             for i in 0..COUNT {
605                 if i % 2 == 0 {
606                     thread::sleep(ms(500));
607                 }
608 
609                 let done = false;
610                 while !done {
611                     let mut sel = Select::new();
612                     sel.send(&s);
613                     match sel.ready_timeout(ms(100)) {
614                         Err(_) => {}
615                         Ok(0) => {
616                             assert!(s.try_send(i).is_ok());
617                             break;
618                         }
619                         Ok(_) => panic!(),
620                     }
621                 }
622             }
623         });
624 
625         scope.spawn(|_| {
626             for i in 0..COUNT {
627                 if i % 2 == 0 {
628                     thread::sleep(ms(500));
629                 }
630 
631                 let mut done = false;
632                 while !done {
633                     let mut sel = Select::new();
634                     sel.recv(&r);
635                     match sel.ready_timeout(ms(100)) {
636                         Err(_) => {}
637                         Ok(0) => {
638                             assert_eq!(r.try_recv(), Ok(i));
639                             done = true;
640                         }
641                         Ok(_) => panic!(),
642                     }
643                 }
644             }
645         });
646     })
647     .unwrap();
648 }
649 
650 #[test]
send_recv_same_channel()651 fn send_recv_same_channel() {
652     let (s, r) = bounded::<i32>(0);
653     let mut sel = Select::new();
654     sel.send(&s);
655     sel.recv(&r);
656     assert!(sel.ready_timeout(ms(100)).is_err());
657 
658     let (s, r) = unbounded::<i32>();
659     let mut sel = Select::new();
660     sel.send(&s);
661     sel.recv(&r);
662     match sel.ready_timeout(ms(100)) {
663         Err(_) => panic!(),
664         Ok(0) => assert!(s.try_send(0).is_ok()),
665         Ok(_) => panic!(),
666     }
667 }
668 
669 #[test]
channel_through_channel()670 fn channel_through_channel() {
671     const COUNT: usize = 1000;
672 
673     type T = Box<dyn Any + Send>;
674 
675     for cap in 1..4 {
676         let (s, r) = bounded::<T>(cap);
677 
678         scope(|scope| {
679             scope.spawn(move |_| {
680                 let mut s = s;
681 
682                 for _ in 0..COUNT {
683                     let (new_s, new_r) = bounded(cap);
684                     let new_r: T = Box::new(Some(new_r));
685 
686                     {
687                         let mut sel = Select::new();
688                         sel.send(&s);
689                         match sel.ready() {
690                             0 => assert!(s.try_send(new_r).is_ok()),
691                             _ => panic!(),
692                         }
693                     }
694 
695                     s = new_s;
696                 }
697             });
698 
699             scope.spawn(move |_| {
700                 let mut r = r;
701 
702                 for _ in 0..COUNT {
703                     let new = {
704                         let mut sel = Select::new();
705                         sel.recv(&r);
706                         match sel.ready() {
707                             0 => r
708                                 .try_recv()
709                                 .unwrap()
710                                 .downcast_mut::<Option<Receiver<T>>>()
711                                 .unwrap()
712                                 .take()
713                                 .unwrap(),
714                             _ => panic!(),
715                         }
716                     };
717                     r = new;
718                 }
719             });
720         })
721         .unwrap();
722     }
723 }
724 
725 #[test]
fairness1()726 fn fairness1() {
727     const COUNT: usize = 10_000;
728 
729     let (s1, r1) = bounded::<()>(COUNT);
730     let (s2, r2) = unbounded::<()>();
731 
732     for _ in 0..COUNT {
733         s1.send(()).unwrap();
734         s2.send(()).unwrap();
735     }
736 
737     let hits = vec![Cell::new(0usize); 4];
738     for _ in 0..COUNT {
739         let after = after(ms(0));
740         let tick = tick(ms(0));
741 
742         let mut sel = Select::new();
743         sel.recv(&r1);
744         sel.recv(&r2);
745         sel.recv(&after);
746         sel.recv(&tick);
747         match sel.ready() {
748             0 => {
749                 r1.try_recv().unwrap();
750                 hits[0].set(hits[0].get() + 1);
751             }
752             1 => {
753                 r2.try_recv().unwrap();
754                 hits[1].set(hits[1].get() + 1);
755             }
756             2 => {
757                 after.try_recv().unwrap();
758                 hits[2].set(hits[2].get() + 1);
759             }
760             3 => {
761                 tick.try_recv().unwrap();
762                 hits[3].set(hits[3].get() + 1);
763             }
764             _ => panic!(),
765         }
766     }
767     assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2));
768 }
769 
770 #[test]
fairness2()771 fn fairness2() {
772     const COUNT: usize = 100_000;
773 
774     let (s1, r1) = unbounded::<()>();
775     let (s2, r2) = bounded::<()>(1);
776     let (s3, r3) = bounded::<()>(0);
777 
778     scope(|scope| {
779         scope.spawn(|_| {
780             for _ in 0..COUNT {
781                 let mut sel = Select::new();
782                 let mut oper1 = None;
783                 let mut oper2 = None;
784                 if s1.is_empty() {
785                     oper1 = Some(sel.send(&s1));
786                 }
787                 if s2.is_empty() {
788                     oper2 = Some(sel.send(&s2));
789                 }
790                 let oper3 = sel.send(&s3);
791                 let oper = sel.select();
792                 match oper.index() {
793                     i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()),
794                     i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()),
795                     i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()),
796                     _ => unreachable!(),
797                 }
798             }
799         });
800 
801         let hits = vec![Cell::new(0usize); 3];
802         for _ in 0..COUNT {
803             let mut sel = Select::new();
804             sel.recv(&r1);
805             sel.recv(&r2);
806             sel.recv(&r3);
807             loop {
808                 match sel.ready() {
809                     0 => {
810                         if r1.try_recv().is_ok() {
811                             hits[0].set(hits[0].get() + 1);
812                             break;
813                         }
814                     }
815                     1 => {
816                         if r2.try_recv().is_ok() {
817                             hits[1].set(hits[1].get() + 1);
818                             break;
819                         }
820                     }
821                     2 => {
822                         if r3.try_recv().is_ok() {
823                             hits[2].set(hits[2].get() + 1);
824                             break;
825                         }
826                     }
827                     _ => unreachable!(),
828                 }
829             }
830         }
831         assert!(hits.iter().all(|x| x.get() > 0));
832     })
833     .unwrap();
834 }
835