• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use super::DEFAULT_BUF_SIZE;
2 use futures_core::future::Future;
3 use futures_core::ready;
4 use futures_core::task::{Context, Poll};
5 use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
6 use pin_project_lite::pin_project;
7 use std::io::{self, Read};
8 use std::pin::Pin;
9 use std::{cmp, fmt};
10 
pin_project! {
    /// The `BufReader` struct adds buffering to any reader.
    ///
    /// It can be excessively inefficient to work directly with a [`AsyncRead`]
    /// instance. A `BufReader` performs large, infrequent reads on the underlying
    /// [`AsyncRead`] and maintains an in-memory buffer of the results.
    ///
    /// `BufReader` can improve the speed of programs that make *small* and
    /// *repeated* read calls to the same file or network socket. It does not
    /// help when reading very large amounts at once, or reading just one or a few
    /// times. It also provides no advantage when reading from a source that is
    /// already in memory, like a `Vec<u8>`.
    ///
    /// When the `BufReader` is dropped, the contents of its buffer will be
    /// discarded. Creating multiple instances of a `BufReader` on the same
    /// stream can cause data loss.
    ///
    /// [`AsyncRead`]: futures_io::AsyncRead
    ///
    // TODO: Examples
    pub struct BufReader<R> {
        // The wrapped reader. Structurally pinned so the poll methods can
        // project to `Pin<&mut R>`.
        #[pin]
        inner: R,
        // Fixed-capacity backing storage. The valid, not-yet-consumed bytes
        // live in `buffer[pos..cap]`.
        buffer: Box<[u8]>,
        // Index of the next byte to hand out from `buffer`.
        pos: usize,
        // One past the last valid byte in `buffer` (invariant: pos <= cap <= buffer.len()).
        cap: usize,
    }
}
39 
impl<R: AsyncRead> BufReader<R> {
    /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
    /// but may change in the future.
    pub fn new(inner: R) -> Self {
        Self::with_capacity(DEFAULT_BUF_SIZE, inner)
    }

    /// Creates a new `BufReader` with the specified buffer capacity.
    pub fn with_capacity(capacity: usize, inner: R) -> Self {
        unsafe {
            let mut buffer = Vec::with_capacity(capacity);
            // SAFETY: `set_len` exposes `capacity` bytes of uninitialized
            // memory. This is sound only because `super::initialize` is
            // expected to make those bytes safe to pass to `inner`'s reads
            // (presumably by zeroing, or by honoring the reader's initializer
            // contract) — NOTE(review): confirm against `super::initialize`.
            // Until the first successful read, `cap == 0` keeps the
            // (possibly uninitialized) contents from being observed via
            // `buffer()`/`poll_fill_buf`.
            buffer.set_len(capacity);
            super::initialize(&inner, &mut buffer);
            Self { inner, buffer: buffer.into_boxed_slice(), pos: 0, cap: 0 }
        }
    }

    // Generates `get_ref`/`get_mut`/`get_pin_mut`/`into_inner`-style accessors
    // for `inner` (see `delegate_access_inner!` for the exact set).
    delegate_access_inner!(inner, R, ());

    /// Returns a reference to the internally buffered data.
    ///
    /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
    pub fn buffer(&self) -> &[u8] {
        &self.buffer[self.pos..self.cap]
    }

    /// Invalidates all data in the internal buffer.
    #[inline]
    fn discard_buffer(self: Pin<&mut Self>) {
        let this = self.project();
        // Resetting both indices makes `pos..cap` empty; no bytes are freed.
        *this.pos = 0;
        *this.cap = 0;
    }
}
74 
75 impl<R: AsyncRead + AsyncSeek> BufReader<R> {
76     /// Seeks relative to the current position. If the new position lies within the buffer,
77     /// the buffer will not be flushed, allowing for more efficient seeks.
78     /// This method does not return the location of the underlying reader, so the caller
79     /// must track this information themselves if it is required.
seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R>80     pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> {
81         SeeKRelative { inner: self, offset, first: true }
82     }
83 
84     /// Attempts to seek relative to the current position. If the new position lies within the buffer,
85     /// the buffer will not be flushed, allowing for more efficient seeks.
86     /// This method does not return the location of the underlying reader, so the caller
87     /// must track this information themselves if it is required.
poll_seek_relative( self: Pin<&mut Self>, cx: &mut Context<'_>, offset: i64, ) -> Poll<io::Result<()>>88     pub fn poll_seek_relative(
89         self: Pin<&mut Self>,
90         cx: &mut Context<'_>,
91         offset: i64,
92     ) -> Poll<io::Result<()>> {
93         let pos = self.pos as u64;
94         if offset < 0 {
95             if let Some(new_pos) = pos.checked_sub((-offset) as u64) {
96                 *self.project().pos = new_pos as usize;
97                 return Poll::Ready(Ok(()));
98             }
99         } else if let Some(new_pos) = pos.checked_add(offset as u64) {
100             if new_pos <= self.cap as u64 {
101                 *self.project().pos = new_pos as usize;
102                 return Poll::Ready(Ok(()));
103             }
104         }
105         self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ()))
106     }
107 }
108 
109 impl<R: AsyncRead> AsyncRead for BufReader<R> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>110     fn poll_read(
111         mut self: Pin<&mut Self>,
112         cx: &mut Context<'_>,
113         buf: &mut [u8],
114     ) -> Poll<io::Result<usize>> {
115         // If we don't have any buffered data and we're doing a massive read
116         // (larger than our internal buffer), bypass our internal buffer
117         // entirely.
118         if self.pos == self.cap && buf.len() >= self.buffer.len() {
119             let res = ready!(self.as_mut().project().inner.poll_read(cx, buf));
120             self.discard_buffer();
121             return Poll::Ready(res);
122         }
123         let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
124         let nread = rem.read(buf)?;
125         self.consume(nread);
126         Poll::Ready(Ok(nread))
127     }
128 
poll_read_vectored( mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<io::Result<usize>>129     fn poll_read_vectored(
130         mut self: Pin<&mut Self>,
131         cx: &mut Context<'_>,
132         bufs: &mut [IoSliceMut<'_>],
133     ) -> Poll<io::Result<usize>> {
134         let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
135         if self.pos == self.cap && total_len >= self.buffer.len() {
136             let res = ready!(self.as_mut().project().inner.poll_read_vectored(cx, bufs));
137             self.discard_buffer();
138             return Poll::Ready(res);
139         }
140         let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
141         let nread = rem.read_vectored(bufs)?;
142         self.consume(nread);
143         Poll::Ready(Ok(nread))
144     }
145 }
146 
impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        let this = self.project();

        // If we've reached the end of our internal buffer then we need to fetch
        // some more data from the underlying reader.
        // Branch using `>=` instead of the more correct `==`
        // to tell the compiler that the pos..cap slice is always valid.
        if *this.pos >= *this.cap {
            debug_assert!(*this.pos == *this.cap);
            // Refill with a single read over the whole buffer. A short read just
            // leaves `cap` smaller; a zero-byte read surfaces to the caller as
            // an empty slice (EOF by `AsyncBufRead` convention).
            *this.cap = ready!(this.inner.poll_read(cx, this.buffer))?;
            *this.pos = 0;
        }
        Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap]))
    }

    fn consume(self: Pin<&mut Self>, amt: usize) {
        // Clamp to `cap` so an over-large `amt` can never push `pos` past the
        // valid region.
        *self.project().pos = cmp::min(self.pos + amt, self.cap);
    }
}
167 
impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
    // Writes are not buffered by `BufReader`; every `AsyncWrite` method is
    // forwarded straight to `inner` (see `delegate_async_write!` for the
    // generated impls).
    delegate_async_write!(inner);
}
171 
172 impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result173     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
174         f.debug_struct("BufReader")
175             .field("reader", &self.inner)
176             .field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buffer.len()))
177             .finish()
178     }
179 }
180 
impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
    /// Seek to an offset, in bytes, in the underlying reader.
    ///
    /// The position used for seeking with `SeekFrom::Current(_)` is the
    /// position the underlying reader would be at if the `BufReader` had no
    /// internal buffer.
    ///
    /// Seeking always discards the internal buffer, even if the seek position
    /// would otherwise fall within it. This guarantees that calling
    /// `.into_inner()` immediately after a seek yields the underlying reader
    /// at the same position.
    ///
    /// To seek without discarding the internal buffer, use
    /// [`BufReader::seek_relative`](BufReader::seek_relative) or
    /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative).
    ///
    /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details.
    ///
    /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
    /// where `n` minus the internal buffer length overflows an `i64`, two
    /// seeks will be performed instead of one. If the second seek returns
    /// `Err`, the underlying reader will be left at the same position it would
    /// have if you called `seek` with `SeekFrom::Current(0)`.
    fn poll_seek(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        pos: SeekFrom,
    ) -> Poll<io::Result<u64>> {
        let result: u64;
        if let SeekFrom::Current(n) = pos {
            // Unconsumed buffered bytes: the underlying reader is this far
            // *ahead* of the position the caller perceives, so relative seeks
            // must be adjusted backwards by `remainder`.
            let remainder = (self.cap - self.pos) as i64;
            // it should be safe to assume that remainder fits within an i64 as the alternative
            // means we managed to allocate 8 exbibytes and that's absurd.
            // But it's not out of the realm of possibility for some weird underlying reader to
            // support seeking by i64::min_value() so we need to handle underflow when subtracting
            // remainder.
            if let Some(offset) = n.checked_sub(remainder) {
                // Common path: single adjusted seek.
                result =
                    ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(offset)))?;
            } else {
                // seek backwards by our remainder, and then by the offset
                ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(-remainder)))?;
                // Buffer must be discarded between the two seeks so that, if the
                // second seek fails, state is consistent with a `Current(0)` seek
                // (see the doc comment above).
                self.as_mut().discard_buffer();
                result = ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(n)))?;
            }
        } else {
            // Seeking with Start/End doesn't care about our buffer length.
            result = ready!(self.as_mut().project().inner.poll_seek(cx, pos))?;
        }
        self.discard_buffer();
        Poll::Ready(Ok(result))
    }
}
234 
/// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method.
// NOTE: the `SeeKRelative` capitalization is this type's published public name
// and must be preserved for backward compatibility.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct SeeKRelative<'a, R> {
    // Pinned borrow of the reader whose cursor/buffer is adjusted in place.
    inner: Pin<&'a mut BufReader<R>>,
    // Requested displacement from the current logical position, in bytes.
    offset: i64,
    // True until the first poll; only the first poll tries the in-buffer
    // fast path (`poll_seek_relative`).
    first: bool,
}
243 
impl<R> Future for SeeKRelative<'_, R>
where
    R: AsyncRead + AsyncSeek,
{
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let offset = self.offset;
        if self.first {
            // First poll: try the buffered fast path, which can complete without
            // touching the underlying reader when the target is in the buffer.
            self.first = false;
            self.inner.as_mut().poll_seek_relative(cx, offset)
        } else {
            // Subsequent polls (i.e. the first returned Pending) go straight to
            // the underlying `poll_seek` — presumably because the buffer state
            // may already have changed once the slow path was entered, making a
            // retry of the relative fast path unsafe; confirm against
            // `poll_seek_relative`/`poll_seek`.
            self.inner
                .as_mut()
                .as_mut()
                .poll_seek(cx, SeekFrom::Current(offset))
                .map(|res| res.map(|_| ()))
        }
    }
}
264