1 macro_rules! run_fill_buf {
2 ($reader:expr) => {{
3 use futures_test::task::noop_context;
4 use futures::task::Poll;
5 use std::pin::Pin;
6
7 let mut cx = noop_context();
8 loop {
9 if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
10 break x;
11 }
12 }
13 }};
14 }
15
16 mod util {
17 use futures::future::Future;
run<F: Future + Unpin>(mut f: F) -> F::Output18 pub fn run<F: Future + Unpin>(mut f: F) -> F::Output {
19 use futures_test::task::noop_context;
20 use futures::task::Poll;
21 use futures::future::FutureExt;
22
23 let mut cx = noop_context();
24 loop {
25 if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
26 return x;
27 }
28 }
29 }
30 }
31
32 mod maybe_pending {
33 use futures::task::{Context,Poll};
34 use std::{cmp,io};
35 use std::pin::Pin;
36 use futures::io::{AsyncRead,AsyncBufRead};
37
38 pub struct MaybePending<'a> {
39 inner: &'a [u8],
40 ready_read: bool,
41 ready_fill_buf: bool,
42 }
43
44 impl<'a> MaybePending<'a> {
new(inner: &'a [u8]) -> Self45 pub fn new(inner: &'a [u8]) -> Self {
46 Self { inner, ready_read: false, ready_fill_buf: false }
47 }
48 }
49
50 impl AsyncRead for MaybePending<'_> {
poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>>51 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
52 -> Poll<io::Result<usize>>
53 {
54 if self.ready_read {
55 self.ready_read = false;
56 Pin::new(&mut self.inner).poll_read(cx, buf)
57 } else {
58 self.ready_read = true;
59 Poll::Pending
60 }
61 }
62 }
63
64 impl AsyncBufRead for MaybePending<'_> {
poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>>65 fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>)
66 -> Poll<io::Result<&[u8]>>
67 {
68 if self.ready_fill_buf {
69 self.ready_fill_buf = false;
70 if self.inner.is_empty() { return Poll::Ready(Ok(&[])) }
71 let len = cmp::min(2, self.inner.len());
72 Poll::Ready(Ok(&self.inner[0..len]))
73 } else {
74 self.ready_fill_buf = true;
75 Poll::Pending
76 }
77 }
78
consume(mut self: Pin<&mut Self>, amt: usize)79 fn consume(mut self: Pin<&mut Self>, amt: usize) {
80 self.inner = &self.inner[amt..];
81 }
82 }
83 }
84
85 #[test]
test_buffered_reader()86 fn test_buffered_reader() {
87 use futures::executor::block_on;
88 use futures::io::{AsyncReadExt, BufReader};
89
90 let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
91 let mut reader = BufReader::with_capacity(2, inner);
92
93 let mut buf = [0, 0, 0];
94 let nread = block_on(reader.read(&mut buf));
95 assert_eq!(nread.unwrap(), 3);
96 assert_eq!(buf, [5, 6, 7]);
97 assert_eq!(reader.buffer(), []);
98
99 let mut buf = [0, 0];
100 let nread = block_on(reader.read(&mut buf));
101 assert_eq!(nread.unwrap(), 2);
102 assert_eq!(buf, [0, 1]);
103 assert_eq!(reader.buffer(), []);
104
105 let mut buf = [0];
106 let nread = block_on(reader.read(&mut buf));
107 assert_eq!(nread.unwrap(), 1);
108 assert_eq!(buf, [2]);
109 assert_eq!(reader.buffer(), [3]);
110
111 let mut buf = [0, 0, 0];
112 let nread = block_on(reader.read(&mut buf));
113 assert_eq!(nread.unwrap(), 1);
114 assert_eq!(buf, [3, 0, 0]);
115 assert_eq!(reader.buffer(), []);
116
117 let nread = block_on(reader.read(&mut buf));
118 assert_eq!(nread.unwrap(), 1);
119 assert_eq!(buf, [4, 0, 0]);
120 assert_eq!(reader.buffer(), []);
121
122 assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
123 }
124
125 #[test]
test_buffered_reader_seek()126 fn test_buffered_reader_seek() {
127 use futures::executor::block_on;
128 use futures::io::{AsyncSeekExt, AsyncBufRead, BufReader, Cursor, SeekFrom};
129 use std::pin::Pin;
130 use util::run;
131
132 let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
133 let mut reader = BufReader::with_capacity(2, Cursor::new(inner));
134
135 assert_eq!(block_on(reader.seek(SeekFrom::Start(3))).ok(), Some(3));
136 assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
137 assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
138 assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
139 assert_eq!(block_on(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
140 assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
141 Pin::new(&mut reader).consume(1);
142 assert_eq!(block_on(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
143 }
144
145 #[test]
test_buffered_reader_seek_underflow()146 fn test_buffered_reader_seek_underflow() {
147 use futures::executor::block_on;
148 use futures::io::{AsyncSeekExt, AsyncBufRead, AllowStdIo, BufReader, SeekFrom};
149 use std::io;
150
151 // gimmick reader that yields its position modulo 256 for each byte
152 struct PositionReader {
153 pos: u64
154 }
155 impl io::Read for PositionReader {
156 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
157 let len = buf.len();
158 for x in buf {
159 *x = self.pos as u8;
160 self.pos = self.pos.wrapping_add(1);
161 }
162 Ok(len)
163 }
164 }
165 impl io::Seek for PositionReader {
166 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
167 match pos {
168 SeekFrom::Start(n) => {
169 self.pos = n;
170 }
171 SeekFrom::Current(n) => {
172 self.pos = self.pos.wrapping_add(n as u64);
173 }
174 SeekFrom::End(n) => {
175 self.pos = u64::max_value().wrapping_add(n as u64);
176 }
177 }
178 Ok(self.pos)
179 }
180 }
181
182 let mut reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 }));
183 assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1, 2, 3, 4][..]));
184 assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value()-5));
185 assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
186 // the following seek will require two underlying seeks
187 let expected = 9_223_372_036_854_775_802;
188 assert_eq!(block_on(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), Some(expected));
189 assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
190 // seeking to 0 should empty the buffer.
191 assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).ok(), Some(expected));
192 assert_eq!(reader.get_ref().get_ref().pos, expected);
193 }
194
195 #[test]
test_short_reads()196 fn test_short_reads() {
197 use futures::executor::block_on;
198 use futures::io::{AsyncReadExt, AllowStdIo, BufReader};
199 use std::io;
200
201 /// A dummy reader intended at testing short-reads propagation.
202 struct ShortReader {
203 lengths: Vec<usize>,
204 }
205
206 impl io::Read for ShortReader {
207 fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
208 if self.lengths.is_empty() {
209 Ok(0)
210 } else {
211 Ok(self.lengths.remove(0))
212 }
213 }
214 }
215
216 let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] };
217 let mut reader = BufReader::new(AllowStdIo::new(inner));
218 let mut buf = [0, 0];
219 assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
220 assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
221 assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 2);
222 assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
223 assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
224 assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
225 assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
226 }
227
228 #[test]
maybe_pending()229 fn maybe_pending() {
230 use futures::io::{AsyncReadExt, BufReader};
231 use util::run;
232 use maybe_pending::MaybePending;
233
234 let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
235 let mut reader = BufReader::with_capacity(2, MaybePending::new(inner));
236
237 let mut buf = [0, 0, 0];
238 let nread = run(reader.read(&mut buf));
239 assert_eq!(nread.unwrap(), 3);
240 assert_eq!(buf, [5, 6, 7]);
241 assert_eq!(reader.buffer(), []);
242
243 let mut buf = [0, 0];
244 let nread = run(reader.read(&mut buf));
245 assert_eq!(nread.unwrap(), 2);
246 assert_eq!(buf, [0, 1]);
247 assert_eq!(reader.buffer(), []);
248
249 let mut buf = [0];
250 let nread = run(reader.read(&mut buf));
251 assert_eq!(nread.unwrap(), 1);
252 assert_eq!(buf, [2]);
253 assert_eq!(reader.buffer(), [3]);
254
255 let mut buf = [0, 0, 0];
256 let nread = run(reader.read(&mut buf));
257 assert_eq!(nread.unwrap(), 1);
258 assert_eq!(buf, [3, 0, 0]);
259 assert_eq!(reader.buffer(), []);
260
261 let nread = run(reader.read(&mut buf));
262 assert_eq!(nread.unwrap(), 1);
263 assert_eq!(buf, [4, 0, 0]);
264 assert_eq!(reader.buffer(), []);
265
266 assert_eq!(run(reader.read(&mut buf)).unwrap(), 0);
267 }
268
269 #[test]
maybe_pending_buf_read()270 fn maybe_pending_buf_read() {
271 use futures::io::{AsyncBufReadExt, BufReader};
272 use util::run;
273 use maybe_pending::MaybePending;
274
275 let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]);
276 let mut reader = BufReader::with_capacity(2, inner);
277 let mut v = Vec::new();
278 run(reader.read_until(3, &mut v)).unwrap();
279 assert_eq!(v, [0, 1, 2, 3]);
280 v.clear();
281 run(reader.read_until(1, &mut v)).unwrap();
282 assert_eq!(v, [1]);
283 v.clear();
284 run(reader.read_until(8, &mut v)).unwrap();
285 assert_eq!(v, [0]);
286 v.clear();
287 run(reader.read_until(9, &mut v)).unwrap();
288 assert_eq!(v, []);
289 }
290
291 // https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309
292 #[test]
maybe_pending_seek()293 fn maybe_pending_seek() {
294 use futures::io::{AsyncBufRead, AsyncSeek, AsyncSeekExt, AsyncRead, BufReader,
295 Cursor, SeekFrom
296 };
297 use futures::task::{Context,Poll};
298 use std::io;
299 use std::pin::Pin;
300 use util::run;
301 pub struct MaybePendingSeek<'a> {
302 inner: Cursor<&'a [u8]>,
303 ready: bool,
304 }
305
306 impl<'a> MaybePendingSeek<'a> {
307 pub fn new(inner: &'a [u8]) -> Self {
308 Self { inner: Cursor::new(inner), ready: true }
309 }
310 }
311
312 impl AsyncRead for MaybePendingSeek<'_> {
313 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
314 -> Poll<io::Result<usize>>
315 {
316 Pin::new(&mut self.inner).poll_read(cx, buf)
317 }
318 }
319
320 impl AsyncBufRead for MaybePendingSeek<'_> {
321 fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
322 -> Poll<io::Result<&[u8]>>
323 {
324 let this: *mut Self = &mut *self as *mut _;
325 Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx)
326 }
327
328 fn consume(mut self: Pin<&mut Self>, amt: usize) {
329 Pin::new(&mut self.inner).consume(amt)
330 }
331 }
332
333 impl AsyncSeek for MaybePendingSeek<'_> {
334 fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom)
335 -> Poll<io::Result<u64>>
336 {
337 if self.ready {
338 self.ready = false;
339 Pin::new(&mut self.inner).poll_seek(cx, pos)
340 } else {
341 self.ready = true;
342 Poll::Pending
343 }
344 }
345 }
346
347 let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
348 let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
349
350 assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3));
351 assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
352 assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
353 assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
354 assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
355 assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
356 Pin::new(&mut reader).consume(1);
357 assert_eq!(run(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
358 }
359