1 use std::io::{self, BufRead, Read};
2
3 #[cfg(any(
4 feature = "futures-03",
5 feature = "tokio-02",
6 feature = "tokio-03",
7 feature = "tokio"
8 ))]
9 use std::pin::Pin;
10
11 #[cfg(any(
12 feature = "futures-03",
13 feature = "tokio-02",
14 feature = "tokio-03"
15 ))]
16 use std::mem::MaybeUninit;
17
18 #[cfg(feature = "futures-core-03")]
19 use std::task::{Context, Poll};
20
21 #[cfg(feature = "futures-03")]
22 use std::future::Future;
23
24 use bytes::{Buf, BufMut, BytesMut};
25
26 #[cfg(feature = "pin-project-lite")]
27 use pin_project_lite::pin_project;
28
29 #[cfg(feature = "tokio-03")]
30 use tokio_03_dep::io::AsyncBufRead as _;
31
32 #[cfg(feature = "tokio")]
33 use tokio_dep::io::AsyncBufRead as _;
34
35 #[cfg(feature = "futures-core-03")]
36 use futures_core_03::ready;
37
38 #[cfg(feature = "pin-project-lite")]
39 pin_project! {
40 /// `BufReader` used by `Decoder` when it is constructed with [`Decoder::new_bufferless`][]
41 ///
42 /// [`Decoder::new_bufferless`]: ../decoder/struct.Decoder.html#method.new_bufferless
43 #[derive(Debug)]
44 pub struct BufReader<R> {
45 #[pin]
46 inner: R,
47 buf: BytesMut
48 }
49 }
50
51 #[cfg(not(feature = "pin-project-lite"))]
52 /// `BufReader` used by `Decoder` when it is constructed with [`Decoder::new_bufferless`][]
53 ///
54 /// [`Decoder::new_bufferless`]: ../decoder/struct.Decoder.html#method.new_bufferless
55 #[derive(Debug)]
56 pub struct BufReader<R> {
57 inner: R,
58 buf: BytesMut,
59 }
60
61 impl<R> BufReader<R> {
62 /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
63 /// but may change in the future.
new(inner: R) -> Self64 pub fn new(inner: R) -> Self {
65 Self::with_capacity(8096, inner)
66 }
67
68 /// Creates a new `BufReader` with the specified buffer capacity.
with_capacity(capacity: usize, inner: R) -> Self69 pub fn with_capacity(capacity: usize, inner: R) -> Self {
70 let buf = BytesMut::with_capacity(capacity);
71
72 Self { inner, buf }
73 }
74
75 /// Gets a reference to the underlying reader.
76 ///
77 /// It is inadvisable to directly read from the underlying reader.
get_ref(&self) -> &R78 pub fn get_ref(&self) -> &R {
79 &self.inner
80 }
81
82 /// Gets a mutable reference to the underlying reader.
83 ///
84 /// It is inadvisable to directly read from the underlying reader.
get_mut(&mut self) -> &mut R85 pub fn get_mut(&mut self) -> &mut R {
86 &mut self.inner
87 }
88
89 #[cfg(feature = "pin-project-lite")]
90 /// Gets a pinned mutable reference to the underlying reader.
91 ///
92 /// It is inadvisable to directly read from the underlying reader.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R>93 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
94 self.project().inner
95 }
96
97 /// Consumes this `BufWriter`, returning the underlying reader.
98 ///
99 /// Note that any leftover data in the internal buffer is lost.
into_inner(self) -> R100 pub fn into_inner(self) -> R {
101 self.inner
102 }
103
104 /// Returns a reference to the internally buffered data.
105 ///
106 /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
buffer(&self) -> &[u8]107 pub fn buffer(&self) -> &[u8] {
108 &self.buf
109 }
110
111 /// Invalidates all data in the internal buffer.
112 #[inline]
113 #[cfg(any(feature = "tokio-02", feature = "tokio-03", feature = "tokio"))]
discard_buffer(self: Pin<&mut Self>)114 fn discard_buffer(self: Pin<&mut Self>) {
115 let me = self.project();
116 me.buf.clear();
117 }
118 }
119
120 mod sealed {
121 pub trait Sealed {}
122 }
123
124 #[doc(hidden)]
125 pub trait CombineBuffer<R>: sealed::Sealed {
buffer<'a>(&'a self, read: &'a R) -> &'a [u8]126 fn buffer<'a>(&'a self, read: &'a R) -> &'a [u8];
127
advance(&mut self, read: &mut R, len: usize)128 fn advance(&mut self, read: &mut R, len: usize);
129
130 #[cfg(feature = "pin-project-lite")]
advance_pin(&mut self, read: Pin<&mut R>, len: usize)131 fn advance_pin(&mut self, read: Pin<&mut R>, len: usize);
132 }
133
134 #[doc(hidden)]
135 pub trait CombineSyncRead<R>: CombineBuffer<R> {
extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize>136 fn extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize>;
137 }
138
139 #[cfg(any(feature = "tokio-02", feature = "tokio-03", feature = "tokio"))]
140 #[doc(hidden)]
141 pub trait CombineRead<R, T: ?Sized>: CombineBuffer<R> {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>142 fn poll_extend_buf(
143 &mut self,
144 cx: &mut Context<'_>,
145 read: Pin<&mut R>,
146 ) -> Poll<io::Result<usize>>;
147 }
148
149 #[cfg(feature = "futures-03")]
150 #[doc(hidden)]
151 pub trait CombineAsyncRead<R>: CombineBuffer<R> {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>152 fn poll_extend_buf(
153 &mut self,
154 cx: &mut Context<'_>,
155 read: Pin<&mut R>,
156 ) -> Poll<io::Result<usize>>;
157
extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R> where Self: Sized158 fn extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R>
159 where
160 Self: Sized;
161 }
162
163 #[cfg(feature = "futures-03")]
164 pin_project_lite::pin_project! {
165 #[doc(hidden)]
166 pub struct ExtendBuf<'a, C, R> {
167 buffer: &'a mut C,
168 read: Pin<&'a mut R>
169 }
170 }
171
172 #[cfg(feature = "futures-03")]
173 impl<'a, C, R> Future for ExtendBuf<'a, C, R>
174 where
175 C: CombineAsyncRead<R>,
176 {
177 type Output = io::Result<usize>;
178
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>179 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
180 let me = self.project();
181 me.buffer.poll_extend_buf(cx, me.read.as_mut())
182 }
183 }
184
185 /// Marker used by `Decoder` for an internal buffer
186 #[derive(Default)]
187 pub struct Buffer(pub(crate) BytesMut);
188
189 impl sealed::Sealed for Buffer {}
190
191 impl<R> CombineBuffer<R> for Buffer {
buffer<'a>(&'a self, _read: &'a R) -> &'a [u8]192 fn buffer<'a>(&'a self, _read: &'a R) -> &'a [u8] {
193 &self.0
194 }
195
advance(&mut self, _read: &mut R, len: usize)196 fn advance(&mut self, _read: &mut R, len: usize) {
197 self.0.advance(len);
198 }
199
200 #[cfg(feature = "pin-project-lite")]
advance_pin(&mut self, _read: Pin<&mut R>, len: usize)201 fn advance_pin(&mut self, _read: Pin<&mut R>, len: usize) {
202 self.0.advance(len);
203 }
204 }
205
206 impl<R> CombineSyncRead<R> for Buffer
207 where
208 R: Read,
209 {
extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize>210 fn extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize> {
211 extend_buf_sync(&mut self.0, read)
212 }
213 }
214
215 #[cfg(feature = "futures-03")]
216 impl<R> CombineAsyncRead<R> for Buffer
217 where
218 R: futures_io_03::AsyncRead,
219 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>220 fn poll_extend_buf(
221 &mut self,
222 cx: &mut Context<'_>,
223 read: Pin<&mut R>,
224 ) -> Poll<io::Result<usize>> {
225 poll_extend_buf(&mut self.0, cx, read)
226 }
227
extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R>228 fn extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R> {
229 if !self.0.has_remaining_mut() {
230 self.0.reserve(8 * 1024);
231 }
232 // Copy of tokio's read_buf method (but it has to force initialize the buffer)
233 let bs = self.0.chunk_mut();
234
235 for i in 0..bs.len() {
236 bs.write_byte(i, 0);
237 }
238 ExtendBuf { buffer: self, read }
239 }
240 }
241
242 #[cfg(feature = "tokio-02")]
243 impl<R> CombineRead<R, dyn tokio_02_dep::io::AsyncRead> for Buffer
244 where
245 R: tokio_02_dep::io::AsyncRead,
246 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>247 fn poll_extend_buf(
248 &mut self,
249 cx: &mut Context<'_>,
250 read: Pin<&mut R>,
251 ) -> Poll<io::Result<usize>> {
252 if !self.0.has_remaining_mut() {
253 self.0.reserve(8 * 1024);
254 }
255 read.poll_read_buf(cx, &mut Bytes05(&mut self.0))
256 }
257 }
258
259 #[cfg(feature = "tokio-03")]
tokio_03_to_read_buf(bs: &mut BytesMut) -> tokio_03_dep::io::ReadBuf<'_>260 fn tokio_03_to_read_buf(bs: &mut BytesMut) -> tokio_03_dep::io::ReadBuf<'_> {
261 let uninit = bs.chunk_mut();
262 unsafe {
263 tokio_03_dep::io::ReadBuf::uninit(std::slice::from_raw_parts_mut(
264 uninit.as_mut_ptr() as *mut MaybeUninit<u8>,
265 uninit.len(),
266 ))
267 }
268 }
269
270 #[cfg(feature = "tokio-03")]
271 impl<R> CombineRead<R, dyn tokio_03_dep::io::AsyncRead> for Buffer
272 where
273 R: tokio_03_dep::io::AsyncRead,
274 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>275 fn poll_extend_buf(
276 &mut self,
277 cx: &mut Context<'_>,
278 read: Pin<&mut R>,
279 ) -> Poll<io::Result<usize>> {
280 tokio_03_read_buf(cx, read, &mut self.0)
281 }
282 }
283
284 #[cfg(feature = "tokio-03")]
tokio_03_read_buf( cx: &mut Context<'_>, read: Pin<&mut impl tokio_03_dep::io::AsyncRead>, bs: &mut bytes::BytesMut, ) -> Poll<io::Result<usize>>285 fn tokio_03_read_buf(
286 cx: &mut Context<'_>,
287 read: Pin<&mut impl tokio_03_dep::io::AsyncRead>,
288 bs: &mut bytes::BytesMut,
289 ) -> Poll<io::Result<usize>> {
290 if !bs.has_remaining_mut() {
291 bs.reserve(8 * 1024);
292 }
293
294 let mut buf = tokio_03_to_read_buf(bs);
295 ready!(read.poll_read(cx, &mut buf))?;
296 unsafe {
297 let n = buf.filled().len();
298 bs.advance_mut(n);
299 Poll::Ready(Ok(n))
300 }
301 }
302
303 #[cfg(feature = "tokio")]
304 impl<R> CombineRead<R, dyn tokio_dep::io::AsyncRead> for Buffer
305 where
306 R: tokio_dep::io::AsyncRead,
307 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>308 fn poll_extend_buf(
309 &mut self,
310 cx: &mut Context<'_>,
311 read: Pin<&mut R>,
312 ) -> Poll<io::Result<usize>> {
313 tokio_read_buf(read, cx, &mut self.0)
314 }
315 }
316
317 #[cfg(feature = "tokio")]
tokio_read_buf( read: Pin<&mut impl tokio_dep::io::AsyncRead>, cx: &mut Context<'_>, bs: &mut bytes::BytesMut, ) -> Poll<io::Result<usize>>318 fn tokio_read_buf(
319 read: Pin<&mut impl tokio_dep::io::AsyncRead>,
320 cx: &mut Context<'_>,
321 bs: &mut bytes::BytesMut,
322 ) -> Poll<io::Result<usize>> {
323 if !bs.has_remaining_mut() {
324 bs.reserve(8 * 1024);
325 }
326
327 tokio_util::io::poll_read_buf(read, cx, bs)
328 }
329
330 /// Marker used by `Decoder` for an external buffer
331 #[derive(Default)]
332 pub struct Bufferless;
333
334 impl sealed::Sealed for Bufferless {}
335
336 impl<R> CombineBuffer<BufReader<R>> for Bufferless {
buffer<'a>(&'a self, read: &'a BufReader<R>) -> &'a [u8]337 fn buffer<'a>(&'a self, read: &'a BufReader<R>) -> &'a [u8] {
338 &read.buf
339 }
340
advance(&mut self, read: &mut BufReader<R>, len: usize)341 fn advance(&mut self, read: &mut BufReader<R>, len: usize) {
342 read.buf.advance(len);
343 }
344
345 #[cfg(feature = "pin-project-lite")]
advance_pin(&mut self, read: Pin<&mut BufReader<R>>, len: usize)346 fn advance_pin(&mut self, read: Pin<&mut BufReader<R>>, len: usize) {
347 read.project().buf.advance(len);
348 }
349 }
350
351 impl<R> CombineSyncRead<BufReader<R>> for Bufferless
352 where
353 R: Read,
354 {
extend_buf_sync(&mut self, read: &mut BufReader<R>) -> io::Result<usize>355 fn extend_buf_sync(&mut self, read: &mut BufReader<R>) -> io::Result<usize> {
356 extend_buf_sync(&mut read.buf, &mut read.inner)
357 }
358 }
359
extend_buf_sync<R>(buf: &mut BytesMut, read: &mut R) -> io::Result<usize> where R: Read,360 fn extend_buf_sync<R>(buf: &mut BytesMut, read: &mut R) -> io::Result<usize>
361 where
362 R: Read,
363 {
364 if !buf.has_remaining_mut() {
365 buf.reserve(8 * 1024);
366 }
367
368 // Copy of tokio's poll_read_buf method (but it has to force initialize the buffer)
369 let n = {
370 let bs = buf.chunk_mut();
371
372 for i in 0..bs.len() {
373 bs.write_byte(i, 0);
374 }
375
376 // Convert to `&mut [u8]`
377 // SAFETY: the entire buffer is preinitialized above
378 let bs = unsafe { &mut *(bs as *mut _ as *mut [u8]) };
379
380 let n = read.read(bs)?;
381 assert!(
382 n <= bs.len(),
383 "AsyncRead reported that it initialized more than the number of bytes in the buffer"
384 );
385 n
386 };
387
388 // SAFETY: the entire buffer has been preinitialized
389 unsafe { buf.advance_mut(n) };
390
391 Ok(n)
392 }
393
394 #[cfg(feature = "tokio-02")]
395 struct Bytes05<'a>(&'a mut BytesMut);
396
397 #[cfg(feature = "tokio-02")]
398 impl bytes_05::BufMut for Bytes05<'_> {
remaining_mut(&self) -> usize399 fn remaining_mut(&self) -> usize {
400 self.0.remaining_mut()
401 }
advance_mut(&mut self, cnt: usize)402 unsafe fn advance_mut(&mut self, cnt: usize) {
403 self.0.advance_mut(cnt)
404 }
bytes_mut(&mut self) -> &mut [MaybeUninit<u8>]405 fn bytes_mut(&mut self) -> &mut [MaybeUninit<u8>] {
406 unsafe { &mut *(self.0.chunk_mut() as *mut _ as *mut [MaybeUninit<u8>]) }
407 }
408 }
409
410 #[cfg(feature = "tokio-02")]
411 impl<R> CombineRead<BufReader<R>, dyn tokio_02_dep::io::AsyncRead> for Bufferless
412 where
413 R: tokio_02_dep::io::AsyncRead,
414 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut BufReader<R>>, ) -> Poll<io::Result<usize>>415 fn poll_extend_buf(
416 &mut self,
417 cx: &mut Context<'_>,
418 read: Pin<&mut BufReader<R>>,
419 ) -> Poll<io::Result<usize>> {
420 let me = read.project();
421
422 if !me.buf.has_remaining_mut() {
423 me.buf.reserve(8 * 1024);
424 }
425 tokio_02_dep::io::AsyncRead::poll_read_buf(me.inner, cx, &mut Bytes05(me.buf))
426 }
427 }
428
429 #[cfg(feature = "tokio-03")]
430 impl<R> CombineRead<BufReader<R>, dyn tokio_03_dep::io::AsyncRead> for Bufferless
431 where
432 R: tokio_03_dep::io::AsyncRead,
433 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut BufReader<R>>, ) -> Poll<io::Result<usize>>434 fn poll_extend_buf(
435 &mut self,
436 cx: &mut Context<'_>,
437 read: Pin<&mut BufReader<R>>,
438 ) -> Poll<io::Result<usize>> {
439 let me = read.project();
440
441 tokio_03_read_buf(cx, me.inner, me.buf)
442 }
443 }
444
445 #[cfg(feature = "tokio")]
446 impl<R> CombineRead<BufReader<R>, dyn tokio_dep::io::AsyncRead> for Bufferless
447 where
448 R: tokio_dep::io::AsyncRead,
449 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut BufReader<R>>, ) -> Poll<io::Result<usize>>450 fn poll_extend_buf(
451 &mut self,
452 cx: &mut Context<'_>,
453 read: Pin<&mut BufReader<R>>,
454 ) -> Poll<io::Result<usize>> {
455 let me = read.project();
456
457 tokio_read_buf(me.inner, cx, me.buf)
458 }
459 }
460
461 #[cfg(feature = "futures-03")]
462 impl<R> CombineAsyncRead<BufReader<R>> for Bufferless
463 where
464 R: futures_io_03::AsyncRead,
465 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut BufReader<R>>, ) -> Poll<io::Result<usize>>466 fn poll_extend_buf(
467 &mut self,
468 cx: &mut Context<'_>,
469 read: Pin<&mut BufReader<R>>,
470 ) -> Poll<io::Result<usize>> {
471 let me = read.project();
472
473 poll_extend_buf(me.buf, cx, me.inner)
474 }
475
extend_buf<'a>( &'a mut self, mut read: Pin<&'a mut BufReader<R>>, ) -> ExtendBuf<'a, Self, BufReader<R>>476 fn extend_buf<'a>(
477 &'a mut self,
478 mut read: Pin<&'a mut BufReader<R>>,
479 ) -> ExtendBuf<'a, Self, BufReader<R>> {
480 let me = read.as_mut().project();
481
482 if !me.buf.has_remaining_mut() {
483 me.buf.reserve(8 * 1024);
484 }
485 // Copy of tokio's read_buf method (but it has to force initialize the buffer)
486 let bs = me.buf.chunk_mut();
487
488 for i in 0..bs.len() {
489 bs.write_byte(i, 0);
490 }
491 ExtendBuf { buffer: self, read }
492 }
493 }
494
495 #[cfg(feature = "futures-03")]
poll_extend_buf<R>( buf: &mut BytesMut, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>> where R: futures_io_03::AsyncRead,496 fn poll_extend_buf<R>(
497 buf: &mut BytesMut,
498 cx: &mut Context<'_>,
499 read: Pin<&mut R>,
500 ) -> Poll<io::Result<usize>>
501 where
502 R: futures_io_03::AsyncRead,
503 {
504 // Copy of tokio's read_buf method (but it has to force initialize the buffer)
505 let n = {
506 let bs = buf.chunk_mut();
507 // preinit the buffer
508 for i in 0..bs.len() {
509 bs.write_byte(i, 0);
510 }
511
512 // Convert to `&mut [u8]`
513 // SAFETY: preinitialize the buffer
514 let bs = unsafe { &mut *(bs as *mut _ as *mut [u8]) };
515
516 let n = ready!(read.poll_read(cx, bs))?;
517 assert!(
518 n <= bs.len(),
519 "AsyncRead reported that it initialized more than the number of bytes in the buffer"
520 );
521 n
522 };
523 // SAFETY: the buffer was preinitialized
524 unsafe { buf.advance_mut(n) };
525 Poll::Ready(Ok(n))
526 }
527
528 #[cfg(feature = "tokio-02")]
529 impl<R: tokio_02_dep::io::AsyncRead> tokio_02_dep::io::AsyncRead for BufReader<R> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>530 fn poll_read(
531 mut self: Pin<&mut Self>,
532 cx: &mut Context<'_>,
533 buf: &mut [u8],
534 ) -> Poll<io::Result<usize>> {
535 use tokio_02_dep::io::AsyncBufRead;
536
537 // If we don't have any buffered data and we're doing a massive read
538 // (larger than our internal buffer), bypass our internal buffer
539 // entirely.
540 if !self.buf.has_remaining_mut() && buf.len() >= self.buf.len() {
541 let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
542 self.discard_buffer();
543 return Poll::Ready(res);
544 }
545 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
546 let nread = rem.read(buf)?;
547 self.consume(nread);
548 Poll::Ready(Ok(nread))
549 }
550
551 // we can't skip unconditionally because of the large buffer case in read.
prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool552 unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
553 self.inner.prepare_uninitialized_buffer(buf)
554 }
555 }
556
557 #[cfg(feature = "tokio-02")]
558 impl<R: tokio_02_dep::io::AsyncRead> tokio_02_dep::io::AsyncBufRead for BufReader<R> {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>559 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
560 let me = self.project();
561
562 // If we've reached the end of our internal buffer then we need to fetch
563 // some more data from the underlying reader.
564 // Branch using `>=` instead of the more correct `==`
565 // to tell the compiler that the pos..cap slice is always valid.
566
567 if me.buf.is_empty() {
568 ready!(me.inner.poll_read_buf(cx, &mut Bytes05(me.buf)))?;
569 }
570 Poll::Ready(Ok(&me.buf[..]))
571 }
572
consume(self: Pin<&mut Self>, amt: usize)573 fn consume(self: Pin<&mut Self>, amt: usize) {
574 let me = self.project();
575 me.buf.advance(amt);
576 }
577 }
578
579 #[cfg(feature = "tokio-02")]
580 impl<R: tokio_02_dep::io::AsyncRead + tokio_02_dep::io::AsyncWrite> tokio_02_dep::io::AsyncWrite
581 for BufReader<R>
582 {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>583 fn poll_write(
584 self: Pin<&mut Self>,
585 cx: &mut Context<'_>,
586 buf: &[u8],
587 ) -> Poll<io::Result<usize>> {
588 self.get_pin_mut().poll_write(cx, buf)
589 }
590
poll_write_buf<B: bytes_05::Buf>( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B, ) -> Poll<io::Result<usize>>591 fn poll_write_buf<B: bytes_05::Buf>(
592 self: Pin<&mut Self>,
593 cx: &mut Context<'_>,
594 buf: &mut B,
595 ) -> Poll<io::Result<usize>> {
596 self.get_pin_mut().poll_write_buf(cx, buf)
597 }
598
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>599 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
600 self.get_pin_mut().poll_flush(cx)
601 }
602
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>603 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
604 self.get_pin_mut().poll_shutdown(cx)
605 }
606 }
607
608 #[cfg(feature = "tokio-03")]
609 impl<R: tokio_03_dep::io::AsyncRead> tokio_03_dep::io::AsyncRead for BufReader<R> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut tokio_03_dep::io::ReadBuf<'_>, ) -> Poll<io::Result<()>>610 fn poll_read(
611 mut self: Pin<&mut Self>,
612 cx: &mut Context<'_>,
613 buf: &mut tokio_03_dep::io::ReadBuf<'_>,
614 ) -> Poll<io::Result<()>> {
615 // If we don't have any buffered data and we're doing a massive read
616 // (larger than our internal buffer), bypass our internal buffer
617 // entirely.
618 if !self.buf.has_remaining_mut() && buf.remaining() >= self.buf.len() {
619 let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
620 self.discard_buffer();
621 return Poll::Ready(res);
622 }
623 let rem = ready!(self.as_mut().poll_fill_buf(cx))?;
624 let amt = std::cmp::min(rem.len(), buf.remaining());
625 buf.put_slice(&rem[..amt]);
626 self.consume(amt);
627 Poll::Ready(Ok(()))
628 }
629 }
630
631 #[cfg(feature = "tokio-03")]
632 impl<R: tokio_03_dep::io::AsyncRead> tokio_03_dep::io::AsyncBufRead for BufReader<R> {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>633 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
634 let me = self.project();
635
636 // If we've reached the end of our internal buffer then we need to fetch
637 // some more data from the underlying reader.
638 if me.buf.is_empty() {
639 ready!(tokio_03_read_buf(cx, me.inner, me.buf))?;
640 }
641 Poll::Ready(Ok(&me.buf[..]))
642 }
643
consume(self: Pin<&mut Self>, amt: usize)644 fn consume(self: Pin<&mut Self>, amt: usize) {
645 let me = self.project();
646 me.buf.advance(amt);
647 }
648 }
649
650 #[cfg(feature = "tokio-03")]
651 impl<R: tokio_03_dep::io::AsyncRead + tokio_03_dep::io::AsyncWrite> tokio_03_dep::io::AsyncWrite
652 for BufReader<R>
653 {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>654 fn poll_write(
655 self: Pin<&mut Self>,
656 cx: &mut Context<'_>,
657 buf: &[u8],
658 ) -> Poll<io::Result<usize>> {
659 self.get_pin_mut().poll_write(cx, buf)
660 }
661
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>662 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
663 self.get_pin_mut().poll_flush(cx)
664 }
665
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>666 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
667 self.get_pin_mut().poll_shutdown(cx)
668 }
669 }
670
671 #[cfg(feature = "tokio")]
672 impl<R: tokio_dep::io::AsyncRead> tokio_dep::io::AsyncRead for BufReader<R> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut tokio_dep::io::ReadBuf<'_>, ) -> Poll<io::Result<()>>673 fn poll_read(
674 mut self: Pin<&mut Self>,
675 cx: &mut Context<'_>,
676 buf: &mut tokio_dep::io::ReadBuf<'_>,
677 ) -> Poll<io::Result<()>> {
678 // If we don't have any buffered data and we're doing a massive read
679 // (larger than our internal buffer), bypass our internal buffer
680 // entirely.
681 if !self.buf.has_remaining_mut() && buf.remaining() >= self.buf.len() {
682 let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
683 self.discard_buffer();
684 return Poll::Ready(res);
685 }
686 let rem = ready!(self.as_mut().poll_fill_buf(cx))?;
687 let amt = std::cmp::min(rem.len(), buf.remaining());
688 buf.put_slice(&rem[..amt]);
689 self.consume(amt);
690 Poll::Ready(Ok(()))
691 }
692 }
693
694 #[cfg(feature = "tokio")]
695 impl<R: tokio_dep::io::AsyncRead> tokio_dep::io::AsyncBufRead for BufReader<R> {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>696 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
697 let me = self.project();
698
699 // If we've reached the end of our internal buffer then we need to fetch
700 // some more data from the underlying reader.
701 if me.buf.is_empty() {
702 ready!(tokio_read_buf(me.inner, cx, me.buf))?;
703 }
704 Poll::Ready(Ok(&me.buf[..]))
705 }
706
consume(self: Pin<&mut Self>, amt: usize)707 fn consume(self: Pin<&mut Self>, amt: usize) {
708 let me = self.project();
709 me.buf.advance(amt);
710 }
711 }
712
713 #[cfg(feature = "tokio")]
714 impl<R: tokio_dep::io::AsyncRead + tokio_dep::io::AsyncWrite> tokio_dep::io::AsyncWrite
715 for BufReader<R>
716 {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>717 fn poll_write(
718 self: Pin<&mut Self>,
719 cx: &mut Context<'_>,
720 buf: &[u8],
721 ) -> Poll<io::Result<usize>> {
722 self.get_pin_mut().poll_write(cx, buf)
723 }
724
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>725 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
726 self.get_pin_mut().poll_flush(cx)
727 }
728
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>729 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
730 self.get_pin_mut().poll_shutdown(cx)
731 }
732 }
733
734 impl<R: Read> Read for BufReader<R> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>735 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
736 // If we don't have any buffered data and we're doing a massive read
737 // (larger than our internal buffer), bypass our internal buffer
738 // entirely.
739 if !self.buf.has_remaining_mut() && buf.len() >= self.buf.len() {
740 let res = self.read(buf);
741 self.buf.clear();
742 return res;
743 }
744 let nread = {
745 let mut rem = self.fill_buf()?;
746 rem.read(buf)?
747 };
748 self.consume(nread);
749 Ok(nread)
750 }
751 }
752
753 impl<R: Read> BufRead for BufReader<R> {
fill_buf(&mut self) -> io::Result<&[u8]>754 fn fill_buf(&mut self) -> io::Result<&[u8]> {
755 // If we've reached the end of our internal buffer then we need to fetch
756 // some more data from the underlying reader.
757 // Branch using `>=` instead of the more correct `==`
758 // to tell the compiler that the pos..cap slice is always valid.
759
760 if self.buf.is_empty() {
761 Bufferless.extend_buf_sync(self)?;
762 }
763 Ok(&self.buf[..])
764 }
765
consume(&mut self, amt: usize)766 fn consume(&mut self, amt: usize) {
767 self.buf.advance(amt);
768 }
769 }
770
771 #[cfg(test)]
772 #[cfg(feature = "tokio-02")]
773 mod tests {
774 use super::{BufReader, Bufferless, CombineRead};
775
776 use std::{io, pin::Pin};
777
778 use {
779 bytes_05::BytesMut,
780 tokio_02_dep::{
781 self as tokio,
782 io::{AsyncRead, AsyncReadExt},
783 },
784 };
785
786 impl<R: AsyncRead> BufReader<R> {
extend_buf_tokio_02(mut self: Pin<&mut Self>) -> io::Result<usize>787 async fn extend_buf_tokio_02(mut self: Pin<&mut Self>) -> io::Result<usize> {
788 crate::future_ext::poll_fn(|cx| Bufferless.poll_extend_buf(cx, self.as_mut())).await
789 }
790 }
791
792 #[tokio::test]
buf_reader()793 async fn buf_reader() {
794 let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
795
796 let mut buf = [0u8; 3];
797 read.read(&mut buf).await.unwrap();
798 assert_eq!(buf, [1, 2, 3]);
799
800 let mut buf = [0u8; 3];
801 read.read(&mut buf).await.unwrap();
802 assert_eq!(buf, [4, 5, 6]);
803
804 let mut buf = [0u8; 3];
805 read.read(&mut buf).await.unwrap();
806 assert_eq!(buf, [7, 8, 9]);
807
808 let mut buf = [1u8; 3];
809 read.read(&mut buf).await.unwrap();
810 assert_eq!(buf, [0, 1, 1]);
811 }
812
813 #[tokio::test]
buf_reader_buf()814 async fn buf_reader_buf() {
815 let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
816
817 let mut buf = BytesMut::with_capacity(3);
818 read.read_buf(&mut buf).await.unwrap();
819 assert_eq!(&buf[..], [1, 2, 3]);
820
821 read.read_buf(&mut buf).await.unwrap();
822 assert_eq!(&buf[..], [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
823 }
824
825 #[tokio::test]
buf_reader_extend_buf()826 async fn buf_reader_extend_buf() {
827 let read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
828 futures_03_dep::pin_mut!(read);
829
830 assert_eq!(read.as_mut().extend_buf_tokio_02().await.unwrap(), 3);
831 assert_eq!(read.buffer(), [1, 2, 3]);
832
833 assert_eq!(read.as_mut().extend_buf_tokio_02().await.unwrap(), 7);
834 assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
835 }
836 }
837
838 #[cfg(test)]
839 #[cfg(feature = "tokio")]
840 mod tests_tokio_1 {
841 use super::{BufReader, Bufferless, CombineRead};
842
843 use std::{io, pin::Pin};
844
845 use {
846 bytes::BytesMut,
847 tokio_dep::{
848 self as tokio,
849 io::{AsyncRead, AsyncReadExt},
850 },
851 };
852
853 impl<R: AsyncRead> BufReader<R> {
extend_buf_tokio(mut self: Pin<&mut Self>) -> io::Result<usize>854 async fn extend_buf_tokio(mut self: Pin<&mut Self>) -> io::Result<usize> {
855 crate::future_ext::poll_fn(|cx| Bufferless.poll_extend_buf(cx, self.as_mut())).await
856 }
857 }
858
859 #[tokio::test]
buf_reader()860 async fn buf_reader() {
861 let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
862
863 let mut buf = [0u8; 3];
864 read.read(&mut buf).await.unwrap();
865 assert_eq!(buf, [1, 2, 3]);
866
867 let mut buf = [0u8; 3];
868 read.read(&mut buf).await.unwrap();
869 assert_eq!(buf, [4, 5, 6]);
870
871 let mut buf = [0u8; 3];
872 read.read(&mut buf).await.unwrap();
873 assert_eq!(buf, [7, 8, 9]);
874
875 let mut buf = [1u8; 3];
876 read.read(&mut buf).await.unwrap();
877 assert_eq!(buf, [0, 1, 1]);
878 }
879
880 #[tokio::test]
buf_reader_buf()881 async fn buf_reader_buf() {
882 let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
883
884 let mut buf = BytesMut::with_capacity(3);
885 read.read_buf(&mut buf).await.unwrap();
886 assert_eq!(&buf[..], [1, 2, 3]);
887
888 read.read_buf(&mut buf).await.unwrap();
889 assert_eq!(&buf[..], [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
890 }
891
892 #[tokio::test]
buf_reader_extend_buf()893 async fn buf_reader_extend_buf() {
894 let read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
895 futures_03_dep::pin_mut!(read);
896
897 assert_eq!(read.as_mut().extend_buf_tokio().await.unwrap(), 3);
898 assert_eq!(read.buffer(), [1, 2, 3]);
899
900 assert_eq!(read.as_mut().extend_buf_tokio().await.unwrap(), 7);
901 assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
902 }
903 }
904
905 #[cfg(test)]
906 mod tests_sync {
907 use super::{BufReader, Bufferless, CombineSyncRead};
908
909 use std::io::Read;
910
911 #[test]
912 #[allow(clippy::unused_io_amount)]
buf_reader()913 fn buf_reader() {
914 let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
915
916 let mut buf = [0u8; 3];
917 read.read(&mut buf).unwrap();
918 assert_eq!(buf, [1, 2, 3]);
919
920 let mut buf = [0u8; 3];
921 read.read(&mut buf).unwrap();
922 assert_eq!(buf, [4, 5, 6]);
923
924 let mut buf = [0u8; 3];
925 read.read(&mut buf).unwrap();
926 assert_eq!(buf, [7, 8, 9]);
927
928 let mut buf = [1u8; 3];
929 read.read(&mut buf).unwrap();
930 assert_eq!(buf, [0, 1, 1]);
931 }
932
933 #[test]
buf_reader_extend_buf()934 fn buf_reader_extend_buf() {
935 let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
936
937 assert_eq!(Bufferless.extend_buf_sync(&mut read).unwrap(), 3);
938 assert_eq!(read.buffer(), [1, 2, 3]);
939
940 assert_eq!(Bufferless.extend_buf_sync(&mut read).unwrap(), 7);
941 assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
942 }
943 }
944