1 use bytes::Buf; 2 use futures_core::stream::Stream; 3 use std::io; 4 use std::pin::Pin; 5 use std::task::{Context, Poll}; 6 use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; 7 8 /// Convert a [`Stream`] of byte chunks into an [`AsyncRead`]. 9 /// 10 /// This type performs the inverse operation of [`ReaderStream`]. 11 /// 12 /// This type also implements the [`AsyncBufRead`] trait, so you can use it 13 /// to read a `Stream` of byte chunks line-by-line. See the examples below. 14 /// 15 /// # Example 16 /// 17 /// ``` 18 /// use bytes::Bytes; 19 /// use tokio::io::{AsyncReadExt, Result}; 20 /// use tokio_util::io::StreamReader; 21 /// # #[tokio::main(flavor = "current_thread")] 22 /// # async fn main() -> std::io::Result<()> { 23 /// 24 /// // Create a stream from an iterator. 25 /// let stream = tokio_stream::iter(vec![ 26 /// Result::Ok(Bytes::from_static(&[0, 1, 2, 3])), 27 /// Result::Ok(Bytes::from_static(&[4, 5, 6, 7])), 28 /// Result::Ok(Bytes::from_static(&[8, 9, 10, 11])), 29 /// ]); 30 /// 31 /// // Convert it to an AsyncRead. 32 /// let mut read = StreamReader::new(stream); 33 /// 34 /// // Read five bytes from the stream. 35 /// let mut buf = [0; 5]; 36 /// read.read_exact(&mut buf).await?; 37 /// assert_eq!(buf, [0, 1, 2, 3, 4]); 38 /// 39 /// // Read the rest of the current chunk. 40 /// assert_eq!(read.read(&mut buf).await?, 3); 41 /// assert_eq!(&buf[..3], [5, 6, 7]); 42 /// 43 /// // Read the next chunk. 44 /// assert_eq!(read.read(&mut buf).await?, 4); 45 /// assert_eq!(&buf[..4], [8, 9, 10, 11]); 46 /// 47 /// // We have now reached the end. 48 /// assert_eq!(read.read(&mut buf).await?, 0); 49 /// 50 /// # Ok(()) 51 /// # } 52 /// ``` 53 /// 54 /// If the stream produces errors which are not [`std::io::Error`], 55 /// the errors can be converted using [`StreamExt`] to map each 56 /// element. 57 /// 58 /// ``` 59 /// use bytes::Bytes; 60 /// use tokio::io::AsyncReadExt; 61 /// use tokio_util::io::StreamReader; 62 /// use tokio_stream::StreamExt; 63 /// # #[tokio::main(flavor = "current_thread")] 64 /// # async fn main() -> std::io::Result<()> { 65 /// 66 /// // Create a stream from an iterator, including an error. 67 /// let stream = tokio_stream::iter(vec![ 68 /// Result::Ok(Bytes::from_static(&[0, 1, 2, 3])), 69 /// Result::Ok(Bytes::from_static(&[4, 5, 6, 7])), 70 /// Result::Err("Something bad happened!") 71 /// ]); 72 /// 73 /// // Use StreamExt to map the stream and error to a std::io::Error 74 /// let stream = stream.map(|result| result.map_err(|err| { 75 /// std::io::Error::new(std::io::ErrorKind::Other, err) 76 /// })); 77 /// 78 /// // Convert it to an AsyncRead. 79 /// let mut read = StreamReader::new(stream); 80 /// 81 /// // Read five bytes from the stream. 82 /// let mut buf = [0; 5]; 83 /// read.read_exact(&mut buf).await?; 84 /// assert_eq!(buf, [0, 1, 2, 3, 4]); 85 /// 86 /// // Read the rest of the current chunk. 87 /// assert_eq!(read.read(&mut buf).await?, 3); 88 /// assert_eq!(&buf[..3], [5, 6, 7]); 89 /// 90 /// // Reading the next chunk will produce an error 91 /// let error = read.read(&mut buf).await.unwrap_err(); 92 /// assert_eq!(error.kind(), std::io::ErrorKind::Other); 93 /// assert_eq!(error.into_inner().unwrap().to_string(), "Something bad happened!"); 94 /// 95 /// // We have now reached the end. 96 /// assert_eq!(read.read(&mut buf).await?, 0); 97 /// 98 /// # Ok(()) 99 /// # } 100 /// ``` 101 /// 102 /// Using the [`AsyncBufRead`] impl, you can read a `Stream` of byte chunks 103 /// line-by-line. Note that you will usually also need to convert the error 104 /// type when doing this. See the second example for an explanation of how 105 /// to do this. 106 /// 107 /// ``` 108 /// use tokio::io::{Result, AsyncBufReadExt}; 109 /// use tokio_util::io::StreamReader; 110 /// # #[tokio::main(flavor = "current_thread")] 111 /// # async fn main() -> std::io::Result<()> { 112 /// 113 /// // Create a stream of byte chunks. 114 /// let stream = tokio_stream::iter(vec![ 115 /// Result::Ok(b"The first line.\n".as_slice()), 116 /// Result::Ok(b"The second line.".as_slice()), 117 /// Result::Ok(b"\nThe third".as_slice()), 118 /// Result::Ok(b" line.\nThe fourth line.\nThe fifth line.\n".as_slice()), 119 /// ]); 120 /// 121 /// // Convert it to an AsyncRead. 122 /// let mut read = StreamReader::new(stream); 123 /// 124 /// // Loop through the lines from the `StreamReader`. 125 /// let mut line = String::new(); 126 /// let mut lines = Vec::new(); 127 /// loop { 128 /// line.clear(); 129 /// let len = read.read_line(&mut line).await?; 130 /// if len == 0 { break; } 131 /// lines.push(line.clone()); 132 /// } 133 /// 134 /// // Verify that we got the lines we expected. 135 /// assert_eq!( 136 /// lines, 137 /// vec![ 138 /// "The first line.\n", 139 /// "The second line.\n", 140 /// "The third line.\n", 141 /// "The fourth line.\n", 142 /// "The fifth line.\n", 143 /// ] 144 /// ); 145 /// # Ok(()) 146 /// # } 147 /// ``` 148 /// 149 /// [`AsyncRead`]: tokio::io::AsyncRead 150 /// [`AsyncBufRead`]: tokio::io::AsyncBufRead 151 /// [`Stream`]: futures_core::Stream 152 /// [`ReaderStream`]: crate::io::ReaderStream 153 /// [`StreamExt`]: https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html 154 #[derive(Debug)] 155 pub struct StreamReader<S, B> { 156 // This field is pinned. 157 inner: S, 158 // This field is not pinned. 159 chunk: Option<B>, 160 } 161 162 impl<S, B, E> StreamReader<S, B> 163 where 164 S: Stream<Item = Result<B, E>>, 165 B: Buf, 166 E: Into<std::io::Error>, 167 { 168 /// Convert a stream of byte chunks into an [`AsyncRead`](tokio::io::AsyncRead). 169 /// 170 /// The item should be a [`Result`] with the ok variant being something that 171 /// implements the [`Buf`] trait (e.g. `Vec<u8>` or `Bytes`). The error 172 /// should be convertible into an [io error]. 173 /// 174 /// [`Result`]: std::result::Result 175 /// [`Buf`]: bytes::Buf 176 /// [io error]: std::io::Error new(stream: S) -> Self177 pub fn new(stream: S) -> Self { 178 Self { 179 inner: stream, 180 chunk: None, 181 } 182 } 183 184 /// Do we have a chunk and is it non-empty? has_chunk(&self) -> bool185 fn has_chunk(&self) -> bool { 186 if let Some(ref chunk) = self.chunk { 187 chunk.remaining() > 0 188 } else { 189 false 190 } 191 } 192 193 /// Consumes this `StreamReader`, returning a Tuple consisting 194 /// of the underlying stream and an Option of the internal buffer, 195 /// which is Some in case the buffer contains elements. into_inner_with_chunk(self) -> (S, Option<B>)196 pub fn into_inner_with_chunk(self) -> (S, Option<B>) { 197 if self.has_chunk() { 198 (self.inner, self.chunk) 199 } else { 200 (self.inner, None) 201 } 202 } 203 } 204 205 impl<S, B> StreamReader<S, B> { 206 /// Gets a reference to the underlying stream. 207 /// 208 /// It is inadvisable to directly read from the underlying stream. get_ref(&self) -> &S209 pub fn get_ref(&self) -> &S { 210 &self.inner 211 } 212 213 /// Gets a mutable reference to the underlying stream. 214 /// 215 /// It is inadvisable to directly read from the underlying stream. get_mut(&mut self) -> &mut S216 pub fn get_mut(&mut self) -> &mut S { 217 &mut self.inner 218 } 219 220 /// Gets a pinned mutable reference to the underlying stream. 221 /// 222 /// It is inadvisable to directly read from the underlying stream. get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S>223 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S> { 224 self.project().inner 225 } 226 227 /// Consumes this `BufWriter`, returning the underlying stream. 228 /// 229 /// Note that any leftover data in the internal buffer is lost. 230 /// If you additionally want access to the internal buffer use 231 /// [`into_inner_with_chunk`]. 232 /// 233 /// [`into_inner_with_chunk`]: crate::io::StreamReader::into_inner_with_chunk into_inner(self) -> S234 pub fn into_inner(self) -> S { 235 self.inner 236 } 237 } 238 239 impl<S, B, E> AsyncRead for StreamReader<S, B> 240 where 241 S: Stream<Item = Result<B, E>>, 242 B: Buf, 243 E: Into<std::io::Error>, 244 { poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>245 fn poll_read( 246 mut self: Pin<&mut Self>, 247 cx: &mut Context<'_>, 248 buf: &mut ReadBuf<'_>, 249 ) -> Poll<io::Result<()>> { 250 if buf.remaining() == 0 { 251 return Poll::Ready(Ok(())); 252 } 253 254 let inner_buf = match self.as_mut().poll_fill_buf(cx) { 255 Poll::Ready(Ok(buf)) => buf, 256 Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), 257 Poll::Pending => return Poll::Pending, 258 }; 259 let len = std::cmp::min(inner_buf.len(), buf.remaining()); 260 buf.put_slice(&inner_buf[..len]); 261 262 self.consume(len); 263 Poll::Ready(Ok(())) 264 } 265 } 266 267 impl<S, B, E> AsyncBufRead for StreamReader<S, B> 268 where 269 S: Stream<Item = Result<B, E>>, 270 B: Buf, 271 E: Into<std::io::Error>, 272 { poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>273 fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { 274 loop { 275 if self.as_mut().has_chunk() { 276 // This unwrap is very sad, but it can't be avoided. 277 let buf = self.project().chunk.as_ref().unwrap().chunk(); 278 return Poll::Ready(Ok(buf)); 279 } else { 280 match self.as_mut().project().inner.poll_next(cx) { 281 Poll::Ready(Some(Ok(chunk))) => { 282 // Go around the loop in case the chunk is empty. 283 *self.as_mut().project().chunk = Some(chunk); 284 } 285 Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err.into())), 286 Poll::Ready(None) => return Poll::Ready(Ok(&[])), 287 Poll::Pending => return Poll::Pending, 288 } 289 } 290 } 291 } consume(self: Pin<&mut Self>, amt: usize)292 fn consume(self: Pin<&mut Self>, amt: usize) { 293 if amt > 0 { 294 self.project() 295 .chunk 296 .as_mut() 297 .expect("No chunk present") 298 .advance(amt); 299 } 300 } 301 } 302 303 // The code below is a manual expansion of the code that pin-project-lite would 304 // generate. This is done because pin-project-lite fails by hitting the recusion 305 // limit on this struct. (Every line of documentation is handled recursively by 306 // the macro.) 307 308 impl<S: Unpin, B> Unpin for StreamReader<S, B> {} 309 310 struct StreamReaderProject<'a, S, B> { 311 inner: Pin<&'a mut S>, 312 chunk: &'a mut Option<B>, 313 } 314 315 impl<S, B> StreamReader<S, B> { 316 #[inline] project(self: Pin<&mut Self>) -> StreamReaderProject<'_, S, B>317 fn project(self: Pin<&mut Self>) -> StreamReaderProject<'_, S, B> { 318 // SAFETY: We define that only `inner` should be pinned when `Self` is 319 // and have an appropriate `impl Unpin` for this. 320 let me = unsafe { Pin::into_inner_unchecked(self) }; 321 StreamReaderProject { 322 inner: unsafe { Pin::new_unchecked(&mut me.inner) }, 323 chunk: &mut me.chunk, 324 } 325 } 326 } 327