• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Tests for the array channel flavor.
2 
3 use std::any::Any;
4 use std::sync::atomic::AtomicUsize;
5 use std::sync::atomic::Ordering;
6 use std::thread;
7 use std::time::Duration;
8 
9 use crossbeam_channel::{bounded, select, Receiver};
10 use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
11 use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
12 use crossbeam_utils::thread::scope;
13 use rand::{thread_rng, Rng};
14 
ms(ms: u64) -> Duration15 fn ms(ms: u64) -> Duration {
16     Duration::from_millis(ms)
17 }
18 
19 #[test]
smoke()20 fn smoke() {
21     let (s, r) = bounded(1);
22     s.send(7).unwrap();
23     assert_eq!(r.try_recv(), Ok(7));
24 
25     s.send(8).unwrap();
26     assert_eq!(r.recv(), Ok(8));
27 
28     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
29     assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
30 }
31 
32 #[test]
capacity()33 fn capacity() {
34     for i in 1..10 {
35         let (s, r) = bounded::<()>(i);
36         assert_eq!(s.capacity(), Some(i));
37         assert_eq!(r.capacity(), Some(i));
38     }
39 }
40 
41 #[test]
len_empty_full()42 fn len_empty_full() {
43     let (s, r) = bounded(2);
44 
45     assert_eq!(s.len(), 0);
46     assert_eq!(s.is_empty(), true);
47     assert_eq!(s.is_full(), false);
48     assert_eq!(r.len(), 0);
49     assert_eq!(r.is_empty(), true);
50     assert_eq!(r.is_full(), false);
51 
52     s.send(()).unwrap();
53 
54     assert_eq!(s.len(), 1);
55     assert_eq!(s.is_empty(), false);
56     assert_eq!(s.is_full(), false);
57     assert_eq!(r.len(), 1);
58     assert_eq!(r.is_empty(), false);
59     assert_eq!(r.is_full(), false);
60 
61     s.send(()).unwrap();
62 
63     assert_eq!(s.len(), 2);
64     assert_eq!(s.is_empty(), false);
65     assert_eq!(s.is_full(), true);
66     assert_eq!(r.len(), 2);
67     assert_eq!(r.is_empty(), false);
68     assert_eq!(r.is_full(), true);
69 
70     r.recv().unwrap();
71 
72     assert_eq!(s.len(), 1);
73     assert_eq!(s.is_empty(), false);
74     assert_eq!(s.is_full(), false);
75     assert_eq!(r.len(), 1);
76     assert_eq!(r.is_empty(), false);
77     assert_eq!(r.is_full(), false);
78 }
79 
80 #[test]
try_recv()81 fn try_recv() {
82     let (s, r) = bounded(100);
83 
84     scope(|scope| {
85         scope.spawn(move |_| {
86             assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
87             thread::sleep(ms(1500));
88             assert_eq!(r.try_recv(), Ok(7));
89             thread::sleep(ms(500));
90             assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
91         });
92         scope.spawn(move |_| {
93             thread::sleep(ms(1000));
94             s.send(7).unwrap();
95         });
96     })
97     .unwrap();
98 }
99 
100 #[test]
recv()101 fn recv() {
102     let (s, r) = bounded(100);
103 
104     scope(|scope| {
105         scope.spawn(move |_| {
106             assert_eq!(r.recv(), Ok(7));
107             thread::sleep(ms(1000));
108             assert_eq!(r.recv(), Ok(8));
109             thread::sleep(ms(1000));
110             assert_eq!(r.recv(), Ok(9));
111             assert_eq!(r.recv(), Err(RecvError));
112         });
113         scope.spawn(move |_| {
114             thread::sleep(ms(1500));
115             s.send(7).unwrap();
116             s.send(8).unwrap();
117             s.send(9).unwrap();
118         });
119     })
120     .unwrap();
121 }
122 
123 #[test]
recv_timeout()124 fn recv_timeout() {
125     let (s, r) = bounded::<i32>(100);
126 
127     scope(|scope| {
128         scope.spawn(move |_| {
129             assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
130             assert_eq!(r.recv_timeout(ms(1000)), Ok(7));
131             assert_eq!(
132                 r.recv_timeout(ms(1000)),
133                 Err(RecvTimeoutError::Disconnected)
134             );
135         });
136         scope.spawn(move |_| {
137             thread::sleep(ms(1500));
138             s.send(7).unwrap();
139         });
140     })
141     .unwrap();
142 }
143 
144 #[test]
try_send()145 fn try_send() {
146     let (s, r) = bounded(1);
147 
148     scope(|scope| {
149         scope.spawn(move |_| {
150             assert_eq!(s.try_send(1), Ok(()));
151             assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
152             thread::sleep(ms(1500));
153             assert_eq!(s.try_send(3), Ok(()));
154             thread::sleep(ms(500));
155             assert_eq!(s.try_send(4), Err(TrySendError::Disconnected(4)));
156         });
157         scope.spawn(move |_| {
158             thread::sleep(ms(1000));
159             assert_eq!(r.try_recv(), Ok(1));
160             assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
161             assert_eq!(r.recv(), Ok(3));
162         });
163     })
164     .unwrap();
165 }
166 
167 #[test]
send()168 fn send() {
169     let (s, r) = bounded(1);
170 
171     scope(|scope| {
172         scope.spawn(|_| {
173             s.send(7).unwrap();
174             thread::sleep(ms(1000));
175             s.send(8).unwrap();
176             thread::sleep(ms(1000));
177             s.send(9).unwrap();
178             thread::sleep(ms(1000));
179             s.send(10).unwrap();
180         });
181         scope.spawn(|_| {
182             thread::sleep(ms(1500));
183             assert_eq!(r.recv(), Ok(7));
184             assert_eq!(r.recv(), Ok(8));
185             assert_eq!(r.recv(), Ok(9));
186         });
187     })
188     .unwrap();
189 }
190 
191 #[test]
send_timeout()192 fn send_timeout() {
193     let (s, r) = bounded(2);
194 
195     scope(|scope| {
196         scope.spawn(move |_| {
197             assert_eq!(s.send_timeout(1, ms(1000)), Ok(()));
198             assert_eq!(s.send_timeout(2, ms(1000)), Ok(()));
199             assert_eq!(
200                 s.send_timeout(3, ms(500)),
201                 Err(SendTimeoutError::Timeout(3))
202             );
203             thread::sleep(ms(1000));
204             assert_eq!(s.send_timeout(4, ms(1000)), Ok(()));
205             thread::sleep(ms(1000));
206             assert_eq!(s.send(5), Err(SendError(5)));
207         });
208         scope.spawn(move |_| {
209             thread::sleep(ms(1000));
210             assert_eq!(r.recv(), Ok(1));
211             thread::sleep(ms(1000));
212             assert_eq!(r.recv(), Ok(2));
213             assert_eq!(r.recv(), Ok(4));
214         });
215     })
216     .unwrap();
217 }
218 
219 #[test]
send_after_disconnect()220 fn send_after_disconnect() {
221     let (s, r) = bounded(100);
222 
223     s.send(1).unwrap();
224     s.send(2).unwrap();
225     s.send(3).unwrap();
226 
227     drop(r);
228 
229     assert_eq!(s.send(4), Err(SendError(4)));
230     assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5)));
231     assert_eq!(
232         s.send_timeout(6, ms(500)),
233         Err(SendTimeoutError::Disconnected(6))
234     );
235 }
236 
237 #[test]
recv_after_disconnect()238 fn recv_after_disconnect() {
239     let (s, r) = bounded(100);
240 
241     s.send(1).unwrap();
242     s.send(2).unwrap();
243     s.send(3).unwrap();
244 
245     drop(s);
246 
247     assert_eq!(r.recv(), Ok(1));
248     assert_eq!(r.recv(), Ok(2));
249     assert_eq!(r.recv(), Ok(3));
250     assert_eq!(r.recv(), Err(RecvError));
251 }
252 
253 #[test]
len()254 fn len() {
255     const COUNT: usize = 25_000;
256     const CAP: usize = 1000;
257 
258     let (s, r) = bounded(CAP);
259 
260     assert_eq!(s.len(), 0);
261     assert_eq!(r.len(), 0);
262 
263     for _ in 0..CAP / 10 {
264         for i in 0..50 {
265             s.send(i).unwrap();
266             assert_eq!(s.len(), i + 1);
267         }
268 
269         for i in 0..50 {
270             r.recv().unwrap();
271             assert_eq!(r.len(), 50 - i - 1);
272         }
273     }
274 
275     assert_eq!(s.len(), 0);
276     assert_eq!(r.len(), 0);
277 
278     for i in 0..CAP {
279         s.send(i).unwrap();
280         assert_eq!(s.len(), i + 1);
281     }
282 
283     for _ in 0..CAP {
284         r.recv().unwrap();
285     }
286 
287     assert_eq!(s.len(), 0);
288     assert_eq!(r.len(), 0);
289 
290     scope(|scope| {
291         scope.spawn(|_| {
292             for i in 0..COUNT {
293                 assert_eq!(r.recv(), Ok(i));
294                 let len = r.len();
295                 assert!(len <= CAP);
296             }
297         });
298 
299         scope.spawn(|_| {
300             for i in 0..COUNT {
301                 s.send(i).unwrap();
302                 let len = s.len();
303                 assert!(len <= CAP);
304             }
305         });
306     })
307     .unwrap();
308 
309     assert_eq!(s.len(), 0);
310     assert_eq!(r.len(), 0);
311 }
312 
313 #[test]
disconnect_wakes_sender()314 fn disconnect_wakes_sender() {
315     let (s, r) = bounded(1);
316 
317     scope(|scope| {
318         scope.spawn(move |_| {
319             assert_eq!(s.send(()), Ok(()));
320             assert_eq!(s.send(()), Err(SendError(())));
321         });
322         scope.spawn(move |_| {
323             thread::sleep(ms(1000));
324             drop(r);
325         });
326     })
327     .unwrap();
328 }
329 
330 #[test]
disconnect_wakes_receiver()331 fn disconnect_wakes_receiver() {
332     let (s, r) = bounded::<()>(1);
333 
334     scope(|scope| {
335         scope.spawn(move |_| {
336             assert_eq!(r.recv(), Err(RecvError));
337         });
338         scope.spawn(move |_| {
339             thread::sleep(ms(1000));
340             drop(s);
341         });
342     })
343     .unwrap();
344 }
345 
346 #[test]
spsc()347 fn spsc() {
348     const COUNT: usize = 100_000;
349 
350     let (s, r) = bounded(3);
351 
352     scope(|scope| {
353         scope.spawn(move |_| {
354             for i in 0..COUNT {
355                 assert_eq!(r.recv(), Ok(i));
356             }
357             assert_eq!(r.recv(), Err(RecvError));
358         });
359         scope.spawn(move |_| {
360             for i in 0..COUNT {
361                 s.send(i).unwrap();
362             }
363         });
364     })
365     .unwrap();
366 }
367 
368 #[test]
mpmc()369 fn mpmc() {
370     const COUNT: usize = 25_000;
371     const THREADS: usize = 4;
372 
373     let (s, r) = bounded::<usize>(3);
374     let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
375 
376     scope(|scope| {
377         for _ in 0..THREADS {
378             scope.spawn(|_| {
379                 for _ in 0..COUNT {
380                     let n = r.recv().unwrap();
381                     v[n].fetch_add(1, Ordering::SeqCst);
382                 }
383             });
384         }
385         for _ in 0..THREADS {
386             scope.spawn(|_| {
387                 for i in 0..COUNT {
388                     s.send(i).unwrap();
389                 }
390             });
391         }
392     })
393     .unwrap();
394 
395     for c in v {
396         assert_eq!(c.load(Ordering::SeqCst), THREADS);
397     }
398 }
399 
400 #[test]
stress_oneshot()401 fn stress_oneshot() {
402     const COUNT: usize = 10_000;
403 
404     for _ in 0..COUNT {
405         let (s, r) = bounded(1);
406 
407         scope(|scope| {
408             scope.spawn(|_| r.recv().unwrap());
409             scope.spawn(|_| s.send(0).unwrap());
410         })
411         .unwrap();
412     }
413 }
414 
415 #[test]
stress_iter()416 fn stress_iter() {
417     const COUNT: usize = 100_000;
418 
419     let (request_s, request_r) = bounded(1);
420     let (response_s, response_r) = bounded(1);
421 
422     scope(|scope| {
423         scope.spawn(move |_| {
424             let mut count = 0;
425             loop {
426                 for x in response_r.try_iter() {
427                     count += x;
428                     if count == COUNT {
429                         return;
430                     }
431                 }
432                 request_s.send(()).unwrap();
433             }
434         });
435 
436         for _ in request_r.iter() {
437             if response_s.send(1).is_err() {
438                 break;
439             }
440         }
441     })
442     .unwrap();
443 }
444 
445 #[test]
stress_timeout_two_threads()446 fn stress_timeout_two_threads() {
447     const COUNT: usize = 100;
448 
449     let (s, r) = bounded(2);
450 
451     scope(|scope| {
452         scope.spawn(|_| {
453             for i in 0..COUNT {
454                 if i % 2 == 0 {
455                     thread::sleep(ms(50));
456                 }
457                 loop {
458                     if let Ok(()) = s.send_timeout(i, ms(10)) {
459                         break;
460                     }
461                 }
462             }
463         });
464 
465         scope.spawn(|_| {
466             for i in 0..COUNT {
467                 if i % 2 == 0 {
468                     thread::sleep(ms(50));
469                 }
470                 loop {
471                     if let Ok(x) = r.recv_timeout(ms(10)) {
472                         assert_eq!(x, i);
473                         break;
474                     }
475                 }
476             }
477         });
478     })
479     .unwrap();
480 }
481 
482 #[test]
drops()483 fn drops() {
484     const RUNS: usize = 100;
485 
486     static DROPS: AtomicUsize = AtomicUsize::new(0);
487 
488     #[derive(Debug, PartialEq)]
489     struct DropCounter;
490 
491     impl Drop for DropCounter {
492         fn drop(&mut self) {
493             DROPS.fetch_add(1, Ordering::SeqCst);
494         }
495     }
496 
497     let mut rng = thread_rng();
498 
499     for _ in 0..RUNS {
500         let steps = rng.gen_range(0, 10_000);
501         let additional = rng.gen_range(0, 50);
502 
503         DROPS.store(0, Ordering::SeqCst);
504         let (s, r) = bounded::<DropCounter>(50);
505 
506         scope(|scope| {
507             scope.spawn(|_| {
508                 for _ in 0..steps {
509                     r.recv().unwrap();
510                 }
511             });
512 
513             scope.spawn(|_| {
514                 for _ in 0..steps {
515                     s.send(DropCounter).unwrap();
516                 }
517             });
518         })
519         .unwrap();
520 
521         for _ in 0..additional {
522             s.send(DropCounter).unwrap();
523         }
524 
525         assert_eq!(DROPS.load(Ordering::SeqCst), steps);
526         drop(s);
527         drop(r);
528         assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
529     }
530 }
531 
532 #[test]
linearizable()533 fn linearizable() {
534     const COUNT: usize = 25_000;
535     const THREADS: usize = 4;
536 
537     let (s, r) = bounded(THREADS);
538 
539     scope(|scope| {
540         for _ in 0..THREADS {
541             scope.spawn(|_| {
542                 for _ in 0..COUNT {
543                     s.send(0).unwrap();
544                     r.try_recv().unwrap();
545                 }
546             });
547         }
548     })
549     .unwrap();
550 }
551 
552 #[test]
fairness()553 fn fairness() {
554     const COUNT: usize = 10_000;
555 
556     let (s1, r1) = bounded::<()>(COUNT);
557     let (s2, r2) = bounded::<()>(COUNT);
558 
559     for _ in 0..COUNT {
560         s1.send(()).unwrap();
561         s2.send(()).unwrap();
562     }
563 
564     let mut hits = [0usize; 2];
565     for _ in 0..COUNT {
566         select! {
567             recv(r1) -> _  => hits[0] += 1,
568             recv(r2) -> _  => hits[1] += 1,
569         }
570     }
571     assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
572 }
573 
574 #[test]
fairness_duplicates()575 fn fairness_duplicates() {
576     const COUNT: usize = 10_000;
577 
578     let (s, r) = bounded::<()>(COUNT);
579 
580     for _ in 0..COUNT {
581         s.send(()).unwrap();
582     }
583 
584     let mut hits = [0usize; 5];
585     for _ in 0..COUNT {
586         select! {
587             recv(r) -> _ => hits[0] += 1,
588             recv(r) -> _ => hits[1] += 1,
589             recv(r) -> _ => hits[2] += 1,
590             recv(r) -> _ => hits[3] += 1,
591             recv(r) -> _ => hits[4] += 1,
592         }
593     }
594     assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
595 }
596 
597 #[test]
recv_in_send()598 fn recv_in_send() {
599     let (s, _r) = bounded(1);
600     s.send(()).unwrap();
601 
602     #[allow(unreachable_code)]
603     {
604         select! {
605             send(s, panic!()) -> _ => panic!(),
606             default => {}
607         }
608     }
609 
610     let (s, r) = bounded(2);
611     s.send(()).unwrap();
612 
613     select! {
614         send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {}
615     }
616 }
617 
618 #[test]
channel_through_channel()619 fn channel_through_channel() {
620     const COUNT: usize = 1000;
621 
622     type T = Box<dyn Any + Send>;
623 
624     let (s, r) = bounded::<T>(1);
625 
626     scope(|scope| {
627         scope.spawn(move |_| {
628             let mut s = s;
629 
630             for _ in 0..COUNT {
631                 let (new_s, new_r) = bounded(1);
632                 let new_r: T = Box::new(Some(new_r));
633 
634                 s.send(new_r).unwrap();
635                 s = new_s;
636             }
637         });
638 
639         scope.spawn(move |_| {
640             let mut r = r;
641 
642             for _ in 0..COUNT {
643                 r = r
644                     .recv()
645                     .unwrap()
646                     .downcast_mut::<Option<Receiver<T>>>()
647                     .unwrap()
648                     .take()
649                     .unwrap()
650             }
651         });
652     })
653     .unwrap();
654 }
655