1 use core::pin::Pin; 2 use futures_core::ready; 3 use futures_core::stream::TryStream; 4 use futures_core::task::{Context, Poll}; 5 use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; 6 use pin_project_lite::pin_project; 7 use std::cmp; 8 use std::io::{Error, Result}; 9 10 pin_project! { 11 /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. 12 #[derive(Debug)] 13 #[must_use = "readers do nothing unless polled"] 14 #[cfg_attr(docsrs, doc(cfg(feature = "io")))] 15 pub struct IntoAsyncRead<St> 16 where 17 St: TryStream<Error = Error>, 18 St::Ok: AsRef<[u8]>, 19 { 20 #[pin] 21 stream: St, 22 state: ReadState<St::Ok>, 23 } 24 } 25 26 #[derive(Debug)] 27 enum ReadState<T: AsRef<[u8]>> { 28 Ready { chunk: T, chunk_start: usize }, 29 PendingChunk, 30 Eof, 31 } 32 33 impl<St> IntoAsyncRead<St> 34 where 35 St: TryStream<Error = Error>, 36 St::Ok: AsRef<[u8]>, 37 { new(stream: St) -> Self38 pub(super) fn new(stream: St) -> Self { 39 Self { stream, state: ReadState::PendingChunk } 40 } 41 } 42 43 impl<St> AsyncRead for IntoAsyncRead<St> 44 where 45 St: TryStream<Error = Error>, 46 St::Ok: AsRef<[u8]>, 47 { poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize>>48 fn poll_read( 49 self: Pin<&mut Self>, 50 cx: &mut Context<'_>, 51 buf: &mut [u8], 52 ) -> Poll<Result<usize>> { 53 let mut this = self.project(); 54 55 loop { 56 match this.state { 57 ReadState::Ready { chunk, chunk_start } => { 58 let chunk = chunk.as_ref(); 59 let len = cmp::min(buf.len(), chunk.len() - *chunk_start); 60 61 buf[..len].copy_from_slice(&chunk[*chunk_start..*chunk_start + len]); 62 *chunk_start += len; 63 64 if chunk.len() == *chunk_start { 65 *this.state = ReadState::PendingChunk; 66 } 67 68 return Poll::Ready(Ok(len)); 69 } 70 ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) { 71 Some(Ok(chunk)) => { 72 if !chunk.as_ref().is_empty() { 73 *this.state = ReadState::Ready { chunk, chunk_start: 0 }; 74 } 75 } 76 Some(Err(err)) => { 77 *this.state = ReadState::Eof; 78 return Poll::Ready(Err(err)); 79 } 80 None => { 81 *this.state = ReadState::Eof; 82 return Poll::Ready(Ok(0)); 83 } 84 }, 85 ReadState::Eof => { 86 return Poll::Ready(Ok(0)); 87 } 88 } 89 } 90 } 91 } 92 93 impl<St> AsyncWrite for IntoAsyncRead<St> 94 where 95 St: TryStream<Error = Error> + AsyncWrite, 96 St::Ok: AsRef<[u8]>, 97 { poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>98 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { 99 let this = self.project(); 100 this.stream.poll_write(cx, buf) 101 } 102 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>103 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { 104 let this = self.project(); 105 this.stream.poll_flush(cx) 106 } 107 poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>108 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { 109 let this = self.project(); 110 this.stream.poll_close(cx) 111 } 112 } 113 114 impl<St> AsyncBufRead for IntoAsyncRead<St> 115 where 116 St: TryStream<Error = Error>, 117 St::Ok: AsRef<[u8]>, 118 { poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>>119 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { 120 let mut this = self.project(); 121 122 while let ReadState::PendingChunk = this.state { 123 match ready!(this.stream.as_mut().try_poll_next(cx)) { 124 Some(Ok(chunk)) => { 125 if !chunk.as_ref().is_empty() { 126 *this.state = ReadState::Ready { chunk, chunk_start: 0 }; 127 } 128 } 129 Some(Err(err)) => { 130 *this.state = ReadState::Eof; 131 return Poll::Ready(Err(err)); 132 } 133 None => { 134 *this.state = ReadState::Eof; 135 return Poll::Ready(Ok(&[])); 136 } 137 } 138 } 139 140 if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state { 141 let chunk = chunk.as_ref(); 142 return Poll::Ready(Ok(&chunk[chunk_start..])); 143 } 144 145 // To get to this point we must be in ReadState::Eof 146 Poll::Ready(Ok(&[])) 147 } 148 consume(self: Pin<&mut Self>, amount: usize)149 fn consume(self: Pin<&mut Self>, amount: usize) { 150 let this = self.project(); 151 152 // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295 153 if amount == 0 { 154 return; 155 } 156 if let ReadState::Ready { chunk, chunk_start } = this.state { 157 *chunk_start += amount; 158 debug_assert!(*chunk_start <= chunk.as_ref().len()); 159 if *chunk_start >= chunk.as_ref().len() { 160 *this.state = ReadState::PendingChunk; 161 } 162 } else { 163 debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk"); 164 } 165 } 166 } 167