• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use core::pin::Pin;
2 use futures_core::ready;
3 use futures_core::stream::TryStream;
4 use futures_core::task::{Context, Poll};
5 use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
6 use pin_project_lite::pin_project;
7 use std::cmp;
8 use std::io::{Error, Result};
9 
pin_project! {
    /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
    ///
    /// Wraps a [`TryStream`] whose items are byte chunks (`AsRef<[u8]>`) and whose
    /// error type is [`std::io::Error`], and keeps track of how much of the
    /// current chunk has already been handed out to the reader.
    #[derive(Debug)]
    #[must_use = "readers do nothing unless polled"]
    #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
    pub struct IntoAsyncRead<St>
    where
        St: TryStream<Error = Error>,
        St::Ok: AsRef<[u8]>,
    {
        // Source of byte chunks; marked `#[pin]` so it is projected as a pinned
        // reference and can be polled in place.
        #[pin]
        stream: St,
        // Read progress: a chunk being drained, waiting for the next chunk, or finished.
        state: ReadState<St::Ok>,
    }
}
25 
/// Progress of the reader through the chunks produced by its stream.
#[derive(Debug)]
enum ReadState<T>
where
    T: AsRef<[u8]>,
{
    /// A chunk is buffered; the bytes before `chunk_start` have already been handed out.
    Ready { chunk: T, chunk_start: usize },
    /// No chunk is buffered; the stream has to be polled for the next one.
    PendingChunk,
    /// The stream ended or reported an error; no further bytes are produced.
    Eof,
}
32 
33 impl<St> IntoAsyncRead<St>
34 where
35     St: TryStream<Error = Error>,
36     St::Ok: AsRef<[u8]>,
37 {
new(stream: St) -> Self38     pub(super) fn new(stream: St) -> Self {
39         Self { stream, state: ReadState::PendingChunk }
40     }
41 }
42 
43 impl<St> AsyncRead for IntoAsyncRead<St>
44 where
45     St: TryStream<Error = Error>,
46     St::Ok: AsRef<[u8]>,
47 {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize>>48     fn poll_read(
49         self: Pin<&mut Self>,
50         cx: &mut Context<'_>,
51         buf: &mut [u8],
52     ) -> Poll<Result<usize>> {
53         let mut this = self.project();
54 
55         loop {
56             match this.state {
57                 ReadState::Ready { chunk, chunk_start } => {
58                     let chunk = chunk.as_ref();
59                     let len = cmp::min(buf.len(), chunk.len() - *chunk_start);
60 
61                     buf[..len].copy_from_slice(&chunk[*chunk_start..*chunk_start + len]);
62                     *chunk_start += len;
63 
64                     if chunk.len() == *chunk_start {
65                         *this.state = ReadState::PendingChunk;
66                     }
67 
68                     return Poll::Ready(Ok(len));
69                 }
70                 ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) {
71                     Some(Ok(chunk)) => {
72                         if !chunk.as_ref().is_empty() {
73                             *this.state = ReadState::Ready { chunk, chunk_start: 0 };
74                         }
75                     }
76                     Some(Err(err)) => {
77                         *this.state = ReadState::Eof;
78                         return Poll::Ready(Err(err));
79                     }
80                     None => {
81                         *this.state = ReadState::Eof;
82                         return Poll::Ready(Ok(0));
83                     }
84                 },
85                 ReadState::Eof => {
86                     return Poll::Ready(Ok(0));
87                 }
88             }
89         }
90     }
91 }
92 
93 impl<St> AsyncWrite for IntoAsyncRead<St>
94 where
95     St: TryStream<Error = Error> + AsyncWrite,
96     St::Ok: AsRef<[u8]>,
97 {
poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>98     fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
99         let this = self.project();
100         this.stream.poll_write(cx, buf)
101     }
102 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>103     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
104         let this = self.project();
105         this.stream.poll_flush(cx)
106     }
107 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>108     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
109         let this = self.project();
110         this.stream.poll_close(cx)
111     }
112 }
113 
114 impl<St> AsyncBufRead for IntoAsyncRead<St>
115 where
116     St: TryStream<Error = Error>,
117     St::Ok: AsRef<[u8]>,
118 {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>>119     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
120         let mut this = self.project();
121 
122         while let ReadState::PendingChunk = this.state {
123             match ready!(this.stream.as_mut().try_poll_next(cx)) {
124                 Some(Ok(chunk)) => {
125                     if !chunk.as_ref().is_empty() {
126                         *this.state = ReadState::Ready { chunk, chunk_start: 0 };
127                     }
128                 }
129                 Some(Err(err)) => {
130                     *this.state = ReadState::Eof;
131                     return Poll::Ready(Err(err));
132                 }
133                 None => {
134                     *this.state = ReadState::Eof;
135                     return Poll::Ready(Ok(&[]));
136                 }
137             }
138         }
139 
140         if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state {
141             let chunk = chunk.as_ref();
142             return Poll::Ready(Ok(&chunk[chunk_start..]));
143         }
144 
145         // To get to this point we must be in ReadState::Eof
146         Poll::Ready(Ok(&[]))
147     }
148 
consume(self: Pin<&mut Self>, amount: usize)149     fn consume(self: Pin<&mut Self>, amount: usize) {
150         let this = self.project();
151 
152         // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295
153         if amount == 0 {
154             return;
155         }
156         if let ReadState::Ready { chunk, chunk_start } = this.state {
157             *chunk_start += amount;
158             debug_assert!(*chunk_start <= chunk.as_ref().len());
159             if *chunk_start >= chunk.as_ref().len() {
160                 *this.state = ReadState::PendingChunk;
161             }
162         } else {
163             debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk");
164         }
165     }
166 }
167