//! The [`TryChunks`] stream adapter and its [`TryChunksError`] error type.

use crate::stream::{Fuse, IntoStream, StreamExt};

use alloc::vec::Vec;
use core::pin::Pin;
use core::{fmt, mem};
use futures_core::ready;
use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_project_lite::pin_project;
12 
13 pin_project! {
14     /// Stream for the [`try_chunks`](super::TryStreamExt::try_chunks) method.
15     #[derive(Debug)]
16     #[must_use = "streams do nothing unless polled"]
17     pub struct TryChunks<St: TryStream> {
18         #[pin]
19         stream: Fuse<IntoStream<St>>,
20         items: Vec<St::Ok>,
21         cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
22     }
23 }
24 
25 impl<St: TryStream> TryChunks<St> {
new(stream: St, capacity: usize) -> Self26     pub(super) fn new(stream: St, capacity: usize) -> Self {
27         assert!(capacity > 0);
28 
29         Self {
30             stream: IntoStream::new(stream).fuse(),
31             items: Vec::with_capacity(capacity),
32             cap: capacity,
33         }
34     }
35 
take(self: Pin<&mut Self>) -> Vec<St::Ok>36     fn take(self: Pin<&mut Self>) -> Vec<St::Ok> {
37         let cap = self.cap;
38         mem::replace(self.project().items, Vec::with_capacity(cap))
39     }
40 
41     delegate_access_inner!(stream, St, (. .));
42 }
43 
44 impl<St: TryStream> Stream for TryChunks<St> {
45     #[allow(clippy::type_complexity)]
46     type Item = Result<Vec<St::Ok>, TryChunksError<St::Ok, St::Error>>;
47 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>48     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
49         let mut this = self.as_mut().project();
50         loop {
51             match ready!(this.stream.as_mut().try_poll_next(cx)) {
52                 // Push the item into the buffer and check whether it is full.
53                 // If so, replace our buffer with a new and empty one and return
54                 // the full one.
55                 Some(item) => match item {
56                     Ok(item) => {
57                         this.items.push(item);
58                         if this.items.len() >= *this.cap {
59                             return Poll::Ready(Some(Ok(self.take())));
60                         }
61                     }
62                     Err(e) => {
63                         return Poll::Ready(Some(Err(TryChunksError(self.take(), e))));
64                     }
65                 },
66 
67                 // Since the underlying stream ran out of values, return what we
68                 // have buffered, if we have anything.
69                 None => {
70                     let last = if this.items.is_empty() {
71                         None
72                     } else {
73                         let full_buf = mem::take(this.items);
74                         Some(full_buf)
75                     };
76 
77                     return Poll::Ready(last.map(Ok));
78                 }
79             }
80         }
81     }
82 
size_hint(&self) -> (usize, Option<usize>)83     fn size_hint(&self) -> (usize, Option<usize>) {
84         let chunk_len = usize::from(!self.items.is_empty());
85         let (lower, upper) = self.stream.size_hint();
86         let lower = (lower / self.cap).saturating_add(chunk_len);
87         let upper = match upper {
88             Some(x) => x.checked_add(chunk_len),
89             None => None,
90         };
91         (lower, upper)
92     }
93 }
94 
95 impl<St: TryStream + FusedStream> FusedStream for TryChunks<St> {
is_terminated(&self) -> bool96     fn is_terminated(&self) -> bool {
97         self.stream.is_terminated() && self.items.is_empty()
98     }
99 }
100 
// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for TryChunks<S>
where
    S: TryStream + Sink<Item>,
{
    /// Same error type as the wrapped stream's `Sink` implementation.
    type Error = <S as Sink<Item>>::Error;

    // Generates the `Sink` methods for the `stream` field.
    delegate_sink!(stream, Item);
}
111 
/// Error returned when the inner stream produces an error while a chunk is
/// being collected.
///
/// Field `0` holds all items that were collected before the error occurred;
/// field `1` holds the stream error itself.
#[derive(PartialEq, Eq)]
pub struct TryChunksError<T, E>(pub Vec<T>, pub E);

/// Formats only the wrapped stream error, so `T` does not need to implement
/// `Debug`.
impl<T, E: fmt::Debug> fmt::Debug for TryChunksError<T, E> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(&self.1, f)
    }
}

/// Displays only the wrapped stream error; the collected items are omitted.
impl<T, E: fmt::Display> fmt::Display for TryChunksError<T, E> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.1, f)
    }
}

#[cfg(feature = "std")]
impl<T, E: fmt::Debug + fmt::Display> std::error::Error for TryChunksError<T, E> {}
132