//! Tests copied from `std::sync::mpsc`.
//!
//! This is a copy of the tests for the `std::sync::mpsc` channels from the standard library,
//! modified to work with `crossbeam-channel` instead.
//!
//! Minor tweaks were needed to make the tests compile:
//!
//! - Replace `box` syntax with `Box::new`.
//! - Replace all uses of `Select` with `select!`.
//! - Change the imports.
//! - Join all spawned threads.
//! - Remove the assertion from the `oneshot_multi_thread_send_close_stress` tests.
//!
//! Source:
//!   - https://github.com/rust-lang/rust/tree/master/src/libstd/sync/mpsc
//!
//! Copyright & License:
//!   - Copyright 2013-2014 The Rust Project Developers
//!   - Apache License, Version 2.0 or MIT license, at your option
//!   - https://github.com/rust-lang/rust/blob/master/COPYRIGHT
//!   - https://www.rust-lang.org/en-US/legal.html
22 
23 #![allow(clippy::match_single_binding, clippy::redundant_clone)]
24 
25 use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError};
26 use std::sync::mpsc::{SendError, TrySendError};
27 use std::thread::JoinHandle;
28 use std::time::Duration;
29 
30 use crossbeam_channel as cc;
31 
32 pub struct Sender<T> {
33     pub inner: cc::Sender<T>,
34 }
35 
36 impl<T> Sender<T> {
send(&self, t: T) -> Result<(), SendError<T>>37     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
38         self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
39     }
40 }
41 
42 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>43     fn clone(&self) -> Sender<T> {
44         Sender {
45             inner: self.inner.clone(),
46         }
47     }
48 }
49 
50 pub struct SyncSender<T> {
51     pub inner: cc::Sender<T>,
52 }
53 
54 impl<T> SyncSender<T> {
send(&self, t: T) -> Result<(), SendError<T>>55     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
56         self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
57     }
58 
try_send(&self, t: T) -> Result<(), TrySendError<T>>59     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
60         self.inner.try_send(t).map_err(|err| match err {
61             cc::TrySendError::Full(m) => TrySendError::Full(m),
62             cc::TrySendError::Disconnected(m) => TrySendError::Disconnected(m),
63         })
64     }
65 }
66 
67 impl<T> Clone for SyncSender<T> {
clone(&self) -> SyncSender<T>68     fn clone(&self) -> SyncSender<T> {
69         SyncSender {
70             inner: self.inner.clone(),
71         }
72     }
73 }
74 
75 pub struct Receiver<T> {
76     pub inner: cc::Receiver<T>,
77 }
78 
79 impl<T> Receiver<T> {
try_recv(&self) -> Result<T, TryRecvError>80     pub fn try_recv(&self) -> Result<T, TryRecvError> {
81         self.inner.try_recv().map_err(|err| match err {
82             cc::TryRecvError::Empty => TryRecvError::Empty,
83             cc::TryRecvError::Disconnected => TryRecvError::Disconnected,
84         })
85     }
86 
recv(&self) -> Result<T, RecvError>87     pub fn recv(&self) -> Result<T, RecvError> {
88         self.inner.recv().map_err(|_| RecvError)
89     }
90 
recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>91     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
92         self.inner.recv_timeout(timeout).map_err(|err| match err {
93             cc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
94             cc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
95         })
96     }
97 
iter(&self) -> Iter<T>98     pub fn iter(&self) -> Iter<T> {
99         Iter { inner: self }
100     }
101 
try_iter(&self) -> TryIter<T>102     pub fn try_iter(&self) -> TryIter<T> {
103         TryIter { inner: self }
104     }
105 }
106 
107 impl<'a, T> IntoIterator for &'a Receiver<T> {
108     type Item = T;
109     type IntoIter = Iter<'a, T>;
110 
into_iter(self) -> Iter<'a, T>111     fn into_iter(self) -> Iter<'a, T> {
112         self.iter()
113     }
114 }
115 
116 impl<T> IntoIterator for Receiver<T> {
117     type Item = T;
118     type IntoIter = IntoIter<T>;
119 
into_iter(self) -> IntoIter<T>120     fn into_iter(self) -> IntoIter<T> {
121         IntoIter { inner: self }
122     }
123 }
124 
125 pub struct TryIter<'a, T: 'a> {
126     inner: &'a Receiver<T>,
127 }
128 
129 impl<'a, T> Iterator for TryIter<'a, T> {
130     type Item = T;
131 
next(&mut self) -> Option<T>132     fn next(&mut self) -> Option<T> {
133         self.inner.try_recv().ok()
134     }
135 }
136 
137 pub struct Iter<'a, T: 'a> {
138     inner: &'a Receiver<T>,
139 }
140 
141 impl<'a, T> Iterator for Iter<'a, T> {
142     type Item = T;
143 
next(&mut self) -> Option<T>144     fn next(&mut self) -> Option<T> {
145         self.inner.recv().ok()
146     }
147 }
148 
149 pub struct IntoIter<T> {
150     inner: Receiver<T>,
151 }
152 
153 impl<T> Iterator for IntoIter<T> {
154     type Item = T;
155 
next(&mut self) -> Option<T>156     fn next(&mut self) -> Option<T> {
157         self.inner.recv().ok()
158     }
159 }
160 
channel<T>() -> (Sender<T>, Receiver<T>)161 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
162     let (s, r) = cc::unbounded();
163     let s = Sender { inner: s };
164     let r = Receiver { inner: r };
165     (s, r)
166 }
167 
sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>)168 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
169     let (s, r) = cc::bounded(bound);
170     let s = SyncSender { inner: s };
171     let r = Receiver { inner: r };
172     (s, r)
173 }
174 
/// Emulates the old `std::sync::mpsc` `select!` arm syntax on top of
/// `crossbeam_channel_internal!`.
///
/// Each arm has the form `pattern = receiver.method() => expression`, where `receiver` is an
/// identifier naming a value with an `inner` field. Arms are separated by commas, and an
/// optional trailing comma after the last arm is accepted.
///
/// Before being bound to `pattern`, the operation's result has its error replaced by
/// `std::sync::mpsc::RecvError`, so arm bodies see the std error type.
macro_rules! select {
    (
        $($name:pat = $rx:ident.$meth:ident() => $code:expr),+ $(,)?
    ) => ({
        cc::crossbeam_channel_internal! {
            $(
                $meth(($rx).inner) -> res => {
                    let $name = res.map_err(|_| ::std::sync::mpsc::RecvError);
                    $code
                }
            )+
        }
    })
}
189 
190 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
/// Tests for the unbounded `channel()` wrapper, ported from the `std::sync::mpsc` test suite.
mod channel_tests {
    use super::*;

    use std::env;
    use std::thread;
    use std::time::{Duration, Instant};

    /// Returns the multiplier used by the stress tests.
    ///
    /// Reads the `RUST_TEST_STRESS` environment variable and falls back to `1` when it cannot be
    /// read.
    ///
    /// # Panics
    ///
    /// Panics if the variable is set but does not parse as a `usize`.
    pub fn stress_factor() -> usize {
        env::var("RUST_TEST_STRESS").map_or(1, |val| {
            val.parse()
                .expect("RUST_TEST_STRESS must be an unsigned integer")
        })
    }

    /// A sent value is received on the same thread.
    #[test]
    fn smoke() {
        let (tx, rx) = channel::<i32>();
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
    }

    /// Dropping a channel that still holds a boxed message must not misbehave.
    #[test]
    fn drop_full() {
        let (tx, _rx) = channel::<Box<isize>>();
        tx.send(Box::new(1)).unwrap();
    }

    /// Same as `drop_full`, after sender clones have been created and dropped.
    #[test]
    fn drop_full_shared() {
        let (tx, _rx) = channel::<Box<isize>>();
        drop(tx.clone());
        drop(tx.clone());
        tx.send(Box::new(1)).unwrap();
    }

    /// Sending keeps working through a cloned sender.
    #[test]
    fn smoke_shared() {
        let (tx, rx) = channel::<i32>();
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
        let tx = tx.clone();
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
    }

    /// A value sent from another thread is received.
    #[test]
    fn smoke_threads() {
        let (tx, rx) = channel::<i32>();
        let t = thread::spawn(move || {
            tx.send(1).unwrap();
        });
        assert_eq!(rx.recv().unwrap(), 1);
        t.join().unwrap();
    }

    /// Sending fails once the receiver has been dropped.
    #[test]
    fn smoke_port_gone() {
        let (tx, rx) = channel::<i32>();
        drop(rx);
        assert!(tx.send(1).is_err());
    }

    /// Sending fails once the receiver has been dropped.
    #[test]
    fn smoke_shared_port_gone() {
        let (tx, rx) = channel::<i32>();
        drop(rx);
        assert!(tx.send(1).is_err())
    }

    /// A clone made after the receiver was dropped also fails to send.
    #[test]
    fn smoke_shared_port_gone2() {
        let (tx, rx) = channel::<i32>();
        drop(rx);
        let tx2 = tx.clone();
        drop(tx);
        assert!(tx2.send(1).is_err());
    }

    /// Keeps sending until the receiving thread has taken one message and dropped its end.
    #[test]
    fn port_gone_concurrent() {
        let (tx, rx) = channel::<i32>();
        let t = thread::spawn(move || {
            rx.recv().unwrap();
        });
        while tx.send(1).is_ok() {}
        t.join().unwrap();
    }

    /// Same as `port_gone_concurrent`, alternating between two senders.
    #[test]
    fn port_gone_concurrent_shared() {
        let (tx, rx) = channel::<i32>();
        let tx2 = tx.clone();
        let t = thread::spawn(move || {
            rx.recv().unwrap();
        });
        while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
        t.join().unwrap();
    }

    /// Receiving fails once the only sender has been dropped.
    #[test]
    fn smoke_chan_gone() {
        let (tx, rx) = channel::<i32>();
        drop(tx);
        assert!(rx.recv().is_err());
    }

    /// Receiving fails once every sender clone has been dropped.
    #[test]
    fn smoke_chan_gone_shared() {
        let (tx, rx) = channel::<()>();
        let tx2 = tx.clone();
        drop(tx);
        drop(tx2);
        assert!(rx.recv().is_err());
    }

    /// Keeps receiving until the sending thread has finished and dropped its end.
    #[test]
    fn chan_gone_concurrent() {
        let (tx, rx) = channel::<i32>();
        let t = thread::spawn(move || {
            tx.send(1).unwrap();
            tx.send(1).unwrap();
        });
        while rx.recv().is_ok() {}
        t.join().unwrap();
    }

    /// One producer thread and the test thread exchange `COUNT` messages.
    #[test]
    fn stress() {
        #[cfg(miri)]
        const COUNT: usize = 100;
        #[cfg(not(miri))]
        const COUNT: usize = 10000;

        let (tx, rx) = channel::<i32>();
        let t = thread::spawn(move || {
            for _ in 0..COUNT {
                tx.send(1).unwrap();
            }
        });
        for _ in 0..COUNT {
            assert_eq!(rx.recv().unwrap(), 1);
        }
        t.join().unwrap();
    }

    /// `nthreads` producers each send `amt` messages to a single consumer thread.
    #[test]
    fn stress_shared() {
        let amt: u32 = if cfg!(miri) { 100 } else { 10_000 };
        let nthreads: u32 = if cfg!(miri) { 4 } else { 8 };
        let (tx, rx) = channel::<i32>();

        let t = thread::spawn(move || {
            for _ in 0..amt * nthreads {
                assert_eq!(rx.recv().unwrap(), 1);
            }
            // Every expected message has been consumed, so nothing may be left over.
            assert!(rx.try_recv().is_err());
        });

        let mut ts = Vec::with_capacity(nthreads as usize);
        for _ in 0..nthreads {
            let tx = tx.clone();
            let t = thread::spawn(move || {
                for _ in 0..amt {
                    tx.send(1).unwrap();
                }
            });
            ts.push(t);
        }
        drop(tx);
        t.join().unwrap();
        for t in ts {
            t.join().unwrap();
        }
    }

    /// A receiver thread signals readiness, then a second thread sends it 40 messages.
    #[test]
    fn send_from_outside_runtime() {
        let (tx1, rx1) = channel::<()>();
        let (tx2, rx2) = channel::<i32>();
        let t1 = thread::spawn(move || {
            tx1.send(()).unwrap();
            for _ in 0..40 {
                assert_eq!(rx2.recv().unwrap(), 1);
            }
        });
        rx1.recv().unwrap();
        let t2 = thread::spawn(move || {
            for _ in 0..40 {
                tx2.send(1).unwrap();
            }
        });
        t1.join().unwrap();
        t2.join().unwrap();
    }

    /// The test thread sends 40 messages to a receiving thread.
    #[test]
    fn recv_from_outside_runtime() {
        let (tx, rx) = channel::<i32>();
        let t = thread::spawn(move || {
            for _ in 0..40 {
                assert_eq!(rx.recv().unwrap(), 1);
            }
        });
        for _ in 0..40 {
            tx.send(1).unwrap();
        }
        t.join().unwrap();
    }

    /// Two threads ping-pong one message each over two channels.
    #[test]
    fn no_runtime() {
        let (tx1, rx1) = channel::<i32>();
        let (tx2, rx2) = channel::<i32>();
        let t1 = thread::spawn(move || {
            assert_eq!(rx1.recv().unwrap(), 1);
            tx2.send(2).unwrap();
        });
        let t2 = thread::spawn(move || {
            tx1.send(1).unwrap();
            assert_eq!(rx2.recv().unwrap(), 2);
        });
        t1.join().unwrap();
        t2.join().unwrap();
    }

    #[test]
    fn oneshot_single_thread_close_port_first() {
        // Simple test of closing without sending
        let (_tx, rx) = channel::<i32>();
        drop(rx);
    }

    #[test]
    fn oneshot_single_thread_close_chan_first() {
        // Simple test of closing without sending
        let (tx, _rx) = channel::<i32>();
        drop(tx);
    }

    #[test]
    fn oneshot_single_thread_send_port_close() {
        // Testing that the sender cleans up the payload if receiver is closed
        let (tx, rx) = channel::<Box<i32>>();
        drop(rx);
        assert!(tx.send(Box::new(0)).is_err());
    }

    /// `recv` reports `RecvError` after the sender has been dropped.
    #[test]
    fn oneshot_single_thread_recv_chan_close() {
        let (tx, rx) = channel::<i32>();
        drop(tx);
        assert_eq!(rx.recv(), Err(RecvError));
    }

    /// A boxed value survives a send followed by a receive.
    #[test]
    fn oneshot_single_thread_send_then_recv() {
        let (tx, rx) = channel::<Box<i32>>();
        tx.send(Box::new(10)).unwrap();
        assert!(*rx.recv().unwrap() == 10);
    }

    /// `send` succeeds while the receiver is alive.
    #[test]
    fn oneshot_single_thread_try_send_open() {
        let (tx, rx) = channel::<i32>();
        assert!(tx.send(10).is_ok());
        assert!(rx.recv().unwrap() == 10);
    }

    /// `send` fails after the receiver has been dropped.
    #[test]
    fn oneshot_single_thread_try_send_closed() {
        let (tx, rx) = channel::<i32>();
        drop(rx);
        assert!(tx.send(10).is_err());
    }

    /// `recv` returns the value that was sent.
    #[test]
    fn oneshot_single_thread_try_recv_open() {
        let (tx, rx) = channel::<i32>();
        tx.send(10).unwrap();
        assert!(rx.recv() == Ok(10));
    }

    /// `recv` fails after the sender has been dropped.
    #[test]
    fn oneshot_single_thread_try_recv_closed() {
        let (tx, rx) = channel::<i32>();
        drop(tx);
        assert!(rx.recv().is_err());
    }

    /// `try_recv` reports `Empty` before a send and the value after it.
    #[test]
    fn oneshot_single_thread_peek_data() {
        let (tx, rx) = channel::<i32>();
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        tx.send(10).unwrap();
        assert_eq!(rx.try_recv(), Ok(10));
    }

    /// `try_recv` keeps reporting `Disconnected` after the sender has been dropped.
    #[test]
    fn oneshot_single_thread_peek_close() {
        let (tx, rx) = channel::<i32>();
        drop(tx);
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
    }

    /// `try_recv` reports `Empty` while a sender is alive and nothing was sent.
    #[test]
    fn oneshot_single_thread_peek_open() {
        let (_tx, rx) = channel::<i32>();
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    }

    /// A receiving thread gets the boxed value sent by the test thread.
    #[test]
    fn oneshot_multi_task_recv_then_send() {
        let (tx, rx) = channel::<Box<i32>>();
        let t = thread::spawn(move || {
            assert!(*rx.recv().unwrap() == 10);
        });

        tx.send(Box::new(10)).unwrap();
        t.join().unwrap();
    }

    /// A receiving thread observes `RecvError` when another thread drops the sender.
    #[test]
    fn oneshot_multi_task_recv_then_close() {
        let (tx, rx) = channel::<Box<i32>>();
        let t = thread::spawn(move || {
            drop(tx);
        });
        thread::spawn(move || {
            assert_eq!(rx.recv(), Err(RecvError));
        })
        .join()
        .unwrap();
        t.join().unwrap();
    }

    /// Drops the receiver on a spawned thread and the sender on the test thread.
    #[test]
    fn oneshot_multi_thread_close_stress() {
        let stress_factor = stress_factor();
        let mut ts = Vec::with_capacity(stress_factor);
        for _ in 0..stress_factor {
            let (tx, rx) = channel::<i32>();
            let t = thread::spawn(move || {
                drop(rx);
            });
            ts.push(t);
            drop(tx);
        }
        for t in ts {
            t.join().unwrap();
        }
    }

    /// Drops the receiver on one thread while another thread sends; the send result is ignored.
    #[test]
    fn oneshot_multi_thread_send_close_stress() {
        let stress_factor = stress_factor();
        // Only the receiver-dropping thread is stored; the sending thread is joined in place.
        let mut ts = Vec::with_capacity(stress_factor);
        for _ in 0..stress_factor {
            let (tx, rx) = channel::<i32>();
            let t = thread::spawn(move || {
                drop(rx);
            });
            ts.push(t);
            thread::spawn(move || {
                let _ = tx.send(1);
            })
            .join()
            .unwrap();
        }
        for t in ts {
            t.join().unwrap();
        }
    }

    /// Receives on a nested thread while another nested thread drops the sender.
    #[test]
    fn oneshot_multi_thread_recv_close_stress() {
        let stress_factor = stress_factor();
        let mut ts = Vec::with_capacity(2 * stress_factor);
        for _ in 0..stress_factor {
            let (tx, rx) = channel::<i32>();
            let t = thread::spawn(move || {
                thread::spawn(move || {
                    assert_eq!(rx.recv(), Err(RecvError));
                })
                .join()
                .unwrap();
            });
            ts.push(t);
            let t2 = thread::spawn(move || {
                let t = thread::spawn(move || {
                    drop(tx);
                });
                t.join().unwrap();
            });
            ts.push(t2);
        }
        for t in ts {
            t.join().unwrap();
        }
    }

    /// A spawned thread sends one boxed value that the test thread receives.
    #[test]
    fn oneshot_multi_thread_send_recv_stress() {
        let stress_factor = stress_factor();
        let mut ts = Vec::with_capacity(stress_factor);
        for _ in 0..stress_factor {
            let (tx, rx) = channel::<Box<isize>>();
            let t = thread::spawn(move || {
                tx.send(Box::new(10)).unwrap();
            });
            ts.push(t);
            assert!(*rx.recv().unwrap() == 10);
        }
        for t in ts {
            t.join().unwrap();
        }
    }

    /// Sends and receives `0..10` through chains of threads, each link spawning the next.
    #[test]
    fn stream_send_recv_stress() {
        /// Spawns a thread that sends `i` and then continues the chain with `i + 1`.
        /// Returns `None` once `i` reaches 10.
        fn send(tx: Sender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
            if i == 10 {
                return None;
            }

            Some(thread::spawn(move || {
                tx.send(Box::new(i)).unwrap();
                // Join the next link so that a panic further down the chain reaches the test
                // instead of being lost with a detached thread.
                if let Some(next) = send(tx, i + 1) {
                    next.join().unwrap();
                }
            }))
        }

        /// Spawns a thread that expects to receive `i` and then continues with `i + 1`.
        /// Returns `None` once `i` reaches 10.
        fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
            if i == 10 {
                return None;
            }

            Some(thread::spawn(move || {
                assert!(*rx.recv().unwrap() == i);
                // Same reasoning as in `send`: propagate failures from later links.
                if let Some(next) = recv(rx, i + 1) {
                    next.join().unwrap();
                }
            }))
        }

        let stress_factor = stress_factor();
        let mut ts = Vec::with_capacity(2 * stress_factor);
        for _ in 0..stress_factor {
            let (tx, rx) = channel();

            if let Some(t) = send(tx, 0) {
                ts.push(t);
            }
            if let Some(t2) = recv(rx, 0) {
                ts.push(t2);
            }
        }
        for t in ts {
            t.join().unwrap();
        }
    }

    /// `recv_timeout` returns queued values and times out on an empty channel.
    #[test]
    fn oneshot_single_thread_recv_timeout() {
        let (tx, rx) = channel();
        tx.send(()).unwrap();
        assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
        assert_eq!(
            rx.recv_timeout(Duration::from_millis(1)),
            Err(RecvTimeoutError::Timeout)
        );
        tx.send(()).unwrap();
        assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
    }

    /// The sender sleeps for twice the receive timeout before every other message; the receiver
    /// retries on timeout and must still count every message.
    #[test]
    fn stress_recv_timeout_two_threads() {
        let (tx, rx) = channel();
        let stress = stress_factor() + 100;
        let timeout = Duration::from_millis(100);

        let t = thread::spawn(move || {
            for i in 0..stress {
                if i % 2 == 0 {
                    thread::sleep(timeout * 2);
                }
                tx.send(1usize).unwrap();
            }
        });

        let mut recv_count = 0;
        loop {
            match rx.recv_timeout(timeout) {
                Ok(n) => {
                    assert_eq!(n, 1usize);
                    recv_count += 1;
                }
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }

        assert_eq!(recv_count, stress);
        t.join().unwrap()
    }

    /// `recv_timeout` waits at least `timeout` before reporting `Timeout` while a sender clone
    /// is alive.
    #[test]
    fn recv_timeout_upgrade() {
        let (tx, rx) = channel::<()>();
        let timeout = Duration::from_millis(1);
        let _tx_clone = tx.clone();

        let start = Instant::now();
        assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
        assert!(start.elapsed() >= timeout);
    }

    /// Many senders, each delayed by a different amount, feed one receiver that polls with a
    /// 10 ms timeout.
    #[test]
    fn stress_recv_timeout_shared() {
        let (tx, rx) = channel();
        let stress = stress_factor() + 100;

        let mut ts = Vec::with_capacity(stress);
        for i in 0..stress {
            let tx = tx.clone();
            let t = thread::spawn(move || {
                thread::sleep(Duration::from_millis(i as u64 * 10));
                tx.send(1usize).unwrap();
            });
            ts.push(t);
        }

        // Drop the original sender so the loop below sees `Disconnected` once all clones finish.
        drop(tx);

        let mut recv_count = 0;
        loop {
            match rx.recv_timeout(Duration::from_millis(10)) {
                Ok(n) => {
                    assert_eq!(n, 1usize);
                    recv_count += 1;
                }
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }

        assert_eq!(recv_count, stress);
        for t in ts {
            t.join().unwrap();
        }
    }

    #[test]
    fn recv_a_lot() {
        #[cfg(miri)]
        const N: usize = 50;
        #[cfg(not(miri))]
        const N: usize = 10000;

        // Regression test that we don't run out of stack in scheduler context
        let (tx, rx) = channel();
        for _ in 0..N {
            tx.send(()).unwrap();
        }
        for _ in 0..N {
            rx.recv().unwrap();
        }
    }

    /// After draining messages from several sender clones, `recv_timeout` times out and then
    /// returns a freshly sent value.
    #[test]
    fn shared_recv_timeout() {
        let (tx, rx) = channel();
        let total = 5;
        let mut ts = Vec::with_capacity(total);
        for _ in 0..total {
            let tx = tx.clone();
            let t = thread::spawn(move || {
                tx.send(()).unwrap();
            });
            ts.push(t);
        }

        for _ in 0..total {
            rx.recv().unwrap();
        }

        assert_eq!(
            rx.recv_timeout(Duration::from_millis(1)),
            Err(RecvTimeoutError::Timeout)
        );
        tx.send(()).unwrap();
        assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
        for t in ts {
            t.join().unwrap();
        }
    }

    /// Many threads each send one message through their own sender clone.
    #[test]
    fn shared_chan_stress() {
        let (tx, rx) = channel();
        let total = stress_factor() + 100;
        let mut ts = Vec::with_capacity(total);
        for _ in 0..total {
            let tx = tx.clone();
            let t = thread::spawn(move || {
                tx.send(()).unwrap();
            });
            ts.push(t);
        }

        for _ in 0..total {
            rx.recv().unwrap();
        }
        for t in ts {
            t.join().unwrap();
        }
    }

    /// `iter()` yields every sent value and ends once the sender is dropped.
    #[test]
    fn test_nested_recv_iter() {
        let (tx, rx) = channel::<i32>();
        let (total_tx, total_rx) = channel::<i32>();

        let t = thread::spawn(move || {
            let mut acc = 0;
            for x in rx.iter() {
                acc += x;
            }
            total_tx.send(acc).unwrap();
        });

        tx.send(3).unwrap();
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        drop(tx);
        assert_eq!(total_rx.recv().unwrap(), 6);
        t.join().unwrap();
    }

    /// Breaking out of an `iter()` loop early leaves the partial sum intact.
    #[test]
    fn test_recv_iter_break() {
        let (tx, rx) = channel::<i32>();
        let (count_tx, count_rx) = channel();

        let t = thread::spawn(move || {
            let mut count = 0;
            for x in rx.iter() {
                if count >= 3 {
                    break;
                } else {
                    count += x;
                }
            }
            count_tx.send(count).unwrap();
        });

        tx.send(2).unwrap();
        tx.send(2).unwrap();
        tx.send(2).unwrap();
        // The receiver may already be gone by now, so this result is deliberately ignored.
        let _ = tx.send(2);
        drop(tx);
        assert_eq!(count_rx.recv().unwrap(), 4);
        t.join().unwrap();
    }

    #[test]
    fn test_recv_try_iter() {
        let (request_tx, request_rx) = channel();
        let (response_tx, response_rx) = channel();

        // Request `x`s until we have `6`.
        let t = thread::spawn(move || {
            let mut count = 0;
            loop {
                for x in response_rx.try_iter() {
                    count += x;
                    if count == 6 {
                        return count;
                    }
                }
                request_tx.send(()).unwrap();
            }
        });

        for _ in request_rx.iter() {
            if response_tx.send(2).is_err() {
                break;
            }
        }

        assert_eq!(t.join().unwrap(), 6);
    }

    /// The owning iterator yields queued values and then `None` after the sender is dropped.
    #[test]
    fn test_recv_into_iter_owned() {
        let mut iter = {
            let (tx, rx) = channel::<i32>();
            tx.send(1).unwrap();
            tx.send(2).unwrap();

            rx.into_iter()
        };
        assert_eq!(iter.next().unwrap(), 1);
        assert_eq!(iter.next().unwrap(), 2);
        assert!(iter.next().is_none());
    }

    /// The borrowing iterator yields queued values and then `None` after the sender is dropped.
    #[test]
    fn test_recv_into_iter_borrowed() {
        let (tx, rx) = channel::<i32>();
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        drop(tx);
        let mut iter = (&rx).into_iter();
        assert_eq!(iter.next().unwrap(), 1);
        assert_eq!(iter.next().unwrap(), 2);
        assert!(iter.next().is_none());
    }

    /// Steps a helper thread through send and drop, checking `try_recv` after each step.
    #[test]
    fn try_recv_states() {
        let (tx1, rx1) = channel::<i32>();
        let (tx2, rx2) = channel::<()>();
        let (tx3, rx3) = channel::<()>();
        let t = thread::spawn(move || {
            rx2.recv().unwrap();
            tx1.send(1).unwrap();
            tx3.send(()).unwrap();
            rx2.recv().unwrap();
            drop(tx1);
            tx3.send(()).unwrap();
        });

        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
        tx2.send(()).unwrap();
        rx3.recv().unwrap();
        assert_eq!(rx1.try_recv(), Ok(1));
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
        tx2.send(()).unwrap();
        rx3.recv().unwrap();
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
        t.join().unwrap();
    }

    // This bug used to end up in a livelock inside of the Receiver destructor
    // because the internal state of the Shared packet was corrupted
    #[test]
    fn destroy_upgraded_shared_port_when_sender_still_active() {
        let (tx, rx) = channel();
        let (tx2, rx2) = channel();
        let t = thread::spawn(move || {
            rx.recv().unwrap(); // wait on a oneshot
            drop(rx); // destroy a shared
            tx2.send(()).unwrap();
        });
        // make sure the other thread has gone to sleep
        for _ in 0..5000 {
            thread::yield_now();
        }

        // upgrade to a shared chan and send a message
        let tx2 = tx.clone();
        drop(tx);
        tx2.send(()).unwrap();

        // wait for the child thread to exit before we exit
        rx2.recv().unwrap();
        t.join().unwrap();
    }

    /// After the receiver is discarded, a failed `send` hands the value back inside `SendError`.
    #[test]
    fn issue_32114() {
        let (tx, _) = channel();
        let _ = tx.send(123);
        assert_eq!(tx.send(123), Err(SendError(123)));
    }
}
965 
966 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
967 mod sync_channel_tests {
968     use super::*;
969 
970     use std::env;
971     use std::thread;
972     use std::time::Duration;
973 
stress_factor() -> usize974     pub fn stress_factor() -> usize {
975         match env::var("RUST_TEST_STRESS") {
976             Ok(val) => val.parse().unwrap(),
977             Err(..) => 1,
978         }
979     }
980 
981     #[test]
smoke()982     fn smoke() {
983         let (tx, rx) = sync_channel::<i32>(1);
984         tx.send(1).unwrap();
985         assert_eq!(rx.recv().unwrap(), 1);
986     }
987 
988     #[test]
drop_full()989     fn drop_full() {
990         let (tx, _rx) = sync_channel::<Box<isize>>(1);
991         tx.send(Box::new(1)).unwrap();
992     }
993 
994     #[test]
smoke_shared()995     fn smoke_shared() {
996         let (tx, rx) = sync_channel::<i32>(1);
997         tx.send(1).unwrap();
998         assert_eq!(rx.recv().unwrap(), 1);
999         let tx = tx.clone();
1000         tx.send(1).unwrap();
1001         assert_eq!(rx.recv().unwrap(), 1);
1002     }
1003 
1004     #[test]
recv_timeout()1005     fn recv_timeout() {
1006         let (tx, rx) = sync_channel::<i32>(1);
1007         assert_eq!(
1008             rx.recv_timeout(Duration::from_millis(1)),
1009             Err(RecvTimeoutError::Timeout)
1010         );
1011         tx.send(1).unwrap();
1012         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
1013     }
1014 
1015     #[test]
smoke_threads()1016     fn smoke_threads() {
1017         let (tx, rx) = sync_channel::<i32>(0);
1018         let t = thread::spawn(move || {
1019             tx.send(1).unwrap();
1020         });
1021         assert_eq!(rx.recv().unwrap(), 1);
1022         t.join().unwrap();
1023     }
1024 
1025     #[test]
smoke_port_gone()1026     fn smoke_port_gone() {
1027         let (tx, rx) = sync_channel::<i32>(0);
1028         drop(rx);
1029         assert!(tx.send(1).is_err());
1030     }
1031 
1032     #[test]
smoke_shared_port_gone2()1033     fn smoke_shared_port_gone2() {
1034         let (tx, rx) = sync_channel::<i32>(0);
1035         drop(rx);
1036         let tx2 = tx.clone();
1037         drop(tx);
1038         assert!(tx2.send(1).is_err());
1039     }
1040 
1041     #[test]
port_gone_concurrent()1042     fn port_gone_concurrent() {
1043         let (tx, rx) = sync_channel::<i32>(0);
1044         let t = thread::spawn(move || {
1045             rx.recv().unwrap();
1046         });
1047         while tx.send(1).is_ok() {}
1048         t.join().unwrap();
1049     }
1050 
1051     #[test]
port_gone_concurrent_shared()1052     fn port_gone_concurrent_shared() {
1053         let (tx, rx) = sync_channel::<i32>(0);
1054         let tx2 = tx.clone();
1055         let t = thread::spawn(move || {
1056             rx.recv().unwrap();
1057         });
1058         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1059         t.join().unwrap();
1060     }
1061 
1062     #[test]
smoke_chan_gone()1063     fn smoke_chan_gone() {
1064         let (tx, rx) = sync_channel::<i32>(0);
1065         drop(tx);
1066         assert!(rx.recv().is_err());
1067     }
1068 
1069     #[test]
smoke_chan_gone_shared()1070     fn smoke_chan_gone_shared() {
1071         let (tx, rx) = sync_channel::<()>(0);
1072         let tx2 = tx.clone();
1073         drop(tx);
1074         drop(tx2);
1075         assert!(rx.recv().is_err());
1076     }
1077 
1078     #[test]
chan_gone_concurrent()1079     fn chan_gone_concurrent() {
1080         let (tx, rx) = sync_channel::<i32>(0);
1081         let t = thread::spawn(move || {
1082             tx.send(1).unwrap();
1083             tx.send(1).unwrap();
1084         });
1085         while rx.recv().is_ok() {}
1086         t.join().unwrap();
1087     }
1088 
1089     #[test]
stress()1090     fn stress() {
1091         #[cfg(miri)]
1092         const N: usize = 100;
1093         #[cfg(not(miri))]
1094         const N: usize = 10000;
1095 
1096         let (tx, rx) = sync_channel::<i32>(0);
1097         let t = thread::spawn(move || {
1098             for _ in 0..N {
1099                 tx.send(1).unwrap();
1100             }
1101         });
1102         for _ in 0..N {
1103             assert_eq!(rx.recv().unwrap(), 1);
1104         }
1105         t.join().unwrap();
1106     }
1107 
1108     #[test]
stress_recv_timeout_two_threads()1109     fn stress_recv_timeout_two_threads() {
1110         #[cfg(miri)]
1111         const N: usize = 100;
1112         #[cfg(not(miri))]
1113         const N: usize = 10000;
1114 
1115         let (tx, rx) = sync_channel::<i32>(0);
1116 
1117         let t = thread::spawn(move || {
1118             for _ in 0..N {
1119                 tx.send(1).unwrap();
1120             }
1121         });
1122 
1123         let mut recv_count = 0;
1124         loop {
1125             match rx.recv_timeout(Duration::from_millis(1)) {
1126                 Ok(v) => {
1127                     assert_eq!(v, 1);
1128                     recv_count += 1;
1129                 }
1130                 Err(RecvTimeoutError::Timeout) => continue,
1131                 Err(RecvTimeoutError::Disconnected) => break,
1132             }
1133         }
1134 
1135         assert_eq!(recv_count, N);
1136         t.join().unwrap();
1137     }
1138 
1139     #[test]
stress_recv_timeout_shared()1140     fn stress_recv_timeout_shared() {
1141         #[cfg(miri)]
1142         const AMT: u32 = 100;
1143         #[cfg(not(miri))]
1144         const AMT: u32 = 1000;
1145         const NTHREADS: u32 = 8;
1146         let (tx, rx) = sync_channel::<i32>(0);
1147         let (dtx, drx) = sync_channel::<()>(0);
1148 
1149         let t = thread::spawn(move || {
1150             let mut recv_count = 0;
1151             loop {
1152                 match rx.recv_timeout(Duration::from_millis(10)) {
1153                     Ok(v) => {
1154                         assert_eq!(v, 1);
1155                         recv_count += 1;
1156                     }
1157                     Err(RecvTimeoutError::Timeout) => continue,
1158                     Err(RecvTimeoutError::Disconnected) => break,
1159                 }
1160             }
1161 
1162             assert_eq!(recv_count, AMT * NTHREADS);
1163             assert!(rx.try_recv().is_err());
1164 
1165             dtx.send(()).unwrap();
1166         });
1167 
1168         let mut ts = Vec::with_capacity(NTHREADS as usize);
1169         for _ in 0..NTHREADS {
1170             let tx = tx.clone();
1171             let t = thread::spawn(move || {
1172                 for _ in 0..AMT {
1173                     tx.send(1).unwrap();
1174                 }
1175             });
1176             ts.push(t);
1177         }
1178 
1179         drop(tx);
1180 
1181         drx.recv().unwrap();
1182         for t in ts {
1183             t.join().unwrap();
1184         }
1185         t.join().unwrap();
1186     }
1187 
1188     #[test]
stress_shared()1189     fn stress_shared() {
1190         #[cfg(miri)]
1191         const AMT: u32 = 100;
1192         #[cfg(not(miri))]
1193         const AMT: u32 = 1000;
1194         const NTHREADS: u32 = 8;
1195         let (tx, rx) = sync_channel::<i32>(0);
1196         let (dtx, drx) = sync_channel::<()>(0);
1197 
1198         let t = thread::spawn(move || {
1199             for _ in 0..AMT * NTHREADS {
1200                 assert_eq!(rx.recv().unwrap(), 1);
1201             }
1202             assert!(rx.try_recv().is_err());
1203             dtx.send(()).unwrap();
1204         });
1205 
1206         let mut ts = Vec::with_capacity(NTHREADS as usize);
1207         for _ in 0..NTHREADS {
1208             let tx = tx.clone();
1209             let t = thread::spawn(move || {
1210                 for _ in 0..AMT {
1211                     tx.send(1).unwrap();
1212                 }
1213             });
1214             ts.push(t);
1215         }
1216         drop(tx);
1217         drx.recv().unwrap();
1218         for t in ts {
1219             t.join().unwrap();
1220         }
1221         t.join().unwrap();
1222     }
1223 
1224     #[test]
oneshot_single_thread_close_port_first()1225     fn oneshot_single_thread_close_port_first() {
1226         // Simple test of closing without sending
1227         let (_tx, rx) = sync_channel::<i32>(0);
1228         drop(rx);
1229     }
1230 
1231     #[test]
oneshot_single_thread_close_chan_first()1232     fn oneshot_single_thread_close_chan_first() {
1233         // Simple test of closing without sending
1234         let (tx, _rx) = sync_channel::<i32>(0);
1235         drop(tx);
1236     }
1237 
1238     #[test]
oneshot_single_thread_send_port_close()1239     fn oneshot_single_thread_send_port_close() {
1240         // Testing that the sender cleans up the payload if receiver is closed
1241         let (tx, rx) = sync_channel::<Box<i32>>(0);
1242         drop(rx);
1243         assert!(tx.send(Box::new(0)).is_err());
1244     }
1245 
1246     #[test]
oneshot_single_thread_recv_chan_close()1247     fn oneshot_single_thread_recv_chan_close() {
1248         let (tx, rx) = sync_channel::<i32>(0);
1249         drop(tx);
1250         assert_eq!(rx.recv(), Err(RecvError));
1251     }
1252 
1253     #[test]
oneshot_single_thread_send_then_recv()1254     fn oneshot_single_thread_send_then_recv() {
1255         let (tx, rx) = sync_channel::<Box<i32>>(1);
1256         tx.send(Box::new(10)).unwrap();
1257         assert!(*rx.recv().unwrap() == 10);
1258     }
1259 
1260     #[test]
oneshot_single_thread_try_send_open()1261     fn oneshot_single_thread_try_send_open() {
1262         let (tx, rx) = sync_channel::<i32>(1);
1263         assert_eq!(tx.try_send(10), Ok(()));
1264         assert!(rx.recv().unwrap() == 10);
1265     }
1266 
1267     #[test]
oneshot_single_thread_try_send_closed()1268     fn oneshot_single_thread_try_send_closed() {
1269         let (tx, rx) = sync_channel::<i32>(0);
1270         drop(rx);
1271         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1272     }
1273 
1274     #[test]
oneshot_single_thread_try_send_closed2()1275     fn oneshot_single_thread_try_send_closed2() {
1276         let (tx, _rx) = sync_channel::<i32>(0);
1277         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1278     }
1279 
1280     #[test]
oneshot_single_thread_try_recv_open()1281     fn oneshot_single_thread_try_recv_open() {
1282         let (tx, rx) = sync_channel::<i32>(1);
1283         tx.send(10).unwrap();
1284         assert!(rx.recv() == Ok(10));
1285     }
1286 
1287     #[test]
oneshot_single_thread_try_recv_closed()1288     fn oneshot_single_thread_try_recv_closed() {
1289         let (tx, rx) = sync_channel::<i32>(0);
1290         drop(tx);
1291         assert!(rx.recv().is_err());
1292     }
1293 
1294     #[test]
oneshot_single_thread_try_recv_closed_with_data()1295     fn oneshot_single_thread_try_recv_closed_with_data() {
1296         let (tx, rx) = sync_channel::<i32>(1);
1297         tx.send(10).unwrap();
1298         drop(tx);
1299         assert_eq!(rx.try_recv(), Ok(10));
1300         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1301     }
1302 
1303     #[test]
oneshot_single_thread_peek_data()1304     fn oneshot_single_thread_peek_data() {
1305         let (tx, rx) = sync_channel::<i32>(1);
1306         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1307         tx.send(10).unwrap();
1308         assert_eq!(rx.try_recv(), Ok(10));
1309     }
1310 
1311     #[test]
oneshot_single_thread_peek_close()1312     fn oneshot_single_thread_peek_close() {
1313         let (tx, rx) = sync_channel::<i32>(0);
1314         drop(tx);
1315         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1316         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1317     }
1318 
1319     #[test]
oneshot_single_thread_peek_open()1320     fn oneshot_single_thread_peek_open() {
1321         let (_tx, rx) = sync_channel::<i32>(0);
1322         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1323     }
1324 
1325     #[test]
oneshot_multi_task_recv_then_send()1326     fn oneshot_multi_task_recv_then_send() {
1327         let (tx, rx) = sync_channel::<Box<i32>>(0);
1328         let t = thread::spawn(move || {
1329             assert!(*rx.recv().unwrap() == 10);
1330         });
1331 
1332         tx.send(Box::new(10)).unwrap();
1333         t.join().unwrap();
1334     }
1335 
1336     #[test]
oneshot_multi_task_recv_then_close()1337     fn oneshot_multi_task_recv_then_close() {
1338         let (tx, rx) = sync_channel::<Box<i32>>(0);
1339         let t = thread::spawn(move || {
1340             drop(tx);
1341         });
1342         thread::spawn(move || {
1343             assert_eq!(rx.recv(), Err(RecvError));
1344         })
1345         .join()
1346         .unwrap();
1347         t.join().unwrap();
1348     }
1349 
1350     #[test]
oneshot_multi_thread_close_stress()1351     fn oneshot_multi_thread_close_stress() {
1352         let stress_factor = stress_factor();
1353         let mut ts = Vec::with_capacity(stress_factor);
1354         for _ in 0..stress_factor {
1355             let (tx, rx) = sync_channel::<i32>(0);
1356             let t = thread::spawn(move || {
1357                 drop(rx);
1358             });
1359             ts.push(t);
1360             drop(tx);
1361         }
1362         for t in ts {
1363             t.join().unwrap();
1364         }
1365     }
1366 
1367     #[test]
oneshot_multi_thread_send_close_stress()1368     fn oneshot_multi_thread_send_close_stress() {
1369         let stress_factor = stress_factor();
1370         let mut ts = Vec::with_capacity(stress_factor);
1371         for _ in 0..stress_factor {
1372             let (tx, rx) = sync_channel::<i32>(0);
1373             let t = thread::spawn(move || {
1374                 drop(rx);
1375             });
1376             ts.push(t);
1377             thread::spawn(move || {
1378                 let _ = tx.send(1);
1379             })
1380             .join()
1381             .unwrap();
1382         }
1383         for t in ts {
1384             t.join().unwrap();
1385         }
1386     }
1387 
1388     #[test]
oneshot_multi_thread_recv_close_stress()1389     fn oneshot_multi_thread_recv_close_stress() {
1390         let stress_factor = stress_factor();
1391         let mut ts = Vec::with_capacity(2 * stress_factor);
1392         for _ in 0..stress_factor {
1393             let (tx, rx) = sync_channel::<i32>(0);
1394             let t = thread::spawn(move || {
1395                 thread::spawn(move || {
1396                     assert_eq!(rx.recv(), Err(RecvError));
1397                 })
1398                 .join()
1399                 .unwrap();
1400             });
1401             ts.push(t);
1402             let t2 = thread::spawn(move || {
1403                 thread::spawn(move || {
1404                     drop(tx);
1405                 });
1406             });
1407             ts.push(t2);
1408         }
1409         for t in ts {
1410             t.join().unwrap();
1411         }
1412     }
1413 
1414     #[test]
oneshot_multi_thread_send_recv_stress()1415     fn oneshot_multi_thread_send_recv_stress() {
1416         let stress_factor = stress_factor();
1417         let mut ts = Vec::with_capacity(stress_factor);
1418         for _ in 0..stress_factor {
1419             let (tx, rx) = sync_channel::<Box<i32>>(0);
1420             let t = thread::spawn(move || {
1421                 tx.send(Box::new(10)).unwrap();
1422             });
1423             ts.push(t);
1424             assert!(*rx.recv().unwrap() == 10);
1425         }
1426         for t in ts {
1427             t.join().unwrap();
1428         }
1429     }
1430 
1431     #[test]
stream_send_recv_stress()1432     fn stream_send_recv_stress() {
1433         let stress_factor = stress_factor();
1434         let mut ts = Vec::with_capacity(2 * stress_factor);
1435         for _ in 0..stress_factor {
1436             let (tx, rx) = sync_channel::<Box<i32>>(0);
1437 
1438             if let Some(t) = send(tx, 0) {
1439                 ts.push(t);
1440             }
1441             if let Some(t) = recv(rx, 0) {
1442                 ts.push(t);
1443             }
1444 
1445             fn send(tx: SyncSender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
1446                 if i == 10 {
1447                     return None;
1448                 }
1449 
1450                 Some(thread::spawn(move || {
1451                     tx.send(Box::new(i)).unwrap();
1452                     send(tx, i + 1);
1453                 }))
1454             }
1455 
1456             fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
1457                 if i == 10 {
1458                     return None;
1459                 }
1460 
1461                 Some(thread::spawn(move || {
1462                     assert!(*rx.recv().unwrap() == i);
1463                     recv(rx, i + 1);
1464                 }))
1465             }
1466         }
1467         for t in ts {
1468             t.join().unwrap();
1469         }
1470     }
1471 
1472     #[test]
recv_a_lot()1473     fn recv_a_lot() {
1474         #[cfg(miri)]
1475         const N: usize = 100;
1476         #[cfg(not(miri))]
1477         const N: usize = 10000;
1478 
1479         // Regression test that we don't run out of stack in scheduler context
1480         let (tx, rx) = sync_channel(N);
1481         for _ in 0..N {
1482             tx.send(()).unwrap();
1483         }
1484         for _ in 0..N {
1485             rx.recv().unwrap();
1486         }
1487     }
1488 
1489     #[test]
shared_chan_stress()1490     fn shared_chan_stress() {
1491         let (tx, rx) = sync_channel(0);
1492         let total = stress_factor() + 100;
1493         let mut ts = Vec::with_capacity(total);
1494         for _ in 0..total {
1495             let tx = tx.clone();
1496             let t = thread::spawn(move || {
1497                 tx.send(()).unwrap();
1498             });
1499             ts.push(t);
1500         }
1501 
1502         for _ in 0..total {
1503             rx.recv().unwrap();
1504         }
1505         for t in ts {
1506             t.join().unwrap();
1507         }
1508     }
1509 
1510     #[test]
test_nested_recv_iter()1511     fn test_nested_recv_iter() {
1512         let (tx, rx) = sync_channel::<i32>(0);
1513         let (total_tx, total_rx) = sync_channel::<i32>(0);
1514 
1515         let t = thread::spawn(move || {
1516             let mut acc = 0;
1517             for x in rx.iter() {
1518                 acc += x;
1519             }
1520             total_tx.send(acc).unwrap();
1521         });
1522 
1523         tx.send(3).unwrap();
1524         tx.send(1).unwrap();
1525         tx.send(2).unwrap();
1526         drop(tx);
1527         assert_eq!(total_rx.recv().unwrap(), 6);
1528         t.join().unwrap();
1529     }
1530 
1531     #[test]
test_recv_iter_break()1532     fn test_recv_iter_break() {
1533         let (tx, rx) = sync_channel::<i32>(0);
1534         let (count_tx, count_rx) = sync_channel(0);
1535 
1536         let t = thread::spawn(move || {
1537             let mut count = 0;
1538             for x in rx.iter() {
1539                 if count >= 3 {
1540                     break;
1541                 } else {
1542                     count += x;
1543                 }
1544             }
1545             count_tx.send(count).unwrap();
1546         });
1547 
1548         tx.send(2).unwrap();
1549         tx.send(2).unwrap();
1550         tx.send(2).unwrap();
1551         let _ = tx.try_send(2);
1552         drop(tx);
1553         assert_eq!(count_rx.recv().unwrap(), 4);
1554         t.join().unwrap();
1555     }
1556 
1557     #[test]
try_recv_states()1558     fn try_recv_states() {
1559         let (tx1, rx1) = sync_channel::<i32>(1);
1560         let (tx2, rx2) = sync_channel::<()>(1);
1561         let (tx3, rx3) = sync_channel::<()>(1);
1562         let t = thread::spawn(move || {
1563             rx2.recv().unwrap();
1564             tx1.send(1).unwrap();
1565             tx3.send(()).unwrap();
1566             rx2.recv().unwrap();
1567             drop(tx1);
1568             tx3.send(()).unwrap();
1569         });
1570 
1571         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1572         tx2.send(()).unwrap();
1573         rx3.recv().unwrap();
1574         assert_eq!(rx1.try_recv(), Ok(1));
1575         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1576         tx2.send(()).unwrap();
1577         rx3.recv().unwrap();
1578         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1579         t.join().unwrap();
1580     }
1581 
1582     // This bug used to end up in a livelock inside of the Receiver destructor
1583     // because the internal state of the Shared packet was corrupted
1584     #[test]
destroy_upgraded_shared_port_when_sender_still_active()1585     fn destroy_upgraded_shared_port_when_sender_still_active() {
1586         let (tx, rx) = sync_channel::<()>(0);
1587         let (tx2, rx2) = sync_channel::<()>(0);
1588         let t = thread::spawn(move || {
1589             rx.recv().unwrap(); // wait on a oneshot
1590             drop(rx); // destroy a shared
1591             tx2.send(()).unwrap();
1592         });
1593         // make sure the other thread has gone to sleep
1594         for _ in 0..5000 {
1595             thread::yield_now();
1596         }
1597 
1598         // upgrade to a shared chan and send a message
1599         let tx2 = tx.clone();
1600         drop(tx);
1601         tx2.send(()).unwrap();
1602 
1603         // wait for the child thread to exit before we exit
1604         rx2.recv().unwrap();
1605         t.join().unwrap();
1606     }
1607 
1608     #[test]
send1()1609     fn send1() {
1610         let (tx, rx) = sync_channel::<i32>(0);
1611         let t = thread::spawn(move || {
1612             rx.recv().unwrap();
1613         });
1614         assert_eq!(tx.send(1), Ok(()));
1615         t.join().unwrap();
1616     }
1617 
1618     #[test]
send2()1619     fn send2() {
1620         let (tx, rx) = sync_channel::<i32>(0);
1621         let t = thread::spawn(move || {
1622             drop(rx);
1623         });
1624         assert!(tx.send(1).is_err());
1625         t.join().unwrap();
1626     }
1627 
1628     #[test]
send3()1629     fn send3() {
1630         let (tx, rx) = sync_channel::<i32>(1);
1631         assert_eq!(tx.send(1), Ok(()));
1632         let t = thread::spawn(move || {
1633             drop(rx);
1634         });
1635         assert!(tx.send(1).is_err());
1636         t.join().unwrap();
1637     }
1638 
1639     #[test]
send4()1640     fn send4() {
1641         let (tx, rx) = sync_channel::<i32>(0);
1642         let tx2 = tx.clone();
1643         let (done, donerx) = channel();
1644         let done2 = done.clone();
1645         let t = thread::spawn(move || {
1646             assert!(tx.send(1).is_err());
1647             done.send(()).unwrap();
1648         });
1649         let t2 = thread::spawn(move || {
1650             assert!(tx2.send(2).is_err());
1651             done2.send(()).unwrap();
1652         });
1653         drop(rx);
1654         donerx.recv().unwrap();
1655         donerx.recv().unwrap();
1656         t.join().unwrap();
1657         t2.join().unwrap();
1658     }
1659 
1660     #[test]
try_send1()1661     fn try_send1() {
1662         let (tx, _rx) = sync_channel::<i32>(0);
1663         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
1664     }
1665 
1666     #[test]
try_send2()1667     fn try_send2() {
1668         let (tx, _rx) = sync_channel::<i32>(1);
1669         assert_eq!(tx.try_send(1), Ok(()));
1670         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
1671     }
1672 
1673     #[test]
try_send3()1674     fn try_send3() {
1675         let (tx, rx) = sync_channel::<i32>(1);
1676         assert_eq!(tx.try_send(1), Ok(()));
1677         drop(rx);
1678         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
1679     }
1680 
1681     #[test]
issue_15761()1682     fn issue_15761() {
1683         fn repro() {
1684             let (tx1, rx1) = sync_channel::<()>(3);
1685             let (tx2, rx2) = sync_channel::<()>(3);
1686 
1687             let _t = thread::spawn(move || {
1688                 rx1.recv().unwrap();
1689                 tx2.try_send(()).unwrap();
1690             });
1691 
1692             tx1.try_send(()).unwrap();
1693             rx2.recv().unwrap();
1694         }
1695 
1696         for _ in 0..100 {
1697             repro()
1698         }
1699     }
1700 }
1701 
1702 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/select.rs
1703 mod select_tests {
1704     use super::*;
1705 
1706     use std::thread;
1707 
1708     #[test]
smoke()1709     fn smoke() {
1710         let (tx1, rx1) = channel::<i32>();
1711         let (tx2, rx2) = channel::<i32>();
1712         tx1.send(1).unwrap();
1713         select! {
1714             foo = rx1.recv() => assert_eq!(foo.unwrap(), 1),
1715             _bar = rx2.recv() => panic!()
1716         }
1717         tx2.send(2).unwrap();
1718         select! {
1719             _foo = rx1.recv() => panic!(),
1720             bar = rx2.recv() => assert_eq!(bar.unwrap(), 2)
1721         }
1722         drop(tx1);
1723         select! {
1724             foo = rx1.recv() => assert!(foo.is_err()),
1725             _bar = rx2.recv() => panic!()
1726         }
1727         drop(tx2);
1728         select! {
1729             bar = rx2.recv() => assert!(bar.is_err())
1730         }
1731     }
1732 
1733     #[test]
smoke2()1734     fn smoke2() {
1735         let (_tx1, rx1) = channel::<i32>();
1736         let (_tx2, rx2) = channel::<i32>();
1737         let (_tx3, rx3) = channel::<i32>();
1738         let (_tx4, rx4) = channel::<i32>();
1739         let (tx5, rx5) = channel::<i32>();
1740         tx5.send(4).unwrap();
1741         select! {
1742             _foo = rx1.recv() => panic!("1"),
1743             _foo = rx2.recv() => panic!("2"),
1744             _foo = rx3.recv() => panic!("3"),
1745             _foo = rx4.recv() => panic!("4"),
1746             foo = rx5.recv() => assert_eq!(foo.unwrap(), 4)
1747         }
1748     }
1749 
1750     #[test]
closed()1751     fn closed() {
1752         let (_tx1, rx1) = channel::<i32>();
1753         let (tx2, rx2) = channel::<i32>();
1754         drop(tx2);
1755 
1756         select! {
1757             _a1 = rx1.recv() => panic!(),
1758             a2 = rx2.recv() => assert!(a2.is_err())
1759         }
1760     }
1761 
1762     #[test]
unblocks()1763     fn unblocks() {
1764         let (tx1, rx1) = channel::<i32>();
1765         let (_tx2, rx2) = channel::<i32>();
1766         let (tx3, rx3) = channel::<i32>();
1767 
1768         let t = thread::spawn(move || {
1769             for _ in 0..20 {
1770                 thread::yield_now();
1771             }
1772             tx1.send(1).unwrap();
1773             rx3.recv().unwrap();
1774             for _ in 0..20 {
1775                 thread::yield_now();
1776             }
1777         });
1778 
1779         select! {
1780             a = rx1.recv() => assert_eq!(a.unwrap(), 1),
1781             _b = rx2.recv() => panic!()
1782         }
1783         tx3.send(1).unwrap();
1784         select! {
1785             a = rx1.recv() => assert!(a.is_err()),
1786             _b = rx2.recv() => panic!()
1787         }
1788         t.join().unwrap();
1789     }
1790 
1791     #[test]
both_ready()1792     fn both_ready() {
1793         let (tx1, rx1) = channel::<i32>();
1794         let (tx2, rx2) = channel::<i32>();
1795         let (tx3, rx3) = channel::<()>();
1796 
1797         let t = thread::spawn(move || {
1798             for _ in 0..20 {
1799                 thread::yield_now();
1800             }
1801             tx1.send(1).unwrap();
1802             tx2.send(2).unwrap();
1803             rx3.recv().unwrap();
1804         });
1805 
1806         select! {
1807             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
1808             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
1809         }
1810         select! {
1811             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
1812             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
1813         }
1814         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1815         assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
1816         tx3.send(()).unwrap();
1817         t.join().unwrap();
1818     }
1819 
1820     #[test]
stress()1821     fn stress() {
1822         #[cfg(miri)]
1823         const AMT: i32 = 100;
1824         #[cfg(not(miri))]
1825         const AMT: i32 = 10000;
1826 
1827         let (tx1, rx1) = channel::<i32>();
1828         let (tx2, rx2) = channel::<i32>();
1829         let (tx3, rx3) = channel::<()>();
1830 
1831         let t = thread::spawn(move || {
1832             for i in 0..AMT {
1833                 if i % 2 == 0 {
1834                     tx1.send(i).unwrap();
1835                 } else {
1836                     tx2.send(i).unwrap();
1837                 }
1838                 rx3.recv().unwrap();
1839             }
1840         });
1841 
1842         for i in 0..AMT {
1843             select! {
1844                 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
1845                 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
1846             }
1847             tx3.send(()).unwrap();
1848         }
1849         t.join().unwrap();
1850     }
1851 
1852     #[allow(unused_must_use)]
1853     #[test]
cloning()1854     fn cloning() {
1855         let (tx1, rx1) = channel::<i32>();
1856         let (_tx2, rx2) = channel::<i32>();
1857         let (tx3, rx3) = channel::<()>();
1858 
1859         let t = thread::spawn(move || {
1860             rx3.recv().unwrap();
1861             tx1.clone();
1862             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
1863             tx1.send(2).unwrap();
1864             rx3.recv().unwrap();
1865         });
1866 
1867         tx3.send(()).unwrap();
1868         select! {
1869             _i1 = rx1.recv() => {},
1870             _i2 = rx2.recv() => panic!()
1871         }
1872         tx3.send(()).unwrap();
1873         t.join().unwrap();
1874     }
1875 
1876     #[allow(unused_must_use)]
1877     #[test]
cloning2()1878     fn cloning2() {
1879         let (tx1, rx1) = channel::<i32>();
1880         let (_tx2, rx2) = channel::<i32>();
1881         let (tx3, rx3) = channel::<()>();
1882 
1883         let t = thread::spawn(move || {
1884             rx3.recv().unwrap();
1885             tx1.clone();
1886             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
1887             tx1.send(2).unwrap();
1888             rx3.recv().unwrap();
1889         });
1890 
1891         tx3.send(()).unwrap();
1892         select! {
1893             _i1 = rx1.recv() => {},
1894             _i2 = rx2.recv() => panic!()
1895         }
1896         tx3.send(()).unwrap();
1897         t.join().unwrap();
1898     }
1899 
1900     #[test]
cloning3()1901     fn cloning3() {
1902         let (tx1, rx1) = channel::<()>();
1903         let (tx2, rx2) = channel::<()>();
1904         let (tx3, rx3) = channel::<()>();
1905         let t = thread::spawn(move || {
1906             select! {
1907                 _ = rx1.recv() => panic!(),
1908                 _ = rx2.recv() => {}
1909             }
1910             tx3.send(()).unwrap();
1911         });
1912 
1913         for _ in 0..1000 {
1914             thread::yield_now();
1915         }
1916         drop(tx1.clone());
1917         tx2.send(()).unwrap();
1918         rx3.recv().unwrap();
1919         t.join().unwrap();
1920     }
1921 
1922     #[test]
preflight1()1923     fn preflight1() {
1924         let (tx, rx) = channel();
1925         tx.send(()).unwrap();
1926         select! {
1927             _n = rx.recv() => {}
1928         }
1929     }
1930 
1931     #[test]
preflight2()1932     fn preflight2() {
1933         let (tx, rx) = channel();
1934         tx.send(()).unwrap();
1935         tx.send(()).unwrap();
1936         select! {
1937             _n = rx.recv() => {}
1938         }
1939     }
1940 
1941     #[test]
preflight3()1942     fn preflight3() {
1943         let (tx, rx) = channel();
1944         drop(tx.clone());
1945         tx.send(()).unwrap();
1946         select! {
1947             _n = rx.recv() => {}
1948         }
1949     }
1950 
1951     #[test]
preflight4()1952     fn preflight4() {
1953         let (tx, rx) = channel();
1954         tx.send(()).unwrap();
1955         select! {
1956             _ = rx.recv() => {}
1957         }
1958     }
1959 
1960     #[test]
preflight5()1961     fn preflight5() {
1962         let (tx, rx) = channel();
1963         tx.send(()).unwrap();
1964         tx.send(()).unwrap();
1965         select! {
1966             _ = rx.recv() => {}
1967         }
1968     }
1969 
1970     #[test]
preflight6()1971     fn preflight6() {
1972         let (tx, rx) = channel();
1973         drop(tx.clone());
1974         tx.send(()).unwrap();
1975         select! {
1976             _ = rx.recv() => {}
1977         }
1978     }
1979 
1980     #[test]
preflight7()1981     fn preflight7() {
1982         let (tx, rx) = channel::<()>();
1983         drop(tx);
1984         select! {
1985             _ = rx.recv() => {}
1986         }
1987     }
1988 
1989     #[test]
preflight8()1990     fn preflight8() {
1991         let (tx, rx) = channel();
1992         tx.send(()).unwrap();
1993         drop(tx);
1994         rx.recv().unwrap();
1995         select! {
1996             _ = rx.recv() => {}
1997         }
1998     }
1999 
2000     #[test]
preflight9()2001     fn preflight9() {
2002         let (tx, rx) = channel();
2003         drop(tx.clone());
2004         tx.send(()).unwrap();
2005         drop(tx);
2006         rx.recv().unwrap();
2007         select! {
2008             _ = rx.recv() => {}
2009         }
2010     }
2011 
2012     #[test]
oneshot_data_waiting()2013     fn oneshot_data_waiting() {
2014         let (tx1, rx1) = channel();
2015         let (tx2, rx2) = channel();
2016         let t = thread::spawn(move || {
2017             select! {
2018                 _n = rx1.recv() => {}
2019             }
2020             tx2.send(()).unwrap();
2021         });
2022 
2023         for _ in 0..100 {
2024             thread::yield_now()
2025         }
2026         tx1.send(()).unwrap();
2027         rx2.recv().unwrap();
2028         t.join().unwrap();
2029     }
2030 
2031     #[test]
stream_data_waiting()2032     fn stream_data_waiting() {
2033         let (tx1, rx1) = channel();
2034         let (tx2, rx2) = channel();
2035         tx1.send(()).unwrap();
2036         tx1.send(()).unwrap();
2037         rx1.recv().unwrap();
2038         rx1.recv().unwrap();
2039         let t = thread::spawn(move || {
2040             select! {
2041                 _n = rx1.recv() => {}
2042             }
2043             tx2.send(()).unwrap();
2044         });
2045 
2046         for _ in 0..100 {
2047             thread::yield_now()
2048         }
2049         tx1.send(()).unwrap();
2050         rx2.recv().unwrap();
2051         t.join().unwrap();
2052     }
2053 
2054     #[test]
shared_data_waiting()2055     fn shared_data_waiting() {
2056         let (tx1, rx1) = channel();
2057         let (tx2, rx2) = channel();
2058         drop(tx1.clone());
2059         tx1.send(()).unwrap();
2060         rx1.recv().unwrap();
2061         let t = thread::spawn(move || {
2062             select! {
2063                 _n = rx1.recv() => {}
2064             }
2065             tx2.send(()).unwrap();
2066         });
2067 
2068         for _ in 0..100 {
2069             thread::yield_now()
2070         }
2071         tx1.send(()).unwrap();
2072         rx2.recv().unwrap();
2073         t.join().unwrap();
2074     }
2075 
2076     #[test]
sync1()2077     fn sync1() {
2078         let (tx, rx) = sync_channel::<i32>(1);
2079         tx.send(1).unwrap();
2080         select! {
2081             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
2082         }
2083     }
2084 
2085     #[test]
sync2()2086     fn sync2() {
2087         let (tx, rx) = sync_channel::<i32>(0);
2088         let t = thread::spawn(move || {
2089             for _ in 0..100 {
2090                 thread::yield_now()
2091             }
2092             tx.send(1).unwrap();
2093         });
2094         select! {
2095             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
2096         }
2097         t.join().unwrap();
2098     }
2099 
2100     #[test]
sync3()2101     fn sync3() {
2102         let (tx1, rx1) = sync_channel::<i32>(0);
2103         let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel();
2104         let t = thread::spawn(move || {
2105             tx1.send(1).unwrap();
2106         });
2107         let t2 = thread::spawn(move || {
2108             tx2.send(2).unwrap();
2109         });
2110         select! {
2111             n = rx1.recv() => {
2112                 let n = n.unwrap();
2113                 assert_eq!(n, 1);
2114                 assert_eq!(rx2.recv().unwrap(), 2);
2115             },
2116             n = rx2.recv() => {
2117                 let n = n.unwrap();
2118                 assert_eq!(n, 2);
2119                 assert_eq!(rx1.recv().unwrap(), 1);
2120             }
2121         }
2122         t.join().unwrap();
2123         t2.join().unwrap();
2124     }
2125 }
2126