//! Asynchronous I/O
//!
//! This crate contains the `AsyncRead`, `AsyncWrite`, `AsyncSeek`, and
//! `AsyncBufRead` traits, the asynchronous analogs to
//! `std::io::{Read, Write, Seek, BufRead}`. The primary difference is
//! that these traits integrate with the asynchronous task system.
//!
//! All items of this library are only available when the `std` feature of this
//! library is activated, and it is activated by default.

#![cfg_attr(not(feature = "std"), no_std)]
#![warn(missing_debug_implementations, missing_docs, rust_2018_idioms, unreachable_pub)]
// It cannot be included in the published code because this lint has false positives in the minimum required version.
#![cfg_attr(test, warn(single_use_lifetimes))]
#![doc(test(
    no_crate_inject,
    attr(
        deny(warnings, rust_2018_idioms, single_use_lifetimes),
        allow(dead_code, unused_assignments, unused_variables)
    )
))]
#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(feature = "std")]
mod if_std {
    use std::io;
    use std::ops::DerefMut;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    // Re-export some types from `std::io` so that users don't have to deal
    // with conflicts when `use`ing `futures::io` and `std::io`.
    #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
    #[doc(no_inline)]
    pub use io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};

    /// Read bytes asynchronously.
    ///
    /// This trait is analogous to the `std::io::Read` trait, but integrates
    /// with the asynchronous task system. In particular, the `poll_read`
    /// method, unlike `Read::read`, will automatically queue the current task
    /// for wakeup and return if data is not yet available, rather than blocking
    /// the calling thread.
    pub trait AsyncRead {
        /// Attempt to read from the `AsyncRead` into `buf`.
        ///
        /// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
        ///
        /// If no data is available for reading, the method returns
        /// `Poll::Pending` and arranges for the current task (via
        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
        /// readable or is closed.
        ///
        /// # Implementation
        ///
        /// This function may not return errors of kind `WouldBlock` or
        /// `Interrupted`.  Implementations must convert `WouldBlock` into
        /// `Poll::Pending` and either internally retry or convert
        /// `Interrupted` into another error kind.
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<Result<usize>>;

        /// Attempt to read from the `AsyncRead` into `bufs` using vectored
        /// IO operations.
        ///
        /// This method is similar to `poll_read`, but allows data to be read
        /// into multiple buffers using a single operation.
        ///
        /// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
        ///
        /// If no data is available for reading, the method returns
        /// `Poll::Pending` and arranges for the current task (via
        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
        /// readable or is closed.
        ///
        /// By default, this method delegates to using `poll_read` on the first
        /// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
        /// support vectored IO should override this method.
        ///
        /// # Implementation
        ///
        /// This function may not return errors of kind `WouldBlock` or
        /// `Interrupted`.  Implementations must convert `WouldBlock` into
        /// `Poll::Pending` and either internally retry or convert
        /// `Interrupted` into another error kind.
        fn poll_read_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &mut [IoSliceMut<'_>],
        ) -> Poll<Result<usize>> {
            // Only the first non-empty buffer is used; a single `poll_read`
            // call is made per invocation.
            for b in bufs {
                if !b.is_empty() {
                    return self.poll_read(cx, b);
                }
            }

            // Every buffer was empty (or there were none): forward an empty read.
            self.poll_read(cx, &mut [])
        }
    }

    /// Write bytes asynchronously.
    ///
    /// This trait is analogous to the `std::io::Write` trait, but integrates
    /// with the asynchronous task system. In particular, the `poll_write`
    /// method, unlike `Write::write`, will automatically queue the current task
    /// for wakeup and return if the writer cannot take more data, rather than blocking
    /// the calling thread.
    pub trait AsyncWrite {
        /// Attempt to write bytes from `buf` into the object.
        ///
        /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
        ///
        /// If the object is not ready for writing, the method returns
        /// `Poll::Pending` and arranges for the current task (via
        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
        /// writable or is closed.
        ///
        /// # Implementation
        ///
        /// This function may not return errors of kind `WouldBlock` or
        /// `Interrupted`.  Implementations must convert `WouldBlock` into
        /// `Poll::Pending` and either internally retry or convert
        /// `Interrupted` into another error kind.
        ///
        /// `poll_write` must try to make progress by flushing the underlying object if
        /// that is the only way the underlying object can become writable again.
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<Result<usize>>;

        /// Attempt to write bytes from `bufs` into the object using vectored
        /// IO operations.
        ///
        /// This method is similar to `poll_write`, but allows data from multiple buffers to be written
        /// using a single operation.
        ///
        /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
        ///
        /// If the object is not ready for writing, the method returns
        /// `Poll::Pending` and arranges for the current task (via
        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
        /// writable or is closed.
        ///
        /// By default, this method delegates to using `poll_write` on the first
        /// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
        /// support vectored IO should override this method.
        ///
        /// # Implementation
        ///
        /// This function may not return errors of kind `WouldBlock` or
        /// `Interrupted`.  Implementations must convert `WouldBlock` into
        /// `Poll::Pending` and either internally retry or convert
        /// `Interrupted` into another error kind.
        fn poll_write_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &[IoSlice<'_>],
        ) -> Poll<Result<usize>> {
            // Only the first non-empty buffer is used; a single `poll_write`
            // call is made per invocation.
            for b in bufs {
                if !b.is_empty() {
                    return self.poll_write(cx, b);
                }
            }

            // Every buffer was empty (or there were none): forward an empty write.
            self.poll_write(cx, &[])
        }

        /// Attempt to flush the object, ensuring that any buffered data reach
        /// their destination.
        ///
        /// On success, returns `Poll::Ready(Ok(()))`.
        ///
        /// If flushing cannot immediately complete, this method returns
        /// `Poll::Pending` and arranges for the current task (via
        /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
        /// progress towards flushing.
        ///
        /// # Implementation
        ///
        /// This function may not return errors of kind `WouldBlock` or
        /// `Interrupted`.  Implementations must convert `WouldBlock` into
        /// `Poll::Pending` and either internally retry or convert
        /// `Interrupted` into another error kind.
        ///
        /// It only makes sense to do anything here if you actually buffer data.
        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;

        /// Attempt to close the object.
        ///
        /// On success, returns `Poll::Ready(Ok(()))`.
        ///
        /// If closing cannot immediately complete, this function returns
        /// `Poll::Pending` and arranges for the current task (via
        /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
        /// progress towards closing.
        ///
        /// # Implementation
        ///
        /// This function may not return errors of kind `WouldBlock` or
        /// `Interrupted`.  Implementations must convert `WouldBlock` into
        /// `Poll::Pending` and either internally retry or convert
        /// `Interrupted` into another error kind.
        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
    }

    /// Seek bytes asynchronously.
    ///
    /// This trait is analogous to the `std::io::Seek` trait, but integrates
    /// with the asynchronous task system. In particular, the `poll_seek`
    /// method, unlike `Seek::seek`, will automatically queue the current task
    /// for wakeup and return if data is not yet available, rather than blocking
    /// the calling thread.
    pub trait AsyncSeek {
        /// Attempt to seek to an offset, in bytes, in a stream.
        ///
        /// A seek beyond the end of a stream is allowed, but behavior is defined
        /// by the implementation.
        ///
        /// If the seek operation completed successfully,
        /// this method returns the new position from the start of the stream.
        /// That position can be used later with [`SeekFrom::Start`].
        ///
        /// # Errors
        ///
        /// Seeking to a negative offset is considered an error.
        ///
        /// # Implementation
        ///
        /// This function may not return errors of kind `WouldBlock` or
        /// `Interrupted`.  Implementations must convert `WouldBlock` into
        /// `Poll::Pending` and either internally retry or convert
        /// `Interrupted` into another error kind.
        fn poll_seek(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            pos: SeekFrom,
        ) -> Poll<Result<u64>>;
    }

    /// Read bytes asynchronously through an internal buffer.
    ///
    /// This trait is analogous to the `std::io::BufRead` trait, but integrates
    /// with the asynchronous task system. In particular, the `poll_fill_buf`
    /// method, unlike `BufRead::fill_buf`, will automatically queue the current task
    /// for wakeup and return if data is not yet available, rather than blocking
    /// the calling thread.
    pub trait AsyncBufRead: AsyncRead {
        /// Attempt to return the contents of the internal buffer, filling it with more data
        /// from the inner reader if it is empty.
        ///
        /// On success, returns `Poll::Ready(Ok(buf))`.
        ///
        /// If no data is available for reading, the method returns
        /// `Poll::Pending` and arranges for the current task (via
        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
        /// readable or is closed.
        ///
        /// This function is a lower-level call. It needs to be paired with the
        /// [`consume`] method to function properly. When calling this
        /// method, none of the contents will be "read" in the sense that later
        /// calling [`poll_read`] may return the same contents. As such, [`consume`] must
        /// be called with the number of bytes that are consumed from this buffer to
        /// ensure that the bytes are never returned twice.
        ///
        /// [`poll_read`]: AsyncRead::poll_read
        /// [`consume`]: AsyncBufRead::consume
        ///
        /// An empty buffer returned indicates that the stream has reached EOF.
        ///
        /// # Implementation
        ///
        /// This function may not return errors of kind `WouldBlock` or
        /// `Interrupted`.  Implementations must convert `WouldBlock` into
        /// `Poll::Pending` and either internally retry or convert
        /// `Interrupted` into another error kind.
        fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>>;

        /// Tells this buffer that `amt` bytes have been consumed from the buffer,
        /// so they should no longer be returned in calls to [`poll_read`].
        ///
        /// This function is a lower-level call. It needs to be paired with the
        /// [`poll_fill_buf`] method to function properly. This function does
        /// not perform any I/O, it simply informs this object that some amount of
        /// its buffer, returned from [`poll_fill_buf`], has been consumed and should
        /// no longer be returned. As such, this function may do odd things if
        /// [`poll_fill_buf`] isn't called before calling it.
        ///
        /// The `amt` must be `<=` the number of bytes in the buffer returned by
        /// [`poll_fill_buf`].
        ///
        /// [`poll_read`]: AsyncRead::poll_read
        /// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf
        fn consume(self: Pin<&mut Self>, amt: usize);
    }

    // Expands to the `AsyncRead` methods for a pointer type (`Box<T>`, `&mut T`)
    // by forwarding to the pointee. `**self` goes through the `Pin` and then the
    // pointer; the `T: Unpin` bound on the impls is what allows re-pinning the
    // pointee with `Pin::new`.
    macro_rules! deref_async_read {
        () => {
            fn poll_read(
                mut self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                buf: &mut [u8],
            ) -> Poll<Result<usize>> {
                Pin::new(&mut **self).poll_read(cx, buf)
            }

            fn poll_read_vectored(
                mut self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                bufs: &mut [IoSliceMut<'_>],
            ) -> Poll<Result<usize>> {
                Pin::new(&mut **self).poll_read_vectored(cx, bufs)
            }
        };
    }

    impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for Box<T> {
        deref_async_read!();
    }

    impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for &mut T {
        deref_async_read!();
    }

    // `P: Unpin` makes `Pin<P>` itself `Unpin`, so `get_mut` yields `&mut Pin<P>`;
    // `Pin::as_mut` then produces the `Pin<&mut P::Target>` the inner call needs.
    impl<P> AsyncRead for Pin<P>
    where
        P: DerefMut + Unpin,
        P::Target: AsyncRead,
    {
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<Result<usize>> {
            self.get_mut().as_mut().poll_read(cx, buf)
        }

        fn poll_read_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &mut [IoSliceMut<'_>],
        ) -> Poll<Result<usize>> {
            self.get_mut().as_mut().poll_read_vectored(cx, bufs)
        }
    }

    // Expands to `AsyncRead` methods that call the corresponding `std::io::Read`
    // method directly and wrap its result in `Poll::Ready`; the task context is
    // ignored.
    macro_rules! delegate_async_read_to_stdio {
        () => {
            fn poll_read(
                mut self: Pin<&mut Self>,
                _: &mut Context<'_>,
                buf: &mut [u8],
            ) -> Poll<Result<usize>> {
                Poll::Ready(io::Read::read(&mut *self, buf))
            }

            fn poll_read_vectored(
                mut self: Pin<&mut Self>,
                _: &mut Context<'_>,
                bufs: &mut [IoSliceMut<'_>],
            ) -> Poll<Result<usize>> {
                Poll::Ready(io::Read::read_vectored(&mut *self, bufs))
            }
        };
    }

    impl AsyncRead for &[u8] {
        delegate_async_read_to_stdio!();
    }

    // Expands to the `AsyncWrite` methods for a pointer type (`Box<T>`, `&mut T`)
    // by forwarding to the pointee; see `deref_async_read` for the re-pinning.
    macro_rules! deref_async_write {
        () => {
            fn poll_write(
                mut self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                buf: &[u8],
            ) -> Poll<Result<usize>> {
                Pin::new(&mut **self).poll_write(cx, buf)
            }

            fn poll_write_vectored(
                mut self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                bufs: &[IoSlice<'_>],
            ) -> Poll<Result<usize>> {
                Pin::new(&mut **self).poll_write_vectored(cx, bufs)
            }

            fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
                Pin::new(&mut **self).poll_flush(cx)
            }

            fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
                Pin::new(&mut **self).poll_close(cx)
            }
        };
    }

    impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for Box<T> {
        deref_async_write!();
    }

    impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for &mut T {
        deref_async_write!();
    }

    // Forwards through the pinned pointer: `get_mut` (available because
    // `P: Unpin`) followed by `Pin::as_mut` gives `Pin<&mut P::Target>`.
    impl<P> AsyncWrite for Pin<P>
    where
        P: DerefMut + Unpin,
        P::Target: AsyncWrite,
    {
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<Result<usize>> {
            self.get_mut().as_mut().poll_write(cx, buf)
        }

        fn poll_write_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &[IoSlice<'_>],
        ) -> Poll<Result<usize>> {
            self.get_mut().as_mut().poll_write_vectored(cx, bufs)
        }

        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
            self.get_mut().as_mut().poll_flush(cx)
        }

        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
            self.get_mut().as_mut().poll_close(cx)
        }
    }

    // Expands to `AsyncWrite` methods that call the corresponding
    // `std::io::Write` method directly and wrap its result in `Poll::Ready`;
    // the task context is ignored. Closing is implemented as a flush.
    macro_rules! delegate_async_write_to_stdio {
        () => {
            fn poll_write(
                mut self: Pin<&mut Self>,
                _: &mut Context<'_>,
                buf: &[u8],
            ) -> Poll<Result<usize>> {
                Poll::Ready(io::Write::write(&mut *self, buf))
            }

            fn poll_write_vectored(
                mut self: Pin<&mut Self>,
                _: &mut Context<'_>,
                bufs: &[IoSlice<'_>],
            ) -> Poll<Result<usize>> {
                Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
            }

            fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
                Poll::Ready(io::Write::flush(&mut *self))
            }

            fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
                self.poll_flush(cx)
            }
        };
    }

    impl AsyncWrite for Vec<u8> {
        delegate_async_write_to_stdio!();
    }

    // Expands to the `AsyncSeek` method for a pointer type (`Box<T>`, `&mut T`)
    // by forwarding to the pointee; see `deref_async_read` for the re-pinning.
    macro_rules! deref_async_seek {
        () => {
            fn poll_seek(
                mut self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                pos: SeekFrom,
            ) -> Poll<Result<u64>> {
                Pin::new(&mut **self).poll_seek(cx, pos)
            }
        };
    }

    impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for Box<T> {
        deref_async_seek!();
    }

    impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for &mut T {
        deref_async_seek!();
    }

    // Forwards through the pinned pointer: `get_mut` (available because
    // `P: Unpin`) followed by `Pin::as_mut` gives `Pin<&mut P::Target>`.
    impl<P> AsyncSeek for Pin<P>
    where
        P: DerefMut + Unpin,
        P::Target: AsyncSeek,
    {
        fn poll_seek(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            pos: SeekFrom,
        ) -> Poll<Result<u64>> {
            self.get_mut().as_mut().poll_seek(cx, pos)
        }
    }

    // Expands to the `AsyncBufRead` methods for a pointer type (`Box<T>`,
    // `&mut T`) by forwarding to the pointee. `poll_fill_buf` consumes the pin
    // with `get_mut` (instead of reborrowing it) so that the returned slice can
    // carry the lifetime of the original `self` borrow.
    macro_rules! deref_async_buf_read {
        () => {
            fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
                Pin::new(&mut **self.get_mut()).poll_fill_buf(cx)
            }

            fn consume(mut self: Pin<&mut Self>, amt: usize) {
                Pin::new(&mut **self).consume(amt)
            }
        };
    }

    impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for Box<T> {
        deref_async_buf_read!();
    }

    impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for &mut T {
        deref_async_buf_read!();
    }

    // Forwards through the pinned pointer: `get_mut` (available because
    // `P: Unpin`) followed by `Pin::as_mut` gives `Pin<&mut P::Target>`.
    impl<P> AsyncBufRead for Pin<P>
    where
        P: DerefMut + Unpin,
        P::Target: AsyncBufRead,
    {
        fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
            self.get_mut().as_mut().poll_fill_buf(cx)
        }

        fn consume(self: Pin<&mut Self>, amt: usize) {
            self.get_mut().as_mut().consume(amt)
        }
    }

    // Expands to `AsyncBufRead` methods that call the corresponding
    // `std::io::BufRead` method directly; `poll_fill_buf` wraps the result in
    // `Poll::Ready` and ignores the task context.
    macro_rules! delegate_async_buf_read_to_stdio {
        () => {
            fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
                Poll::Ready(io::BufRead::fill_buf(self.get_mut()))
            }

            fn consume(self: Pin<&mut Self>, amt: usize) {
                io::BufRead::consume(self.get_mut(), amt)
            }
        };
    }

    impl AsyncBufRead for &[u8] {
        delegate_async_buf_read_to_stdio!();
    }
}

#[cfg(feature = "std")]
pub use self::if_std::*;