• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::stream_ext::Fuse;
2 use crate::{Elapsed, Stream};
3 use tokio::time::Interval;
4 
5 use core::pin::Pin;
6 use core::task::{ready, Context, Poll};
7 use pin_project_lite::pin_project;
8 
9 pin_project! {
10     /// Stream returned by the [`timeout_repeating`](super::StreamExt::timeout_repeating) method.
11     #[must_use = "streams do nothing unless polled"]
12     #[derive(Debug)]
13     pub struct TimeoutRepeating<S> {
14         #[pin]
15         stream: Fuse<S>,
16         #[pin]
17         interval: Interval,
18     }
19 }
20 
21 impl<S: Stream> TimeoutRepeating<S> {
new(stream: S, interval: Interval) -> Self22     pub(super) fn new(stream: S, interval: Interval) -> Self {
23         TimeoutRepeating {
24             stream: Fuse::new(stream),
25             interval,
26         }
27     }
28 }
29 
30 impl<S: Stream> Stream for TimeoutRepeating<S> {
31     type Item = Result<S::Item, Elapsed>;
32 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>33     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34         let mut me = self.project();
35 
36         match me.stream.poll_next(cx) {
37             Poll::Ready(v) => {
38                 if v.is_some() {
39                     me.interval.reset();
40                 }
41                 return Poll::Ready(v.map(Ok));
42             }
43             Poll::Pending => {}
44         };
45 
46         ready!(me.interval.poll_tick(cx));
47         Poll::Ready(Some(Err(Elapsed::new())))
48     }
49 
size_hint(&self) -> (usize, Option<usize>)50     fn size_hint(&self) -> (usize, Option<usize>) {
51         let (lower, _) = self.stream.size_hint();
52 
53         // The timeout stream may insert an error an infinite number of times.
54         (lower, None)
55     }
56 }
57