• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use std::io::{self, BufRead, Read};
2 
3 #[cfg(any(
4     feature = "futures-03",
5     feature = "tokio-02",
6     feature = "tokio-03",
7     feature = "tokio"
8 ))]
9 use std::pin::Pin;
10 
11 #[cfg(any(
12     feature = "futures-03",
13     feature = "tokio-02",
14     feature = "tokio-03"
15 ))]
16 use std::mem::MaybeUninit;
17 
18 #[cfg(feature = "futures-core-03")]
19 use std::task::{Context, Poll};
20 
21 #[cfg(feature = "futures-03")]
22 use std::future::Future;
23 
24 use bytes::{Buf, BufMut, BytesMut};
25 
26 #[cfg(feature = "pin-project-lite")]
27 use pin_project_lite::pin_project;
28 
29 #[cfg(feature = "tokio-03")]
30 use tokio_03_dep::io::AsyncBufRead as _;
31 
32 #[cfg(feature = "tokio")]
33 use tokio_dep::io::AsyncBufRead as _;
34 
35 #[cfg(feature = "futures-core-03")]
36 use futures_core_03::ready;
37 
#[cfg(feature = "pin-project-lite")]
pin_project! {
    /// `BufReader` used by `Decoder` when it is constructed with [`Decoder::new_bufferless`][]
    ///
    /// Pairs a reader with a `BytesMut` holding bytes that were read from it but not yet
    /// consumed. This variant is generated through `pin_project!` so the wrapped reader can
    /// be accessed as a pinned reference.
    ///
    /// [`Decoder::new_bufferless`]: ../decoder/struct.Decoder.html#method.new_bufferless
    #[derive(Debug)]
    pub struct BufReader<R> {
        // The wrapped reader; projected as `Pin<&mut R>`.
        #[pin]
        inner: R,
        // Bytes already read from `inner` that have not been consumed yet.
        buf: BytesMut
    }
}
50 
#[cfg(not(feature = "pin-project-lite"))]
/// `BufReader` used by `Decoder` when it is constructed with [`Decoder::new_bufferless`][]
///
/// Pairs a reader with a `BytesMut` holding bytes that were read from it but not yet
/// consumed. This is the plain variant used when `pin-project-lite` is not enabled.
///
/// [`Decoder::new_bufferless`]: ../decoder/struct.Decoder.html#method.new_bufferless
#[derive(Debug)]
pub struct BufReader<R> {
    // The wrapped reader.
    inner: R,
    // Bytes already read from `inner` that have not been consumed yet.
    buf: BytesMut,
}
60 
61 impl<R> BufReader<R> {
62     /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
63     /// but may change in the future.
new(inner: R) -> Self64     pub fn new(inner: R) -> Self {
65         Self::with_capacity(8096, inner)
66     }
67 
68     /// Creates a new `BufReader` with the specified buffer capacity.
with_capacity(capacity: usize, inner: R) -> Self69     pub fn with_capacity(capacity: usize, inner: R) -> Self {
70         let buf = BytesMut::with_capacity(capacity);
71 
72         Self { inner, buf }
73     }
74 
75     /// Gets a reference to the underlying reader.
76     ///
77     /// It is inadvisable to directly read from the underlying reader.
get_ref(&self) -> &R78     pub fn get_ref(&self) -> &R {
79         &self.inner
80     }
81 
82     /// Gets a mutable reference to the underlying reader.
83     ///
84     /// It is inadvisable to directly read from the underlying reader.
get_mut(&mut self) -> &mut R85     pub fn get_mut(&mut self) -> &mut R {
86         &mut self.inner
87     }
88 
89     #[cfg(feature = "pin-project-lite")]
90     /// Gets a pinned mutable reference to the underlying reader.
91     ///
92     /// It is inadvisable to directly read from the underlying reader.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R>93     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
94         self.project().inner
95     }
96 
97     /// Consumes this `BufWriter`, returning the underlying reader.
98     ///
99     /// Note that any leftover data in the internal buffer is lost.
into_inner(self) -> R100     pub fn into_inner(self) -> R {
101         self.inner
102     }
103 
104     /// Returns a reference to the internally buffered data.
105     ///
106     /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
buffer(&self) -> &[u8]107     pub fn buffer(&self) -> &[u8] {
108         &self.buf
109     }
110 
111     /// Invalidates all data in the internal buffer.
112     #[inline]
113     #[cfg(any(feature = "tokio-02", feature = "tokio-03", feature = "tokio"))]
discard_buffer(self: Pin<&mut Self>)114     fn discard_buffer(self: Pin<&mut Self>) {
115         let me = self.project();
116         me.buf.clear();
117     }
118 }
119 
// Private module holding the `Sealed` marker trait. `CombineBuffer` lists it as a supertrait,
// so only types that implement `Sealed` in this file can implement `CombineBuffer`.
mod sealed {
    pub trait Sealed {}
}
123 
/// Access to the bytes buffered for a reader of type `R`.
///
/// Implemented in this file by `Buffer` (bytes stored in the marker itself) and by
/// `Bufferless` (bytes stored inside a `BufReader`).
#[doc(hidden)]
pub trait CombineBuffer<R>: sealed::Sealed {
    /// Returns the currently buffered bytes.
    fn buffer<'a>(&'a self, read: &'a R) -> &'a [u8];

    /// Drops the first `len` buffered bytes.
    fn advance(&mut self, read: &mut R, len: usize);

    #[cfg(feature = "pin-project-lite")]
    /// Same as `advance`, for a reader that is only reachable through a pinned reference.
    fn advance_pin(&mut self, read: Pin<&mut R>, len: usize);
}
133 
/// Blocking refill of the buffer described by `CombineBuffer`.
#[doc(hidden)]
pub trait CombineSyncRead<R>: CombineBuffer<R> {
    /// Reads more bytes from `read` into the buffer and returns how many were added.
    fn extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize>;
}
138 
#[cfg(any(feature = "tokio-02", feature = "tokio-03", feature = "tokio"))]
/// Poll-based refill of the buffer for tokio readers.
///
/// `T` is never used as a value; the impls in this file set it to the `dyn AsyncRead` type of
/// one tokio version, which lets the same buffer type implement this trait once per version.
#[doc(hidden)]
pub trait CombineRead<R, T: ?Sized>: CombineBuffer<R> {
    /// Polls `read` for more bytes and appends them to the buffer, yielding the number of
    /// bytes added.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>>;
}
148 
#[cfg(feature = "futures-03")]
/// Poll-based refill of the buffer for `futures_io_03::AsyncRead` readers.
#[doc(hidden)]
pub trait CombineAsyncRead<R>: CombineBuffer<R> {
    /// Polls `read` for more bytes and appends them to the buffer, yielding the number of
    /// bytes added.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>>;

    /// Returns a future (`ExtendBuf`) that drives `poll_extend_buf` for this buffer and
    /// reader.
    fn extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R>
    where
        Self: Sized;
}
162 
#[cfg(feature = "futures-03")]
pin_project_lite::pin_project! {
    // Future returned by `CombineAsyncRead::extend_buf`; it borrows the buffer and the pinned
    // reader for `'a` and resolves to the result of `poll_extend_buf`.
    #[doc(hidden)]
    pub struct ExtendBuf<'a, C, R> {
        // The buffer abstraction that receives the bytes.
        buffer: &'a mut C,
        // The reader the bytes are pulled from.
        read: Pin<&'a mut R>
    }
}
171 
#[cfg(feature = "futures-03")]
impl<'a, C, R> Future for ExtendBuf<'a, C, R>
where
    C: CombineAsyncRead<R>,
{
    type Output = io::Result<usize>;

    /// Forwards the poll to `CombineAsyncRead::poll_extend_buf` on the borrowed buffer.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        // Reborrow the pinned reader so the future can be polled again later.
        let reader = this.read.as_mut();
        this.buffer.poll_extend_buf(cx, reader)
    }
}
184 
/// Marker used by `Decoder` for an internal buffer
///
/// The wrapped `BytesMut` is the buffer itself; the `CombineBuffer` impl for this type
/// ignores the reader when returning or advancing buffered bytes.
#[derive(Default)]
pub struct Buffer(pub(crate) BytesMut);

// Opts `Buffer` into the sealed `CombineBuffer` family.
impl sealed::Sealed for Buffer {}
190 
191 impl<R> CombineBuffer<R> for Buffer {
buffer<'a>(&'a self, _read: &'a R) -> &'a [u8]192     fn buffer<'a>(&'a self, _read: &'a R) -> &'a [u8] {
193         &self.0
194     }
195 
advance(&mut self, _read: &mut R, len: usize)196     fn advance(&mut self, _read: &mut R, len: usize) {
197         self.0.advance(len);
198     }
199 
200     #[cfg(feature = "pin-project-lite")]
advance_pin(&mut self, _read: Pin<&mut R>, len: usize)201     fn advance_pin(&mut self, _read: Pin<&mut R>, len: usize) {
202         self.0.advance(len);
203     }
204 }
205 
206 impl<R> CombineSyncRead<R> for Buffer
207 where
208     R: Read,
209 {
extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize>210     fn extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize> {
211         extend_buf_sync(&mut self.0, read)
212     }
213 }
214 
#[cfg(feature = "futures-03")]
impl<R> CombineAsyncRead<R> for Buffer
where
    R: futures_io_03::AsyncRead,
{
    /// Polls `read` for more bytes and appends them to the marker's own buffer.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>> {
        poll_extend_buf(&mut self.0, cx, read)
    }

    /// Prepares the buffer for a read and returns the future that performs it.
    fn extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R> {
        // Grow only when there is no spare capacity left. `has_remaining_mut` cannot be used
        // for this: for `BytesMut` it reports how much the buffer could still grow, not how
        // much room is currently allocated, so the 8 KiB reserve would never run.
        if self.0.capacity() == self.0.len() {
            self.0.reserve(8 * 1024);
        }
        // Copy of tokio's read_buf method (but it has to force initialize the buffer)
        let bs = self.0.chunk_mut();

        for i in 0..bs.len() {
            bs.write_byte(i, 0);
        }
        ExtendBuf { buffer: self, read }
    }
}
241 
#[cfg(feature = "tokio-02")]
impl<R> CombineRead<R, dyn tokio_02_dep::io::AsyncRead> for Buffer
where
    R: tokio_02_dep::io::AsyncRead,
{
    /// Polls a tokio 0.2 reader for more bytes and appends them to the marker's own buffer.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>> {
        // Grow only when there is no spare capacity left. `has_remaining_mut` cannot be used
        // for this: for `BytesMut` it reports how much the buffer could still grow, not how
        // much room is currently allocated, so the 8 KiB reserve would never run.
        if self.0.capacity() == self.0.len() {
            self.0.reserve(8 * 1024);
        }
        // `Bytes05` adapts the buffer to the `bytes` 0.5 `BufMut` trait that tokio 0.2 expects.
        read.poll_read_buf(cx, &mut Bytes05(&mut self.0))
    }
}
258 
#[cfg(feature = "tokio-03")]
/// Exposes the writable chunk of `bs` (as returned by `chunk_mut`) as a tokio 0.3 `ReadBuf`
/// without initializing it. The returned `ReadBuf` borrows `bs`.
fn tokio_03_to_read_buf(bs: &mut BytesMut) -> tokio_03_dep::io::ReadBuf<'_> {
    let uninit = bs.chunk_mut();
    // SAFETY: the pointer and the length both come from the same `uninit` slice, and the
    // memory is handed out as `MaybeUninit<u8>`, so nothing is assumed to be initialized.
    // NOTE(review): relies on `UninitSlice` covering `len()` writable bytes at `as_mut_ptr()`
    // — confirm against the `bytes` documentation.
    unsafe {
        tokio_03_dep::io::ReadBuf::uninit(std::slice::from_raw_parts_mut(
            uninit.as_mut_ptr() as *mut MaybeUninit<u8>,
            uninit.len(),
        ))
    }
}
269 
#[cfg(feature = "tokio-03")]
impl<R> CombineRead<R, dyn tokio_03_dep::io::AsyncRead> for Buffer
where
    R: tokio_03_dep::io::AsyncRead,
{
    /// Polls a tokio 0.3 reader for more bytes and appends them to the marker's own buffer.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>> {
        let Buffer(bytes) = self;
        tokio_03_read_buf(cx, read, bytes)
    }
}
283 
#[cfg(feature = "tokio-03")]
/// Polls a tokio 0.3 reader once and appends whatever it produced to `bs`.
///
/// Returns the number of bytes appended.
fn tokio_03_read_buf(
    cx: &mut Context<'_>,
    read: Pin<&mut impl tokio_03_dep::io::AsyncRead>,
    bs: &mut bytes::BytesMut,
) -> Poll<io::Result<usize>> {
    // Grow only when there is no spare capacity left. `has_remaining_mut` cannot be used for
    // this: for `BytesMut` it reports how much the buffer could still grow, not how much room
    // is currently allocated, so the 8 KiB reserve would never run.
    if bs.capacity() == bs.len() {
        bs.reserve(8 * 1024);
    }

    let mut buf = tokio_03_to_read_buf(bs);
    ready!(read.poll_read(cx, &mut buf))?;
    let n = buf.filled().len();
    // SAFETY: `buf` views the writable chunk of `bs`, and the reader reported the first `n`
    // bytes of it as filled.
    unsafe { bs.advance_mut(n) };
    Poll::Ready(Ok(n))
}
302 
#[cfg(feature = "tokio")]
impl<R> CombineRead<R, dyn tokio_dep::io::AsyncRead> for Buffer
where
    R: tokio_dep::io::AsyncRead,
{
    /// Polls a tokio reader for more bytes and appends them to the marker's own buffer.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>> {
        let Buffer(bytes) = self;
        tokio_read_buf(read, cx, bytes)
    }
}
316 
#[cfg(feature = "tokio")]
/// Polls a tokio reader once and appends whatever it produced to `bs`, delegating the actual
/// read to `tokio_util::io::poll_read_buf`.
fn tokio_read_buf(
    read: Pin<&mut impl tokio_dep::io::AsyncRead>,
    cx: &mut Context<'_>,
    bs: &mut bytes::BytesMut,
) -> Poll<io::Result<usize>> {
    // Grow only when there is no spare capacity left. `has_remaining_mut` cannot be used for
    // this: for `BytesMut` it reports how much the buffer could still grow, not how much room
    // is currently allocated, so the 8 KiB reserve would never run.
    if bs.capacity() == bs.len() {
        bs.reserve(8 * 1024);
    }

    tokio_util::io::poll_read_buf(read, cx, bs)
}
329 
/// Marker used by `Decoder` for an external buffer
///
/// Holds no data itself; its `CombineBuffer` impl reads and advances the buffer stored inside
/// the `BufReader` that is passed in as the reader.
#[derive(Default)]
pub struct Bufferless;

// Opts `Bufferless` into the sealed `CombineBuffer` family.
impl sealed::Sealed for Bufferless {}
335 
336 impl<R> CombineBuffer<BufReader<R>> for Bufferless {
buffer<'a>(&'a self, read: &'a BufReader<R>) -> &'a [u8]337     fn buffer<'a>(&'a self, read: &'a BufReader<R>) -> &'a [u8] {
338         &read.buf
339     }
340 
advance(&mut self, read: &mut BufReader<R>, len: usize)341     fn advance(&mut self, read: &mut BufReader<R>, len: usize) {
342         read.buf.advance(len);
343     }
344 
345     #[cfg(feature = "pin-project-lite")]
advance_pin(&mut self, read: Pin<&mut BufReader<R>>, len: usize)346     fn advance_pin(&mut self, read: Pin<&mut BufReader<R>>, len: usize) {
347         read.project().buf.advance(len);
348     }
349 }
350 
351 impl<R> CombineSyncRead<BufReader<R>> for Bufferless
352 where
353     R: Read,
354 {
extend_buf_sync(&mut self, read: &mut BufReader<R>) -> io::Result<usize>355     fn extend_buf_sync(&mut self, read: &mut BufReader<R>) -> io::Result<usize> {
356         extend_buf_sync(&mut read.buf, &mut read.inner)
357     }
358 }
359 
extend_buf_sync<R>(buf: &mut BytesMut, read: &mut R) -> io::Result<usize> where R: Read,360 fn extend_buf_sync<R>(buf: &mut BytesMut, read: &mut R) -> io::Result<usize>
361 where
362     R: Read,
363 {
364     if !buf.has_remaining_mut() {
365         buf.reserve(8 * 1024);
366     }
367 
368     // Copy of tokio's poll_read_buf method (but it has to force initialize the buffer)
369     let n = {
370         let bs = buf.chunk_mut();
371 
372         for i in 0..bs.len() {
373             bs.write_byte(i, 0);
374         }
375 
376         // Convert to `&mut [u8]`
377         // SAFETY: the entire buffer is preinitialized above
378         let bs = unsafe { &mut *(bs as *mut _ as *mut [u8]) };
379 
380         let n = read.read(bs)?;
381         assert!(
382             n <= bs.len(),
383             "AsyncRead reported that it initialized more than the number of bytes in the buffer"
384         );
385         n
386     };
387 
388     // SAFETY: the entire buffer has been preinitialized
389     unsafe { buf.advance_mut(n) };
390 
391     Ok(n)
392 }
393 
// Adapter that lets the `BytesMut` used by this file be passed where the `bytes_05::BufMut`
// trait is required (see the tokio 0.2 `poll_read_buf` calls in this file).
#[cfg(feature = "tokio-02")]
struct Bytes05<'a>(&'a mut BytesMut);

#[cfg(feature = "tokio-02")]
impl bytes_05::BufMut for Bytes05<'_> {
    // Forwards to the wrapped buffer.
    fn remaining_mut(&self) -> usize {
        self.0.remaining_mut()
    }
    // Forwards to the wrapped buffer; the caller's `advance_mut` contract applies unchanged.
    unsafe fn advance_mut(&mut self, cnt: usize) {
        self.0.advance_mut(cnt)
    }
    // Reinterprets the wrapped buffer's writable chunk as `[MaybeUninit<u8>]`.
    // NOTE(review): the cast assumes `UninitSlice` has the same layout as
    // `[MaybeUninit<u8>]` — confirm against the `bytes` documentation.
    fn bytes_mut(&mut self) -> &mut [MaybeUninit<u8>] {
        unsafe { &mut *(self.0.chunk_mut() as *mut _ as *mut [MaybeUninit<u8>]) }
    }
}
409 
#[cfg(feature = "tokio-02")]
impl<R> CombineRead<BufReader<R>, dyn tokio_02_dep::io::AsyncRead> for Bufferless
where
    R: tokio_02_dep::io::AsyncRead,
{
    /// Polls the wrapped tokio 0.2 reader for more bytes and appends them to the
    /// `BufReader`'s own buffer.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut BufReader<R>>,
    ) -> Poll<io::Result<usize>> {
        let me = read.project();

        // Grow only when there is no spare capacity left. `has_remaining_mut` cannot be used
        // for this: for `BytesMut` it reports how much the buffer could still grow, not how
        // much room is currently allocated, so the 8 KiB reserve would never run.
        if me.buf.capacity() == me.buf.len() {
            me.buf.reserve(8 * 1024);
        }
        // `Bytes05` adapts the buffer to the `bytes` 0.5 `BufMut` trait that tokio 0.2 expects.
        tokio_02_dep::io::AsyncRead::poll_read_buf(me.inner, cx, &mut Bytes05(me.buf))
    }
}
428 
#[cfg(feature = "tokio-03")]
impl<R> CombineRead<BufReader<R>, dyn tokio_03_dep::io::AsyncRead> for Bufferless
where
    R: tokio_03_dep::io::AsyncRead,
{
    /// Polls the wrapped tokio 0.3 reader for more bytes and appends them to the
    /// `BufReader`'s own buffer.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut BufReader<R>>,
    ) -> Poll<io::Result<usize>> {
        let this = read.project();
        let (reader, bytes) = (this.inner, this.buf);

        tokio_03_read_buf(cx, reader, bytes)
    }
}
444 
#[cfg(feature = "tokio")]
impl<R> CombineRead<BufReader<R>, dyn tokio_dep::io::AsyncRead> for Bufferless
where
    R: tokio_dep::io::AsyncRead,
{
    /// Polls the wrapped tokio reader for more bytes and appends them to the `BufReader`'s
    /// own buffer.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut BufReader<R>>,
    ) -> Poll<io::Result<usize>> {
        let this = read.project();
        let (reader, bytes) = (this.inner, this.buf);

        tokio_read_buf(reader, cx, bytes)
    }
}
460 
#[cfg(feature = "futures-03")]
impl<R> CombineAsyncRead<BufReader<R>> for Bufferless
where
    R: futures_io_03::AsyncRead,
{
    /// Polls the wrapped reader for more bytes and appends them to the `BufReader`'s own
    /// buffer.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut BufReader<R>>,
    ) -> Poll<io::Result<usize>> {
        let me = read.project();

        poll_extend_buf(me.buf, cx, me.inner)
    }

    /// Prepares the `BufReader`'s buffer for a read and returns the future that performs it.
    fn extend_buf<'a>(
        &'a mut self,
        mut read: Pin<&'a mut BufReader<R>>,
    ) -> ExtendBuf<'a, Self, BufReader<R>> {
        let me = read.as_mut().project();

        // Grow only when there is no spare capacity left. `has_remaining_mut` cannot be used
        // for this: for `BytesMut` it reports how much the buffer could still grow, not how
        // much room is currently allocated, so the 8 KiB reserve would never run.
        if me.buf.capacity() == me.buf.len() {
            me.buf.reserve(8 * 1024);
        }
        // Copy of tokio's read_buf method (but it has to force initialize the buffer)
        let bs = me.buf.chunk_mut();

        for i in 0..bs.len() {
            bs.write_byte(i, 0);
        }
        ExtendBuf { buffer: self, read }
    }
}
494 
#[cfg(feature = "futures-03")]
/// Polls a `futures_io_03::AsyncRead` once and appends whatever it produced to `buf`.
///
/// Resolves to the number of bytes appended. The writable chunk of `buf` is zeroed before
/// every poll because `poll_read` takes an initialized `&mut [u8]`.
///
/// # Panics
///
/// Panics if the reader reports more bytes than the slice it was given.
fn poll_extend_buf<R>(
    buf: &mut BytesMut,
    cx: &mut Context<'_>,
    read: Pin<&mut R>,
) -> Poll<io::Result<usize>>
where
    R: futures_io_03::AsyncRead,
{
    // Copy of tokio's read_buf method (but it has to force initialize the buffer)
    let n = {
        let bs = buf.chunk_mut();
        // preinit the buffer
        for i in 0..bs.len() {
            bs.write_byte(i, 0);
        }

        // Convert to `&mut [u8]`
        // SAFETY: every byte of the chunk was written by the loop above
        let bs = unsafe { &mut *(bs as *mut _ as *mut [u8]) };

        // `ready!` returns `Poll::Pending` early; `?` returns the I/O error early.
        let n = ready!(read.poll_read(cx, bs))?;
        assert!(
            n <= bs.len(),
            "AsyncRead reported that it initialized more than the number of bytes in the buffer"
        );
        n
    };
    // SAFETY: the buffer was preinitialized and `n` was checked against its length
    unsafe { buf.advance_mut(n) };
    Poll::Ready(Ok(n))
}
527 
#[cfg(feature = "tokio-02")]
impl<R: tokio_02_dep::io::AsyncRead> tokio_02_dep::io::AsyncRead for BufReader<R> {
    /// Reads into `buf`, serving from the internal buffer and refilling it from the wrapped
    /// reader when it is empty.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        use tokio_02_dep::io::AsyncBufRead;

        // If we don't have any buffered data and we're doing a massive read
        // (larger than our internal buffer), bypass our internal buffer
        // entirely.
        // NOTE(review): `has_remaining_mut` on `BytesMut` presumably never returns false in
        // practice, which would make this branch unreachable — confirm against the `bytes`
        // documentation.
        if !self.buf.has_remaining_mut() && buf.len() >= self.buf.len() {
            let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
            self.discard_buffer();
            return Poll::Ready(res);
        }
        // Copy out of the buffered bytes, then mark the copied prefix as consumed.
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
        let nread = rem.read(buf)?;
        self.consume(nread);
        Poll::Ready(Ok(nread))
    }

    // we can't skip unconditionally because of the large buffer case in read.
    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
        self.inner.prepare_uninitialized_buffer(buf)
    }
}
556 
#[cfg(feature = "tokio-02")]
impl<R: tokio_02_dep::io::AsyncRead> tokio_02_dep::io::AsyncBufRead for BufReader<R> {
    /// Returns the buffered bytes, first polling the wrapped reader for more when the buffer
    /// is empty.
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        let this = self.project();

        // Only go back to the wrapped reader once every buffered byte has been consumed.
        if this.buf.is_empty() {
            ready!(this.inner.poll_read_buf(cx, &mut Bytes05(this.buf)))?;
        }
        Poll::Ready(Ok(&this.buf[..]))
    }

    /// Marks the first `amt` buffered bytes as consumed.
    fn consume(self: Pin<&mut Self>, amt: usize) {
        let this = self.project();
        this.buf.advance(amt);
    }
}
578 
#[cfg(feature = "tokio-02")]
impl<R: tokio_02_dep::io::AsyncRead + tokio_02_dep::io::AsyncWrite> tokio_02_dep::io::AsyncWrite
    for BufReader<R>
{
    /// Writes go straight to the wrapped value; the read buffer is not involved.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let inner = self.get_pin_mut();
        inner.poll_write(cx, buf)
    }

    /// Forwarded to the wrapped value.
    fn poll_write_buf<B: bytes_05::Buf>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut B,
    ) -> Poll<io::Result<usize>> {
        let inner = self.get_pin_mut();
        inner.poll_write_buf(cx, buf)
    }

    /// Forwarded to the wrapped value.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.get_pin_mut();
        inner.poll_flush(cx)
    }

    /// Forwarded to the wrapped value.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.get_pin_mut();
        inner.poll_shutdown(cx)
    }
}
607 
#[cfg(feature = "tokio-03")]
impl<R: tokio_03_dep::io::AsyncRead> tokio_03_dep::io::AsyncRead for BufReader<R> {
    /// Reads into `buf`, serving from the internal buffer and refilling it from the wrapped
    /// reader when it is empty.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio_03_dep::io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // If we don't have any buffered data and we're doing a massive read
        // (larger than our internal buffer), bypass our internal buffer
        // entirely.
        // NOTE(review): `has_remaining_mut` on `BytesMut` presumably never returns false in
        // practice, which would make this branch unreachable — confirm against the `bytes`
        // documentation.
        if !self.buf.has_remaining_mut() && buf.remaining() >= self.buf.len() {
            let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
            self.discard_buffer();
            return Poll::Ready(res);
        }
        // Copy as many buffered bytes as fit into `buf`, then mark them as consumed.
        let rem = ready!(self.as_mut().poll_fill_buf(cx))?;
        let amt = std::cmp::min(rem.len(), buf.remaining());
        buf.put_slice(&rem[..amt]);
        self.consume(amt);
        Poll::Ready(Ok(()))
    }
}
630 
#[cfg(feature = "tokio-03")]
impl<R: tokio_03_dep::io::AsyncRead> tokio_03_dep::io::AsyncBufRead for BufReader<R> {
    /// Returns the buffered bytes, first polling the wrapped reader for more when the buffer
    /// is empty.
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        let this = self.project();

        // Only go back to the wrapped reader once every buffered byte has been consumed.
        if this.buf.is_empty() {
            ready!(tokio_03_read_buf(cx, this.inner, this.buf))?;
        }
        Poll::Ready(Ok(&this.buf[..]))
    }

    /// Marks the first `amt` buffered bytes as consumed.
    fn consume(self: Pin<&mut Self>, amt: usize) {
        let this = self.project();
        this.buf.advance(amt);
    }
}
649 
#[cfg(feature = "tokio-03")]
impl<R: tokio_03_dep::io::AsyncRead + tokio_03_dep::io::AsyncWrite> tokio_03_dep::io::AsyncWrite
    for BufReader<R>
{
    /// Writes go straight to the wrapped value; the read buffer is not involved.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let inner = self.get_pin_mut();
        inner.poll_write(cx, buf)
    }

    /// Forwarded to the wrapped value.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.get_pin_mut();
        inner.poll_flush(cx)
    }

    /// Forwarded to the wrapped value.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.get_pin_mut();
        inner.poll_shutdown(cx)
    }
}
670 
#[cfg(feature = "tokio")]
impl<R: tokio_dep::io::AsyncRead> tokio_dep::io::AsyncRead for BufReader<R> {
    /// Reads into `buf`, serving from the internal buffer and refilling it from the wrapped
    /// reader when it is empty.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio_dep::io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // If we don't have any buffered data and we're doing a massive read
        // (larger than our internal buffer), bypass our internal buffer
        // entirely.
        // NOTE(review): `has_remaining_mut` on `BytesMut` presumably never returns false in
        // practice, which would make this branch unreachable — confirm against the `bytes`
        // documentation.
        if !self.buf.has_remaining_mut() && buf.remaining() >= self.buf.len() {
            let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
            self.discard_buffer();
            return Poll::Ready(res);
        }
        // Copy as many buffered bytes as fit into `buf`, then mark them as consumed.
        let rem = ready!(self.as_mut().poll_fill_buf(cx))?;
        let amt = std::cmp::min(rem.len(), buf.remaining());
        buf.put_slice(&rem[..amt]);
        self.consume(amt);
        Poll::Ready(Ok(()))
    }
}
693 
#[cfg(feature = "tokio")]
impl<R: tokio_dep::io::AsyncRead> tokio_dep::io::AsyncBufRead for BufReader<R> {
    /// Returns the buffered bytes, first polling the wrapped reader for more when the buffer
    /// is empty.
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        let this = self.project();

        // Only go back to the wrapped reader once every buffered byte has been consumed.
        if this.buf.is_empty() {
            ready!(tokio_read_buf(this.inner, cx, this.buf))?;
        }
        Poll::Ready(Ok(&this.buf[..]))
    }

    /// Marks the first `amt` buffered bytes as consumed.
    fn consume(self: Pin<&mut Self>, amt: usize) {
        let this = self.project();
        this.buf.advance(amt);
    }
}
712 
#[cfg(feature = "tokio")]
impl<R: tokio_dep::io::AsyncRead + tokio_dep::io::AsyncWrite> tokio_dep::io::AsyncWrite
    for BufReader<R>
{
    /// Writes go straight to the wrapped value; the read buffer is not involved.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let inner = self.get_pin_mut();
        inner.poll_write(cx, buf)
    }

    /// Forwarded to the wrapped value.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.get_pin_mut();
        inner.poll_flush(cx)
    }

    /// Forwarded to the wrapped value.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.get_pin_mut();
        inner.poll_shutdown(cx)
    }
}
733 
734 impl<R: Read> Read for BufReader<R> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>735     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
736         // If we don't have any buffered data and we're doing a massive read
737         // (larger than our internal buffer), bypass our internal buffer
738         // entirely.
739         if !self.buf.has_remaining_mut() && buf.len() >= self.buf.len() {
740             let res = self.read(buf);
741             self.buf.clear();
742             return res;
743         }
744         let nread = {
745             let mut rem = self.fill_buf()?;
746             rem.read(buf)?
747         };
748         self.consume(nread);
749         Ok(nread)
750     }
751 }
752 
753 impl<R: Read> BufRead for BufReader<R> {
fill_buf(&mut self) -> io::Result<&[u8]>754     fn fill_buf(&mut self) -> io::Result<&[u8]> {
755         // If we've reached the end of our internal buffer then we need to fetch
756         // some more data from the underlying reader.
757         // Branch using `>=` instead of the more correct `==`
758         // to tell the compiler that the pos..cap slice is always valid.
759 
760         if self.buf.is_empty() {
761             Bufferless.extend_buf_sync(self)?;
762         }
763         Ok(&self.buf[..])
764     }
765 
consume(&mut self, amt: usize)766     fn consume(&mut self, amt: usize) {
767         self.buf.advance(amt);
768     }
769 }
770 
// Tests for `BufReader` driven through the tokio 0.2 traits.
#[cfg(test)]
#[cfg(feature = "tokio-02")]
mod tests {
    use super::{BufReader, Bufferless, CombineRead};

    use std::{io, pin::Pin};

    use {
        bytes_05::BytesMut,
        tokio_02_dep::{
            self as tokio,
            io::{AsyncRead, AsyncReadExt},
        },
    };

    impl<R: AsyncRead> BufReader<R> {
        // Test helper: awaits a single `Bufferless::poll_extend_buf` on this reader.
        async fn extend_buf_tokio_02(mut self: Pin<&mut Self>) -> io::Result<usize> {
            crate::future_ext::poll_fn(|cx| Bufferless.poll_extend_buf(cx, self.as_mut())).await
        }
    }

    // Reads ten bytes in chunks of three through a reader created with capacity 3.
    #[tokio::test]
    async fn buf_reader() {
        let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [1, 2, 3]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [4, 5, 6]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [7, 8, 9]);

        // Only one byte is left, so the trailing `1`s of the destination stay untouched.
        let mut buf = [1u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [0, 1, 1]);
    }

    // Reads into a growable `BytesMut` destination via `read_buf`.
    #[tokio::test]
    async fn buf_reader_buf() {
        let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);

        let mut buf = BytesMut::with_capacity(3);
        read.read_buf(&mut buf).await.unwrap();
        assert_eq!(&buf[..], [1, 2, 3]);

        read.read_buf(&mut buf).await.unwrap();
        assert_eq!(&buf[..], [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
    }

    // Extends the internal buffer directly and checks that it accumulates without consuming.
    #[tokio::test]
    async fn buf_reader_extend_buf() {
        let read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
        futures_03_dep::pin_mut!(read);

        assert_eq!(read.as_mut().extend_buf_tokio_02().await.unwrap(), 3);
        assert_eq!(read.buffer(), [1, 2, 3]);

        assert_eq!(read.as_mut().extend_buf_tokio_02().await.unwrap(), 7);
        assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
    }
}
837 
// Tests for `BufReader` driven through the `tokio` feature's traits.
#[cfg(test)]
#[cfg(feature = "tokio")]
mod tests_tokio_1 {
    use super::{BufReader, Bufferless, CombineRead};

    use std::{io, pin::Pin};

    use {
        bytes::BytesMut,
        tokio_dep::{
            self as tokio,
            io::{AsyncRead, AsyncReadExt},
        },
    };

    impl<R: AsyncRead> BufReader<R> {
        // Test helper: awaits a single `Bufferless::poll_extend_buf` on this reader.
        async fn extend_buf_tokio(mut self: Pin<&mut Self>) -> io::Result<usize> {
            crate::future_ext::poll_fn(|cx| Bufferless.poll_extend_buf(cx, self.as_mut())).await
        }
    }

    // Reads ten bytes in chunks of three through a reader created with capacity 3.
    #[tokio::test]
    async fn buf_reader() {
        let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [1, 2, 3]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [4, 5, 6]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [7, 8, 9]);

        // Only one byte is left, so the trailing `1`s of the destination stay untouched.
        let mut buf = [1u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [0, 1, 1]);
    }

    // Reads into a growable `BytesMut` destination via `read_buf`.
    #[tokio::test]
    async fn buf_reader_buf() {
        let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);

        let mut buf = BytesMut::with_capacity(3);
        read.read_buf(&mut buf).await.unwrap();
        assert_eq!(&buf[..], [1, 2, 3]);

        read.read_buf(&mut buf).await.unwrap();
        assert_eq!(&buf[..], [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
    }

    // Extends the internal buffer directly and checks that it accumulates without consuming.
    #[tokio::test]
    async fn buf_reader_extend_buf() {
        let read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
        futures_03_dep::pin_mut!(read);

        assert_eq!(read.as_mut().extend_buf_tokio().await.unwrap(), 3);
        assert_eq!(read.buffer(), [1, 2, 3]);

        assert_eq!(read.as_mut().extend_buf_tokio().await.unwrap(), 7);
        assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
    }
}
904 
// Tests for `BufReader` driven through the blocking `std::io::Read` path.
#[cfg(test)]
mod tests_sync {
    use super::{BufReader, Bufferless, CombineSyncRead};

    use std::io::Read;

    // Reads ten bytes in chunks of three through a reader created with capacity 3.
    #[test]
    #[allow(clippy::unused_io_amount)]
    fn buf_reader() {
        let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).unwrap();
        assert_eq!(buf, [1, 2, 3]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).unwrap();
        assert_eq!(buf, [4, 5, 6]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).unwrap();
        assert_eq!(buf, [7, 8, 9]);

        // Only one byte is left, so the trailing `1`s of the destination stay untouched.
        let mut buf = [1u8; 3];
        read.read(&mut buf).unwrap();
        assert_eq!(buf, [0, 1, 1]);
    }

    // Extends the internal buffer directly and checks that it accumulates without consuming.
    #[test]
    fn buf_reader_extend_buf() {
        let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);

        assert_eq!(Bufferless.extend_buf_sync(&mut read).unwrap(), 3);
        assert_eq!(read.buffer(), [1, 2, 3]);

        assert_eq!(Bufferless.extend_buf_sync(&mut read).unwrap(), 7);
        assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
    }
}
944