• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::stream_ext::Fuse;
2 use crate::Stream;
3 use tokio::time::{sleep, Sleep};
4 
5 use core::future::Future;
6 use core::pin::Pin;
7 use core::task::{Context, Poll};
8 use pin_project_lite::pin_project;
9 use std::time::Duration;
10 
11 pin_project! {
12     /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method.
13     #[must_use = "streams do nothing unless polled"]
14     #[derive(Debug)]
15     pub struct ChunksTimeout<S: Stream> {
16         #[pin]
17         stream: Fuse<S>,
18         #[pin]
19         deadline: Option<Sleep>,
20         duration: Duration,
21         items: Vec<S::Item>,
22         cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
23     }
24 }
25 
26 impl<S: Stream> ChunksTimeout<S> {
new(stream: S, max_size: usize, duration: Duration) -> Self27     pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self {
28         ChunksTimeout {
29             stream: Fuse::new(stream),
30             deadline: None,
31             duration,
32             items: Vec::with_capacity(max_size),
33             cap: max_size,
34         }
35     }
36 }
37 
38 impl<S: Stream> Stream for ChunksTimeout<S> {
39     type Item = Vec<S::Item>;
40 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>41     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42         let mut me = self.as_mut().project();
43         loop {
44             match me.stream.as_mut().poll_next(cx) {
45                 Poll::Pending => break,
46                 Poll::Ready(Some(item)) => {
47                     if me.items.is_empty() {
48                         me.deadline.set(Some(sleep(*me.duration)));
49                         me.items.reserve_exact(*me.cap);
50                     }
51                     me.items.push(item);
52                     if me.items.len() >= *me.cap {
53                         return Poll::Ready(Some(std::mem::take(me.items)));
54                     }
55                 }
56                 Poll::Ready(None) => {
57                     // Returning Some here is only correct because we fuse the inner stream.
58                     let last = if me.items.is_empty() {
59                         None
60                     } else {
61                         Some(std::mem::take(me.items))
62                     };
63 
64                     return Poll::Ready(last);
65                 }
66             }
67         }
68 
69         if !me.items.is_empty() {
70             if let Some(deadline) = me.deadline.as_pin_mut() {
71                 ready!(deadline.poll(cx));
72             }
73             return Poll::Ready(Some(std::mem::take(me.items)));
74         }
75 
76         Poll::Pending
77     }
78 
size_hint(&self) -> (usize, Option<usize>)79     fn size_hint(&self) -> (usize, Option<usize>) {
80         let chunk_len = if self.items.is_empty() { 0 } else { 1 };
81         let (lower, upper) = self.stream.size_hint();
82         let lower = (lower / self.cap).saturating_add(chunk_len);
83         let upper = upper.and_then(|x| x.checked_add(chunk_len));
84         (lower, upper)
85     }
86 }
87