• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use futures_core::Stream;
2 
3 mod all;
4 use all::AllFuture;
5 
6 mod any;
7 use any::AnyFuture;
8 
9 mod chain;
10 use chain::Chain;
11 
12 pub(crate) mod collect;
13 use collect::{Collect, FromStream};
14 
15 mod filter;
16 use filter::Filter;
17 
18 mod filter_map;
19 use filter_map::FilterMap;
20 
21 mod fold;
22 use fold::FoldFuture;
23 
24 mod fuse;
25 use fuse::Fuse;
26 
27 mod map;
28 use map::Map;
29 
30 mod merge;
31 use merge::Merge;
32 
33 mod next;
34 use next::Next;
35 
36 mod skip;
37 use skip::Skip;
38 
39 mod skip_while;
40 use skip_while::SkipWhile;
41 
42 mod try_next;
43 use try_next::TryNext;
44 
45 mod take;
46 use take::Take;
47 
48 mod take_while;
49 use take_while::TakeWhile;
50 
51 cfg_time! {
52     mod timeout;
53     use timeout::Timeout;
54     use tokio::time::Duration;
55     mod throttle;
56     use throttle::{throttle, Throttle};
57 }
58 
59 /// An extension trait for the [`Stream`] trait that provides a variety of
60 /// convenient combinator functions.
61 ///
62 /// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
63 /// in the [futures] crate, however both Tokio and futures provide separate
64 /// `StreamExt` utility traits, and some utilities are only available on one of
65 /// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
66 /// trait in the futures crate.
67 ///
68 /// If you need utilities from both `StreamExt` traits, you should prefer to
69 /// import one of them, and use the other through the fully qualified call
70 /// syntax. For example:
71 /// ```
72 /// // import one of the traits:
73 /// use futures::stream::StreamExt;
74 /// # #[tokio::main(flavor = "current_thread")]
75 /// # async fn main() {
76 ///
77 /// let a = tokio_stream::iter(vec![1, 3, 5]);
78 /// let b = tokio_stream::iter(vec![2, 4, 6]);
79 ///
80 /// // use the fully qualified call syntax for the other trait:
81 /// let merged = tokio_stream::StreamExt::merge(a, b);
82 ///
83 /// // use normal call notation for futures::stream::StreamExt::collect
84 /// let output: Vec<_> = merged.collect().await;
85 /// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
86 /// # }
87 /// ```
88 ///
89 /// [`Stream`]: crate::Stream
90 /// [futures]: https://docs.rs/futures
91 /// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
92 pub trait StreamExt: Stream {
93     /// Consumes and returns the next value in the stream or `None` if the
94     /// stream is finished.
95     ///
96     /// Equivalent to:
97     ///
98     /// ```ignore
99     /// async fn next(&mut self) -> Option<Self::Item>;
100     /// ```
101     ///
102     /// Note that because `next` doesn't take ownership over the stream,
103     /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
104     /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
105     /// be done by boxing the stream using [`Box::pin`] or
106     /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
107     /// crate.
108     ///
109     /// # Examples
110     ///
111     /// ```
112     /// # #[tokio::main]
113     /// # async fn main() {
114     /// use tokio_stream::{self as stream, StreamExt};
115     ///
116     /// let mut stream = stream::iter(1..=3);
117     ///
118     /// assert_eq!(stream.next().await, Some(1));
119     /// assert_eq!(stream.next().await, Some(2));
120     /// assert_eq!(stream.next().await, Some(3));
121     /// assert_eq!(stream.next().await, None);
122     /// # }
123     /// ```
next(&mut self) -> Next<'_, Self> where Self: Unpin,124     fn next(&mut self) -> Next<'_, Self>
125     where
126         Self: Unpin,
127     {
128         Next::new(self)
129     }
130 
131     /// Consumes and returns the next item in the stream. If an error is
132     /// encountered before the next item, the error is returned instead.
133     ///
134     /// Equivalent to:
135     ///
136     /// ```ignore
137     /// async fn try_next(&mut self) -> Result<Option<T>, E>;
138     /// ```
139     ///
140     /// This is similar to the [`next`](StreamExt::next) combinator,
141     /// but returns a [`Result<Option<T>, E>`](Result) rather than
142     /// an [`Option<Result<T, E>>`](Option), making for easy use
143     /// with the [`?`](std::ops::Try) operator.
144     ///
145     /// # Examples
146     ///
147     /// ```
148     /// # #[tokio::main]
149     /// # async fn main() {
150     /// use tokio_stream::{self as stream, StreamExt};
151     ///
152     /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
153     ///
154     /// assert_eq!(stream.try_next().await, Ok(Some(1)));
155     /// assert_eq!(stream.try_next().await, Ok(Some(2)));
156     /// assert_eq!(stream.try_next().await, Err("nope"));
157     /// # }
158     /// ```
try_next<T, E>(&mut self) -> TryNext<'_, Self> where Self: Stream<Item = Result<T, E>> + Unpin,159     fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
160     where
161         Self: Stream<Item = Result<T, E>> + Unpin,
162     {
163         TryNext::new(self)
164     }
165 
166     /// Maps this stream's items to a different type, returning a new stream of
167     /// the resulting type.
168     ///
169     /// The provided closure is executed over all elements of this stream as
170     /// they are made available. It is executed inline with calls to
171     /// [`poll_next`](Stream::poll_next).
172     ///
173     /// Note that this function consumes the stream passed into it and returns a
174     /// wrapped version of it, similar to the existing `map` methods in the
175     /// standard library.
176     ///
177     /// # Examples
178     ///
179     /// ```
180     /// # #[tokio::main]
181     /// # async fn main() {
182     /// use tokio_stream::{self as stream, StreamExt};
183     ///
184     /// let stream = stream::iter(1..=3);
185     /// let mut stream = stream.map(|x| x + 3);
186     ///
187     /// assert_eq!(stream.next().await, Some(4));
188     /// assert_eq!(stream.next().await, Some(5));
189     /// assert_eq!(stream.next().await, Some(6));
190     /// # }
191     /// ```
map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,192     fn map<T, F>(self, f: F) -> Map<Self, F>
193     where
194         F: FnMut(Self::Item) -> T,
195         Self: Sized,
196     {
197         Map::new(self, f)
198     }
199 
200     /// Combine two streams into one by interleaving the output of both as it
201     /// is produced.
202     ///
203     /// Values are produced from the merged stream in the order they arrive from
204     /// the two source streams. If both source streams provide values
205     /// simultaneously, the merge stream alternates between them. This provides
206     /// some level of fairness. You should not chain calls to `merge`, as this
207     /// will break the fairness of the merging.
208     ///
209     /// The merged stream completes once **both** source streams complete. When
210     /// one source stream completes before the other, the merge stream
211     /// exclusively polls the remaining stream.
212     ///
213     /// For merging multiple streams, consider using [`StreamMap`] instead.
214     ///
215     /// [`StreamMap`]: crate::StreamMap
216     ///
217     /// # Examples
218     ///
219     /// ```
220     /// use tokio_stream::{StreamExt, Stream};
221     /// use tokio::sync::mpsc;
222     /// use tokio::time;
223     ///
224     /// use std::time::Duration;
225     /// use std::pin::Pin;
226     ///
227     /// # /*
228     /// #[tokio::main]
229     /// # */
230     /// # #[tokio::main(flavor = "current_thread")]
231     /// async fn main() {
232     /// # time::pause();
233     ///     let (tx1, mut rx1) = mpsc::channel::<usize>(10);
234     ///     let (tx2, mut rx2) = mpsc::channel::<usize>(10);
235     ///
236     ///     // Convert the channels to a `Stream`.
237     ///     let rx1 = Box::pin(async_stream::stream! {
238     ///           while let Some(item) = rx1.recv().await {
239     ///               yield item;
240     ///           }
241     ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
242     ///
243     ///     let rx2 = Box::pin(async_stream::stream! {
244     ///           while let Some(item) = rx2.recv().await {
245     ///               yield item;
246     ///           }
247     ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
248     ///
249     ///     let mut rx = rx1.merge(rx2);
250     ///
251     ///     tokio::spawn(async move {
252     ///         // Send some values immediately
253     ///         tx1.send(1).await.unwrap();
254     ///         tx1.send(2).await.unwrap();
255     ///
256     ///         // Let the other task send values
257     ///         time::sleep(Duration::from_millis(20)).await;
258     ///
259     ///         tx1.send(4).await.unwrap();
260     ///     });
261     ///
262     ///     tokio::spawn(async move {
263     ///         // Wait for the first task to send values
264     ///         time::sleep(Duration::from_millis(5)).await;
265     ///
266     ///         tx2.send(3).await.unwrap();
267     ///
268     ///         time::sleep(Duration::from_millis(25)).await;
269     ///
270     ///         // Send the final value
271     ///         tx2.send(5).await.unwrap();
272     ///     });
273     ///
274     ///    assert_eq!(1, rx.next().await.unwrap());
275     ///    assert_eq!(2, rx.next().await.unwrap());
276     ///    assert_eq!(3, rx.next().await.unwrap());
277     ///    assert_eq!(4, rx.next().await.unwrap());
278     ///    assert_eq!(5, rx.next().await.unwrap());
279     ///
280     ///    // The merged stream is consumed
281     ///    assert!(rx.next().await.is_none());
282     /// }
283     /// ```
merge<U>(self, other: U) -> Merge<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,284     fn merge<U>(self, other: U) -> Merge<Self, U>
285     where
286         U: Stream<Item = Self::Item>,
287         Self: Sized,
288     {
289         Merge::new(self, other)
290     }
291 
292     /// Filters the values produced by this stream according to the provided
293     /// predicate.
294     ///
295     /// As values of this stream are made available, the provided predicate `f`
296     /// will be run against them. If the predicate
297     /// resolves to `true`, then the stream will yield the value, but if the
298     /// predicate resolves to `false`, then the value
299     /// will be discarded and the next value will be produced.
300     ///
301     /// Note that this function consumes the stream passed into it and returns a
302     /// wrapped version of it, similar to [`Iterator::filter`] method in the
303     /// standard library.
304     ///
305     /// # Examples
306     ///
307     /// ```
308     /// # #[tokio::main]
309     /// # async fn main() {
310     /// use tokio_stream::{self as stream, StreamExt};
311     ///
312     /// let stream = stream::iter(1..=8);
313     /// let mut evens = stream.filter(|x| x % 2 == 0);
314     ///
315     /// assert_eq!(Some(2), evens.next().await);
316     /// assert_eq!(Some(4), evens.next().await);
317     /// assert_eq!(Some(6), evens.next().await);
318     /// assert_eq!(Some(8), evens.next().await);
319     /// assert_eq!(None, evens.next().await);
320     /// # }
321     /// ```
filter<F>(self, f: F) -> Filter<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,322     fn filter<F>(self, f: F) -> Filter<Self, F>
323     where
324         F: FnMut(&Self::Item) -> bool,
325         Self: Sized,
326     {
327         Filter::new(self, f)
328     }
329 
330     /// Filters the values produced by this stream while simultaneously mapping
331     /// them to a different type according to the provided closure.
332     ///
333     /// As values of this stream are made available, the provided function will
334     /// be run on them. If the predicate `f` resolves to
335     /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
336     /// it resolves to [`None`], then the value will be skipped.
337     ///
338     /// Note that this function consumes the stream passed into it and returns a
339     /// wrapped version of it, similar to [`Iterator::filter_map`] method in the
340     /// standard library.
341     ///
342     /// # Examples
343     /// ```
344     /// # #[tokio::main]
345     /// # async fn main() {
346     /// use tokio_stream::{self as stream, StreamExt};
347     ///
348     /// let stream = stream::iter(1..=8);
349     /// let mut evens = stream.filter_map(|x| {
350     ///     if x % 2 == 0 { Some(x + 1) } else { None }
351     /// });
352     ///
353     /// assert_eq!(Some(3), evens.next().await);
354     /// assert_eq!(Some(5), evens.next().await);
355     /// assert_eq!(Some(7), evens.next().await);
356     /// assert_eq!(Some(9), evens.next().await);
357     /// assert_eq!(None, evens.next().await);
358     /// # }
359     /// ```
filter_map<T, F>(self, f: F) -> FilterMap<Self, F> where F: FnMut(Self::Item) -> Option<T>, Self: Sized,360     fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
361     where
362         F: FnMut(Self::Item) -> Option<T>,
363         Self: Sized,
364     {
365         FilterMap::new(self, f)
366     }
367 
368     /// Creates a stream which ends after the first `None`.
369     ///
370     /// After a stream returns `None`, behavior is undefined. Future calls to
371     /// `poll_next` may or may not return `Some(T)` again or they may panic.
372     /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
373     /// return `None` forever.
374     ///
375     /// # Examples
376     ///
377     /// ```
378     /// use tokio_stream::{Stream, StreamExt};
379     ///
380     /// use std::pin::Pin;
381     /// use std::task::{Context, Poll};
382     ///
383     /// // a stream which alternates between Some and None
384     /// struct Alternate {
385     ///     state: i32,
386     /// }
387     ///
388     /// impl Stream for Alternate {
389     ///     type Item = i32;
390     ///
391     ///     fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
392     ///         let val = self.state;
393     ///         self.state = self.state + 1;
394     ///
395     ///         // if it's even, Some(i32), else None
396     ///         if val % 2 == 0 {
397     ///             Poll::Ready(Some(val))
398     ///         } else {
399     ///             Poll::Ready(None)
400     ///         }
401     ///     }
402     /// }
403     ///
404     /// #[tokio::main]
405     /// async fn main() {
406     ///     let mut stream = Alternate { state: 0 };
407     ///
408     ///     // the stream goes back and forth
409     ///     assert_eq!(stream.next().await, Some(0));
410     ///     assert_eq!(stream.next().await, None);
411     ///     assert_eq!(stream.next().await, Some(2));
412     ///     assert_eq!(stream.next().await, None);
413     ///
414     ///     // however, once it is fused
415     ///     let mut stream = stream.fuse();
416     ///
417     ///     assert_eq!(stream.next().await, Some(4));
418     ///     assert_eq!(stream.next().await, None);
419     ///
420     ///     // it will always return `None` after the first time.
421     ///     assert_eq!(stream.next().await, None);
422     ///     assert_eq!(stream.next().await, None);
423     ///     assert_eq!(stream.next().await, None);
424     /// }
425     /// ```
fuse(self) -> Fuse<Self> where Self: Sized,426     fn fuse(self) -> Fuse<Self>
427     where
428         Self: Sized,
429     {
430         Fuse::new(self)
431     }
432 
433     /// Creates a new stream of at most `n` items of the underlying stream.
434     ///
435     /// Once `n` items have been yielded from this stream then it will always
436     /// return that the stream is done.
437     ///
438     /// # Examples
439     ///
440     /// ```
441     /// # #[tokio::main]
442     /// # async fn main() {
443     /// use tokio_stream::{self as stream, StreamExt};
444     ///
445     /// let mut stream = stream::iter(1..=10).take(3);
446     ///
447     /// assert_eq!(Some(1), stream.next().await);
448     /// assert_eq!(Some(2), stream.next().await);
449     /// assert_eq!(Some(3), stream.next().await);
450     /// assert_eq!(None, stream.next().await);
451     /// # }
452     /// ```
take(self, n: usize) -> Take<Self> where Self: Sized,453     fn take(self, n: usize) -> Take<Self>
454     where
455         Self: Sized,
456     {
457         Take::new(self, n)
458     }
459 
460     /// Take elements from this stream while the provided predicate
461     /// resolves to `true`.
462     ///
463     /// This function, like `Iterator::take_while`, will take elements from the
464     /// stream until the predicate `f` resolves to `false`. Once one element
465     /// returns false it will always return that the stream is done.
466     ///
467     /// # Examples
468     ///
469     /// ```
470     /// # #[tokio::main]
471     /// # async fn main() {
472     /// use tokio_stream::{self as stream, StreamExt};
473     ///
474     /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
475     ///
476     /// assert_eq!(Some(1), stream.next().await);
477     /// assert_eq!(Some(2), stream.next().await);
478     /// assert_eq!(Some(3), stream.next().await);
479     /// assert_eq!(None, stream.next().await);
480     /// # }
481     /// ```
take_while<F>(self, f: F) -> TakeWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,482     fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
483     where
484         F: FnMut(&Self::Item) -> bool,
485         Self: Sized,
486     {
487         TakeWhile::new(self, f)
488     }
489 
490     /// Creates a new stream that will skip the `n` first items of the
491     /// underlying stream.
492     ///
493     /// # Examples
494     ///
495     /// ```
496     /// # #[tokio::main]
497     /// # async fn main() {
498     /// use tokio_stream::{self as stream, StreamExt};
499     ///
500     /// let mut stream = stream::iter(1..=10).skip(7);
501     ///
502     /// assert_eq!(Some(8), stream.next().await);
503     /// assert_eq!(Some(9), stream.next().await);
504     /// assert_eq!(Some(10), stream.next().await);
505     /// assert_eq!(None, stream.next().await);
506     /// # }
507     /// ```
skip(self, n: usize) -> Skip<Self> where Self: Sized,508     fn skip(self, n: usize) -> Skip<Self>
509     where
510         Self: Sized,
511     {
512         Skip::new(self, n)
513     }
514 
515     /// Skip elements from the underlying stream while the provided predicate
516     /// resolves to `true`.
517     ///
518     /// This function, like [`Iterator::skip_while`], will ignore elemets from the
519     /// stream until the predicate `f` resolves to `false`. Once one element
520     /// returns false, the rest of the elements will be yielded.
521     ///
522     /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
523     ///
524     /// # Examples
525     ///
526     /// ```
527     /// # #[tokio::main]
528     /// # async fn main() {
529     /// use tokio_stream::{self as stream, StreamExt};
530     /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
531     ///
532     /// assert_eq!(Some(3), stream.next().await);
533     /// assert_eq!(Some(4), stream.next().await);
534     /// assert_eq!(Some(1), stream.next().await);
535     /// assert_eq!(None, stream.next().await);
536     /// # }
537     /// ```
skip_while<F>(self, f: F) -> SkipWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,538     fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
539     where
540         F: FnMut(&Self::Item) -> bool,
541         Self: Sized,
542     {
543         SkipWhile::new(self, f)
544     }
545 
546     /// Tests if every element of the stream matches a predicate.
547     ///
548     /// Equivalent to:
549     ///
550     /// ```ignore
551     /// async fn all<F>(&mut self, f: F) -> bool;
552     /// ```
553     ///
554     /// `all()` takes a closure that returns `true` or `false`. It applies
555     /// this closure to each element of the stream, and if they all return
556     /// `true`, then so does `all`. If any of them return `false`, it
557     /// returns `false`. An empty stream returns `true`.
558     ///
559     /// `all()` is short-circuiting; in other words, it will stop processing
560     /// as soon as it finds a `false`, given that no matter what else happens,
561     /// the result will also be `false`.
562     ///
563     /// An empty stream returns `true`.
564     ///
565     /// # Examples
566     ///
567     /// Basic usage:
568     ///
569     /// ```
570     /// # #[tokio::main]
571     /// # async fn main() {
572     /// use tokio_stream::{self as stream, StreamExt};
573     ///
574     /// let a = [1, 2, 3];
575     ///
576     /// assert!(stream::iter(&a).all(|&x| x > 0).await);
577     ///
578     /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
579     /// # }
580     /// ```
581     ///
582     /// Stopping at the first `false`:
583     ///
584     /// ```
585     /// # #[tokio::main]
586     /// # async fn main() {
587     /// use tokio_stream::{self as stream, StreamExt};
588     ///
589     /// let a = [1, 2, 3];
590     ///
591     /// let mut iter = stream::iter(&a);
592     ///
593     /// assert!(!iter.all(|&x| x != 2).await);
594     ///
595     /// // we can still use `iter`, as there are more elements.
596     /// assert_eq!(iter.next().await, Some(&3));
597     /// # }
598     /// ```
all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,599     fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
600     where
601         Self: Unpin,
602         F: FnMut(Self::Item) -> bool,
603     {
604         AllFuture::new(self, f)
605     }
606 
607     /// Tests if any element of the stream matches a predicate.
608     ///
609     /// Equivalent to:
610     ///
611     /// ```ignore
612     /// async fn any<F>(&mut self, f: F) -> bool;
613     /// ```
614     ///
615     /// `any()` takes a closure that returns `true` or `false`. It applies
616     /// this closure to each element of the stream, and if any of them return
617     /// `true`, then so does `any()`. If they all return `false`, it
618     /// returns `false`.
619     ///
620     /// `any()` is short-circuiting; in other words, it will stop processing
621     /// as soon as it finds a `true`, given that no matter what else happens,
622     /// the result will also be `true`.
623     ///
624     /// An empty stream returns `false`.
625     ///
626     /// Basic usage:
627     ///
628     /// ```
629     /// # #[tokio::main]
630     /// # async fn main() {
631     /// use tokio_stream::{self as stream, StreamExt};
632     ///
633     /// let a = [1, 2, 3];
634     ///
635     /// assert!(stream::iter(&a).any(|&x| x > 0).await);
636     ///
637     /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
638     /// # }
639     /// ```
640     ///
641     /// Stopping at the first `true`:
642     ///
643     /// ```
644     /// # #[tokio::main]
645     /// # async fn main() {
646     /// use tokio_stream::{self as stream, StreamExt};
647     ///
648     /// let a = [1, 2, 3];
649     ///
650     /// let mut iter = stream::iter(&a);
651     ///
652     /// assert!(iter.any(|&x| x != 2).await);
653     ///
654     /// // we can still use `iter`, as there are more elements.
655     /// assert_eq!(iter.next().await, Some(&2));
656     /// # }
657     /// ```
any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,658     fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
659     where
660         Self: Unpin,
661         F: FnMut(Self::Item) -> bool,
662     {
663         AnyFuture::new(self, f)
664     }
665 
666     /// Combine two streams into one by first returning all values from the
667     /// first stream then all values from the second stream.
668     ///
669     /// As long as `self` still has values to emit, no values from `other` are
670     /// emitted, even if some are ready.
671     ///
672     /// # Examples
673     ///
674     /// ```
675     /// use tokio_stream::{self as stream, StreamExt};
676     ///
677     /// #[tokio::main]
678     /// async fn main() {
679     ///     let one = stream::iter(vec![1, 2, 3]);
680     ///     let two = stream::iter(vec![4, 5, 6]);
681     ///
682     ///     let mut stream = one.chain(two);
683     ///
684     ///     assert_eq!(stream.next().await, Some(1));
685     ///     assert_eq!(stream.next().await, Some(2));
686     ///     assert_eq!(stream.next().await, Some(3));
687     ///     assert_eq!(stream.next().await, Some(4));
688     ///     assert_eq!(stream.next().await, Some(5));
689     ///     assert_eq!(stream.next().await, Some(6));
690     ///     assert_eq!(stream.next().await, None);
691     /// }
692     /// ```
chain<U>(self, other: U) -> Chain<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,693     fn chain<U>(self, other: U) -> Chain<Self, U>
694     where
695         U: Stream<Item = Self::Item>,
696         Self: Sized,
697     {
698         Chain::new(self, other)
699     }
700 
701     /// A combinator that applies a function to every element in a stream
702     /// producing a single, final value.
703     ///
704     /// Equivalent to:
705     ///
706     /// ```ignore
707     /// async fn fold<B, F>(self, init: B, f: F) -> B;
708     /// ```
709     ///
710     /// # Examples
711     /// Basic usage:
712     /// ```
713     /// # #[tokio::main]
714     /// # async fn main() {
715     /// use tokio_stream::{self as stream, *};
716     ///
717     /// let s = stream::iter(vec![1u8, 2, 3]);
718     /// let sum = s.fold(0, |acc, x| acc + x).await;
719     ///
720     /// assert_eq!(sum, 6);
721     /// # }
722     /// ```
fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> where Self: Sized, F: FnMut(B, Self::Item) -> B,723     fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
724     where
725         Self: Sized,
726         F: FnMut(B, Self::Item) -> B,
727     {
728         FoldFuture::new(self, init, f)
729     }
730 
731     /// Drain stream pushing all emitted values into a collection.
732     ///
733     /// Equivalent to:
734     ///
735     /// ```ignore
736     /// async fn collect<T>(self) -> T;
737     /// ```
738     ///
739     /// `collect` streams all values, awaiting as needed. Values are pushed into
740     /// a collection. A number of different target collection types are
741     /// supported, including [`Vec`](std::vec::Vec),
742     /// [`String`](std::string::String), and [`Bytes`].
743     ///
744     /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html
745     ///
746     /// # `Result`
747     ///
748     /// `collect()` can also be used with streams of type `Result<T, E>` where
749     /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
750     /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
751     /// streaming is terminated and `collect()` returns the `Err`.
752     ///
753     /// # Notes
754     ///
755     /// `FromStream` is currently a sealed trait. Stabilization is pending
756     /// enhancements to the Rust language.
757     ///
758     /// # Examples
759     ///
760     /// Basic usage:
761     ///
762     /// ```
763     /// use tokio_stream::{self as stream, StreamExt};
764     ///
765     /// #[tokio::main]
766     /// async fn main() {
767     ///     let doubled: Vec<i32> =
768     ///         stream::iter(vec![1, 2, 3])
769     ///             .map(|x| x * 2)
770     ///             .collect()
771     ///             .await;
772     ///
773     ///     assert_eq!(vec![2, 4, 6], doubled);
774     /// }
775     /// ```
776     ///
777     /// Collecting a stream of `Result` values
778     ///
779     /// ```
780     /// use tokio_stream::{self as stream, StreamExt};
781     ///
782     /// #[tokio::main]
783     /// async fn main() {
784     ///     // A stream containing only `Ok` values will be collected
785     ///     let values: Result<Vec<i32>, &str> =
786     ///         stream::iter(vec![Ok(1), Ok(2), Ok(3)])
787     ///             .collect()
788     ///             .await;
789     ///
790     ///     assert_eq!(Ok(vec![1, 2, 3]), values);
791     ///
792     ///     // A stream containing `Err` values will return the first error.
793     ///     let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
794     ///
795     ///     let values: Result<Vec<i32>, &str> =
796     ///         stream::iter(results)
797     ///             .collect()
798     ///             .await;
799     ///
800     ///     assert_eq!(Err("no"), values);
801     /// }
802     /// ```
collect<T>(self) -> Collect<Self, T> where T: FromStream<Self::Item>, Self: Sized,803     fn collect<T>(self) -> Collect<Self, T>
804     where
805         T: FromStream<Self::Item>,
806         Self: Sized,
807     {
808         Collect::new(self)
809     }
810 
811     /// Applies a per-item timeout to the passed stream.
812     ///
813     /// `timeout()` takes a `Duration` that represents the maximum amount of
814     /// time each element of the stream has to complete before timing out.
815     ///
816     /// If the wrapped stream yields a value before the deadline is reached, the
817     /// value is returned. Otherwise, an error is returned. The caller may decide
818     /// to continue consuming the stream and will eventually get the next source
819     /// stream value once it becomes available.
820     ///
821     /// # Notes
822     ///
823     /// This function consumes the stream passed into it and returns a
824     /// wrapped version of it.
825     ///
826     /// Polling the returned stream will continue to poll the inner stream even
827     /// if one or more items time out.
828     ///
829     /// # Examples
830     ///
831     /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
832     ///
833     /// ```
834     /// # #[tokio::main]
835     /// # async fn main() {
836     /// use tokio_stream::{self as stream, StreamExt};
837     /// use std::time::Duration;
838     /// # let int_stream = stream::iter(1..=3);
839     ///
840     /// let int_stream = int_stream.timeout(Duration::from_secs(1));
841     /// tokio::pin!(int_stream);
842     ///
843     /// // When no items time out, we get the 3 elements in succession:
844     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
845     /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
846     /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
847     /// assert_eq!(int_stream.try_next().await, Ok(None));
848     ///
849     /// // If the second item times out, we get an error and continue polling the stream:
850     /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
851     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
852     /// assert!(int_stream.try_next().await.is_err());
853     /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
854     /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
855     /// assert_eq!(int_stream.try_next().await, Ok(None));
856     ///
857     /// // If we want to stop consuming the source stream the first time an
858     /// // element times out, we can use the `take_while` operator:
859     /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
860     /// let mut int_stream = int_stream.take_while(Result::is_ok);
861     ///
862     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
863     /// assert_eq!(int_stream.try_next().await, Ok(None));
864     /// # }
865     /// ```
866     #[cfg(all(feature = "time"))]
867     #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
timeout(self, duration: Duration) -> Timeout<Self> where Self: Sized,868     fn timeout(self, duration: Duration) -> Timeout<Self>
869     where
870         Self: Sized,
871     {
872         Timeout::new(self, duration)
873     }
874 
875     /// Slows down a stream by enforcing a delay between items.
876     ///
877     /// # Example
878     ///
879     /// Create a throttled stream.
880     /// ```rust,no_run
881     /// use std::time::Duration;
882     /// use tokio_stream::StreamExt;
883     ///
884     /// # async fn dox() {
885     /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
886     /// tokio::pin!(item_stream);
887     ///
888     /// loop {
889     ///     // The string will be produced at most every 2 seconds
890     ///     println!("{:?}", item_stream.next().await);
891     /// }
892     /// # }
893     /// ```
894     #[cfg(all(feature = "time"))]
895     #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
throttle(self, duration: Duration) -> Throttle<Self> where Self: Sized,896     fn throttle(self, duration: Duration) -> Throttle<Self>
897     where
898         Self: Sized,
899     {
900         throttle(duration, self)
901     }
902 }
903 
904 impl<St: ?Sized> StreamExt for St where St: Stream {}
905 
906 /// Merge the size hints from two streams.
merge_size_hints( (left_low, left_high): (usize, Option<usize>), (right_low, right_hign): (usize, Option<usize>), ) -> (usize, Option<usize>)907 fn merge_size_hints(
908     (left_low, left_high): (usize, Option<usize>),
909     (right_low, right_hign): (usize, Option<usize>),
910 ) -> (usize, Option<usize>) {
911     let low = left_low.saturating_add(right_low);
912     let high = match (left_high, right_hign) {
913         (Some(h1), Some(h2)) => h1.checked_add(h2),
914         _ => None,
915     };
916     (low, high)
917 }
918