1 #![allow(clippy::redundant_clone)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4
5 #[cfg(tokio_wasm_not_wasi)]
6 use wasm_bindgen_test::wasm_bindgen_test as test;
7 #[cfg(tokio_wasm_not_wasi)]
8 use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
9
10 use std::fmt;
11 use std::sync::Arc;
12 use tokio::sync::mpsc;
13 use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
14 #[cfg(not(tokio_wasm_not_wasi))]
15 use tokio::test as maybe_tokio_test;
16 use tokio_test::*;
17
// Test-support code kept in a sibling module file. It supplies
// `mpsc_stream::channel_stream` and `mpsc_stream::unbounded_channel_stream`,
// which the stream-based tests in this file call.
// Not compiled when the `tokio_wasm` cfg is set.
#[cfg(not(tokio_wasm))]
mod support {
    pub(crate) mod mpsc_stream;
}
22
23 trait AssertSend: Send {}
24 impl AssertSend for mpsc::Sender<i32> {}
25 impl AssertSend for mpsc::Receiver<i32> {}
26
27 #[maybe_tokio_test]
send_recv_with_buffer()28 async fn send_recv_with_buffer() {
29 let (tx, mut rx) = mpsc::channel::<i32>(16);
30
31 // Using poll_ready / try_send
32 // let permit assert_ready_ok!(tx.reserve());
33 let permit = tx.reserve().await.unwrap();
34 permit.send(1);
35
36 // Without poll_ready
37 tx.try_send(2).unwrap();
38
39 drop(tx);
40
41 let val = rx.recv().await;
42 assert_eq!(val, Some(1));
43
44 let val = rx.recv().await;
45 assert_eq!(val, Some(2));
46
47 let val = rx.recv().await;
48 assert!(val.is_none());
49 }
50
// Checks how capacity held by permits is handed over to waiting `reserve`
// calls: a waiter is woken after a sent value has been received, and another
// one after an unused permit has been dropped.
#[tokio::test]
#[cfg(feature = "full")]
async fn reserve_disarm() {
    use tokio_test::task;

    let (tx, mut rx) = mpsc::channel::<i32>(2);
    let tx1 = tx.clone();
    let tx2 = tx.clone();
    let tx3 = tx.clone();
    let tx4 = tx;

    // Capacity is 2, so the first two reservations complete right away.
    let held_a = assert_ok!(tx1.reserve().await);
    let held_b = assert_ok!(tx2.reserve().await);

    // Any further reservation has to wait.
    let mut waiter3 = task::spawn(tx3.reserve());
    assert_pending!(waiter3.poll());

    let mut waiter4 = task::spawn(tx4.reserve());
    assert_pending!(waiter4.poll());

    // Sending through a permit does not wake a waiter by itself ...
    held_a.send(1);
    assert!(!waiter3.is_woken());

    // ... the value also has to be received.
    rx.recv().await.unwrap();
    assert!(waiter3.is_woken());
    assert!(!waiter4.is_woken());

    // Dropping an unused permit wakes the next waiter as well.
    drop(held_b);
    assert!(waiter4.is_woken());

    // A brand-new reservation still has to wait at this point.
    let mut late = task::spawn(tx1.reserve());
    assert_pending!(late.poll());
}
88
// Drives a bounded channel through the stream adapter from `support`: a
// spawned task sends two values, and the stream yields them in order followed
// by `None`.
#[tokio::test]
#[cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads
async fn send_recv_stream_with_buffer() {
    use tokio_stream::StreamExt;

    let (sender, stream) = support::mpsc_stream::channel_stream::<i32>(16);
    let mut stream = Box::pin(stream);

    // The sender is moved into the task and dropped when the task finishes.
    tokio::spawn(async move {
        for value in 1..=2 {
            assert_ok!(sender.send(value).await);
        }
    });

    for expected in 1..=2 {
        assert_eq!(Some(expected), stream.next().await);
    }
    assert_eq!(None, stream.next().await);
}
106
// A spawned task sends two values over a bounded channel; the receiver sees
// them in order and then `None` after the task has dropped the sender.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_with_buffer() {
    let (sender, mut receiver) = mpsc::channel(16);

    tokio::spawn(async move {
        assert_ok!(sender.send(1).await);
        assert_ok!(sender.send(2).await);
    });

    for expected in &[Some(1), Some(2), None] {
        assert_eq!(*expected, receiver.recv().await);
    }
}
121
// With the single slot occupied, two `reserve` futures wait. The first one is
// dropped before the slot frees up; after the value is received only the
// surviving waiter is woken.
#[tokio::test]
#[cfg(feature = "full")]
async fn start_send_past_cap() {
    use std::future::Future;
    use tokio_test::task;

    let mut outer_task = task::spawn(());

    let (tx1, mut rx) = mpsc::channel(1);
    let tx2 = tx1.clone();

    // Occupy the only slot.
    assert_ok!(tx1.try_send(()));

    // First waiter, polled inside `outer_task`'s context.
    let mut first = Box::pin(tx1.reserve());
    outer_task.enter(|cx, _| assert_pending!(first.as_mut().poll(cx)));

    {
        let mut second = task::spawn(tx2.reserve());
        assert_pending!(second.poll());

        // Cancel the first waiter while it is still pending.
        drop(first);

        assert_eq!(rx.recv().await, Some(()));

        assert!(second.is_woken());
        assert!(!outer_task.is_woken());
    }

    drop(tx1);
    drop(tx2);

    assert_eq!(rx.recv().await, None);
}
154
// Creating a bounded channel with a buffer size of zero is expected to panic.
#[test]
#[should_panic]
#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
fn buffer_gteq_one() {
    let _ = mpsc::channel::<i32>(0);
}
161
162 #[maybe_tokio_test]
send_recv_unbounded()163 async fn send_recv_unbounded() {
164 let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
165
166 // Using `try_send`
167 assert_ok!(tx.send(1));
168 assert_ok!(tx.send(2));
169
170 assert_eq!(rx.recv().await, Some(1));
171 assert_eq!(rx.recv().await, Some(2));
172
173 drop(tx);
174
175 assert!(rx.recv().await.is_none());
176 }
177
// A spawned task sends two values over an unbounded channel; the receiver
// sees them in order and then `None` after the task has dropped the sender.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_unbounded() {
    let (sender, mut receiver) = mpsc::unbounded_channel();

    tokio::spawn(async move {
        assert_ok!(sender.send(1));
        assert_ok!(sender.send(2));
    });

    for expected in &[Some(1), Some(2), None] {
        assert_eq!(*expected, receiver.recv().await);
    }
}
192
// Drives an unbounded channel through the stream adapter from `support`: a
// spawned task sends two values, and the stream yields them in order followed
// by `None`.
#[tokio::test]
#[cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads
async fn send_recv_stream_unbounded() {
    use tokio_stream::StreamExt;

    let (sender, stream) = support::mpsc_stream::unbounded_channel_stream::<i32>();
    let mut stream = Box::pin(stream);

    // The sender is moved into the task and dropped when the task finishes.
    tokio::spawn(async move {
        for value in 1..=2 {
            assert_ok!(sender.send(value));
        }
    });

    for expected in &[Some(1), Some(2), None] {
        assert_eq!(*expected, stream.next().await);
    }
}
211
212 #[maybe_tokio_test]
no_t_bounds_buffer()213 async fn no_t_bounds_buffer() {
214 struct NoImpls;
215
216 let (tx, mut rx) = mpsc::channel(100);
217
218 // sender should be Debug even though T isn't Debug
219 is_debug(&tx);
220 // same with Receiver
221 is_debug(&rx);
222 // and sender should be Clone even though T isn't Clone
223 assert!(tx.clone().try_send(NoImpls).is_ok());
224
225 assert!(rx.recv().await.is_some());
226 }
227
228 #[maybe_tokio_test]
no_t_bounds_unbounded()229 async fn no_t_bounds_unbounded() {
230 struct NoImpls;
231
232 let (tx, mut rx) = mpsc::unbounded_channel();
233
234 // sender should be Debug even though T isn't Debug
235 is_debug(&tx);
236 // same with Receiver
237 is_debug(&rx);
238 // and sender should be Clone even though T isn't Clone
239 assert!(tx.clone().send(NoImpls).is_ok());
240
241 assert!(rx.recv().await.is_some());
242 }
243
// With a capacity of one, a second reservation waits until the first value is
// received. While that woken reservation has not been polled yet, a direct
// `try_send` is rejected.
#[tokio::test]
#[cfg(feature = "full")]
async fn send_recv_buffer_limited() {
    use tokio_test::task;

    let (tx, mut rx) = mpsc::channel::<i32>(1);

    // Reserve the only slot and use it for the first message.
    assert_ok!(tx.reserve().await).send(1);

    // A second reservation cannot complete yet.
    let mut pending_permit = task::spawn(tx.reserve());
    assert_pending!(pending_permit.poll());

    // Receiving the value wakes the waiting reservation.
    assert!(matches!(rx.recv().await, Some(_)));
    assert!(pending_permit.is_woken());

    // A direct send fails at this point.
    assert_err!(tx.try_send(1337));

    // The waiting reservation now completes and carries the second message.
    assert_ready_ok!(pending_permit.poll()).send(2);

    assert!(matches!(rx.recv().await, Some(_)));
}
274
275 #[maybe_tokio_test]
recv_close_gets_none_idle()276 async fn recv_close_gets_none_idle() {
277 let (tx, mut rx) = mpsc::channel::<i32>(10);
278
279 rx.close();
280
281 assert!(rx.recv().await.is_none());
282
283 assert_err!(tx.send(1).await);
284 }
285
// Closing the receiver fails a pending reservation, but a permit obtained
// before the close can still deliver its value; only after that does `recv`
// return `None`.
#[tokio::test]
#[cfg(feature = "full")]
async fn recv_close_gets_none_reserved() {
    use tokio_test::task;

    let (tx1, mut rx) = mpsc::channel::<i32>(1);
    let tx2 = tx1.clone();

    let held = assert_ok!(tx1.reserve().await);
    let mut waiting = task::spawn(tx2.reserve());
    assert_pending!(waiting.poll());

    rx.close();

    // The waiting reservation is woken and resolves to an error.
    assert!(waiting.is_woken());
    assert_ready_err!(waiting.poll());

    {
        // `recv` stays pending while the earlier permit is outstanding.
        let mut recv_fut = task::spawn(rx.recv());
        assert_pending!(recv_fut.poll());

        held.send(123);
        assert!(recv_fut.is_woken());

        assert_eq!(assert_ready!(recv_fut.poll()), Some(123));
    }

    assert_eq!(rx.recv().await, None);
}
314
315 #[maybe_tokio_test]
tx_close_gets_none()316 async fn tx_close_gets_none() {
317 let (_, mut rx) = mpsc::channel::<i32>(10);
318 assert!(rx.recv().await.is_none());
319 }
320
321 #[maybe_tokio_test]
try_send_fail()322 async fn try_send_fail() {
323 let (tx, mut rx) = mpsc::channel(1);
324
325 tx.try_send("hello").unwrap();
326
327 // This should fail
328 match assert_err!(tx.try_send("fail")) {
329 TrySendError::Full(..) => {}
330 _ => panic!(),
331 }
332
333 assert_eq!(rx.recv().await, Some("hello"));
334
335 assert_ok!(tx.try_send("goodbye"));
336 drop(tx);
337
338 assert_eq!(rx.recv().await, Some("goodbye"));
339 assert!(rx.recv().await.is_none());
340 }
341
342 #[maybe_tokio_test]
try_send_fail_with_try_recv()343 async fn try_send_fail_with_try_recv() {
344 let (tx, mut rx) = mpsc::channel(1);
345
346 tx.try_send("hello").unwrap();
347
348 // This should fail
349 match assert_err!(tx.try_send("fail")) {
350 TrySendError::Full(..) => {}
351 _ => panic!(),
352 }
353
354 assert_eq!(rx.try_recv(), Ok("hello"));
355
356 assert_ok!(tx.try_send("goodbye"));
357 drop(tx);
358
359 assert_eq!(rx.try_recv(), Ok("goodbye"));
360 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
361 }
362
363 #[maybe_tokio_test]
try_reserve_fails()364 async fn try_reserve_fails() {
365 let (tx, mut rx) = mpsc::channel(1);
366
367 let permit = tx.try_reserve().unwrap();
368
369 // This should fail
370 match assert_err!(tx.try_reserve()) {
371 TrySendError::Full(()) => {}
372 _ => panic!(),
373 }
374
375 permit.send("foo");
376
377 assert_eq!(rx.recv().await, Some("foo"));
378
379 // Dropping permit releases the slot.
380 let permit = tx.try_reserve().unwrap();
381 drop(permit);
382
383 let _permit = tx.try_reserve().unwrap();
384 }
385
// A permit that is dropped without sending gives its capacity back, which
// lets a waiting `reserve` call complete.
#[tokio::test]
#[cfg(feature = "full")]
async fn drop_permit_releases_permit() {
    use tokio_test::task;

    let (tx1, _rx) = mpsc::channel::<i32>(1);
    let tx2 = tx1.clone();

    let held = assert_ok!(tx1.reserve().await);

    // The single slot is reserved, so the second reservation waits.
    let mut waiting = task::spawn(tx2.reserve());
    assert_pending!(waiting.poll());

    drop(held);

    assert!(waiting.is_woken());
    assert_ready_ok!(waiting.poll());
}
404
405 #[maybe_tokio_test]
dropping_rx_closes_channel()406 async fn dropping_rx_closes_channel() {
407 let (tx, rx) = mpsc::channel(100);
408
409 let msg = Arc::new(());
410 assert_ok!(tx.try_send(msg.clone()));
411
412 drop(rx);
413 assert_err!(tx.reserve().await);
414 assert_eq!(1, Arc::strong_count(&msg));
415 }
416
// After the receiver is dropped, every non-blocking send/reserve variant
// reports `Closed`, and no clone of the message is kept alive.
#[test]
fn dropping_rx_closes_channel_for_try() {
    // Consumes the result, so anything it carries is dropped on return.
    fn is_closed<T, U>(outcome: Result<T, TrySendError<U>>) -> bool {
        matches!(outcome, Err(TrySendError::Closed(_)))
    }

    let (sender, receiver) = mpsc::channel(100);

    let payload = Arc::new(());
    sender.try_send(Arc::clone(&payload)).unwrap();

    drop(receiver);

    assert!(is_closed(sender.try_send(Arc::clone(&payload))));
    assert!(is_closed(sender.try_reserve()));
    assert!(is_closed(sender.try_reserve_owned()));

    assert_eq!(1, Arc::strong_count(&payload));
}
438
// A message that is still queued when both channel halves go away is dropped
// together with the channel.
#[test]
fn unconsumed_messages_are_dropped() {
    let payload = Arc::new(());

    let (sender, receiver) = mpsc::channel(100);

    sender.try_send(Arc::clone(&payload)).unwrap();

    // One reference here, one inside the channel.
    assert_eq!(2, Arc::strong_count(&payload));

    drop(sender);
    drop(receiver);

    assert_eq!(1, Arc::strong_count(&payload));
}
453
// `blocking_recv` on a plain thread receives a value sent from inside a
// runtime.
#[test]
#[cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads
fn blocking_recv() {
    let (sender, mut receiver) = mpsc::channel::<u8>(1);

    let receiver_thread = std::thread::spawn(move || {
        assert_eq!(Some(10), receiver.blocking_recv());
    });

    {
        // Scoped so the runtime is dropped before the thread is joined.
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async move {
            let _ = sender.send(10).await;
        });
    }

    receiver_thread.join().unwrap();
}
470
471 #[tokio::test]
472 #[should_panic]
473 #[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
blocking_recv_async()474 async fn blocking_recv_async() {
475 let (_tx, mut rx) = mpsc::channel::<()>(1);
476 let _ = rx.blocking_recv();
477 }
478
// A value sent with `blocking_send` from a plain thread is received inside a
// runtime.
#[test]
#[cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads
fn blocking_send() {
    let (sender, mut receiver) = mpsc::channel::<u8>(1);

    let sender_thread = std::thread::spawn(move || {
        sender.blocking_send(10).unwrap();
    });

    {
        // Scoped so the runtime is dropped before the thread is joined.
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async move {
            assert_eq!(Some(10), receiver.recv().await);
        });
    }

    sender_thread.join().unwrap();
}
495
496 #[tokio::test]
497 #[should_panic]
498 #[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
blocking_send_async()499 async fn blocking_send_async() {
500 let (tx, _rx) = mpsc::channel::<()>(1);
501 let _ = tx.blocking_send(());
502 }
503
// After `close`, `recv` stays pending while a permit is outstanding; dropping
// that permit unused wakes the receiver, which then gets `None`.
#[tokio::test]
#[cfg(feature = "full")]
async fn ready_close_cancel_bounded() {
    use tokio_test::task;

    let (tx, mut rx) = mpsc::channel::<()>(100);
    let _tx2 = tx.clone();

    let held = assert_ok!(tx.reserve().await);

    rx.close();

    let mut recv_fut = task::spawn(rx.recv());
    assert_pending!(recv_fut.poll());

    drop(held);

    assert!(recv_fut.is_woken());
    assert_eq!(assert_ready!(recv_fut.poll()), None);
}
523
// A pending reservation is woken after the receiver has been closed and the
// held permit dropped; the woken future is then dropped without being polled,
// and `recv` returns `None`.
#[tokio::test]
#[cfg(feature = "full")]
async fn permit_available_not_acquired_close() {
    use tokio_test::task;

    let (tx1, mut rx) = mpsc::channel::<()>(1);
    let tx2 = tx1.clone();

    let held = assert_ok!(tx1.reserve().await);

    let mut waiting = task::spawn(tx2.reserve());
    assert_pending!(waiting.poll());

    rx.close();

    drop(held);
    assert!(waiting.is_woken());

    // Discard the woken reservation without polling it again.
    drop(waiting);
    assert_eq!(rx.recv().await, None);
}
543
// Exercises `try_recv` on a bounded channel of capacity five through three
// phases: fill and drain, interleaved sends and receives, and draining after
// the sender is dropped.
#[test]
fn try_recv_bounded() {
    let (sender, mut receiver) = mpsc::channel(5);

    // Phase 1: fill to capacity, overflow once, then drain completely.
    for _ in 0..5 {
        sender.try_send("hello").unwrap();
    }
    assert!(sender.try_send("hello").is_err());

    for _ in 0..5 {
        assert_eq!(Ok("hello"), receiver.try_recv());
    }
    assert_eq!(Err(TryRecvError::Empty), receiver.try_recv());

    // Phase 2: four in, one out, two in (full again), then drain completely.
    for _ in 0..4 {
        sender.try_send("hello").unwrap();
    }
    assert_eq!(Ok("hello"), receiver.try_recv());
    for _ in 0..2 {
        sender.try_send("hello").unwrap();
    }
    assert!(sender.try_send("hello").is_err());
    for _ in 0..5 {
        assert_eq!(Ok("hello"), receiver.try_recv());
    }
    assert_eq!(Err(TryRecvError::Empty), receiver.try_recv());

    // Phase 3: queued values survive the sender being dropped; only after
    // they are drained does `try_recv` report `Disconnected`.
    for _ in 0..3 {
        sender.try_send("hello").unwrap();
    }
    drop(sender);
    for _ in 0..3 {
        assert_eq!(Ok("hello"), receiver.try_recv());
    }
    assert_eq!(Err(TryRecvError::Disconnected), receiver.try_recv());
}
586
// For every message count from 0 to 99: send that many values on a fresh
// unbounded channel, read them back in order with `try_recv`, then check the
// `Empty` and `Disconnected` results.
#[test]
fn try_recv_unbounded() {
    for count in 0..100 {
        let (sender, mut receiver) = mpsc::unbounded_channel();

        (0..count).for_each(|value| sender.send(value).unwrap());

        (0..count).for_each(|expected| assert_eq!(receiver.try_recv(), Ok(expected)));

        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
        drop(sender);
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    }
}
605
// On an empty bounded channel `try_recv` reports `Empty` while the sender is
// alive and `Disconnected` once it has been dropped.
#[test]
fn try_recv_close_while_empty_bounded() {
    let (sender, mut receiver) = mpsc::channel::<()>(5);

    let while_open = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Empty), while_open);

    drop(sender);

    let after_drop = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Disconnected), after_drop);
}
614
// On an empty unbounded channel `try_recv` reports `Empty` while the sender
// is alive and `Disconnected` once it has been dropped.
#[test]
fn try_recv_close_while_empty_unbounded() {
    let (sender, mut receiver) = mpsc::unbounded_channel::<()>();

    let while_open = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Empty), while_open);

    drop(sender);

    let after_drop = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Disconnected), after_drop);
}
623
// Exercises `send_timeout` (despite the test's name): sends succeed until the
// channel is full, the next one times out and hands the value back, and after
// the receiver is dropped the send fails with `Closed`.
#[tokio::test(start_paused = true)]
#[cfg(feature = "full")]
async fn recv_timeout() {
    use tokio::sync::mpsc::error::SendTimeoutError;
    use tokio::time::Duration;

    let one_second = Duration::from_secs(1);
    let (sender, receiver) = mpsc::channel(5);

    // Five sends fit into the buffer.
    for value in (10..=50).step_by(10) {
        assert_eq!(sender.send_timeout(value, one_second).await, Ok(()));
    }

    // The sixth cannot be queued and times out.
    assert_eq!(
        sender.send_timeout(60, one_second).await,
        Err(SendTimeoutError::Timeout(60))
    );

    drop(receiver);
    assert_eq!(
        sender.send_timeout(70, one_second).await,
        Err(SendTimeoutError::Closed(70))
    );
}
648
// Polling a `send_timeout` future outside of any runtime is expected to panic
// with the "no reactor running" message.
#[test]
#[should_panic = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"]
#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
fn recv_timeout_panic() {
    use futures::future::FutureExt;
    use tokio::time::Duration;

    let (sender, _receiver) = mpsc::channel(5);
    let send_fut = sender.send_timeout(10, Duration::from_secs(1));
    let _ = send_fut.now_or_never();
}
659
660 // Tests that channel `capacity` changes and `max_capacity` stays the same
661 #[tokio::test]
test_tx_capacity()662 async fn test_tx_capacity() {
663 let (tx, _rx) = mpsc::channel::<()>(10);
664 // both capacities are same before
665 assert_eq!(tx.capacity(), 10);
666 assert_eq!(tx.max_capacity(), 10);
667
668 let _permit = tx.reserve().await.unwrap();
669 // after reserve, only capacity should drop by one
670 assert_eq!(tx.capacity(), 9);
671 assert_eq!(tx.max_capacity(), 10);
672
673 tx.send(()).await.unwrap();
674 // after send, capacity should drop by one again
675 assert_eq!(tx.capacity(), 8);
676 assert_eq!(tx.max_capacity(), 10);
677 }
678
// Compile-time helper: a call only type-checks when `T` implements `Debug`.
// It does nothing at runtime.
fn is_debug<T>(_: &T)
where
    T: fmt::Debug,
{
}
680