• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use futures::channel::oneshot;
2 use futures::executor::{block_on, block_on_stream};
3 use futures::future::{self, join, Future, FutureExt};
4 use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
5 use futures::task::{Context, Poll};
6 use futures_test::future::FutureTestExt;
7 use futures_test::task::noop_context;
8 use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending};
9 use std::iter::FromIterator;
10 use std::pin::Pin;
11 use std::sync::atomic::{AtomicBool, Ordering};
12 
// `is_terminated` must only report `true` after the set has yielded `None`,
// and must go back to `false` once a new future is pushed.
#[test]
fn is_terminated() {
    let mut cx = noop_context();
    let mut tasks = FuturesUnordered::new();

    // A never-polled set is not terminated; polling it while empty terminates it.
    assert!(!tasks.is_terminated());
    assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
    assert!(tasks.is_terminated());

    // Test that the sentinel value doesn't leak
    assert!(tasks.is_empty());
    assert_eq!(tasks.len(), 0);
    assert_eq!(tasks.iter_mut().len(), 0);

    tasks.push(future::ready(1));

    assert!(!tasks.is_empty());
    assert_eq!(tasks.len(), 1);
    assert_eq!(tasks.iter_mut().len(), 1);

    // Pushing clears the terminated flag; it is only set again by the poll
    // that returns `None`, not by the poll that yields the last item.
    assert!(!tasks.is_terminated());
    assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1)));
    assert!(!tasks.is_terminated());
    assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
    assert!(tasks.is_terminated());
}
39 
// Items are yielded in completion order: `b` is sent first and therefore
// comes out first even though it was inserted second.
#[test]
fn works_1() {
    let (tx_a, rx_a) = oneshot::channel::<i32>();
    let (tx_b, rx_b) = oneshot::channel::<i32>();
    let (tx_c, rx_c) = oneshot::channel::<i32>();

    let receivers: FuturesUnordered<_> = vec![rx_a, rx_b, rx_c].into_iter().collect();
    let mut results = block_on_stream(receivers);

    tx_b.send(99).unwrap();
    assert_eq!(Some(Ok(99)), results.next());

    // The two remaining receivers carry the same value, so their relative
    // order does not matter for the assertions below.
    tx_a.send(33).unwrap();
    tx_c.send(33).unwrap();
    for _ in 0..2 {
        assert_eq!(Some(Ok(33)), results.next());
    }
    assert_eq!(None, results.next());
}
58 
// Mixes a plain receiver with a joined pair of receivers; the joined future
// only completes once both of its halves have been sent.
#[test]
fn works_2() {
    let (tx_a, rx_a) = oneshot::channel::<i32>();
    let (tx_b, rx_b) = oneshot::channel::<i32>();
    let (tx_c, rx_c) = oneshot::channel::<i32>();

    let boxed_futures =
        vec![rx_a.boxed(), join(rx_b, rx_c).map(|(b, c)| Ok(b? + c?)).boxed()];
    let mut stream: FuturesUnordered<_> = boxed_futures.into_iter().collect();

    tx_a.send(9).unwrap();
    tx_b.send(10).unwrap();

    let mut cx = noop_context();

    // Only `a` can be ready here: the join still waits on `c`.
    let first = stream.poll_next_unpin(&mut cx);
    assert_eq!(first, Poll::Ready(Some(Ok(9))));

    tx_c.send(20).unwrap();
    let second = stream.poll_next_unpin(&mut cx);
    assert_eq!(second, Poll::Ready(Some(Ok(30))));

    let finished = stream.poll_next_unpin(&mut cx);
    assert_eq!(finished, Poll::Ready(None));
}
78 
// Collecting an iterator of futures builds a set holding every one of them.
#[test]
fn from_iterator() {
    let stream: FuturesUnordered<_> = (1..=3).map(future::ready::<i32>).collect();
    assert_eq!(stream.len(), 3);

    let outputs = block_on(stream.collect::<Vec<_>>());
    assert_eq!(outputs, vec![1, 2, 3]);
}
87 
// A future that already completed must not be polled again: after the
// `select` resolves, only the never-sent `a_rx` is left and the set stays
// pending.
#[test]
fn finished_future() {
    // `_a_tx` is kept alive but never used, so `a_rx` never resolves.
    let (_a_tx, a_rx) = oneshot::channel::<i32>();
    let (b_tx, b_rx) = oneshot::channel::<i32>();
    let (c_tx, c_rx) = oneshot::channel::<i32>();

    let boxed_futures = vec![
        Box::new(a_rx) as Box<dyn Future<Output = Result<_, _>> + Unpin>,
        // Keep only the output of whichever receiver wins the select.
        Box::new(future::select(b_rx, c_rx).map(|either| either.factor_first().0)) as _,
    ];
    let mut stream: FuturesUnordered<_> = boxed_futures.into_iter().collect();

    let mut cx = noop_context();

    // Nothing has been sent yet, so repeated polls keep returning `Pending`.
    for _ in 0..10 {
        assert!(stream.poll_next_unpin(&mut cx).is_pending());
    }

    b_tx.send(12).unwrap();
    c_tx.send(3).unwrap();
    assert!(stream.poll_next_unpin(&mut cx).is_ready());

    for _ in 0..2 {
        assert!(stream.poll_next_unpin(&mut cx).is_pending());
    }
}
112 
// `iter_mut` hands out mutable access to the stored futures: closing every
// receiver through it cancels all three channels.
#[test]
fn iter_mut_cancel() {
    let (a_tx, a_rx) = oneshot::channel::<i32>();
    let (b_tx, b_rx) = oneshot::channel::<i32>();
    let (c_tx, c_rx) = oneshot::channel::<i32>();

    let mut stream: FuturesUnordered<_> = vec![a_rx, b_rx, c_rx].into_iter().collect();

    stream.iter_mut().for_each(|rx| rx.close());

    let mut iter = block_on_stream(stream);

    for tx in &[&a_tx, &b_tx, &c_tx] {
        assert!(tx.is_canceled());
    }

    for _ in 0..3 {
        assert_eq!(iter.next(), Some(Err(oneshot::Canceled)));
    }
    assert_eq!(iter.next(), None);
}
136 
// The `iter_mut` iterator reports an exact remaining length that shrinks by
// one with every item taken.
#[test]
fn iter_mut_len() {
    let mut stream: FuturesUnordered<_> = (0..3).map(|_| future::pending::<()>()).collect();

    let mut iter_mut = stream.iter_mut();
    for remaining in (1..=3).rev() {
        assert_eq!(iter_mut.len(), remaining);
        assert!(iter_mut.next().is_some());
    }

    // Exhausted: length is zero and no further item is produced.
    assert_eq!(iter_mut.len(), 0);
    assert!(iter_mut.next().is_none());
}
154 
// `iter` only gives shared access, so cancellation goes through an atomic
// flag stored next to each wrapped future.
#[test]
fn iter_cancel() {
    // Wraps a future and resolves to `None` as soon as `cancel` is set.
    struct AtomicCancel<F> {
        future: F,
        cancel: AtomicBool,
    }

    impl<F: Future + Unpin> AtomicCancel<F> {
        // Starts out with the cancel flag cleared.
        fn new(future: F) -> Self {
            let cancel = AtomicBool::new(false);
            Self { future, cancel }
        }
    }

    impl<F: Future + Unpin> Future for AtomicCancel<F> {
        type Output = Option<F::Output>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // The flag is checked first, so a cancelled wrapper never polls
            // its inner future.
            if self.cancel.load(Ordering::Relaxed) {
                return Poll::Ready(None);
            }
            self.future.poll_unpin(cx).map(Some)
        }
    }

    let stream: FuturesUnordered<_> =
        (0..3).map(|_| AtomicCancel::new(future::pending::<()>())).collect();

    stream.iter().for_each(|wrapped| wrapped.cancel.store(true, Ordering::Relaxed));

    let mut iter = block_on_stream(stream);

    // The inner futures are `pending()`, so every item must be the `None`
    // produced by the cancel flag.
    for _ in 0..3 {
        assert_eq!(iter.next(), Some(None));
    }
    assert_eq!(iter.next(), None);
}
199 
// The shared `iter` iterator reports an exact remaining length that shrinks
// by one with every item taken.
#[test]
fn iter_len() {
    let stream: FuturesUnordered<_> = (0..3).map(|_| future::pending::<()>()).collect();

    let mut iter = stream.iter();
    for remaining in (1..=3).rev() {
        assert_eq!(iter.len(), remaining);
        assert!(iter.next().is_some());
    }

    // Exhausted: length is zero and no further item is produced.
    assert_eq!(iter.len(), 0);
    assert!(iter.next().is_none());
}
216 
// Futures taken back out with `into_iter` are still usable: each receiver is
// closed and put into a fresh set, which then yields only `Canceled`.
#[test]
fn into_iter_cancel() {
    let (a_tx, a_rx) = oneshot::channel::<i32>();
    let (b_tx, b_rx) = oneshot::channel::<i32>();
    let (c_tx, c_rx) = oneshot::channel::<i32>();

    let closed: FuturesUnordered<_> = vec![a_rx, b_rx, c_rx]
        .into_iter()
        .collect::<FuturesUnordered<_>>()
        // This second `into_iter` is the `FuturesUnordered` one under test.
        .into_iter()
        .map(|mut rx| {
            rx.close();
            rx
        })
        .collect();

    let mut iter = block_on_stream(closed);

    for tx in &[&a_tx, &b_tx, &c_tx] {
        assert!(tx.is_canceled());
    }

    for _ in 0..3 {
        assert_eq!(iter.next(), Some(Err(oneshot::Canceled)));
    }
    assert_eq!(iter.next(), None);
}
244 
// The owning `into_iter` iterator reports an exact remaining length that
// shrinks by one with every item taken.
#[test]
fn into_iter_len() {
    let stream: FuturesUnordered<_> = (0..3).map(|_| future::pending::<()>()).collect();

    let mut into_iter = stream.into_iter();
    for remaining in (1..=3).rev() {
        assert_eq!(into_iter.len(), remaining);
        assert!(into_iter.next().is_some());
    }

    // Exhausted: length is zero and no further item is produced.
    assert_eq!(into_iter.len(), 0);
    assert!(into_iter.next().is_none());
}
261 
// Dropping an `into_iter` iterator that still holds a future must not panic.
#[test]
fn into_iter_partial() {
    let stream: FuturesUnordered<_> = (1..=4).map(future::ready).collect();

    let mut into_iter = stream.into_iter();
    for _ in 0..3 {
        assert!(into_iter.next().is_some());
    }
    assert_eq!(into_iter.len(), 1);
    // don't panic when iterator is dropped before completing
}
275 
// Futures stored in the set must stay at the same address between polls.
#[test]
fn futures_not_moved_after_poll() {
    // Future that will be ready after being polled twice,
    // asserting that it does not move.
    let unmoved = future::ready(()).pending_once().assert_unmoved();
    let mut stream: FuturesUnordered<_> = vec![unmoved; 3].into_iter().collect();

    // The first poll finds every future pending; afterwards each one yields `()`.
    assert_stream_pending!(stream);
    for _ in 0..3 {
        assert_stream_next!(stream, ());
    }
    assert_stream_done!(stream);
}
288 
// Complete futures out-of-order and add new futures afterwards to ensure
// length values remain correct.
#[test]
fn len_valid_during_out_of_order_completion() {
    let (a_tx, a_rx) = oneshot::channel::<i32>();
    let (b_tx, b_rx) = oneshot::channel::<i32>();
    let (c_tx, c_rx) = oneshot::channel::<i32>();
    let (d_tx, d_rx) = oneshot::channel::<i32>();

    let mut cx = noop_context();
    let mut stream = FuturesUnordered::new();
    assert_eq!(stream.len(), 0);

    // Each push grows the reported length by exactly one.
    for (already_pushed, rx) in vec![a_rx, b_rx, c_rx].into_iter().enumerate() {
        stream.push(rx);
        assert_eq!(stream.len(), already_pushed + 1);
    }

    // Complete the middle future first, then refill the set with `d`.
    b_tx.send(4).unwrap();
    assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(4))));
    assert_eq!(stream.len(), 2);

    stream.push(d_rx);
    assert_eq!(stream.len(), 3);

    // Drain the rest in the order c, d, a, checking the length after each.
    for (tx, value, remaining) in vec![(c_tx, 5, 2), (d_tx, 6, 1), (a_tx, 7, 0)] {
        tx.send(value).unwrap();
        assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(value))));
        assert_eq!(stream.len(), remaining);
    }
}
328 
// One call to `poll_next` must poll each contained future at most once.
#[test]
fn polled_only_once_at_most_per_iteration() {
    // Never completes; panics if it is polled a second time.
    #[derive(Debug, Clone, Copy, Default)]
    struct F {
        polled: bool,
    }

    impl Future for F {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
            assert!(!self.polled, "polled twice");
            self.polled = true;
            Poll::Pending
        }
    }

    let mut cx = noop_context();

    // After a single poll of the set, every future must have been polled once.
    for &count in &[10, 33] {
        let mut tasks = FuturesUnordered::from_iter(vec![F::default(); count]);
        assert!(tasks.poll_next_unpin(&mut cx).is_pending());
        assert_eq!(count, tasks.iter().filter(|f| f.polled).count());
    }

    // An empty set finishes immediately.
    let mut empty = FuturesUnordered::<F>::new();
    assert_eq!(Poll::Ready(None), empty.poll_next_unpin(&mut cx));
}
362 
// `clear` empties the set, leaves it usable for new pushes, and resets the
// terminated flag.
#[test]
fn clear() {
    let mut set = FuturesUnordered::from_iter(vec![future::ready(1), future::ready(2)]);

    let first = block_on(set.next());
    assert_eq!(first, Some(1));
    assert!(!set.is_empty());

    set.clear();
    assert!(set.is_empty());

    // The set accepts new futures after being cleared.
    set.push(future::ready(3));
    assert!(!set.is_empty());

    set.clear();
    assert!(set.is_empty());

    // Polling the empty set terminates it; clearing undoes that.
    let after_clear = block_on(set.next());
    assert_eq!(after_clear, None);
    assert!(set.is_terminated());
    set.clear();
    assert!(!set.is_terminated());
}
384 
385 // https://github.com/rust-lang/futures-rs/issues/2529#issuecomment-997290279
// Repeatedly fills the set with tasks that are completed from other threads,
// waits for one result, then clears the set while the rest are in flight.
#[test]
fn clear_in_loop() {
    // Fewer rounds when built for Miri or when `QEMU_LD_PREFIX` was set at
    // compile time.
    const ROUNDS: usize =
        if cfg!(miri) || option_env!("QEMU_LD_PREFIX").is_some() { 100 } else { 10_000 };
    const TASKS_PER_ROUND: usize = 24;

    // Resolves once a helper thread sends on the channel after a short sleep.
    async fn task() {
        let (sender, receiver) = oneshot::channel();
        std::thread::spawn(move || {
            std::thread::sleep(std::time::Duration::from_micros(100));
            let _ = sender.send(());
        });
        receiver.await.unwrap()
    }

    block_on(async {
        let mut in_flight = FuturesUnordered::new();
        for _ in 0..ROUNDS {
            in_flight.extend((0..TASKS_PER_ROUND).map(|_| task()));
            let _ = in_flight.next().await;
            in_flight.clear();
        }
    });
}
409 
410 // https://github.com/rust-lang/futures-rs/issues/2863#issuecomment-2219441515
// A future whose destructor panics makes dropping the set panic, which is
// what `#[should_panic]` expects.
#[test]
#[should_panic]
fn panic_on_drop_fut() {
    // Never completes and panics when dropped.
    struct BadFuture;

    impl Future for BadFuture {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
            Poll::Pending
        }
    }

    impl Drop for BadFuture {
        fn drop(&mut self) {
            panic!()
        }
    }

    let set = FuturesUnordered::default();
    set.push(BadFuture);
    // Dropping the set drops the `BadFuture` it holds.
    drop(set);
}
432