1 //! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s, 2 //! and the `AsyncRead` and `AsyncWrite` traits. 3 4 #![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))] 5 #![cfg_attr(not(feature = "std"), no_std)] 6 #![warn( 7 missing_debug_implementations, 8 missing_docs, 9 rust_2018_idioms, 10 single_use_lifetimes, 11 unreachable_pub 12 )] 13 #![doc(test( 14 no_crate_inject, 15 attr( 16 deny(warnings, rust_2018_idioms, single_use_lifetimes), 17 allow(dead_code, unused_assignments, unused_variables) 18 ) 19 ))] 20 #![cfg_attr(docsrs, feature(doc_cfg))] 21 22 #[cfg(all(feature = "bilock", not(feature = "unstable")))] 23 compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); 24 25 #[cfg(feature = "alloc")] 26 extern crate alloc; 27 28 // Macro re-exports 29 pub use futures_core::ready; 30 pub use pin_utils::pin_mut; 31 32 #[cfg(feature = "async-await")] 33 #[macro_use] 34 mod async_await; 35 #[cfg(feature = "async-await")] 36 #[doc(hidden)] 37 pub use self::async_await::*; 38 39 // Not public API. 40 #[cfg(feature = "async-await")] 41 #[doc(hidden)] 42 pub mod __private { 43 pub use crate::*; 44 pub use core::{ 45 option::Option::{self, None, Some}, 46 pin::Pin, 47 result::Result::{Err, Ok}, 48 }; 49 50 pub mod async_await { 51 pub use crate::async_await::*; 52 } 53 } 54 55 #[cfg(feature = "sink")] 56 macro_rules! delegate_sink { 57 ($field:ident, $item:ty) => { 58 fn poll_ready( 59 self: core::pin::Pin<&mut Self>, 60 cx: &mut core::task::Context<'_>, 61 ) -> core::task::Poll<Result<(), Self::Error>> { 62 self.project().$field.poll_ready(cx) 63 } 64 65 fn start_send(self: core::pin::Pin<&mut Self>, item: $item) -> Result<(), Self::Error> { 66 self.project().$field.start_send(item) 67 } 68 69 fn poll_flush( 70 self: core::pin::Pin<&mut Self>, 71 cx: &mut core::task::Context<'_>, 72 ) -> core::task::Poll<Result<(), Self::Error>> { 73 self.project().$field.poll_flush(cx) 74 } 75 76 fn poll_close( 77 self: core::pin::Pin<&mut Self>, 78 cx: &mut core::task::Context<'_>, 79 ) -> core::task::Poll<Result<(), Self::Error>> { 80 self.project().$field.poll_close(cx) 81 } 82 }; 83 } 84 85 macro_rules! delegate_future { 86 ($field:ident) => { 87 fn poll( 88 self: core::pin::Pin<&mut Self>, 89 cx: &mut core::task::Context<'_>, 90 ) -> core::task::Poll<Self::Output> { 91 self.project().$field.poll(cx) 92 } 93 }; 94 } 95 96 macro_rules! delegate_stream { 97 ($field:ident) => { 98 fn poll_next( 99 self: core::pin::Pin<&mut Self>, 100 cx: &mut core::task::Context<'_>, 101 ) -> core::task::Poll<Option<Self::Item>> { 102 self.project().$field.poll_next(cx) 103 } 104 fn size_hint(&self) -> (usize, Option<usize>) { 105 self.$field.size_hint() 106 } 107 }; 108 } 109 110 #[cfg(feature = "io")] 111 #[cfg(feature = "std")] 112 macro_rules! delegate_async_write { 113 ($field:ident) => { 114 fn poll_write( 115 self: core::pin::Pin<&mut Self>, 116 cx: &mut core::task::Context<'_>, 117 buf: &[u8], 118 ) -> core::task::Poll<std::io::Result<usize>> { 119 self.project().$field.poll_write(cx, buf) 120 } 121 fn poll_write_vectored( 122 self: core::pin::Pin<&mut Self>, 123 cx: &mut core::task::Context<'_>, 124 bufs: &[std::io::IoSlice<'_>], 125 ) -> core::task::Poll<std::io::Result<usize>> { 126 self.project().$field.poll_write_vectored(cx, bufs) 127 } 128 fn poll_flush( 129 self: core::pin::Pin<&mut Self>, 130 cx: &mut core::task::Context<'_>, 131 ) -> core::task::Poll<std::io::Result<()>> { 132 self.project().$field.poll_flush(cx) 133 } 134 fn poll_close( 135 self: core::pin::Pin<&mut Self>, 136 cx: &mut core::task::Context<'_>, 137 ) -> core::task::Poll<std::io::Result<()>> { 138 self.project().$field.poll_close(cx) 139 } 140 }; 141 } 142 143 #[cfg(feature = "io")] 144 #[cfg(feature = "std")] 145 macro_rules! delegate_async_read { 146 ($field:ident) => { 147 fn poll_read( 148 self: core::pin::Pin<&mut Self>, 149 cx: &mut core::task::Context<'_>, 150 buf: &mut [u8], 151 ) -> core::task::Poll<std::io::Result<usize>> { 152 self.project().$field.poll_read(cx, buf) 153 } 154 155 fn poll_read_vectored( 156 self: core::pin::Pin<&mut Self>, 157 cx: &mut core::task::Context<'_>, 158 bufs: &mut [std::io::IoSliceMut<'_>], 159 ) -> core::task::Poll<std::io::Result<usize>> { 160 self.project().$field.poll_read_vectored(cx, bufs) 161 } 162 }; 163 } 164 165 #[cfg(feature = "io")] 166 #[cfg(feature = "std")] 167 macro_rules! delegate_async_buf_read { 168 ($field:ident) => { 169 fn poll_fill_buf( 170 self: core::pin::Pin<&mut Self>, 171 cx: &mut core::task::Context<'_>, 172 ) -> core::task::Poll<std::io::Result<&[u8]>> { 173 self.project().$field.poll_fill_buf(cx) 174 } 175 176 fn consume(self: core::pin::Pin<&mut Self>, amt: usize) { 177 self.project().$field.consume(amt) 178 } 179 }; 180 } 181 182 macro_rules! delegate_access_inner { 183 ($field:ident, $inner:ty, ($($ind:tt)*)) => { 184 /// Acquires a reference to the underlying sink or stream that this combinator is 185 /// pulling from. 186 pub fn get_ref(&self) -> &$inner { 187 (&self.$field) $($ind get_ref())* 188 } 189 190 /// Acquires a mutable reference to the underlying sink or stream that this 191 /// combinator is pulling from. 192 /// 193 /// Note that care must be taken to avoid tampering with the state of the 194 /// sink or stream which may otherwise confuse this combinator. 195 pub fn get_mut(&mut self) -> &mut $inner { 196 (&mut self.$field) $($ind get_mut())* 197 } 198 199 /// Acquires a pinned mutable reference to the underlying sink or stream that this 200 /// combinator is pulling from. 201 /// 202 /// Note that care must be taken to avoid tampering with the state of the 203 /// sink or stream which may otherwise confuse this combinator. 204 pub fn get_pin_mut(self: core::pin::Pin<&mut Self>) -> core::pin::Pin<&mut $inner> { 205 self.project().$field $($ind get_pin_mut())* 206 } 207 208 /// Consumes this combinator, returning the underlying sink or stream. 209 /// 210 /// Note that this may discard intermediate state of this combinator, so 211 /// care should be taken to avoid losing resources when this is called. 212 pub fn into_inner(self) -> $inner { 213 self.$field $($ind into_inner())* 214 } 215 } 216 } 217 218 macro_rules! delegate_all { 219 (@trait Future $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 220 impl<$($arg),*> futures_core::future::Future for $name<$($arg),*> where $t: futures_core::future::Future $(, $($bound)*)* { 221 type Output = <$t as futures_core::future::Future>::Output; 222 223 delegate_future!(inner); 224 } 225 }; 226 (@trait FusedFuture $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 227 impl<$($arg),*> futures_core::future::FusedFuture for $name<$($arg),*> where $t: futures_core::future::FusedFuture $(, $($bound)*)* { 228 fn is_terminated(&self) -> bool { 229 self.inner.is_terminated() 230 } 231 } 232 }; 233 (@trait Stream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 234 impl<$($arg),*> futures_core::stream::Stream for $name<$($arg),*> where $t: futures_core::stream::Stream $(, $($bound)*)* { 235 type Item = <$t as futures_core::stream::Stream>::Item; 236 237 delegate_stream!(inner); 238 } 239 }; 240 (@trait FusedStream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 241 impl<$($arg),*> futures_core::stream::FusedStream for $name<$($arg),*> where $t: futures_core::stream::FusedStream $(, $($bound)*)* { 242 fn is_terminated(&self) -> bool { 243 self.inner.is_terminated() 244 } 245 } 246 }; 247 (@trait Sink $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 248 #[cfg(feature = "sink")] 249 impl<_Item, $($arg),*> futures_sink::Sink<_Item> for $name<$($arg),*> where $t: futures_sink::Sink<_Item> $(, $($bound)*)* { 250 type Error = <$t as futures_sink::Sink<_Item>>::Error; 251 252 delegate_sink!(inner, _Item); 253 } 254 }; 255 (@trait Debug $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 256 impl<$($arg),*> core::fmt::Debug for $name<$($arg),*> where $t: core::fmt::Debug $(, $($bound)*)* { 257 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { 258 core::fmt::Debug::fmt(&self.inner, f) 259 } 260 } 261 }; 262 (@trait AccessInner[$inner:ty, ($($ind:tt)*)] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 263 impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* { 264 delegate_access_inner!(inner, $inner, ($($ind)*)); 265 } 266 }; 267 (@trait New[|$($param:ident: $paramt:ty),*| $cons:expr] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 268 impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* { 269 pub(crate) fn new($($param: $paramt),*) -> Self { 270 Self { inner: $cons } 271 } 272 } 273 }; 274 ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($targs:tt)*])* $({$($item:tt)*})* $(where $($bound:tt)*)*) => { 275 pin_project_lite::pin_project! { 276 #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] 277 $(#[$attr])* 278 pub struct $name< $($arg),* > $(where $($bound)*)* { #[pin] inner: $t } 279 } 280 281 impl<$($arg),*> $name< $($arg),* > $(where $($bound)*)* { 282 $($($item)*)* 283 } 284 285 delegate_all!(@trait $ftrait $([$($targs)*])* $name<$($arg),*>($t) $(where $($bound)*)*); 286 }; 287 ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($ftargs:tt)*])* + $strait:ident $([$($stargs:tt)*])* $(+ $trait:ident $([$($targs:tt)*])*)* $({$($item:tt)*})* $(where $($bound:tt)*)*) => { 288 delegate_all!($(#[$attr])* $name<$($arg),*>($t) : $strait $([$($stargs)*])* $(+ $trait $([$($targs)*])*)* $({$($item)*})* $(where $($bound)*)*); 289 290 delegate_all!(@trait $ftrait $([$($ftargs)*])* $name<$($arg),*>($t) $(where $($bound)*)*); 291 }; 292 } 293 294 pub mod future; 295 #[doc(no_inline)] 296 pub use crate::future::{Future, FutureExt, TryFuture, TryFutureExt}; 297 298 pub mod stream; 299 #[doc(no_inline)] 300 pub use crate::stream::{Stream, StreamExt, TryStream, TryStreamExt}; 301 302 #[cfg(feature = "sink")] 303 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 304 pub mod sink; 305 #[cfg(feature = "sink")] 306 #[doc(no_inline)] 307 pub use crate::sink::{Sink, SinkExt}; 308 309 pub mod task; 310 311 pub mod never; 312 313 #[cfg(feature = "compat")] 314 #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] 315 pub mod compat; 316 317 #[cfg(feature = "io")] 318 #[cfg_attr(docsrs, doc(cfg(feature = "io")))] 319 #[cfg(feature = "std")] 320 pub mod io; 321 #[cfg(feature = "io")] 322 #[cfg(feature = "std")] 323 #[doc(no_inline)] 324 pub use crate::io::{ 325 AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, 326 AsyncWriteExt, 327 }; 328 329 #[cfg(feature = "alloc")] 330 pub mod lock; 331 332 #[cfg(not(futures_no_atomic_cas))] 333 #[cfg(feature = "alloc")] 334 mod abortable; 335 336 mod fns; 337 mod unfold_state; 338