1 use super::DEFAULT_BUF_SIZE; 2 use futures_core::future::Future; 3 use futures_core::ready; 4 use futures_core::task::{Context, Poll}; 5 use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom}; 6 use pin_project_lite::pin_project; 7 use std::io::{self, Read}; 8 use std::pin::Pin; 9 use std::{cmp, fmt}; 10 11 pin_project! { 12 /// The `BufReader` struct adds buffering to any reader. 13 /// 14 /// It can be excessively inefficient to work directly with a [`AsyncRead`] 15 /// instance. A `BufReader` performs large, infrequent reads on the underlying 16 /// [`AsyncRead`] and maintains an in-memory buffer of the results. 17 /// 18 /// `BufReader` can improve the speed of programs that make *small* and 19 /// *repeated* read calls to the same file or network socket. It does not 20 /// help when reading very large amounts at once, or reading just one or a few 21 /// times. It also provides no advantage when reading from a source that is 22 /// already in memory, like a `Vec<u8>`. 23 /// 24 /// When the `BufReader` is dropped, the contents of its buffer will be 25 /// discarded. Creating multiple instances of a `BufReader` on the same 26 /// stream can cause data loss. 27 /// 28 /// [`AsyncRead`]: futures_io::AsyncRead 29 /// 30 // TODO: Examples 31 pub struct BufReader<R> { 32 #[pin] 33 inner: R, 34 buffer: Box<[u8]>, 35 pos: usize, 36 cap: usize, 37 } 38 } 39 40 impl<R: AsyncRead> BufReader<R> { 41 /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB, 42 /// but may change in the future. new(inner: R) -> Self43 pub fn new(inner: R) -> Self { 44 Self::with_capacity(DEFAULT_BUF_SIZE, inner) 45 } 46 47 /// Creates a new `BufReader` with the specified buffer capacity. with_capacity(capacity: usize, inner: R) -> Self48 pub fn with_capacity(capacity: usize, inner: R) -> Self { 49 unsafe { 50 let mut buffer = Vec::with_capacity(capacity); 51 buffer.set_len(capacity); 52 super::initialize(&inner, &mut buffer); 53 Self { inner, buffer: buffer.into_boxed_slice(), pos: 0, cap: 0 } 54 } 55 } 56 57 delegate_access_inner!(inner, R, ()); 58 59 /// Returns a reference to the internally buffered data. 60 /// 61 /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty. buffer(&self) -> &[u8]62 pub fn buffer(&self) -> &[u8] { 63 &self.buffer[self.pos..self.cap] 64 } 65 66 /// Invalidates all data in the internal buffer. 67 #[inline] discard_buffer(self: Pin<&mut Self>)68 fn discard_buffer(self: Pin<&mut Self>) { 69 let this = self.project(); 70 *this.pos = 0; 71 *this.cap = 0; 72 } 73 } 74 75 impl<R: AsyncRead + AsyncSeek> BufReader<R> { 76 /// Seeks relative to the current position. If the new position lies within the buffer, 77 /// the buffer will not be flushed, allowing for more efficient seeks. 78 /// This method does not return the location of the underlying reader, so the caller 79 /// must track this information themselves if it is required. seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R>80 pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> { 81 SeeKRelative { inner: self, offset, first: true } 82 } 83 84 /// Attempts to seek relative to the current position. If the new position lies within the buffer, 85 /// the buffer will not be flushed, allowing for more efficient seeks. 86 /// This method does not return the location of the underlying reader, so the caller 87 /// must track this information themselves if it is required. poll_seek_relative( self: Pin<&mut Self>, cx: &mut Context<'_>, offset: i64, ) -> Poll<io::Result<()>>88 pub fn poll_seek_relative( 89 self: Pin<&mut Self>, 90 cx: &mut Context<'_>, 91 offset: i64, 92 ) -> Poll<io::Result<()>> { 93 let pos = self.pos as u64; 94 if offset < 0 { 95 if let Some(new_pos) = pos.checked_sub((-offset) as u64) { 96 *self.project().pos = new_pos as usize; 97 return Poll::Ready(Ok(())); 98 } 99 } else if let Some(new_pos) = pos.checked_add(offset as u64) { 100 if new_pos <= self.cap as u64 { 101 *self.project().pos = new_pos as usize; 102 return Poll::Ready(Ok(())); 103 } 104 } 105 self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ())) 106 } 107 } 108 109 impl<R: AsyncRead> AsyncRead for BufReader<R> { poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>110 fn poll_read( 111 mut self: Pin<&mut Self>, 112 cx: &mut Context<'_>, 113 buf: &mut [u8], 114 ) -> Poll<io::Result<usize>> { 115 // If we don't have any buffered data and we're doing a massive read 116 // (larger than our internal buffer), bypass our internal buffer 117 // entirely. 118 if self.pos == self.cap && buf.len() >= self.buffer.len() { 119 let res = ready!(self.as_mut().project().inner.poll_read(cx, buf)); 120 self.discard_buffer(); 121 return Poll::Ready(res); 122 } 123 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; 124 let nread = rem.read(buf)?; 125 self.consume(nread); 126 Poll::Ready(Ok(nread)) 127 } 128 poll_read_vectored( mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<io::Result<usize>>129 fn poll_read_vectored( 130 mut self: Pin<&mut Self>, 131 cx: &mut Context<'_>, 132 bufs: &mut [IoSliceMut<'_>], 133 ) -> Poll<io::Result<usize>> { 134 let total_len = bufs.iter().map(|b| b.len()).sum::<usize>(); 135 if self.pos == self.cap && total_len >= self.buffer.len() { 136 let res = ready!(self.as_mut().project().inner.poll_read_vectored(cx, bufs)); 137 self.discard_buffer(); 138 return Poll::Ready(res); 139 } 140 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; 141 let nread = rem.read_vectored(bufs)?; 142 self.consume(nread); 143 Poll::Ready(Ok(nread)) 144 } 145 } 146 147 impl<R: AsyncRead> AsyncBufRead for BufReader<R> { poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>148 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { 149 let this = self.project(); 150 151 // If we've reached the end of our internal buffer then we need to fetch 152 // some more data from the underlying reader. 153 // Branch using `>=` instead of the more correct `==` 154 // to tell the compiler that the pos..cap slice is always valid. 155 if *this.pos >= *this.cap { 156 debug_assert!(*this.pos == *this.cap); 157 *this.cap = ready!(this.inner.poll_read(cx, this.buffer))?; 158 *this.pos = 0; 159 } 160 Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap])) 161 } 162 consume(self: Pin<&mut Self>, amt: usize)163 fn consume(self: Pin<&mut Self>, amt: usize) { 164 *self.project().pos = cmp::min(self.pos + amt, self.cap); 165 } 166 } 167 168 impl<R: AsyncWrite> AsyncWrite for BufReader<R> { 169 delegate_async_write!(inner); 170 } 171 172 impl<R: fmt::Debug> fmt::Debug for BufReader<R> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result173 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 174 f.debug_struct("BufReader") 175 .field("reader", &self.inner) 176 .field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buffer.len())) 177 .finish() 178 } 179 } 180 181 impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> { 182 /// Seek to an offset, in bytes, in the underlying reader. 183 /// 184 /// The position used for seeking with `SeekFrom::Current(_)` is the 185 /// position the underlying reader would be at if the `BufReader` had no 186 /// internal buffer. 187 /// 188 /// Seeking always discards the internal buffer, even if the seek position 189 /// would otherwise fall within it. This guarantees that calling 190 /// `.into_inner()` immediately after a seek yields the underlying reader 191 /// at the same position. 192 /// 193 /// To seek without discarding the internal buffer, use 194 /// [`BufReader::seek_relative`](BufReader::seek_relative) or 195 /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative). 196 /// 197 /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details. 198 /// 199 /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` 200 /// where `n` minus the internal buffer length overflows an `i64`, two 201 /// seeks will be performed instead of one. If the second seek returns 202 /// `Err`, the underlying reader will be left at the same position it would 203 /// have if you called `seek` with `SeekFrom::Current(0)`. poll_seek( mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll<io::Result<u64>>204 fn poll_seek( 205 mut self: Pin<&mut Self>, 206 cx: &mut Context<'_>, 207 pos: SeekFrom, 208 ) -> Poll<io::Result<u64>> { 209 let result: u64; 210 if let SeekFrom::Current(n) = pos { 211 let remainder = (self.cap - self.pos) as i64; 212 // it should be safe to assume that remainder fits within an i64 as the alternative 213 // means we managed to allocate 8 exbibytes and that's absurd. 214 // But it's not out of the realm of possibility for some weird underlying reader to 215 // support seeking by i64::min_value() so we need to handle underflow when subtracting 216 // remainder. 217 if let Some(offset) = n.checked_sub(remainder) { 218 result = 219 ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(offset)))?; 220 } else { 221 // seek backwards by our remainder, and then by the offset 222 ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(-remainder)))?; 223 self.as_mut().discard_buffer(); 224 result = ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(n)))?; 225 } 226 } else { 227 // Seeking with Start/End doesn't care about our buffer length. 228 result = ready!(self.as_mut().project().inner.poll_seek(cx, pos))?; 229 } 230 self.discard_buffer(); 231 Poll::Ready(Ok(result)) 232 } 233 } 234 235 /// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method. 236 #[derive(Debug)] 237 #[must_use = "futures do nothing unless polled"] 238 pub struct SeeKRelative<'a, R> { 239 inner: Pin<&'a mut BufReader<R>>, 240 offset: i64, 241 first: bool, 242 } 243 244 impl<R> Future for SeeKRelative<'_, R> 245 where 246 R: AsyncRead + AsyncSeek, 247 { 248 type Output = io::Result<()>; 249 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>250 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 251 let offset = self.offset; 252 if self.first { 253 self.first = false; 254 self.inner.as_mut().poll_seek_relative(cx, offset) 255 } else { 256 self.inner 257 .as_mut() 258 .as_mut() 259 .poll_seek(cx, SeekFrom::Current(offset)) 260 .map(|res| res.map(|_| ())) 261 } 262 } 263 } 264