1 use core::future::Future;
2 use futures_core::Stream;
3
4 mod all;
5 use all::AllFuture;
6
7 mod any;
8 use any::AnyFuture;
9
10 mod chain;
11 use chain::Chain;
12
13 pub(crate) mod collect;
14 use collect::{Collect, FromStream};
15
16 mod filter;
17 use filter::Filter;
18
19 mod filter_map;
20 use filter_map::FilterMap;
21
22 mod fold;
23 use fold::FoldFuture;
24
25 mod fuse;
26 use fuse::Fuse;
27
28 mod map;
29 use map::Map;
30
31 mod map_while;
32 use map_while::MapWhile;
33
34 mod merge;
35 use merge::Merge;
36
37 mod next;
38 use next::Next;
39
40 mod skip;
41 use skip::Skip;
42
43 mod skip_while;
44 use skip_while::SkipWhile;
45
46 mod take;
47 use take::Take;
48
49 mod take_while;
50 use take_while::TakeWhile;
51
52 mod then;
53 use then::Then;
54
55 mod try_next;
56 use try_next::TryNext;
57
58 cfg_time! {
59 pub(crate) mod timeout;
60 pub(crate) mod timeout_repeating;
61 use timeout::Timeout;
62 use timeout_repeating::TimeoutRepeating;
63 use tokio::time::{Duration, Interval};
64 mod throttle;
65 use throttle::{throttle, Throttle};
66 mod chunks_timeout;
67 use chunks_timeout::ChunksTimeout;
68 }
69
70 /// An extension trait for the [`Stream`] trait that provides a variety of
71 /// convenient combinator functions.
72 ///
73 /// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
74 /// in the [futures] crate, however both Tokio and futures provide separate
75 /// `StreamExt` utility traits, and some utilities are only available on one of
76 /// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
77 /// trait in the futures crate.
78 ///
79 /// If you need utilities from both `StreamExt` traits, you should prefer to
80 /// import one of them, and use the other through the fully qualified call
81 /// syntax. For example:
82 /// ```
83 /// // import one of the traits:
84 /// use futures::stream::StreamExt;
85 /// # #[tokio::main(flavor = "current_thread")]
86 /// # async fn main() {
87 ///
88 /// let a = tokio_stream::iter(vec![1, 3, 5]);
89 /// let b = tokio_stream::iter(vec![2, 4, 6]);
90 ///
91 /// // use the fully qualified call syntax for the other trait:
92 /// let merged = tokio_stream::StreamExt::merge(a, b);
93 ///
94 /// // use normal call notation for futures::stream::StreamExt::collect
95 /// let output: Vec<_> = merged.collect().await;
96 /// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
97 /// # }
98 /// ```
99 ///
100 /// [`Stream`]: crate::Stream
101 /// [futures]: https://docs.rs/futures
102 /// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
103 pub trait StreamExt: Stream {
104 /// Consumes and returns the next value in the stream or `None` if the
105 /// stream is finished.
106 ///
107 /// Equivalent to:
108 ///
109 /// ```ignore
110 /// async fn next(&mut self) -> Option<Self::Item>;
111 /// ```
112 ///
113 /// Note that because `next` doesn't take ownership over the stream,
114 /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
115 /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
116 /// be done by boxing the stream using [`Box::pin`] or
117 /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
118 /// crate.
119 ///
120 /// # Cancel safety
121 ///
122 /// This method is cancel safe. The returned future only
123 /// holds onto a reference to the underlying stream,
124 /// so dropping it will never lose a value.
125 ///
126 /// # Examples
127 ///
128 /// ```
129 /// # #[tokio::main]
130 /// # async fn main() {
131 /// use tokio_stream::{self as stream, StreamExt};
132 ///
133 /// let mut stream = stream::iter(1..=3);
134 ///
135 /// assert_eq!(stream.next().await, Some(1));
136 /// assert_eq!(stream.next().await, Some(2));
137 /// assert_eq!(stream.next().await, Some(3));
138 /// assert_eq!(stream.next().await, None);
139 /// # }
140 /// ```
next(&mut self) -> Next<'_, Self> where Self: Unpin,141 fn next(&mut self) -> Next<'_, Self>
142 where
143 Self: Unpin,
144 {
145 Next::new(self)
146 }
147
148 /// Consumes and returns the next item in the stream. If an error is
149 /// encountered before the next item, the error is returned instead.
150 ///
151 /// Equivalent to:
152 ///
153 /// ```ignore
154 /// async fn try_next(&mut self) -> Result<Option<T>, E>;
155 /// ```
156 ///
157 /// This is similar to the [`next`](StreamExt::next) combinator,
158 /// but returns a [`Result<Option<T>, E>`](Result) rather than
159 /// an [`Option<Result<T, E>>`](Option), making for easy use
160 /// with the [`?`](std::ops::Try) operator.
161 ///
162 /// # Cancel safety
163 ///
164 /// This method is cancel safe. The returned future only
165 /// holds onto a reference to the underlying stream,
166 /// so dropping it will never lose a value.
167 ///
168 /// # Examples
169 ///
170 /// ```
171 /// # #[tokio::main]
172 /// # async fn main() {
173 /// use tokio_stream::{self as stream, StreamExt};
174 ///
175 /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
176 ///
177 /// assert_eq!(stream.try_next().await, Ok(Some(1)));
178 /// assert_eq!(stream.try_next().await, Ok(Some(2)));
179 /// assert_eq!(stream.try_next().await, Err("nope"));
180 /// # }
181 /// ```
try_next<T, E>(&mut self) -> TryNext<'_, Self> where Self: Stream<Item = Result<T, E>> + Unpin,182 fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
183 where
184 Self: Stream<Item = Result<T, E>> + Unpin,
185 {
186 TryNext::new(self)
187 }
188
189 /// Maps this stream's items to a different type, returning a new stream of
190 /// the resulting type.
191 ///
192 /// The provided closure is executed over all elements of this stream as
193 /// they are made available. It is executed inline with calls to
194 /// [`poll_next`](Stream::poll_next).
195 ///
196 /// Note that this function consumes the stream passed into it and returns a
197 /// wrapped version of it, similar to the existing `map` methods in the
198 /// standard library.
199 ///
200 /// # Examples
201 ///
202 /// ```
203 /// # #[tokio::main]
204 /// # async fn main() {
205 /// use tokio_stream::{self as stream, StreamExt};
206 ///
207 /// let stream = stream::iter(1..=3);
208 /// let mut stream = stream.map(|x| x + 3);
209 ///
210 /// assert_eq!(stream.next().await, Some(4));
211 /// assert_eq!(stream.next().await, Some(5));
212 /// assert_eq!(stream.next().await, Some(6));
213 /// # }
214 /// ```
map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,215 fn map<T, F>(self, f: F) -> Map<Self, F>
216 where
217 F: FnMut(Self::Item) -> T,
218 Self: Sized,
219 {
220 Map::new(self, f)
221 }
222
223 /// Map this stream's items to a different type for as long as determined by
224 /// the provided closure. A stream of the target type will be returned,
225 /// which will yield elements until the closure returns `None`.
226 ///
227 /// The provided closure is executed over all elements of this stream as
228 /// they are made available, until it returns `None`. It is executed inline
229 /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
230 /// the underlying stream will not be polled again.
231 ///
232 /// Note that this function consumes the stream passed into it and returns a
233 /// wrapped version of it, similar to the [`Iterator::map_while`] method in the
234 /// standard library.
235 ///
236 /// # Examples
237 ///
238 /// ```
239 /// # #[tokio::main]
240 /// # async fn main() {
241 /// use tokio_stream::{self as stream, StreamExt};
242 ///
243 /// let stream = stream::iter(1..=10);
244 /// let mut stream = stream.map_while(|x| {
245 /// if x < 4 {
246 /// Some(x + 3)
247 /// } else {
248 /// None
249 /// }
250 /// });
251 /// assert_eq!(stream.next().await, Some(4));
252 /// assert_eq!(stream.next().await, Some(5));
253 /// assert_eq!(stream.next().await, Some(6));
254 /// assert_eq!(stream.next().await, None);
255 /// # }
256 /// ```
map_while<T, F>(self, f: F) -> MapWhile<Self, F> where F: FnMut(Self::Item) -> Option<T>, Self: Sized,257 fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
258 where
259 F: FnMut(Self::Item) -> Option<T>,
260 Self: Sized,
261 {
262 MapWhile::new(self, f)
263 }
264
265 /// Maps this stream's items asynchronously to a different type, returning a
266 /// new stream of the resulting type.
267 ///
268 /// The provided closure is executed over all elements of this stream as
269 /// they are made available, and the returned future is executed. Only one
/// future is executed at a time.
271 ///
/// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the existing `map` methods in the
/// standard library, except that the closure returns a future.
275 ///
276 /// Be aware that if the future is not `Unpin`, then neither is the `Stream`
277 /// returned by this method. To handle this, you can use `tokio::pin!` as in
278 /// the example below or put the stream in a `Box` with `Box::pin(stream)`.
279 ///
280 /// # Examples
281 ///
282 /// ```
283 /// # #[tokio::main]
284 /// # async fn main() {
285 /// use tokio_stream::{self as stream, StreamExt};
286 ///
287 /// async fn do_async_work(value: i32) -> i32 {
288 /// value + 3
289 /// }
290 ///
291 /// let stream = stream::iter(1..=3);
292 /// let stream = stream.then(do_async_work);
293 ///
294 /// tokio::pin!(stream);
295 ///
296 /// assert_eq!(stream.next().await, Some(4));
297 /// assert_eq!(stream.next().await, Some(5));
298 /// assert_eq!(stream.next().await, Some(6));
299 /// # }
300 /// ```
then<F, Fut>(self, f: F) -> Then<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized,301 fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
302 where
303 F: FnMut(Self::Item) -> Fut,
304 Fut: Future,
305 Self: Sized,
306 {
307 Then::new(self, f)
308 }
309
310 /// Combine two streams into one by interleaving the output of both as it
311 /// is produced.
312 ///
313 /// Values are produced from the merged stream in the order they arrive from
314 /// the two source streams. If both source streams provide values
315 /// simultaneously, the merge stream alternates between them. This provides
316 /// some level of fairness. You should not chain calls to `merge`, as this
317 /// will break the fairness of the merging.
318 ///
319 /// The merged stream completes once **both** source streams complete. When
320 /// one source stream completes before the other, the merge stream
321 /// exclusively polls the remaining stream.
322 ///
323 /// For merging multiple streams, consider using [`StreamMap`] instead.
324 ///
325 /// [`StreamMap`]: crate::StreamMap
326 ///
327 /// # Examples
328 ///
329 /// ```
330 /// use tokio_stream::{StreamExt, Stream};
331 /// use tokio::sync::mpsc;
332 /// use tokio::time;
333 ///
334 /// use std::time::Duration;
335 /// use std::pin::Pin;
336 ///
337 /// # /*
338 /// #[tokio::main]
339 /// # */
340 /// # #[tokio::main(flavor = "current_thread")]
341 /// async fn main() {
342 /// # time::pause();
343 /// let (tx1, mut rx1) = mpsc::channel::<usize>(10);
344 /// let (tx2, mut rx2) = mpsc::channel::<usize>(10);
345 ///
346 /// // Convert the channels to a `Stream`.
347 /// let rx1 = Box::pin(async_stream::stream! {
348 /// while let Some(item) = rx1.recv().await {
349 /// yield item;
350 /// }
351 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
352 ///
353 /// let rx2 = Box::pin(async_stream::stream! {
354 /// while let Some(item) = rx2.recv().await {
355 /// yield item;
356 /// }
357 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
358 ///
359 /// let mut rx = rx1.merge(rx2);
360 ///
361 /// tokio::spawn(async move {
362 /// // Send some values immediately
363 /// tx1.send(1).await.unwrap();
364 /// tx1.send(2).await.unwrap();
365 ///
366 /// // Let the other task send values
367 /// time::sleep(Duration::from_millis(20)).await;
368 ///
369 /// tx1.send(4).await.unwrap();
370 /// });
371 ///
372 /// tokio::spawn(async move {
373 /// // Wait for the first task to send values
374 /// time::sleep(Duration::from_millis(5)).await;
375 ///
376 /// tx2.send(3).await.unwrap();
377 ///
378 /// time::sleep(Duration::from_millis(25)).await;
379 ///
380 /// // Send the final value
381 /// tx2.send(5).await.unwrap();
382 /// });
383 ///
384 /// assert_eq!(1, rx.next().await.unwrap());
385 /// assert_eq!(2, rx.next().await.unwrap());
386 /// assert_eq!(3, rx.next().await.unwrap());
387 /// assert_eq!(4, rx.next().await.unwrap());
388 /// assert_eq!(5, rx.next().await.unwrap());
389 ///
390 /// // The merged stream is consumed
391 /// assert!(rx.next().await.is_none());
392 /// }
393 /// ```
merge<U>(self, other: U) -> Merge<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,394 fn merge<U>(self, other: U) -> Merge<Self, U>
395 where
396 U: Stream<Item = Self::Item>,
397 Self: Sized,
398 {
399 Merge::new(self, other)
400 }
401
402 /// Filters the values produced by this stream according to the provided
403 /// predicate.
404 ///
405 /// As values of this stream are made available, the provided predicate `f`
406 /// will be run against them. If the predicate
407 /// resolves to `true`, then the stream will yield the value, but if the
408 /// predicate resolves to `false`, then the value
409 /// will be discarded and the next value will be produced.
410 ///
411 /// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the [`Iterator::filter`] method in the
413 /// standard library.
414 ///
415 /// # Examples
416 ///
417 /// ```
418 /// # #[tokio::main]
419 /// # async fn main() {
420 /// use tokio_stream::{self as stream, StreamExt};
421 ///
422 /// let stream = stream::iter(1..=8);
423 /// let mut evens = stream.filter(|x| x % 2 == 0);
424 ///
425 /// assert_eq!(Some(2), evens.next().await);
426 /// assert_eq!(Some(4), evens.next().await);
427 /// assert_eq!(Some(6), evens.next().await);
428 /// assert_eq!(Some(8), evens.next().await);
429 /// assert_eq!(None, evens.next().await);
430 /// # }
431 /// ```
filter<F>(self, f: F) -> Filter<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,432 fn filter<F>(self, f: F) -> Filter<Self, F>
433 where
434 F: FnMut(&Self::Item) -> bool,
435 Self: Sized,
436 {
437 Filter::new(self, f)
438 }
439
440 /// Filters the values produced by this stream while simultaneously mapping
441 /// them to a different type according to the provided closure.
442 ///
/// As values of this stream are made available, the provided closure `f`
/// will be run on them. If `f` returns [`Some(item)`](Some), then the stream
/// will yield the value `item`, but if it returns [`None`], then the value
/// will be skipped.
447 ///
448 /// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the [`Iterator::filter_map`] method in the
450 /// standard library.
451 ///
/// # Examples
///
453 /// ```
454 /// # #[tokio::main]
455 /// # async fn main() {
456 /// use tokio_stream::{self as stream, StreamExt};
457 ///
458 /// let stream = stream::iter(1..=8);
459 /// let mut evens = stream.filter_map(|x| {
460 /// if x % 2 == 0 { Some(x + 1) } else { None }
461 /// });
462 ///
463 /// assert_eq!(Some(3), evens.next().await);
464 /// assert_eq!(Some(5), evens.next().await);
465 /// assert_eq!(Some(7), evens.next().await);
466 /// assert_eq!(Some(9), evens.next().await);
467 /// assert_eq!(None, evens.next().await);
468 /// # }
469 /// ```
filter_map<T, F>(self, f: F) -> FilterMap<Self, F> where F: FnMut(Self::Item) -> Option<T>, Self: Sized,470 fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
471 where
472 F: FnMut(Self::Item) -> Option<T>,
473 Self: Sized,
474 {
475 FilterMap::new(self, f)
476 }
477
478 /// Creates a stream which ends after the first `None`.
479 ///
/// After a stream returns `None`, behavior is unspecified. Future calls to
481 /// `poll_next` may or may not return `Some(T)` again or they may panic.
482 /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
483 /// return `None` forever.
484 ///
485 /// # Examples
486 ///
487 /// ```
488 /// use tokio_stream::{Stream, StreamExt};
489 ///
490 /// use std::pin::Pin;
491 /// use std::task::{Context, Poll};
492 ///
493 /// // a stream which alternates between Some and None
494 /// struct Alternate {
495 /// state: i32,
496 /// }
497 ///
498 /// impl Stream for Alternate {
499 /// type Item = i32;
500 ///
501 /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
502 /// let val = self.state;
503 /// self.state = self.state + 1;
504 ///
505 /// // if it's even, Some(i32), else None
506 /// if val % 2 == 0 {
507 /// Poll::Ready(Some(val))
508 /// } else {
509 /// Poll::Ready(None)
510 /// }
511 /// }
512 /// }
513 ///
514 /// #[tokio::main]
515 /// async fn main() {
516 /// let mut stream = Alternate { state: 0 };
517 ///
518 /// // the stream goes back and forth
519 /// assert_eq!(stream.next().await, Some(0));
520 /// assert_eq!(stream.next().await, None);
521 /// assert_eq!(stream.next().await, Some(2));
522 /// assert_eq!(stream.next().await, None);
523 ///
524 /// // however, once it is fused
525 /// let mut stream = stream.fuse();
526 ///
527 /// assert_eq!(stream.next().await, Some(4));
528 /// assert_eq!(stream.next().await, None);
529 ///
530 /// // it will always return `None` after the first time.
531 /// assert_eq!(stream.next().await, None);
532 /// assert_eq!(stream.next().await, None);
533 /// assert_eq!(stream.next().await, None);
534 /// }
535 /// ```
fuse(self) -> Fuse<Self> where Self: Sized,536 fn fuse(self) -> Fuse<Self>
537 where
538 Self: Sized,
539 {
540 Fuse::new(self)
541 }
542
543 /// Creates a new stream of at most `n` items of the underlying stream.
544 ///
545 /// Once `n` items have been yielded from this stream then it will always
546 /// return that the stream is done.
547 ///
548 /// # Examples
549 ///
550 /// ```
551 /// # #[tokio::main]
552 /// # async fn main() {
553 /// use tokio_stream::{self as stream, StreamExt};
554 ///
555 /// let mut stream = stream::iter(1..=10).take(3);
556 ///
557 /// assert_eq!(Some(1), stream.next().await);
558 /// assert_eq!(Some(2), stream.next().await);
559 /// assert_eq!(Some(3), stream.next().await);
560 /// assert_eq!(None, stream.next().await);
561 /// # }
562 /// ```
take(self, n: usize) -> Take<Self> where Self: Sized,563 fn take(self, n: usize) -> Take<Self>
564 where
565 Self: Sized,
566 {
567 Take::new(self, n)
568 }
569
570 /// Take elements from this stream while the provided predicate
571 /// resolves to `true`.
572 ///
/// This function, like `Iterator::take_while`, will take elements from the
/// stream until the predicate `f` resolves to `false`. Once the predicate
/// returns `false` for one element, the stream will always report that it
/// is done.
576 ///
577 /// # Examples
578 ///
579 /// ```
580 /// # #[tokio::main]
581 /// # async fn main() {
582 /// use tokio_stream::{self as stream, StreamExt};
583 ///
584 /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
585 ///
586 /// assert_eq!(Some(1), stream.next().await);
587 /// assert_eq!(Some(2), stream.next().await);
588 /// assert_eq!(Some(3), stream.next().await);
589 /// assert_eq!(None, stream.next().await);
590 /// # }
591 /// ```
take_while<F>(self, f: F) -> TakeWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,592 fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
593 where
594 F: FnMut(&Self::Item) -> bool,
595 Self: Sized,
596 {
597 TakeWhile::new(self, f)
598 }
599
/// Creates a new stream that will skip the first `n` items of the
/// underlying stream.
602 ///
603 /// # Examples
604 ///
605 /// ```
606 /// # #[tokio::main]
607 /// # async fn main() {
608 /// use tokio_stream::{self as stream, StreamExt};
609 ///
610 /// let mut stream = stream::iter(1..=10).skip(7);
611 ///
612 /// assert_eq!(Some(8), stream.next().await);
613 /// assert_eq!(Some(9), stream.next().await);
614 /// assert_eq!(Some(10), stream.next().await);
615 /// assert_eq!(None, stream.next().await);
616 /// # }
617 /// ```
skip(self, n: usize) -> Skip<Self> where Self: Sized,618 fn skip(self, n: usize) -> Skip<Self>
619 where
620 Self: Sized,
621 {
622 Skip::new(self, n)
623 }
624
625 /// Skip elements from the underlying stream while the provided predicate
626 /// resolves to `true`.
627 ///
/// This function, like [`Iterator::skip_while`], will ignore elements from the
/// stream until the predicate `f` resolves to `false`. Once the predicate
/// returns `false` for one element, the rest of the elements will be yielded.
631 ///
632 /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
633 ///
634 /// # Examples
635 ///
636 /// ```
637 /// # #[tokio::main]
638 /// # async fn main() {
639 /// use tokio_stream::{self as stream, StreamExt};
640 /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
641 ///
642 /// assert_eq!(Some(3), stream.next().await);
643 /// assert_eq!(Some(4), stream.next().await);
644 /// assert_eq!(Some(1), stream.next().await);
645 /// assert_eq!(None, stream.next().await);
646 /// # }
647 /// ```
skip_while<F>(self, f: F) -> SkipWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,648 fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
649 where
650 F: FnMut(&Self::Item) -> bool,
651 Self: Sized,
652 {
653 SkipWhile::new(self, f)
654 }
655
656 /// Tests if every element of the stream matches a predicate.
657 ///
658 /// Equivalent to:
659 ///
660 /// ```ignore
661 /// async fn all<F>(&mut self, f: F) -> bool;
662 /// ```
663 ///
664 /// `all()` takes a closure that returns `true` or `false`. It applies
665 /// this closure to each element of the stream, and if they all return
666 /// `true`, then so does `all`. If any of them return `false`, it
/// returns `false`.
668 ///
669 /// `all()` is short-circuiting; in other words, it will stop processing
670 /// as soon as it finds a `false`, given that no matter what else happens,
671 /// the result will also be `false`.
672 ///
673 /// An empty stream returns `true`.
674 ///
675 /// # Examples
676 ///
677 /// Basic usage:
678 ///
679 /// ```
680 /// # #[tokio::main]
681 /// # async fn main() {
682 /// use tokio_stream::{self as stream, StreamExt};
683 ///
684 /// let a = [1, 2, 3];
685 ///
686 /// assert!(stream::iter(&a).all(|&x| x > 0).await);
687 ///
688 /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
689 /// # }
690 /// ```
691 ///
692 /// Stopping at the first `false`:
693 ///
694 /// ```
695 /// # #[tokio::main]
696 /// # async fn main() {
697 /// use tokio_stream::{self as stream, StreamExt};
698 ///
699 /// let a = [1, 2, 3];
700 ///
701 /// let mut iter = stream::iter(&a);
702 ///
703 /// assert!(!iter.all(|&x| x != 2).await);
704 ///
705 /// // we can still use `iter`, as there are more elements.
706 /// assert_eq!(iter.next().await, Some(&3));
707 /// # }
708 /// ```
all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,709 fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
710 where
711 Self: Unpin,
712 F: FnMut(Self::Item) -> bool,
713 {
714 AllFuture::new(self, f)
715 }
716
717 /// Tests if any element of the stream matches a predicate.
718 ///
719 /// Equivalent to:
720 ///
721 /// ```ignore
722 /// async fn any<F>(&mut self, f: F) -> bool;
723 /// ```
724 ///
725 /// `any()` takes a closure that returns `true` or `false`. It applies
726 /// this closure to each element of the stream, and if any of them return
727 /// `true`, then so does `any()`. If they all return `false`, it
728 /// returns `false`.
729 ///
730 /// `any()` is short-circuiting; in other words, it will stop processing
731 /// as soon as it finds a `true`, given that no matter what else happens,
732 /// the result will also be `true`.
733 ///
734 /// An empty stream returns `false`.
735 ///
/// # Examples
///
/// Basic usage:
737 ///
738 /// ```
739 /// # #[tokio::main]
740 /// # async fn main() {
741 /// use tokio_stream::{self as stream, StreamExt};
742 ///
743 /// let a = [1, 2, 3];
744 ///
745 /// assert!(stream::iter(&a).any(|&x| x > 0).await);
746 ///
747 /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
748 /// # }
749 /// ```
750 ///
751 /// Stopping at the first `true`:
752 ///
753 /// ```
754 /// # #[tokio::main]
755 /// # async fn main() {
756 /// use tokio_stream::{self as stream, StreamExt};
757 ///
758 /// let a = [1, 2, 3];
759 ///
760 /// let mut iter = stream::iter(&a);
761 ///
762 /// assert!(iter.any(|&x| x != 2).await);
763 ///
764 /// // we can still use `iter`, as there are more elements.
765 /// assert_eq!(iter.next().await, Some(&2));
766 /// # }
767 /// ```
any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,768 fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
769 where
770 Self: Unpin,
771 F: FnMut(Self::Item) -> bool,
772 {
773 AnyFuture::new(self, f)
774 }
775
776 /// Combine two streams into one by first returning all values from the
777 /// first stream then all values from the second stream.
778 ///
779 /// As long as `self` still has values to emit, no values from `other` are
780 /// emitted, even if some are ready.
781 ///
782 /// # Examples
783 ///
784 /// ```
785 /// use tokio_stream::{self as stream, StreamExt};
786 ///
787 /// #[tokio::main]
788 /// async fn main() {
789 /// let one = stream::iter(vec![1, 2, 3]);
790 /// let two = stream::iter(vec![4, 5, 6]);
791 ///
792 /// let mut stream = one.chain(two);
793 ///
794 /// assert_eq!(stream.next().await, Some(1));
795 /// assert_eq!(stream.next().await, Some(2));
796 /// assert_eq!(stream.next().await, Some(3));
797 /// assert_eq!(stream.next().await, Some(4));
798 /// assert_eq!(stream.next().await, Some(5));
799 /// assert_eq!(stream.next().await, Some(6));
800 /// assert_eq!(stream.next().await, None);
801 /// }
802 /// ```
chain<U>(self, other: U) -> Chain<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,803 fn chain<U>(self, other: U) -> Chain<Self, U>
804 where
805 U: Stream<Item = Self::Item>,
806 Self: Sized,
807 {
808 Chain::new(self, other)
809 }
810
811 /// A combinator that applies a function to every element in a stream
812 /// producing a single, final value.
813 ///
814 /// Equivalent to:
815 ///
816 /// ```ignore
817 /// async fn fold<B, F>(self, init: B, f: F) -> B;
818 /// ```
819 ///
/// # Examples
///
/// Basic usage:
///
/// ```
823 /// # #[tokio::main]
824 /// # async fn main() {
825 /// use tokio_stream::{self as stream, *};
826 ///
827 /// let s = stream::iter(vec![1u8, 2, 3]);
828 /// let sum = s.fold(0, |acc, x| acc + x).await;
829 ///
830 /// assert_eq!(sum, 6);
831 /// # }
832 /// ```
fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> where Self: Sized, F: FnMut(B, Self::Item) -> B,833 fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
834 where
835 Self: Sized,
836 F: FnMut(B, Self::Item) -> B,
837 {
838 FoldFuture::new(self, init, f)
839 }
840
841 /// Drain stream pushing all emitted values into a collection.
842 ///
843 /// Equivalent to:
844 ///
845 /// ```ignore
846 /// async fn collect<T>(self) -> T;
847 /// ```
848 ///
849 /// `collect` streams all values, awaiting as needed. Values are pushed into
850 /// a collection. A number of different target collection types are
851 /// supported, including [`Vec`](std::vec::Vec),
852 /// [`String`](std::string::String), and [`Bytes`].
853 ///
854 /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html
855 ///
856 /// # `Result`
857 ///
858 /// `collect()` can also be used with streams of type `Result<T, E>` where
859 /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
860 /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
861 /// streaming is terminated and `collect()` returns the `Err`.
862 ///
863 /// # Notes
864 ///
865 /// `FromStream` is currently a sealed trait. Stabilization is pending
866 /// enhancements to the Rust language.
867 ///
868 /// # Examples
869 ///
870 /// Basic usage:
871 ///
872 /// ```
873 /// use tokio_stream::{self as stream, StreamExt};
874 ///
875 /// #[tokio::main]
876 /// async fn main() {
877 /// let doubled: Vec<i32> =
878 /// stream::iter(vec![1, 2, 3])
879 /// .map(|x| x * 2)
880 /// .collect()
881 /// .await;
882 ///
883 /// assert_eq!(vec![2, 4, 6], doubled);
884 /// }
885 /// ```
886 ///
887 /// Collecting a stream of `Result` values
888 ///
889 /// ```
890 /// use tokio_stream::{self as stream, StreamExt};
891 ///
892 /// #[tokio::main]
893 /// async fn main() {
894 /// // A stream containing only `Ok` values will be collected
895 /// let values: Result<Vec<i32>, &str> =
896 /// stream::iter(vec![Ok(1), Ok(2), Ok(3)])
897 /// .collect()
898 /// .await;
899 ///
900 /// assert_eq!(Ok(vec![1, 2, 3]), values);
901 ///
902 /// // A stream containing `Err` values will return the first error.
903 /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
904 ///
905 /// let values: Result<Vec<i32>, &str> =
906 /// stream::iter(results)
907 /// .collect()
908 /// .await;
909 ///
910 /// assert_eq!(Err("no"), values);
911 /// }
912 /// ```
collect<T>(self) -> Collect<Self, T> where T: FromStream<Self::Item>, Self: Sized,913 fn collect<T>(self) -> Collect<Self, T>
914 where
915 T: FromStream<Self::Item>,
916 Self: Sized,
917 {
918 Collect::new(self)
919 }
920
921 /// Applies a per-item timeout to the passed stream.
922 ///
923 /// `timeout()` takes a `Duration` that represents the maximum amount of
924 /// time each element of the stream has to complete before timing out.
925 ///
926 /// If the wrapped stream yields a value before the deadline is reached, the
927 /// value is returned. Otherwise, an error is returned. The caller may decide
928 /// to continue consuming the stream and will eventually get the next source
929 /// stream value once it becomes available. See
930 /// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative
931 /// where the timeouts will repeat.
932 ///
933 /// # Notes
934 ///
935 /// This function consumes the stream passed into it and returns a
936 /// wrapped version of it.
937 ///
938 /// Polling the returned stream will continue to poll the inner stream even
939 /// if one or more items time out.
940 ///
941 /// # Examples
942 ///
943 /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
944 ///
945 /// ```
946 /// # #[tokio::main]
947 /// # async fn main() {
948 /// use tokio_stream::{self as stream, StreamExt};
949 /// use std::time::Duration;
950 /// # let int_stream = stream::iter(1..=3);
951 ///
952 /// let int_stream = int_stream.timeout(Duration::from_secs(1));
953 /// tokio::pin!(int_stream);
954 ///
955 /// // When no items time out, we get the 3 elements in succession:
956 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
957 /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
958 /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
959 /// assert_eq!(int_stream.try_next().await, Ok(None));
960 ///
961 /// // If the second item times out, we get an error and continue polling the stream:
962 /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
963 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
964 /// assert!(int_stream.try_next().await.is_err());
965 /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
966 /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
967 /// assert_eq!(int_stream.try_next().await, Ok(None));
968 ///
969 /// // If we want to stop consuming the source stream the first time an
970 /// // element times out, we can use the `take_while` operator:
971 /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
972 /// let mut int_stream = int_stream.take_while(Result::is_ok);
973 ///
974 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
975 /// assert_eq!(int_stream.try_next().await, Ok(None));
976 /// # }
977 /// ```
978 ///
979 /// Once a timeout error is received, no further events will be received
980 /// unless the wrapped stream yields a value (timeouts do not repeat).
981 ///
982 /// ```
983 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
984 /// # async fn main() {
985 /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
986 /// use std::time::Duration;
987 /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100)));
988 /// let timeout_stream = interval_stream.timeout(Duration::from_millis(10));
989 /// tokio::pin!(timeout_stream);
990 ///
991 /// // Only one timeout will be received between values in the source stream.
992 /// assert!(timeout_stream.try_next().await.is_ok());
993 /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
994 /// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts");
995 /// # }
996 /// ```
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout(self, duration: Duration) -> Timeout<Self>
where
    Self: Sized,
{
    // Consumes `self`: the `Timeout` adapter takes ownership of the stream
    // together with the per-item duration.
    Timeout::new(self, duration)
}
1005
/// Applies a per-item timeout to the passed stream.
///
/// `timeout_repeating()` takes an [`Interval`](tokio::time::Interval) that
/// controls the time each element of the stream has to complete before
/// timing out.
///
/// If the wrapped stream yields a value before the deadline is reached, the
/// value is returned. Otherwise, an error is returned. The caller may decide
/// to continue consuming the stream and will eventually get the next source
/// stream value once it becomes available. Unlike `timeout()`, if no value
/// becomes available before the deadline is reached, additional errors are
/// returned at the specified interval. See [`timeout`](StreamExt::timeout)
/// for an alternative where the timeouts do not repeat.
///
/// # Notes
///
/// This function consumes the stream passed into it and returns a
/// wrapped version of it.
///
/// Polling the returned stream will continue to poll the inner stream even
/// if one or more items time out.
///
/// # Examples
///
/// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// use std::time::Duration;
/// # let int_stream = stream::iter(1..=3);
///
/// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1)));
/// tokio::pin!(int_stream);
///
/// // When no items time out, we get the 3 elements in succession:
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // If the second item times out, we get an error and continue polling the stream:
/// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert!(int_stream.try_next().await.is_err());
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // If we want to stop consuming the source stream the first time an
/// // element times out, we can use the `take_while` operator:
/// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// let mut int_stream = int_stream.take_while(Result::is_ok);
///
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
/// # }
/// ```
///
/// Timeout errors will be continuously produced at the specified interval
/// until the wrapped stream yields a value.
///
/// ```
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
/// # async fn main() {
/// use tokio_stream::{StreamExt, wrappers::IntervalStream};
/// use std::time::Duration;
/// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
/// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9)));
/// tokio::pin!(timeout_stream);
///
/// // Multiple timeouts will be received between values in the source stream.
/// assert!(timeout_stream.try_next().await.is_ok());
/// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
/// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout");
/// // Will eventually receive another value from the source stream...
/// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout");
/// # }
/// ```
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self>
where
    Self: Sized,
{
    // Consumes `self`: the adapter takes ownership of both the stream and
    // the caller-supplied interval.
    TimeoutRepeating::new(self, interval)
}
1094
/// Slows down a stream by enforcing a delay between items.
///
/// The underlying timer behind this utility has a granularity of one millisecond.
///
/// # Notes
///
/// This function consumes the stream passed into it and returns a
/// wrapped version of it.
///
/// # Example
///
/// Create a throttled stream.
/// ```rust,no_run
/// use std::time::Duration;
/// use tokio_stream::StreamExt;
///
/// # async fn dox() {
/// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
/// tokio::pin!(item_stream);
///
/// loop {
///     // The string will be produced at most every 2 seconds
///     println!("{:?}", item_stream.next().await);
/// }
/// # }
/// ```
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn throttle(self, duration: Duration) -> Throttle<Self>
where
    Self: Sized,
{
    // Delegates to the free `throttle` function, which takes the duration
    // first and the stream second.
    throttle(duration, self)
}
1124
/// Groups the items of this stream into batches bounded by both a maximum
/// size and a maximum duration.
///
/// The next batch is yielded as soon as one of the following happens:
/// 1. The inner stream has produced at least `max_size` items since the last batch.
/// 2. More time than `duration` has passed since the first item of the batch.
/// 3. The inner stream has ended.
///
/// A returned vector is never empty and never holds more than `max_size`
/// items. No batch is emitted while nothing has been received from upstream.
///
/// # Panics
///
/// This function panics if `max_size` is zero.
///
/// # Example
///
/// ```rust
/// use std::time::Duration;
/// use tokio::time;
/// use tokio_stream::{self as stream, StreamExt};
/// use futures::FutureExt;
///
/// #[tokio::main]
/// # async fn _unused() {}
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
/// async fn main() {
///     let iter = vec![1, 2, 3, 4].into_iter();
///     let stream0 = stream::iter(iter);
///
///     let iter = vec![5].into_iter();
///     let stream1 = stream::iter(iter)
///         .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
///
///     let chunk_stream = stream0
///         .chain(stream1)
///         .chunks_timeout(3, Duration::from_secs(2));
///     tokio::pin!(chunk_stream);
///
///     // a full batch was received
///     assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
///     // deadline was reached before max_size was reached
///     assert_eq!(chunk_stream.next().await, Some(vec![4]));
///     // last element in the stream
///     assert_eq!(chunk_stream.next().await, Some(vec![5]));
/// }
/// ```
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
#[track_caller]
fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
where
    Self: Sized,
{
    // Reject a zero batch size up front; `#[track_caller]` attributes the
    // panic to the caller's location.
    assert!(max_size != 0, "`max_size` must be non-zero.");
    ChunksTimeout::new(self, max_size, duration)
}
1181 }
1182
1183 impl<St: ?Sized> StreamExt for St where St: Stream {}
1184
/// Merges the size hints of two streams into a single hint.
///
/// The lower bounds are added together, saturating at `usize::MAX`. The upper
/// bound is the sum of both upper bounds, or `None` when either upper bound is
/// `None` or the sum overflows `usize`.
fn merge_size_hints(
    (left_low, left_high): (usize, Option<usize>),
    (right_low, right_high): (usize, Option<usize>),
) -> (usize, Option<usize>) {
    // Both upper bounds must be known, and their sum must fit in `usize`.
    let upper = left_high.and_then(|lhs| right_high.and_then(|rhs| lhs.checked_add(rhs)));
    (left_low.saturating_add(right_low), upper)
}
1197