• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::codec::framed_impl::{FramedImpl, ReadFrame};
2 use crate::codec::Decoder;
3 
4 use futures_core::Stream;
5 use tokio::io::AsyncRead;
6 
7 use bytes::BytesMut;
8 use futures_sink::Sink;
9 use pin_project_lite::pin_project;
10 use std::fmt;
11 use std::pin::Pin;
12 use std::task::{Context, Poll};
13 
14 pin_project! {
15     /// A [`Stream`] of messages decoded from an [`AsyncRead`].
16     ///
17     /// [`Stream`]: futures_core::Stream
18     /// [`AsyncRead`]: tokio::io::AsyncRead
19     pub struct FramedRead<T, D> {
20         #[pin]
21         inner: FramedImpl<T, D, ReadFrame>,
22     }
23 }
24 
25 // ===== impl FramedRead =====
26 
27 impl<T, D> FramedRead<T, D>
28 where
29     T: AsyncRead,
30     D: Decoder,
31 {
32     /// Creates a new `FramedRead` with the given `decoder`.
new(inner: T, decoder: D) -> FramedRead<T, D>33     pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
34         FramedRead {
35             inner: FramedImpl {
36                 inner,
37                 codec: decoder,
38                 state: Default::default(),
39             },
40         }
41     }
42 
43     /// Creates a new `FramedRead` with the given `decoder` and a buffer of `capacity`
44     /// initial size.
with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D>45     pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D> {
46         FramedRead {
47             inner: FramedImpl {
48                 inner,
49                 codec: decoder,
50                 state: ReadFrame {
51                     eof: false,
52                     is_readable: false,
53                     buffer: BytesMut::with_capacity(capacity),
54                     has_errored: false,
55                 },
56             },
57         }
58     }
59 }
60 
61 impl<T, D> FramedRead<T, D> {
62     /// Returns a reference to the underlying I/O stream wrapped by
63     /// `FramedRead`.
64     ///
65     /// Note that care should be taken to not tamper with the underlying stream
66     /// of data coming in as it may corrupt the stream of frames otherwise
67     /// being worked with.
get_ref(&self) -> &T68     pub fn get_ref(&self) -> &T {
69         &self.inner.inner
70     }
71 
72     /// Returns a mutable reference to the underlying I/O stream wrapped by
73     /// `FramedRead`.
74     ///
75     /// Note that care should be taken to not tamper with the underlying stream
76     /// of data coming in as it may corrupt the stream of frames otherwise
77     /// being worked with.
get_mut(&mut self) -> &mut T78     pub fn get_mut(&mut self) -> &mut T {
79         &mut self.inner.inner
80     }
81 
82     /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
83     /// `FramedRead`.
84     ///
85     /// Note that care should be taken to not tamper with the underlying stream
86     /// of data coming in as it may corrupt the stream of frames otherwise
87     /// being worked with.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T>88     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
89         self.project().inner.project().inner
90     }
91 
92     /// Consumes the `FramedRead`, returning its underlying I/O stream.
93     ///
94     /// Note that care should be taken to not tamper with the underlying stream
95     /// of data coming in as it may corrupt the stream of frames otherwise
96     /// being worked with.
into_inner(self) -> T97     pub fn into_inner(self) -> T {
98         self.inner.inner
99     }
100 
101     /// Returns a reference to the underlying decoder.
decoder(&self) -> &D102     pub fn decoder(&self) -> &D {
103         &self.inner.codec
104     }
105 
106     /// Returns a mutable reference to the underlying decoder.
decoder_mut(&mut self) -> &mut D107     pub fn decoder_mut(&mut self) -> &mut D {
108         &mut self.inner.codec
109     }
110 
111     /// Maps the decoder `D` to `C`, preserving the read buffer
112     /// wrapped by `Framed`.
map_decoder<C, F>(self, map: F) -> FramedRead<T, C> where F: FnOnce(D) -> C,113     pub fn map_decoder<C, F>(self, map: F) -> FramedRead<T, C>
114     where
115         F: FnOnce(D) -> C,
116     {
117         // This could be potentially simplified once rust-lang/rust#86555 hits stable
118         let FramedImpl {
119             inner,
120             state,
121             codec,
122         } = self.inner;
123         FramedRead {
124             inner: FramedImpl {
125                 inner,
126                 state,
127                 codec: map(codec),
128             },
129         }
130     }
131 
132     /// Returns a mutable reference to the underlying decoder.
decoder_pin_mut(self: Pin<&mut Self>) -> &mut D133     pub fn decoder_pin_mut(self: Pin<&mut Self>) -> &mut D {
134         self.project().inner.project().codec
135     }
136 
137     /// Returns a reference to the read buffer.
read_buffer(&self) -> &BytesMut138     pub fn read_buffer(&self) -> &BytesMut {
139         &self.inner.state.buffer
140     }
141 
142     /// Returns a mutable reference to the read buffer.
read_buffer_mut(&mut self) -> &mut BytesMut143     pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
144         &mut self.inner.state.buffer
145     }
146 }
147 
148 // This impl just defers to the underlying FramedImpl
149 impl<T, D> Stream for FramedRead<T, D>
150 where
151     T: AsyncRead,
152     D: Decoder,
153 {
154     type Item = Result<D::Item, D::Error>;
155 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>156     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
157         self.project().inner.poll_next(cx)
158     }
159 }
160 
161 // This impl just defers to the underlying T: Sink
162 impl<T, I, D> Sink<I> for FramedRead<T, D>
163 where
164     T: Sink<I>,
165 {
166     type Error = T::Error;
167 
poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>168     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
169         self.project().inner.project().inner.poll_ready(cx)
170     }
171 
start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error>172     fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
173         self.project().inner.project().inner.start_send(item)
174     }
175 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>176     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
177         self.project().inner.project().inner.poll_flush(cx)
178     }
179 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>180     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
181         self.project().inner.project().inner.poll_close(cx)
182     }
183 }
184 
185 impl<T, D> fmt::Debug for FramedRead<T, D>
186 where
187     T: fmt::Debug,
188     D: fmt::Debug,
189 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result190     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191         f.debug_struct("FramedRead")
192             .field("inner", &self.get_ref())
193             .field("decoder", &self.decoder())
194             .field("eof", &self.inner.state.eof)
195             .field("is_readable", &self.inner.state.is_readable)
196             .field("buffer", &self.read_buffer())
197             .finish()
198     }
199 }
200