1 use crate::Stream; 2 3 use core::cmp; 4 use core::fmt; 5 use core::pin::Pin; 6 use core::task::{Context, Poll}; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Stream for the [`take`](super::StreamExt::take) method. 11 #[must_use = "streams do nothing unless polled"] 12 pub struct Take<St> { 13 #[pin] 14 stream: St, 15 remaining: usize, 16 } 17 } 18 19 impl<St> fmt::Debug for Take<St> 20 where 21 St: fmt::Debug, 22 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 24 f.debug_struct("Take") 25 .field("stream", &self.stream) 26 .finish() 27 } 28 } 29 30 impl<St> Take<St> { new(stream: St, remaining: usize) -> Self31 pub(super) fn new(stream: St, remaining: usize) -> Self { 32 Self { stream, remaining } 33 } 34 } 35 36 impl<St> Stream for Take<St> 37 where 38 St: Stream, 39 { 40 type Item = St::Item; 41 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>42 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 43 if *self.as_mut().project().remaining > 0 { 44 self.as_mut().project().stream.poll_next(cx).map(|ready| { 45 match &ready { 46 Some(_) => { 47 *self.as_mut().project().remaining -= 1; 48 } 49 None => { 50 *self.as_mut().project().remaining = 0; 51 } 52 } 53 ready 54 }) 55 } else { 56 Poll::Ready(None) 57 } 58 } 59 size_hint(&self) -> (usize, Option<usize>)60 fn size_hint(&self) -> (usize, Option<usize>) { 61 if self.remaining == 0 { 62 return (0, Some(0)); 63 } 64 65 let (lower, upper) = self.stream.size_hint(); 66 67 let lower = cmp::min(lower, self.remaining as usize); 68 69 let upper = match upper { 70 Some(x) if x < self.remaining as usize => Some(x), 71 _ => Some(self.remaining as usize), 72 }; 73 74 (lower, upper) 75 } 76 } 77