• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
//! Tests copied from `std::sync::mpsc`.
//!
//! This is a copy of tests for the `std::sync::mpsc` channels from the standard library, but
//! modified to work with `crossbeam-channel` instead.
//!
//! Minor tweaks were needed to make the tests compile:
//!
//! - Replace `box` syntax with `Box::new`.
//! - Replace all uses of `Select` with `select!`.
//! - Change the imports.
//! - Join all spawned threads.
//! - Remove the assertion from the `oneshot_multi_thread_send_close_stress` tests.
//!
//! Source:
//!   - https://github.com/rust-lang/rust/tree/master/src/libstd/sync/mpsc
//!
//! Copyright & License:
//!   - Copyright 2013-2014 The Rust Project Developers
//!   - Apache License, Version 2.0 or MIT license, at your option
//!   - https://github.com/rust-lang/rust/blob/master/COPYRIGHT
//!   - https://www.rust-lang.org/en-US/legal.html
22 
23 #![allow(
24     clippy::drop_copy,
25     clippy::match_single_binding,
26     clippy::redundant_clone
27 )]
28 
29 use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError};
30 use std::sync::mpsc::{SendError, TrySendError};
31 use std::thread::JoinHandle;
32 use std::time::Duration;
33 
34 use crossbeam_channel as cc;
35 
36 pub struct Sender<T> {
37     pub inner: cc::Sender<T>,
38 }
39 
40 impl<T> Sender<T> {
send(&self, t: T) -> Result<(), SendError<T>>41     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
42         self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
43     }
44 }
45 
46 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>47     fn clone(&self) -> Sender<T> {
48         Sender {
49             inner: self.inner.clone(),
50         }
51     }
52 }
53 
54 pub struct SyncSender<T> {
55     pub inner: cc::Sender<T>,
56 }
57 
58 impl<T> SyncSender<T> {
send(&self, t: T) -> Result<(), SendError<T>>59     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
60         self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
61     }
62 
try_send(&self, t: T) -> Result<(), TrySendError<T>>63     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
64         self.inner.try_send(t).map_err(|err| match err {
65             cc::TrySendError::Full(m) => TrySendError::Full(m),
66             cc::TrySendError::Disconnected(m) => TrySendError::Disconnected(m),
67         })
68     }
69 }
70 
71 impl<T> Clone for SyncSender<T> {
clone(&self) -> SyncSender<T>72     fn clone(&self) -> SyncSender<T> {
73         SyncSender {
74             inner: self.inner.clone(),
75         }
76     }
77 }
78 
79 pub struct Receiver<T> {
80     pub inner: cc::Receiver<T>,
81 }
82 
83 impl<T> Receiver<T> {
try_recv(&self) -> Result<T, TryRecvError>84     pub fn try_recv(&self) -> Result<T, TryRecvError> {
85         self.inner.try_recv().map_err(|err| match err {
86             cc::TryRecvError::Empty => TryRecvError::Empty,
87             cc::TryRecvError::Disconnected => TryRecvError::Disconnected,
88         })
89     }
90 
recv(&self) -> Result<T, RecvError>91     pub fn recv(&self) -> Result<T, RecvError> {
92         self.inner.recv().map_err(|_| RecvError)
93     }
94 
recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>95     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
96         self.inner.recv_timeout(timeout).map_err(|err| match err {
97             cc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
98             cc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
99         })
100     }
101 
iter(&self) -> Iter<T>102     pub fn iter(&self) -> Iter<T> {
103         Iter { inner: self }
104     }
105 
try_iter(&self) -> TryIter<T>106     pub fn try_iter(&self) -> TryIter<T> {
107         TryIter { inner: self }
108     }
109 }
110 
111 impl<'a, T> IntoIterator for &'a Receiver<T> {
112     type Item = T;
113     type IntoIter = Iter<'a, T>;
114 
into_iter(self) -> Iter<'a, T>115     fn into_iter(self) -> Iter<'a, T> {
116         self.iter()
117     }
118 }
119 
120 impl<T> IntoIterator for Receiver<T> {
121     type Item = T;
122     type IntoIter = IntoIter<T>;
123 
into_iter(self) -> IntoIter<T>124     fn into_iter(self) -> IntoIter<T> {
125         IntoIter { inner: self }
126     }
127 }
128 
129 pub struct TryIter<'a, T: 'a> {
130     inner: &'a Receiver<T>,
131 }
132 
133 impl<'a, T> Iterator for TryIter<'a, T> {
134     type Item = T;
135 
next(&mut self) -> Option<T>136     fn next(&mut self) -> Option<T> {
137         self.inner.try_recv().ok()
138     }
139 }
140 
141 pub struct Iter<'a, T: 'a> {
142     inner: &'a Receiver<T>,
143 }
144 
145 impl<'a, T> Iterator for Iter<'a, T> {
146     type Item = T;
147 
next(&mut self) -> Option<T>148     fn next(&mut self) -> Option<T> {
149         self.inner.recv().ok()
150     }
151 }
152 
153 pub struct IntoIter<T> {
154     inner: Receiver<T>,
155 }
156 
157 impl<T> Iterator for IntoIter<T> {
158     type Item = T;
159 
next(&mut self) -> Option<T>160     fn next(&mut self) -> Option<T> {
161         self.inner.recv().ok()
162     }
163 }
164 
channel<T>() -> (Sender<T>, Receiver<T>)165 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
166     let (s, r) = cc::unbounded();
167     let s = Sender { inner: s };
168     let r = Receiver { inner: r };
169     (s, r)
170 }
171 
sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>)172 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
173     let (s, r) = cc::bounded(bound);
174     let s = SyncSender { inner: s };
175     let r = Receiver { inner: r };
176     (s, r)
177 }
178 
// Accepts one or more comma-separated arms of the form
// `pattern = receiver.method() => expression` and forwards them to
// `crossbeam_channel_internal!`. Each arm operates on the wrapper's `inner`
// receiver; the operation's result has its error replaced by
// `std::sync::mpsc::RecvError` before being bound to the arm's pattern.
macro_rules! select {
    (
        $($binding:pat = $receiver:ident.$op:ident() => $body:expr),+
    ) => {{
        cc::crossbeam_channel_internal! {
            $(
                $op(($receiver).inner) -> outcome => {
                    let $binding = outcome.map_err(|_| ::std::sync::mpsc::RecvError);
                    $body
                }
            )+
        }
    }}
}
193 
// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
//
// Tests for the unbounded `channel()` wrapper defined in the parent module.
mod channel_tests {
    use super::*;

    use std::env;
    use std::thread;
    use std::time::{Duration, Instant};

    /// Returns the stress multiplier read from `RUST_TEST_STRESS`, or 1 when
    /// the variable cannot be read.
    ///
    /// # Panics
    ///
    /// Panics if the variable is set but does not parse as a `usize`.
    pub fn stress_factor() -> usize {
        match env::var("RUST_TEST_STRESS") {
            Ok(val) => val
                .parse()
                .expect("RUST_TEST_STRESS must be an unsigned integer"),
            Err(..) => 1,
        }
    }

    // A sent value is received unchanged.
    #[test]
    fn smoke() {
        let (tx, rx) = channel::<i32>();
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
    }

    // Dropping a channel that still holds a boxed message must not misbehave.
    #[test]
    fn drop_full() {
        let (tx, _rx) = channel::<Box<isize>>();
        tx.send(Box::new(1)).unwrap();
    }

    // Same as `drop_full`, after sender clones have been created and dropped.
    #[test]
    fn drop_full_shared() {
        let (tx, _rx) = channel::<Box<isize>>();
        drop(tx.clone());
        drop(tx.clone());
        tx.send(Box::new(1)).unwrap();
    }

    // Send/receive works both before and after the sender is cloned.
    #[test]
    fn smoke_shared() {
        let (tx, rx) = channel::<i32>();
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
        let tx = tx.clone();
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
    }

    // A value sent from another thread is received.
    #[test]
    fn smoke_threads() {
        let (tx, rx) = channel::<i32>();
        let t = thread::spawn(move || {
            tx.send(1).unwrap();
        });
        assert_eq!(rx.recv().unwrap(), 1);
        t.join().unwrap();
    }

    // Sending fails once the receiver is dropped.
    #[test]
    fn smoke_port_gone() {
        let (tx, rx) = channel::<i32>();
        drop(rx);
        assert!(tx.send(1).is_err());
    }

    // Sending fails once the receiver is dropped.
    #[test]
    fn smoke_shared_port_gone() {
        let (tx, rx) = channel::<i32>();
        drop(rx);
        assert!(tx.send(1).is_err())
    }

    // A clone made after the receiver was dropped also fails to send.
    #[test]
    fn smoke_shared_port_gone2() {
        let (tx, rx) = channel::<i32>();
        drop(rx);
        let tx2 = tx.clone();
        drop(tx);
        assert!(tx2.send(1).is_err());
    }

    // Keep sending until the receiving thread (which receives once) is gone.
    #[test]
    fn port_gone_concurrent() {
        let (tx, rx) = channel::<i32>();
        let t = thread::spawn(move || {
            rx.recv().unwrap();
        });
        while tx.send(1).is_ok() {}
        t.join().unwrap();
    }

    // Same as `port_gone_concurrent`, alternating between two senders.
    #[test]
    fn port_gone_concurrent_shared() {
        let (tx, rx) = channel::<i32>();
        let tx2 = tx.clone();
        let t = thread::spawn(move || {
            rx.recv().unwrap();
        });
        while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
        t.join().unwrap();
    }

    // Receiving fails once the only sender is dropped.
    #[test]
    fn smoke_chan_gone() {
        let (tx, rx) = channel::<i32>();
        drop(tx);
        assert!(rx.recv().is_err());
    }

    // Receiving fails once every sender clone is dropped.
    #[test]
    fn smoke_chan_gone_shared() {
        let (tx, rx) = channel::<()>();
        let tx2 = tx.clone();
        drop(tx);
        drop(tx2);
        assert!(rx.recv().is_err());
    }

    // Keep receiving until the sending thread has finished and dropped `tx`.
    #[test]
    fn chan_gone_concurrent() {
        let (tx, rx) = channel::<i32>();
        let t = thread::spawn(move || {
            tx.send(1).unwrap();
            tx.send(1).unwrap();
        });
        while rx.recv().is_ok() {}
        t.join().unwrap();
    }

    // One producer thread, one consumer, `COUNT` messages.
    #[test]
    fn stress() {
        #[cfg(miri)]
        const COUNT: usize = 500;
        #[cfg(not(miri))]
        const COUNT: usize = 10000;

        let (tx, rx) = channel::<i32>();
        let t = thread::spawn(move || {
            for _ in 0..COUNT {
                tx.send(1).unwrap();
            }
        });
        for _ in 0..COUNT {
            assert_eq!(rx.recv().unwrap(), 1);
        }
        t.join().unwrap();
    }

    // `NTHREADS` producers each send `AMT` messages to a single consumer thread.
    #[test]
    fn stress_shared() {
        #[cfg(miri)]
        const AMT: u32 = 500;
        #[cfg(not(miri))]
        const AMT: u32 = 10000;
        const NTHREADS: u32 = 8;
        let (tx, rx) = channel::<i32>();

        let t = thread::spawn(move || {
            for _ in 0..AMT * NTHREADS {
                assert_eq!(rx.recv().unwrap(), 1);
            }
            // Nothing may be left once every expected message has arrived.
            assert!(rx.try_recv().is_err());
        });

        let mut ts = Vec::with_capacity(NTHREADS as usize);
        for _ in 0..NTHREADS {
            let tx = tx.clone();
            let t = thread::spawn(move || {
                for _ in 0..AMT {
                    tx.send(1).unwrap();
                }
            });
            ts.push(t);
        }
        drop(tx);
        t.join().unwrap();
        for t in ts {
            t.join().unwrap();
        }
    }

    // The sender thread is only spawned after the receiver thread has signalled.
    #[test]
    fn send_from_outside_runtime() {
        let (tx1, rx1) = channel::<()>();
        let (tx2, rx2) = channel::<i32>();
        let t1 = thread::spawn(move || {
            tx1.send(()).unwrap();
            for _ in 0..40 {
                assert_eq!(rx2.recv().unwrap(), 1);
            }
        });
        rx1.recv().unwrap();
        let t2 = thread::spawn(move || {
            for _ in 0..40 {
                tx2.send(1).unwrap();
            }
        });
        t1.join().unwrap();
        t2.join().unwrap();
    }

    // The main thread sends 40 messages to a receiving thread.
    #[test]
    fn recv_from_outside_runtime() {
        let (tx, rx) = channel::<i32>();
        let t = thread::spawn(move || {
            for _ in 0..40 {
                assert_eq!(rx.recv().unwrap(), 1);
            }
        });
        for _ in 0..40 {
            tx.send(1).unwrap();
        }
        t.join().unwrap();
    }

    // Two threads exchange one message each over a pair of channels.
    #[test]
    fn no_runtime() {
        let (tx1, rx1) = channel::<i32>();
        let (tx2, rx2) = channel::<i32>();
        let t1 = thread::spawn(move || {
            assert_eq!(rx1.recv().unwrap(), 1);
            tx2.send(2).unwrap();
        });
        let t2 = thread::spawn(move || {
            tx1.send(1).unwrap();
            assert_eq!(rx2.recv().unwrap(), 2);
        });
        t1.join().unwrap();
        t2.join().unwrap();
    }

    #[test]
    fn oneshot_single_thread_close_port_first() {
        // Simple test of closing without sending
        let (_tx, rx) = channel::<i32>();
        drop(rx);
    }

    #[test]
    fn oneshot_single_thread_close_chan_first() {
        // Simple test of closing without sending
        let (tx, _rx) = channel::<i32>();
        drop(tx);
    }

    #[test]
    fn oneshot_single_thread_send_port_close() {
        // Testing that the sender cleans up the payload if receiver is closed
        let (tx, rx) = channel::<Box<i32>>();
        drop(rx);
        assert!(tx.send(Box::new(0)).is_err());
    }

    // `recv` reports `RecvError` after the sender is dropped.
    #[test]
    fn oneshot_single_thread_recv_chan_close() {
        let (tx, rx) = channel::<i32>();
        drop(tx);
        assert_eq!(rx.recv(), Err(RecvError));
    }

    // A boxed value sent on the same thread is received intact.
    #[test]
    fn oneshot_single_thread_send_then_recv() {
        let (tx, rx) = channel::<Box<i32>>();
        tx.send(Box::new(10)).unwrap();
        assert!(*rx.recv().unwrap() == 10);
    }

    // Sending succeeds while the receiver is alive.
    #[test]
    fn oneshot_single_thread_try_send_open() {
        let (tx, rx) = channel::<i32>();
        assert!(tx.send(10).is_ok());
        assert!(rx.recv().unwrap() == 10);
    }

    // Sending fails after the receiver is dropped.
    #[test]
    fn oneshot_single_thread_try_send_closed() {
        let (tx, rx) = channel::<i32>();
        drop(rx);
        assert!(tx.send(10).is_err());
    }

    // Receiving returns the value that was sent.
    #[test]
    fn oneshot_single_thread_try_recv_open() {
        let (tx, rx) = channel::<i32>();
        tx.send(10).unwrap();
        assert!(rx.recv() == Ok(10));
    }

    // Receiving fails after the sender is dropped.
    #[test]
    fn oneshot_single_thread_try_recv_closed() {
        let (tx, rx) = channel::<i32>();
        drop(tx);
        assert!(rx.recv().is_err());
    }

    // `try_recv` reports `Empty` before a send and the value after it.
    #[test]
    fn oneshot_single_thread_peek_data() {
        let (tx, rx) = channel::<i32>();
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        tx.send(10).unwrap();
        assert_eq!(rx.try_recv(), Ok(10));
    }

    // `try_recv` keeps reporting `Disconnected` after the sender is dropped.
    #[test]
    fn oneshot_single_thread_peek_close() {
        let (tx, rx) = channel::<i32>();
        drop(tx);
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
    }

    // `try_recv` reports `Empty` while a sender is alive and nothing was sent.
    #[test]
    fn oneshot_single_thread_peek_open() {
        let (_tx, rx) = channel::<i32>();
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    }

    // A receiving thread gets the value sent by the main thread.
    #[test]
    fn oneshot_multi_task_recv_then_send() {
        let (tx, rx) = channel::<Box<i32>>();
        let t = thread::spawn(move || {
            assert!(*rx.recv().unwrap() == 10);
        });

        tx.send(Box::new(10)).unwrap();
        t.join().unwrap();
    }

    // A receiving thread sees `RecvError` when another thread drops the sender.
    #[test]
    fn oneshot_multi_task_recv_then_close() {
        let (tx, rx) = channel::<Box<i32>>();
        let t = thread::spawn(move || {
            drop(tx);
        });
        thread::spawn(move || {
            assert_eq!(rx.recv(), Err(RecvError));
        })
        .join()
        .unwrap();
        t.join().unwrap();
    }

    // Receiver dropped on another thread while the sender is dropped here.
    #[test]
    fn oneshot_multi_thread_close_stress() {
        let stress_factor = stress_factor();
        let mut ts = Vec::with_capacity(stress_factor);
        for _ in 0..stress_factor {
            let (tx, rx) = channel::<i32>();
            let t = thread::spawn(move || {
                drop(rx);
            });
            ts.push(t);
            drop(tx);
        }
        for t in ts {
            t.join().unwrap();
        }
    }

    // Receiver dropped on one thread while another thread sends; the send
    // result is deliberately ignored because either outcome is acceptable.
    #[test]
    fn oneshot_multi_thread_send_close_stress() {
        let stress_factor = stress_factor();
        // Only the receiver-dropping thread is stored; the sending thread is
        // joined immediately.
        let mut ts = Vec::with_capacity(stress_factor);
        for _ in 0..stress_factor {
            let (tx, rx) = channel::<i32>();
            let t = thread::spawn(move || {
                drop(rx);
            });
            ts.push(t);
            thread::spawn(move || {
                let _ = tx.send(1);
            })
            .join()
            .unwrap();
        }
        for t in ts {
            t.join().unwrap();
        }
    }

    // Nested threads: one receives (expecting `RecvError`), one drops the sender.
    #[test]
    fn oneshot_multi_thread_recv_close_stress() {
        let stress_factor = stress_factor();
        let mut ts = Vec::with_capacity(2 * stress_factor);
        for _ in 0..stress_factor {
            let (tx, rx) = channel::<i32>();
            let t = thread::spawn(move || {
                thread::spawn(move || {
                    assert_eq!(rx.recv(), Err(RecvError));
                })
                .join()
                .unwrap();
            });
            ts.push(t);
            let t2 = thread::spawn(move || {
                let t = thread::spawn(move || {
                    drop(tx);
                });
                t.join().unwrap();
            });
            ts.push(t2);
        }
        for t in ts {
            t.join().unwrap();
        }
    }

    // A spawned thread sends one boxed value that the main thread receives.
    #[test]
    fn oneshot_multi_thread_send_recv_stress() {
        let stress_factor = stress_factor();
        let mut ts = Vec::with_capacity(stress_factor);
        for _ in 0..stress_factor {
            let (tx, rx) = channel::<Box<isize>>();
            let t = thread::spawn(move || {
                tx.send(Box::new(10)).unwrap();
            });
            ts.push(t);
            assert!(*rx.recv().unwrap() == 10);
        }
        for t in ts {
            t.join().unwrap();
        }
    }

    // A chain of ten sender threads and a chain of ten receiver threads hand
    // the channel endpoints along; values 0..10 must arrive in order.
    #[test]
    fn stream_send_recv_stress() {
        // Sends `i`, then passes the sender on to a freshly spawned thread for
        // `i + 1`. Each link joins its successor so that joining the first
        // handle waits for (and propagates panics from) the whole chain.
        fn send(tx: Sender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
            if i == 10 {
                return None;
            }

            Some(thread::spawn(move || {
                tx.send(Box::new(i)).unwrap();
                if let Some(next) = send(tx, i + 1) {
                    next.join().unwrap();
                }
            }))
        }

        // Receives and checks `i`, then passes the receiver on to a freshly
        // spawned thread for `i + 1`, joining it for the same reason as `send`.
        fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
            if i == 10 {
                return None;
            }

            Some(thread::spawn(move || {
                assert!(*rx.recv().unwrap() == i);
                if let Some(next) = recv(rx, i + 1) {
                    next.join().unwrap();
                }
            }))
        }

        let stress_factor = stress_factor();
        let mut ts = Vec::with_capacity(2 * stress_factor);
        for _ in 0..stress_factor {
            let (tx, rx) = channel();

            if let Some(t) = send(tx, 0) {
                ts.push(t);
            }
            if let Some(t2) = recv(rx, 0) {
                ts.push(t2);
            }
        }
        for t in ts {
            t.join().unwrap();
        }
    }

    // `recv_timeout` returns queued data and times out on an empty channel.
    #[test]
    fn oneshot_single_thread_recv_timeout() {
        let (tx, rx) = channel();
        tx.send(()).unwrap();
        assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
        assert_eq!(
            rx.recv_timeout(Duration::from_millis(1)),
            Err(RecvTimeoutError::Timeout)
        );
        tx.send(()).unwrap();
        assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
    }

    // The sender sleeps longer than the receive timeout on every other
    // message; every message must still be counted exactly once.
    #[test]
    fn stress_recv_timeout_two_threads() {
        let (tx, rx) = channel();
        let stress = stress_factor() + 100;
        let timeout = Duration::from_millis(100);

        let t = thread::spawn(move || {
            for i in 0..stress {
                if i % 2 == 0 {
                    thread::sleep(timeout * 2);
                }
                tx.send(1usize).unwrap();
            }
        });

        let mut recv_count = 0;
        loop {
            match rx.recv_timeout(timeout) {
                Ok(n) => {
                    assert_eq!(n, 1usize);
                    recv_count += 1;
                }
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }

        assert_eq!(recv_count, stress);
        t.join().unwrap()
    }

    // With a cloned sender alive, `recv_timeout` times out and does not
    // return before the timeout has elapsed.
    #[test]
    fn recv_timeout_upgrade() {
        let (tx, rx) = channel::<()>();
        let timeout = Duration::from_millis(1);
        let _tx_clone = tx.clone();

        let start = Instant::now();
        assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
        assert!(Instant::now() >= start + timeout);
    }

    // Many sender clones send after staggered sleeps; the receiver polls with
    // a short timeout until all senders are gone.
    #[test]
    fn stress_recv_timeout_shared() {
        let (tx, rx) = channel();
        let stress = stress_factor() + 100;

        let mut ts = Vec::with_capacity(stress);
        for i in 0..stress {
            let tx = tx.clone();
            let t = thread::spawn(move || {
                thread::sleep(Duration::from_millis(i as u64 * 10));
                tx.send(1usize).unwrap();
            });
            ts.push(t);
        }

        // Drop the original so the channel disconnects once the clones finish.
        drop(tx);

        let mut recv_count = 0;
        loop {
            match rx.recv_timeout(Duration::from_millis(10)) {
                Ok(n) => {
                    assert_eq!(n, 1usize);
                    recv_count += 1;
                }
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }

        assert_eq!(recv_count, stress);
        for t in ts {
            t.join().unwrap();
        }
    }

    #[test]
    fn recv_a_lot() {
        #[cfg(miri)]
        const N: usize = 100;
        #[cfg(not(miri))]
        const N: usize = 10000;

        // Regression test that we don't run out of stack in scheduler context
        let (tx, rx) = channel();
        for _ in 0..N {
            tx.send(()).unwrap();
        }
        for _ in 0..N {
            rx.recv().unwrap();
        }
    }

    // After draining messages from several sender clones, `recv_timeout`
    // times out, then succeeds again once another message is sent.
    #[test]
    fn shared_recv_timeout() {
        let (tx, rx) = channel();
        let total = 5;
        let mut ts = Vec::with_capacity(total);
        for _ in 0..total {
            let tx = tx.clone();
            let t = thread::spawn(move || {
                tx.send(()).unwrap();
            });
            ts.push(t);
        }

        for _ in 0..total {
            rx.recv().unwrap();
        }

        assert_eq!(
            rx.recv_timeout(Duration::from_millis(1)),
            Err(RecvTimeoutError::Timeout)
        );
        tx.send(()).unwrap();
        assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
        for t in ts {
            t.join().unwrap();
        }
    }

    // Many sender clones each send one message from their own thread.
    #[test]
    fn shared_chan_stress() {
        let (tx, rx) = channel();
        let total = stress_factor() + 100;
        let mut ts = Vec::with_capacity(total);
        for _ in 0..total {
            let tx = tx.clone();
            let t = thread::spawn(move || {
                tx.send(()).unwrap();
            });
            ts.push(t);
        }

        for _ in 0..total {
            rx.recv().unwrap();
        }
        for t in ts {
            t.join().unwrap();
        }
    }

    // `iter()` yields every message and stops when the sender is dropped.
    #[test]
    fn test_nested_recv_iter() {
        let (tx, rx) = channel::<i32>();
        let (total_tx, total_rx) = channel::<i32>();

        let t = thread::spawn(move || {
            let mut acc = 0;
            for x in rx.iter() {
                acc += x;
            }
            total_tx.send(acc).unwrap();
        });

        tx.send(3).unwrap();
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        drop(tx);
        assert_eq!(total_rx.recv().unwrap(), 6);
        t.join().unwrap();
    }

    // Breaking out of an `iter()` loop early is fine; later sends may fail.
    #[test]
    fn test_recv_iter_break() {
        let (tx, rx) = channel::<i32>();
        let (count_tx, count_rx) = channel();

        let t = thread::spawn(move || {
            let mut count = 0;
            for x in rx.iter() {
                if count >= 3 {
                    break;
                } else {
                    count += x;
                }
            }
            count_tx.send(count).unwrap();
        });

        tx.send(2).unwrap();
        tx.send(2).unwrap();
        tx.send(2).unwrap();
        // The receiver may already have left its loop, so ignore the result.
        let _ = tx.send(2);
        drop(tx);
        assert_eq!(count_rx.recv().unwrap(), 4);
        t.join().unwrap();
    }

    // `try_iter()` drains what is available without blocking.
    #[test]
    fn test_recv_try_iter() {
        let (request_tx, request_rx) = channel();
        let (response_tx, response_rx) = channel();

        // Request `x`s until we have `6`.
        let t = thread::spawn(move || {
            let mut count = 0;
            loop {
                for x in response_rx.try_iter() {
                    count += x;
                    if count == 6 {
                        return count;
                    }
                }
                request_tx.send(()).unwrap();
            }
        });

        for _ in request_rx.iter() {
            if response_tx.send(2).is_err() {
                break;
            }
        }

        assert_eq!(t.join().unwrap(), 6);
    }

    // The owning iterator outlives the sender and yields the queued messages.
    #[test]
    fn test_recv_into_iter_owned() {
        let mut iter = {
            let (tx, rx) = channel::<i32>();
            tx.send(1).unwrap();
            tx.send(2).unwrap();

            rx.into_iter()
        };
        assert_eq!(iter.next().unwrap(), 1);
        assert_eq!(iter.next().unwrap(), 2);
        assert!(iter.next().is_none());
    }

    // `(&rx).into_iter()` yields the queued messages, then `None`.
    #[test]
    fn test_recv_into_iter_borrowed() {
        let (tx, rx) = channel::<i32>();
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        drop(tx);
        let mut iter = (&rx).into_iter();
        assert_eq!(iter.next().unwrap(), 1);
        assert_eq!(iter.next().unwrap(), 2);
        assert!(iter.next().is_none());
    }

    // Walks `try_recv` through Empty -> Ok -> Empty -> Disconnected, using two
    // helper channels to sequence the other thread's steps.
    #[test]
    fn try_recv_states() {
        let (tx1, rx1) = channel::<i32>();
        let (tx2, rx2) = channel::<()>();
        let (tx3, rx3) = channel::<()>();
        let t = thread::spawn(move || {
            rx2.recv().unwrap();
            tx1.send(1).unwrap();
            tx3.send(()).unwrap();
            rx2.recv().unwrap();
            drop(tx1);
            tx3.send(()).unwrap();
        });

        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
        tx2.send(()).unwrap();
        rx3.recv().unwrap();
        assert_eq!(rx1.try_recv(), Ok(1));
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
        tx2.send(()).unwrap();
        rx3.recv().unwrap();
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
        t.join().unwrap();
    }

    // This bug used to end up in a livelock inside of the Receiver destructor
    // because the internal state of the Shared packet was corrupted
    #[test]
    fn destroy_upgraded_shared_port_when_sender_still_active() {
        let (tx, rx) = channel();
        let (tx2, rx2) = channel();
        let t = thread::spawn(move || {
            rx.recv().unwrap(); // wait on a oneshot
            drop(rx); // destroy a shared
            tx2.send(()).unwrap();
        });
        // make sure the other thread has gone to sleep
        for _ in 0..5000 {
            thread::yield_now();
        }

        // upgrade to a shared chan and send a message
        let tx2 = tx.clone();
        drop(tx);
        tx2.send(()).unwrap();

        // wait for the child thread to exit before we exit
        rx2.recv().unwrap();
        t.join().unwrap();
    }

    // Sending on a channel whose receiver was dropped at creation returns the
    // value inside `SendError`, including on a repeated attempt.
    #[test]
    fn issue_32114() {
        let (tx, _) = channel();
        let _ = tx.send(123);
        assert_eq!(tx.send(123), Err(SendError(123)));
    }
}
972 
973 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
974 mod sync_channel_tests {
975     use super::*;
976 
977     use std::env;
978     use std::thread;
979     use std::time::Duration;
980 
stress_factor() -> usize981     pub fn stress_factor() -> usize {
982         match env::var("RUST_TEST_STRESS") {
983             Ok(val) => val.parse().unwrap(),
984             Err(..) => 1,
985         }
986     }
987 
988     #[test]
smoke()989     fn smoke() {
990         let (tx, rx) = sync_channel::<i32>(1);
991         tx.send(1).unwrap();
992         assert_eq!(rx.recv().unwrap(), 1);
993     }
994 
995     #[test]
drop_full()996     fn drop_full() {
997         let (tx, _rx) = sync_channel::<Box<isize>>(1);
998         tx.send(Box::new(1)).unwrap();
999     }
1000 
1001     #[test]
smoke_shared()1002     fn smoke_shared() {
1003         let (tx, rx) = sync_channel::<i32>(1);
1004         tx.send(1).unwrap();
1005         assert_eq!(rx.recv().unwrap(), 1);
1006         let tx = tx.clone();
1007         tx.send(1).unwrap();
1008         assert_eq!(rx.recv().unwrap(), 1);
1009     }
1010 
1011     #[test]
recv_timeout()1012     fn recv_timeout() {
1013         let (tx, rx) = sync_channel::<i32>(1);
1014         assert_eq!(
1015             rx.recv_timeout(Duration::from_millis(1)),
1016             Err(RecvTimeoutError::Timeout)
1017         );
1018         tx.send(1).unwrap();
1019         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
1020     }
1021 
1022     #[test]
smoke_threads()1023     fn smoke_threads() {
1024         let (tx, rx) = sync_channel::<i32>(0);
1025         let t = thread::spawn(move || {
1026             tx.send(1).unwrap();
1027         });
1028         assert_eq!(rx.recv().unwrap(), 1);
1029         t.join().unwrap();
1030     }
1031 
1032     #[test]
smoke_port_gone()1033     fn smoke_port_gone() {
1034         let (tx, rx) = sync_channel::<i32>(0);
1035         drop(rx);
1036         assert!(tx.send(1).is_err());
1037     }
1038 
1039     #[test]
smoke_shared_port_gone2()1040     fn smoke_shared_port_gone2() {
1041         let (tx, rx) = sync_channel::<i32>(0);
1042         drop(rx);
1043         let tx2 = tx.clone();
1044         drop(tx);
1045         assert!(tx2.send(1).is_err());
1046     }
1047 
1048     #[test]
port_gone_concurrent()1049     fn port_gone_concurrent() {
1050         let (tx, rx) = sync_channel::<i32>(0);
1051         let t = thread::spawn(move || {
1052             rx.recv().unwrap();
1053         });
1054         while tx.send(1).is_ok() {}
1055         t.join().unwrap();
1056     }
1057 
1058     #[test]
port_gone_concurrent_shared()1059     fn port_gone_concurrent_shared() {
1060         let (tx, rx) = sync_channel::<i32>(0);
1061         let tx2 = tx.clone();
1062         let t = thread::spawn(move || {
1063             rx.recv().unwrap();
1064         });
1065         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1066         t.join().unwrap();
1067     }
1068 
1069     #[test]
smoke_chan_gone()1070     fn smoke_chan_gone() {
1071         let (tx, rx) = sync_channel::<i32>(0);
1072         drop(tx);
1073         assert!(rx.recv().is_err());
1074     }
1075 
1076     #[test]
smoke_chan_gone_shared()1077     fn smoke_chan_gone_shared() {
1078         let (tx, rx) = sync_channel::<()>(0);
1079         let tx2 = tx.clone();
1080         drop(tx);
1081         drop(tx2);
1082         assert!(rx.recv().is_err());
1083     }
1084 
1085     #[test]
chan_gone_concurrent()1086     fn chan_gone_concurrent() {
1087         let (tx, rx) = sync_channel::<i32>(0);
1088         let t = thread::spawn(move || {
1089             tx.send(1).unwrap();
1090             tx.send(1).unwrap();
1091         });
1092         while rx.recv().is_ok() {}
1093         t.join().unwrap();
1094     }
1095 
1096     #[test]
stress()1097     fn stress() {
1098         #[cfg(miri)]
1099         const N: usize = 100;
1100         #[cfg(not(miri))]
1101         const N: usize = 10000;
1102 
1103         let (tx, rx) = sync_channel::<i32>(0);
1104         let t = thread::spawn(move || {
1105             for _ in 0..N {
1106                 tx.send(1).unwrap();
1107             }
1108         });
1109         for _ in 0..N {
1110             assert_eq!(rx.recv().unwrap(), 1);
1111         }
1112         t.join().unwrap();
1113     }
1114 
1115     #[test]
stress_recv_timeout_two_threads()1116     fn stress_recv_timeout_two_threads() {
1117         #[cfg(miri)]
1118         const N: usize = 100;
1119         #[cfg(not(miri))]
1120         const N: usize = 10000;
1121 
1122         let (tx, rx) = sync_channel::<i32>(0);
1123 
1124         let t = thread::spawn(move || {
1125             for _ in 0..N {
1126                 tx.send(1).unwrap();
1127             }
1128         });
1129 
1130         let mut recv_count = 0;
1131         loop {
1132             match rx.recv_timeout(Duration::from_millis(1)) {
1133                 Ok(v) => {
1134                     assert_eq!(v, 1);
1135                     recv_count += 1;
1136                 }
1137                 Err(RecvTimeoutError::Timeout) => continue,
1138                 Err(RecvTimeoutError::Disconnected) => break,
1139             }
1140         }
1141 
1142         assert_eq!(recv_count, N);
1143         t.join().unwrap();
1144     }
1145 
1146     #[test]
stress_recv_timeout_shared()1147     fn stress_recv_timeout_shared() {
1148         #[cfg(miri)]
1149         const AMT: u32 = 100;
1150         #[cfg(not(miri))]
1151         const AMT: u32 = 1000;
1152         const NTHREADS: u32 = 8;
1153         let (tx, rx) = sync_channel::<i32>(0);
1154         let (dtx, drx) = sync_channel::<()>(0);
1155 
1156         let t = thread::spawn(move || {
1157             let mut recv_count = 0;
1158             loop {
1159                 match rx.recv_timeout(Duration::from_millis(10)) {
1160                     Ok(v) => {
1161                         assert_eq!(v, 1);
1162                         recv_count += 1;
1163                     }
1164                     Err(RecvTimeoutError::Timeout) => continue,
1165                     Err(RecvTimeoutError::Disconnected) => break,
1166                 }
1167             }
1168 
1169             assert_eq!(recv_count, AMT * NTHREADS);
1170             assert!(rx.try_recv().is_err());
1171 
1172             dtx.send(()).unwrap();
1173         });
1174 
1175         let mut ts = Vec::with_capacity(NTHREADS as usize);
1176         for _ in 0..NTHREADS {
1177             let tx = tx.clone();
1178             let t = thread::spawn(move || {
1179                 for _ in 0..AMT {
1180                     tx.send(1).unwrap();
1181                 }
1182             });
1183             ts.push(t);
1184         }
1185 
1186         drop(tx);
1187 
1188         drx.recv().unwrap();
1189         for t in ts {
1190             t.join().unwrap();
1191         }
1192         t.join().unwrap();
1193     }
1194 
1195     #[test]
stress_shared()1196     fn stress_shared() {
1197         #[cfg(miri)]
1198         const AMT: u32 = 100;
1199         #[cfg(not(miri))]
1200         const AMT: u32 = 1000;
1201         const NTHREADS: u32 = 8;
1202         let (tx, rx) = sync_channel::<i32>(0);
1203         let (dtx, drx) = sync_channel::<()>(0);
1204 
1205         let t = thread::spawn(move || {
1206             for _ in 0..AMT * NTHREADS {
1207                 assert_eq!(rx.recv().unwrap(), 1);
1208             }
1209             assert!(rx.try_recv().is_err());
1210             dtx.send(()).unwrap();
1211         });
1212 
1213         let mut ts = Vec::with_capacity(NTHREADS as usize);
1214         for _ in 0..NTHREADS {
1215             let tx = tx.clone();
1216             let t = thread::spawn(move || {
1217                 for _ in 0..AMT {
1218                     tx.send(1).unwrap();
1219                 }
1220             });
1221             ts.push(t);
1222         }
1223         drop(tx);
1224         drx.recv().unwrap();
1225         for t in ts {
1226             t.join().unwrap();
1227         }
1228         t.join().unwrap();
1229     }
1230 
1231     #[test]
oneshot_single_thread_close_port_first()1232     fn oneshot_single_thread_close_port_first() {
1233         // Simple test of closing without sending
1234         let (_tx, rx) = sync_channel::<i32>(0);
1235         drop(rx);
1236     }
1237 
1238     #[test]
oneshot_single_thread_close_chan_first()1239     fn oneshot_single_thread_close_chan_first() {
1240         // Simple test of closing without sending
1241         let (tx, _rx) = sync_channel::<i32>(0);
1242         drop(tx);
1243     }
1244 
1245     #[test]
oneshot_single_thread_send_port_close()1246     fn oneshot_single_thread_send_port_close() {
1247         // Testing that the sender cleans up the payload if receiver is closed
1248         let (tx, rx) = sync_channel::<Box<i32>>(0);
1249         drop(rx);
1250         assert!(tx.send(Box::new(0)).is_err());
1251     }
1252 
1253     #[test]
oneshot_single_thread_recv_chan_close()1254     fn oneshot_single_thread_recv_chan_close() {
1255         let (tx, rx) = sync_channel::<i32>(0);
1256         drop(tx);
1257         assert_eq!(rx.recv(), Err(RecvError));
1258     }
1259 
1260     #[test]
oneshot_single_thread_send_then_recv()1261     fn oneshot_single_thread_send_then_recv() {
1262         let (tx, rx) = sync_channel::<Box<i32>>(1);
1263         tx.send(Box::new(10)).unwrap();
1264         assert!(*rx.recv().unwrap() == 10);
1265     }
1266 
1267     #[test]
oneshot_single_thread_try_send_open()1268     fn oneshot_single_thread_try_send_open() {
1269         let (tx, rx) = sync_channel::<i32>(1);
1270         assert_eq!(tx.try_send(10), Ok(()));
1271         assert!(rx.recv().unwrap() == 10);
1272     }
1273 
1274     #[test]
oneshot_single_thread_try_send_closed()1275     fn oneshot_single_thread_try_send_closed() {
1276         let (tx, rx) = sync_channel::<i32>(0);
1277         drop(rx);
1278         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1279     }
1280 
1281     #[test]
oneshot_single_thread_try_send_closed2()1282     fn oneshot_single_thread_try_send_closed2() {
1283         let (tx, _rx) = sync_channel::<i32>(0);
1284         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1285     }
1286 
1287     #[test]
oneshot_single_thread_try_recv_open()1288     fn oneshot_single_thread_try_recv_open() {
1289         let (tx, rx) = sync_channel::<i32>(1);
1290         tx.send(10).unwrap();
1291         assert!(rx.recv() == Ok(10));
1292     }
1293 
1294     #[test]
oneshot_single_thread_try_recv_closed()1295     fn oneshot_single_thread_try_recv_closed() {
1296         let (tx, rx) = sync_channel::<i32>(0);
1297         drop(tx);
1298         assert!(rx.recv().is_err());
1299     }
1300 
1301     #[test]
oneshot_single_thread_try_recv_closed_with_data()1302     fn oneshot_single_thread_try_recv_closed_with_data() {
1303         let (tx, rx) = sync_channel::<i32>(1);
1304         tx.send(10).unwrap();
1305         drop(tx);
1306         assert_eq!(rx.try_recv(), Ok(10));
1307         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1308     }
1309 
1310     #[test]
oneshot_single_thread_peek_data()1311     fn oneshot_single_thread_peek_data() {
1312         let (tx, rx) = sync_channel::<i32>(1);
1313         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1314         tx.send(10).unwrap();
1315         assert_eq!(rx.try_recv(), Ok(10));
1316     }
1317 
1318     #[test]
oneshot_single_thread_peek_close()1319     fn oneshot_single_thread_peek_close() {
1320         let (tx, rx) = sync_channel::<i32>(0);
1321         drop(tx);
1322         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1323         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1324     }
1325 
1326     #[test]
oneshot_single_thread_peek_open()1327     fn oneshot_single_thread_peek_open() {
1328         let (_tx, rx) = sync_channel::<i32>(0);
1329         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1330     }
1331 
1332     #[test]
oneshot_multi_task_recv_then_send()1333     fn oneshot_multi_task_recv_then_send() {
1334         let (tx, rx) = sync_channel::<Box<i32>>(0);
1335         let t = thread::spawn(move || {
1336             assert!(*rx.recv().unwrap() == 10);
1337         });
1338 
1339         tx.send(Box::new(10)).unwrap();
1340         t.join().unwrap();
1341     }
1342 
1343     #[test]
oneshot_multi_task_recv_then_close()1344     fn oneshot_multi_task_recv_then_close() {
1345         let (tx, rx) = sync_channel::<Box<i32>>(0);
1346         let t = thread::spawn(move || {
1347             drop(tx);
1348         });
1349         thread::spawn(move || {
1350             assert_eq!(rx.recv(), Err(RecvError));
1351         })
1352         .join()
1353         .unwrap();
1354         t.join().unwrap();
1355     }
1356 
1357     #[test]
oneshot_multi_thread_close_stress()1358     fn oneshot_multi_thread_close_stress() {
1359         let stress_factor = stress_factor();
1360         let mut ts = Vec::with_capacity(stress_factor);
1361         for _ in 0..stress_factor {
1362             let (tx, rx) = sync_channel::<i32>(0);
1363             let t = thread::spawn(move || {
1364                 drop(rx);
1365             });
1366             ts.push(t);
1367             drop(tx);
1368         }
1369         for t in ts {
1370             t.join().unwrap();
1371         }
1372     }
1373 
1374     #[test]
oneshot_multi_thread_send_close_stress()1375     fn oneshot_multi_thread_send_close_stress() {
1376         let stress_factor = stress_factor();
1377         let mut ts = Vec::with_capacity(stress_factor);
1378         for _ in 0..stress_factor {
1379             let (tx, rx) = sync_channel::<i32>(0);
1380             let t = thread::spawn(move || {
1381                 drop(rx);
1382             });
1383             ts.push(t);
1384             thread::spawn(move || {
1385                 let _ = tx.send(1);
1386             })
1387             .join()
1388             .unwrap();
1389         }
1390         for t in ts {
1391             t.join().unwrap();
1392         }
1393     }
1394 
1395     #[test]
oneshot_multi_thread_recv_close_stress()1396     fn oneshot_multi_thread_recv_close_stress() {
1397         let stress_factor = stress_factor();
1398         let mut ts = Vec::with_capacity(2 * stress_factor);
1399         for _ in 0..stress_factor {
1400             let (tx, rx) = sync_channel::<i32>(0);
1401             let t = thread::spawn(move || {
1402                 thread::spawn(move || {
1403                     assert_eq!(rx.recv(), Err(RecvError));
1404                 })
1405                 .join()
1406                 .unwrap();
1407             });
1408             ts.push(t);
1409             let t2 = thread::spawn(move || {
1410                 thread::spawn(move || {
1411                     drop(tx);
1412                 });
1413             });
1414             ts.push(t2);
1415         }
1416         for t in ts {
1417             t.join().unwrap();
1418         }
1419     }
1420 
1421     #[test]
oneshot_multi_thread_send_recv_stress()1422     fn oneshot_multi_thread_send_recv_stress() {
1423         let stress_factor = stress_factor();
1424         let mut ts = Vec::with_capacity(stress_factor);
1425         for _ in 0..stress_factor {
1426             let (tx, rx) = sync_channel::<Box<i32>>(0);
1427             let t = thread::spawn(move || {
1428                 tx.send(Box::new(10)).unwrap();
1429             });
1430             ts.push(t);
1431             assert!(*rx.recv().unwrap() == 10);
1432         }
1433         for t in ts {
1434             t.join().unwrap();
1435         }
1436     }
1437 
1438     #[test]
stream_send_recv_stress()1439     fn stream_send_recv_stress() {
1440         let stress_factor = stress_factor();
1441         let mut ts = Vec::with_capacity(2 * stress_factor);
1442         for _ in 0..stress_factor {
1443             let (tx, rx) = sync_channel::<Box<i32>>(0);
1444 
1445             if let Some(t) = send(tx, 0) {
1446                 ts.push(t);
1447             }
1448             if let Some(t) = recv(rx, 0) {
1449                 ts.push(t);
1450             }
1451 
1452             fn send(tx: SyncSender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
1453                 if i == 10 {
1454                     return None;
1455                 }
1456 
1457                 Some(thread::spawn(move || {
1458                     tx.send(Box::new(i)).unwrap();
1459                     send(tx, i + 1);
1460                 }))
1461             }
1462 
1463             fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
1464                 if i == 10 {
1465                     return None;
1466                 }
1467 
1468                 Some(thread::spawn(move || {
1469                     assert!(*rx.recv().unwrap() == i);
1470                     recv(rx, i + 1);
1471                 }))
1472             }
1473         }
1474         for t in ts {
1475             t.join().unwrap();
1476         }
1477     }
1478 
1479     #[test]
recv_a_lot()1480     fn recv_a_lot() {
1481         #[cfg(miri)]
1482         const N: usize = 100;
1483         #[cfg(not(miri))]
1484         const N: usize = 10000;
1485 
1486         // Regression test that we don't run out of stack in scheduler context
1487         let (tx, rx) = sync_channel(N);
1488         for _ in 0..N {
1489             tx.send(()).unwrap();
1490         }
1491         for _ in 0..N {
1492             rx.recv().unwrap();
1493         }
1494     }
1495 
1496     #[test]
shared_chan_stress()1497     fn shared_chan_stress() {
1498         let (tx, rx) = sync_channel(0);
1499         let total = stress_factor() + 100;
1500         let mut ts = Vec::with_capacity(total);
1501         for _ in 0..total {
1502             let tx = tx.clone();
1503             let t = thread::spawn(move || {
1504                 tx.send(()).unwrap();
1505             });
1506             ts.push(t);
1507         }
1508 
1509         for _ in 0..total {
1510             rx.recv().unwrap();
1511         }
1512         for t in ts {
1513             t.join().unwrap();
1514         }
1515     }
1516 
1517     #[test]
test_nested_recv_iter()1518     fn test_nested_recv_iter() {
1519         let (tx, rx) = sync_channel::<i32>(0);
1520         let (total_tx, total_rx) = sync_channel::<i32>(0);
1521 
1522         let t = thread::spawn(move || {
1523             let mut acc = 0;
1524             for x in rx.iter() {
1525                 acc += x;
1526             }
1527             total_tx.send(acc).unwrap();
1528         });
1529 
1530         tx.send(3).unwrap();
1531         tx.send(1).unwrap();
1532         tx.send(2).unwrap();
1533         drop(tx);
1534         assert_eq!(total_rx.recv().unwrap(), 6);
1535         t.join().unwrap();
1536     }
1537 
1538     #[test]
test_recv_iter_break()1539     fn test_recv_iter_break() {
1540         let (tx, rx) = sync_channel::<i32>(0);
1541         let (count_tx, count_rx) = sync_channel(0);
1542 
1543         let t = thread::spawn(move || {
1544             let mut count = 0;
1545             for x in rx.iter() {
1546                 if count >= 3 {
1547                     break;
1548                 } else {
1549                     count += x;
1550                 }
1551             }
1552             count_tx.send(count).unwrap();
1553         });
1554 
1555         tx.send(2).unwrap();
1556         tx.send(2).unwrap();
1557         tx.send(2).unwrap();
1558         let _ = tx.try_send(2);
1559         drop(tx);
1560         assert_eq!(count_rx.recv().unwrap(), 4);
1561         t.join().unwrap();
1562     }
1563 
1564     #[test]
try_recv_states()1565     fn try_recv_states() {
1566         let (tx1, rx1) = sync_channel::<i32>(1);
1567         let (tx2, rx2) = sync_channel::<()>(1);
1568         let (tx3, rx3) = sync_channel::<()>(1);
1569         let t = thread::spawn(move || {
1570             rx2.recv().unwrap();
1571             tx1.send(1).unwrap();
1572             tx3.send(()).unwrap();
1573             rx2.recv().unwrap();
1574             drop(tx1);
1575             tx3.send(()).unwrap();
1576         });
1577 
1578         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1579         tx2.send(()).unwrap();
1580         rx3.recv().unwrap();
1581         assert_eq!(rx1.try_recv(), Ok(1));
1582         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1583         tx2.send(()).unwrap();
1584         rx3.recv().unwrap();
1585         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1586         t.join().unwrap();
1587     }
1588 
1589     // This bug used to end up in a livelock inside of the Receiver destructor
1590     // because the internal state of the Shared packet was corrupted
1591     #[test]
destroy_upgraded_shared_port_when_sender_still_active()1592     fn destroy_upgraded_shared_port_when_sender_still_active() {
1593         let (tx, rx) = sync_channel::<()>(0);
1594         let (tx2, rx2) = sync_channel::<()>(0);
1595         let t = thread::spawn(move || {
1596             rx.recv().unwrap(); // wait on a oneshot
1597             drop(rx); // destroy a shared
1598             tx2.send(()).unwrap();
1599         });
1600         // make sure the other thread has gone to sleep
1601         for _ in 0..5000 {
1602             thread::yield_now();
1603         }
1604 
1605         // upgrade to a shared chan and send a message
1606         let tx2 = tx.clone();
1607         drop(tx);
1608         tx2.send(()).unwrap();
1609 
1610         // wait for the child thread to exit before we exit
1611         rx2.recv().unwrap();
1612         t.join().unwrap();
1613     }
1614 
1615     #[test]
send1()1616     fn send1() {
1617         let (tx, rx) = sync_channel::<i32>(0);
1618         let t = thread::spawn(move || {
1619             rx.recv().unwrap();
1620         });
1621         assert_eq!(tx.send(1), Ok(()));
1622         t.join().unwrap();
1623     }
1624 
1625     #[test]
send2()1626     fn send2() {
1627         let (tx, rx) = sync_channel::<i32>(0);
1628         let t = thread::spawn(move || {
1629             drop(rx);
1630         });
1631         assert!(tx.send(1).is_err());
1632         t.join().unwrap();
1633     }
1634 
1635     #[test]
send3()1636     fn send3() {
1637         let (tx, rx) = sync_channel::<i32>(1);
1638         assert_eq!(tx.send(1), Ok(()));
1639         let t = thread::spawn(move || {
1640             drop(rx);
1641         });
1642         assert!(tx.send(1).is_err());
1643         t.join().unwrap();
1644     }
1645 
1646     #[test]
send4()1647     fn send4() {
1648         let (tx, rx) = sync_channel::<i32>(0);
1649         let tx2 = tx.clone();
1650         let (done, donerx) = channel();
1651         let done2 = done.clone();
1652         let t = thread::spawn(move || {
1653             assert!(tx.send(1).is_err());
1654             done.send(()).unwrap();
1655         });
1656         let t2 = thread::spawn(move || {
1657             assert!(tx2.send(2).is_err());
1658             done2.send(()).unwrap();
1659         });
1660         drop(rx);
1661         donerx.recv().unwrap();
1662         donerx.recv().unwrap();
1663         t.join().unwrap();
1664         t2.join().unwrap();
1665     }
1666 
1667     #[test]
try_send1()1668     fn try_send1() {
1669         let (tx, _rx) = sync_channel::<i32>(0);
1670         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
1671     }
1672 
1673     #[test]
try_send2()1674     fn try_send2() {
1675         let (tx, _rx) = sync_channel::<i32>(1);
1676         assert_eq!(tx.try_send(1), Ok(()));
1677         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
1678     }
1679 
1680     #[test]
try_send3()1681     fn try_send3() {
1682         let (tx, rx) = sync_channel::<i32>(1);
1683         assert_eq!(tx.try_send(1), Ok(()));
1684         drop(rx);
1685         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
1686     }
1687 
1688     #[test]
issue_15761()1689     fn issue_15761() {
1690         fn repro() {
1691             let (tx1, rx1) = sync_channel::<()>(3);
1692             let (tx2, rx2) = sync_channel::<()>(3);
1693 
1694             let _t = thread::spawn(move || {
1695                 rx1.recv().unwrap();
1696                 tx2.try_send(()).unwrap();
1697             });
1698 
1699             tx1.try_send(()).unwrap();
1700             rx2.recv().unwrap();
1701         }
1702 
1703         for _ in 0..100 {
1704             repro()
1705         }
1706     }
1707 }
1708 
1709 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/select.rs
1710 mod select_tests {
1711     use super::*;
1712 
1713     use std::thread;
1714 
1715     #[test]
smoke()1716     fn smoke() {
1717         let (tx1, rx1) = channel::<i32>();
1718         let (tx2, rx2) = channel::<i32>();
1719         tx1.send(1).unwrap();
1720         select! {
1721             foo = rx1.recv() => assert_eq!(foo.unwrap(), 1),
1722             _bar = rx2.recv() => panic!()
1723         }
1724         tx2.send(2).unwrap();
1725         select! {
1726             _foo = rx1.recv() => panic!(),
1727             bar = rx2.recv() => assert_eq!(bar.unwrap(), 2)
1728         }
1729         drop(tx1);
1730         select! {
1731             foo = rx1.recv() => assert!(foo.is_err()),
1732             _bar = rx2.recv() => panic!()
1733         }
1734         drop(tx2);
1735         select! {
1736             bar = rx2.recv() => assert!(bar.is_err())
1737         }
1738     }
1739 
1740     #[test]
smoke2()1741     fn smoke2() {
1742         let (_tx1, rx1) = channel::<i32>();
1743         let (_tx2, rx2) = channel::<i32>();
1744         let (_tx3, rx3) = channel::<i32>();
1745         let (_tx4, rx4) = channel::<i32>();
1746         let (tx5, rx5) = channel::<i32>();
1747         tx5.send(4).unwrap();
1748         select! {
1749             _foo = rx1.recv() => panic!("1"),
1750             _foo = rx2.recv() => panic!("2"),
1751             _foo = rx3.recv() => panic!("3"),
1752             _foo = rx4.recv() => panic!("4"),
1753             foo = rx5.recv() => assert_eq!(foo.unwrap(), 4)
1754         }
1755     }
1756 
1757     #[test]
closed()1758     fn closed() {
1759         let (_tx1, rx1) = channel::<i32>();
1760         let (tx2, rx2) = channel::<i32>();
1761         drop(tx2);
1762 
1763         select! {
1764             _a1 = rx1.recv() => panic!(),
1765             a2 = rx2.recv() => assert!(a2.is_err())
1766         }
1767     }
1768 
1769     #[test]
unblocks()1770     fn unblocks() {
1771         let (tx1, rx1) = channel::<i32>();
1772         let (_tx2, rx2) = channel::<i32>();
1773         let (tx3, rx3) = channel::<i32>();
1774 
1775         let t = thread::spawn(move || {
1776             for _ in 0..20 {
1777                 thread::yield_now();
1778             }
1779             tx1.send(1).unwrap();
1780             rx3.recv().unwrap();
1781             for _ in 0..20 {
1782                 thread::yield_now();
1783             }
1784         });
1785 
1786         select! {
1787             a = rx1.recv() => assert_eq!(a.unwrap(), 1),
1788             _b = rx2.recv() => panic!()
1789         }
1790         tx3.send(1).unwrap();
1791         select! {
1792             a = rx1.recv() => assert!(a.is_err()),
1793             _b = rx2.recv() => panic!()
1794         }
1795         t.join().unwrap();
1796     }
1797 
1798     #[test]
both_ready()1799     fn both_ready() {
1800         let (tx1, rx1) = channel::<i32>();
1801         let (tx2, rx2) = channel::<i32>();
1802         let (tx3, rx3) = channel::<()>();
1803 
1804         let t = thread::spawn(move || {
1805             for _ in 0..20 {
1806                 thread::yield_now();
1807             }
1808             tx1.send(1).unwrap();
1809             tx2.send(2).unwrap();
1810             rx3.recv().unwrap();
1811         });
1812 
1813         select! {
1814             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
1815             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
1816         }
1817         select! {
1818             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
1819             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
1820         }
1821         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1822         assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
1823         tx3.send(()).unwrap();
1824         t.join().unwrap();
1825     }
1826 
1827     #[test]
stress()1828     fn stress() {
1829         #[cfg(miri)]
1830         const AMT: i32 = 100;
1831         #[cfg(not(miri))]
1832         const AMT: i32 = 10000;
1833 
1834         let (tx1, rx1) = channel::<i32>();
1835         let (tx2, rx2) = channel::<i32>();
1836         let (tx3, rx3) = channel::<()>();
1837 
1838         let t = thread::spawn(move || {
1839             for i in 0..AMT {
1840                 if i % 2 == 0 {
1841                     tx1.send(i).unwrap();
1842                 } else {
1843                     tx2.send(i).unwrap();
1844                 }
1845                 rx3.recv().unwrap();
1846             }
1847         });
1848 
1849         for i in 0..AMT {
1850             select! {
1851                 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
1852                 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
1853             }
1854             tx3.send(()).unwrap();
1855         }
1856         t.join().unwrap();
1857     }
1858 
1859     #[allow(unused_must_use)]
1860     #[test]
cloning()1861     fn cloning() {
1862         let (tx1, rx1) = channel::<i32>();
1863         let (_tx2, rx2) = channel::<i32>();
1864         let (tx3, rx3) = channel::<()>();
1865 
1866         let t = thread::spawn(move || {
1867             rx3.recv().unwrap();
1868             tx1.clone();
1869             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
1870             tx1.send(2).unwrap();
1871             rx3.recv().unwrap();
1872         });
1873 
1874         tx3.send(()).unwrap();
1875         select! {
1876             _i1 = rx1.recv() => {},
1877             _i2 = rx2.recv() => panic!()
1878         }
1879         tx3.send(()).unwrap();
1880         t.join().unwrap();
1881     }
1882 
1883     #[allow(unused_must_use)]
1884     #[test]
cloning2()1885     fn cloning2() {
1886         let (tx1, rx1) = channel::<i32>();
1887         let (_tx2, rx2) = channel::<i32>();
1888         let (tx3, rx3) = channel::<()>();
1889 
1890         let t = thread::spawn(move || {
1891             rx3.recv().unwrap();
1892             tx1.clone();
1893             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
1894             tx1.send(2).unwrap();
1895             rx3.recv().unwrap();
1896         });
1897 
1898         tx3.send(()).unwrap();
1899         select! {
1900             _i1 = rx1.recv() => {},
1901             _i2 = rx2.recv() => panic!()
1902         }
1903         tx3.send(()).unwrap();
1904         t.join().unwrap();
1905     }
1906 
1907     #[test]
cloning3()1908     fn cloning3() {
1909         let (tx1, rx1) = channel::<()>();
1910         let (tx2, rx2) = channel::<()>();
1911         let (tx3, rx3) = channel::<()>();
1912         let t = thread::spawn(move || {
1913             select! {
1914                 _ = rx1.recv() => panic!(),
1915                 _ = rx2.recv() => {}
1916             }
1917             tx3.send(()).unwrap();
1918         });
1919 
1920         for _ in 0..1000 {
1921             thread::yield_now();
1922         }
1923         drop(tx1.clone());
1924         tx2.send(()).unwrap();
1925         rx3.recv().unwrap();
1926         t.join().unwrap();
1927     }
1928 
1929     #[test]
preflight1()1930     fn preflight1() {
1931         let (tx, rx) = channel();
1932         tx.send(()).unwrap();
1933         select! {
1934             _n = rx.recv() => {}
1935         }
1936     }
1937 
1938     #[test]
preflight2()1939     fn preflight2() {
1940         let (tx, rx) = channel();
1941         tx.send(()).unwrap();
1942         tx.send(()).unwrap();
1943         select! {
1944             _n = rx.recv() => {}
1945         }
1946     }
1947 
1948     #[test]
preflight3()1949     fn preflight3() {
1950         let (tx, rx) = channel();
1951         drop(tx.clone());
1952         tx.send(()).unwrap();
1953         select! {
1954             _n = rx.recv() => {}
1955         }
1956     }
1957 
1958     #[test]
preflight4()1959     fn preflight4() {
1960         let (tx, rx) = channel();
1961         tx.send(()).unwrap();
1962         select! {
1963             _ = rx.recv() => {}
1964         }
1965     }
1966 
1967     #[test]
preflight5()1968     fn preflight5() {
1969         let (tx, rx) = channel();
1970         tx.send(()).unwrap();
1971         tx.send(()).unwrap();
1972         select! {
1973             _ = rx.recv() => {}
1974         }
1975     }
1976 
1977     #[test]
preflight6()1978     fn preflight6() {
1979         let (tx, rx) = channel();
1980         drop(tx.clone());
1981         tx.send(()).unwrap();
1982         select! {
1983             _ = rx.recv() => {}
1984         }
1985     }
1986 
1987     #[test]
preflight7()1988     fn preflight7() {
1989         let (tx, rx) = channel::<()>();
1990         drop(tx);
1991         select! {
1992             _ = rx.recv() => {}
1993         }
1994     }
1995 
1996     #[test]
preflight8()1997     fn preflight8() {
1998         let (tx, rx) = channel();
1999         tx.send(()).unwrap();
2000         drop(tx);
2001         rx.recv().unwrap();
2002         select! {
2003             _ = rx.recv() => {}
2004         }
2005     }
2006 
2007     #[test]
preflight9()2008     fn preflight9() {
2009         let (tx, rx) = channel();
2010         drop(tx.clone());
2011         tx.send(()).unwrap();
2012         drop(tx);
2013         rx.recv().unwrap();
2014         select! {
2015             _ = rx.recv() => {}
2016         }
2017     }
2018 
2019     #[test]
oneshot_data_waiting()2020     fn oneshot_data_waiting() {
2021         let (tx1, rx1) = channel();
2022         let (tx2, rx2) = channel();
2023         let t = thread::spawn(move || {
2024             select! {
2025                 _n = rx1.recv() => {}
2026             }
2027             tx2.send(()).unwrap();
2028         });
2029 
2030         for _ in 0..100 {
2031             thread::yield_now()
2032         }
2033         tx1.send(()).unwrap();
2034         rx2.recv().unwrap();
2035         t.join().unwrap();
2036     }
2037 
2038     #[test]
stream_data_waiting()2039     fn stream_data_waiting() {
2040         let (tx1, rx1) = channel();
2041         let (tx2, rx2) = channel();
2042         tx1.send(()).unwrap();
2043         tx1.send(()).unwrap();
2044         rx1.recv().unwrap();
2045         rx1.recv().unwrap();
2046         let t = thread::spawn(move || {
2047             select! {
2048                 _n = rx1.recv() => {}
2049             }
2050             tx2.send(()).unwrap();
2051         });
2052 
2053         for _ in 0..100 {
2054             thread::yield_now()
2055         }
2056         tx1.send(()).unwrap();
2057         rx2.recv().unwrap();
2058         t.join().unwrap();
2059     }
2060 
2061     #[test]
shared_data_waiting()2062     fn shared_data_waiting() {
2063         let (tx1, rx1) = channel();
2064         let (tx2, rx2) = channel();
2065         drop(tx1.clone());
2066         tx1.send(()).unwrap();
2067         rx1.recv().unwrap();
2068         let t = thread::spawn(move || {
2069             select! {
2070                 _n = rx1.recv() => {}
2071             }
2072             tx2.send(()).unwrap();
2073         });
2074 
2075         for _ in 0..100 {
2076             thread::yield_now()
2077         }
2078         tx1.send(()).unwrap();
2079         rx2.recv().unwrap();
2080         t.join().unwrap();
2081     }
2082 
2083     #[test]
sync1()2084     fn sync1() {
2085         let (tx, rx) = sync_channel::<i32>(1);
2086         tx.send(1).unwrap();
2087         select! {
2088             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
2089         }
2090     }
2091 
2092     #[test]
sync2()2093     fn sync2() {
2094         let (tx, rx) = sync_channel::<i32>(0);
2095         let t = thread::spawn(move || {
2096             for _ in 0..100 {
2097                 thread::yield_now()
2098             }
2099             tx.send(1).unwrap();
2100         });
2101         select! {
2102             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
2103         }
2104         t.join().unwrap();
2105     }
2106 
2107     #[test]
sync3()2108     fn sync3() {
2109         let (tx1, rx1) = sync_channel::<i32>(0);
2110         let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel();
2111         let t = thread::spawn(move || {
2112             tx1.send(1).unwrap();
2113         });
2114         let t2 = thread::spawn(move || {
2115             tx2.send(2).unwrap();
2116         });
2117         select! {
2118             n = rx1.recv() => {
2119                 let n = n.unwrap();
2120                 assert_eq!(n, 1);
2121                 assert_eq!(rx2.recv().unwrap(), 2);
2122             },
2123             n = rx2.recv() => {
2124                 let n = n.unwrap();
2125                 assert_eq!(n, 2);
2126                 assert_eq!(rx1.recv().unwrap(), 1);
2127             }
2128         }
2129         t.join().unwrap();
2130         t2.join().unwrap();
2131     }
2132 }
2133