1 mod assert_send_sync {
2 use futures::stream::FuturesUnordered;
3
4 pub trait AssertSendSync: Send + Sync {}
5 impl AssertSendSync for FuturesUnordered<()> {}
6 }
7
/// Pushes three oneshot receivers into a `FuturesUnordered` and checks that
/// the stream is pending until a sender fires, yields each sent value, and
/// finishes with `None` once all three receivers have resolved.
#[test]
fn basic_usage() {
    use futures::channel::oneshot;
    use futures::executor::block_on;
    use futures::future;
    use futures::stream::{FuturesUnordered, StreamExt};
    use futures::task::Poll;

    block_on(future::lazy(move |cx| {
        let (first_tx, first_rx) = oneshot::channel();
        let (second_tx, second_rx) = oneshot::channel();
        let (third_tx, third_rx) = oneshot::channel();

        let mut pending = FuturesUnordered::new();
        pending.push(first_rx);
        pending.push(second_rx);
        pending.push(third_rx);

        // Nothing has been sent yet, so no item can be produced.
        assert!(pending.poll_next_unpin(cx).is_pending());

        second_tx.send("hello").unwrap();

        // Only the second receiver has a value; the others are still waiting.
        let got = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(Some(Ok("hello"))), got);
        assert!(pending.poll_next_unpin(cx).is_pending());

        first_tx.send("world").unwrap();
        third_tx.send("world2").unwrap();

        let got = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(Some(Ok("world"))), got);
        let got = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(Some(Ok("world2"))), got);

        // All three futures have completed: the stream is exhausted.
        let got = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(None), got);
    }));
}
41
/// Checks that receivers whose senders are dropped surface as
/// `Err(oneshot::Canceled)` items, alongside a normally delivered value, and
/// that the stream then ends with `None`.
#[test]
fn resolving_errors() {
    use futures::channel::oneshot;
    use futures::executor::block_on;
    use futures::future;
    use futures::stream::{FuturesUnordered, StreamExt};
    use futures::task::Poll;

    block_on(future::lazy(move |cx| {
        let (first_tx, first_rx) = oneshot::channel();
        let (second_tx, second_rx) = oneshot::channel();
        let (third_tx, third_rx) = oneshot::channel();

        let mut pending = FuturesUnordered::new();
        pending.push(first_rx);
        pending.push(second_rx);
        pending.push(third_rx);

        // No sender has been used or dropped yet.
        assert!(pending.poll_next_unpin(cx).is_pending());

        // Dropping a sender without sending resolves its receiver with an error.
        drop(second_tx);

        let got = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), got);
        assert!(pending.poll_next_unpin(cx).is_pending());

        drop(first_tx);
        third_tx.send("world2").unwrap();

        let got = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), got);
        let got = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(Some(Ok("world2"))), got);

        // Every receiver has resolved, so the stream is finished.
        let got = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(None), got);
    }));
}
75
/// Checks that dropping a `FuturesUnordered` holding oneshot receivers makes
/// every matching sender report cancellation via `poll_canceled`.
#[test]
fn dropping_ready_queue() {
    use futures::channel::oneshot;
    use futures::executor::block_on;
    use futures::future;
    use futures::stream::FuturesUnordered;
    use futures_test::task::noop_context;

    block_on(future::lazy(move |_| {
        let queue = FuturesUnordered::new();

        // Keep the three sending halves; hand the receiving halves to the queue.
        let mut senders = Vec::new();
        for _ in 0..3 {
            let (tx, rx) = oneshot::channel::<()>();
            queue.push(rx);
            senders.push(tx);
        }

        let cx = &mut noop_context();

        // While the queue is alive, no sender sees a cancellation.
        for tx in senders.iter_mut() {
            assert!(tx.poll_canceled(cx).is_pending());
        }

        drop(queue);

        // Once the queue is gone, every sender observes the cancellation.
        for tx in senders.iter_mut() {
            assert!(tx.poll_canceled(cx).is_ready());
        }
    }));
}
108
/// Repeatedly fills a `FuturesUnordered` with oneshot receivers whose senders
/// fire from freshly spawned threads, then checks that exactly the values
/// `0..n` are received in each round.
#[test]
fn stress() {
    use futures::channel::oneshot;
    use futures::executor::block_on_stream;
    use futures::stream::FuturesUnordered;
    use std::sync::{Arc, Barrier};
    use std::thread;

    const ITER: usize = 300;

    // The batch size cycles through 1..=10 over the iterations.
    for n in (0..ITER).map(|i| (i % 10) + 1) {
        let mut queue = FuturesUnordered::new();

        for _ in 0..5 {
            // `n` sender threads plus this thread all meet at the barrier.
            let gate = Arc::new(Barrier::new(n + 1));

            for num in 0..n {
                let (tx, rx) = oneshot::channel();
                queue.push(rx);

                let gate = Arc::clone(&gate);
                thread::spawn(move || {
                    gate.wait();
                    tx.send(num).unwrap();
                });
            }

            gate.wait();

            let mut blocking = block_on_stream(queue);

            let mut received: Vec<_> = blocking.by_ref().take(n).map(Result::unwrap).collect();

            assert_eq!(received.len(), n);

            // Arrival order is not fixed, so compare after sorting.
            received.sort_unstable();

            for (expected, actual) in (0..n).zip(received) {
                assert_eq!(expected, actual);
            }

            // Take the queue back out so the next round reuses it.
            queue = blocking.into_inner();
        }
    }
}
157
/// Checks that when a queued future panics during polling, the panic
/// propagates to the caller, and afterwards the queue reports itself empty and
/// yields `Poll::Ready(None)`.
#[test]
fn panicking_future_dropped() {
    use futures::executor::block_on;
    use futures::future;
    use futures::stream::{FuturesUnordered, StreamExt};
    use futures::task::Poll;
    use std::panic::{self, AssertUnwindSafe};

    block_on(future::lazy(move |cx| {
        // A future that panics as soon as it is polled.
        let always_panics = future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() });

        let mut queue = FuturesUnordered::new();
        queue.push(always_panics);

        // Catch the unwind so the state of the queue can be inspected afterwards.
        let poll_outcome = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next_unpin(cx)));
        assert!(poll_outcome.is_err());

        assert!(queue.is_empty());
        assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
    }));
}
176