1 use super::read_line::read_line_internal; 2 use futures_core::ready; 3 use futures_core::stream::Stream; 4 use futures_core::task::{Context, Poll}; 5 use futures_io::AsyncBufRead; 6 use pin_project_lite::pin_project; 7 use std::io; 8 use std::mem; 9 use std::pin::Pin; 10 11 pin_project! { 12 /// Stream for the [`lines`](super::AsyncBufReadExt::lines) method. 13 #[derive(Debug)] 14 #[must_use = "streams do nothing unless polled"] 15 pub struct Lines<R> { 16 #[pin] 17 reader: R, 18 buf: String, 19 bytes: Vec<u8>, 20 read: usize, 21 } 22 } 23 24 impl<R: AsyncBufRead> Lines<R> { new(reader: R) -> Self25 pub(super) fn new(reader: R) -> Self { 26 Self { reader, buf: String::new(), bytes: Vec::new(), read: 0 } 27 } 28 } 29 30 impl<R: AsyncBufRead> Stream for Lines<R> { 31 type Item = io::Result<String>; 32 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>33 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 34 let this = self.project(); 35 let n = ready!(read_line_internal(this.reader, cx, this.buf, this.bytes, this.read))?; 36 if n == 0 && this.buf.is_empty() { 37 return Poll::Ready(None); 38 } 39 if this.buf.ends_with('\n') { 40 this.buf.pop(); 41 if this.buf.ends_with('\r') { 42 this.buf.pop(); 43 } 44 } 45 Poll::Ready(Some(Ok(mem::replace(this.buf, String::new())))) 46 } 47 } 48