1 #![allow(clippy::redundant_clone)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4
5 use std::thread;
6 use tokio::runtime::Runtime;
7 use tokio::sync::mpsc;
8 use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
9 use tokio_test::task;
10 use tokio_test::{
11 assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
12 };
13
14 use std::sync::Arc;
15
16 mod support {
17 pub(crate) mod mpsc_stream;
18 }
19
20 trait AssertSend: Send {}
21 impl AssertSend for mpsc::Sender<i32> {}
22 impl AssertSend for mpsc::Receiver<i32> {}
23
24 #[tokio::test]
send_recv_with_buffer()25 async fn send_recv_with_buffer() {
26 let (tx, mut rx) = mpsc::channel::<i32>(16);
27
28 // Using poll_ready / try_send
29 // let permit assert_ready_ok!(tx.reserve());
30 let permit = tx.reserve().await.unwrap();
31 permit.send(1);
32
33 // Without poll_ready
34 tx.try_send(2).unwrap();
35
36 drop(tx);
37
38 let val = rx.recv().await;
39 assert_eq!(val, Some(1));
40
41 let val = rx.recv().await;
42 assert_eq!(val, Some(2));
43
44 let val = rx.recv().await;
45 assert!(val.is_none());
46 }
47
48 #[tokio::test]
reserve_disarm()49 async fn reserve_disarm() {
50 let (tx, mut rx) = mpsc::channel::<i32>(2);
51 let tx1 = tx.clone();
52 let tx2 = tx.clone();
53 let tx3 = tx.clone();
54 let tx4 = tx;
55
56 // We should be able to `poll_ready` two handles without problem
57 let permit1 = assert_ok!(tx1.reserve().await);
58 let permit2 = assert_ok!(tx2.reserve().await);
59
60 // But a third should not be ready
61 let mut r3 = task::spawn(tx3.reserve());
62 assert_pending!(r3.poll());
63
64 let mut r4 = task::spawn(tx4.reserve());
65 assert_pending!(r4.poll());
66
67 // Using one of the reserved slots should allow a new handle to become ready
68 permit1.send(1);
69
70 // We also need to receive for the slot to be free
71 assert!(!r3.is_woken());
72 rx.recv().await.unwrap();
73 // Now there's a free slot!
74 assert!(r3.is_woken());
75 assert!(!r4.is_woken());
76
77 // Dropping a permit should also open up a slot
78 drop(permit2);
79 assert!(r4.is_woken());
80
81 let mut r1 = task::spawn(tx1.reserve());
82 assert_pending!(r1.poll());
83 }
84
85 #[tokio::test]
send_recv_stream_with_buffer()86 async fn send_recv_stream_with_buffer() {
87 use tokio_stream::StreamExt;
88
89 let (tx, rx) = support::mpsc_stream::channel_stream::<i32>(16);
90 let mut rx = Box::pin(rx);
91
92 tokio::spawn(async move {
93 assert_ok!(tx.send(1).await);
94 assert_ok!(tx.send(2).await);
95 });
96
97 assert_eq!(Some(1), rx.next().await);
98 assert_eq!(Some(2), rx.next().await);
99 assert_eq!(None, rx.next().await);
100 }
101
102 #[tokio::test]
async_send_recv_with_buffer()103 async fn async_send_recv_with_buffer() {
104 let (tx, mut rx) = mpsc::channel(16);
105
106 tokio::spawn(async move {
107 assert_ok!(tx.send(1).await);
108 assert_ok!(tx.send(2).await);
109 });
110
111 assert_eq!(Some(1), rx.recv().await);
112 assert_eq!(Some(2), rx.recv().await);
113 assert_eq!(None, rx.recv().await);
114 }
115
116 #[tokio::test]
start_send_past_cap()117 async fn start_send_past_cap() {
118 use std::future::Future;
119
120 let mut t1 = task::spawn(());
121
122 let (tx1, mut rx) = mpsc::channel(1);
123 let tx2 = tx1.clone();
124
125 assert_ok!(tx1.try_send(()));
126
127 let mut r1 = Box::pin(tx1.reserve());
128 t1.enter(|cx, _| assert_pending!(r1.as_mut().poll(cx)));
129
130 {
131 let mut r2 = task::spawn(tx2.reserve());
132 assert_pending!(r2.poll());
133
134 drop(r1);
135
136 assert!(rx.recv().await.is_some());
137
138 assert!(r2.is_woken());
139 assert!(!t1.is_woken());
140 }
141
142 drop(tx1);
143 drop(tx2);
144
145 assert!(rx.recv().await.is_none());
146 }
147
/// Creating a bounded channel with a capacity of zero is expected to panic.
#[test]
#[should_panic]
fn buffer_gteq_one() {
    let _ = mpsc::channel::<i32>(0);
}
153
154 #[tokio::test]
send_recv_unbounded()155 async fn send_recv_unbounded() {
156 let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
157
158 // Using `try_send`
159 assert_ok!(tx.send(1));
160 assert_ok!(tx.send(2));
161
162 assert_eq!(rx.recv().await, Some(1));
163 assert_eq!(rx.recv().await, Some(2));
164
165 drop(tx);
166
167 assert!(rx.recv().await.is_none());
168 }
169
170 #[tokio::test]
async_send_recv_unbounded()171 async fn async_send_recv_unbounded() {
172 let (tx, mut rx) = mpsc::unbounded_channel();
173
174 tokio::spawn(async move {
175 assert_ok!(tx.send(1));
176 assert_ok!(tx.send(2));
177 });
178
179 assert_eq!(Some(1), rx.recv().await);
180 assert_eq!(Some(2), rx.recv().await);
181 assert_eq!(None, rx.recv().await);
182 }
183
184 #[tokio::test]
send_recv_stream_unbounded()185 async fn send_recv_stream_unbounded() {
186 use tokio_stream::StreamExt;
187
188 let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<i32>();
189
190 let mut rx = Box::pin(rx);
191
192 tokio::spawn(async move {
193 assert_ok!(tx.send(1));
194 assert_ok!(tx.send(2));
195 });
196
197 assert_eq!(Some(1), rx.next().await);
198 assert_eq!(Some(2), rx.next().await);
199 assert_eq!(None, rx.next().await);
200 }
201
202 #[tokio::test]
no_t_bounds_buffer()203 async fn no_t_bounds_buffer() {
204 struct NoImpls;
205
206 let (tx, mut rx) = mpsc::channel(100);
207
208 // sender should be Debug even though T isn't Debug
209 println!("{:?}", tx);
210 // same with Receiver
211 println!("{:?}", rx);
212 // and sender should be Clone even though T isn't Clone
213 assert!(tx.clone().try_send(NoImpls).is_ok());
214
215 assert!(rx.recv().await.is_some());
216 }
217
218 #[tokio::test]
no_t_bounds_unbounded()219 async fn no_t_bounds_unbounded() {
220 struct NoImpls;
221
222 let (tx, mut rx) = mpsc::unbounded_channel();
223
224 // sender should be Debug even though T isn't Debug
225 println!("{:?}", tx);
226 // same with Receiver
227 println!("{:?}", rx);
228 // and sender should be Clone even though T isn't Clone
229 assert!(tx.clone().send(NoImpls).is_ok());
230
231 assert!(rx.recv().await.is_some());
232 }
233
234 #[tokio::test]
send_recv_buffer_limited()235 async fn send_recv_buffer_limited() {
236 let (tx, mut rx) = mpsc::channel::<i32>(1);
237
238 // Reserve capacity
239 let p1 = assert_ok!(tx.reserve().await);
240
241 // Send first message
242 p1.send(1);
243
244 // Not ready
245 let mut p2 = task::spawn(tx.reserve());
246 assert_pending!(p2.poll());
247
248 // Take the value
249 assert!(rx.recv().await.is_some());
250
251 // Notified
252 assert!(p2.is_woken());
253
254 // Trying to send fails
255 assert_err!(tx.try_send(1337));
256
257 // Send second
258 let permit = assert_ready_ok!(p2.poll());
259 permit.send(2);
260
261 assert!(rx.recv().await.is_some());
262 }
263
264 #[tokio::test]
recv_close_gets_none_idle()265 async fn recv_close_gets_none_idle() {
266 let (tx, mut rx) = mpsc::channel::<i32>(10);
267
268 rx.close();
269
270 assert!(rx.recv().await.is_none());
271
272 assert_err!(tx.send(1).await);
273 }
274
275 #[tokio::test]
recv_close_gets_none_reserved()276 async fn recv_close_gets_none_reserved() {
277 let (tx1, mut rx) = mpsc::channel::<i32>(1);
278 let tx2 = tx1.clone();
279
280 let permit1 = assert_ok!(tx1.reserve().await);
281 let mut permit2 = task::spawn(tx2.reserve());
282 assert_pending!(permit2.poll());
283
284 rx.close();
285
286 assert!(permit2.is_woken());
287 assert_ready_err!(permit2.poll());
288
289 {
290 let mut recv = task::spawn(rx.recv());
291 assert_pending!(recv.poll());
292
293 permit1.send(123);
294 assert!(recv.is_woken());
295
296 let v = assert_ready!(recv.poll());
297 assert_eq!(v, Some(123));
298 }
299
300 assert!(rx.recv().await.is_none());
301 }
302
303 #[tokio::test]
tx_close_gets_none()304 async fn tx_close_gets_none() {
305 let (_, mut rx) = mpsc::channel::<i32>(10);
306 assert!(rx.recv().await.is_none());
307 }
308
309 #[tokio::test]
try_send_fail()310 async fn try_send_fail() {
311 let (tx, mut rx) = mpsc::channel(1);
312
313 tx.try_send("hello").unwrap();
314
315 // This should fail
316 match assert_err!(tx.try_send("fail")) {
317 TrySendError::Full(..) => {}
318 _ => panic!(),
319 }
320
321 assert_eq!(rx.recv().await, Some("hello"));
322
323 assert_ok!(tx.try_send("goodbye"));
324 drop(tx);
325
326 assert_eq!(rx.recv().await, Some("goodbye"));
327 assert!(rx.recv().await.is_none());
328 }
329
330 #[tokio::test]
try_send_fail_with_try_recv()331 async fn try_send_fail_with_try_recv() {
332 let (tx, mut rx) = mpsc::channel(1);
333
334 tx.try_send("hello").unwrap();
335
336 // This should fail
337 match assert_err!(tx.try_send("fail")) {
338 TrySendError::Full(..) => {}
339 _ => panic!(),
340 }
341
342 assert_eq!(rx.try_recv(), Ok("hello"));
343
344 assert_ok!(tx.try_send("goodbye"));
345 drop(tx);
346
347 assert_eq!(rx.try_recv(), Ok("goodbye"));
348 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
349 }
350
351 #[tokio::test]
try_reserve_fails()352 async fn try_reserve_fails() {
353 let (tx, mut rx) = mpsc::channel(1);
354
355 let permit = tx.try_reserve().unwrap();
356
357 // This should fail
358 match assert_err!(tx.try_reserve()) {
359 TrySendError::Full(()) => {}
360 _ => panic!(),
361 }
362
363 permit.send("foo");
364
365 assert_eq!(rx.recv().await, Some("foo"));
366
367 // Dropping permit releases the slot.
368 let permit = tx.try_reserve().unwrap();
369 drop(permit);
370
371 let _permit = tx.try_reserve().unwrap();
372 }
373
374 #[tokio::test]
drop_permit_releases_permit()375 async fn drop_permit_releases_permit() {
376 // poll_ready reserves capacity, ensure that the capacity is released if tx
377 // is dropped w/o sending a value.
378 let (tx1, _rx) = mpsc::channel::<i32>(1);
379 let tx2 = tx1.clone();
380
381 let permit = assert_ok!(tx1.reserve().await);
382
383 let mut reserve2 = task::spawn(tx2.reserve());
384 assert_pending!(reserve2.poll());
385
386 drop(permit);
387
388 assert!(reserve2.is_woken());
389 assert_ready_ok!(reserve2.poll());
390 }
391
392 #[tokio::test]
dropping_rx_closes_channel()393 async fn dropping_rx_closes_channel() {
394 let (tx, rx) = mpsc::channel(100);
395
396 let msg = Arc::new(());
397 assert_ok!(tx.try_send(msg.clone()));
398
399 drop(rx);
400 assert_err!(tx.reserve().await);
401 assert_eq!(1, Arc::strong_count(&msg));
402 }
403
/// After the receiver is dropped, `try_send`, `try_reserve` and
/// `try_reserve_owned` all report `Closed`, and no message clone is kept.
#[test]
fn dropping_rx_closes_channel_for_try() {
    let (sender, receiver) = mpsc::channel(100);

    let payload = Arc::new(());
    sender.try_send(Arc::clone(&payload)).unwrap();

    drop(receiver);

    // The result is left as a temporary on purpose: the `Closed` error carries
    // the rejected clone, which must be gone before the strong-count check.
    assert!(matches!(
        sender.try_send(Arc::clone(&payload)),
        Err(TrySendError::Closed(_))
    ));
    assert!(matches!(
        sender.try_reserve(),
        Err(TrySendError::Closed(_))
    ));
    assert!(matches!(
        sender.try_reserve_owned(),
        Err(TrySendError::Closed(_))
    ));

    // Only the local handle is left.
    assert_eq!(1, Arc::strong_count(&payload));
}
425
/// A message still buffered when both channel halves are dropped is dropped
/// with them.
#[test]
fn unconsumed_messages_are_dropped() {
    let payload = Arc::new(());

    let (sender, receiver) = mpsc::channel(100);

    sender.try_send(Arc::clone(&payload)).unwrap();

    // One local handle plus the clone sitting in the channel.
    assert_eq!(2, Arc::strong_count(&payload));

    // Sender first, then receiver.
    drop(sender);
    drop(receiver);

    assert_eq!(1, Arc::strong_count(&payload));
}
440
/// `blocking_recv` on a plain thread receives a value that is sent from
/// inside a runtime.
#[test]
fn blocking_recv() {
    let (tx, mut rx) = mpsc::channel::<u8>(1);

    // The receiver lives on an ordinary thread, outside any runtime.
    let sync_code = thread::spawn(move || {
        assert_eq!(Some(10), rx.blocking_recv());
    });

    Runtime::new().unwrap().block_on(async move {
        // Assert the send instead of discarding its result, so a failure is
        // reported here rather than only indirectly through the receiver thread.
        assert_ok!(tx.send(10).await);
    });

    // Propagate a panic from the receiver thread, if any.
    sync_code.join().unwrap()
}
454
455 #[tokio::test]
456 #[should_panic]
blocking_recv_async()457 async fn blocking_recv_async() {
458 let (_tx, mut rx) = mpsc::channel::<()>(1);
459 let _ = rx.blocking_recv();
460 }
461
/// `blocking_send` on a plain thread delivers a value that is received from
/// inside a runtime.
#[test]
fn blocking_send() {
    let (sender, mut receiver) = mpsc::channel::<u8>(1);

    // The sender lives on an ordinary thread, outside any runtime.
    let producer = thread::spawn(move || {
        sender.blocking_send(10).unwrap();
    });

    let runtime = Runtime::new().unwrap();
    runtime.block_on(async move {
        let received = receiver.recv().await;
        assert_eq!(Some(10), received);
    });
    // Shut the runtime down before joining, as a temporary runtime would be.
    drop(runtime);

    // Propagate a panic from the producer thread, if any.
    producer.join().unwrap()
}
475
476 #[tokio::test]
477 #[should_panic]
blocking_send_async()478 async fn blocking_send_async() {
479 let (tx, _rx) = mpsc::channel::<()>(1);
480 let _ = tx.blocking_send(());
481 }
482
483 #[tokio::test]
ready_close_cancel_bounded()484 async fn ready_close_cancel_bounded() {
485 let (tx, mut rx) = mpsc::channel::<()>(100);
486 let _tx2 = tx.clone();
487
488 let permit = assert_ok!(tx.reserve().await);
489
490 rx.close();
491
492 let mut recv = task::spawn(rx.recv());
493 assert_pending!(recv.poll());
494
495 drop(permit);
496
497 assert!(recv.is_woken());
498 let val = assert_ready!(recv.poll());
499 assert!(val.is_none());
500 }
501
502 #[tokio::test]
permit_available_not_acquired_close()503 async fn permit_available_not_acquired_close() {
504 let (tx1, mut rx) = mpsc::channel::<()>(1);
505 let tx2 = tx1.clone();
506
507 let permit1 = assert_ok!(tx1.reserve().await);
508
509 let mut permit2 = task::spawn(tx2.reserve());
510 assert_pending!(permit2.poll());
511
512 rx.close();
513
514 drop(permit1);
515 assert!(permit2.is_woken());
516
517 drop(permit2);
518 assert!(rx.recv().await.is_none());
519 }
520
/// Exercises `try_send` / `try_recv` on a capacity-5 channel over three
/// rounds: fill and drain, interleaved send/receive, and drain after the
/// sender is dropped.
#[test]
fn try_recv_bounded() {
    let (sender, mut receiver) = mpsc::channel(5);

    // Round 1: fill to capacity, overflow once, then drain completely.
    for _ in 0..5 {
        sender.try_send("hello").unwrap();
    }
    assert!(sender.try_send("hello").is_err());

    for _ in 0..5 {
        assert_eq!(Ok("hello"), receiver.try_recv());
    }
    assert_eq!(Err(TryRecvError::Empty), receiver.try_recv());

    // Round 2: four in, one out, two in (full again), overflow, drain.
    for _ in 0..4 {
        sender.try_send("hello").unwrap();
    }
    assert_eq!(Ok("hello"), receiver.try_recv());
    for _ in 0..2 {
        sender.try_send("hello").unwrap();
    }
    assert!(sender.try_send("hello").is_err());

    for _ in 0..5 {
        assert_eq!(Ok("hello"), receiver.try_recv());
    }
    assert_eq!(Err(TryRecvError::Empty), receiver.try_recv());

    // Round 3: buffered values survive the sender; then `Disconnected`.
    for _ in 0..3 {
        sender.try_send("hello").unwrap();
    }
    drop(sender);

    for _ in 0..3 {
        assert_eq!(Ok("hello"), receiver.try_recv());
    }
    assert_eq!(Err(TryRecvError::Disconnected), receiver.try_recv());
}
563
/// For every message count from 0 to 99: `try_recv` returns the values in
/// order, then `Empty`, then `Disconnected` once the sender is dropped.
#[test]
fn try_recv_unbounded() {
    for count in 0..100 {
        let (sender, mut receiver) = mpsc::unbounded_channel();

        (0..count).for_each(|value| sender.send(value).unwrap());

        for expected in 0..count {
            assert_eq!(receiver.try_recv(), Ok(expected));
        }

        // Drained but the sender is still alive.
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));

        drop(sender);
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    }
}
582
/// On an empty bounded channel `try_recv` reports `Empty` while the sender is
/// alive and `Disconnected` after it is dropped.
#[test]
fn try_recv_close_while_empty_bounded() {
    let (sender, mut receiver) = mpsc::channel::<()>(5);

    let while_open = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Empty), while_open);

    drop(sender);

    let after_drop = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Disconnected), after_drop);
}
591
/// On an empty unbounded channel `try_recv` reports `Empty` while the sender
/// is alive and `Disconnected` after it is dropped.
#[test]
fn try_recv_close_while_empty_unbounded() {
    let (sender, mut receiver) = mpsc::unbounded_channel::<()>();

    let while_open = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Empty), while_open);

    drop(sender);

    let after_drop = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Disconnected), after_drop);
}
600