• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Tests for the zero channel flavor.
2 
3 use std::any::Any;
4 use std::sync::atomic::AtomicUsize;
5 use std::sync::atomic::Ordering;
6 use std::thread;
7 use std::time::Duration;
8 
9 use crossbeam_channel::{bounded, select, Receiver};
10 use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
11 use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
12 use crossbeam_utils::thread::scope;
13 use rand::{thread_rng, Rng};
14 
ms(ms: u64) -> Duration15 fn ms(ms: u64) -> Duration {
16     Duration::from_millis(ms)
17 }
18 
19 #[test]
smoke()20 fn smoke() {
21     let (s, r) = bounded(0);
22     assert_eq!(s.try_send(7), Err(TrySendError::Full(7)));
23     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
24 }
25 
26 #[test]
capacity()27 fn capacity() {
28     let (s, r) = bounded::<()>(0);
29     assert_eq!(s.capacity(), Some(0));
30     assert_eq!(r.capacity(), Some(0));
31 }
32 
33 #[test]
len_empty_full()34 fn len_empty_full() {
35     let (s, r) = bounded(0);
36 
37     assert_eq!(s.len(), 0);
38     assert!(s.is_empty());
39     assert!(s.is_full());
40     assert_eq!(r.len(), 0);
41     assert!(r.is_empty());
42     assert!(r.is_full());
43 
44     scope(|scope| {
45         scope.spawn(|_| s.send(0).unwrap());
46         scope.spawn(|_| r.recv().unwrap());
47     })
48     .unwrap();
49 
50     assert_eq!(s.len(), 0);
51     assert!(s.is_empty());
52     assert!(s.is_full());
53     assert_eq!(r.len(), 0);
54     assert!(r.is_empty());
55     assert!(r.is_full());
56 }
57 
58 #[test]
try_recv()59 fn try_recv() {
60     let (s, r) = bounded(0);
61 
62     scope(|scope| {
63         scope.spawn(move |_| {
64             assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
65             thread::sleep(ms(1500));
66             assert_eq!(r.try_recv(), Ok(7));
67             thread::sleep(ms(500));
68             assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
69         });
70         scope.spawn(move |_| {
71             thread::sleep(ms(1000));
72             s.send(7).unwrap();
73         });
74     })
75     .unwrap();
76 }
77 
78 #[test]
recv()79 fn recv() {
80     let (s, r) = bounded(0);
81 
82     scope(|scope| {
83         scope.spawn(move |_| {
84             assert_eq!(r.recv(), Ok(7));
85             thread::sleep(ms(1000));
86             assert_eq!(r.recv(), Ok(8));
87             thread::sleep(ms(1000));
88             assert_eq!(r.recv(), Ok(9));
89             assert_eq!(r.recv(), Err(RecvError));
90         });
91         scope.spawn(move |_| {
92             thread::sleep(ms(1500));
93             s.send(7).unwrap();
94             s.send(8).unwrap();
95             s.send(9).unwrap();
96         });
97     })
98     .unwrap();
99 }
100 
101 #[test]
recv_timeout()102 fn recv_timeout() {
103     let (s, r) = bounded::<i32>(0);
104 
105     scope(|scope| {
106         scope.spawn(move |_| {
107             assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
108             assert_eq!(r.recv_timeout(ms(1000)), Ok(7));
109             assert_eq!(
110                 r.recv_timeout(ms(1000)),
111                 Err(RecvTimeoutError::Disconnected)
112             );
113         });
114         scope.spawn(move |_| {
115             thread::sleep(ms(1500));
116             s.send(7).unwrap();
117         });
118     })
119     .unwrap();
120 }
121 
122 #[test]
try_send()123 fn try_send() {
124     let (s, r) = bounded(0);
125 
126     scope(|scope| {
127         scope.spawn(move |_| {
128             assert_eq!(s.try_send(7), Err(TrySendError::Full(7)));
129             thread::sleep(ms(1500));
130             assert_eq!(s.try_send(8), Ok(()));
131             thread::sleep(ms(500));
132             assert_eq!(s.try_send(9), Err(TrySendError::Disconnected(9)));
133         });
134         scope.spawn(move |_| {
135             thread::sleep(ms(1000));
136             assert_eq!(r.recv(), Ok(8));
137         });
138     })
139     .unwrap();
140 }
141 
142 #[test]
send()143 fn send() {
144     let (s, r) = bounded(0);
145 
146     scope(|scope| {
147         scope.spawn(move |_| {
148             s.send(7).unwrap();
149             thread::sleep(ms(1000));
150             s.send(8).unwrap();
151             thread::sleep(ms(1000));
152             s.send(9).unwrap();
153         });
154         scope.spawn(move |_| {
155             thread::sleep(ms(1500));
156             assert_eq!(r.recv(), Ok(7));
157             assert_eq!(r.recv(), Ok(8));
158             assert_eq!(r.recv(), Ok(9));
159         });
160     })
161     .unwrap();
162 }
163 
164 #[test]
send_timeout()165 fn send_timeout() {
166     let (s, r) = bounded(0);
167 
168     scope(|scope| {
169         scope.spawn(move |_| {
170             assert_eq!(
171                 s.send_timeout(7, ms(1000)),
172                 Err(SendTimeoutError::Timeout(7))
173             );
174             assert_eq!(s.send_timeout(8, ms(1000)), Ok(()));
175             assert_eq!(
176                 s.send_timeout(9, ms(1000)),
177                 Err(SendTimeoutError::Disconnected(9))
178             );
179         });
180         scope.spawn(move |_| {
181             thread::sleep(ms(1500));
182             assert_eq!(r.recv(), Ok(8));
183         });
184     })
185     .unwrap();
186 }
187 
188 #[test]
len()189 fn len() {
190     #[cfg(miri)]
191     const COUNT: usize = 100;
192     #[cfg(not(miri))]
193     const COUNT: usize = 25_000;
194 
195     let (s, r) = bounded(0);
196 
197     assert_eq!(s.len(), 0);
198     assert_eq!(r.len(), 0);
199 
200     scope(|scope| {
201         scope.spawn(|_| {
202             for i in 0..COUNT {
203                 assert_eq!(r.recv(), Ok(i));
204                 assert_eq!(r.len(), 0);
205             }
206         });
207 
208         scope.spawn(|_| {
209             for i in 0..COUNT {
210                 s.send(i).unwrap();
211                 assert_eq!(s.len(), 0);
212             }
213         });
214     })
215     .unwrap();
216 
217     assert_eq!(s.len(), 0);
218     assert_eq!(r.len(), 0);
219 }
220 
221 #[test]
disconnect_wakes_sender()222 fn disconnect_wakes_sender() {
223     let (s, r) = bounded(0);
224 
225     scope(|scope| {
226         scope.spawn(move |_| {
227             assert_eq!(s.send(()), Err(SendError(())));
228         });
229         scope.spawn(move |_| {
230             thread::sleep(ms(1000));
231             drop(r);
232         });
233     })
234     .unwrap();
235 }
236 
237 #[test]
disconnect_wakes_receiver()238 fn disconnect_wakes_receiver() {
239     let (s, r) = bounded::<()>(0);
240 
241     scope(|scope| {
242         scope.spawn(move |_| {
243             assert_eq!(r.recv(), Err(RecvError));
244         });
245         scope.spawn(move |_| {
246             thread::sleep(ms(1000));
247             drop(s);
248         });
249     })
250     .unwrap();
251 }
252 
253 #[test]
spsc()254 fn spsc() {
255     #[cfg(miri)]
256     const COUNT: usize = 100;
257     #[cfg(not(miri))]
258     const COUNT: usize = 100_000;
259 
260     let (s, r) = bounded(0);
261 
262     scope(|scope| {
263         scope.spawn(move |_| {
264             for i in 0..COUNT {
265                 assert_eq!(r.recv(), Ok(i));
266             }
267             assert_eq!(r.recv(), Err(RecvError));
268         });
269         scope.spawn(move |_| {
270             for i in 0..COUNT {
271                 s.send(i).unwrap();
272             }
273         });
274     })
275     .unwrap();
276 }
277 
278 #[test]
mpmc()279 fn mpmc() {
280     #[cfg(miri)]
281     const COUNT: usize = 100;
282     #[cfg(not(miri))]
283     const COUNT: usize = 25_000;
284     const THREADS: usize = 4;
285 
286     let (s, r) = bounded::<usize>(0);
287     let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
288 
289     scope(|scope| {
290         for _ in 0..THREADS {
291             scope.spawn(|_| {
292                 for _ in 0..COUNT {
293                     let n = r.recv().unwrap();
294                     v[n].fetch_add(1, Ordering::SeqCst);
295                 }
296             });
297         }
298         for _ in 0..THREADS {
299             scope.spawn(|_| {
300                 for i in 0..COUNT {
301                     s.send(i).unwrap();
302                 }
303             });
304         }
305     })
306     .unwrap();
307 
308     for c in v {
309         assert_eq!(c.load(Ordering::SeqCst), THREADS);
310     }
311 }
312 
313 #[test]
stress_oneshot()314 fn stress_oneshot() {
315     #[cfg(miri)]
316     const COUNT: usize = 100;
317     #[cfg(not(miri))]
318     const COUNT: usize = 10_000;
319 
320     for _ in 0..COUNT {
321         let (s, r) = bounded(1);
322 
323         scope(|scope| {
324             scope.spawn(|_| r.recv().unwrap());
325             scope.spawn(|_| s.send(0).unwrap());
326         })
327         .unwrap();
328     }
329 }
330 
331 #[cfg_attr(miri, ignore)] // Miri is too slow
332 #[test]
stress_iter()333 fn stress_iter() {
334     const COUNT: usize = 1000;
335 
336     let (request_s, request_r) = bounded(0);
337     let (response_s, response_r) = bounded(0);
338 
339     scope(|scope| {
340         scope.spawn(move |_| {
341             let mut count = 0;
342             loop {
343                 for x in response_r.try_iter() {
344                     count += x;
345                     if count == COUNT {
346                         return;
347                     }
348                 }
349                 let _ = request_s.try_send(());
350             }
351         });
352 
353         for _ in request_r.iter() {
354             if response_s.send(1).is_err() {
355                 break;
356             }
357         }
358     })
359     .unwrap();
360 }
361 
362 #[test]
stress_timeout_two_threads()363 fn stress_timeout_two_threads() {
364     const COUNT: usize = 100;
365 
366     let (s, r) = bounded(0);
367 
368     scope(|scope| {
369         scope.spawn(|_| {
370             for i in 0..COUNT {
371                 if i % 2 == 0 {
372                     thread::sleep(ms(50));
373                 }
374                 loop {
375                     if let Ok(()) = s.send_timeout(i, ms(10)) {
376                         break;
377                     }
378                 }
379             }
380         });
381 
382         scope.spawn(|_| {
383             for i in 0..COUNT {
384                 if i % 2 == 0 {
385                     thread::sleep(ms(50));
386                 }
387                 loop {
388                     if let Ok(x) = r.recv_timeout(ms(10)) {
389                         assert_eq!(x, i);
390                         break;
391                     }
392                 }
393             }
394         });
395     })
396     .unwrap();
397 }
398 
399 #[cfg_attr(miri, ignore)] // Miri is too slow
400 #[test]
drops()401 fn drops() {
402     const RUNS: usize = 100;
403 
404     static DROPS: AtomicUsize = AtomicUsize::new(0);
405 
406     #[derive(Debug, PartialEq)]
407     struct DropCounter;
408 
409     impl Drop for DropCounter {
410         fn drop(&mut self) {
411             DROPS.fetch_add(1, Ordering::SeqCst);
412         }
413     }
414 
415     let mut rng = thread_rng();
416 
417     for _ in 0..RUNS {
418         let steps = rng.gen_range(0..3_000);
419 
420         DROPS.store(0, Ordering::SeqCst);
421         let (s, r) = bounded::<DropCounter>(0);
422 
423         scope(|scope| {
424             scope.spawn(|_| {
425                 for _ in 0..steps {
426                     r.recv().unwrap();
427                 }
428             });
429 
430             scope.spawn(|_| {
431                 for _ in 0..steps {
432                     s.send(DropCounter).unwrap();
433                 }
434             });
435         })
436         .unwrap();
437 
438         assert_eq!(DROPS.load(Ordering::SeqCst), steps);
439         drop(s);
440         drop(r);
441         assert_eq!(DROPS.load(Ordering::SeqCst), steps);
442     }
443 }
444 
445 #[test]
fairness()446 fn fairness() {
447     #[cfg(miri)]
448     const COUNT: usize = 100;
449     #[cfg(not(miri))]
450     const COUNT: usize = 10_000;
451 
452     let (s1, r1) = bounded::<()>(0);
453     let (s2, r2) = bounded::<()>(0);
454 
455     scope(|scope| {
456         scope.spawn(|_| {
457             let mut hits = [0usize; 2];
458             for _ in 0..COUNT {
459                 select! {
460                     recv(r1) -> _ => hits[0] += 1,
461                     recv(r2) -> _ => hits[1] += 1,
462                 }
463             }
464             assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
465         });
466 
467         let mut hits = [0usize; 2];
468         for _ in 0..COUNT {
469             select! {
470                 send(s1, ()) -> _ => hits[0] += 1,
471                 send(s2, ()) -> _ => hits[1] += 1,
472             }
473         }
474         assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
475     })
476     .unwrap();
477 }
478 
479 #[test]
fairness_duplicates()480 fn fairness_duplicates() {
481     #[cfg(miri)]
482     const COUNT: usize = 100;
483     #[cfg(not(miri))]
484     const COUNT: usize = 10_000;
485 
486     let (s, r) = bounded::<()>(0);
487 
488     scope(|scope| {
489         scope.spawn(|_| {
490             let mut hits = [0usize; 5];
491             for _ in 0..COUNT {
492                 select! {
493                     recv(r) -> _ => hits[0] += 1,
494                     recv(r) -> _ => hits[1] += 1,
495                     recv(r) -> _ => hits[2] += 1,
496                     recv(r) -> _ => hits[3] += 1,
497                     recv(r) -> _ => hits[4] += 1,
498                 }
499             }
500             assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
501         });
502 
503         let mut hits = [0usize; 5];
504         for _ in 0..COUNT {
505             select! {
506                 send(s, ()) -> _ => hits[0] += 1,
507                 send(s, ()) -> _ => hits[1] += 1,
508                 send(s, ()) -> _ => hits[2] += 1,
509                 send(s, ()) -> _ => hits[3] += 1,
510                 send(s, ()) -> _ => hits[4] += 1,
511             }
512         }
513         assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
514     })
515     .unwrap();
516 }
517 
518 #[test]
recv_in_send()519 fn recv_in_send() {
520     let (s, r) = bounded(0);
521 
522     scope(|scope| {
523         scope.spawn(|_| {
524             thread::sleep(ms(100));
525             r.recv()
526         });
527 
528         scope.spawn(|_| {
529             thread::sleep(ms(500));
530             s.send(()).unwrap();
531         });
532 
533         select! {
534             send(s, r.recv().unwrap()) -> _ => {}
535         }
536     })
537     .unwrap();
538 }
539 
540 #[test]
channel_through_channel()541 fn channel_through_channel() {
542     #[cfg(miri)]
543     const COUNT: usize = 100;
544     #[cfg(not(miri))]
545     const COUNT: usize = 1000;
546 
547     type T = Box<dyn Any + Send>;
548 
549     let (s, r) = bounded::<T>(0);
550 
551     scope(|scope| {
552         scope.spawn(move |_| {
553             let mut s = s;
554 
555             for _ in 0..COUNT {
556                 let (new_s, new_r) = bounded(0);
557                 let new_r: T = Box::new(Some(new_r));
558 
559                 s.send(new_r).unwrap();
560                 s = new_s;
561             }
562         });
563 
564         scope.spawn(move |_| {
565             let mut r = r;
566 
567             for _ in 0..COUNT {
568                 r = r
569                     .recv()
570                     .unwrap()
571                     .downcast_mut::<Option<Receiver<T>>>()
572                     .unwrap()
573                     .take()
574                     .unwrap()
575             }
576         });
577     })
578     .unwrap();
579 }
580