1 use crate::yielder::Receiver; 2 3 use futures_core::{FusedStream, Stream}; 4 use pin_project_lite::pin_project; 5 use std::future::Future; 6 use std::pin::Pin; 7 use std::task::{Context, Poll}; 8 9 pin_project! { 10 #[doc(hidden)] 11 #[derive(Debug)] 12 pub struct AsyncStream<T, U> { 13 rx: Receiver<T>, 14 done: bool, 15 #[pin] 16 generator: U, 17 } 18 } 19 20 impl<T, U> AsyncStream<T, U> { 21 #[doc(hidden)] new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U>22 pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> { 23 AsyncStream { 24 rx, 25 done: false, 26 generator, 27 } 28 } 29 } 30 31 impl<T, U> FusedStream for AsyncStream<T, U> 32 where 33 U: Future<Output = ()>, 34 { is_terminated(&self) -> bool35 fn is_terminated(&self) -> bool { 36 self.done 37 } 38 } 39 40 impl<T, U> Stream for AsyncStream<T, U> 41 where 42 U: Future<Output = ()>, 43 { 44 type Item = T; 45 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>46 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 47 let me = self.project(); 48 49 if *me.done { 50 return Poll::Ready(None); 51 } 52 53 let mut dst = None; 54 let res = { 55 let _enter = me.rx.enter(&mut dst); 56 me.generator.poll(cx) 57 }; 58 59 *me.done = res.is_ready(); 60 61 if dst.is_some() { 62 return Poll::Ready(dst.take()); 63 } 64 65 if *me.done { 66 Poll::Ready(None) 67 } else { 68 Poll::Pending 69 } 70 } 71 size_hint(&self) -> (usize, Option<usize>)72 fn size_hint(&self) -> (usize, Option<usize>) { 73 if self.done { 74 (0, Some(0)) 75 } else { 76 (0, None) 77 } 78 } 79 } 80