use futures_core::Stream; mod all; use all::AllFuture; mod any; use any::AnyFuture; mod chain; use chain::Chain; pub(crate) mod collect; use collect::{Collect, FromStream}; mod filter; use filter::Filter; mod filter_map; use filter_map::FilterMap; mod fold; use fold::FoldFuture; mod fuse; use fuse::Fuse; mod map; use map::Map; mod merge; use merge::Merge; mod next; use next::Next; mod skip; use skip::Skip; mod skip_while; use skip_while::SkipWhile; mod try_next; use try_next::TryNext; mod take; use take::Take; mod take_while; use take_while::TakeWhile; cfg_time! { mod timeout; use timeout::Timeout; use tokio::time::Duration; mod throttle; use throttle::{throttle, Throttle}; } /// An extension trait for the [`Stream`] trait that provides a variety of /// convenient combinator functions. /// /// Be aware that the `Stream` trait in Tokio is a re-export of the trait found /// in the [futures] crate, however both Tokio and futures provide separate /// `StreamExt` utility traits, and some utilities are only available on one of /// these traits. Click [here][futures-StreamExt] to see the other `StreamExt` /// trait in the futures crate. /// /// If you need utilities from both `StreamExt` traits, you should prefer to /// import one of them, and use the other through the fully qualified call /// syntax. For example: /// ``` /// // import one of the traits: /// use futures::stream::StreamExt; /// # #[tokio::main(flavor = "current_thread")] /// # async fn main() { /// /// let a = tokio_stream::iter(vec![1, 3, 5]); /// let b = tokio_stream::iter(vec![2, 4, 6]); /// /// // use the fully qualified call syntax for the other trait: /// let merged = tokio_stream::StreamExt::merge(a, b); /// /// // use normal call notation for futures::stream::StreamExt::collect /// let output: Vec<_> = merged.collect().await; /// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]); /// # } /// ``` /// /// [`Stream`]: crate::Stream /// [futures]: https://docs.rs/futures /// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html pub trait StreamExt: Stream { /// Consumes and returns the next value in the stream or `None` if the /// stream is finished. /// /// Equivalent to: /// /// ```ignore /// async fn next(&mut self) -> Option; /// ``` /// /// Note that because `next` doesn't take ownership over the stream, /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can /// be done by boxing the stream using [`Box::pin`] or /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` /// crate. /// /// # Examples /// /// ``` /// # #[tokio::main] /// # async fn main() { /// use tokio_stream::{self as stream, StreamExt}; /// /// let mut stream = stream::iter(1..=3); /// /// assert_eq!(stream.next().await, Some(1)); /// assert_eq!(stream.next().await, Some(2)); /// assert_eq!(stream.next().await, Some(3)); /// assert_eq!(stream.next().await, None); /// # } /// ``` fn next(&mut self) -> Next<'_, Self> where Self: Unpin, { Next::new(self) } /// Consumes and returns the next item in the stream. If an error is /// encountered before the next item, the error is returned instead. /// /// Equivalent to: /// /// ```ignore /// async fn try_next(&mut self) -> Result, E>; /// ``` /// /// This is similar to the [`next`](StreamExt::next) combinator, /// but returns a [`Result, E>`](Result) rather than /// an [`Option>`](Option), making for easy use /// with the [`?`](std::ops::Try) operator. /// /// # Examples /// /// ``` /// # #[tokio::main] /// # async fn main() { /// use tokio_stream::{self as stream, StreamExt}; /// /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]); /// /// assert_eq!(stream.try_next().await, Ok(Some(1))); /// assert_eq!(stream.try_next().await, Ok(Some(2))); /// assert_eq!(stream.try_next().await, Err("nope")); /// # } /// ``` fn try_next(&mut self) -> TryNext<'_, Self> where Self: Stream> + Unpin, { TryNext::new(self) } /// Maps this stream's items to a different type, returning a new stream of /// the resulting type. /// /// The provided closure is executed over all elements of this stream as /// they are made available. It is executed inline with calls to /// [`poll_next`](Stream::poll_next). /// /// Note that this function consumes the stream passed into it and returns a /// wrapped version of it, similar to the existing `map` methods in the /// standard library. /// /// # Examples /// /// ``` /// # #[tokio::main] /// # async fn main() { /// use tokio_stream::{self as stream, StreamExt}; /// /// let stream = stream::iter(1..=3); /// let mut stream = stream.map(|x| x + 3); /// /// assert_eq!(stream.next().await, Some(4)); /// assert_eq!(stream.next().await, Some(5)); /// assert_eq!(stream.next().await, Some(6)); /// # } /// ``` fn map(self, f: F) -> Map where F: FnMut(Self::Item) -> T, Self: Sized, { Map::new(self, f) } /// Combine two streams into one by interleaving the output of both as it /// is produced. /// /// Values are produced from the merged stream in the order they arrive from /// the two source streams. If both source streams provide values /// simultaneously, the merge stream alternates between them. This provides /// some level of fairness. You should not chain calls to `merge`, as this /// will break the fairness of the merging. /// /// The merged stream completes once **both** source streams complete. When /// one source stream completes before the other, the merge stream /// exclusively polls the remaining stream. /// /// For merging multiple streams, consider using [`StreamMap`] instead. /// /// [`StreamMap`]: crate::StreamMap /// /// # Examples /// /// ``` /// use tokio_stream::{StreamExt, Stream}; /// use tokio::sync::mpsc; /// use tokio::time; /// /// use std::time::Duration; /// use std::pin::Pin; /// /// # /* /// #[tokio::main] /// # */ /// # #[tokio::main(flavor = "current_thread")] /// async fn main() { /// # time::pause(); /// let (tx1, mut rx1) = mpsc::channel::(10); /// let (tx2, mut rx2) = mpsc::channel::(10); /// /// // Convert the channels to a `Stream`. /// let rx1 = Box::pin(async_stream::stream! { /// while let Some(item) = rx1.recv().await { /// yield item; /// } /// }) as Pin + Send>>; /// /// let rx2 = Box::pin(async_stream::stream! { /// while let Some(item) = rx2.recv().await { /// yield item; /// } /// }) as Pin + Send>>; /// /// let mut rx = rx1.merge(rx2); /// /// tokio::spawn(async move { /// // Send some values immediately /// tx1.send(1).await.unwrap(); /// tx1.send(2).await.unwrap(); /// /// // Let the other task send values /// time::sleep(Duration::from_millis(20)).await; /// /// tx1.send(4).await.unwrap(); /// }); /// /// tokio::spawn(async move { /// // Wait for the first task to send values /// time::sleep(Duration::from_millis(5)).await; /// /// tx2.send(3).await.unwrap(); /// /// time::sleep(Duration::from_millis(25)).await; /// /// // Send the final value /// tx2.send(5).await.unwrap(); /// }); /// /// assert_eq!(1, rx.next().await.unwrap()); /// assert_eq!(2, rx.next().await.unwrap()); /// assert_eq!(3, rx.next().await.unwrap()); /// assert_eq!(4, rx.next().await.unwrap()); /// assert_eq!(5, rx.next().await.unwrap()); /// /// // The merged stream is consumed /// assert!(rx.next().await.is_none()); /// } /// ``` fn merge(self, other: U) -> Merge where U: Stream, Self: Sized, { Merge::new(self, other) } /// Filters the values produced by this stream according to the provided /// predicate. /// /// As values of this stream are made available, the provided predicate `f` /// will be run against them. If the predicate /// resolves to `true`, then the stream will yield the value, but if the /// predicate resolves to `false`, then the value /// will be discarded and the next value will be produced. /// /// Note that this function consumes the stream passed into it and returns a /// wrapped version of it, similar to [`Iterator::filter`] method in the /// standard library. /// /// # Examples /// /// ``` /// # #[tokio::main] /// # async fn main() { /// use tokio_stream::{self as stream, StreamExt}; /// /// let stream = stream::iter(1..=8); /// let mut evens = stream.filter(|x| x % 2 == 0); /// /// assert_eq!(Some(2), evens.next().await); /// assert_eq!(Some(4), evens.next().await); /// assert_eq!(Some(6), evens.next().await); /// assert_eq!(Some(8), evens.next().await); /// assert_eq!(None, evens.next().await); /// # } /// ``` fn filter(self, f: F) -> Filter where F: FnMut(&Self::Item) -> bool, Self: Sized, { Filter::new(self, f) } /// Filters the values produced by this stream while simultaneously mapping /// them to a different type according to the provided closure. /// /// As values of this stream are made available, the provided function will /// be run on them. If the predicate `f` resolves to /// [`Some(item)`](Some) then the stream will yield the value `item`, but if /// it resolves to [`None`], then the value will be skipped. /// /// Note that this function consumes the stream passed into it and returns a /// wrapped version of it, similar to [`Iterator::filter_map`] method in the /// standard library. /// /// # Examples /// ``` /// # #[tokio::main] /// # async fn main() { /// use tokio_stream::{self as stream, StreamExt}; /// /// let stream = stream::iter(1..=8); /// let mut evens = stream.filter_map(|x| { /// if x % 2 == 0 { Some(x + 1) } else { None } /// }); /// /// assert_eq!(Some(3), evens.next().await); /// assert_eq!(Some(5), evens.next().await); /// assert_eq!(Some(7), evens.next().await); /// assert_eq!(Some(9), evens.next().await); /// assert_eq!(None, evens.next().await); /// # } /// ``` fn filter_map(self, f: F) -> FilterMap where F: FnMut(Self::Item) -> Option, Self: Sized, { FilterMap::new(self, f) } /// Creates a stream which ends after the first `None`. /// /// After a stream returns `None`, behavior is undefined. Future calls to /// `poll_next` may or may not return `Some(T)` again or they may panic. /// `fuse()` adapts a stream, ensuring that after `None` is given, it will /// return `None` forever. /// /// # Examples /// /// ``` /// use tokio_stream::{Stream, StreamExt}; /// /// use std::pin::Pin; /// use std::task::{Context, Poll}; /// /// // a stream which alternates between Some and None /// struct Alternate { /// state: i32, /// } /// /// impl Stream for Alternate { /// type Item = i32; /// /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { /// let val = self.state; /// self.state = self.state + 1; /// /// // if it's even, Some(i32), else None /// if val % 2 == 0 { /// Poll::Ready(Some(val)) /// } else { /// Poll::Ready(None) /// } /// } /// } /// /// #[tokio::main] /// async fn main() { /// let mut stream = Alternate { state: 0 }; /// /// // the stream goes back and forth /// assert_eq!(stream.next().await, Some(0)); /// assert_eq!(stream.next().await, None); /// assert_eq!(stream.next().await, Some(2)); /// assert_eq!(stream.next().await, None); /// /// // however, once it is fused /// let mut stream = stream.fuse(); /// /// assert_eq!(stream.next().await, Some(4)); /// assert_eq!(stream.next().await, None); /// /// // it will always return `None` after the first time. /// assert_eq!(stream.next().await, None); /// assert_eq!(stream.next().await, None); /// assert_eq!(stream.next().await, None); /// } /// ``` fn fuse(self) -> Fuse where Self: Sized, { Fuse::new(self) } /// Creates a new stream of at most `n` items of the underlying stream. /// /// Once `n` items have been yielded from this stream then it will always /// return that the stream is done. /// /// # Examples /// /// ``` /// # #[tokio::main] /// # async fn main() { /// use tokio_stream::{self as stream, StreamExt}; /// /// let mut stream = stream::iter(1..=10).take(3); /// /// assert_eq!(Some(1), stream.next().await); /// assert_eq!(Some(2), stream.next().await); /// assert_eq!(Some(3), stream.next().await); /// assert_eq!(None, stream.next().await); /// # } /// ``` fn take(self, n: usize) -> Take where Self: Sized, { Take::new(self, n) } /// Take elements from this stream while the provided predicate /// resolves to `true`. /// /// This function, like `Iterator::take_while`, will take elements from the /// stream until the predicate `f` resolves to `false`. Once one element /// returns false it will always return that the stream is done. /// /// # Examples /// /// ``` /// # #[tokio::main] /// # async fn main() { /// use tokio_stream::{self as stream, StreamExt}; /// /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3); /// /// assert_eq!(Some(1), stream.next().await); /// assert_eq!(Some(2), stream.next().await); /// assert_eq!(Some(3), stream.next().await); /// assert_eq!(None, stream.next().await); /// # } /// ``` fn take_while(self, f: F) -> TakeWhile where F: FnMut(&Self::Item) -> bool, Self: Sized, { TakeWhile::new(self, f) } /// Creates a new stream that will skip the `n` first items of the /// underlying stream. /// /// # Examples /// /// ``` /// # #[tokio::main] /// # async fn main() { /// use tokio_stream::{self as stream, StreamExt}; /// /// let mut stream = stream::iter(1..=10).skip(7); /// /// assert_eq!(Some(8), stream.next().await); /// assert_eq!(Some(9), stream.next().await); /// assert_eq!(Some(10), stream.next().await); /// assert_eq!(None, stream.next().await); /// # } /// ``` fn skip(self, n: usize) -> Skip where Self: Sized, { Skip::new(self, n) } /// Skip elements from the underlying stream while the provided predicate /// resolves to `true`. /// /// This function, like [`Iterator::skip_while`], will ignore elemets from the /// stream until the predicate `f` resolves to `false`. Once one element /// returns false, the rest of the elements will be yielded. /// /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while() /// /// # Examples /// /// ``` /// # #[tokio::main] /// # async fn main() { /// use tokio_stream::{self as stream, StreamExt}; /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3); /// /// assert_eq!(Some(3), stream.next().await); /// assert_eq!(Some(4), stream.next().await); /// assert_eq!(Some(1), stream.next().await); /// assert_eq!(None, stream.next().await); /// # } /// ``` fn skip_while(self, f: F) -> SkipWhile where F: FnMut(&Self::Item) -> bool, Self: Sized, { SkipWhile::new(self, f) } /// Tests if every element of the stream matches a predicate. /// /// Equivalent to: /// /// ```ignore /// async fn all(&mut self, f: F) -> bool; /// ``` /// /// `all()` takes a closure that returns `true` or `false`. It applies /// this closure to each element of the stream, and if they all return /// `true`, then so does `all`. If any of them return `false`, it /// returns `false`. An empty stream returns `true`. /// /// `all()` is short-circuiting; in other words, it will stop processing /// as soon as it finds a `false`, given that no matter what else happens, /// the result will also be `false`. /// /// An empty stream returns `true`. /// /// # Examples /// /// Basic usage: /// /// ``` /// # #[tokio::main] /// # async fn main() { /// use tokio_stream::{self as stream, StreamExt}; /// /// let a = [1, 2, 3]; /// /// assert!(stream::iter(&a).all(|&x| x > 0).await); /// /// assert!(!stream::iter(&a).all(|&x| x > 2).await); /// # } /// ``` /// /// Stopping at the first `false`: /// /// ``` /// # #[tokio::main] /// # async fn main() { /// use tokio_stream::{self as stream, StreamExt}; /// /// let a = [1, 2, 3]; /// /// let mut iter = stream::iter(&a); /// /// assert!(!iter.all(|&x| x != 2).await); /// /// // we can still use `iter`, as there are more elements. /// assert_eq!(iter.next().await, Some(&3)); /// # } /// ``` fn all(&mut self, f: F) -> AllFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool, { AllFuture::new(self, f) } /// Tests if any element of the stream matches a predicate. /// /// Equivalent to: /// /// ```ignore /// async fn any(&mut self, f: F) -> bool; /// ``` /// /// `any()` takes a closure that returns `true` or `false`. It applies /// this closure to each element of the stream, and if any of them return /// `true`, then so does `any()`. If they all return `false`, it /// returns `false`. /// /// `any()` is short-circuiting; in other words, it will stop processing /// as soon as it finds a `true`, given that no matter what else happens, /// the result will also be `true`. /// /// An empty stream returns `false`. /// /// Basic usage: /// /// ``` /// # #[tokio::main] /// # async fn main() { /// use tokio_stream::{self as stream, StreamExt}; /// /// let a = [1, 2, 3]; /// /// assert!(stream::iter(&a).any(|&x| x > 0).await); /// /// assert!(!stream::iter(&a).any(|&x| x > 5).await); /// # } /// ``` /// /// Stopping at the first `true`: /// /// ``` /// # #[tokio::main] /// # async fn main() { /// use tokio_stream::{self as stream, StreamExt}; /// /// let a = [1, 2, 3]; /// /// let mut iter = stream::iter(&a); /// /// assert!(iter.any(|&x| x != 2).await); /// /// // we can still use `iter`, as there are more elements. /// assert_eq!(iter.next().await, Some(&2)); /// # } /// ``` fn any(&mut self, f: F) -> AnyFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool, { AnyFuture::new(self, f) } /// Combine two streams into one by first returning all values from the /// first stream then all values from the second stream. /// /// As long as `self` still has values to emit, no values from `other` are /// emitted, even if some are ready. /// /// # Examples /// /// ``` /// use tokio_stream::{self as stream, StreamExt}; /// /// #[tokio::main] /// async fn main() { /// let one = stream::iter(vec![1, 2, 3]); /// let two = stream::iter(vec![4, 5, 6]); /// /// let mut stream = one.chain(two); /// /// assert_eq!(stream.next().await, Some(1)); /// assert_eq!(stream.next().await, Some(2)); /// assert_eq!(stream.next().await, Some(3)); /// assert_eq!(stream.next().await, Some(4)); /// assert_eq!(stream.next().await, Some(5)); /// assert_eq!(stream.next().await, Some(6)); /// assert_eq!(stream.next().await, None); /// } /// ``` fn chain(self, other: U) -> Chain where U: Stream, Self: Sized, { Chain::new(self, other) } /// A combinator that applies a function to every element in a stream /// producing a single, final value. /// /// Equivalent to: /// /// ```ignore /// async fn fold(self, init: B, f: F) -> B; /// ``` /// /// # Examples /// Basic usage: /// ``` /// # #[tokio::main] /// # async fn main() { /// use tokio_stream::{self as stream, *}; /// /// let s = stream::iter(vec![1u8, 2, 3]); /// let sum = s.fold(0, |acc, x| acc + x).await; /// /// assert_eq!(sum, 6); /// # } /// ``` fn fold(self, init: B, f: F) -> FoldFuture where Self: Sized, F: FnMut(B, Self::Item) -> B, { FoldFuture::new(self, init, f) } /// Drain stream pushing all emitted values into a collection. /// /// Equivalent to: /// /// ```ignore /// async fn collect(self) -> T; /// ``` /// /// `collect` streams all values, awaiting as needed. Values are pushed into /// a collection. A number of different target collection types are /// supported, including [`Vec`](std::vec::Vec), /// [`String`](std::string::String), and [`Bytes`]. /// /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html /// /// # `Result` /// /// `collect()` can also be used with streams of type `Result` where /// `T: FromStream<_>`. In this case, `collect()` will stream as long as /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered, /// streaming is terminated and `collect()` returns the `Err`. /// /// # Notes /// /// `FromStream` is currently a sealed trait. Stabilization is pending /// enhancements to the Rust language. /// /// # Examples /// /// Basic usage: /// /// ``` /// use tokio_stream::{self as stream, StreamExt}; /// /// #[tokio::main] /// async fn main() { /// let doubled: Vec = /// stream::iter(vec![1, 2, 3]) /// .map(|x| x * 2) /// .collect() /// .await; /// /// assert_eq!(vec![2, 4, 6], doubled); /// } /// ``` /// /// Collecting a stream of `Result` values /// /// ``` /// use tokio_stream::{self as stream, StreamExt}; /// /// #[tokio::main] /// async fn main() { /// // A stream containing only `Ok` values will be collected /// let values: Result, &str> = /// stream::iter(vec![Ok(1), Ok(2), Ok(3)]) /// .collect() /// .await; /// /// assert_eq!(Ok(vec![1, 2, 3]), values); /// /// // A stream containing `Err` values will return the first error. /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")]; /// /// let values: Result, &str> = /// stream::iter(results) /// .collect() /// .await; /// /// assert_eq!(Err("no"), values); /// } /// ``` fn collect(self) -> Collect where T: FromStream, Self: Sized, { Collect::new(self) } /// Applies a per-item timeout to the passed stream. /// /// `timeout()` takes a `Duration` that represents the maximum amount of /// time each element of the stream has to complete before timing out. /// /// If the wrapped stream yields a value before the deadline is reached, the /// value is returned. Otherwise, an error is returned. The caller may decide /// to continue consuming the stream and will eventually get the next source /// stream value once it becomes available. /// /// # Notes /// /// This function consumes the stream passed into it and returns a /// wrapped version of it. /// /// Polling the returned stream will continue to poll the inner stream even /// if one or more items time out. /// /// # Examples /// /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): /// /// ``` /// # #[tokio::main] /// # async fn main() { /// use tokio_stream::{self as stream, StreamExt}; /// use std::time::Duration; /// # let int_stream = stream::iter(1..=3); /// /// let int_stream = int_stream.timeout(Duration::from_secs(1)); /// tokio::pin!(int_stream); /// /// // When no items time out, we get the 3 elements in succession: /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); /// assert_eq!(int_stream.try_next().await, Ok(None)); /// /// // If the second item times out, we get an error and continue polling the stream: /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); /// assert!(int_stream.try_next().await.is_err()); /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); /// assert_eq!(int_stream.try_next().await, Ok(None)); /// /// // If we want to stop consuming the source stream the first time an /// // element times out, we can use the `take_while` operator: /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); /// let mut int_stream = int_stream.take_while(Result::is_ok); /// /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); /// assert_eq!(int_stream.try_next().await, Ok(None)); /// # } /// ``` #[cfg(all(feature = "time"))] #[cfg_attr(docsrs, doc(cfg(feature = "time")))] fn timeout(self, duration: Duration) -> Timeout where Self: Sized, { Timeout::new(self, duration) } /// Slows down a stream by enforcing a delay between items. /// /// # Example /// /// Create a throttled stream. /// ```rust,no_run /// use std::time::Duration; /// use tokio_stream::StreamExt; /// /// # async fn dox() { /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2)); /// tokio::pin!(item_stream); /// /// loop { /// // The string will be produced at most every 2 seconds /// println!("{:?}", item_stream.next().await); /// } /// # } /// ``` #[cfg(all(feature = "time"))] #[cfg_attr(docsrs, doc(cfg(feature = "time")))] fn throttle(self, duration: Duration) -> Throttle where Self: Sized, { throttle(duration, self) } } impl StreamExt for St where St: Stream {} /// Merge the size hints from two streams. fn merge_size_hints( (left_low, left_high): (usize, Option), (right_low, right_hign): (usize, Option), ) -> (usize, Option) { let low = left_low.saturating_add(right_low); let high = match (left_high, right_hign) { (Some(h1), Some(h2)) => h1.checked_add(h2), _ => None, }; (low, high) }