• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![allow(clippy::redundant_clone)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4 
5 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
6 use wasm_bindgen_test::wasm_bindgen_test as test;
7 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
8 use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
9 
10 #[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
11 use tokio::test as maybe_tokio_test;
12 
13 use std::fmt;
14 use std::sync::Arc;
15 use tokio::sync::mpsc;
16 use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
17 use tokio_test::*;
18 
19 #[cfg(not(target_family = "wasm"))]
20 mod support {
21     pub(crate) mod mpsc_stream;
22 }
23 
24 trait AssertSend: Send {}
25 impl AssertSend for mpsc::Sender<i32> {}
26 impl AssertSend for mpsc::Receiver<i32> {}
27 
28 #[maybe_tokio_test]
send_recv_with_buffer()29 async fn send_recv_with_buffer() {
30     let (tx, mut rx) = mpsc::channel::<i32>(16);
31 
32     // Using poll_ready / try_send
33     // let permit assert_ready_ok!(tx.reserve());
34     let permit = tx.reserve().await.unwrap();
35     permit.send(1);
36 
37     // Without poll_ready
38     tx.try_send(2).unwrap();
39 
40     drop(tx);
41 
42     let val = rx.recv().await;
43     assert_eq!(val, Some(1));
44 
45     let val = rx.recv().await;
46     assert_eq!(val, Some(2));
47 
48     let val = rx.recv().await;
49     assert!(val.is_none());
50 }
51 
52 #[tokio::test]
53 #[cfg(feature = "full")]
reserve_disarm()54 async fn reserve_disarm() {
55     let (tx, mut rx) = mpsc::channel::<i32>(2);
56     let tx1 = tx.clone();
57     let tx2 = tx.clone();
58     let tx3 = tx.clone();
59     let tx4 = tx;
60 
61     // We should be able to `poll_ready` two handles without problem
62     let permit1 = assert_ok!(tx1.reserve().await);
63     let permit2 = assert_ok!(tx2.reserve().await);
64 
65     // But a third should not be ready
66     let mut r3 = tokio_test::task::spawn(tx3.reserve());
67     assert_pending!(r3.poll());
68 
69     let mut r4 = tokio_test::task::spawn(tx4.reserve());
70     assert_pending!(r4.poll());
71 
72     // Using one of the reserved slots should allow a new handle to become ready
73     permit1.send(1);
74 
75     // We also need to receive for the slot to be free
76     assert!(!r3.is_woken());
77     rx.recv().await.unwrap();
78     // Now there's a free slot!
79     assert!(r3.is_woken());
80     assert!(!r4.is_woken());
81 
82     // Dropping a permit should also open up a slot
83     drop(permit2);
84     assert!(r4.is_woken());
85 
86     let mut r1 = tokio_test::task::spawn(tx1.reserve());
87     assert_pending!(r1.poll());
88 }
89 
90 #[tokio::test]
91 #[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
send_recv_stream_with_buffer()92 async fn send_recv_stream_with_buffer() {
93     use tokio_stream::StreamExt;
94 
95     let (tx, rx) = support::mpsc_stream::channel_stream::<i32>(16);
96     let mut rx = Box::pin(rx);
97 
98     tokio::spawn(async move {
99         assert_ok!(tx.send(1).await);
100         assert_ok!(tx.send(2).await);
101     });
102 
103     assert_eq!(Some(1), rx.next().await);
104     assert_eq!(Some(2), rx.next().await);
105     assert_eq!(None, rx.next().await);
106 }
107 
108 #[tokio::test]
109 #[cfg(feature = "full")]
async_send_recv_with_buffer()110 async fn async_send_recv_with_buffer() {
111     let (tx, mut rx) = mpsc::channel(16);
112 
113     tokio::spawn(async move {
114         assert_ok!(tx.send(1).await);
115         assert_ok!(tx.send(2).await);
116     });
117 
118     assert_eq!(Some(1), rx.recv().await);
119     assert_eq!(Some(2), rx.recv().await);
120     assert_eq!(None, rx.recv().await);
121 }
122 
123 #[tokio::test]
124 #[cfg(feature = "full")]
start_send_past_cap()125 async fn start_send_past_cap() {
126     use std::future::Future;
127 
128     let mut t1 = tokio_test::task::spawn(());
129 
130     let (tx1, mut rx) = mpsc::channel(1);
131     let tx2 = tx1.clone();
132 
133     assert_ok!(tx1.try_send(()));
134 
135     let mut r1 = Box::pin(tx1.reserve());
136     t1.enter(|cx, _| assert_pending!(r1.as_mut().poll(cx)));
137 
138     {
139         let mut r2 = tokio_test::task::spawn(tx2.reserve());
140         assert_pending!(r2.poll());
141 
142         drop(r1);
143 
144         assert!(rx.recv().await.is_some());
145 
146         assert!(r2.is_woken());
147         assert!(!t1.is_woken());
148     }
149 
150     drop(tx1);
151     drop(tx2);
152 
153     assert!(rx.recv().await.is_none());
154 }
155 
156 #[test]
157 #[should_panic]
158 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
buffer_gteq_one()159 fn buffer_gteq_one() {
160     mpsc::channel::<i32>(0);
161 }
162 
163 #[maybe_tokio_test]
send_recv_unbounded()164 async fn send_recv_unbounded() {
165     let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
166 
167     // Using `try_send`
168     assert_ok!(tx.send(1));
169     assert_ok!(tx.send(2));
170 
171     assert_eq!(rx.recv().await, Some(1));
172     assert_eq!(rx.recv().await, Some(2));
173 
174     drop(tx);
175 
176     assert!(rx.recv().await.is_none());
177 }
178 
179 #[tokio::test]
180 #[cfg(feature = "full")]
async_send_recv_unbounded()181 async fn async_send_recv_unbounded() {
182     let (tx, mut rx) = mpsc::unbounded_channel();
183 
184     tokio::spawn(async move {
185         assert_ok!(tx.send(1));
186         assert_ok!(tx.send(2));
187     });
188 
189     assert_eq!(Some(1), rx.recv().await);
190     assert_eq!(Some(2), rx.recv().await);
191     assert_eq!(None, rx.recv().await);
192 }
193 
194 #[tokio::test]
195 #[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
send_recv_stream_unbounded()196 async fn send_recv_stream_unbounded() {
197     use tokio_stream::StreamExt;
198 
199     let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<i32>();
200 
201     let mut rx = Box::pin(rx);
202 
203     tokio::spawn(async move {
204         assert_ok!(tx.send(1));
205         assert_ok!(tx.send(2));
206     });
207 
208     assert_eq!(Some(1), rx.next().await);
209     assert_eq!(Some(2), rx.next().await);
210     assert_eq!(None, rx.next().await);
211 }
212 
213 #[maybe_tokio_test]
no_t_bounds_buffer()214 async fn no_t_bounds_buffer() {
215     struct NoImpls;
216 
217     let (tx, mut rx) = mpsc::channel(100);
218 
219     // sender should be Debug even though T isn't Debug
220     is_debug(&tx);
221     // same with Receiver
222     is_debug(&rx);
223     // and sender should be Clone even though T isn't Clone
224     assert!(tx.clone().try_send(NoImpls).is_ok());
225 
226     assert!(rx.recv().await.is_some());
227 }
228 
229 #[maybe_tokio_test]
no_t_bounds_unbounded()230 async fn no_t_bounds_unbounded() {
231     struct NoImpls;
232 
233     let (tx, mut rx) = mpsc::unbounded_channel();
234 
235     // sender should be Debug even though T isn't Debug
236     is_debug(&tx);
237     // same with Receiver
238     is_debug(&rx);
239     // and sender should be Clone even though T isn't Clone
240     assert!(tx.clone().send(NoImpls).is_ok());
241 
242     assert!(rx.recv().await.is_some());
243 }
244 
245 #[tokio::test]
246 #[cfg(feature = "full")]
send_recv_buffer_limited()247 async fn send_recv_buffer_limited() {
248     let (tx, mut rx) = mpsc::channel::<i32>(1);
249 
250     // Reserve capacity
251     let p1 = assert_ok!(tx.reserve().await);
252 
253     // Send first message
254     p1.send(1);
255 
256     // Not ready
257     let mut p2 = tokio_test::task::spawn(tx.reserve());
258     assert_pending!(p2.poll());
259 
260     // Take the value
261     assert!(rx.recv().await.is_some());
262 
263     // Notified
264     assert!(p2.is_woken());
265 
266     // Trying to send fails
267     assert_err!(tx.try_send(1337));
268 
269     // Send second
270     let permit = assert_ready_ok!(p2.poll());
271     permit.send(2);
272 
273     assert!(rx.recv().await.is_some());
274 }
275 
276 #[maybe_tokio_test]
recv_close_gets_none_idle()277 async fn recv_close_gets_none_idle() {
278     let (tx, mut rx) = mpsc::channel::<i32>(10);
279 
280     rx.close();
281 
282     assert!(rx.recv().await.is_none());
283 
284     assert_err!(tx.send(1).await);
285 }
286 
287 #[tokio::test]
288 #[cfg(feature = "full")]
recv_close_gets_none_reserved()289 async fn recv_close_gets_none_reserved() {
290     let (tx1, mut rx) = mpsc::channel::<i32>(1);
291     let tx2 = tx1.clone();
292 
293     let permit1 = assert_ok!(tx1.reserve().await);
294     let mut permit2 = tokio_test::task::spawn(tx2.reserve());
295     assert_pending!(permit2.poll());
296 
297     rx.close();
298 
299     assert!(permit2.is_woken());
300     assert_ready_err!(permit2.poll());
301 
302     {
303         let mut recv = tokio_test::task::spawn(rx.recv());
304         assert_pending!(recv.poll());
305 
306         permit1.send(123);
307         assert!(recv.is_woken());
308 
309         let v = assert_ready!(recv.poll());
310         assert_eq!(v, Some(123));
311     }
312 
313     assert!(rx.recv().await.is_none());
314 }
315 
316 #[maybe_tokio_test]
tx_close_gets_none()317 async fn tx_close_gets_none() {
318     let (_, mut rx) = mpsc::channel::<i32>(10);
319     assert!(rx.recv().await.is_none());
320 }
321 
322 #[maybe_tokio_test]
try_send_fail()323 async fn try_send_fail() {
324     let (tx, mut rx) = mpsc::channel(1);
325 
326     tx.try_send("hello").unwrap();
327 
328     // This should fail
329     match assert_err!(tx.try_send("fail")) {
330         TrySendError::Full(..) => {}
331         _ => panic!(),
332     }
333 
334     assert_eq!(rx.recv().await, Some("hello"));
335 
336     assert_ok!(tx.try_send("goodbye"));
337     drop(tx);
338 
339     assert_eq!(rx.recv().await, Some("goodbye"));
340     assert!(rx.recv().await.is_none());
341 }
342 
343 #[maybe_tokio_test]
try_send_fail_with_try_recv()344 async fn try_send_fail_with_try_recv() {
345     let (tx, mut rx) = mpsc::channel(1);
346 
347     tx.try_send("hello").unwrap();
348 
349     // This should fail
350     match assert_err!(tx.try_send("fail")) {
351         TrySendError::Full(..) => {}
352         _ => panic!(),
353     }
354 
355     assert_eq!(rx.try_recv(), Ok("hello"));
356 
357     assert_ok!(tx.try_send("goodbye"));
358     drop(tx);
359 
360     assert_eq!(rx.try_recv(), Ok("goodbye"));
361     assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
362 }
363 
364 #[maybe_tokio_test]
try_reserve_fails()365 async fn try_reserve_fails() {
366     let (tx, mut rx) = mpsc::channel(1);
367 
368     let permit = tx.try_reserve().unwrap();
369 
370     // This should fail
371     match assert_err!(tx.try_reserve()) {
372         TrySendError::Full(()) => {}
373         _ => panic!(),
374     }
375 
376     permit.send("foo");
377 
378     assert_eq!(rx.recv().await, Some("foo"));
379 
380     // Dropping permit releases the slot.
381     let permit = tx.try_reserve().unwrap();
382     drop(permit);
383 
384     let _permit = tx.try_reserve().unwrap();
385 }
386 
387 #[tokio::test]
388 #[cfg(feature = "full")]
drop_permit_releases_permit()389 async fn drop_permit_releases_permit() {
390     // poll_ready reserves capacity, ensure that the capacity is released if tx
391     // is dropped w/o sending a value.
392     let (tx1, _rx) = mpsc::channel::<i32>(1);
393     let tx2 = tx1.clone();
394 
395     let permit = assert_ok!(tx1.reserve().await);
396 
397     let mut reserve2 = tokio_test::task::spawn(tx2.reserve());
398     assert_pending!(reserve2.poll());
399 
400     drop(permit);
401 
402     assert!(reserve2.is_woken());
403     assert_ready_ok!(reserve2.poll());
404 }
405 
406 #[maybe_tokio_test]
dropping_rx_closes_channel()407 async fn dropping_rx_closes_channel() {
408     let (tx, rx) = mpsc::channel(100);
409 
410     let msg = Arc::new(());
411     assert_ok!(tx.try_send(msg.clone()));
412 
413     drop(rx);
414     assert_err!(tx.reserve().await);
415     assert_eq!(1, Arc::strong_count(&msg));
416 }
417 
418 #[test]
dropping_rx_closes_channel_for_try()419 fn dropping_rx_closes_channel_for_try() {
420     let (tx, rx) = mpsc::channel(100);
421 
422     let msg = Arc::new(());
423     tx.try_send(msg.clone()).unwrap();
424 
425     drop(rx);
426 
427     assert!(matches!(
428         tx.try_send(msg.clone()),
429         Err(TrySendError::Closed(_))
430     ));
431     assert!(matches!(tx.try_reserve(), Err(TrySendError::Closed(_))));
432     assert!(matches!(
433         tx.try_reserve_owned(),
434         Err(TrySendError::Closed(_))
435     ));
436 
437     assert_eq!(1, Arc::strong_count(&msg));
438 }
439 
440 #[test]
unconsumed_messages_are_dropped()441 fn unconsumed_messages_are_dropped() {
442     let msg = Arc::new(());
443 
444     let (tx, rx) = mpsc::channel(100);
445 
446     tx.try_send(msg.clone()).unwrap();
447 
448     assert_eq!(2, Arc::strong_count(&msg));
449 
450     drop((tx, rx));
451 
452     assert_eq!(1, Arc::strong_count(&msg));
453 }
454 
455 #[test]
456 #[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
blocking_recv()457 fn blocking_recv() {
458     let (tx, mut rx) = mpsc::channel::<u8>(1);
459 
460     let sync_code = std::thread::spawn(move || {
461         assert_eq!(Some(10), rx.blocking_recv());
462     });
463 
464     tokio::runtime::Runtime::new()
465         .unwrap()
466         .block_on(async move {
467             let _ = tx.send(10).await;
468         });
469     sync_code.join().unwrap()
470 }
471 
472 #[tokio::test]
473 #[should_panic]
474 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
blocking_recv_async()475 async fn blocking_recv_async() {
476     let (_tx, mut rx) = mpsc::channel::<()>(1);
477     let _ = rx.blocking_recv();
478 }
479 
480 #[test]
481 #[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
blocking_send()482 fn blocking_send() {
483     let (tx, mut rx) = mpsc::channel::<u8>(1);
484 
485     let sync_code = std::thread::spawn(move || {
486         tx.blocking_send(10).unwrap();
487     });
488 
489     tokio::runtime::Runtime::new()
490         .unwrap()
491         .block_on(async move {
492             assert_eq!(Some(10), rx.recv().await);
493         });
494     sync_code.join().unwrap()
495 }
496 
497 #[tokio::test]
498 #[should_panic]
499 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
blocking_send_async()500 async fn blocking_send_async() {
501     let (tx, _rx) = mpsc::channel::<()>(1);
502     let _ = tx.blocking_send(());
503 }
504 
505 #[tokio::test]
506 #[cfg(feature = "full")]
ready_close_cancel_bounded()507 async fn ready_close_cancel_bounded() {
508     let (tx, mut rx) = mpsc::channel::<()>(100);
509     let _tx2 = tx.clone();
510 
511     let permit = assert_ok!(tx.reserve().await);
512 
513     rx.close();
514 
515     let mut recv = tokio_test::task::spawn(rx.recv());
516     assert_pending!(recv.poll());
517 
518     drop(permit);
519 
520     assert!(recv.is_woken());
521     let val = assert_ready!(recv.poll());
522     assert!(val.is_none());
523 }
524 
525 #[tokio::test]
526 #[cfg(feature = "full")]
permit_available_not_acquired_close()527 async fn permit_available_not_acquired_close() {
528     let (tx1, mut rx) = mpsc::channel::<()>(1);
529     let tx2 = tx1.clone();
530 
531     let permit1 = assert_ok!(tx1.reserve().await);
532 
533     let mut permit2 = tokio_test::task::spawn(tx2.reserve());
534     assert_pending!(permit2.poll());
535 
536     rx.close();
537 
538     drop(permit1);
539     assert!(permit2.is_woken());
540 
541     drop(permit2);
542     assert!(rx.recv().await.is_none());
543 }
544 
545 #[test]
try_recv_bounded()546 fn try_recv_bounded() {
547     let (tx, mut rx) = mpsc::channel(5);
548 
549     tx.try_send("hello").unwrap();
550     tx.try_send("hello").unwrap();
551     tx.try_send("hello").unwrap();
552     tx.try_send("hello").unwrap();
553     tx.try_send("hello").unwrap();
554     assert!(tx.try_send("hello").is_err());
555 
556     assert_eq!(Ok("hello"), rx.try_recv());
557     assert_eq!(Ok("hello"), rx.try_recv());
558     assert_eq!(Ok("hello"), rx.try_recv());
559     assert_eq!(Ok("hello"), rx.try_recv());
560     assert_eq!(Ok("hello"), rx.try_recv());
561     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
562 
563     tx.try_send("hello").unwrap();
564     tx.try_send("hello").unwrap();
565     tx.try_send("hello").unwrap();
566     tx.try_send("hello").unwrap();
567     assert_eq!(Ok("hello"), rx.try_recv());
568     tx.try_send("hello").unwrap();
569     tx.try_send("hello").unwrap();
570     assert!(tx.try_send("hello").is_err());
571     assert_eq!(Ok("hello"), rx.try_recv());
572     assert_eq!(Ok("hello"), rx.try_recv());
573     assert_eq!(Ok("hello"), rx.try_recv());
574     assert_eq!(Ok("hello"), rx.try_recv());
575     assert_eq!(Ok("hello"), rx.try_recv());
576     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
577 
578     tx.try_send("hello").unwrap();
579     tx.try_send("hello").unwrap();
580     tx.try_send("hello").unwrap();
581     drop(tx);
582     assert_eq!(Ok("hello"), rx.try_recv());
583     assert_eq!(Ok("hello"), rx.try_recv());
584     assert_eq!(Ok("hello"), rx.try_recv());
585     assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
586 }
587 
588 #[test]
try_recv_unbounded()589 fn try_recv_unbounded() {
590     for num in 0..100 {
591         let (tx, mut rx) = mpsc::unbounded_channel();
592 
593         for i in 0..num {
594             tx.send(i).unwrap();
595         }
596 
597         for i in 0..num {
598             assert_eq!(rx.try_recv(), Ok(i));
599         }
600 
601         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
602         drop(tx);
603         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
604     }
605 }
606 
607 #[test]
try_recv_close_while_empty_bounded()608 fn try_recv_close_while_empty_bounded() {
609     let (tx, mut rx) = mpsc::channel::<()>(5);
610 
611     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
612     drop(tx);
613     assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
614 }
615 
616 #[test]
try_recv_close_while_empty_unbounded()617 fn try_recv_close_while_empty_unbounded() {
618     let (tx, mut rx) = mpsc::unbounded_channel::<()>();
619 
620     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
621     drop(tx);
622     assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
623 }
624 
625 #[tokio::test(start_paused = true)]
626 #[cfg(feature = "full")]
recv_timeout()627 async fn recv_timeout() {
628     use tokio::sync::mpsc::error::SendTimeoutError::{Closed, Timeout};
629     use tokio::time::Duration;
630 
631     let (tx, rx) = mpsc::channel(5);
632 
633     assert_eq!(tx.send_timeout(10, Duration::from_secs(1)).await, Ok(()));
634     assert_eq!(tx.send_timeout(20, Duration::from_secs(1)).await, Ok(()));
635     assert_eq!(tx.send_timeout(30, Duration::from_secs(1)).await, Ok(()));
636     assert_eq!(tx.send_timeout(40, Duration::from_secs(1)).await, Ok(()));
637     assert_eq!(tx.send_timeout(50, Duration::from_secs(1)).await, Ok(()));
638     assert_eq!(
639         tx.send_timeout(60, Duration::from_secs(1)).await,
640         Err(Timeout(60))
641     );
642 
643     drop(rx);
644     assert_eq!(
645         tx.send_timeout(70, Duration::from_secs(1)).await,
646         Err(Closed(70))
647     );
648 }
649 
650 #[test]
651 #[should_panic = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"]
652 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
recv_timeout_panic()653 fn recv_timeout_panic() {
654     use futures::future::FutureExt;
655     use tokio::time::Duration;
656 
657     let (tx, _rx) = mpsc::channel(5);
658     tx.send_timeout(10, Duration::from_secs(1)).now_or_never();
659 }
660 
661 // Tests that channel `capacity` changes and `max_capacity` stays the same
662 #[tokio::test]
test_tx_capacity()663 async fn test_tx_capacity() {
664     let (tx, _rx) = mpsc::channel::<()>(10);
665     // both capacities are same before
666     assert_eq!(tx.capacity(), 10);
667     assert_eq!(tx.max_capacity(), 10);
668 
669     let _permit = tx.reserve().await.unwrap();
670     // after reserve, only capacity should drop by one
671     assert_eq!(tx.capacity(), 9);
672     assert_eq!(tx.max_capacity(), 10);
673 
674     tx.send(()).await.unwrap();
675     // after send, capacity should drop by one again
676     assert_eq!(tx.capacity(), 8);
677     assert_eq!(tx.max_capacity(), 10);
678 }
679 
is_debug<T: fmt::Debug>(_: &T)680 fn is_debug<T: fmt::Debug>(_: &T) {}
681