1 use crate::stream::{Fuse, IntoStream, StreamExt}; 2 3 use alloc::vec::Vec; 4 use core::fmt; 5 use core::pin::Pin; 6 use futures_core::stream::{FusedStream, Stream, TryStream}; 7 use futures_core::task::{Context, Poll}; 8 #[cfg(feature = "sink")] 9 use futures_sink::Sink; 10 use pin_project_lite::pin_project; 11 12 pin_project! { 13 /// Stream for the [`try_ready_chunks`](super::TryStreamExt::try_ready_chunks) method. 14 #[derive(Debug)] 15 #[must_use = "streams do nothing unless polled"] 16 pub struct TryReadyChunks<St: TryStream> { 17 #[pin] 18 stream: Fuse<IntoStream<St>>, 19 cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 20 } 21 } 22 23 impl<St: TryStream> TryReadyChunks<St> { new(stream: St, capacity: usize) -> Self24 pub(super) fn new(stream: St, capacity: usize) -> Self { 25 assert!(capacity > 0); 26 27 Self { stream: IntoStream::new(stream).fuse(), cap: capacity } 28 } 29 30 delegate_access_inner!(stream, St, (. .)); 31 } 32 33 type TryReadyChunksStreamError<St> = 34 TryReadyChunksError<<St as TryStream>::Ok, <St as TryStream>::Error>; 35 36 impl<St: TryStream> Stream for TryReadyChunks<St> { 37 type Item = Result<Vec<St::Ok>, TryReadyChunksStreamError<St>>; 38 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>39 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 40 let mut this = self.as_mut().project(); 41 42 let mut items: Vec<St::Ok> = Vec::new(); 43 44 loop { 45 match this.stream.as_mut().poll_next(cx) { 46 // Flush all the collected data if the underlying stream doesn't 47 // contain more ready values 48 Poll::Pending => { 49 return if items.is_empty() { 50 Poll::Pending 51 } else { 52 Poll::Ready(Some(Ok(items))) 53 } 54 } 55 56 // Push the ready item into the buffer and check whether it is full. 57 // If so, return the buffer. 58 Poll::Ready(Some(Ok(item))) => { 59 if items.is_empty() { 60 items.reserve_exact(*this.cap); 61 } 62 items.push(item); 63 if items.len() >= *this.cap { 64 return Poll::Ready(Some(Ok(items))); 65 } 66 } 67 68 // Return the already collected items and the error. 69 Poll::Ready(Some(Err(e))) => { 70 return Poll::Ready(Some(Err(TryReadyChunksError(items, e)))); 71 } 72 73 // Since the underlying stream ran out of values, return what we 74 // have buffered, if we have anything. 75 Poll::Ready(None) => { 76 let last = if items.is_empty() { None } else { Some(Ok(items)) }; 77 return Poll::Ready(last); 78 } 79 } 80 } 81 } 82 size_hint(&self) -> (usize, Option<usize>)83 fn size_hint(&self) -> (usize, Option<usize>) { 84 let (lower, upper) = self.stream.size_hint(); 85 let lower = lower / self.cap; 86 (lower, upper) 87 } 88 } 89 90 impl<St: TryStream + FusedStream> FusedStream for TryReadyChunks<St> { is_terminated(&self) -> bool91 fn is_terminated(&self) -> bool { 92 self.stream.is_terminated() 93 } 94 } 95 96 // Forwarding impl of Sink from the underlying stream 97 #[cfg(feature = "sink")] 98 impl<S, Item> Sink<Item> for TryReadyChunks<S> 99 where 100 S: TryStream + Sink<Item>, 101 { 102 type Error = <S as Sink<Item>>::Error; 103 104 delegate_sink!(stream, Item); 105 } 106 107 /// Error indicating, that while chunk was collected inner stream produced an error. 108 /// 109 /// Contains all items that were collected before an error occurred, and the stream error itself. 110 #[derive(PartialEq, Eq)] 111 pub struct TryReadyChunksError<T, E>(pub Vec<T>, pub E); 112 113 impl<T, E: fmt::Debug> fmt::Debug for TryReadyChunksError<T, E> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result114 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 115 self.1.fmt(f) 116 } 117 } 118 119 impl<T, E: fmt::Display> fmt::Display for TryReadyChunksError<T, E> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 121 self.1.fmt(f) 122 } 123 } 124 125 #[cfg(feature = "std")] 126 impl<T, E: fmt::Debug + fmt::Display> std::error::Error for TryReadyChunksError<T, E> {} 127