1 //! Tests copied from `std::sync::mpsc`.
2 //!
3 //! This is a copy of tests for the `std::sync::mpsc` channels from the standard library, but
4 //! modified to work with `crossbeam-channel` instead.
5 //!
6 //! Minor tweaks were needed to make the tests compile:
7 //!
8 //! - Replace `box` syntax with `Box::new`.
9 //! - Replace all uses of `Select` with `select!`.
10 //! - Change the imports.
11 //! - Join all spawned threads.
12 //! - Removed assertion from oneshot_multi_thread_send_close_stress tests.
13 //!
14 //! Source:
15 //! - https://github.com/rust-lang/rust/tree/master/src/libstd/sync/mpsc
16 //!
17 //! Copyright & License:
18 //! - Copyright 2013-2014 The Rust Project Developers
19 //! - Apache License, Version 2.0 or MIT license, at your option
20 //! - https://github.com/rust-lang/rust/blob/master/COPYRIGHT
21 //! - https://www.rust-lang.org/en-US/legal.html
22
23 #![allow(
24 clippy::drop_copy,
25 clippy::match_single_binding,
26 clippy::redundant_clone
27 )]
28
29 use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError};
30 use std::sync::mpsc::{SendError, TrySendError};
31 use std::thread::JoinHandle;
32 use std::time::Duration;
33
34 use crossbeam_channel as cc;
35
36 pub struct Sender<T> {
37 pub inner: cc::Sender<T>,
38 }
39
40 impl<T> Sender<T> {
send(&self, t: T) -> Result<(), SendError<T>>41 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
42 self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
43 }
44 }
45
46 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>47 fn clone(&self) -> Sender<T> {
48 Sender {
49 inner: self.inner.clone(),
50 }
51 }
52 }
53
54 pub struct SyncSender<T> {
55 pub inner: cc::Sender<T>,
56 }
57
58 impl<T> SyncSender<T> {
send(&self, t: T) -> Result<(), SendError<T>>59 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
60 self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
61 }
62
try_send(&self, t: T) -> Result<(), TrySendError<T>>63 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
64 self.inner.try_send(t).map_err(|err| match err {
65 cc::TrySendError::Full(m) => TrySendError::Full(m),
66 cc::TrySendError::Disconnected(m) => TrySendError::Disconnected(m),
67 })
68 }
69 }
70
71 impl<T> Clone for SyncSender<T> {
clone(&self) -> SyncSender<T>72 fn clone(&self) -> SyncSender<T> {
73 SyncSender {
74 inner: self.inner.clone(),
75 }
76 }
77 }
78
79 pub struct Receiver<T> {
80 pub inner: cc::Receiver<T>,
81 }
82
83 impl<T> Receiver<T> {
try_recv(&self) -> Result<T, TryRecvError>84 pub fn try_recv(&self) -> Result<T, TryRecvError> {
85 self.inner.try_recv().map_err(|err| match err {
86 cc::TryRecvError::Empty => TryRecvError::Empty,
87 cc::TryRecvError::Disconnected => TryRecvError::Disconnected,
88 })
89 }
90
recv(&self) -> Result<T, RecvError>91 pub fn recv(&self) -> Result<T, RecvError> {
92 self.inner.recv().map_err(|_| RecvError)
93 }
94
recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>95 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
96 self.inner.recv_timeout(timeout).map_err(|err| match err {
97 cc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
98 cc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
99 })
100 }
101
iter(&self) -> Iter<T>102 pub fn iter(&self) -> Iter<T> {
103 Iter { inner: self }
104 }
105
try_iter(&self) -> TryIter<T>106 pub fn try_iter(&self) -> TryIter<T> {
107 TryIter { inner: self }
108 }
109 }
110
111 impl<'a, T> IntoIterator for &'a Receiver<T> {
112 type Item = T;
113 type IntoIter = Iter<'a, T>;
114
into_iter(self) -> Iter<'a, T>115 fn into_iter(self) -> Iter<'a, T> {
116 self.iter()
117 }
118 }
119
120 impl<T> IntoIterator for Receiver<T> {
121 type Item = T;
122 type IntoIter = IntoIter<T>;
123
into_iter(self) -> IntoIter<T>124 fn into_iter(self) -> IntoIter<T> {
125 IntoIter { inner: self }
126 }
127 }
128
129 pub struct TryIter<'a, T: 'a> {
130 inner: &'a Receiver<T>,
131 }
132
133 impl<'a, T> Iterator for TryIter<'a, T> {
134 type Item = T;
135
next(&mut self) -> Option<T>136 fn next(&mut self) -> Option<T> {
137 self.inner.try_recv().ok()
138 }
139 }
140
141 pub struct Iter<'a, T: 'a> {
142 inner: &'a Receiver<T>,
143 }
144
145 impl<'a, T> Iterator for Iter<'a, T> {
146 type Item = T;
147
next(&mut self) -> Option<T>148 fn next(&mut self) -> Option<T> {
149 self.inner.recv().ok()
150 }
151 }
152
153 pub struct IntoIter<T> {
154 inner: Receiver<T>,
155 }
156
157 impl<T> Iterator for IntoIter<T> {
158 type Item = T;
159
next(&mut self) -> Option<T>160 fn next(&mut self) -> Option<T> {
161 self.inner.recv().ok()
162 }
163 }
164
channel<T>() -> (Sender<T>, Receiver<T>)165 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
166 let (s, r) = cc::unbounded();
167 let s = Sender { inner: s };
168 let r = Receiver { inner: r };
169 (s, r)
170 }
171
sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>)172 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
173 let (s, r) = cc::bounded(bound);
174 let s = SyncSender { inner: s };
175 let r = Receiver { inner: r };
176 (s, r)
177 }
178
// Adapter that accepts the old `Select`-style arm syntax and forwards it to
// crossbeam's internal selection macro.
//
// Input: one or more comma-separated arms of the form
// `pattern = receiver_ident.method() => expression`.
// Each arm is forwarded as `method(receiver_ident.inner) -> res`; before the arm's
// expression runs, the result's error is replaced with
// `std::sync::mpsc::RecvError` and the result is bound to `pattern`.
macro_rules! select {
    (
        $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
    ) => ({
        cc::crossbeam_channel_internal! {
            $(
                // `.inner` unwraps the test-local `Receiver` to the crossbeam receiver.
                $meth(($rx).inner) -> res => {
                    let $name = res.map_err(|_| ::std::sync::mpsc::RecvError);
                    $code
                }
            )+
        }
    })
}
193
194 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
195 mod channel_tests {
196 use super::*;
197
198 use std::env;
199 use std::thread;
200 use std::time::{Duration, Instant};
201
/// Returns the multiplier used to scale the stress tests.
///
/// Reads the `RUST_TEST_STRESS` environment variable. When the variable cannot be
/// read (for example because it is unset) the factor is `1`.
///
/// # Panics
///
/// Panics with a message naming the variable and the offending value when the
/// variable is set but does not parse as a `usize`.
pub fn stress_factor() -> usize {
    match env::var("RUST_TEST_STRESS") {
        Ok(val) => val.parse::<usize>().unwrap_or_else(|err| {
            panic!(
                "RUST_TEST_STRESS must be an unsigned integer, got {:?}: {}",
                val, err
            )
        }),
        Err(..) => 1,
    }
}
208
/// A value sent on an unbounded channel is received unchanged on the same thread.
#[test]
fn smoke() {
    let (sender, receiver) = channel::<i32>();
    sender.send(1).unwrap();
    let got = receiver.recv().unwrap();
    assert_eq!(got, 1);
}
215
/// Both halves are dropped while a boxed message is still queued.
#[test]
fn drop_full() {
    let (sender, _receiver) = channel::<Box<isize>>();
    let payload = Box::new(1);
    sender.send(payload).unwrap();
}
221
/// Same as `drop_full`, but the sender is cloned (and the clones dropped) first.
#[test]
fn drop_full_shared() {
    let (sender, _receiver) = channel::<Box<isize>>();
    for _ in 0..2 {
        drop(sender.clone());
    }
    sender.send(Box::new(1)).unwrap();
}
229
/// A round trip works through the original sender and then through a clone of it.
#[test]
fn smoke_shared() {
    let (sender, receiver) = channel::<i32>();
    sender.send(1).unwrap();
    let first = receiver.recv().unwrap();
    assert_eq!(first, 1);

    let cloned = sender.clone();
    cloned.send(1).unwrap();
    let second = receiver.recv().unwrap();
    assert_eq!(second, 1);
}
239
/// A value sent from a spawned thread is received on the spawning thread.
#[test]
fn smoke_threads() {
    let (sender, receiver) = channel::<i32>();
    let producer = thread::spawn(move || sender.send(1).unwrap());
    let got = receiver.recv().unwrap();
    assert_eq!(got, 1);
    producer.join().unwrap();
}
249
/// Sending after the receiver has been dropped returns an error.
#[test]
fn smoke_port_gone() {
    let (sender, receiver) = channel::<i32>();
    drop(receiver);
    let outcome = sender.send(1);
    assert!(outcome.is_err());
}
256
/// Sending after the receiver has been dropped returns an error.
#[test]
fn smoke_shared_port_gone() {
    let (sender, receiver) = channel::<i32>();
    drop(receiver);
    let outcome = sender.send(1);
    assert!(outcome.is_err())
}
263
/// A cloned sender that outlives the original also fails once the receiver is gone.
#[test]
fn smoke_shared_port_gone2() {
    let (sender, receiver) = channel::<i32>();
    drop(receiver);
    let survivor = sender.clone();
    drop(sender);
    let outcome = survivor.send(1);
    assert!(outcome.is_err());
}
272
/// Keeps sending until the receiving thread, which takes one message and exits,
/// has dropped its receiver and sends start failing.
#[test]
fn port_gone_concurrent() {
    let (sender, receiver) = channel::<i32>();
    let consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        if sender.send(1).is_err() {
            break;
        }
    }
    consumer.join().unwrap();
}
282
/// Like `port_gone_concurrent`, alternating between a sender and its clone.
#[test]
fn port_gone_concurrent_shared() {
    let (sender, receiver) = channel::<i32>();
    let sender_clone = sender.clone();
    let consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        // Short-circuits exactly like the `&&` form: the clone is only tried
        // when the first send succeeded.
        if sender.send(1).is_err() || sender_clone.send(1).is_err() {
            break;
        }
    }
    consumer.join().unwrap();
}
293
/// Receiving after the only sender has been dropped returns an error.
#[test]
fn smoke_chan_gone() {
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
300
/// Receiving returns an error once the sender and its clone are both dropped.
#[test]
fn smoke_chan_gone_shared() {
    let (sender, receiver) = channel::<()>();
    let sender_clone = sender.clone();
    drop(sender);
    drop(sender_clone);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
309
/// Drains the channel until the sending thread has finished and `recv` fails.
#[test]
fn chan_gone_concurrent() {
    let (sender, receiver) = channel::<i32>();
    let producer = thread::spawn(move || {
        for _ in 0..2 {
            sender.send(1).unwrap();
        }
    });
    loop {
        if receiver.recv().is_err() {
            break;
        }
    }
    producer.join().unwrap();
}
320
/// One thread sends `COUNT` messages while the test thread receives and checks
/// each of them. `COUNT` is reduced under Miri.
#[test]
fn stress() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10000;

    let (tx, rx) = channel::<i32>();
    let t = thread::spawn(move || {
        for _ in 0..COUNT {
            tx.send(1).unwrap();
        }
    });
    for _ in 0..COUNT {
        assert_eq!(rx.recv().unwrap(), 1);
    }
    // Unwrap the join result directly (as the other tests do) so a panic in the
    // sender thread is reported as a join error instead of an opaque `None`.
    t.join().unwrap();
}
339
/// `nthreads` sender threads each send `amt` messages through clones of one
/// sender; a single receiver thread checks that exactly `amt * nthreads`
/// messages arrive. Both counts are reduced under Miri.
#[test]
fn stress_shared() {
    let amt: u32 = if cfg!(miri) { 100 } else { 10_000 };
    let nthreads: u32 = if cfg!(miri) { 4 } else { 8 };
    let (tx, rx) = channel::<i32>();

    let t = thread::spawn(move || {
        for _ in 0..amt * nthreads {
            assert_eq!(rx.recv().unwrap(), 1);
        }
        // Nothing beyond the expected messages may be queued.
        assert!(rx.try_recv().is_err());
    });

    let mut ts = Vec::with_capacity(nthreads as usize);
    for _ in 0..nthreads {
        let tx = tx.clone();
        let t = thread::spawn(move || {
            for _ in 0..amt {
                tx.send(1).unwrap();
            }
        });
        ts.push(t);
    }
    drop(tx);
    // Unwrap the join result directly (as the other tests do) so a panic in the
    // receiver thread is reported as a join error instead of an opaque `None`.
    t.join().unwrap();
    for t in ts {
        t.join().unwrap();
    }
}
369
/// A receiver thread signals that it has started, then a second thread sends it
/// 40 messages.
#[test]
fn send_from_outside_runtime() {
    let (tx1, rx1) = channel::<()>();
    let (tx2, rx2) = channel::<i32>();
    let t1 = thread::spawn(move || {
        tx1.send(()).unwrap();
        for _ in 0..40 {
            assert_eq!(rx2.recv().unwrap(), 1);
        }
    });
    // Wait for the receiver thread's start signal before spawning the sender.
    rx1.recv().unwrap();
    let t2 = thread::spawn(move || {
        for _ in 0..40 {
            tx2.send(1).unwrap();
        }
    });
    // Unwrap the join results directly (as the other tests do) so a panic in a
    // worker is reported as a join error instead of an opaque `None`.
    t1.join().unwrap();
    t2.join().unwrap();
}
389
/// The test thread sends 40 messages that a spawned thread receives and checks.
#[test]
fn recv_from_outside_runtime() {
    let (tx, rx) = channel::<i32>();
    let t = thread::spawn(move || {
        for _ in 0..40 {
            assert_eq!(rx.recv().unwrap(), 1);
        }
    });
    for _ in 0..40 {
        tx.send(1).unwrap();
    }
    // Unwrap the join result directly (as the other tests do) so a panic in the
    // receiver thread is reported as a join error instead of an opaque `None`.
    t.join().unwrap();
}
403
/// Two spawned threads exchange one message in each direction over two channels.
#[test]
fn no_runtime() {
    let (tx1, rx1) = channel::<i32>();
    let (tx2, rx2) = channel::<i32>();
    let t1 = thread::spawn(move || {
        assert_eq!(rx1.recv().unwrap(), 1);
        tx2.send(2).unwrap();
    });
    let t2 = thread::spawn(move || {
        tx1.send(1).unwrap();
        assert_eq!(rx2.recv().unwrap(), 2);
    });
    // Unwrap the join results directly (as the other tests do) so a panic in a
    // worker is reported as a join error instead of an opaque `None`.
    t1.join().unwrap();
    t2.join().unwrap();
}
419
/// Closes a channel without sending anything, receiver first.
#[test]
fn oneshot_single_thread_close_port_first() {
    let (_sender, receiver) = channel::<i32>();
    drop(receiver);
}
426
/// Closes a channel without sending anything, sender first.
#[test]
fn oneshot_single_thread_close_chan_first() {
    let (sender, _receiver) = channel::<i32>();
    drop(sender);
}
433
/// Sends a boxed payload after the receiver is closed; the send must fail (the
/// original test is about the sender cleaning up that payload).
#[test]
fn oneshot_single_thread_send_port_close() {
    let (sender, receiver) = channel::<Box<i32>>();
    drop(receiver);
    let outcome = sender.send(Box::new(0));
    assert!(outcome.is_err());
}
441
/// `recv` on a channel whose sender was dropped yields `RecvError`.
#[test]
fn oneshot_single_thread_recv_chan_close() {
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    let outcome = receiver.recv();
    assert_eq!(outcome, Err(RecvError));
}
448
/// A boxed value sent on the channel is received with the same contents.
#[test]
fn oneshot_single_thread_send_then_recv() {
    let (sender, receiver) = channel::<Box<i32>>();
    sender.send(Box::new(10)).unwrap();
    let boxed = receiver.recv().unwrap();
    assert!(*boxed == 10);
}
455
/// Sending on an open channel succeeds and the value can be received.
#[test]
fn oneshot_single_thread_try_send_open() {
    let (sender, receiver) = channel::<i32>();
    let outcome = sender.send(10);
    assert!(outcome.is_ok());
    let got = receiver.recv().unwrap();
    assert!(got == 10);
}
462
/// Sending on a channel whose receiver was dropped fails.
#[test]
fn oneshot_single_thread_try_send_closed() {
    let (sender, receiver) = channel::<i32>();
    drop(receiver);
    let outcome = sender.send(10);
    assert!(outcome.is_err());
}
469
/// A queued value is returned by `recv` as `Ok`.
#[test]
fn oneshot_single_thread_try_recv_open() {
    let (sender, receiver) = channel::<i32>();
    sender.send(10).unwrap();
    let outcome = receiver.recv();
    assert!(outcome == Ok(10));
}
476
/// `recv` fails once the sender has been dropped.
#[test]
fn oneshot_single_thread_try_recv_closed() {
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
483
/// `try_recv` reports `Empty` before a send and returns the value after it.
#[test]
fn oneshot_single_thread_peek_data() {
    let (sender, receiver) = channel::<i32>();
    let before = receiver.try_recv();
    assert_eq!(before, Err(TryRecvError::Empty));
    sender.send(10).unwrap();
    let after = receiver.try_recv();
    assert_eq!(after, Ok(10));
}
491
/// `try_recv` reports `Disconnected` repeatedly after the sender was dropped.
#[test]
fn oneshot_single_thread_peek_close() {
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    for _ in 0..2 {
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    }
}
499
/// `try_recv` on an open, empty channel reports `Empty`.
#[test]
fn oneshot_single_thread_peek_open() {
    let (_sender, receiver) = channel::<i32>();
    let outcome = receiver.try_recv();
    assert_eq!(outcome, Err(TryRecvError::Empty));
}
505
/// A spawned thread receives the boxed value that the test thread sends.
#[test]
fn oneshot_multi_task_recv_then_send() {
    let (sender, receiver) = channel::<Box<i32>>();
    let consumer = thread::spawn(move || {
        let boxed = receiver.recv().unwrap();
        assert!(*boxed == 10);
    });

    sender.send(Box::new(10)).unwrap();
    consumer.join().unwrap();
}
516
/// One thread drops the sender while another observes `RecvError` from `recv`.
#[test]
fn oneshot_multi_task_recv_then_close() {
    let (sender, receiver) = channel::<Box<i32>>();
    let closer = thread::spawn(move || drop(sender));
    let waiter = thread::spawn(move || {
        let outcome = receiver.recv();
        assert_eq!(outcome, Err(RecvError));
    });
    waiter.join().unwrap();
    closer.join().unwrap();
}
530
/// Repeatedly closes a channel from both ends at once: the receiver is dropped
/// on a spawned thread while the sender is dropped on the test thread.
#[test]
fn oneshot_multi_thread_close_stress() {
    let rounds = stress_factor();
    let mut handles = Vec::with_capacity(rounds);
    for _ in 0..rounds {
        let (sender, receiver) = channel::<i32>();
        handles.push(thread::spawn(move || drop(receiver)));
        drop(sender);
    }
    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
}
547
/// Repeatedly races a send against the receiver being dropped on another thread.
/// The send result is deliberately ignored.
#[test]
fn oneshot_multi_thread_send_close_stress() {
    let rounds = stress_factor();
    let mut handles = Vec::with_capacity(2 * rounds);
    for _ in 0..rounds {
        let (sender, receiver) = channel::<i32>();
        handles.push(thread::spawn(move || drop(receiver)));

        // The sending thread is joined right away, inside the loop.
        let send_thread = thread::spawn(move || {
            let _ = sender.send(1);
        });
        send_thread.join().unwrap();
    }
    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
}
568
/// Repeatedly races a blocking `recv` against the sender being dropped; each side
/// runs on a nested thread that its parent thread joins.
#[test]
fn oneshot_multi_thread_recv_close_stress() {
    let rounds = stress_factor();
    let mut handles = Vec::with_capacity(2 * rounds);
    for _ in 0..rounds {
        let (sender, receiver) = channel::<i32>();

        let recv_outer = thread::spawn(move || {
            let recv_inner = thread::spawn(move || {
                let outcome = receiver.recv();
                assert_eq!(outcome, Err(RecvError));
            });
            recv_inner.join().unwrap();
        });
        handles.push(recv_outer);

        let drop_outer = thread::spawn(move || {
            let drop_inner = thread::spawn(move || drop(sender));
            drop_inner.join().unwrap();
        });
        handles.push(drop_outer);
    }
    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
}
595
/// Repeatedly sends one boxed value from a spawned thread and receives it on the
/// test thread.
#[test]
fn oneshot_multi_thread_send_recv_stress() {
    let rounds = stress_factor();
    let mut handles = Vec::with_capacity(rounds);
    for _ in 0..rounds {
        let (sender, receiver) = channel::<Box<isize>>();
        handles.push(thread::spawn(move || {
            sender.send(Box::new(10)).unwrap();
        }));
        let boxed = receiver.recv().unwrap();
        assert!(*boxed == 10);
    }
    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
}
612
/// Streams the values `0..10` through a channel, with every send and every
/// receive performed on its own freshly spawned thread.
///
/// `send` and `recv` each spawn a thread that handles value `i` and then hands
/// its channel half to a newly spawned thread for `i + 1`. Each thread joins its
/// successor before finishing, so joining the first thread of a chain joins the
/// whole chain and a panic anywhere in it propagates to the test.
#[test]
fn stream_send_recv_stress() {
    let stress_factor = stress_factor();
    let mut ts = Vec::with_capacity(2 * stress_factor);
    for _ in 0..stress_factor {
        let (tx, rx) = channel();

        if let Some(t) = send(tx, 0) {
            ts.push(t);
        }
        if let Some(t2) = recv(rx, 0) {
            ts.push(t2);
        }

        /// Spawns the thread that sends `i`; returns `None` once `i` reaches 10.
        fn send(tx: Sender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
            if i == 10 {
                return None;
            }

            Some(thread::spawn(move || {
                tx.send(Box::new(i)).unwrap();
                // Join the successor instead of dropping its handle; otherwise
                // the nested thread is detached and its panics go unnoticed.
                if let Some(next) = send(tx, i + 1) {
                    next.join().unwrap();
                }
            }))
        }

        /// Spawns the thread that expects `i`; returns `None` once `i` reaches 10.
        fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
            if i == 10 {
                return None;
            }

            Some(thread::spawn(move || {
                assert!(*rx.recv().unwrap() == i);
                // Join the successor so a failed assertion further down the
                // chain fails the test.
                if let Some(next) = recv(rx, i + 1) {
                    next.join().unwrap();
                }
            }))
        }
    }
    for t in ts {
        t.join().unwrap();
    }
}
653
/// `recv_timeout` returns a queued message, times out on an empty channel, and
/// returns a message again after another send.
#[test]
fn oneshot_single_thread_recv_timeout() {
    let (sender, receiver) = channel();
    let tick = Duration::from_millis(1);

    sender.send(()).unwrap();
    assert_eq!(receiver.recv_timeout(tick), Ok(()));
    assert_eq!(receiver.recv_timeout(tick), Err(RecvTimeoutError::Timeout));

    sender.send(()).unwrap();
    assert_eq!(receiver.recv_timeout(tick), Ok(()));
}
666
/// A producer that sleeps for twice the timeout before every other send forces
/// the consumer through both the timeout path and the success path; every
/// message must still be counted.
#[test]
fn stress_recv_timeout_two_threads() {
    let (sender, receiver) = channel();
    let expected = stress_factor() + 100;
    let timeout = Duration::from_millis(100);

    let producer = thread::spawn(move || {
        for i in 0..expected {
            if i % 2 == 0 {
                thread::sleep(timeout * 2);
            }
            sender.send(1usize).unwrap();
        }
    });

    let mut received = 0;
    let mut connected = true;
    while connected {
        match receiver.recv_timeout(timeout) {
            Ok(n) => {
                assert_eq!(n, 1usize);
                received += 1;
            }
            // A timeout just means the producer is still sleeping: try again.
            Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => connected = false,
        }
    }

    assert_eq!(received, expected);
    producer.join().unwrap()
}
697
/// With a second sender alive, `recv_timeout` on an empty channel times out and
/// does so no earlier than the requested timeout.
#[test]
fn recv_timeout_upgrade() {
    let (sender, receiver) = channel::<()>();
    let timeout = Duration::from_millis(1);
    let _second_sender = sender.clone();

    let started = Instant::now();
    let outcome = receiver.recv_timeout(timeout);
    assert_eq!(outcome, Err(RecvTimeoutError::Timeout));
    assert!(started.elapsed() >= timeout);
}
708
/// Many sender threads, each delayed by a different amount, send one message
/// apiece; the receiver polls with a short timeout until every sender is gone
/// and checks the total.
#[test]
fn stress_recv_timeout_shared() {
    let (sender, receiver) = channel();
    let expected = stress_factor() + 100;

    let mut handles = Vec::with_capacity(expected);
    for i in 0..expected {
        let sender = sender.clone();
        handles.push(thread::spawn(move || {
            let delay = Duration::from_millis(i as u64 * 10);
            thread::sleep(delay);
            sender.send(1usize).unwrap();
        }));
    }

    // Only the clones remain, so the channel disconnects when the last worker ends.
    drop(sender);

    let mut received = 0;
    let mut connected = true;
    while connected {
        match receiver.recv_timeout(Duration::from_millis(10)) {
            Ok(n) => {
                assert_eq!(n, 1usize);
                received += 1;
            }
            Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => connected = false,
        }
    }

    assert_eq!(received, expected);
    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
}
743
/// Queues many messages and then receives them all (fewer under Miri).
#[test]
fn recv_a_lot() {
    let n: usize = if cfg!(miri) { 50 } else { 10000 };

    // Regression test that we don't run out of stack in scheduler context
    let (sender, receiver) = channel();
    (0..n).for_each(|_| sender.send(()).unwrap());
    (0..n).for_each(|_| receiver.recv().unwrap());
}
760
/// After messages from several cloned senders are drained, `recv_timeout` times
/// out on the empty channel and succeeds again after one more send.
#[test]
fn shared_recv_timeout() {
    let (sender, receiver) = channel();
    let total = 5;
    let mut handles = Vec::with_capacity(total);
    for _ in 0..total {
        let sender = sender.clone();
        handles.push(thread::spawn(move || {
            sender.send(()).unwrap();
        }));
    }

    for _ in 0..total {
        receiver.recv().unwrap();
    }

    let tick = Duration::from_millis(1);
    assert_eq!(receiver.recv_timeout(tick), Err(RecvTimeoutError::Timeout));
    sender.send(()).unwrap();
    assert_eq!(receiver.recv_timeout(tick), Ok(()));

    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
}
788
/// Many threads each send one message through a cloned sender; the test thread
/// receives exactly that many.
#[test]
fn shared_chan_stress() {
    let (sender, receiver) = channel();
    let total = stress_factor() + 100;
    let mut handles = Vec::with_capacity(total);
    for _ in 0..total {
        let sender = sender.clone();
        handles.push(thread::spawn(move || {
            sender.send(()).unwrap();
        }));
    }

    (0..total).for_each(|_| receiver.recv().unwrap());

    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
}
809
/// A worker sums everything yielded by `Receiver::iter` and reports the total
/// on a second channel once the sender has been dropped.
#[test]
fn test_nested_recv_iter() {
    let (sender, receiver) = channel::<i32>();
    let (total_tx, total_rx) = channel::<i32>();

    let worker = thread::spawn(move || {
        let acc: i32 = receiver.iter().sum();
        total_tx.send(acc).unwrap();
    });

    for value in [3, 1, 2].iter() {
        sender.send(*value).unwrap();
    }
    // Dropping the sender ends the worker's iteration.
    drop(sender);

    let total = total_rx.recv().unwrap();
    assert_eq!(total, 6);
    worker.join().unwrap();
}
830
/// A worker iterating with `Receiver::iter` breaks out once its running total
/// reaches 3 and reports that total on a second channel.
#[test]
fn test_recv_iter_break() {
    let (sender, receiver) = channel::<i32>();
    let (count_tx, count_rx) = channel();

    let worker = thread::spawn(move || {
        let mut count = 0;
        for x in receiver.iter() {
            // The check happens after an item has been taken from the channel.
            if count >= 3 {
                break;
            }
            count += x;
        }
        count_tx.send(count).unwrap();
    });

    for _ in 0..3 {
        sender.send(2).unwrap();
    }
    // The worker may already have exited, so this last send may fail.
    let _ = sender.send(2);
    drop(sender);

    let reported = count_rx.recv().unwrap();
    assert_eq!(reported, 4);
    worker.join().unwrap();
}
856
/// A worker drains responses with `try_iter` and asks for more whenever the
/// channel runs dry, until its running total reaches 6.
#[test]
fn test_recv_try_iter() {
    let (request_tx, request_rx) = channel();
    let (response_tx, response_rx) = channel();

    // Request `x`s until we have `6`.
    let worker = thread::spawn(move || {
        let mut total = 0;
        loop {
            for x in response_rx.try_iter() {
                total += x;
                if total == 6 {
                    return total;
                }
            }
            request_tx.send(()).unwrap();
        }
    });

    // Answer each request with a `2` until the worker hangs up on either channel.
    while let Ok(()) = request_rx.recv() {
        if response_tx.send(2).is_err() {
            break;
        }
    }

    let result = worker.join().unwrap();
    assert_eq!(result, 6);
}
884
/// The owning iterator yields the queued values and then ends, because the
/// sender went out of scope.
#[test]
fn test_recv_into_iter_owned() {
    let mut iter = {
        let (sender, receiver) = channel::<i32>();
        for value in 1..3 {
            sender.send(value).unwrap();
        }

        receiver.into_iter()
    };
    let first = iter.next().unwrap();
    assert_eq!(first, 1);
    let second = iter.next().unwrap();
    assert_eq!(second, 2);
    assert!(iter.next().is_none());
}
898
/// The borrowing iterator yields the queued values and then ends, because the
/// sender was dropped.
#[test]
fn test_recv_into_iter_borrowed() {
    let (sender, receiver) = channel::<i32>();
    for value in 1..3 {
        sender.send(value).unwrap();
    }
    drop(sender);

    let mut iter = IntoIterator::into_iter(&receiver);
    let first = iter.next().unwrap();
    assert_eq!(first, 1);
    let second = iter.next().unwrap();
    assert_eq!(second, 2);
    assert!(iter.next().is_none());
}
910
/// Steps a helper thread through "send one value" and "drop the sender" and
/// checks that `try_recv` reports `Empty`, `Ok`, `Empty` and `Disconnected` at
/// the matching points.
#[test]
fn try_recv_states() {
    let (data_tx, data_rx) = channel::<i32>();
    let (go_tx, go_rx) = channel::<()>();
    let (done_tx, done_rx) = channel::<()>();
    let helper = thread::spawn(move || {
        // Step 1: send one value.
        go_rx.recv().unwrap();
        data_tx.send(1).unwrap();
        done_tx.send(()).unwrap();
        // Step 2: disconnect the data channel.
        go_rx.recv().unwrap();
        drop(data_tx);
        done_tx.send(()).unwrap();
    });

    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Ok(1));
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Disconnected));

    helper.join().unwrap();
}
935
// This bug used to end up in a livelock inside of the Receiver destructor
// because the internal state of the Shared packet was corrupted
#[test]
fn destroy_upgraded_shared_port_when_sender_still_active() {
    let (sender, receiver) = channel();
    let (done_tx, done_rx) = channel();
    let child = thread::spawn(move || {
        receiver.recv().unwrap(); // wait on a oneshot
        drop(receiver); // destroy a shared
        done_tx.send(()).unwrap();
    });
    // make sure the other thread has gone to sleep
    (0..5000).for_each(|_| thread::yield_now());

    // upgrade to a shared chan and send a message
    let shared_sender = sender.clone();
    drop(sender);
    shared_sender.send(()).unwrap();

    // wait for the child thread to exit before we exit
    done_rx.recv().unwrap();
    child.join().unwrap();
}
961
/// With the receiver dropped at creation, a second failed send still hands the
/// message back inside `SendError`.
#[test]
fn issue_32114() {
    let (sender, _) = channel();
    let _ = sender.send(123);
    let second = sender.send(123);
    assert_eq!(second, Err(SendError(123)));
}
968 }
969
970 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
971 mod sync_channel_tests {
972 use super::*;
973
974 use std::env;
975 use std::thread;
976 use std::time::Duration;
977
/// Returns the multiplier used to scale the stress tests.
///
/// Reads the `RUST_TEST_STRESS` environment variable. When the variable cannot be
/// read (for example because it is unset) the factor is `1`.
///
/// # Panics
///
/// Panics with a message naming the variable and the offending value when the
/// variable is set but does not parse as a `usize`.
pub fn stress_factor() -> usize {
    match env::var("RUST_TEST_STRESS") {
        Ok(val) => val.parse::<usize>().unwrap_or_else(|err| {
            panic!(
                "RUST_TEST_STRESS must be an unsigned integer, got {:?}: {}",
                val, err
            )
        }),
        Err(..) => 1,
    }
}
984
/// A value sent on a capacity-1 channel is received unchanged on the same thread.
#[test]
fn smoke() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(1).unwrap();
    let got = receiver.recv().unwrap();
    assert_eq!(got, 1);
}
991
/// Both halves are dropped while the capacity-1 buffer still holds a boxed message.
#[test]
fn drop_full() {
    let (sender, _receiver) = sync_channel::<Box<isize>>(1);
    let payload = Box::new(1);
    sender.send(payload).unwrap();
}
997
/// A round trip works through the original sender and then through a clone of it.
#[test]
fn smoke_shared() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(1).unwrap();
    let first = receiver.recv().unwrap();
    assert_eq!(first, 1);

    let cloned = sender.clone();
    cloned.send(1).unwrap();
    let second = receiver.recv().unwrap();
    assert_eq!(second, 1);
}
1007
/// `recv_timeout` times out on an empty channel and returns the value after a send.
#[test]
fn recv_timeout() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let tick = Duration::from_millis(1);

    let empty = receiver.recv_timeout(tick);
    assert_eq!(empty, Err(RecvTimeoutError::Timeout));

    sender.send(1).unwrap();
    let filled = receiver.recv_timeout(tick);
    assert_eq!(filled, Ok(1));
}
1018
/// A value sent from a spawned thread over a zero-capacity channel is received
/// on the spawning thread.
#[test]
fn smoke_threads() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let producer = thread::spawn(move || sender.send(1).unwrap());
    let got = receiver.recv().unwrap();
    assert_eq!(got, 1);
    producer.join().unwrap();
}
1028
/// Sending after the receiver has been dropped returns an error.
#[test]
fn smoke_port_gone() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let outcome = sender.send(1);
    assert!(outcome.is_err());
}
1035
/// A cloned sender that outlives the original also fails once the receiver is gone.
#[test]
fn smoke_shared_port_gone2() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let survivor = sender.clone();
    drop(sender);
    let outcome = survivor.send(1);
    assert!(outcome.is_err());
}
1044
/// Keeps sending until the receiving thread, which takes one message and exits,
/// has dropped its receiver and sends start failing.
#[test]
fn port_gone_concurrent() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        if sender.send(1).is_err() {
            break;
        }
    }
    consumer.join().unwrap();
}
1054
/// Like `port_gone_concurrent`, alternating between a sender and its clone.
#[test]
fn port_gone_concurrent_shared() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let sender_clone = sender.clone();
    let consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        // Short-circuits exactly like the `&&` form: the clone is only tried
        // when the first send succeeded.
        if sender.send(1).is_err() || sender_clone.send(1).is_err() {
            break;
        }
    }
    consumer.join().unwrap();
}
1065
/// Receiving after the only sender has been dropped returns an error.
#[test]
fn smoke_chan_gone() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
1072
/// Receiving returns an error once the sender and its clone are both dropped.
#[test]
fn smoke_chan_gone_shared() {
    let (sender, receiver) = sync_channel::<()>(0);
    let sender_clone = sender.clone();
    drop(sender);
    drop(sender_clone);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
1081
/// Drains the channel until the sending thread has finished and `recv` fails.
#[test]
fn chan_gone_concurrent() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let producer = thread::spawn(move || {
        for _ in 0..2 {
            sender.send(1).unwrap();
        }
    });
    loop {
        if receiver.recv().is_err() {
            break;
        }
    }
    producer.join().unwrap();
}
1092
/// One thread sends `n` messages over a zero-capacity channel while the test
/// thread receives and checks each of them. `n` is reduced under Miri.
#[test]
fn stress() {
    let n: usize = if cfg!(miri) { 100 } else { 10000 };

    let (sender, receiver) = sync_channel::<i32>(0);
    let producer = thread::spawn(move || {
        (0..n).for_each(|_| sender.send(1).unwrap());
    });
    for _ in 0..n {
        let got = receiver.recv().unwrap();
        assert_eq!(got, 1);
    }
    producer.join().unwrap();
}
1111
/// A producer sends `n` messages over a zero-capacity channel while the consumer
/// polls with a 1 ms timeout until disconnection; every message must be counted.
#[test]
fn stress_recv_timeout_two_threads() {
    let n: usize = if cfg!(miri) { 100 } else { 10000 };

    let (sender, receiver) = sync_channel::<i32>(0);

    let producer = thread::spawn(move || {
        (0..n).for_each(|_| sender.send(1).unwrap());
    });

    let mut received = 0;
    let mut connected = true;
    while connected {
        match receiver.recv_timeout(Duration::from_millis(1)) {
            Ok(v) => {
                assert_eq!(v, 1);
                received += 1;
            }
            // A timeout is not a failure: poll again.
            Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => connected = false,
        }
    }

    assert_eq!(received, n);
    producer.join().unwrap();
}
1142
/// Eight producer threads share clones of one zero-capacity sender; a consumer
/// thread polls with `recv_timeout` until disconnection, checks the total and
/// signals completion on a second channel.
#[test]
fn stress_recv_timeout_shared() {
    let amt: u32 = if cfg!(miri) { 100 } else { 1000 };
    let nthreads: u32 = 8;
    let (sender, receiver) = sync_channel::<i32>(0);
    let (done_tx, done_rx) = sync_channel::<()>(0);

    let consumer = thread::spawn(move || {
        let mut received = 0;
        let mut connected = true;
        while connected {
            match receiver.recv_timeout(Duration::from_millis(10)) {
                Ok(v) => {
                    assert_eq!(v, 1);
                    received += 1;
                }
                Err(RecvTimeoutError::Timeout) => {}
                Err(RecvTimeoutError::Disconnected) => connected = false,
            }
        }

        assert_eq!(received, amt * nthreads);
        assert!(receiver.try_recv().is_err());

        done_tx.send(()).unwrap();
    });

    let mut producers = Vec::with_capacity(nthreads as usize);
    for _ in 0..nthreads {
        let sender = sender.clone();
        producers.push(thread::spawn(move || {
            for _ in 0..amt {
                sender.send(1).unwrap();
            }
        }));
    }

    // Only the clones remain, so the channel disconnects when the last producer ends.
    drop(sender);

    done_rx.recv().unwrap();
    for producer in producers {
        producer.join().unwrap();
    }
    consumer.join().unwrap();
}
1191
/// Eight producer threads share clones of one zero-capacity sender; a consumer
/// thread receives exactly `amt * nthreads` messages and signals completion on
/// a second channel.
#[test]
fn stress_shared() {
    let amt: u32 = if cfg!(miri) { 100 } else { 1000 };
    let nthreads: u32 = 8;
    let (sender, receiver) = sync_channel::<i32>(0);
    let (done_tx, done_rx) = sync_channel::<()>(0);

    let consumer = thread::spawn(move || {
        for _ in 0..amt * nthreads {
            let got = receiver.recv().unwrap();
            assert_eq!(got, 1);
        }
        // Nothing beyond the expected messages may be available.
        assert!(receiver.try_recv().is_err());
        done_tx.send(()).unwrap();
    });

    let mut producers = Vec::with_capacity(nthreads as usize);
    for _ in 0..nthreads {
        let sender = sender.clone();
        producers.push(thread::spawn(move || {
            for _ in 0..amt {
                sender.send(1).unwrap();
            }
        }));
    }
    drop(sender);
    done_rx.recv().unwrap();
    for producer in producers {
        producer.join().unwrap();
    }
    consumer.join().unwrap();
}
1227
/// Closes a zero-capacity channel without sending anything, receiver first.
#[test]
fn oneshot_single_thread_close_port_first() {
    let (_sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
}
1234
/// Closes a zero-capacity channel without sending anything, sender first.
#[test]
fn oneshot_single_thread_close_chan_first() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    drop(sender);
}
1241
/// Sends a boxed payload after the receiver is closed; the send must fail (the
/// original test is about the sender cleaning up that payload).
#[test]
fn oneshot_single_thread_send_port_close() {
    let (sender, receiver) = sync_channel::<Box<i32>>(0);
    drop(receiver);
    let outcome = sender.send(Box::new(0));
    assert!(outcome.is_err());
}
1249
/// `recv` on a channel whose sender was dropped yields `RecvError`.
#[test]
fn oneshot_single_thread_recv_chan_close() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    let outcome = receiver.recv();
    assert_eq!(outcome, Err(RecvError));
}
1256
/// A boxed value sent on a capacity-1 channel is received with the same contents.
#[test]
fn oneshot_single_thread_send_then_recv() {
    let (sender, receiver) = sync_channel::<Box<i32>>(1);
    sender.send(Box::new(10)).unwrap();
    let boxed = receiver.recv().unwrap();
    assert!(*boxed == 10);
}
1263
/// `try_send` succeeds on an open capacity-1 channel and the value can be received.
#[test]
fn oneshot_single_thread_try_send_open() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let outcome = sender.try_send(10);
    assert_eq!(outcome, Ok(()));
    let got = receiver.recv().unwrap();
    assert!(got == 10);
}
1270
/// `try_send` after the receiver is dropped must hand the value back as `Disconnected`.
#[test]
fn oneshot_single_thread_try_send_closed() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let attempt = sender.try_send(10);
    assert_eq!(attempt, Err(TrySendError::Disconnected(10)));
}
1277
/// `try_send` on a `sync_channel(0)` whose receiver is alive but not receiving must report `Full`.
#[test]
fn oneshot_single_thread_try_send_closed2() {
    // `_receiver` keeps the receiving half alive without ever calling `recv`.
    let (sender, _receiver) = sync_channel::<i32>(0);
    let attempt = sender.try_send(10);
    assert_eq!(attempt, Err(TrySendError::Full(10)));
}
1283
/// A value sent into a `sync_channel(1)` is returned by a subsequent `recv`.
#[test]
fn oneshot_single_thread_try_recv_open() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(10).unwrap();
    let received = receiver.recv();
    assert!(received == Ok(10));
}
1290
/// `recv` must fail once the only sender has been dropped without sending.
#[test]
fn oneshot_single_thread_try_recv_closed() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    let received = receiver.recv();
    assert!(received.is_err());
}
1297
/// A value sent before the sender was dropped is still delivered; only the next `try_recv`
/// reports `Disconnected`.
#[test]
fn oneshot_single_thread_try_recv_closed_with_data() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(10).unwrap();
    drop(sender);

    let first = receiver.try_recv();
    assert_eq!(first, Ok(10));
    let second = receiver.try_recv();
    assert_eq!(second, Err(TryRecvError::Disconnected));
}
1306
/// `try_recv` reports `Empty` before anything is sent and returns the value afterwards.
#[test]
fn oneshot_single_thread_peek_data() {
    let (sender, receiver) = sync_channel::<i32>(1);

    let before = receiver.try_recv();
    assert_eq!(before, Err(TryRecvError::Empty));

    sender.send(10).unwrap();

    let after = receiver.try_recv();
    assert_eq!(after, Ok(10));
}
1314
/// After the sender is dropped, `try_recv` reports `Disconnected` on every call.
#[test]
fn oneshot_single_thread_peek_close() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    // Ask twice: the answer must be the same on the repeated call.
    for _ in 0..2 {
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    }
}
1322
/// With the sender alive and nothing sent, `try_recv` reports `Empty`.
#[test]
fn oneshot_single_thread_peek_open() {
    // `_sender` keeps the sending half alive for the duration of the check.
    let (_sender, receiver) = sync_channel::<i32>(0);
    let attempt = receiver.try_recv();
    assert_eq!(attempt, Err(TryRecvError::Empty));
}
1328
/// A spawned thread receives the boxed value that the test thread sends over `sync_channel(0)`.
#[test]
fn oneshot_multi_task_recv_then_send() {
    let (sender, receiver) = sync_channel::<Box<i32>>(0);
    let consumer = thread::spawn(move || {
        let boxed = receiver.recv().unwrap();
        assert!(*boxed == 10);
    });

    sender.send(Box::new(10)).unwrap();
    consumer.join().unwrap();
}
1339
/// One thread drops the sender while another calls `recv`; the receive must end with
/// `Err(RecvError)`.
#[test]
fn oneshot_multi_task_recv_then_close() {
    let (sender, receiver) = sync_channel::<Box<i32>>(0);
    let closer = thread::spawn(move || drop(sender));
    let consumer = thread::spawn(move || {
        assert_eq!(receiver.recv(), Err(RecvError));
    });
    // Same join order as spawning the consumer and joining it immediately.
    consumer.join().unwrap();
    closer.join().unwrap();
}
1353
/// Repeats `stress_factor()` times: drop the receiver on a new thread while the test thread
/// drops the sender, then join every spawned thread.
#[test]
fn oneshot_multi_thread_close_stress() {
    let closers: Vec<_> = (0..stress_factor())
        .map(|_| {
            let (sender, receiver) = sync_channel::<i32>(0);
            let closer = thread::spawn(move || drop(receiver));
            drop(sender);
            closer
        })
        .collect();

    for closer in closers {
        closer.join().unwrap();
    }
}
1370
/// Repeats `stress_factor()` times: one thread drops the receiver while another attempts a
/// send whose result is deliberately ignored.
#[test]
fn oneshot_multi_thread_send_close_stress() {
    let rounds = stress_factor();
    let mut closers = Vec::with_capacity(rounds);
    for _ in 0..rounds {
        let (sender, receiver) = sync_channel::<i32>(0);
        closers.push(thread::spawn(move || drop(receiver)));

        // The sending thread is joined right away, before the next round starts.
        let producer = thread::spawn(move || {
            let _ = sender.send(1);
        });
        producer.join().unwrap();
    }
    for closer in closers {
        closer.join().unwrap();
    }
}
1391
/// Repeats `stress_factor()` times: a nested thread blocks in `recv` while another nested
/// thread drops the sender; the receive must end with `Err(RecvError)`.
///
/// Every thread, including both nested ones, is joined so that nothing outlives the test
/// and a panic in any of them fails it.
#[test]
fn oneshot_multi_thread_recv_close_stress() {
    let stress_factor = stress_factor();
    let mut ts = Vec::with_capacity(2 * stress_factor);
    for _ in 0..stress_factor {
        let (tx, rx) = sync_channel::<i32>(0);
        let t = thread::spawn(move || {
            thread::spawn(move || {
                assert_eq!(rx.recv(), Err(RecvError));
            })
            .join()
            .unwrap();
        });
        ts.push(t);
        let t2 = thread::spawn(move || {
            // Join the nested thread as well instead of leaving it detached.
            thread::spawn(move || {
                drop(tx);
            })
            .join()
            .unwrap();
        });
        ts.push(t2);
    }
    for t in ts {
        t.join().unwrap();
    }
}
1417
/// Repeats `stress_factor()` times: a spawned thread sends a boxed 10 and the test thread
/// receives and checks it.
#[test]
fn oneshot_multi_thread_send_recv_stress() {
    let rounds = stress_factor();
    let mut producers = Vec::with_capacity(rounds);
    for _ in 0..rounds {
        let (sender, receiver) = sync_channel::<Box<i32>>(0);
        producers.push(thread::spawn(move || {
            sender.send(Box::new(10)).unwrap();
        }));
        let boxed = receiver.recv().unwrap();
        assert!(*boxed == 10);
    }
    for producer in producers {
        producer.join().unwrap();
    }
}
1434
/// Streams the values `0..10` over a `sync_channel(0)` using one chain of sending threads and
/// one chain of receiving threads, repeated `stress_factor()` times.
///
/// Each link handles a single value, spawns its successor and joins it. Joining the head of a
/// chain therefore waits for the whole chain, and a failed `unwrap` or assertion in any link
/// propagates to the test instead of being lost in a detached thread.
#[test]
fn stream_send_recv_stress() {
    /// Spawns a thread that sends `i` and then continues the chain with `i + 1`.
    /// Returns `None` once `i` reaches 10.
    fn send(tx: SyncSender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
        if i == 10 {
            return None;
        }

        Some(thread::spawn(move || {
            tx.send(Box::new(i)).unwrap();
            // Join the rest of the chain so its panics are not silently dropped.
            if let Some(next) = send(tx, i + 1) {
                next.join().unwrap();
            }
        }))
    }

    /// Spawns a thread that expects to receive `i` and then continues the chain with `i + 1`.
    /// Returns `None` once `i` reaches 10.
    fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
        if i == 10 {
            return None;
        }

        Some(thread::spawn(move || {
            assert!(*rx.recv().unwrap() == i);
            // Join the rest of the chain so its panics are not silently dropped.
            if let Some(next) = recv(rx, i + 1) {
                next.join().unwrap();
            }
        }))
    }

    let stress_factor = stress_factor();
    let mut ts = Vec::with_capacity(2 * stress_factor);
    for _ in 0..stress_factor {
        let (tx, rx) = sync_channel::<Box<i32>>(0);

        if let Some(t) = send(tx, 0) {
            ts.push(t);
        }
        if let Some(t) = recv(rx, 0) {
            ts.push(t);
        }
    }
    for t in ts {
        t.join().unwrap();
    }
}
1475
/// Fills a `sync_channel(N)` with `N` unit messages and then drains all of them.
///
/// `N` is 100 under Miri and 10000 otherwise.
#[test]
fn recv_a_lot() {
    #[cfg(miri)]
    const N: usize = 100;
    #[cfg(not(miri))]
    const N: usize = 10000;

    // Regression test that we don't run out of stack in scheduler context
    let (sender, receiver) = sync_channel(N);
    (0..N).for_each(|_| sender.send(()).unwrap());
    (0..N).for_each(|_| receiver.recv().unwrap());
}
1492
/// Spawns `stress_factor() + 100` threads that each send one unit message through a clone of
/// the same sender; the test thread receives exactly that many messages.
#[test]
fn shared_chan_stress() {
    let (sender, receiver) = sync_channel(0);
    let total = stress_factor() + 100;

    let producers: Vec<_> = (0..total)
        .map(|_| {
            let sender = sender.clone();
            thread::spawn(move || {
                sender.send(()).unwrap();
            })
        })
        .collect();

    for _ in 0..total {
        receiver.recv().unwrap();
    }
    for producer in producers {
        producer.join().unwrap();
    }
}
1513
/// A worker sums everything yielded by `rx.iter()` until the sender is dropped and reports
/// the sum over a second channel; sending 3, 1 and 2 must yield 6.
#[test]
fn test_nested_recv_iter() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let (sum_tx, sum_rx) = sync_channel::<i32>(0);

    let worker = thread::spawn(move || {
        let mut sum = 0;
        for value in receiver.iter() {
            sum += value;
        }
        sum_tx.send(sum).unwrap();
    });

    for &value in &[3, 1, 2] {
        sender.send(value).unwrap();
    }
    // Dropping the sender is what lets the worker's loop finish.
    drop(sender);
    assert_eq!(sum_rx.recv().unwrap(), 6);
    worker.join().unwrap();
}
1534
/// A worker accumulates values from `rx.iter()` and breaks out once the running total has
/// reached 3; with 2s being sent, the reported total must be 4.
#[test]
fn test_recv_iter_break() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let (count_tx, count_rx) = sync_channel(0);

    let worker = thread::spawn(move || {
        let mut tally = 0;
        for value in receiver.iter() {
            if tally < 3 {
                tally += value;
            } else {
                break;
            }
        }
        count_tx.send(tally).unwrap();
    });

    for _ in 0..3 {
        sender.send(2).unwrap();
    }
    // Best-effort extra message; its result is intentionally ignored.
    let _ = sender.try_send(2);
    drop(sender);
    assert_eq!(count_rx.recv().unwrap(), 4);
    worker.join().unwrap();
}
1560
/// Walks `try_recv` through `Empty`, `Ok`, `Empty` again and finally `Disconnected`, using two
/// auxiliary channels to step a helper thread in lockstep with the test thread.
#[test]
fn try_recv_states() {
    let (data_tx, data_rx) = sync_channel::<i32>(1);
    let (go_tx, go_rx) = sync_channel::<()>(1);
    let (ack_tx, ack_rx) = sync_channel::<()>(1);
    let helper = thread::spawn(move || {
        // Step 1: wait for the go signal, publish one value, acknowledge.
        go_rx.recv().unwrap();
        data_tx.send(1).unwrap();
        ack_tx.send(()).unwrap();
        // Step 2: wait again, drop the data sender, acknowledge.
        go_rx.recv().unwrap();
        drop(data_tx);
        ack_tx.send(()).unwrap();
    });

    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));
    go_tx.send(()).unwrap();
    ack_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Ok(1));
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));
    go_tx.send(()).unwrap();
    ack_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Disconnected));
    helper.join().unwrap();
}
1585
// This bug used to end up in a livelock inside of the Receiver destructor
// because the internal state of the Shared packet was corrupted
/// The child receives one message, drops its receiver and signals completion, while the test
/// thread clones the sender, drops the original and sends through the clone.
#[test]
fn destroy_upgraded_shared_port_when_sender_still_active() {
    let (sender, receiver) = sync_channel::<()>(0);
    let (done_tx, done_rx) = sync_channel::<()>(0);
    let child = thread::spawn(move || {
        receiver.recv().unwrap(); // wait on a oneshot
        drop(receiver); // destroy a shared
        done_tx.send(()).unwrap();
    });
    // make sure the other thread has gone to sleep
    (0..5000).for_each(|_| thread::yield_now());

    // upgrade to a shared chan and send a message
    let cloned_sender = sender.clone();
    drop(sender);
    cloned_sender.send(()).unwrap();

    // wait for the child thread to exit before we exit
    done_rx.recv().unwrap();
    child.join().unwrap();
}
1611
/// `send` on a `sync_channel(0)` returns `Ok(())` when a spawned thread receives the value.
#[test]
fn send1() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    let outcome = sender.send(1);
    assert_eq!(outcome, Ok(()));
    consumer.join().unwrap();
}
1621
/// `send` on a `sync_channel(0)` fails when a spawned thread drops the receiver instead of
/// receiving.
#[test]
fn send2() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let closer = thread::spawn(move || drop(receiver));
    let outcome = sender.send(1);
    assert!(outcome.is_err());
    closer.join().unwrap();
}
1631
/// On a `sync_channel(1)`, the first `send` succeeds; a second `send` fails once a spawned
/// thread drops the receiver.
#[test]
fn send3() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let first = sender.send(1);
    assert_eq!(first, Ok(()));

    let closer = thread::spawn(move || drop(receiver));
    let second = sender.send(1);
    assert!(second.is_err());
    closer.join().unwrap();
}
1642
/// Two threads send through clones of the same sender on a `sync_channel(0)`; after the test
/// thread drops the receiver, both sends must fail. Each thread reports completion over a
/// separate `channel()`.
#[test]
fn send4() {
    let (sender_a, receiver) = sync_channel::<i32>(0);
    let sender_b = sender_a.clone();
    let (done_a, done_rx) = channel();
    let done_b = done_a.clone();

    let first = thread::spawn(move || {
        assert!(sender_a.send(1).is_err());
        done_a.send(()).unwrap();
    });
    let second = thread::spawn(move || {
        assert!(sender_b.send(2).is_err());
        done_b.send(()).unwrap();
    });

    drop(receiver);
    // One completion message per sending thread.
    for _ in 0..2 {
        done_rx.recv().unwrap();
    }
    first.join().unwrap();
    second.join().unwrap();
}
1663
/// `try_send` on a `sync_channel(0)` with an idle receiver hands the value back as `Full`.
#[test]
fn try_send1() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    let attempt = sender.try_send(1);
    assert_eq!(attempt, Err(TrySendError::Full(1)));
}
1669
/// On a `sync_channel(1)`, the first `try_send` succeeds and the second reports `Full`.
#[test]
fn try_send2() {
    let (sender, _receiver) = sync_channel::<i32>(1);
    let first = sender.try_send(1);
    assert_eq!(first, Ok(()));
    let second = sender.try_send(1);
    assert_eq!(second, Err(TrySendError::Full(1)));
}
1676
/// On a `sync_channel(1)`, `try_send` succeeds while the receiver exists and reports
/// `Disconnected` after the receiver is dropped.
#[test]
fn try_send3() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let first = sender.try_send(1);
    assert_eq!(first, Ok(()));
    drop(receiver);
    let second = sender.try_send(1);
    assert_eq!(second, Err(TrySendError::Disconnected(1)));
}
1684
/// Runs a two-channel ping-pong 100 times: the test thread `try_send`s on the first channel,
/// a helper thread receives it and `try_send`s on the second, and the test thread receives
/// that reply.
#[test]
fn issue_15761() {
    fn repro() {
        let (tx1, rx1) = sync_channel::<()>(3);
        let (tx2, rx2) = sync_channel::<()>(3);

        let t = thread::spawn(move || {
            rx1.recv().unwrap();
            tx2.try_send(()).unwrap();
        });

        tx1.try_send(()).unwrap();
        rx2.recv().unwrap();
        // Join the helper so it cannot outlive the iteration and its panics are reported.
        t.join().unwrap();
    }

    for _ in 0..100 {
        repro();
    }
}
1704 }
1705
1706 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/select.rs
/// Tests that drive the `select!` macro over receivers created by `channel` and
/// `sync_channel`.
mod select_tests {
    use super::*;

    use std::thread;

    /// Selects over two receivers as messages arrive and as each sender is dropped in turn.
    #[test]
    fn smoke() {
        let (first_tx, first_rx) = channel::<i32>();
        let (second_tx, second_rx) = channel::<i32>();

        first_tx.send(1).unwrap();
        select! {
            got = first_rx.recv() => assert_eq!(got.unwrap(), 1),
            _other = second_rx.recv() => panic!()
        }

        second_tx.send(2).unwrap();
        select! {
            _other = first_rx.recv() => panic!(),
            got = second_rx.recv() => assert_eq!(got.unwrap(), 2)
        }

        drop(first_tx);
        select! {
            got = first_rx.recv() => assert!(got.is_err()),
            _other = second_rx.recv() => panic!()
        }

        drop(second_tx);
        select! {
            got = second_rx.recv() => assert!(got.is_err())
        }
    }

    /// With five receivers and a message only on the last one, the last arm must fire.
    #[test]
    fn smoke2() {
        let (_idle_tx1, idle_rx1) = channel::<i32>();
        let (_idle_tx2, idle_rx2) = channel::<i32>();
        let (_idle_tx3, idle_rx3) = channel::<i32>();
        let (_idle_tx4, idle_rx4) = channel::<i32>();
        let (live_tx, live_rx) = channel::<i32>();
        live_tx.send(4).unwrap();
        select! {
            _unexpected = idle_rx1.recv() => panic!("1"),
            _unexpected = idle_rx2.recv() => panic!("2"),
            _unexpected = idle_rx3.recv() => panic!("3"),
            _unexpected = idle_rx4.recv() => panic!("4"),
            got = live_rx.recv() => assert_eq!(got.unwrap(), 4)
        }
    }

    /// A receiver whose sender was dropped makes its arm fire with an error.
    #[test]
    fn closed() {
        let (_open_tx, open_rx) = channel::<i32>();
        let (closed_tx, closed_rx) = channel::<i32>();
        drop(closed_tx);

        select! {
            _unexpected = open_rx.recv() => panic!(),
            got = closed_rx.recv() => assert!(got.is_err())
        }
    }

    /// A select is completed first by a message from a helper thread and later by that
    /// thread's sender going away.
    #[test]
    fn unblocks() {
        let (data_tx, data_rx) = channel::<i32>();
        let (_idle_tx, idle_rx) = channel::<i32>();
        let (resume_tx, resume_rx) = channel::<i32>();

        let helper = thread::spawn(move || {
            (0..20).for_each(|_| thread::yield_now());
            data_tx.send(1).unwrap();
            resume_rx.recv().unwrap();
            (0..20).for_each(|_| thread::yield_now());
        });

        select! {
            got = data_rx.recv() => assert_eq!(got.unwrap(), 1),
            _unexpected = idle_rx.recv() => panic!()
        }
        resume_tx.send(1).unwrap();
        select! {
            got = data_rx.recv() => assert!(got.is_err()),
            _unexpected = idle_rx.recv() => panic!()
        }
        helper.join().unwrap();
    }

    /// Two selects consume one message from each of two receivers; afterwards both are empty.
    #[test]
    fn both_ready() {
        let (first_tx, first_rx) = channel::<i32>();
        let (second_tx, second_rx) = channel::<i32>();
        let (finish_tx, finish_rx) = channel::<()>();

        let helper = thread::spawn(move || {
            (0..20).for_each(|_| thread::yield_now());
            first_tx.send(1).unwrap();
            second_tx.send(2).unwrap();
            // Keep both senders alive until the test thread has checked for emptiness.
            finish_rx.recv().unwrap();
        });

        select! {
            got = first_rx.recv() => { assert_eq!(got.unwrap(), 1); },
            got = second_rx.recv() => { assert_eq!(got.unwrap(), 2); }
        }
        select! {
            got = first_rx.recv() => { assert_eq!(got.unwrap(), 1); },
            got = second_rx.recv() => { assert_eq!(got.unwrap(), 2); }
        }
        assert_eq!(first_rx.try_recv(), Err(TryRecvError::Empty));
        assert_eq!(second_rx.try_recv(), Err(TryRecvError::Empty));
        finish_tx.send(()).unwrap();
        helper.join().unwrap();
    }

    /// Alternates messages between two channels in lockstep and checks that each select
    /// fires the arm matching the parity of the current index.
    #[test]
    fn stress() {
        #[cfg(miri)]
        const AMT: i32 = 100;
        #[cfg(not(miri))]
        const AMT: i32 = 10000;

        let (even_tx, even_rx) = channel::<i32>();
        let (odd_tx, odd_rx) = channel::<i32>();
        let (ack_tx, ack_rx) = channel::<()>();

        let producer = thread::spawn(move || {
            for step in 0..AMT {
                if step % 2 == 0 {
                    even_tx.send(step).unwrap();
                } else {
                    odd_tx.send(step).unwrap();
                }
                // Wait for the consumer before producing the next value.
                ack_rx.recv().unwrap();
            }
        });

        for step in 0..AMT {
            select! {
                even = even_rx.recv() => { assert!(step % 2 == 0 && step == even.unwrap()); },
                odd = odd_rx.recv() => { assert!(step % 2 == 1 && step == odd.unwrap()); }
            }
            ack_tx.send(()).unwrap();
        }
        producer.join().unwrap();
    }

    /// A helper thread clones (and immediately discards) a sender before sending; the select
    /// on the test thread must still see the message on the first receiver.
    #[allow(unused_must_use)]
    #[test]
    fn cloning() {
        let (data_tx, data_rx) = channel::<i32>();
        let (_idle_tx, idle_rx) = channel::<i32>();
        let (step_tx, step_rx) = channel::<()>();

        let helper = thread::spawn(move || {
            step_rx.recv().unwrap();
            data_tx.clone();
            assert_eq!(step_rx.try_recv(), Err(TryRecvError::Empty));
            data_tx.send(2).unwrap();
            step_rx.recv().unwrap();
        });

        step_tx.send(()).unwrap();
        select! {
            _got = data_rx.recv() => {},
            _unexpected = idle_rx.recv() => panic!()
        }
        step_tx.send(()).unwrap();
        helper.join().unwrap();
    }

    /// Same scenario as `cloning`: clone and discard a sender on a helper thread, then send.
    #[allow(unused_must_use)]
    #[test]
    fn cloning2() {
        let (data_tx, data_rx) = channel::<i32>();
        let (_idle_tx, idle_rx) = channel::<i32>();
        let (step_tx, step_rx) = channel::<()>();

        let helper = thread::spawn(move || {
            step_rx.recv().unwrap();
            data_tx.clone();
            assert_eq!(step_rx.try_recv(), Err(TryRecvError::Empty));
            data_tx.send(2).unwrap();
            step_rx.recv().unwrap();
        });

        step_tx.send(()).unwrap();
        select! {
            _got = data_rx.recv() => {},
            _unexpected = idle_rx.recv() => panic!()
        }
        step_tx.send(()).unwrap();
        helper.join().unwrap();
    }

    /// Cloning and dropping a sender of the first channel must not fire its arm; the select
    /// completes through the message on the second channel.
    #[test]
    fn cloning3() {
        let (quiet_tx, quiet_rx) = channel::<()>();
        let (data_tx, data_rx) = channel::<()>();
        let (done_tx, done_rx) = channel::<()>();
        let helper = thread::spawn(move || {
            select! {
                _ = quiet_rx.recv() => panic!(),
                _ = data_rx.recv() => {}
            }
            done_tx.send(()).unwrap();
        });

        (0..1000).for_each(|_| thread::yield_now());
        drop(quiet_tx.clone());
        data_tx.send(()).unwrap();
        done_rx.recv().unwrap();
        helper.join().unwrap();
    }

    /// Select with a named binding when one message is already queued.
    #[test]
    fn preflight1() {
        let (sender, receiver) = channel();
        sender.send(()).unwrap();
        select! {
            _msg = receiver.recv() => {}
        }
    }

    /// Select with a named binding when two messages are already queued.
    #[test]
    fn preflight2() {
        let (sender, receiver) = channel();
        for _ in 0..2 {
            sender.send(()).unwrap();
        }
        select! {
            _msg = receiver.recv() => {}
        }
    }

    /// Select with a named binding after a sender clone was created and dropped.
    #[test]
    fn preflight3() {
        let (sender, receiver) = channel();
        drop(sender.clone());
        sender.send(()).unwrap();
        select! {
            _msg = receiver.recv() => {}
        }
    }

    /// Select with a wildcard binding when one message is already queued.
    #[test]
    fn preflight4() {
        let (sender, receiver) = channel();
        sender.send(()).unwrap();
        select! {
            _ = receiver.recv() => {}
        }
    }

    /// Select with a wildcard binding when two messages are already queued.
    #[test]
    fn preflight5() {
        let (sender, receiver) = channel();
        for _ in 0..2 {
            sender.send(()).unwrap();
        }
        select! {
            _ = receiver.recv() => {}
        }
    }

    /// Select with a wildcard binding after a sender clone was created and dropped.
    #[test]
    fn preflight6() {
        let (sender, receiver) = channel();
        drop(sender.clone());
        sender.send(()).unwrap();
        select! {
            _ = receiver.recv() => {}
        }
    }

    /// Select completes when the only sender was dropped without sending.
    #[test]
    fn preflight7() {
        let (sender, receiver) = channel::<()>();
        drop(sender);
        select! {
            _ = receiver.recv() => {}
        }
    }

    /// Select completes after the single queued message was consumed and the sender dropped.
    #[test]
    fn preflight8() {
        let (sender, receiver) = channel();
        sender.send(()).unwrap();
        drop(sender);
        receiver.recv().unwrap();
        select! {
            _ = receiver.recv() => {}
        }
    }

    /// Like `preflight8`, but a sender clone is created and dropped before sending.
    #[test]
    fn preflight9() {
        let (sender, receiver) = channel();
        drop(sender.clone());
        sender.send(()).unwrap();
        drop(sender);
        receiver.recv().unwrap();
        select! {
            _ = receiver.recv() => {}
        }
    }

    /// A thread waiting in select on a fresh channel completes when the message is sent later.
    #[test]
    fn oneshot_data_waiting() {
        let (data_tx, data_rx) = channel();
        let (done_tx, done_rx) = channel();
        let waiter = thread::spawn(move || {
            select! {
                _msg = data_rx.recv() => {}
            }
            done_tx.send(()).unwrap();
        });

        (0..100).for_each(|_| thread::yield_now());
        data_tx.send(()).unwrap();
        done_rx.recv().unwrap();
        waiter.join().unwrap();
    }

    /// Same as `oneshot_data_waiting`, but two messages are sent and received on the channel
    /// before the waiting thread starts.
    #[test]
    fn stream_data_waiting() {
        let (data_tx, data_rx) = channel();
        let (done_tx, done_rx) = channel();
        for _ in 0..2 {
            data_tx.send(()).unwrap();
        }
        for _ in 0..2 {
            data_rx.recv().unwrap();
        }
        let waiter = thread::spawn(move || {
            select! {
                _msg = data_rx.recv() => {}
            }
            done_tx.send(()).unwrap();
        });

        (0..100).for_each(|_| thread::yield_now());
        data_tx.send(()).unwrap();
        done_rx.recv().unwrap();
        waiter.join().unwrap();
    }

    /// Same as `oneshot_data_waiting`, but a sender clone is created and dropped and one
    /// message is exchanged before the waiting thread starts.
    #[test]
    fn shared_data_waiting() {
        let (data_tx, data_rx) = channel();
        let (done_tx, done_rx) = channel();
        drop(data_tx.clone());
        data_tx.send(()).unwrap();
        data_rx.recv().unwrap();
        let waiter = thread::spawn(move || {
            select! {
                _msg = data_rx.recv() => {}
            }
            done_tx.send(()).unwrap();
        });

        (0..100).for_each(|_| thread::yield_now());
        data_tx.send(()).unwrap();
        done_rx.recv().unwrap();
        waiter.join().unwrap();
    }

    /// Select over a `sync_channel(1)` receiver that already holds a message.
    #[test]
    fn sync1() {
        let (sender, receiver) = sync_channel::<i32>(1);
        sender.send(1).unwrap();
        select! {
            got = receiver.recv() => { assert_eq!(got.unwrap(), 1); }
        }
    }

    /// Select over a `sync_channel(0)` receiver whose message is sent by a thread that first
    /// yields 100 times.
    #[test]
    fn sync2() {
        let (sender, receiver) = sync_channel::<i32>(0);
        let producer = thread::spawn(move || {
            (0..100).for_each(|_| thread::yield_now());
            sender.send(1).unwrap();
        });
        select! {
            got = receiver.recv() => { assert_eq!(got.unwrap(), 1); }
        }
        producer.join().unwrap();
    }

    /// Selects over a `sync_channel(0)` receiver and a `channel()` receiver; whichever arm
    /// fires, the other receiver must then yield its own value.
    #[test]
    fn sync3() {
        let (sync_tx, sync_rx) = sync_channel::<i32>(0);
        let (async_tx, async_rx): (Sender<i32>, Receiver<i32>) = channel();
        let sync_producer = thread::spawn(move || {
            sync_tx.send(1).unwrap();
        });
        let async_producer = thread::spawn(move || {
            async_tx.send(2).unwrap();
        });
        select! {
            got = sync_rx.recv() => {
                let value = got.unwrap();
                assert_eq!(value, 1);
                assert_eq!(async_rx.recv().unwrap(), 2);
            },
            got = async_rx.recv() => {
                let value = got.unwrap();
                assert_eq!(value, 2);
                assert_eq!(sync_rx.recv().unwrap(), 1);
            }
        }
        sync_producer.join().unwrap();
        async_producer.join().unwrap();
    }
}
2130