1 // Copyright 2022 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::collections::VecDeque; 6 use std::fmt; 7 use std::fmt::Display; 8 use std::time::Duration; 9 10 use pcap_file::pcap::PacketHeader; 11 12 const PACKET_HEADER_SIZE_IN_BYTES: usize = std::mem::size_of::<PacketHeader>(); 13 14 /// A wrapper around a ringer buffer that stores packet information. 15 /// This was made so on crosvm, we can write the packet information to a file 16 /// for debugging purposes. 17 18 pub struct PacketRingBuffer { 19 ring_buffer: VecDeque<PacketInfo>, 20 max_size_in_bytes: usize, 21 current_size_in_bytes: usize, 22 last_popped_packet_timestamp: Option<Duration>, 23 } 24 25 pub struct PacketInfo { 26 pub buf: Vec<u8>, 27 pub timestamp: Duration, 28 } 29 30 #[derive(Eq, PartialEq, Debug)] 31 pub enum Error { 32 PacketTooBigError { 33 rb_max_size_in_bytes: usize, 34 packet_size_in_bytes: usize, 35 }, 36 } 37 pub type Result<T> = std::result::Result<T, Error>; 38 39 impl Display for Error { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result40 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 41 use self::Error::*; 42 43 match self { 44 PacketTooBigError { 45 rb_max_size_in_bytes, 46 packet_size_in_bytes, 47 } => write!( 48 f, 49 "Packet of size {} bytes can't fit into Ring buffer of size {}", 50 rb_max_size_in_bytes, packet_size_in_bytes 51 ), 52 } 53 } 54 } 55 56 impl PacketRingBuffer { new(max_size_in_bytes: usize) -> PacketRingBuffer57 pub fn new(max_size_in_bytes: usize) -> PacketRingBuffer { 58 PacketRingBuffer { 59 ring_buffer: VecDeque::new(), 60 max_size_in_bytes, 61 current_size_in_bytes: 0, 62 last_popped_packet_timestamp: None, 63 } 64 } 65 add_packet(&mut self, buf: &[u8], packet_timestamp: Duration) -> Result<()>66 pub fn add_packet(&mut self, buf: &[u8], packet_timestamp: Duration) -> Result<()> { 67 self.prune_from_ring_buffer_if_oversized(buf)?; 68 69 self.ring_buffer.push_front(PacketInfo { 70 buf: buf.to_vec(), 71 timestamp: packet_timestamp, 72 }); 73 self.current_size_in_bytes += buf.len() + PACKET_HEADER_SIZE_IN_BYTES; 74 75 Ok(()) 76 } 77 78 // While the size of the rb with the new packet is less than the max size of the rb. prune_from_ring_buffer_if_oversized(&mut self, buf: &[u8]) -> Result<()>79 fn prune_from_ring_buffer_if_oversized(&mut self, buf: &[u8]) -> Result<()> { 80 let new_packet_size_in_bytes = buf.len() + PACKET_HEADER_SIZE_IN_BYTES; 81 82 while self.current_size_in_bytes + new_packet_size_in_bytes > self.max_size_in_bytes { 83 match self.ring_buffer.pop_back() { 84 Some(val) => { 85 self.current_size_in_bytes -= val.buf.len() + PACKET_HEADER_SIZE_IN_BYTES; 86 self.last_popped_packet_timestamp = self 87 .last_popped_packet_timestamp 88 .map(|t| std::cmp::max(t, val.timestamp)) 89 .or(Some(val.timestamp)) 90 } 91 None => { 92 return Err(Error::PacketTooBigError { 93 rb_max_size_in_bytes: self.max_size_in_bytes, 94 packet_size_in_bytes: new_packet_size_in_bytes, 95 }) 96 } 97 } 98 } 99 Ok(()) 100 } 101 102 /// Aggregates two ring buffers of packets by removing packets prior to the max oldest 103 /// removed packet and sorting them by time. pop_ring_buffers_and_aggregate<'a>( packet_rb1: &'a mut PacketRingBuffer, packet_rb2: &'a mut PacketRingBuffer, ) -> Vec<&'a PacketInfo>104 pub fn pop_ring_buffers_and_aggregate<'a>( 105 packet_rb1: &'a mut PacketRingBuffer, 106 packet_rb2: &'a mut PacketRingBuffer, 107 ) -> Vec<&'a PacketInfo> { 108 let mut result: Vec<&PacketInfo> = Vec::new(); 109 result.extend(packet_rb1.ring_buffer.iter().collect::<Vec<&PacketInfo>>()); 110 result.extend(packet_rb2.ring_buffer.iter().collect::<Vec<&PacketInfo>>()); 111 112 // The oldest time we want to keep in the aggregated result. 113 let start_time = std::cmp::max( 114 packet_rb1.last_popped_packet_timestamp, 115 packet_rb2.last_popped_packet_timestamp, 116 ); 117 118 let mut result = if let Some(start_time) = start_time { 119 result 120 .into_iter() 121 .filter(|packet| packet.timestamp > start_time) 122 .collect() 123 } else { 124 result 125 }; 126 127 result.sort_by(|a, b| a.timestamp.cmp(&b.timestamp)); 128 result 129 } 130 } 131 132 #[cfg(test)] 133 mod tests { 134 use super::*; 135 136 #[test] test_add()137 fn test_add() { 138 let mut packet_rb = PacketRingBuffer::new( 139 /* max_size_in_bytes= */ 4 + PACKET_HEADER_SIZE_IN_BYTES, 140 ); 141 let buf: &[u8] = &[1, 2, 3, 4]; 142 let start_time = Duration::from_nanos(45); 143 144 assert_eq!(packet_rb.ring_buffer.len(), 0); 145 146 packet_rb 147 .add_packet(buf, start_time) 148 .expect("Failed to add packet."); 149 150 let packet = packet_rb.ring_buffer.pop_back().unwrap(); 151 assert_eq!(packet.buf, &[1, 2, 3, 4]); 152 assert_eq!(packet.timestamp.as_nanos(), 45); 153 assert_eq!(packet_rb.last_popped_packet_timestamp, None); 154 // Each packet has 16 bytes in it's PacketHeader 155 assert_eq!( 156 packet_rb.current_size_in_bytes, 157 4 + PACKET_HEADER_SIZE_IN_BYTES 158 ); 159 } 160 161 #[test] test_add_no_space()162 fn test_add_no_space() { 163 // Max size is 3 bytes of buffer data + PACKET_HEADER_SIZE_IN_BYTES 164 let mut packet_rb = PacketRingBuffer::new( 165 /* max_size_in_bytes= */ 3 + PACKET_HEADER_SIZE_IN_BYTES, 166 ); 167 let buf: &[u8] = &[1, 2, 3, 4]; 168 let start_time = Duration::from_nanos(45); 169 170 let res = packet_rb.add_packet(buf, start_time); 171 172 // Should error because rb size is 19 bytes, but packet will take 20 bytes (4 bytes in buffer + 16 bytes from Packet header) 173 assert!(res.is_err()); 174 175 assert_eq!( 176 res.unwrap_err(), 177 Error::PacketTooBigError { 178 rb_max_size_in_bytes: packet_rb.max_size_in_bytes, 179 packet_size_in_bytes: 4 + PACKET_HEADER_SIZE_IN_BYTES 180 } 181 ); 182 } 183 184 #[test] test_add_exceeds_size_pop_one()185 fn test_add_exceeds_size_pop_one() { 186 let mut packet_rb = PacketRingBuffer::new( 187 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 188 ); 189 190 packet_rb 191 .add_packet(&[1], Duration::from_nanos(1)) 192 .expect("Failed to add packet."); 193 packet_rb 194 .add_packet(&[2], Duration::from_nanos(2)) 195 .expect("Failed to add packet."); 196 packet_rb 197 .add_packet(&[3], Duration::from_nanos(3)) 198 .expect("Failed to add packet."); 199 packet_rb 200 .add_packet(&[4], Duration::from_nanos(4)) 201 .expect("Failed to add packet."); 202 203 assert_eq!(packet_rb.ring_buffer.len(), 3); 204 205 let packet1 = packet_rb.ring_buffer.pop_back().unwrap(); 206 let packet2 = packet_rb.ring_buffer.pop_back().unwrap(); 207 let packet3 = packet_rb.ring_buffer.pop_back().unwrap(); 208 209 assert_eq!(packet1.buf, &[2]); 210 assert_eq!(packet1.timestamp.as_nanos(), 2); 211 assert_eq!(packet2.buf, &[3]); 212 assert_eq!(packet2.timestamp.as_nanos(), 3); 213 assert_eq!(packet3.buf, &[4]); 214 assert_eq!(packet3.timestamp.as_nanos(), 4); 215 assert_eq!( 216 packet_rb.last_popped_packet_timestamp.unwrap().as_nanos(), 217 1 218 ); 219 assert_eq!(packet_rb.current_size_in_bytes, 3 + 3 * 16); 220 } 221 222 #[test] test_add_exceeds_size_pop_multiple()223 fn test_add_exceeds_size_pop_multiple() { 224 let mut packet_rb = PacketRingBuffer::new( 225 /* max_size_in_bytes= */ 2 + PACKET_HEADER_SIZE_IN_BYTES, 226 ); 227 228 packet_rb 229 .add_packet(&[1], Duration::from_nanos(1)) 230 .expect("Failed to add packet."); 231 packet_rb 232 .add_packet(&[2], Duration::from_nanos(2)) 233 .expect("Failed to add packet."); 234 packet_rb 235 .add_packet(&[3, 4], Duration::from_nanos(3)) 236 .expect("Failed to add packet."); 237 packet_rb 238 .add_packet(&[5, 6], Duration::from_nanos(4)) 239 .expect("Failed to add packet."); 240 241 // The first 3 packets should've been popped 242 assert_eq!(packet_rb.ring_buffer.len(), 1); 243 244 let packet1 = packet_rb.ring_buffer.pop_back().unwrap(); 245 246 assert_eq!(packet1.buf, &[5, 6]); 247 assert_eq!(packet1.timestamp.as_nanos(), 4); 248 assert_eq!( 249 packet_rb.last_popped_packet_timestamp.unwrap().as_nanos(), 250 3 251 ); 252 assert_eq!( 253 packet_rb.current_size_in_bytes, 254 2 + PACKET_HEADER_SIZE_IN_BYTES 255 ); 256 } 257 258 #[test] test_aggregate_one_empty()259 fn test_aggregate_one_empty() { 260 let mut tx_packet_rb = PacketRingBuffer::new( 261 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 262 ); 263 264 let mut rx_packet_rb = PacketRingBuffer::new( 265 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 266 ); 267 rx_packet_rb 268 .add_packet(&[4], Duration::from_nanos(6)) 269 .expect("Failed to add packet."); 270 rx_packet_rb 271 .add_packet(&[5], Duration::from_nanos(10)) 272 .expect("Failed to add packet."); 273 274 let res = 275 PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb); 276 let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect(); 277 278 assert_eq!(packet_data_list, [&[4], &[5]]); 279 } 280 281 #[test] test_aggregate_both_empty()282 fn test_aggregate_both_empty() { 283 let mut tx_packet_rb = PacketRingBuffer::new( 284 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 285 ); 286 287 let mut rx_packet_rb = PacketRingBuffer::new( 288 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 289 ); 290 291 let res = 292 PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb); 293 let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect(); 294 295 assert!(packet_data_list.is_empty()); 296 } 297 298 #[test] test_aggregate_none_popped()299 fn test_aggregate_none_popped() { 300 let mut tx_packet_rb = PacketRingBuffer::new( 301 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 302 ); 303 tx_packet_rb 304 .add_packet(&[1], Duration::from_nanos(2)) 305 .expect("Failed to add packet."); 306 tx_packet_rb 307 .add_packet(&[2], Duration::from_nanos(8)) 308 .expect("Failed to add packet."); 309 tx_packet_rb 310 .add_packet(&[3], Duration::from_nanos(9)) 311 .expect("Failed to add packet."); 312 313 let mut rx_packet_rb = PacketRingBuffer::new( 314 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 315 ); 316 rx_packet_rb 317 .add_packet(&[4], Duration::from_nanos(6)) 318 .expect("Failed to add packet."); 319 rx_packet_rb 320 .add_packet(&[5], Duration::from_nanos(10)) 321 .expect("Failed to add packet."); 322 323 let res = 324 PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb); 325 326 let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect(); 327 assert_eq!(packet_data_list, [&[1], &[4], &[2], &[3], &[5]]); 328 } 329 330 #[test] test_aggregate_with_one_ring_buffer_popped()331 fn test_aggregate_with_one_ring_buffer_popped() { 332 let mut tx_packet_rb = PacketRingBuffer::new( 333 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 334 ); 335 tx_packet_rb 336 .add_packet(&[1], Duration::from_nanos(2)) 337 .expect("Failed to add packet."); 338 tx_packet_rb 339 .add_packet(&[2], Duration::from_nanos(8)) 340 .expect("Failed to add packet."); 341 tx_packet_rb 342 .add_packet(&[3, 4], Duration::from_nanos(9)) 343 .expect("Failed to add packet."); 344 tx_packet_rb 345 .add_packet(&[5], Duration::from_nanos(14)) 346 .expect("Failed to add packet."); 347 348 let mut rx_packet_rb = PacketRingBuffer::new( 349 /* max_size_in_bytes= */ 2 + 2 * PACKET_HEADER_SIZE_IN_BYTES, 350 ); 351 rx_packet_rb 352 .add_packet(&[6], Duration::from_nanos(6)) 353 .expect("Failed to add packet."); 354 rx_packet_rb 355 .add_packet(&[7], Duration::from_nanos(10)) 356 .expect("Failed to add packet."); 357 358 let res = 359 PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb); 360 361 let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect(); 362 assert_eq!(packet_data_list.len(), 3); 363 assert_eq!(packet_data_list[0], &[3, 4]); 364 assert_eq!(packet_data_list[1], &[7]); 365 assert_eq!(packet_data_list[2], &[5]); 366 } 367 368 #[test] test_aggregate_with_both_ring_buffers_popped()369 fn test_aggregate_with_both_ring_buffers_popped() { 370 let mut tx_packet_rb = PacketRingBuffer::new( 371 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 372 ); 373 tx_packet_rb 374 .add_packet(&[1], Duration::from_nanos(2)) 375 .expect("Failed to add packet."); 376 tx_packet_rb 377 .add_packet(&[2], Duration::from_nanos(8)) 378 .expect("Failed to add packet."); 379 tx_packet_rb 380 .add_packet(&[3, 4], Duration::from_nanos(9)) 381 .expect("Failed to add packet."); 382 tx_packet_rb 383 .add_packet(&[5], Duration::from_nanos(15)) 384 .expect("Failed to add packet."); 385 386 let mut rx_packet_rb = PacketRingBuffer::new( 387 /* max_size_in_bytes= */ 5 + 2 * PACKET_HEADER_SIZE_IN_BYTES, 388 ); 389 rx_packet_rb 390 .add_packet(&[6], Duration::from_nanos(6)) 391 .expect("Failed to add packet."); 392 rx_packet_rb 393 .add_packet(&[7, 8], Duration::from_nanos(10)) 394 .expect("Failed to add packet."); 395 rx_packet_rb 396 .add_packet(&[9], Duration::from_nanos(12)) 397 .expect("Failed to add packet."); 398 rx_packet_rb 399 .add_packet(&[10, 11, 12], Duration::from_nanos(13)) 400 .expect("Failed to add packet."); 401 rx_packet_rb 402 .add_packet(&[13, 14], Duration::from_nanos(16)) 403 .expect("Failed to add packet."); 404 405 let res = 406 PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb); 407 408 let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect(); 409 assert_eq!(packet_data_list.len(), 3); 410 assert_eq!(packet_data_list[0], &[10, 11, 12]); 411 assert_eq!(packet_data_list[1], &[5]); 412 assert_eq!(packet_data_list[2], &[13, 14]); 413 } 414 } 415