1 //! Adaptors from AsyncRead/AsyncWrite to Stream/Sink 2 //! 3 //! Raw I/O objects work with byte sequences, but higher-level code usually 4 //! wants to batch these into meaningful chunks, called "frames". 5 //! 6 //! This module contains adapters to go from streams of bytes, [`AsyncRead`] and 7 //! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. 8 //! Framed streams are also known as transports. 9 //! 10 //! # The Decoder trait 11 //! 12 //! A [`Decoder`] is used together with [`FramedRead`] or [`Framed`] to turn an 13 //! [`AsyncRead`] into a [`Stream`]. The job of the decoder trait is to specify 14 //! how sequences of bytes are turned into a sequence of frames, and to 15 //! determine where the boundaries between frames are. The job of the 16 //! `FramedRead` is to repeatedly switch between reading more data from the IO 17 //! resource, and asking the decoder whether we have received enough data to 18 //! decode another frame of data. 19 //! 20 //! The main method on the `Decoder` trait is the [`decode`] method. This method 21 //! takes as argument the data that has been read so far, and when it is called, 22 //! it will be in one of the following situations: 23 //! 24 //! 1. The buffer contains less than a full frame. 25 //! 2. The buffer contains exactly a full frame. 26 //! 3. The buffer contains more than a full frame. 27 //! 28 //! In the first situation, the decoder should return `Ok(None)`. 29 //! 30 //! In the second situation, the decoder should clear the provided buffer and 31 //! return `Ok(Some(the_decoded_frame))`. 32 //! 33 //! In the third situation, the decoder should use a method such as [`split_to`] 34 //! or [`advance`] to modify the buffer such that the frame is removed from the 35 //! buffer, but any data in the buffer after that frame should still remain in 36 //! the buffer. The decoder should also return `Ok(Some(the_decoded_frame))` in 37 //! this case. 38 //! 39 //! 
Finally the decoder may return an error if the data is invalid in some way. 40 //! The decoder should _not_ return an error just because it has yet to receive 41 //! a full frame. 42 //! 43 //! It is guaranteed that, from one call to `decode` to another, the provided 44 //! buffer will contain the exact same data as before, except that if more data 45 //! has arrived through the IO resource, that data will have been appended to 46 //! the buffer. This means that reading frames from a `FramedRead` is 47 //! essentially equivalent to the following loop: 48 //! 49 //! ```no_run 50 //! use tokio::io::AsyncReadExt; 51 //! # // This uses async_stream to create an example that compiles. 52 //! # fn foo() -> impl futures_core::Stream<Item = std::io::Result<bytes::BytesMut>> { async_stream::try_stream! { 53 //! # use tokio_util::codec::Decoder; 54 //! # let mut decoder = tokio_util::codec::BytesCodec::new(); 55 //! # let io_resource = &mut &[0u8, 1, 2, 3][..]; 56 //! 57 //! let mut buf = bytes::BytesMut::new(); 58 //! loop { 59 //! // The read_buf call will append to buf rather than overwrite existing data. 60 //! let len = io_resource.read_buf(&mut buf).await?; 61 //! 62 //! if len == 0 { 63 //! while let Some(frame) = decoder.decode_eof(&mut buf)? { 64 //! yield frame; 65 //! } 66 //! break; 67 //! } 68 //! 69 //! while let Some(frame) = decoder.decode(&mut buf)? { 70 //! yield frame; 71 //! } 72 //! } 73 //! # }} 74 //! ``` 75 //! The example above uses `yield` whenever the `Stream` produces an item. 76 //! 77 //! ## Example decoder 78 //! 79 //! As an example, consider a protocol that can be used to send strings where 80 //! each frame is a four byte integer that contains the length of the frame, 81 //! followed by that many bytes of string data. The decoder fails with an error 82 //! if the string data is not valid utf-8 or too long. 83 //! 84 //! Such a decoder can be written like this: 85 //! ``` 86 //! use tokio_util::codec::Decoder; 87 //! 
use bytes::{BytesMut, Buf}; 88 //! 89 //! struct MyStringDecoder {} 90 //! 91 //! const MAX: usize = 8 * 1024 * 1024; 92 //! 93 //! impl Decoder for MyStringDecoder { 94 //! type Item = String; 95 //! type Error = std::io::Error; 96 //! 97 //! fn decode( 98 //! &mut self, 99 //! src: &mut BytesMut 100 //! ) -> Result<Option<Self::Item>, Self::Error> { 101 //! if src.len() < 4 { 102 //! // Not enough data to read length marker. 103 //! return Ok(None); 104 //! } 105 //! 106 //! // Read length marker. 107 //! let mut length_bytes = [0u8; 4]; 108 //! length_bytes.copy_from_slice(&src[..4]); 109 //! let length = u32::from_le_bytes(length_bytes) as usize; 110 //! 111 //! // Check that the length is not too large to avoid a denial of 112 //! // service attack where the server runs out of memory. 113 //! if length > MAX { 114 //! return Err(std::io::Error::new( 115 //! std::io::ErrorKind::InvalidData, 116 //! format!("Frame of length {} is too large.", length) 117 //! )); 118 //! } 119 //! 120 //! if src.len() < 4 + length { 121 //! // The full string has not yet arrived. 122 //! // 123 //! // We reserve more space in the buffer. This is not strictly 124 //! // necessary, but is a good idea performance-wise. 125 //! src.reserve(4 + length - src.len()); 126 //! 127 //! // We inform the Framed that we need more bytes to form the next 128 //! // frame. 129 //! return Ok(None); 130 //! } 131 //! 132 //! // Use advance to modify src such that it no longer contains 133 //! // this frame. 134 //! let data = src[4..4 + length].to_vec(); 135 //! src.advance(4 + length); 136 //! 137 //! // Convert the data to a string, or fail if it is not valid utf-8. 138 //! match String::from_utf8(data) { 139 //! Ok(string) => Ok(Some(string)), 140 //! Err(utf8_error) => { 141 //! Err(std::io::Error::new( 142 //! std::io::ErrorKind::InvalidData, 143 //! utf8_error.utf8_error(), 144 //! )) 145 //! }, 146 //! } 147 //! } 148 //! } 149 //! ``` 150 //! 151 //! # The Encoder trait 152 //! 153 //! 
An [`Encoder`] is used together with [`FramedWrite`] or [`Framed`] to turn 154 //! an [`AsyncWrite`] into a [`Sink`]. The job of the encoder trait is to 155 //! specify how frames are turned into a sequence of bytes. The job of the 156 //! `FramedWrite` is to take the resulting sequence of bytes and write it to the 157 //! IO resource. 158 //! 159 //! The main method on the `Encoder` trait is the [`encode`] method. This method 160 //! takes an item that is being written, and a buffer to write the item to. The 161 //! buffer may already contain data, and in this case, the encoder should append 162 //! the new frame to the buffer rather than overwrite the existing data. 163 //! 164 //! It is guaranteed that, from one call to `encode` to another, the provided 165 //! buffer will contain the exact same data as before, except that some of the 166 //! data may have been removed from the front of the buffer. Writing to a 167 //! `FramedWrite` is essentially equivalent to the following loop: 168 //! 169 //! ```no_run 170 //! use tokio::io::AsyncWriteExt; 171 //! use bytes::Buf; // for advance 172 //! # use tokio_util::codec::Encoder; 173 //! # async fn next_frame() -> bytes::Bytes { bytes::Bytes::new() } 174 //! # async fn no_more_frames() { } 175 //! # #[tokio::main] async fn main() -> std::io::Result<()> { 176 //! # let mut io_resource = tokio::io::sink(); 177 //! # let mut encoder = tokio_util::codec::BytesCodec::new(); 178 //! 179 //! const MAX: usize = 8192; 180 //! 181 //! let mut buf = bytes::BytesMut::new(); 182 //! loop { 183 //! tokio::select! { 184 //! num_written = io_resource.write(&buf), if !buf.is_empty() => { 185 //! buf.advance(num_written?); 186 //! }, 187 //! frame = next_frame(), if buf.len() < MAX => { 188 //! encoder.encode(frame, &mut buf)?; 189 //! }, 190 //! _ = no_more_frames() => { 191 //! io_resource.write_all(&buf).await?; 192 //! io_resource.shutdown().await?; 193 //! return Ok(()); 194 //! }, 195 //! } 196 //! } 197 //! # } 198 //! 
``` 199 //! Here the `next_frame` method corresponds to any frames you write to the 200 //! `FramedWrite`. The `no_more_frames` method corresponds to closing the 201 //! `FramedWrite` with [`SinkExt::close`]. 202 //! 203 //! ## Example encoder 204 //! 205 //! As an example, consider a protocol that can be used to send strings where 206 //! each frame is a four byte integer that contains the length of the frame, 207 //! followed by that many bytes of string data. The encoder will fail if the 208 //! string is too long. 209 //! 210 //! Such an encoder can be written like this: 211 //! ``` 212 //! use tokio_util::codec::Encoder; 213 //! use bytes::BytesMut; 214 //! 215 //! struct MyStringEncoder {} 216 //! 217 //! const MAX: usize = 8 * 1024 * 1024; 218 //! 219 //! impl Encoder<String> for MyStringEncoder { 220 //! type Error = std::io::Error; 221 //! 222 //! fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> { 223 //! // Don't send a string if it is longer than the other end will 224 //! // accept. 225 //! if item.len() > MAX { 226 //! return Err(std::io::Error::new( 227 //! std::io::ErrorKind::InvalidData, 228 //! format!("Frame of length {} is too large.", item.len()) 229 //! )); 230 //! } 231 //! 232 //! // Convert the length into a byte array. 233 //! // The cast to u32 cannot overflow due to the length check above. 234 //! let len_slice = u32::to_le_bytes(item.len() as u32); 235 //! 236 //! // Reserve space in the buffer. 237 //! dst.reserve(4 + item.len()); 238 //! 239 //! // Write the length and string to the buffer. 240 //! dst.extend_from_slice(&len_slice); 241 //! dst.extend_from_slice(item.as_bytes()); 242 //! Ok(()) 243 //! } 244 //! } 245 //! ``` 246 //! 247 //! [`AsyncRead`]: tokio::io::AsyncRead 248 //! [`AsyncWrite`]: tokio::io::AsyncWrite 249 //! [`Stream`]: futures_core::Stream 250 //! [`Sink`]: futures_sink::Sink 251 //! 
[`SinkExt::close`]: https://docs.rs/futures/0.3/futures/sink/trait.SinkExt.html#method.close 252 //! [`FramedRead`]: struct@crate::codec::FramedRead 253 //! [`FramedWrite`]: struct@crate::codec::FramedWrite 254 //! [`Framed`]: struct@crate::codec::Framed 255 //! [`Decoder`]: trait@crate::codec::Decoder 256 //! [`decode`]: fn@crate::codec::Decoder::decode 257 //! [`encode`]: fn@crate::codec::Encoder::encode 258 //! [`split_to`]: fn@bytes::BytesMut::split_to 259 //! [`advance`]: fn@bytes::Buf::advance 260 261 mod bytes_codec; 262 pub use self::bytes_codec::BytesCodec; 263 264 mod decoder; 265 pub use self::decoder::Decoder; 266 267 mod encoder; 268 pub use self::encoder::Encoder; 269 270 mod framed_impl; 271 #[allow(unused_imports)] 272 pub(crate) use self::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame}; 273 274 mod framed; 275 pub use self::framed::{Framed, FramedParts}; 276 277 mod framed_read; 278 pub use self::framed_read::FramedRead; 279 280 mod framed_write; 281 pub use self::framed_write::FramedWrite; 282 283 pub mod length_delimited; 284 pub use self::length_delimited::{LengthDelimitedCodec, LengthDelimitedCodecError}; 285 286 mod lines_codec; 287 pub use self::lines_codec::{LinesCodec, LinesCodecError}; 288 289 mod any_delimiter_codec; 290 pub use self::any_delimiter_codec::{AnyDelimiterCodec, AnyDelimiterCodecError}; 291