use crate::io::sys;
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};

use std::cmp;
use std::future::Future;
use std::io;
use std::io::prelude::*;
use std::pin::Pin;
use std::task::Poll::*;
use std::task::{Context, Poll};

use self::State::*;

/// `T` should not implement _both_ Read and Write.
#[derive(Debug)]
pub(crate) struct Blocking<T> {
    inner: Option<T>,
    state: State<T>,
    /// `true` if the lower IO layer needs flushing.
    need_flush: bool,
}

#[derive(Debug)]
pub(crate) struct Buf {
    buf: Vec<u8>,
    pos: usize,
}

pub(crate) const MAX_BUF: usize = 2 * 1024 * 1024;

#[derive(Debug)]
enum State<T> {
    Idle(Option<Buf>),
    Busy(sys::Blocking<(io::Result<usize>, Buf, T)>),
}

cfg_io_blocking! {
    impl<T> Blocking<T> {
        #[cfg_attr(feature = "fs", allow(dead_code))]
        pub(crate) fn new(inner: T) -> Blocking<T> {
            Blocking {
                inner: Some(inner),
                state: State::Idle(Some(Buf::with_capacity(0))),
                need_flush: false,
            }
        }
    }
}

impl<T> AsyncRead for Blocking<T>
where
    T: Read + Unpin + Send + 'static,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        dst: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        loop {
            match self.state {
                Idle(ref mut buf_cell) => {
                    let mut buf = buf_cell.take().unwrap();

                    if !buf.is_empty() {
                        buf.copy_to(dst);
                        *buf_cell = Some(buf);
                        return Ready(Ok(()));
                    }

                    buf.ensure_capacity_for(dst);
                    let mut inner = self.inner.take().unwrap();

                    self.state = Busy(sys::run(move || {
                        let res = buf.read_from(&mut inner);
                        (res, buf, inner)
                    }));
                }
                Busy(ref mut rx) => {
                    let (res, mut buf, inner) = ready!(Pin::new(rx).poll(cx))?;
                    self.inner = Some(inner);

                    match res {
                        Ok(_) => {
                            buf.copy_to(dst);
                            self.state = Idle(Some(buf));
                            return Ready(Ok(()));
                        }
                        Err(e) => {
                            assert!(buf.is_empty());

                            self.state = Idle(Some(buf));
                            return Ready(Err(e));
                        }
                    }
                }
            }
        }
    }
}

impl<T> AsyncWrite for Blocking<T>
where
    T: Write + Unpin + Send + 'static,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        src: &[u8],
    ) -> Poll<io::Result<usize>> {
        loop {
            match self.state {
                Idle(ref mut buf_cell) => {
                    let mut buf = buf_cell.take().unwrap();

                    assert!(buf.is_empty());

                    let n = buf.copy_from(src);
                    let mut inner = self.inner.take().unwrap();

                    self.state = Busy(sys::run(move || {
                        let n = buf.len();
                        let res = buf.write_to(&mut inner).map(|_| n);

                        (res, buf, inner)
                    }));
                    self.need_flush = true;

                    return Ready(Ok(n));
                }
                Busy(ref mut rx) => {
                    let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?;
                    self.state = Idle(Some(buf));
                    self.inner = Some(inner);

                    // If error, return
                    res?;
                }
            }
        }
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        loop {
            let need_flush = self.need_flush;
            match self.state {
                // The buffer is not used here
                Idle(ref mut buf_cell) => {
                    if need_flush {
                        let buf = buf_cell.take().unwrap();
                        let mut inner = self.inner.take().unwrap();

                        self.state = Busy(sys::run(move || {
                            let res = inner.flush().map(|_| 0);
                            (res, buf, inner)
                        }));

                        self.need_flush = false;
                    } else {
                        return Ready(Ok(()));
                    }
                }
                Busy(ref mut rx) => {
                    let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?;
                    self.state = Idle(Some(buf));
                    self.inner = Some(inner);

                    // If error, return
                    res?;
                }
            }
        }
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }
}

/// Repeats operations that are interrupted.
macro_rules! uninterruptibly {
    ($e:expr) => {{
        loop {
            match $e {
                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
                res => break res,
            }
        }
    }};
}

impl Buf {
    pub(crate) fn with_capacity(n: usize) -> Buf {
        Buf {
            buf: Vec::with_capacity(n),
            pos: 0,
        }
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub(crate) fn len(&self) -> usize {
        self.buf.len() - self.pos
    }

    pub(crate) fn copy_to(&mut self, dst: &mut ReadBuf<'_>) -> usize {
        let n = cmp::min(self.len(), dst.remaining());
        dst.put_slice(&self.bytes()[..n]);
        self.pos += n;

        if self.pos == self.buf.len() {
            self.buf.truncate(0);
            self.pos = 0;
        }

        n
    }

    pub(crate) fn copy_from(&mut self, src: &[u8]) -> usize {
        assert!(self.is_empty());

        let n = cmp::min(src.len(), MAX_BUF);

        self.buf.extend_from_slice(&src[..n]);
        n
    }

    pub(crate) fn bytes(&self) -> &[u8] {
        &self.buf[self.pos..]
    }

    pub(crate) fn ensure_capacity_for(&mut self, bytes: &ReadBuf<'_>) {
        assert!(self.is_empty());

        let len = cmp::min(bytes.remaining(), MAX_BUF);

        if self.buf.len() < len {
            self.buf.reserve(len - self.buf.len());
        }

        unsafe {
            self.buf.set_len(len);
        }
    }

    pub(crate) fn read_from<T: Read>(&mut self, rd: &mut T) -> io::Result<usize> {
        let res = uninterruptibly!(rd.read(&mut self.buf));

        if let Ok(n) = res {
            self.buf.truncate(n);
        } else {
            self.buf.clear();
        }

        assert_eq!(self.pos, 0);

        res
    }

    pub(crate) fn write_to<T: Write>(&mut self, wr: &mut T) -> io::Result<()> {
        assert_eq!(self.pos, 0);

        // `write_all` already ignores interrupts
        let res = wr.write_all(&self.buf);
        self.buf.clear();
        res
    }
}

cfg_fs! {
    impl Buf {
        pub(crate) fn discard_read(&mut self) -> i64 {
            let ret = -(self.bytes().len() as i64);
            self.pos = 0;
            self.buf.truncate(0);
            ret
        }
    }
}
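
// A minimal sketch, not part of the original file: a unit test exercising the
// `Buf` staging buffer on its own. `copy_from` stages caller bytes the way
// `poll_write` does, and `copy_to` drains them into a `ReadBuf` the way
// `poll_read` does. The module and test names are illustrative only.
#[cfg(test)]
mod buf_round_trip_sketch {
    use super::{Buf, ReadBuf};

    #[test]
    fn copy_from_then_copy_to() {
        let mut buf = Buf::with_capacity(0);

        // Stage bytes from a caller slice (capped at MAX_BUF).
        let n = buf.copy_from(&b"hello"[..]);
        assert_eq!(n, 5);
        assert_eq!(buf.len(), 5);

        // Drain the staged bytes into a `ReadBuf`; once everything has been
        // copied out, the buffer resets itself to empty.
        let mut storage = [0u8; 8];
        let mut dst = ReadBuf::new(&mut storage);
        let copied = buf.copy_to(&mut dst);
        assert_eq!(copied, 5);
        assert_eq!(dst.filled(), &b"hello"[..]);
        assert!(buf.is_empty());
    }
}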