1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::{FusedFuture, Future}; 4 use futures_core::ready; 5 use futures_core::stream::{FusedStream, Stream}; 6 use futures_core::task::{Context, Poll}; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Future for the [`count`](super::StreamExt::count) method. 11 #[must_use = "futures do nothing unless you `.await` or poll them"] 12 pub struct Count<St> { 13 #[pin] 14 stream: St, 15 count: usize 16 } 17 } 18 19 impl<St> fmt::Debug for Count<St> 20 where 21 St: fmt::Debug, 22 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 24 f.debug_struct("Count").field("stream", &self.stream).field("count", &self.count).finish() 25 } 26 } 27 28 impl<St: Stream> Count<St> { new(stream: St) -> Self29 pub(super) fn new(stream: St) -> Self { 30 Self { stream, count: 0 } 31 } 32 } 33 34 impl<St: FusedStream> FusedFuture for Count<St> { is_terminated(&self) -> bool35 fn is_terminated(&self) -> bool { 36 self.stream.is_terminated() 37 } 38 } 39 40 impl<St: Stream> Future for Count<St> { 41 type Output = usize; 42 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>43 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 44 let mut this = self.project(); 45 46 Poll::Ready(loop { 47 match ready!(this.stream.as_mut().poll_next(cx)) { 48 Some(_) => *this.count += 1, 49 None => break *this.count, 50 } 51 }) 52 } 53 } 54