• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use core::pin::Pin;
2 use futures::io::{AsyncBufRead, AsyncRead};
3 use futures::stream::{self, TryStreamExt};
4 use futures::task::Poll;
5 use futures_test::{stream::StreamTestExt, task::noop_context};
6 
7 macro_rules! assert_read {
8     ($reader:expr, $buf:expr, $item:expr) => {
9         let mut cx = noop_context();
10         loop {
11             match Pin::new(&mut $reader).poll_read(&mut cx, $buf) {
12                 Poll::Ready(Ok(x)) => {
13                     assert_eq!(x, $item);
14                     break;
15                 }
16                 Poll::Ready(Err(err)) => {
17                     panic!("assertion failed: expected value but got {}", err);
18                 }
19                 Poll::Pending => {
20                     continue;
21                 }
22             }
23         }
24     };
25 }
26 
27 macro_rules! assert_fill_buf {
28     ($reader:expr, $buf:expr) => {
29         let mut cx = noop_context();
30         loop {
31             match Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
32                 Poll::Ready(Ok(x)) => {
33                     assert_eq!(x, $buf);
34                     break;
35                 }
36                 Poll::Ready(Err(err)) => {
37                     panic!("assertion failed: expected value but got {}", err);
38                 }
39                 Poll::Pending => {
40                     continue;
41                 }
42             }
43         }
44     };
45 }
46 
47 #[test]
test_into_async_read()48 fn test_into_async_read() {
49     let stream = stream::iter((1..=3).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])]));
50     let mut reader = stream.interleave_pending().into_async_read();
51     let mut buf = vec![0; 3];
52 
53     assert_read!(reader, &mut buf, 3);
54     assert_eq!(&buf, &[1, 2, 3]);
55 
56     assert_read!(reader, &mut buf, 2);
57     assert_eq!(&buf[..2], &[4, 5]);
58 
59     assert_read!(reader, &mut buf, 3);
60     assert_eq!(&buf, &[1, 2, 3]);
61 
62     assert_read!(reader, &mut buf, 2);
63     assert_eq!(&buf[..2], &[4, 5]);
64 
65     assert_read!(reader, &mut buf, 3);
66     assert_eq!(&buf, &[1, 2, 3]);
67 
68     assert_read!(reader, &mut buf, 2);
69     assert_eq!(&buf[..2], &[4, 5]);
70 
71     assert_read!(reader, &mut buf, 0);
72 }
73 
74 #[test]
test_into_async_bufread()75 fn test_into_async_bufread() {
76     let stream = stream::iter((1..=2).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])]));
77     let mut reader = stream.interleave_pending().into_async_read();
78 
79     let mut reader = Pin::new(&mut reader);
80 
81     assert_fill_buf!(reader, &[1, 2, 3, 4, 5][..]);
82     reader.as_mut().consume(3);
83 
84     assert_fill_buf!(reader, &[4, 5][..]);
85     reader.as_mut().consume(2);
86 
87     assert_fill_buf!(reader, &[1, 2, 3, 4, 5][..]);
88     reader.as_mut().consume(2);
89 
90     assert_fill_buf!(reader, &[3, 4, 5][..]);
91     reader.as_mut().consume(3);
92 
93     assert_fill_buf!(reader, &[][..]);
94 }
95