1 use crate::yielder::Receiver; 2 3 use futures_core::{FusedStream, Stream}; 4 use std::future::Future; 5 use std::pin::Pin; 6 use std::task::{Context, Poll}; 7 8 #[doc(hidden)] 9 #[derive(Debug)] 10 pub struct AsyncStream<T, U> { 11 rx: Receiver<T>, 12 done: bool, 13 generator: U, 14 } 15 16 impl<T, U> AsyncStream<T, U> { 17 #[doc(hidden)] new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U>18 pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> { 19 AsyncStream { 20 rx, 21 done: false, 22 generator, 23 } 24 } 25 } 26 27 impl<T, U> FusedStream for AsyncStream<T, U> 28 where 29 U: Future<Output = ()>, 30 { is_terminated(&self) -> bool31 fn is_terminated(&self) -> bool { 32 self.done 33 } 34 } 35 36 impl<T, U> Stream for AsyncStream<T, U> 37 where 38 U: Future<Output = ()>, 39 { 40 type Item = T; 41 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>42 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 43 unsafe { 44 let me = Pin::get_unchecked_mut(self); 45 46 if me.done { 47 return Poll::Ready(None); 48 } 49 50 let mut dst = None; 51 let res = { 52 let _enter = me.rx.enter(&mut dst); 53 Pin::new_unchecked(&mut me.generator).poll(cx) 54 }; 55 56 me.done = res.is_ready(); 57 58 if dst.is_some() { 59 return Poll::Ready(dst.take()); 60 } 61 62 if me.done { 63 Poll::Ready(None) 64 } else { 65 Poll::Pending 66 } 67 } 68 } 69 size_hint(&self) -> (usize, Option<usize>)70 fn size_hint(&self) -> (usize, Option<usize>) { 71 if self.done { 72 (0, Some(0)) 73 } else { 74 (0, None) 75 } 76 } 77 } 78