• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use bytes::Buf;
2 use futures_core::stream::Stream;
3 use std::io;
4 use std::pin::Pin;
5 use std::task::{Context, Poll};
6 use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf};
7 
8 /// Convert a [`Stream`] of byte chunks into an [`AsyncRead`].
9 ///
10 /// This type performs the inverse operation of [`ReaderStream`].
11 ///
12 /// This type also implements the [`AsyncBufRead`] trait, so you can use it
13 /// to read a `Stream` of byte chunks line-by-line. See the examples below.
14 ///
15 /// # Example
16 ///
17 /// ```
18 /// use bytes::Bytes;
19 /// use tokio::io::{AsyncReadExt, Result};
20 /// use tokio_util::io::StreamReader;
21 /// # #[tokio::main(flavor = "current_thread")]
22 /// # async fn main() -> std::io::Result<()> {
23 ///
24 /// // Create a stream from an iterator.
25 /// let stream = tokio_stream::iter(vec![
26 ///     Result::Ok(Bytes::from_static(&[0, 1, 2, 3])),
27 ///     Result::Ok(Bytes::from_static(&[4, 5, 6, 7])),
28 ///     Result::Ok(Bytes::from_static(&[8, 9, 10, 11])),
29 /// ]);
30 ///
31 /// // Convert it to an AsyncRead.
32 /// let mut read = StreamReader::new(stream);
33 ///
34 /// // Read five bytes from the stream.
35 /// let mut buf = [0; 5];
36 /// read.read_exact(&mut buf).await?;
37 /// assert_eq!(buf, [0, 1, 2, 3, 4]);
38 ///
39 /// // Read the rest of the current chunk.
40 /// assert_eq!(read.read(&mut buf).await?, 3);
41 /// assert_eq!(&buf[..3], [5, 6, 7]);
42 ///
43 /// // Read the next chunk.
44 /// assert_eq!(read.read(&mut buf).await?, 4);
45 /// assert_eq!(&buf[..4], [8, 9, 10, 11]);
46 ///
47 /// // We have now reached the end.
48 /// assert_eq!(read.read(&mut buf).await?, 0);
49 ///
50 /// # Ok(())
51 /// # }
52 /// ```
53 ///
54 /// If the stream produces errors which are not [`std::io::Error`],
55 /// the errors can be converted using [`StreamExt`] to map each
56 /// element.
57 ///
58 /// ```
59 /// use bytes::Bytes;
60 /// use tokio::io::AsyncReadExt;
61 /// use tokio_util::io::StreamReader;
62 /// use tokio_stream::StreamExt;
63 /// # #[tokio::main(flavor = "current_thread")]
64 /// # async fn main() -> std::io::Result<()> {
65 ///
66 /// // Create a stream from an iterator, including an error.
67 /// let stream = tokio_stream::iter(vec![
68 ///     Result::Ok(Bytes::from_static(&[0, 1, 2, 3])),
69 ///     Result::Ok(Bytes::from_static(&[4, 5, 6, 7])),
70 ///     Result::Err("Something bad happened!")
71 /// ]);
72 ///
73 /// // Use StreamExt to map the stream and error to a std::io::Error
74 /// let stream = stream.map(|result| result.map_err(|err| {
75 ///     std::io::Error::new(std::io::ErrorKind::Other, err)
76 /// }));
77 ///
78 /// // Convert it to an AsyncRead.
79 /// let mut read = StreamReader::new(stream);
80 ///
81 /// // Read five bytes from the stream.
82 /// let mut buf = [0; 5];
83 /// read.read_exact(&mut buf).await?;
84 /// assert_eq!(buf, [0, 1, 2, 3, 4]);
85 ///
86 /// // Read the rest of the current chunk.
87 /// assert_eq!(read.read(&mut buf).await?, 3);
88 /// assert_eq!(&buf[..3], [5, 6, 7]);
89 ///
90 /// // Reading the next chunk will produce an error
91 /// let error = read.read(&mut buf).await.unwrap_err();
92 /// assert_eq!(error.kind(), std::io::ErrorKind::Other);
93 /// assert_eq!(error.into_inner().unwrap().to_string(), "Something bad happened!");
94 ///
95 /// // We have now reached the end.
96 /// assert_eq!(read.read(&mut buf).await?, 0);
97 ///
98 /// # Ok(())
99 /// # }
100 /// ```
101 ///
102 /// Using the [`AsyncBufRead`] impl, you can read a `Stream` of byte chunks
103 /// line-by-line. Note that you will usually also need to convert the error
104 /// type when doing this. See the second example for an explanation of how
105 /// to do this.
106 ///
107 /// ```
108 /// use tokio::io::{Result, AsyncBufReadExt};
109 /// use tokio_util::io::StreamReader;
110 /// # #[tokio::main(flavor = "current_thread")]
111 /// # async fn main() -> std::io::Result<()> {
112 ///
113 /// // Create a stream of byte chunks.
114 /// let stream = tokio_stream::iter(vec![
115 ///     Result::Ok(b"The first line.\n".as_slice()),
116 ///     Result::Ok(b"The second line.".as_slice()),
117 ///     Result::Ok(b"\nThe third".as_slice()),
118 ///     Result::Ok(b" line.\nThe fourth line.\nThe fifth line.\n".as_slice()),
119 /// ]);
120 ///
121 /// // Convert it to an AsyncRead.
122 /// let mut read = StreamReader::new(stream);
123 ///
124 /// // Loop through the lines from the `StreamReader`.
125 /// let mut line = String::new();
126 /// let mut lines = Vec::new();
127 /// loop {
128 ///     line.clear();
129 ///     let len = read.read_line(&mut line).await?;
130 ///     if len == 0 { break; }
131 ///     lines.push(line.clone());
132 /// }
133 ///
134 /// // Verify that we got the lines we expected.
135 /// assert_eq!(
136 ///     lines,
137 ///     vec![
138 ///         "The first line.\n",
139 ///         "The second line.\n",
140 ///         "The third line.\n",
141 ///         "The fourth line.\n",
142 ///         "The fifth line.\n",
143 ///     ]
144 /// );
145 /// # Ok(())
146 /// # }
147 /// ```
148 ///
149 /// [`AsyncRead`]: tokio::io::AsyncRead
150 /// [`AsyncBufRead`]: tokio::io::AsyncBufRead
151 /// [`Stream`]: futures_core::Stream
152 /// [`ReaderStream`]: crate::io::ReaderStream
153 /// [`StreamExt`]: https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html
154 #[derive(Debug)]
155 pub struct StreamReader<S, B> {
156     // This field is pinned.
157     inner: S,
158     // This field is not pinned.
159     chunk: Option<B>,
160 }
161 
162 impl<S, B, E> StreamReader<S, B>
163 where
164     S: Stream<Item = Result<B, E>>,
165     B: Buf,
166     E: Into<std::io::Error>,
167 {
168     /// Convert a stream of byte chunks into an [`AsyncRead`](tokio::io::AsyncRead).
169     ///
170     /// The item should be a [`Result`] with the ok variant being something that
171     /// implements the [`Buf`] trait (e.g. `Vec<u8>` or `Bytes`). The error
172     /// should be convertible into an [io error].
173     ///
174     /// [`Result`]: std::result::Result
175     /// [`Buf`]: bytes::Buf
176     /// [io error]: std::io::Error
new(stream: S) -> Self177     pub fn new(stream: S) -> Self {
178         Self {
179             inner: stream,
180             chunk: None,
181         }
182     }
183 
184     /// Do we have a chunk and is it non-empty?
has_chunk(&self) -> bool185     fn has_chunk(&self) -> bool {
186         if let Some(ref chunk) = self.chunk {
187             chunk.remaining() > 0
188         } else {
189             false
190         }
191     }
192 
193     /// Consumes this `StreamReader`, returning a Tuple consisting
194     /// of the underlying stream and an Option of the internal buffer,
195     /// which is Some in case the buffer contains elements.
into_inner_with_chunk(self) -> (S, Option<B>)196     pub fn into_inner_with_chunk(self) -> (S, Option<B>) {
197         if self.has_chunk() {
198             (self.inner, self.chunk)
199         } else {
200             (self.inner, None)
201         }
202     }
203 }
204 
205 impl<S, B> StreamReader<S, B> {
206     /// Gets a reference to the underlying stream.
207     ///
208     /// It is inadvisable to directly read from the underlying stream.
get_ref(&self) -> &S209     pub fn get_ref(&self) -> &S {
210         &self.inner
211     }
212 
213     /// Gets a mutable reference to the underlying stream.
214     ///
215     /// It is inadvisable to directly read from the underlying stream.
get_mut(&mut self) -> &mut S216     pub fn get_mut(&mut self) -> &mut S {
217         &mut self.inner
218     }
219 
220     /// Gets a pinned mutable reference to the underlying stream.
221     ///
222     /// It is inadvisable to directly read from the underlying stream.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S>223     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S> {
224         self.project().inner
225     }
226 
227     /// Consumes this `BufWriter`, returning the underlying stream.
228     ///
229     /// Note that any leftover data in the internal buffer is lost.
230     /// If you additionally want access to the internal buffer use
231     /// [`into_inner_with_chunk`].
232     ///
233     /// [`into_inner_with_chunk`]: crate::io::StreamReader::into_inner_with_chunk
into_inner(self) -> S234     pub fn into_inner(self) -> S {
235         self.inner
236     }
237 }
238 
239 impl<S, B, E> AsyncRead for StreamReader<S, B>
240 where
241     S: Stream<Item = Result<B, E>>,
242     B: Buf,
243     E: Into<std::io::Error>,
244 {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>245     fn poll_read(
246         mut self: Pin<&mut Self>,
247         cx: &mut Context<'_>,
248         buf: &mut ReadBuf<'_>,
249     ) -> Poll<io::Result<()>> {
250         if buf.remaining() == 0 {
251             return Poll::Ready(Ok(()));
252         }
253 
254         let inner_buf = match self.as_mut().poll_fill_buf(cx) {
255             Poll::Ready(Ok(buf)) => buf,
256             Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
257             Poll::Pending => return Poll::Pending,
258         };
259         let len = std::cmp::min(inner_buf.len(), buf.remaining());
260         buf.put_slice(&inner_buf[..len]);
261 
262         self.consume(len);
263         Poll::Ready(Ok(()))
264     }
265 }
266 
267 impl<S, B, E> AsyncBufRead for StreamReader<S, B>
268 where
269     S: Stream<Item = Result<B, E>>,
270     B: Buf,
271     E: Into<std::io::Error>,
272 {
poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>273     fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
274         loop {
275             if self.as_mut().has_chunk() {
276                 // This unwrap is very sad, but it can't be avoided.
277                 let buf = self.project().chunk.as_ref().unwrap().chunk();
278                 return Poll::Ready(Ok(buf));
279             } else {
280                 match self.as_mut().project().inner.poll_next(cx) {
281                     Poll::Ready(Some(Ok(chunk))) => {
282                         // Go around the loop in case the chunk is empty.
283                         *self.as_mut().project().chunk = Some(chunk);
284                     }
285                     Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err.into())),
286                     Poll::Ready(None) => return Poll::Ready(Ok(&[])),
287                     Poll::Pending => return Poll::Pending,
288                 }
289             }
290         }
291     }
consume(self: Pin<&mut Self>, amt: usize)292     fn consume(self: Pin<&mut Self>, amt: usize) {
293         if amt > 0 {
294             self.project()
295                 .chunk
296                 .as_mut()
297                 .expect("No chunk present")
298                 .advance(amt);
299         }
300     }
301 }
302 
303 // The code below is a manual expansion of the code that pin-project-lite would
304 // generate. This is done because pin-project-lite fails by hitting the recusion
305 // limit on this struct. (Every line of documentation is handled recursively by
306 // the macro.)
307 
308 impl<S: Unpin, B> Unpin for StreamReader<S, B> {}
309 
310 struct StreamReaderProject<'a, S, B> {
311     inner: Pin<&'a mut S>,
312     chunk: &'a mut Option<B>,
313 }
314 
315 impl<S, B> StreamReader<S, B> {
316     #[inline]
project(self: Pin<&mut Self>) -> StreamReaderProject<'_, S, B>317     fn project(self: Pin<&mut Self>) -> StreamReaderProject<'_, S, B> {
318         // SAFETY: We define that only `inner` should be pinned when `Self` is
319         // and have an appropriate `impl Unpin` for this.
320         let me = unsafe { Pin::into_inner_unchecked(self) };
321         StreamReaderProject {
322             inner: unsafe { Pin::new_unchecked(&mut me.inner) },
323             chunk: &mut me.chunk,
324         }
325     }
326 }
327