• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use futures::executor::{block_on, block_on_stream};
2 use futures::future::{err, ok};
3 use futures::stream::{empty, iter_ok, poll_fn, Peekable};
4 use futures::channel::oneshot;
5 use futures::channel::mpsc;
6 
7 mod support;
8 use support::*;
9 
/// Test-local stream adaptor around an iterator of `Result<T, E>` values.
///
/// `Ok` entries are surfaced as stream items and `Err` entries as stream
/// errors (see the `Stream` implementation for this type).
pub struct Iter<I> {
    /// The wrapped iterator, advanced once per poll.
    iter: I,
}
13 
iter<J, T, E>(i: J) -> Iter<J::IntoIter> where J: IntoIterator<Item=Result<T, E>>,14 pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter>
15     where J: IntoIterator<Item=Result<T, E>>,
16 {
17     Iter {
18         iter: i.into_iter(),
19     }
20 }
21 
22 impl<I, T, E> Stream for Iter<I>
23     where I: Iterator<Item=Result<T, E>>,
24 {
25     type Item = T;
26     type Error = E;
27 
poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<T>, E>28     fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<T>, E> {
29         match self.iter.next() {
30             Some(Ok(e)) => Ok(Poll::Ready(Some(e))),
31             Some(Err(e)) => Err(e),
32             None => Ok(Poll::Ready(None)),
33         }
34     }
35 }
36 
list() -> Box<Stream<Item=i32, Error=u32> + Send>37 fn list() -> Box<Stream<Item=i32, Error=u32> + Send> {
38     let (tx, rx) = mpsc::channel(1);
39     tx.send(Ok(1))
40       .and_then(|tx| tx.send(Ok(2)))
41       .and_then(|tx| tx.send(Ok(3)))
42       .forget();
43     Box::new(rx.then(|r| r.unwrap()))
44 }
45 
err_list() -> Box<Stream<Item=i32, Error=u32> + Send>46 fn err_list() -> Box<Stream<Item=i32, Error=u32> + Send> {
47     let (tx, rx) = mpsc::channel(1);
48     tx.send(Ok(1))
49       .and_then(|tx| tx.send(Ok(2)))
50       .and_then(|tx| tx.send(Err(3)))
51       .forget();
52     Box::new(rx.then(|r| r.unwrap()))
53 }
54 
/// `map` applies the closure to every item of `list()`.
#[test]
fn map() {
    let expected = Ok(vec![2, 3, 4]);
    assert_done(|| list().map(|item| item + 1).collect(), expected);
}
59 
/// `map_err` applies the closure to the error of `err_list()` (3 becomes 4).
#[test]
fn map_err() {
    assert_done(
        || err_list().map_err(|error| error + 1).collect::<Vec<_>>(),
        Err(4),
    );
}
64 
/// Error newtype used by the `from_err` test to observe a `u32` stream error
/// after it has gone through a `From` conversion.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct FromErrTest(u32);

impl From<u32> for FromErrTest {
    /// Wraps the raw `u32` value unchanged.
    fn from(i: u32) -> Self {
        Self(i)
    }
}
73 
/// `err_into` converts the `u32` error of `err_list()` into `FromErrTest`.
#[test]
fn from_err() {
    let expected = Err(FromErrTest(3));
    assert_done(|| err_list().err_into().collect::<Vec<_>>(), expected);
}
78 
/// `fold` sums the items of `list()` and surfaces the error of `err_list()`.
#[test]
fn fold() {
    assert_done(
        || list().fold(0, |acc, item| ok::<i32, u32>(acc + item)),
        Ok(6),
    );
    assert_done(
        || err_list().fold(0, |acc, item| ok::<i32, u32>(acc + item)),
        Err(3),
    );
}
84 
/// `filter` keeps only the even item of `list()`.
#[test]
fn filter() {
    let expected = Ok(vec![2]);
    assert_done(
        || list().filter(|item| ok(*item % 2 == 0)).collect(),
        expected,
    );
}
89 
/// `filter_map` drops odd items and maps the even one to itself plus ten.
#[test]
fn filter_map() {
    let expected = Ok(vec![12]);
    assert_done(
        || {
            list()
                .filter_map(|value| {
                    let mapped = if value % 2 == 0 {
                        Some(value + 10)
                    } else {
                        None
                    };
                    ok(mapped)
                })
                .collect()
        },
        expected,
    );
}
100 
/// `and_then` maps items through a fallible step; the first failing step
/// (on item 1) ends the collection with that error.
#[test]
fn and_then() {
    assert_done(
        || list().and_then(|item| Ok(item + 1)).collect(),
        Ok(vec![2, 3, 4]),
    );
    assert_done(
        || {
            list()
                .and_then(|item| err::<i32, u32>(item as u32))
                .collect::<Vec<_>>()
        },
        Err(1),
    );
}
107 
/// `then` receives each item as a `Result`; here every `Ok` is incremented.
#[test]
fn then() {
    let expected = Ok(vec![2, 3, 4]);
    assert_done(
        || list().then(|res| res.map(|item| item + 1)).collect(),
        expected,
    );
}
113 
/// `or_else` turns the trailing error of `err_list()` back into an item.
#[test]
fn or_else() {
    let expected = Ok(vec![1, 2, 3]);
    assert_done(
        || {
            err_list()
                .or_else(|error| ok::<i32, u32>(error as i32))
                .collect()
        },
        expected,
    );
}
120 
/// `flatten` concatenates the inner streams produced for each outer item.
#[test]
fn flatten() {
    let expected = Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3]);
    assert_done(|| list().map(|_| list()).flatten().collect(), expected);
}
127 
/// `skip(2)` drops the first two items of `list()`.
#[test]
fn skip() {
    let expected = Ok(vec![3]);
    assert_done(|| list().skip(2).collect(), expected);
}
132 
/// Errors are not counted by `skip`: both leading errors are yielded and only
/// the first `Ok` item (3) is skipped.
#[test]
fn skip_passes_errors_through() {
    let source = iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]);
    let mut s = block_on_stream(source.skip(1));

    let expected = vec![
        Some(Err(1)),
        Some(Err(2)),
        Some(Ok(4)),
        Some(Ok(5)),
        None,
    ];
    for want in expected {
        assert_eq!(s.next(), want);
    }
}
144 
/// `skip_while` drops the leading odd item and keeps everything after it.
#[test]
fn skip_while() {
    let expected = Ok(vec![2, 3]);
    assert_done(
        || list().skip_while(|item| Ok(*item % 2 == 1)).collect(),
        expected,
    );
}
/// `take(2)` yields only the first two items of `list()`.
#[test]
fn take() {
    let expected = Ok(vec![1, 2]);
    assert_done(|| list().take(2).collect(), expected);
}
154 
/// `take_while` stops at the first item that fails the predicate.
#[test]
fn take_while() {
    let expected = Ok(vec![1, 2]);
    assert_done(
        || list().take_while(|item| Ok(*item < 3)).collect(),
        expected,
    );
}
160 
/// Errors are not counted by `take`: they are yielded until the requested
/// number of `Ok` items has been produced, after which the stream ends.
#[test]
fn take_passes_errors_through() {
    let source = iter(vec![Err(1), Err(2), Ok(3), Ok(4), Err(4)]);
    let mut s = block_on_stream(source.take(1));
    for want in vec![Some(Err(1)), Some(Err(2)), Some(Ok(3)), None] {
        assert_eq!(s.next(), want);
    }

    // Once the single item has been taken, the trailing error is never seen.
    let mut s = block_on_stream(iter(vec![Ok(1), Err(2)]).take(1));
    assert_eq!(s.next(), Some(Ok(1)));
    assert_eq!(s.next(), None);
}
173 
/// Wrapping `list()` in `peekable` leaves the collected items unchanged.
#[test]
fn peekable() {
    let expected = Ok(vec![1, 2, 3]);
    assert_done(|| list().peekable().collect(), expected);
}
178 
/// A fused stream keeps returning `None` after it has finished.
#[test]
fn fuse() {
    let mut stream = block_on_stream(list().fuse());

    let expected = vec![Some(Ok(1)), Some(Ok(2)), Some(Ok(3)), None, None, None];
    for want in expected {
        assert_eq!(stream.next(), want);
    }
}
189 
/// `buffered` yields results in the order the futures were sent, for buffer
/// sizes 2 and 1.
///
/// In both halves the second future (`d`, completed via `c`) is resolved
/// first, yet the asserted output order is 5 (from `a`/`b`) then 3.
/// `sassert_empty` comes from `support`; presumably it asserts that polling
/// the stream yields nothing yet (confirm against the helper).
#[test]
fn buffered() {
    let (tx, rx) = mpsc::channel(1);
    let (a, b) = oneshot::channel::<u32>();
    let (c, d) = oneshot::channel::<u32>();

    tx.send(Box::new(b.recover(|_| panic!())) as Box<dyn Future<Item = _, Error = _> + Send>)
        .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
        .forget();

    let mut rx = rx.buffered(2);
    sassert_empty(&mut rx);
    // Completing only the second future must not produce an item.
    c.send(3).unwrap();
    sassert_empty(&mut rx);
    a.send(5).unwrap();
    let mut rx = block_on_stream(rx);
    assert_eq!(rx.next(), Some(Ok(5)));
    assert_eq!(rx.next(), Some(Ok(3)));
    assert_eq!(rx.next(), None);

    // Same scenario with a buffer of one.
    let (tx, rx) = mpsc::channel(1);
    let (a, b) = oneshot::channel::<u32>();
    let (c, d) = oneshot::channel::<u32>();

    tx.send(Box::new(b.recover(|_| panic!())) as Box<dyn Future<Item = _, Error = _> + Send>)
        .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
        .forget();

    let mut rx = rx.buffered(1);
    sassert_empty(&mut rx);
    c.send(3).unwrap();
    sassert_empty(&mut rx);
    a.send(5).unwrap();
    let mut rx = block_on_stream(rx);
    assert_eq!(rx.next(), Some(Ok(5)));
    assert_eq!(rx.next(), Some(Ok(3)));
    assert_eq!(rx.next(), None);
}
228 
/// `buffer_unordered` yields results in completion order when both futures
/// fit in the buffer (size 2), and in send order when the buffer holds one.
///
/// `sassert_empty` comes from `support`; presumably it asserts that polling
/// the stream yields nothing yet (confirm against the helper).
#[test]
fn unordered() {
    let (tx, rx) = mpsc::channel(1);
    let (a, b) = oneshot::channel::<u32>();
    let (c, d) = oneshot::channel::<u32>();

    tx.send(Box::new(b.recover(|_| panic!())) as Box<dyn Future<Item = _, Error = _> + Send>)
        .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!()))))
        .forget();

    let mut rx = rx.buffer_unordered(2);
    sassert_empty(&mut rx);
    let mut rx = block_on_stream(rx);
    // The second future completes first and is yielded first.
    c.send(3).unwrap();
    assert_eq!(rx.next(), Some(Ok(3)));
    a.send(5).unwrap();
    assert_eq!(rx.next(), Some(Ok(5)));
    assert_eq!(rx.next(), None);

    let (tx, rx) = mpsc::channel(1);
    let (a, b) = oneshot::channel::<u32>();
    let (c, d) = oneshot::channel::<u32>();

    tx.send(Box::new(b.recover(|_| panic!())) as Box<dyn Future<Item = _, Error = _> + Send>)
        .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!()))))
        .forget();

    // We don't even get to see `c` until `a` completes.
    let mut rx = rx.buffer_unordered(1);
    sassert_empty(&mut rx);
    c.send(3).unwrap();
    sassert_empty(&mut rx);
    a.send(5).unwrap();
    let mut rx = block_on_stream(rx);
    assert_eq!(rx.next(), Some(Ok(5)));
    assert_eq!(rx.next(), Some(Ok(3)));
    assert_eq!(rx.next(), None);
}
267 
/// `zip` pairs items positionally, stops with the shorter side, and
/// surfaces an error from either input.
#[test]
fn zip() {
    // Equal lengths.
    assert_done(
        || list().zip(list()).collect(),
        Ok(vec![(1, 1), (2, 2), (3, 3)]),
    );
    // Right side shorter.
    assert_done(
        || list().zip(list().take(2)).collect(),
        Ok(vec![(1, 1), (2, 2)]),
    );
    // Left side shorter.
    assert_done(
        || list().take(2).zip(list()).collect(),
        Ok(vec![(1, 1), (2, 2)]),
    );
    // Error on the left side.
    assert_done(|| err_list().zip(list()).collect::<Vec<_>>(), Err(3));
    // Right side transformed before zipping.
    assert_done(
        || list().zip(list().map(|item| item + 1)).collect(),
        Ok(vec![(1, 2), (2, 3), (3, 4)]),
    );
}
280 
/// Peeking twice returns the same first item without consuming it; the
/// following `poll_next` then yields that item.
#[test]
fn peek() {
    /// One-shot future that drives the peek/poll assertions on `inner`.
    struct Peek {
        inner: Peekable<Box<dyn Stream<Item = i32, Error = u32> + Send>>,
    }

    impl Future for Peek {
        type Item = ();
        type Error = u32;

        fn poll(&mut self, cx: &mut Context<'_>) -> Poll<(), u32> {
            // Scoped so the borrow returned by `peek` ends before the
            // stream is used again.
            {
                let res = ready!(self.inner.peek(cx))?;
                assert_eq!(res, Some(&1));
            }
            assert_eq!(self.inner.peek(cx).unwrap(), Some(&1).into());
            assert_eq!(self.inner.poll_next(cx).unwrap(), Some(1).into());
            Ok(Poll::Ready(()))
        }
    }

    block_on(Peek {
        inner: list().peekable(),
    }).unwrap()
}
306 
/// Blocking iteration over `list()` collects all three items.
#[test]
fn wait() {
    let collected: Result<Vec<_>, _> = block_on_stream(list()).collect();
    assert_eq!(collected, Ok(vec![1, 2, 3]));
}
312 
/// `chunks` groups items into vectors of at most the given size; on an
/// error the partial chunk is yielded first, then the error.
#[test]
fn chunks() {
    assert_done(|| list().chunks(3).collect(), Ok(vec![vec![1, 2, 3]]));
    assert_done(
        || list().chunks(1).collect(),
        Ok(vec![vec![1], vec![2], vec![3]]),
    );
    assert_done(
        || list().chunks(2).collect(),
        Ok(vec![vec![1, 2], vec![3]]),
    );

    let mut chunked = block_on_stream(err_list().chunks(3));
    let first = chunked.next().unwrap().unwrap();
    assert_eq!(first, vec![1, 2]);
    let failure = chunked.next().unwrap().unwrap_err();
    assert_eq!(failure, 3);
}
324 
/// Requesting a chunk capacity of zero is expected to panic.
#[test]
#[should_panic]
fn chunks_panic_on_cap_zero() {
    let source = list();
    let _ = source.chunks(0);
}
330 
/// `forward` pushes every stream item into the sink and hands the sink back,
/// so the same `Vec` accumulates across successive calls.
#[test]
fn forward() {
    let sink = Vec::new();

    let (_, sink) = block_on(iter_ok::<_, Never>(vec![0, 1]).forward(sink)).unwrap();
    assert_eq!(sink, vec![0, 1]);

    let (_, sink) = block_on(iter_ok::<_, Never>(vec![2, 3]).forward(sink)).unwrap();
    assert_eq!(sink, vec![0, 1, 2, 3]);

    assert_done(
        move || {
            iter_ok::<_, Never>(vec![4, 5])
                .forward(sink)
                .map(|(_, returned)| returned)
        },
        Ok(vec![0, 1, 2, 3, 4, 5]),
    );
}
343 
/// `concat` joins the vectors of a stream, or returns the stream's error.
// NOTE(review): `allow(deprecated)` presumably silences a deprecation on a
// stream method used here; confirm it is still required.
#[test]
#[allow(deprecated)]
fn concat() {
    let all_ok = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
    assert_done(move || all_ok.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));

    let with_err = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);
    assert_done(move || with_err.concat(), Err(()));
}
353 
/// Same checks as `concat`, plus: an empty stream concatenates to an empty
/// vector.
#[test]
fn concat2() {
    let all_ok = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
    assert_done(move || all_ok.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));

    let with_err = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);
    assert_done(move || with_err.concat(), Err(()));

    let nothing = empty::<Vec<()>, ()>();
    assert_done(move || nothing.concat(), Ok(vec![]));
}
365 
/// A `poll_fn` stream counting down from five yields exactly five items
/// before reporting the end of the stream.
#[test]
fn stream_poll_fn() {
    let mut remaining = 5usize;

    let read_stream = poll_fn(move |_| -> Poll<Option<usize>, std::io::Error> {
        if remaining == 0 {
            Ok(Poll::Ready(None))
        } else {
            remaining -= 1;
            Ok(Poll::Ready(Some(remaining)))
        }
    });

    let yielded = block_on_stream(read_stream).count();
    assert_eq!(yielded, 5);
}
380 
/// `inspect` sees every item without changing what is collected.
#[test]
fn inspect() {
    let mut observed = vec![];
    assert_done(
        || list().inspect(|&item| observed.push(item)).collect(),
        Ok(vec![1, 2, 3]),
    );
    assert_eq!(observed, [1, 2, 3]);
}
387 
/// `inspect_err` sees the stream's error without changing the outcome.
#[test]
fn inspect_err() {
    let mut observed = vec![];
    assert_done(
        || {
            err_list()
                .inspect_err(|&error| observed.push(error))
                .collect::<Vec<_>>()
        },
        Err(3),
    );
    assert_eq!(observed, [3]);
}
394