//! Tests copied from `std::sync::mpsc`. //! //! This is a copy of tests for the `std::sync::mpsc` channels from the standard library, but //! modified to work with `crossbeam-channel` instead. //! //! Minor tweaks were needed to make the tests compile: //! //! - Replace `box` syntax with `Box::new`. //! - Replace all uses of `Select` with `select!`. //! - Change the imports. //! - Join all spawned threads. //! - Removed assertion from oneshot_multi_thread_send_close_stress tests. //! //! Source: //! - https://github.com/rust-lang/rust/tree/master/src/libstd/sync/mpsc //! //! Copyright & License: //! - Copyright 2013-2014 The Rust Project Developers //! - Apache License, Version 2.0 or MIT license, at your option //! - https://github.com/rust-lang/rust/blob/master/COPYRIGHT //! - https://www.rust-lang.org/en-US/legal.html use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError}; use std::sync::mpsc::{SendError, TrySendError}; use std::thread::JoinHandle; use std::time::Duration; use crossbeam_channel as cc; pub struct Sender { pub inner: cc::Sender, } impl Sender { pub fn send(&self, t: T) -> Result<(), SendError> { self.inner.send(t).map_err(|cc::SendError(m)| SendError(m)) } } impl Clone for Sender { fn clone(&self) -> Sender { Sender { inner: self.inner.clone(), } } } pub struct SyncSender { pub inner: cc::Sender, } impl SyncSender { pub fn send(&self, t: T) -> Result<(), SendError> { self.inner.send(t).map_err(|cc::SendError(m)| SendError(m)) } pub fn try_send(&self, t: T) -> Result<(), TrySendError> { self.inner.try_send(t).map_err(|err| match err { cc::TrySendError::Full(m) => TrySendError::Full(m), cc::TrySendError::Disconnected(m) => TrySendError::Disconnected(m), }) } } impl Clone for SyncSender { fn clone(&self) -> SyncSender { SyncSender { inner: self.inner.clone(), } } } pub struct Receiver { pub inner: cc::Receiver, } impl Receiver { pub fn try_recv(&self) -> Result { self.inner.try_recv().map_err(|err| match err { cc::TryRecvError::Empty => TryRecvError::Empty, cc::TryRecvError::Disconnected => TryRecvError::Disconnected, }) } pub fn recv(&self) -> Result { self.inner.recv().map_err(|_| RecvError) } pub fn recv_timeout(&self, timeout: Duration) -> Result { self.inner.recv_timeout(timeout).map_err(|err| match err { cc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout, cc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected, }) } pub fn iter(&self) -> Iter { Iter { inner: self } } pub fn try_iter(&self) -> TryIter { TryIter { inner: self } } } impl<'a, T> IntoIterator for &'a Receiver { type Item = T; type IntoIter = Iter<'a, T>; fn into_iter(self) -> Iter<'a, T> { self.iter() } } impl IntoIterator for Receiver { type Item = T; type IntoIter = IntoIter; fn into_iter(self) -> IntoIter { IntoIter { inner: self } } } pub struct TryIter<'a, T: 'a> { inner: &'a Receiver, } impl<'a, T> Iterator for TryIter<'a, T> { type Item = T; fn next(&mut self) -> Option { self.inner.try_recv().ok() } } pub struct Iter<'a, T: 'a> { inner: &'a Receiver, } impl<'a, T> Iterator for Iter<'a, T> { type Item = T; fn next(&mut self) -> Option { self.inner.recv().ok() } } pub struct IntoIter { inner: Receiver, } impl Iterator for IntoIter { type Item = T; fn next(&mut self) -> Option { self.inner.recv().ok() } } pub fn channel() -> (Sender, Receiver) { let (s, r) = cc::unbounded(); let s = Sender { inner: s }; let r = Receiver { inner: r }; (s, r) } pub fn sync_channel(bound: usize) -> (SyncSender, Receiver) { let (s, r) = cc::bounded(bound); let s = SyncSender { inner: s }; let r = Receiver { inner: r }; (s, r) } macro_rules! select { ( $($name:pat = $rx:ident.$meth:ident() => $code:expr),+ ) => ({ cc::crossbeam_channel_internal! { $( recv(($rx).inner) -> res => { let $name = res.map_err(|_| ::std::sync::mpsc::RecvError); $code } )+ } }) } // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs mod channel_tests { use super::*; use std::env; use std::thread; use std::time::{Duration, Instant}; pub fn stress_factor() -> usize { match env::var("RUST_TEST_STRESS") { Ok(val) => val.parse().unwrap(), Err(..) => 1, } } #[test] fn smoke() { let (tx, rx) = channel::(); tx.send(1).unwrap(); assert_eq!(rx.recv().unwrap(), 1); } #[test] fn drop_full() { let (tx, _rx) = channel::>(); tx.send(Box::new(1)).unwrap(); } #[test] fn drop_full_shared() { let (tx, _rx) = channel::>(); drop(tx.clone()); drop(tx.clone()); tx.send(Box::new(1)).unwrap(); } #[test] fn smoke_shared() { let (tx, rx) = channel::(); tx.send(1).unwrap(); assert_eq!(rx.recv().unwrap(), 1); let tx = tx.clone(); tx.send(1).unwrap(); assert_eq!(rx.recv().unwrap(), 1); } #[test] fn smoke_threads() { let (tx, rx) = channel::(); let t = thread::spawn(move || { tx.send(1).unwrap(); }); assert_eq!(rx.recv().unwrap(), 1); t.join().unwrap(); } #[test] fn smoke_port_gone() { let (tx, rx) = channel::(); drop(rx); assert!(tx.send(1).is_err()); } #[test] fn smoke_shared_port_gone() { let (tx, rx) = channel::(); drop(rx); assert!(tx.send(1).is_err()) } #[test] fn smoke_shared_port_gone2() { let (tx, rx) = channel::(); drop(rx); let tx2 = tx.clone(); drop(tx); assert!(tx2.send(1).is_err()); } #[test] fn port_gone_concurrent() { let (tx, rx) = channel::(); let t = thread::spawn(move || { rx.recv().unwrap(); }); while tx.send(1).is_ok() {} t.join().unwrap(); } #[test] fn port_gone_concurrent_shared() { let (tx, rx) = channel::(); let tx2 = tx.clone(); let t = thread::spawn(move || { rx.recv().unwrap(); }); while tx.send(1).is_ok() && tx2.send(1).is_ok() {} t.join().unwrap(); } #[test] fn smoke_chan_gone() { let (tx, rx) = channel::(); drop(tx); assert!(rx.recv().is_err()); } #[test] fn smoke_chan_gone_shared() { let (tx, rx) = channel::<()>(); let tx2 = tx.clone(); drop(tx); drop(tx2); assert!(rx.recv().is_err()); } #[test] fn chan_gone_concurrent() { let (tx, rx) = channel::(); let t = thread::spawn(move || { tx.send(1).unwrap(); tx.send(1).unwrap(); }); while rx.recv().is_ok() {} t.join().unwrap(); } #[test] fn stress() { let (tx, rx) = channel::(); let t = thread::spawn(move || { for _ in 0..10000 { tx.send(1).unwrap(); } }); for _ in 0..10000 { assert_eq!(rx.recv().unwrap(), 1); } t.join().ok().unwrap(); } #[test] fn stress_shared() { const AMT: u32 = 10000; const NTHREADS: u32 = 8; let (tx, rx) = channel::(); let t = thread::spawn(move || { for _ in 0..AMT * NTHREADS { assert_eq!(rx.recv().unwrap(), 1); } match rx.try_recv() { Ok(..) => panic!(), _ => {} } }); let mut ts = Vec::with_capacity(NTHREADS as usize); for _ in 0..NTHREADS { let tx = tx.clone(); let t = thread::spawn(move || { for _ in 0..AMT { tx.send(1).unwrap(); } }); ts.push(t); } drop(tx); t.join().ok().unwrap(); for t in ts { t.join().unwrap(); } } #[test] fn send_from_outside_runtime() { let (tx1, rx1) = channel::<()>(); let (tx2, rx2) = channel::(); let t1 = thread::spawn(move || { tx1.send(()).unwrap(); for _ in 0..40 { assert_eq!(rx2.recv().unwrap(), 1); } }); rx1.recv().unwrap(); let t2 = thread::spawn(move || { for _ in 0..40 { tx2.send(1).unwrap(); } }); t1.join().ok().unwrap(); t2.join().ok().unwrap(); } #[test] fn recv_from_outside_runtime() { let (tx, rx) = channel::(); let t = thread::spawn(move || { for _ in 0..40 { assert_eq!(rx.recv().unwrap(), 1); } }); for _ in 0..40 { tx.send(1).unwrap(); } t.join().ok().unwrap(); } #[test] fn no_runtime() { let (tx1, rx1) = channel::(); let (tx2, rx2) = channel::(); let t1 = thread::spawn(move || { assert_eq!(rx1.recv().unwrap(), 1); tx2.send(2).unwrap(); }); let t2 = thread::spawn(move || { tx1.send(1).unwrap(); assert_eq!(rx2.recv().unwrap(), 2); }); t1.join().ok().unwrap(); t2.join().ok().unwrap(); } #[test] fn oneshot_single_thread_close_port_first() { // Simple test of closing without sending let (_tx, rx) = channel::(); drop(rx); } #[test] fn oneshot_single_thread_close_chan_first() { // Simple test of closing without sending let (tx, _rx) = channel::(); drop(tx); } #[test] fn oneshot_single_thread_send_port_close() { // Testing that the sender cleans up the payload if receiver is closed let (tx, rx) = channel::>(); drop(rx); assert!(tx.send(Box::new(0)).is_err()); } #[test] fn oneshot_single_thread_recv_chan_close() { let (tx, rx) = channel::(); drop(tx); assert_eq!(rx.recv(), Err(RecvError)); } #[test] fn oneshot_single_thread_send_then_recv() { let (tx, rx) = channel::>(); tx.send(Box::new(10)).unwrap(); assert!(*rx.recv().unwrap() == 10); } #[test] fn oneshot_single_thread_try_send_open() { let (tx, rx) = channel::(); assert!(tx.send(10).is_ok()); assert!(rx.recv().unwrap() == 10); } #[test] fn oneshot_single_thread_try_send_closed() { let (tx, rx) = channel::(); drop(rx); assert!(tx.send(10).is_err()); } #[test] fn oneshot_single_thread_try_recv_open() { let (tx, rx) = channel::(); tx.send(10).unwrap(); assert!(rx.recv() == Ok(10)); } #[test] fn oneshot_single_thread_try_recv_closed() { let (tx, rx) = channel::(); drop(tx); assert!(rx.recv().is_err()); } #[test] fn oneshot_single_thread_peek_data() { let (tx, rx) = channel::(); assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); tx.send(10).unwrap(); assert_eq!(rx.try_recv(), Ok(10)); } #[test] fn oneshot_single_thread_peek_close() { let (tx, rx) = channel::(); drop(tx); assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); } #[test] fn oneshot_single_thread_peek_open() { let (_tx, rx) = channel::(); assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); } #[test] fn oneshot_multi_task_recv_then_send() { let (tx, rx) = channel::>(); let t = thread::spawn(move || { assert!(*rx.recv().unwrap() == 10); }); tx.send(Box::new(10)).unwrap(); t.join().unwrap(); } #[test] fn oneshot_multi_task_recv_then_close() { let (tx, rx) = channel::>(); let t = thread::spawn(move || { drop(tx); }); thread::spawn(move || { assert_eq!(rx.recv(), Err(RecvError)); }) .join() .unwrap(); t.join().unwrap(); } #[test] fn oneshot_multi_thread_close_stress() { let stress_factor = stress_factor(); let mut ts = Vec::with_capacity(stress_factor); for _ in 0..stress_factor { let (tx, rx) = channel::(); let t = thread::spawn(move || { drop(rx); }); ts.push(t); drop(tx); } for t in ts { t.join().unwrap(); } } #[test] fn oneshot_multi_thread_send_close_stress() { let stress_factor = stress_factor(); let mut ts = Vec::with_capacity(2 * stress_factor); for _ in 0..stress_factor { let (tx, rx) = channel::(); let t = thread::spawn(move || { drop(rx); }); ts.push(t); thread::spawn(move || { let _ = tx.send(1); }) .join() .unwrap(); } for t in ts { t.join().unwrap(); } } #[test] fn oneshot_multi_thread_recv_close_stress() { let stress_factor = stress_factor(); let mut ts = Vec::with_capacity(2 * stress_factor); for _ in 0..stress_factor { let (tx, rx) = channel::(); let t = thread::spawn(move || { thread::spawn(move || { assert_eq!(rx.recv(), Err(RecvError)); }) .join() .unwrap(); }); ts.push(t); let t2 = thread::spawn(move || { let t = thread::spawn(move || { drop(tx); }); t.join().unwrap(); }); ts.push(t2); } for t in ts { t.join().unwrap(); } } #[test] fn oneshot_multi_thread_send_recv_stress() { let stress_factor = stress_factor(); let mut ts = Vec::with_capacity(stress_factor); for _ in 0..stress_factor { let (tx, rx) = channel::>(); let t = thread::spawn(move || { tx.send(Box::new(10)).unwrap(); }); ts.push(t); assert!(*rx.recv().unwrap() == 10); } for t in ts { t.join().unwrap(); } } #[test] fn stream_send_recv_stress() { let stress_factor = stress_factor(); let mut ts = Vec::with_capacity(2 * stress_factor); for _ in 0..stress_factor { let (tx, rx) = channel(); if let Some(t) = send(tx, 0) { ts.push(t); } if let Some(t2) = recv(rx, 0) { ts.push(t2); } fn send(tx: Sender>, i: i32) -> Option> { if i == 10 { return None; } Some(thread::spawn(move || { tx.send(Box::new(i)).unwrap(); send(tx, i + 1); })) } fn recv(rx: Receiver>, i: i32) -> Option> { if i == 10 { return None; } Some(thread::spawn(move || { assert!(*rx.recv().unwrap() == i); recv(rx, i + 1); })) } } for t in ts { t.join().unwrap(); } } #[test] fn oneshot_single_thread_recv_timeout() { let (tx, rx) = channel(); tx.send(()).unwrap(); assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); assert_eq!( rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout) ); tx.send(()).unwrap(); assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); } #[test] fn stress_recv_timeout_two_threads() { let (tx, rx) = channel(); let stress = stress_factor() + 100; let timeout = Duration::from_millis(100); let t = thread::spawn(move || { for i in 0..stress { if i % 2 == 0 { thread::sleep(timeout * 2); } tx.send(1usize).unwrap(); } }); let mut recv_count = 0; loop { match rx.recv_timeout(timeout) { Ok(n) => { assert_eq!(n, 1usize); recv_count += 1; } Err(RecvTimeoutError::Timeout) => continue, Err(RecvTimeoutError::Disconnected) => break, } } assert_eq!(recv_count, stress); t.join().unwrap() } #[test] fn recv_timeout_upgrade() { let (tx, rx) = channel::<()>(); let timeout = Duration::from_millis(1); let _tx_clone = tx.clone(); let start = Instant::now(); assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout)); assert!(Instant::now() >= start + timeout); } #[test] fn stress_recv_timeout_shared() { let (tx, rx) = channel(); let stress = stress_factor() + 100; let mut ts = Vec::with_capacity(stress); for i in 0..stress { let tx = tx.clone(); let t = thread::spawn(move || { thread::sleep(Duration::from_millis(i as u64 * 10)); tx.send(1usize).unwrap(); }); ts.push(t); } drop(tx); let mut recv_count = 0; loop { match rx.recv_timeout(Duration::from_millis(10)) { Ok(n) => { assert_eq!(n, 1usize); recv_count += 1; } Err(RecvTimeoutError::Timeout) => continue, Err(RecvTimeoutError::Disconnected) => break, } } assert_eq!(recv_count, stress); for t in ts { t.join().unwrap(); } } #[test] fn recv_a_lot() { // Regression test that we don't run out of stack in scheduler context let (tx, rx) = channel(); for _ in 0..10000 { tx.send(()).unwrap(); } for _ in 0..10000 { rx.recv().unwrap(); } } #[test] fn shared_recv_timeout() { let (tx, rx) = channel(); let total = 5; let mut ts = Vec::with_capacity(total); for _ in 0..total { let tx = tx.clone(); let t = thread::spawn(move || { tx.send(()).unwrap(); }); ts.push(t); } for _ in 0..total { rx.recv().unwrap(); } assert_eq!( rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout) ); tx.send(()).unwrap(); assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); for t in ts { t.join().unwrap(); } } #[test] fn shared_chan_stress() { let (tx, rx) = channel(); let total = stress_factor() + 100; let mut ts = Vec::with_capacity(total); for _ in 0..total { let tx = tx.clone(); let t = thread::spawn(move || { tx.send(()).unwrap(); }); ts.push(t); } for _ in 0..total { rx.recv().unwrap(); } for t in ts { t.join().unwrap(); } } #[test] fn test_nested_recv_iter() { let (tx, rx) = channel::(); let (total_tx, total_rx) = channel::(); let t = thread::spawn(move || { let mut acc = 0; for x in rx.iter() { acc += x; } total_tx.send(acc).unwrap(); }); tx.send(3).unwrap(); tx.send(1).unwrap(); tx.send(2).unwrap(); drop(tx); assert_eq!(total_rx.recv().unwrap(), 6); t.join().unwrap(); } #[test] fn test_recv_iter_break() { let (tx, rx) = channel::(); let (count_tx, count_rx) = channel(); let t = thread::spawn(move || { let mut count = 0; for x in rx.iter() { if count >= 3 { break; } else { count += x; } } count_tx.send(count).unwrap(); }); tx.send(2).unwrap(); tx.send(2).unwrap(); tx.send(2).unwrap(); let _ = tx.send(2); drop(tx); assert_eq!(count_rx.recv().unwrap(), 4); t.join().unwrap(); } #[test] fn test_recv_try_iter() { let (request_tx, request_rx) = channel(); let (response_tx, response_rx) = channel(); // Request `x`s until we have `6`. let t = thread::spawn(move || { let mut count = 0; loop { for x in response_rx.try_iter() { count += x; if count == 6 { return count; } } request_tx.send(()).unwrap(); } }); for _ in request_rx.iter() { if response_tx.send(2).is_err() { break; } } assert_eq!(t.join().unwrap(), 6); } #[test] fn test_recv_into_iter_owned() { let mut iter = { let (tx, rx) = channel::(); tx.send(1).unwrap(); tx.send(2).unwrap(); rx.into_iter() }; assert_eq!(iter.next().unwrap(), 1); assert_eq!(iter.next().unwrap(), 2); assert_eq!(iter.next().is_none(), true); } #[test] fn test_recv_into_iter_borrowed() { let (tx, rx) = channel::(); tx.send(1).unwrap(); tx.send(2).unwrap(); drop(tx); let mut iter = (&rx).into_iter(); assert_eq!(iter.next().unwrap(), 1); assert_eq!(iter.next().unwrap(), 2); assert_eq!(iter.next().is_none(), true); } #[test] fn try_recv_states() { let (tx1, rx1) = channel::(); let (tx2, rx2) = channel::<()>(); let (tx3, rx3) = channel::<()>(); let t = thread::spawn(move || { rx2.recv().unwrap(); tx1.send(1).unwrap(); tx3.send(()).unwrap(); rx2.recv().unwrap(); drop(tx1); tx3.send(()).unwrap(); }); assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); tx2.send(()).unwrap(); rx3.recv().unwrap(); assert_eq!(rx1.try_recv(), Ok(1)); assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); tx2.send(()).unwrap(); rx3.recv().unwrap(); assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); t.join().unwrap(); } // This bug used to end up in a livelock inside of the Receiver destructor // because the internal state of the Shared packet was corrupted #[test] fn destroy_upgraded_shared_port_when_sender_still_active() { let (tx, rx) = channel(); let (tx2, rx2) = channel(); let t = thread::spawn(move || { rx.recv().unwrap(); // wait on a oneshot drop(rx); // destroy a shared tx2.send(()).unwrap(); }); // make sure the other thread has gone to sleep for _ in 0..5000 { thread::yield_now(); } // upgrade to a shared chan and send a message let tx2 = tx.clone(); drop(tx); tx2.send(()).unwrap(); // wait for the child thread to exit before we exit rx2.recv().unwrap(); t.join().unwrap(); } #[test] fn issue_32114() { let (tx, _) = channel(); let _ = tx.send(123); assert_eq!(tx.send(123), Err(SendError(123))); } } // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs mod sync_channel_tests { use super::*; use std::env; use std::thread; use std::time::Duration; pub fn stress_factor() -> usize { match env::var("RUST_TEST_STRESS") { Ok(val) => val.parse().unwrap(), Err(..) => 1, } } #[test] fn smoke() { let (tx, rx) = sync_channel::(1); tx.send(1).unwrap(); assert_eq!(rx.recv().unwrap(), 1); } #[test] fn drop_full() { let (tx, _rx) = sync_channel::>(1); tx.send(Box::new(1)).unwrap(); } #[test] fn smoke_shared() { let (tx, rx) = sync_channel::(1); tx.send(1).unwrap(); assert_eq!(rx.recv().unwrap(), 1); let tx = tx.clone(); tx.send(1).unwrap(); assert_eq!(rx.recv().unwrap(), 1); } #[test] fn recv_timeout() { let (tx, rx) = sync_channel::(1); assert_eq!( rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout) ); tx.send(1).unwrap(); assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1)); } #[test] fn smoke_threads() { let (tx, rx) = sync_channel::(0); let t = thread::spawn(move || { tx.send(1).unwrap(); }); assert_eq!(rx.recv().unwrap(), 1); t.join().unwrap(); } #[test] fn smoke_port_gone() { let (tx, rx) = sync_channel::(0); drop(rx); assert!(tx.send(1).is_err()); } #[test] fn smoke_shared_port_gone2() { let (tx, rx) = sync_channel::(0); drop(rx); let tx2 = tx.clone(); drop(tx); assert!(tx2.send(1).is_err()); } #[test] fn port_gone_concurrent() { let (tx, rx) = sync_channel::(0); let t = thread::spawn(move || { rx.recv().unwrap(); }); while tx.send(1).is_ok() {} t.join().unwrap(); } #[test] fn port_gone_concurrent_shared() { let (tx, rx) = sync_channel::(0); let tx2 = tx.clone(); let t = thread::spawn(move || { rx.recv().unwrap(); }); while tx.send(1).is_ok() && tx2.send(1).is_ok() {} t.join().unwrap(); } #[test] fn smoke_chan_gone() { let (tx, rx) = sync_channel::(0); drop(tx); assert!(rx.recv().is_err()); } #[test] fn smoke_chan_gone_shared() { let (tx, rx) = sync_channel::<()>(0); let tx2 = tx.clone(); drop(tx); drop(tx2); assert!(rx.recv().is_err()); } #[test] fn chan_gone_concurrent() { let (tx, rx) = sync_channel::(0); let t = thread::spawn(move || { tx.send(1).unwrap(); tx.send(1).unwrap(); }); while rx.recv().is_ok() {} t.join().unwrap(); } #[test] fn stress() { let (tx, rx) = sync_channel::(0); let t = thread::spawn(move || { for _ in 0..10000 { tx.send(1).unwrap(); } }); for _ in 0..10000 { assert_eq!(rx.recv().unwrap(), 1); } t.join().unwrap(); } #[test] fn stress_recv_timeout_two_threads() { let (tx, rx) = sync_channel::(0); let t = thread::spawn(move || { for _ in 0..10000 { tx.send(1).unwrap(); } }); let mut recv_count = 0; loop { match rx.recv_timeout(Duration::from_millis(1)) { Ok(v) => { assert_eq!(v, 1); recv_count += 1; } Err(RecvTimeoutError::Timeout) => continue, Err(RecvTimeoutError::Disconnected) => break, } } assert_eq!(recv_count, 10000); t.join().unwrap(); } #[test] fn stress_recv_timeout_shared() { const AMT: u32 = 1000; const NTHREADS: u32 = 8; let (tx, rx) = sync_channel::(0); let (dtx, drx) = sync_channel::<()>(0); let t = thread::spawn(move || { let mut recv_count = 0; loop { match rx.recv_timeout(Duration::from_millis(10)) { Ok(v) => { assert_eq!(v, 1); recv_count += 1; } Err(RecvTimeoutError::Timeout) => continue, Err(RecvTimeoutError::Disconnected) => break, } } assert_eq!(recv_count, AMT * NTHREADS); assert!(rx.try_recv().is_err()); dtx.send(()).unwrap(); }); let mut ts = Vec::with_capacity(NTHREADS as usize); for _ in 0..NTHREADS { let tx = tx.clone(); let t = thread::spawn(move || { for _ in 0..AMT { tx.send(1).unwrap(); } }); ts.push(t); } drop(tx); drx.recv().unwrap(); for t in ts { t.join().unwrap(); } t.join().unwrap(); } #[test] fn stress_shared() { const AMT: u32 = 1000; const NTHREADS: u32 = 8; let (tx, rx) = sync_channel::(0); let (dtx, drx) = sync_channel::<()>(0); let t = thread::spawn(move || { for _ in 0..AMT * NTHREADS { assert_eq!(rx.recv().unwrap(), 1); } match rx.try_recv() { Ok(..) => panic!(), _ => {} } dtx.send(()).unwrap(); }); let mut ts = Vec::with_capacity(NTHREADS as usize); for _ in 0..NTHREADS { let tx = tx.clone(); let t = thread::spawn(move || { for _ in 0..AMT { tx.send(1).unwrap(); } }); ts.push(t); } drop(tx); drx.recv().unwrap(); for t in ts { t.join().unwrap(); } t.join().unwrap(); } #[test] fn oneshot_single_thread_close_port_first() { // Simple test of closing without sending let (_tx, rx) = sync_channel::(0); drop(rx); } #[test] fn oneshot_single_thread_close_chan_first() { // Simple test of closing without sending let (tx, _rx) = sync_channel::(0); drop(tx); } #[test] fn oneshot_single_thread_send_port_close() { // Testing that the sender cleans up the payload if receiver is closed let (tx, rx) = sync_channel::>(0); drop(rx); assert!(tx.send(Box::new(0)).is_err()); } #[test] fn oneshot_single_thread_recv_chan_close() { let (tx, rx) = sync_channel::(0); drop(tx); assert_eq!(rx.recv(), Err(RecvError)); } #[test] fn oneshot_single_thread_send_then_recv() { let (tx, rx) = sync_channel::>(1); tx.send(Box::new(10)).unwrap(); assert!(*rx.recv().unwrap() == 10); } #[test] fn oneshot_single_thread_try_send_open() { let (tx, rx) = sync_channel::(1); assert_eq!(tx.try_send(10), Ok(())); assert!(rx.recv().unwrap() == 10); } #[test] fn oneshot_single_thread_try_send_closed() { let (tx, rx) = sync_channel::(0); drop(rx); assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10))); } #[test] fn oneshot_single_thread_try_send_closed2() { let (tx, _rx) = sync_channel::(0); assert_eq!(tx.try_send(10), Err(TrySendError::Full(10))); } #[test] fn oneshot_single_thread_try_recv_open() { let (tx, rx) = sync_channel::(1); tx.send(10).unwrap(); assert!(rx.recv() == Ok(10)); } #[test] fn oneshot_single_thread_try_recv_closed() { let (tx, rx) = sync_channel::(0); drop(tx); assert!(rx.recv().is_err()); } #[test] fn oneshot_single_thread_try_recv_closed_with_data() { let (tx, rx) = sync_channel::(1); tx.send(10).unwrap(); drop(tx); assert_eq!(rx.try_recv(), Ok(10)); assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); } #[test] fn oneshot_single_thread_peek_data() { let (tx, rx) = sync_channel::(1); assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); tx.send(10).unwrap(); assert_eq!(rx.try_recv(), Ok(10)); } #[test] fn oneshot_single_thread_peek_close() { let (tx, rx) = sync_channel::(0); drop(tx); assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); } #[test] fn oneshot_single_thread_peek_open() { let (_tx, rx) = sync_channel::(0); assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); } #[test] fn oneshot_multi_task_recv_then_send() { let (tx, rx) = sync_channel::>(0); let t = thread::spawn(move || { assert!(*rx.recv().unwrap() == 10); }); tx.send(Box::new(10)).unwrap(); t.join().unwrap(); } #[test] fn oneshot_multi_task_recv_then_close() { let (tx, rx) = sync_channel::>(0); let t = thread::spawn(move || { drop(tx); }); thread::spawn(move || { assert_eq!(rx.recv(), Err(RecvError)); }) .join() .unwrap(); t.join().unwrap(); } #[test] fn oneshot_multi_thread_close_stress() { let stress_factor = stress_factor(); let mut ts = Vec::with_capacity(stress_factor); for _ in 0..stress_factor { let (tx, rx) = sync_channel::(0); let t = thread::spawn(move || { drop(rx); }); ts.push(t); drop(tx); } for t in ts { t.join().unwrap(); } } #[test] fn oneshot_multi_thread_send_close_stress() { let stress_factor = stress_factor(); let mut ts = Vec::with_capacity(stress_factor); for _ in 0..stress_factor { let (tx, rx) = sync_channel::(0); let t = thread::spawn(move || { drop(rx); }); ts.push(t); thread::spawn(move || { let _ = tx.send(1); }) .join() .unwrap(); } for t in ts { t.join().unwrap(); } } #[test] fn oneshot_multi_thread_recv_close_stress() { let stress_factor = stress_factor(); let mut ts = Vec::with_capacity(2 * stress_factor); for _ in 0..stress_factor { let (tx, rx) = sync_channel::(0); let t = thread::spawn(move || { thread::spawn(move || { assert_eq!(rx.recv(), Err(RecvError)); }) .join() .unwrap(); }); ts.push(t); let t2 = thread::spawn(move || { thread::spawn(move || { drop(tx); }); }); ts.push(t2); } for t in ts { t.join().unwrap(); } } #[test] fn oneshot_multi_thread_send_recv_stress() { let stress_factor = stress_factor(); let mut ts = Vec::with_capacity(stress_factor); for _ in 0..stress_factor { let (tx, rx) = sync_channel::>(0); let t = thread::spawn(move || { tx.send(Box::new(10)).unwrap(); }); ts.push(t); assert!(*rx.recv().unwrap() == 10); } for t in ts { t.join().unwrap(); } } #[test] fn stream_send_recv_stress() { let stress_factor = stress_factor(); let mut ts = Vec::with_capacity(2 * stress_factor); for _ in 0..stress_factor { let (tx, rx) = sync_channel::>(0); if let Some(t) = send(tx, 0) { ts.push(t); } if let Some(t) = recv(rx, 0) { ts.push(t); } fn send(tx: SyncSender>, i: i32) -> Option> { if i == 10 { return None; } Some(thread::spawn(move || { tx.send(Box::new(i)).unwrap(); send(tx, i + 1); })) } fn recv(rx: Receiver>, i: i32) -> Option> { if i == 10 { return None; } Some(thread::spawn(move || { assert!(*rx.recv().unwrap() == i); recv(rx, i + 1); })) } } for t in ts { t.join().unwrap(); } } #[test] fn recv_a_lot() { // Regression test that we don't run out of stack in scheduler context let (tx, rx) = sync_channel(10000); for _ in 0..10000 { tx.send(()).unwrap(); } for _ in 0..10000 { rx.recv().unwrap(); } } #[test] fn shared_chan_stress() { let (tx, rx) = sync_channel(0); let total = stress_factor() + 100; let mut ts = Vec::with_capacity(total); for _ in 0..total { let tx = tx.clone(); let t = thread::spawn(move || { tx.send(()).unwrap(); }); ts.push(t); } for _ in 0..total { rx.recv().unwrap(); } for t in ts { t.join().unwrap(); } } #[test] fn test_nested_recv_iter() { let (tx, rx) = sync_channel::(0); let (total_tx, total_rx) = sync_channel::(0); let t = thread::spawn(move || { let mut acc = 0; for x in rx.iter() { acc += x; } total_tx.send(acc).unwrap(); }); tx.send(3).unwrap(); tx.send(1).unwrap(); tx.send(2).unwrap(); drop(tx); assert_eq!(total_rx.recv().unwrap(), 6); t.join().unwrap(); } #[test] fn test_recv_iter_break() { let (tx, rx) = sync_channel::(0); let (count_tx, count_rx) = sync_channel(0); let t = thread::spawn(move || { let mut count = 0; for x in rx.iter() { if count >= 3 { break; } else { count += x; } } count_tx.send(count).unwrap(); }); tx.send(2).unwrap(); tx.send(2).unwrap(); tx.send(2).unwrap(); let _ = tx.try_send(2); drop(tx); assert_eq!(count_rx.recv().unwrap(), 4); t.join().unwrap(); } #[test] fn try_recv_states() { let (tx1, rx1) = sync_channel::(1); let (tx2, rx2) = sync_channel::<()>(1); let (tx3, rx3) = sync_channel::<()>(1); let t = thread::spawn(move || { rx2.recv().unwrap(); tx1.send(1).unwrap(); tx3.send(()).unwrap(); rx2.recv().unwrap(); drop(tx1); tx3.send(()).unwrap(); }); assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); tx2.send(()).unwrap(); rx3.recv().unwrap(); assert_eq!(rx1.try_recv(), Ok(1)); assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); tx2.send(()).unwrap(); rx3.recv().unwrap(); assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); t.join().unwrap(); } // This bug used to end up in a livelock inside of the Receiver destructor // because the internal state of the Shared packet was corrupted #[test] fn destroy_upgraded_shared_port_when_sender_still_active() { let (tx, rx) = sync_channel::<()>(0); let (tx2, rx2) = sync_channel::<()>(0); let t = thread::spawn(move || { rx.recv().unwrap(); // wait on a oneshot drop(rx); // destroy a shared tx2.send(()).unwrap(); }); // make sure the other thread has gone to sleep for _ in 0..5000 { thread::yield_now(); } // upgrade to a shared chan and send a message let tx2 = tx.clone(); drop(tx); tx2.send(()).unwrap(); // wait for the child thread to exit before we exit rx2.recv().unwrap(); t.join().unwrap(); } #[test] fn send1() { let (tx, rx) = sync_channel::(0); let t = thread::spawn(move || { rx.recv().unwrap(); }); assert_eq!(tx.send(1), Ok(())); t.join().unwrap(); } #[test] fn send2() { let (tx, rx) = sync_channel::(0); let t = thread::spawn(move || { drop(rx); }); assert!(tx.send(1).is_err()); t.join().unwrap(); } #[test] fn send3() { let (tx, rx) = sync_channel::(1); assert_eq!(tx.send(1), Ok(())); let t = thread::spawn(move || { drop(rx); }); assert!(tx.send(1).is_err()); t.join().unwrap(); } #[test] fn send4() { let (tx, rx) = sync_channel::(0); let tx2 = tx.clone(); let (done, donerx) = channel(); let done2 = done.clone(); let t = thread::spawn(move || { assert!(tx.send(1).is_err()); done.send(()).unwrap(); }); let t2 = thread::spawn(move || { assert!(tx2.send(2).is_err()); done2.send(()).unwrap(); }); drop(rx); donerx.recv().unwrap(); donerx.recv().unwrap(); t.join().unwrap(); t2.join().unwrap(); } #[test] fn try_send1() { let (tx, _rx) = sync_channel::(0); assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); } #[test] fn try_send2() { let (tx, _rx) = sync_channel::(1); assert_eq!(tx.try_send(1), Ok(())); assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); } #[test] fn try_send3() { let (tx, rx) = sync_channel::(1); assert_eq!(tx.try_send(1), Ok(())); drop(rx); assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1))); } #[test] fn issue_15761() { fn repro() { let (tx1, rx1) = sync_channel::<()>(3); let (tx2, rx2) = sync_channel::<()>(3); let _t = thread::spawn(move || { rx1.recv().unwrap(); tx2.try_send(()).unwrap(); }); tx1.try_send(()).unwrap(); rx2.recv().unwrap(); } for _ in 0..100 { repro() } } } // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/select.rs mod select_tests { use super::*; use std::thread; #[test] fn smoke() { let (tx1, rx1) = channel::(); let (tx2, rx2) = channel::(); tx1.send(1).unwrap(); select! { foo = rx1.recv() => assert_eq!(foo.unwrap(), 1), _bar = rx2.recv() => panic!() } tx2.send(2).unwrap(); select! { _foo = rx1.recv() => panic!(), bar = rx2.recv() => assert_eq!(bar.unwrap(), 2) } drop(tx1); select! { foo = rx1.recv() => assert!(foo.is_err()), _bar = rx2.recv() => panic!() } drop(tx2); select! { bar = rx2.recv() => assert!(bar.is_err()) } } #[test] fn smoke2() { let (_tx1, rx1) = channel::(); let (_tx2, rx2) = channel::(); let (_tx3, rx3) = channel::(); let (_tx4, rx4) = channel::(); let (tx5, rx5) = channel::(); tx5.send(4).unwrap(); select! { _foo = rx1.recv() => panic!("1"), _foo = rx2.recv() => panic!("2"), _foo = rx3.recv() => panic!("3"), _foo = rx4.recv() => panic!("4"), foo = rx5.recv() => assert_eq!(foo.unwrap(), 4) } } #[test] fn closed() { let (_tx1, rx1) = channel::(); let (tx2, rx2) = channel::(); drop(tx2); select! { _a1 = rx1.recv() => panic!(), a2 = rx2.recv() => assert!(a2.is_err()) } } #[test] fn unblocks() { let (tx1, rx1) = channel::(); let (_tx2, rx2) = channel::(); let (tx3, rx3) = channel::(); let t = thread::spawn(move || { for _ in 0..20 { thread::yield_now(); } tx1.send(1).unwrap(); rx3.recv().unwrap(); for _ in 0..20 { thread::yield_now(); } }); select! { a = rx1.recv() => assert_eq!(a.unwrap(), 1), _b = rx2.recv() => panic!() } tx3.send(1).unwrap(); select! { a = rx1.recv() => assert!(a.is_err()), _b = rx2.recv() => panic!() } t.join().unwrap(); } #[test] fn both_ready() { let (tx1, rx1) = channel::(); let (tx2, rx2) = channel::(); let (tx3, rx3) = channel::<()>(); let t = thread::spawn(move || { for _ in 0..20 { thread::yield_now(); } tx1.send(1).unwrap(); tx2.send(2).unwrap(); rx3.recv().unwrap(); }); select! { a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, a = rx2.recv() => { assert_eq!(a.unwrap(), 2); } } select! { a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, a = rx2.recv() => { assert_eq!(a.unwrap(), 2); } } assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty)); tx3.send(()).unwrap(); t.join().unwrap(); } #[test] fn stress() { const AMT: i32 = 10000; let (tx1, rx1) = channel::(); let (tx2, rx2) = channel::(); let (tx3, rx3) = channel::<()>(); let t = thread::spawn(move || { for i in 0..AMT { if i % 2 == 0 { tx1.send(i).unwrap(); } else { tx2.send(i).unwrap(); } rx3.recv().unwrap(); } }); for i in 0..AMT { select! { i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); }, i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); } } tx3.send(()).unwrap(); } t.join().unwrap(); } #[allow(unused_must_use)] #[test] fn cloning() { let (tx1, rx1) = channel::(); let (_tx2, rx2) = channel::(); let (tx3, rx3) = channel::<()>(); let t = thread::spawn(move || { rx3.recv().unwrap(); tx1.clone(); assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty)); tx1.send(2).unwrap(); rx3.recv().unwrap(); }); tx3.send(()).unwrap(); select! { _i1 = rx1.recv() => {}, _i2 = rx2.recv() => panic!() } tx3.send(()).unwrap(); t.join().unwrap(); } #[allow(unused_must_use)] #[test] fn cloning2() { let (tx1, rx1) = channel::(); let (_tx2, rx2) = channel::(); let (tx3, rx3) = channel::<()>(); let t = thread::spawn(move || { rx3.recv().unwrap(); tx1.clone(); assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty)); tx1.send(2).unwrap(); rx3.recv().unwrap(); }); tx3.send(()).unwrap(); select! { _i1 = rx1.recv() => {}, _i2 = rx2.recv() => panic!() } tx3.send(()).unwrap(); t.join().unwrap(); } #[test] fn cloning3() { let (tx1, rx1) = channel::<()>(); let (tx2, rx2) = channel::<()>(); let (tx3, rx3) = channel::<()>(); let t = thread::spawn(move || { select! { _ = rx1.recv() => panic!(), _ = rx2.recv() => {} } tx3.send(()).unwrap(); }); for _ in 0..1000 { thread::yield_now(); } drop(tx1.clone()); tx2.send(()).unwrap(); rx3.recv().unwrap(); t.join().unwrap(); } #[test] fn preflight1() { let (tx, rx) = channel(); tx.send(()).unwrap(); select! { _n = rx.recv() => {} } } #[test] fn preflight2() { let (tx, rx) = channel(); tx.send(()).unwrap(); tx.send(()).unwrap(); select! { _n = rx.recv() => {} } } #[test] fn preflight3() { let (tx, rx) = channel(); drop(tx.clone()); tx.send(()).unwrap(); select! { _n = rx.recv() => {} } } #[test] fn preflight4() { let (tx, rx) = channel(); tx.send(()).unwrap(); select! { _ = rx.recv() => {} } } #[test] fn preflight5() { let (tx, rx) = channel(); tx.send(()).unwrap(); tx.send(()).unwrap(); select! { _ = rx.recv() => {} } } #[test] fn preflight6() { let (tx, rx) = channel(); drop(tx.clone()); tx.send(()).unwrap(); select! { _ = rx.recv() => {} } } #[test] fn preflight7() { let (tx, rx) = channel::<()>(); drop(tx); select! { _ = rx.recv() => {} } } #[test] fn preflight8() { let (tx, rx) = channel(); tx.send(()).unwrap(); drop(tx); rx.recv().unwrap(); select! { _ = rx.recv() => {} } } #[test] fn preflight9() { let (tx, rx) = channel(); drop(tx.clone()); tx.send(()).unwrap(); drop(tx); rx.recv().unwrap(); select! { _ = rx.recv() => {} } } #[test] fn oneshot_data_waiting() { let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); let t = thread::spawn(move || { select! { _n = rx1.recv() => {} } tx2.send(()).unwrap(); }); for _ in 0..100 { thread::yield_now() } tx1.send(()).unwrap(); rx2.recv().unwrap(); t.join().unwrap(); } #[test] fn stream_data_waiting() { let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); tx1.send(()).unwrap(); tx1.send(()).unwrap(); rx1.recv().unwrap(); rx1.recv().unwrap(); let t = thread::spawn(move || { select! { _n = rx1.recv() => {} } tx2.send(()).unwrap(); }); for _ in 0..100 { thread::yield_now() } tx1.send(()).unwrap(); rx2.recv().unwrap(); t.join().unwrap(); } #[test] fn shared_data_waiting() { let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); drop(tx1.clone()); tx1.send(()).unwrap(); rx1.recv().unwrap(); let t = thread::spawn(move || { select! { _n = rx1.recv() => {} } tx2.send(()).unwrap(); }); for _ in 0..100 { thread::yield_now() } tx1.send(()).unwrap(); rx2.recv().unwrap(); t.join().unwrap(); } #[test] fn sync1() { let (tx, rx) = sync_channel::(1); tx.send(1).unwrap(); select! { n = rx.recv() => { assert_eq!(n.unwrap(), 1); } } } #[test] fn sync2() { let (tx, rx) = sync_channel::(0); let t = thread::spawn(move || { for _ in 0..100 { thread::yield_now() } tx.send(1).unwrap(); }); select! { n = rx.recv() => { assert_eq!(n.unwrap(), 1); } } t.join().unwrap(); } #[test] fn sync3() { let (tx1, rx1) = sync_channel::(0); let (tx2, rx2): (Sender, Receiver) = channel(); let t = thread::spawn(move || { tx1.send(1).unwrap(); }); let t2 = thread::spawn(move || { tx2.send(2).unwrap(); }); select! { n = rx1.recv() => { let n = n.unwrap(); assert_eq!(n, 1); assert_eq!(rx2.recv().unwrap(), 2); }, n = rx2.recv() => { let n = n.unwrap(); assert_eq!(n, 2); assert_eq!(rx1.recv().unwrap(), 1); } } t.join().unwrap(); t2.join().unwrap(); } }