• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::yielder::Receiver;
2 
3 use futures_core::{FusedStream, Stream};
4 use pin_project_lite::pin_project;
5 use std::future::Future;
6 use std::pin::Pin;
7 use std::task::{Context, Poll};
8 
9 pin_project! {
10     #[doc(hidden)]
11     #[derive(Debug)]
12     pub struct AsyncStream<T, U> {
13         rx: Receiver<T>,
14         done: bool,
15         #[pin]
16         generator: U,
17     }
18 }
19 
20 impl<T, U> AsyncStream<T, U> {
21     #[doc(hidden)]
new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U>22     pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> {
23         AsyncStream {
24             rx,
25             done: false,
26             generator,
27         }
28     }
29 }
30 
31 impl<T, U> FusedStream for AsyncStream<T, U>
32 where
33     U: Future<Output = ()>,
34 {
is_terminated(&self) -> bool35     fn is_terminated(&self) -> bool {
36         self.done
37     }
38 }
39 
40 impl<T, U> Stream for AsyncStream<T, U>
41 where
42     U: Future<Output = ()>,
43 {
44     type Item = T;
45 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>46     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
47         let me = self.project();
48 
49         if *me.done {
50             return Poll::Ready(None);
51         }
52 
53         let mut dst = None;
54         let res = {
55             let _enter = me.rx.enter(&mut dst);
56             me.generator.poll(cx)
57         };
58 
59         *me.done = res.is_ready();
60 
61         if dst.is_some() {
62             return Poll::Ready(dst.take());
63         }
64 
65         if *me.done {
66             Poll::Ready(None)
67         } else {
68             Poll::Pending
69         }
70     }
71 
size_hint(&self) -> (usize, Option<usize>)72     fn size_hint(&self) -> (usize, Option<usize>) {
73         if self.done {
74             (0, Some(0))
75         } else {
76             (0, None)
77         }
78     }
79 }
80