• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s,
2 //! and the `AsyncRead` and `AsyncWrite` traits.
3 
4 #![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))]
5 #![cfg_attr(not(feature = "std"), no_std)]
6 #![warn(
7     missing_debug_implementations,
8     missing_docs,
9     rust_2018_idioms,
10     single_use_lifetimes,
11     unreachable_pub
12 )]
13 #![doc(test(
14     no_crate_inject,
15     attr(
16         deny(warnings, rust_2018_idioms, single_use_lifetimes),
17         allow(dead_code, unused_assignments, unused_variables)
18     )
19 ))]
20 #![cfg_attr(docsrs, feature(doc_cfg))]
21 
22 #[cfg(all(feature = "bilock", not(feature = "unstable")))]
23 compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");
24 
25 #[cfg(feature = "alloc")]
26 extern crate alloc;
27 
28 // Macro re-exports
29 pub use futures_core::ready;
30 pub use pin_utils::pin_mut;
31 
32 #[cfg(feature = "async-await")]
33 #[macro_use]
34 mod async_await;
35 #[cfg(feature = "async-await")]
36 #[doc(hidden)]
37 pub use self::async_await::*;
38 
39 // Not public API.
40 #[cfg(feature = "async-await")]
41 #[doc(hidden)]
42 pub mod __private {
43     pub use crate::*;
44     pub use core::{
45         option::Option::{self, None, Some},
46         pin::Pin,
47         result::Result::{Err, Ok},
48     };
49 
50     pub mod async_await {
51         pub use crate::async_await::*;
52     }
53 }
54 
55 #[cfg(feature = "sink")]
56 macro_rules! delegate_sink {
57     ($field:ident, $item:ty) => {
58         fn poll_ready(
59             self: core::pin::Pin<&mut Self>,
60             cx: &mut core::task::Context<'_>,
61         ) -> core::task::Poll<Result<(), Self::Error>> {
62             self.project().$field.poll_ready(cx)
63         }
64 
65         fn start_send(self: core::pin::Pin<&mut Self>, item: $item) -> Result<(), Self::Error> {
66             self.project().$field.start_send(item)
67         }
68 
69         fn poll_flush(
70             self: core::pin::Pin<&mut Self>,
71             cx: &mut core::task::Context<'_>,
72         ) -> core::task::Poll<Result<(), Self::Error>> {
73             self.project().$field.poll_flush(cx)
74         }
75 
76         fn poll_close(
77             self: core::pin::Pin<&mut Self>,
78             cx: &mut core::task::Context<'_>,
79         ) -> core::task::Poll<Result<(), Self::Error>> {
80             self.project().$field.poll_close(cx)
81         }
82     };
83 }
84 
85 macro_rules! delegate_future {
86     ($field:ident) => {
87         fn poll(
88             self: core::pin::Pin<&mut Self>,
89             cx: &mut core::task::Context<'_>,
90         ) -> core::task::Poll<Self::Output> {
91             self.project().$field.poll(cx)
92         }
93     };
94 }
95 
96 macro_rules! delegate_stream {
97     ($field:ident) => {
98         fn poll_next(
99             self: core::pin::Pin<&mut Self>,
100             cx: &mut core::task::Context<'_>,
101         ) -> core::task::Poll<Option<Self::Item>> {
102             self.project().$field.poll_next(cx)
103         }
104         fn size_hint(&self) -> (usize, Option<usize>) {
105             self.$field.size_hint()
106         }
107     };
108 }
109 
110 #[cfg(feature = "io")]
111 #[cfg(feature = "std")]
112 macro_rules! delegate_async_write {
113     ($field:ident) => {
114         fn poll_write(
115             self: core::pin::Pin<&mut Self>,
116             cx: &mut core::task::Context<'_>,
117             buf: &[u8],
118         ) -> core::task::Poll<std::io::Result<usize>> {
119             self.project().$field.poll_write(cx, buf)
120         }
121         fn poll_write_vectored(
122             self: core::pin::Pin<&mut Self>,
123             cx: &mut core::task::Context<'_>,
124             bufs: &[std::io::IoSlice<'_>],
125         ) -> core::task::Poll<std::io::Result<usize>> {
126             self.project().$field.poll_write_vectored(cx, bufs)
127         }
128         fn poll_flush(
129             self: core::pin::Pin<&mut Self>,
130             cx: &mut core::task::Context<'_>,
131         ) -> core::task::Poll<std::io::Result<()>> {
132             self.project().$field.poll_flush(cx)
133         }
134         fn poll_close(
135             self: core::pin::Pin<&mut Self>,
136             cx: &mut core::task::Context<'_>,
137         ) -> core::task::Poll<std::io::Result<()>> {
138             self.project().$field.poll_close(cx)
139         }
140     };
141 }
142 
143 #[cfg(feature = "io")]
144 #[cfg(feature = "std")]
145 macro_rules! delegate_async_read {
146     ($field:ident) => {
147         fn poll_read(
148             self: core::pin::Pin<&mut Self>,
149             cx: &mut core::task::Context<'_>,
150             buf: &mut [u8],
151         ) -> core::task::Poll<std::io::Result<usize>> {
152             self.project().$field.poll_read(cx, buf)
153         }
154 
155         fn poll_read_vectored(
156             self: core::pin::Pin<&mut Self>,
157             cx: &mut core::task::Context<'_>,
158             bufs: &mut [std::io::IoSliceMut<'_>],
159         ) -> core::task::Poll<std::io::Result<usize>> {
160             self.project().$field.poll_read_vectored(cx, bufs)
161         }
162     };
163 }
164 
165 #[cfg(feature = "io")]
166 #[cfg(feature = "std")]
167 macro_rules! delegate_async_buf_read {
168     ($field:ident) => {
169         fn poll_fill_buf(
170             self: core::pin::Pin<&mut Self>,
171             cx: &mut core::task::Context<'_>,
172         ) -> core::task::Poll<std::io::Result<&[u8]>> {
173             self.project().$field.poll_fill_buf(cx)
174         }
175 
176         fn consume(self: core::pin::Pin<&mut Self>, amt: usize) {
177             self.project().$field.consume(amt)
178         }
179     };
180 }
181 
182 macro_rules! delegate_access_inner {
183     ($field:ident, $inner:ty, ($($ind:tt)*)) => {
184         /// Acquires a reference to the underlying sink or stream that this combinator is
185         /// pulling from.
186         pub fn get_ref(&self) -> &$inner {
187             (&self.$field) $($ind get_ref())*
188         }
189 
190         /// Acquires a mutable reference to the underlying sink or stream that this
191         /// combinator is pulling from.
192         ///
193         /// Note that care must be taken to avoid tampering with the state of the
194         /// sink or stream which may otherwise confuse this combinator.
195         pub fn get_mut(&mut self) -> &mut $inner {
196             (&mut self.$field) $($ind get_mut())*
197         }
198 
199         /// Acquires a pinned mutable reference to the underlying sink or stream that this
200         /// combinator is pulling from.
201         ///
202         /// Note that care must be taken to avoid tampering with the state of the
203         /// sink or stream which may otherwise confuse this combinator.
204         pub fn get_pin_mut(self: core::pin::Pin<&mut Self>) -> core::pin::Pin<&mut $inner> {
205             self.project().$field $($ind get_pin_mut())*
206         }
207 
208         /// Consumes this combinator, returning the underlying sink or stream.
209         ///
210         /// Note that this may discard intermediate state of this combinator, so
211         /// care should be taken to avoid losing resources when this is called.
212         pub fn into_inner(self) -> $inner {
213             self.$field $($ind into_inner())*
214         }
215     }
216 }
217 
218 macro_rules! delegate_all {
219     (@trait Future $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
220         impl<$($arg),*> futures_core::future::Future for $name<$($arg),*> where $t: futures_core::future::Future $(, $($bound)*)* {
221             type Output = <$t as futures_core::future::Future>::Output;
222 
223             delegate_future!(inner);
224         }
225     };
226     (@trait FusedFuture $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
227         impl<$($arg),*> futures_core::future::FusedFuture for $name<$($arg),*> where $t: futures_core::future::FusedFuture $(, $($bound)*)* {
228             fn is_terminated(&self) -> bool {
229                 self.inner.is_terminated()
230             }
231         }
232     };
233     (@trait Stream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
234         impl<$($arg),*> futures_core::stream::Stream for $name<$($arg),*> where $t: futures_core::stream::Stream $(, $($bound)*)* {
235             type Item = <$t as futures_core::stream::Stream>::Item;
236 
237             delegate_stream!(inner);
238         }
239     };
240     (@trait FusedStream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
241         impl<$($arg),*> futures_core::stream::FusedStream for $name<$($arg),*> where $t: futures_core::stream::FusedStream $(, $($bound)*)* {
242             fn is_terminated(&self) -> bool {
243                 self.inner.is_terminated()
244             }
245         }
246     };
247     (@trait Sink $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
248         #[cfg(feature = "sink")]
249         impl<_Item, $($arg),*> futures_sink::Sink<_Item> for $name<$($arg),*> where $t: futures_sink::Sink<_Item> $(, $($bound)*)* {
250             type Error = <$t as futures_sink::Sink<_Item>>::Error;
251 
252             delegate_sink!(inner, _Item);
253         }
254     };
255     (@trait Debug $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
256         impl<$($arg),*> core::fmt::Debug for $name<$($arg),*> where $t: core::fmt::Debug $(, $($bound)*)* {
257             fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
258                 core::fmt::Debug::fmt(&self.inner, f)
259             }
260         }
261     };
262     (@trait AccessInner[$inner:ty, ($($ind:tt)*)] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
263         impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* {
264             delegate_access_inner!(inner, $inner, ($($ind)*));
265         }
266     };
267     (@trait New[|$($param:ident: $paramt:ty),*| $cons:expr] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
268         impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* {
269             pub(crate) fn new($($param: $paramt),*) -> Self {
270                 Self { inner: $cons }
271             }
272         }
273     };
274     ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($targs:tt)*])* $({$($item:tt)*})* $(where $($bound:tt)*)*) => {
275         pin_project_lite::pin_project! {
276             #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
277             $(#[$attr])*
278             pub struct $name< $($arg),* > $(where $($bound)*)* { #[pin] inner: $t }
279         }
280 
281         impl<$($arg),*> $name< $($arg),* > $(where $($bound)*)* {
282             $($($item)*)*
283         }
284 
285         delegate_all!(@trait $ftrait $([$($targs)*])* $name<$($arg),*>($t) $(where $($bound)*)*);
286     };
287     ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($ftargs:tt)*])* + $strait:ident $([$($stargs:tt)*])* $(+ $trait:ident $([$($targs:tt)*])*)* $({$($item:tt)*})* $(where $($bound:tt)*)*) => {
288         delegate_all!($(#[$attr])* $name<$($arg),*>($t) : $strait $([$($stargs)*])* $(+ $trait $([$($targs)*])*)* $({$($item)*})* $(where $($bound)*)*);
289 
290         delegate_all!(@trait $ftrait $([$($ftargs)*])* $name<$($arg),*>($t) $(where $($bound)*)*);
291     };
292 }
293 
294 pub mod future;
295 #[doc(no_inline)]
296 pub use crate::future::{Future, FutureExt, TryFuture, TryFutureExt};
297 
298 pub mod stream;
299 #[doc(no_inline)]
300 pub use crate::stream::{Stream, StreamExt, TryStream, TryStreamExt};
301 
302 #[cfg(feature = "sink")]
303 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
304 pub mod sink;
305 #[cfg(feature = "sink")]
306 #[doc(no_inline)]
307 pub use crate::sink::{Sink, SinkExt};
308 
309 pub mod task;
310 
311 pub mod never;
312 
313 #[cfg(feature = "compat")]
314 #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
315 pub mod compat;
316 
317 #[cfg(feature = "io")]
318 #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
319 #[cfg(feature = "std")]
320 pub mod io;
321 #[cfg(feature = "io")]
322 #[cfg(feature = "std")]
323 #[doc(no_inline)]
324 pub use crate::io::{
325     AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
326     AsyncWriteExt,
327 };
328 
329 #[cfg(feature = "alloc")]
330 pub mod lock;
331 
332 #[cfg(not(futures_no_atomic_cas))]
333 #[cfg(feature = "alloc")]
334 mod abortable;
335 
336 mod fns;
337 mod unfold_state;
338