• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![allow(clippy::redundant_clone)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4 
5 #[cfg(tokio_wasm_not_wasi)]
6 use wasm_bindgen_test::wasm_bindgen_test as test;
7 #[cfg(tokio_wasm_not_wasi)]
8 use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
9 
10 use std::fmt;
11 use std::sync::Arc;
12 use tokio::sync::mpsc;
13 use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
14 #[cfg(not(tokio_wasm_not_wasi))]
15 use tokio::test as maybe_tokio_test;
16 use tokio_test::*;
17 
18 #[cfg(not(tokio_wasm))]
19 mod support {
20     pub(crate) mod mpsc_stream;
21 }
22 
23 trait AssertSend: Send {}
24 impl AssertSend for mpsc::Sender<i32> {}
25 impl AssertSend for mpsc::Receiver<i32> {}
26 
27 #[maybe_tokio_test]
send_recv_with_buffer()28 async fn send_recv_with_buffer() {
29     let (tx, mut rx) = mpsc::channel::<i32>(16);
30 
31     // Using poll_ready / try_send
32     // let permit assert_ready_ok!(tx.reserve());
33     let permit = tx.reserve().await.unwrap();
34     permit.send(1);
35 
36     // Without poll_ready
37     tx.try_send(2).unwrap();
38 
39     drop(tx);
40 
41     let val = rx.recv().await;
42     assert_eq!(val, Some(1));
43 
44     let val = rx.recv().await;
45     assert_eq!(val, Some(2));
46 
47     let val = rx.recv().await;
48     assert!(val.is_none());
49 }
50 
51 #[tokio::test]
52 #[cfg(feature = "full")]
reserve_disarm()53 async fn reserve_disarm() {
54     let (tx, mut rx) = mpsc::channel::<i32>(2);
55     let tx1 = tx.clone();
56     let tx2 = tx.clone();
57     let tx3 = tx.clone();
58     let tx4 = tx;
59 
60     // We should be able to `poll_ready` two handles without problem
61     let permit1 = assert_ok!(tx1.reserve().await);
62     let permit2 = assert_ok!(tx2.reserve().await);
63 
64     // But a third should not be ready
65     let mut r3 = tokio_test::task::spawn(tx3.reserve());
66     assert_pending!(r3.poll());
67 
68     let mut r4 = tokio_test::task::spawn(tx4.reserve());
69     assert_pending!(r4.poll());
70 
71     // Using one of the reserved slots should allow a new handle to become ready
72     permit1.send(1);
73 
74     // We also need to receive for the slot to be free
75     assert!(!r3.is_woken());
76     rx.recv().await.unwrap();
77     // Now there's a free slot!
78     assert!(r3.is_woken());
79     assert!(!r4.is_woken());
80 
81     // Dropping a permit should also open up a slot
82     drop(permit2);
83     assert!(r4.is_woken());
84 
85     let mut r1 = tokio_test::task::spawn(tx1.reserve());
86     assert_pending!(r1.poll());
87 }
88 
89 #[tokio::test]
90 #[cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads
send_recv_stream_with_buffer()91 async fn send_recv_stream_with_buffer() {
92     use tokio_stream::StreamExt;
93 
94     let (tx, rx) = support::mpsc_stream::channel_stream::<i32>(16);
95     let mut rx = Box::pin(rx);
96 
97     tokio::spawn(async move {
98         assert_ok!(tx.send(1).await);
99         assert_ok!(tx.send(2).await);
100     });
101 
102     assert_eq!(Some(1), rx.next().await);
103     assert_eq!(Some(2), rx.next().await);
104     assert_eq!(None, rx.next().await);
105 }
106 
107 #[tokio::test]
108 #[cfg(feature = "full")]
async_send_recv_with_buffer()109 async fn async_send_recv_with_buffer() {
110     let (tx, mut rx) = mpsc::channel(16);
111 
112     tokio::spawn(async move {
113         assert_ok!(tx.send(1).await);
114         assert_ok!(tx.send(2).await);
115     });
116 
117     assert_eq!(Some(1), rx.recv().await);
118     assert_eq!(Some(2), rx.recv().await);
119     assert_eq!(None, rx.recv().await);
120 }
121 
122 #[tokio::test]
123 #[cfg(feature = "full")]
start_send_past_cap()124 async fn start_send_past_cap() {
125     use std::future::Future;
126 
127     let mut t1 = tokio_test::task::spawn(());
128 
129     let (tx1, mut rx) = mpsc::channel(1);
130     let tx2 = tx1.clone();
131 
132     assert_ok!(tx1.try_send(()));
133 
134     let mut r1 = Box::pin(tx1.reserve());
135     t1.enter(|cx, _| assert_pending!(r1.as_mut().poll(cx)));
136 
137     {
138         let mut r2 = tokio_test::task::spawn(tx2.reserve());
139         assert_pending!(r2.poll());
140 
141         drop(r1);
142 
143         assert!(rx.recv().await.is_some());
144 
145         assert!(r2.is_woken());
146         assert!(!t1.is_woken());
147     }
148 
149     drop(tx1);
150     drop(tx2);
151 
152     assert!(rx.recv().await.is_none());
153 }
154 
155 #[test]
156 #[should_panic]
157 #[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
buffer_gteq_one()158 fn buffer_gteq_one() {
159     mpsc::channel::<i32>(0);
160 }
161 
162 #[maybe_tokio_test]
send_recv_unbounded()163 async fn send_recv_unbounded() {
164     let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
165 
166     // Using `try_send`
167     assert_ok!(tx.send(1));
168     assert_ok!(tx.send(2));
169 
170     assert_eq!(rx.recv().await, Some(1));
171     assert_eq!(rx.recv().await, Some(2));
172 
173     drop(tx);
174 
175     assert!(rx.recv().await.is_none());
176 }
177 
178 #[tokio::test]
179 #[cfg(feature = "full")]
async_send_recv_unbounded()180 async fn async_send_recv_unbounded() {
181     let (tx, mut rx) = mpsc::unbounded_channel();
182 
183     tokio::spawn(async move {
184         assert_ok!(tx.send(1));
185         assert_ok!(tx.send(2));
186     });
187 
188     assert_eq!(Some(1), rx.recv().await);
189     assert_eq!(Some(2), rx.recv().await);
190     assert_eq!(None, rx.recv().await);
191 }
192 
193 #[tokio::test]
194 #[cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads
send_recv_stream_unbounded()195 async fn send_recv_stream_unbounded() {
196     use tokio_stream::StreamExt;
197 
198     let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<i32>();
199 
200     let mut rx = Box::pin(rx);
201 
202     tokio::spawn(async move {
203         assert_ok!(tx.send(1));
204         assert_ok!(tx.send(2));
205     });
206 
207     assert_eq!(Some(1), rx.next().await);
208     assert_eq!(Some(2), rx.next().await);
209     assert_eq!(None, rx.next().await);
210 }
211 
212 #[maybe_tokio_test]
no_t_bounds_buffer()213 async fn no_t_bounds_buffer() {
214     struct NoImpls;
215 
216     let (tx, mut rx) = mpsc::channel(100);
217 
218     // sender should be Debug even though T isn't Debug
219     is_debug(&tx);
220     // same with Receiver
221     is_debug(&rx);
222     // and sender should be Clone even though T isn't Clone
223     assert!(tx.clone().try_send(NoImpls).is_ok());
224 
225     assert!(rx.recv().await.is_some());
226 }
227 
228 #[maybe_tokio_test]
no_t_bounds_unbounded()229 async fn no_t_bounds_unbounded() {
230     struct NoImpls;
231 
232     let (tx, mut rx) = mpsc::unbounded_channel();
233 
234     // sender should be Debug even though T isn't Debug
235     is_debug(&tx);
236     // same with Receiver
237     is_debug(&rx);
238     // and sender should be Clone even though T isn't Clone
239     assert!(tx.clone().send(NoImpls).is_ok());
240 
241     assert!(rx.recv().await.is_some());
242 }
243 
244 #[tokio::test]
245 #[cfg(feature = "full")]
send_recv_buffer_limited()246 async fn send_recv_buffer_limited() {
247     let (tx, mut rx) = mpsc::channel::<i32>(1);
248 
249     // Reserve capacity
250     let p1 = assert_ok!(tx.reserve().await);
251 
252     // Send first message
253     p1.send(1);
254 
255     // Not ready
256     let mut p2 = tokio_test::task::spawn(tx.reserve());
257     assert_pending!(p2.poll());
258 
259     // Take the value
260     assert!(rx.recv().await.is_some());
261 
262     // Notified
263     assert!(p2.is_woken());
264 
265     // Trying to send fails
266     assert_err!(tx.try_send(1337));
267 
268     // Send second
269     let permit = assert_ready_ok!(p2.poll());
270     permit.send(2);
271 
272     assert!(rx.recv().await.is_some());
273 }
274 
275 #[maybe_tokio_test]
recv_close_gets_none_idle()276 async fn recv_close_gets_none_idle() {
277     let (tx, mut rx) = mpsc::channel::<i32>(10);
278 
279     rx.close();
280 
281     assert!(rx.recv().await.is_none());
282 
283     assert_err!(tx.send(1).await);
284 }
285 
286 #[tokio::test]
287 #[cfg(feature = "full")]
recv_close_gets_none_reserved()288 async fn recv_close_gets_none_reserved() {
289     let (tx1, mut rx) = mpsc::channel::<i32>(1);
290     let tx2 = tx1.clone();
291 
292     let permit1 = assert_ok!(tx1.reserve().await);
293     let mut permit2 = tokio_test::task::spawn(tx2.reserve());
294     assert_pending!(permit2.poll());
295 
296     rx.close();
297 
298     assert!(permit2.is_woken());
299     assert_ready_err!(permit2.poll());
300 
301     {
302         let mut recv = tokio_test::task::spawn(rx.recv());
303         assert_pending!(recv.poll());
304 
305         permit1.send(123);
306         assert!(recv.is_woken());
307 
308         let v = assert_ready!(recv.poll());
309         assert_eq!(v, Some(123));
310     }
311 
312     assert!(rx.recv().await.is_none());
313 }
314 
315 #[maybe_tokio_test]
tx_close_gets_none()316 async fn tx_close_gets_none() {
317     let (_, mut rx) = mpsc::channel::<i32>(10);
318     assert!(rx.recv().await.is_none());
319 }
320 
321 #[maybe_tokio_test]
try_send_fail()322 async fn try_send_fail() {
323     let (tx, mut rx) = mpsc::channel(1);
324 
325     tx.try_send("hello").unwrap();
326 
327     // This should fail
328     match assert_err!(tx.try_send("fail")) {
329         TrySendError::Full(..) => {}
330         _ => panic!(),
331     }
332 
333     assert_eq!(rx.recv().await, Some("hello"));
334 
335     assert_ok!(tx.try_send("goodbye"));
336     drop(tx);
337 
338     assert_eq!(rx.recv().await, Some("goodbye"));
339     assert!(rx.recv().await.is_none());
340 }
341 
342 #[maybe_tokio_test]
try_send_fail_with_try_recv()343 async fn try_send_fail_with_try_recv() {
344     let (tx, mut rx) = mpsc::channel(1);
345 
346     tx.try_send("hello").unwrap();
347 
348     // This should fail
349     match assert_err!(tx.try_send("fail")) {
350         TrySendError::Full(..) => {}
351         _ => panic!(),
352     }
353 
354     assert_eq!(rx.try_recv(), Ok("hello"));
355 
356     assert_ok!(tx.try_send("goodbye"));
357     drop(tx);
358 
359     assert_eq!(rx.try_recv(), Ok("goodbye"));
360     assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
361 }
362 
363 #[maybe_tokio_test]
try_reserve_fails()364 async fn try_reserve_fails() {
365     let (tx, mut rx) = mpsc::channel(1);
366 
367     let permit = tx.try_reserve().unwrap();
368 
369     // This should fail
370     match assert_err!(tx.try_reserve()) {
371         TrySendError::Full(()) => {}
372         _ => panic!(),
373     }
374 
375     permit.send("foo");
376 
377     assert_eq!(rx.recv().await, Some("foo"));
378 
379     // Dropping permit releases the slot.
380     let permit = tx.try_reserve().unwrap();
381     drop(permit);
382 
383     let _permit = tx.try_reserve().unwrap();
384 }
385 
386 #[tokio::test]
387 #[cfg(feature = "full")]
drop_permit_releases_permit()388 async fn drop_permit_releases_permit() {
389     // poll_ready reserves capacity, ensure that the capacity is released if tx
390     // is dropped w/o sending a value.
391     let (tx1, _rx) = mpsc::channel::<i32>(1);
392     let tx2 = tx1.clone();
393 
394     let permit = assert_ok!(tx1.reserve().await);
395 
396     let mut reserve2 = tokio_test::task::spawn(tx2.reserve());
397     assert_pending!(reserve2.poll());
398 
399     drop(permit);
400 
401     assert!(reserve2.is_woken());
402     assert_ready_ok!(reserve2.poll());
403 }
404 
405 #[maybe_tokio_test]
dropping_rx_closes_channel()406 async fn dropping_rx_closes_channel() {
407     let (tx, rx) = mpsc::channel(100);
408 
409     let msg = Arc::new(());
410     assert_ok!(tx.try_send(msg.clone()));
411 
412     drop(rx);
413     assert_err!(tx.reserve().await);
414     assert_eq!(1, Arc::strong_count(&msg));
415 }
416 
417 #[test]
dropping_rx_closes_channel_for_try()418 fn dropping_rx_closes_channel_for_try() {
419     let (tx, rx) = mpsc::channel(100);
420 
421     let msg = Arc::new(());
422     tx.try_send(msg.clone()).unwrap();
423 
424     drop(rx);
425 
426     assert!(matches!(
427         tx.try_send(msg.clone()),
428         Err(TrySendError::Closed(_))
429     ));
430     assert!(matches!(tx.try_reserve(), Err(TrySendError::Closed(_))));
431     assert!(matches!(
432         tx.try_reserve_owned(),
433         Err(TrySendError::Closed(_))
434     ));
435 
436     assert_eq!(1, Arc::strong_count(&msg));
437 }
438 
439 #[test]
unconsumed_messages_are_dropped()440 fn unconsumed_messages_are_dropped() {
441     let msg = Arc::new(());
442 
443     let (tx, rx) = mpsc::channel(100);
444 
445     tx.try_send(msg.clone()).unwrap();
446 
447     assert_eq!(2, Arc::strong_count(&msg));
448 
449     drop((tx, rx));
450 
451     assert_eq!(1, Arc::strong_count(&msg));
452 }
453 
454 #[test]
455 #[cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads
blocking_recv()456 fn blocking_recv() {
457     let (tx, mut rx) = mpsc::channel::<u8>(1);
458 
459     let sync_code = std::thread::spawn(move || {
460         assert_eq!(Some(10), rx.blocking_recv());
461     });
462 
463     tokio::runtime::Runtime::new()
464         .unwrap()
465         .block_on(async move {
466             let _ = tx.send(10).await;
467         });
468     sync_code.join().unwrap()
469 }
470 
471 #[tokio::test]
472 #[should_panic]
473 #[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
blocking_recv_async()474 async fn blocking_recv_async() {
475     let (_tx, mut rx) = mpsc::channel::<()>(1);
476     let _ = rx.blocking_recv();
477 }
478 
479 #[test]
480 #[cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads
blocking_send()481 fn blocking_send() {
482     let (tx, mut rx) = mpsc::channel::<u8>(1);
483 
484     let sync_code = std::thread::spawn(move || {
485         tx.blocking_send(10).unwrap();
486     });
487 
488     tokio::runtime::Runtime::new()
489         .unwrap()
490         .block_on(async move {
491             assert_eq!(Some(10), rx.recv().await);
492         });
493     sync_code.join().unwrap()
494 }
495 
496 #[tokio::test]
497 #[should_panic]
498 #[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
blocking_send_async()499 async fn blocking_send_async() {
500     let (tx, _rx) = mpsc::channel::<()>(1);
501     let _ = tx.blocking_send(());
502 }
503 
504 #[tokio::test]
505 #[cfg(feature = "full")]
ready_close_cancel_bounded()506 async fn ready_close_cancel_bounded() {
507     let (tx, mut rx) = mpsc::channel::<()>(100);
508     let _tx2 = tx.clone();
509 
510     let permit = assert_ok!(tx.reserve().await);
511 
512     rx.close();
513 
514     let mut recv = tokio_test::task::spawn(rx.recv());
515     assert_pending!(recv.poll());
516 
517     drop(permit);
518 
519     assert!(recv.is_woken());
520     let val = assert_ready!(recv.poll());
521     assert!(val.is_none());
522 }
523 
524 #[tokio::test]
525 #[cfg(feature = "full")]
permit_available_not_acquired_close()526 async fn permit_available_not_acquired_close() {
527     let (tx1, mut rx) = mpsc::channel::<()>(1);
528     let tx2 = tx1.clone();
529 
530     let permit1 = assert_ok!(tx1.reserve().await);
531 
532     let mut permit2 = tokio_test::task::spawn(tx2.reserve());
533     assert_pending!(permit2.poll());
534 
535     rx.close();
536 
537     drop(permit1);
538     assert!(permit2.is_woken());
539 
540     drop(permit2);
541     assert!(rx.recv().await.is_none());
542 }
543 
544 #[test]
try_recv_bounded()545 fn try_recv_bounded() {
546     let (tx, mut rx) = mpsc::channel(5);
547 
548     tx.try_send("hello").unwrap();
549     tx.try_send("hello").unwrap();
550     tx.try_send("hello").unwrap();
551     tx.try_send("hello").unwrap();
552     tx.try_send("hello").unwrap();
553     assert!(tx.try_send("hello").is_err());
554 
555     assert_eq!(Ok("hello"), rx.try_recv());
556     assert_eq!(Ok("hello"), rx.try_recv());
557     assert_eq!(Ok("hello"), rx.try_recv());
558     assert_eq!(Ok("hello"), rx.try_recv());
559     assert_eq!(Ok("hello"), rx.try_recv());
560     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
561 
562     tx.try_send("hello").unwrap();
563     tx.try_send("hello").unwrap();
564     tx.try_send("hello").unwrap();
565     tx.try_send("hello").unwrap();
566     assert_eq!(Ok("hello"), rx.try_recv());
567     tx.try_send("hello").unwrap();
568     tx.try_send("hello").unwrap();
569     assert!(tx.try_send("hello").is_err());
570     assert_eq!(Ok("hello"), rx.try_recv());
571     assert_eq!(Ok("hello"), rx.try_recv());
572     assert_eq!(Ok("hello"), rx.try_recv());
573     assert_eq!(Ok("hello"), rx.try_recv());
574     assert_eq!(Ok("hello"), rx.try_recv());
575     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
576 
577     tx.try_send("hello").unwrap();
578     tx.try_send("hello").unwrap();
579     tx.try_send("hello").unwrap();
580     drop(tx);
581     assert_eq!(Ok("hello"), rx.try_recv());
582     assert_eq!(Ok("hello"), rx.try_recv());
583     assert_eq!(Ok("hello"), rx.try_recv());
584     assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
585 }
586 
587 #[test]
try_recv_unbounded()588 fn try_recv_unbounded() {
589     for num in 0..100 {
590         let (tx, mut rx) = mpsc::unbounded_channel();
591 
592         for i in 0..num {
593             tx.send(i).unwrap();
594         }
595 
596         for i in 0..num {
597             assert_eq!(rx.try_recv(), Ok(i));
598         }
599 
600         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
601         drop(tx);
602         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
603     }
604 }
605 
606 #[test]
try_recv_close_while_empty_bounded()607 fn try_recv_close_while_empty_bounded() {
608     let (tx, mut rx) = mpsc::channel::<()>(5);
609 
610     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
611     drop(tx);
612     assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
613 }
614 
615 #[test]
try_recv_close_while_empty_unbounded()616 fn try_recv_close_while_empty_unbounded() {
617     let (tx, mut rx) = mpsc::unbounded_channel::<()>();
618 
619     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
620     drop(tx);
621     assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
622 }
623 
624 #[tokio::test(start_paused = true)]
625 #[cfg(feature = "full")]
recv_timeout()626 async fn recv_timeout() {
627     use tokio::sync::mpsc::error::SendTimeoutError::{Closed, Timeout};
628     use tokio::time::Duration;
629 
630     let (tx, rx) = mpsc::channel(5);
631 
632     assert_eq!(tx.send_timeout(10, Duration::from_secs(1)).await, Ok(()));
633     assert_eq!(tx.send_timeout(20, Duration::from_secs(1)).await, Ok(()));
634     assert_eq!(tx.send_timeout(30, Duration::from_secs(1)).await, Ok(()));
635     assert_eq!(tx.send_timeout(40, Duration::from_secs(1)).await, Ok(()));
636     assert_eq!(tx.send_timeout(50, Duration::from_secs(1)).await, Ok(()));
637     assert_eq!(
638         tx.send_timeout(60, Duration::from_secs(1)).await,
639         Err(Timeout(60))
640     );
641 
642     drop(rx);
643     assert_eq!(
644         tx.send_timeout(70, Duration::from_secs(1)).await,
645         Err(Closed(70))
646     );
647 }
648 
649 #[test]
650 #[should_panic = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"]
651 #[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
recv_timeout_panic()652 fn recv_timeout_panic() {
653     use futures::future::FutureExt;
654     use tokio::time::Duration;
655 
656     let (tx, _rx) = mpsc::channel(5);
657     tx.send_timeout(10, Duration::from_secs(1)).now_or_never();
658 }
659 
660 // Tests that channel `capacity` changes and `max_capacity` stays the same
661 #[tokio::test]
test_tx_capacity()662 async fn test_tx_capacity() {
663     let (tx, _rx) = mpsc::channel::<()>(10);
664     // both capacities are same before
665     assert_eq!(tx.capacity(), 10);
666     assert_eq!(tx.max_capacity(), 10);
667 
668     let _permit = tx.reserve().await.unwrap();
669     // after reserve, only capacity should drop by one
670     assert_eq!(tx.capacity(), 9);
671     assert_eq!(tx.max_capacity(), 10);
672 
673     tx.send(()).await.unwrap();
674     // after send, capacity should drop by one again
675     assert_eq!(tx.capacity(), 8);
676     assert_eq!(tx.max_capacity(), 10);
677 }
678 
is_debug<T: fmt::Debug>(_: &T)679 fn is_debug<T: fmt::Debug>(_: &T) {}
680