• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use futures::channel::oneshot;
2 use futures::executor::{block_on, block_on_stream};
3 use futures::future;
4 use futures::stream::{FuturesUnordered, StreamExt};
5 use futures::task::Poll;
6 use futures_test::task::noop_context;
7 use std::panic::{self, AssertUnwindSafe};
8 use std::sync::{Arc, Barrier};
9 use std::thread;
10 
/// Pushes three oneshot receivers into a `FuturesUnordered` and checks that
/// polling yields a value only after the matching sender has fired, that the
/// two later values come out in the order they were sent, and that the stream
/// ends with `None` once all three receivers have resolved.
#[test]
fn basic_usage() {
    block_on(future::lazy(move |cx| {
        let (sender_a, receiver_a) = oneshot::channel();
        let (sender_b, receiver_b) = oneshot::channel();
        let (sender_c, receiver_c) = oneshot::channel();

        let mut pending = FuturesUnordered::new();
        pending.push(receiver_a);
        pending.push(receiver_b);
        pending.push(receiver_c);

        // Nothing has been sent yet, so the first poll must not produce an item.
        assert!(pending.poll_next_unpin(cx).is_pending());

        // Completing only the middle receiver yields exactly one item.
        sender_b.send("hello").unwrap();
        let first = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(Some(Ok("hello"))), first);
        assert!(pending.poll_next_unpin(cx).is_pending());

        // Complete the remaining two receivers.
        sender_a.send("world").unwrap();
        sender_c.send("world2").unwrap();

        let second = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(Some(Ok("world"))), second);
        let third = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(Some(Ok("world2"))), third);

        // With every future consumed, the stream reports termination.
        let end = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(None), end);
    }));
}
38 
/// Checks that a receiver whose sender was dropped surfaces through the queue
/// as `Err(oneshot::Canceled)`, alongside a receiver that completes normally,
/// and that the stream then ends with `None`.
#[test]
fn resolving_errors() {
    block_on(future::lazy(move |cx| {
        let (sender_a, receiver_a) = oneshot::channel();
        let (sender_b, receiver_b) = oneshot::channel();
        let (sender_c, receiver_c) = oneshot::channel();

        let mut pending = FuturesUnordered::new();
        pending.push(receiver_a);
        pending.push(receiver_b);
        pending.push(receiver_c);

        // All senders are still alive and silent: no item is available.
        assert!(pending.poll_next_unpin(cx).is_pending());

        // Dropping a sender without sending cancels its receiver.
        drop(sender_b);
        let first = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), first);
        assert!(pending.poll_next_unpin(cx).is_pending());

        // One more cancellation followed by one successful send.
        drop(sender_a);
        sender_c.send("world2").unwrap();

        let second = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), second);
        let third = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(Some(Ok("world2"))), third);

        // Every receiver has resolved, so the stream is finished.
        let end = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(None), end);
    }));
}
66 
/// Checks that dropping a `FuturesUnordered` holding three oneshot receivers
/// makes each sender's `poll_canceled` report ready, whereas it was not ready
/// while the queue was still alive.
#[test]
fn dropping_ready_queue() {
    block_on(future::lazy(move |_| {
        let (mut sender_a, receiver_a) = oneshot::channel::<()>();
        let (mut sender_b, receiver_b) = oneshot::channel::<()>();
        let (mut sender_c, receiver_c) = oneshot::channel::<()>();

        let queue = FuturesUnordered::new();
        queue.push(receiver_a);
        queue.push(receiver_b);
        queue.push(receiver_c);

        // A no-op context is used for the cancellation polls instead of the
        // one handed to the `lazy` closure (which this test ignores).
        let mut noop = noop_context();
        let cx = &mut noop;

        // While the queue owns the receivers, no sender sees a cancellation.
        assert!(sender_a.poll_canceled(cx).is_pending());
        assert!(sender_b.poll_canceled(cx).is_pending());
        assert!(sender_c.poll_canceled(cx).is_pending());

        drop(queue);

        // After the queue is gone, every sender observes cancellation.
        assert!(sender_a.poll_canceled(cx).is_ready());
        assert!(sender_b.poll_canceled(cx).is_ready());
        assert!(sender_c.poll_canceled(cx).is_ready());
    }));
}
93 
/// Repeatedly fills a `FuturesUnordered` with `n` oneshot receivers whose
/// senders fire from freshly spawned threads released by a shared barrier,
/// then checks that exactly the values `0..n` come out of the queue.
///
/// `n` cycles through 1..=10, and the same queue is reused for five batches
/// per outer iteration. The outer iteration count is reduced under Miri.
#[test]
fn stress() {
    const ROUNDS: usize = if cfg!(miri) { 30 } else { 300 };
    const BATCHES_PER_ROUND: usize = 5;

    for n in (0..ROUNDS).map(|round| round % 10 + 1) {
        let mut queue = FuturesUnordered::new();

        for _ in 0..BATCHES_PER_ROUND {
            // `n` sender threads plus this thread all rendezvous here.
            let gate = Arc::new(Barrier::new(n + 1));

            for value in 0..n {
                let (tx, rx) = oneshot::channel();
                queue.push(rx);

                let gate = Arc::clone(&gate);
                thread::spawn(move || {
                    gate.wait();
                    tx.send(value).unwrap();
                });
            }

            // Release all sender threads at once.
            gate.wait();

            let mut blocking = block_on_stream(queue);

            // Take exactly `n` items so the iterator is not driven to its end.
            let mut received: Vec<_> = blocking.by_ref().take(n).map(Result::unwrap).collect();
            assert_eq!(received.len(), n);

            // Arrival order is not checked; after sorting the values must be 0..n.
            received.sort_unstable();
            for (expected, actual) in received.into_iter().enumerate() {
                assert_eq!(expected, actual);
            }

            // Recover the queue so the next batch pushes into the same instance.
            queue = blocking.into_inner();
        }
    }
}
139 
/// Checks that when a future panics while the queue is being polled, the
/// panic propagates to the caller, the queue is empty afterwards, and a
/// further poll reports the stream as finished.
#[test]
fn panicking_future_dropped() {
    block_on(future::lazy(move |cx| {
        // A future that panics the first time it is polled.
        let exploding = future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() });

        let mut queue = FuturesUnordered::new();
        queue.push(exploding);

        // Catch the unwind so the test can inspect the queue afterwards.
        let outcome = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next_unpin(cx)));
        assert!(outcome.is_err());

        // The panicking future must no longer be held by the queue.
        assert!(queue.is_empty());
        let after = queue.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(None), after);
    }));
}
152