1 use crate::stream_ext::Fuse; 2 use crate::Stream; 3 use tokio::time::{sleep, Sleep}; 4 5 use core::future::Future; 6 use core::pin::Pin; 7 use core::task::{Context, Poll}; 8 use pin_project_lite::pin_project; 9 use std::time::Duration; 10 11 pin_project! { 12 /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method. 13 #[must_use = "streams do nothing unless polled"] 14 #[derive(Debug)] 15 pub struct ChunksTimeout<S: Stream> { 16 #[pin] 17 stream: Fuse<S>, 18 #[pin] 19 deadline: Option<Sleep>, 20 duration: Duration, 21 items: Vec<S::Item>, 22 cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 23 } 24 } 25 26 impl<S: Stream> ChunksTimeout<S> { new(stream: S, max_size: usize, duration: Duration) -> Self27 pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self { 28 ChunksTimeout { 29 stream: Fuse::new(stream), 30 deadline: None, 31 duration, 32 items: Vec::with_capacity(max_size), 33 cap: max_size, 34 } 35 } 36 } 37 38 impl<S: Stream> Stream for ChunksTimeout<S> { 39 type Item = Vec<S::Item>; 40 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>41 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 42 let mut me = self.as_mut().project(); 43 loop { 44 match me.stream.as_mut().poll_next(cx) { 45 Poll::Pending => break, 46 Poll::Ready(Some(item)) => { 47 if me.items.is_empty() { 48 me.deadline.set(Some(sleep(*me.duration))); 49 me.items.reserve_exact(*me.cap); 50 } 51 me.items.push(item); 52 if me.items.len() >= *me.cap { 53 return Poll::Ready(Some(std::mem::take(me.items))); 54 } 55 } 56 Poll::Ready(None) => { 57 // Returning Some here is only correct because we fuse the inner stream. 58 let last = if me.items.is_empty() { 59 None 60 } else { 61 Some(std::mem::take(me.items)) 62 }; 63 64 return Poll::Ready(last); 65 } 66 } 67 } 68 69 if !me.items.is_empty() { 70 if let Some(deadline) = me.deadline.as_pin_mut() { 71 ready!(deadline.poll(cx)); 72 } 73 return Poll::Ready(Some(std::mem::take(me.items))); 74 } 75 76 Poll::Pending 77 } 78 size_hint(&self) -> (usize, Option<usize>)79 fn size_hint(&self) -> (usize, Option<usize>) { 80 let chunk_len = if self.items.is_empty() { 0 } else { 1 }; 81 let (lower, upper) = self.stream.size_hint(); 82 let lower = (lower / self.cap).saturating_add(chunk_len); 83 let upper = upper.and_then(|x| x.checked_add(chunk_len)); 84 (lower, upper) 85 } 86 } 87