1 use futures_core::Stream;
2
3 mod all;
4 use all::AllFuture;
5
6 mod any;
7 use any::AnyFuture;
8
9 mod chain;
10 use chain::Chain;
11
12 pub(crate) mod collect;
13 use collect::{Collect, FromStream};
14
15 mod filter;
16 use filter::Filter;
17
18 mod filter_map;
19 use filter_map::FilterMap;
20
21 mod fold;
22 use fold::FoldFuture;
23
24 mod fuse;
25 use fuse::Fuse;
26
27 mod map;
28 use map::Map;
29
30 mod merge;
31 use merge::Merge;
32
33 mod next;
34 use next::Next;
35
36 mod skip;
37 use skip::Skip;
38
39 mod skip_while;
40 use skip_while::SkipWhile;
41
42 mod try_next;
43 use try_next::TryNext;
44
45 mod take;
46 use take::Take;
47
48 mod take_while;
49 use take_while::TakeWhile;
50
51 cfg_time! {
52 mod timeout;
53 use timeout::Timeout;
54 use tokio::time::Duration;
55 mod throttle;
56 use throttle::{throttle, Throttle};
57 }
58
59 /// An extension trait for the [`Stream`] trait that provides a variety of
60 /// convenient combinator functions.
61 ///
62 /// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
63 /// in the [futures] crate, however both Tokio and futures provide separate
64 /// `StreamExt` utility traits, and some utilities are only available on one of
65 /// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
66 /// trait in the futures crate.
67 ///
68 /// If you need utilities from both `StreamExt` traits, you should prefer to
69 /// import one of them, and use the other through the fully qualified call
70 /// syntax. For example:
71 /// ```
72 /// // import one of the traits:
73 /// use futures::stream::StreamExt;
74 /// # #[tokio::main(flavor = "current_thread")]
75 /// # async fn main() {
76 ///
77 /// let a = tokio_stream::iter(vec![1, 3, 5]);
78 /// let b = tokio_stream::iter(vec![2, 4, 6]);
79 ///
80 /// // use the fully qualified call syntax for the other trait:
81 /// let merged = tokio_stream::StreamExt::merge(a, b);
82 ///
83 /// // use normal call notation for futures::stream::StreamExt::collect
84 /// let output: Vec<_> = merged.collect().await;
85 /// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
86 /// # }
87 /// ```
88 ///
89 /// [`Stream`]: crate::Stream
90 /// [futures]: https://docs.rs/futures
91 /// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
92 pub trait StreamExt: Stream {
93 /// Consumes and returns the next value in the stream or `None` if the
94 /// stream is finished.
95 ///
96 /// Equivalent to:
97 ///
98 /// ```ignore
99 /// async fn next(&mut self) -> Option<Self::Item>;
100 /// ```
101 ///
102 /// Note that because `next` doesn't take ownership over the stream,
103 /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
104 /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
105 /// be done by boxing the stream using [`Box::pin`] or
106 /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
107 /// crate.
108 ///
109 /// # Examples
110 ///
111 /// ```
112 /// # #[tokio::main]
113 /// # async fn main() {
114 /// use tokio_stream::{self as stream, StreamExt};
115 ///
116 /// let mut stream = stream::iter(1..=3);
117 ///
118 /// assert_eq!(stream.next().await, Some(1));
119 /// assert_eq!(stream.next().await, Some(2));
120 /// assert_eq!(stream.next().await, Some(3));
121 /// assert_eq!(stream.next().await, None);
122 /// # }
123 /// ```
next(&mut self) -> Next<'_, Self> where Self: Unpin,124 fn next(&mut self) -> Next<'_, Self>
125 where
126 Self: Unpin,
127 {
128 Next::new(self)
129 }
130
131 /// Consumes and returns the next item in the stream. If an error is
132 /// encountered before the next item, the error is returned instead.
133 ///
134 /// Equivalent to:
135 ///
136 /// ```ignore
137 /// async fn try_next(&mut self) -> Result<Option<T>, E>;
138 /// ```
139 ///
140 /// This is similar to the [`next`](StreamExt::next) combinator,
141 /// but returns a [`Result<Option<T>, E>`](Result) rather than
142 /// an [`Option<Result<T, E>>`](Option), making for easy use
143 /// with the [`?`](std::ops::Try) operator.
144 ///
145 /// # Examples
146 ///
147 /// ```
148 /// # #[tokio::main]
149 /// # async fn main() {
150 /// use tokio_stream::{self as stream, StreamExt};
151 ///
152 /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
153 ///
154 /// assert_eq!(stream.try_next().await, Ok(Some(1)));
155 /// assert_eq!(stream.try_next().await, Ok(Some(2)));
156 /// assert_eq!(stream.try_next().await, Err("nope"));
157 /// # }
158 /// ```
try_next<T, E>(&mut self) -> TryNext<'_, Self> where Self: Stream<Item = Result<T, E>> + Unpin,159 fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
160 where
161 Self: Stream<Item = Result<T, E>> + Unpin,
162 {
163 TryNext::new(self)
164 }
165
166 /// Maps this stream's items to a different type, returning a new stream of
167 /// the resulting type.
168 ///
169 /// The provided closure is executed over all elements of this stream as
170 /// they are made available. It is executed inline with calls to
171 /// [`poll_next`](Stream::poll_next).
172 ///
173 /// Note that this function consumes the stream passed into it and returns a
174 /// wrapped version of it, similar to the existing `map` methods in the
175 /// standard library.
176 ///
177 /// # Examples
178 ///
179 /// ```
180 /// # #[tokio::main]
181 /// # async fn main() {
182 /// use tokio_stream::{self as stream, StreamExt};
183 ///
184 /// let stream = stream::iter(1..=3);
185 /// let mut stream = stream.map(|x| x + 3);
186 ///
187 /// assert_eq!(stream.next().await, Some(4));
188 /// assert_eq!(stream.next().await, Some(5));
189 /// assert_eq!(stream.next().await, Some(6));
190 /// # }
191 /// ```
map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,192 fn map<T, F>(self, f: F) -> Map<Self, F>
193 where
194 F: FnMut(Self::Item) -> T,
195 Self: Sized,
196 {
197 Map::new(self, f)
198 }
199
200 /// Combine two streams into one by interleaving the output of both as it
201 /// is produced.
202 ///
203 /// Values are produced from the merged stream in the order they arrive from
204 /// the two source streams. If both source streams provide values
205 /// simultaneously, the merge stream alternates between them. This provides
206 /// some level of fairness. You should not chain calls to `merge`, as this
207 /// will break the fairness of the merging.
208 ///
209 /// The merged stream completes once **both** source streams complete. When
210 /// one source stream completes before the other, the merge stream
211 /// exclusively polls the remaining stream.
212 ///
213 /// For merging multiple streams, consider using [`StreamMap`] instead.
214 ///
215 /// [`StreamMap`]: crate::StreamMap
216 ///
217 /// # Examples
218 ///
219 /// ```
220 /// use tokio_stream::{StreamExt, Stream};
221 /// use tokio::sync::mpsc;
222 /// use tokio::time;
223 ///
224 /// use std::time::Duration;
225 /// use std::pin::Pin;
226 ///
227 /// # /*
228 /// #[tokio::main]
229 /// # */
230 /// # #[tokio::main(flavor = "current_thread")]
231 /// async fn main() {
232 /// # time::pause();
233 /// let (tx1, mut rx1) = mpsc::channel::<usize>(10);
234 /// let (tx2, mut rx2) = mpsc::channel::<usize>(10);
235 ///
236 /// // Convert the channels to a `Stream`.
237 /// let rx1 = Box::pin(async_stream::stream! {
238 /// while let Some(item) = rx1.recv().await {
239 /// yield item;
240 /// }
241 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
242 ///
243 /// let rx2 = Box::pin(async_stream::stream! {
244 /// while let Some(item) = rx2.recv().await {
245 /// yield item;
246 /// }
247 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
248 ///
249 /// let mut rx = rx1.merge(rx2);
250 ///
251 /// tokio::spawn(async move {
252 /// // Send some values immediately
253 /// tx1.send(1).await.unwrap();
254 /// tx1.send(2).await.unwrap();
255 ///
256 /// // Let the other task send values
257 /// time::sleep(Duration::from_millis(20)).await;
258 ///
259 /// tx1.send(4).await.unwrap();
260 /// });
261 ///
262 /// tokio::spawn(async move {
263 /// // Wait for the first task to send values
264 /// time::sleep(Duration::from_millis(5)).await;
265 ///
266 /// tx2.send(3).await.unwrap();
267 ///
268 /// time::sleep(Duration::from_millis(25)).await;
269 ///
270 /// // Send the final value
271 /// tx2.send(5).await.unwrap();
272 /// });
273 ///
274 /// assert_eq!(1, rx.next().await.unwrap());
275 /// assert_eq!(2, rx.next().await.unwrap());
276 /// assert_eq!(3, rx.next().await.unwrap());
277 /// assert_eq!(4, rx.next().await.unwrap());
278 /// assert_eq!(5, rx.next().await.unwrap());
279 ///
280 /// // The merged stream is consumed
281 /// assert!(rx.next().await.is_none());
282 /// }
283 /// ```
merge<U>(self, other: U) -> Merge<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,284 fn merge<U>(self, other: U) -> Merge<Self, U>
285 where
286 U: Stream<Item = Self::Item>,
287 Self: Sized,
288 {
289 Merge::new(self, other)
290 }
291
292 /// Filters the values produced by this stream according to the provided
293 /// predicate.
294 ///
295 /// As values of this stream are made available, the provided predicate `f`
296 /// will be run against them. If the predicate
297 /// resolves to `true`, then the stream will yield the value, but if the
298 /// predicate resolves to `false`, then the value
299 /// will be discarded and the next value will be produced.
300 ///
301 /// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the [`Iterator::filter`] method in the
303 /// standard library.
304 ///
305 /// # Examples
306 ///
307 /// ```
308 /// # #[tokio::main]
309 /// # async fn main() {
310 /// use tokio_stream::{self as stream, StreamExt};
311 ///
312 /// let stream = stream::iter(1..=8);
313 /// let mut evens = stream.filter(|x| x % 2 == 0);
314 ///
315 /// assert_eq!(Some(2), evens.next().await);
316 /// assert_eq!(Some(4), evens.next().await);
317 /// assert_eq!(Some(6), evens.next().await);
318 /// assert_eq!(Some(8), evens.next().await);
319 /// assert_eq!(None, evens.next().await);
320 /// # }
321 /// ```
filter<F>(self, f: F) -> Filter<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,322 fn filter<F>(self, f: F) -> Filter<Self, F>
323 where
324 F: FnMut(&Self::Item) -> bool,
325 Self: Sized,
326 {
327 Filter::new(self, f)
328 }
329
330 /// Filters the values produced by this stream while simultaneously mapping
331 /// them to a different type according to the provided closure.
332 ///
333 /// As values of this stream are made available, the provided function will
/// be run on them. If the closure `f` resolves to
335 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
336 /// it resolves to [`None`], then the value will be skipped.
337 ///
338 /// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the [`Iterator::filter_map`] method in the
340 /// standard library.
341 ///
342 /// # Examples
343 /// ```
344 /// # #[tokio::main]
345 /// # async fn main() {
346 /// use tokio_stream::{self as stream, StreamExt};
347 ///
348 /// let stream = stream::iter(1..=8);
349 /// let mut evens = stream.filter_map(|x| {
350 /// if x % 2 == 0 { Some(x + 1) } else { None }
351 /// });
352 ///
353 /// assert_eq!(Some(3), evens.next().await);
354 /// assert_eq!(Some(5), evens.next().await);
355 /// assert_eq!(Some(7), evens.next().await);
356 /// assert_eq!(Some(9), evens.next().await);
357 /// assert_eq!(None, evens.next().await);
358 /// # }
359 /// ```
filter_map<T, F>(self, f: F) -> FilterMap<Self, F> where F: FnMut(Self::Item) -> Option<T>, Self: Sized,360 fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
361 where
362 F: FnMut(Self::Item) -> Option<T>,
363 Self: Sized,
364 {
365 FilterMap::new(self, f)
366 }
367
368 /// Creates a stream which ends after the first `None`.
369 ///
370 /// After a stream returns `None`, behavior is undefined. Future calls to
371 /// `poll_next` may or may not return `Some(T)` again or they may panic.
372 /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
373 /// return `None` forever.
374 ///
375 /// # Examples
376 ///
377 /// ```
378 /// use tokio_stream::{Stream, StreamExt};
379 ///
380 /// use std::pin::Pin;
381 /// use std::task::{Context, Poll};
382 ///
383 /// // a stream which alternates between Some and None
384 /// struct Alternate {
385 /// state: i32,
386 /// }
387 ///
388 /// impl Stream for Alternate {
389 /// type Item = i32;
390 ///
391 /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
392 /// let val = self.state;
393 /// self.state = self.state + 1;
394 ///
395 /// // if it's even, Some(i32), else None
396 /// if val % 2 == 0 {
397 /// Poll::Ready(Some(val))
398 /// } else {
399 /// Poll::Ready(None)
400 /// }
401 /// }
402 /// }
403 ///
404 /// #[tokio::main]
405 /// async fn main() {
406 /// let mut stream = Alternate { state: 0 };
407 ///
408 /// // the stream goes back and forth
409 /// assert_eq!(stream.next().await, Some(0));
410 /// assert_eq!(stream.next().await, None);
411 /// assert_eq!(stream.next().await, Some(2));
412 /// assert_eq!(stream.next().await, None);
413 ///
414 /// // however, once it is fused
415 /// let mut stream = stream.fuse();
416 ///
417 /// assert_eq!(stream.next().await, Some(4));
418 /// assert_eq!(stream.next().await, None);
419 ///
420 /// // it will always return `None` after the first time.
421 /// assert_eq!(stream.next().await, None);
422 /// assert_eq!(stream.next().await, None);
423 /// assert_eq!(stream.next().await, None);
424 /// }
425 /// ```
fuse(self) -> Fuse<Self> where Self: Sized,426 fn fuse(self) -> Fuse<Self>
427 where
428 Self: Sized,
429 {
430 Fuse::new(self)
431 }
432
433 /// Creates a new stream of at most `n` items of the underlying stream.
434 ///
435 /// Once `n` items have been yielded from this stream then it will always
436 /// return that the stream is done.
437 ///
438 /// # Examples
439 ///
440 /// ```
441 /// # #[tokio::main]
442 /// # async fn main() {
443 /// use tokio_stream::{self as stream, StreamExt};
444 ///
445 /// let mut stream = stream::iter(1..=10).take(3);
446 ///
447 /// assert_eq!(Some(1), stream.next().await);
448 /// assert_eq!(Some(2), stream.next().await);
449 /// assert_eq!(Some(3), stream.next().await);
450 /// assert_eq!(None, stream.next().await);
451 /// # }
452 /// ```
take(self, n: usize) -> Take<Self> where Self: Sized,453 fn take(self, n: usize) -> Take<Self>
454 where
455 Self: Sized,
456 {
457 Take::new(self, n)
458 }
459
460 /// Take elements from this stream while the provided predicate
461 /// resolves to `true`.
462 ///
463 /// This function, like `Iterator::take_while`, will take elements from the
464 /// stream until the predicate `f` resolves to `false`. Once one element
465 /// returns false it will always return that the stream is done.
466 ///
467 /// # Examples
468 ///
469 /// ```
470 /// # #[tokio::main]
471 /// # async fn main() {
472 /// use tokio_stream::{self as stream, StreamExt};
473 ///
474 /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
475 ///
476 /// assert_eq!(Some(1), stream.next().await);
477 /// assert_eq!(Some(2), stream.next().await);
478 /// assert_eq!(Some(3), stream.next().await);
479 /// assert_eq!(None, stream.next().await);
480 /// # }
481 /// ```
take_while<F>(self, f: F) -> TakeWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,482 fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
483 where
484 F: FnMut(&Self::Item) -> bool,
485 Self: Sized,
486 {
487 TakeWhile::new(self, f)
488 }
489
490 /// Creates a new stream that will skip the `n` first items of the
491 /// underlying stream.
492 ///
493 /// # Examples
494 ///
495 /// ```
496 /// # #[tokio::main]
497 /// # async fn main() {
498 /// use tokio_stream::{self as stream, StreamExt};
499 ///
500 /// let mut stream = stream::iter(1..=10).skip(7);
501 ///
502 /// assert_eq!(Some(8), stream.next().await);
503 /// assert_eq!(Some(9), stream.next().await);
504 /// assert_eq!(Some(10), stream.next().await);
505 /// assert_eq!(None, stream.next().await);
506 /// # }
507 /// ```
skip(self, n: usize) -> Skip<Self> where Self: Sized,508 fn skip(self, n: usize) -> Skip<Self>
509 where
510 Self: Sized,
511 {
512 Skip::new(self, n)
513 }
514
515 /// Skip elements from the underlying stream while the provided predicate
516 /// resolves to `true`.
517 ///
/// This function, like [`Iterator::skip_while`], will ignore elements from the
519 /// stream until the predicate `f` resolves to `false`. Once one element
520 /// returns false, the rest of the elements will be yielded.
521 ///
522 /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
523 ///
524 /// # Examples
525 ///
526 /// ```
527 /// # #[tokio::main]
528 /// # async fn main() {
529 /// use tokio_stream::{self as stream, StreamExt};
530 /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
531 ///
532 /// assert_eq!(Some(3), stream.next().await);
533 /// assert_eq!(Some(4), stream.next().await);
534 /// assert_eq!(Some(1), stream.next().await);
535 /// assert_eq!(None, stream.next().await);
536 /// # }
537 /// ```
skip_while<F>(self, f: F) -> SkipWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,538 fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
539 where
540 F: FnMut(&Self::Item) -> bool,
541 Self: Sized,
542 {
543 SkipWhile::new(self, f)
544 }
545
546 /// Tests if every element of the stream matches a predicate.
547 ///
548 /// Equivalent to:
549 ///
550 /// ```ignore
551 /// async fn all<F>(&mut self, f: F) -> bool;
552 /// ```
553 ///
554 /// `all()` takes a closure that returns `true` or `false`. It applies
555 /// this closure to each element of the stream, and if they all return
/// `true`, then so does `all()`. If any of them return `false`, it
/// returns `false`.
558 ///
559 /// `all()` is short-circuiting; in other words, it will stop processing
560 /// as soon as it finds a `false`, given that no matter what else happens,
561 /// the result will also be `false`.
562 ///
563 /// An empty stream returns `true`.
564 ///
565 /// # Examples
566 ///
567 /// Basic usage:
568 ///
569 /// ```
570 /// # #[tokio::main]
571 /// # async fn main() {
572 /// use tokio_stream::{self as stream, StreamExt};
573 ///
574 /// let a = [1, 2, 3];
575 ///
576 /// assert!(stream::iter(&a).all(|&x| x > 0).await);
577 ///
578 /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
579 /// # }
580 /// ```
581 ///
582 /// Stopping at the first `false`:
583 ///
584 /// ```
585 /// # #[tokio::main]
586 /// # async fn main() {
587 /// use tokio_stream::{self as stream, StreamExt};
588 ///
589 /// let a = [1, 2, 3];
590 ///
591 /// let mut iter = stream::iter(&a);
592 ///
593 /// assert!(!iter.all(|&x| x != 2).await);
594 ///
595 /// // we can still use `iter`, as there are more elements.
596 /// assert_eq!(iter.next().await, Some(&3));
597 /// # }
598 /// ```
all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,599 fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
600 where
601 Self: Unpin,
602 F: FnMut(Self::Item) -> bool,
603 {
604 AllFuture::new(self, f)
605 }
606
607 /// Tests if any element of the stream matches a predicate.
608 ///
609 /// Equivalent to:
610 ///
611 /// ```ignore
612 /// async fn any<F>(&mut self, f: F) -> bool;
613 /// ```
614 ///
615 /// `any()` takes a closure that returns `true` or `false`. It applies
616 /// this closure to each element of the stream, and if any of them return
617 /// `true`, then so does `any()`. If they all return `false`, it
618 /// returns `false`.
619 ///
620 /// `any()` is short-circuiting; in other words, it will stop processing
621 /// as soon as it finds a `true`, given that no matter what else happens,
622 /// the result will also be `true`.
623 ///
624 /// An empty stream returns `false`.
625 ///
/// # Examples
///
/// Basic usage:
627 ///
628 /// ```
629 /// # #[tokio::main]
630 /// # async fn main() {
631 /// use tokio_stream::{self as stream, StreamExt};
632 ///
633 /// let a = [1, 2, 3];
634 ///
635 /// assert!(stream::iter(&a).any(|&x| x > 0).await);
636 ///
637 /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
638 /// # }
639 /// ```
640 ///
641 /// Stopping at the first `true`:
642 ///
643 /// ```
644 /// # #[tokio::main]
645 /// # async fn main() {
646 /// use tokio_stream::{self as stream, StreamExt};
647 ///
648 /// let a = [1, 2, 3];
649 ///
650 /// let mut iter = stream::iter(&a);
651 ///
652 /// assert!(iter.any(|&x| x != 2).await);
653 ///
654 /// // we can still use `iter`, as there are more elements.
655 /// assert_eq!(iter.next().await, Some(&2));
656 /// # }
657 /// ```
any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,658 fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
659 where
660 Self: Unpin,
661 F: FnMut(Self::Item) -> bool,
662 {
663 AnyFuture::new(self, f)
664 }
665
666 /// Combine two streams into one by first returning all values from the
667 /// first stream then all values from the second stream.
668 ///
669 /// As long as `self` still has values to emit, no values from `other` are
670 /// emitted, even if some are ready.
671 ///
672 /// # Examples
673 ///
674 /// ```
675 /// use tokio_stream::{self as stream, StreamExt};
676 ///
677 /// #[tokio::main]
678 /// async fn main() {
679 /// let one = stream::iter(vec![1, 2, 3]);
680 /// let two = stream::iter(vec![4, 5, 6]);
681 ///
682 /// let mut stream = one.chain(two);
683 ///
684 /// assert_eq!(stream.next().await, Some(1));
685 /// assert_eq!(stream.next().await, Some(2));
686 /// assert_eq!(stream.next().await, Some(3));
687 /// assert_eq!(stream.next().await, Some(4));
688 /// assert_eq!(stream.next().await, Some(5));
689 /// assert_eq!(stream.next().await, Some(6));
690 /// assert_eq!(stream.next().await, None);
691 /// }
692 /// ```
chain<U>(self, other: U) -> Chain<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,693 fn chain<U>(self, other: U) -> Chain<Self, U>
694 where
695 U: Stream<Item = Self::Item>,
696 Self: Sized,
697 {
698 Chain::new(self, other)
699 }
700
701 /// A combinator that applies a function to every element in a stream
702 /// producing a single, final value.
703 ///
704 /// Equivalent to:
705 ///
706 /// ```ignore
707 /// async fn fold<B, F>(self, init: B, f: F) -> B;
708 /// ```
709 ///
710 /// # Examples
711 /// Basic usage:
712 /// ```
713 /// # #[tokio::main]
714 /// # async fn main() {
715 /// use tokio_stream::{self as stream, *};
716 ///
717 /// let s = stream::iter(vec![1u8, 2, 3]);
718 /// let sum = s.fold(0, |acc, x| acc + x).await;
719 ///
720 /// assert_eq!(sum, 6);
721 /// # }
722 /// ```
fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> where Self: Sized, F: FnMut(B, Self::Item) -> B,723 fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
724 where
725 Self: Sized,
726 F: FnMut(B, Self::Item) -> B,
727 {
728 FoldFuture::new(self, init, f)
729 }
730
731 /// Drain stream pushing all emitted values into a collection.
732 ///
733 /// Equivalent to:
734 ///
735 /// ```ignore
736 /// async fn collect<T>(self) -> T;
737 /// ```
738 ///
739 /// `collect` streams all values, awaiting as needed. Values are pushed into
740 /// a collection. A number of different target collection types are
741 /// supported, including [`Vec`](std::vec::Vec),
742 /// [`String`](std::string::String), and [`Bytes`].
743 ///
744 /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html
745 ///
746 /// # `Result`
747 ///
748 /// `collect()` can also be used with streams of type `Result<T, E>` where
749 /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
750 /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
751 /// streaming is terminated and `collect()` returns the `Err`.
752 ///
753 /// # Notes
754 ///
755 /// `FromStream` is currently a sealed trait. Stabilization is pending
756 /// enhancements to the Rust language.
757 ///
758 /// # Examples
759 ///
760 /// Basic usage:
761 ///
762 /// ```
763 /// use tokio_stream::{self as stream, StreamExt};
764 ///
765 /// #[tokio::main]
766 /// async fn main() {
767 /// let doubled: Vec<i32> =
768 /// stream::iter(vec![1, 2, 3])
769 /// .map(|x| x * 2)
770 /// .collect()
771 /// .await;
772 ///
773 /// assert_eq!(vec![2, 4, 6], doubled);
774 /// }
775 /// ```
776 ///
777 /// Collecting a stream of `Result` values
778 ///
779 /// ```
780 /// use tokio_stream::{self as stream, StreamExt};
781 ///
782 /// #[tokio::main]
783 /// async fn main() {
784 /// // A stream containing only `Ok` values will be collected
785 /// let values: Result<Vec<i32>, &str> =
786 /// stream::iter(vec![Ok(1), Ok(2), Ok(3)])
787 /// .collect()
788 /// .await;
789 ///
790 /// assert_eq!(Ok(vec![1, 2, 3]), values);
791 ///
792 /// // A stream containing `Err` values will return the first error.
793 /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
794 ///
795 /// let values: Result<Vec<i32>, &str> =
796 /// stream::iter(results)
797 /// .collect()
798 /// .await;
799 ///
800 /// assert_eq!(Err("no"), values);
801 /// }
802 /// ```
collect<T>(self) -> Collect<Self, T> where T: FromStream<Self::Item>, Self: Sized,803 fn collect<T>(self) -> Collect<Self, T>
804 where
805 T: FromStream<Self::Item>,
806 Self: Sized,
807 {
808 Collect::new(self)
809 }
810
811 /// Applies a per-item timeout to the passed stream.
812 ///
813 /// `timeout()` takes a `Duration` that represents the maximum amount of
814 /// time each element of the stream has to complete before timing out.
815 ///
816 /// If the wrapped stream yields a value before the deadline is reached, the
817 /// value is returned. Otherwise, an error is returned. The caller may decide
818 /// to continue consuming the stream and will eventually get the next source
819 /// stream value once it becomes available.
820 ///
821 /// # Notes
822 ///
823 /// This function consumes the stream passed into it and returns a
824 /// wrapped version of it.
825 ///
826 /// Polling the returned stream will continue to poll the inner stream even
827 /// if one or more items time out.
828 ///
829 /// # Examples
830 ///
831 /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
832 ///
833 /// ```
834 /// # #[tokio::main]
835 /// # async fn main() {
836 /// use tokio_stream::{self as stream, StreamExt};
837 /// use std::time::Duration;
838 /// # let int_stream = stream::iter(1..=3);
839 ///
840 /// let int_stream = int_stream.timeout(Duration::from_secs(1));
841 /// tokio::pin!(int_stream);
842 ///
843 /// // When no items time out, we get the 3 elements in succession:
844 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
845 /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
846 /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
847 /// assert_eq!(int_stream.try_next().await, Ok(None));
848 ///
849 /// // If the second item times out, we get an error and continue polling the stream:
850 /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
851 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
852 /// assert!(int_stream.try_next().await.is_err());
853 /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
854 /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
855 /// assert_eq!(int_stream.try_next().await, Ok(None));
856 ///
857 /// // If we want to stop consuming the source stream the first time an
858 /// // element times out, we can use the `take_while` operator:
859 /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
860 /// let mut int_stream = int_stream.take_while(Result::is_ok);
861 ///
862 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
863 /// assert_eq!(int_stream.try_next().await, Ok(None));
864 /// # }
865 /// ```
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout(self, duration: Duration) -> Timeout<Self>
where
    Self: Sized,
{
    // `duration` is the per-item time limit handed to the `Timeout` adapter.
    Timeout::new(self, duration)
}
874
875 /// Slows down a stream by enforcing a delay between items.
876 ///
/// # Examples
878 ///
879 /// Create a throttled stream.
880 /// ```rust,no_run
881 /// use std::time::Duration;
882 /// use tokio_stream::StreamExt;
883 ///
884 /// # async fn dox() {
885 /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
886 /// tokio::pin!(item_stream);
887 ///
888 /// loop {
889 /// // The string will be produced at most every 2 seconds
890 /// println!("{:?}", item_stream.next().await);
891 /// }
892 /// # }
893 /// ```
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn throttle(self, duration: Duration) -> Throttle<Self>
where
    Self: Sized,
{
    // Delegates to the free function `throttle`, which takes the duration
    // first and the stream second.
    throttle(duration, self)
}
902 }
903
904 impl<St: ?Sized> StreamExt for St where St: Stream {}
905
/// Merge the size hints from two streams.
///
/// Each argument is a `(lower, upper)` pair. The lower bounds are combined
/// with a saturating add, so the result is clamped at `usize::MAX` instead of
/// overflowing. The upper bound is `Some` only when both inputs have an upper
/// bound and their sum fits in a `usize`; otherwise it is `None`.
fn merge_size_hints(
    (left_low, left_high): (usize, Option<usize>),
    (right_low, right_high): (usize, Option<usize>),
) -> (usize, Option<usize>) {
    let low = left_low.saturating_add(right_low);
    let high = match (left_high, right_high) {
        // `checked_add` yields `None` on overflow, i.e. "no known upper bound".
        (Some(h1), Some(h2)) => h1.checked_add(h2),
        _ => None,
    };
    (low, high)
}
918