• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::codec::decoder::Decoder;
2 use crate::codec::encoder::Encoder;
3 use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};
4 
5 use futures_core::Stream;
6 use tokio::io::{AsyncRead, AsyncWrite};
7 
8 use bytes::BytesMut;
9 use futures_sink::Sink;
10 use pin_project_lite::pin_project;
11 use std::fmt;
12 use std::io;
13 use std::pin::Pin;
14 use std::task::{Context, Poll};
15 
16 pin_project! {
17     /// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using
18     /// the `Encoder` and `Decoder` traits to encode and decode frames.
19     ///
20     /// You can create a `Framed` instance by using the [`Decoder::framed`] adapter, or
21     /// by using the `new` function seen below.
22     ///
23     /// [`Stream`]: futures_core::Stream
24     /// [`Sink`]: futures_sink::Sink
25     /// [`AsyncRead`]: tokio::io::AsyncRead
26     /// [`Decoder::framed`]: crate::codec::Decoder::framed()
27     pub struct Framed<T, U> {
28         #[pin]
29         inner: FramedImpl<T, U, RWFrames>
30     }
31 }
32 
33 impl<T, U> Framed<T, U>
34 where
35     T: AsyncRead + AsyncWrite,
36 {
37     /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
38     /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
39     ///
40     /// Raw I/O objects work with byte sequences, but higher-level code usually
41     /// wants to batch these into meaningful chunks, called "frames". This
42     /// method layers framing on top of an I/O object, by using the codec
43     /// traits to handle encoding and decoding of messages frames. Note that
44     /// the incoming and outgoing frame types may be distinct.
45     ///
46     /// This function returns a *single* object that is both [`Stream`] and
47     /// [`Sink`]; grouping this into a single object is often useful for layering
48     /// things like gzip or TLS, which require both read and write access to the
49     /// underlying object.
50     ///
51     /// If you want to work more directly with the streams and sink, consider
52     /// calling [`split`] on the `Framed` returned by this method, which will
53     /// break them into separate objects, allowing them to interact more easily.
54     ///
55     /// Note that, for some byte sources, the stream can be resumed after an EOF
56     /// by reading from it, even after it has returned `None`. Repeated attempts
57     /// to do so, without new data available, continue to return `None` without
58     /// creating more (closing) frames.
59     ///
60     /// [`Stream`]: futures_core::Stream
61     /// [`Sink`]: futures_sink::Sink
62     /// [`Decode`]: crate::codec::Decoder
63     /// [`Encoder`]: crate::codec::Encoder
64     /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
new(inner: T, codec: U) -> Framed<T, U>65     pub fn new(inner: T, codec: U) -> Framed<T, U> {
66         Framed {
67             inner: FramedImpl {
68                 inner,
69                 codec,
70                 state: Default::default(),
71             },
72         }
73     }
74 
75     /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
76     /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data,
77     /// with a specific read buffer initial capacity.
78     ///
79     /// Raw I/O objects work with byte sequences, but higher-level code usually
80     /// wants to batch these into meaningful chunks, called "frames". This
81     /// method layers framing on top of an I/O object, by using the codec
82     /// traits to handle encoding and decoding of messages frames. Note that
83     /// the incoming and outgoing frame types may be distinct.
84     ///
85     /// This function returns a *single* object that is both [`Stream`] and
86     /// [`Sink`]; grouping this into a single object is often useful for layering
87     /// things like gzip or TLS, which require both read and write access to the
88     /// underlying object.
89     ///
90     /// If you want to work more directly with the streams and sink, consider
91     /// calling [`split`] on the `Framed` returned by this method, which will
92     /// break them into separate objects, allowing them to interact more easily.
93     ///
94     /// [`Stream`]: futures_core::Stream
95     /// [`Sink`]: futures_sink::Sink
96     /// [`Decode`]: crate::codec::Decoder
97     /// [`Encoder`]: crate::codec::Encoder
98     /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U>99     pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> {
100         Framed {
101             inner: FramedImpl {
102                 inner,
103                 codec,
104                 state: RWFrames {
105                     read: ReadFrame {
106                         eof: false,
107                         is_readable: false,
108                         buffer: BytesMut::with_capacity(capacity),
109                         has_errored: false,
110                     },
111                     write: WriteFrame::default(),
112                 },
113             },
114         }
115     }
116 }
117 
118 impl<T, U> Framed<T, U> {
119     /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
120     /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
121     ///
122     /// Raw I/O objects work with byte sequences, but higher-level code usually
123     /// wants to batch these into meaningful chunks, called "frames". This
124     /// method layers framing on top of an I/O object, by using the `Codec`
125     /// traits to handle encoding and decoding of messages frames. Note that
126     /// the incoming and outgoing frame types may be distinct.
127     ///
128     /// This function returns a *single* object that is both [`Stream`] and
129     /// [`Sink`]; grouping this into a single object is often useful for layering
130     /// things like gzip or TLS, which require both read and write access to the
131     /// underlying object.
132     ///
133     /// This objects takes a stream and a readbuffer and a writebuffer. These field
134     /// can be obtained from an existing `Framed` with the [`into_parts`] method.
135     ///
136     /// If you want to work more directly with the streams and sink, consider
137     /// calling [`split`] on the `Framed` returned by this method, which will
138     /// break them into separate objects, allowing them to interact more easily.
139     ///
140     /// [`Stream`]: futures_core::Stream
141     /// [`Sink`]: futures_sink::Sink
142     /// [`Decoder`]: crate::codec::Decoder
143     /// [`Encoder`]: crate::codec::Encoder
144     /// [`into_parts`]: crate::codec::Framed::into_parts()
145     /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
from_parts(parts: FramedParts<T, U>) -> Framed<T, U>146     pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
147         Framed {
148             inner: FramedImpl {
149                 inner: parts.io,
150                 codec: parts.codec,
151                 state: RWFrames {
152                     read: parts.read_buf.into(),
153                     write: parts.write_buf.into(),
154                 },
155             },
156         }
157     }
158 
159     /// Returns a reference to the underlying I/O stream wrapped by
160     /// `Framed`.
161     ///
162     /// Note that care should be taken to not tamper with the underlying stream
163     /// of data coming in as it may corrupt the stream of frames otherwise
164     /// being worked with.
get_ref(&self) -> &T165     pub fn get_ref(&self) -> &T {
166         &self.inner.inner
167     }
168 
169     /// Returns a mutable reference to the underlying I/O stream wrapped by
170     /// `Framed`.
171     ///
172     /// Note that care should be taken to not tamper with the underlying stream
173     /// of data coming in as it may corrupt the stream of frames otherwise
174     /// being worked with.
get_mut(&mut self) -> &mut T175     pub fn get_mut(&mut self) -> &mut T {
176         &mut self.inner.inner
177     }
178 
179     /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
180     /// `Framed`.
181     ///
182     /// Note that care should be taken to not tamper with the underlying stream
183     /// of data coming in as it may corrupt the stream of frames otherwise
184     /// being worked with.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T>185     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
186         self.project().inner.project().inner
187     }
188 
189     /// Returns a reference to the underlying codec wrapped by
190     /// `Framed`.
191     ///
192     /// Note that care should be taken to not tamper with the underlying codec
193     /// as it may corrupt the stream of frames otherwise being worked with.
codec(&self) -> &U194     pub fn codec(&self) -> &U {
195         &self.inner.codec
196     }
197 
198     /// Returns a mutable reference to the underlying codec wrapped by
199     /// `Framed`.
200     ///
201     /// Note that care should be taken to not tamper with the underlying codec
202     /// as it may corrupt the stream of frames otherwise being worked with.
codec_mut(&mut self) -> &mut U203     pub fn codec_mut(&mut self) -> &mut U {
204         &mut self.inner.codec
205     }
206 
207     /// Maps the codec `U` to `C`, preserving the read and write buffers
208     /// wrapped by `Framed`.
209     ///
210     /// Note that care should be taken to not tamper with the underlying codec
211     /// as it may corrupt the stream of frames otherwise being worked with.
map_codec<C, F>(self, map: F) -> Framed<T, C> where F: FnOnce(U) -> C,212     pub fn map_codec<C, F>(self, map: F) -> Framed<T, C>
213     where
214         F: FnOnce(U) -> C,
215     {
216         // This could be potentially simplified once rust-lang/rust#86555 hits stable
217         let parts = self.into_parts();
218         Framed::from_parts(FramedParts {
219             io: parts.io,
220             codec: map(parts.codec),
221             read_buf: parts.read_buf,
222             write_buf: parts.write_buf,
223             _priv: (),
224         })
225     }
226 
227     /// Returns a mutable reference to the underlying codec wrapped by
228     /// `Framed`.
229     ///
230     /// Note that care should be taken to not tamper with the underlying codec
231     /// as it may corrupt the stream of frames otherwise being worked with.
codec_pin_mut(self: Pin<&mut Self>) -> &mut U232     pub fn codec_pin_mut(self: Pin<&mut Self>) -> &mut U {
233         self.project().inner.project().codec
234     }
235 
236     /// Returns a reference to the read buffer.
read_buffer(&self) -> &BytesMut237     pub fn read_buffer(&self) -> &BytesMut {
238         &self.inner.state.read.buffer
239     }
240 
241     /// Returns a mutable reference to the read buffer.
read_buffer_mut(&mut self) -> &mut BytesMut242     pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
243         &mut self.inner.state.read.buffer
244     }
245 
246     /// Returns a reference to the write buffer.
write_buffer(&self) -> &BytesMut247     pub fn write_buffer(&self) -> &BytesMut {
248         &self.inner.state.write.buffer
249     }
250 
251     /// Returns a mutable reference to the write buffer.
write_buffer_mut(&mut self) -> &mut BytesMut252     pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
253         &mut self.inner.state.write.buffer
254     }
255 
256     /// Returns backpressure boundary
backpressure_boundary(&self) -> usize257     pub fn backpressure_boundary(&self) -> usize {
258         self.inner.state.write.backpressure_boundary
259     }
260 
261     /// Updates backpressure boundary
set_backpressure_boundary(&mut self, boundary: usize)262     pub fn set_backpressure_boundary(&mut self, boundary: usize) {
263         self.inner.state.write.backpressure_boundary = boundary;
264     }
265 
266     /// Consumes the `Framed`, returning its underlying I/O stream.
267     ///
268     /// Note that care should be taken to not tamper with the underlying stream
269     /// of data coming in as it may corrupt the stream of frames otherwise
270     /// being worked with.
into_inner(self) -> T271     pub fn into_inner(self) -> T {
272         self.inner.inner
273     }
274 
275     /// Consumes the `Framed`, returning its underlying I/O stream, the buffer
276     /// with unprocessed data, and the codec.
277     ///
278     /// Note that care should be taken to not tamper with the underlying stream
279     /// of data coming in as it may corrupt the stream of frames otherwise
280     /// being worked with.
into_parts(self) -> FramedParts<T, U>281     pub fn into_parts(self) -> FramedParts<T, U> {
282         FramedParts {
283             io: self.inner.inner,
284             codec: self.inner.codec,
285             read_buf: self.inner.state.read.buffer,
286             write_buf: self.inner.state.write.buffer,
287             _priv: (),
288         }
289     }
290 }
291 
292 // This impl just defers to the underlying FramedImpl
293 impl<T, U> Stream for Framed<T, U>
294 where
295     T: AsyncRead,
296     U: Decoder,
297 {
298     type Item = Result<U::Item, U::Error>;
299 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>300     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
301         self.project().inner.poll_next(cx)
302     }
303 }
304 
305 // This impl just defers to the underlying FramedImpl
306 impl<T, I, U> Sink<I> for Framed<T, U>
307 where
308     T: AsyncWrite,
309     U: Encoder<I>,
310     U::Error: From<io::Error>,
311 {
312     type Error = U::Error;
313 
poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>314     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
315         self.project().inner.poll_ready(cx)
316     }
317 
start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error>318     fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
319         self.project().inner.start_send(item)
320     }
321 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>322     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
323         self.project().inner.poll_flush(cx)
324     }
325 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>326     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
327         self.project().inner.poll_close(cx)
328     }
329 }
330 
331 impl<T, U> fmt::Debug for Framed<T, U>
332 where
333     T: fmt::Debug,
334     U: fmt::Debug,
335 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result336     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
337         f.debug_struct("Framed")
338             .field("io", self.get_ref())
339             .field("codec", self.codec())
340             .finish()
341     }
342 }
343 
344 /// `FramedParts` contains an export of the data of a Framed transport.
345 /// It can be used to construct a new [`Framed`] with a different codec.
346 /// It contains all current buffers and the inner transport.
347 ///
348 /// [`Framed`]: crate::codec::Framed
349 #[derive(Debug)]
350 #[allow(clippy::manual_non_exhaustive)]
351 pub struct FramedParts<T, U> {
352     /// The inner transport used to read bytes to and write bytes to
353     pub io: T,
354 
355     /// The codec
356     pub codec: U,
357 
358     /// The buffer with read but unprocessed data.
359     pub read_buf: BytesMut,
360 
361     /// A buffer with unprocessed data which are not written yet.
362     pub write_buf: BytesMut,
363 
364     /// This private field allows us to add additional fields in the future in a
365     /// backwards compatible way.
366     _priv: (),
367 }
368 
369 impl<T, U> FramedParts<T, U> {
370     /// Create a new, default, `FramedParts`
new<I>(io: T, codec: U) -> FramedParts<T, U> where U: Encoder<I>,371     pub fn new<I>(io: T, codec: U) -> FramedParts<T, U>
372     where
373         U: Encoder<I>,
374     {
375         FramedParts {
376             io,
377             codec,
378             read_buf: BytesMut::new(),
379             write_buf: BytesMut::new(),
380             _priv: (),
381         }
382     }
383 }
384