1 //! Streams 2 //! 3 //! This module contains a number of functions for working with `Streams`s 4 //! that return `Result`s, allowing for short-circuiting computations. 5 6 #[cfg(feature = "compat")] 7 use crate::compat::Compat; 8 use core::pin::Pin; 9 use futures_core::{ 10 future::{Future, TryFuture}, 11 stream::TryStream, 12 task::{Context, Poll}, 13 }; 14 use crate::fns::{ 15 InspectOkFn, inspect_ok_fn, InspectErrFn, inspect_err_fn, MapErrFn, map_err_fn, IntoFn, into_fn, MapOkFn, map_ok_fn, 16 }; 17 use crate::future::assert_future; 18 use crate::stream::{Map, Inspect}; 19 use crate::stream::assert_stream; 20 21 mod and_then; 22 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 23 pub use self::and_then::AndThen; 24 25 delegate_all!( 26 /// Stream for the [`err_into`](super::TryStreamExt::err_into) method. 27 ErrInto<St, E>( 28 MapErr<St, IntoFn<E>> 29 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())] 30 ); 31 32 delegate_all!( 33 /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method. 34 InspectOk<St, F>( 35 Inspect<IntoStream<St>, InspectOkFn<F>> 36 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))] 37 ); 38 39 delegate_all!( 40 /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method. 41 InspectErr<St, F>( 42 Inspect<IntoStream<St>, InspectErrFn<F>> 43 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))] 44 ); 45 46 mod into_stream; 47 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 48 pub use self::into_stream::IntoStream; 49 50 delegate_all!( 51 /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method. 52 MapOk<St, F>( 53 Map<IntoStream<St>, MapOkFn<F>> 54 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))] 55 ); 56 57 delegate_all!( 58 /// Stream for the [`map_err`](super::TryStreamExt::map_err) method. 59 MapErr<St, F>( 60 Map<IntoStream<St>, MapErrFn<F>> 61 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))] 62 ); 63 64 mod or_else; 65 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 66 pub use self::or_else::OrElse; 67 68 mod try_next; 69 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 70 pub use self::try_next::TryNext; 71 72 mod try_for_each; 73 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 74 pub use self::try_for_each::TryForEach; 75 76 mod try_filter; 77 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 78 pub use self::try_filter::TryFilter; 79 80 mod try_filter_map; 81 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 82 pub use self::try_filter_map::TryFilterMap; 83 84 mod try_flatten; 85 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 86 pub use self::try_flatten::TryFlatten; 87 88 mod try_collect; 89 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 90 pub use self::try_collect::TryCollect; 91 92 mod try_concat; 93 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 94 pub use self::try_concat::TryConcat; 95 96 mod try_fold; 97 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 98 pub use self::try_fold::TryFold; 99 100 mod try_unfold; 101 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 102 pub use self::try_unfold::{try_unfold, TryUnfold}; 103 104 mod try_skip_while; 105 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 106 pub use self::try_skip_while::TrySkipWhile; 107 108 mod try_take_while; 109 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 110 pub use self::try_take_while::TryTakeWhile; 111 112 cfg_target_has_atomic! { 113 #[cfg(feature = "alloc")] 114 mod try_buffer_unordered; 115 #[cfg(feature = "alloc")] 116 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 117 pub use self::try_buffer_unordered::TryBufferUnordered; 118 119 #[cfg(feature = "alloc")] 120 mod try_buffered; 121 #[cfg(feature = "alloc")] 122 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 123 pub use self::try_buffered::TryBuffered; 124 125 #[cfg(feature = "alloc")] 126 mod try_for_each_concurrent; 127 #[cfg(feature = "alloc")] 128 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 129 pub use self::try_for_each_concurrent::TryForEachConcurrent; 130 } 131 132 #[cfg(feature = "io")] 133 #[cfg(feature = "std")] 134 mod into_async_read; 135 #[cfg(feature = "io")] 136 #[cfg_attr(docsrs, doc(cfg(feature = "io")))] 137 #[cfg(feature = "std")] 138 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 139 pub use self::into_async_read::IntoAsyncRead; 140 141 impl<S: ?Sized + TryStream> TryStreamExt for S {} 142 143 /// Adapters specific to `Result`-returning streams 144 pub trait TryStreamExt: TryStream { 145 /// Wraps the current stream in a new stream which converts the error type 146 /// into the one provided. 147 /// 148 /// # Examples 149 /// 150 /// ``` 151 /// # futures::executor::block_on(async { 152 /// use futures::stream::{self, TryStreamExt}; 153 /// 154 /// let mut stream = 155 /// stream::iter(vec![Ok(()), Err(5i32)]) 156 /// .err_into::<i64>(); 157 /// 158 /// assert_eq!(stream.try_next().await, Ok(Some(()))); 159 /// assert_eq!(stream.try_next().await, Err(5i64)); 160 /// # }) 161 /// ``` err_into<E>(self) -> ErrInto<Self, E> where Self: Sized, Self::Error: Into<E>,162 fn err_into<E>(self) -> ErrInto<Self, E> 163 where 164 Self: Sized, 165 Self::Error: Into<E>, 166 { 167 assert_stream::<Result<Self::Ok, E>, _>(ErrInto::new(self)) 168 } 169 170 /// Wraps the current stream in a new stream which maps the success value 171 /// using the provided closure. 172 /// 173 /// # Examples 174 /// 175 /// ``` 176 /// # futures::executor::block_on(async { 177 /// use futures::stream::{self, TryStreamExt}; 178 /// 179 /// let mut stream = 180 /// stream::iter(vec![Ok(5), Err(0)]) 181 /// .map_ok(|x| x + 2); 182 /// 183 /// assert_eq!(stream.try_next().await, Ok(Some(7))); 184 /// assert_eq!(stream.try_next().await, Err(0)); 185 /// # }) 186 /// ``` map_ok<T, F>(self, f: F) -> MapOk<Self, F> where Self: Sized, F: FnMut(Self::Ok) -> T,187 fn map_ok<T, F>(self, f: F) -> MapOk<Self, F> 188 where 189 Self: Sized, 190 F: FnMut(Self::Ok) -> T, 191 { 192 assert_stream::<Result<T, Self::Error>, _>(MapOk::new(self, f)) 193 } 194 195 /// Wraps the current stream in a new stream which maps the error value 196 /// using the provided closure. 197 /// 198 /// # Examples 199 /// 200 /// ``` 201 /// # futures::executor::block_on(async { 202 /// use futures::stream::{self, TryStreamExt}; 203 /// 204 /// let mut stream = 205 /// stream::iter(vec![Ok(5), Err(0)]) 206 /// .map_err(|x| x + 2); 207 /// 208 /// assert_eq!(stream.try_next().await, Ok(Some(5))); 209 /// assert_eq!(stream.try_next().await, Err(2)); 210 /// # }) 211 /// ``` map_err<E, F>(self, f: F) -> MapErr<Self, F> where Self: Sized, F: FnMut(Self::Error) -> E,212 fn map_err<E, F>(self, f: F) -> MapErr<Self, F> 213 where 214 Self: Sized, 215 F: FnMut(Self::Error) -> E, 216 { 217 assert_stream::<Result<Self::Ok, E>, _>(MapErr::new(self, f)) 218 } 219 220 /// Chain on a computation for when a value is ready, passing the successful 221 /// results to the provided closure `f`. 222 /// 223 /// This function can be used to run a unit of work when the next successful 224 /// value on a stream is ready. The closure provided will be yielded a value 225 /// when ready, and the returned future will then be run to completion to 226 /// produce the next value on this stream. 227 /// 228 /// Any errors produced by this stream will not be passed to the closure, 229 /// and will be passed through. 230 /// 231 /// The returned value of the closure must implement the `TryFuture` trait 232 /// and can represent some more work to be done before the composed stream 233 /// is finished. 234 /// 235 /// Note that this function consumes the receiving stream and returns a 236 /// wrapped version of it. 237 /// 238 /// To process the entire stream and return a single future representing 239 /// success or error, use `try_for_each` instead. 240 /// 241 /// # Examples 242 /// 243 /// ``` 244 /// use futures::channel::mpsc; 245 /// use futures::future; 246 /// use futures::stream::TryStreamExt; 247 /// 248 /// let (_tx, rx) = mpsc::channel::<Result<i32, ()>>(1); 249 /// 250 /// let rx = rx.and_then(|result| { 251 /// future::ok(if result % 2 == 0 { 252 /// Some(result) 253 /// } else { 254 /// None 255 /// }) 256 /// }); 257 /// ``` and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: TryFuture<Error = Self::Error>, Self: Sized,258 fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F> 259 where 260 F: FnMut(Self::Ok) -> Fut, 261 Fut: TryFuture<Error = Self::Error>, 262 Self: Sized, 263 { 264 assert_stream::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f)) 265 } 266 267 /// Chain on a computation for when an error happens, passing the 268 /// erroneous result to the provided closure `f`. 269 /// 270 /// This function can be used to run a unit of work and attempt to recover from 271 /// an error if one happens. The closure provided will be yielded an error 272 /// when one appears, and the returned future will then be run to completion 273 /// to produce the next value on this stream. 274 /// 275 /// Any successful values produced by this stream will not be passed to the 276 /// closure, and will be passed through. 277 /// 278 /// The returned value of the closure must implement the [`TryFuture`](futures_core::future::TryFuture) trait 279 /// and can represent some more work to be done before the composed stream 280 /// is finished. 281 /// 282 /// Note that this function consumes the receiving stream and returns a 283 /// wrapped version of it. or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F> where F: FnMut(Self::Error) -> Fut, Fut: TryFuture<Ok = Self::Ok>, Self: Sized,284 fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F> 285 where 286 F: FnMut(Self::Error) -> Fut, 287 Fut: TryFuture<Ok = Self::Ok>, 288 Self: Sized, 289 { 290 assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f)) 291 } 292 293 /// Do something with the success value of this stream, afterwards passing 294 /// it on. 295 /// 296 /// This is similar to the `StreamExt::inspect` method where it allows 297 /// easily inspecting the success value as it passes through the stream, for 298 /// example to debug what's going on. inspect_ok<F>(self, f: F) -> InspectOk<Self, F> where F: FnMut(&Self::Ok), Self: Sized,299 fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F> 300 where 301 F: FnMut(&Self::Ok), 302 Self: Sized, 303 { 304 assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f)) 305 } 306 307 /// Do something with the error value of this stream, afterwards passing it on. 308 /// 309 /// This is similar to the `StreamExt::inspect` method where it allows 310 /// easily inspecting the error value as it passes through the stream, for 311 /// example to debug what's going on. inspect_err<F>(self, f: F) -> InspectErr<Self, F> where F: FnMut(&Self::Error), Self: Sized,312 fn inspect_err<F>(self, f: F) -> InspectErr<Self, F> 313 where 314 F: FnMut(&Self::Error), 315 Self: Sized, 316 { 317 assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f)) 318 } 319 320 /// Wraps a [`TryStream`] into a type that implements 321 /// [`Stream`](futures_core::stream::Stream) 322 /// 323 /// [`TryStream`]s currently do not implement the 324 /// [`Stream`](futures_core::stream::Stream) trait because of limitations 325 /// of the compiler. 326 /// 327 /// # Examples 328 /// 329 /// ``` 330 /// use futures::stream::{Stream, TryStream, TryStreamExt}; 331 /// 332 /// # type T = i32; 333 /// # type E = (); 334 /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> { // ... } 335 /// # futures::stream::empty() 336 /// # } 337 /// fn take_stream(stream: impl Stream<Item = Result<T, E>>) { /* ... */ } 338 /// 339 /// take_stream(make_try_stream().into_stream()); 340 /// ``` into_stream(self) -> IntoStream<Self> where Self: Sized,341 fn into_stream(self) -> IntoStream<Self> 342 where 343 Self: Sized, 344 { 345 assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self)) 346 } 347 348 /// Creates a future that attempts to resolve the next item in the stream. 349 /// If an error is encountered before the next item, the error is returned 350 /// instead. 351 /// 352 /// This is similar to the `Stream::next` combinator, but returns a 353 /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making 354 /// for easy use with the `?` operator. 355 /// 356 /// # Examples 357 /// 358 /// ``` 359 /// # futures::executor::block_on(async { 360 /// use futures::stream::{self, TryStreamExt}; 361 /// 362 /// let mut stream = stream::iter(vec![Ok(()), Err(())]); 363 /// 364 /// assert_eq!(stream.try_next().await, Ok(Some(()))); 365 /// assert_eq!(stream.try_next().await, Err(())); 366 /// # }) 367 /// ``` try_next(&mut self) -> TryNext<'_, Self> where Self: Unpin,368 fn try_next(&mut self) -> TryNext<'_, Self> 369 where 370 Self: Unpin, 371 { 372 assert_future::<Result<Option<Self::Ok>, Self::Error>, _>(TryNext::new(self)) 373 } 374 375 /// Attempts to run this stream to completion, executing the provided 376 /// asynchronous closure for each element on the stream. 377 /// 378 /// The provided closure will be called for each item this stream produces, 379 /// yielding a future. That future will then be executed to completion 380 /// before moving on to the next item. 381 /// 382 /// The returned value is a [`Future`](futures_core::future::Future) where the 383 /// [`Output`](futures_core::future::Future::Output) type is 384 /// `Result<(), Self::Error>`. If any of the intermediate 385 /// futures or the stream returns an error, this future will return 386 /// immediately with an error. 387 /// 388 /// # Examples 389 /// 390 /// ``` 391 /// # futures::executor::block_on(async { 392 /// use futures::future; 393 /// use futures::stream::{self, TryStreamExt}; 394 /// 395 /// let mut x = 0i32; 396 /// 397 /// { 398 /// let fut = stream::repeat(Ok(1)).try_for_each(|item| { 399 /// x += item; 400 /// future::ready(if x == 3 { Err(()) } else { Ok(()) }) 401 /// }); 402 /// assert_eq!(fut.await, Err(())); 403 /// } 404 /// 405 /// assert_eq!(x, 3); 406 /// # }) 407 /// ``` try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: TryFuture<Ok = (), Error = Self::Error>, Self: Sized,408 fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F> 409 where 410 F: FnMut(Self::Ok) -> Fut, 411 Fut: TryFuture<Ok = (), Error = Self::Error>, 412 Self: Sized, 413 { 414 assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f)) 415 } 416 417 /// Skip elements on this stream while the provided asynchronous predicate 418 /// resolves to `true`. 419 /// 420 /// This function is similar to 421 /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits 422 /// early if an error occurs. 423 /// 424 /// # Examples 425 /// 426 /// ``` 427 /// # futures::executor::block_on(async { 428 /// use futures::future; 429 /// use futures::stream::{self, TryStreamExt}; 430 /// 431 /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)]); 432 /// let stream = stream.try_skip_while(|x| future::ready(Ok(*x < 3))); 433 /// 434 /// let output: Result<Vec<i32>, i32> = stream.try_collect().await; 435 /// assert_eq!(output, Ok(vec![3, 2])); 436 /// # }) 437 /// ``` try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F> where F: FnMut(&Self::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized,438 fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F> 439 where 440 F: FnMut(&Self::Ok) -> Fut, 441 Fut: TryFuture<Ok = bool, Error = Self::Error>, 442 Self: Sized, 443 { 444 assert_stream::<Result<Self::Ok, Self::Error>, _>(TrySkipWhile::new(self, f)) 445 } 446 447 /// Take elements on this stream while the provided asynchronous predicate 448 /// resolves to `true`. 449 /// 450 /// This function is similar to 451 /// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits 452 /// early if an error occurs. 453 /// 454 /// # Examples 455 /// 456 /// ``` 457 /// # futures::executor::block_on(async { 458 /// use futures::future; 459 /// use futures::stream::{self, TryStreamExt}; 460 /// 461 /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]); 462 /// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3))); 463 /// 464 /// let output: Result<Vec<i32>, i32> = stream.try_collect().await; 465 /// assert_eq!(output, Ok(vec![1, 2])); 466 /// # }) 467 /// ``` try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F> where F: FnMut(&Self::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized,468 fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F> 469 where 470 F: FnMut(&Self::Ok) -> Fut, 471 Fut: TryFuture<Ok = bool, Error = Self::Error>, 472 Self: Sized, 473 { 474 assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f)) 475 } 476 477 /// Attempts to run this stream to completion, executing the provided asynchronous 478 /// closure for each element on the stream concurrently as elements become 479 /// available, exiting as soon as an error occurs. 480 /// 481 /// This is similar to 482 /// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent), 483 /// but will resolve to an error immediately if the underlying stream or the provided 484 /// closure return an error. 485 /// 486 /// This method is only available when the `std` or `alloc` feature of this 487 /// library is activated, and it is activated by default. 488 /// 489 /// # Examples 490 /// 491 /// ``` 492 /// # futures::executor::block_on(async { 493 /// use futures::channel::oneshot; 494 /// use futures::stream::{self, StreamExt, TryStreamExt}; 495 /// 496 /// let (tx1, rx1) = oneshot::channel(); 497 /// let (tx2, rx2) = oneshot::channel(); 498 /// let (_tx3, rx3) = oneshot::channel(); 499 /// 500 /// let stream = stream::iter(vec![rx1, rx2, rx3]); 501 /// let fut = stream.map(Ok).try_for_each_concurrent( 502 /// /* limit */ 2, 503 /// |rx| async move { 504 /// let res: Result<(), oneshot::Canceled> = rx.await; 505 /// res 506 /// } 507 /// ); 508 /// 509 /// tx1.send(()).unwrap(); 510 /// // Drop the second sender so that `rx2` resolves to `Canceled`. 511 /// drop(tx2); 512 /// 513 /// // The final result is an error because the second future 514 /// // resulted in an error. 515 /// assert_eq!(Err(oneshot::Canceled), fut.await); 516 /// # }) 517 /// ``` 518 #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] 519 #[cfg(feature = "alloc")] try_for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> TryForEachConcurrent<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: Future<Output = Result<(), Self::Error>>, Self: Sized,520 fn try_for_each_concurrent<Fut, F>( 521 self, 522 limit: impl Into<Option<usize>>, 523 f: F, 524 ) -> TryForEachConcurrent<Self, Fut, F> 525 where 526 F: FnMut(Self::Ok) -> Fut, 527 Fut: Future<Output = Result<(), Self::Error>>, 528 Self: Sized, 529 { 530 assert_future::<Result<(), Self::Error>, _>(TryForEachConcurrent::new( 531 self, 532 limit.into(), 533 f, 534 )) 535 } 536 537 /// Attempt to transform a stream into a collection, 538 /// returning a future representing the result of that computation. 539 /// 540 /// This combinator will collect all successful results of this stream and 541 /// collect them into the specified collection type. If an error happens then all 542 /// collected elements will be dropped and the error will be returned. 543 /// 544 /// The returned future will be resolved when the stream terminates. 545 /// 546 /// # Examples 547 /// 548 /// ``` 549 /// # futures::executor::block_on(async { 550 /// use futures::channel::mpsc; 551 /// use futures::stream::TryStreamExt; 552 /// use std::thread; 553 /// 554 /// let (tx, rx) = mpsc::unbounded(); 555 /// 556 /// thread::spawn(move || { 557 /// for i in 1..=5 { 558 /// tx.unbounded_send(Ok(i)).unwrap(); 559 /// } 560 /// tx.unbounded_send(Err(6)).unwrap(); 561 /// }); 562 /// 563 /// let output: Result<Vec<i32>, i32> = rx.try_collect().await; 564 /// assert_eq!(output, Err(6)); 565 /// # }) 566 /// ``` try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C> where Self: Sized,567 fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C> 568 where 569 Self: Sized, 570 { 571 assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self)) 572 } 573 574 /// Attempt to filter the values produced by this stream according to the 575 /// provided asynchronous closure. 576 /// 577 /// As values of this stream are made available, the provided predicate `f` 578 /// will be run on them. If the predicate returns a `Future` which resolves 579 /// to `true`, then the stream will yield the value, but if the predicate 580 /// return a `Future` which resolves to `false`, then the value will be 581 /// discarded and the next value will be produced. 582 /// 583 /// All errors are passed through without filtering in this combinator. 584 /// 585 /// Note that this function consumes the stream passed into it and returns a 586 /// wrapped version of it, similar to the existing `filter` methods in 587 /// the standard library. 588 /// 589 /// # Examples 590 /// ``` 591 /// # futures::executor::block_on(async { 592 /// use futures::future; 593 /// use futures::stream::{self, StreamExt, TryStreamExt}; 594 /// 595 /// let stream = stream::iter(vec![Ok(1i32), Ok(2i32), Ok(3i32), Err("error")]); 596 /// let mut evens = stream.try_filter(|x| { 597 /// future::ready(x % 2 == 0) 598 /// }); 599 /// 600 /// assert_eq!(evens.next().await, Some(Ok(2))); 601 /// assert_eq!(evens.next().await, Some(Err("error"))); 602 /// # }) 603 /// ``` try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F> where Fut: Future<Output = bool>, F: FnMut(&Self::Ok) -> Fut, Self: Sized,604 fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F> 605 where 606 Fut: Future<Output = bool>, 607 F: FnMut(&Self::Ok) -> Fut, 608 Self: Sized, 609 { 610 assert_stream::<Result<Self::Ok, Self::Error>, _>(TryFilter::new(self, f)) 611 } 612 613 /// Attempt to filter the values produced by this stream while 614 /// simultaneously mapping them to a different type according to the 615 /// provided asynchronous closure. 616 /// 617 /// As values of this stream are made available, the provided function will 618 /// be run on them. If the future returned by the predicate `f` resolves to 619 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if 620 /// it resolves to [`None`] then the next value will be produced. 621 /// 622 /// All errors are passed through without filtering in this combinator. 623 /// 624 /// Note that this function consumes the stream passed into it and returns a 625 /// wrapped version of it, similar to the existing `filter_map` methods in 626 /// the standard library. 627 /// 628 /// # Examples 629 /// ``` 630 /// # futures::executor::block_on(async { 631 /// use futures::stream::{self, StreamExt, TryStreamExt}; 632 /// use futures::pin_mut; 633 /// 634 /// let stream = stream::iter(vec![Ok(1i32), Ok(6i32), Err("error")]); 635 /// let halves = stream.try_filter_map(|x| async move { 636 /// let ret = if x % 2 == 0 { Some(x / 2) } else { None }; 637 /// Ok(ret) 638 /// }); 639 /// 640 /// pin_mut!(halves); 641 /// assert_eq!(halves.next().await, Some(Ok(3))); 642 /// assert_eq!(halves.next().await, Some(Err("error"))); 643 /// # }) 644 /// ``` try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F> where Fut: TryFuture<Ok = Option<T>, Error = Self::Error>, F: FnMut(Self::Ok) -> Fut, Self: Sized,645 fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F> 646 where 647 Fut: TryFuture<Ok = Option<T>, Error = Self::Error>, 648 F: FnMut(Self::Ok) -> Fut, 649 Self: Sized, 650 { 651 assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f)) 652 } 653 654 /// Flattens a stream of streams into just one continuous stream. 655 /// 656 /// If this stream's elements are themselves streams then this combinator 657 /// will flatten out the entire stream to one long chain of elements. Any 658 /// errors are passed through without looking at them, but otherwise each 659 /// individual stream will get exhausted before moving on to the next. 660 /// 661 /// # Examples 662 /// 663 /// ``` 664 /// # futures::executor::block_on(async { 665 /// use futures::channel::mpsc; 666 /// use futures::stream::{StreamExt, TryStreamExt}; 667 /// use std::thread; 668 /// 669 /// let (tx1, rx1) = mpsc::unbounded(); 670 /// let (tx2, rx2) = mpsc::unbounded(); 671 /// let (tx3, rx3) = mpsc::unbounded(); 672 /// 673 /// thread::spawn(move || { 674 /// tx1.unbounded_send(Ok(1)).unwrap(); 675 /// }); 676 /// thread::spawn(move || { 677 /// tx2.unbounded_send(Ok(2)).unwrap(); 678 /// tx2.unbounded_send(Err(3)).unwrap(); 679 /// }); 680 /// thread::spawn(move || { 681 /// tx3.unbounded_send(Ok(rx1)).unwrap(); 682 /// tx3.unbounded_send(Ok(rx2)).unwrap(); 683 /// tx3.unbounded_send(Err(4)).unwrap(); 684 /// }); 685 /// 686 /// let mut stream = rx3.try_flatten(); 687 /// assert_eq!(stream.next().await, Some(Ok(1))); 688 /// assert_eq!(stream.next().await, Some(Ok(2))); 689 /// assert_eq!(stream.next().await, Some(Err(3))); 690 /// # }); 691 /// ``` try_flatten(self) -> TryFlatten<Self> where Self::Ok: TryStream, <Self::Ok as TryStream>::Error: From<Self::Error>, Self: Sized,692 fn try_flatten(self) -> TryFlatten<Self> 693 where 694 Self::Ok: TryStream, 695 <Self::Ok as TryStream>::Error: From<Self::Error>, 696 Self: Sized, 697 { 698 assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>( 699 TryFlatten::new(self), 700 ) 701 } 702 703 /// Attempt to execute an accumulating asynchronous computation over a 704 /// stream, collecting all the values into one final result. 705 /// 706 /// This combinator will accumulate all values returned by this stream 707 /// according to the closure provided. The initial state is also provided to 708 /// this method and then is returned again by each execution of the closure. 709 /// Once the entire stream has been exhausted the returned future will 710 /// resolve to this value. 711 /// 712 /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but will 713 /// exit early if an error is encountered in either the stream or the 714 /// provided closure. 715 /// 716 /// # Examples 717 /// 718 /// ``` 719 /// # futures::executor::block_on(async { 720 /// use futures::stream::{self, TryStreamExt}; 721 /// 722 /// let number_stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]); 723 /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok(acc + x) }); 724 /// assert_eq!(sum.await, Ok(3)); 725 /// 726 /// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]); 727 /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x) }); 728 /// assert_eq!(sum.await, Err(2)); 729 /// # }) 730 /// ``` try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F> where F: FnMut(T, Self::Ok) -> Fut, Fut: TryFuture<Ok = T, Error = Self::Error>, Self: Sized,731 fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F> 732 where 733 F: FnMut(T, Self::Ok) -> Fut, 734 Fut: TryFuture<Ok = T, Error = Self::Error>, 735 Self: Sized, 736 { 737 assert_future::<Result<T, Self::Error>, _>(TryFold::new(self, f, init)) 738 } 739 740 /// Attempt to concatenate all items of a stream into a single 741 /// extendable destination, returning a future representing the end result. 742 /// 743 /// This combinator will extend the first item with the contents of all 744 /// the subsequent successful results of the stream. If the stream is empty, 745 /// the default value will be returned. 746 /// 747 /// Works with all collections that implement the [`Extend`](std::iter::Extend) trait. 748 /// 749 /// This method is similar to [`concat`](crate::stream::StreamExt::concat), but will 750 /// exit early if an error is encountered in the stream. 751 /// 752 /// # Examples 753 /// 754 /// ``` 755 /// # futures::executor::block_on(async { 756 /// use futures::channel::mpsc; 757 /// use futures::stream::TryStreamExt; 758 /// use std::thread; 759 /// 760 /// let (tx, rx) = mpsc::unbounded::<Result<Vec<i32>, ()>>(); 761 /// 762 /// thread::spawn(move || { 763 /// for i in (0..3).rev() { 764 /// let n = i * 3; 765 /// tx.unbounded_send(Ok(vec![n + 1, n + 2, n + 3])).unwrap(); 766 /// } 767 /// }); 768 /// 769 /// let result = rx.try_concat().await; 770 /// 771 /// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3])); 772 /// # }); 773 /// ``` try_concat(self) -> TryConcat<Self> where Self: Sized, Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,774 fn try_concat(self) -> TryConcat<Self> 775 where 776 Self: Sized, 777 Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default, 778 { 779 assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self)) 780 } 781 782 /// Attempt to execute several futures from a stream concurrently (unordered). 783 /// 784 /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type 785 /// that matches the stream's `Error` type. 786 /// 787 /// This adaptor will buffer up to `n` futures and then return their 788 /// outputs in the order in which they complete. If the underlying stream 789 /// returns an error, it will be immediately propagated. 790 /// 791 /// The returned stream will be a stream of results, each containing either 792 /// an error or a future's output. An error can be produced either by the 793 /// underlying stream itself or by one of the futures it yielded. 794 /// 795 /// This method is only available when the `std` or `alloc` feature of this 796 /// library is activated, and it is activated by default. 797 /// 798 /// # Examples 799 /// 800 /// Results are returned in the order of completion: 801 /// ``` 802 /// # futures::executor::block_on(async { 803 /// use futures::channel::oneshot; 804 /// use futures::stream::{self, StreamExt, TryStreamExt}; 805 /// 806 /// let (send_one, recv_one) = oneshot::channel(); 807 /// let (send_two, recv_two) = oneshot::channel(); 808 /// 809 /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]); 810 /// 811 /// let mut buffered = stream_of_futures.try_buffer_unordered(10); 812 /// 813 /// send_two.send(2i32)?; 814 /// assert_eq!(buffered.next().await, Some(Ok(2i32))); 815 /// 816 /// send_one.send(1i32)?; 817 /// assert_eq!(buffered.next().await, Some(Ok(1i32))); 818 /// 819 /// assert_eq!(buffered.next().await, None); 820 /// # Ok::<(), i32>(()) }).unwrap(); 821 /// ``` 822 /// 823 /// Errors from the underlying stream itself are propagated: 824 /// ``` 825 /// # futures::executor::block_on(async { 826 /// use futures::channel::mpsc; 827 /// use futures::stream::{StreamExt, TryStreamExt}; 828 /// 829 /// let (sink, stream_of_futures) = mpsc::unbounded(); 830 /// let mut buffered = stream_of_futures.try_buffer_unordered(10); 831 /// 832 /// sink.unbounded_send(Ok(async { Ok(7i32) }))?; 833 /// assert_eq!(buffered.next().await, Some(Ok(7i32))); 834 /// 835 /// sink.unbounded_send(Err("error in the stream"))?; 836 /// assert_eq!(buffered.next().await, Some(Err("error in the stream"))); 837 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); 838 /// ``` 839 #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] 840 #[cfg(feature = "alloc")] try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self> where Self::Ok: TryFuture<Error = Self::Error>, Self: Sized,841 fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self> 842 where 843 Self::Ok: TryFuture<Error = Self::Error>, 844 Self: Sized, 845 { 846 assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>( 847 TryBufferUnordered::new(self, n), 848 ) 849 } 850 851 /// Attempt to execute several futures from a stream concurrently. 852 /// 853 /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type 854 /// that matches the stream's `Error` type. 855 /// 856 /// This adaptor will buffer up to `n` futures and then return their 857 /// outputs in the order. If the underlying stream returns an error, it will 858 /// be immediately propagated. 859 /// 860 /// The returned stream will be a stream of results, each containing either 861 /// an error or a future's output. An error can be produced either by the 862 /// underlying stream itself or by one of the futures it yielded. 863 /// 864 /// This method is only available when the `std` or `alloc` feature of this 865 /// library is activated, and it is activated by default. 866 /// 867 /// # Examples 868 /// 869 /// Results are returned in the order of addition: 870 /// ``` 871 /// # futures::executor::block_on(async { 872 /// use futures::channel::oneshot; 873 /// use futures::future::lazy; 874 /// use futures::stream::{self, StreamExt, TryStreamExt}; 875 /// 876 /// let (send_one, recv_one) = oneshot::channel(); 877 /// let (send_two, recv_two) = oneshot::channel(); 878 /// 879 /// let mut buffered = lazy(move |cx| { 880 /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]); 881 /// 882 /// let mut buffered = stream_of_futures.try_buffered(10); 883 /// 884 /// assert!(buffered.try_poll_next_unpin(cx).is_pending()); 885 /// 886 /// send_two.send(2i32)?; 887 /// assert!(buffered.try_poll_next_unpin(cx).is_pending()); 888 /// Ok::<_, i32>(buffered) 889 /// }).await?; 890 /// 891 /// send_one.send(1i32)?; 892 /// assert_eq!(buffered.next().await, Some(Ok(1i32))); 893 /// assert_eq!(buffered.next().await, Some(Ok(2i32))); 894 /// 895 /// assert_eq!(buffered.next().await, None); 896 /// # Ok::<(), i32>(()) }).unwrap(); 897 /// ``` 898 /// 899 /// Errors from the underlying stream itself are propagated: 900 /// ``` 901 /// # futures::executor::block_on(async { 902 /// use futures::channel::mpsc; 903 /// use futures::stream::{StreamExt, TryStreamExt}; 904 /// 905 /// let (sink, stream_of_futures) = mpsc::unbounded(); 906 /// let mut buffered = stream_of_futures.try_buffered(10); 907 /// 908 /// sink.unbounded_send(Ok(async { Ok(7i32) }))?; 909 /// assert_eq!(buffered.next().await, Some(Ok(7i32))); 910 /// 911 /// sink.unbounded_send(Err("error in the stream"))?; 912 /// assert_eq!(buffered.next().await, Some(Err("error in the stream"))); 913 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); 914 /// ``` 915 #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] 916 #[cfg(feature = "alloc")] try_buffered(self, n: usize) -> TryBuffered<Self> where Self::Ok: TryFuture<Error = Self::Error>, Self: Sized,917 fn try_buffered(self, n: usize) -> TryBuffered<Self> 918 where 919 Self::Ok: TryFuture<Error = Self::Error>, 920 Self: Sized, 921 { 922 assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new(self, n)) 923 } 924 925 // TODO: false positive warning from rustdoc. Verify once #43466 settles 926 // 927 /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`] 928 /// stream types. try_poll_next_unpin( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<Self::Ok, Self::Error>>> where Self: Unpin,929 fn try_poll_next_unpin( 930 &mut self, 931 cx: &mut Context<'_>, 932 ) -> Poll<Option<Result<Self::Ok, Self::Error>>> 933 where 934 Self: Unpin, 935 { 936 Pin::new(self).try_poll_next(cx) 937 } 938 939 /// Wraps a [`TryStream`] into a stream compatible with libraries using 940 /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled. 941 /// ``` 942 /// use futures::future::{FutureExt, TryFutureExt}; 943 /// # let (tx, rx) = futures::channel::oneshot::channel(); 944 /// 945 /// let future03 = async { 946 /// println!("Running on the pool"); 947 /// tx.send(42).unwrap(); 948 /// }; 949 /// 950 /// let future01 = future03 951 /// .unit_error() // Make it a TryFuture 952 /// .boxed() // Make it Unpin 953 /// .compat(); 954 /// 955 /// tokio::run(future01); 956 /// # assert_eq!(42, futures::executor::block_on(rx).unwrap()); 957 /// ``` 958 #[cfg(feature = "compat")] 959 #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] compat(self) -> Compat<Self> where Self: Sized + Unpin,960 fn compat(self) -> Compat<Self> 961 where 962 Self: Sized + Unpin, 963 { 964 Compat::new(self) 965 } 966 967 /// Adapter that converts this stream into an [`AsyncRead`](crate::io::AsyncRead). 968 /// 969 /// Note that because `into_async_read` moves the stream, the [`Stream`](futures_core::stream::Stream) type must be 970 /// [`Unpin`]. If you want to use `into_async_read` with a [`!Unpin`](Unpin) stream, you'll 971 /// first have to pin the stream. This can be done by boxing the stream using [`Box::pin`] 972 /// or pinning it to the stack using the `pin_mut!` macro from the `pin_utils` crate. 973 /// 974 /// This method is only available when the `std` feature of this 975 /// library is activated, and it is activated by default. 976 /// 977 /// # Examples 978 /// 979 /// ``` 980 /// # futures::executor::block_on(async { 981 /// use futures::stream::{self, TryStreamExt}; 982 /// use futures::io::AsyncReadExt; 983 /// 984 /// let stream = stream::iter(vec![Ok(vec![1, 2, 3, 4, 5])]); 985 /// let mut reader = stream.into_async_read(); 986 /// let mut buf = Vec::new(); 987 /// 988 /// assert!(reader.read_to_end(&mut buf).await.is_ok()); 989 /// assert_eq!(buf, &[1, 2, 3, 4, 5]); 990 /// # }) 991 /// ``` 992 #[cfg(feature = "io")] 993 #[cfg_attr(docsrs, doc(cfg(feature = "io")))] 994 #[cfg(feature = "std")] into_async_read(self) -> IntoAsyncRead<Self> where Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin, Self::Ok: AsRef<[u8]>,995 fn into_async_read(self) -> IntoAsyncRead<Self> 996 where 997 Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin, 998 Self::Ok: AsRef<[u8]>, 999 { 1000 crate::io::assert_read(IntoAsyncRead::new(self)) 1001 } 1002 } 1003