1 #![allow(clippy::needless_doctest_main)] 2 #![warn( 3 missing_debug_implementations, 4 missing_docs, 5 rust_2018_idioms, 6 unreachable_pub 7 )] 8 #![doc(test( 9 no_crate_inject, 10 attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) 11 ))] 12 #![cfg_attr(docsrs, feature(doc_cfg))] 13 14 //! Utilities for working with Tokio. 15 //! 16 //! This crate is not versioned in lockstep with the core 17 //! [`tokio`] crate. However, `tokio-util` _will_ respect Rust's 18 //! semantic versioning policy, especially with regard to breaking changes. 19 //! 20 //! [`tokio`]: https://docs.rs/tokio 21 22 #[macro_use] 23 mod cfg; 24 25 mod loom; 26 27 cfg_codec! { 28 pub mod codec; 29 } 30 31 cfg_net! { 32 #[cfg(not(target_arch = "wasm32"))] 33 pub mod udp; 34 pub mod net; 35 } 36 37 cfg_compat! { 38 pub mod compat; 39 } 40 41 cfg_io! { 42 pub mod io; 43 } 44 45 cfg_rt! { 46 pub mod context; 47 pub mod task; 48 } 49 50 cfg_time! { 51 pub mod time; 52 } 53 54 pub mod sync; 55 56 pub mod either; 57 58 #[cfg(any(feature = "io", feature = "codec"))] 59 mod util { 60 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; 61 62 use bytes::{Buf, BufMut}; 63 use futures_core::ready; 64 use std::io::{self, IoSlice}; 65 use std::mem::MaybeUninit; 66 use std::pin::Pin; 67 use std::task::{Context, Poll}; 68 69 /// Try to read data from an `AsyncRead` into an implementer of the [`BufMut`] trait. 70 /// 71 /// [`BufMut`]: bytes::Buf 72 /// 73 /// # Example 74 /// 75 /// ``` 76 /// use bytes::{Bytes, BytesMut}; 77 /// use tokio_stream as stream; 78 /// use tokio::io::Result; 79 /// use tokio_util::io::{StreamReader, poll_read_buf}; 80 /// use futures::future::poll_fn; 81 /// use std::pin::Pin; 82 /// # #[tokio::main] 83 /// # async fn main() -> std::io::Result<()> { 84 /// 85 /// // Create a reader from an iterator. This particular reader will always be 86 /// // ready. 87 /// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))])); 88 /// 89 /// let mut buf = BytesMut::new(); 90 /// let mut reads = 0; 91 /// 92 /// loop { 93 /// reads += 1; 94 /// let n = poll_fn(|cx| poll_read_buf(Pin::new(&mut read), cx, &mut buf)).await?; 95 /// 96 /// if n == 0 { 97 /// break; 98 /// } 99 /// } 100 /// 101 /// // one or more reads might be necessary. 102 /// assert!(reads >= 1); 103 /// assert_eq!(&buf[..], &[0, 1, 2, 3]); 104 /// # Ok(()) 105 /// # } 106 /// ``` 107 #[cfg_attr(not(feature = "io"), allow(unreachable_pub))] poll_read_buf<T: AsyncRead, B: BufMut>( io: Pin<&mut T>, cx: &mut Context<'_>, buf: &mut B, ) -> Poll<io::Result<usize>>108 pub fn poll_read_buf<T: AsyncRead, B: BufMut>( 109 io: Pin<&mut T>, 110 cx: &mut Context<'_>, 111 buf: &mut B, 112 ) -> Poll<io::Result<usize>> { 113 if !buf.has_remaining_mut() { 114 return Poll::Ready(Ok(0)); 115 } 116 117 let n = { 118 let dst = buf.chunk_mut(); 119 120 // Safety: `chunk_mut()` returns a `&mut UninitSlice`, and `UninitSlice` is a 121 // transparent wrapper around `[MaybeUninit<u8>]`. 122 let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) }; 123 let mut buf = ReadBuf::uninit(dst); 124 let ptr = buf.filled().as_ptr(); 125 ready!(io.poll_read(cx, &mut buf)?); 126 127 // Ensure the pointer does not change from under us 128 assert_eq!(ptr, buf.filled().as_ptr()); 129 buf.filled().len() 130 }; 131 132 // Safety: This is guaranteed to be the number of initialized (and read) 133 // bytes due to the invariants provided by `ReadBuf::filled`. 134 unsafe { 135 buf.advance_mut(n); 136 } 137 138 Poll::Ready(Ok(n)) 139 } 140 141 /// Try to write data from an implementer of the [`Buf`] trait to an 142 /// [`AsyncWrite`], advancing the buffer's internal cursor. 143 /// 144 /// This function will use [vectored writes] when the [`AsyncWrite`] supports 145 /// vectored writes. 146 /// 147 /// # Examples 148 /// 149 /// [`File`] implements [`AsyncWrite`] and [`Cursor<&[u8]>`] implements 150 /// [`Buf`]: 151 /// 152 /// ```no_run 153 /// use tokio_util::io::poll_write_buf; 154 /// use tokio::io; 155 /// use tokio::fs::File; 156 /// 157 /// use bytes::Buf; 158 /// use std::io::Cursor; 159 /// use std::pin::Pin; 160 /// use futures::future::poll_fn; 161 /// 162 /// #[tokio::main] 163 /// async fn main() -> io::Result<()> { 164 /// let mut file = File::create("foo.txt").await?; 165 /// let mut buf = Cursor::new(b"data to write"); 166 /// 167 /// // Loop until the entire contents of the buffer are written to 168 /// // the file. 169 /// while buf.has_remaining() { 170 /// poll_fn(|cx| poll_write_buf(Pin::new(&mut file), cx, &mut buf)).await?; 171 /// } 172 /// 173 /// Ok(()) 174 /// } 175 /// ``` 176 /// 177 /// [`Buf`]: bytes::Buf 178 /// [`AsyncWrite`]: tokio::io::AsyncWrite 179 /// [`File`]: tokio::fs::File 180 /// [vectored writes]: tokio::io::AsyncWrite::poll_write_vectored 181 #[cfg_attr(not(feature = "io"), allow(unreachable_pub))] poll_write_buf<T: AsyncWrite, B: Buf>( io: Pin<&mut T>, cx: &mut Context<'_>, buf: &mut B, ) -> Poll<io::Result<usize>>182 pub fn poll_write_buf<T: AsyncWrite, B: Buf>( 183 io: Pin<&mut T>, 184 cx: &mut Context<'_>, 185 buf: &mut B, 186 ) -> Poll<io::Result<usize>> { 187 const MAX_BUFS: usize = 64; 188 189 if !buf.has_remaining() { 190 return Poll::Ready(Ok(0)); 191 } 192 193 let n = if io.is_write_vectored() { 194 let mut slices = [IoSlice::new(&[]); MAX_BUFS]; 195 let cnt = buf.chunks_vectored(&mut slices); 196 ready!(io.poll_write_vectored(cx, &slices[..cnt]))? 197 } else { 198 ready!(io.poll_write(cx, buf.chunk()))? 199 }; 200 201 buf.advance(n); 202 203 Poll::Ready(Ok(n)) 204 } 205 } 206