1 use crate::stream_ext::Fuse; 2 use crate::{Elapsed, Stream}; 3 use tokio::time::Interval; 4 5 use core::pin::Pin; 6 use core::task::{ready, Context, Poll}; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Stream returned by the [`timeout_repeating`](super::StreamExt::timeout_repeating) method. 11 #[must_use = "streams do nothing unless polled"] 12 #[derive(Debug)] 13 pub struct TimeoutRepeating<S> { 14 #[pin] 15 stream: Fuse<S>, 16 #[pin] 17 interval: Interval, 18 } 19 } 20 21 impl<S: Stream> TimeoutRepeating<S> { new(stream: S, interval: Interval) -> Self22 pub(super) fn new(stream: S, interval: Interval) -> Self { 23 TimeoutRepeating { 24 stream: Fuse::new(stream), 25 interval, 26 } 27 } 28 } 29 30 impl<S: Stream> Stream for TimeoutRepeating<S> { 31 type Item = Result<S::Item, Elapsed>; 32 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>33 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 34 let mut me = self.project(); 35 36 match me.stream.poll_next(cx) { 37 Poll::Ready(v) => { 38 if v.is_some() { 39 me.interval.reset(); 40 } 41 return Poll::Ready(v.map(Ok)); 42 } 43 Poll::Pending => {} 44 }; 45 46 ready!(me.interval.poll_tick(cx)); 47 Poll::Ready(Some(Err(Elapsed::new()))) 48 } 49 size_hint(&self) -> (usize, Option<usize>)50 fn size_hint(&self) -> (usize, Option<usize>) { 51 let (lower, _) = self.stream.size_hint(); 52 53 // The timeout stream may insert an error an infinite number of times. 54 (lower, None) 55 } 56 } 57