//! Streams
//!
//! This module contains a number of functions for working with `Stream`s,
//! including the `StreamExt` trait which adds methods to `Stream` types.

use crate::future::{assert_future, Either};
use crate::stream::assert_stream;
#[cfg(feature = "alloc")]
use alloc::boxed::Box;
#[cfg(feature = "alloc")]
use alloc::vec::Vec;
use core::pin::Pin;
#[cfg(feature = "sink")]
use futures_core::stream::TryStream;
#[cfg(feature = "alloc")]
use futures_core::stream::{BoxStream, LocalBoxStream};
use futures_core::{
    future::Future,
    stream::{FusedStream, Stream},
    task::{Context, Poll},
};
#[cfg(feature = "sink")]
use futures_sink::Sink;

use crate::fns::{inspect_fn, InspectFn};

// Each combinator lives in its own private submodule; its public type is
// re-exported from this module. The `#[allow(unreachable_pub)]` on every
// re-export refers to the linked rust-lang issue.

mod chain;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::chain::Chain;

mod collect;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::collect::Collect;

mod unzip;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::unzip::Unzip;

mod concat;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::concat::Concat;

mod cycle;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::cycle::Cycle;

mod enumerate;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::enumerate::Enumerate;

mod filter;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::filter::Filter;

mod filter_map;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::filter_map::FilterMap;

mod flatten;

// NOTE(review): `delegate_all!` is defined outside this view; it appears to
// declare the public `Flatten<St>` type on top of the private
// `flatten::Flatten<St, St::Item>` and forward the listed traits to it.
// Confirm against the macro definition.
delegate_all!(
    /// Stream for the [`flatten`](StreamExt::flatten) method.
    Flatten<St>(
        flatten::Flatten<St, St::Item>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)]
    where St: Stream
);

mod fold;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::fold::Fold;

// `forward` needs `Sink`, so both the module and the public `Forward` type
// are gated on the `sink` feature.
#[cfg(feature = "sink")]
mod forward;

#[cfg(feature = "sink")]
delegate_all!(
    /// Future for the [`forward`](super::StreamExt::forward) method.
    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
    Forward<St, Si>(
        forward::Forward<St, Si, St::Ok>
    ): Debug + Future + FusedFuture + New[|x: St, y: Si| forward::Forward::new(x, y)]
    where St: TryStream
);

mod for_each;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::for_each::ForEach;

mod fuse;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::fuse::Fuse;

mod into_future;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::into_future::StreamFuture;

// `Inspect` has no module of its own: it is a `map::Map` whose closure is
// wrapped by `inspect_fn`.
delegate_all!(
    /// Stream for the [`inspect`](StreamExt::inspect) method.
    Inspect<St, F>(
        map::Map<St, InspectFn<F>>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| map::Map::new(x, inspect_fn(f))]
);

mod map;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::map::Map;

// `FlatMap` is built from existing pieces: a `Map` fed into
// `flatten::Flatten`.
delegate_all!(
    /// Stream for the [`flat_map`](StreamExt::flat_map) method.
    FlatMap<St, U, F>(
        flatten::Flatten<Map<St, F>, U>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| flatten::Flatten::new(Map::new(x, f))]
);

mod next;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::next::Next;

mod select_next_some;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::select_next_some::SelectNextSome;

mod peek;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::peek::{Peek, Peekable};

mod skip;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::skip::Skip;

mod skip_while;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::skip_while::SkipWhile;

mod take;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::take::Take;

mod take_while;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::take_while::TakeWhile;

mod take_until;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::take_until::TakeUntil;

mod then;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::then::Then;

mod zip;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::zip::Zip;

// The chunking combinators are only compiled with the `alloc` feature.
#[cfg(feature = "alloc")]
mod chunks;
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::chunks::Chunks;

#[cfg(feature = "alloc")]
mod ready_chunks;
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::ready_chunks::ReadyChunks;

mod scan;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::scan::Scan;

cfg_target_has_atomic! {
    #[cfg(feature = "alloc")]
    mod buffer_unordered;
    #[cfg(feature = "alloc")]
    #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
    pub use self::buffer_unordered::BufferUnordered;

    #[cfg(feature = "alloc")]
    mod buffered;
    #[cfg(feature = "alloc")]
    #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
    pub use self::buffered::Buffered;

    #[cfg(feature = "alloc")]
    mod for_each_concurrent;
    #[cfg(feature = "alloc")]
    #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
    pub use self::for_each_concurrent::ForEachConcurrent;

    #[cfg(feature = "sink")]
    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
    #[cfg(feature = "alloc")]
    mod split;
    #[cfg(feature = "sink")]
    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
    #[cfg(feature = "alloc")]
    #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
    pub use self::split::{SplitStream, SplitSink, ReuniteError};
}

#[cfg(feature = "std")]
mod catch_unwind;
#[cfg(feature = "std")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::catch_unwind::CatchUnwind;

// Blanket implementation: every `Stream`, sized or not, gets the
// `StreamExt` combinators.
impl<T> StreamExt for T where T: Stream + ?Sized {}

/// An extension trait for `Stream`s that provides a variety of convenient
/// combinator functions.
pub trait StreamExt: Stream {
    /// Creates a future that resolves to the next item in the stream.
    ///
    /// Note that because `next` doesn't take ownership over the stream,
    /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
    /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
    /// be done by boxing the stream using [`Box::pin`] or
    /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
    /// crate.
221 /// 222 /// # Examples 223 /// 224 /// ``` 225 /// # futures::executor::block_on(async { 226 /// use futures::stream::{self, StreamExt}; 227 /// 228 /// let mut stream = stream::iter(1..=3); 229 /// 230 /// assert_eq!(stream.next().await, Some(1)); 231 /// assert_eq!(stream.next().await, Some(2)); 232 /// assert_eq!(stream.next().await, Some(3)); 233 /// assert_eq!(stream.next().await, None); 234 /// # }); 235 /// ``` next(&mut self) -> Next<'_, Self> where Self: Unpin,236 fn next(&mut self) -> Next<'_, Self> 237 where 238 Self: Unpin, 239 { 240 assert_future::<Option<Self::Item>, _>(Next::new(self)) 241 } 242 243 /// Converts this stream into a future of `(next_item, tail_of_stream)`. 244 /// If the stream terminates, then the next item is [`None`]. 245 /// 246 /// The returned future can be used to compose streams and futures together 247 /// by placing everything into the "world of futures". 248 /// 249 /// Note that because `into_future` moves the stream, the [`Stream`] type 250 /// must be [`Unpin`]. If you want to use `into_future` with a 251 /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can 252 /// be done by boxing the stream using [`Box::pin`] or 253 /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` 254 /// crate. 
255 /// 256 /// # Examples 257 /// 258 /// ``` 259 /// # futures::executor::block_on(async { 260 /// use futures::stream::{self, StreamExt}; 261 /// 262 /// let stream = stream::iter(1..=3); 263 /// 264 /// let (item, stream) = stream.into_future().await; 265 /// assert_eq!(Some(1), item); 266 /// 267 /// let (item, stream) = stream.into_future().await; 268 /// assert_eq!(Some(2), item); 269 /// # }); 270 /// ``` into_future(self) -> StreamFuture<Self> where Self: Sized + Unpin,271 fn into_future(self) -> StreamFuture<Self> 272 where 273 Self: Sized + Unpin, 274 { 275 assert_future::<(Option<Self::Item>, Self), _>(StreamFuture::new(self)) 276 } 277 278 /// Maps this stream's items to a different type, returning a new stream of 279 /// the resulting type. 280 /// 281 /// The provided closure is executed over all elements of this stream as 282 /// they are made available. It is executed inline with calls to 283 /// [`poll_next`](Stream::poll_next). 284 /// 285 /// Note that this function consumes the stream passed into it and returns a 286 /// wrapped version of it, similar to the existing `map` methods in the 287 /// standard library. 288 /// 289 /// # Examples 290 /// 291 /// ``` 292 /// # futures::executor::block_on(async { 293 /// use futures::stream::{self, StreamExt}; 294 /// 295 /// let stream = stream::iter(1..=3); 296 /// let stream = stream.map(|x| x + 3); 297 /// 298 /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await); 299 /// # }); 300 /// ``` map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,301 fn map<T, F>(self, f: F) -> Map<Self, F> 302 where 303 F: FnMut(Self::Item) -> T, 304 Self: Sized, 305 { 306 assert_stream::<T, _>(Map::new(self, f)) 307 } 308 309 /// Creates a stream which gives the current iteration count as well as 310 /// the next value. 
311 /// 312 /// The stream returned yields pairs `(i, val)`, where `i` is the 313 /// current index of iteration and `val` is the value returned by the 314 /// stream. 315 /// 316 /// `enumerate()` keeps its count as a [`usize`]. If you want to count by a 317 /// different sized integer, the [`zip`](StreamExt::zip) function provides similar 318 /// functionality. 319 /// 320 /// # Overflow Behavior 321 /// 322 /// The method does no guarding against overflows, so enumerating more than 323 /// [`prim@usize::max_value()`] elements either produces the wrong result or panics. If 324 /// debug assertions are enabled, a panic is guaranteed. 325 /// 326 /// # Panics 327 /// 328 /// The returned stream might panic if the to-be-returned index would 329 /// overflow a [`usize`]. 330 /// 331 /// # Examples 332 /// 333 /// ``` 334 /// # futures::executor::block_on(async { 335 /// use futures::stream::{self, StreamExt}; 336 /// 337 /// let stream = stream::iter(vec!['a', 'b', 'c']); 338 /// 339 /// let mut stream = stream.enumerate(); 340 /// 341 /// assert_eq!(stream.next().await, Some((0, 'a'))); 342 /// assert_eq!(stream.next().await, Some((1, 'b'))); 343 /// assert_eq!(stream.next().await, Some((2, 'c'))); 344 /// assert_eq!(stream.next().await, None); 345 /// # }); 346 /// ``` enumerate(self) -> Enumerate<Self> where Self: Sized,347 fn enumerate(self) -> Enumerate<Self> 348 where 349 Self: Sized, 350 { 351 assert_stream::<(usize, Self::Item), _>(Enumerate::new(self)) 352 } 353 354 /// Filters the values produced by this stream according to the provided 355 /// asynchronous predicate. 356 /// 357 /// As values of this stream are made available, the provided predicate `f` 358 /// will be run against them. If the predicate returns a `Future` which 359 /// resolves to `true`, then the stream will yield the value, but if the 360 /// predicate returns a `Future` which resolves to `false`, then the value 361 /// will be discarded and the next value will be produced. 
362 /// 363 /// Note that this function consumes the stream passed into it and returns a 364 /// wrapped version of it, similar to the existing `filter` methods in the 365 /// standard library. 366 /// 367 /// # Examples 368 /// 369 /// ``` 370 /// # futures::executor::block_on(async { 371 /// use futures::future; 372 /// use futures::stream::{self, StreamExt}; 373 /// 374 /// let stream = stream::iter(1..=10); 375 /// let evens = stream.filter(|x| future::ready(x % 2 == 0)); 376 /// 377 /// assert_eq!(vec![2, 4, 6, 8, 10], evens.collect::<Vec<_>>().await); 378 /// # }); 379 /// ``` filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,380 fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> 381 where 382 F: FnMut(&Self::Item) -> Fut, 383 Fut: Future<Output = bool>, 384 Self: Sized, 385 { 386 assert_stream::<Self::Item, _>(Filter::new(self, f)) 387 } 388 389 /// Filters the values produced by this stream while simultaneously mapping 390 /// them to a different type according to the provided asynchronous closure. 391 /// 392 /// As values of this stream are made available, the provided function will 393 /// be run on them. If the future returned by the predicate `f` resolves to 394 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if 395 /// it resolves to [`None`] then the next value will be produced. 396 /// 397 /// Note that this function consumes the stream passed into it and returns a 398 /// wrapped version of it, similar to the existing `filter_map` methods in 399 /// the standard library. 
400 /// 401 /// # Examples 402 /// ``` 403 /// # futures::executor::block_on(async { 404 /// use futures::stream::{self, StreamExt}; 405 /// 406 /// let stream = stream::iter(1..=10); 407 /// let evens = stream.filter_map(|x| async move { 408 /// if x % 2 == 0 { Some(x + 1) } else { None } 409 /// }); 410 /// 411 /// assert_eq!(vec![3, 5, 7, 9, 11], evens.collect::<Vec<_>>().await); 412 /// # }); 413 /// ``` filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = Option<T>>, Self: Sized,414 fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> 415 where 416 F: FnMut(Self::Item) -> Fut, 417 Fut: Future<Output = Option<T>>, 418 Self: Sized, 419 { 420 assert_stream::<T, _>(FilterMap::new(self, f)) 421 } 422 423 /// Computes from this stream's items new items of a different type using 424 /// an asynchronous closure. 425 /// 426 /// The provided closure `f` will be called with an `Item` once a value is 427 /// ready, it returns a future which will then be run to completion 428 /// to produce the next value on this stream. 429 /// 430 /// Note that this function consumes the stream passed into it and returns a 431 /// wrapped version of it. 
432 /// 433 /// # Examples 434 /// 435 /// ``` 436 /// # futures::executor::block_on(async { 437 /// use futures::stream::{self, StreamExt}; 438 /// 439 /// let stream = stream::iter(1..=3); 440 /// let stream = stream.then(|x| async move { x + 3 }); 441 /// 442 /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await); 443 /// # }); 444 /// ``` then<Fut, F>(self, f: F) -> Then<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized,445 fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F> 446 where 447 F: FnMut(Self::Item) -> Fut, 448 Fut: Future, 449 Self: Sized, 450 { 451 assert_stream::<Fut::Output, _>(Then::new(self, f)) 452 } 453 454 /// Transforms a stream into a collection, returning a 455 /// future representing the result of that computation. 456 /// 457 /// The returned future will be resolved when the stream terminates. 458 /// 459 /// # Examples 460 /// 461 /// ``` 462 /// # futures::executor::block_on(async { 463 /// use futures::channel::mpsc; 464 /// use futures::stream::StreamExt; 465 /// use std::thread; 466 /// 467 /// let (tx, rx) = mpsc::unbounded(); 468 /// 469 /// thread::spawn(move || { 470 /// for i in 1..=5 { 471 /// tx.unbounded_send(i).unwrap(); 472 /// } 473 /// }); 474 /// 475 /// let output = rx.collect::<Vec<i32>>().await; 476 /// assert_eq!(output, vec![1, 2, 3, 4, 5]); 477 /// # }); 478 /// ``` collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C> where Self: Sized,479 fn collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C> 480 where 481 Self: Sized, 482 { 483 assert_future::<C, _>(Collect::new(self)) 484 } 485 486 /// Converts a stream of pairs into a future, which 487 /// resolves to pair of containers. 488 /// 489 /// `unzip()` produces a future, which resolves to two 490 /// collections: one from the left elements of the pairs, 491 /// and one from the right elements. 492 /// 493 /// The returned future will be resolved when the stream terminates. 
494 /// 495 /// # Examples 496 /// 497 /// ``` 498 /// # futures::executor::block_on(async { 499 /// use futures::channel::mpsc; 500 /// use futures::stream::StreamExt; 501 /// use std::thread; 502 /// 503 /// let (tx, rx) = mpsc::unbounded(); 504 /// 505 /// thread::spawn(move || { 506 /// tx.unbounded_send((1, 2)).unwrap(); 507 /// tx.unbounded_send((3, 4)).unwrap(); 508 /// tx.unbounded_send((5, 6)).unwrap(); 509 /// }); 510 /// 511 /// let (o1, o2): (Vec<_>, Vec<_>) = rx.unzip().await; 512 /// assert_eq!(o1, vec![1, 3, 5]); 513 /// assert_eq!(o2, vec![2, 4, 6]); 514 /// # }); 515 /// ``` unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB> where FromA: Default + Extend<A>, FromB: Default + Extend<B>, Self: Sized + Stream<Item = (A, B)>,516 fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB> 517 where 518 FromA: Default + Extend<A>, 519 FromB: Default + Extend<B>, 520 Self: Sized + Stream<Item = (A, B)>, 521 { 522 assert_future::<(FromA, FromB), _>(Unzip::new(self)) 523 } 524 525 /// Concatenate all items of a stream into a single extendable 526 /// destination, returning a future representing the end result. 527 /// 528 /// This combinator will extend the first item with the contents 529 /// of all the subsequent results of the stream. If the stream is 530 /// empty, the default value will be returned. 531 /// 532 /// Works with all collections that implement the 533 /// [`Extend`](std::iter::Extend) trait. 
534 /// 535 /// # Examples 536 /// 537 /// ``` 538 /// # futures::executor::block_on(async { 539 /// use futures::channel::mpsc; 540 /// use futures::stream::StreamExt; 541 /// use std::thread; 542 /// 543 /// let (tx, rx) = mpsc::unbounded(); 544 /// 545 /// thread::spawn(move || { 546 /// for i in (0..3).rev() { 547 /// let n = i * 3; 548 /// tx.unbounded_send(vec![n + 1, n + 2, n + 3]).unwrap(); 549 /// } 550 /// }); 551 /// 552 /// let result = rx.concat().await; 553 /// 554 /// assert_eq!(result, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]); 555 /// # }); 556 /// ``` concat(self) -> Concat<Self> where Self: Sized, Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,557 fn concat(self) -> Concat<Self> 558 where 559 Self: Sized, 560 Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default, 561 { 562 assert_future::<Self::Item, _>(Concat::new(self)) 563 } 564 565 /// Repeats a stream endlessly. 566 /// 567 /// The stream never terminates. Note that you likely want to avoid 568 /// usage of `collect` or such on the returned stream as it will exhaust 569 /// available memory as it tries to just fill up all RAM. 
570 /// 571 /// # Examples 572 /// 573 /// ``` 574 /// # futures::executor::block_on(async { 575 /// use futures::stream::{self, StreamExt}; 576 /// let a = [1, 2, 3]; 577 /// let mut s = stream::iter(a.iter()).cycle(); 578 /// 579 /// assert_eq!(s.next().await, Some(&1)); 580 /// assert_eq!(s.next().await, Some(&2)); 581 /// assert_eq!(s.next().await, Some(&3)); 582 /// assert_eq!(s.next().await, Some(&1)); 583 /// assert_eq!(s.next().await, Some(&2)); 584 /// assert_eq!(s.next().await, Some(&3)); 585 /// assert_eq!(s.next().await, Some(&1)); 586 /// # }); 587 /// ``` cycle(self) -> Cycle<Self> where Self: Sized + Clone,588 fn cycle(self) -> Cycle<Self> 589 where 590 Self: Sized + Clone, 591 { 592 assert_stream::<Self::Item, _>(Cycle::new(self)) 593 } 594 595 /// Execute an accumulating asynchronous computation over a stream, 596 /// collecting all the values into one final result. 597 /// 598 /// This combinator will accumulate all values returned by this stream 599 /// according to the closure provided. The initial state is also provided to 600 /// this method and then is returned again by each execution of the closure. 601 /// Once the entire stream has been exhausted the returned future will 602 /// resolve to this value. 
603 /// 604 /// # Examples 605 /// 606 /// ``` 607 /// # futures::executor::block_on(async { 608 /// use futures::stream::{self, StreamExt}; 609 /// 610 /// let number_stream = stream::iter(0..6); 611 /// let sum = number_stream.fold(0, |acc, x| async move { acc + x }); 612 /// assert_eq!(sum.await, 15); 613 /// # }); 614 /// ``` fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> where F: FnMut(T, Self::Item) -> Fut, Fut: Future<Output = T>, Self: Sized,615 fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> 616 where 617 F: FnMut(T, Self::Item) -> Fut, 618 Fut: Future<Output = T>, 619 Self: Sized, 620 { 621 assert_future::<T, _>(Fold::new(self, f, init)) 622 } 623 624 /// Flattens a stream of streams into just one continuous stream. 625 /// 626 /// # Examples 627 /// 628 /// ``` 629 /// # futures::executor::block_on(async { 630 /// use futures::channel::mpsc; 631 /// use futures::stream::StreamExt; 632 /// use std::thread; 633 /// 634 /// let (tx1, rx1) = mpsc::unbounded(); 635 /// let (tx2, rx2) = mpsc::unbounded(); 636 /// let (tx3, rx3) = mpsc::unbounded(); 637 /// 638 /// thread::spawn(move || { 639 /// tx1.unbounded_send(1).unwrap(); 640 /// tx1.unbounded_send(2).unwrap(); 641 /// }); 642 /// thread::spawn(move || { 643 /// tx2.unbounded_send(3).unwrap(); 644 /// tx2.unbounded_send(4).unwrap(); 645 /// }); 646 /// thread::spawn(move || { 647 /// tx3.unbounded_send(rx1).unwrap(); 648 /// tx3.unbounded_send(rx2).unwrap(); 649 /// }); 650 /// 651 /// let output = rx3.flatten().collect::<Vec<i32>>().await; 652 /// assert_eq!(output, vec![1, 2, 3, 4]); 653 /// # }); 654 /// ``` flatten(self) -> Flatten<Self> where Self::Item: Stream, Self: Sized,655 fn flatten(self) -> Flatten<Self> 656 where 657 Self::Item: Stream, 658 Self: Sized, 659 { 660 assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self)) 661 } 662 663 /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s. 
664 /// 665 /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead, 666 /// you would have to chain combinators like `.map(f).flatten()` while this 667 /// combinator provides ability to write `.flat_map(f)` instead of chaining. 668 /// 669 /// The provided closure which produce inner streams is executed over all elements 670 /// of stream as last inner stream is terminated and next stream item is available. 671 /// 672 /// Note that this function consumes the stream passed into it and returns a 673 /// wrapped version of it, similar to the existing `flat_map` methods in the 674 /// standard library. 675 /// 676 /// # Examples 677 /// 678 /// ``` 679 /// # futures::executor::block_on(async { 680 /// use futures::stream::{self, StreamExt}; 681 /// 682 /// let stream = stream::iter(1..=3); 683 /// let stream = stream.flat_map(|x| stream::iter(vec![x + 3; x])); 684 /// 685 /// assert_eq!(vec![4, 5, 5, 6, 6, 6], stream.collect::<Vec<_>>().await); 686 /// # }); 687 /// ``` flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> where F: FnMut(Self::Item) -> U, U: Stream, Self: Sized,688 fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> 689 where 690 F: FnMut(Self::Item) -> U, 691 U: Stream, 692 Self: Sized, 693 { 694 assert_stream::<U::Item, _>(FlatMap::new(self, f)) 695 } 696 697 /// Combinator similar to [`StreamExt::fold`] that holds internal state 698 /// and produces a new stream. 699 /// 700 /// Accepts initial state and closure which will be applied to each element 701 /// of the stream until provided closure returns `None`. Once `None` is 702 /// returned, stream will be terminated. 
703 /// 704 /// # Examples 705 /// 706 /// ``` 707 /// # futures::executor::block_on(async { 708 /// use futures::future; 709 /// use futures::stream::{self, StreamExt}; 710 /// 711 /// let stream = stream::iter(1..=10); 712 /// 713 /// let stream = stream.scan(0, |state, x| { 714 /// *state += x; 715 /// future::ready(if *state < 10 { Some(x) } else { None }) 716 /// }); 717 /// 718 /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await); 719 /// # }); 720 /// ``` scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> where F: FnMut(&mut S, Self::Item) -> Fut, Fut: Future<Output = Option<B>>, Self: Sized,721 fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> 722 where 723 F: FnMut(&mut S, Self::Item) -> Fut, 724 Fut: Future<Output = Option<B>>, 725 Self: Sized, 726 { 727 assert_stream::<B, _>(Scan::new(self, initial_state, f)) 728 } 729 730 /// Skip elements on this stream while the provided asynchronous predicate 731 /// resolves to `true`. 732 /// 733 /// This function, like `Iterator::skip_while`, will skip elements on the 734 /// stream until the predicate `f` resolves to `false`. Once one element 735 /// returns `false`, all future elements will be returned from the underlying 736 /// stream. 
737 /// 738 /// # Examples 739 /// 740 /// ``` 741 /// # futures::executor::block_on(async { 742 /// use futures::future; 743 /// use futures::stream::{self, StreamExt}; 744 /// 745 /// let stream = stream::iter(1..=10); 746 /// 747 /// let stream = stream.skip_while(|x| future::ready(*x <= 5)); 748 /// 749 /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await); 750 /// # }); 751 /// ``` skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,752 fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> 753 where 754 F: FnMut(&Self::Item) -> Fut, 755 Fut: Future<Output = bool>, 756 Self: Sized, 757 { 758 assert_stream::<Self::Item, _>(SkipWhile::new(self, f)) 759 } 760 761 /// Take elements from this stream while the provided asynchronous predicate 762 /// resolves to `true`. 763 /// 764 /// This function, like `Iterator::take_while`, will take elements from the 765 /// stream until the predicate `f` resolves to `false`. Once one element 766 /// returns `false`, it will always return that the stream is done. 767 /// 768 /// # Examples 769 /// 770 /// ``` 771 /// # futures::executor::block_on(async { 772 /// use futures::future; 773 /// use futures::stream::{self, StreamExt}; 774 /// 775 /// let stream = stream::iter(1..=10); 776 /// 777 /// let stream = stream.take_while(|x| future::ready(*x <= 5)); 778 /// 779 /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await); 780 /// # }); 781 /// ``` take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,782 fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> 783 where 784 F: FnMut(&Self::Item) -> Fut, 785 Fut: Future<Output = bool>, 786 Self: Sized, 787 { 788 assert_stream::<Self::Item, _>(TakeWhile::new(self, f)) 789 } 790 791 /// Take elements from this stream until the provided future resolves. 
792 /// 793 /// This function will take elements from the stream until the provided 794 /// stopping future `fut` resolves. Once the `fut` future becomes ready, 795 /// this stream combinator will always return that the stream is done. 796 /// 797 /// The stopping future may return any type. Once the stream is stopped 798 /// the result of the stopping future may be accessed with `TakeUntil::take_result()`. 799 /// The stream may also be resumed with `TakeUntil::take_future()`. 800 /// See the documentation of [`TakeUntil`] for more information. 801 /// 802 /// # Examples 803 /// 804 /// ``` 805 /// # futures::executor::block_on(async { 806 /// use futures::future; 807 /// use futures::stream::{self, StreamExt}; 808 /// use futures::task::Poll; 809 /// 810 /// let stream = stream::iter(1..=10); 811 /// 812 /// let mut i = 0; 813 /// let stop_fut = future::poll_fn(|_cx| { 814 /// i += 1; 815 /// if i <= 5 { 816 /// Poll::Pending 817 /// } else { 818 /// Poll::Ready(()) 819 /// } 820 /// }); 821 /// 822 /// let stream = stream.take_until(stop_fut); 823 /// 824 /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await); 825 /// # }); 826 /// ``` take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> where Fut: Future, Self: Sized,827 fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> 828 where 829 Fut: Future, 830 Self: Sized, 831 { 832 assert_stream::<Self::Item, _>(TakeUntil::new(self, fut)) 833 } 834 835 /// Runs this stream to completion, executing the provided asynchronous 836 /// closure for each element on the stream. 837 /// 838 /// The closure provided will be called for each item this stream produces, 839 /// yielding a future. That future will then be executed to completion 840 /// before moving on to the next item. 841 /// 842 /// The returned value is a `Future` where the `Output` type is `()`; it is 843 /// executed entirely for its side effects. 
844 /// 845 /// To process each item in the stream and produce another stream instead 846 /// of a single future, use `then` instead. 847 /// 848 /// # Examples 849 /// 850 /// ``` 851 /// # futures::executor::block_on(async { 852 /// use futures::future; 853 /// use futures::stream::{self, StreamExt}; 854 /// 855 /// let mut x = 0; 856 /// 857 /// { 858 /// let fut = stream::repeat(1).take(3).for_each(|item| { 859 /// x += item; 860 /// future::ready(()) 861 /// }); 862 /// fut.await; 863 /// } 864 /// 865 /// assert_eq!(x, 3); 866 /// # }); 867 /// ``` for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,868 fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> 869 where 870 F: FnMut(Self::Item) -> Fut, 871 Fut: Future<Output = ()>, 872 Self: Sized, 873 { 874 assert_future::<(), _>(ForEach::new(self, f)) 875 } 876 877 /// Runs this stream to completion, executing the provided asynchronous 878 /// closure for each element on the stream concurrently as elements become 879 /// available. 880 /// 881 /// This is similar to [`StreamExt::for_each`], but the futures 882 /// produced by the closure are run concurrently (but not in parallel-- 883 /// this combinator does not introduce any threads). 884 /// 885 /// The closure provided will be called for each item this stream produces, 886 /// yielding a future. That future will then be executed to completion 887 /// concurrently with the other futures produced by the closure. 888 /// 889 /// The first argument is an optional limit on the number of concurrent 890 /// futures. If this limit is not `None`, no more than `limit` futures 891 /// will be run concurrently. The `limit` argument is of type 892 /// `Into<Option<usize>>`, and so can be provided as either `None`, 893 /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as 894 /// no limit at all, and will have the same result as passing in `None`. 
895 /// 896 /// This method is only available when the `std` or `alloc` feature of this 897 /// library is activated, and it is activated by default. 898 /// 899 /// # Examples 900 /// 901 /// ``` 902 /// # futures::executor::block_on(async { 903 /// use futures::channel::oneshot; 904 /// use futures::stream::{self, StreamExt}; 905 /// 906 /// let (tx1, rx1) = oneshot::channel(); 907 /// let (tx2, rx2) = oneshot::channel(); 908 /// let (tx3, rx3) = oneshot::channel(); 909 /// 910 /// let fut = stream::iter(vec![rx1, rx2, rx3]).for_each_concurrent( 911 /// /* limit */ 2, 912 /// |rx| async move { 913 /// rx.await.unwrap(); 914 /// } 915 /// ); 916 /// tx1.send(()).unwrap(); 917 /// tx2.send(()).unwrap(); 918 /// tx3.send(()).unwrap(); 919 /// fut.await; 920 /// # }) 921 /// ``` 922 #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] 923 #[cfg(feature = "alloc")] for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> ForEachConcurrent<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,924 fn for_each_concurrent<Fut, F>( 925 self, 926 limit: impl Into<Option<usize>>, 927 f: F, 928 ) -> ForEachConcurrent<Self, Fut, F> 929 where 930 F: FnMut(Self::Item) -> Fut, 931 Fut: Future<Output = ()>, 932 Self: Sized, 933 { 934 assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f)) 935 } 936 937 /// Creates a new stream of at most `n` items of the underlying stream. 938 /// 939 /// Once `n` items have been yielded from this stream then it will always 940 /// return that the stream is done. 
941 /// 942 /// # Examples 943 /// 944 /// ``` 945 /// # futures::executor::block_on(async { 946 /// use futures::stream::{self, StreamExt}; 947 /// 948 /// let stream = stream::iter(1..=10).take(3); 949 /// 950 /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await); 951 /// # }); 952 /// ``` take(self, n: usize) -> Take<Self> where Self: Sized,953 fn take(self, n: usize) -> Take<Self> 954 where 955 Self: Sized, 956 { 957 assert_stream::<Self::Item, _>(Take::new(self, n)) 958 } 959 960 /// Creates a new stream which skips `n` items of the underlying stream. 961 /// 962 /// Once `n` items have been skipped from this stream then it will always 963 /// return the remaining items on this stream. 964 /// 965 /// # Examples 966 /// 967 /// ``` 968 /// # futures::executor::block_on(async { 969 /// use futures::stream::{self, StreamExt}; 970 /// 971 /// let stream = stream::iter(1..=10).skip(5); 972 /// 973 /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await); 974 /// # }); 975 /// ``` skip(self, n: usize) -> Skip<Self> where Self: Sized,976 fn skip(self, n: usize) -> Skip<Self> 977 where 978 Self: Sized, 979 { 980 assert_stream::<Self::Item, _>(Skip::new(self, n)) 981 } 982 983 /// Fuse a stream such that [`poll_next`](Stream::poll_next) will never 984 /// again be called once it has finished. This method can be used to turn 985 /// any `Stream` into a `FusedStream`. 986 /// 987 /// Normally, once a stream has returned [`None`] from 988 /// [`poll_next`](Stream::poll_next) any further calls could exhibit bad 989 /// behavior such as block forever, panic, never return, etc. If it is known 990 /// that [`poll_next`](Stream::poll_next) may be called after stream 991 /// has already finished, then this method can be used to ensure that it has 992 /// defined semantics. 993 /// 994 /// The [`poll_next`](Stream::poll_next) method of a `fuse`d stream 995 /// is guaranteed to return [`None`] after the underlying stream has 996 /// finished. 
997 /// 998 /// # Examples 999 /// 1000 /// ``` 1001 /// use futures::executor::block_on_stream; 1002 /// use futures::stream::{self, StreamExt}; 1003 /// use futures::task::Poll; 1004 /// 1005 /// let mut x = 0; 1006 /// let stream = stream::poll_fn(|_| { 1007 /// x += 1; 1008 /// match x { 1009 /// 0..=2 => Poll::Ready(Some(x)), 1010 /// 3 => Poll::Ready(None), 1011 /// _ => panic!("should not happen") 1012 /// } 1013 /// }).fuse(); 1014 /// 1015 /// let mut iter = block_on_stream(stream); 1016 /// assert_eq!(Some(1), iter.next()); 1017 /// assert_eq!(Some(2), iter.next()); 1018 /// assert_eq!(None, iter.next()); 1019 /// assert_eq!(None, iter.next()); 1020 /// // ... 1021 /// ``` fuse(self) -> Fuse<Self> where Self: Sized,1022 fn fuse(self) -> Fuse<Self> 1023 where 1024 Self: Sized, 1025 { 1026 assert_stream::<Self::Item, _>(Fuse::new(self)) 1027 } 1028 1029 /// Borrows a stream, rather than consuming it. 1030 /// 1031 /// This is useful to allow applying stream adaptors while still retaining 1032 /// ownership of the original stream. 1033 /// 1034 /// # Examples 1035 /// 1036 /// ``` 1037 /// # futures::executor::block_on(async { 1038 /// use futures::stream::{self, StreamExt}; 1039 /// 1040 /// let mut stream = stream::iter(1..5); 1041 /// 1042 /// let sum = stream.by_ref() 1043 /// .take(2) 1044 /// .fold(0, |a, b| async move { a + b }) 1045 /// .await; 1046 /// assert_eq!(sum, 3); 1047 /// 1048 /// // You can use the stream again 1049 /// let sum = stream.take(2) 1050 /// .fold(0, |a, b| async move { a + b }) 1051 /// .await; 1052 /// assert_eq!(sum, 7); 1053 /// # }); 1054 /// ``` by_ref(&mut self) -> &mut Self1055 fn by_ref(&mut self) -> &mut Self { 1056 self 1057 } 1058 1059 /// Catches unwinding panics while polling the stream. 1060 /// 1061 /// Caught panic (if any) will be the last element of the resulting stream. 1062 /// 1063 /// In general, panics within a stream can propagate all the way out to the 1064 /// task level. 
This combinator makes it possible to halt unwinding within 1065 /// the stream itself. It's most commonly used within task executors. This 1066 /// method should not be used for error handling. 1067 /// 1068 /// Note that this method requires the `UnwindSafe` bound from the standard 1069 /// library. This isn't always applied automatically, and the standard 1070 /// library provides an `AssertUnwindSafe` wrapper type to apply it 1071 /// after-the fact. To assist using this method, the [`Stream`] trait is 1072 /// also implemented for `AssertUnwindSafe<St>` where `St` implements 1073 /// [`Stream`]. 1074 /// 1075 /// This method is only available when the `std` feature of this 1076 /// library is activated, and it is activated by default. 1077 /// 1078 /// # Examples 1079 /// 1080 /// ``` 1081 /// # futures::executor::block_on(async { 1082 /// use futures::stream::{self, StreamExt}; 1083 /// 1084 /// let stream = stream::iter(vec![Some(10), None, Some(11)]); 1085 /// // Panic on second element 1086 /// let stream_panicking = stream.map(|o| o.unwrap()); 1087 /// // Collect all the results 1088 /// let stream = stream_panicking.catch_unwind(); 1089 /// 1090 /// let results: Vec<Result<i32, _>> = stream.collect().await; 1091 /// match results[0] { 1092 /// Ok(10) => {} 1093 /// _ => panic!("unexpected result!"), 1094 /// } 1095 /// assert!(results[1].is_err()); 1096 /// assert_eq!(results.len(), 2); 1097 /// # }); 1098 /// ``` 1099 #[cfg(feature = "std")] catch_unwind(self) -> CatchUnwind<Self> where Self: Sized + std::panic::UnwindSafe,1100 fn catch_unwind(self) -> CatchUnwind<Self> 1101 where 1102 Self: Sized + std::panic::UnwindSafe, 1103 { 1104 assert_stream(CatchUnwind::new(self)) 1105 } 1106 1107 /// Wrap the stream in a Box, pinning it. 1108 /// 1109 /// This method is only available when the `std` or `alloc` feature of this 1110 /// library is activated, and it is activated by default. 
1111 #[cfg(feature = "alloc")] boxed<'a>(self) -> BoxStream<'a, Self::Item> where Self: Sized + Send + 'a,1112 fn boxed<'a>(self) -> BoxStream<'a, Self::Item> 1113 where 1114 Self: Sized + Send + 'a, 1115 { 1116 assert_stream::<Self::Item, _>(Box::pin(self)) 1117 } 1118 1119 /// Wrap the stream in a Box, pinning it. 1120 /// 1121 /// Similar to `boxed`, but without the `Send` requirement. 1122 /// 1123 /// This method is only available when the `std` or `alloc` feature of this 1124 /// library is activated, and it is activated by default. 1125 #[cfg(feature = "alloc")] boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item> where Self: Sized + 'a,1126 fn boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item> 1127 where 1128 Self: Sized + 'a, 1129 { 1130 assert_stream::<Self::Item, _>(Box::pin(self)) 1131 } 1132 1133 /// An adaptor for creating a buffered list of pending futures. 1134 /// 1135 /// If this stream's item can be converted into a future, then this adaptor 1136 /// will buffer up to at most `n` futures and then return the outputs in the 1137 /// same order as the underlying stream. No more than `n` futures will be 1138 /// buffered at any point in time, and less than `n` may also be buffered 1139 /// depending on the state of each future. 1140 /// 1141 /// The returned stream will be a stream of each future's output. 1142 /// 1143 /// This method is only available when the `std` or `alloc` feature of this 1144 /// library is activated, and it is activated by default. 1145 #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] 1146 #[cfg(feature = "alloc")] buffered(self, n: usize) -> Buffered<Self> where Self::Item: Future, Self: Sized,1147 fn buffered(self, n: usize) -> Buffered<Self> 1148 where 1149 Self::Item: Future, 1150 Self: Sized, 1151 { 1152 assert_stream::<<Self::Item as Future>::Output, _>(Buffered::new(self, n)) 1153 } 1154 1155 /// An adaptor for creating a buffered list of pending futures (unordered). 
1156 /// 1157 /// If this stream's item can be converted into a future, then this adaptor 1158 /// will buffer up to `n` futures and then return the outputs in the order 1159 /// in which they complete. No more than `n` futures will be buffered at 1160 /// any point in time, and less than `n` may also be buffered depending on 1161 /// the state of each future. 1162 /// 1163 /// The returned stream will be a stream of each future's output. 1164 /// 1165 /// This method is only available when the `std` or `alloc` feature of this 1166 /// library is activated, and it is activated by default. 1167 /// 1168 /// # Examples 1169 /// 1170 /// ``` 1171 /// # futures::executor::block_on(async { 1172 /// use futures::channel::oneshot; 1173 /// use futures::stream::{self, StreamExt}; 1174 /// 1175 /// let (send_one, recv_one) = oneshot::channel(); 1176 /// let (send_two, recv_two) = oneshot::channel(); 1177 /// 1178 /// let stream_of_futures = stream::iter(vec![recv_one, recv_two]); 1179 /// let mut buffered = stream_of_futures.buffer_unordered(10); 1180 /// 1181 /// send_two.send(2i32)?; 1182 /// assert_eq!(buffered.next().await, Some(Ok(2i32))); 1183 /// 1184 /// send_one.send(1i32)?; 1185 /// assert_eq!(buffered.next().await, Some(Ok(1i32))); 1186 /// 1187 /// assert_eq!(buffered.next().await, None); 1188 /// # Ok::<(), i32>(()) }).unwrap(); 1189 /// ``` 1190 #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] 1191 #[cfg(feature = "alloc")] buffer_unordered(self, n: usize) -> BufferUnordered<Self> where Self::Item: Future, Self: Sized,1192 fn buffer_unordered(self, n: usize) -> BufferUnordered<Self> 1193 where 1194 Self::Item: Future, 1195 Self: Sized, 1196 { 1197 assert_stream::<<Self::Item as Future>::Output, _>(BufferUnordered::new(self, n)) 1198 } 1199 1200 /// An adapter for zipping two streams together. 1201 /// 1202 /// The zipped stream waits for both streams to produce an item, and then 1203 /// returns that pair. 
If either stream ends then the zipped stream will 1204 /// also end. 1205 /// 1206 /// # Examples 1207 /// 1208 /// ``` 1209 /// # futures::executor::block_on(async { 1210 /// use futures::stream::{self, StreamExt}; 1211 /// 1212 /// let stream1 = stream::iter(1..=3); 1213 /// let stream2 = stream::iter(5..=10); 1214 /// 1215 /// let vec = stream1.zip(stream2) 1216 /// .collect::<Vec<_>>() 1217 /// .await; 1218 /// assert_eq!(vec![(1, 5), (2, 6), (3, 7)], vec); 1219 /// # }); 1220 /// ``` 1221 /// zip<St>(self, other: St) -> Zip<Self, St> where St: Stream, Self: Sized,1222 fn zip<St>(self, other: St) -> Zip<Self, St> 1223 where 1224 St: Stream, 1225 Self: Sized, 1226 { 1227 assert_stream::<(Self::Item, St::Item), _>(Zip::new(self, other)) 1228 } 1229 1230 /// Adapter for chaining two streams. 1231 /// 1232 /// The resulting stream emits elements from the first stream, and when 1233 /// first stream reaches the end, emits the elements from the second stream. 1234 /// 1235 /// ``` 1236 /// # futures::executor::block_on(async { 1237 /// use futures::stream::{self, StreamExt}; 1238 /// 1239 /// let stream1 = stream::iter(vec![Ok(10), Err(false)]); 1240 /// let stream2 = stream::iter(vec![Err(true), Ok(20)]); 1241 /// 1242 /// let stream = stream1.chain(stream2); 1243 /// 1244 /// let result: Vec<_> = stream.collect().await; 1245 /// assert_eq!(result, vec![ 1246 /// Ok(10), 1247 /// Err(false), 1248 /// Err(true), 1249 /// Ok(20), 1250 /// ]); 1251 /// # }); 1252 /// ``` chain<St>(self, other: St) -> Chain<Self, St> where St: Stream<Item = Self::Item>, Self: Sized,1253 fn chain<St>(self, other: St) -> Chain<Self, St> 1254 where 1255 St: Stream<Item = Self::Item>, 1256 Self: Sized, 1257 { 1258 assert_stream::<Self::Item, _>(Chain::new(self, other)) 1259 } 1260 1261 /// Creates a new stream which exposes a `peek` method. 1262 /// 1263 /// Calling `peek` returns a reference to the next item in the stream. 
peekable(self) -> Peekable<Self> where Self: Sized,1264 fn peekable(self) -> Peekable<Self> 1265 where 1266 Self: Sized, 1267 { 1268 assert_stream::<Self::Item, _>(Peekable::new(self)) 1269 } 1270 1271 /// An adaptor for chunking up items of the stream inside a vector. 1272 /// 1273 /// This combinator will attempt to pull items from this stream and buffer 1274 /// them into a local vector. At most `capacity` items will get buffered 1275 /// before they're yielded from the returned stream. 1276 /// 1277 /// Note that the vectors returned from this iterator may not always have 1278 /// `capacity` elements. If the underlying stream ended and only a partial 1279 /// vector was created, it'll be returned. Additionally if an error happens 1280 /// from the underlying stream then the currently buffered items will be 1281 /// yielded. 1282 /// 1283 /// This method is only available when the `std` or `alloc` feature of this 1284 /// library is activated, and it is activated by default. 1285 /// 1286 /// # Panics 1287 /// 1288 /// This method will panic if `capacity` is zero. 1289 #[cfg(feature = "alloc")] chunks(self, capacity: usize) -> Chunks<Self> where Self: Sized,1290 fn chunks(self, capacity: usize) -> Chunks<Self> 1291 where 1292 Self: Sized, 1293 { 1294 assert_stream::<Vec<Self::Item>, _>(Chunks::new(self, capacity)) 1295 } 1296 1297 /// An adaptor for chunking up ready items of the stream inside a vector. 1298 /// 1299 /// This combinator will attempt to pull ready items from this stream and 1300 /// buffer them into a local vector. At most `capacity` items will get 1301 /// buffered before they're yielded from the returned stream. If underlying 1302 /// stream returns `Poll::Pending`, and collected chunk is not empty, it will 1303 /// be immediately returned. 1304 /// 1305 /// If the underlying stream ended and only a partial vector was created, 1306 /// it'll be returned. 
Additionally if an error happens from the underlying 1307 /// stream then the currently buffered items will be yielded. 1308 /// 1309 /// This method is only available when the `std` or `alloc` feature of this 1310 /// library is activated, and it is activated by default. 1311 /// 1312 /// # Panics 1313 /// 1314 /// This method will panic if `capacity` is zero. 1315 #[cfg(feature = "alloc")] ready_chunks(self, capacity: usize) -> ReadyChunks<Self> where Self: Sized,1316 fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self> 1317 where 1318 Self: Sized, 1319 { 1320 assert_stream::<Vec<Self::Item>, _>(ReadyChunks::new(self, capacity)) 1321 } 1322 1323 /// A future that completes after the given stream has been fully processed 1324 /// into the sink and the sink has been flushed and closed. 1325 /// 1326 /// This future will drive the stream to keep producing items until it is 1327 /// exhausted, sending each item to the sink. It will complete once the 1328 /// stream is exhausted, the sink has received and flushed all items, and 1329 /// the sink is closed. Note that neither the original stream nor provided 1330 /// sink will be output by this future. Pass the sink by `Pin<&mut S>` 1331 /// (for example, via `forward(&mut sink)` inside an `async` fn/block) in 1332 /// order to preserve access to the `Sink`. 
1333 #[cfg(feature = "sink")] 1334 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] forward<S>(self, sink: S) -> Forward<Self, S> where S: Sink<Self::Ok, Error = Self::Error>, Self: TryStream + Sized,1335 fn forward<S>(self, sink: S) -> Forward<Self, S> 1336 where 1337 S: Sink<Self::Ok, Error = Self::Error>, 1338 Self: TryStream + Sized, 1339 // Self: TryStream + Sized + Stream<Item = Result<<Self as TryStream>::Ok, <Self as TryStream>::Error>>, 1340 { 1341 // TODO: type mismatch resolving `<Self as futures_core::Stream>::Item == std::result::Result<<Self as futures_core::TryStream>::Ok, <Self as futures_core::TryStream>::Error>` 1342 // assert_future::<Result<(), Self::Error>, _>(Forward::new(self, sink)) 1343 Forward::new(self, sink) 1344 } 1345 1346 /// Splits this `Stream + Sink` object into separate `Sink` and `Stream` 1347 /// objects. 1348 /// 1349 /// This can be useful when you want to split ownership between tasks, or 1350 /// allow direct interaction between the two objects (e.g. via 1351 /// `Sink::send_all`). 1352 /// 1353 /// This method is only available when the `std` or `alloc` feature of this 1354 /// library is activated, and it is activated by default. 1355 #[cfg(feature = "sink")] 1356 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 1357 #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] 1358 #[cfg(feature = "alloc")] split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) where Self: Sink<Item> + Sized,1359 fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) 1360 where 1361 Self: Sink<Item> + Sized, 1362 { 1363 let (sink, stream) = split::split(self); 1364 ( 1365 crate::sink::assert_sink::<Item, Self::Error, _>(sink), 1366 assert_stream::<Self::Item, _>(stream), 1367 ) 1368 } 1369 1370 /// Do something with each item of this stream, afterwards passing it on. 
1371 /// 1372 /// This is similar to the `Iterator::inspect` method in the standard 1373 /// library where it allows easily inspecting each value as it passes 1374 /// through the stream, for example to debug what's going on. inspect<F>(self, f: F) -> Inspect<Self, F> where F: FnMut(&Self::Item), Self: Sized,1375 fn inspect<F>(self, f: F) -> Inspect<Self, F> 1376 where 1377 F: FnMut(&Self::Item), 1378 Self: Sized, 1379 { 1380 assert_stream::<Self::Item, _>(Inspect::new(self, f)) 1381 } 1382 1383 /// Wrap this stream in an `Either` stream, making it the left-hand variant 1384 /// of that `Either`. 1385 /// 1386 /// This can be used in combination with the `right_stream` method to write `if` 1387 /// statements that evaluate to different streams in different branches. left_stream<B>(self) -> Either<Self, B> where B: Stream<Item = Self::Item>, Self: Sized,1388 fn left_stream<B>(self) -> Either<Self, B> 1389 where 1390 B: Stream<Item = Self::Item>, 1391 Self: Sized, 1392 { 1393 assert_stream::<Self::Item, _>(Either::Left(self)) 1394 } 1395 1396 /// Wrap this stream in an `Either` stream, making it the right-hand variant 1397 /// of that `Either`. 1398 /// 1399 /// This can be used in combination with the `left_stream` method to write `if` 1400 /// statements that evaluate to different streams in different branches. right_stream<B>(self) -> Either<B, Self> where B: Stream<Item = Self::Item>, Self: Sized,1401 fn right_stream<B>(self) -> Either<B, Self> 1402 where 1403 B: Stream<Item = Self::Item>, 1404 Self: Sized, 1405 { 1406 assert_stream::<Self::Item, _>(Either::Right(self)) 1407 } 1408 1409 /// A convenience method for calling [`Stream::poll_next`] on [`Unpin`] 1410 /// stream types. 
poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> where Self: Unpin,1411 fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> 1412 where 1413 Self: Unpin, 1414 { 1415 Pin::new(self).poll_next(cx) 1416 } 1417 1418 /// Returns a [`Future`] that resolves when the next item in this stream is 1419 /// ready. 1420 /// 1421 /// This is similar to the [`next`][StreamExt::next] method, but it won't 1422 /// resolve to [`None`] if used on an empty [`Stream`]. Instead, the 1423 /// returned future type will return `true` from 1424 /// [`FusedFuture::is_terminated`][] when the [`Stream`] is empty, allowing 1425 /// [`select_next_some`][StreamExt::select_next_some] to be easily used with 1426 /// the [`select!`] macro. 1427 /// 1428 /// If the future is polled after this [`Stream`] is empty it will panic. 1429 /// Using the future with a [`FusedFuture`][]-aware primitive like the 1430 /// [`select!`] macro will prevent this. 1431 /// 1432 /// [`FusedFuture`]: futures_core::future::FusedFuture 1433 /// [`FusedFuture::is_terminated`]: futures_core::future::FusedFuture::is_terminated 1434 /// 1435 /// # Examples 1436 /// 1437 /// ``` 1438 /// # futures::executor::block_on(async { 1439 /// use futures::{future, select}; 1440 /// use futures::stream::{StreamExt, FuturesUnordered}; 1441 /// 1442 /// let mut fut = future::ready(1); 1443 /// let mut async_tasks = FuturesUnordered::new(); 1444 /// let mut total = 0; 1445 /// loop { 1446 /// select! { 1447 /// num = fut => { 1448 /// // First, the `ready` future completes. 1449 /// total += num; 1450 /// // Then we spawn a new task onto `async_tasks`, 1451 /// async_tasks.push(async { 5 }); 1452 /// }, 1453 /// // On the next iteration of the loop, the task we spawned 1454 /// // completes. 
1455 /// num = async_tasks.select_next_some() => { 1456 /// total += num; 1457 /// } 1458 /// // Finally, both the `ready` future and `async_tasks` have 1459 /// // finished, so we enter the `complete` branch. 1460 /// complete => break, 1461 /// } 1462 /// } 1463 /// assert_eq!(total, 6); 1464 /// # }); 1465 /// ``` select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream,1466 fn select_next_some(&mut self) -> SelectNextSome<'_, Self> 1467 where 1468 Self: Unpin + FusedStream, 1469 { 1470 assert_future::<Self::Item, _>(SelectNextSome::new(self)) 1471 } 1472 } 1473