• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s,
2 //! and the `AsyncRead` and `AsyncWrite` traits.
3 
4 #![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]
5 #![cfg_attr(feature = "read-initializer", feature(read_initializer))]
6 #![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))]
7 #![cfg_attr(not(feature = "std"), no_std)]
8 #![warn(
9     missing_docs,
10     missing_debug_implementations,
11     rust_2018_idioms,
12     unreachable_pub
13 )]
14 // It cannot be included in the published code because this lints have false positives in the minimum required version.
15 #![cfg_attr(test, warn(single_use_lifetimes))]
16 #![warn(clippy::all)]
17 #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
18 #![cfg_attr(docsrs, feature(doc_cfg))]
19 
20 #[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
21 compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features");
22 
23 #[cfg(all(feature = "bilock", not(feature = "unstable")))]
24 compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");
25 
26 #[cfg(all(feature = "read-initializer", not(feature = "unstable")))]
27 compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features");
28 
29 #[cfg(feature = "alloc")]
30 extern crate alloc;
31 
32 // Macro re-exports
33 pub use futures_core::ready;
34 pub use pin_utils::pin_mut;
35 
36 #[cfg(feature = "async-await")]
37 #[macro_use]
38 mod async_await;
39 #[cfg(feature = "async-await")]
40 #[doc(hidden)]
41 pub use self::async_await::*;
42 
43 // Not public API.
44 #[cfg(feature = "async-await")]
45 #[doc(hidden)]
46 pub mod __private {
47     pub use crate::*;
48     pub use core::{
49         option::Option::{self, None, Some},
50         pin::Pin,
51         result::Result::{Err, Ok},
52     };
53 
54     pub mod async_await {
55         pub use crate::async_await::*;
56     }
57 }
58 
59 macro_rules! cfg_target_has_atomic {
60     ($($item:item)*) => {$(
61         #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
62         $item
63     )*};
64 }
65 
66 #[cfg(feature = "sink")]
67 macro_rules! delegate_sink {
68     ($field:ident, $item:ty) => {
69         fn poll_ready(
70             self: core::pin::Pin<&mut Self>,
71             cx: &mut core::task::Context<'_>,
72         ) -> core::task::Poll<Result<(), Self::Error>> {
73             self.project().$field.poll_ready(cx)
74         }
75 
76         fn start_send(self: core::pin::Pin<&mut Self>, item: $item) -> Result<(), Self::Error> {
77             self.project().$field.start_send(item)
78         }
79 
80         fn poll_flush(
81             self: core::pin::Pin<&mut Self>,
82             cx: &mut core::task::Context<'_>,
83         ) -> core::task::Poll<Result<(), Self::Error>> {
84             self.project().$field.poll_flush(cx)
85         }
86 
87         fn poll_close(
88             self: core::pin::Pin<&mut Self>,
89             cx: &mut core::task::Context<'_>,
90         ) -> core::task::Poll<Result<(), Self::Error>> {
91             self.project().$field.poll_close(cx)
92         }
93     };
94 }
95 
96 macro_rules! delegate_future {
97     ($field:ident) => {
98         fn poll(
99             self: core::pin::Pin<&mut Self>,
100             cx: &mut core::task::Context<'_>,
101         ) -> core::task::Poll<Self::Output> {
102             self.project().$field.poll(cx)
103         }
104     };
105 }
106 
107 macro_rules! delegate_stream {
108     ($field:ident) => {
109         fn poll_next(
110             self: core::pin::Pin<&mut Self>,
111             cx: &mut core::task::Context<'_>,
112         ) -> core::task::Poll<Option<Self::Item>> {
113             self.project().$field.poll_next(cx)
114         }
115         fn size_hint(&self) -> (usize, Option<usize>) {
116             self.$field.size_hint()
117         }
118     };
119 }
120 
121 #[cfg(feature = "io")]
122 #[cfg(feature = "std")]
123 macro_rules! delegate_async_write {
124     ($field:ident) => {
125         fn poll_write(
126             self: core::pin::Pin<&mut Self>,
127             cx: &mut core::task::Context<'_>,
128             buf: &[u8],
129         ) -> core::task::Poll<std::io::Result<usize>> {
130             self.project().$field.poll_write(cx, buf)
131         }
132         fn poll_write_vectored(
133             self: core::pin::Pin<&mut Self>,
134             cx: &mut core::task::Context<'_>,
135             bufs: &[std::io::IoSlice<'_>],
136         ) -> core::task::Poll<std::io::Result<usize>> {
137             self.project().$field.poll_write_vectored(cx, bufs)
138         }
139         fn poll_flush(
140             self: core::pin::Pin<&mut Self>,
141             cx: &mut core::task::Context<'_>,
142         ) -> core::task::Poll<std::io::Result<()>> {
143             self.project().$field.poll_flush(cx)
144         }
145         fn poll_close(
146             self: core::pin::Pin<&mut Self>,
147             cx: &mut core::task::Context<'_>,
148         ) -> core::task::Poll<std::io::Result<()>> {
149             self.project().$field.poll_close(cx)
150         }
151     };
152 }
153 
154 #[cfg(feature = "io")]
155 #[cfg(feature = "std")]
156 macro_rules! delegate_async_read {
157     ($field:ident) => {
158         #[cfg(feature = "read-initializer")]
159         unsafe fn initializer(&self) -> $crate::io::Initializer {
160             self.$field.initializer()
161         }
162 
163         fn poll_read(
164             self: core::pin::Pin<&mut Self>,
165             cx: &mut core::task::Context<'_>,
166             buf: &mut [u8],
167         ) -> core::task::Poll<std::io::Result<usize>> {
168             self.project().$field.poll_read(cx, buf)
169         }
170 
171         fn poll_read_vectored(
172             self: core::pin::Pin<&mut Self>,
173             cx: &mut core::task::Context<'_>,
174             bufs: &mut [std::io::IoSliceMut<'_>],
175         ) -> core::task::Poll<std::io::Result<usize>> {
176             self.project().$field.poll_read_vectored(cx, bufs)
177         }
178     };
179 }
180 
181 #[cfg(feature = "io")]
182 #[cfg(feature = "std")]
183 macro_rules! delegate_async_buf_read {
184     ($field:ident) => {
185         fn poll_fill_buf(
186             self: core::pin::Pin<&mut Self>,
187             cx: &mut core::task::Context<'_>,
188         ) -> core::task::Poll<std::io::Result<&[u8]>> {
189             self.project().$field.poll_fill_buf(cx)
190         }
191 
192         fn consume(self: core::pin::Pin<&mut Self>, amt: usize) {
193             self.project().$field.consume(amt)
194         }
195     };
196 }
197 
198 macro_rules! delegate_access_inner {
199     ($field:ident, $inner:ty, ($($ind:tt)*)) => {
200         /// Acquires a reference to the underlying sink or stream that this combinator is
201         /// pulling from.
202         pub fn get_ref(&self) -> &$inner {
203             (&self.$field) $($ind get_ref())*
204         }
205 
206         /// Acquires a mutable reference to the underlying sink or stream that this
207         /// combinator is pulling from.
208         ///
209         /// Note that care must be taken to avoid tampering with the state of the
210         /// sink or stream which may otherwise confuse this combinator.
211         pub fn get_mut(&mut self) -> &mut $inner {
212             (&mut self.$field) $($ind get_mut())*
213         }
214 
215         /// Acquires a pinned mutable reference to the underlying sink or stream that this
216         /// combinator is pulling from.
217         ///
218         /// Note that care must be taken to avoid tampering with the state of the
219         /// sink or stream which may otherwise confuse this combinator.
220         pub fn get_pin_mut(self: core::pin::Pin<&mut Self>) -> core::pin::Pin<&mut $inner> {
221             self.project().$field $($ind get_pin_mut())*
222         }
223 
224         /// Consumes this combinator, returning the underlying sink or stream.
225         ///
226         /// Note that this may discard intermediate state of this combinator, so
227         /// care should be taken to avoid losing resources when this is called.
228         pub fn into_inner(self) -> $inner {
229             self.$field $($ind into_inner())*
230         }
231     }
232 }
233 
234 macro_rules! delegate_all {
235     (@trait Future $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
236         impl<$($arg),*> futures_core::future::Future for $name<$($arg),*> where $t: futures_core::future::Future $(, $($bound)*)* {
237             type Output = <$t as futures_core::future::Future>::Output;
238 
239             delegate_future!(inner);
240         }
241     };
242     (@trait FusedFuture $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
243         impl<$($arg),*> futures_core::future::FusedFuture for $name<$($arg),*> where $t: futures_core::future::FusedFuture $(, $($bound)*)* {
244             fn is_terminated(&self) -> bool {
245                 self.inner.is_terminated()
246             }
247         }
248     };
249     (@trait Stream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
250         impl<$($arg),*> futures_core::stream::Stream for $name<$($arg),*> where $t: futures_core::stream::Stream $(, $($bound)*)* {
251             type Item = <$t as futures_core::stream::Stream>::Item;
252 
253             delegate_stream!(inner);
254         }
255     };
256     (@trait FusedStream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
257         impl<$($arg),*> futures_core::stream::FusedStream for $name<$($arg),*> where $t: futures_core::stream::FusedStream $(, $($bound)*)* {
258             fn is_terminated(&self) -> bool {
259                 self.inner.is_terminated()
260             }
261         }
262     };
263     (@trait Sink $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
264         #[cfg(feature = "sink")]
265         impl<_Item, $($arg),*> futures_sink::Sink<_Item> for $name<$($arg),*> where $t: futures_sink::Sink<_Item> $(, $($bound)*)* {
266             type Error = <$t as futures_sink::Sink<_Item>>::Error;
267 
268             delegate_sink!(inner, _Item);
269         }
270     };
271     (@trait Debug $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
272         impl<$($arg),*> core::fmt::Debug for $name<$($arg),*> where $t: core::fmt::Debug $(, $($bound)*)* {
273             fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
274                 core::fmt::Debug::fmt(&self.inner, f)
275             }
276         }
277     };
278     (@trait AccessInner[$inner:ty, ($($ind:tt)*)] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
279         impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* {
280             delegate_access_inner!(inner, $inner, ($($ind)*));
281         }
282     };
283     (@trait New[|$($param:ident: $paramt:ty),*| $cons:expr] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
284         impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* {
285             pub(crate) fn new($($param: $paramt),*) -> Self {
286                 Self { inner: $cons }
287             }
288         }
289     };
290     ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($targs:tt)*])* $({$($item:tt)*})* $(where $($bound:tt)*)*) => {
291         pin_project_lite::pin_project! {
292             #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
293             $(#[$attr])*
294             pub struct $name< $($arg),* > $(where $($bound)*)* { #[pin] inner: $t }
295         }
296 
297         impl<$($arg),*> $name< $($arg),* > $(where $($bound)*)* {
298             $($($item)*)*
299         }
300 
301         delegate_all!(@trait $ftrait $([$($targs)*])* $name<$($arg),*>($t) $(where $($bound)*)*);
302     };
303     ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($ftargs:tt)*])* + $strait:ident $([$($stargs:tt)*])* $(+ $trait:ident $([$($targs:tt)*])*)* $({$($item:tt)*})* $(where $($bound:tt)*)*) => {
304         delegate_all!($(#[$attr])* $name<$($arg),*>($t) : $strait $([$($stargs)*])* $(+ $trait $([$($targs)*])*)* $({$($item)*})* $(where $($bound)*)*);
305 
306         delegate_all!(@trait $ftrait $([$($ftargs)*])* $name<$($arg),*>($t) $(where $($bound)*)*);
307     };
308 }
309 
310 pub mod future;
311 #[doc(hidden)]
312 pub use crate::future::{FutureExt, TryFutureExt};
313 
314 pub mod stream;
315 #[doc(hidden)]
316 pub use crate::stream::{StreamExt, TryStreamExt};
317 
318 #[cfg(feature = "sink")]
319 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
320 pub mod sink;
321 #[cfg(feature = "sink")]
322 #[doc(hidden)]
323 pub use crate::sink::SinkExt;
324 
325 pub mod task;
326 
327 pub mod never;
328 
329 #[cfg(feature = "compat")]
330 #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
331 pub mod compat;
332 
333 #[cfg(feature = "io")]
334 #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
335 #[cfg(feature = "std")]
336 pub mod io;
337 #[cfg(feature = "io")]
338 #[cfg(feature = "std")]
339 #[doc(hidden)]
340 pub use crate::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
341 
342 #[cfg(feature = "alloc")]
343 pub mod lock;
344 
345 mod fns;
346 mod unfold_state;
347