• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::yielder::Receiver;
2 
3 use futures_core::{FusedStream, Stream};
4 use std::future::Future;
5 use std::pin::Pin;
6 use std::task::{Context, Poll};
7 
8 #[doc(hidden)]
9 #[derive(Debug)]
10 pub struct AsyncStream<T, U> {
11     rx: Receiver<T>,
12     done: bool,
13     generator: U,
14 }
15 
16 impl<T, U> AsyncStream<T, U> {
17     #[doc(hidden)]
new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U>18     pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> {
19         AsyncStream {
20             rx,
21             done: false,
22             generator,
23         }
24     }
25 }
26 
27 impl<T, U> FusedStream for AsyncStream<T, U>
28 where
29     U: Future<Output = ()>,
30 {
is_terminated(&self) -> bool31     fn is_terminated(&self) -> bool {
32         self.done
33     }
34 }
35 
36 impl<T, U> Stream for AsyncStream<T, U>
37 where
38     U: Future<Output = ()>,
39 {
40     type Item = T;
41 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>42     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43         unsafe {
44             let me = Pin::get_unchecked_mut(self);
45 
46             if me.done {
47                 return Poll::Ready(None);
48             }
49 
50             let mut dst = None;
51             let res = {
52                 let _enter = me.rx.enter(&mut dst);
53                 Pin::new_unchecked(&mut me.generator).poll(cx)
54             };
55 
56             me.done = res.is_ready();
57 
58             if dst.is_some() {
59                 return Poll::Ready(dst.take());
60             }
61 
62             if me.done {
63                 Poll::Ready(None)
64             } else {
65                 Poll::Pending
66             }
67         }
68     }
69 
size_hint(&self) -> (usize, Option<usize>)70     fn size_hint(&self) -> (usize, Option<usize>) {
71         if self.done {
72             (0, Some(0))
73         } else {
74             (0, None)
75         }
76     }
77 }
78