1 //! Frame a stream of bytes based on a length prefix 2 //! 3 //! Many protocols delimit their frames by prefacing frame data with a 4 //! frame head that specifies the length of the frame. The 5 //! `length_delimited` module provides utilities for handling the length 6 //! based framing. This allows the consumer to work with entire frames 7 //! without having to worry about buffering or other framing logic. 8 //! 9 //! # Getting started 10 //! 11 //! If implementing a protocol from scratch, using length delimited framing 12 //! is an easy way to get started. [`LengthDelimitedCodec::new()`] will 13 //! return a length delimited codec using default configuration values. 14 //! This can then be used to construct a framer to adapt a full-duplex 15 //! byte stream into a stream of frames. 16 //! 17 //! ``` 18 //! use tokio::io::{AsyncRead, AsyncWrite}; 19 //! use tokio_util::codec::{Framed, LengthDelimitedCodec}; 20 //! 21 //! fn bind_transport<T: AsyncRead + AsyncWrite>(io: T) 22 //! -> Framed<T, LengthDelimitedCodec> 23 //! { 24 //! Framed::new(io, LengthDelimitedCodec::new()) 25 //! } 26 //! # pub fn main() {} 27 //! ``` 28 //! 29 //! The returned transport implements `Sink + Stream` for `BytesMut`. It 30 //! encodes the frame with a big-endian `u32` header denoting the frame 31 //! payload length: 32 //! 33 //! ```text 34 //! +----------+--------------------------------+ 35 //! | len: u32 | frame payload | 36 //! +----------+--------------------------------+ 37 //! ``` 38 //! 39 //! Specifically, given the following: 40 //! 41 //! ``` 42 //! use tokio::io::{AsyncRead, AsyncWrite}; 43 //! use tokio_util::codec::{Framed, LengthDelimitedCodec}; 44 //! 45 //! use futures::SinkExt; 46 //! use bytes::Bytes; 47 //! 48 //! async fn write_frame<T>(io: T) -> Result<(), Box<dyn std::error::Error>> 49 //! where 50 //! T: AsyncRead + AsyncWrite + Unpin, 51 //! { 52 //! let mut transport = Framed::new(io, LengthDelimitedCodec::new()); 53 //! let frame = Bytes::from("hello world"); 54 //! 55 //! transport.send(frame).await?; 56 //! Ok(()) 57 //! } 58 //! ``` 59 //! 60 //! The encoded frame will look like this: 61 //! 62 //! ```text 63 //! +---- len: u32 ----+---- data ----+ 64 //! | \x00\x00\x00\x0b | hello world | 65 //! +------------------+--------------+ 66 //! ``` 67 //! 68 //! # Decoding 69 //! 70 //! [`FramedRead`] adapts an [`AsyncRead`] into a `Stream` of [`BytesMut`], 71 //! such that each yielded [`BytesMut`] value contains the contents of an 72 //! entire frame. There are many configuration parameters enabling 73 //! [`FramedRead`] to handle a wide range of protocols. Here are some 74 //! examples that will cover the various options at a high level. 75 //! 76 //! ## Example 1 77 //! 78 //! The following will parse a `u16` length field at offset 0, including the 79 //! frame head in the yielded `BytesMut`. 80 //! 81 //! ``` 82 //! # use tokio::io::AsyncRead; 83 //! # use tokio_util::codec::LengthDelimitedCodec; 84 //! # fn bind_read<T: AsyncRead>(io: T) { 85 //! LengthDelimitedCodec::builder() 86 //! .length_field_offset(0) // default value 87 //! .length_field_type::<u16>() 88 //! .length_adjustment(0) // default value 89 //! .num_skip(0) // Do not strip frame header 90 //! .new_read(io); 91 //! # } 92 //! # pub fn main() {} 93 //! ``` 94 //! 95 //! The following frame will be decoded as such: 96 //! 97 //! ```text 98 //! INPUT DECODED 99 //! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ 100 //! | \x00\x0B | Hello world | --> | \x00\x0B | Hello world | 101 //! +----------+---------------+ +----------+---------------+ 102 //! ``` 103 //! 104 //! The value of the length field is 11 (`\x0B`) which represents the length 105 //! of the payload, `hello world`. By default, [`FramedRead`] assumes that 106 //! the length field represents the number of bytes that **follows** the 107 //! length field. Thus, the entire frame has a length of 13: 2 bytes for the 108 //! frame head + 11 bytes for the payload. 109 //! 110 //! ## Example 2 111 //! 112 //! The following will parse a `u16` length field at offset 0, omitting the 113 //! frame head in the yielded `BytesMut`. 114 //! 115 //! ``` 116 //! # use tokio::io::AsyncRead; 117 //! # use tokio_util::codec::LengthDelimitedCodec; 118 //! # fn bind_read<T: AsyncRead>(io: T) { 119 //! LengthDelimitedCodec::builder() 120 //! .length_field_offset(0) // default value 121 //! .length_field_type::<u16>() 122 //! .length_adjustment(0) // default value 123 //! // `num_skip` is not needed, the default is to skip 124 //! .new_read(io); 125 //! # } 126 //! # pub fn main() {} 127 //! ``` 128 //! 129 //! The following frame will be decoded as such: 130 //! 131 //! ```text 132 //! INPUT DECODED 133 //! +-- len ---+--- Payload ---+ +--- Payload ---+ 134 //! | \x00\x0B | Hello world | --> | Hello world | 135 //! +----------+---------------+ +---------------+ 136 //! ``` 137 //! 138 //! This is similar to the first example, the only difference is that the 139 //! frame head is **not** included in the yielded `BytesMut` value. 140 //! 141 //! ## Example 3 142 //! 143 //! The following will parse a `u16` length field at offset 0, including the 144 //! frame head in the yielded `BytesMut`. In this case, the length field 145 //! **includes** the frame head length. 146 //! 147 //! ``` 148 //! # use tokio::io::AsyncRead; 149 //! # use tokio_util::codec::LengthDelimitedCodec; 150 //! # fn bind_read<T: AsyncRead>(io: T) { 151 //! LengthDelimitedCodec::builder() 152 //! .length_field_offset(0) // default value 153 //! .length_field_type::<u16>() 154 //! .length_adjustment(-2) // size of head 155 //! .num_skip(0) 156 //! .new_read(io); 157 //! # } 158 //! # pub fn main() {} 159 //! ``` 160 //! 161 //! The following frame will be decoded as such: 162 //! 163 //! ```text 164 //! INPUT DECODED 165 //! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ 166 //! | \x00\x0D | Hello world | --> | \x00\x0D | Hello world | 167 //! +----------+---------------+ +----------+---------------+ 168 //! ``` 169 //! 170 //! In most cases, the length field represents the length of the payload 171 //! only, as shown in the previous examples. However, in some protocols the 172 //! length field represents the length of the whole frame, including the 173 //! head. In such cases, we specify a negative `length_adjustment` to adjust 174 //! the value provided in the frame head to represent the payload length. 175 //! 176 //! ## Example 4 177 //! 178 //! The following will parse a 3 byte length field at offset 0 in a 5 byte 179 //! frame head, including the frame head in the yielded `BytesMut`. 180 //! 181 //! ``` 182 //! # use tokio::io::AsyncRead; 183 //! # use tokio_util::codec::LengthDelimitedCodec; 184 //! # fn bind_read<T: AsyncRead>(io: T) { 185 //! LengthDelimitedCodec::builder() 186 //! .length_field_offset(0) // default value 187 //! .length_field_length(3) 188 //! .length_adjustment(2) // remaining head 189 //! .num_skip(0) 190 //! .new_read(io); 191 //! # } 192 //! # pub fn main() {} 193 //! ``` 194 //! 195 //! The following frame will be decoded as such: 196 //! 197 //! ```text 198 //! INPUT 199 //! +---- len -----+- head -+--- Payload ---+ 200 //! | \x00\x00\x0B | \xCAFE | Hello world | 201 //! +--------------+--------+---------------+ 202 //! 203 //! DECODED 204 //! +---- len -----+- head -+--- Payload ---+ 205 //! | \x00\x00\x0B | \xCAFE | Hello world | 206 //! +--------------+--------+---------------+ 207 //! ``` 208 //! 209 //! A more advanced example that shows a case where there is extra frame 210 //! head data between the length field and the payload. In such cases, it is 211 //! usually desirable to include the frame head as part of the yielded 212 //! `BytesMut`. This lets consumers of the length delimited framer to 213 //! process the frame head as needed. 214 //! 215 //! The positive `length_adjustment` value lets `FramedRead` factor in the 216 //! additional head into the frame length calculation. 217 //! 218 //! ## Example 5 219 //! 220 //! The following will parse a `u16` length field at offset 1 of a 4 byte 221 //! frame head. The first byte and the length field will be omitted from the 222 //! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be 223 //! included. 224 //! 225 //! ``` 226 //! # use tokio::io::AsyncRead; 227 //! # use tokio_util::codec::LengthDelimitedCodec; 228 //! # fn bind_read<T: AsyncRead>(io: T) { 229 //! LengthDelimitedCodec::builder() 230 //! .length_field_offset(1) // length of hdr1 231 //! .length_field_type::<u16>() 232 //! .length_adjustment(1) // length of hdr2 233 //! .num_skip(3) // length of hdr1 + LEN 234 //! .new_read(io); 235 //! # } 236 //! # pub fn main() {} 237 //! ``` 238 //! 239 //! The following frame will be decoded as such: 240 //! 241 //! ```text 242 //! INPUT 243 //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ 244 //! | \xCA | \x00\x0B | \xFE | Hello world | 245 //! +--------+----------+--------+---------------+ 246 //! 247 //! DECODED 248 //! +- hdr2 -+--- Payload ---+ 249 //! | \xFE | Hello world | 250 //! +--------+---------------+ 251 //! ``` 252 //! 253 //! The length field is situated in the middle of the frame head. In this 254 //! case, the first byte in the frame head could be a version or some other 255 //! identifier that is not needed for processing. On the other hand, the 256 //! second half of the head is needed. 257 //! 258 //! `length_field_offset` indicates how many bytes to skip before starting 259 //! to read the length field. `length_adjustment` is the number of bytes to 260 //! skip starting at the end of the length field. In this case, it is the 261 //! second half of the head. 262 //! 263 //! ## Example 6 264 //! 265 //! The following will parse a `u16` length field at offset 1 of a 4 byte 266 //! frame head. The first byte and the length field will be omitted from the 267 //! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be 268 //! included. In this case, the length field **includes** the frame head 269 //! length. 270 //! 271 //! ``` 272 //! # use tokio::io::AsyncRead; 273 //! # use tokio_util::codec::LengthDelimitedCodec; 274 //! # fn bind_read<T: AsyncRead>(io: T) { 275 //! LengthDelimitedCodec::builder() 276 //! .length_field_offset(1) // length of hdr1 277 //! .length_field_type::<u16>() 278 //! .length_adjustment(-3) // length of hdr1 + LEN, negative 279 //! .num_skip(3) 280 //! .new_read(io); 281 //! # } 282 //! ``` 283 //! 284 //! The following frame will be decoded as such: 285 //! 286 //! ```text 287 //! INPUT 288 //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ 289 //! | \xCA | \x00\x0F | \xFE | Hello world | 290 //! +--------+----------+--------+---------------+ 291 //! 292 //! DECODED 293 //! +- hdr2 -+--- Payload ---+ 294 //! | \xFE | Hello world | 295 //! +--------+---------------+ 296 //! ``` 297 //! 298 //! Similar to the example above, the difference is that the length field 299 //! represents the length of the entire frame instead of just the payload. 300 //! The length of `hdr1` and `len` must be counted in `length_adjustment`. 301 //! Note that the length of `hdr2` does **not** need to be explicitly set 302 //! anywhere because it already is factored into the total frame length that 303 //! is read from the byte stream. 304 //! 305 //! ## Example 7 306 //! 307 //! The following will parse a 3 byte length field at offset 0 in a 4 byte 308 //! frame head, excluding the 4th byte from the yielded `BytesMut`. 309 //! 310 //! ``` 311 //! # use tokio::io::AsyncRead; 312 //! # use tokio_util::codec::LengthDelimitedCodec; 313 //! # fn bind_read<T: AsyncRead>(io: T) { 314 //! LengthDelimitedCodec::builder() 315 //! .length_field_offset(0) // default value 316 //! .length_field_length(3) 317 //! .length_adjustment(0) // default value 318 //! .num_skip(4) // skip the first 4 bytes 319 //! .new_read(io); 320 //! # } 321 //! # pub fn main() {} 322 //! ``` 323 //! 324 //! The following frame will be decoded as such: 325 //! 326 //! ```text 327 //! INPUT DECODED 328 //! +------- len ------+--- Payload ---+ +--- Payload ---+ 329 //! | \x00\x00\x0B\xFF | Hello world | => | Hello world | 330 //! +------------------+---------------+ +---------------+ 331 //! ``` 332 //! 333 //! A simple example where there are unused bytes between the length field 334 //! and the payload. 335 //! 336 //! # Encoding 337 //! 338 //! [`FramedWrite`] adapts an [`AsyncWrite`] into a `Sink` of [`BytesMut`], 339 //! such that each submitted [`BytesMut`] is prefaced by a length field. 340 //! There are fewer configuration options than [`FramedRead`]. Given 341 //! protocols that have more complex frame heads, an encoder should probably 342 //! be written by hand using [`Encoder`]. 343 //! 344 //! Here is a simple example, given a `FramedWrite` with the following 345 //! configuration: 346 //! 347 //! ``` 348 //! # use tokio::io::AsyncWrite; 349 //! # use tokio_util::codec::LengthDelimitedCodec; 350 //! # fn write_frame<T: AsyncWrite>(io: T) { 351 //! # let _ = 352 //! LengthDelimitedCodec::builder() 353 //! .length_field_type::<u16>() 354 //! .new_write(io); 355 //! # } 356 //! # pub fn main() {} 357 //! ``` 358 //! 359 //! A payload of `hello world` will be encoded as: 360 //! 361 //! ```text 362 //! +- len: u16 -+---- data ----+ 363 //! | \x00\x0b | hello world | 364 //! +------------+--------------+ 365 //! ``` 366 //! 367 //! [`LengthDelimitedCodec::new()`]: method@LengthDelimitedCodec::new 368 //! [`FramedRead`]: struct@FramedRead 369 //! [`FramedWrite`]: struct@FramedWrite 370 //! [`AsyncRead`]: trait@tokio::io::AsyncRead 371 //! [`AsyncWrite`]: trait@tokio::io::AsyncWrite 372 //! [`Encoder`]: trait@Encoder 373 //! [`BytesMut`]: bytes::BytesMut 374 375 use crate::codec::{Decoder, Encoder, Framed, FramedRead, FramedWrite}; 376 377 use tokio::io::{AsyncRead, AsyncWrite}; 378 379 use bytes::{Buf, BufMut, Bytes, BytesMut}; 380 use std::error::Error as StdError; 381 use std::io::{self, Cursor}; 382 use std::{cmp, fmt, mem}; 383 384 /// Configure length delimited `LengthDelimitedCodec`s. 385 /// 386 /// `Builder` enables constructing configured length delimited codecs. Note 387 /// that not all configuration settings apply to both encoding and decoding. See 388 /// the documentation for specific methods for more detail. 389 #[derive(Debug, Clone, Copy)] 390 pub struct Builder { 391 // Maximum frame length 392 max_frame_len: usize, 393 394 // Number of bytes representing the field length 395 length_field_len: usize, 396 397 // Number of bytes in the header before the length field 398 length_field_offset: usize, 399 400 // Adjust the length specified in the header field by this amount 401 length_adjustment: isize, 402 403 // Total number of bytes to skip before reading the payload, if not set, 404 // `length_field_len + length_field_offset` 405 num_skip: Option<usize>, 406 407 // Length field byte order (little or big endian) 408 length_field_is_big_endian: bool, 409 } 410 411 /// An error when the number of bytes read is more than max frame length. 412 pub struct LengthDelimitedCodecError { 413 _priv: (), 414 } 415 416 /// A codec for frames delimited by a frame head specifying their lengths. 417 /// 418 /// This allows the consumer to work with entire frames without having to worry 419 /// about buffering or other framing logic. 420 /// 421 /// See [module level] documentation for more detail. 422 /// 423 /// [module level]: index.html 424 #[derive(Debug, Clone)] 425 pub struct LengthDelimitedCodec { 426 // Configuration values 427 builder: Builder, 428 429 // Read state 430 state: DecodeState, 431 } 432 433 #[derive(Debug, Clone, Copy)] 434 enum DecodeState { 435 Head, 436 Data(usize), 437 } 438 439 // ===== impl LengthDelimitedCodec ====== 440 441 impl LengthDelimitedCodec { 442 /// Creates a new `LengthDelimitedCodec` with the default configuration values. new() -> Self443 pub fn new() -> Self { 444 Self { 445 builder: Builder::new(), 446 state: DecodeState::Head, 447 } 448 } 449 450 /// Creates a new length delimited codec builder with default configuration 451 /// values. builder() -> Builder452 pub fn builder() -> Builder { 453 Builder::new() 454 } 455 456 /// Returns the current max frame setting 457 /// 458 /// This is the largest size this codec will accept from the wire. Larger 459 /// frames will be rejected. max_frame_length(&self) -> usize460 pub fn max_frame_length(&self) -> usize { 461 self.builder.max_frame_len 462 } 463 464 /// Updates the max frame setting. 465 /// 466 /// The change takes effect the next time a frame is decoded. In other 467 /// words, if a frame is currently in process of being decoded with a frame 468 /// size greater than `val` but less than the max frame length in effect 469 /// before calling this function, then the frame will be allowed. set_max_frame_length(&mut self, val: usize)470 pub fn set_max_frame_length(&mut self, val: usize) { 471 self.builder.max_frame_length(val); 472 } 473 decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>>474 fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> { 475 let head_len = self.builder.num_head_bytes(); 476 let field_len = self.builder.length_field_len; 477 478 if src.len() < head_len { 479 // Not enough data 480 return Ok(None); 481 } 482 483 let n = { 484 let mut src = Cursor::new(&mut *src); 485 486 // Skip the required bytes 487 src.advance(self.builder.length_field_offset); 488 489 // match endianness 490 let n = if self.builder.length_field_is_big_endian { 491 src.get_uint(field_len) 492 } else { 493 src.get_uint_le(field_len) 494 }; 495 496 if n > self.builder.max_frame_len as u64 { 497 return Err(io::Error::new( 498 io::ErrorKind::InvalidData, 499 LengthDelimitedCodecError { _priv: () }, 500 )); 501 } 502 503 // The check above ensures there is no overflow 504 let n = n as usize; 505 506 // Adjust `n` with bounds checking 507 let n = if self.builder.length_adjustment < 0 { 508 n.checked_sub(-self.builder.length_adjustment as usize) 509 } else { 510 n.checked_add(self.builder.length_adjustment as usize) 511 }; 512 513 // Error handling 514 match n { 515 Some(n) => n, 516 None => { 517 return Err(io::Error::new( 518 io::ErrorKind::InvalidInput, 519 "provided length would overflow after adjustment", 520 )); 521 } 522 } 523 }; 524 525 src.advance(self.builder.get_num_skip()); 526 527 // Ensure that the buffer has enough space to read the incoming 528 // payload 529 src.reserve(n.saturating_sub(src.len())); 530 531 Ok(Some(n)) 532 } 533 decode_data(&self, n: usize, src: &mut BytesMut) -> Option<BytesMut>534 fn decode_data(&self, n: usize, src: &mut BytesMut) -> Option<BytesMut> { 535 // At this point, the buffer has already had the required capacity 536 // reserved. All there is to do is read. 537 if src.len() < n { 538 return None; 539 } 540 541 Some(src.split_to(n)) 542 } 543 } 544 545 impl Decoder for LengthDelimitedCodec { 546 type Item = BytesMut; 547 type Error = io::Error; 548 decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>>549 fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>> { 550 let n = match self.state { 551 DecodeState::Head => match self.decode_head(src)? { 552 Some(n) => { 553 self.state = DecodeState::Data(n); 554 n 555 } 556 None => return Ok(None), 557 }, 558 DecodeState::Data(n) => n, 559 }; 560 561 match self.decode_data(n, src) { 562 Some(data) => { 563 // Update the decode state 564 self.state = DecodeState::Head; 565 566 // Make sure the buffer has enough space to read the next head 567 src.reserve(self.builder.num_head_bytes().saturating_sub(src.len())); 568 569 Ok(Some(data)) 570 } 571 None => Ok(None), 572 } 573 } 574 } 575 576 impl Encoder<Bytes> for LengthDelimitedCodec { 577 type Error = io::Error; 578 encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error>579 fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> { 580 let n = data.len(); 581 582 if n > self.builder.max_frame_len { 583 return Err(io::Error::new( 584 io::ErrorKind::InvalidInput, 585 LengthDelimitedCodecError { _priv: () }, 586 )); 587 } 588 589 // Adjust `n` with bounds checking 590 let n = if self.builder.length_adjustment < 0 { 591 n.checked_add(-self.builder.length_adjustment as usize) 592 } else { 593 n.checked_sub(self.builder.length_adjustment as usize) 594 }; 595 596 let n = n.ok_or_else(|| { 597 io::Error::new( 598 io::ErrorKind::InvalidInput, 599 "provided length would overflow after adjustment", 600 ) 601 })?; 602 603 // Reserve capacity in the destination buffer to fit the frame and 604 // length field (plus adjustment). 605 dst.reserve(self.builder.length_field_len + n); 606 607 if self.builder.length_field_is_big_endian { 608 dst.put_uint(n as u64, self.builder.length_field_len); 609 } else { 610 dst.put_uint_le(n as u64, self.builder.length_field_len); 611 } 612 613 // Write the frame to the buffer 614 dst.extend_from_slice(&data[..]); 615 616 Ok(()) 617 } 618 } 619 620 impl Default for LengthDelimitedCodec { default() -> Self621 fn default() -> Self { 622 Self::new() 623 } 624 } 625 626 // ===== impl Builder ===== 627 628 mod builder { 629 /// Types that can be used with `Builder::length_field_type`. 630 pub trait LengthFieldType {} 631 632 impl LengthFieldType for u8 {} 633 impl LengthFieldType for u16 {} 634 impl LengthFieldType for u32 {} 635 impl LengthFieldType for u64 {} 636 637 #[cfg(any( 638 target_pointer_width = "8", 639 target_pointer_width = "16", 640 target_pointer_width = "32", 641 target_pointer_width = "64", 642 ))] 643 impl LengthFieldType for usize {} 644 } 645 646 impl Builder { 647 /// Creates a new length delimited codec builder with default configuration 648 /// values. 649 /// 650 /// # Examples 651 /// 652 /// ``` 653 /// # use tokio::io::AsyncRead; 654 /// use tokio_util::codec::LengthDelimitedCodec; 655 /// 656 /// # fn bind_read<T: AsyncRead>(io: T) { 657 /// LengthDelimitedCodec::builder() 658 /// .length_field_offset(0) 659 /// .length_field_type::<u16>() 660 /// .length_adjustment(0) 661 /// .num_skip(0) 662 /// .new_read(io); 663 /// # } 664 /// # pub fn main() {} 665 /// ``` new() -> Builder666 pub fn new() -> Builder { 667 Builder { 668 // Default max frame length of 8MB 669 max_frame_len: 8 * 1_024 * 1_024, 670 671 // Default byte length of 4 672 length_field_len: 4, 673 674 // Default to the header field being at the start of the header. 675 length_field_offset: 0, 676 677 length_adjustment: 0, 678 679 // Total number of bytes to skip before reading the payload, if not set, 680 // `length_field_len + length_field_offset` 681 num_skip: None, 682 683 // Default to reading the length field in network (big) endian. 684 length_field_is_big_endian: true, 685 } 686 } 687 688 /// Read the length field as a big endian integer 689 /// 690 /// This is the default setting. 691 /// 692 /// This configuration option applies to both encoding and decoding. 693 /// 694 /// # Examples 695 /// 696 /// ``` 697 /// # use tokio::io::AsyncRead; 698 /// use tokio_util::codec::LengthDelimitedCodec; 699 /// 700 /// # fn bind_read<T: AsyncRead>(io: T) { 701 /// LengthDelimitedCodec::builder() 702 /// .big_endian() 703 /// .new_read(io); 704 /// # } 705 /// # pub fn main() {} 706 /// ``` big_endian(&mut self) -> &mut Self707 pub fn big_endian(&mut self) -> &mut Self { 708 self.length_field_is_big_endian = true; 709 self 710 } 711 712 /// Read the length field as a little endian integer 713 /// 714 /// The default setting is big endian. 715 /// 716 /// This configuration option applies to both encoding and decoding. 717 /// 718 /// # Examples 719 /// 720 /// ``` 721 /// # use tokio::io::AsyncRead; 722 /// use tokio_util::codec::LengthDelimitedCodec; 723 /// 724 /// # fn bind_read<T: AsyncRead>(io: T) { 725 /// LengthDelimitedCodec::builder() 726 /// .little_endian() 727 /// .new_read(io); 728 /// # } 729 /// # pub fn main() {} 730 /// ``` little_endian(&mut self) -> &mut Self731 pub fn little_endian(&mut self) -> &mut Self { 732 self.length_field_is_big_endian = false; 733 self 734 } 735 736 /// Read the length field as a native endian integer 737 /// 738 /// The default setting is big endian. 739 /// 740 /// This configuration option applies to both encoding and decoding. 741 /// 742 /// # Examples 743 /// 744 /// ``` 745 /// # use tokio::io::AsyncRead; 746 /// use tokio_util::codec::LengthDelimitedCodec; 747 /// 748 /// # fn bind_read<T: AsyncRead>(io: T) { 749 /// LengthDelimitedCodec::builder() 750 /// .native_endian() 751 /// .new_read(io); 752 /// # } 753 /// # pub fn main() {} 754 /// ``` native_endian(&mut self) -> &mut Self755 pub fn native_endian(&mut self) -> &mut Self { 756 if cfg!(target_endian = "big") { 757 self.big_endian() 758 } else { 759 self.little_endian() 760 } 761 } 762 763 /// Sets the max frame length in bytes 764 /// 765 /// This configuration option applies to both encoding and decoding. The 766 /// default value is 8MB. 767 /// 768 /// When decoding, the length field read from the byte stream is checked 769 /// against this setting **before** any adjustments are applied. When 770 /// encoding, the length of the submitted payload is checked against this 771 /// setting. 772 /// 773 /// When frames exceed the max length, an `io::Error` with the custom value 774 /// of the `LengthDelimitedCodecError` type will be returned. 775 /// 776 /// # Examples 777 /// 778 /// ``` 779 /// # use tokio::io::AsyncRead; 780 /// use tokio_util::codec::LengthDelimitedCodec; 781 /// 782 /// # fn bind_read<T: AsyncRead>(io: T) { 783 /// LengthDelimitedCodec::builder() 784 /// .max_frame_length(8 * 1024 * 1024) 785 /// .new_read(io); 786 /// # } 787 /// # pub fn main() {} 788 /// ``` max_frame_length(&mut self, val: usize) -> &mut Self789 pub fn max_frame_length(&mut self, val: usize) -> &mut Self { 790 self.max_frame_len = val; 791 self 792 } 793 794 /// Sets the unsigned integer type used to represent the length field. 795 /// 796 /// The default type is [`u32`]. The max type is [`u64`] (or [`usize`] on 797 /// 64-bit targets). 798 /// 799 /// # Examples 800 /// 801 /// ``` 802 /// # use tokio::io::AsyncRead; 803 /// use tokio_util::codec::LengthDelimitedCodec; 804 /// 805 /// # fn bind_read<T: AsyncRead>(io: T) { 806 /// LengthDelimitedCodec::builder() 807 /// .length_field_type::<u32>() 808 /// .new_read(io); 809 /// # } 810 /// # pub fn main() {} 811 /// ``` 812 /// 813 /// Unlike [`Builder::length_field_length`], this does not fail at runtime 814 /// and instead produces a compile error: 815 /// 816 /// ```compile_fail 817 /// # use tokio::io::AsyncRead; 818 /// # use tokio_util::codec::LengthDelimitedCodec; 819 /// # fn bind_read<T: AsyncRead>(io: T) { 820 /// LengthDelimitedCodec::builder() 821 /// .length_field_type::<u128>() 822 /// .new_read(io); 823 /// # } 824 /// # pub fn main() {} 825 /// ``` length_field_type<T: builder::LengthFieldType>(&mut self) -> &mut Self826 pub fn length_field_type<T: builder::LengthFieldType>(&mut self) -> &mut Self { 827 self.length_field_length(mem::size_of::<T>()) 828 } 829 830 /// Sets the number of bytes used to represent the length field 831 /// 832 /// The default value is `4`. The max value is `8`. 833 /// 834 /// This configuration option applies to both encoding and decoding. 835 /// 836 /// # Examples 837 /// 838 /// ``` 839 /// # use tokio::io::AsyncRead; 840 /// use tokio_util::codec::LengthDelimitedCodec; 841 /// 842 /// # fn bind_read<T: AsyncRead>(io: T) { 843 /// LengthDelimitedCodec::builder() 844 /// .length_field_length(4) 845 /// .new_read(io); 846 /// # } 847 /// # pub fn main() {} 848 /// ``` length_field_length(&mut self, val: usize) -> &mut Self849 pub fn length_field_length(&mut self, val: usize) -> &mut Self { 850 assert!(val > 0 && val <= 8, "invalid length field length"); 851 self.length_field_len = val; 852 self 853 } 854 855 /// Sets the number of bytes in the header before the length field 856 /// 857 /// This configuration option only applies to decoding. 858 /// 859 /// # Examples 860 /// 861 /// ``` 862 /// # use tokio::io::AsyncRead; 863 /// use tokio_util::codec::LengthDelimitedCodec; 864 /// 865 /// # fn bind_read<T: AsyncRead>(io: T) { 866 /// LengthDelimitedCodec::builder() 867 /// .length_field_offset(1) 868 /// .new_read(io); 869 /// # } 870 /// # pub fn main() {} 871 /// ``` length_field_offset(&mut self, val: usize) -> &mut Self872 pub fn length_field_offset(&mut self, val: usize) -> &mut Self { 873 self.length_field_offset = val; 874 self 875 } 876 877 /// Delta between the payload length specified in the header and the real 878 /// payload length 879 /// 880 /// # Examples 881 /// 882 /// ``` 883 /// # use tokio::io::AsyncRead; 884 /// use tokio_util::codec::LengthDelimitedCodec; 885 /// 886 /// # fn bind_read<T: AsyncRead>(io: T) { 887 /// LengthDelimitedCodec::builder() 888 /// .length_adjustment(-2) 889 /// .new_read(io); 890 /// # } 891 /// # pub fn main() {} 892 /// ``` length_adjustment(&mut self, val: isize) -> &mut Self893 pub fn length_adjustment(&mut self, val: isize) -> &mut Self { 894 self.length_adjustment = val; 895 self 896 } 897 898 /// Sets the number of bytes to skip before reading the payload 899 /// 900 /// Default value is `length_field_len + length_field_offset` 901 /// 902 /// This configuration option only applies to decoding 903 /// 904 /// # Examples 905 /// 906 /// ``` 907 /// # use tokio::io::AsyncRead; 908 /// use tokio_util::codec::LengthDelimitedCodec; 909 /// 910 /// # fn bind_read<T: AsyncRead>(io: T) { 911 /// LengthDelimitedCodec::builder() 912 /// .num_skip(4) 913 /// .new_read(io); 914 /// # } 915 /// # pub fn main() {} 916 /// ``` num_skip(&mut self, val: usize) -> &mut Self917 pub fn num_skip(&mut self, val: usize) -> &mut Self { 918 self.num_skip = Some(val); 919 self 920 } 921 922 /// Create a configured length delimited `LengthDelimitedCodec` 923 /// 924 /// # Examples 925 /// 926 /// ``` 927 /// use tokio_util::codec::LengthDelimitedCodec; 928 /// # pub fn main() { 929 /// LengthDelimitedCodec::builder() 930 /// .length_field_offset(0) 931 /// .length_field_type::<u16>() 932 /// .length_adjustment(0) 933 /// .num_skip(0) 934 /// .new_codec(); 935 /// # } 936 /// ``` new_codec(&self) -> LengthDelimitedCodec937 pub fn new_codec(&self) -> LengthDelimitedCodec { 938 LengthDelimitedCodec { 939 builder: *self, 940 state: DecodeState::Head, 941 } 942 } 943 944 /// Create a configured length delimited `FramedRead` 945 /// 946 /// # Examples 947 /// 948 /// ``` 949 /// # use tokio::io::AsyncRead; 950 /// use tokio_util::codec::LengthDelimitedCodec; 951 /// 952 /// # fn bind_read<T: AsyncRead>(io: T) { 953 /// LengthDelimitedCodec::builder() 954 /// .length_field_offset(0) 955 /// .length_field_type::<u16>() 956 /// .length_adjustment(0) 957 /// .num_skip(0) 958 /// .new_read(io); 959 /// # } 960 /// # pub fn main() {} 961 /// ``` new_read<T>(&self, upstream: T) -> FramedRead<T, LengthDelimitedCodec> where T: AsyncRead,962 pub fn new_read<T>(&self, upstream: T) -> FramedRead<T, LengthDelimitedCodec> 963 where 964 T: AsyncRead, 965 { 966 FramedRead::new(upstream, self.new_codec()) 967 } 968 969 /// Create a configured length delimited `FramedWrite` 970 /// 971 /// # Examples 972 /// 973 /// ``` 974 /// # use tokio::io::AsyncWrite; 975 /// # use tokio_util::codec::LengthDelimitedCodec; 976 /// # fn write_frame<T: AsyncWrite>(io: T) { 977 /// LengthDelimitedCodec::builder() 978 /// .length_field_type::<u16>() 979 /// .new_write(io); 980 /// # } 981 /// # pub fn main() {} 982 /// ``` new_write<T>(&self, inner: T) -> FramedWrite<T, LengthDelimitedCodec> where T: AsyncWrite,983 pub fn new_write<T>(&self, inner: T) -> FramedWrite<T, LengthDelimitedCodec> 984 where 985 T: AsyncWrite, 986 { 987 FramedWrite::new(inner, self.new_codec()) 988 } 989 990 /// Create a configured length delimited `Framed` 991 /// 992 /// # Examples 993 /// 994 /// ``` 995 /// # use tokio::io::{AsyncRead, AsyncWrite}; 996 /// # use tokio_util::codec::LengthDelimitedCodec; 997 /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) { 998 /// # let _ = 999 /// LengthDelimitedCodec::builder() 1000 /// .length_field_type::<u16>() 1001 /// .new_framed(io); 1002 /// # } 1003 /// # pub fn main() {} 1004 /// ``` new_framed<T>(&self, inner: T) -> Framed<T, LengthDelimitedCodec> where T: AsyncRead + AsyncWrite,1005 pub fn new_framed<T>(&self, inner: T) -> Framed<T, LengthDelimitedCodec> 1006 where 1007 T: AsyncRead + AsyncWrite, 1008 { 1009 Framed::new(inner, self.new_codec()) 1010 } 1011 num_head_bytes(&self) -> usize1012 fn num_head_bytes(&self) -> usize { 1013 let num = self.length_field_offset + self.length_field_len; 1014 cmp::max(num, self.num_skip.unwrap_or(0)) 1015 } 1016 get_num_skip(&self) -> usize1017 fn get_num_skip(&self) -> usize { 1018 self.num_skip 1019 .unwrap_or(self.length_field_offset + self.length_field_len) 1020 } 1021 } 1022 1023 impl Default for Builder { default() -> Self1024 fn default() -> Self { 1025 Self::new() 1026 } 1027 } 1028 1029 // ===== impl LengthDelimitedCodecError ===== 1030 1031 impl fmt::Debug for LengthDelimitedCodecError { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1032 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 1033 f.debug_struct("LengthDelimitedCodecError").finish() 1034 } 1035 } 1036 1037 impl fmt::Display for LengthDelimitedCodecError { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1038 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 1039 f.write_str("frame size too big") 1040 } 1041 } 1042 1043 impl StdError for LengthDelimitedCodecError {} 1044