1 use crate::codec::decoder::Decoder; 2 use crate::codec::encoder::Encoder; 3 use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame}; 4 5 use futures_core::Stream; 6 use tokio::io::{AsyncRead, AsyncWrite}; 7 8 use bytes::BytesMut; 9 use futures_sink::Sink; 10 use pin_project_lite::pin_project; 11 use std::fmt; 12 use std::io; 13 use std::pin::Pin; 14 use std::task::{Context, Poll}; 15 16 pin_project! { 17 /// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using 18 /// the `Encoder` and `Decoder` traits to encode and decode frames. 19 /// 20 /// You can create a `Framed` instance by using the [`Decoder::framed`] adapter, or 21 /// by using the `new` function seen below. 22 /// 23 /// [`Stream`]: futures_core::Stream 24 /// [`Sink`]: futures_sink::Sink 25 /// [`AsyncRead`]: tokio::io::AsyncRead 26 /// [`Decoder::framed`]: crate::codec::Decoder::framed() 27 pub struct Framed<T, U> { 28 #[pin] 29 inner: FramedImpl<T, U, RWFrames> 30 } 31 } 32 33 impl<T, U> Framed<T, U> 34 where 35 T: AsyncRead + AsyncWrite, 36 { 37 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this 38 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data. 39 /// 40 /// Raw I/O objects work with byte sequences, but higher-level code usually 41 /// wants to batch these into meaningful chunks, called "frames". This 42 /// method layers framing on top of an I/O object, by using the codec 43 /// traits to handle encoding and decoding of messages frames. Note that 44 /// the incoming and outgoing frame types may be distinct. 45 /// 46 /// This function returns a *single* object that is both [`Stream`] and 47 /// [`Sink`]; grouping this into a single object is often useful for layering 48 /// things like gzip or TLS, which require both read and write access to the 49 /// underlying object. 50 /// 51 /// If you want to work more directly with the streams and sink, consider 52 /// calling [`split`] on the `Framed` returned by this method, which will 53 /// break them into separate objects, allowing them to interact more easily. 54 /// 55 /// Note that, for some byte sources, the stream can be resumed after an EOF 56 /// by reading from it, even after it has returned `None`. Repeated attempts 57 /// to do so, without new data available, continue to return `None` without 58 /// creating more (closing) frames. 59 /// 60 /// [`Stream`]: futures_core::Stream 61 /// [`Sink`]: futures_sink::Sink 62 /// [`Decode`]: crate::codec::Decoder 63 /// [`Encoder`]: crate::codec::Encoder 64 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split new(inner: T, codec: U) -> Framed<T, U>65 pub fn new(inner: T, codec: U) -> Framed<T, U> { 66 Framed { 67 inner: FramedImpl { 68 inner, 69 codec, 70 state: Default::default(), 71 }, 72 } 73 } 74 75 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this 76 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data, 77 /// with a specific read buffer initial capacity. 78 /// 79 /// Raw I/O objects work with byte sequences, but higher-level code usually 80 /// wants to batch these into meaningful chunks, called "frames". This 81 /// method layers framing on top of an I/O object, by using the codec 82 /// traits to handle encoding and decoding of messages frames. Note that 83 /// the incoming and outgoing frame types may be distinct. 84 /// 85 /// This function returns a *single* object that is both [`Stream`] and 86 /// [`Sink`]; grouping this into a single object is often useful for layering 87 /// things like gzip or TLS, which require both read and write access to the 88 /// underlying object. 89 /// 90 /// If you want to work more directly with the streams and sink, consider 91 /// calling [`split`] on the `Framed` returned by this method, which will 92 /// break them into separate objects, allowing them to interact more easily. 93 /// 94 /// [`Stream`]: futures_core::Stream 95 /// [`Sink`]: futures_sink::Sink 96 /// [`Decode`]: crate::codec::Decoder 97 /// [`Encoder`]: crate::codec::Encoder 98 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U>99 pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> { 100 Framed { 101 inner: FramedImpl { 102 inner, 103 codec, 104 state: RWFrames { 105 read: ReadFrame { 106 eof: false, 107 is_readable: false, 108 buffer: BytesMut::with_capacity(capacity), 109 has_errored: false, 110 }, 111 write: WriteFrame::default(), 112 }, 113 }, 114 } 115 } 116 } 117 118 impl<T, U> Framed<T, U> { 119 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this 120 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data. 121 /// 122 /// Raw I/O objects work with byte sequences, but higher-level code usually 123 /// wants to batch these into meaningful chunks, called "frames". This 124 /// method layers framing on top of an I/O object, by using the `Codec` 125 /// traits to handle encoding and decoding of messages frames. Note that 126 /// the incoming and outgoing frame types may be distinct. 127 /// 128 /// This function returns a *single* object that is both [`Stream`] and 129 /// [`Sink`]; grouping this into a single object is often useful for layering 130 /// things like gzip or TLS, which require both read and write access to the 131 /// underlying object. 132 /// 133 /// This objects takes a stream and a readbuffer and a writebuffer. These field 134 /// can be obtained from an existing `Framed` with the [`into_parts`] method. 135 /// 136 /// If you want to work more directly with the streams and sink, consider 137 /// calling [`split`] on the `Framed` returned by this method, which will 138 /// break them into separate objects, allowing them to interact more easily. 139 /// 140 /// [`Stream`]: futures_core::Stream 141 /// [`Sink`]: futures_sink::Sink 142 /// [`Decoder`]: crate::codec::Decoder 143 /// [`Encoder`]: crate::codec::Encoder 144 /// [`into_parts`]: crate::codec::Framed::into_parts() 145 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split from_parts(parts: FramedParts<T, U>) -> Framed<T, U>146 pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> { 147 Framed { 148 inner: FramedImpl { 149 inner: parts.io, 150 codec: parts.codec, 151 state: RWFrames { 152 read: parts.read_buf.into(), 153 write: parts.write_buf.into(), 154 }, 155 }, 156 } 157 } 158 159 /// Returns a reference to the underlying I/O stream wrapped by 160 /// `Framed`. 161 /// 162 /// Note that care should be taken to not tamper with the underlying stream 163 /// of data coming in as it may corrupt the stream of frames otherwise 164 /// being worked with. get_ref(&self) -> &T165 pub fn get_ref(&self) -> &T { 166 &self.inner.inner 167 } 168 169 /// Returns a mutable reference to the underlying I/O stream wrapped by 170 /// `Framed`. 171 /// 172 /// Note that care should be taken to not tamper with the underlying stream 173 /// of data coming in as it may corrupt the stream of frames otherwise 174 /// being worked with. get_mut(&mut self) -> &mut T175 pub fn get_mut(&mut self) -> &mut T { 176 &mut self.inner.inner 177 } 178 179 /// Returns a pinned mutable reference to the underlying I/O stream wrapped by 180 /// `Framed`. 181 /// 182 /// Note that care should be taken to not tamper with the underlying stream 183 /// of data coming in as it may corrupt the stream of frames otherwise 184 /// being worked with. get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T>185 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> { 186 self.project().inner.project().inner 187 } 188 189 /// Returns a reference to the underlying codec wrapped by 190 /// `Framed`. 191 /// 192 /// Note that care should be taken to not tamper with the underlying codec 193 /// as it may corrupt the stream of frames otherwise being worked with. codec(&self) -> &U194 pub fn codec(&self) -> &U { 195 &self.inner.codec 196 } 197 198 /// Returns a mutable reference to the underlying codec wrapped by 199 /// `Framed`. 200 /// 201 /// Note that care should be taken to not tamper with the underlying codec 202 /// as it may corrupt the stream of frames otherwise being worked with. codec_mut(&mut self) -> &mut U203 pub fn codec_mut(&mut self) -> &mut U { 204 &mut self.inner.codec 205 } 206 207 /// Maps the codec `U` to `C`, preserving the read and write buffers 208 /// wrapped by `Framed`. 209 /// 210 /// Note that care should be taken to not tamper with the underlying codec 211 /// as it may corrupt the stream of frames otherwise being worked with. map_codec<C, F>(self, map: F) -> Framed<T, C> where F: FnOnce(U) -> C,212 pub fn map_codec<C, F>(self, map: F) -> Framed<T, C> 213 where 214 F: FnOnce(U) -> C, 215 { 216 // This could be potentially simplified once rust-lang/rust#86555 hits stable 217 let parts = self.into_parts(); 218 Framed::from_parts(FramedParts { 219 io: parts.io, 220 codec: map(parts.codec), 221 read_buf: parts.read_buf, 222 write_buf: parts.write_buf, 223 _priv: (), 224 }) 225 } 226 227 /// Returns a mutable reference to the underlying codec wrapped by 228 /// `Framed`. 229 /// 230 /// Note that care should be taken to not tamper with the underlying codec 231 /// as it may corrupt the stream of frames otherwise being worked with. codec_pin_mut(self: Pin<&mut Self>) -> &mut U232 pub fn codec_pin_mut(self: Pin<&mut Self>) -> &mut U { 233 self.project().inner.project().codec 234 } 235 236 /// Returns a reference to the read buffer. read_buffer(&self) -> &BytesMut237 pub fn read_buffer(&self) -> &BytesMut { 238 &self.inner.state.read.buffer 239 } 240 241 /// Returns a mutable reference to the read buffer. read_buffer_mut(&mut self) -> &mut BytesMut242 pub fn read_buffer_mut(&mut self) -> &mut BytesMut { 243 &mut self.inner.state.read.buffer 244 } 245 246 /// Returns a reference to the write buffer. write_buffer(&self) -> &BytesMut247 pub fn write_buffer(&self) -> &BytesMut { 248 &self.inner.state.write.buffer 249 } 250 251 /// Returns a mutable reference to the write buffer. write_buffer_mut(&mut self) -> &mut BytesMut252 pub fn write_buffer_mut(&mut self) -> &mut BytesMut { 253 &mut self.inner.state.write.buffer 254 } 255 256 /// Returns backpressure boundary backpressure_boundary(&self) -> usize257 pub fn backpressure_boundary(&self) -> usize { 258 self.inner.state.write.backpressure_boundary 259 } 260 261 /// Updates backpressure boundary set_backpressure_boundary(&mut self, boundary: usize)262 pub fn set_backpressure_boundary(&mut self, boundary: usize) { 263 self.inner.state.write.backpressure_boundary = boundary; 264 } 265 266 /// Consumes the `Framed`, returning its underlying I/O stream. 267 /// 268 /// Note that care should be taken to not tamper with the underlying stream 269 /// of data coming in as it may corrupt the stream of frames otherwise 270 /// being worked with. into_inner(self) -> T271 pub fn into_inner(self) -> T { 272 self.inner.inner 273 } 274 275 /// Consumes the `Framed`, returning its underlying I/O stream, the buffer 276 /// with unprocessed data, and the codec. 277 /// 278 /// Note that care should be taken to not tamper with the underlying stream 279 /// of data coming in as it may corrupt the stream of frames otherwise 280 /// being worked with. into_parts(self) -> FramedParts<T, U>281 pub fn into_parts(self) -> FramedParts<T, U> { 282 FramedParts { 283 io: self.inner.inner, 284 codec: self.inner.codec, 285 read_buf: self.inner.state.read.buffer, 286 write_buf: self.inner.state.write.buffer, 287 _priv: (), 288 } 289 } 290 } 291 292 // This impl just defers to the underlying FramedImpl 293 impl<T, U> Stream for Framed<T, U> 294 where 295 T: AsyncRead, 296 U: Decoder, 297 { 298 type Item = Result<U::Item, U::Error>; 299 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>300 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 301 self.project().inner.poll_next(cx) 302 } 303 } 304 305 // This impl just defers to the underlying FramedImpl 306 impl<T, I, U> Sink<I> for Framed<T, U> 307 where 308 T: AsyncWrite, 309 U: Encoder<I>, 310 U::Error: From<io::Error>, 311 { 312 type Error = U::Error; 313 poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>314 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 315 self.project().inner.poll_ready(cx) 316 } 317 start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error>318 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { 319 self.project().inner.start_send(item) 320 } 321 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>322 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 323 self.project().inner.poll_flush(cx) 324 } 325 poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>326 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 327 self.project().inner.poll_close(cx) 328 } 329 } 330 331 impl<T, U> fmt::Debug for Framed<T, U> 332 where 333 T: fmt::Debug, 334 U: fmt::Debug, 335 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result336 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 337 f.debug_struct("Framed") 338 .field("io", self.get_ref()) 339 .field("codec", self.codec()) 340 .finish() 341 } 342 } 343 344 /// `FramedParts` contains an export of the data of a Framed transport. 345 /// It can be used to construct a new [`Framed`] with a different codec. 346 /// It contains all current buffers and the inner transport. 347 /// 348 /// [`Framed`]: crate::codec::Framed 349 #[derive(Debug)] 350 #[allow(clippy::manual_non_exhaustive)] 351 pub struct FramedParts<T, U> { 352 /// The inner transport used to read bytes to and write bytes to 353 pub io: T, 354 355 /// The codec 356 pub codec: U, 357 358 /// The buffer with read but unprocessed data. 359 pub read_buf: BytesMut, 360 361 /// A buffer with unprocessed data which are not written yet. 362 pub write_buf: BytesMut, 363 364 /// This private field allows us to add additional fields in the future in a 365 /// backwards compatible way. 366 _priv: (), 367 } 368 369 impl<T, U> FramedParts<T, U> { 370 /// Create a new, default, `FramedParts` new<I>(io: T, codec: U) -> FramedParts<T, U> where U: Encoder<I>,371 pub fn new<I>(io: T, codec: U) -> FramedParts<T, U> 372 where 373 U: Encoder<I>, 374 { 375 FramedParts { 376 io, 377 codec, 378 read_buf: BytesMut::new(), 379 write_buf: BytesMut::new(), 380 _priv: (), 381 } 382 } 383 } 384