1 #![allow(clippy::redundant_clone)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4
5 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
6 use wasm_bindgen_test::wasm_bindgen_test as test;
7 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
8 use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
9
10 #[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
11 use tokio::test as maybe_tokio_test;
12
13 use std::fmt;
14 use std::sync::Arc;
15 use tokio::sync::mpsc;
16 use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
17 use tokio_test::*;
18
19 #[cfg(not(target_family = "wasm"))]
20 mod support {
21 pub(crate) mod mpsc_stream;
22 }
23
24 trait AssertSend: Send {}
25 impl AssertSend for mpsc::Sender<i32> {}
26 impl AssertSend for mpsc::Receiver<i32> {}
27
28 #[maybe_tokio_test]
send_recv_with_buffer()29 async fn send_recv_with_buffer() {
30 let (tx, mut rx) = mpsc::channel::<i32>(16);
31
32 // Using poll_ready / try_send
33 // let permit assert_ready_ok!(tx.reserve());
34 let permit = tx.reserve().await.unwrap();
35 permit.send(1);
36
37 // Without poll_ready
38 tx.try_send(2).unwrap();
39
40 drop(tx);
41
42 let val = rx.recv().await;
43 assert_eq!(val, Some(1));
44
45 let val = rx.recv().await;
46 assert_eq!(val, Some(2));
47
48 let val = rx.recv().await;
49 assert!(val.is_none());
50 }
51
// With a capacity of two, only two permits can be outstanding at once. Pending
// reservations are woken one at a time as slots are freed, either by receiving
// a sent value or by dropping an unused permit.
#[tokio::test]
#[cfg(feature = "full")]
async fn reserve_disarm() {
    let (sender, mut receiver) = mpsc::channel::<i32>(2);
    let first = sender.clone();
    let second = sender.clone();
    let third = sender.clone();
    let fourth = sender;

    // Both slots can be reserved right away.
    let held_a = assert_ok!(first.reserve().await);
    let held_b = assert_ok!(second.reserve().await);

    // Every slot is spoken for, so further reservations have to wait.
    let mut waiting_third = tokio_test::task::spawn(third.reserve());
    assert_pending!(waiting_third.poll());

    let mut waiting_fourth = tokio_test::task::spawn(fourth.reserve());
    assert_pending!(waiting_fourth.poll());

    // Sending through a permit does not free the slot by itself...
    held_a.send(1);
    assert!(!waiting_third.is_woken());

    // ...the value has to be received first.
    receiver.recv().await.unwrap();

    // One slot was freed, so only the first waiter is notified.
    assert!(waiting_third.is_woken());
    assert!(!waiting_fourth.is_woken());

    // Dropping an unused permit frees its slot too.
    drop(held_b);
    assert!(waiting_fourth.is_woken());

    // A brand-new reservation still has to wait (presumably because the freed
    // slots are earmarked for the waiters that were just woken).
    let mut waiting_first = tokio_test::task::spawn(first.reserve());
    assert_pending!(waiting_first.poll());
}
89
// The stream wrapper around a bounded channel yields the sent values in order
// and ends once the sending task has finished and dropped its sender.
#[tokio::test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
async fn send_recv_stream_with_buffer() {
    use tokio_stream::StreamExt;

    let (sender, stream) = support::mpsc_stream::channel_stream::<i32>(16);
    let mut stream = Box::pin(stream);

    tokio::spawn(async move {
        for value in 1..=2 {
            assert_ok!(sender.send(value).await);
        }
    });

    for expected in 1..=2 {
        assert_eq!(Some(expected), stream.next().await);
    }
    assert_eq!(None, stream.next().await);
}
107
// Values sent from a spawned task are received in order, followed by `None`
// once that task has finished and its sender is gone.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_with_buffer() {
    let (sender, mut receiver) = mpsc::channel(16);

    tokio::spawn(async move {
        for value in 1..=2 {
            assert_ok!(sender.send(value).await);
        }
    });

    for expected in 1..=2 {
        assert_eq!(Some(expected), receiver.recv().await);
    }
    assert_eq!(None, receiver.recv().await);
}
122
// When a pending reservation is dropped, a slot that later becomes free must
// wake the remaining waiter and not the task of the abandoned reservation.
#[tokio::test]
#[cfg(feature = "full")]
async fn start_send_past_cap() {
    use std::future::Future;

    // Mock task used only to observe whether its waker gets notified.
    let mut probe_task = tokio_test::task::spawn(());

    let (sender_a, mut receiver) = mpsc::channel(1);
    let sender_b = sender_a.clone();

    // Fill the single slot.
    assert_ok!(sender_a.try_send(()));

    // First waiter: polled from within the mock task so that task's waker is used.
    let mut abandoned = Box::pin(sender_a.reserve());
    probe_task.enter(|cx, _| assert_pending!(abandoned.as_mut().poll(cx)));

    {
        // Second waiter, driven by its own mock task.
        let mut survivor = tokio_test::task::spawn(sender_b.reserve());
        assert_pending!(survivor.poll());

        // Give up on the first reservation before any slot frees up.
        drop(abandoned);

        // Receiving the buffered value frees the slot.
        assert!(receiver.recv().await.is_some());

        assert!(survivor.is_woken());
        assert!(!probe_task.is_woken());
    }

    drop(sender_a);
    drop(sender_b);

    assert!(receiver.recv().await.is_none());
}
155
// Creating a bounded channel with a capacity of zero is expected to panic.
#[test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn buffer_gteq_one() {
    let _ = mpsc::channel::<i32>(0);
}
162
163 #[maybe_tokio_test]
send_recv_unbounded()164 async fn send_recv_unbounded() {
165 let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
166
167 // Using `try_send`
168 assert_ok!(tx.send(1));
169 assert_ok!(tx.send(2));
170
171 assert_eq!(rx.recv().await, Some(1));
172 assert_eq!(rx.recv().await, Some(2));
173
174 drop(tx);
175
176 assert!(rx.recv().await.is_none());
177 }
178
// Values sent on an unbounded channel from a spawned task are received in
// order, followed by `None` once that task has finished.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_unbounded() {
    let (sender, mut receiver) = mpsc::unbounded_channel();

    tokio::spawn(async move {
        for value in 1..=2 {
            assert_ok!(sender.send(value));
        }
    });

    for expected in 1..=2 {
        assert_eq!(Some(expected), receiver.recv().await);
    }
    assert_eq!(None, receiver.recv().await);
}
193
// The stream wrapper around an unbounded channel yields the sent values in
// order and ends once the sending task has finished and dropped its sender.
#[tokio::test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
async fn send_recv_stream_unbounded() {
    use tokio_stream::StreamExt;

    let (sender, stream) = support::mpsc_stream::unbounded_channel_stream::<i32>();
    let mut stream = Box::pin(stream);

    tokio::spawn(async move {
        for value in 1..=2 {
            assert_ok!(sender.send(value));
        }
    });

    for expected in 1..=2 {
        assert_eq!(Some(expected), stream.next().await);
    }
    assert_eq!(None, stream.next().await);
}
212
213 #[maybe_tokio_test]
no_t_bounds_buffer()214 async fn no_t_bounds_buffer() {
215 struct NoImpls;
216
217 let (tx, mut rx) = mpsc::channel(100);
218
219 // sender should be Debug even though T isn't Debug
220 is_debug(&tx);
221 // same with Receiver
222 is_debug(&rx);
223 // and sender should be Clone even though T isn't Clone
224 assert!(tx.clone().try_send(NoImpls).is_ok());
225
226 assert!(rx.recv().await.is_some());
227 }
228
229 #[maybe_tokio_test]
no_t_bounds_unbounded()230 async fn no_t_bounds_unbounded() {
231 struct NoImpls;
232
233 let (tx, mut rx) = mpsc::unbounded_channel();
234
235 // sender should be Debug even though T isn't Debug
236 is_debug(&tx);
237 // same with Receiver
238 is_debug(&rx);
239 // and sender should be Clone even though T isn't Clone
240 assert!(tx.clone().send(NoImpls).is_ok());
241
242 assert!(rx.recv().await.is_some());
243 }
244
// With a single-slot buffer, a second reservation waits until the first value
// has been received; the freed slot then goes to that waiter, not to `try_send`.
#[tokio::test]
#[cfg(feature = "full")]
async fn send_recv_buffer_limited() {
    let (sender, mut receiver) = mpsc::channel::<i32>(1);

    // Claim the only slot and fill it with the first message.
    let first_permit = assert_ok!(sender.reserve().await);
    first_permit.send(1);

    // The buffer is full, so another reservation cannot complete yet.
    let mut second_reserve = tokio_test::task::spawn(sender.reserve());
    assert_pending!(second_reserve.poll());

    // Taking the value out frees the slot and notifies the waiter.
    assert!(receiver.recv().await.is_some());
    assert!(second_reserve.is_woken());

    // A direct send is still rejected at this point.
    assert_err!(sender.try_send(1337));

    // The waiting reservation completes and delivers the second message.
    let second_permit = assert_ready_ok!(second_reserve.poll());
    second_permit.send(2);

    assert!(receiver.recv().await.is_some());
}
275
276 #[maybe_tokio_test]
recv_close_gets_none_idle()277 async fn recv_close_gets_none_idle() {
278 let (tx, mut rx) = mpsc::channel::<i32>(10);
279
280 rx.close();
281
282 assert!(rx.recv().await.is_none());
283
284 assert_err!(tx.send(1).await);
285 }
286
// After `close`, pending reservations fail, but a permit obtained before the
// close can still deliver its value; only afterwards does `recv` yield `None`.
#[tokio::test]
#[cfg(feature = "full")]
async fn recv_close_gets_none_reserved() {
    let (sender_a, mut receiver) = mpsc::channel::<i32>(1);
    let sender_b = sender_a.clone();

    // The first sender holds the only slot; the second has to wait for one.
    let held_permit = assert_ok!(sender_a.reserve().await);
    let mut waiting_reserve = tokio_test::task::spawn(sender_b.reserve());
    assert_pending!(waiting_reserve.poll());

    receiver.close();

    // Closing wakes the waiter, whose reservation now resolves to an error.
    assert!(waiting_reserve.is_woken());
    assert_ready_err!(waiting_reserve.poll());

    {
        // The outstanding permit keeps `recv` pending rather than returning `None`.
        let mut pending_recv = tokio_test::task::spawn(receiver.recv());
        assert_pending!(pending_recv.poll());

        held_permit.send(123);
        assert!(pending_recv.is_woken());

        assert_eq!(assert_ready!(pending_recv.poll()), Some(123));
    }

    assert!(receiver.recv().await.is_none());
}
315
316 #[maybe_tokio_test]
tx_close_gets_none()317 async fn tx_close_gets_none() {
318 let (_, mut rx) = mpsc::channel::<i32>(10);
319 assert!(rx.recv().await.is_none());
320 }
321
322 #[maybe_tokio_test]
try_send_fail()323 async fn try_send_fail() {
324 let (tx, mut rx) = mpsc::channel(1);
325
326 tx.try_send("hello").unwrap();
327
328 // This should fail
329 match assert_err!(tx.try_send("fail")) {
330 TrySendError::Full(..) => {}
331 _ => panic!(),
332 }
333
334 assert_eq!(rx.recv().await, Some("hello"));
335
336 assert_ok!(tx.try_send("goodbye"));
337 drop(tx);
338
339 assert_eq!(rx.recv().await, Some("goodbye"));
340 assert!(rx.recv().await.is_none());
341 }
342
343 #[maybe_tokio_test]
try_send_fail_with_try_recv()344 async fn try_send_fail_with_try_recv() {
345 let (tx, mut rx) = mpsc::channel(1);
346
347 tx.try_send("hello").unwrap();
348
349 // This should fail
350 match assert_err!(tx.try_send("fail")) {
351 TrySendError::Full(..) => {}
352 _ => panic!(),
353 }
354
355 assert_eq!(rx.try_recv(), Ok("hello"));
356
357 assert_ok!(tx.try_send("goodbye"));
358 drop(tx);
359
360 assert_eq!(rx.try_recv(), Ok("goodbye"));
361 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
362 }
363
364 #[maybe_tokio_test]
try_reserve_fails()365 async fn try_reserve_fails() {
366 let (tx, mut rx) = mpsc::channel(1);
367
368 let permit = tx.try_reserve().unwrap();
369
370 // This should fail
371 match assert_err!(tx.try_reserve()) {
372 TrySendError::Full(()) => {}
373 _ => panic!(),
374 }
375
376 permit.send("foo");
377
378 assert_eq!(rx.recv().await, Some("foo"));
379
380 // Dropping permit releases the slot.
381 let permit = tx.try_reserve().unwrap();
382 drop(permit);
383
384 let _permit = tx.try_reserve().unwrap();
385 }
386
// A reserved permit holds capacity; dropping it without sending a value must
// give that capacity back so a waiting reservation can complete.
#[tokio::test]
#[cfg(feature = "full")]
async fn drop_permit_releases_permit() {
    let (sender_a, _receiver) = mpsc::channel::<i32>(1);
    let sender_b = sender_a.clone();

    // Take the only slot.
    let held_permit = assert_ok!(sender_a.reserve().await);

    // A second reservation has to wait for it.
    let mut waiting_reserve = tokio_test::task::spawn(sender_b.reserve());
    assert_pending!(waiting_reserve.poll());

    // Releasing the unused permit wakes the waiter, which then succeeds.
    drop(held_permit);

    assert!(waiting_reserve.is_woken());
    assert_ready_ok!(waiting_reserve.poll());
}
405
406 #[maybe_tokio_test]
dropping_rx_closes_channel()407 async fn dropping_rx_closes_channel() {
408 let (tx, rx) = mpsc::channel(100);
409
410 let msg = Arc::new(());
411 assert_ok!(tx.try_send(msg.clone()));
412
413 drop(rx);
414 assert_err!(tx.reserve().await);
415 assert_eq!(1, Arc::strong_count(&msg));
416 }
417
// After the receiver is dropped, every non-blocking send/reserve variant
// reports `Closed`, and no clone of the message is left alive in the channel.
#[test]
fn dropping_rx_closes_channel_for_try() {
    let (sender, receiver) = mpsc::channel(100);

    let payload = Arc::new(());
    sender.try_send(Arc::clone(&payload)).unwrap();

    drop(receiver);

    // Each result is kept as a temporary on purpose: a rejected send hands the
    // message back inside the error, and binding it would keep that clone alive
    // until the end of the test.
    assert!(matches!(
        sender.try_send(Arc::clone(&payload)),
        Err(TrySendError::Closed(_))
    ));
    assert!(matches!(
        sender.try_reserve(),
        Err(TrySendError::Closed(_))
    ));
    assert!(matches!(
        sender.try_reserve_owned(),
        Err(TrySendError::Closed(_))
    ));

    assert_eq!(1, Arc::strong_count(&payload));
}
439
// Messages still sitting in the buffer are dropped once both channel halves
// have been dropped.
#[test]
fn unconsumed_messages_are_dropped() {
    let payload = Arc::new(());

    let (sender, receiver) = mpsc::channel(100);

    sender.try_send(Arc::clone(&payload)).unwrap();

    // One reference here, one inside the channel buffer.
    assert_eq!(2, Arc::strong_count(&payload));

    drop(sender);
    drop(receiver);

    assert_eq!(1, Arc::strong_count(&payload));
}
454
// `blocking_recv` called on a plain OS thread receives a value that was sent
// from inside a runtime.
#[test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
fn blocking_recv() {
    let (sender, mut receiver) = mpsc::channel::<u8>(1);

    let worker = std::thread::spawn(move || {
        assert_eq!(Some(10), receiver.blocking_recv());
    });

    {
        // Scoped so the runtime is dropped before the worker thread is joined.
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async move {
            let _ = sender.send(10).await;
        });
    }

    worker.join().unwrap();
}
471
472 #[tokio::test]
473 #[should_panic]
474 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
blocking_recv_async()475 async fn blocking_recv_async() {
476 let (_tx, mut rx) = mpsc::channel::<()>(1);
477 let _ = rx.blocking_recv();
478 }
479
// `blocking_send` called on a plain OS thread delivers a value to a receiver
// that is awaited inside a runtime.
#[test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
fn blocking_send() {
    let (sender, mut receiver) = mpsc::channel::<u8>(1);

    let worker = std::thread::spawn(move || {
        sender.blocking_send(10).unwrap();
    });

    {
        // Scoped so the runtime is dropped before the worker thread is joined.
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async move {
            assert_eq!(Some(10), receiver.recv().await);
        });
    }

    worker.join().unwrap();
}
496
497 #[tokio::test]
498 #[should_panic]
499 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
blocking_send_async()500 async fn blocking_send_async() {
501 let (tx, _rx) = mpsc::channel::<()>(1);
502 let _ = tx.blocking_send(());
503 }
504
// After `close`, `recv` stays pending while a permit is still outstanding;
// dropping that permit unused wakes the receiver, which then sees `None`.
#[tokio::test]
#[cfg(feature = "full")]
async fn ready_close_cancel_bounded() {
    let (sender, mut receiver) = mpsc::channel::<()>(100);
    let _extra_sender = sender.clone();

    let held_permit = assert_ok!(sender.reserve().await);

    receiver.close();

    // The outstanding permit could still deliver a value, so `recv` must wait.
    let mut pending_recv = tokio_test::task::spawn(receiver.recv());
    assert_pending!(pending_recv.poll());

    drop(held_permit);

    assert!(pending_recv.is_woken());
    assert!(assert_ready!(pending_recv.poll()).is_none());
}
524
// A reservation that was woken after `close` but dropped without being polled
// again must not keep `recv` from returning `None`.
#[tokio::test]
#[cfg(feature = "full")]
async fn permit_available_not_acquired_close() {
    let (sender_a, mut receiver) = mpsc::channel::<()>(1);
    let sender_b = sender_a.clone();

    // The first sender holds the only slot; the second waits for one.
    let held_permit = assert_ok!(sender_a.reserve().await);

    let mut waiting_reserve = tokio_test::task::spawn(sender_b.reserve());
    assert_pending!(waiting_reserve.poll());

    receiver.close();

    // Releasing the held permit notifies the waiter...
    drop(held_permit);
    assert!(waiting_reserve.is_woken());

    // ...which is then abandoned without being polled to completion.
    drop(waiting_reserve);
    assert!(receiver.recv().await.is_none());
}
544
// Exercises `try_recv` on a bounded channel of capacity five: filling and
// draining, interleaved sends and receives, and draining after the sender is
// dropped (`Empty` while a sender exists, `Disconnected` once it is gone).
#[test]
fn try_recv_bounded() {
    let (sender, mut receiver) = mpsc::channel(5);

    // Fill the buffer to capacity; one more send must be rejected.
    for _ in 0..5 {
        sender.try_send("hello").unwrap();
    }
    assert!(sender.try_send("hello").is_err());

    // Drain all five values; the channel is then empty but still open.
    for _ in 0..5 {
        assert_eq!(Ok("hello"), receiver.try_recv());
    }
    assert_eq!(Err(TryRecvError::Empty), receiver.try_recv());

    // Four in, one out, two in: the buffer is full again.
    for _ in 0..4 {
        sender.try_send("hello").unwrap();
    }
    assert_eq!(Ok("hello"), receiver.try_recv());
    for _ in 0..2 {
        sender.try_send("hello").unwrap();
    }
    assert!(sender.try_send("hello").is_err());

    for _ in 0..5 {
        assert_eq!(Ok("hello"), receiver.try_recv());
    }
    assert_eq!(Err(TryRecvError::Empty), receiver.try_recv());

    // Values sent before the sender is dropped are still delivered.
    for _ in 0..3 {
        sender.try_send("hello").unwrap();
    }
    drop(sender);

    for _ in 0..3 {
        assert_eq!(Ok("hello"), receiver.try_recv());
    }
    assert_eq!(Err(TryRecvError::Disconnected), receiver.try_recv());
}
587
// For every message count from 0 to 99: `try_recv` on an unbounded channel
// returns the values in send order, then `Empty`, then `Disconnected` once the
// sender is dropped.
#[test]
fn try_recv_unbounded() {
    for count in 0..100 {
        let (sender, mut receiver) = mpsc::unbounded_channel();

        (0..count).for_each(|value| sender.send(value).unwrap());

        for expected in 0..count {
            assert_eq!(receiver.try_recv(), Ok(expected));
        }

        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
        drop(sender);
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    }
}
606
// An empty bounded channel reports `Empty` while the sender is alive and
// `Disconnected` after it is dropped.
#[test]
fn try_recv_close_while_empty_bounded() {
    let (sender, mut receiver) = mpsc::channel::<()>(5);

    let while_open = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Empty), while_open);

    drop(sender);

    let after_drop = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Disconnected), after_drop);
}
615
// An empty unbounded channel reports `Empty` while the sender is alive and
// `Disconnected` after it is dropped.
#[test]
fn try_recv_close_while_empty_unbounded() {
    let (sender, mut receiver) = mpsc::unbounded_channel::<()>();

    let while_open = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Empty), while_open);

    drop(sender);

    let after_drop = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Disconnected), after_drop);
}
624
// Despite its name, this test exercises `Sender::send_timeout`: sends succeed
// while there is capacity, time out (handing the value back) once the buffer is
// full, and report `Closed` (also handing the value back) after the receiver
// is dropped.
#[tokio::test(start_paused = true)]
#[cfg(feature = "full")]
async fn recv_timeout() {
    use tokio::sync::mpsc::error::SendTimeoutError::{Closed, Timeout};
    use tokio::time::Duration;

    let (sender, receiver) = mpsc::channel(5);
    let limit = Duration::from_secs(1);

    // Five values fit into the buffer without waiting.
    for value in (10..=50).step_by(10) {
        assert_eq!(sender.send_timeout(value, limit).await, Ok(()));
    }

    // Nothing is ever received, so the sixth send runs into the timeout.
    assert_eq!(sender.send_timeout(60, limit).await, Err(Timeout(60)));

    drop(receiver);
    assert_eq!(sender.send_timeout(70, limit).await, Err(Closed(70)));
}
649
// Polling `send_timeout` outside of a Tokio runtime is expected to panic with
// the "no reactor running" message given in `#[should_panic]`.
#[test]
#[should_panic = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn recv_timeout_panic() {
    use futures::future::FutureExt;
    use tokio::time::Duration;

    let (sender, _receiver) = mpsc::channel(5);

    // `now_or_never` polls the future exactly once, with no runtime present.
    let pending_send = sender.send_timeout(10, Duration::from_secs(1));
    pending_send.now_or_never();
}
660
661 // Tests that channel `capacity` changes and `max_capacity` stays the same
662 #[tokio::test]
test_tx_capacity()663 async fn test_tx_capacity() {
664 let (tx, _rx) = mpsc::channel::<()>(10);
665 // both capacities are same before
666 assert_eq!(tx.capacity(), 10);
667 assert_eq!(tx.max_capacity(), 10);
668
669 let _permit = tx.reserve().await.unwrap();
670 // after reserve, only capacity should drop by one
671 assert_eq!(tx.capacity(), 9);
672 assert_eq!(tx.max_capacity(), 10);
673
674 tx.send(()).await.unwrap();
675 // after send, capacity should drop by one again
676 assert_eq!(tx.capacity(), 8);
677 assert_eq!(tx.max_capacity(), 10);
678 }
679
// Compile-time helper: only accepts references to types implementing `Debug`.
// Does nothing at runtime.
fn is_debug<T>(_: &T)
where
    T: fmt::Debug,
{
}
681