• Home
  • Raw
  • Download

Lines Matching +full:merge +full:- +full:stream

2 use futures_core::Stream;
34 mod merge; module
35 use merge::Merge;
70 /// An extension trait for the [`Stream`] trait that provides a variety of
73 /// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
76 /// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
84 /// use futures::stream::StreamExt;
92 /// let merged = tokio_stream::StreamExt::merge(a, b);
94 /// // use normal call notation for futures::stream::StreamExt::collect
100 /// [`Stream`]: crate::Stream
102 /// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
103 pub trait StreamExt: Stream {
104 /// Consumes and returns the next value in the stream or `None` if the
105 /// stream is finished.
110 /// async fn next(&mut self) -> Option<Self::Item>;
113 /// Note that because `next` doesn't take ownership over the stream,
114 /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
115 /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
116 /// be done by boxing the stream using [`Box::pin`] or
123 /// holds onto a reference to the underlying stream,
131 /// use tokio_stream::{self as stream, StreamExt};
133 /// let mut stream = stream::iter(1..=3);
135 /// assert_eq!(stream.next().await, Some(1));
136 /// assert_eq!(stream.next().await, Some(2));
137 /// assert_eq!(stream.next().await, Some(3));
138 /// assert_eq!(stream.next().await, None);
141 fn next(&mut self) -> Next<'_, Self> in next()
148 /// Consumes and returns the next item in the stream. If an error is
154 /// async fn try_next(&mut self) -> Result<Option<T>, E>;
165 /// holds onto a reference to the underlying stream,
173 /// use tokio_stream::{self as stream, StreamExt};
175 /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
177 /// assert_eq!(stream.try_next().await, Ok(Some(1)));
178 /// assert_eq!(stream.try_next().await, Ok(Some(2)));
179 /// assert_eq!(stream.try_next().await, Err("nope"));
182 fn try_next<T, E>(&mut self) -> TryNext<'_, Self> in try_next()
184 Self: Stream<Item = Result<T, E>> + Unpin, in try_next()
189 /// Maps this stream's items to a different type, returning a new stream of
192 /// The provided closure is executed over all elements of this stream as
194 /// [`poll_next`](Stream::poll_next).
196 /// Note that this function consumes the stream passed into it and returns a
205 /// use tokio_stream::{self as stream, StreamExt};
207 /// let stream = stream::iter(1..=3);
208 /// let mut stream = stream.map(|x| x + 3);
210 /// assert_eq!(stream.next().await, Some(4));
211 /// assert_eq!(stream.next().await, Some(5));
212 /// assert_eq!(stream.next().await, Some(6));
215 fn map<T, F>(self, f: F) -> Map<Self, F> in map()
217 F: FnMut(Self::Item) -> T, in map()
223 /// Map this stream's items to a different type for as long as determined by
224 /// the provided closure. A stream of the target type will be returned,
227 /// The provided closure is executed over all elements of this stream as
229 /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
230 /// the underlying stream will not be polled again.
232 /// Note that this function consumes the stream passed into it and returns a
241 /// use tokio_stream::{self as stream, StreamExt};
243 /// let stream = stream::iter(1..=10);
244 /// let mut stream = stream.map_while(|x| {
251 /// assert_eq!(stream.next().await, Some(4));
252 /// assert_eq!(stream.next().await, Some(5));
253 /// assert_eq!(stream.next().await, Some(6));
254 /// assert_eq!(stream.next().await, None);
257 fn map_while<T, F>(self, f: F) -> MapWhile<Self, F> in map_while()
259 F: FnMut(Self::Item) -> Option<T>, in map_while()
265 /// Maps this stream's items asynchronously to a different type, returning a
266 /// new stream of the resulting type.
268 /// The provided closure is executed over all elements of this stream as
272 /// Note that this function consumes the stream passed into it and returns a
276 /// Be aware that if the future is not `Unpin`, then neither is the `Stream`
278 /// the example below or put the stream in a `Box` with `Box::pin(stream)`.
285 /// use tokio_stream::{self as stream, StreamExt};
287 /// async fn do_async_work(value: i32) -> i32 {
291 /// let stream = stream::iter(1..=3);
292 /// let stream = stream.then(do_async_work);
294 /// tokio::pin!(stream);
296 /// assert_eq!(stream.next().await, Some(4));
297 /// assert_eq!(stream.next().await, Some(5));
298 /// assert_eq!(stream.next().await, Some(6));
301 fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F> in then()
303 F: FnMut(Self::Item) -> Fut, in then()
313 /// Values are produced from the merged stream in the order they arrive from
315 /// simultaneously, the merge stream alternates between them. This provides
316 /// some level of fairness. You should not chain calls to `merge`, as this
319 /// The merged stream completes once **both** source streams complete. When
320 /// one source stream completes before the other, the merge stream
321 /// exclusively polls the remaining stream.
330 /// use tokio_stream::{StreamExt, Stream};
346 /// // Convert the channels to a `Stream`.
347 /// let rx1 = Box::pin(async_stream::stream! {
351 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
353 /// let rx2 = Box::pin(async_stream::stream! {
357 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
359 /// let mut rx = rx1.merge(rx2);
390 /// // The merged stream is consumed
394 fn merge<U>(self, other: U) -> Merge<Self, U> in merge() method
396 U: Stream<Item = Self::Item>, in merge()
399 Merge::new(self, other) in merge()
402 /// Filters the values produced by this stream according to the provided
405 /// As values of this stream are made available, the provided predicate `f`
407 /// resolves to `true`, then the stream will yield the value, but if the
411 /// Note that this function consumes the stream passed into it and returns a
420 /// use tokio_stream::{self as stream, StreamExt};
422 /// let stream = stream::iter(1..=8);
423 /// let mut evens = stream.filter(|x| x % 2 == 0);
432 fn filter<F>(self, f: F) -> Filter<Self, F> in filter()
434 F: FnMut(&Self::Item) -> bool, in filter()
440 /// Filters the values produced by this stream while simultaneously mapping
443 /// As values of this stream are made available, the provided function will
445 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
448 /// Note that this function consumes the stream passed into it and returns a
456 /// use tokio_stream::{self as stream, StreamExt};
458 /// let stream = stream::iter(1..=8);
459 /// let mut evens = stream.filter_map(|x| {
470 fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F> in filter_map()
472 F: FnMut(Self::Item) -> Option<T>, in filter_map()
478 /// Creates a stream which ends after the first `None`.
480 /// After a stream returns `None`, behavior is undefined. Future calls to
482 /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
488 /// use tokio_stream::{Stream, StreamExt};
493 /// // a stream which alternates between Some and None
498 /// impl Stream for Alternate {
501 /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
516 /// let mut stream = Alternate { state: 0 };
518 /// // the stream goes back and forth
519 /// assert_eq!(stream.next().await, Some(0));
520 /// assert_eq!(stream.next().await, None);
521 /// assert_eq!(stream.next().await, Some(2));
522 /// assert_eq!(stream.next().await, None);
525 /// let mut stream = stream.fuse();
527 /// assert_eq!(stream.next().await, Some(4));
528 /// assert_eq!(stream.next().await, None);
531 /// assert_eq!(stream.next().await, None);
532 /// assert_eq!(stream.next().await, None);
533 /// assert_eq!(stream.next().await, None);
536 fn fuse(self) -> Fuse<Self> in fuse()
543 /// Creates a new stream of at most `n` items of the underlying stream.
545 /// Once `n` items have been yielded from this stream then it will always
546 /// return that the stream is done.
553 /// use tokio_stream::{self as stream, StreamExt};
555 /// let mut stream = stream::iter(1..=10).take(3);
557 /// assert_eq!(Some(1), stream.next().await);
558 /// assert_eq!(Some(2), stream.next().await);
559 /// assert_eq!(Some(3), stream.next().await);
560 /// assert_eq!(None, stream.next().await);
563 fn take(self, n: usize) -> Take<Self> in take()
570 /// Take elements from this stream while the provided predicate
574 /// stream until the predicate `f` resolves to `false`. Once one element
575 /// returns false it will always return that the stream is done.
582 /// use tokio_stream::{self as stream, StreamExt};
584 /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
586 /// assert_eq!(Some(1), stream.next().await);
587 /// assert_eq!(Some(2), stream.next().await);
588 /// assert_eq!(Some(3), stream.next().await);
589 /// assert_eq!(None, stream.next().await);
592 fn take_while<F>(self, f: F) -> TakeWhile<Self, F> in take_while()
594 F: FnMut(&Self::Item) -> bool, in take_while()
600 /// Creates a new stream that will skip the `n` first items of the
601 /// underlying stream.
608 /// use tokio_stream::{self as stream, StreamExt};
610 /// let mut stream = stream::iter(1..=10).skip(7);
612 /// assert_eq!(Some(8), stream.next().await);
613 /// assert_eq!(Some(9), stream.next().await);
614 /// assert_eq!(Some(10), stream.next().await);
615 /// assert_eq!(None, stream.next().await);
618 fn skip(self, n: usize) -> Skip<Self> in skip()
625 /// Skip elements from the underlying stream while the provided predicate
629 /// stream until the predicate `f` resolves to `false`. Once one element
639 /// use tokio_stream::{self as stream, StreamExt};
640 /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
642 /// assert_eq!(Some(3), stream.next().await);
643 /// assert_eq!(Some(4), stream.next().await);
644 /// assert_eq!(Some(1), stream.next().await);
645 /// assert_eq!(None, stream.next().await);
648 fn skip_while<F>(self, f: F) -> SkipWhile<Self, F> in skip_while()
650 F: FnMut(&Self::Item) -> bool, in skip_while()
656 /// Tests if every element of the stream matches a predicate.
661 /// async fn all<F>(&mut self, f: F) -> bool;
665 /// this closure to each element of the stream, and if they all return
667 /// returns `false`. An empty stream returns `true`.
669 /// `all()` is short-circuiting; in other words, it will stop processing
673 /// An empty stream returns `true`.
682 /// use tokio_stream::{self as stream, StreamExt};
686 /// assert!(stream::iter(&a).all(|&x| x > 0).await);
688 /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
697 /// use tokio_stream::{self as stream, StreamExt};
701 /// let mut iter = stream::iter(&a);
709 fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> in all()
712 F: FnMut(Self::Item) -> bool, in all()
717 /// Tests if any element of the stream matches a predicate.
722 /// async fn any<F>(&mut self, f: F) -> bool;
726 /// this closure to each element of the stream, and if any of them return
730 /// `any()` is short-circuiting; in other words, it will stop processing
734 /// An empty stream returns `false`.
741 /// use tokio_stream::{self as stream, StreamExt};
745 /// assert!(stream::iter(&a).any(|&x| x > 0).await);
747 /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
756 /// use tokio_stream::{self as stream, StreamExt};
760 /// let mut iter = stream::iter(&a);
768 fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> in any()
771 F: FnMut(Self::Item) -> bool, in any()
777 /// first stream then all values from the second stream.
785 /// use tokio_stream::{self as stream, StreamExt};
789 /// let one = stream::iter(vec![1, 2, 3]);
790 /// let two = stream::iter(vec![4, 5, 6]);
792 /// let mut stream = one.chain(two);
794 /// assert_eq!(stream.next().await, Some(1));
795 /// assert_eq!(stream.next().await, Some(2));
796 /// assert_eq!(stream.next().await, Some(3));
797 /// assert_eq!(stream.next().await, Some(4));
798 /// assert_eq!(stream.next().await, Some(5));
799 /// assert_eq!(stream.next().await, Some(6));
800 /// assert_eq!(stream.next().await, None);
803 fn chain<U>(self, other: U) -> Chain<Self, U> in chain()
805 U: Stream<Item = Self::Item>, in chain()
811 /// A combinator that applies a function to every element in a stream
817 /// async fn fold<B, F>(self, init: B, f: F) -> B;
825 /// use tokio_stream::{self as stream, *};
827 /// let s = stream::iter(vec![1u8, 2, 3]);
833 fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> in fold()
836 F: FnMut(B, Self::Item) -> B, in fold()
841 /// Drain stream pushing all emitted values into a collection.
846 /// async fn collect<T>(self) -> T;
859 /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
860 /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
873 /// use tokio_stream::{self as stream, StreamExt};
878 /// stream::iter(vec![1, 2, 3])
887 /// Collecting a stream of `Result` values
890 /// use tokio_stream::{self as stream, StreamExt};
894 /// // A stream containing only `Ok` values will be collected
896 /// stream::iter(vec![Ok(1), Ok(2), Ok(3)])
902 /// // A stream containing `Err` values will return the first error.
906 /// stream::iter(results)
913 fn collect<T>(self) -> Collect<Self, T> in collect()
921 /// Applies a per-item timeout to the passed stream.
924 /// time each element of the stream has to complete before timing out.
926 /// If the wrapped stream yields a value before the deadline is reached, the
928 /// to continue consuming the stream and will eventually get the next source
929 /// stream value once it becomes available. See
935 /// This function consumes the stream passed into it and returns a
938 /// Polling the returned stream will continue to poll the inner stream even
943 /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
948 /// use tokio_stream::{self as stream, StreamExt};
950 /// # let int_stream = stream::iter(1..=3);
961 /// // If the second item times out, we get an error and continue polling the stream:
962 /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
969 /// // If we want to stop consuming the source stream the first time an
971 /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
980 /// unless the wrapped stream yields a value (timeouts do not repeat).
991 /// // Only one timeout will be received between values in the source stream.
999 fn timeout(self, duration: Duration) -> Timeout<Self> in timeout()
1006 /// Applies a per-item timeout to the passed stream.
1009 /// controls the time each element of the stream has to complete before
1012 /// If the wrapped stream yields a value before the deadline is reached, the
1014 /// to continue consuming the stream and will eventually get the next source
1015 /// stream value once it becomes available. Unlike `timeout()`, if no value
1022 /// This function consumes the stream passed into it and returns a
1025 /// Polling the returned stream will continue to poll the inner stream even
1030 /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
1035 /// use tokio_stream::{self as stream, StreamExt};
1037 /// # let int_stream = stream::iter(1..=3);
1048 /// // If the second item times out, we get an error and continue polling the stream:
1049 /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1056 /// // If we want to stop consuming the source stream the first time an
1058 /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1067 /// until the wrapped stream yields a value.
1078 /// // Multiple timeouts will be received between values in the source stream.
1082 /// // Will eventually receive another value from the source stream...
1083 /// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout");
1088 fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self> in timeout_repeating()
1095 /// Slows down a stream by enforcing a delay between items.
1101 /// Create a throttled stream.
1107 /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
1118 fn throttle(self, duration: Duration) -> Throttle<Self> in throttle()
1125 /// Batches the items in the given stream using a maximum duration and size for each batch.
1127 /// This stream returns the next batch of items in the following situations:
1128 /// 1. The inner stream has returned at least `max_size` many items since the last batch.
1130 /// 3. The end of the stream is reached.
1144 /// use tokio_stream::{self as stream, StreamExt};
1152 /// let stream0 = stream::iter(iter);
1155 /// let stream1 = stream::iter(iter)
1167 /// // last element in the stream
1174 fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self> in chunks_timeout()
1178 assert!(max_size > 0, "`max_size` must be non-zero."); in chunks_timeout()
1183 impl<St: ?Sized> StreamExt for St where St: Stream {}
1185 /// Merge the size hints from two streams.
1189 ) -> (usize, Option<usize>) { in merge_size_hints()