1 //! Futures 2 //! 3 //! This module contains a number of functions for working with `Future`s, 4 //! including the `FutureExt` trait which adds methods to `Future` types. 5 6 #[cfg(feature = "alloc")] 7 use alloc::boxed::Box; 8 use core::pin::Pin; 9 10 use crate::fns::{inspect_fn, into_fn, ok_fn, InspectFn, IntoFn, OkFn}; 11 use crate::future::{assert_future, Either}; 12 use crate::never::Never; 13 use crate::stream::assert_stream; 14 #[cfg(feature = "alloc")] 15 use futures_core::future::{BoxFuture, LocalBoxFuture}; 16 use futures_core::{ 17 future::Future, 18 stream::Stream, 19 task::{Context, Poll}, 20 }; 21 use pin_utils::pin_mut; 22 23 // Combinators 24 25 mod flatten; 26 mod fuse; 27 mod map; 28 29 delegate_all!( 30 /// Future for the [`flatten`](super::FutureExt::flatten) method. 31 Flatten<F>( 32 flatten::Flatten<F, <F as Future>::Output> 33 ): Debug + Future + FusedFuture + New[|x: F| flatten::Flatten::new(x)] 34 where F: Future 35 ); 36 37 delegate_all!( 38 /// Stream for the [`flatten_stream`](FutureExt::flatten_stream) method. 39 FlattenStream<F>( 40 flatten::Flatten<F, <F as Future>::Output> 41 ): Debug + Sink + Stream + FusedStream + New[|x: F| flatten::Flatten::new(x)] 42 where F: Future 43 ); 44 45 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 46 pub use fuse::Fuse; 47 48 delegate_all!( 49 /// Future for the [`map`](super::FutureExt::map) method. 50 Map<Fut, F>( 51 map::Map<Fut, F> 52 ): Debug + Future + FusedFuture + New[|x: Fut, f: F| map::Map::new(x, f)] 53 ); 54 55 delegate_all!( 56 /// Stream for the [`into_stream`](FutureExt::into_stream) method. 57 IntoStream<F>( 58 crate::stream::Once<F> 59 ): Debug + Stream + FusedStream + New[|x: F| crate::stream::Once::new(x)] 60 ); 61 62 delegate_all!( 63 /// Future for the [`map_into`](FutureExt::map_into) combinator. 64 MapInto<Fut, T>( 65 Map<Fut, IntoFn<T>> 66 ): Debug + Future + FusedFuture + New[|x: Fut| Map::new(x, into_fn())] 67 ); 68 69 delegate_all!( 70 /// Future for the [`then`](FutureExt::then) method. 71 Then<Fut1, Fut2, F>( 72 flatten::Flatten<Map<Fut1, F>, Fut2> 73 ): Debug + Future + FusedFuture + New[|x: Fut1, y: F| flatten::Flatten::new(Map::new(x, y))] 74 ); 75 76 delegate_all!( 77 /// Future for the [`inspect`](FutureExt::inspect) method. 78 Inspect<Fut, F>( 79 map::Map<Fut, InspectFn<F>> 80 ): Debug + Future + FusedFuture + New[|x: Fut, f: F| map::Map::new(x, inspect_fn(f))] 81 ); 82 83 delegate_all!( 84 /// Future for the [`never_error`](super::FutureExt::never_error) combinator. 85 NeverError<Fut>( 86 Map<Fut, OkFn<Never>> 87 ): Debug + Future + FusedFuture + New[|x: Fut| Map::new(x, ok_fn())] 88 ); 89 90 delegate_all!( 91 /// Future for the [`unit_error`](super::FutureExt::unit_error) combinator. 92 UnitError<Fut>( 93 Map<Fut, OkFn<()>> 94 ): Debug + Future + FusedFuture + New[|x: Fut| Map::new(x, ok_fn())] 95 ); 96 97 #[cfg(feature = "std")] 98 mod catch_unwind; 99 #[cfg(feature = "std")] 100 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 101 pub use self::catch_unwind::CatchUnwind; 102 103 #[cfg(feature = "channel")] 104 #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] 105 #[cfg(feature = "std")] 106 mod remote_handle; 107 #[cfg(feature = "channel")] 108 #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] 109 #[cfg(feature = "std")] 110 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 111 pub use self::remote_handle::{Remote, RemoteHandle}; 112 113 #[cfg(feature = "std")] 114 mod shared; 115 #[cfg(feature = "std")] 116 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 117 pub use self::shared::{Shared, WeakShared}; 118 119 impl<T: ?Sized> FutureExt for T where T: Future {} 120 121 /// An extension trait for `Future`s that provides a variety of convenient 122 /// adapters. 123 pub trait FutureExt: Future { 124 /// Map this future's output to a different type, returning a new future of 125 /// the resulting type. 126 /// 127 /// This function is similar to the `Option::map` or `Iterator::map` where 128 /// it will change the type of the underlying future. This is useful to 129 /// chain along a computation once a future has been resolved. 130 /// 131 /// Note that this function consumes the receiving future and returns a 132 /// wrapped version of it, similar to the existing `map` methods in the 133 /// standard library. 134 /// 135 /// # Examples 136 /// 137 /// ``` 138 /// # futures::executor::block_on(async { 139 /// use futures::future::FutureExt; 140 /// 141 /// let future = async { 1 }; 142 /// let new_future = future.map(|x| x + 3); 143 /// assert_eq!(new_future.await, 4); 144 /// # }); 145 /// ``` map<U, F>(self, f: F) -> Map<Self, F> where F: FnOnce(Self::Output) -> U, Self: Sized,146 fn map<U, F>(self, f: F) -> Map<Self, F> 147 where 148 F: FnOnce(Self::Output) -> U, 149 Self: Sized, 150 { 151 assert_future::<U, _>(Map::new(self, f)) 152 } 153 154 /// Map this future's output to a different type, returning a new future of 155 /// the resulting type. 156 /// 157 /// This function is equivalent to calling `map(Into::into)` but allows naming 158 /// the return type. map_into<U>(self) -> MapInto<Self, U> where Self::Output: Into<U>, Self: Sized,159 fn map_into<U>(self) -> MapInto<Self, U> 160 where 161 Self::Output: Into<U>, 162 Self: Sized, 163 { 164 assert_future::<U, _>(MapInto::new(self)) 165 } 166 167 /// Chain on a computation for when a future finished, passing the result of 168 /// the future to the provided closure `f`. 169 /// 170 /// The returned value of the closure must implement the `Future` trait 171 /// and can represent some more work to be done before the composed future 172 /// is finished. 173 /// 174 /// The closure `f` is only run *after* successful completion of the `self` 175 /// future. 176 /// 177 /// Note that this function consumes the receiving future and returns a 178 /// wrapped version of it. 179 /// 180 /// # Examples 181 /// 182 /// ``` 183 /// # futures::executor::block_on(async { 184 /// use futures::future::FutureExt; 185 /// 186 /// let future_of_1 = async { 1 }; 187 /// let future_of_4 = future_of_1.then(|x| async move { x + 3 }); 188 /// assert_eq!(future_of_4.await, 4); 189 /// # }); 190 /// ``` then<Fut, F>(self, f: F) -> Then<Self, Fut, F> where F: FnOnce(Self::Output) -> Fut, Fut: Future, Self: Sized,191 fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F> 192 where 193 F: FnOnce(Self::Output) -> Fut, 194 Fut: Future, 195 Self: Sized, 196 { 197 assert_future::<Fut::Output, _>(Then::new(self, f)) 198 } 199 200 /// Wrap this future in an `Either` future, making it the left-hand variant 201 /// of that `Either`. 202 /// 203 /// This can be used in combination with the `right_future` method to write `if` 204 /// statements that evaluate to different futures in different branches. 205 /// 206 /// # Examples 207 /// 208 /// ``` 209 /// # futures::executor::block_on(async { 210 /// use futures::future::FutureExt; 211 /// 212 /// let x = 6; 213 /// let future = if x < 10 { 214 /// async { true }.left_future() 215 /// } else { 216 /// async { false }.right_future() 217 /// }; 218 /// 219 /// assert_eq!(future.await, true); 220 /// # }); 221 /// ``` left_future<B>(self) -> Either<Self, B> where B: Future<Output = Self::Output>, Self: Sized,222 fn left_future<B>(self) -> Either<Self, B> 223 where 224 B: Future<Output = Self::Output>, 225 Self: Sized, 226 { 227 assert_future::<Self::Output, _>(Either::Left(self)) 228 } 229 230 /// Wrap this future in an `Either` future, making it the right-hand variant 231 /// of that `Either`. 232 /// 233 /// This can be used in combination with the `left_future` method to write `if` 234 /// statements that evaluate to different futures in different branches. 235 /// 236 /// # Examples 237 /// 238 /// ``` 239 /// # futures::executor::block_on(async { 240 /// use futures::future::FutureExt; 241 /// 242 /// let x = 6; 243 /// let future = if x > 10 { 244 /// async { true }.left_future() 245 /// } else { 246 /// async { false }.right_future() 247 /// }; 248 /// 249 /// assert_eq!(future.await, false); 250 /// # }); 251 /// ``` right_future<A>(self) -> Either<A, Self> where A: Future<Output = Self::Output>, Self: Sized,252 fn right_future<A>(self) -> Either<A, Self> 253 where 254 A: Future<Output = Self::Output>, 255 Self: Sized, 256 { 257 assert_future::<Self::Output, _>(Either::Right(self)) 258 } 259 260 /// Convert this future into a single element stream. 261 /// 262 /// The returned stream contains single success if this future resolves to 263 /// success or single error if this future resolves into error. 264 /// 265 /// # Examples 266 /// 267 /// ``` 268 /// # futures::executor::block_on(async { 269 /// use futures::future::FutureExt; 270 /// use futures::stream::StreamExt; 271 /// 272 /// let future = async { 17 }; 273 /// let stream = future.into_stream(); 274 /// let collected: Vec<_> = stream.collect().await; 275 /// assert_eq!(collected, vec![17]); 276 /// # }); 277 /// ``` into_stream(self) -> IntoStream<Self> where Self: Sized,278 fn into_stream(self) -> IntoStream<Self> 279 where 280 Self: Sized, 281 { 282 assert_stream::<Self::Output, _>(IntoStream::new(self)) 283 } 284 285 /// Flatten the execution of this future when the output of this 286 /// future is itself another future. 287 /// 288 /// This can be useful when combining futures together to flatten the 289 /// computation out the final result. 290 /// 291 /// This method is roughly equivalent to `self.then(|x| x)`. 292 /// 293 /// Note that this function consumes the receiving future and returns a 294 /// wrapped version of it. 295 /// 296 /// # Examples 297 /// 298 /// ``` 299 /// # futures::executor::block_on(async { 300 /// use futures::future::FutureExt; 301 /// 302 /// let nested_future = async { async { 1 } }; 303 /// let future = nested_future.flatten(); 304 /// assert_eq!(future.await, 1); 305 /// # }); 306 /// ``` flatten(self) -> Flatten<Self> where Self::Output: Future, Self: Sized,307 fn flatten(self) -> Flatten<Self> 308 where 309 Self::Output: Future, 310 Self: Sized, 311 { 312 let f = Flatten::new(self); 313 assert_future::<<<Self as Future>::Output as Future>::Output, _>(f) 314 } 315 316 /// Flatten the execution of this future when the successful result of this 317 /// future is a stream. 318 /// 319 /// This can be useful when stream initialization is deferred, and it is 320 /// convenient to work with that stream as if stream was available at the 321 /// call site. 322 /// 323 /// Note that this function consumes this future and returns a wrapped 324 /// version of it. 325 /// 326 /// # Examples 327 /// 328 /// ``` 329 /// # futures::executor::block_on(async { 330 /// use futures::future::FutureExt; 331 /// use futures::stream::{self, StreamExt}; 332 /// 333 /// let stream_items = vec![17, 18, 19]; 334 /// let future_of_a_stream = async { stream::iter(stream_items) }; 335 /// 336 /// let stream = future_of_a_stream.flatten_stream(); 337 /// let list: Vec<_> = stream.collect().await; 338 /// assert_eq!(list, vec![17, 18, 19]); 339 /// # }); 340 /// ``` flatten_stream(self) -> FlattenStream<Self> where Self::Output: Stream, Self: Sized,341 fn flatten_stream(self) -> FlattenStream<Self> 342 where 343 Self::Output: Stream, 344 Self: Sized, 345 { 346 assert_stream::<<Self::Output as Stream>::Item, _>(FlattenStream::new(self)) 347 } 348 349 /// Fuse a future such that `poll` will never again be called once it has 350 /// completed. This method can be used to turn any `Future` into a 351 /// `FusedFuture`. 352 /// 353 /// Normally, once a future has returned `Poll::Ready` from `poll`, 354 /// any further calls could exhibit bad behavior such as blocking 355 /// forever, panicking, never returning, etc. If it is known that `poll` 356 /// may be called too often then this method can be used to ensure that it 357 /// has defined semantics. 358 /// 359 /// If a `fuse`d future is `poll`ed after having returned `Poll::Ready` 360 /// previously, it will return `Poll::Pending`, from `poll` again (and will 361 /// continue to do so for all future calls to `poll`). 362 /// 363 /// This combinator will drop the underlying future as soon as it has been 364 /// completed to ensure resources are reclaimed as soon as possible. fuse(self) -> Fuse<Self> where Self: Sized,365 fn fuse(self) -> Fuse<Self> 366 where 367 Self: Sized, 368 { 369 let f = Fuse::new(self); 370 assert_future::<Self::Output, _>(f) 371 } 372 373 /// Do something with the output of a future before passing it on. 374 /// 375 /// When using futures, you'll often chain several of them together. While 376 /// working on such code, you might want to check out what's happening at 377 /// various parts in the pipeline, without consuming the intermediate 378 /// value. To do that, insert a call to `inspect`. 379 /// 380 /// # Examples 381 /// 382 /// ``` 383 /// # futures::executor::block_on(async { 384 /// use futures::future::FutureExt; 385 /// 386 /// let future = async { 1 }; 387 /// let new_future = future.inspect(|&x| println!("about to resolve: {}", x)); 388 /// assert_eq!(new_future.await, 1); 389 /// # }); 390 /// ``` inspect<F>(self, f: F) -> Inspect<Self, F> where F: FnOnce(&Self::Output), Self: Sized,391 fn inspect<F>(self, f: F) -> Inspect<Self, F> 392 where 393 F: FnOnce(&Self::Output), 394 Self: Sized, 395 { 396 assert_future::<Self::Output, _>(Inspect::new(self, f)) 397 } 398 399 /// Catches unwinding panics while polling the future. 400 /// 401 /// In general, panics within a future can propagate all the way out to the 402 /// task level. This combinator makes it possible to halt unwinding within 403 /// the future itself. It's most commonly used within task executors. It's 404 /// not recommended to use this for error handling. 405 /// 406 /// Note that this method requires the `UnwindSafe` bound from the standard 407 /// library. This isn't always applied automatically, and the standard 408 /// library provides an `AssertUnwindSafe` wrapper type to apply it 409 /// after-the fact. To assist using this method, the `Future` trait is also 410 /// implemented for `AssertUnwindSafe<F>` where `F` implements `Future`. 411 /// 412 /// This method is only available when the `std` feature of this 413 /// library is activated, and it is activated by default. 414 /// 415 /// # Examples 416 /// 417 /// ``` 418 /// # futures::executor::block_on(async { 419 /// use futures::future::{self, FutureExt, Ready}; 420 /// 421 /// let future = future::ready(2); 422 /// assert!(future.catch_unwind().await.is_ok()); 423 /// 424 /// let future = future::lazy(|_| -> Ready<i32> { 425 /// unimplemented!() 426 /// }); 427 /// assert!(future.catch_unwind().await.is_err()); 428 /// # }); 429 /// ``` 430 #[cfg(feature = "std")] catch_unwind(self) -> CatchUnwind<Self> where Self: Sized + ::std::panic::UnwindSafe,431 fn catch_unwind(self) -> CatchUnwind<Self> 432 where 433 Self: Sized + ::std::panic::UnwindSafe, 434 { 435 assert_future::<Result<Self::Output, Box<dyn std::any::Any + Send>>, _>(CatchUnwind::new( 436 self, 437 )) 438 } 439 440 /// Create a cloneable handle to this future where all handles will resolve 441 /// to the same result. 442 /// 443 /// The `shared` combinator method provides a method to convert any future 444 /// into a cloneable future. It enables a future to be polled by multiple 445 /// threads. 446 /// 447 /// This method is only available when the `std` feature of this 448 /// library is activated, and it is activated by default. 449 /// 450 /// # Examples 451 /// 452 /// ``` 453 /// # futures::executor::block_on(async { 454 /// use futures::future::FutureExt; 455 /// 456 /// let future = async { 6 }; 457 /// let shared1 = future.shared(); 458 /// let shared2 = shared1.clone(); 459 /// 460 /// assert_eq!(6, shared1.await); 461 /// assert_eq!(6, shared2.await); 462 /// # }); 463 /// ``` 464 /// 465 /// ``` 466 /// // Note, unlike most examples this is written in the context of a 467 /// // synchronous function to better illustrate the cross-thread aspect of 468 /// // the `shared` combinator. 469 /// 470 /// # futures::executor::block_on(async { 471 /// use futures::future::FutureExt; 472 /// use futures::executor::block_on; 473 /// use std::thread; 474 /// 475 /// let future = async { 6 }; 476 /// let shared1 = future.shared(); 477 /// let shared2 = shared1.clone(); 478 /// let join_handle = thread::spawn(move || { 479 /// assert_eq!(6, block_on(shared2)); 480 /// }); 481 /// assert_eq!(6, shared1.await); 482 /// join_handle.join().unwrap(); 483 /// # }); 484 /// ``` 485 #[cfg(feature = "std")] shared(self) -> Shared<Self> where Self: Sized, Self::Output: Clone,486 fn shared(self) -> Shared<Self> 487 where 488 Self: Sized, 489 Self::Output: Clone, 490 { 491 assert_future::<Self::Output, _>(Shared::new(self)) 492 } 493 494 /// Turn this future into a future that yields `()` on completion and sends 495 /// its output to another future on a separate task. 496 /// 497 /// This can be used with spawning executors to easily retrieve the result 498 /// of a future executing on a separate task or thread. 499 /// 500 /// This method is only available when the `std` feature of this 501 /// library is activated, and it is activated by default. 502 #[cfg(feature = "channel")] 503 #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] 504 #[cfg(feature = "std")] remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>) where Self: Sized,505 fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>) 506 where 507 Self: Sized, 508 { 509 let (wrapped, handle) = remote_handle::remote_handle(self); 510 (assert_future::<(), _>(wrapped), handle) 511 } 512 513 /// Wrap the future in a Box, pinning it. 514 /// 515 /// This method is only available when the `std` or `alloc` feature of this 516 /// library is activated, and it is activated by default. 517 #[cfg(feature = "alloc")] boxed<'a>(self) -> BoxFuture<'a, Self::Output> where Self: Sized + Send + 'a,518 fn boxed<'a>(self) -> BoxFuture<'a, Self::Output> 519 where 520 Self: Sized + Send + 'a, 521 { 522 assert_future::<Self::Output, _>(Box::pin(self)) 523 } 524 525 /// Wrap the future in a Box, pinning it. 526 /// 527 /// Similar to `boxed`, but without the `Send` requirement. 528 /// 529 /// This method is only available when the `std` or `alloc` feature of this 530 /// library is activated, and it is activated by default. 531 #[cfg(feature = "alloc")] boxed_local<'a>(self) -> LocalBoxFuture<'a, Self::Output> where Self: Sized + 'a,532 fn boxed_local<'a>(self) -> LocalBoxFuture<'a, Self::Output> 533 where 534 Self: Sized + 'a, 535 { 536 assert_future::<Self::Output, _>(Box::pin(self)) 537 } 538 539 /// Turns a [`Future<Output = T>`](Future) into a 540 /// [`TryFuture<Ok = T, Error = ()`>](futures_core::future::TryFuture). unit_error(self) -> UnitError<Self> where Self: Sized,541 fn unit_error(self) -> UnitError<Self> 542 where 543 Self: Sized, 544 { 545 assert_future::<Result<Self::Output, ()>, _>(UnitError::new(self)) 546 } 547 548 /// Turns a [`Future<Output = T>`](Future) into a 549 /// [`TryFuture<Ok = T, Error = Never`>](futures_core::future::TryFuture). never_error(self) -> NeverError<Self> where Self: Sized,550 fn never_error(self) -> NeverError<Self> 551 where 552 Self: Sized, 553 { 554 assert_future::<Result<Self::Output, Never>, _>(NeverError::new(self)) 555 } 556 557 /// A convenience for calling `Future::poll` on `Unpin` future types. poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output> where Self: Unpin,558 fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output> 559 where 560 Self: Unpin, 561 { 562 Pin::new(self).poll(cx) 563 } 564 565 /// Evaluates and consumes the future, returning the resulting output if 566 /// the future is ready after the first call to `Future::poll`. 567 /// 568 /// If `poll` instead returns `Poll::Pending`, `None` is returned. 569 /// 570 /// This method is useful in cases where immediacy is more important than 571 /// waiting for a result. It is also convenient for quickly obtaining 572 /// the value of a future that is known to always resolve immediately. 573 /// 574 /// # Examples 575 /// 576 /// ``` 577 /// # use futures::prelude::*; 578 /// use futures::{future::ready, future::pending}; 579 /// let future_ready = ready("foobar"); 580 /// let future_pending = pending::<&'static str>(); 581 /// 582 /// assert_eq!(future_ready.now_or_never(), Some("foobar")); 583 /// assert_eq!(future_pending.now_or_never(), None); 584 /// ``` 585 /// 586 /// In cases where it is absolutely known that a future should always 587 /// resolve immediately and never return `Poll::Pending`, this method can 588 /// be combined with `expect()`: 589 /// 590 /// ``` 591 /// # use futures::{prelude::*, future::ready}; 592 /// let future_ready = ready("foobar"); 593 /// 594 /// assert_eq!(future_ready.now_or_never().expect("Future not ready"), "foobar"); 595 /// ``` now_or_never(self) -> Option<Self::Output> where Self: Sized,596 fn now_or_never(self) -> Option<Self::Output> 597 where 598 Self: Sized, 599 { 600 let noop_waker = crate::task::noop_waker(); 601 let mut cx = Context::from_waker(&noop_waker); 602 603 let this = self; 604 pin_mut!(this); 605 match this.poll(&mut cx) { 606 Poll::Ready(x) => Some(x), 607 _ => None, 608 } 609 } 610 } 611