• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Asynchronous I/O
2 //!
3 //! This crate contains the `AsyncRead`, `AsyncWrite`, `AsyncSeek`, and
4 //! `AsyncBufRead` traits, the asynchronous analogs to
5 //! `std::io::{Read, Write, Seek, BufRead}`. The primary difference is
6 //! that these traits integrate with the asynchronous task system.
7 //!
8 //! All items of this library are only available when the `std` feature of this
9 //! library is activated, and it is activated by default.
10 
11 #![cfg_attr(not(feature = "std"), no_std)]
12 #![warn(missing_debug_implementations, missing_docs, rust_2018_idioms, unreachable_pub)]
// It cannot be included in the published code because this lint has false positives in the minimum required version.
14 #![cfg_attr(test, warn(single_use_lifetimes))]
15 #![doc(test(
16     no_crate_inject,
17     attr(
18         deny(warnings, rust_2018_idioms, single_use_lifetimes),
19         allow(dead_code, unused_assignments, unused_variables)
20     )
21 ))]
22 #![cfg_attr(docsrs, feature(doc_cfg))]
23 
24 #[cfg(feature = "std")]
25 mod if_std {
26     use std::io;
27     use std::ops::DerefMut;
28     use std::pin::Pin;
29     use std::task::{Context, Poll};
30 
31     // Re-export some types from `std::io` so that users don't have to deal
32     // with conflicts when `use`ing `futures::io` and `std::io`.
33     #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
34     #[doc(no_inline)]
35     pub use io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
36 
37     /// Read bytes asynchronously.
38     ///
39     /// This trait is analogous to the `std::io::Read` trait, but integrates
40     /// with the asynchronous task system. In particular, the `poll_read`
41     /// method, unlike `Read::read`, will automatically queue the current task
42     /// for wakeup and return if data is not yet available, rather than blocking
43     /// the calling thread.
44     pub trait AsyncRead {
45         /// Attempt to read from the `AsyncRead` into `buf`.
46         ///
47         /// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
48         ///
49         /// If no data is available for reading, the method returns
50         /// `Poll::Pending` and arranges for the current task (via
51         /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
52         /// readable or is closed.
53         ///
54         /// # Implementation
55         ///
56         /// This function may not return errors of kind `WouldBlock` or
57         /// `Interrupted`.  Implementations must convert `WouldBlock` into
58         /// `Poll::Pending` and either internally retry or convert
59         /// `Interrupted` into another error kind.
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize>>60         fn poll_read(
61             self: Pin<&mut Self>,
62             cx: &mut Context<'_>,
63             buf: &mut [u8],
64         ) -> Poll<Result<usize>>;
65 
66         /// Attempt to read from the `AsyncRead` into `bufs` using vectored
67         /// IO operations.
68         ///
69         /// This method is similar to `poll_read`, but allows data to be read
70         /// into multiple buffers using a single operation.
71         ///
72         /// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
73         ///
74         /// If no data is available for reading, the method returns
75         /// `Poll::Pending` and arranges for the current task (via
76         /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
77         /// readable or is closed.
78         /// By default, this method delegates to using `poll_read` on the first
79         /// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
80         /// support vectored IO should override this method.
81         ///
82         /// # Implementation
83         ///
84         /// This function may not return errors of kind `WouldBlock` or
85         /// `Interrupted`.  Implementations must convert `WouldBlock` into
86         /// `Poll::Pending` and either internally retry or convert
87         /// `Interrupted` into another error kind.
poll_read_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<Result<usize>>88         fn poll_read_vectored(
89             self: Pin<&mut Self>,
90             cx: &mut Context<'_>,
91             bufs: &mut [IoSliceMut<'_>],
92         ) -> Poll<Result<usize>> {
93             for b in bufs {
94                 if !b.is_empty() {
95                     return self.poll_read(cx, b);
96                 }
97             }
98 
99             self.poll_read(cx, &mut [])
100         }
101     }
102 
103     /// Write bytes asynchronously.
104     ///
105     /// This trait is analogous to the `std::io::Write` trait, but integrates
106     /// with the asynchronous task system. In particular, the `poll_write`
107     /// method, unlike `Write::write`, will automatically queue the current task
108     /// for wakeup and return if the writer cannot take more data, rather than blocking
109     /// the calling thread.
110     pub trait AsyncWrite {
111         /// Attempt to write bytes from `buf` into the object.
112         ///
113         /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
114         ///
115         /// If the object is not ready for writing, the method returns
116         /// `Poll::Pending` and arranges for the current task (via
117         /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
118         /// writable or is closed.
119         ///
120         /// # Implementation
121         ///
122         /// This function may not return errors of kind `WouldBlock` or
123         /// `Interrupted`.  Implementations must convert `WouldBlock` into
124         /// `Poll::Pending` and either internally retry or convert
125         /// `Interrupted` into another error kind.
126         ///
127         /// `poll_write` must try to make progress by flushing the underlying object if
128         /// that is the only way the underlying object can become writable again.
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize>>129         fn poll_write(
130             self: Pin<&mut Self>,
131             cx: &mut Context<'_>,
132             buf: &[u8],
133         ) -> Poll<Result<usize>>;
134 
135         /// Attempt to write bytes from `bufs` into the object using vectored
136         /// IO operations.
137         ///
138         /// This method is similar to `poll_write`, but allows data from multiple buffers to be written
139         /// using a single operation.
140         ///
141         /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
142         ///
143         /// If the object is not ready for writing, the method returns
144         /// `Poll::Pending` and arranges for the current task (via
145         /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
146         /// writable or is closed.
147         ///
148         /// By default, this method delegates to using `poll_write` on the first
149         /// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
150         /// support vectored IO should override this method.
151         ///
152         /// # Implementation
153         ///
154         /// This function may not return errors of kind `WouldBlock` or
155         /// `Interrupted`.  Implementations must convert `WouldBlock` into
156         /// `Poll::Pending` and either internally retry or convert
157         /// `Interrupted` into another error kind.
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<Result<usize>>158         fn poll_write_vectored(
159             self: Pin<&mut Self>,
160             cx: &mut Context<'_>,
161             bufs: &[IoSlice<'_>],
162         ) -> Poll<Result<usize>> {
163             for b in bufs {
164                 if !b.is_empty() {
165                     return self.poll_write(cx, b);
166                 }
167             }
168 
169             self.poll_write(cx, &[])
170         }
171 
172         /// Attempt to flush the object, ensuring that any buffered data reach
173         /// their destination.
174         ///
175         /// On success, returns `Poll::Ready(Ok(()))`.
176         ///
177         /// If flushing cannot immediately complete, this method returns
178         /// `Poll::Pending` and arranges for the current task (via
179         /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
180         /// progress towards flushing.
181         ///
182         /// # Implementation
183         ///
184         /// This function may not return errors of kind `WouldBlock` or
185         /// `Interrupted`.  Implementations must convert `WouldBlock` into
186         /// `Poll::Pending` and either internally retry or convert
187         /// `Interrupted` into another error kind.
188         ///
189         /// It only makes sense to do anything here if you actually buffer data.
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>190         fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
191 
192         /// Attempt to close the object.
193         ///
194         /// On success, returns `Poll::Ready(Ok(()))`.
195         ///
196         /// If closing cannot immediately complete, this function returns
197         /// `Poll::Pending` and arranges for the current task (via
198         /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
199         /// progress towards closing.
200         ///
201         /// # Implementation
202         ///
203         /// This function may not return errors of kind `WouldBlock` or
204         /// `Interrupted`.  Implementations must convert `WouldBlock` into
205         /// `Poll::Pending` and either internally retry or convert
206         /// `Interrupted` into another error kind.
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>207         fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
208     }
209 
210     /// Seek bytes asynchronously.
211     ///
212     /// This trait is analogous to the `std::io::Seek` trait, but integrates
213     /// with the asynchronous task system. In particular, the `poll_seek`
214     /// method, unlike `Seek::seek`, will automatically queue the current task
215     /// for wakeup and return if data is not yet available, rather than blocking
216     /// the calling thread.
217     pub trait AsyncSeek {
218         /// Attempt to seek to an offset, in bytes, in a stream.
219         ///
220         /// A seek beyond the end of a stream is allowed, but behavior is defined
221         /// by the implementation.
222         ///
223         /// If the seek operation completed successfully,
224         /// this method returns the new position from the start of the stream.
225         /// That position can be used later with [`SeekFrom::Start`].
226         ///
227         /// # Errors
228         ///
229         /// Seeking to a negative offset is considered an error.
230         ///
231         /// # Implementation
232         ///
233         /// This function may not return errors of kind `WouldBlock` or
234         /// `Interrupted`.  Implementations must convert `WouldBlock` into
235         /// `Poll::Pending` and either internally retry or convert
236         /// `Interrupted` into another error kind.
poll_seek( self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll<Result<u64>>237         fn poll_seek(
238             self: Pin<&mut Self>,
239             cx: &mut Context<'_>,
240             pos: SeekFrom,
241         ) -> Poll<Result<u64>>;
242     }
243 
244     /// Read bytes asynchronously.
245     ///
246     /// This trait is analogous to the `std::io::BufRead` trait, but integrates
247     /// with the asynchronous task system. In particular, the `poll_fill_buf`
248     /// method, unlike `BufRead::fill_buf`, will automatically queue the current task
249     /// for wakeup and return if data is not yet available, rather than blocking
250     /// the calling thread.
251     pub trait AsyncBufRead: AsyncRead {
252         /// Attempt to return the contents of the internal buffer, filling it with more data
253         /// from the inner reader if it is empty.
254         ///
255         /// On success, returns `Poll::Ready(Ok(buf))`.
256         ///
257         /// If no data is available for reading, the method returns
258         /// `Poll::Pending` and arranges for the current task (via
259         /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
260         /// readable or is closed.
261         ///
262         /// This function is a lower-level call. It needs to be paired with the
263         /// [`consume`] method to function properly. When calling this
264         /// method, none of the contents will be "read" in the sense that later
265         /// calling [`poll_read`] may return the same contents. As such, [`consume`] must
266         /// be called with the number of bytes that are consumed from this buffer to
267         /// ensure that the bytes are never returned twice.
268         ///
269         /// [`poll_read`]: AsyncRead::poll_read
270         /// [`consume`]: AsyncBufRead::consume
271         ///
272         /// An empty buffer returned indicates that the stream has reached EOF.
273         ///
274         /// # Implementation
275         ///
276         /// This function may not return errors of kind `WouldBlock` or
277         /// `Interrupted`.  Implementations must convert `WouldBlock` into
278         /// `Poll::Pending` and either internally retry or convert
279         /// `Interrupted` into another error kind.
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>>280         fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>>;
281 
282         /// Tells this buffer that `amt` bytes have been consumed from the buffer,
283         /// so they should no longer be returned in calls to [`poll_read`].
284         ///
285         /// This function is a lower-level call. It needs to be paired with the
286         /// [`poll_fill_buf`] method to function properly. This function does
287         /// not perform any I/O, it simply informs this object that some amount of
288         /// its buffer, returned from [`poll_fill_buf`], has been consumed and should
289         /// no longer be returned. As such, this function may do odd things if
290         /// [`poll_fill_buf`] isn't called before calling it.
291         ///
292         /// The `amt` must be `<=` the number of bytes in the buffer returned by
293         /// [`poll_fill_buf`].
294         ///
295         /// [`poll_read`]: AsyncRead::poll_read
296         /// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf
consume(self: Pin<&mut Self>, amt: usize)297         fn consume(self: Pin<&mut Self>, amt: usize);
298     }
299 
300     macro_rules! deref_async_read {
301         () => {
302             fn poll_read(
303                 mut self: Pin<&mut Self>,
304                 cx: &mut Context<'_>,
305                 buf: &mut [u8],
306             ) -> Poll<Result<usize>> {
307                 Pin::new(&mut **self).poll_read(cx, buf)
308             }
309 
310             fn poll_read_vectored(
311                 mut self: Pin<&mut Self>,
312                 cx: &mut Context<'_>,
313                 bufs: &mut [IoSliceMut<'_>],
314             ) -> Poll<Result<usize>> {
315                 Pin::new(&mut **self).poll_read_vectored(cx, bufs)
316             }
317         };
318     }
319 
320     impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for Box<T> {
321         deref_async_read!();
322     }
323 
324     impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for &mut T {
325         deref_async_read!();
326     }
327 
328     impl<P> AsyncRead for Pin<P>
329     where
330         P: DerefMut + Unpin,
331         P::Target: AsyncRead,
332     {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize>>333         fn poll_read(
334             self: Pin<&mut Self>,
335             cx: &mut Context<'_>,
336             buf: &mut [u8],
337         ) -> Poll<Result<usize>> {
338             self.get_mut().as_mut().poll_read(cx, buf)
339         }
340 
poll_read_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<Result<usize>>341         fn poll_read_vectored(
342             self: Pin<&mut Self>,
343             cx: &mut Context<'_>,
344             bufs: &mut [IoSliceMut<'_>],
345         ) -> Poll<Result<usize>> {
346             self.get_mut().as_mut().poll_read_vectored(cx, bufs)
347         }
348     }
349 
350     macro_rules! delegate_async_read_to_stdio {
351         () => {
352             fn poll_read(
353                 mut self: Pin<&mut Self>,
354                 _: &mut Context<'_>,
355                 buf: &mut [u8],
356             ) -> Poll<Result<usize>> {
357                 Poll::Ready(io::Read::read(&mut *self, buf))
358             }
359 
360             fn poll_read_vectored(
361                 mut self: Pin<&mut Self>,
362                 _: &mut Context<'_>,
363                 bufs: &mut [IoSliceMut<'_>],
364             ) -> Poll<Result<usize>> {
365                 Poll::Ready(io::Read::read_vectored(&mut *self, bufs))
366             }
367         };
368     }
369 
370     impl AsyncRead for &[u8] {
371         delegate_async_read_to_stdio!();
372     }
373 
374     macro_rules! deref_async_write {
375         () => {
376             fn poll_write(
377                 mut self: Pin<&mut Self>,
378                 cx: &mut Context<'_>,
379                 buf: &[u8],
380             ) -> Poll<Result<usize>> {
381                 Pin::new(&mut **self).poll_write(cx, buf)
382             }
383 
384             fn poll_write_vectored(
385                 mut self: Pin<&mut Self>,
386                 cx: &mut Context<'_>,
387                 bufs: &[IoSlice<'_>],
388             ) -> Poll<Result<usize>> {
389                 Pin::new(&mut **self).poll_write_vectored(cx, bufs)
390             }
391 
392             fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
393                 Pin::new(&mut **self).poll_flush(cx)
394             }
395 
396             fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
397                 Pin::new(&mut **self).poll_close(cx)
398             }
399         };
400     }
401 
402     impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for Box<T> {
403         deref_async_write!();
404     }
405 
406     impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for &mut T {
407         deref_async_write!();
408     }
409 
410     impl<P> AsyncWrite for Pin<P>
411     where
412         P: DerefMut + Unpin,
413         P::Target: AsyncWrite,
414     {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize>>415         fn poll_write(
416             self: Pin<&mut Self>,
417             cx: &mut Context<'_>,
418             buf: &[u8],
419         ) -> Poll<Result<usize>> {
420             self.get_mut().as_mut().poll_write(cx, buf)
421         }
422 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<Result<usize>>423         fn poll_write_vectored(
424             self: Pin<&mut Self>,
425             cx: &mut Context<'_>,
426             bufs: &[IoSlice<'_>],
427         ) -> Poll<Result<usize>> {
428             self.get_mut().as_mut().poll_write_vectored(cx, bufs)
429         }
430 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>431         fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
432             self.get_mut().as_mut().poll_flush(cx)
433         }
434 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>435         fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
436             self.get_mut().as_mut().poll_close(cx)
437         }
438     }
439 
440     macro_rules! delegate_async_write_to_stdio {
441         () => {
442             fn poll_write(
443                 mut self: Pin<&mut Self>,
444                 _: &mut Context<'_>,
445                 buf: &[u8],
446             ) -> Poll<Result<usize>> {
447                 Poll::Ready(io::Write::write(&mut *self, buf))
448             }
449 
450             fn poll_write_vectored(
451                 mut self: Pin<&mut Self>,
452                 _: &mut Context<'_>,
453                 bufs: &[IoSlice<'_>],
454             ) -> Poll<Result<usize>> {
455                 Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
456             }
457 
458             fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
459                 Poll::Ready(io::Write::flush(&mut *self))
460             }
461 
462             fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
463                 self.poll_flush(cx)
464             }
465         };
466     }
467 
468     impl AsyncWrite for Vec<u8> {
469         delegate_async_write_to_stdio!();
470     }
471 
472     macro_rules! deref_async_seek {
473         () => {
474             fn poll_seek(
475                 mut self: Pin<&mut Self>,
476                 cx: &mut Context<'_>,
477                 pos: SeekFrom,
478             ) -> Poll<Result<u64>> {
479                 Pin::new(&mut **self).poll_seek(cx, pos)
480             }
481         };
482     }
483 
484     impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for Box<T> {
485         deref_async_seek!();
486     }
487 
488     impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for &mut T {
489         deref_async_seek!();
490     }
491 
492     impl<P> AsyncSeek for Pin<P>
493     where
494         P: DerefMut + Unpin,
495         P::Target: AsyncSeek,
496     {
poll_seek( self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll<Result<u64>>497         fn poll_seek(
498             self: Pin<&mut Self>,
499             cx: &mut Context<'_>,
500             pos: SeekFrom,
501         ) -> Poll<Result<u64>> {
502             self.get_mut().as_mut().poll_seek(cx, pos)
503         }
504     }
505 
506     macro_rules! deref_async_buf_read {
507         () => {
508             fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
509                 Pin::new(&mut **self.get_mut()).poll_fill_buf(cx)
510             }
511 
512             fn consume(mut self: Pin<&mut Self>, amt: usize) {
513                 Pin::new(&mut **self).consume(amt)
514             }
515         };
516     }
517 
518     impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for Box<T> {
519         deref_async_buf_read!();
520     }
521 
522     impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for &mut T {
523         deref_async_buf_read!();
524     }
525 
526     impl<P> AsyncBufRead for Pin<P>
527     where
528         P: DerefMut + Unpin,
529         P::Target: AsyncBufRead,
530     {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>>531         fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
532             self.get_mut().as_mut().poll_fill_buf(cx)
533         }
534 
consume(self: Pin<&mut Self>, amt: usize)535         fn consume(self: Pin<&mut Self>, amt: usize) {
536             self.get_mut().as_mut().consume(amt)
537         }
538     }
539 
540     macro_rules! delegate_async_buf_read_to_stdio {
541         () => {
542             fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
543                 Poll::Ready(io::BufRead::fill_buf(self.get_mut()))
544             }
545 
546             fn consume(self: Pin<&mut Self>, amt: usize) {
547                 io::BufRead::consume(self.get_mut(), amt)
548             }
549         };
550     }
551 
552     impl AsyncBufRead for &[u8] {
553         delegate_async_buf_read_to_stdio!();
554     }
555 }
556 
557 #[cfg(feature = "std")]
558 pub use self::if_std::*;
559