1 use crate::stream::{Fuse, IntoStream, StreamExt}; 2 3 use alloc::vec::Vec; 4 use core::pin::Pin; 5 use core::{fmt, mem}; 6 use futures_core::ready; 7 use futures_core::stream::{FusedStream, Stream, TryStream}; 8 use futures_core::task::{Context, Poll}; 9 #[cfg(feature = "sink")] 10 use futures_sink::Sink; 11 use pin_project_lite::pin_project; 12 13 pin_project! { 14 /// Stream for the [`try_chunks`](super::TryStreamExt::try_chunks) method. 15 #[derive(Debug)] 16 #[must_use = "streams do nothing unless polled"] 17 pub struct TryChunks<St: TryStream> { 18 #[pin] 19 stream: Fuse<IntoStream<St>>, 20 items: Vec<St::Ok>, 21 cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 22 } 23 } 24 25 impl<St: TryStream> TryChunks<St> { new(stream: St, capacity: usize) -> Self26 pub(super) fn new(stream: St, capacity: usize) -> Self { 27 assert!(capacity > 0); 28 29 Self { 30 stream: IntoStream::new(stream).fuse(), 31 items: Vec::with_capacity(capacity), 32 cap: capacity, 33 } 34 } 35 take(self: Pin<&mut Self>) -> Vec<St::Ok>36 fn take(self: Pin<&mut Self>) -> Vec<St::Ok> { 37 let cap = self.cap; 38 mem::replace(self.project().items, Vec::with_capacity(cap)) 39 } 40 41 delegate_access_inner!(stream, St, (. .)); 42 } 43 44 impl<St: TryStream> Stream for TryChunks<St> { 45 #[allow(clippy::type_complexity)] 46 type Item = Result<Vec<St::Ok>, TryChunksError<St::Ok, St::Error>>; 47 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>48 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 49 let mut this = self.as_mut().project(); 50 loop { 51 match ready!(this.stream.as_mut().try_poll_next(cx)) { 52 // Push the item into the buffer and check whether it is full. 53 // If so, replace our buffer with a new and empty one and return 54 // the full one. 55 Some(item) => match item { 56 Ok(item) => { 57 this.items.push(item); 58 if this.items.len() >= *this.cap { 59 return Poll::Ready(Some(Ok(self.take()))); 60 } 61 } 62 Err(e) => { 63 return Poll::Ready(Some(Err(TryChunksError(self.take(), e)))); 64 } 65 }, 66 67 // Since the underlying stream ran out of values, return what we 68 // have buffered, if we have anything. 69 None => { 70 let last = if this.items.is_empty() { 71 None 72 } else { 73 let full_buf = mem::take(this.items); 74 Some(full_buf) 75 }; 76 77 return Poll::Ready(last.map(Ok)); 78 } 79 } 80 } 81 } 82 size_hint(&self) -> (usize, Option<usize>)83 fn size_hint(&self) -> (usize, Option<usize>) { 84 let chunk_len = usize::from(!self.items.is_empty()); 85 let (lower, upper) = self.stream.size_hint(); 86 let lower = (lower / self.cap).saturating_add(chunk_len); 87 let upper = match upper { 88 Some(x) => x.checked_add(chunk_len), 89 None => None, 90 }; 91 (lower, upper) 92 } 93 } 94 95 impl<St: TryStream + FusedStream> FusedStream for TryChunks<St> { is_terminated(&self) -> bool96 fn is_terminated(&self) -> bool { 97 self.stream.is_terminated() && self.items.is_empty() 98 } 99 } 100 101 // Forwarding impl of Sink from the underlying stream 102 #[cfg(feature = "sink")] 103 impl<S, Item> Sink<Item> for TryChunks<S> 104 where 105 S: TryStream + Sink<Item>, 106 { 107 type Error = <S as Sink<Item>>::Error; 108 109 delegate_sink!(stream, Item); 110 } 111 112 /// Error indicating, that while chunk was collected inner stream produced an error. 113 /// 114 /// Contains all items that were collected before an error occurred, and the stream error itself. 115 #[derive(PartialEq, Eq)] 116 pub struct TryChunksError<T, E>(pub Vec<T>, pub E); 117 118 impl<T, E: fmt::Debug> fmt::Debug for TryChunksError<T, E> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result119 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 120 self.1.fmt(f) 121 } 122 } 123 124 impl<T, E: fmt::Display> fmt::Display for TryChunksError<T, E> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 126 self.1.fmt(f) 127 } 128 } 129 130 #[cfg(feature = "std")] 131 impl<T, E: fmt::Debug + fmt::Display> std::error::Error for TryChunksError<T, E> {} 132