• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 macro_rules! run_fill_buf {
2     ($reader:expr) => {{
3         use futures_test::task::noop_context;
4         use futures::task::Poll;
5         use std::pin::Pin;
6 
7         let mut cx = noop_context();
8         loop {
9             if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
10                 break x;
11             }
12         }
13     }};
14 }
15 
16 mod util {
17     use futures::future::Future;
run<F: Future + Unpin>(mut f: F) -> F::Output18     pub fn run<F: Future + Unpin>(mut f: F) -> F::Output {
19         use futures_test::task::noop_context;
20         use futures::task::Poll;
21         use futures::future::FutureExt;
22 
23         let mut cx = noop_context();
24         loop {
25             if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
26                 return x;
27             }
28         }
29     }
30 }
31 
32 mod maybe_pending {
33     use futures::task::{Context,Poll};
34     use std::{cmp,io};
35     use std::pin::Pin;
36     use futures::io::{AsyncRead,AsyncBufRead};
37 
38     pub struct MaybePending<'a> {
39         inner: &'a [u8],
40         ready_read: bool,
41         ready_fill_buf: bool,
42     }
43 
44     impl<'a> MaybePending<'a> {
new(inner: &'a [u8]) -> Self45         pub fn new(inner: &'a [u8]) -> Self {
46             Self { inner, ready_read: false, ready_fill_buf: false }
47         }
48     }
49 
50     impl AsyncRead for MaybePending<'_> {
poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>>51         fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
52             -> Poll<io::Result<usize>>
53         {
54             if self.ready_read {
55                 self.ready_read = false;
56                 Pin::new(&mut self.inner).poll_read(cx, buf)
57             } else {
58                 self.ready_read = true;
59                 Poll::Pending
60             }
61         }
62     }
63 
64     impl AsyncBufRead for MaybePending<'_> {
poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>>65         fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>)
66             -> Poll<io::Result<&[u8]>>
67         {
68             if self.ready_fill_buf {
69                 self.ready_fill_buf = false;
70                 if self.inner.is_empty() { return Poll::Ready(Ok(&[])) }
71                 let len = cmp::min(2, self.inner.len());
72                 Poll::Ready(Ok(&self.inner[0..len]))
73             } else {
74                 self.ready_fill_buf = true;
75                 Poll::Pending
76             }
77         }
78 
consume(mut self: Pin<&mut Self>, amt: usize)79         fn consume(mut self: Pin<&mut Self>, amt: usize) {
80             self.inner = &self.inner[amt..];
81         }
82     }
83 }
84 
85 #[test]
test_buffered_reader()86 fn test_buffered_reader() {
87     use futures::executor::block_on;
88     use futures::io::{AsyncReadExt, BufReader};
89 
90     let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
91     let mut reader = BufReader::with_capacity(2, inner);
92 
93     let mut buf = [0, 0, 0];
94     let nread = block_on(reader.read(&mut buf));
95     assert_eq!(nread.unwrap(), 3);
96     assert_eq!(buf, [5, 6, 7]);
97     assert_eq!(reader.buffer(), []);
98 
99     let mut buf = [0, 0];
100     let nread = block_on(reader.read(&mut buf));
101     assert_eq!(nread.unwrap(), 2);
102     assert_eq!(buf, [0, 1]);
103     assert_eq!(reader.buffer(), []);
104 
105     let mut buf = [0];
106     let nread = block_on(reader.read(&mut buf));
107     assert_eq!(nread.unwrap(), 1);
108     assert_eq!(buf, [2]);
109     assert_eq!(reader.buffer(), [3]);
110 
111     let mut buf = [0, 0, 0];
112     let nread = block_on(reader.read(&mut buf));
113     assert_eq!(nread.unwrap(), 1);
114     assert_eq!(buf, [3, 0, 0]);
115     assert_eq!(reader.buffer(), []);
116 
117     let nread = block_on(reader.read(&mut buf));
118     assert_eq!(nread.unwrap(), 1);
119     assert_eq!(buf, [4, 0, 0]);
120     assert_eq!(reader.buffer(), []);
121 
122     assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
123 }
124 
125 #[test]
test_buffered_reader_seek()126 fn test_buffered_reader_seek() {
127     use futures::executor::block_on;
128     use futures::io::{AsyncSeekExt, AsyncBufRead, BufReader, Cursor, SeekFrom};
129     use std::pin::Pin;
130     use util::run;
131 
132     let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
133     let mut reader = BufReader::with_capacity(2, Cursor::new(inner));
134 
135     assert_eq!(block_on(reader.seek(SeekFrom::Start(3))).ok(), Some(3));
136     assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
137     assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
138     assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
139     assert_eq!(block_on(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
140     assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
141     Pin::new(&mut reader).consume(1);
142     assert_eq!(block_on(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
143 }
144 
145 #[test]
test_buffered_reader_seek_underflow()146 fn test_buffered_reader_seek_underflow() {
147     use futures::executor::block_on;
148     use futures::io::{AsyncSeekExt, AsyncBufRead, AllowStdIo, BufReader, SeekFrom};
149     use std::io;
150 
151     // gimmick reader that yields its position modulo 256 for each byte
152     struct PositionReader {
153         pos: u64
154     }
155     impl io::Read for PositionReader {
156         fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
157             let len = buf.len();
158             for x in buf {
159                 *x = self.pos as u8;
160                 self.pos = self.pos.wrapping_add(1);
161             }
162             Ok(len)
163         }
164     }
165     impl io::Seek for PositionReader {
166         fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
167             match pos {
168                 SeekFrom::Start(n) => {
169                     self.pos = n;
170                 }
171                 SeekFrom::Current(n) => {
172                     self.pos = self.pos.wrapping_add(n as u64);
173                 }
174                 SeekFrom::End(n) => {
175                     self.pos = u64::max_value().wrapping_add(n as u64);
176                 }
177             }
178             Ok(self.pos)
179         }
180     }
181 
182     let mut reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 }));
183     assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1, 2, 3, 4][..]));
184     assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value()-5));
185     assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
186     // the following seek will require two underlying seeks
187     let expected = 9_223_372_036_854_775_802;
188     assert_eq!(block_on(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), Some(expected));
189     assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
190     // seeking to 0 should empty the buffer.
191     assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).ok(), Some(expected));
192     assert_eq!(reader.get_ref().get_ref().pos, expected);
193 }
194 
195 #[test]
test_short_reads()196 fn test_short_reads() {
197     use futures::executor::block_on;
198     use futures::io::{AsyncReadExt, AllowStdIo, BufReader};
199     use std::io;
200 
201     /// A dummy reader intended at testing short-reads propagation.
202     struct ShortReader {
203         lengths: Vec<usize>,
204     }
205 
206     impl io::Read for ShortReader {
207         fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
208             if self.lengths.is_empty() {
209                 Ok(0)
210             } else {
211                 Ok(self.lengths.remove(0))
212             }
213         }
214     }
215 
216     let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] };
217     let mut reader = BufReader::new(AllowStdIo::new(inner));
218     let mut buf = [0, 0];
219     assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
220     assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
221     assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 2);
222     assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
223     assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
224     assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
225     assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
226 }
227 
228 #[test]
maybe_pending()229 fn maybe_pending() {
230     use futures::io::{AsyncReadExt, BufReader};
231     use util::run;
232     use maybe_pending::MaybePending;
233 
234     let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
235     let mut reader = BufReader::with_capacity(2, MaybePending::new(inner));
236 
237     let mut buf = [0, 0, 0];
238     let nread = run(reader.read(&mut buf));
239     assert_eq!(nread.unwrap(), 3);
240     assert_eq!(buf, [5, 6, 7]);
241     assert_eq!(reader.buffer(), []);
242 
243     let mut buf = [0, 0];
244     let nread = run(reader.read(&mut buf));
245     assert_eq!(nread.unwrap(), 2);
246     assert_eq!(buf, [0, 1]);
247     assert_eq!(reader.buffer(), []);
248 
249     let mut buf = [0];
250     let nread = run(reader.read(&mut buf));
251     assert_eq!(nread.unwrap(), 1);
252     assert_eq!(buf, [2]);
253     assert_eq!(reader.buffer(), [3]);
254 
255     let mut buf = [0, 0, 0];
256     let nread = run(reader.read(&mut buf));
257     assert_eq!(nread.unwrap(), 1);
258     assert_eq!(buf, [3, 0, 0]);
259     assert_eq!(reader.buffer(), []);
260 
261     let nread = run(reader.read(&mut buf));
262     assert_eq!(nread.unwrap(), 1);
263     assert_eq!(buf, [4, 0, 0]);
264     assert_eq!(reader.buffer(), []);
265 
266     assert_eq!(run(reader.read(&mut buf)).unwrap(), 0);
267 }
268 
269 #[test]
maybe_pending_buf_read()270 fn maybe_pending_buf_read() {
271     use futures::io::{AsyncBufReadExt, BufReader};
272     use util::run;
273     use maybe_pending::MaybePending;
274 
275     let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]);
276     let mut reader = BufReader::with_capacity(2, inner);
277     let mut v = Vec::new();
278     run(reader.read_until(3, &mut v)).unwrap();
279     assert_eq!(v, [0, 1, 2, 3]);
280     v.clear();
281     run(reader.read_until(1, &mut v)).unwrap();
282     assert_eq!(v, [1]);
283     v.clear();
284     run(reader.read_until(8, &mut v)).unwrap();
285     assert_eq!(v, [0]);
286     v.clear();
287     run(reader.read_until(9, &mut v)).unwrap();
288     assert_eq!(v, []);
289 }
290 
291 // https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309
292 #[test]
maybe_pending_seek()293 fn maybe_pending_seek() {
294     use futures::io::{AsyncBufRead, AsyncSeek, AsyncSeekExt, AsyncRead, BufReader,
295         Cursor, SeekFrom
296     };
297     use futures::task::{Context,Poll};
298     use std::io;
299     use std::pin::Pin;
300     use util::run;
301     pub struct MaybePendingSeek<'a> {
302         inner: Cursor<&'a [u8]>,
303         ready: bool,
304     }
305 
306     impl<'a> MaybePendingSeek<'a> {
307         pub fn new(inner: &'a [u8]) -> Self {
308             Self { inner: Cursor::new(inner), ready: true }
309         }
310     }
311 
312     impl AsyncRead for MaybePendingSeek<'_> {
313         fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
314             -> Poll<io::Result<usize>>
315         {
316             Pin::new(&mut self.inner).poll_read(cx, buf)
317         }
318     }
319 
320     impl AsyncBufRead for MaybePendingSeek<'_> {
321         fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
322             -> Poll<io::Result<&[u8]>>
323         {
324             let this: *mut Self = &mut *self as *mut _;
325             Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx)
326         }
327 
328         fn consume(mut self: Pin<&mut Self>, amt: usize) {
329             Pin::new(&mut self.inner).consume(amt)
330         }
331     }
332 
333     impl AsyncSeek for MaybePendingSeek<'_> {
334         fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom)
335             -> Poll<io::Result<u64>>
336         {
337             if self.ready {
338                 self.ready = false;
339                 Pin::new(&mut self.inner).poll_seek(cx, pos)
340             } else {
341                 self.ready = true;
342                 Poll::Pending
343             }
344         }
345     }
346 
347     let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
348     let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
349 
350     assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3));
351     assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
352     assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
353     assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
354     assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
355     assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
356     Pin::new(&mut reader).consume(1);
357     assert_eq!(run(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
358 }
359