1 //! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s, 2 //! and the `AsyncRead` and `AsyncWrite` traits. 3 4 #![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))] 5 #![cfg_attr(feature = "read-initializer", feature(read_initializer))] 6 #![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))] 7 #![cfg_attr(not(feature = "std"), no_std)] 8 #![warn( 9 missing_docs, 10 missing_debug_implementations, 11 rust_2018_idioms, 12 unreachable_pub 13 )] 14 // It cannot be included in the published code because this lints have false positives in the minimum required version. 15 #![cfg_attr(test, warn(single_use_lifetimes))] 16 #![warn(clippy::all)] 17 #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] 18 #![cfg_attr(docsrs, feature(doc_cfg))] 19 20 #[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] 21 compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features"); 22 23 #[cfg(all(feature = "bilock", not(feature = "unstable")))] 24 compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); 25 26 #[cfg(all(feature = "read-initializer", not(feature = "unstable")))] 27 compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features"); 28 29 #[cfg(feature = "alloc")] 30 extern crate alloc; 31 32 // Macro re-exports 33 pub use futures_core::ready; 34 pub use pin_utils::pin_mut; 35 36 #[cfg(feature = "async-await")] 37 #[macro_use] 38 mod async_await; 39 #[cfg(feature = "async-await")] 40 #[doc(hidden)] 41 pub use self::async_await::*; 42 43 // Not public API. 44 #[cfg(feature = "async-await")] 45 #[doc(hidden)] 46 pub mod __private { 47 pub use crate::*; 48 pub use core::{ 49 option::Option::{self, None, Some}, 50 pin::Pin, 51 result::Result::{Err, Ok}, 52 }; 53 54 pub mod async_await { 55 pub use crate::async_await::*; 56 } 57 } 58 59 macro_rules! cfg_target_has_atomic { 60 ($($item:item)*) => {$( 61 #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] 62 $item 63 )*}; 64 } 65 66 #[cfg(feature = "sink")] 67 macro_rules! delegate_sink { 68 ($field:ident, $item:ty) => { 69 fn poll_ready( 70 self: core::pin::Pin<&mut Self>, 71 cx: &mut core::task::Context<'_>, 72 ) -> core::task::Poll<Result<(), Self::Error>> { 73 self.project().$field.poll_ready(cx) 74 } 75 76 fn start_send(self: core::pin::Pin<&mut Self>, item: $item) -> Result<(), Self::Error> { 77 self.project().$field.start_send(item) 78 } 79 80 fn poll_flush( 81 self: core::pin::Pin<&mut Self>, 82 cx: &mut core::task::Context<'_>, 83 ) -> core::task::Poll<Result<(), Self::Error>> { 84 self.project().$field.poll_flush(cx) 85 } 86 87 fn poll_close( 88 self: core::pin::Pin<&mut Self>, 89 cx: &mut core::task::Context<'_>, 90 ) -> core::task::Poll<Result<(), Self::Error>> { 91 self.project().$field.poll_close(cx) 92 } 93 }; 94 } 95 96 macro_rules! delegate_future { 97 ($field:ident) => { 98 fn poll( 99 self: core::pin::Pin<&mut Self>, 100 cx: &mut core::task::Context<'_>, 101 ) -> core::task::Poll<Self::Output> { 102 self.project().$field.poll(cx) 103 } 104 }; 105 } 106 107 macro_rules! delegate_stream { 108 ($field:ident) => { 109 fn poll_next( 110 self: core::pin::Pin<&mut Self>, 111 cx: &mut core::task::Context<'_>, 112 ) -> core::task::Poll<Option<Self::Item>> { 113 self.project().$field.poll_next(cx) 114 } 115 fn size_hint(&self) -> (usize, Option<usize>) { 116 self.$field.size_hint() 117 } 118 }; 119 } 120 121 #[cfg(feature = "io")] 122 #[cfg(feature = "std")] 123 macro_rules! delegate_async_write { 124 ($field:ident) => { 125 fn poll_write( 126 self: core::pin::Pin<&mut Self>, 127 cx: &mut core::task::Context<'_>, 128 buf: &[u8], 129 ) -> core::task::Poll<std::io::Result<usize>> { 130 self.project().$field.poll_write(cx, buf) 131 } 132 fn poll_write_vectored( 133 self: core::pin::Pin<&mut Self>, 134 cx: &mut core::task::Context<'_>, 135 bufs: &[std::io::IoSlice<'_>], 136 ) -> core::task::Poll<std::io::Result<usize>> { 137 self.project().$field.poll_write_vectored(cx, bufs) 138 } 139 fn poll_flush( 140 self: core::pin::Pin<&mut Self>, 141 cx: &mut core::task::Context<'_>, 142 ) -> core::task::Poll<std::io::Result<()>> { 143 self.project().$field.poll_flush(cx) 144 } 145 fn poll_close( 146 self: core::pin::Pin<&mut Self>, 147 cx: &mut core::task::Context<'_>, 148 ) -> core::task::Poll<std::io::Result<()>> { 149 self.project().$field.poll_close(cx) 150 } 151 }; 152 } 153 154 #[cfg(feature = "io")] 155 #[cfg(feature = "std")] 156 macro_rules! delegate_async_read { 157 ($field:ident) => { 158 #[cfg(feature = "read-initializer")] 159 unsafe fn initializer(&self) -> $crate::io::Initializer { 160 self.$field.initializer() 161 } 162 163 fn poll_read( 164 self: core::pin::Pin<&mut Self>, 165 cx: &mut core::task::Context<'_>, 166 buf: &mut [u8], 167 ) -> core::task::Poll<std::io::Result<usize>> { 168 self.project().$field.poll_read(cx, buf) 169 } 170 171 fn poll_read_vectored( 172 self: core::pin::Pin<&mut Self>, 173 cx: &mut core::task::Context<'_>, 174 bufs: &mut [std::io::IoSliceMut<'_>], 175 ) -> core::task::Poll<std::io::Result<usize>> { 176 self.project().$field.poll_read_vectored(cx, bufs) 177 } 178 }; 179 } 180 181 #[cfg(feature = "io")] 182 #[cfg(feature = "std")] 183 macro_rules! delegate_async_buf_read { 184 ($field:ident) => { 185 fn poll_fill_buf( 186 self: core::pin::Pin<&mut Self>, 187 cx: &mut core::task::Context<'_>, 188 ) -> core::task::Poll<std::io::Result<&[u8]>> { 189 self.project().$field.poll_fill_buf(cx) 190 } 191 192 fn consume(self: core::pin::Pin<&mut Self>, amt: usize) { 193 self.project().$field.consume(amt) 194 } 195 }; 196 } 197 198 macro_rules! delegate_access_inner { 199 ($field:ident, $inner:ty, ($($ind:tt)*)) => { 200 /// Acquires a reference to the underlying sink or stream that this combinator is 201 /// pulling from. 202 pub fn get_ref(&self) -> &$inner { 203 (&self.$field) $($ind get_ref())* 204 } 205 206 /// Acquires a mutable reference to the underlying sink or stream that this 207 /// combinator is pulling from. 208 /// 209 /// Note that care must be taken to avoid tampering with the state of the 210 /// sink or stream which may otherwise confuse this combinator. 211 pub fn get_mut(&mut self) -> &mut $inner { 212 (&mut self.$field) $($ind get_mut())* 213 } 214 215 /// Acquires a pinned mutable reference to the underlying sink or stream that this 216 /// combinator is pulling from. 217 /// 218 /// Note that care must be taken to avoid tampering with the state of the 219 /// sink or stream which may otherwise confuse this combinator. 220 pub fn get_pin_mut(self: core::pin::Pin<&mut Self>) -> core::pin::Pin<&mut $inner> { 221 self.project().$field $($ind get_pin_mut())* 222 } 223 224 /// Consumes this combinator, returning the underlying sink or stream. 225 /// 226 /// Note that this may discard intermediate state of this combinator, so 227 /// care should be taken to avoid losing resources when this is called. 228 pub fn into_inner(self) -> $inner { 229 self.$field $($ind into_inner())* 230 } 231 } 232 } 233 234 macro_rules! delegate_all { 235 (@trait Future $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 236 impl<$($arg),*> futures_core::future::Future for $name<$($arg),*> where $t: futures_core::future::Future $(, $($bound)*)* { 237 type Output = <$t as futures_core::future::Future>::Output; 238 239 delegate_future!(inner); 240 } 241 }; 242 (@trait FusedFuture $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 243 impl<$($arg),*> futures_core::future::FusedFuture for $name<$($arg),*> where $t: futures_core::future::FusedFuture $(, $($bound)*)* { 244 fn is_terminated(&self) -> bool { 245 self.inner.is_terminated() 246 } 247 } 248 }; 249 (@trait Stream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 250 impl<$($arg),*> futures_core::stream::Stream for $name<$($arg),*> where $t: futures_core::stream::Stream $(, $($bound)*)* { 251 type Item = <$t as futures_core::stream::Stream>::Item; 252 253 delegate_stream!(inner); 254 } 255 }; 256 (@trait FusedStream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 257 impl<$($arg),*> futures_core::stream::FusedStream for $name<$($arg),*> where $t: futures_core::stream::FusedStream $(, $($bound)*)* { 258 fn is_terminated(&self) -> bool { 259 self.inner.is_terminated() 260 } 261 } 262 }; 263 (@trait Sink $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 264 #[cfg(feature = "sink")] 265 impl<_Item, $($arg),*> futures_sink::Sink<_Item> for $name<$($arg),*> where $t: futures_sink::Sink<_Item> $(, $($bound)*)* { 266 type Error = <$t as futures_sink::Sink<_Item>>::Error; 267 268 delegate_sink!(inner, _Item); 269 } 270 }; 271 (@trait Debug $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 272 impl<$($arg),*> core::fmt::Debug for $name<$($arg),*> where $t: core::fmt::Debug $(, $($bound)*)* { 273 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { 274 core::fmt::Debug::fmt(&self.inner, f) 275 } 276 } 277 }; 278 (@trait AccessInner[$inner:ty, ($($ind:tt)*)] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 279 impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* { 280 delegate_access_inner!(inner, $inner, ($($ind)*)); 281 } 282 }; 283 (@trait New[|$($param:ident: $paramt:ty),*| $cons:expr] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 284 impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* { 285 pub(crate) fn new($($param: $paramt),*) -> Self { 286 Self { inner: $cons } 287 } 288 } 289 }; 290 ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($targs:tt)*])* $({$($item:tt)*})* $(where $($bound:tt)*)*) => { 291 pin_project_lite::pin_project! { 292 #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] 293 $(#[$attr])* 294 pub struct $name< $($arg),* > $(where $($bound)*)* { #[pin] inner: $t } 295 } 296 297 impl<$($arg),*> $name< $($arg),* > $(where $($bound)*)* { 298 $($($item)*)* 299 } 300 301 delegate_all!(@trait $ftrait $([$($targs)*])* $name<$($arg),*>($t) $(where $($bound)*)*); 302 }; 303 ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($ftargs:tt)*])* + $strait:ident $([$($stargs:tt)*])* $(+ $trait:ident $([$($targs:tt)*])*)* $({$($item:tt)*})* $(where $($bound:tt)*)*) => { 304 delegate_all!($(#[$attr])* $name<$($arg),*>($t) : $strait $([$($stargs)*])* $(+ $trait $([$($targs)*])*)* $({$($item)*})* $(where $($bound)*)*); 305 306 delegate_all!(@trait $ftrait $([$($ftargs)*])* $name<$($arg),*>($t) $(where $($bound)*)*); 307 }; 308 } 309 310 pub mod future; 311 #[doc(hidden)] 312 pub use crate::future::{FutureExt, TryFutureExt}; 313 314 pub mod stream; 315 #[doc(hidden)] 316 pub use crate::stream::{StreamExt, TryStreamExt}; 317 318 #[cfg(feature = "sink")] 319 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 320 pub mod sink; 321 #[cfg(feature = "sink")] 322 #[doc(hidden)] 323 pub use crate::sink::SinkExt; 324 325 pub mod task; 326 327 pub mod never; 328 329 #[cfg(feature = "compat")] 330 #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] 331 pub mod compat; 332 333 #[cfg(feature = "io")] 334 #[cfg_attr(docsrs, doc(cfg(feature = "io")))] 335 #[cfg(feature = "std")] 336 pub mod io; 337 #[cfg(feature = "io")] 338 #[cfg(feature = "std")] 339 #[doc(hidden)] 340 pub use crate::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; 341 342 #[cfg(feature = "alloc")] 343 pub mod lock; 344 345 mod fns; 346 mod unfold_state; 347