1 use crate::codec::decoder::Decoder; 2 use crate::codec::encoder::Encoder; 3 4 use bytes::{Buf, BufMut, Bytes, BytesMut}; 5 use std::{cmp, fmt, io, str}; 6 7 const DEFAULT_SEEK_DELIMITERS: &[u8] = b",;\n\r"; 8 const DEFAULT_SEQUENCE_WRITER: &[u8] = b","; 9 /// A simple [`Decoder`] and [`Encoder`] implementation that splits up data into chunks based on any character in the given delimiter string. 10 /// 11 /// [`Decoder`]: crate::codec::Decoder 12 /// [`Encoder`]: crate::codec::Encoder 13 /// 14 /// # Example 15 /// Decode string of bytes containing various different delimiters. 16 /// 17 /// [`BytesMut`]: bytes::BytesMut 18 /// [`Error`]: std::io::Error 19 /// 20 /// ``` 21 /// use tokio_util::codec::{AnyDelimiterCodec, Decoder}; 22 /// use bytes::{BufMut, BytesMut}; 23 /// 24 /// # 25 /// # #[tokio::main(flavor = "current_thread")] 26 /// # async fn main() -> Result<(), std::io::Error> { 27 /// let mut codec = AnyDelimiterCodec::new(b",;\r\n".to_vec(),b";".to_vec()); 28 /// let buf = &mut BytesMut::new(); 29 /// buf.reserve(200); 30 /// buf.put_slice(b"chunk 1,chunk 2;chunk 3\n\r"); 31 /// assert_eq!("chunk 1", codec.decode(buf).unwrap().unwrap()); 32 /// assert_eq!("chunk 2", codec.decode(buf).unwrap().unwrap()); 33 /// assert_eq!("chunk 3", codec.decode(buf).unwrap().unwrap()); 34 /// assert_eq!("", codec.decode(buf).unwrap().unwrap()); 35 /// assert_eq!(None, codec.decode(buf).unwrap()); 36 /// # Ok(()) 37 /// # } 38 /// ``` 39 /// 40 #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] 41 pub struct AnyDelimiterCodec { 42 // Stored index of the next index to examine for the delimiter character. 43 // This is used to optimize searching. 44 // For example, if `decode` was called with `abc` and the delimiter is '{}', it would hold `3`, 45 // because that is the next index to examine. 46 // The next time `decode` is called with `abcde}`, the method will 47 // only look at `de}` before returning. 48 next_index: usize, 49 50 /// The maximum length for a given chunk. If `usize::MAX`, chunks will be 51 /// read until a delimiter character is reached. 52 max_length: usize, 53 54 /// Are we currently discarding the remainder of a chunk which was over 55 /// the length limit? 56 is_discarding: bool, 57 58 /// The bytes that are using for search during decode 59 seek_delimiters: Vec<u8>, 60 61 /// The bytes that are using for encoding 62 sequence_writer: Vec<u8>, 63 } 64 65 impl AnyDelimiterCodec { 66 /// Returns a `AnyDelimiterCodec` for splitting up data into chunks. 67 /// 68 /// # Note 69 /// 70 /// The returned `AnyDelimiterCodec` will not have an upper bound on the length 71 /// of a buffered chunk. See the documentation for [`new_with_max_length`] 72 /// for information on why this could be a potential security risk. 73 /// 74 /// [`new_with_max_length`]: crate::codec::AnyDelimiterCodec::new_with_max_length() new(seek_delimiters: Vec<u8>, sequence_writer: Vec<u8>) -> AnyDelimiterCodec75 pub fn new(seek_delimiters: Vec<u8>, sequence_writer: Vec<u8>) -> AnyDelimiterCodec { 76 AnyDelimiterCodec { 77 next_index: 0, 78 max_length: usize::MAX, 79 is_discarding: false, 80 seek_delimiters, 81 sequence_writer, 82 } 83 } 84 85 /// Returns a `AnyDelimiterCodec` with a maximum chunk length limit. 86 /// 87 /// If this is set, calls to `AnyDelimiterCodec::decode` will return a 88 /// [`AnyDelimiterCodecError`] when a chunk exceeds the length limit. Subsequent calls 89 /// will discard up to `limit` bytes from that chunk until a delimiter 90 /// character is reached, returning `None` until the delimiter over the limit 91 /// has been fully discarded. After that point, calls to `decode` will 92 /// function as normal. 93 /// 94 /// # Note 95 /// 96 /// Setting a length limit is highly recommended for any `AnyDelimiterCodec` which 97 /// will be exposed to untrusted input. Otherwise, the size of the buffer 98 /// that holds the chunk currently being read is unbounded. An attacker could 99 /// exploit this unbounded buffer by sending an unbounded amount of input 100 /// without any delimiter characters, causing unbounded memory consumption. 101 /// 102 /// [`AnyDelimiterCodecError`]: crate::codec::AnyDelimiterCodecError new_with_max_length( seek_delimiters: Vec<u8>, sequence_writer: Vec<u8>, max_length: usize, ) -> Self103 pub fn new_with_max_length( 104 seek_delimiters: Vec<u8>, 105 sequence_writer: Vec<u8>, 106 max_length: usize, 107 ) -> Self { 108 AnyDelimiterCodec { 109 max_length, 110 ..AnyDelimiterCodec::new(seek_delimiters, sequence_writer) 111 } 112 } 113 114 /// Returns the maximum chunk length when decoding. 115 /// 116 /// ``` 117 /// use std::usize; 118 /// use tokio_util::codec::AnyDelimiterCodec; 119 /// 120 /// let codec = AnyDelimiterCodec::new(b",;\n".to_vec(), b";".to_vec()); 121 /// assert_eq!(codec.max_length(), usize::MAX); 122 /// ``` 123 /// ``` 124 /// use tokio_util::codec::AnyDelimiterCodec; 125 /// 126 /// let codec = AnyDelimiterCodec::new_with_max_length(b",;\n".to_vec(), b";".to_vec(), 256); 127 /// assert_eq!(codec.max_length(), 256); 128 /// ``` max_length(&self) -> usize129 pub fn max_length(&self) -> usize { 130 self.max_length 131 } 132 } 133 134 impl Decoder for AnyDelimiterCodec { 135 type Item = Bytes; 136 type Error = AnyDelimiterCodecError; 137 decode(&mut self, buf: &mut BytesMut) -> Result<Option<Bytes>, AnyDelimiterCodecError>138 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Bytes>, AnyDelimiterCodecError> { 139 loop { 140 // Determine how far into the buffer we'll search for a delimiter. If 141 // there's no max_length set, we'll read to the end of the buffer. 142 let read_to = cmp::min(self.max_length.saturating_add(1), buf.len()); 143 144 let new_chunk_offset = buf[self.next_index..read_to].iter().position(|b| { 145 self.seek_delimiters 146 .iter() 147 .any(|delimiter| *b == *delimiter) 148 }); 149 150 match (self.is_discarding, new_chunk_offset) { 151 (true, Some(offset)) => { 152 // If we found a new chunk, discard up to that offset and 153 // then stop discarding. On the next iteration, we'll try 154 // to read a chunk normally. 155 buf.advance(offset + self.next_index + 1); 156 self.is_discarding = false; 157 self.next_index = 0; 158 } 159 (true, None) => { 160 // Otherwise, we didn't find a new chunk, so we'll discard 161 // everything we read. On the next iteration, we'll continue 162 // discarding up to max_len bytes unless we find a new chunk. 163 buf.advance(read_to); 164 self.next_index = 0; 165 if buf.is_empty() { 166 return Ok(None); 167 } 168 } 169 (false, Some(offset)) => { 170 // Found a chunk! 171 let new_chunk_index = offset + self.next_index; 172 self.next_index = 0; 173 let mut chunk = buf.split_to(new_chunk_index + 1); 174 chunk.truncate(chunk.len() - 1); 175 let chunk = chunk.freeze(); 176 return Ok(Some(chunk)); 177 } 178 (false, None) if buf.len() > self.max_length => { 179 // Reached the maximum length without finding a 180 // new chunk, return an error and start discarding on the 181 // next call. 182 self.is_discarding = true; 183 return Err(AnyDelimiterCodecError::MaxChunkLengthExceeded); 184 } 185 (false, None) => { 186 // We didn't find a chunk or reach the length limit, so the next 187 // call will resume searching at the current offset. 188 self.next_index = read_to; 189 return Ok(None); 190 } 191 } 192 } 193 } 194 decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Bytes>, AnyDelimiterCodecError>195 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Bytes>, AnyDelimiterCodecError> { 196 Ok(match self.decode(buf)? { 197 Some(frame) => Some(frame), 198 None => { 199 // return remaining data, if any 200 if buf.is_empty() { 201 None 202 } else { 203 let chunk = buf.split_to(buf.len()); 204 self.next_index = 0; 205 Some(chunk.freeze()) 206 } 207 } 208 }) 209 } 210 } 211 212 impl<T> Encoder<T> for AnyDelimiterCodec 213 where 214 T: AsRef<str>, 215 { 216 type Error = AnyDelimiterCodecError; 217 encode(&mut self, chunk: T, buf: &mut BytesMut) -> Result<(), AnyDelimiterCodecError>218 fn encode(&mut self, chunk: T, buf: &mut BytesMut) -> Result<(), AnyDelimiterCodecError> { 219 let chunk = chunk.as_ref(); 220 buf.reserve(chunk.len() + 1); 221 buf.put(chunk.as_bytes()); 222 buf.put(self.sequence_writer.as_ref()); 223 224 Ok(()) 225 } 226 } 227 228 impl Default for AnyDelimiterCodec { default() -> Self229 fn default() -> Self { 230 Self::new( 231 DEFAULT_SEEK_DELIMITERS.to_vec(), 232 DEFAULT_SEQUENCE_WRITER.to_vec(), 233 ) 234 } 235 } 236 237 /// An error occurred while encoding or decoding a chunk. 238 #[derive(Debug)] 239 pub enum AnyDelimiterCodecError { 240 /// The maximum chunk length was exceeded. 241 MaxChunkLengthExceeded, 242 /// An IO error occurred. 243 Io(io::Error), 244 } 245 246 impl fmt::Display for AnyDelimiterCodecError { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result247 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 248 match self { 249 AnyDelimiterCodecError::MaxChunkLengthExceeded => { 250 write!(f, "max chunk length exceeded") 251 } 252 AnyDelimiterCodecError::Io(e) => write!(f, "{e}"), 253 } 254 } 255 } 256 257 impl From<io::Error> for AnyDelimiterCodecError { from(e: io::Error) -> AnyDelimiterCodecError258 fn from(e: io::Error) -> AnyDelimiterCodecError { 259 AnyDelimiterCodecError::Io(e) 260 } 261 } 262 263 impl std::error::Error for AnyDelimiterCodecError {} 264