1 use crate::Stream;
2
3 use core::pin::Pin;
4 use core::task::{Context, Poll};
5
/// Stream for the [`iter`](fn@iter) function.
///
/// Pairs the wrapped iterator with a counter that tracks how many results
/// have been returned as `Poll::Ready` since the counter was last reset.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Iter<I> {
    /// The wrapped iterator that supplies the stream's items.
    iter: I,
    /// Count of `Poll::Ready` results handed out since the last reset; the
    /// `Stream` implementation consults it to decide when to return
    /// `Poll::Pending` instead of the next item.
    yield_amt: usize,
}

// `Iter` is `Unpin` for every `I` (note the absence of any bound on `I`).
// The `Stream` implementation mutates the fields directly through
// `Pin<&mut Self>`, which is only possible when `Self: Unpin`.
impl<I> Unpin for Iter<I> {}
15
16 /// Converts an `Iterator` into a `Stream` which is always ready
17 /// to yield the next value.
18 ///
19 /// Iterators in Rust don't express the ability to block, so this adapter
20 /// simply always calls `iter.next()` and returns that.
21 ///
22 /// ```
23 /// # async fn dox() {
24 /// use tokio_stream::{self as stream, StreamExt};
25 ///
26 /// let mut stream = stream::iter(vec![17, 19]);
27 ///
28 /// assert_eq!(stream.next().await, Some(17));
29 /// assert_eq!(stream.next().await, Some(19));
30 /// assert_eq!(stream.next().await, None);
31 /// # }
32 /// ```
iter<I>(i: I) -> Iter<I::IntoIter> where I: IntoIterator,33 pub fn iter<I>(i: I) -> Iter<I::IntoIter>
34 where
35 I: IntoIterator,
36 {
37 Iter {
38 iter: i.into_iter(),
39 yield_amt: 0,
40 }
41 }
42
43 impl<I> Stream for Iter<I>
44 where
45 I: Iterator,
46 {
47 type Item = I::Item;
48
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<I::Item>>49 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
50 // TODO: add coop back
51 if self.yield_amt >= 32 {
52 self.yield_amt = 0;
53
54 cx.waker().wake_by_ref();
55
56 Poll::Pending
57 } else {
58 self.yield_amt += 1;
59
60 Poll::Ready(self.iter.next())
61 }
62 }
63
size_hint(&self) -> (usize, Option<usize>)64 fn size_hint(&self) -> (usize, Option<usize>) {
65 self.iter.size_hint()
66 }
67 }
68