• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::Stream;
2 
3 use core::pin::Pin;
4 use core::task::{Context, Poll};
5 
6 /// Stream for the [`iter`](fn@iter) function.
7 #[derive(Debug)]
8 #[must_use = "streams do nothing unless polled"]
9 pub struct Iter<I> {
10     iter: I,
11     yield_amt: usize,
12 }
13 
14 impl<I> Unpin for Iter<I> {}
15 
16 /// Converts an `Iterator` into a `Stream` which is always ready
17 /// to yield the next value.
18 ///
19 /// Iterators in Rust don't express the ability to block, so this adapter
20 /// simply always calls `iter.next()` and returns that.
21 ///
22 /// ```
23 /// # async fn dox() {
24 /// use tokio_stream::{self as stream, StreamExt};
25 ///
26 /// let mut stream = stream::iter(vec![17, 19]);
27 ///
28 /// assert_eq!(stream.next().await, Some(17));
29 /// assert_eq!(stream.next().await, Some(19));
30 /// assert_eq!(stream.next().await, None);
31 /// # }
32 /// ```
iter<I>(i: I) -> Iter<I::IntoIter> where I: IntoIterator,33 pub fn iter<I>(i: I) -> Iter<I::IntoIter>
34 where
35     I: IntoIterator,
36 {
37     Iter {
38         iter: i.into_iter(),
39         yield_amt: 0,
40     }
41 }
42 
43 impl<I> Stream for Iter<I>
44 where
45     I: Iterator,
46 {
47     type Item = I::Item;
48 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<I::Item>>49     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
50         // TODO: add coop back
51         if self.yield_amt >= 32 {
52             self.yield_amt = 0;
53 
54             cx.waker().wake_by_ref();
55 
56             Poll::Pending
57         } else {
58             self.yield_amt += 1;
59 
60             Poll::Ready(self.iter.next())
61         }
62     }
63 
size_hint(&self) -> (usize, Option<usize>)64     fn size_hint(&self) -> (usize, Option<usize>) {
65         self.iter.size_hint()
66     }
67 }
68