• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Tests copied from `std::sync::mpsc`.
2 //!
3 //! This is a copy of tests for the `std::sync::mpsc` channels from the standard library, but
4 //! modified to work with `crossbeam-channel` instead.
5 //!
6 //! Minor tweaks were needed to make the tests compile:
7 //!
8 //! - Replace `box` syntax with `Box::new`.
9 //! - Replace all uses of `Select` with `select!`.
10 //! - Change the imports.
11 //! - Join all spawned threads.
12 //! - Removed assertion from oneshot_multi_thread_send_close_stress tests.
13 //!
14 //! Source:
15 //!   - https://github.com/rust-lang/rust/tree/master/src/libstd/sync/mpsc
16 //!
17 //! Copyright & License:
18 //!   - Copyright 2013-2014 The Rust Project Developers
19 //!   - Apache License, Version 2.0 or MIT license, at your option
20 //!   - https://github.com/rust-lang/rust/blob/master/COPYRIGHT
21 //!   - https://www.rust-lang.org/en-US/legal.html
22 
23 #![allow(
24     clippy::drop_copy,
25     clippy::match_single_binding,
26     clippy::redundant_clone
27 )]
28 
29 use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError};
30 use std::sync::mpsc::{SendError, TrySendError};
31 use std::thread::JoinHandle;
32 use std::time::Duration;
33 
34 use crossbeam_channel as cc;
35 
36 pub struct Sender<T> {
37     pub inner: cc::Sender<T>,
38 }
39 
40 impl<T> Sender<T> {
send(&self, t: T) -> Result<(), SendError<T>>41     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
42         self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
43     }
44 }
45 
46 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>47     fn clone(&self) -> Sender<T> {
48         Sender {
49             inner: self.inner.clone(),
50         }
51     }
52 }
53 
54 pub struct SyncSender<T> {
55     pub inner: cc::Sender<T>,
56 }
57 
58 impl<T> SyncSender<T> {
send(&self, t: T) -> Result<(), SendError<T>>59     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
60         self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
61     }
62 
try_send(&self, t: T) -> Result<(), TrySendError<T>>63     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
64         self.inner.try_send(t).map_err(|err| match err {
65             cc::TrySendError::Full(m) => TrySendError::Full(m),
66             cc::TrySendError::Disconnected(m) => TrySendError::Disconnected(m),
67         })
68     }
69 }
70 
71 impl<T> Clone for SyncSender<T> {
clone(&self) -> SyncSender<T>72     fn clone(&self) -> SyncSender<T> {
73         SyncSender {
74             inner: self.inner.clone(),
75         }
76     }
77 }
78 
79 pub struct Receiver<T> {
80     pub inner: cc::Receiver<T>,
81 }
82 
83 impl<T> Receiver<T> {
try_recv(&self) -> Result<T, TryRecvError>84     pub fn try_recv(&self) -> Result<T, TryRecvError> {
85         self.inner.try_recv().map_err(|err| match err {
86             cc::TryRecvError::Empty => TryRecvError::Empty,
87             cc::TryRecvError::Disconnected => TryRecvError::Disconnected,
88         })
89     }
90 
recv(&self) -> Result<T, RecvError>91     pub fn recv(&self) -> Result<T, RecvError> {
92         self.inner.recv().map_err(|_| RecvError)
93     }
94 
recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>95     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
96         self.inner.recv_timeout(timeout).map_err(|err| match err {
97             cc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
98             cc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
99         })
100     }
101 
iter(&self) -> Iter<T>102     pub fn iter(&self) -> Iter<T> {
103         Iter { inner: self }
104     }
105 
try_iter(&self) -> TryIter<T>106     pub fn try_iter(&self) -> TryIter<T> {
107         TryIter { inner: self }
108     }
109 }
110 
111 impl<'a, T> IntoIterator for &'a Receiver<T> {
112     type Item = T;
113     type IntoIter = Iter<'a, T>;
114 
into_iter(self) -> Iter<'a, T>115     fn into_iter(self) -> Iter<'a, T> {
116         self.iter()
117     }
118 }
119 
120 impl<T> IntoIterator for Receiver<T> {
121     type Item = T;
122     type IntoIter = IntoIter<T>;
123 
into_iter(self) -> IntoIter<T>124     fn into_iter(self) -> IntoIter<T> {
125         IntoIter { inner: self }
126     }
127 }
128 
129 pub struct TryIter<'a, T: 'a> {
130     inner: &'a Receiver<T>,
131 }
132 
133 impl<'a, T> Iterator for TryIter<'a, T> {
134     type Item = T;
135 
next(&mut self) -> Option<T>136     fn next(&mut self) -> Option<T> {
137         self.inner.try_recv().ok()
138     }
139 }
140 
141 pub struct Iter<'a, T: 'a> {
142     inner: &'a Receiver<T>,
143 }
144 
145 impl<'a, T> Iterator for Iter<'a, T> {
146     type Item = T;
147 
next(&mut self) -> Option<T>148     fn next(&mut self) -> Option<T> {
149         self.inner.recv().ok()
150     }
151 }
152 
153 pub struct IntoIter<T> {
154     inner: Receiver<T>,
155 }
156 
157 impl<T> Iterator for IntoIter<T> {
158     type Item = T;
159 
next(&mut self) -> Option<T>160     fn next(&mut self) -> Option<T> {
161         self.inner.recv().ok()
162     }
163 }
164 
channel<T>() -> (Sender<T>, Receiver<T>)165 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
166     let (s, r) = cc::unbounded();
167     let s = Sender { inner: s };
168     let r = Receiver { inner: r };
169     (s, r)
170 }
171 
sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>)172 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
173     let (s, r) = cc::bounded(bound);
174     let s = SyncSender { inner: s };
175     let r = Receiver { inner: r };
176     (s, r)
177 }
178 
179 macro_rules! select {
180     (
181         $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
182     ) => ({
183         cc::crossbeam_channel_internal! {
184             $(
185                 $meth(($rx).inner) -> res => {
186                     let $name = res.map_err(|_| ::std::sync::mpsc::RecvError);
187                     $code
188                 }
189             )+
190         }
191     })
192 }
193 
194 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
195 mod channel_tests {
196     use super::*;
197 
198     use std::env;
199     use std::thread;
200     use std::time::{Duration, Instant};
201 
stress_factor() -> usize202     pub fn stress_factor() -> usize {
203         match env::var("RUST_TEST_STRESS") {
204             Ok(val) => val.parse().unwrap(),
205             Err(..) => 1,
206         }
207     }
208 
209     #[test]
smoke()210     fn smoke() {
211         let (tx, rx) = channel::<i32>();
212         tx.send(1).unwrap();
213         assert_eq!(rx.recv().unwrap(), 1);
214     }
215 
216     #[test]
drop_full()217     fn drop_full() {
218         let (tx, _rx) = channel::<Box<isize>>();
219         tx.send(Box::new(1)).unwrap();
220     }
221 
222     #[test]
drop_full_shared()223     fn drop_full_shared() {
224         let (tx, _rx) = channel::<Box<isize>>();
225         drop(tx.clone());
226         drop(tx.clone());
227         tx.send(Box::new(1)).unwrap();
228     }
229 
230     #[test]
smoke_shared()231     fn smoke_shared() {
232         let (tx, rx) = channel::<i32>();
233         tx.send(1).unwrap();
234         assert_eq!(rx.recv().unwrap(), 1);
235         let tx = tx.clone();
236         tx.send(1).unwrap();
237         assert_eq!(rx.recv().unwrap(), 1);
238     }
239 
240     #[test]
smoke_threads()241     fn smoke_threads() {
242         let (tx, rx) = channel::<i32>();
243         let t = thread::spawn(move || {
244             tx.send(1).unwrap();
245         });
246         assert_eq!(rx.recv().unwrap(), 1);
247         t.join().unwrap();
248     }
249 
250     #[test]
smoke_port_gone()251     fn smoke_port_gone() {
252         let (tx, rx) = channel::<i32>();
253         drop(rx);
254         assert!(tx.send(1).is_err());
255     }
256 
257     #[test]
smoke_shared_port_gone()258     fn smoke_shared_port_gone() {
259         let (tx, rx) = channel::<i32>();
260         drop(rx);
261         assert!(tx.send(1).is_err())
262     }
263 
264     #[test]
smoke_shared_port_gone2()265     fn smoke_shared_port_gone2() {
266         let (tx, rx) = channel::<i32>();
267         drop(rx);
268         let tx2 = tx.clone();
269         drop(tx);
270         assert!(tx2.send(1).is_err());
271     }
272 
273     #[test]
port_gone_concurrent()274     fn port_gone_concurrent() {
275         let (tx, rx) = channel::<i32>();
276         let t = thread::spawn(move || {
277             rx.recv().unwrap();
278         });
279         while tx.send(1).is_ok() {}
280         t.join().unwrap();
281     }
282 
283     #[test]
port_gone_concurrent_shared()284     fn port_gone_concurrent_shared() {
285         let (tx, rx) = channel::<i32>();
286         let tx2 = tx.clone();
287         let t = thread::spawn(move || {
288             rx.recv().unwrap();
289         });
290         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
291         t.join().unwrap();
292     }
293 
294     #[test]
smoke_chan_gone()295     fn smoke_chan_gone() {
296         let (tx, rx) = channel::<i32>();
297         drop(tx);
298         assert!(rx.recv().is_err());
299     }
300 
301     #[test]
smoke_chan_gone_shared()302     fn smoke_chan_gone_shared() {
303         let (tx, rx) = channel::<()>();
304         let tx2 = tx.clone();
305         drop(tx);
306         drop(tx2);
307         assert!(rx.recv().is_err());
308     }
309 
310     #[test]
chan_gone_concurrent()311     fn chan_gone_concurrent() {
312         let (tx, rx) = channel::<i32>();
313         let t = thread::spawn(move || {
314             tx.send(1).unwrap();
315             tx.send(1).unwrap();
316         });
317         while rx.recv().is_ok() {}
318         t.join().unwrap();
319     }
320 
321     #[test]
stress()322     fn stress() {
323         #[cfg(miri)]
324         const COUNT: usize = 100;
325         #[cfg(not(miri))]
326         const COUNT: usize = 10000;
327 
328         let (tx, rx) = channel::<i32>();
329         let t = thread::spawn(move || {
330             for _ in 0..COUNT {
331                 tx.send(1).unwrap();
332             }
333         });
334         for _ in 0..COUNT {
335             assert_eq!(rx.recv().unwrap(), 1);
336         }
337         t.join().ok().unwrap();
338     }
339 
340     #[test]
stress_shared()341     fn stress_shared() {
342         let amt: u32 = if cfg!(miri) { 100 } else { 10_000 };
343         let nthreads: u32 = if cfg!(miri) { 4 } else { 8 };
344         let (tx, rx) = channel::<i32>();
345 
346         let t = thread::spawn(move || {
347             for _ in 0..amt * nthreads {
348                 assert_eq!(rx.recv().unwrap(), 1);
349             }
350             assert!(rx.try_recv().is_err());
351         });
352 
353         let mut ts = Vec::with_capacity(nthreads as usize);
354         for _ in 0..nthreads {
355             let tx = tx.clone();
356             let t = thread::spawn(move || {
357                 for _ in 0..amt {
358                     tx.send(1).unwrap();
359                 }
360             });
361             ts.push(t);
362         }
363         drop(tx);
364         t.join().ok().unwrap();
365         for t in ts {
366             t.join().unwrap();
367         }
368     }
369 
370     #[test]
send_from_outside_runtime()371     fn send_from_outside_runtime() {
372         let (tx1, rx1) = channel::<()>();
373         let (tx2, rx2) = channel::<i32>();
374         let t1 = thread::spawn(move || {
375             tx1.send(()).unwrap();
376             for _ in 0..40 {
377                 assert_eq!(rx2.recv().unwrap(), 1);
378             }
379         });
380         rx1.recv().unwrap();
381         let t2 = thread::spawn(move || {
382             for _ in 0..40 {
383                 tx2.send(1).unwrap();
384             }
385         });
386         t1.join().ok().unwrap();
387         t2.join().ok().unwrap();
388     }
389 
390     #[test]
recv_from_outside_runtime()391     fn recv_from_outside_runtime() {
392         let (tx, rx) = channel::<i32>();
393         let t = thread::spawn(move || {
394             for _ in 0..40 {
395                 assert_eq!(rx.recv().unwrap(), 1);
396             }
397         });
398         for _ in 0..40 {
399             tx.send(1).unwrap();
400         }
401         t.join().ok().unwrap();
402     }
403 
404     #[test]
no_runtime()405     fn no_runtime() {
406         let (tx1, rx1) = channel::<i32>();
407         let (tx2, rx2) = channel::<i32>();
408         let t1 = thread::spawn(move || {
409             assert_eq!(rx1.recv().unwrap(), 1);
410             tx2.send(2).unwrap();
411         });
412         let t2 = thread::spawn(move || {
413             tx1.send(1).unwrap();
414             assert_eq!(rx2.recv().unwrap(), 2);
415         });
416         t1.join().ok().unwrap();
417         t2.join().ok().unwrap();
418     }
419 
420     #[test]
oneshot_single_thread_close_port_first()421     fn oneshot_single_thread_close_port_first() {
422         // Simple test of closing without sending
423         let (_tx, rx) = channel::<i32>();
424         drop(rx);
425     }
426 
427     #[test]
oneshot_single_thread_close_chan_first()428     fn oneshot_single_thread_close_chan_first() {
429         // Simple test of closing without sending
430         let (tx, _rx) = channel::<i32>();
431         drop(tx);
432     }
433 
434     #[test]
oneshot_single_thread_send_port_close()435     fn oneshot_single_thread_send_port_close() {
436         // Testing that the sender cleans up the payload if receiver is closed
437         let (tx, rx) = channel::<Box<i32>>();
438         drop(rx);
439         assert!(tx.send(Box::new(0)).is_err());
440     }
441 
442     #[test]
oneshot_single_thread_recv_chan_close()443     fn oneshot_single_thread_recv_chan_close() {
444         let (tx, rx) = channel::<i32>();
445         drop(tx);
446         assert_eq!(rx.recv(), Err(RecvError));
447     }
448 
449     #[test]
oneshot_single_thread_send_then_recv()450     fn oneshot_single_thread_send_then_recv() {
451         let (tx, rx) = channel::<Box<i32>>();
452         tx.send(Box::new(10)).unwrap();
453         assert!(*rx.recv().unwrap() == 10);
454     }
455 
456     #[test]
oneshot_single_thread_try_send_open()457     fn oneshot_single_thread_try_send_open() {
458         let (tx, rx) = channel::<i32>();
459         assert!(tx.send(10).is_ok());
460         assert!(rx.recv().unwrap() == 10);
461     }
462 
463     #[test]
oneshot_single_thread_try_send_closed()464     fn oneshot_single_thread_try_send_closed() {
465         let (tx, rx) = channel::<i32>();
466         drop(rx);
467         assert!(tx.send(10).is_err());
468     }
469 
470     #[test]
oneshot_single_thread_try_recv_open()471     fn oneshot_single_thread_try_recv_open() {
472         let (tx, rx) = channel::<i32>();
473         tx.send(10).unwrap();
474         assert!(rx.recv() == Ok(10));
475     }
476 
477     #[test]
oneshot_single_thread_try_recv_closed()478     fn oneshot_single_thread_try_recv_closed() {
479         let (tx, rx) = channel::<i32>();
480         drop(tx);
481         assert!(rx.recv().is_err());
482     }
483 
484     #[test]
oneshot_single_thread_peek_data()485     fn oneshot_single_thread_peek_data() {
486         let (tx, rx) = channel::<i32>();
487         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
488         tx.send(10).unwrap();
489         assert_eq!(rx.try_recv(), Ok(10));
490     }
491 
492     #[test]
oneshot_single_thread_peek_close()493     fn oneshot_single_thread_peek_close() {
494         let (tx, rx) = channel::<i32>();
495         drop(tx);
496         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
497         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
498     }
499 
500     #[test]
oneshot_single_thread_peek_open()501     fn oneshot_single_thread_peek_open() {
502         let (_tx, rx) = channel::<i32>();
503         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
504     }
505 
506     #[test]
oneshot_multi_task_recv_then_send()507     fn oneshot_multi_task_recv_then_send() {
508         let (tx, rx) = channel::<Box<i32>>();
509         let t = thread::spawn(move || {
510             assert!(*rx.recv().unwrap() == 10);
511         });
512 
513         tx.send(Box::new(10)).unwrap();
514         t.join().unwrap();
515     }
516 
517     #[test]
oneshot_multi_task_recv_then_close()518     fn oneshot_multi_task_recv_then_close() {
519         let (tx, rx) = channel::<Box<i32>>();
520         let t = thread::spawn(move || {
521             drop(tx);
522         });
523         thread::spawn(move || {
524             assert_eq!(rx.recv(), Err(RecvError));
525         })
526         .join()
527         .unwrap();
528         t.join().unwrap();
529     }
530 
531     #[test]
oneshot_multi_thread_close_stress()532     fn oneshot_multi_thread_close_stress() {
533         let stress_factor = stress_factor();
534         let mut ts = Vec::with_capacity(stress_factor);
535         for _ in 0..stress_factor {
536             let (tx, rx) = channel::<i32>();
537             let t = thread::spawn(move || {
538                 drop(rx);
539             });
540             ts.push(t);
541             drop(tx);
542         }
543         for t in ts {
544             t.join().unwrap();
545         }
546     }
547 
548     #[test]
oneshot_multi_thread_send_close_stress()549     fn oneshot_multi_thread_send_close_stress() {
550         let stress_factor = stress_factor();
551         let mut ts = Vec::with_capacity(2 * stress_factor);
552         for _ in 0..stress_factor {
553             let (tx, rx) = channel::<i32>();
554             let t = thread::spawn(move || {
555                 drop(rx);
556             });
557             ts.push(t);
558             thread::spawn(move || {
559                 let _ = tx.send(1);
560             })
561             .join()
562             .unwrap();
563         }
564         for t in ts {
565             t.join().unwrap();
566         }
567     }
568 
569     #[test]
oneshot_multi_thread_recv_close_stress()570     fn oneshot_multi_thread_recv_close_stress() {
571         let stress_factor = stress_factor();
572         let mut ts = Vec::with_capacity(2 * stress_factor);
573         for _ in 0..stress_factor {
574             let (tx, rx) = channel::<i32>();
575             let t = thread::spawn(move || {
576                 thread::spawn(move || {
577                     assert_eq!(rx.recv(), Err(RecvError));
578                 })
579                 .join()
580                 .unwrap();
581             });
582             ts.push(t);
583             let t2 = thread::spawn(move || {
584                 let t = thread::spawn(move || {
585                     drop(tx);
586                 });
587                 t.join().unwrap();
588             });
589             ts.push(t2);
590         }
591         for t in ts {
592             t.join().unwrap();
593         }
594     }
595 
596     #[test]
oneshot_multi_thread_send_recv_stress()597     fn oneshot_multi_thread_send_recv_stress() {
598         let stress_factor = stress_factor();
599         let mut ts = Vec::with_capacity(stress_factor);
600         for _ in 0..stress_factor {
601             let (tx, rx) = channel::<Box<isize>>();
602             let t = thread::spawn(move || {
603                 tx.send(Box::new(10)).unwrap();
604             });
605             ts.push(t);
606             assert!(*rx.recv().unwrap() == 10);
607         }
608         for t in ts {
609             t.join().unwrap();
610         }
611     }
612 
613     #[test]
stream_send_recv_stress()614     fn stream_send_recv_stress() {
615         let stress_factor = stress_factor();
616         let mut ts = Vec::with_capacity(2 * stress_factor);
617         for _ in 0..stress_factor {
618             let (tx, rx) = channel();
619 
620             if let Some(t) = send(tx, 0) {
621                 ts.push(t);
622             }
623             if let Some(t2) = recv(rx, 0) {
624                 ts.push(t2);
625             }
626 
627             fn send(tx: Sender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
628                 if i == 10 {
629                     return None;
630                 }
631 
632                 Some(thread::spawn(move || {
633                     tx.send(Box::new(i)).unwrap();
634                     send(tx, i + 1);
635                 }))
636             }
637 
638             fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
639                 if i == 10 {
640                     return None;
641                 }
642 
643                 Some(thread::spawn(move || {
644                     assert!(*rx.recv().unwrap() == i);
645                     recv(rx, i + 1);
646                 }))
647             }
648         }
649         for t in ts {
650             t.join().unwrap();
651         }
652     }
653 
654     #[test]
oneshot_single_thread_recv_timeout()655     fn oneshot_single_thread_recv_timeout() {
656         let (tx, rx) = channel();
657         tx.send(()).unwrap();
658         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
659         assert_eq!(
660             rx.recv_timeout(Duration::from_millis(1)),
661             Err(RecvTimeoutError::Timeout)
662         );
663         tx.send(()).unwrap();
664         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
665     }
666 
667     #[test]
stress_recv_timeout_two_threads()668     fn stress_recv_timeout_two_threads() {
669         let (tx, rx) = channel();
670         let stress = stress_factor() + 100;
671         let timeout = Duration::from_millis(100);
672 
673         let t = thread::spawn(move || {
674             for i in 0..stress {
675                 if i % 2 == 0 {
676                     thread::sleep(timeout * 2);
677                 }
678                 tx.send(1usize).unwrap();
679             }
680         });
681 
682         let mut recv_count = 0;
683         loop {
684             match rx.recv_timeout(timeout) {
685                 Ok(n) => {
686                     assert_eq!(n, 1usize);
687                     recv_count += 1;
688                 }
689                 Err(RecvTimeoutError::Timeout) => continue,
690                 Err(RecvTimeoutError::Disconnected) => break,
691             }
692         }
693 
694         assert_eq!(recv_count, stress);
695         t.join().unwrap()
696     }
697 
698     #[test]
recv_timeout_upgrade()699     fn recv_timeout_upgrade() {
700         let (tx, rx) = channel::<()>();
701         let timeout = Duration::from_millis(1);
702         let _tx_clone = tx.clone();
703 
704         let start = Instant::now();
705         assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
706         assert!(Instant::now() >= start + timeout);
707     }
708 
709     #[test]
stress_recv_timeout_shared()710     fn stress_recv_timeout_shared() {
711         let (tx, rx) = channel();
712         let stress = stress_factor() + 100;
713 
714         let mut ts = Vec::with_capacity(stress);
715         for i in 0..stress {
716             let tx = tx.clone();
717             let t = thread::spawn(move || {
718                 thread::sleep(Duration::from_millis(i as u64 * 10));
719                 tx.send(1usize).unwrap();
720             });
721             ts.push(t);
722         }
723 
724         drop(tx);
725 
726         let mut recv_count = 0;
727         loop {
728             match rx.recv_timeout(Duration::from_millis(10)) {
729                 Ok(n) => {
730                     assert_eq!(n, 1usize);
731                     recv_count += 1;
732                 }
733                 Err(RecvTimeoutError::Timeout) => continue,
734                 Err(RecvTimeoutError::Disconnected) => break,
735             }
736         }
737 
738         assert_eq!(recv_count, stress);
739         for t in ts {
740             t.join().unwrap();
741         }
742     }
743 
744     #[test]
recv_a_lot()745     fn recv_a_lot() {
746         #[cfg(miri)]
747         const N: usize = 50;
748         #[cfg(not(miri))]
749         const N: usize = 10000;
750 
751         // Regression test that we don't run out of stack in scheduler context
752         let (tx, rx) = channel();
753         for _ in 0..N {
754             tx.send(()).unwrap();
755         }
756         for _ in 0..N {
757             rx.recv().unwrap();
758         }
759     }
760 
761     #[test]
shared_recv_timeout()762     fn shared_recv_timeout() {
763         let (tx, rx) = channel();
764         let total = 5;
765         let mut ts = Vec::with_capacity(total);
766         for _ in 0..total {
767             let tx = tx.clone();
768             let t = thread::spawn(move || {
769                 tx.send(()).unwrap();
770             });
771             ts.push(t);
772         }
773 
774         for _ in 0..total {
775             rx.recv().unwrap();
776         }
777 
778         assert_eq!(
779             rx.recv_timeout(Duration::from_millis(1)),
780             Err(RecvTimeoutError::Timeout)
781         );
782         tx.send(()).unwrap();
783         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
784         for t in ts {
785             t.join().unwrap();
786         }
787     }
788 
789     #[test]
shared_chan_stress()790     fn shared_chan_stress() {
791         let (tx, rx) = channel();
792         let total = stress_factor() + 100;
793         let mut ts = Vec::with_capacity(total);
794         for _ in 0..total {
795             let tx = tx.clone();
796             let t = thread::spawn(move || {
797                 tx.send(()).unwrap();
798             });
799             ts.push(t);
800         }
801 
802         for _ in 0..total {
803             rx.recv().unwrap();
804         }
805         for t in ts {
806             t.join().unwrap();
807         }
808     }
809 
810     #[test]
test_nested_recv_iter()811     fn test_nested_recv_iter() {
812         let (tx, rx) = channel::<i32>();
813         let (total_tx, total_rx) = channel::<i32>();
814 
815         let t = thread::spawn(move || {
816             let mut acc = 0;
817             for x in rx.iter() {
818                 acc += x;
819             }
820             total_tx.send(acc).unwrap();
821         });
822 
823         tx.send(3).unwrap();
824         tx.send(1).unwrap();
825         tx.send(2).unwrap();
826         drop(tx);
827         assert_eq!(total_rx.recv().unwrap(), 6);
828         t.join().unwrap();
829     }
830 
831     #[test]
test_recv_iter_break()832     fn test_recv_iter_break() {
833         let (tx, rx) = channel::<i32>();
834         let (count_tx, count_rx) = channel();
835 
836         let t = thread::spawn(move || {
837             let mut count = 0;
838             for x in rx.iter() {
839                 if count >= 3 {
840                     break;
841                 } else {
842                     count += x;
843                 }
844             }
845             count_tx.send(count).unwrap();
846         });
847 
848         tx.send(2).unwrap();
849         tx.send(2).unwrap();
850         tx.send(2).unwrap();
851         let _ = tx.send(2);
852         drop(tx);
853         assert_eq!(count_rx.recv().unwrap(), 4);
854         t.join().unwrap();
855     }
856 
857     #[test]
test_recv_try_iter()858     fn test_recv_try_iter() {
859         let (request_tx, request_rx) = channel();
860         let (response_tx, response_rx) = channel();
861 
862         // Request `x`s until we have `6`.
863         let t = thread::spawn(move || {
864             let mut count = 0;
865             loop {
866                 for x in response_rx.try_iter() {
867                     count += x;
868                     if count == 6 {
869                         return count;
870                     }
871                 }
872                 request_tx.send(()).unwrap();
873             }
874         });
875 
876         for _ in request_rx.iter() {
877             if response_tx.send(2).is_err() {
878                 break;
879             }
880         }
881 
882         assert_eq!(t.join().unwrap(), 6);
883     }
884 
885     #[test]
test_recv_into_iter_owned()886     fn test_recv_into_iter_owned() {
887         let mut iter = {
888             let (tx, rx) = channel::<i32>();
889             tx.send(1).unwrap();
890             tx.send(2).unwrap();
891 
892             rx.into_iter()
893         };
894         assert_eq!(iter.next().unwrap(), 1);
895         assert_eq!(iter.next().unwrap(), 2);
896         assert!(iter.next().is_none());
897     }
898 
899     #[test]
test_recv_into_iter_borrowed()900     fn test_recv_into_iter_borrowed() {
901         let (tx, rx) = channel::<i32>();
902         tx.send(1).unwrap();
903         tx.send(2).unwrap();
904         drop(tx);
905         let mut iter = (&rx).into_iter();
906         assert_eq!(iter.next().unwrap(), 1);
907         assert_eq!(iter.next().unwrap(), 2);
908         assert!(iter.next().is_none());
909     }
910 
911     #[test]
try_recv_states()912     fn try_recv_states() {
913         let (tx1, rx1) = channel::<i32>();
914         let (tx2, rx2) = channel::<()>();
915         let (tx3, rx3) = channel::<()>();
916         let t = thread::spawn(move || {
917             rx2.recv().unwrap();
918             tx1.send(1).unwrap();
919             tx3.send(()).unwrap();
920             rx2.recv().unwrap();
921             drop(tx1);
922             tx3.send(()).unwrap();
923         });
924 
925         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
926         tx2.send(()).unwrap();
927         rx3.recv().unwrap();
928         assert_eq!(rx1.try_recv(), Ok(1));
929         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
930         tx2.send(()).unwrap();
931         rx3.recv().unwrap();
932         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
933         t.join().unwrap();
934     }
935 
936     // This bug used to end up in a livelock inside of the Receiver destructor
937     // because the internal state of the Shared packet was corrupted
938     #[test]
destroy_upgraded_shared_port_when_sender_still_active()939     fn destroy_upgraded_shared_port_when_sender_still_active() {
940         let (tx, rx) = channel();
941         let (tx2, rx2) = channel();
942         let t = thread::spawn(move || {
943             rx.recv().unwrap(); // wait on a oneshot
944             drop(rx); // destroy a shared
945             tx2.send(()).unwrap();
946         });
947         // make sure the other thread has gone to sleep
948         for _ in 0..5000 {
949             thread::yield_now();
950         }
951 
952         // upgrade to a shared chan and send a message
953         let tx2 = tx.clone();
954         drop(tx);
955         tx2.send(()).unwrap();
956 
957         // wait for the child thread to exit before we exit
958         rx2.recv().unwrap();
959         t.join().unwrap();
960     }
961 
962     #[test]
issue_32114()963     fn issue_32114() {
964         let (tx, _) = channel();
965         let _ = tx.send(123);
966         assert_eq!(tx.send(123), Err(SendError(123)));
967     }
968 }
969 
970 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
971 mod sync_channel_tests {
972     use super::*;
973 
974     use std::env;
975     use std::thread;
976     use std::time::Duration;
977 
stress_factor() -> usize978     pub fn stress_factor() -> usize {
979         match env::var("RUST_TEST_STRESS") {
980             Ok(val) => val.parse().unwrap(),
981             Err(..) => 1,
982         }
983     }
984 
985     #[test]
smoke()986     fn smoke() {
987         let (tx, rx) = sync_channel::<i32>(1);
988         tx.send(1).unwrap();
989         assert_eq!(rx.recv().unwrap(), 1);
990     }
991 
992     #[test]
drop_full()993     fn drop_full() {
994         let (tx, _rx) = sync_channel::<Box<isize>>(1);
995         tx.send(Box::new(1)).unwrap();
996     }
997 
998     #[test]
smoke_shared()999     fn smoke_shared() {
1000         let (tx, rx) = sync_channel::<i32>(1);
1001         tx.send(1).unwrap();
1002         assert_eq!(rx.recv().unwrap(), 1);
1003         let tx = tx.clone();
1004         tx.send(1).unwrap();
1005         assert_eq!(rx.recv().unwrap(), 1);
1006     }
1007 
1008     #[test]
recv_timeout()1009     fn recv_timeout() {
1010         let (tx, rx) = sync_channel::<i32>(1);
1011         assert_eq!(
1012             rx.recv_timeout(Duration::from_millis(1)),
1013             Err(RecvTimeoutError::Timeout)
1014         );
1015         tx.send(1).unwrap();
1016         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
1017     }
1018 
1019     #[test]
smoke_threads()1020     fn smoke_threads() {
1021         let (tx, rx) = sync_channel::<i32>(0);
1022         let t = thread::spawn(move || {
1023             tx.send(1).unwrap();
1024         });
1025         assert_eq!(rx.recv().unwrap(), 1);
1026         t.join().unwrap();
1027     }
1028 
1029     #[test]
smoke_port_gone()1030     fn smoke_port_gone() {
1031         let (tx, rx) = sync_channel::<i32>(0);
1032         drop(rx);
1033         assert!(tx.send(1).is_err());
1034     }
1035 
1036     #[test]
smoke_shared_port_gone2()1037     fn smoke_shared_port_gone2() {
1038         let (tx, rx) = sync_channel::<i32>(0);
1039         drop(rx);
1040         let tx2 = tx.clone();
1041         drop(tx);
1042         assert!(tx2.send(1).is_err());
1043     }
1044 
1045     #[test]
port_gone_concurrent()1046     fn port_gone_concurrent() {
1047         let (tx, rx) = sync_channel::<i32>(0);
1048         let t = thread::spawn(move || {
1049             rx.recv().unwrap();
1050         });
1051         while tx.send(1).is_ok() {}
1052         t.join().unwrap();
1053     }
1054 
1055     #[test]
port_gone_concurrent_shared()1056     fn port_gone_concurrent_shared() {
1057         let (tx, rx) = sync_channel::<i32>(0);
1058         let tx2 = tx.clone();
1059         let t = thread::spawn(move || {
1060             rx.recv().unwrap();
1061         });
1062         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1063         t.join().unwrap();
1064     }
1065 
1066     #[test]
smoke_chan_gone()1067     fn smoke_chan_gone() {
1068         let (tx, rx) = sync_channel::<i32>(0);
1069         drop(tx);
1070         assert!(rx.recv().is_err());
1071     }
1072 
1073     #[test]
smoke_chan_gone_shared()1074     fn smoke_chan_gone_shared() {
1075         let (tx, rx) = sync_channel::<()>(0);
1076         let tx2 = tx.clone();
1077         drop(tx);
1078         drop(tx2);
1079         assert!(rx.recv().is_err());
1080     }
1081 
1082     #[test]
chan_gone_concurrent()1083     fn chan_gone_concurrent() {
1084         let (tx, rx) = sync_channel::<i32>(0);
1085         let t = thread::spawn(move || {
1086             tx.send(1).unwrap();
1087             tx.send(1).unwrap();
1088         });
1089         while rx.recv().is_ok() {}
1090         t.join().unwrap();
1091     }
1092 
1093     #[test]
stress()1094     fn stress() {
1095         #[cfg(miri)]
1096         const N: usize = 100;
1097         #[cfg(not(miri))]
1098         const N: usize = 10000;
1099 
1100         let (tx, rx) = sync_channel::<i32>(0);
1101         let t = thread::spawn(move || {
1102             for _ in 0..N {
1103                 tx.send(1).unwrap();
1104             }
1105         });
1106         for _ in 0..N {
1107             assert_eq!(rx.recv().unwrap(), 1);
1108         }
1109         t.join().unwrap();
1110     }
1111 
1112     #[test]
stress_recv_timeout_two_threads()1113     fn stress_recv_timeout_two_threads() {
1114         #[cfg(miri)]
1115         const N: usize = 100;
1116         #[cfg(not(miri))]
1117         const N: usize = 10000;
1118 
1119         let (tx, rx) = sync_channel::<i32>(0);
1120 
1121         let t = thread::spawn(move || {
1122             for _ in 0..N {
1123                 tx.send(1).unwrap();
1124             }
1125         });
1126 
1127         let mut recv_count = 0;
1128         loop {
1129             match rx.recv_timeout(Duration::from_millis(1)) {
1130                 Ok(v) => {
1131                     assert_eq!(v, 1);
1132                     recv_count += 1;
1133                 }
1134                 Err(RecvTimeoutError::Timeout) => continue,
1135                 Err(RecvTimeoutError::Disconnected) => break,
1136             }
1137         }
1138 
1139         assert_eq!(recv_count, N);
1140         t.join().unwrap();
1141     }
1142 
1143     #[test]
stress_recv_timeout_shared()1144     fn stress_recv_timeout_shared() {
1145         #[cfg(miri)]
1146         const AMT: u32 = 100;
1147         #[cfg(not(miri))]
1148         const AMT: u32 = 1000;
1149         const NTHREADS: u32 = 8;
1150         let (tx, rx) = sync_channel::<i32>(0);
1151         let (dtx, drx) = sync_channel::<()>(0);
1152 
1153         let t = thread::spawn(move || {
1154             let mut recv_count = 0;
1155             loop {
1156                 match rx.recv_timeout(Duration::from_millis(10)) {
1157                     Ok(v) => {
1158                         assert_eq!(v, 1);
1159                         recv_count += 1;
1160                     }
1161                     Err(RecvTimeoutError::Timeout) => continue,
1162                     Err(RecvTimeoutError::Disconnected) => break,
1163                 }
1164             }
1165 
1166             assert_eq!(recv_count, AMT * NTHREADS);
1167             assert!(rx.try_recv().is_err());
1168 
1169             dtx.send(()).unwrap();
1170         });
1171 
1172         let mut ts = Vec::with_capacity(NTHREADS as usize);
1173         for _ in 0..NTHREADS {
1174             let tx = tx.clone();
1175             let t = thread::spawn(move || {
1176                 for _ in 0..AMT {
1177                     tx.send(1).unwrap();
1178                 }
1179             });
1180             ts.push(t);
1181         }
1182 
1183         drop(tx);
1184 
1185         drx.recv().unwrap();
1186         for t in ts {
1187             t.join().unwrap();
1188         }
1189         t.join().unwrap();
1190     }
1191 
1192     #[test]
stress_shared()1193     fn stress_shared() {
1194         #[cfg(miri)]
1195         const AMT: u32 = 100;
1196         #[cfg(not(miri))]
1197         const AMT: u32 = 1000;
1198         const NTHREADS: u32 = 8;
1199         let (tx, rx) = sync_channel::<i32>(0);
1200         let (dtx, drx) = sync_channel::<()>(0);
1201 
1202         let t = thread::spawn(move || {
1203             for _ in 0..AMT * NTHREADS {
1204                 assert_eq!(rx.recv().unwrap(), 1);
1205             }
1206             assert!(rx.try_recv().is_err());
1207             dtx.send(()).unwrap();
1208         });
1209 
1210         let mut ts = Vec::with_capacity(NTHREADS as usize);
1211         for _ in 0..NTHREADS {
1212             let tx = tx.clone();
1213             let t = thread::spawn(move || {
1214                 for _ in 0..AMT {
1215                     tx.send(1).unwrap();
1216                 }
1217             });
1218             ts.push(t);
1219         }
1220         drop(tx);
1221         drx.recv().unwrap();
1222         for t in ts {
1223             t.join().unwrap();
1224         }
1225         t.join().unwrap();
1226     }
1227 
1228     #[test]
oneshot_single_thread_close_port_first()1229     fn oneshot_single_thread_close_port_first() {
1230         // Simple test of closing without sending
1231         let (_tx, rx) = sync_channel::<i32>(0);
1232         drop(rx);
1233     }
1234 
1235     #[test]
oneshot_single_thread_close_chan_first()1236     fn oneshot_single_thread_close_chan_first() {
1237         // Simple test of closing without sending
1238         let (tx, _rx) = sync_channel::<i32>(0);
1239         drop(tx);
1240     }
1241 
1242     #[test]
oneshot_single_thread_send_port_close()1243     fn oneshot_single_thread_send_port_close() {
1244         // Testing that the sender cleans up the payload if receiver is closed
1245         let (tx, rx) = sync_channel::<Box<i32>>(0);
1246         drop(rx);
1247         assert!(tx.send(Box::new(0)).is_err());
1248     }
1249 
1250     #[test]
oneshot_single_thread_recv_chan_close()1251     fn oneshot_single_thread_recv_chan_close() {
1252         let (tx, rx) = sync_channel::<i32>(0);
1253         drop(tx);
1254         assert_eq!(rx.recv(), Err(RecvError));
1255     }
1256 
1257     #[test]
oneshot_single_thread_send_then_recv()1258     fn oneshot_single_thread_send_then_recv() {
1259         let (tx, rx) = sync_channel::<Box<i32>>(1);
1260         tx.send(Box::new(10)).unwrap();
1261         assert!(*rx.recv().unwrap() == 10);
1262     }
1263 
1264     #[test]
oneshot_single_thread_try_send_open()1265     fn oneshot_single_thread_try_send_open() {
1266         let (tx, rx) = sync_channel::<i32>(1);
1267         assert_eq!(tx.try_send(10), Ok(()));
1268         assert!(rx.recv().unwrap() == 10);
1269     }
1270 
1271     #[test]
oneshot_single_thread_try_send_closed()1272     fn oneshot_single_thread_try_send_closed() {
1273         let (tx, rx) = sync_channel::<i32>(0);
1274         drop(rx);
1275         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1276     }
1277 
1278     #[test]
oneshot_single_thread_try_send_closed2()1279     fn oneshot_single_thread_try_send_closed2() {
1280         let (tx, _rx) = sync_channel::<i32>(0);
1281         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1282     }
1283 
1284     #[test]
oneshot_single_thread_try_recv_open()1285     fn oneshot_single_thread_try_recv_open() {
1286         let (tx, rx) = sync_channel::<i32>(1);
1287         tx.send(10).unwrap();
1288         assert!(rx.recv() == Ok(10));
1289     }
1290 
1291     #[test]
oneshot_single_thread_try_recv_closed()1292     fn oneshot_single_thread_try_recv_closed() {
1293         let (tx, rx) = sync_channel::<i32>(0);
1294         drop(tx);
1295         assert!(rx.recv().is_err());
1296     }
1297 
1298     #[test]
oneshot_single_thread_try_recv_closed_with_data()1299     fn oneshot_single_thread_try_recv_closed_with_data() {
1300         let (tx, rx) = sync_channel::<i32>(1);
1301         tx.send(10).unwrap();
1302         drop(tx);
1303         assert_eq!(rx.try_recv(), Ok(10));
1304         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1305     }
1306 
1307     #[test]
oneshot_single_thread_peek_data()1308     fn oneshot_single_thread_peek_data() {
1309         let (tx, rx) = sync_channel::<i32>(1);
1310         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1311         tx.send(10).unwrap();
1312         assert_eq!(rx.try_recv(), Ok(10));
1313     }
1314 
1315     #[test]
oneshot_single_thread_peek_close()1316     fn oneshot_single_thread_peek_close() {
1317         let (tx, rx) = sync_channel::<i32>(0);
1318         drop(tx);
1319         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1320         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1321     }
1322 
1323     #[test]
oneshot_single_thread_peek_open()1324     fn oneshot_single_thread_peek_open() {
1325         let (_tx, rx) = sync_channel::<i32>(0);
1326         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1327     }
1328 
1329     #[test]
oneshot_multi_task_recv_then_send()1330     fn oneshot_multi_task_recv_then_send() {
1331         let (tx, rx) = sync_channel::<Box<i32>>(0);
1332         let t = thread::spawn(move || {
1333             assert!(*rx.recv().unwrap() == 10);
1334         });
1335 
1336         tx.send(Box::new(10)).unwrap();
1337         t.join().unwrap();
1338     }
1339 
1340     #[test]
oneshot_multi_task_recv_then_close()1341     fn oneshot_multi_task_recv_then_close() {
1342         let (tx, rx) = sync_channel::<Box<i32>>(0);
1343         let t = thread::spawn(move || {
1344             drop(tx);
1345         });
1346         thread::spawn(move || {
1347             assert_eq!(rx.recv(), Err(RecvError));
1348         })
1349         .join()
1350         .unwrap();
1351         t.join().unwrap();
1352     }
1353 
1354     #[test]
oneshot_multi_thread_close_stress()1355     fn oneshot_multi_thread_close_stress() {
1356         let stress_factor = stress_factor();
1357         let mut ts = Vec::with_capacity(stress_factor);
1358         for _ in 0..stress_factor {
1359             let (tx, rx) = sync_channel::<i32>(0);
1360             let t = thread::spawn(move || {
1361                 drop(rx);
1362             });
1363             ts.push(t);
1364             drop(tx);
1365         }
1366         for t in ts {
1367             t.join().unwrap();
1368         }
1369     }
1370 
1371     #[test]
oneshot_multi_thread_send_close_stress()1372     fn oneshot_multi_thread_send_close_stress() {
1373         let stress_factor = stress_factor();
1374         let mut ts = Vec::with_capacity(stress_factor);
1375         for _ in 0..stress_factor {
1376             let (tx, rx) = sync_channel::<i32>(0);
1377             let t = thread::spawn(move || {
1378                 drop(rx);
1379             });
1380             ts.push(t);
1381             thread::spawn(move || {
1382                 let _ = tx.send(1);
1383             })
1384             .join()
1385             .unwrap();
1386         }
1387         for t in ts {
1388             t.join().unwrap();
1389         }
1390     }
1391 
1392     #[test]
oneshot_multi_thread_recv_close_stress()1393     fn oneshot_multi_thread_recv_close_stress() {
1394         let stress_factor = stress_factor();
1395         let mut ts = Vec::with_capacity(2 * stress_factor);
1396         for _ in 0..stress_factor {
1397             let (tx, rx) = sync_channel::<i32>(0);
1398             let t = thread::spawn(move || {
1399                 thread::spawn(move || {
1400                     assert_eq!(rx.recv(), Err(RecvError));
1401                 })
1402                 .join()
1403                 .unwrap();
1404             });
1405             ts.push(t);
1406             let t2 = thread::spawn(move || {
1407                 thread::spawn(move || {
1408                     drop(tx);
1409                 });
1410             });
1411             ts.push(t2);
1412         }
1413         for t in ts {
1414             t.join().unwrap();
1415         }
1416     }
1417 
1418     #[test]
oneshot_multi_thread_send_recv_stress()1419     fn oneshot_multi_thread_send_recv_stress() {
1420         let stress_factor = stress_factor();
1421         let mut ts = Vec::with_capacity(stress_factor);
1422         for _ in 0..stress_factor {
1423             let (tx, rx) = sync_channel::<Box<i32>>(0);
1424             let t = thread::spawn(move || {
1425                 tx.send(Box::new(10)).unwrap();
1426             });
1427             ts.push(t);
1428             assert!(*rx.recv().unwrap() == 10);
1429         }
1430         for t in ts {
1431             t.join().unwrap();
1432         }
1433     }
1434 
1435     #[test]
stream_send_recv_stress()1436     fn stream_send_recv_stress() {
1437         let stress_factor = stress_factor();
1438         let mut ts = Vec::with_capacity(2 * stress_factor);
1439         for _ in 0..stress_factor {
1440             let (tx, rx) = sync_channel::<Box<i32>>(0);
1441 
1442             if let Some(t) = send(tx, 0) {
1443                 ts.push(t);
1444             }
1445             if let Some(t) = recv(rx, 0) {
1446                 ts.push(t);
1447             }
1448 
1449             fn send(tx: SyncSender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
1450                 if i == 10 {
1451                     return None;
1452                 }
1453 
1454                 Some(thread::spawn(move || {
1455                     tx.send(Box::new(i)).unwrap();
1456                     send(tx, i + 1);
1457                 }))
1458             }
1459 
1460             fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
1461                 if i == 10 {
1462                     return None;
1463                 }
1464 
1465                 Some(thread::spawn(move || {
1466                     assert!(*rx.recv().unwrap() == i);
1467                     recv(rx, i + 1);
1468                 }))
1469             }
1470         }
1471         for t in ts {
1472             t.join().unwrap();
1473         }
1474     }
1475 
1476     #[test]
recv_a_lot()1477     fn recv_a_lot() {
1478         #[cfg(miri)]
1479         const N: usize = 100;
1480         #[cfg(not(miri))]
1481         const N: usize = 10000;
1482 
1483         // Regression test that we don't run out of stack in scheduler context
1484         let (tx, rx) = sync_channel(N);
1485         for _ in 0..N {
1486             tx.send(()).unwrap();
1487         }
1488         for _ in 0..N {
1489             rx.recv().unwrap();
1490         }
1491     }
1492 
1493     #[test]
shared_chan_stress()1494     fn shared_chan_stress() {
1495         let (tx, rx) = sync_channel(0);
1496         let total = stress_factor() + 100;
1497         let mut ts = Vec::with_capacity(total);
1498         for _ in 0..total {
1499             let tx = tx.clone();
1500             let t = thread::spawn(move || {
1501                 tx.send(()).unwrap();
1502             });
1503             ts.push(t);
1504         }
1505 
1506         for _ in 0..total {
1507             rx.recv().unwrap();
1508         }
1509         for t in ts {
1510             t.join().unwrap();
1511         }
1512     }
1513 
1514     #[test]
test_nested_recv_iter()1515     fn test_nested_recv_iter() {
1516         let (tx, rx) = sync_channel::<i32>(0);
1517         let (total_tx, total_rx) = sync_channel::<i32>(0);
1518 
1519         let t = thread::spawn(move || {
1520             let mut acc = 0;
1521             for x in rx.iter() {
1522                 acc += x;
1523             }
1524             total_tx.send(acc).unwrap();
1525         });
1526 
1527         tx.send(3).unwrap();
1528         tx.send(1).unwrap();
1529         tx.send(2).unwrap();
1530         drop(tx);
1531         assert_eq!(total_rx.recv().unwrap(), 6);
1532         t.join().unwrap();
1533     }
1534 
1535     #[test]
test_recv_iter_break()1536     fn test_recv_iter_break() {
1537         let (tx, rx) = sync_channel::<i32>(0);
1538         let (count_tx, count_rx) = sync_channel(0);
1539 
1540         let t = thread::spawn(move || {
1541             let mut count = 0;
1542             for x in rx.iter() {
1543                 if count >= 3 {
1544                     break;
1545                 } else {
1546                     count += x;
1547                 }
1548             }
1549             count_tx.send(count).unwrap();
1550         });
1551 
1552         tx.send(2).unwrap();
1553         tx.send(2).unwrap();
1554         tx.send(2).unwrap();
1555         let _ = tx.try_send(2);
1556         drop(tx);
1557         assert_eq!(count_rx.recv().unwrap(), 4);
1558         t.join().unwrap();
1559     }
1560 
1561     #[test]
try_recv_states()1562     fn try_recv_states() {
1563         let (tx1, rx1) = sync_channel::<i32>(1);
1564         let (tx2, rx2) = sync_channel::<()>(1);
1565         let (tx3, rx3) = sync_channel::<()>(1);
1566         let t = thread::spawn(move || {
1567             rx2.recv().unwrap();
1568             tx1.send(1).unwrap();
1569             tx3.send(()).unwrap();
1570             rx2.recv().unwrap();
1571             drop(tx1);
1572             tx3.send(()).unwrap();
1573         });
1574 
1575         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1576         tx2.send(()).unwrap();
1577         rx3.recv().unwrap();
1578         assert_eq!(rx1.try_recv(), Ok(1));
1579         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1580         tx2.send(()).unwrap();
1581         rx3.recv().unwrap();
1582         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1583         t.join().unwrap();
1584     }
1585 
1586     // This bug used to end up in a livelock inside of the Receiver destructor
1587     // because the internal state of the Shared packet was corrupted
1588     #[test]
destroy_upgraded_shared_port_when_sender_still_active()1589     fn destroy_upgraded_shared_port_when_sender_still_active() {
1590         let (tx, rx) = sync_channel::<()>(0);
1591         let (tx2, rx2) = sync_channel::<()>(0);
1592         let t = thread::spawn(move || {
1593             rx.recv().unwrap(); // wait on a oneshot
1594             drop(rx); // destroy a shared
1595             tx2.send(()).unwrap();
1596         });
1597         // make sure the other thread has gone to sleep
1598         for _ in 0..5000 {
1599             thread::yield_now();
1600         }
1601 
1602         // upgrade to a shared chan and send a message
1603         let tx2 = tx.clone();
1604         drop(tx);
1605         tx2.send(()).unwrap();
1606 
1607         // wait for the child thread to exit before we exit
1608         rx2.recv().unwrap();
1609         t.join().unwrap();
1610     }
1611 
1612     #[test]
send1()1613     fn send1() {
1614         let (tx, rx) = sync_channel::<i32>(0);
1615         let t = thread::spawn(move || {
1616             rx.recv().unwrap();
1617         });
1618         assert_eq!(tx.send(1), Ok(()));
1619         t.join().unwrap();
1620     }
1621 
1622     #[test]
send2()1623     fn send2() {
1624         let (tx, rx) = sync_channel::<i32>(0);
1625         let t = thread::spawn(move || {
1626             drop(rx);
1627         });
1628         assert!(tx.send(1).is_err());
1629         t.join().unwrap();
1630     }
1631 
1632     #[test]
send3()1633     fn send3() {
1634         let (tx, rx) = sync_channel::<i32>(1);
1635         assert_eq!(tx.send(1), Ok(()));
1636         let t = thread::spawn(move || {
1637             drop(rx);
1638         });
1639         assert!(tx.send(1).is_err());
1640         t.join().unwrap();
1641     }
1642 
1643     #[test]
send4()1644     fn send4() {
1645         let (tx, rx) = sync_channel::<i32>(0);
1646         let tx2 = tx.clone();
1647         let (done, donerx) = channel();
1648         let done2 = done.clone();
1649         let t = thread::spawn(move || {
1650             assert!(tx.send(1).is_err());
1651             done.send(()).unwrap();
1652         });
1653         let t2 = thread::spawn(move || {
1654             assert!(tx2.send(2).is_err());
1655             done2.send(()).unwrap();
1656         });
1657         drop(rx);
1658         donerx.recv().unwrap();
1659         donerx.recv().unwrap();
1660         t.join().unwrap();
1661         t2.join().unwrap();
1662     }
1663 
1664     #[test]
try_send1()1665     fn try_send1() {
1666         let (tx, _rx) = sync_channel::<i32>(0);
1667         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
1668     }
1669 
1670     #[test]
try_send2()1671     fn try_send2() {
1672         let (tx, _rx) = sync_channel::<i32>(1);
1673         assert_eq!(tx.try_send(1), Ok(()));
1674         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
1675     }
1676 
1677     #[test]
try_send3()1678     fn try_send3() {
1679         let (tx, rx) = sync_channel::<i32>(1);
1680         assert_eq!(tx.try_send(1), Ok(()));
1681         drop(rx);
1682         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
1683     }
1684 
1685     #[test]
issue_15761()1686     fn issue_15761() {
1687         fn repro() {
1688             let (tx1, rx1) = sync_channel::<()>(3);
1689             let (tx2, rx2) = sync_channel::<()>(3);
1690 
1691             let _t = thread::spawn(move || {
1692                 rx1.recv().unwrap();
1693                 tx2.try_send(()).unwrap();
1694             });
1695 
1696             tx1.try_send(()).unwrap();
1697             rx2.recv().unwrap();
1698         }
1699 
1700         for _ in 0..100 {
1701             repro()
1702         }
1703     }
1704 }
1705 
1706 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/select.rs
1707 mod select_tests {
1708     use super::*;
1709 
1710     use std::thread;
1711 
1712     #[test]
smoke()1713     fn smoke() {
1714         let (tx1, rx1) = channel::<i32>();
1715         let (tx2, rx2) = channel::<i32>();
1716         tx1.send(1).unwrap();
1717         select! {
1718             foo = rx1.recv() => assert_eq!(foo.unwrap(), 1),
1719             _bar = rx2.recv() => panic!()
1720         }
1721         tx2.send(2).unwrap();
1722         select! {
1723             _foo = rx1.recv() => panic!(),
1724             bar = rx2.recv() => assert_eq!(bar.unwrap(), 2)
1725         }
1726         drop(tx1);
1727         select! {
1728             foo = rx1.recv() => assert!(foo.is_err()),
1729             _bar = rx2.recv() => panic!()
1730         }
1731         drop(tx2);
1732         select! {
1733             bar = rx2.recv() => assert!(bar.is_err())
1734         }
1735     }
1736 
1737     #[test]
smoke2()1738     fn smoke2() {
1739         let (_tx1, rx1) = channel::<i32>();
1740         let (_tx2, rx2) = channel::<i32>();
1741         let (_tx3, rx3) = channel::<i32>();
1742         let (_tx4, rx4) = channel::<i32>();
1743         let (tx5, rx5) = channel::<i32>();
1744         tx5.send(4).unwrap();
1745         select! {
1746             _foo = rx1.recv() => panic!("1"),
1747             _foo = rx2.recv() => panic!("2"),
1748             _foo = rx3.recv() => panic!("3"),
1749             _foo = rx4.recv() => panic!("4"),
1750             foo = rx5.recv() => assert_eq!(foo.unwrap(), 4)
1751         }
1752     }
1753 
1754     #[test]
closed()1755     fn closed() {
1756         let (_tx1, rx1) = channel::<i32>();
1757         let (tx2, rx2) = channel::<i32>();
1758         drop(tx2);
1759 
1760         select! {
1761             _a1 = rx1.recv() => panic!(),
1762             a2 = rx2.recv() => assert!(a2.is_err())
1763         }
1764     }
1765 
1766     #[test]
unblocks()1767     fn unblocks() {
1768         let (tx1, rx1) = channel::<i32>();
1769         let (_tx2, rx2) = channel::<i32>();
1770         let (tx3, rx3) = channel::<i32>();
1771 
1772         let t = thread::spawn(move || {
1773             for _ in 0..20 {
1774                 thread::yield_now();
1775             }
1776             tx1.send(1).unwrap();
1777             rx3.recv().unwrap();
1778             for _ in 0..20 {
1779                 thread::yield_now();
1780             }
1781         });
1782 
1783         select! {
1784             a = rx1.recv() => assert_eq!(a.unwrap(), 1),
1785             _b = rx2.recv() => panic!()
1786         }
1787         tx3.send(1).unwrap();
1788         select! {
1789             a = rx1.recv() => assert!(a.is_err()),
1790             _b = rx2.recv() => panic!()
1791         }
1792         t.join().unwrap();
1793     }
1794 
1795     #[test]
both_ready()1796     fn both_ready() {
1797         let (tx1, rx1) = channel::<i32>();
1798         let (tx2, rx2) = channel::<i32>();
1799         let (tx3, rx3) = channel::<()>();
1800 
1801         let t = thread::spawn(move || {
1802             for _ in 0..20 {
1803                 thread::yield_now();
1804             }
1805             tx1.send(1).unwrap();
1806             tx2.send(2).unwrap();
1807             rx3.recv().unwrap();
1808         });
1809 
1810         select! {
1811             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
1812             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
1813         }
1814         select! {
1815             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
1816             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
1817         }
1818         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1819         assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
1820         tx3.send(()).unwrap();
1821         t.join().unwrap();
1822     }
1823 
1824     #[test]
stress()1825     fn stress() {
1826         #[cfg(miri)]
1827         const AMT: i32 = 100;
1828         #[cfg(not(miri))]
1829         const AMT: i32 = 10000;
1830 
1831         let (tx1, rx1) = channel::<i32>();
1832         let (tx2, rx2) = channel::<i32>();
1833         let (tx3, rx3) = channel::<()>();
1834 
1835         let t = thread::spawn(move || {
1836             for i in 0..AMT {
1837                 if i % 2 == 0 {
1838                     tx1.send(i).unwrap();
1839                 } else {
1840                     tx2.send(i).unwrap();
1841                 }
1842                 rx3.recv().unwrap();
1843             }
1844         });
1845 
1846         for i in 0..AMT {
1847             select! {
1848                 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
1849                 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
1850             }
1851             tx3.send(()).unwrap();
1852         }
1853         t.join().unwrap();
1854     }
1855 
1856     #[allow(unused_must_use)]
1857     #[test]
cloning()1858     fn cloning() {
1859         let (tx1, rx1) = channel::<i32>();
1860         let (_tx2, rx2) = channel::<i32>();
1861         let (tx3, rx3) = channel::<()>();
1862 
1863         let t = thread::spawn(move || {
1864             rx3.recv().unwrap();
1865             tx1.clone();
1866             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
1867             tx1.send(2).unwrap();
1868             rx3.recv().unwrap();
1869         });
1870 
1871         tx3.send(()).unwrap();
1872         select! {
1873             _i1 = rx1.recv() => {},
1874             _i2 = rx2.recv() => panic!()
1875         }
1876         tx3.send(()).unwrap();
1877         t.join().unwrap();
1878     }
1879 
1880     #[allow(unused_must_use)]
1881     #[test]
cloning2()1882     fn cloning2() {
1883         let (tx1, rx1) = channel::<i32>();
1884         let (_tx2, rx2) = channel::<i32>();
1885         let (tx3, rx3) = channel::<()>();
1886 
1887         let t = thread::spawn(move || {
1888             rx3.recv().unwrap();
1889             tx1.clone();
1890             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
1891             tx1.send(2).unwrap();
1892             rx3.recv().unwrap();
1893         });
1894 
1895         tx3.send(()).unwrap();
1896         select! {
1897             _i1 = rx1.recv() => {},
1898             _i2 = rx2.recv() => panic!()
1899         }
1900         tx3.send(()).unwrap();
1901         t.join().unwrap();
1902     }
1903 
1904     #[test]
cloning3()1905     fn cloning3() {
1906         let (tx1, rx1) = channel::<()>();
1907         let (tx2, rx2) = channel::<()>();
1908         let (tx3, rx3) = channel::<()>();
1909         let t = thread::spawn(move || {
1910             select! {
1911                 _ = rx1.recv() => panic!(),
1912                 _ = rx2.recv() => {}
1913             }
1914             tx3.send(()).unwrap();
1915         });
1916 
1917         for _ in 0..1000 {
1918             thread::yield_now();
1919         }
1920         drop(tx1.clone());
1921         tx2.send(()).unwrap();
1922         rx3.recv().unwrap();
1923         t.join().unwrap();
1924     }
1925 
1926     #[test]
preflight1()1927     fn preflight1() {
1928         let (tx, rx) = channel();
1929         tx.send(()).unwrap();
1930         select! {
1931             _n = rx.recv() => {}
1932         }
1933     }
1934 
1935     #[test]
preflight2()1936     fn preflight2() {
1937         let (tx, rx) = channel();
1938         tx.send(()).unwrap();
1939         tx.send(()).unwrap();
1940         select! {
1941             _n = rx.recv() => {}
1942         }
1943     }
1944 
1945     #[test]
preflight3()1946     fn preflight3() {
1947         let (tx, rx) = channel();
1948         drop(tx.clone());
1949         tx.send(()).unwrap();
1950         select! {
1951             _n = rx.recv() => {}
1952         }
1953     }
1954 
1955     #[test]
preflight4()1956     fn preflight4() {
1957         let (tx, rx) = channel();
1958         tx.send(()).unwrap();
1959         select! {
1960             _ = rx.recv() => {}
1961         }
1962     }
1963 
1964     #[test]
preflight5()1965     fn preflight5() {
1966         let (tx, rx) = channel();
1967         tx.send(()).unwrap();
1968         tx.send(()).unwrap();
1969         select! {
1970             _ = rx.recv() => {}
1971         }
1972     }
1973 
1974     #[test]
preflight6()1975     fn preflight6() {
1976         let (tx, rx) = channel();
1977         drop(tx.clone());
1978         tx.send(()).unwrap();
1979         select! {
1980             _ = rx.recv() => {}
1981         }
1982     }
1983 
1984     #[test]
preflight7()1985     fn preflight7() {
1986         let (tx, rx) = channel::<()>();
1987         drop(tx);
1988         select! {
1989             _ = rx.recv() => {}
1990         }
1991     }
1992 
1993     #[test]
preflight8()1994     fn preflight8() {
1995         let (tx, rx) = channel();
1996         tx.send(()).unwrap();
1997         drop(tx);
1998         rx.recv().unwrap();
1999         select! {
2000             _ = rx.recv() => {}
2001         }
2002     }
2003 
2004     #[test]
preflight9()2005     fn preflight9() {
2006         let (tx, rx) = channel();
2007         drop(tx.clone());
2008         tx.send(()).unwrap();
2009         drop(tx);
2010         rx.recv().unwrap();
2011         select! {
2012             _ = rx.recv() => {}
2013         }
2014     }
2015 
2016     #[test]
oneshot_data_waiting()2017     fn oneshot_data_waiting() {
2018         let (tx1, rx1) = channel();
2019         let (tx2, rx2) = channel();
2020         let t = thread::spawn(move || {
2021             select! {
2022                 _n = rx1.recv() => {}
2023             }
2024             tx2.send(()).unwrap();
2025         });
2026 
2027         for _ in 0..100 {
2028             thread::yield_now()
2029         }
2030         tx1.send(()).unwrap();
2031         rx2.recv().unwrap();
2032         t.join().unwrap();
2033     }
2034 
2035     #[test]
stream_data_waiting()2036     fn stream_data_waiting() {
2037         let (tx1, rx1) = channel();
2038         let (tx2, rx2) = channel();
2039         tx1.send(()).unwrap();
2040         tx1.send(()).unwrap();
2041         rx1.recv().unwrap();
2042         rx1.recv().unwrap();
2043         let t = thread::spawn(move || {
2044             select! {
2045                 _n = rx1.recv() => {}
2046             }
2047             tx2.send(()).unwrap();
2048         });
2049 
2050         for _ in 0..100 {
2051             thread::yield_now()
2052         }
2053         tx1.send(()).unwrap();
2054         rx2.recv().unwrap();
2055         t.join().unwrap();
2056     }
2057 
2058     #[test]
shared_data_waiting()2059     fn shared_data_waiting() {
2060         let (tx1, rx1) = channel();
2061         let (tx2, rx2) = channel();
2062         drop(tx1.clone());
2063         tx1.send(()).unwrap();
2064         rx1.recv().unwrap();
2065         let t = thread::spawn(move || {
2066             select! {
2067                 _n = rx1.recv() => {}
2068             }
2069             tx2.send(()).unwrap();
2070         });
2071 
2072         for _ in 0..100 {
2073             thread::yield_now()
2074         }
2075         tx1.send(()).unwrap();
2076         rx2.recv().unwrap();
2077         t.join().unwrap();
2078     }
2079 
2080     #[test]
sync1()2081     fn sync1() {
2082         let (tx, rx) = sync_channel::<i32>(1);
2083         tx.send(1).unwrap();
2084         select! {
2085             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
2086         }
2087     }
2088 
2089     #[test]
sync2()2090     fn sync2() {
2091         let (tx, rx) = sync_channel::<i32>(0);
2092         let t = thread::spawn(move || {
2093             for _ in 0..100 {
2094                 thread::yield_now()
2095             }
2096             tx.send(1).unwrap();
2097         });
2098         select! {
2099             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
2100         }
2101         t.join().unwrap();
2102     }
2103 
2104     #[test]
sync3()2105     fn sync3() {
2106         let (tx1, rx1) = sync_channel::<i32>(0);
2107         let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel();
2108         let t = thread::spawn(move || {
2109             tx1.send(1).unwrap();
2110         });
2111         let t2 = thread::spawn(move || {
2112             tx2.send(2).unwrap();
2113         });
2114         select! {
2115             n = rx1.recv() => {
2116                 let n = n.unwrap();
2117                 assert_eq!(n, 1);
2118                 assert_eq!(rx2.recv().unwrap(), 2);
2119             },
2120             n = rx2.recv() => {
2121                 let n = n.unwrap();
2122                 assert_eq!(n, 2);
2123                 assert_eq!(rx1.recv().unwrap(), 1);
2124             }
2125         }
2126         t.join().unwrap();
2127         t2.join().unwrap();
2128     }
2129 }
2130