• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 mod assert_send_sync {
2     use futures::stream::FuturesUnordered;
3 
4     pub trait AssertSendSync: Send + Sync {}
5     impl AssertSendSync for FuturesUnordered<()> {}
6 }
7 
8 #[test]
basic_usage()9 fn basic_usage() {
10     use futures::channel::oneshot;
11     use futures::executor::block_on;
12     use futures::future;
13     use futures::stream::{FuturesUnordered, StreamExt};
14     use futures::task::Poll;
15 
16     block_on(future::lazy(move |cx| {
17         let mut queue = FuturesUnordered::new();
18         let (tx1, rx1) = oneshot::channel();
19         let (tx2, rx2) = oneshot::channel();
20         let (tx3, rx3) = oneshot::channel();
21 
22         queue.push(rx1);
23         queue.push(rx2);
24         queue.push(rx3);
25 
26         assert!(!queue.poll_next_unpin(cx).is_ready());
27 
28         tx2.send("hello").unwrap();
29 
30         assert_eq!(Poll::Ready(Some(Ok("hello"))), queue.poll_next_unpin(cx));
31         assert!(!queue.poll_next_unpin(cx).is_ready());
32 
33         tx1.send("world").unwrap();
34         tx3.send("world2").unwrap();
35 
36         assert_eq!(Poll::Ready(Some(Ok("world"))), queue.poll_next_unpin(cx));
37         assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
38         assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
39     }));
40 }
41 
42 #[test]
resolving_errors()43 fn resolving_errors() {
44     use futures::channel::oneshot;
45     use futures::executor::block_on;
46     use futures::future;
47     use futures::stream::{FuturesUnordered, StreamExt};
48     use futures::task::Poll;
49 
50     block_on(future::lazy(move |cx| {
51         let mut queue = FuturesUnordered::new();
52         let (tx1, rx1) = oneshot::channel();
53         let (tx2, rx2) = oneshot::channel();
54         let (tx3, rx3) = oneshot::channel();
55 
56         queue.push(rx1);
57         queue.push(rx2);
58         queue.push(rx3);
59 
60         assert!(!queue.poll_next_unpin(cx).is_ready());
61 
62         drop(tx2);
63 
64         assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
65         assert!(!queue.poll_next_unpin(cx).is_ready());
66 
67         drop(tx1);
68         tx3.send("world2").unwrap();
69 
70         assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
71         assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
72         assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
73     }));
74 }
75 
76 #[test]
dropping_ready_queue()77 fn dropping_ready_queue() {
78     use futures::channel::oneshot;
79     use futures::executor::block_on;
80     use futures::future;
81     use futures::stream::FuturesUnordered;
82     use futures_test::task::noop_context;
83 
84     block_on(future::lazy(move |_| {
85         let queue = FuturesUnordered::new();
86         let (mut tx1, rx1) = oneshot::channel::<()>();
87         let (mut tx2, rx2) = oneshot::channel::<()>();
88         let (mut tx3, rx3) = oneshot::channel::<()>();
89 
90         queue.push(rx1);
91         queue.push(rx2);
92         queue.push(rx3);
93 
94         {
95             let cx = &mut noop_context();
96             assert!(!tx1.poll_canceled(cx).is_ready());
97             assert!(!tx2.poll_canceled(cx).is_ready());
98             assert!(!tx3.poll_canceled(cx).is_ready());
99 
100             drop(queue);
101 
102             assert!(tx1.poll_canceled(cx).is_ready());
103             assert!(tx2.poll_canceled(cx).is_ready());
104             assert!(tx3.poll_canceled(cx).is_ready());
105         }
106     }));
107 }
108 
109 #[test]
stress()110 fn stress() {
111     use futures::channel::oneshot;
112     use futures::executor::block_on_stream;
113     use futures::stream::FuturesUnordered;
114     use std::sync::{Arc, Barrier};
115     use std::thread;
116 
117     const ITER: usize = 300;
118 
119     for i in 0..ITER {
120         let n = (i % 10) + 1;
121 
122         let mut queue = FuturesUnordered::new();
123 
124         for _ in 0..5 {
125             let barrier = Arc::new(Barrier::new(n + 1));
126 
127             for num in 0..n {
128                 let barrier = barrier.clone();
129                 let (tx, rx) = oneshot::channel();
130 
131                 queue.push(rx);
132 
133                 thread::spawn(move || {
134                     barrier.wait();
135                     tx.send(num).unwrap();
136                 });
137             }
138 
139             barrier.wait();
140 
141             let mut sync = block_on_stream(queue);
142 
143             let mut rx: Vec<_> = (&mut sync).take(n).map(|res| res.unwrap()).collect();
144 
145             assert_eq!(rx.len(), n);
146 
147             rx.sort_unstable();
148 
149             for (i, x) in rx.into_iter().enumerate() {
150                 assert_eq!(i, x);
151             }
152 
153             queue = sync.into_inner();
154         }
155     }
156 }
157 
158 #[test]
panicking_future_dropped()159 fn panicking_future_dropped() {
160     use futures::executor::block_on;
161     use futures::future;
162     use futures::stream::{FuturesUnordered, StreamExt};
163     use futures::task::Poll;
164     use std::panic::{self, AssertUnwindSafe};
165 
166     block_on(future::lazy(move |cx| {
167         let mut queue = FuturesUnordered::new();
168         queue.push(future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() }));
169 
170         let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next_unpin(cx)));
171         assert!(r.is_err());
172         assert!(queue.is_empty());
173         assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
174     }));
175 }
176