• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::io::sys;
2 use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
3 
4 use std::cmp;
5 use std::future::Future;
6 use std::io;
7 use std::io::prelude::*;
8 use std::pin::Pin;
9 use std::task::Poll::*;
10 use std::task::{Context, Poll};
11 
12 use self::State::*;
13 
14 /// `T` should not implement _both_ Read and Write.
15 #[derive(Debug)]
16 pub(crate) struct Blocking<T> {
17     inner: Option<T>,
18     state: State<T>,
19     /// `true` if the lower IO layer needs flushing.
20     need_flush: bool,
21 }
22 
/// Byte buffer with a read cursor; it is handed back and forth between the
/// async side and the blocking operation.
#[derive(Debug)]
pub(crate) struct Buf {
    /// Backing storage.
    buf: Vec<u8>,
    /// Offset into `buf` of the first byte that has not been consumed yet.
    pos: usize,
}
28 
/// Upper bound, in bytes (2 MiB), on how much data `Buf` accepts or requests
/// for a single operation.
pub(crate) const MAX_BUF: usize = 2 << 20;
30 
31 #[derive(Debug)]
32 enum State<T> {
33     Idle(Option<Buf>),
34     Busy(sys::Blocking<(io::Result<usize>, Buf, T)>),
35 }
36 
37 cfg_io_blocking! {
38     impl<T> Blocking<T> {
39         #[cfg_attr(feature = "fs", allow(dead_code))]
40         pub(crate) fn new(inner: T) -> Blocking<T> {
41             Blocking {
42                 inner: Some(inner),
43                 state: State::Idle(Some(Buf::with_capacity(0))),
44                 need_flush: false,
45             }
46         }
47     }
48 }
49 
50 impl<T> AsyncRead for Blocking<T>
51 where
52     T: Read + Unpin + Send + 'static,
53 {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, dst: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>54     fn poll_read(
55         mut self: Pin<&mut Self>,
56         cx: &mut Context<'_>,
57         dst: &mut ReadBuf<'_>,
58     ) -> Poll<io::Result<()>> {
59         loop {
60             match self.state {
61                 Idle(ref mut buf_cell) => {
62                     let mut buf = buf_cell.take().unwrap();
63 
64                     if !buf.is_empty() {
65                         buf.copy_to(dst);
66                         *buf_cell = Some(buf);
67                         return Ready(Ok(()));
68                     }
69 
70                     buf.ensure_capacity_for(dst);
71                     let mut inner = self.inner.take().unwrap();
72 
73                     self.state = Busy(sys::run(move || {
74                         let res = buf.read_from(&mut inner);
75                         (res, buf, inner)
76                     }));
77                 }
78                 Busy(ref mut rx) => {
79                     let (res, mut buf, inner) = ready!(Pin::new(rx).poll(cx))?;
80                     self.inner = Some(inner);
81 
82                     match res {
83                         Ok(_) => {
84                             buf.copy_to(dst);
85                             self.state = Idle(Some(buf));
86                             return Ready(Ok(()));
87                         }
88                         Err(e) => {
89                             assert!(buf.is_empty());
90 
91                             self.state = Idle(Some(buf));
92                             return Ready(Err(e));
93                         }
94                     }
95                 }
96             }
97         }
98     }
99 }
100 
101 impl<T> AsyncWrite for Blocking<T>
102 where
103     T: Write + Unpin + Send + 'static,
104 {
poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, src: &[u8], ) -> Poll<io::Result<usize>>105     fn poll_write(
106         mut self: Pin<&mut Self>,
107         cx: &mut Context<'_>,
108         src: &[u8],
109     ) -> Poll<io::Result<usize>> {
110         loop {
111             match self.state {
112                 Idle(ref mut buf_cell) => {
113                     let mut buf = buf_cell.take().unwrap();
114 
115                     assert!(buf.is_empty());
116 
117                     let n = buf.copy_from(src);
118                     let mut inner = self.inner.take().unwrap();
119 
120                     self.state = Busy(sys::run(move || {
121                         let n = buf.len();
122                         let res = buf.write_to(&mut inner).map(|_| n);
123 
124                         (res, buf, inner)
125                     }));
126                     self.need_flush = true;
127 
128                     return Ready(Ok(n));
129                 }
130                 Busy(ref mut rx) => {
131                     let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?;
132                     self.state = Idle(Some(buf));
133                     self.inner = Some(inner);
134 
135                     // If error, return
136                     res?;
137                 }
138             }
139         }
140     }
141 
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>142     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
143         loop {
144             let need_flush = self.need_flush;
145             match self.state {
146                 // The buffer is not used here
147                 Idle(ref mut buf_cell) => {
148                     if need_flush {
149                         let buf = buf_cell.take().unwrap();
150                         let mut inner = self.inner.take().unwrap();
151 
152                         self.state = Busy(sys::run(move || {
153                             let res = inner.flush().map(|_| 0);
154                             (res, buf, inner)
155                         }));
156 
157                         self.need_flush = false;
158                     } else {
159                         return Ready(Ok(()));
160                     }
161                 }
162                 Busy(ref mut rx) => {
163                     let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?;
164                     self.state = Idle(Some(buf));
165                     self.inner = Some(inner);
166 
167                     // If error, return
168                     res?;
169                 }
170             }
171         }
172     }
173 
poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>174     fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
175         Poll::Ready(Ok(()))
176     }
177 }
178 
/// Evaluates `$e` again and again until it yields anything other than an
/// `io::ErrorKind::Interrupted` error, then produces that result.
macro_rules! uninterruptibly {
    ($e:expr) => {{
        loop {
            match $e {
                Err(ref err) if matches!(err.kind(), io::ErrorKind::Interrupted) => continue,
                outcome => break outcome,
            }
        }
    }};
}
190 
191 impl Buf {
with_capacity(n: usize) -> Buf192     pub(crate) fn with_capacity(n: usize) -> Buf {
193         Buf {
194             buf: Vec::with_capacity(n),
195             pos: 0,
196         }
197     }
198 
is_empty(&self) -> bool199     pub(crate) fn is_empty(&self) -> bool {
200         self.len() == 0
201     }
202 
len(&self) -> usize203     pub(crate) fn len(&self) -> usize {
204         self.buf.len() - self.pos
205     }
206 
copy_to(&mut self, dst: &mut ReadBuf<'_>) -> usize207     pub(crate) fn copy_to(&mut self, dst: &mut ReadBuf<'_>) -> usize {
208         let n = cmp::min(self.len(), dst.remaining());
209         dst.put_slice(&self.bytes()[..n]);
210         self.pos += n;
211 
212         if self.pos == self.buf.len() {
213             self.buf.truncate(0);
214             self.pos = 0;
215         }
216 
217         n
218     }
219 
copy_from(&mut self, src: &[u8]) -> usize220     pub(crate) fn copy_from(&mut self, src: &[u8]) -> usize {
221         assert!(self.is_empty());
222 
223         let n = cmp::min(src.len(), MAX_BUF);
224 
225         self.buf.extend_from_slice(&src[..n]);
226         n
227     }
228 
bytes(&self) -> &[u8]229     pub(crate) fn bytes(&self) -> &[u8] {
230         &self.buf[self.pos..]
231     }
232 
ensure_capacity_for(&mut self, bytes: &ReadBuf<'_>)233     pub(crate) fn ensure_capacity_for(&mut self, bytes: &ReadBuf<'_>) {
234         assert!(self.is_empty());
235 
236         let len = cmp::min(bytes.remaining(), MAX_BUF);
237 
238         if self.buf.len() < len {
239             self.buf.reserve(len - self.buf.len());
240         }
241 
242         unsafe {
243             self.buf.set_len(len);
244         }
245     }
246 
read_from<T: Read>(&mut self, rd: &mut T) -> io::Result<usize>247     pub(crate) fn read_from<T: Read>(&mut self, rd: &mut T) -> io::Result<usize> {
248         let res = uninterruptibly!(rd.read(&mut self.buf));
249 
250         if let Ok(n) = res {
251             self.buf.truncate(n);
252         } else {
253             self.buf.clear();
254         }
255 
256         assert_eq!(self.pos, 0);
257 
258         res
259     }
260 
write_to<T: Write>(&mut self, wr: &mut T) -> io::Result<()>261     pub(crate) fn write_to<T: Write>(&mut self, wr: &mut T) -> io::Result<()> {
262         assert_eq!(self.pos, 0);
263 
264         // `write_all` already ignores interrupts
265         let res = wr.write_all(&self.buf);
266         self.buf.clear();
267         res
268     }
269 }
270 
271 cfg_fs! {
272     impl Buf {
273         pub(crate) fn discard_read(&mut self) -> i64 {
274             let ret = -(self.bytes().len() as i64);
275             self.pos = 0;
276             self.buf.truncate(0);
277             ret
278         }
279     }
280 }
281