• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![allow(clippy::redundant_clone)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4 
5 use std::thread;
6 use tokio::runtime::Runtime;
7 use tokio::sync::mpsc;
8 use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
9 use tokio_test::task;
10 use tokio_test::{
11     assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
12 };
13 
14 use std::sync::Arc;
15 
16 mod support {
17     pub(crate) mod mpsc_stream;
18 }
19 
20 trait AssertSend: Send {}
21 impl AssertSend for mpsc::Sender<i32> {}
22 impl AssertSend for mpsc::Receiver<i32> {}
23 
24 #[tokio::test]
send_recv_with_buffer()25 async fn send_recv_with_buffer() {
26     let (tx, mut rx) = mpsc::channel::<i32>(16);
27 
28     // Using poll_ready / try_send
29     // let permit assert_ready_ok!(tx.reserve());
30     let permit = tx.reserve().await.unwrap();
31     permit.send(1);
32 
33     // Without poll_ready
34     tx.try_send(2).unwrap();
35 
36     drop(tx);
37 
38     let val = rx.recv().await;
39     assert_eq!(val, Some(1));
40 
41     let val = rx.recv().await;
42     assert_eq!(val, Some(2));
43 
44     let val = rx.recv().await;
45     assert!(val.is_none());
46 }
47 
48 #[tokio::test]
reserve_disarm()49 async fn reserve_disarm() {
50     let (tx, mut rx) = mpsc::channel::<i32>(2);
51     let tx1 = tx.clone();
52     let tx2 = tx.clone();
53     let tx3 = tx.clone();
54     let tx4 = tx;
55 
56     // We should be able to `poll_ready` two handles without problem
57     let permit1 = assert_ok!(tx1.reserve().await);
58     let permit2 = assert_ok!(tx2.reserve().await);
59 
60     // But a third should not be ready
61     let mut r3 = task::spawn(tx3.reserve());
62     assert_pending!(r3.poll());
63 
64     let mut r4 = task::spawn(tx4.reserve());
65     assert_pending!(r4.poll());
66 
67     // Using one of the reserved slots should allow a new handle to become ready
68     permit1.send(1);
69 
70     // We also need to receive for the slot to be free
71     assert!(!r3.is_woken());
72     rx.recv().await.unwrap();
73     // Now there's a free slot!
74     assert!(r3.is_woken());
75     assert!(!r4.is_woken());
76 
77     // Dropping a permit should also open up a slot
78     drop(permit2);
79     assert!(r4.is_woken());
80 
81     let mut r1 = task::spawn(tx1.reserve());
82     assert_pending!(r1.poll());
83 }
84 
85 #[tokio::test]
send_recv_stream_with_buffer()86 async fn send_recv_stream_with_buffer() {
87     use tokio_stream::StreamExt;
88 
89     let (tx, rx) = support::mpsc_stream::channel_stream::<i32>(16);
90     let mut rx = Box::pin(rx);
91 
92     tokio::spawn(async move {
93         assert_ok!(tx.send(1).await);
94         assert_ok!(tx.send(2).await);
95     });
96 
97     assert_eq!(Some(1), rx.next().await);
98     assert_eq!(Some(2), rx.next().await);
99     assert_eq!(None, rx.next().await);
100 }
101 
102 #[tokio::test]
async_send_recv_with_buffer()103 async fn async_send_recv_with_buffer() {
104     let (tx, mut rx) = mpsc::channel(16);
105 
106     tokio::spawn(async move {
107         assert_ok!(tx.send(1).await);
108         assert_ok!(tx.send(2).await);
109     });
110 
111     assert_eq!(Some(1), rx.recv().await);
112     assert_eq!(Some(2), rx.recv().await);
113     assert_eq!(None, rx.recv().await);
114 }
115 
116 #[tokio::test]
start_send_past_cap()117 async fn start_send_past_cap() {
118     use std::future::Future;
119 
120     let mut t1 = task::spawn(());
121 
122     let (tx1, mut rx) = mpsc::channel(1);
123     let tx2 = tx1.clone();
124 
125     assert_ok!(tx1.try_send(()));
126 
127     let mut r1 = Box::pin(tx1.reserve());
128     t1.enter(|cx, _| assert_pending!(r1.as_mut().poll(cx)));
129 
130     {
131         let mut r2 = task::spawn(tx2.reserve());
132         assert_pending!(r2.poll());
133 
134         drop(r1);
135 
136         assert!(rx.recv().await.is_some());
137 
138         assert!(r2.is_woken());
139         assert!(!t1.is_woken());
140     }
141 
142     drop(tx1);
143     drop(tx2);
144 
145     assert!(rx.recv().await.is_none());
146 }
147 
148 #[test]
149 #[should_panic]
buffer_gteq_one()150 fn buffer_gteq_one() {
151     mpsc::channel::<i32>(0);
152 }
153 
154 #[tokio::test]
send_recv_unbounded()155 async fn send_recv_unbounded() {
156     let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
157 
158     // Using `try_send`
159     assert_ok!(tx.send(1));
160     assert_ok!(tx.send(2));
161 
162     assert_eq!(rx.recv().await, Some(1));
163     assert_eq!(rx.recv().await, Some(2));
164 
165     drop(tx);
166 
167     assert!(rx.recv().await.is_none());
168 }
169 
170 #[tokio::test]
async_send_recv_unbounded()171 async fn async_send_recv_unbounded() {
172     let (tx, mut rx) = mpsc::unbounded_channel();
173 
174     tokio::spawn(async move {
175         assert_ok!(tx.send(1));
176         assert_ok!(tx.send(2));
177     });
178 
179     assert_eq!(Some(1), rx.recv().await);
180     assert_eq!(Some(2), rx.recv().await);
181     assert_eq!(None, rx.recv().await);
182 }
183 
184 #[tokio::test]
send_recv_stream_unbounded()185 async fn send_recv_stream_unbounded() {
186     use tokio_stream::StreamExt;
187 
188     let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<i32>();
189 
190     let mut rx = Box::pin(rx);
191 
192     tokio::spawn(async move {
193         assert_ok!(tx.send(1));
194         assert_ok!(tx.send(2));
195     });
196 
197     assert_eq!(Some(1), rx.next().await);
198     assert_eq!(Some(2), rx.next().await);
199     assert_eq!(None, rx.next().await);
200 }
201 
202 #[tokio::test]
no_t_bounds_buffer()203 async fn no_t_bounds_buffer() {
204     struct NoImpls;
205 
206     let (tx, mut rx) = mpsc::channel(100);
207 
208     // sender should be Debug even though T isn't Debug
209     println!("{:?}", tx);
210     // same with Receiver
211     println!("{:?}", rx);
212     // and sender should be Clone even though T isn't Clone
213     assert!(tx.clone().try_send(NoImpls).is_ok());
214 
215     assert!(rx.recv().await.is_some());
216 }
217 
218 #[tokio::test]
no_t_bounds_unbounded()219 async fn no_t_bounds_unbounded() {
220     struct NoImpls;
221 
222     let (tx, mut rx) = mpsc::unbounded_channel();
223 
224     // sender should be Debug even though T isn't Debug
225     println!("{:?}", tx);
226     // same with Receiver
227     println!("{:?}", rx);
228     // and sender should be Clone even though T isn't Clone
229     assert!(tx.clone().send(NoImpls).is_ok());
230 
231     assert!(rx.recv().await.is_some());
232 }
233 
234 #[tokio::test]
send_recv_buffer_limited()235 async fn send_recv_buffer_limited() {
236     let (tx, mut rx) = mpsc::channel::<i32>(1);
237 
238     // Reserve capacity
239     let p1 = assert_ok!(tx.reserve().await);
240 
241     // Send first message
242     p1.send(1);
243 
244     // Not ready
245     let mut p2 = task::spawn(tx.reserve());
246     assert_pending!(p2.poll());
247 
248     // Take the value
249     assert!(rx.recv().await.is_some());
250 
251     // Notified
252     assert!(p2.is_woken());
253 
254     // Trying to send fails
255     assert_err!(tx.try_send(1337));
256 
257     // Send second
258     let permit = assert_ready_ok!(p2.poll());
259     permit.send(2);
260 
261     assert!(rx.recv().await.is_some());
262 }
263 
264 #[tokio::test]
recv_close_gets_none_idle()265 async fn recv_close_gets_none_idle() {
266     let (tx, mut rx) = mpsc::channel::<i32>(10);
267 
268     rx.close();
269 
270     assert!(rx.recv().await.is_none());
271 
272     assert_err!(tx.send(1).await);
273 }
274 
275 #[tokio::test]
recv_close_gets_none_reserved()276 async fn recv_close_gets_none_reserved() {
277     let (tx1, mut rx) = mpsc::channel::<i32>(1);
278     let tx2 = tx1.clone();
279 
280     let permit1 = assert_ok!(tx1.reserve().await);
281     let mut permit2 = task::spawn(tx2.reserve());
282     assert_pending!(permit2.poll());
283 
284     rx.close();
285 
286     assert!(permit2.is_woken());
287     assert_ready_err!(permit2.poll());
288 
289     {
290         let mut recv = task::spawn(rx.recv());
291         assert_pending!(recv.poll());
292 
293         permit1.send(123);
294         assert!(recv.is_woken());
295 
296         let v = assert_ready!(recv.poll());
297         assert_eq!(v, Some(123));
298     }
299 
300     assert!(rx.recv().await.is_none());
301 }
302 
303 #[tokio::test]
tx_close_gets_none()304 async fn tx_close_gets_none() {
305     let (_, mut rx) = mpsc::channel::<i32>(10);
306     assert!(rx.recv().await.is_none());
307 }
308 
309 #[tokio::test]
try_send_fail()310 async fn try_send_fail() {
311     let (tx, mut rx) = mpsc::channel(1);
312 
313     tx.try_send("hello").unwrap();
314 
315     // This should fail
316     match assert_err!(tx.try_send("fail")) {
317         TrySendError::Full(..) => {}
318         _ => panic!(),
319     }
320 
321     assert_eq!(rx.recv().await, Some("hello"));
322 
323     assert_ok!(tx.try_send("goodbye"));
324     drop(tx);
325 
326     assert_eq!(rx.recv().await, Some("goodbye"));
327     assert!(rx.recv().await.is_none());
328 }
329 
330 #[tokio::test]
try_send_fail_with_try_recv()331 async fn try_send_fail_with_try_recv() {
332     let (tx, mut rx) = mpsc::channel(1);
333 
334     tx.try_send("hello").unwrap();
335 
336     // This should fail
337     match assert_err!(tx.try_send("fail")) {
338         TrySendError::Full(..) => {}
339         _ => panic!(),
340     }
341 
342     assert_eq!(rx.try_recv(), Ok("hello"));
343 
344     assert_ok!(tx.try_send("goodbye"));
345     drop(tx);
346 
347     assert_eq!(rx.try_recv(), Ok("goodbye"));
348     assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
349 }
350 
351 #[tokio::test]
try_reserve_fails()352 async fn try_reserve_fails() {
353     let (tx, mut rx) = mpsc::channel(1);
354 
355     let permit = tx.try_reserve().unwrap();
356 
357     // This should fail
358     match assert_err!(tx.try_reserve()) {
359         TrySendError::Full(()) => {}
360         _ => panic!(),
361     }
362 
363     permit.send("foo");
364 
365     assert_eq!(rx.recv().await, Some("foo"));
366 
367     // Dropping permit releases the slot.
368     let permit = tx.try_reserve().unwrap();
369     drop(permit);
370 
371     let _permit = tx.try_reserve().unwrap();
372 }
373 
374 #[tokio::test]
drop_permit_releases_permit()375 async fn drop_permit_releases_permit() {
376     // poll_ready reserves capacity, ensure that the capacity is released if tx
377     // is dropped w/o sending a value.
378     let (tx1, _rx) = mpsc::channel::<i32>(1);
379     let tx2 = tx1.clone();
380 
381     let permit = assert_ok!(tx1.reserve().await);
382 
383     let mut reserve2 = task::spawn(tx2.reserve());
384     assert_pending!(reserve2.poll());
385 
386     drop(permit);
387 
388     assert!(reserve2.is_woken());
389     assert_ready_ok!(reserve2.poll());
390 }
391 
392 #[tokio::test]
dropping_rx_closes_channel()393 async fn dropping_rx_closes_channel() {
394     let (tx, rx) = mpsc::channel(100);
395 
396     let msg = Arc::new(());
397     assert_ok!(tx.try_send(msg.clone()));
398 
399     drop(rx);
400     assert_err!(tx.reserve().await);
401     assert_eq!(1, Arc::strong_count(&msg));
402 }
403 
404 #[test]
dropping_rx_closes_channel_for_try()405 fn dropping_rx_closes_channel_for_try() {
406     let (tx, rx) = mpsc::channel(100);
407 
408     let msg = Arc::new(());
409     tx.try_send(msg.clone()).unwrap();
410 
411     drop(rx);
412 
413     assert!(matches!(
414         tx.try_send(msg.clone()),
415         Err(TrySendError::Closed(_))
416     ));
417     assert!(matches!(tx.try_reserve(), Err(TrySendError::Closed(_))));
418     assert!(matches!(
419         tx.try_reserve_owned(),
420         Err(TrySendError::Closed(_))
421     ));
422 
423     assert_eq!(1, Arc::strong_count(&msg));
424 }
425 
426 #[test]
unconsumed_messages_are_dropped()427 fn unconsumed_messages_are_dropped() {
428     let msg = Arc::new(());
429 
430     let (tx, rx) = mpsc::channel(100);
431 
432     tx.try_send(msg.clone()).unwrap();
433 
434     assert_eq!(2, Arc::strong_count(&msg));
435 
436     drop((tx, rx));
437 
438     assert_eq!(1, Arc::strong_count(&msg));
439 }
440 
441 #[test]
blocking_recv()442 fn blocking_recv() {
443     let (tx, mut rx) = mpsc::channel::<u8>(1);
444 
445     let sync_code = thread::spawn(move || {
446         assert_eq!(Some(10), rx.blocking_recv());
447     });
448 
449     Runtime::new().unwrap().block_on(async move {
450         let _ = tx.send(10).await;
451     });
452     sync_code.join().unwrap()
453 }
454 
455 #[tokio::test]
456 #[should_panic]
blocking_recv_async()457 async fn blocking_recv_async() {
458     let (_tx, mut rx) = mpsc::channel::<()>(1);
459     let _ = rx.blocking_recv();
460 }
461 
462 #[test]
blocking_send()463 fn blocking_send() {
464     let (tx, mut rx) = mpsc::channel::<u8>(1);
465 
466     let sync_code = thread::spawn(move || {
467         tx.blocking_send(10).unwrap();
468     });
469 
470     Runtime::new().unwrap().block_on(async move {
471         assert_eq!(Some(10), rx.recv().await);
472     });
473     sync_code.join().unwrap()
474 }
475 
476 #[tokio::test]
477 #[should_panic]
blocking_send_async()478 async fn blocking_send_async() {
479     let (tx, _rx) = mpsc::channel::<()>(1);
480     let _ = tx.blocking_send(());
481 }
482 
483 #[tokio::test]
ready_close_cancel_bounded()484 async fn ready_close_cancel_bounded() {
485     let (tx, mut rx) = mpsc::channel::<()>(100);
486     let _tx2 = tx.clone();
487 
488     let permit = assert_ok!(tx.reserve().await);
489 
490     rx.close();
491 
492     let mut recv = task::spawn(rx.recv());
493     assert_pending!(recv.poll());
494 
495     drop(permit);
496 
497     assert!(recv.is_woken());
498     let val = assert_ready!(recv.poll());
499     assert!(val.is_none());
500 }
501 
502 #[tokio::test]
permit_available_not_acquired_close()503 async fn permit_available_not_acquired_close() {
504     let (tx1, mut rx) = mpsc::channel::<()>(1);
505     let tx2 = tx1.clone();
506 
507     let permit1 = assert_ok!(tx1.reserve().await);
508 
509     let mut permit2 = task::spawn(tx2.reserve());
510     assert_pending!(permit2.poll());
511 
512     rx.close();
513 
514     drop(permit1);
515     assert!(permit2.is_woken());
516 
517     drop(permit2);
518     assert!(rx.recv().await.is_none());
519 }
520 
521 #[test]
try_recv_bounded()522 fn try_recv_bounded() {
523     let (tx, mut rx) = mpsc::channel(5);
524 
525     tx.try_send("hello").unwrap();
526     tx.try_send("hello").unwrap();
527     tx.try_send("hello").unwrap();
528     tx.try_send("hello").unwrap();
529     tx.try_send("hello").unwrap();
530     assert!(tx.try_send("hello").is_err());
531 
532     assert_eq!(Ok("hello"), rx.try_recv());
533     assert_eq!(Ok("hello"), rx.try_recv());
534     assert_eq!(Ok("hello"), rx.try_recv());
535     assert_eq!(Ok("hello"), rx.try_recv());
536     assert_eq!(Ok("hello"), rx.try_recv());
537     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
538 
539     tx.try_send("hello").unwrap();
540     tx.try_send("hello").unwrap();
541     tx.try_send("hello").unwrap();
542     tx.try_send("hello").unwrap();
543     assert_eq!(Ok("hello"), rx.try_recv());
544     tx.try_send("hello").unwrap();
545     tx.try_send("hello").unwrap();
546     assert!(tx.try_send("hello").is_err());
547     assert_eq!(Ok("hello"), rx.try_recv());
548     assert_eq!(Ok("hello"), rx.try_recv());
549     assert_eq!(Ok("hello"), rx.try_recv());
550     assert_eq!(Ok("hello"), rx.try_recv());
551     assert_eq!(Ok("hello"), rx.try_recv());
552     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
553 
554     tx.try_send("hello").unwrap();
555     tx.try_send("hello").unwrap();
556     tx.try_send("hello").unwrap();
557     drop(tx);
558     assert_eq!(Ok("hello"), rx.try_recv());
559     assert_eq!(Ok("hello"), rx.try_recv());
560     assert_eq!(Ok("hello"), rx.try_recv());
561     assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
562 }
563 
564 #[test]
try_recv_unbounded()565 fn try_recv_unbounded() {
566     for num in 0..100 {
567         let (tx, mut rx) = mpsc::unbounded_channel();
568 
569         for i in 0..num {
570             tx.send(i).unwrap();
571         }
572 
573         for i in 0..num {
574             assert_eq!(rx.try_recv(), Ok(i));
575         }
576 
577         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
578         drop(tx);
579         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
580     }
581 }
582 
583 #[test]
try_recv_close_while_empty_bounded()584 fn try_recv_close_while_empty_bounded() {
585     let (tx, mut rx) = mpsc::channel::<()>(5);
586 
587     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
588     drop(tx);
589     assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
590 }
591 
592 #[test]
try_recv_close_while_empty_unbounded()593 fn try_recv_close_while_empty_unbounded() {
594     let (tx, mut rx) = mpsc::unbounded_channel::<()>();
595 
596     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
597     drop(tx);
598     assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
599 }
600