1 use futures::channel::oneshot;
2 use futures::executor::{block_on, block_on_stream};
3 use futures::future;
4 use futures::stream::{FuturesUnordered, StreamExt};
5 use futures::task::Poll;
6 use futures_test::task::noop_context;
7 use std::panic::{self, AssertUnwindSafe};
8 use std::sync::{Arc, Barrier};
9 use std::thread;
10
11 #[test]
basic_usage()12 fn basic_usage() {
13 block_on(future::lazy(move |cx| {
14 let mut queue = FuturesUnordered::new();
15 let (tx1, rx1) = oneshot::channel();
16 let (tx2, rx2) = oneshot::channel();
17 let (tx3, rx3) = oneshot::channel();
18
19 queue.push(rx1);
20 queue.push(rx2);
21 queue.push(rx3);
22
23 assert!(!queue.poll_next_unpin(cx).is_ready());
24
25 tx2.send("hello").unwrap();
26
27 assert_eq!(Poll::Ready(Some(Ok("hello"))), queue.poll_next_unpin(cx));
28 assert!(!queue.poll_next_unpin(cx).is_ready());
29
30 tx1.send("world").unwrap();
31 tx3.send("world2").unwrap();
32
33 assert_eq!(Poll::Ready(Some(Ok("world"))), queue.poll_next_unpin(cx));
34 assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
35 assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
36 }));
37 }
38
39 #[test]
resolving_errors()40 fn resolving_errors() {
41 block_on(future::lazy(move |cx| {
42 let mut queue = FuturesUnordered::new();
43 let (tx1, rx1) = oneshot::channel();
44 let (tx2, rx2) = oneshot::channel();
45 let (tx3, rx3) = oneshot::channel();
46
47 queue.push(rx1);
48 queue.push(rx2);
49 queue.push(rx3);
50
51 assert!(!queue.poll_next_unpin(cx).is_ready());
52
53 drop(tx2);
54
55 assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
56 assert!(!queue.poll_next_unpin(cx).is_ready());
57
58 drop(tx1);
59 tx3.send("world2").unwrap();
60
61 assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
62 assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
63 assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
64 }));
65 }
66
67 #[test]
dropping_ready_queue()68 fn dropping_ready_queue() {
69 block_on(future::lazy(move |_| {
70 let queue = FuturesUnordered::new();
71 let (mut tx1, rx1) = oneshot::channel::<()>();
72 let (mut tx2, rx2) = oneshot::channel::<()>();
73 let (mut tx3, rx3) = oneshot::channel::<()>();
74
75 queue.push(rx1);
76 queue.push(rx2);
77 queue.push(rx3);
78
79 {
80 let cx = &mut noop_context();
81 assert!(!tx1.poll_canceled(cx).is_ready());
82 assert!(!tx2.poll_canceled(cx).is_ready());
83 assert!(!tx3.poll_canceled(cx).is_ready());
84
85 drop(queue);
86
87 assert!(tx1.poll_canceled(cx).is_ready());
88 assert!(tx2.poll_canceled(cx).is_ready());
89 assert!(tx3.poll_canceled(cx).is_ready());
90 }
91 }));
92 }
93
94 #[test]
stress()95 fn stress() {
96 #[cfg(miri)]
97 const ITER: usize = 30;
98 #[cfg(not(miri))]
99 const ITER: usize = 300;
100
101 for i in 0..ITER {
102 let n = (i % 10) + 1;
103
104 let mut queue = FuturesUnordered::new();
105
106 for _ in 0..5 {
107 let barrier = Arc::new(Barrier::new(n + 1));
108
109 for num in 0..n {
110 let barrier = barrier.clone();
111 let (tx, rx) = oneshot::channel();
112
113 queue.push(rx);
114
115 thread::spawn(move || {
116 barrier.wait();
117 tx.send(num).unwrap();
118 });
119 }
120
121 barrier.wait();
122
123 let mut sync = block_on_stream(queue);
124
125 let mut rx: Vec<_> = (&mut sync).take(n).map(|res| res.unwrap()).collect();
126
127 assert_eq!(rx.len(), n);
128
129 rx.sort_unstable();
130
131 for (i, x) in rx.into_iter().enumerate() {
132 assert_eq!(i, x);
133 }
134
135 queue = sync.into_inner();
136 }
137 }
138 }
139
140 #[test]
panicking_future_dropped()141 fn panicking_future_dropped() {
142 block_on(future::lazy(move |cx| {
143 let mut queue = FuturesUnordered::new();
144 queue.push(future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() }));
145
146 let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next_unpin(cx)));
147 assert!(r.is_err());
148 assert!(queue.is_empty());
149 assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
150 }));
151 }
152