• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use core::mem;
2 use core::pin::Pin;
3 use futures_core::future::{FusedFuture, Future};
4 use futures_core::ready;
5 use futures_core::stream::{FusedStream, Stream};
6 use futures_core::task::{Context, Poll};
7 use pin_project_lite::pin_project;
8 
9 pin_project! {
10     /// Future for the [`collect`](super::StreamExt::collect) method.
11     #[derive(Debug)]
12     #[must_use = "futures do nothing unless you `.await` or poll them"]
13     pub struct Collect<St, C> {
14         #[pin]
15         stream: St,
16         collection: C,
17     }
18 }
19 
20 impl<St: Stream, C: Default> Collect<St, C> {
finish(self: Pin<&mut Self>) -> C21     fn finish(self: Pin<&mut Self>) -> C {
22         mem::take(self.project().collection)
23     }
24 
new(stream: St) -> Self25     pub(super) fn new(stream: St) -> Self {
26         Self { stream, collection: Default::default() }
27     }
28 }
29 
30 impl<St, C> FusedFuture for Collect<St, C>
31 where
32     St: FusedStream,
33     C: Default + Extend<St::Item>,
34 {
is_terminated(&self) -> bool35     fn is_terminated(&self) -> bool {
36         self.stream.is_terminated()
37     }
38 }
39 
40 impl<St, C> Future for Collect<St, C>
41 where
42     St: Stream,
43     C: Default + Extend<St::Item>,
44 {
45     type Output = C;
46 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C>47     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
48         let mut this = self.as_mut().project();
49         loop {
50             match ready!(this.stream.as_mut().poll_next(cx)) {
51                 Some(e) => this.collection.extend(Some(e)),
52                 None => return Poll::Ready(self.finish()),
53             }
54         }
55     }
56 }
57