1 use futures_core::ready; 2 use pin_project_lite::pin_project; 3 use std::io::{IoSlice, Result}; 4 use std::pin::Pin; 5 use std::task::{Context, Poll}; 6 7 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; 8 9 pin_project! { 10 /// An adapter that lets you inspect the data that's being read. 11 /// 12 /// This is useful for things like hashing data as it's read in. 13 pub struct InspectReader<R, F> { 14 #[pin] 15 reader: R, 16 f: F, 17 } 18 } 19 20 impl<R, F> InspectReader<R, F> { 21 /// Create a new InspectReader, wrapping `reader` and calling `f` for the 22 /// new data supplied by each read call. 23 /// 24 /// The closure will only be called with an empty slice if the inner reader 25 /// returns without reading data into the buffer. This happens at EOF, or if 26 /// `poll_read` is called with a zero-size buffer. new(reader: R, f: F) -> InspectReader<R, F> where R: AsyncRead, F: FnMut(&[u8]),27 pub fn new(reader: R, f: F) -> InspectReader<R, F> 28 where 29 R: AsyncRead, 30 F: FnMut(&[u8]), 31 { 32 InspectReader { reader, f } 33 } 34 35 /// Consumes the `InspectReader`, returning the wrapped reader into_inner(self) -> R36 pub fn into_inner(self) -> R { 37 self.reader 38 } 39 } 40 41 impl<R: AsyncRead, F: FnMut(&[u8])> AsyncRead for InspectReader<R, F> { poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<Result<()>>42 fn poll_read( 43 self: Pin<&mut Self>, 44 cx: &mut Context<'_>, 45 buf: &mut ReadBuf<'_>, 46 ) -> Poll<Result<()>> { 47 let me = self.project(); 48 let filled_length = buf.filled().len(); 49 ready!(me.reader.poll_read(cx, buf))?; 50 (me.f)(&buf.filled()[filled_length..]); 51 Poll::Ready(Ok(())) 52 } 53 } 54 55 pin_project! { 56 /// An adapter that lets you inspect the data that's being written. 57 /// 58 /// This is useful for things like hashing data as it's written out. 59 pub struct InspectWriter<W, F> { 60 #[pin] 61 writer: W, 62 f: F, 63 } 64 } 65 66 impl<W, F> InspectWriter<W, F> { 67 /// Create a new InspectWriter, wrapping `write` and calling `f` for the 68 /// data successfully written by each write call. 69 /// 70 /// The closure `f` will never be called with an empty slice. A vectored 71 /// write can result in multiple calls to `f` - at most one call to `f` per 72 /// buffer supplied to `poll_write_vectored`. new(writer: W, f: F) -> InspectWriter<W, F> where W: AsyncWrite, F: FnMut(&[u8]),73 pub fn new(writer: W, f: F) -> InspectWriter<W, F> 74 where 75 W: AsyncWrite, 76 F: FnMut(&[u8]), 77 { 78 InspectWriter { writer, f } 79 } 80 81 /// Consumes the `InspectWriter`, returning the wrapped writer into_inner(self) -> W82 pub fn into_inner(self) -> W { 83 self.writer 84 } 85 } 86 87 impl<W: AsyncWrite, F: FnMut(&[u8])> AsyncWrite for InspectWriter<W, F> { poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>88 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { 89 let me = self.project(); 90 let res = me.writer.poll_write(cx, buf); 91 if let Poll::Ready(Ok(count)) = res { 92 if count != 0 { 93 (me.f)(&buf[..count]); 94 } 95 } 96 res 97 } 98 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>99 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { 100 let me = self.project(); 101 me.writer.poll_flush(cx) 102 } 103 poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>104 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { 105 let me = self.project(); 106 me.writer.poll_shutdown(cx) 107 } 108 poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<Result<usize>>109 fn poll_write_vectored( 110 self: Pin<&mut Self>, 111 cx: &mut Context<'_>, 112 bufs: &[IoSlice<'_>], 113 ) -> Poll<Result<usize>> { 114 let me = self.project(); 115 let res = me.writer.poll_write_vectored(cx, bufs); 116 if let Poll::Ready(Ok(mut count)) = res { 117 for buf in bufs { 118 if count == 0 { 119 break; 120 } 121 let size = count.min(buf.len()); 122 if size != 0 { 123 (me.f)(&buf[..size]); 124 count -= size; 125 } 126 } 127 } 128 res 129 } 130 is_write_vectored(&self) -> bool131 fn is_write_vectored(&self) -> bool { 132 self.writer.is_write_vectored() 133 } 134 } 135