• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use futures_core::ready;
2 use pin_project_lite::pin_project;
3 use std::io::{IoSlice, Result};
4 use std::pin::Pin;
5 use std::task::{Context, Poll};
6 
7 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
8 
9 pin_project! {
10     /// An adapter that lets you inspect the data that's being read.
11     ///
12     /// This is useful for things like hashing data as it's read in.
13     pub struct InspectReader<R, F> {
14         #[pin]
15         reader: R,
16         f: F,
17     }
18 }
19 
20 impl<R, F> InspectReader<R, F> {
21     /// Create a new InspectReader, wrapping `reader` and calling `f` for the
22     /// new data supplied by each read call.
23     ///
24     /// The closure will only be called with an empty slice if the inner reader
25     /// returns without reading data into the buffer. This happens at EOF, or if
26     /// `poll_read` is called with a zero-size buffer.
new(reader: R, f: F) -> InspectReader<R, F> where R: AsyncRead, F: FnMut(&[u8]),27     pub fn new(reader: R, f: F) -> InspectReader<R, F>
28     where
29         R: AsyncRead,
30         F: FnMut(&[u8]),
31     {
32         InspectReader { reader, f }
33     }
34 
35     /// Consumes the `InspectReader`, returning the wrapped reader
into_inner(self) -> R36     pub fn into_inner(self) -> R {
37         self.reader
38     }
39 }
40 
41 impl<R: AsyncRead, F: FnMut(&[u8])> AsyncRead for InspectReader<R, F> {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<Result<()>>42     fn poll_read(
43         self: Pin<&mut Self>,
44         cx: &mut Context<'_>,
45         buf: &mut ReadBuf<'_>,
46     ) -> Poll<Result<()>> {
47         let me = self.project();
48         let filled_length = buf.filled().len();
49         ready!(me.reader.poll_read(cx, buf))?;
50         (me.f)(&buf.filled()[filled_length..]);
51         Poll::Ready(Ok(()))
52     }
53 }
54 
55 pin_project! {
56     /// An adapter that lets you inspect the data that's being written.
57     ///
58     /// This is useful for things like hashing data as it's written out.
59     pub struct InspectWriter<W, F> {
60         #[pin]
61         writer: W,
62         f: F,
63     }
64 }
65 
66 impl<W, F> InspectWriter<W, F> {
67     /// Create a new InspectWriter, wrapping `write` and calling `f` for the
68     /// data successfully written by each write call.
69     ///
70     /// The closure `f` will never be called with an empty slice. A vectored
71     /// write can result in multiple calls to `f` - at most one call to `f` per
72     /// buffer supplied to `poll_write_vectored`.
new(writer: W, f: F) -> InspectWriter<W, F> where W: AsyncWrite, F: FnMut(&[u8]),73     pub fn new(writer: W, f: F) -> InspectWriter<W, F>
74     where
75         W: AsyncWrite,
76         F: FnMut(&[u8]),
77     {
78         InspectWriter { writer, f }
79     }
80 
81     /// Consumes the `InspectWriter`, returning the wrapped writer
into_inner(self) -> W82     pub fn into_inner(self) -> W {
83         self.writer
84     }
85 }
86 
87 impl<W: AsyncWrite, F: FnMut(&[u8])> AsyncWrite for InspectWriter<W, F> {
poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>88     fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
89         let me = self.project();
90         let res = me.writer.poll_write(cx, buf);
91         if let Poll::Ready(Ok(count)) = res {
92             if count != 0 {
93                 (me.f)(&buf[..count]);
94             }
95         }
96         res
97     }
98 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>99     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
100         let me = self.project();
101         me.writer.poll_flush(cx)
102     }
103 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>104     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
105         let me = self.project();
106         me.writer.poll_shutdown(cx)
107     }
108 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<Result<usize>>109     fn poll_write_vectored(
110         self: Pin<&mut Self>,
111         cx: &mut Context<'_>,
112         bufs: &[IoSlice<'_>],
113     ) -> Poll<Result<usize>> {
114         let me = self.project();
115         let res = me.writer.poll_write_vectored(cx, bufs);
116         if let Poll::Ready(Ok(mut count)) = res {
117             for buf in bufs {
118                 if count == 0 {
119                     break;
120                 }
121                 let size = count.min(buf.len());
122                 if size != 0 {
123                     (me.f)(&buf[..size]);
124                     count -= size;
125                 }
126             }
127         }
128         res
129     }
130 
is_write_vectored(&self) -> bool131     fn is_write_vectored(&self) -> bool {
132         self.writer.is_write_vectored()
133     }
134 }
135