• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use core::future::Future;
2 use futures_core::Stream;
3 
4 mod all;
5 use all::AllFuture;
6 
7 mod any;
8 use any::AnyFuture;
9 
10 mod chain;
11 use chain::Chain;
12 
13 pub(crate) mod collect;
14 use collect::{Collect, FromStream};
15 
16 mod filter;
17 use filter::Filter;
18 
19 mod filter_map;
20 use filter_map::FilterMap;
21 
22 mod fold;
23 use fold::FoldFuture;
24 
25 mod fuse;
26 use fuse::Fuse;
27 
28 mod map;
29 use map::Map;
30 
31 mod map_while;
32 use map_while::MapWhile;
33 
34 mod merge;
35 use merge::Merge;
36 
37 mod next;
38 use next::Next;
39 
40 mod skip;
41 use skip::Skip;
42 
43 mod skip_while;
44 use skip_while::SkipWhile;
45 
46 mod take;
47 use take::Take;
48 
49 mod take_while;
50 use take_while::TakeWhile;
51 
52 mod then;
53 use then::Then;
54 
55 mod try_next;
56 use try_next::TryNext;
57 
58 cfg_time! {
59     pub(crate) mod timeout;
60     use timeout::Timeout;
61     use tokio::time::Duration;
62     mod throttle;
63     use throttle::{throttle, Throttle};
64     mod chunks_timeout;
65     use chunks_timeout::ChunksTimeout;
66 }
67 
68 /// An extension trait for the [`Stream`] trait that provides a variety of
69 /// convenient combinator functions.
70 ///
71 /// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
72 /// in the [futures] crate, however both Tokio and futures provide separate
73 /// `StreamExt` utility traits, and some utilities are only available on one of
74 /// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
75 /// trait in the futures crate.
76 ///
77 /// If you need utilities from both `StreamExt` traits, you should prefer to
78 /// import one of them, and use the other through the fully qualified call
79 /// syntax. For example:
80 /// ```
81 /// // import one of the traits:
82 /// use futures::stream::StreamExt;
83 /// # #[tokio::main(flavor = "current_thread")]
84 /// # async fn main() {
85 ///
86 /// let a = tokio_stream::iter(vec![1, 3, 5]);
87 /// let b = tokio_stream::iter(vec![2, 4, 6]);
88 ///
89 /// // use the fully qualified call syntax for the other trait:
90 /// let merged = tokio_stream::StreamExt::merge(a, b);
91 ///
92 /// // use normal call notation for futures::stream::StreamExt::collect
93 /// let output: Vec<_> = merged.collect().await;
94 /// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
95 /// # }
96 /// ```
97 ///
98 /// [`Stream`]: crate::Stream
99 /// [futures]: https://docs.rs/futures
100 /// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
101 pub trait StreamExt: Stream {
102     /// Consumes and returns the next value in the stream or `None` if the
103     /// stream is finished.
104     ///
105     /// Equivalent to:
106     ///
107     /// ```ignore
108     /// async fn next(&mut self) -> Option<Self::Item>;
109     /// ```
110     ///
111     /// Note that because `next` doesn't take ownership over the stream,
112     /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
113     /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
114     /// be done by boxing the stream using [`Box::pin`] or
115     /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
116     /// crate.
117     ///
118     /// # Cancel safety
119     ///
120     /// This method is cancel safe. The returned future only
121     /// holds onto a reference to the underlying stream,
122     /// so dropping it will never lose a value.
123     ///
124     /// # Examples
125     ///
126     /// ```
127     /// # #[tokio::main]
128     /// # async fn main() {
129     /// use tokio_stream::{self as stream, StreamExt};
130     ///
131     /// let mut stream = stream::iter(1..=3);
132     ///
133     /// assert_eq!(stream.next().await, Some(1));
134     /// assert_eq!(stream.next().await, Some(2));
135     /// assert_eq!(stream.next().await, Some(3));
136     /// assert_eq!(stream.next().await, None);
137     /// # }
138     /// ```
next(&mut self) -> Next<'_, Self> where Self: Unpin,139     fn next(&mut self) -> Next<'_, Self>
140     where
141         Self: Unpin,
142     {
143         Next::new(self)
144     }
145 
146     /// Consumes and returns the next item in the stream. If an error is
147     /// encountered before the next item, the error is returned instead.
148     ///
149     /// Equivalent to:
150     ///
151     /// ```ignore
152     /// async fn try_next(&mut self) -> Result<Option<T>, E>;
153     /// ```
154     ///
155     /// This is similar to the [`next`](StreamExt::next) combinator,
156     /// but returns a [`Result<Option<T>, E>`](Result) rather than
157     /// an [`Option<Result<T, E>>`](Option), making for easy use
158     /// with the [`?`](std::ops::Try) operator.
159     ///
160     /// # Cancel safety
161     ///
162     /// This method is cancel safe. The returned future only
163     /// holds onto a reference to the underlying stream,
164     /// so dropping it will never lose a value.
165     ///
166     /// # Examples
167     ///
168     /// ```
169     /// # #[tokio::main]
170     /// # async fn main() {
171     /// use tokio_stream::{self as stream, StreamExt};
172     ///
173     /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
174     ///
175     /// assert_eq!(stream.try_next().await, Ok(Some(1)));
176     /// assert_eq!(stream.try_next().await, Ok(Some(2)));
177     /// assert_eq!(stream.try_next().await, Err("nope"));
178     /// # }
179     /// ```
try_next<T, E>(&mut self) -> TryNext<'_, Self> where Self: Stream<Item = Result<T, E>> + Unpin,180     fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
181     where
182         Self: Stream<Item = Result<T, E>> + Unpin,
183     {
184         TryNext::new(self)
185     }
186 
187     /// Maps this stream's items to a different type, returning a new stream of
188     /// the resulting type.
189     ///
190     /// The provided closure is executed over all elements of this stream as
191     /// they are made available. It is executed inline with calls to
192     /// [`poll_next`](Stream::poll_next).
193     ///
194     /// Note that this function consumes the stream passed into it and returns a
195     /// wrapped version of it, similar to the existing `map` methods in the
196     /// standard library.
197     ///
198     /// # Examples
199     ///
200     /// ```
201     /// # #[tokio::main]
202     /// # async fn main() {
203     /// use tokio_stream::{self as stream, StreamExt};
204     ///
205     /// let stream = stream::iter(1..=3);
206     /// let mut stream = stream.map(|x| x + 3);
207     ///
208     /// assert_eq!(stream.next().await, Some(4));
209     /// assert_eq!(stream.next().await, Some(5));
210     /// assert_eq!(stream.next().await, Some(6));
211     /// # }
212     /// ```
map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,213     fn map<T, F>(self, f: F) -> Map<Self, F>
214     where
215         F: FnMut(Self::Item) -> T,
216         Self: Sized,
217     {
218         Map::new(self, f)
219     }
220 
221     /// Map this stream's items to a different type for as long as determined by
222     /// the provided closure. A stream of the target type will be returned,
223     /// which will yield elements until the closure returns `None`.
224     ///
225     /// The provided closure is executed over all elements of this stream as
226     /// they are made available, until it returns `None`. It is executed inline
227     /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
228     /// the underlying stream will not be polled again.
229     ///
230     /// Note that this function consumes the stream passed into it and returns a
231     /// wrapped version of it, similar to the [`Iterator::map_while`] method in the
232     /// standard library.
233     ///
234     /// # Examples
235     ///
236     /// ```
237     /// # #[tokio::main]
238     /// # async fn main() {
239     /// use tokio_stream::{self as stream, StreamExt};
240     ///
241     /// let stream = stream::iter(1..=10);
242     /// let mut stream = stream.map_while(|x| {
243     ///     if x < 4 {
244     ///         Some(x + 3)
245     ///     } else {
246     ///         None
247     ///     }
248     /// });
249     /// assert_eq!(stream.next().await, Some(4));
250     /// assert_eq!(stream.next().await, Some(5));
251     /// assert_eq!(stream.next().await, Some(6));
252     /// assert_eq!(stream.next().await, None);
253     /// # }
254     /// ```
map_while<T, F>(self, f: F) -> MapWhile<Self, F> where F: FnMut(Self::Item) -> Option<T>, Self: Sized,255     fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
256     where
257         F: FnMut(Self::Item) -> Option<T>,
258         Self: Sized,
259     {
260         MapWhile::new(self, f)
261     }
262 
263     /// Maps this stream's items asynchronously to a different type, returning a
264     /// new stream of the resulting type.
265     ///
266     /// The provided closure is executed over all elements of this stream as
267     /// they are made available, and the returned future is executed. Only one
    /// future is executed at a time.
269     ///
270     /// Note that this function consumes the stream passed into it and returns a
271     /// wrapped version of it, similar to the existing `then` methods in the
272     /// standard library.
273     ///
274     /// Be aware that if the future is not `Unpin`, then neither is the `Stream`
275     /// returned by this method. To handle this, you can use `tokio::pin!` as in
276     /// the example below or put the stream in a `Box` with `Box::pin(stream)`.
277     ///
278     /// # Examples
279     ///
280     /// ```
281     /// # #[tokio::main]
282     /// # async fn main() {
283     /// use tokio_stream::{self as stream, StreamExt};
284     ///
285     /// async fn do_async_work(value: i32) -> i32 {
286     ///     value + 3
287     /// }
288     ///
289     /// let stream = stream::iter(1..=3);
290     /// let stream = stream.then(do_async_work);
291     ///
292     /// tokio::pin!(stream);
293     ///
294     /// assert_eq!(stream.next().await, Some(4));
295     /// assert_eq!(stream.next().await, Some(5));
296     /// assert_eq!(stream.next().await, Some(6));
297     /// # }
298     /// ```
then<F, Fut>(self, f: F) -> Then<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized,299     fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
300     where
301         F: FnMut(Self::Item) -> Fut,
302         Fut: Future,
303         Self: Sized,
304     {
305         Then::new(self, f)
306     }
307 
308     /// Combine two streams into one by interleaving the output of both as it
309     /// is produced.
310     ///
311     /// Values are produced from the merged stream in the order they arrive from
312     /// the two source streams. If both source streams provide values
313     /// simultaneously, the merge stream alternates between them. This provides
314     /// some level of fairness. You should not chain calls to `merge`, as this
315     /// will break the fairness of the merging.
316     ///
317     /// The merged stream completes once **both** source streams complete. When
318     /// one source stream completes before the other, the merge stream
319     /// exclusively polls the remaining stream.
320     ///
321     /// For merging multiple streams, consider using [`StreamMap`] instead.
322     ///
323     /// [`StreamMap`]: crate::StreamMap
324     ///
325     /// # Examples
326     ///
327     /// ```
328     /// use tokio_stream::{StreamExt, Stream};
329     /// use tokio::sync::mpsc;
330     /// use tokio::time;
331     ///
332     /// use std::time::Duration;
333     /// use std::pin::Pin;
334     ///
335     /// # /*
336     /// #[tokio::main]
337     /// # */
338     /// # #[tokio::main(flavor = "current_thread")]
339     /// async fn main() {
340     /// # time::pause();
341     ///     let (tx1, mut rx1) = mpsc::channel::<usize>(10);
342     ///     let (tx2, mut rx2) = mpsc::channel::<usize>(10);
343     ///
344     ///     // Convert the channels to a `Stream`.
345     ///     let rx1 = Box::pin(async_stream::stream! {
346     ///           while let Some(item) = rx1.recv().await {
347     ///               yield item;
348     ///           }
349     ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
350     ///
351     ///     let rx2 = Box::pin(async_stream::stream! {
352     ///           while let Some(item) = rx2.recv().await {
353     ///               yield item;
354     ///           }
355     ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
356     ///
357     ///     let mut rx = rx1.merge(rx2);
358     ///
359     ///     tokio::spawn(async move {
360     ///         // Send some values immediately
361     ///         tx1.send(1).await.unwrap();
362     ///         tx1.send(2).await.unwrap();
363     ///
364     ///         // Let the other task send values
365     ///         time::sleep(Duration::from_millis(20)).await;
366     ///
367     ///         tx1.send(4).await.unwrap();
368     ///     });
369     ///
370     ///     tokio::spawn(async move {
371     ///         // Wait for the first task to send values
372     ///         time::sleep(Duration::from_millis(5)).await;
373     ///
374     ///         tx2.send(3).await.unwrap();
375     ///
376     ///         time::sleep(Duration::from_millis(25)).await;
377     ///
378     ///         // Send the final value
379     ///         tx2.send(5).await.unwrap();
380     ///     });
381     ///
382     ///    assert_eq!(1, rx.next().await.unwrap());
383     ///    assert_eq!(2, rx.next().await.unwrap());
384     ///    assert_eq!(3, rx.next().await.unwrap());
385     ///    assert_eq!(4, rx.next().await.unwrap());
386     ///    assert_eq!(5, rx.next().await.unwrap());
387     ///
388     ///    // The merged stream is consumed
389     ///    assert!(rx.next().await.is_none());
390     /// }
391     /// ```
merge<U>(self, other: U) -> Merge<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,392     fn merge<U>(self, other: U) -> Merge<Self, U>
393     where
394         U: Stream<Item = Self::Item>,
395         Self: Sized,
396     {
397         Merge::new(self, other)
398     }
399 
400     /// Filters the values produced by this stream according to the provided
401     /// predicate.
402     ///
403     /// As values of this stream are made available, the provided predicate `f`
404     /// will be run against them. If the predicate
405     /// resolves to `true`, then the stream will yield the value, but if the
406     /// predicate resolves to `false`, then the value
407     /// will be discarded and the next value will be produced.
408     ///
409     /// Note that this function consumes the stream passed into it and returns a
    /// wrapped version of it, similar to the [`Iterator::filter`] method in the
411     /// standard library.
412     ///
413     /// # Examples
414     ///
415     /// ```
416     /// # #[tokio::main]
417     /// # async fn main() {
418     /// use tokio_stream::{self as stream, StreamExt};
419     ///
420     /// let stream = stream::iter(1..=8);
421     /// let mut evens = stream.filter(|x| x % 2 == 0);
422     ///
423     /// assert_eq!(Some(2), evens.next().await);
424     /// assert_eq!(Some(4), evens.next().await);
425     /// assert_eq!(Some(6), evens.next().await);
426     /// assert_eq!(Some(8), evens.next().await);
427     /// assert_eq!(None, evens.next().await);
428     /// # }
429     /// ```
filter<F>(self, f: F) -> Filter<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,430     fn filter<F>(self, f: F) -> Filter<Self, F>
431     where
432         F: FnMut(&Self::Item) -> bool,
433         Self: Sized,
434     {
435         Filter::new(self, f)
436     }
437 
438     /// Filters the values produced by this stream while simultaneously mapping
439     /// them to a different type according to the provided closure.
440     ///
441     /// As values of this stream are made available, the provided function will
442     /// be run on them. If the predicate `f` resolves to
443     /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
444     /// it resolves to [`None`], then the value will be skipped.
445     ///
446     /// Note that this function consumes the stream passed into it and returns a
    /// wrapped version of it, similar to the [`Iterator::filter_map`] method in the
448     /// standard library.
449     ///
450     /// # Examples
451     /// ```
452     /// # #[tokio::main]
453     /// # async fn main() {
454     /// use tokio_stream::{self as stream, StreamExt};
455     ///
456     /// let stream = stream::iter(1..=8);
457     /// let mut evens = stream.filter_map(|x| {
458     ///     if x % 2 == 0 { Some(x + 1) } else { None }
459     /// });
460     ///
461     /// assert_eq!(Some(3), evens.next().await);
462     /// assert_eq!(Some(5), evens.next().await);
463     /// assert_eq!(Some(7), evens.next().await);
464     /// assert_eq!(Some(9), evens.next().await);
465     /// assert_eq!(None, evens.next().await);
466     /// # }
467     /// ```
filter_map<T, F>(self, f: F) -> FilterMap<Self, F> where F: FnMut(Self::Item) -> Option<T>, Self: Sized,468     fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
469     where
470         F: FnMut(Self::Item) -> Option<T>,
471         Self: Sized,
472     {
473         FilterMap::new(self, f)
474     }
475 
476     /// Creates a stream which ends after the first `None`.
477     ///
478     /// After a stream returns `None`, behavior is undefined. Future calls to
479     /// `poll_next` may or may not return `Some(T)` again or they may panic.
480     /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
481     /// return `None` forever.
482     ///
483     /// # Examples
484     ///
485     /// ```
486     /// use tokio_stream::{Stream, StreamExt};
487     ///
488     /// use std::pin::Pin;
489     /// use std::task::{Context, Poll};
490     ///
491     /// // a stream which alternates between Some and None
492     /// struct Alternate {
493     ///     state: i32,
494     /// }
495     ///
496     /// impl Stream for Alternate {
497     ///     type Item = i32;
498     ///
499     ///     fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
500     ///         let val = self.state;
501     ///         self.state = self.state + 1;
502     ///
503     ///         // if it's even, Some(i32), else None
504     ///         if val % 2 == 0 {
505     ///             Poll::Ready(Some(val))
506     ///         } else {
507     ///             Poll::Ready(None)
508     ///         }
509     ///     }
510     /// }
511     ///
512     /// #[tokio::main]
513     /// async fn main() {
514     ///     let mut stream = Alternate { state: 0 };
515     ///
516     ///     // the stream goes back and forth
517     ///     assert_eq!(stream.next().await, Some(0));
518     ///     assert_eq!(stream.next().await, None);
519     ///     assert_eq!(stream.next().await, Some(2));
520     ///     assert_eq!(stream.next().await, None);
521     ///
522     ///     // however, once it is fused
523     ///     let mut stream = stream.fuse();
524     ///
525     ///     assert_eq!(stream.next().await, Some(4));
526     ///     assert_eq!(stream.next().await, None);
527     ///
528     ///     // it will always return `None` after the first time.
529     ///     assert_eq!(stream.next().await, None);
530     ///     assert_eq!(stream.next().await, None);
531     ///     assert_eq!(stream.next().await, None);
532     /// }
533     /// ```
fuse(self) -> Fuse<Self> where Self: Sized,534     fn fuse(self) -> Fuse<Self>
535     where
536         Self: Sized,
537     {
538         Fuse::new(self)
539     }
540 
541     /// Creates a new stream of at most `n` items of the underlying stream.
542     ///
543     /// Once `n` items have been yielded from this stream then it will always
544     /// return that the stream is done.
545     ///
546     /// # Examples
547     ///
548     /// ```
549     /// # #[tokio::main]
550     /// # async fn main() {
551     /// use tokio_stream::{self as stream, StreamExt};
552     ///
553     /// let mut stream = stream::iter(1..=10).take(3);
554     ///
555     /// assert_eq!(Some(1), stream.next().await);
556     /// assert_eq!(Some(2), stream.next().await);
557     /// assert_eq!(Some(3), stream.next().await);
558     /// assert_eq!(None, stream.next().await);
559     /// # }
560     /// ```
take(self, n: usize) -> Take<Self> where Self: Sized,561     fn take(self, n: usize) -> Take<Self>
562     where
563         Self: Sized,
564     {
565         Take::new(self, n)
566     }
567 
568     /// Take elements from this stream while the provided predicate
569     /// resolves to `true`.
570     ///
571     /// This function, like `Iterator::take_while`, will take elements from the
572     /// stream until the predicate `f` resolves to `false`. Once one element
573     /// returns false it will always return that the stream is done.
574     ///
575     /// # Examples
576     ///
577     /// ```
578     /// # #[tokio::main]
579     /// # async fn main() {
580     /// use tokio_stream::{self as stream, StreamExt};
581     ///
582     /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
583     ///
584     /// assert_eq!(Some(1), stream.next().await);
585     /// assert_eq!(Some(2), stream.next().await);
586     /// assert_eq!(Some(3), stream.next().await);
587     /// assert_eq!(None, stream.next().await);
588     /// # }
589     /// ```
take_while<F>(self, f: F) -> TakeWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,590     fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
591     where
592         F: FnMut(&Self::Item) -> bool,
593         Self: Sized,
594     {
595         TakeWhile::new(self, f)
596     }
597 
598     /// Creates a new stream that will skip the `n` first items of the
599     /// underlying stream.
600     ///
601     /// # Examples
602     ///
603     /// ```
604     /// # #[tokio::main]
605     /// # async fn main() {
606     /// use tokio_stream::{self as stream, StreamExt};
607     ///
608     /// let mut stream = stream::iter(1..=10).skip(7);
609     ///
610     /// assert_eq!(Some(8), stream.next().await);
611     /// assert_eq!(Some(9), stream.next().await);
612     /// assert_eq!(Some(10), stream.next().await);
613     /// assert_eq!(None, stream.next().await);
614     /// # }
615     /// ```
skip(self, n: usize) -> Skip<Self> where Self: Sized,616     fn skip(self, n: usize) -> Skip<Self>
617     where
618         Self: Sized,
619     {
620         Skip::new(self, n)
621     }
622 
623     /// Skip elements from the underlying stream while the provided predicate
624     /// resolves to `true`.
625     ///
626     /// This function, like [`Iterator::skip_while`], will ignore elements from the
627     /// stream until the predicate `f` resolves to `false`. Once one element
628     /// returns false, the rest of the elements will be yielded.
629     ///
630     /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
631     ///
632     /// # Examples
633     ///
634     /// ```
635     /// # #[tokio::main]
636     /// # async fn main() {
637     /// use tokio_stream::{self as stream, StreamExt};
638     /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
639     ///
640     /// assert_eq!(Some(3), stream.next().await);
641     /// assert_eq!(Some(4), stream.next().await);
642     /// assert_eq!(Some(1), stream.next().await);
643     /// assert_eq!(None, stream.next().await);
644     /// # }
645     /// ```
skip_while<F>(self, f: F) -> SkipWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,646     fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
647     where
648         F: FnMut(&Self::Item) -> bool,
649         Self: Sized,
650     {
651         SkipWhile::new(self, f)
652     }
653 
654     /// Tests if every element of the stream matches a predicate.
655     ///
656     /// Equivalent to:
657     ///
658     /// ```ignore
659     /// async fn all<F>(&mut self, f: F) -> bool;
660     /// ```
661     ///
662     /// `all()` takes a closure that returns `true` or `false`. It applies
663     /// this closure to each element of the stream, and if they all return
664     /// `true`, then so does `all`. If any of them return `false`, it
    /// returns `false`.
666     ///
667     /// `all()` is short-circuiting; in other words, it will stop processing
668     /// as soon as it finds a `false`, given that no matter what else happens,
669     /// the result will also be `false`.
670     ///
671     /// An empty stream returns `true`.
672     ///
673     /// # Examples
674     ///
675     /// Basic usage:
676     ///
677     /// ```
678     /// # #[tokio::main]
679     /// # async fn main() {
680     /// use tokio_stream::{self as stream, StreamExt};
681     ///
682     /// let a = [1, 2, 3];
683     ///
684     /// assert!(stream::iter(&a).all(|&x| x > 0).await);
685     ///
686     /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
687     /// # }
688     /// ```
689     ///
690     /// Stopping at the first `false`:
691     ///
692     /// ```
693     /// # #[tokio::main]
694     /// # async fn main() {
695     /// use tokio_stream::{self as stream, StreamExt};
696     ///
697     /// let a = [1, 2, 3];
698     ///
699     /// let mut iter = stream::iter(&a);
700     ///
701     /// assert!(!iter.all(|&x| x != 2).await);
702     ///
703     /// // we can still use `iter`, as there are more elements.
704     /// assert_eq!(iter.next().await, Some(&3));
705     /// # }
706     /// ```
all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,707     fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
708     where
709         Self: Unpin,
710         F: FnMut(Self::Item) -> bool,
711     {
712         AllFuture::new(self, f)
713     }
714 
715     /// Tests if any element of the stream matches a predicate.
716     ///
717     /// Equivalent to:
718     ///
719     /// ```ignore
720     /// async fn any<F>(&mut self, f: F) -> bool;
721     /// ```
722     ///
723     /// `any()` takes a closure that returns `true` or `false`. It applies
724     /// this closure to each element of the stream, and if any of them return
725     /// `true`, then so does `any()`. If they all return `false`, it
726     /// returns `false`.
727     ///
728     /// `any()` is short-circuiting; in other words, it will stop processing
729     /// as soon as it finds a `true`, given that no matter what else happens,
730     /// the result will also be `true`.
731     ///
732     /// An empty stream returns `false`.
733     ///
    /// # Examples
    ///
    /// Basic usage:
735     ///
736     /// ```
737     /// # #[tokio::main]
738     /// # async fn main() {
739     /// use tokio_stream::{self as stream, StreamExt};
740     ///
741     /// let a = [1, 2, 3];
742     ///
743     /// assert!(stream::iter(&a).any(|&x| x > 0).await);
744     ///
745     /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
746     /// # }
747     /// ```
748     ///
749     /// Stopping at the first `true`:
750     ///
751     /// ```
752     /// # #[tokio::main]
753     /// # async fn main() {
754     /// use tokio_stream::{self as stream, StreamExt};
755     ///
756     /// let a = [1, 2, 3];
757     ///
758     /// let mut iter = stream::iter(&a);
759     ///
760     /// assert!(iter.any(|&x| x != 2).await);
761     ///
762     /// // we can still use `iter`, as there are more elements.
763     /// assert_eq!(iter.next().await, Some(&2));
764     /// # }
765     /// ```
any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,766     fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
767     where
768         Self: Unpin,
769         F: FnMut(Self::Item) -> bool,
770     {
771         AnyFuture::new(self, f)
772     }
773 
774     /// Combine two streams into one by first returning all values from the
775     /// first stream then all values from the second stream.
776     ///
777     /// As long as `self` still has values to emit, no values from `other` are
778     /// emitted, even if some are ready.
779     ///
780     /// # Examples
781     ///
782     /// ```
783     /// use tokio_stream::{self as stream, StreamExt};
784     ///
785     /// #[tokio::main]
786     /// async fn main() {
787     ///     let one = stream::iter(vec![1, 2, 3]);
788     ///     let two = stream::iter(vec![4, 5, 6]);
789     ///
790     ///     let mut stream = one.chain(two);
791     ///
792     ///     assert_eq!(stream.next().await, Some(1));
793     ///     assert_eq!(stream.next().await, Some(2));
794     ///     assert_eq!(stream.next().await, Some(3));
795     ///     assert_eq!(stream.next().await, Some(4));
796     ///     assert_eq!(stream.next().await, Some(5));
797     ///     assert_eq!(stream.next().await, Some(6));
798     ///     assert_eq!(stream.next().await, None);
799     /// }
800     /// ```
chain<U>(self, other: U) -> Chain<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,801     fn chain<U>(self, other: U) -> Chain<Self, U>
802     where
803         U: Stream<Item = Self::Item>,
804         Self: Sized,
805     {
806         Chain::new(self, other)
807     }
808 
809     /// A combinator that applies a function to every element in a stream
810     /// producing a single, final value.
811     ///
812     /// Equivalent to:
813     ///
814     /// ```ignore
815     /// async fn fold<B, F>(self, init: B, f: F) -> B;
816     /// ```
817     ///
818     /// # Examples
819     /// Basic usage:
820     /// ```
821     /// # #[tokio::main]
822     /// # async fn main() {
823     /// use tokio_stream::{self as stream, *};
824     ///
825     /// let s = stream::iter(vec![1u8, 2, 3]);
826     /// let sum = s.fold(0, |acc, x| acc + x).await;
827     ///
828     /// assert_eq!(sum, 6);
829     /// # }
830     /// ```
fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> where Self: Sized, F: FnMut(B, Self::Item) -> B,831     fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
832     where
833         Self: Sized,
834         F: FnMut(B, Self::Item) -> B,
835     {
836         FoldFuture::new(self, init, f)
837     }
838 
839     /// Drain stream pushing all emitted values into a collection.
840     ///
841     /// Equivalent to:
842     ///
843     /// ```ignore
844     /// async fn collect<T>(self) -> T;
845     /// ```
846     ///
847     /// `collect` streams all values, awaiting as needed. Values are pushed into
848     /// a collection. A number of different target collection types are
849     /// supported, including [`Vec`](std::vec::Vec),
850     /// [`String`](std::string::String), and [`Bytes`].
851     ///
852     /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html
853     ///
854     /// # `Result`
855     ///
856     /// `collect()` can also be used with streams of type `Result<T, E>` where
857     /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
858     /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
859     /// streaming is terminated and `collect()` returns the `Err`.
860     ///
861     /// # Notes
862     ///
863     /// `FromStream` is currently a sealed trait. Stabilization is pending
864     /// enhancements to the Rust language.
865     ///
866     /// # Examples
867     ///
868     /// Basic usage:
869     ///
870     /// ```
871     /// use tokio_stream::{self as stream, StreamExt};
872     ///
873     /// #[tokio::main]
874     /// async fn main() {
875     ///     let doubled: Vec<i32> =
876     ///         stream::iter(vec![1, 2, 3])
877     ///             .map(|x| x * 2)
878     ///             .collect()
879     ///             .await;
880     ///
881     ///     assert_eq!(vec![2, 4, 6], doubled);
882     /// }
883     /// ```
884     ///
885     /// Collecting a stream of `Result` values
886     ///
887     /// ```
888     /// use tokio_stream::{self as stream, StreamExt};
889     ///
890     /// #[tokio::main]
891     /// async fn main() {
892     ///     // A stream containing only `Ok` values will be collected
893     ///     let values: Result<Vec<i32>, &str> =
894     ///         stream::iter(vec![Ok(1), Ok(2), Ok(3)])
895     ///             .collect()
896     ///             .await;
897     ///
898     ///     assert_eq!(Ok(vec![1, 2, 3]), values);
899     ///
900     ///     // A stream containing `Err` values will return the first error.
901     ///     let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
902     ///
903     ///     let values: Result<Vec<i32>, &str> =
904     ///         stream::iter(results)
905     ///             .collect()
906     ///             .await;
907     ///
908     ///     assert_eq!(Err("no"), values);
909     /// }
910     /// ```
collect<T>(self) -> Collect<Self, T> where T: FromStream<Self::Item>, Self: Sized,911     fn collect<T>(self) -> Collect<Self, T>
912     where
913         T: FromStream<Self::Item>,
914         Self: Sized,
915     {
916         Collect::new(self)
917     }
918 
/// Applies a per-item timeout to the passed stream.
///
/// `timeout()` takes a `Duration` that represents the maximum amount of
/// time each element of the stream has to complete before timing out.
///
/// If the wrapped stream yields a value before the deadline is reached, the
/// value is returned. Otherwise, an error is returned. The caller may decide
/// to continue consuming the stream and will eventually get the next source
/// stream value once it becomes available.
///
/// # Notes
///
/// This function consumes the stream passed into it and returns a
/// wrapped version of it.
///
/// Polling the returned stream will continue to poll the inner stream even
/// if one or more items time out.
///
/// # Examples
///
/// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// use std::time::Duration;
/// # let int_stream = stream::iter(1..=3);
///
/// let int_stream = int_stream.timeout(Duration::from_secs(1));
/// tokio::pin!(int_stream);
///
/// // When no items time out, we get the 3 elements in succession:
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // If the second item times out, we get an error and continue polling the stream:
/// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert!(int_stream.try_next().await.is_err());
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // If we want to stop consuming the source stream the first time an
/// // element times out, we can use the `take_while` operator:
/// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// let mut int_stream = int_stream.take_while(Result::is_ok);
///
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
/// # }
/// ```
// Gated on `time`: `Timeout` and `Duration` are brought into scope by the
// `cfg_time!` block at the top of this file. A single predicate needs no
// `all(...)` wrapper, which also keeps this in line with `chunks_timeout`.
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout(self, duration: Duration) -> Timeout<Self>
where
    Self: Sized,
{
    Timeout::new(self, duration)
}
982 
/// Slows down a stream by enforcing a delay between items.
///
/// The underlying timer behind this utility has a granularity of one millisecond.
///
/// # Example
///
/// Create a throttled stream:
///
/// ```rust,no_run
/// use std::time::Duration;
/// use tokio_stream::StreamExt;
///
/// # async fn dox() {
/// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
/// tokio::pin!(item_stream);
///
/// loop {
///     // The string will be produced at most every 2 seconds
///     println!("{:?}", item_stream.next().await);
/// }
/// # }
/// ```
// Gated on `time`: `throttle`, `Throttle` and `Duration` are brought into
// scope by the `cfg_time!` block at the top of this file. A single predicate
// needs no `all(...)` wrapper, which also keeps this in line with
// `chunks_timeout`.
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn throttle(self, duration: Duration) -> Throttle<Self>
where
    Self: Sized,
{
    // The bare name `throttle` here is the free function imported from the
    // `throttle` module, not this method (which would need `self.` or
    // `Self::`). Note that it takes the duration first and the stream second.
    throttle(duration, self)
}
1012 
/// Batches the items in the given stream using a maximum duration and size for each batch.
///
/// This stream returns the next batch of items in the following situations:
///  1. The inner stream has returned at least `max_size` many items since the last batch.
///  2. The time since the first item of a batch is greater than the given duration.
///  3. The end of the stream is reached.
///
/// The returned vector is never empty and never holds more than `max_size` items. Empty
/// batches will not be emitted if no items are received upstream.
///
/// # Panics
///
/// This function panics if `max_size` is zero.
///
/// # Example
///
/// ```rust
/// use std::time::Duration;
/// use tokio::time;
/// use tokio_stream::{self as stream, StreamExt};
/// use futures::FutureExt;
///
/// #[tokio::main]
/// # async fn _unused() {}
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
/// async fn main() {
///     let iter = vec![1, 2, 3, 4].into_iter();
///     let stream0 = stream::iter(iter);
///
///     let iter = vec![5].into_iter();
///     let stream1 = stream::iter(iter)
///          .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
///
///     let chunk_stream = stream0
///         .chain(stream1)
///         .chunks_timeout(3, Duration::from_secs(2));
///     tokio::pin!(chunk_stream);
///
///     // a full batch was received
///     assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
///     // deadline was reached before max_size was reached
///     assert_eq!(chunk_stream.next().await, Some(vec![4]));
///     // last element in the stream
///     assert_eq!(chunk_stream.next().await, Some(vec![5]));
/// }
/// ```
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
#[track_caller]
fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
where
    Self: Sized,
{
    // Reject a zero batch size up front; `#[track_caller]` makes the panic
    // point at the caller's call site instead of this line.
    assert!(max_size != 0, "`max_size` must be non-zero.");
    ChunksTimeout::new(self, max_size, duration)
}
1069 }
1070 
1071 impl<St: ?Sized> StreamExt for St where St: Stream {}
1072 
/// Combines the size hints of two streams into a single hint.
///
/// The lower bounds are added together, saturating at `usize::MAX`. The
/// upper bound is the sum of both upper bounds, or `None` when either side
/// has no upper bound or the sum does not fit in a `usize`.
fn merge_size_hints(
    (left_low, left_high): (usize, Option<usize>),
    (right_low, right_high): (usize, Option<usize>),
) -> (usize, Option<usize>) {
    // Both upper bounds must be known, and their sum must not overflow.
    let upper = left_high.and_then(|l| right_high.and_then(|r| l.checked_add(r)));
    (left_low.saturating_add(right_low), upper)
}
1085