1 //! Streams 2 //! 3 //! This module contains a number of functions for working with `Streams`s 4 //! that return `Result`s, allowing for short-circuiting computations. 5 6 #[cfg(feature = "compat")] 7 use crate::compat::Compat; 8 use crate::fns::{ 9 inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, InspectErrFn, InspectOkFn, 10 IntoFn, MapErrFn, MapOkFn, 11 }; 12 use crate::future::assert_future; 13 use crate::stream::assert_stream; 14 use crate::stream::{Inspect, Map}; 15 #[cfg(feature = "alloc")] 16 use alloc::vec::Vec; 17 use core::pin::Pin; 18 use futures_core::{ 19 future::{Future, TryFuture}, 20 stream::TryStream, 21 task::{Context, Poll}, 22 }; 23 24 mod and_then; 25 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 26 pub use self::and_then::AndThen; 27 28 delegate_all!( 29 /// Stream for the [`err_into`](super::TryStreamExt::err_into) method. 30 ErrInto<St, E>( 31 MapErr<St, IntoFn<E>> 32 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())] 33 ); 34 35 delegate_all!( 36 /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method. 37 InspectOk<St, F>( 38 Inspect<IntoStream<St>, InspectOkFn<F>> 39 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))] 40 ); 41 42 delegate_all!( 43 /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method. 44 InspectErr<St, F>( 45 Inspect<IntoStream<St>, InspectErrFn<F>> 46 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))] 47 ); 48 49 mod into_stream; 50 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 51 pub use self::into_stream::IntoStream; 52 53 delegate_all!( 54 /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method. 55 MapOk<St, F>( 56 Map<IntoStream<St>, MapOkFn<F>> 57 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))] 58 ); 59 60 delegate_all!( 61 /// Stream for the [`map_err`](super::TryStreamExt::map_err) method. 62 MapErr<St, F>( 63 Map<IntoStream<St>, MapErrFn<F>> 64 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))] 65 ); 66 67 mod or_else; 68 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 69 pub use self::or_else::OrElse; 70 71 mod try_next; 72 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 73 pub use self::try_next::TryNext; 74 75 mod try_for_each; 76 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 77 pub use self::try_for_each::TryForEach; 78 79 mod try_filter; 80 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 81 pub use self::try_filter::TryFilter; 82 83 mod try_filter_map; 84 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 85 pub use self::try_filter_map::TryFilterMap; 86 87 mod try_flatten; 88 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 89 pub use self::try_flatten::TryFlatten; 90 91 mod try_collect; 92 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 93 pub use self::try_collect::TryCollect; 94 95 mod try_concat; 96 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 97 pub use self::try_concat::TryConcat; 98 99 #[cfg(feature = "alloc")] 100 mod try_chunks; 101 #[cfg(feature = "alloc")] 102 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 103 pub use self::try_chunks::{TryChunks, TryChunksError}; 104 105 mod try_fold; 106 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 107 pub use self::try_fold::TryFold; 108 109 mod try_unfold; 110 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 111 pub use self::try_unfold::{try_unfold, TryUnfold}; 112 113 mod try_skip_while; 114 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 115 pub use self::try_skip_while::TrySkipWhile; 116 117 mod try_take_while; 118 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 119 pub use self::try_take_while::TryTakeWhile; 120 121 #[cfg(not(futures_no_atomic_cas))] 122 #[cfg(feature = "alloc")] 123 mod try_buffer_unordered; 124 #[cfg(not(futures_no_atomic_cas))] 125 #[cfg(feature = "alloc")] 126 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 127 pub use self::try_buffer_unordered::TryBufferUnordered; 128 129 #[cfg(not(futures_no_atomic_cas))] 130 #[cfg(feature = "alloc")] 131 mod try_buffered; 132 #[cfg(not(futures_no_atomic_cas))] 133 #[cfg(feature = "alloc")] 134 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 135 pub use self::try_buffered::TryBuffered; 136 137 #[cfg(not(futures_no_atomic_cas))] 138 #[cfg(feature = "alloc")] 139 mod try_for_each_concurrent; 140 #[cfg(not(futures_no_atomic_cas))] 141 #[cfg(feature = "alloc")] 142 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 143 pub use self::try_for_each_concurrent::TryForEachConcurrent; 144 145 #[cfg(feature = "io")] 146 #[cfg(feature = "std")] 147 mod into_async_read; 148 #[cfg(feature = "io")] 149 #[cfg_attr(docsrs, doc(cfg(feature = "io")))] 150 #[cfg(feature = "std")] 151 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 152 pub use self::into_async_read::IntoAsyncRead; 153 154 impl<S: ?Sized + TryStream> TryStreamExt for S {} 155 156 /// Adapters specific to `Result`-returning streams 157 pub trait TryStreamExt: TryStream { 158 /// Wraps the current stream in a new stream which converts the error type 159 /// into the one provided. 160 /// 161 /// # Examples 162 /// 163 /// ``` 164 /// # futures::executor::block_on(async { 165 /// use futures::stream::{self, TryStreamExt}; 166 /// 167 /// let mut stream = 168 /// stream::iter(vec![Ok(()), Err(5i32)]) 169 /// .err_into::<i64>(); 170 /// 171 /// assert_eq!(stream.try_next().await, Ok(Some(()))); 172 /// assert_eq!(stream.try_next().await, Err(5i64)); 173 /// # }) 174 /// ``` err_into<E>(self) -> ErrInto<Self, E> where Self: Sized, Self::Error: Into<E>,175 fn err_into<E>(self) -> ErrInto<Self, E> 176 where 177 Self: Sized, 178 Self::Error: Into<E>, 179 { 180 assert_stream::<Result<Self::Ok, E>, _>(ErrInto::new(self)) 181 } 182 183 /// Wraps the current stream in a new stream which maps the success value 184 /// using the provided closure. 185 /// 186 /// # Examples 187 /// 188 /// ``` 189 /// # futures::executor::block_on(async { 190 /// use futures::stream::{self, TryStreamExt}; 191 /// 192 /// let mut stream = 193 /// stream::iter(vec![Ok(5), Err(0)]) 194 /// .map_ok(|x| x + 2); 195 /// 196 /// assert_eq!(stream.try_next().await, Ok(Some(7))); 197 /// assert_eq!(stream.try_next().await, Err(0)); 198 /// # }) 199 /// ``` map_ok<T, F>(self, f: F) -> MapOk<Self, F> where Self: Sized, F: FnMut(Self::Ok) -> T,200 fn map_ok<T, F>(self, f: F) -> MapOk<Self, F> 201 where 202 Self: Sized, 203 F: FnMut(Self::Ok) -> T, 204 { 205 assert_stream::<Result<T, Self::Error>, _>(MapOk::new(self, f)) 206 } 207 208 /// Wraps the current stream in a new stream which maps the error value 209 /// using the provided closure. 210 /// 211 /// # Examples 212 /// 213 /// ``` 214 /// # futures::executor::block_on(async { 215 /// use futures::stream::{self, TryStreamExt}; 216 /// 217 /// let mut stream = 218 /// stream::iter(vec![Ok(5), Err(0)]) 219 /// .map_err(|x| x + 2); 220 /// 221 /// assert_eq!(stream.try_next().await, Ok(Some(5))); 222 /// assert_eq!(stream.try_next().await, Err(2)); 223 /// # }) 224 /// ``` map_err<E, F>(self, f: F) -> MapErr<Self, F> where Self: Sized, F: FnMut(Self::Error) -> E,225 fn map_err<E, F>(self, f: F) -> MapErr<Self, F> 226 where 227 Self: Sized, 228 F: FnMut(Self::Error) -> E, 229 { 230 assert_stream::<Result<Self::Ok, E>, _>(MapErr::new(self, f)) 231 } 232 233 /// Chain on a computation for when a value is ready, passing the successful 234 /// results to the provided closure `f`. 235 /// 236 /// This function can be used to run a unit of work when the next successful 237 /// value on a stream is ready. The closure provided will be yielded a value 238 /// when ready, and the returned future will then be run to completion to 239 /// produce the next value on this stream. 240 /// 241 /// Any errors produced by this stream will not be passed to the closure, 242 /// and will be passed through. 243 /// 244 /// The returned value of the closure must implement the `TryFuture` trait 245 /// and can represent some more work to be done before the composed stream 246 /// is finished. 247 /// 248 /// Note that this function consumes the receiving stream and returns a 249 /// wrapped version of it. 250 /// 251 /// To process the entire stream and return a single future representing 252 /// success or error, use `try_for_each` instead. 253 /// 254 /// # Examples 255 /// 256 /// ``` 257 /// use futures::channel::mpsc; 258 /// use futures::future; 259 /// use futures::stream::TryStreamExt; 260 /// 261 /// let (_tx, rx) = mpsc::channel::<Result<i32, ()>>(1); 262 /// 263 /// let rx = rx.and_then(|result| { 264 /// future::ok(if result % 2 == 0 { 265 /// Some(result) 266 /// } else { 267 /// None 268 /// }) 269 /// }); 270 /// ``` and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: TryFuture<Error = Self::Error>, Self: Sized,271 fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F> 272 where 273 F: FnMut(Self::Ok) -> Fut, 274 Fut: TryFuture<Error = Self::Error>, 275 Self: Sized, 276 { 277 assert_stream::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f)) 278 } 279 280 /// Chain on a computation for when an error happens, passing the 281 /// erroneous result to the provided closure `f`. 282 /// 283 /// This function can be used to run a unit of work and attempt to recover from 284 /// an error if one happens. The closure provided will be yielded an error 285 /// when one appears, and the returned future will then be run to completion 286 /// to produce the next value on this stream. 287 /// 288 /// Any successful values produced by this stream will not be passed to the 289 /// closure, and will be passed through. 290 /// 291 /// The returned value of the closure must implement the [`TryFuture`](futures_core::future::TryFuture) trait 292 /// and can represent some more work to be done before the composed stream 293 /// is finished. 294 /// 295 /// Note that this function consumes the receiving stream and returns a 296 /// wrapped version of it. or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F> where F: FnMut(Self::Error) -> Fut, Fut: TryFuture<Ok = Self::Ok>, Self: Sized,297 fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F> 298 where 299 F: FnMut(Self::Error) -> Fut, 300 Fut: TryFuture<Ok = Self::Ok>, 301 Self: Sized, 302 { 303 assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f)) 304 } 305 306 /// Do something with the success value of this stream, afterwards passing 307 /// it on. 308 /// 309 /// This is similar to the `StreamExt::inspect` method where it allows 310 /// easily inspecting the success value as it passes through the stream, for 311 /// example to debug what's going on. inspect_ok<F>(self, f: F) -> InspectOk<Self, F> where F: FnMut(&Self::Ok), Self: Sized,312 fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F> 313 where 314 F: FnMut(&Self::Ok), 315 Self: Sized, 316 { 317 assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f)) 318 } 319 320 /// Do something with the error value of this stream, afterwards passing it on. 321 /// 322 /// This is similar to the `StreamExt::inspect` method where it allows 323 /// easily inspecting the error value as it passes through the stream, for 324 /// example to debug what's going on. inspect_err<F>(self, f: F) -> InspectErr<Self, F> where F: FnMut(&Self::Error), Self: Sized,325 fn inspect_err<F>(self, f: F) -> InspectErr<Self, F> 326 where 327 F: FnMut(&Self::Error), 328 Self: Sized, 329 { 330 assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f)) 331 } 332 333 /// Wraps a [`TryStream`] into a type that implements 334 /// [`Stream`](futures_core::stream::Stream) 335 /// 336 /// [`TryStream`]s currently do not implement the 337 /// [`Stream`](futures_core::stream::Stream) trait because of limitations 338 /// of the compiler. 339 /// 340 /// # Examples 341 /// 342 /// ``` 343 /// use futures::stream::{Stream, TryStream, TryStreamExt}; 344 /// 345 /// # type T = i32; 346 /// # type E = (); 347 /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> { // ... } 348 /// # futures::stream::empty() 349 /// # } 350 /// fn take_stream(stream: impl Stream<Item = Result<T, E>>) { /* ... */ } 351 /// 352 /// take_stream(make_try_stream().into_stream()); 353 /// ``` into_stream(self) -> IntoStream<Self> where Self: Sized,354 fn into_stream(self) -> IntoStream<Self> 355 where 356 Self: Sized, 357 { 358 assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self)) 359 } 360 361 /// Creates a future that attempts to resolve the next item in the stream. 362 /// If an error is encountered before the next item, the error is returned 363 /// instead. 364 /// 365 /// This is similar to the `Stream::next` combinator, but returns a 366 /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making 367 /// for easy use with the `?` operator. 368 /// 369 /// # Examples 370 /// 371 /// ``` 372 /// # futures::executor::block_on(async { 373 /// use futures::stream::{self, TryStreamExt}; 374 /// 375 /// let mut stream = stream::iter(vec![Ok(()), Err(())]); 376 /// 377 /// assert_eq!(stream.try_next().await, Ok(Some(()))); 378 /// assert_eq!(stream.try_next().await, Err(())); 379 /// # }) 380 /// ``` try_next(&mut self) -> TryNext<'_, Self> where Self: Unpin,381 fn try_next(&mut self) -> TryNext<'_, Self> 382 where 383 Self: Unpin, 384 { 385 assert_future::<Result<Option<Self::Ok>, Self::Error>, _>(TryNext::new(self)) 386 } 387 388 /// Attempts to run this stream to completion, executing the provided 389 /// asynchronous closure for each element on the stream. 390 /// 391 /// The provided closure will be called for each item this stream produces, 392 /// yielding a future. That future will then be executed to completion 393 /// before moving on to the next item. 394 /// 395 /// The returned value is a [`Future`](futures_core::future::Future) where the 396 /// [`Output`](futures_core::future::Future::Output) type is 397 /// `Result<(), Self::Error>`. If any of the intermediate 398 /// futures or the stream returns an error, this future will return 399 /// immediately with an error. 400 /// 401 /// # Examples 402 /// 403 /// ``` 404 /// # futures::executor::block_on(async { 405 /// use futures::future; 406 /// use futures::stream::{self, TryStreamExt}; 407 /// 408 /// let mut x = 0i32; 409 /// 410 /// { 411 /// let fut = stream::repeat(Ok(1)).try_for_each(|item| { 412 /// x += item; 413 /// future::ready(if x == 3 { Err(()) } else { Ok(()) }) 414 /// }); 415 /// assert_eq!(fut.await, Err(())); 416 /// } 417 /// 418 /// assert_eq!(x, 3); 419 /// # }) 420 /// ``` try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: TryFuture<Ok = (), Error = Self::Error>, Self: Sized,421 fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F> 422 where 423 F: FnMut(Self::Ok) -> Fut, 424 Fut: TryFuture<Ok = (), Error = Self::Error>, 425 Self: Sized, 426 { 427 assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f)) 428 } 429 430 /// Skip elements on this stream while the provided asynchronous predicate 431 /// resolves to `true`. 432 /// 433 /// This function is similar to 434 /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits 435 /// early if an error occurs. 436 /// 437 /// # Examples 438 /// 439 /// ``` 440 /// # futures::executor::block_on(async { 441 /// use futures::future; 442 /// use futures::stream::{self, TryStreamExt}; 443 /// 444 /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)]); 445 /// let stream = stream.try_skip_while(|x| future::ready(Ok(*x < 3))); 446 /// 447 /// let output: Result<Vec<i32>, i32> = stream.try_collect().await; 448 /// assert_eq!(output, Ok(vec![3, 2])); 449 /// # }) 450 /// ``` try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F> where F: FnMut(&Self::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized,451 fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F> 452 where 453 F: FnMut(&Self::Ok) -> Fut, 454 Fut: TryFuture<Ok = bool, Error = Self::Error>, 455 Self: Sized, 456 { 457 assert_stream::<Result<Self::Ok, Self::Error>, _>(TrySkipWhile::new(self, f)) 458 } 459 460 /// Take elements on this stream while the provided asynchronous predicate 461 /// resolves to `true`. 462 /// 463 /// This function is similar to 464 /// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits 465 /// early if an error occurs. 466 /// 467 /// # Examples 468 /// 469 /// ``` 470 /// # futures::executor::block_on(async { 471 /// use futures::future; 472 /// use futures::stream::{self, TryStreamExt}; 473 /// 474 /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]); 475 /// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3))); 476 /// 477 /// let output: Result<Vec<i32>, i32> = stream.try_collect().await; 478 /// assert_eq!(output, Ok(vec![1, 2])); 479 /// # }) 480 /// ``` try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F> where F: FnMut(&Self::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized,481 fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F> 482 where 483 F: FnMut(&Self::Ok) -> Fut, 484 Fut: TryFuture<Ok = bool, Error = Self::Error>, 485 Self: Sized, 486 { 487 assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f)) 488 } 489 490 /// Attempts to run this stream to completion, executing the provided asynchronous 491 /// closure for each element on the stream concurrently as elements become 492 /// available, exiting as soon as an error occurs. 493 /// 494 /// This is similar to 495 /// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent), 496 /// but will resolve to an error immediately if the underlying stream or the provided 497 /// closure return an error. 498 /// 499 /// This method is only available when the `std` or `alloc` feature of this 500 /// library is activated, and it is activated by default. 501 /// 502 /// # Examples 503 /// 504 /// ``` 505 /// # futures::executor::block_on(async { 506 /// use futures::channel::oneshot; 507 /// use futures::stream::{self, StreamExt, TryStreamExt}; 508 /// 509 /// let (tx1, rx1) = oneshot::channel(); 510 /// let (tx2, rx2) = oneshot::channel(); 511 /// let (_tx3, rx3) = oneshot::channel(); 512 /// 513 /// let stream = stream::iter(vec![rx1, rx2, rx3]); 514 /// let fut = stream.map(Ok).try_for_each_concurrent( 515 /// /* limit */ 2, 516 /// |rx| async move { 517 /// let res: Result<(), oneshot::Canceled> = rx.await; 518 /// res 519 /// } 520 /// ); 521 /// 522 /// tx1.send(()).unwrap(); 523 /// // Drop the second sender so that `rx2` resolves to `Canceled`. 524 /// drop(tx2); 525 /// 526 /// // The final result is an error because the second future 527 /// // resulted in an error. 528 /// assert_eq!(Err(oneshot::Canceled), fut.await); 529 /// # }) 530 /// ``` 531 #[cfg(not(futures_no_atomic_cas))] 532 #[cfg(feature = "alloc")] try_for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> TryForEachConcurrent<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: Future<Output = Result<(), Self::Error>>, Self: Sized,533 fn try_for_each_concurrent<Fut, F>( 534 self, 535 limit: impl Into<Option<usize>>, 536 f: F, 537 ) -> TryForEachConcurrent<Self, Fut, F> 538 where 539 F: FnMut(Self::Ok) -> Fut, 540 Fut: Future<Output = Result<(), Self::Error>>, 541 Self: Sized, 542 { 543 assert_future::<Result<(), Self::Error>, _>(TryForEachConcurrent::new( 544 self, 545 limit.into(), 546 f, 547 )) 548 } 549 550 /// Attempt to transform a stream into a collection, 551 /// returning a future representing the result of that computation. 552 /// 553 /// This combinator will collect all successful results of this stream and 554 /// collect them into the specified collection type. If an error happens then all 555 /// collected elements will be dropped and the error will be returned. 556 /// 557 /// The returned future will be resolved when the stream terminates. 558 /// 559 /// # Examples 560 /// 561 /// ``` 562 /// # futures::executor::block_on(async { 563 /// use futures::channel::mpsc; 564 /// use futures::stream::TryStreamExt; 565 /// use std::thread; 566 /// 567 /// let (tx, rx) = mpsc::unbounded(); 568 /// 569 /// thread::spawn(move || { 570 /// for i in 1..=5 { 571 /// tx.unbounded_send(Ok(i)).unwrap(); 572 /// } 573 /// tx.unbounded_send(Err(6)).unwrap(); 574 /// }); 575 /// 576 /// let output: Result<Vec<i32>, i32> = rx.try_collect().await; 577 /// assert_eq!(output, Err(6)); 578 /// # }) 579 /// ``` try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C> where Self: Sized,580 fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C> 581 where 582 Self: Sized, 583 { 584 assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self)) 585 } 586 587 /// An adaptor for chunking up successful items of the stream inside a vector. 588 /// 589 /// This combinator will attempt to pull successful items from this stream and buffer 590 /// them into a local vector. At most `capacity` items will get buffered 591 /// before they're yielded from the returned stream. 592 /// 593 /// Note that the vectors returned from this iterator may not always have 594 /// `capacity` elements. If the underlying stream ended and only a partial 595 /// vector was created, it'll be returned. Additionally if an error happens 596 /// from the underlying stream then the currently buffered items will be 597 /// yielded. 598 /// 599 /// This method is only available when the `std` or `alloc` feature of this 600 /// library is activated, and it is activated by default. 601 /// 602 /// This function is similar to 603 /// [`StreamExt::chunks`](crate::stream::StreamExt::chunks) but exits 604 /// early if an error occurs. 605 /// 606 /// # Examples 607 /// 608 /// ``` 609 /// # futures::executor::block_on(async { 610 /// use futures::stream::{self, TryChunksError, TryStreamExt}; 611 /// 612 /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]); 613 /// let mut stream = stream.try_chunks(2); 614 /// 615 /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2]))); 616 /// assert_eq!(stream.try_next().await, Err(TryChunksError(vec![3], 4))); 617 /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6]))); 618 /// # }) 619 /// ``` 620 /// 621 /// # Panics 622 /// 623 /// This method will panic if `capacity` is zero. 624 #[cfg(feature = "alloc")] try_chunks(self, capacity: usize) -> TryChunks<Self> where Self: Sized,625 fn try_chunks(self, capacity: usize) -> TryChunks<Self> 626 where 627 Self: Sized, 628 { 629 assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>( 630 TryChunks::new(self, capacity), 631 ) 632 } 633 634 /// Attempt to filter the values produced by this stream according to the 635 /// provided asynchronous closure. 636 /// 637 /// As values of this stream are made available, the provided predicate `f` 638 /// will be run on them. If the predicate returns a `Future` which resolves 639 /// to `true`, then the stream will yield the value, but if the predicate 640 /// return a `Future` which resolves to `false`, then the value will be 641 /// discarded and the next value will be produced. 642 /// 643 /// All errors are passed through without filtering in this combinator. 644 /// 645 /// Note that this function consumes the stream passed into it and returns a 646 /// wrapped version of it, similar to the existing `filter` methods in 647 /// the standard library. 648 /// 649 /// # Examples 650 /// ``` 651 /// # futures::executor::block_on(async { 652 /// use futures::future; 653 /// use futures::stream::{self, StreamExt, TryStreamExt}; 654 /// 655 /// let stream = stream::iter(vec![Ok(1i32), Ok(2i32), Ok(3i32), Err("error")]); 656 /// let mut evens = stream.try_filter(|x| { 657 /// future::ready(x % 2 == 0) 658 /// }); 659 /// 660 /// assert_eq!(evens.next().await, Some(Ok(2))); 661 /// assert_eq!(evens.next().await, Some(Err("error"))); 662 /// # }) 663 /// ``` try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F> where Fut: Future<Output = bool>, F: FnMut(&Self::Ok) -> Fut, Self: Sized,664 fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F> 665 where 666 Fut: Future<Output = bool>, 667 F: FnMut(&Self::Ok) -> Fut, 668 Self: Sized, 669 { 670 assert_stream::<Result<Self::Ok, Self::Error>, _>(TryFilter::new(self, f)) 671 } 672 673 /// Attempt to filter the values produced by this stream while 674 /// simultaneously mapping them to a different type according to the 675 /// provided asynchronous closure. 676 /// 677 /// As values of this stream are made available, the provided function will 678 /// be run on them. If the future returned by the predicate `f` resolves to 679 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if 680 /// it resolves to [`None`] then the next value will be produced. 681 /// 682 /// All errors are passed through without filtering in this combinator. 683 /// 684 /// Note that this function consumes the stream passed into it and returns a 685 /// wrapped version of it, similar to the existing `filter_map` methods in 686 /// the standard library. 687 /// 688 /// # Examples 689 /// ``` 690 /// # futures::executor::block_on(async { 691 /// use futures::stream::{self, StreamExt, TryStreamExt}; 692 /// use futures::pin_mut; 693 /// 694 /// let stream = stream::iter(vec![Ok(1i32), Ok(6i32), Err("error")]); 695 /// let halves = stream.try_filter_map(|x| async move { 696 /// let ret = if x % 2 == 0 { Some(x / 2) } else { None }; 697 /// Ok(ret) 698 /// }); 699 /// 700 /// pin_mut!(halves); 701 /// assert_eq!(halves.next().await, Some(Ok(3))); 702 /// assert_eq!(halves.next().await, Some(Err("error"))); 703 /// # }) 704 /// ``` try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F> where Fut: TryFuture<Ok = Option<T>, Error = Self::Error>, F: FnMut(Self::Ok) -> Fut, Self: Sized,705 fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F> 706 where 707 Fut: TryFuture<Ok = Option<T>, Error = Self::Error>, 708 F: FnMut(Self::Ok) -> Fut, 709 Self: Sized, 710 { 711 assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f)) 712 } 713 714 /// Flattens a stream of streams into just one continuous stream. 715 /// 716 /// If this stream's elements are themselves streams then this combinator 717 /// will flatten out the entire stream to one long chain of elements. Any 718 /// errors are passed through without looking at them, but otherwise each 719 /// individual stream will get exhausted before moving on to the next. 720 /// 721 /// # Examples 722 /// 723 /// ``` 724 /// # futures::executor::block_on(async { 725 /// use futures::channel::mpsc; 726 /// use futures::stream::{StreamExt, TryStreamExt}; 727 /// use std::thread; 728 /// 729 /// let (tx1, rx1) = mpsc::unbounded(); 730 /// let (tx2, rx2) = mpsc::unbounded(); 731 /// let (tx3, rx3) = mpsc::unbounded(); 732 /// 733 /// thread::spawn(move || { 734 /// tx1.unbounded_send(Ok(1)).unwrap(); 735 /// }); 736 /// thread::spawn(move || { 737 /// tx2.unbounded_send(Ok(2)).unwrap(); 738 /// tx2.unbounded_send(Err(3)).unwrap(); 739 /// tx2.unbounded_send(Ok(4)).unwrap(); 740 /// }); 741 /// thread::spawn(move || { 742 /// tx3.unbounded_send(Ok(rx1)).unwrap(); 743 /// tx3.unbounded_send(Ok(rx2)).unwrap(); 744 /// tx3.unbounded_send(Err(5)).unwrap(); 745 /// }); 746 /// 747 /// let mut stream = rx3.try_flatten(); 748 /// assert_eq!(stream.next().await, Some(Ok(1))); 749 /// assert_eq!(stream.next().await, Some(Ok(2))); 750 /// assert_eq!(stream.next().await, Some(Err(3))); 751 /// assert_eq!(stream.next().await, Some(Ok(4))); 752 /// assert_eq!(stream.next().await, Some(Err(5))); 753 /// assert_eq!(stream.next().await, None); 754 /// # }); 755 /// ``` try_flatten(self) -> TryFlatten<Self> where Self::Ok: TryStream, <Self::Ok as TryStream>::Error: From<Self::Error>, Self: Sized,756 fn try_flatten(self) -> TryFlatten<Self> 757 where 758 Self::Ok: TryStream, 759 <Self::Ok as TryStream>::Error: From<Self::Error>, 760 Self: Sized, 761 { 762 assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>( 763 TryFlatten::new(self), 764 ) 765 } 766 767 /// Attempt to execute an accumulating asynchronous computation over a 768 /// stream, collecting all the values into one final result. 769 /// 770 /// This combinator will accumulate all values returned by this stream 771 /// according to the closure provided. The initial state is also provided to 772 /// this method and then is returned again by each execution of the closure. 773 /// Once the entire stream has been exhausted the returned future will 774 /// resolve to this value. 775 /// 776 /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but will 777 /// exit early if an error is encountered in either the stream or the 778 /// provided closure. 779 /// 780 /// # Examples 781 /// 782 /// ``` 783 /// # futures::executor::block_on(async { 784 /// use futures::stream::{self, TryStreamExt}; 785 /// 786 /// let number_stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]); 787 /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok(acc + x) }); 788 /// assert_eq!(sum.await, Ok(3)); 789 /// 790 /// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]); 791 /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x) }); 792 /// assert_eq!(sum.await, Err(2)); 793 /// # }) 794 /// ``` try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F> where F: FnMut(T, Self::Ok) -> Fut, Fut: TryFuture<Ok = T, Error = Self::Error>, Self: Sized,795 fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F> 796 where 797 F: FnMut(T, Self::Ok) -> Fut, 798 Fut: TryFuture<Ok = T, Error = Self::Error>, 799 Self: Sized, 800 { 801 assert_future::<Result<T, Self::Error>, _>(TryFold::new(self, f, init)) 802 } 803 804 /// Attempt to concatenate all items of a stream into a single 805 /// extendable destination, returning a future representing the end result. 806 /// 807 /// This combinator will extend the first item with the contents of all 808 /// the subsequent successful results of the stream. If the stream is empty, 809 /// the default value will be returned. 810 /// 811 /// Works with all collections that implement the [`Extend`](std::iter::Extend) trait. 812 /// 813 /// This method is similar to [`concat`](crate::stream::StreamExt::concat), but will 814 /// exit early if an error is encountered in the stream. 815 /// 816 /// # Examples 817 /// 818 /// ``` 819 /// # futures::executor::block_on(async { 820 /// use futures::channel::mpsc; 821 /// use futures::stream::TryStreamExt; 822 /// use std::thread; 823 /// 824 /// let (tx, rx) = mpsc::unbounded::<Result<Vec<i32>, ()>>(); 825 /// 826 /// thread::spawn(move || { 827 /// for i in (0..3).rev() { 828 /// let n = i * 3; 829 /// tx.unbounded_send(Ok(vec![n + 1, n + 2, n + 3])).unwrap(); 830 /// } 831 /// }); 832 /// 833 /// let result = rx.try_concat().await; 834 /// 835 /// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3])); 836 /// # }); 837 /// ``` try_concat(self) -> TryConcat<Self> where Self: Sized, Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,838 fn try_concat(self) -> TryConcat<Self> 839 where 840 Self: Sized, 841 Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default, 842 { 843 assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self)) 844 } 845 846 /// Attempt to execute several futures from a stream concurrently (unordered). 847 /// 848 /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type 849 /// that matches the stream's `Error` type. 850 /// 851 /// This adaptor will buffer up to `n` futures and then return their 852 /// outputs in the order in which they complete. If the underlying stream 853 /// returns an error, it will be immediately propagated. 854 /// 855 /// The returned stream will be a stream of results, each containing either 856 /// an error or a future's output. An error can be produced either by the 857 /// underlying stream itself or by one of the futures it yielded. 858 /// 859 /// This method is only available when the `std` or `alloc` feature of this 860 /// library is activated, and it is activated by default. 861 /// 862 /// # Examples 863 /// 864 /// Results are returned in the order of completion: 865 /// ``` 866 /// # futures::executor::block_on(async { 867 /// use futures::channel::oneshot; 868 /// use futures::stream::{self, StreamExt, TryStreamExt}; 869 /// 870 /// let (send_one, recv_one) = oneshot::channel(); 871 /// let (send_two, recv_two) = oneshot::channel(); 872 /// 873 /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]); 874 /// 875 /// let mut buffered = stream_of_futures.try_buffer_unordered(10); 876 /// 877 /// send_two.send(2i32)?; 878 /// assert_eq!(buffered.next().await, Some(Ok(2i32))); 879 /// 880 /// send_one.send(1i32)?; 881 /// assert_eq!(buffered.next().await, Some(Ok(1i32))); 882 /// 883 /// assert_eq!(buffered.next().await, None); 884 /// # Ok::<(), i32>(()) }).unwrap(); 885 /// ``` 886 /// 887 /// Errors from the underlying stream itself are propagated: 888 /// ``` 889 /// # futures::executor::block_on(async { 890 /// use futures::channel::mpsc; 891 /// use futures::stream::{StreamExt, TryStreamExt}; 892 /// 893 /// let (sink, stream_of_futures) = mpsc::unbounded(); 894 /// let mut buffered = stream_of_futures.try_buffer_unordered(10); 895 /// 896 /// sink.unbounded_send(Ok(async { Ok(7i32) }))?; 897 /// assert_eq!(buffered.next().await, Some(Ok(7i32))); 898 /// 899 /// sink.unbounded_send(Err("error in the stream"))?; 900 /// assert_eq!(buffered.next().await, Some(Err("error in the stream"))); 901 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); 902 /// ``` 903 #[cfg(not(futures_no_atomic_cas))] 904 #[cfg(feature = "alloc")] try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self> where Self::Ok: TryFuture<Error = Self::Error>, Self: Sized,905 fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self> 906 where 907 Self::Ok: TryFuture<Error = Self::Error>, 908 Self: Sized, 909 { 910 assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>( 911 TryBufferUnordered::new(self, n), 912 ) 913 } 914 915 /// Attempt to execute several futures from a stream concurrently. 916 /// 917 /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type 918 /// that matches the stream's `Error` type. 919 /// 920 /// This adaptor will buffer up to `n` futures and then return their 921 /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will 922 /// be immediately propagated. 923 /// 924 /// The returned stream will be a stream of results, each containing either 925 /// an error or a future's output. An error can be produced either by the 926 /// underlying stream itself or by one of the futures it yielded. 927 /// 928 /// This method is only available when the `std` or `alloc` feature of this 929 /// library is activated, and it is activated by default. 930 /// 931 /// # Examples 932 /// 933 /// Results are returned in the order of addition: 934 /// ``` 935 /// # futures::executor::block_on(async { 936 /// use futures::channel::oneshot; 937 /// use futures::future::lazy; 938 /// use futures::stream::{self, StreamExt, TryStreamExt}; 939 /// 940 /// let (send_one, recv_one) = oneshot::channel(); 941 /// let (send_two, recv_two) = oneshot::channel(); 942 /// 943 /// let mut buffered = lazy(move |cx| { 944 /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]); 945 /// 946 /// let mut buffered = stream_of_futures.try_buffered(10); 947 /// 948 /// assert!(buffered.try_poll_next_unpin(cx).is_pending()); 949 /// 950 /// send_two.send(2i32)?; 951 /// assert!(buffered.try_poll_next_unpin(cx).is_pending()); 952 /// Ok::<_, i32>(buffered) 953 /// }).await?; 954 /// 955 /// send_one.send(1i32)?; 956 /// assert_eq!(buffered.next().await, Some(Ok(1i32))); 957 /// assert_eq!(buffered.next().await, Some(Ok(2i32))); 958 /// 959 /// assert_eq!(buffered.next().await, None); 960 /// # Ok::<(), i32>(()) }).unwrap(); 961 /// ``` 962 /// 963 /// Errors from the underlying stream itself are propagated: 964 /// ``` 965 /// # futures::executor::block_on(async { 966 /// use futures::channel::mpsc; 967 /// use futures::stream::{StreamExt, TryStreamExt}; 968 /// 969 /// let (sink, stream_of_futures) = mpsc::unbounded(); 970 /// let mut buffered = stream_of_futures.try_buffered(10); 971 /// 972 /// sink.unbounded_send(Ok(async { Ok(7i32) }))?; 973 /// assert_eq!(buffered.next().await, Some(Ok(7i32))); 974 /// 975 /// sink.unbounded_send(Err("error in the stream"))?; 976 /// assert_eq!(buffered.next().await, Some(Err("error in the stream"))); 977 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); 978 /// ``` 979 #[cfg(not(futures_no_atomic_cas))] 980 #[cfg(feature = "alloc")] try_buffered(self, n: usize) -> TryBuffered<Self> where Self::Ok: TryFuture<Error = Self::Error>, Self: Sized,981 fn try_buffered(self, n: usize) -> TryBuffered<Self> 982 where 983 Self::Ok: TryFuture<Error = Self::Error>, 984 Self: Sized, 985 { 986 assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new( 987 self, n, 988 )) 989 } 990 991 // TODO: false positive warning from rustdoc. Verify once #43466 settles 992 // 993 /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`] 994 /// stream types. try_poll_next_unpin( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<Self::Ok, Self::Error>>> where Self: Unpin,995 fn try_poll_next_unpin( 996 &mut self, 997 cx: &mut Context<'_>, 998 ) -> Poll<Option<Result<Self::Ok, Self::Error>>> 999 where 1000 Self: Unpin, 1001 { 1002 Pin::new(self).try_poll_next(cx) 1003 } 1004 1005 /// Wraps a [`TryStream`] into a stream compatible with libraries using 1006 /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled. 1007 /// ``` 1008 /// # if cfg!(miri) { return; } // Miri does not support epoll 1009 /// use futures::future::{FutureExt, TryFutureExt}; 1010 /// # let (tx, rx) = futures::channel::oneshot::channel(); 1011 /// 1012 /// let future03 = async { 1013 /// println!("Running on the pool"); 1014 /// tx.send(42).unwrap(); 1015 /// }; 1016 /// 1017 /// let future01 = future03 1018 /// .unit_error() // Make it a TryFuture 1019 /// .boxed() // Make it Unpin 1020 /// .compat(); 1021 /// 1022 /// tokio::run(future01); 1023 /// # assert_eq!(42, futures::executor::block_on(rx).unwrap()); 1024 /// ``` 1025 #[cfg(feature = "compat")] 1026 #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] compat(self) -> Compat<Self> where Self: Sized + Unpin,1027 fn compat(self) -> Compat<Self> 1028 where 1029 Self: Sized + Unpin, 1030 { 1031 Compat::new(self) 1032 } 1033 1034 /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead). 1035 /// 1036 /// This method is only available when the `std` feature of this 1037 /// library is activated, and it is activated by default. 1038 /// 1039 /// # Examples 1040 /// 1041 /// ``` 1042 /// # futures::executor::block_on(async { 1043 /// use futures::stream::{self, TryStreamExt}; 1044 /// use futures::io::AsyncReadExt; 1045 /// 1046 /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]); 1047 /// let mut reader = stream.into_async_read(); 1048 /// 1049 /// let mut buf = Vec::new(); 1050 /// reader.read_to_end(&mut buf).await.unwrap(); 1051 /// assert_eq!(buf, [1, 2, 3, 4, 5]); 1052 /// # }) 1053 /// ``` 1054 #[cfg(feature = "io")] 1055 #[cfg_attr(docsrs, doc(cfg(feature = "io")))] 1056 #[cfg(feature = "std")] into_async_read(self) -> IntoAsyncRead<Self> where Self: Sized + TryStreamExt<Error = std::io::Error>, Self::Ok: AsRef<[u8]>,1057 fn into_async_read(self) -> IntoAsyncRead<Self> 1058 where 1059 Self: Sized + TryStreamExt<Error = std::io::Error>, 1060 Self::Ok: AsRef<[u8]>, 1061 { 1062 crate::io::assert_read(IntoAsyncRead::new(self)) 1063 } 1064 } 1065