• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use core::future::Future;
2 use futures_core::Stream;
3 
4 mod all;
5 use all::AllFuture;
6 
7 mod any;
8 use any::AnyFuture;
9 
10 mod chain;
11 use chain::Chain;
12 
13 pub(crate) mod collect;
14 use collect::{Collect, FromStream};
15 
16 mod filter;
17 use filter::Filter;
18 
19 mod filter_map;
20 use filter_map::FilterMap;
21 
22 mod fold;
23 use fold::FoldFuture;
24 
25 mod fuse;
26 use fuse::Fuse;
27 
28 mod map;
29 use map::Map;
30 
31 mod map_while;
32 use map_while::MapWhile;
33 
34 mod merge;
35 use merge::Merge;
36 
37 mod next;
38 use next::Next;
39 
40 mod skip;
41 use skip::Skip;
42 
43 mod skip_while;
44 use skip_while::SkipWhile;
45 
46 mod take;
47 use take::Take;
48 
49 mod take_while;
50 use take_while::TakeWhile;
51 
52 mod then;
53 use then::Then;
54 
55 mod try_next;
56 use try_next::TryNext;
57 
58 cfg_time! {
59     pub(crate) mod timeout;
60     pub(crate) mod timeout_repeating;
61     use timeout::Timeout;
62     use timeout_repeating::TimeoutRepeating;
63     use tokio::time::{Duration, Interval};
64     mod throttle;
65     use throttle::{throttle, Throttle};
66     mod chunks_timeout;
67     use chunks_timeout::ChunksTimeout;
68 }
69 
70 /// An extension trait for the [`Stream`] trait that provides a variety of
71 /// convenient combinator functions.
72 ///
73 /// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
74 /// in the [futures] crate, however both Tokio and futures provide separate
75 /// `StreamExt` utility traits, and some utilities are only available on one of
76 /// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
77 /// trait in the futures crate.
78 ///
79 /// If you need utilities from both `StreamExt` traits, you should prefer to
80 /// import one of them, and use the other through the fully qualified call
81 /// syntax. For example:
82 /// ```
83 /// // import one of the traits:
84 /// use futures::stream::StreamExt;
85 /// # #[tokio::main(flavor = "current_thread")]
86 /// # async fn main() {
87 ///
88 /// let a = tokio_stream::iter(vec![1, 3, 5]);
89 /// let b = tokio_stream::iter(vec![2, 4, 6]);
90 ///
91 /// // use the fully qualified call syntax for the other trait:
92 /// let merged = tokio_stream::StreamExt::merge(a, b);
93 ///
94 /// // use normal call notation for futures::stream::StreamExt::collect
95 /// let output: Vec<_> = merged.collect().await;
96 /// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
97 /// # }
98 /// ```
99 ///
100 /// [`Stream`]: crate::Stream
101 /// [futures]: https://docs.rs/futures
102 /// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
103 pub trait StreamExt: Stream {
104     /// Consumes and returns the next value in the stream or `None` if the
105     /// stream is finished.
106     ///
107     /// Equivalent to:
108     ///
109     /// ```ignore
110     /// async fn next(&mut self) -> Option<Self::Item>;
111     /// ```
112     ///
113     /// Note that because `next` doesn't take ownership over the stream,
114     /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
115     /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
116     /// be done by boxing the stream using [`Box::pin`] or
117     /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
118     /// crate.
119     ///
120     /// # Cancel safety
121     ///
122     /// This method is cancel safe. The returned future only
123     /// holds onto a reference to the underlying stream,
124     /// so dropping it will never lose a value.
125     ///
126     /// # Examples
127     ///
128     /// ```
129     /// # #[tokio::main]
130     /// # async fn main() {
131     /// use tokio_stream::{self as stream, StreamExt};
132     ///
133     /// let mut stream = stream::iter(1..=3);
134     ///
135     /// assert_eq!(stream.next().await, Some(1));
136     /// assert_eq!(stream.next().await, Some(2));
137     /// assert_eq!(stream.next().await, Some(3));
138     /// assert_eq!(stream.next().await, None);
139     /// # }
140     /// ```
next(&mut self) -> Next<'_, Self> where Self: Unpin,141     fn next(&mut self) -> Next<'_, Self>
142     where
143         Self: Unpin,
144     {
145         Next::new(self)
146     }
147 
148     /// Consumes and returns the next item in the stream. If an error is
149     /// encountered before the next item, the error is returned instead.
150     ///
151     /// Equivalent to:
152     ///
153     /// ```ignore
154     /// async fn try_next(&mut self) -> Result<Option<T>, E>;
155     /// ```
156     ///
157     /// This is similar to the [`next`](StreamExt::next) combinator,
158     /// but returns a [`Result<Option<T>, E>`](Result) rather than
159     /// an [`Option<Result<T, E>>`](Option), making for easy use
160     /// with the [`?`](std::ops::Try) operator.
161     ///
162     /// # Cancel safety
163     ///
164     /// This method is cancel safe. The returned future only
165     /// holds onto a reference to the underlying stream,
166     /// so dropping it will never lose a value.
167     ///
168     /// # Examples
169     ///
170     /// ```
171     /// # #[tokio::main]
172     /// # async fn main() {
173     /// use tokio_stream::{self as stream, StreamExt};
174     ///
175     /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
176     ///
177     /// assert_eq!(stream.try_next().await, Ok(Some(1)));
178     /// assert_eq!(stream.try_next().await, Ok(Some(2)));
179     /// assert_eq!(stream.try_next().await, Err("nope"));
180     /// # }
181     /// ```
try_next<T, E>(&mut self) -> TryNext<'_, Self> where Self: Stream<Item = Result<T, E>> + Unpin,182     fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
183     where
184         Self: Stream<Item = Result<T, E>> + Unpin,
185     {
186         TryNext::new(self)
187     }
188 
189     /// Maps this stream's items to a different type, returning a new stream of
190     /// the resulting type.
191     ///
192     /// The provided closure is executed over all elements of this stream as
193     /// they are made available. It is executed inline with calls to
194     /// [`poll_next`](Stream::poll_next).
195     ///
196     /// Note that this function consumes the stream passed into it and returns a
197     /// wrapped version of it, similar to the existing `map` methods in the
198     /// standard library.
199     ///
200     /// # Examples
201     ///
202     /// ```
203     /// # #[tokio::main]
204     /// # async fn main() {
205     /// use tokio_stream::{self as stream, StreamExt};
206     ///
207     /// let stream = stream::iter(1..=3);
208     /// let mut stream = stream.map(|x| x + 3);
209     ///
210     /// assert_eq!(stream.next().await, Some(4));
211     /// assert_eq!(stream.next().await, Some(5));
212     /// assert_eq!(stream.next().await, Some(6));
213     /// # }
214     /// ```
map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,215     fn map<T, F>(self, f: F) -> Map<Self, F>
216     where
217         F: FnMut(Self::Item) -> T,
218         Self: Sized,
219     {
220         Map::new(self, f)
221     }
222 
223     /// Map this stream's items to a different type for as long as determined by
224     /// the provided closure. A stream of the target type will be returned,
225     /// which will yield elements until the closure returns `None`.
226     ///
227     /// The provided closure is executed over all elements of this stream as
228     /// they are made available, until it returns `None`. It is executed inline
229     /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
230     /// the underlying stream will not be polled again.
231     ///
232     /// Note that this function consumes the stream passed into it and returns a
233     /// wrapped version of it, similar to the [`Iterator::map_while`] method in the
234     /// standard library.
235     ///
236     /// # Examples
237     ///
238     /// ```
239     /// # #[tokio::main]
240     /// # async fn main() {
241     /// use tokio_stream::{self as stream, StreamExt};
242     ///
243     /// let stream = stream::iter(1..=10);
244     /// let mut stream = stream.map_while(|x| {
245     ///     if x < 4 {
246     ///         Some(x + 3)
247     ///     } else {
248     ///         None
249     ///     }
250     /// });
251     /// assert_eq!(stream.next().await, Some(4));
252     /// assert_eq!(stream.next().await, Some(5));
253     /// assert_eq!(stream.next().await, Some(6));
254     /// assert_eq!(stream.next().await, None);
255     /// # }
256     /// ```
map_while<T, F>(self, f: F) -> MapWhile<Self, F> where F: FnMut(Self::Item) -> Option<T>, Self: Sized,257     fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
258     where
259         F: FnMut(Self::Item) -> Option<T>,
260         Self: Sized,
261     {
262         MapWhile::new(self, f)
263     }
264 
265     /// Maps this stream's items asynchronously to a different type, returning a
266     /// new stream of the resulting type.
267     ///
268     /// The provided closure is executed over all elements of this stream as
269     /// they are made available, and the returned future is executed. Only one
270     /// future is executed at the time.
271     ///
272     /// Note that this function consumes the stream passed into it and returns a
273     /// wrapped version of it, similar to the existing `then` methods in the
274     /// standard library.
275     ///
276     /// Be aware that if the future is not `Unpin`, then neither is the `Stream`
277     /// returned by this method. To handle this, you can use `tokio::pin!` as in
278     /// the example below or put the stream in a `Box` with `Box::pin(stream)`.
279     ///
280     /// # Examples
281     ///
282     /// ```
283     /// # #[tokio::main]
284     /// # async fn main() {
285     /// use tokio_stream::{self as stream, StreamExt};
286     ///
287     /// async fn do_async_work(value: i32) -> i32 {
288     ///     value + 3
289     /// }
290     ///
291     /// let stream = stream::iter(1..=3);
292     /// let stream = stream.then(do_async_work);
293     ///
294     /// tokio::pin!(stream);
295     ///
296     /// assert_eq!(stream.next().await, Some(4));
297     /// assert_eq!(stream.next().await, Some(5));
298     /// assert_eq!(stream.next().await, Some(6));
299     /// # }
300     /// ```
then<F, Fut>(self, f: F) -> Then<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized,301     fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
302     where
303         F: FnMut(Self::Item) -> Fut,
304         Fut: Future,
305         Self: Sized,
306     {
307         Then::new(self, f)
308     }
309 
310     /// Combine two streams into one by interleaving the output of both as it
311     /// is produced.
312     ///
313     /// Values are produced from the merged stream in the order they arrive from
314     /// the two source streams. If both source streams provide values
315     /// simultaneously, the merge stream alternates between them. This provides
316     /// some level of fairness. You should not chain calls to `merge`, as this
317     /// will break the fairness of the merging.
318     ///
319     /// The merged stream completes once **both** source streams complete. When
320     /// one source stream completes before the other, the merge stream
321     /// exclusively polls the remaining stream.
322     ///
323     /// For merging multiple streams, consider using [`StreamMap`] instead.
324     ///
325     /// [`StreamMap`]: crate::StreamMap
326     ///
327     /// # Examples
328     ///
329     /// ```
330     /// use tokio_stream::{StreamExt, Stream};
331     /// use tokio::sync::mpsc;
332     /// use tokio::time;
333     ///
334     /// use std::time::Duration;
335     /// use std::pin::Pin;
336     ///
337     /// # /*
338     /// #[tokio::main]
339     /// # */
340     /// # #[tokio::main(flavor = "current_thread")]
341     /// async fn main() {
342     /// # time::pause();
343     ///     let (tx1, mut rx1) = mpsc::channel::<usize>(10);
344     ///     let (tx2, mut rx2) = mpsc::channel::<usize>(10);
345     ///
346     ///     // Convert the channels to a `Stream`.
347     ///     let rx1 = Box::pin(async_stream::stream! {
348     ///           while let Some(item) = rx1.recv().await {
349     ///               yield item;
350     ///           }
351     ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
352     ///
353     ///     let rx2 = Box::pin(async_stream::stream! {
354     ///           while let Some(item) = rx2.recv().await {
355     ///               yield item;
356     ///           }
357     ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
358     ///
359     ///     let mut rx = rx1.merge(rx2);
360     ///
361     ///     tokio::spawn(async move {
362     ///         // Send some values immediately
363     ///         tx1.send(1).await.unwrap();
364     ///         tx1.send(2).await.unwrap();
365     ///
366     ///         // Let the other task send values
367     ///         time::sleep(Duration::from_millis(20)).await;
368     ///
369     ///         tx1.send(4).await.unwrap();
370     ///     });
371     ///
372     ///     tokio::spawn(async move {
373     ///         // Wait for the first task to send values
374     ///         time::sleep(Duration::from_millis(5)).await;
375     ///
376     ///         tx2.send(3).await.unwrap();
377     ///
378     ///         time::sleep(Duration::from_millis(25)).await;
379     ///
380     ///         // Send the final value
381     ///         tx2.send(5).await.unwrap();
382     ///     });
383     ///
384     ///    assert_eq!(1, rx.next().await.unwrap());
385     ///    assert_eq!(2, rx.next().await.unwrap());
386     ///    assert_eq!(3, rx.next().await.unwrap());
387     ///    assert_eq!(4, rx.next().await.unwrap());
388     ///    assert_eq!(5, rx.next().await.unwrap());
389     ///
390     ///    // The merged stream is consumed
391     ///    assert!(rx.next().await.is_none());
392     /// }
393     /// ```
merge<U>(self, other: U) -> Merge<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,394     fn merge<U>(self, other: U) -> Merge<Self, U>
395     where
396         U: Stream<Item = Self::Item>,
397         Self: Sized,
398     {
399         Merge::new(self, other)
400     }
401 
402     /// Filters the values produced by this stream according to the provided
403     /// predicate.
404     ///
405     /// As values of this stream are made available, the provided predicate `f`
406     /// will be run against them. If the predicate
407     /// resolves to `true`, then the stream will yield the value, but if the
408     /// predicate resolves to `false`, then the value
409     /// will be discarded and the next value will be produced.
410     ///
411     /// Note that this function consumes the stream passed into it and returns a
412     /// wrapped version of it, similar to [`Iterator::filter`] method in the
413     /// standard library.
414     ///
415     /// # Examples
416     ///
417     /// ```
418     /// # #[tokio::main]
419     /// # async fn main() {
420     /// use tokio_stream::{self as stream, StreamExt};
421     ///
422     /// let stream = stream::iter(1..=8);
423     /// let mut evens = stream.filter(|x| x % 2 == 0);
424     ///
425     /// assert_eq!(Some(2), evens.next().await);
426     /// assert_eq!(Some(4), evens.next().await);
427     /// assert_eq!(Some(6), evens.next().await);
428     /// assert_eq!(Some(8), evens.next().await);
429     /// assert_eq!(None, evens.next().await);
430     /// # }
431     /// ```
filter<F>(self, f: F) -> Filter<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,432     fn filter<F>(self, f: F) -> Filter<Self, F>
433     where
434         F: FnMut(&Self::Item) -> bool,
435         Self: Sized,
436     {
437         Filter::new(self, f)
438     }
439 
440     /// Filters the values produced by this stream while simultaneously mapping
441     /// them to a different type according to the provided closure.
442     ///
443     /// As values of this stream are made available, the provided function will
444     /// be run on them. If the predicate `f` resolves to
445     /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
446     /// it resolves to [`None`], then the value will be skipped.
447     ///
448     /// Note that this function consumes the stream passed into it and returns a
449     /// wrapped version of it, similar to [`Iterator::filter_map`] method in the
450     /// standard library.
451     ///
452     /// # Examples
453     /// ```
454     /// # #[tokio::main]
455     /// # async fn main() {
456     /// use tokio_stream::{self as stream, StreamExt};
457     ///
458     /// let stream = stream::iter(1..=8);
459     /// let mut evens = stream.filter_map(|x| {
460     ///     if x % 2 == 0 { Some(x + 1) } else { None }
461     /// });
462     ///
463     /// assert_eq!(Some(3), evens.next().await);
464     /// assert_eq!(Some(5), evens.next().await);
465     /// assert_eq!(Some(7), evens.next().await);
466     /// assert_eq!(Some(9), evens.next().await);
467     /// assert_eq!(None, evens.next().await);
468     /// # }
469     /// ```
filter_map<T, F>(self, f: F) -> FilterMap<Self, F> where F: FnMut(Self::Item) -> Option<T>, Self: Sized,470     fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
471     where
472         F: FnMut(Self::Item) -> Option<T>,
473         Self: Sized,
474     {
475         FilterMap::new(self, f)
476     }
477 
478     /// Creates a stream which ends after the first `None`.
479     ///
480     /// After a stream returns `None`, behavior is undefined. Future calls to
481     /// `poll_next` may or may not return `Some(T)` again or they may panic.
482     /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
483     /// return `None` forever.
484     ///
485     /// # Examples
486     ///
487     /// ```
488     /// use tokio_stream::{Stream, StreamExt};
489     ///
490     /// use std::pin::Pin;
491     /// use std::task::{Context, Poll};
492     ///
493     /// // a stream which alternates between Some and None
494     /// struct Alternate {
495     ///     state: i32,
496     /// }
497     ///
498     /// impl Stream for Alternate {
499     ///     type Item = i32;
500     ///
501     ///     fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
502     ///         let val = self.state;
503     ///         self.state = self.state + 1;
504     ///
505     ///         // if it's even, Some(i32), else None
506     ///         if val % 2 == 0 {
507     ///             Poll::Ready(Some(val))
508     ///         } else {
509     ///             Poll::Ready(None)
510     ///         }
511     ///     }
512     /// }
513     ///
514     /// #[tokio::main]
515     /// async fn main() {
516     ///     let mut stream = Alternate { state: 0 };
517     ///
518     ///     // the stream goes back and forth
519     ///     assert_eq!(stream.next().await, Some(0));
520     ///     assert_eq!(stream.next().await, None);
521     ///     assert_eq!(stream.next().await, Some(2));
522     ///     assert_eq!(stream.next().await, None);
523     ///
524     ///     // however, once it is fused
525     ///     let mut stream = stream.fuse();
526     ///
527     ///     assert_eq!(stream.next().await, Some(4));
528     ///     assert_eq!(stream.next().await, None);
529     ///
530     ///     // it will always return `None` after the first time.
531     ///     assert_eq!(stream.next().await, None);
532     ///     assert_eq!(stream.next().await, None);
533     ///     assert_eq!(stream.next().await, None);
534     /// }
535     /// ```
fuse(self) -> Fuse<Self> where Self: Sized,536     fn fuse(self) -> Fuse<Self>
537     where
538         Self: Sized,
539     {
540         Fuse::new(self)
541     }
542 
543     /// Creates a new stream of at most `n` items of the underlying stream.
544     ///
545     /// Once `n` items have been yielded from this stream then it will always
546     /// return that the stream is done.
547     ///
548     /// # Examples
549     ///
550     /// ```
551     /// # #[tokio::main]
552     /// # async fn main() {
553     /// use tokio_stream::{self as stream, StreamExt};
554     ///
555     /// let mut stream = stream::iter(1..=10).take(3);
556     ///
557     /// assert_eq!(Some(1), stream.next().await);
558     /// assert_eq!(Some(2), stream.next().await);
559     /// assert_eq!(Some(3), stream.next().await);
560     /// assert_eq!(None, stream.next().await);
561     /// # }
562     /// ```
take(self, n: usize) -> Take<Self> where Self: Sized,563     fn take(self, n: usize) -> Take<Self>
564     where
565         Self: Sized,
566     {
567         Take::new(self, n)
568     }
569 
570     /// Take elements from this stream while the provided predicate
571     /// resolves to `true`.
572     ///
573     /// This function, like `Iterator::take_while`, will take elements from the
574     /// stream until the predicate `f` resolves to `false`. Once one element
575     /// returns false it will always return that the stream is done.
576     ///
577     /// # Examples
578     ///
579     /// ```
580     /// # #[tokio::main]
581     /// # async fn main() {
582     /// use tokio_stream::{self as stream, StreamExt};
583     ///
584     /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
585     ///
586     /// assert_eq!(Some(1), stream.next().await);
587     /// assert_eq!(Some(2), stream.next().await);
588     /// assert_eq!(Some(3), stream.next().await);
589     /// assert_eq!(None, stream.next().await);
590     /// # }
591     /// ```
take_while<F>(self, f: F) -> TakeWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,592     fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
593     where
594         F: FnMut(&Self::Item) -> bool,
595         Self: Sized,
596     {
597         TakeWhile::new(self, f)
598     }
599 
600     /// Creates a new stream that will skip the `n` first items of the
601     /// underlying stream.
602     ///
603     /// # Examples
604     ///
605     /// ```
606     /// # #[tokio::main]
607     /// # async fn main() {
608     /// use tokio_stream::{self as stream, StreamExt};
609     ///
610     /// let mut stream = stream::iter(1..=10).skip(7);
611     ///
612     /// assert_eq!(Some(8), stream.next().await);
613     /// assert_eq!(Some(9), stream.next().await);
614     /// assert_eq!(Some(10), stream.next().await);
615     /// assert_eq!(None, stream.next().await);
616     /// # }
617     /// ```
skip(self, n: usize) -> Skip<Self> where Self: Sized,618     fn skip(self, n: usize) -> Skip<Self>
619     where
620         Self: Sized,
621     {
622         Skip::new(self, n)
623     }
624 
625     /// Skip elements from the underlying stream while the provided predicate
626     /// resolves to `true`.
627     ///
628     /// This function, like [`Iterator::skip_while`], will ignore elements from the
629     /// stream until the predicate `f` resolves to `false`. Once one element
630     /// returns false, the rest of the elements will be yielded.
631     ///
632     /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
633     ///
634     /// # Examples
635     ///
636     /// ```
637     /// # #[tokio::main]
638     /// # async fn main() {
639     /// use tokio_stream::{self as stream, StreamExt};
640     /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
641     ///
642     /// assert_eq!(Some(3), stream.next().await);
643     /// assert_eq!(Some(4), stream.next().await);
644     /// assert_eq!(Some(1), stream.next().await);
645     /// assert_eq!(None, stream.next().await);
646     /// # }
647     /// ```
skip_while<F>(self, f: F) -> SkipWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,648     fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
649     where
650         F: FnMut(&Self::Item) -> bool,
651         Self: Sized,
652     {
653         SkipWhile::new(self, f)
654     }
655 
656     /// Tests if every element of the stream matches a predicate.
657     ///
658     /// Equivalent to:
659     ///
660     /// ```ignore
661     /// async fn all<F>(&mut self, f: F) -> bool;
662     /// ```
663     ///
664     /// `all()` takes a closure that returns `true` or `false`. It applies
665     /// this closure to each element of the stream, and if they all return
666     /// `true`, then so does `all`. If any of them return `false`, it
667     /// returns `false`. An empty stream returns `true`.
668     ///
669     /// `all()` is short-circuiting; in other words, it will stop processing
670     /// as soon as it finds a `false`, given that no matter what else happens,
671     /// the result will also be `false`.
672     ///
673     /// An empty stream returns `true`.
674     ///
675     /// # Examples
676     ///
677     /// Basic usage:
678     ///
679     /// ```
680     /// # #[tokio::main]
681     /// # async fn main() {
682     /// use tokio_stream::{self as stream, StreamExt};
683     ///
684     /// let a = [1, 2, 3];
685     ///
686     /// assert!(stream::iter(&a).all(|&x| x > 0).await);
687     ///
688     /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
689     /// # }
690     /// ```
691     ///
692     /// Stopping at the first `false`:
693     ///
694     /// ```
695     /// # #[tokio::main]
696     /// # async fn main() {
697     /// use tokio_stream::{self as stream, StreamExt};
698     ///
699     /// let a = [1, 2, 3];
700     ///
701     /// let mut iter = stream::iter(&a);
702     ///
703     /// assert!(!iter.all(|&x| x != 2).await);
704     ///
705     /// // we can still use `iter`, as there are more elements.
706     /// assert_eq!(iter.next().await, Some(&3));
707     /// # }
708     /// ```
all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,709     fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
710     where
711         Self: Unpin,
712         F: FnMut(Self::Item) -> bool,
713     {
714         AllFuture::new(self, f)
715     }
716 
717     /// Tests if any element of the stream matches a predicate.
718     ///
719     /// Equivalent to:
720     ///
721     /// ```ignore
722     /// async fn any<F>(&mut self, f: F) -> bool;
723     /// ```
724     ///
725     /// `any()` takes a closure that returns `true` or `false`. It applies
726     /// this closure to each element of the stream, and if any of them return
727     /// `true`, then so does `any()`. If they all return `false`, it
728     /// returns `false`.
729     ///
730     /// `any()` is short-circuiting; in other words, it will stop processing
731     /// as soon as it finds a `true`, given that no matter what else happens,
732     /// the result will also be `true`.
733     ///
734     /// An empty stream returns `false`.
735     ///
736     /// Basic usage:
737     ///
738     /// ```
739     /// # #[tokio::main]
740     /// # async fn main() {
741     /// use tokio_stream::{self as stream, StreamExt};
742     ///
743     /// let a = [1, 2, 3];
744     ///
745     /// assert!(stream::iter(&a).any(|&x| x > 0).await);
746     ///
747     /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
748     /// # }
749     /// ```
750     ///
751     /// Stopping at the first `true`:
752     ///
753     /// ```
754     /// # #[tokio::main]
755     /// # async fn main() {
756     /// use tokio_stream::{self as stream, StreamExt};
757     ///
758     /// let a = [1, 2, 3];
759     ///
760     /// let mut iter = stream::iter(&a);
761     ///
762     /// assert!(iter.any(|&x| x != 2).await);
763     ///
764     /// // we can still use `iter`, as there are more elements.
765     /// assert_eq!(iter.next().await, Some(&2));
766     /// # }
767     /// ```
any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,768     fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
769     where
770         Self: Unpin,
771         F: FnMut(Self::Item) -> bool,
772     {
773         AnyFuture::new(self, f)
774     }
775 
776     /// Combine two streams into one by first returning all values from the
777     /// first stream then all values from the second stream.
778     ///
779     /// As long as `self` still has values to emit, no values from `other` are
780     /// emitted, even if some are ready.
781     ///
782     /// # Examples
783     ///
784     /// ```
785     /// use tokio_stream::{self as stream, StreamExt};
786     ///
787     /// #[tokio::main]
788     /// async fn main() {
789     ///     let one = stream::iter(vec![1, 2, 3]);
790     ///     let two = stream::iter(vec![4, 5, 6]);
791     ///
792     ///     let mut stream = one.chain(two);
793     ///
794     ///     assert_eq!(stream.next().await, Some(1));
795     ///     assert_eq!(stream.next().await, Some(2));
796     ///     assert_eq!(stream.next().await, Some(3));
797     ///     assert_eq!(stream.next().await, Some(4));
798     ///     assert_eq!(stream.next().await, Some(5));
799     ///     assert_eq!(stream.next().await, Some(6));
800     ///     assert_eq!(stream.next().await, None);
801     /// }
802     /// ```
chain<U>(self, other: U) -> Chain<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,803     fn chain<U>(self, other: U) -> Chain<Self, U>
804     where
805         U: Stream<Item = Self::Item>,
806         Self: Sized,
807     {
808         Chain::new(self, other)
809     }
810 
811     /// A combinator that applies a function to every element in a stream
812     /// producing a single, final value.
813     ///
814     /// Equivalent to:
815     ///
816     /// ```ignore
817     /// async fn fold<B, F>(self, init: B, f: F) -> B;
818     /// ```
819     ///
820     /// # Examples
821     /// Basic usage:
822     /// ```
823     /// # #[tokio::main]
824     /// # async fn main() {
825     /// use tokio_stream::{self as stream, *};
826     ///
827     /// let s = stream::iter(vec![1u8, 2, 3]);
828     /// let sum = s.fold(0, |acc, x| acc + x).await;
829     ///
830     /// assert_eq!(sum, 6);
831     /// # }
832     /// ```
fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> where Self: Sized, F: FnMut(B, Self::Item) -> B,833     fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
834     where
835         Self: Sized,
836         F: FnMut(B, Self::Item) -> B,
837     {
838         FoldFuture::new(self, init, f)
839     }
840 
841     /// Drain stream pushing all emitted values into a collection.
842     ///
843     /// Equivalent to:
844     ///
845     /// ```ignore
846     /// async fn collect<T>(self) -> T;
847     /// ```
848     ///
849     /// `collect` streams all values, awaiting as needed. Values are pushed into
850     /// a collection. A number of different target collection types are
851     /// supported, including [`Vec`](std::vec::Vec),
852     /// [`String`](std::string::String), and [`Bytes`].
853     ///
854     /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html
855     ///
856     /// # `Result`
857     ///
858     /// `collect()` can also be used with streams of type `Result<T, E>` where
859     /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
860     /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
861     /// streaming is terminated and `collect()` returns the `Err`.
862     ///
863     /// # Notes
864     ///
865     /// `FromStream` is currently a sealed trait. Stabilization is pending
866     /// enhancements to the Rust language.
867     ///
868     /// # Examples
869     ///
870     /// Basic usage:
871     ///
872     /// ```
873     /// use tokio_stream::{self as stream, StreamExt};
874     ///
875     /// #[tokio::main]
876     /// async fn main() {
877     ///     let doubled: Vec<i32> =
878     ///         stream::iter(vec![1, 2, 3])
879     ///             .map(|x| x * 2)
880     ///             .collect()
881     ///             .await;
882     ///
883     ///     assert_eq!(vec![2, 4, 6], doubled);
884     /// }
885     /// ```
886     ///
887     /// Collecting a stream of `Result` values
888     ///
889     /// ```
890     /// use tokio_stream::{self as stream, StreamExt};
891     ///
892     /// #[tokio::main]
893     /// async fn main() {
894     ///     // A stream containing only `Ok` values will be collected
895     ///     let values: Result<Vec<i32>, &str> =
896     ///         stream::iter(vec![Ok(1), Ok(2), Ok(3)])
897     ///             .collect()
898     ///             .await;
899     ///
900     ///     assert_eq!(Ok(vec![1, 2, 3]), values);
901     ///
902     ///     // A stream containing `Err` values will return the first error.
903     ///     let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
904     ///
905     ///     let values: Result<Vec<i32>, &str> =
906     ///         stream::iter(results)
907     ///             .collect()
908     ///             .await;
909     ///
910     ///     assert_eq!(Err("no"), values);
911     /// }
912     /// ```
collect<T>(self) -> Collect<Self, T> where T: FromStream<Self::Item>, Self: Sized,913     fn collect<T>(self) -> Collect<Self, T>
914     where
915         T: FromStream<Self::Item>,
916         Self: Sized,
917     {
918         Collect::new(self)
919     }
920 
921     /// Applies a per-item timeout to the passed stream.
922     ///
923     /// `timeout()` takes a `Duration` that represents the maximum amount of
924     /// time each element of the stream has to complete before timing out.
925     ///
926     /// If the wrapped stream yields a value before the deadline is reached, the
927     /// value is returned. Otherwise, an error is returned. The caller may decide
928     /// to continue consuming the stream and will eventually get the next source
929     /// stream value once it becomes available. See
930     /// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative
931     /// where the timeouts will repeat.
932     ///
933     /// # Notes
934     ///
935     /// This function consumes the stream passed into it and returns a
936     /// wrapped version of it.
937     ///
938     /// Polling the returned stream will continue to poll the inner stream even
939     /// if one or more items time out.
940     ///
941     /// # Examples
942     ///
943     /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
944     ///
945     /// ```
946     /// # #[tokio::main]
947     /// # async fn main() {
948     /// use tokio_stream::{self as stream, StreamExt};
949     /// use std::time::Duration;
950     /// # let int_stream = stream::iter(1..=3);
951     ///
952     /// let int_stream = int_stream.timeout(Duration::from_secs(1));
953     /// tokio::pin!(int_stream);
954     ///
955     /// // When no items time out, we get the 3 elements in succession:
956     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
957     /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
958     /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
959     /// assert_eq!(int_stream.try_next().await, Ok(None));
960     ///
961     /// // If the second item times out, we get an error and continue polling the stream:
962     /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
963     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
964     /// assert!(int_stream.try_next().await.is_err());
965     /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
966     /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
967     /// assert_eq!(int_stream.try_next().await, Ok(None));
968     ///
969     /// // If we want to stop consuming the source stream the first time an
970     /// // element times out, we can use the `take_while` operator:
971     /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
972     /// let mut int_stream = int_stream.take_while(Result::is_ok);
973     ///
974     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
975     /// assert_eq!(int_stream.try_next().await, Ok(None));
976     /// # }
977     /// ```
978     ///
979     /// Once a timeout error is received, no further events will be received
980     /// unless the wrapped stream yields a value (timeouts do not repeat).
981     ///
982     /// ```
983     /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
984     /// # async fn main() {
985     /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
986     /// use std::time::Duration;
987     /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100)));
988     /// let timeout_stream = interval_stream.timeout(Duration::from_millis(10));
989     /// tokio::pin!(timeout_stream);
990     ///
991     /// // Only one timeout will be received between values in the source stream.
992     /// assert!(timeout_stream.try_next().await.is_ok());
993     /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
994     /// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts");
995     /// # }
996     /// ```
997     #[cfg(all(feature = "time"))]
998     #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
timeout(self, duration: Duration) -> Timeout<Self> where Self: Sized,999     fn timeout(self, duration: Duration) -> Timeout<Self>
1000     where
1001         Self: Sized,
1002     {
1003         Timeout::new(self, duration)
1004     }
1005 
1006     /// Applies a per-item timeout to the passed stream.
1007     ///
1008     /// `timeout_repeating()` takes an [`Interval`](tokio::time::Interval) that
1009     /// controls the time each element of the stream has to complete before
1010     /// timing out.
1011     ///
1012     /// If the wrapped stream yields a value before the deadline is reached, the
1013     /// value is returned. Otherwise, an error is returned. The caller may decide
1014     /// to continue consuming the stream and will eventually get the next source
1015     /// stream value once it becomes available. Unlike `timeout()`, if no value
1016     /// becomes available before the deadline is reached, additional errors are
1017     /// returned at the specified interval. See [`timeout`](StreamExt::timeout)
1018     /// for an alternative where the timeouts do not repeat.
1019     ///
1020     /// # Notes
1021     ///
1022     /// This function consumes the stream passed into it and returns a
1023     /// wrapped version of it.
1024     ///
1025     /// Polling the returned stream will continue to poll the inner stream even
1026     /// if one or more items time out.
1027     ///
1028     /// # Examples
1029     ///
1030     /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
1031     ///
1032     /// ```
1033     /// # #[tokio::main]
1034     /// # async fn main() {
1035     /// use tokio_stream::{self as stream, StreamExt};
1036     /// use std::time::Duration;
1037     /// # let int_stream = stream::iter(1..=3);
1038     ///
1039     /// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1)));
1040     /// tokio::pin!(int_stream);
1041     ///
1042     /// // When no items time out, we get the 3 elements in succession:
1043     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1044     /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1045     /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1046     /// assert_eq!(int_stream.try_next().await, Ok(None));
1047     ///
1048     /// // If the second item times out, we get an error and continue polling the stream:
1049     /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1050     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1051     /// assert!(int_stream.try_next().await.is_err());
1052     /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1053     /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1054     /// assert_eq!(int_stream.try_next().await, Ok(None));
1055     ///
1056     /// // If we want to stop consuming the source stream the first time an
1057     /// // element times out, we can use the `take_while` operator:
1058     /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1059     /// let mut int_stream = int_stream.take_while(Result::is_ok);
1060     ///
1061     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1062     /// assert_eq!(int_stream.try_next().await, Ok(None));
1063     /// # }
1064     /// ```
1065     ///
1066     /// Timeout errors will be continuously produced at the specified interval
1067     /// until the wrapped stream yields a value.
1068     ///
1069     /// ```
1070     /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1071     /// # async fn main() {
1072     /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
1073     /// use std::time::Duration;
1074     /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
1075     /// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9)));
1076     /// tokio::pin!(timeout_stream);
1077     ///
1078     /// // Multiple timeouts will be received between values in the source stream.
1079     /// assert!(timeout_stream.try_next().await.is_ok());
1080     /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
1081     /// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout");
1082     /// // Will eventually receive another value from the source stream...
1083     /// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout");
1084     /// # }
1085     /// ```
1086     #[cfg(all(feature = "time"))]
1087     #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self> where Self: Sized,1088     fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self>
1089     where
1090         Self: Sized,
1091     {
1092         TimeoutRepeating::new(self, interval)
1093     }
1094 
1095     /// Slows down a stream by enforcing a delay between items.
1096     ///
1097     /// The underlying timer behind this utility has a granularity of one millisecond.
1098     ///
1099     /// # Example
1100     ///
1101     /// Create a throttled stream.
1102     /// ```rust,no_run
1103     /// use std::time::Duration;
1104     /// use tokio_stream::StreamExt;
1105     ///
1106     /// # async fn dox() {
1107     /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
1108     /// tokio::pin!(item_stream);
1109     ///
1110     /// loop {
1111     ///     // The string will be produced at most every 2 seconds
1112     ///     println!("{:?}", item_stream.next().await);
1113     /// }
1114     /// # }
1115     /// ```
1116     #[cfg(all(feature = "time"))]
1117     #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
throttle(self, duration: Duration) -> Throttle<Self> where Self: Sized,1118     fn throttle(self, duration: Duration) -> Throttle<Self>
1119     where
1120         Self: Sized,
1121     {
1122         throttle(duration, self)
1123     }
1124 
1125     /// Batches the items in the given stream using a maximum duration and size for each batch.
1126     ///
1127     /// This stream returns the next batch of items in the following situations:
1128     ///  1. The inner stream has returned at least `max_size` many items since the last batch.
1129     ///  2. The time since the first item of a batch is greater than the given duration.
1130     ///  3. The end of the stream is reached.
1131     ///
1132     /// The length of the returned vector is never empty or greater than the maximum size. Empty batches
1133     /// will not be emitted if no items are received upstream.
1134     ///
1135     /// # Panics
1136     ///
1137     /// This function panics if `max_size` is zero
1138     ///
1139     /// # Example
1140     ///
1141     /// ```rust
1142     /// use std::time::Duration;
1143     /// use tokio::time;
1144     /// use tokio_stream::{self as stream, StreamExt};
1145     /// use futures::FutureExt;
1146     ///
1147     /// #[tokio::main]
1148     /// # async fn _unused() {}
1149     /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1150     /// async fn main() {
1151     ///     let iter = vec![1, 2, 3, 4].into_iter();
1152     ///     let stream0 = stream::iter(iter);
1153     ///
1154     ///     let iter = vec![5].into_iter();
1155     ///     let stream1 = stream::iter(iter)
1156     ///          .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
1157     ///
1158     ///     let chunk_stream = stream0
1159     ///         .chain(stream1)
1160     ///         .chunks_timeout(3, Duration::from_secs(2));
1161     ///     tokio::pin!(chunk_stream);
1162     ///
1163     ///     // a full batch was received
1164     ///     assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
1165     ///     // deadline was reached before max_size was reached
1166     ///     assert_eq!(chunk_stream.next().await, Some(vec![4]));
1167     ///     // last element in the stream
1168     ///     assert_eq!(chunk_stream.next().await, Some(vec![5]));
1169     /// }
1170     /// ```
1171     #[cfg(feature = "time")]
1172     #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1173     #[track_caller]
chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self> where Self: Sized,1174     fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
1175     where
1176         Self: Sized,
1177     {
1178         assert!(max_size > 0, "`max_size` must be non-zero.");
1179         ChunksTimeout::new(self, max_size, duration)
1180     }
1181 }
1182 
1183 impl<St: ?Sized> StreamExt for St where St: Stream {}
1184 
1185 /// Merge the size hints from two streams.
merge_size_hints( (left_low, left_high): (usize, Option<usize>), (right_low, right_high): (usize, Option<usize>), ) -> (usize, Option<usize>)1186 fn merge_size_hints(
1187     (left_low, left_high): (usize, Option<usize>),
1188     (right_low, right_high): (usize, Option<usize>),
1189 ) -> (usize, Option<usize>) {
1190     let low = left_low.saturating_add(right_low);
1191     let high = match (left_high, right_high) {
1192         (Some(h1), Some(h2)) => h1.checked_add(h2),
1193         _ => None,
1194     };
1195     (low, high)
1196 }
1197