1 use crate::stream::Fuse; 2 use alloc::vec::Vec; 3 use core::mem; 4 use core::pin::Pin; 5 use futures_core::ready; 6 use futures_core::stream::{FusedStream, Stream}; 7 use futures_core::task::{Context, Poll}; 8 #[cfg(feature = "sink")] 9 use futures_sink::Sink; 10 use pin_project_lite::pin_project; 11 12 pin_project! { 13 /// Stream for the [`chunks`](super::StreamExt::chunks) method. 14 #[derive(Debug)] 15 #[must_use = "streams do nothing unless polled"] 16 pub struct Chunks<St: Stream> { 17 #[pin] 18 stream: Fuse<St>, 19 items: Vec<St::Item>, 20 cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 21 } 22 } 23 24 impl<St: Stream> Chunks<St> { new(stream: St, capacity: usize) -> Self25 pub(super) fn new(stream: St, capacity: usize) -> Self { 26 assert!(capacity > 0); 27 28 Self { 29 stream: super::Fuse::new(stream), 30 items: Vec::with_capacity(capacity), 31 cap: capacity, 32 } 33 } 34 take(self: Pin<&mut Self>) -> Vec<St::Item>35 fn take(self: Pin<&mut Self>) -> Vec<St::Item> { 36 let cap = self.cap; 37 mem::replace(self.project().items, Vec::with_capacity(cap)) 38 } 39 40 delegate_access_inner!(stream, St, (.)); 41 } 42 43 impl<St: Stream> Stream for Chunks<St> { 44 type Item = Vec<St::Item>; 45 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>46 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 47 let mut this = self.as_mut().project(); 48 loop { 49 match ready!(this.stream.as_mut().poll_next(cx)) { 50 // Push the item into the buffer and check whether it is full. 51 // If so, replace our buffer with a new and empty one and return 52 // the full one. 53 Some(item) => { 54 this.items.push(item); 55 if this.items.len() >= *this.cap { 56 return Poll::Ready(Some(self.take())); 57 } 58 } 59 60 // Since the underlying stream ran out of values, return what we 61 // have buffered, if we have anything. 62 None => { 63 let last = if this.items.is_empty() { 64 None 65 } else { 66 let full_buf = mem::take(this.items); 67 Some(full_buf) 68 }; 69 70 return Poll::Ready(last); 71 } 72 } 73 } 74 } 75 size_hint(&self) -> (usize, Option<usize>)76 fn size_hint(&self) -> (usize, Option<usize>) { 77 let chunk_len = usize::from(!self.items.is_empty()); 78 let (lower, upper) = self.stream.size_hint(); 79 let lower = (lower / self.cap).saturating_add(chunk_len); 80 let upper = match upper { 81 Some(x) => x.checked_add(chunk_len), 82 None => None, 83 }; 84 (lower, upper) 85 } 86 } 87 88 impl<St: FusedStream> FusedStream for Chunks<St> { is_terminated(&self) -> bool89 fn is_terminated(&self) -> bool { 90 self.stream.is_terminated() && self.items.is_empty() 91 } 92 } 93 94 // Forwarding impl of Sink from the underlying stream 95 #[cfg(feature = "sink")] 96 impl<S, Item> Sink<Item> for Chunks<S> 97 where 98 S: Stream + Sink<Item>, 99 { 100 type Error = S::Error; 101 102 delegate_sink!(stream, Item); 103 } 104