//! Adapters from AsyncRead/AsyncWrite to Stream/Sink
//!
//! Raw I/O objects work with byte sequences, but higher-level code usually
//! wants to batch these into meaningful chunks, called "frames".
//!
//! This module contains adapters to go from streams of bytes, [`AsyncRead`] and
//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`].
//! Framed streams are also known as transports.
//!
//! # The Decoder trait
//!
//! A [`Decoder`] is used together with [`FramedRead`] or [`Framed`] to turn an
//! [`AsyncRead`] into a [`Stream`]. The job of the decoder trait is to specify
//! how sequences of bytes are turned into a sequence of frames, and to
//! determine where the boundaries between frames are. The job of the
//! `FramedRead` is to repeatedly switch between reading more data from the IO
//! resource, and asking the decoder whether we have received enough data to
//! decode another frame of data.
//!
//! The main method on the `Decoder` trait is the [`decode`] method. This method
//! takes as its argument the data that has been read so far. When it is called,
//! the buffer will be in one of the following situations:
//!
//!  1. The buffer contains less than a full frame.
//!  2. The buffer contains exactly a full frame.
//!  3. The buffer contains more than a full frame.
//!
//! In the first situation, the decoder should return `Ok(None)`.
//!
//! In the second situation, the decoder should clear the provided buffer and
//! return `Ok(Some(the_decoded_frame))`.
//!
//! In the third situation, the decoder should use a method such as [`split_to`]
//! or [`advance`] to modify the buffer such that the frame is removed from the
//! buffer, but any data in the buffer after that frame should still remain in
//! the buffer. The decoder should also return `Ok(Some(the_decoded_frame))` in
//! this case.
//!
//! Finally, the decoder may return an error if the data is invalid in some way.
//! The decoder should _not_ return an error just because it has yet to receive
//! a full frame.
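//!
//! The three situations can be modelled with the standard library alone. The
//! following is only a sketch under stated assumptions: a `Vec<u8>` stands in
//! for the `BytesMut` buffer, `drain` stands in for [`split_to`], the free
//! function `decode` stands in for the trait method, and the fixed four-byte
//! frame size is a hypothetical protocol chosen to keep the example short.
//!
//! ```rust
//! // Hypothetical framing for this sketch: every frame is exactly 4 bytes.
//! const FRAME_LEN: usize = 4;
//!
//! // Stand-in for `Decoder::decode`. `None` plays the role of `Ok(None)` and
//! // `Some(frame)` the role of `Ok(Some(frame))`.
//! fn decode(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
//!     if buf.len() < FRAME_LEN {
//!         // Situation 1: less than a full frame, so leave the buffer untouched.
//!         return None;
//!     }
//!     // Situations 2 and 3: remove exactly one frame from the front. Any
//!     // bytes after it stay in the buffer for the next call.
//!     Some(buf.drain(..FRAME_LEN).collect())
//! }
//!
//! fn main() {
//!     // Half a frame has arrived.
//!     let mut buf = b"ab".to_vec();
//!     assert_eq!(decode(&mut buf), None);
//!     assert_eq!(buf, b"ab".to_vec());
//!
//!     // More data is appended, giving exactly one full frame.
//!     buf.extend_from_slice(b"cd");
//!     assert_eq!(decode(&mut buf), Some(b"abcd".to_vec()));
//!     assert!(buf.is_empty());
//!
//!     // One and a half frames: the partial second frame must survive.
//!     buf.extend_from_slice(b"efghij");
//!     assert_eq!(decode(&mut buf), Some(b"efgh".to_vec()));
//!     assert_eq!(buf, b"ij".to_vec());
//!     assert_eq!(decode(&mut buf), None);
//! }
//! ```
//!
//! Removing the frame from the front handles situations 2 and 3 with the same
//! code path, because an exact fit simply leaves an empty buffer behind.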
//!
//! It is guaranteed that, from one call to `decode` to another, the provided
//! buffer will contain the exact same data as before, except that if more data
//! has arrived through the IO resource, that data will have been appended to
//! the buffer. This means that reading frames from a `FramedRead` is
//! essentially equivalent to the following loop:
//!
//! ```no_run
//! use tokio::io::AsyncReadExt;
//! # // This uses async_stream to create an example that compiles.
//! # fn foo() -> impl futures_core::Stream<Item = std::io::Result<bytes::BytesMut>> { async_stream::try_stream! {
//! # use tokio_util::codec::Decoder;
//! # let mut decoder = tokio_util::codec::BytesCodec::new();
//! # let io_resource = &mut &[0u8, 1, 2, 3][..];
//!
//! let mut buf = bytes::BytesMut::new();
//! loop {
//!     // The read_buf call will append to buf rather than overwrite existing data.
//!     let len = io_resource.read_buf(&mut buf).await?;
//!
//!     if len == 0 {
//!         while let Some(frame) = decoder.decode_eof(&mut buf)? {
//!             yield frame;
//!         }
//!         break;
//!     }
//!
//!     while let Some(frame) = decoder.decode(&mut buf)? {
//!         yield frame;
//!     }
//! }
//! # }}
//! ```
//! The example above uses `yield` whenever the `Stream` produces an item.
//!
//! ## Example decoder
//!
//! As an example, consider a protocol that can be used to send strings where
//! each frame is a four-byte little-endian integer that contains the length of
//! the string data, followed by that many bytes of string data. The decoder
//! fails with an error if the string data is not valid UTF-8 or is too long.
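//!
//! To make the wire format concrete, the following sketch lays out a single
//! frame by hand using only the standard library. The helper `frame_bytes` is
//! hypothetical and not part of this crate. The little-endian byte order
//! matches the `u32::from_le_bytes` call in the decoder that follows.
//!
//! ```rust
//! // Hypothetical helper: a little-endian u32 length prefix followed by the
//! // UTF-8 bytes of the string. A real encoder would check the length before
//! // casting it to u32.
//! fn frame_bytes(s: &str) -> Vec<u8> {
//!     let mut out = Vec::with_capacity(4 + s.len());
//!     // `str::len` counts bytes, not characters, which is what the prefix needs.
//!     out.extend_from_slice(&(s.len() as u32).to_le_bytes());
//!     out.extend_from_slice(s.as_bytes());
//!     out
//! }
//!
//! fn main() {
//!     // "hé" is two characters but three bytes, because 'é' encodes as 0xC3 0xA9.
//!     assert_eq!(frame_bytes("hé"), [3, 0, 0, 0, b'h', 0xC3, 0xA9]);
//!     // An empty string is still a valid frame: just the length prefix.
//!     assert_eq!(frame_bytes(""), [0, 0, 0, 0]);
//! }
//! ```
//!
//! Note that the length prefix counts the string bytes only and does not
//! include the four bytes of the prefix itself.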
//!
//! Such a decoder can be written like this:
//! ```
//! use tokio_util::codec::Decoder;
//! use bytes::{BytesMut, Buf};
//!
//! struct MyStringDecoder {}
//!
//! const MAX: usize = 8 * 1024 * 1024;
//!
//! impl Decoder for MyStringDecoder {
//!     type Item = String;
//!     type Error = std::io::Error;
//!
//!     fn decode(
//!         &mut self,
//!         src: &mut BytesMut
//!     ) -> Result<Option<Self::Item>, Self::Error> {
//!         if src.len() < 4 {
//!             // Not enough data to read length marker.
//!             return Ok(None);
//!         }
//!
//!         // Read length marker.
//!         let mut length_bytes = [0u8; 4];
//!         length_bytes.copy_from_slice(&src[..4]);
//!         let length = u32::from_le_bytes(length_bytes) as usize;
//!
//!         // Check that the length is not too large to avoid a denial of
//!         // service attack where the server runs out of memory.
//!         if length > MAX {
//!             return Err(std::io::Error::new(
//!                 std::io::ErrorKind::InvalidData,
//!                 format!("Frame of length {} is too large.", length)
//!             ));
//!         }
//!
//!         if src.len() < 4 + length {
//!             // The full string has not yet arrived.
//!             //
//!             // We reserve more space in the buffer. This is not strictly
//!             // necessary, but is a good idea performance-wise.
//!             src.reserve(4 + length - src.len());
//!
//!             // We inform the Framed that we need more bytes to form the next
//!             // frame.
//!             return Ok(None);
//!         }
//!
//!         // Copy the string data out, then use advance to modify src such
//!         // that it no longer contains this frame.
//!         let data = src[4..4 + length].to_vec();
//!         src.advance(4 + length);
//!
//!         // Convert the data to a string, or fail if it is not valid UTF-8.
//!         match String::from_utf8(data) {
//!             Ok(string) => Ok(Some(string)),
//!             Err(utf8_error) => {
//!                 Err(std::io::Error::new(
//!                     std::io::ErrorKind::InvalidData,
//!                     utf8_error.utf8_error(),
//!                 ))
//!             },
//!         }
//!     }
//! }
//! ```
//!
//! # The Encoder trait
//!
//! An [`Encoder`] is used together with [`FramedWrite`] or [`Framed`] to turn
//! an [`AsyncWrite`] into a [`Sink`]. The job of the encoder trait is to
//! specify how frames are turned into a sequence of bytes. The job of the
//! `FramedWrite` is to take the resulting sequence of bytes and write it to the
//! IO resource.
//!
//! The main method on the `Encoder` trait is the [`encode`] method. This method
//! takes an item that is being written, and a buffer to write the item to. The
//! buffer may already contain data, and in this case, the encoder should append
//! the new frame to the buffer rather than overwrite the existing data.
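//!
//! The append-only rule can be illustrated with the standard library alone.
//! This is a sketch under stated assumptions rather than a real `Encoder`
//! implementation: a `Vec<u8>` stands in for the `BytesMut` buffer, the free
//! function `encode` stands in for the trait method, and the newline-terminated
//! framing is a hypothetical protocol chosen for brevity.
//!
//! ```rust
//! // Stand-in for `Encoder::encode`, with `Vec<u8>` in place of `BytesMut`.
//! // Hypothetical framing: each frame is the item followed by a newline.
//! fn encode(item: &str, dst: &mut Vec<u8>) {
//!     // Append only. Never clear or overwrite what is already in `dst`.
//!     dst.extend_from_slice(item.as_bytes());
//!     dst.push(b'\n');
//! }
//!
//! fn main() {
//!     let mut dst = Vec::new();
//!     encode("first", &mut dst);
//!     // The first frame has not been written out yet when the next one arrives.
//!     encode("second", &mut dst);
//!     assert_eq!(dst, b"first\nsecond\n".to_vec());
//! }
//! ```
//!
//! An encoder that cleared `dst` before writing would silently drop `first`
//! in this scenario.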
//!
//! It is guaranteed that, from one call to `encode` to another, the provided
//! buffer will contain the exact same data as before, except that some of the
//! data may have been removed from the front of the buffer. Writing to a
//! `FramedWrite` is essentially equivalent to the following loop:
//!
//! ```no_run
//! use tokio::io::AsyncWriteExt;
//! use bytes::Buf; // for advance
//! # use tokio_util::codec::Encoder;
//! # async fn next_frame() -> bytes::Bytes { bytes::Bytes::new() }
//! # async fn no_more_frames() { }
//! # #[tokio::main] async fn main() -> std::io::Result<()> {
//! # let mut io_resource = tokio::io::sink();
//! # let mut encoder = tokio_util::codec::BytesCodec::new();
//!
//! const MAX: usize = 8192;
//!
//! let mut buf = bytes::BytesMut::new();
//! loop {
//!     tokio::select! {
//!         num_written = io_resource.write(&buf), if !buf.is_empty() => {
//!             buf.advance(num_written?);
//!         },
//!         frame = next_frame(), if buf.len() < MAX => {
//!             encoder.encode(frame, &mut buf)?;
//!         },
//!         _ = no_more_frames() => {
//!             io_resource.write_all(&buf).await?;
//!             io_resource.shutdown().await?;
//!             return Ok(());
//!         },
//!     }
//! }
//! # }
//! ```
//! Here the `next_frame` function corresponds to any frames you write to the
//! `FramedWrite`. The `no_more_frames` function corresponds to closing the
//! `FramedWrite` with [`SinkExt::close`].
//!
//! ## Example encoder
//!
//! As an example, consider a protocol that can be used to send strings where
//! each frame is a four-byte little-endian integer that contains the length of
//! the string data, followed by that many bytes of string data. The encoder
//! will fail if the string is too long.
//!
//! Such an encoder can be written like this:
//! ```
//! use tokio_util::codec::Encoder;
//! use bytes::BytesMut;
//!
//! struct MyStringEncoder {}
//!
//! const MAX: usize = 8 * 1024 * 1024;
//!
//! impl Encoder<String> for MyStringEncoder {
//!     type Error = std::io::Error;
//!
//!     fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
//!         // Don't send a string if it is longer than the other end will
//!         // accept.
//!         if item.len() > MAX {
//!             return Err(std::io::Error::new(
//!                 std::io::ErrorKind::InvalidData,
//!                 format!("Frame of length {} is too large.", item.len())
//!             ));
//!         }
//!
//!         // Convert the length into a byte array.
//!         // The cast to u32 cannot overflow due to the length check above.
//!         let len_slice = u32::to_le_bytes(item.len() as u32);
//!
//!         // Reserve space in the buffer.
//!         dst.reserve(4 + item.len());
//!
//!         // Write the length and string to the buffer.
//!         dst.extend_from_slice(&len_slice);
//!         dst.extend_from_slice(item.as_bytes());
//!         Ok(())
//!     }
//! }
//! ```
//!
//! [`AsyncRead`]: tokio::io::AsyncRead
//! [`AsyncWrite`]: tokio::io::AsyncWrite
//! [`Stream`]: futures_core::Stream
//! [`Sink`]: futures_sink::Sink
//! [`SinkExt::close`]: https://docs.rs/futures/0.3/futures/sink/trait.SinkExt.html#method.close
//! [`FramedRead`]: struct@crate::codec::FramedRead
//! [`FramedWrite`]: struct@crate::codec::FramedWrite
//! [`Framed`]: struct@crate::codec::Framed
//! [`Decoder`]: trait@crate::codec::Decoder
//! [`decode`]: fn@crate::codec::Decoder::decode
//! [`encode`]: fn@crate::codec::Encoder::encode
//! [`split_to`]: fn@bytes::BytesMut::split_to
//! [`advance`]: fn@bytes::Buf::advance

mod bytes_codec;
pub use self::bytes_codec::BytesCodec;

mod decoder;
pub use self::decoder::Decoder;

mod encoder;
pub use self::encoder::Encoder;

mod framed_impl;
#[allow(unused_imports)]
pub(crate) use self::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};

mod framed;
pub use self::framed::{Framed, FramedParts};

mod framed_read;
pub use self::framed_read::FramedRead;

mod framed_write;
pub use self::framed_write::FramedWrite;

pub mod length_delimited;
pub use self::length_delimited::{LengthDelimitedCodec, LengthDelimitedCodecError};

mod lines_codec;
pub use self::lines_codec::{LinesCodec, LinesCodecError};

mod any_delimiter_codec;
pub use self::any_delimiter_codec::{AnyDelimiterCodec, AnyDelimiterCodecError};