1 //! Tests for the zero channel flavor.
2 
3 use std::any::Any;
4 use std::sync::atomic::AtomicUsize;
5 use std::sync::atomic::Ordering;
6 use std::thread;
7 use std::time::Duration;
8 
9 use crossbeam_channel::{bounded, select, Receiver};
10 use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
11 use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
12 use crossbeam_utils::thread::scope;
13 use rand::{thread_rng, Rng};
14 
ms(ms: u64) -> Duration15 fn ms(ms: u64) -> Duration {
16     Duration::from_millis(ms)
17 }
18 
19 #[test]
smoke()20 fn smoke() {
21     let (s, r) = bounded(0);
22     assert_eq!(s.try_send(7), Err(TrySendError::Full(7)));
23     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
24 }
25 
26 #[test]
capacity()27 fn capacity() {
28     let (s, r) = bounded::<()>(0);
29     assert_eq!(s.capacity(), Some(0));
30     assert_eq!(r.capacity(), Some(0));
31 }
32 
33 #[test]
len_empty_full()34 fn len_empty_full() {
35     let (s, r) = bounded(0);
36 
37     assert_eq!(s.len(), 0);
38     assert!(s.is_empty());
39     assert!(s.is_full());
40     assert_eq!(r.len(), 0);
41     assert!(r.is_empty());
42     assert!(r.is_full());
43 
44     scope(|scope| {
45         scope.spawn(|_| s.send(0).unwrap());
46         scope.spawn(|_| r.recv().unwrap());
47     })
48     .unwrap();
49 
50     assert_eq!(s.len(), 0);
51     assert!(s.is_empty());
52     assert!(s.is_full());
53     assert_eq!(r.len(), 0);
54     assert!(r.is_empty());
55     assert!(r.is_full());
56 }
57 
58 #[test]
try_recv()59 fn try_recv() {
60     let (s, r) = bounded(0);
61 
62     scope(|scope| {
63         scope.spawn(move |_| {
64             assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
65             thread::sleep(ms(1500));
66             assert_eq!(r.try_recv(), Ok(7));
67             thread::sleep(ms(500));
68             assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
69         });
70         scope.spawn(move |_| {
71             thread::sleep(ms(1000));
72             s.send(7).unwrap();
73         });
74     })
75     .unwrap();
76 }
77 
78 #[test]
recv()79 fn recv() {
80     let (s, r) = bounded(0);
81 
82     scope(|scope| {
83         scope.spawn(move |_| {
84             assert_eq!(r.recv(), Ok(7));
85             thread::sleep(ms(1000));
86             assert_eq!(r.recv(), Ok(8));
87             thread::sleep(ms(1000));
88             assert_eq!(r.recv(), Ok(9));
89             assert_eq!(r.recv(), Err(RecvError));
90         });
91         scope.spawn(move |_| {
92             thread::sleep(ms(1500));
93             s.send(7).unwrap();
94             s.send(8).unwrap();
95             s.send(9).unwrap();
96         });
97     })
98     .unwrap();
99 }
100 
101 #[test]
recv_timeout()102 fn recv_timeout() {
103     let (s, r) = bounded::<i32>(0);
104 
105     scope(|scope| {
106         scope.spawn(move |_| {
107             assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
108             assert_eq!(r.recv_timeout(ms(1000)), Ok(7));
109             assert_eq!(
110                 r.recv_timeout(ms(1000)),
111                 Err(RecvTimeoutError::Disconnected)
112             );
113         });
114         scope.spawn(move |_| {
115             thread::sleep(ms(1500));
116             s.send(7).unwrap();
117         });
118     })
119     .unwrap();
120 }
121 
122 #[test]
try_send()123 fn try_send() {
124     let (s, r) = bounded(0);
125 
126     scope(|scope| {
127         scope.spawn(move |_| {
128             assert_eq!(s.try_send(7), Err(TrySendError::Full(7)));
129             thread::sleep(ms(1500));
130             assert_eq!(s.try_send(8), Ok(()));
131             thread::sleep(ms(500));
132             assert_eq!(s.try_send(9), Err(TrySendError::Disconnected(9)));
133         });
134         scope.spawn(move |_| {
135             thread::sleep(ms(1000));
136             assert_eq!(r.recv(), Ok(8));
137         });
138     })
139     .unwrap();
140 }
141 
142 #[test]
send()143 fn send() {
144     let (s, r) = bounded(0);
145 
146     scope(|scope| {
147         scope.spawn(move |_| {
148             s.send(7).unwrap();
149             thread::sleep(ms(1000));
150             s.send(8).unwrap();
151             thread::sleep(ms(1000));
152             s.send(9).unwrap();
153         });
154         scope.spawn(move |_| {
155             thread::sleep(ms(1500));
156             assert_eq!(r.recv(), Ok(7));
157             assert_eq!(r.recv(), Ok(8));
158             assert_eq!(r.recv(), Ok(9));
159         });
160     })
161     .unwrap();
162 }
163 
164 #[test]
send_timeout()165 fn send_timeout() {
166     let (s, r) = bounded(0);
167 
168     scope(|scope| {
169         scope.spawn(move |_| {
170             assert_eq!(
171                 s.send_timeout(7, ms(1000)),
172                 Err(SendTimeoutError::Timeout(7))
173             );
174             assert_eq!(s.send_timeout(8, ms(1000)), Ok(()));
175             assert_eq!(
176                 s.send_timeout(9, ms(1000)),
177                 Err(SendTimeoutError::Disconnected(9))
178             );
179         });
180         scope.spawn(move |_| {
181             thread::sleep(ms(1500));
182             assert_eq!(r.recv(), Ok(8));
183         });
184     })
185     .unwrap();
186 }
187 
188 #[test]
len()189 fn len() {
190     #[cfg(miri)]
191     const COUNT: usize = 50;
192     #[cfg(not(miri))]
193     const COUNT: usize = 25_000;
194 
195     let (s, r) = bounded(0);
196 
197     assert_eq!(s.len(), 0);
198     assert_eq!(r.len(), 0);
199 
200     scope(|scope| {
201         scope.spawn(|_| {
202             for i in 0..COUNT {
203                 assert_eq!(r.recv(), Ok(i));
204                 assert_eq!(r.len(), 0);
205             }
206         });
207 
208         scope.spawn(|_| {
209             for i in 0..COUNT {
210                 s.send(i).unwrap();
211                 assert_eq!(s.len(), 0);
212             }
213         });
214     })
215     .unwrap();
216 
217     assert_eq!(s.len(), 0);
218     assert_eq!(r.len(), 0);
219 }
220 
221 #[test]
disconnect_wakes_sender()222 fn disconnect_wakes_sender() {
223     let (s, r) = bounded(0);
224 
225     scope(|scope| {
226         scope.spawn(move |_| {
227             assert_eq!(s.send(()), Err(SendError(())));
228         });
229         scope.spawn(move |_| {
230             thread::sleep(ms(1000));
231             drop(r);
232         });
233     })
234     .unwrap();
235 }
236 
237 #[test]
disconnect_wakes_receiver()238 fn disconnect_wakes_receiver() {
239     let (s, r) = bounded::<()>(0);
240 
241     scope(|scope| {
242         scope.spawn(move |_| {
243             assert_eq!(r.recv(), Err(RecvError));
244         });
245         scope.spawn(move |_| {
246             thread::sleep(ms(1000));
247             drop(s);
248         });
249     })
250     .unwrap();
251 }
252 
253 #[test]
spsc()254 fn spsc() {
255     #[cfg(miri)]
256     const COUNT: usize = 50;
257     #[cfg(not(miri))]
258     const COUNT: usize = 100_000;
259 
260     let (s, r) = bounded(0);
261 
262     scope(|scope| {
263         scope.spawn(move |_| {
264             for i in 0..COUNT {
265                 assert_eq!(r.recv(), Ok(i));
266             }
267             assert_eq!(r.recv(), Err(RecvError));
268         });
269         scope.spawn(move |_| {
270             for i in 0..COUNT {
271                 s.send(i).unwrap();
272             }
273         });
274     })
275     .unwrap();
276 }
277 
278 #[test]
mpmc()279 fn mpmc() {
280     #[cfg(miri)]
281     const COUNT: usize = 50;
282     #[cfg(not(miri))]
283     const COUNT: usize = 25_000;
284     const THREADS: usize = 4;
285 
286     let (s, r) = bounded::<usize>(0);
287     let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
288 
289     scope(|scope| {
290         for _ in 0..THREADS {
291             scope.spawn(|_| {
292                 for _ in 0..COUNT {
293                     let n = r.recv().unwrap();
294                     v[n].fetch_add(1, Ordering::SeqCst);
295                 }
296             });
297         }
298         for _ in 0..THREADS {
299             scope.spawn(|_| {
300                 for i in 0..COUNT {
301                     s.send(i).unwrap();
302                 }
303             });
304         }
305     })
306     .unwrap();
307 
308     for c in v {
309         assert_eq!(c.load(Ordering::SeqCst), THREADS);
310     }
311 }
312 
313 #[test]
stress_oneshot()314 fn stress_oneshot() {
315     #[cfg(miri)]
316     const COUNT: usize = 50;
317     #[cfg(not(miri))]
318     const COUNT: usize = 10_000;
319 
320     for _ in 0..COUNT {
321         let (s, r) = bounded(1);
322 
323         scope(|scope| {
324             scope.spawn(|_| r.recv().unwrap());
325             scope.spawn(|_| s.send(0).unwrap());
326         })
327         .unwrap();
328     }
329 }
330 
331 #[test]
stress_iter()332 fn stress_iter() {
333     #[cfg(miri)]
334     const COUNT: usize = 50;
335     #[cfg(not(miri))]
336     const COUNT: usize = 1000;
337 
338     let (request_s, request_r) = bounded(0);
339     let (response_s, response_r) = bounded(0);
340 
341     scope(|scope| {
342         scope.spawn(move |_| {
343             let mut count = 0;
344             loop {
345                 for x in response_r.try_iter() {
346                     count += x;
347                     if count == COUNT {
348                         return;
349                     }
350                 }
351                 let _ = request_s.try_send(());
352             }
353         });
354 
355         for _ in request_r.iter() {
356             if response_s.send(1).is_err() {
357                 break;
358             }
359         }
360     })
361     .unwrap();
362 }
363 
364 #[test]
stress_timeout_two_threads()365 fn stress_timeout_two_threads() {
366     const COUNT: usize = 100;
367 
368     let (s, r) = bounded(0);
369 
370     scope(|scope| {
371         scope.spawn(|_| {
372             for i in 0..COUNT {
373                 if i % 2 == 0 {
374                     thread::sleep(ms(50));
375                 }
376                 loop {
377                     if let Ok(()) = s.send_timeout(i, ms(10)) {
378                         break;
379                     }
380                 }
381             }
382         });
383 
384         scope.spawn(|_| {
385             for i in 0..COUNT {
386                 if i % 2 == 0 {
387                     thread::sleep(ms(50));
388                 }
389                 loop {
390                     if let Ok(x) = r.recv_timeout(ms(10)) {
391                         assert_eq!(x, i);
392                         break;
393                     }
394                 }
395             }
396         });
397     })
398     .unwrap();
399 }
400 
401 #[test]
drops()402 fn drops() {
403     #[cfg(miri)]
404     const RUNS: usize = 20;
405     #[cfg(not(miri))]
406     const RUNS: usize = 100;
407     #[cfg(miri)]
408     const STEPS: usize = 100;
409     #[cfg(not(miri))]
410     const STEPS: usize = 10_000;
411 
412     static DROPS: AtomicUsize = AtomicUsize::new(0);
413 
414     #[derive(Debug, PartialEq)]
415     struct DropCounter;
416 
417     impl Drop for DropCounter {
418         fn drop(&mut self) {
419             DROPS.fetch_add(1, Ordering::SeqCst);
420         }
421     }
422 
423     let mut rng = thread_rng();
424 
425     for _ in 0..RUNS {
426         let steps = rng.gen_range(0..STEPS);
427 
428         DROPS.store(0, Ordering::SeqCst);
429         let (s, r) = bounded::<DropCounter>(0);
430 
431         scope(|scope| {
432             scope.spawn(|_| {
433                 for _ in 0..steps {
434                     r.recv().unwrap();
435                 }
436             });
437 
438             scope.spawn(|_| {
439                 for _ in 0..steps {
440                     s.send(DropCounter).unwrap();
441                 }
442             });
443         })
444         .unwrap();
445 
446         assert_eq!(DROPS.load(Ordering::SeqCst), steps);
447         drop(s);
448         drop(r);
449         assert_eq!(DROPS.load(Ordering::SeqCst), steps);
450     }
451 }
452 
453 #[test]
fairness()454 fn fairness() {
455     #[cfg(miri)]
456     const COUNT: usize = 50;
457     #[cfg(not(miri))]
458     const COUNT: usize = 10_000;
459 
460     let (s1, r1) = bounded::<()>(0);
461     let (s2, r2) = bounded::<()>(0);
462 
463     scope(|scope| {
464         scope.spawn(|_| {
465             let mut hits = [0usize; 2];
466             for _ in 0..COUNT {
467                 select! {
468                     recv(r1) -> _ => hits[0] += 1,
469                     recv(r2) -> _ => hits[1] += 1,
470                 }
471             }
472             assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
473         });
474 
475         let mut hits = [0usize; 2];
476         for _ in 0..COUNT {
477             select! {
478                 send(s1, ()) -> _ => hits[0] += 1,
479                 send(s2, ()) -> _ => hits[1] += 1,
480             }
481         }
482         assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
483     })
484     .unwrap();
485 }
486 
487 #[test]
fairness_duplicates()488 fn fairness_duplicates() {
489     #[cfg(miri)]
490     const COUNT: usize = 100;
491     #[cfg(not(miri))]
492     const COUNT: usize = 10_000;
493 
494     let (s, r) = bounded::<()>(0);
495 
496     scope(|scope| {
497         scope.spawn(|_| {
498             let mut hits = [0usize; 5];
499             for _ in 0..COUNT {
500                 select! {
501                     recv(r) -> _ => hits[0] += 1,
502                     recv(r) -> _ => hits[1] += 1,
503                     recv(r) -> _ => hits[2] += 1,
504                     recv(r) -> _ => hits[3] += 1,
505                     recv(r) -> _ => hits[4] += 1,
506                 }
507             }
508             assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
509         });
510 
511         let mut hits = [0usize; 5];
512         for _ in 0..COUNT {
513             select! {
514                 send(s, ()) -> _ => hits[0] += 1,
515                 send(s, ()) -> _ => hits[1] += 1,
516                 send(s, ()) -> _ => hits[2] += 1,
517                 send(s, ()) -> _ => hits[3] += 1,
518                 send(s, ()) -> _ => hits[4] += 1,
519             }
520         }
521         assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
522     })
523     .unwrap();
524 }
525 
526 #[test]
recv_in_send()527 fn recv_in_send() {
528     let (s, r) = bounded(0);
529 
530     scope(|scope| {
531         scope.spawn(|_| {
532             thread::sleep(ms(100));
533             r.recv()
534         });
535 
536         scope.spawn(|_| {
537             thread::sleep(ms(500));
538             s.send(()).unwrap();
539         });
540 
541         select! {
542             send(s, r.recv().unwrap()) -> _ => {}
543         }
544     })
545     .unwrap();
546 }
547 
548 #[test]
channel_through_channel()549 fn channel_through_channel() {
550     #[cfg(miri)]
551     const COUNT: usize = 50;
552     #[cfg(not(miri))]
553     const COUNT: usize = 1000;
554 
555     type T = Box<dyn Any + Send>;
556 
557     let (s, r) = bounded::<T>(0);
558 
559     scope(|scope| {
560         scope.spawn(move |_| {
561             let mut s = s;
562 
563             for _ in 0..COUNT {
564                 let (new_s, new_r) = bounded(0);
565                 let new_r: T = Box::new(Some(new_r));
566 
567                 s.send(new_r).unwrap();
568                 s = new_s;
569             }
570         });
571 
572         scope.spawn(move |_| {
573             let mut r = r;
574 
575             for _ in 0..COUNT {
576                 r = r
577                     .recv()
578                     .unwrap()
579                     .downcast_mut::<Option<Receiver<T>>>()
580                     .unwrap()
581                     .take()
582                     .unwrap()
583             }
584         });
585     })
586     .unwrap();
587 }
588