1 use core::future::Future;
2 use futures_core::Stream;
3
4 mod all;
5 use all::AllFuture;
6
7 mod any;
8 use any::AnyFuture;
9
10 mod chain;
11 use chain::Chain;
12
13 pub(crate) mod collect;
14 use collect::{Collect, FromStream};
15
16 mod filter;
17 use filter::Filter;
18
19 mod filter_map;
20 use filter_map::FilterMap;
21
22 mod fold;
23 use fold::FoldFuture;
24
25 mod fuse;
26 use fuse::Fuse;
27
28 mod map;
29 use map::Map;
30
31 mod map_while;
32 use map_while::MapWhile;
33
34 mod merge;
35 use merge::Merge;
36
37 mod next;
38 use next::Next;
39
40 mod skip;
41 use skip::Skip;
42
43 mod skip_while;
44 use skip_while::SkipWhile;
45
46 mod take;
47 use take::Take;
48
49 mod take_while;
50 use take_while::TakeWhile;
51
52 mod then;
53 use then::Then;
54
55 mod try_next;
56 use try_next::TryNext;
57
58 cfg_time! {
59 pub(crate) mod timeout;
60 use timeout::Timeout;
61 use tokio::time::Duration;
62 mod throttle;
63 use throttle::{throttle, Throttle};
64 mod chunks_timeout;
65 use chunks_timeout::ChunksTimeout;
66 }
67
68 /// An extension trait for the [`Stream`] trait that provides a variety of
69 /// convenient combinator functions.
70 ///
71 /// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
72 /// in the [futures] crate, however both Tokio and futures provide separate
73 /// `StreamExt` utility traits, and some utilities are only available on one of
74 /// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
75 /// trait in the futures crate.
76 ///
77 /// If you need utilities from both `StreamExt` traits, you should prefer to
78 /// import one of them, and use the other through the fully qualified call
79 /// syntax. For example:
80 /// ```
81 /// // import one of the traits:
82 /// use futures::stream::StreamExt;
83 /// # #[tokio::main(flavor = "current_thread")]
84 /// # async fn main() {
85 ///
86 /// let a = tokio_stream::iter(vec![1, 3, 5]);
87 /// let b = tokio_stream::iter(vec![2, 4, 6]);
88 ///
89 /// // use the fully qualified call syntax for the other trait:
90 /// let merged = tokio_stream::StreamExt::merge(a, b);
91 ///
92 /// // use normal call notation for futures::stream::StreamExt::collect
93 /// let output: Vec<_> = merged.collect().await;
94 /// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
95 /// # }
96 /// ```
97 ///
98 /// [`Stream`]: crate::Stream
99 /// [futures]: https://docs.rs/futures
100 /// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
101 pub trait StreamExt: Stream {
102 /// Consumes and returns the next value in the stream or `None` if the
103 /// stream is finished.
104 ///
105 /// Equivalent to:
106 ///
107 /// ```ignore
108 /// async fn next(&mut self) -> Option<Self::Item>;
109 /// ```
110 ///
111 /// Note that because `next` doesn't take ownership over the stream,
112 /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
113 /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
114 /// be done by boxing the stream using [`Box::pin`] or
115 /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
116 /// crate.
117 ///
118 /// # Cancel safety
119 ///
120 /// This method is cancel safe. The returned future only
121 /// holds onto a reference to the underlying stream,
122 /// so dropping it will never lose a value.
123 ///
124 /// # Examples
125 ///
126 /// ```
127 /// # #[tokio::main]
128 /// # async fn main() {
129 /// use tokio_stream::{self as stream, StreamExt};
130 ///
131 /// let mut stream = stream::iter(1..=3);
132 ///
133 /// assert_eq!(stream.next().await, Some(1));
134 /// assert_eq!(stream.next().await, Some(2));
135 /// assert_eq!(stream.next().await, Some(3));
136 /// assert_eq!(stream.next().await, None);
137 /// # }
138 /// ```
next(&mut self) -> Next<'_, Self> where Self: Unpin,139 fn next(&mut self) -> Next<'_, Self>
140 where
141 Self: Unpin,
142 {
143 Next::new(self)
144 }
145
146 /// Consumes and returns the next item in the stream. If an error is
147 /// encountered before the next item, the error is returned instead.
148 ///
149 /// Equivalent to:
150 ///
151 /// ```ignore
152 /// async fn try_next(&mut self) -> Result<Option<T>, E>;
153 /// ```
154 ///
155 /// This is similar to the [`next`](StreamExt::next) combinator,
156 /// but returns a [`Result<Option<T>, E>`](Result) rather than
157 /// an [`Option<Result<T, E>>`](Option), making for easy use
158 /// with the [`?`](std::ops::Try) operator.
159 ///
160 /// # Cancel safety
161 ///
162 /// This method is cancel safe. The returned future only
163 /// holds onto a reference to the underlying stream,
164 /// so dropping it will never lose a value.
165 ///
166 /// # Examples
167 ///
168 /// ```
169 /// # #[tokio::main]
170 /// # async fn main() {
171 /// use tokio_stream::{self as stream, StreamExt};
172 ///
173 /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
174 ///
175 /// assert_eq!(stream.try_next().await, Ok(Some(1)));
176 /// assert_eq!(stream.try_next().await, Ok(Some(2)));
177 /// assert_eq!(stream.try_next().await, Err("nope"));
178 /// # }
179 /// ```
try_next<T, E>(&mut self) -> TryNext<'_, Self> where Self: Stream<Item = Result<T, E>> + Unpin,180 fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
181 where
182 Self: Stream<Item = Result<T, E>> + Unpin,
183 {
184 TryNext::new(self)
185 }
186
187 /// Maps this stream's items to a different type, returning a new stream of
188 /// the resulting type.
189 ///
190 /// The provided closure is executed over all elements of this stream as
191 /// they are made available. It is executed inline with calls to
192 /// [`poll_next`](Stream::poll_next).
193 ///
194 /// Note that this function consumes the stream passed into it and returns a
195 /// wrapped version of it, similar to the existing `map` methods in the
196 /// standard library.
197 ///
198 /// # Examples
199 ///
200 /// ```
201 /// # #[tokio::main]
202 /// # async fn main() {
203 /// use tokio_stream::{self as stream, StreamExt};
204 ///
205 /// let stream = stream::iter(1..=3);
206 /// let mut stream = stream.map(|x| x + 3);
207 ///
208 /// assert_eq!(stream.next().await, Some(4));
209 /// assert_eq!(stream.next().await, Some(5));
210 /// assert_eq!(stream.next().await, Some(6));
211 /// # }
212 /// ```
map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,213 fn map<T, F>(self, f: F) -> Map<Self, F>
214 where
215 F: FnMut(Self::Item) -> T,
216 Self: Sized,
217 {
218 Map::new(self, f)
219 }
220
221 /// Map this stream's items to a different type for as long as determined by
222 /// the provided closure. A stream of the target type will be returned,
223 /// which will yield elements until the closure returns `None`.
224 ///
225 /// The provided closure is executed over all elements of this stream as
226 /// they are made available, until it returns `None`. It is executed inline
227 /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
228 /// the underlying stream will not be polled again.
229 ///
230 /// Note that this function consumes the stream passed into it and returns a
231 /// wrapped version of it, similar to the [`Iterator::map_while`] method in the
232 /// standard library.
233 ///
234 /// # Examples
235 ///
236 /// ```
237 /// # #[tokio::main]
238 /// # async fn main() {
239 /// use tokio_stream::{self as stream, StreamExt};
240 ///
241 /// let stream = stream::iter(1..=10);
242 /// let mut stream = stream.map_while(|x| {
243 /// if x < 4 {
244 /// Some(x + 3)
245 /// } else {
246 /// None
247 /// }
248 /// });
249 /// assert_eq!(stream.next().await, Some(4));
250 /// assert_eq!(stream.next().await, Some(5));
251 /// assert_eq!(stream.next().await, Some(6));
252 /// assert_eq!(stream.next().await, None);
253 /// # }
254 /// ```
map_while<T, F>(self, f: F) -> MapWhile<Self, F> where F: FnMut(Self::Item) -> Option<T>, Self: Sized,255 fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
256 where
257 F: FnMut(Self::Item) -> Option<T>,
258 Self: Sized,
259 {
260 MapWhile::new(self, f)
261 }
262
263 /// Maps this stream's items asynchronously to a different type, returning a
264 /// new stream of the resulting type.
265 ///
266 /// The provided closure is executed over all elements of this stream as
267 /// they are made available, and the returned future is executed. Only one
/// future is executed at a time.
269 ///
270 /// Note that this function consumes the stream passed into it and returns a
271 /// wrapped version of it, similar to the existing `then` methods in the
272 /// standard library.
273 ///
274 /// Be aware that if the future is not `Unpin`, then neither is the `Stream`
275 /// returned by this method. To handle this, you can use `tokio::pin!` as in
276 /// the example below or put the stream in a `Box` with `Box::pin(stream)`.
277 ///
278 /// # Examples
279 ///
280 /// ```
281 /// # #[tokio::main]
282 /// # async fn main() {
283 /// use tokio_stream::{self as stream, StreamExt};
284 ///
285 /// async fn do_async_work(value: i32) -> i32 {
286 /// value + 3
287 /// }
288 ///
289 /// let stream = stream::iter(1..=3);
290 /// let stream = stream.then(do_async_work);
291 ///
292 /// tokio::pin!(stream);
293 ///
294 /// assert_eq!(stream.next().await, Some(4));
295 /// assert_eq!(stream.next().await, Some(5));
296 /// assert_eq!(stream.next().await, Some(6));
297 /// # }
298 /// ```
then<F, Fut>(self, f: F) -> Then<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized,299 fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
300 where
301 F: FnMut(Self::Item) -> Fut,
302 Fut: Future,
303 Self: Sized,
304 {
305 Then::new(self, f)
306 }
307
308 /// Combine two streams into one by interleaving the output of both as it
309 /// is produced.
310 ///
311 /// Values are produced from the merged stream in the order they arrive from
312 /// the two source streams. If both source streams provide values
313 /// simultaneously, the merge stream alternates between them. This provides
314 /// some level of fairness. You should not chain calls to `merge`, as this
315 /// will break the fairness of the merging.
316 ///
317 /// The merged stream completes once **both** source streams complete. When
318 /// one source stream completes before the other, the merge stream
319 /// exclusively polls the remaining stream.
320 ///
321 /// For merging multiple streams, consider using [`StreamMap`] instead.
322 ///
323 /// [`StreamMap`]: crate::StreamMap
324 ///
325 /// # Examples
326 ///
327 /// ```
328 /// use tokio_stream::{StreamExt, Stream};
329 /// use tokio::sync::mpsc;
330 /// use tokio::time;
331 ///
332 /// use std::time::Duration;
333 /// use std::pin::Pin;
334 ///
335 /// # /*
336 /// #[tokio::main]
337 /// # */
338 /// # #[tokio::main(flavor = "current_thread")]
339 /// async fn main() {
340 /// # time::pause();
341 /// let (tx1, mut rx1) = mpsc::channel::<usize>(10);
342 /// let (tx2, mut rx2) = mpsc::channel::<usize>(10);
343 ///
344 /// // Convert the channels to a `Stream`.
345 /// let rx1 = Box::pin(async_stream::stream! {
346 /// while let Some(item) = rx1.recv().await {
347 /// yield item;
348 /// }
349 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
350 ///
351 /// let rx2 = Box::pin(async_stream::stream! {
352 /// while let Some(item) = rx2.recv().await {
353 /// yield item;
354 /// }
355 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
356 ///
357 /// let mut rx = rx1.merge(rx2);
358 ///
359 /// tokio::spawn(async move {
360 /// // Send some values immediately
361 /// tx1.send(1).await.unwrap();
362 /// tx1.send(2).await.unwrap();
363 ///
364 /// // Let the other task send values
365 /// time::sleep(Duration::from_millis(20)).await;
366 ///
367 /// tx1.send(4).await.unwrap();
368 /// });
369 ///
370 /// tokio::spawn(async move {
371 /// // Wait for the first task to send values
372 /// time::sleep(Duration::from_millis(5)).await;
373 ///
374 /// tx2.send(3).await.unwrap();
375 ///
376 /// time::sleep(Duration::from_millis(25)).await;
377 ///
378 /// // Send the final value
379 /// tx2.send(5).await.unwrap();
380 /// });
381 ///
382 /// assert_eq!(1, rx.next().await.unwrap());
383 /// assert_eq!(2, rx.next().await.unwrap());
384 /// assert_eq!(3, rx.next().await.unwrap());
385 /// assert_eq!(4, rx.next().await.unwrap());
386 /// assert_eq!(5, rx.next().await.unwrap());
387 ///
388 /// // The merged stream is consumed
389 /// assert!(rx.next().await.is_none());
390 /// }
391 /// ```
merge<U>(self, other: U) -> Merge<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,392 fn merge<U>(self, other: U) -> Merge<Self, U>
393 where
394 U: Stream<Item = Self::Item>,
395 Self: Sized,
396 {
397 Merge::new(self, other)
398 }
399
400 /// Filters the values produced by this stream according to the provided
401 /// predicate.
402 ///
403 /// As values of this stream are made available, the provided predicate `f`
404 /// will be run against them. If the predicate
405 /// resolves to `true`, then the stream will yield the value, but if the
406 /// predicate resolves to `false`, then the value
407 /// will be discarded and the next value will be produced.
408 ///
409 /// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the [`Iterator::filter`] method in the
411 /// standard library.
412 ///
413 /// # Examples
414 ///
415 /// ```
416 /// # #[tokio::main]
417 /// # async fn main() {
418 /// use tokio_stream::{self as stream, StreamExt};
419 ///
420 /// let stream = stream::iter(1..=8);
421 /// let mut evens = stream.filter(|x| x % 2 == 0);
422 ///
423 /// assert_eq!(Some(2), evens.next().await);
424 /// assert_eq!(Some(4), evens.next().await);
425 /// assert_eq!(Some(6), evens.next().await);
426 /// assert_eq!(Some(8), evens.next().await);
427 /// assert_eq!(None, evens.next().await);
428 /// # }
429 /// ```
filter<F>(self, f: F) -> Filter<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,430 fn filter<F>(self, f: F) -> Filter<Self, F>
431 where
432 F: FnMut(&Self::Item) -> bool,
433 Self: Sized,
434 {
435 Filter::new(self, f)
436 }
437
438 /// Filters the values produced by this stream while simultaneously mapping
439 /// them to a different type according to the provided closure.
440 ///
441 /// As values of this stream are made available, the provided function will
442 /// be run on them. If the predicate `f` resolves to
443 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
444 /// it resolves to [`None`], then the value will be skipped.
445 ///
446 /// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the [`Iterator::filter_map`] method in the
448 /// standard library.
449 ///
450 /// # Examples
451 /// ```
452 /// # #[tokio::main]
453 /// # async fn main() {
454 /// use tokio_stream::{self as stream, StreamExt};
455 ///
456 /// let stream = stream::iter(1..=8);
457 /// let mut evens = stream.filter_map(|x| {
458 /// if x % 2 == 0 { Some(x + 1) } else { None }
459 /// });
460 ///
461 /// assert_eq!(Some(3), evens.next().await);
462 /// assert_eq!(Some(5), evens.next().await);
463 /// assert_eq!(Some(7), evens.next().await);
464 /// assert_eq!(Some(9), evens.next().await);
465 /// assert_eq!(None, evens.next().await);
466 /// # }
467 /// ```
filter_map<T, F>(self, f: F) -> FilterMap<Self, F> where F: FnMut(Self::Item) -> Option<T>, Self: Sized,468 fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
469 where
470 F: FnMut(Self::Item) -> Option<T>,
471 Self: Sized,
472 {
473 FilterMap::new(self, f)
474 }
475
476 /// Creates a stream which ends after the first `None`.
477 ///
478 /// After a stream returns `None`, behavior is undefined. Future calls to
479 /// `poll_next` may or may not return `Some(T)` again or they may panic.
480 /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
481 /// return `None` forever.
482 ///
483 /// # Examples
484 ///
485 /// ```
486 /// use tokio_stream::{Stream, StreamExt};
487 ///
488 /// use std::pin::Pin;
489 /// use std::task::{Context, Poll};
490 ///
491 /// // a stream which alternates between Some and None
492 /// struct Alternate {
493 /// state: i32,
494 /// }
495 ///
496 /// impl Stream for Alternate {
497 /// type Item = i32;
498 ///
499 /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
500 /// let val = self.state;
501 /// self.state = self.state + 1;
502 ///
503 /// // if it's even, Some(i32), else None
504 /// if val % 2 == 0 {
505 /// Poll::Ready(Some(val))
506 /// } else {
507 /// Poll::Ready(None)
508 /// }
509 /// }
510 /// }
511 ///
512 /// #[tokio::main]
513 /// async fn main() {
514 /// let mut stream = Alternate { state: 0 };
515 ///
516 /// // the stream goes back and forth
517 /// assert_eq!(stream.next().await, Some(0));
518 /// assert_eq!(stream.next().await, None);
519 /// assert_eq!(stream.next().await, Some(2));
520 /// assert_eq!(stream.next().await, None);
521 ///
522 /// // however, once it is fused
523 /// let mut stream = stream.fuse();
524 ///
525 /// assert_eq!(stream.next().await, Some(4));
526 /// assert_eq!(stream.next().await, None);
527 ///
528 /// // it will always return `None` after the first time.
529 /// assert_eq!(stream.next().await, None);
530 /// assert_eq!(stream.next().await, None);
531 /// assert_eq!(stream.next().await, None);
532 /// }
533 /// ```
fuse(self) -> Fuse<Self> where Self: Sized,534 fn fuse(self) -> Fuse<Self>
535 where
536 Self: Sized,
537 {
538 Fuse::new(self)
539 }
540
541 /// Creates a new stream of at most `n` items of the underlying stream.
542 ///
543 /// Once `n` items have been yielded from this stream then it will always
544 /// return that the stream is done.
545 ///
546 /// # Examples
547 ///
548 /// ```
549 /// # #[tokio::main]
550 /// # async fn main() {
551 /// use tokio_stream::{self as stream, StreamExt};
552 ///
553 /// let mut stream = stream::iter(1..=10).take(3);
554 ///
555 /// assert_eq!(Some(1), stream.next().await);
556 /// assert_eq!(Some(2), stream.next().await);
557 /// assert_eq!(Some(3), stream.next().await);
558 /// assert_eq!(None, stream.next().await);
559 /// # }
560 /// ```
take(self, n: usize) -> Take<Self> where Self: Sized,561 fn take(self, n: usize) -> Take<Self>
562 where
563 Self: Sized,
564 {
565 Take::new(self, n)
566 }
567
568 /// Take elements from this stream while the provided predicate
569 /// resolves to `true`.
570 ///
571 /// This function, like `Iterator::take_while`, will take elements from the
572 /// stream until the predicate `f` resolves to `false`. Once one element
573 /// returns false it will always return that the stream is done.
574 ///
575 /// # Examples
576 ///
577 /// ```
578 /// # #[tokio::main]
579 /// # async fn main() {
580 /// use tokio_stream::{self as stream, StreamExt};
581 ///
582 /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
583 ///
584 /// assert_eq!(Some(1), stream.next().await);
585 /// assert_eq!(Some(2), stream.next().await);
586 /// assert_eq!(Some(3), stream.next().await);
587 /// assert_eq!(None, stream.next().await);
588 /// # }
589 /// ```
take_while<F>(self, f: F) -> TakeWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,590 fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
591 where
592 F: FnMut(&Self::Item) -> bool,
593 Self: Sized,
594 {
595 TakeWhile::new(self, f)
596 }
597
598 /// Creates a new stream that will skip the `n` first items of the
599 /// underlying stream.
600 ///
601 /// # Examples
602 ///
603 /// ```
604 /// # #[tokio::main]
605 /// # async fn main() {
606 /// use tokio_stream::{self as stream, StreamExt};
607 ///
608 /// let mut stream = stream::iter(1..=10).skip(7);
609 ///
610 /// assert_eq!(Some(8), stream.next().await);
611 /// assert_eq!(Some(9), stream.next().await);
612 /// assert_eq!(Some(10), stream.next().await);
613 /// assert_eq!(None, stream.next().await);
614 /// # }
615 /// ```
skip(self, n: usize) -> Skip<Self> where Self: Sized,616 fn skip(self, n: usize) -> Skip<Self>
617 where
618 Self: Sized,
619 {
620 Skip::new(self, n)
621 }
622
623 /// Skip elements from the underlying stream while the provided predicate
624 /// resolves to `true`.
625 ///
626 /// This function, like [`Iterator::skip_while`], will ignore elements from the
627 /// stream until the predicate `f` resolves to `false`. Once one element
628 /// returns false, the rest of the elements will be yielded.
629 ///
630 /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
631 ///
632 /// # Examples
633 ///
634 /// ```
635 /// # #[tokio::main]
636 /// # async fn main() {
637 /// use tokio_stream::{self as stream, StreamExt};
638 /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
639 ///
640 /// assert_eq!(Some(3), stream.next().await);
641 /// assert_eq!(Some(4), stream.next().await);
642 /// assert_eq!(Some(1), stream.next().await);
643 /// assert_eq!(None, stream.next().await);
644 /// # }
645 /// ```
skip_while<F>(self, f: F) -> SkipWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,646 fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
647 where
648 F: FnMut(&Self::Item) -> bool,
649 Self: Sized,
650 {
651 SkipWhile::new(self, f)
652 }
653
654 /// Tests if every element of the stream matches a predicate.
655 ///
656 /// Equivalent to:
657 ///
658 /// ```ignore
659 /// async fn all<F>(&mut self, f: F) -> bool;
660 /// ```
661 ///
662 /// `all()` takes a closure that returns `true` or `false`. It applies
663 /// this closure to each element of the stream, and if they all return
664 /// `true`, then so does `all`. If any of them return `false`, it
665 /// returns `false`. An empty stream returns `true`.
666 ///
667 /// `all()` is short-circuiting; in other words, it will stop processing
668 /// as soon as it finds a `false`, given that no matter what else happens,
669 /// the result will also be `false`.
670 ///
671 /// An empty stream returns `true`.
672 ///
673 /// # Examples
674 ///
675 /// Basic usage:
676 ///
677 /// ```
678 /// # #[tokio::main]
679 /// # async fn main() {
680 /// use tokio_stream::{self as stream, StreamExt};
681 ///
682 /// let a = [1, 2, 3];
683 ///
684 /// assert!(stream::iter(&a).all(|&x| x > 0).await);
685 ///
686 /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
687 /// # }
688 /// ```
689 ///
690 /// Stopping at the first `false`:
691 ///
692 /// ```
693 /// # #[tokio::main]
694 /// # async fn main() {
695 /// use tokio_stream::{self as stream, StreamExt};
696 ///
697 /// let a = [1, 2, 3];
698 ///
699 /// let mut iter = stream::iter(&a);
700 ///
701 /// assert!(!iter.all(|&x| x != 2).await);
702 ///
703 /// // we can still use `iter`, as there are more elements.
704 /// assert_eq!(iter.next().await, Some(&3));
705 /// # }
706 /// ```
all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,707 fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
708 where
709 Self: Unpin,
710 F: FnMut(Self::Item) -> bool,
711 {
712 AllFuture::new(self, f)
713 }
714
715 /// Tests if any element of the stream matches a predicate.
716 ///
717 /// Equivalent to:
718 ///
719 /// ```ignore
720 /// async fn any<F>(&mut self, f: F) -> bool;
721 /// ```
722 ///
723 /// `any()` takes a closure that returns `true` or `false`. It applies
724 /// this closure to each element of the stream, and if any of them return
725 /// `true`, then so does `any()`. If they all return `false`, it
726 /// returns `false`.
727 ///
728 /// `any()` is short-circuiting; in other words, it will stop processing
729 /// as soon as it finds a `true`, given that no matter what else happens,
730 /// the result will also be `true`.
731 ///
732 /// An empty stream returns `false`.
733 ///
/// # Examples
///
/// Basic usage:
735 ///
736 /// ```
737 /// # #[tokio::main]
738 /// # async fn main() {
739 /// use tokio_stream::{self as stream, StreamExt};
740 ///
741 /// let a = [1, 2, 3];
742 ///
743 /// assert!(stream::iter(&a).any(|&x| x > 0).await);
744 ///
745 /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
746 /// # }
747 /// ```
748 ///
749 /// Stopping at the first `true`:
750 ///
751 /// ```
752 /// # #[tokio::main]
753 /// # async fn main() {
754 /// use tokio_stream::{self as stream, StreamExt};
755 ///
756 /// let a = [1, 2, 3];
757 ///
758 /// let mut iter = stream::iter(&a);
759 ///
760 /// assert!(iter.any(|&x| x != 2).await);
761 ///
762 /// // we can still use `iter`, as there are more elements.
763 /// assert_eq!(iter.next().await, Some(&2));
764 /// # }
765 /// ```
any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,766 fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
767 where
768 Self: Unpin,
769 F: FnMut(Self::Item) -> bool,
770 {
771 AnyFuture::new(self, f)
772 }
773
774 /// Combine two streams into one by first returning all values from the
775 /// first stream then all values from the second stream.
776 ///
777 /// As long as `self` still has values to emit, no values from `other` are
778 /// emitted, even if some are ready.
779 ///
780 /// # Examples
781 ///
782 /// ```
783 /// use tokio_stream::{self as stream, StreamExt};
784 ///
785 /// #[tokio::main]
786 /// async fn main() {
787 /// let one = stream::iter(vec![1, 2, 3]);
788 /// let two = stream::iter(vec![4, 5, 6]);
789 ///
790 /// let mut stream = one.chain(two);
791 ///
792 /// assert_eq!(stream.next().await, Some(1));
793 /// assert_eq!(stream.next().await, Some(2));
794 /// assert_eq!(stream.next().await, Some(3));
795 /// assert_eq!(stream.next().await, Some(4));
796 /// assert_eq!(stream.next().await, Some(5));
797 /// assert_eq!(stream.next().await, Some(6));
798 /// assert_eq!(stream.next().await, None);
799 /// }
800 /// ```
chain<U>(self, other: U) -> Chain<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,801 fn chain<U>(self, other: U) -> Chain<Self, U>
802 where
803 U: Stream<Item = Self::Item>,
804 Self: Sized,
805 {
806 Chain::new(self, other)
807 }
808
809 /// A combinator that applies a function to every element in a stream
810 /// producing a single, final value.
811 ///
812 /// Equivalent to:
813 ///
814 /// ```ignore
815 /// async fn fold<B, F>(self, init: B, f: F) -> B;
816 /// ```
817 ///
818 /// # Examples
819 /// Basic usage:
820 /// ```
821 /// # #[tokio::main]
822 /// # async fn main() {
823 /// use tokio_stream::{self as stream, *};
824 ///
825 /// let s = stream::iter(vec![1u8, 2, 3]);
826 /// let sum = s.fold(0, |acc, x| acc + x).await;
827 ///
828 /// assert_eq!(sum, 6);
829 /// # }
830 /// ```
fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> where Self: Sized, F: FnMut(B, Self::Item) -> B,831 fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
832 where
833 Self: Sized,
834 F: FnMut(B, Self::Item) -> B,
835 {
836 FoldFuture::new(self, init, f)
837 }
838
839 /// Drain stream pushing all emitted values into a collection.
840 ///
841 /// Equivalent to:
842 ///
843 /// ```ignore
844 /// async fn collect<T>(self) -> T;
845 /// ```
846 ///
847 /// `collect` streams all values, awaiting as needed. Values are pushed into
848 /// a collection. A number of different target collection types are
849 /// supported, including [`Vec`](std::vec::Vec),
850 /// [`String`](std::string::String), and [`Bytes`].
851 ///
852 /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html
853 ///
854 /// # `Result`
855 ///
856 /// `collect()` can also be used with streams of type `Result<T, E>` where
857 /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
858 /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
859 /// streaming is terminated and `collect()` returns the `Err`.
860 ///
861 /// # Notes
862 ///
863 /// `FromStream` is currently a sealed trait. Stabilization is pending
864 /// enhancements to the Rust language.
865 ///
866 /// # Examples
867 ///
868 /// Basic usage:
869 ///
870 /// ```
871 /// use tokio_stream::{self as stream, StreamExt};
872 ///
873 /// #[tokio::main]
874 /// async fn main() {
875 /// let doubled: Vec<i32> =
876 /// stream::iter(vec![1, 2, 3])
877 /// .map(|x| x * 2)
878 /// .collect()
879 /// .await;
880 ///
881 /// assert_eq!(vec![2, 4, 6], doubled);
882 /// }
883 /// ```
884 ///
885 /// Collecting a stream of `Result` values
886 ///
887 /// ```
888 /// use tokio_stream::{self as stream, StreamExt};
889 ///
890 /// #[tokio::main]
891 /// async fn main() {
892 /// // A stream containing only `Ok` values will be collected
893 /// let values: Result<Vec<i32>, &str> =
894 /// stream::iter(vec![Ok(1), Ok(2), Ok(3)])
895 /// .collect()
896 /// .await;
897 ///
898 /// assert_eq!(Ok(vec![1, 2, 3]), values);
899 ///
900 /// // A stream containing `Err` values will return the first error.
901 /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
902 ///
903 /// let values: Result<Vec<i32>, &str> =
904 /// stream::iter(results)
905 /// .collect()
906 /// .await;
907 ///
908 /// assert_eq!(Err("no"), values);
909 /// }
910 /// ```
collect<T>(self) -> Collect<Self, T> where T: FromStream<Self::Item>, Self: Sized,911 fn collect<T>(self) -> Collect<Self, T>
912 where
913 T: FromStream<Self::Item>,
914 Self: Sized,
915 {
916 Collect::new(self)
917 }
918
919 /// Applies a per-item timeout to the passed stream.
920 ///
921 /// `timeout()` takes a `Duration` that represents the maximum amount of
922 /// time each element of the stream has to complete before timing out.
923 ///
924 /// If the wrapped stream yields a value before the deadline is reached, the
925 /// value is returned. Otherwise, an error is returned. The caller may decide
926 /// to continue consuming the stream and will eventually get the next source
927 /// stream value once it becomes available.
928 ///
929 /// # Notes
930 ///
931 /// This function consumes the stream passed into it and returns a
932 /// wrapped version of it.
933 ///
934 /// Polling the returned stream will continue to poll the inner stream even
935 /// if one or more items time out.
936 ///
937 /// # Examples
938 ///
939 /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
940 ///
941 /// ```
942 /// # #[tokio::main]
943 /// # async fn main() {
944 /// use tokio_stream::{self as stream, StreamExt};
945 /// use std::time::Duration;
946 /// # let int_stream = stream::iter(1..=3);
947 ///
948 /// let int_stream = int_stream.timeout(Duration::from_secs(1));
949 /// tokio::pin!(int_stream);
950 ///
951 /// // When no items time out, we get the 3 elements in succession:
952 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
953 /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
954 /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
955 /// assert_eq!(int_stream.try_next().await, Ok(None));
956 ///
957 /// // If the second item times out, we get an error and continue polling the stream:
958 /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
959 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
960 /// assert!(int_stream.try_next().await.is_err());
961 /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
962 /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
963 /// assert_eq!(int_stream.try_next().await, Ok(None));
964 ///
965 /// // If we want to stop consuming the source stream the first time an
966 /// // element times out, we can use the `take_while` operator:
967 /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
968 /// let mut int_stream = int_stream.take_while(Result::is_ok);
969 ///
970 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
971 /// assert_eq!(int_stream.try_next().await, Ok(None));
972 /// # }
973 /// ```
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout(self, duration: Duration) -> Timeout<Self>
where
    Self: Sized,
{
    // Consumes the stream; `duration` is the per-item deadline handed to the
    // `Timeout` adapter.
    Timeout::new(self, duration)
}
982
983 /// Slows down a stream by enforcing a delay between items.
984 ///
985 /// The underlying timer behind this utility has a granularity of one millisecond.
986 ///
987 /// # Example
988 ///
989 /// Create a throttled stream.
990 /// ```rust,no_run
991 /// use std::time::Duration;
992 /// use tokio_stream::StreamExt;
993 ///
994 /// # async fn dox() {
995 /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
996 /// tokio::pin!(item_stream);
997 ///
998 /// loop {
999 /// // The string will be produced at most every 2 seconds
1000 /// println!("{:?}", item_stream.next().await);
1001 /// }
1002 /// # }
1003 /// ```
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn throttle(self, duration: Duration) -> Throttle<Self>
where
    Self: Sized,
{
    // Delegates to the free function `throttle` imported from the `throttle`
    // module; note that it takes the duration first and the stream second.
    throttle(duration, self)
}
1012
1013 /// Batches the items in the given stream using a maximum duration and size for each batch.
1014 ///
1015 /// This stream returns the next batch of items in the following situations:
1016 /// 1. The inner stream has returned at least `max_size` many items since the last batch.
1017 /// 2. The time since the first item of a batch is greater than the given duration.
1018 /// 3. The end of the stream is reached.
1019 ///
/// The returned vector is never empty and never longer than the maximum size. Empty batches
/// will not be emitted if no items are received upstream.
1022 ///
1023 /// # Panics
1024 ///
/// This function panics if `max_size` is zero.
1026 ///
1027 /// # Example
1028 ///
1029 /// ```rust
1030 /// use std::time::Duration;
1031 /// use tokio::time;
1032 /// use tokio_stream::{self as stream, StreamExt};
1033 /// use futures::FutureExt;
1034 ///
1035 /// #[tokio::main]
1036 /// # async fn _unused() {}
1037 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1038 /// async fn main() {
1039 /// let iter = vec![1, 2, 3, 4].into_iter();
1040 /// let stream0 = stream::iter(iter);
1041 ///
1042 /// let iter = vec![5].into_iter();
1043 /// let stream1 = stream::iter(iter)
1044 /// .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
1045 ///
1046 /// let chunk_stream = stream0
1047 /// .chain(stream1)
1048 /// .chunks_timeout(3, Duration::from_secs(2));
1049 /// tokio::pin!(chunk_stream);
1050 ///
1051 /// // a full batch was received
1052 /// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
1053 /// // deadline was reached before max_size was reached
1054 /// assert_eq!(chunk_stream.next().await, Some(vec![4]));
1055 /// // last element in the stream
1056 /// assert_eq!(chunk_stream.next().await, Some(vec![5]));
1057 /// }
1058 /// ```
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
#[track_caller]
fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
where
    Self: Sized,
{
    // Reject a zero batch size up front; `#[track_caller]` makes the panic
    // location point at the caller rather than at this method.
    assert!(max_size > 0, "`max_size` must be non-zero.");
    ChunksTimeout::new(self, max_size, duration)
}
1069 }
1070
1071 impl<St: ?Sized> StreamExt for St where St: Stream {}
1072
/// Merge the size hints from two streams.
///
/// Each hint is a `(lower, upper)` pair in the style of `size_hint`.
///
/// * The merged lower bound is the sum of both lower bounds, saturating at
///   `usize::MAX` instead of overflowing.
/// * The merged upper bound is `Some(sum)` only when both upper bounds are
///   known and their sum fits in a `usize`; otherwise it is `None`.
fn merge_size_hints(
    (left_low, left_high): (usize, Option<usize>),
    (right_low, right_high): (usize, Option<usize>),
) -> (usize, Option<usize>) {
    let low = left_low.saturating_add(right_low);
    let high = match (left_high, right_high) {
        // `checked_add` yields `None` on overflow, i.e. "no known upper bound".
        (Some(h1), Some(h2)) => h1.checked_add(h2),
        // If either side is unbounded, so is the combination.
        _ => None,
    };
    (low, high)
}
1085