• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![allow(clippy::needless_doctest_main)]
2 #![warn(
3     missing_debug_implementations,
4     missing_docs,
5     rust_2018_idioms,
6     unreachable_pub
7 )]
8 #![doc(test(
9     no_crate_inject,
10     attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
11 ))]
12 #![cfg_attr(docsrs, feature(doc_cfg))]
13 
14 //! Utilities for working with Tokio.
15 //!
16 //! This crate is not versioned in lockstep with the core
17 //! [`tokio`] crate. However, `tokio-util` _will_ respect Rust's
18 //! semantic versioning policy, especially with regard to breaking changes.
19 //!
20 //! [`tokio`]: https://docs.rs/tokio
21 
22 #[macro_use]
23 mod cfg;
24 
25 mod loom;
26 
27 cfg_codec! {
28     pub mod codec;
29 }
30 
31 cfg_net! {
32     #[cfg(not(target_arch = "wasm32"))]
33     pub mod udp;
34     pub mod net;
35 }
36 
37 cfg_compat! {
38     pub mod compat;
39 }
40 
41 cfg_io! {
42     pub mod io;
43 }
44 
45 cfg_rt! {
46     pub mod context;
47     pub mod task;
48 }
49 
50 cfg_time! {
51     pub mod time;
52 }
53 
54 pub mod sync;
55 
56 pub mod either;
57 
#[cfg(any(feature = "io", feature = "codec"))]
mod util {
    use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

    use bytes::{Buf, BufMut};
    use futures_core::ready;
    use std::io::{self, IoSlice};
    use std::mem::MaybeUninit;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// Try to read data from an `AsyncRead` into an implementer of the [`BufMut`] trait.
    ///
    /// At most one `poll_read` call is issued per invocation, targeting the
    /// chunk currently returned by `buf.chunk_mut()`. On success the buffer's
    /// write cursor is advanced by the number of bytes that were filled, and
    /// that count is returned.
    ///
    /// `T` may be unsized, so a `Pin<&mut dyn AsyncRead>` is accepted as well.
    ///
    /// # Return value
    ///
    /// * `Poll::Ready(Ok(0))` if `buf` has no remaining capacity (the reader is
    ///   not polled in that case) or if the reader filled zero bytes.
    /// * `Poll::Ready(Ok(n))` if `n` bytes were read into `buf`.
    /// * `Poll::Ready(Err(e))` if the reader returned an error.
    /// * `Poll::Pending` if the reader is not ready yet.
    ///
    /// # Panics
    ///
    /// Panics if, after `poll_read` returns, the filled region of the
    /// [`ReadBuf`] handed to the reader no longer starts at the address of the
    /// chunk obtained from `buf`.
    ///
    /// [`BufMut`]: bytes::BufMut
    /// [`ReadBuf`]: tokio::io::ReadBuf
    ///
    /// # Examples
    ///
    /// ```
    /// use bytes::{Bytes, BytesMut};
    /// use tokio_stream as stream;
    /// use tokio::io::Result;
    /// use tokio_util::io::{StreamReader, poll_read_buf};
    /// use futures::future::poll_fn;
    /// use std::pin::Pin;
    /// # #[tokio::main]
    /// # async fn main() -> std::io::Result<()> {
    ///
    /// // Create a reader from an iterator. This particular reader will always be
    /// // ready.
    /// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))]));
    ///
    /// let mut buf = BytesMut::new();
    /// let mut reads = 0;
    ///
    /// loop {
    ///     reads += 1;
    ///     let n = poll_fn(|cx| poll_read_buf(Pin::new(&mut read), cx, &mut buf)).await?;
    ///
    ///     if n == 0 {
    ///         break;
    ///     }
    /// }
    ///
    /// // one or more reads might be necessary.
    /// assert!(reads >= 1);
    /// assert_eq!(&buf[..], &[0, 1, 2, 3]);
    /// # Ok(())
    /// # }
    /// ```
    #[cfg_attr(not(feature = "io"), allow(unreachable_pub))]
    pub fn poll_read_buf<T: AsyncRead + ?Sized, B: BufMut>(
        io: Pin<&mut T>,
        cx: &mut Context<'_>,
        buf: &mut B,
    ) -> Poll<io::Result<usize>> {
        // A full buffer cannot accept any data: report zero bytes without
        // polling the reader at all.
        if !buf.has_remaining_mut() {
            return Poll::Ready(Ok(0));
        }

        let n = {
            let dst = buf.chunk_mut();

            // SAFETY: `chunk_mut()` returns a `&mut UninitSlice`, and `UninitSlice` is a
            // transparent wrapper around `[MaybeUninit<u8>]`.
            let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
            let mut buf = ReadBuf::uninit(dst);
            // Remember where the filled region starts so we can detect a reader
            // that replaced the `ReadBuf` with one backed by different memory.
            let ptr = buf.filled().as_ptr();
            ready!(io.poll_read(cx, &mut buf)?);

            // Ensure the pointer does not change from under us: `advance_mut`
            // below is only sound if the bytes were written into `dst`.
            assert_eq!(ptr, buf.filled().as_ptr());
            buf.filled().len()
        };

        // SAFETY: This is guaranteed to be the number of initialized (and read)
        // bytes due to the invariants provided by `ReadBuf::filled`.
        unsafe {
            buf.advance_mut(n);
        }

        Poll::Ready(Ok(n))
    }

    /// Try to write data from an implementer of the [`Buf`] trait to an
    /// [`AsyncWrite`], advancing the buffer's internal cursor.
    ///
    /// This function will use [vectored writes] when the [`AsyncWrite`] supports
    /// vectored writes (at most 64 slices are gathered per call); otherwise only
    /// the buffer's current chunk is written.
    ///
    /// `T` may be unsized, so a `Pin<&mut dyn AsyncWrite>` is accepted as well.
    ///
    /// # Return value
    ///
    /// * `Poll::Ready(Ok(0))` if `buf` has no remaining data (the writer is not
    ///   polled in that case) or if the writer accepted zero bytes.
    /// * `Poll::Ready(Ok(n))` if `n` bytes were written; `buf` has been advanced
    ///   by `n`.
    /// * `Poll::Ready(Err(e))` if the writer returned an error.
    /// * `Poll::Pending` if the writer is not ready yet.
    ///
    /// # Examples
    ///
    /// [`File`] implements [`AsyncWrite`] and [`Cursor`]`<&[u8]>` implements
    /// [`Buf`]:
    ///
    /// ```no_run
    /// use tokio_util::io::poll_write_buf;
    /// use tokio::io;
    /// use tokio::fs::File;
    ///
    /// use bytes::Buf;
    /// use std::io::Cursor;
    /// use std::pin::Pin;
    /// use futures::future::poll_fn;
    ///
    /// #[tokio::main]
    /// async fn main() -> io::Result<()> {
    ///     let mut file = File::create("foo.txt").await?;
    ///     let mut buf = Cursor::new(b"data to write");
    ///
    ///     // Loop until the entire contents of the buffer are written to
    ///     // the file.
    ///     while buf.has_remaining() {
    ///         poll_fn(|cx| poll_write_buf(Pin::new(&mut file), cx, &mut buf)).await?;
    ///     }
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// [`Buf`]: bytes::Buf
    /// [`AsyncWrite`]: tokio::io::AsyncWrite
    /// [`File`]: tokio::fs::File
    /// [`Cursor`]: std::io::Cursor
    /// [vectored writes]: tokio::io::AsyncWrite::poll_write_vectored
    #[cfg_attr(not(feature = "io"), allow(unreachable_pub))]
    pub fn poll_write_buf<T: AsyncWrite + ?Sized, B: Buf>(
        io: Pin<&mut T>,
        cx: &mut Context<'_>,
        buf: &mut B,
    ) -> Poll<io::Result<usize>> {
        // Upper bound on the number of chunks handed to a single vectored write.
        const MAX_BUFS: usize = 64;

        // Nothing to write: report zero bytes without polling the writer.
        if !buf.has_remaining() {
            return Poll::Ready(Ok(0));
        }

        let n = if io.is_write_vectored() {
            // Gather as many of the buffer's chunks as fit and submit them in
            // one call; only the first `cnt` slices were populated.
            let mut slices = [IoSlice::new(&[]); MAX_BUFS];
            let cnt = buf.chunks_vectored(&mut slices);
            ready!(io.poll_write_vectored(cx, &slices[..cnt]))?
        } else {
            ready!(io.poll_write(cx, buf.chunk()))?
        };

        // Consume exactly the bytes the writer accepted.
        buf.advance(n);

        Poll::Ready(Ok(n))
    }
}
206