• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::collections::VecDeque;
6 use std::fmt;
7 use std::fmt::Display;
8 use std::time::Duration;
9 
10 use pcap_file::pcap::PacketHeader;
11 
/// In-memory size of a pcap `PacketHeader`. It is added to a packet's payload length whenever
/// the ring buffer accounts for the space that packet occupies.
const PACKET_HEADER_SIZE_IN_BYTES: usize = std::mem::size_of::<PacketHeader>();
13 
/// A wrapper around a ring buffer that stores packet information.
///
/// This was made so that, on crosvm, we can write the packet information to a file
/// for debugging purposes.
#[derive(Debug)]
pub struct PacketRingBuffer {
    /// Stored packets: new packets are pushed to the front, the oldest are evicted from the back.
    ring_buffer: VecDeque<PacketInfo>,
    /// Size budget in bytes; each packet costs its payload length plus one packet header.
    max_size_in_bytes: usize,
    /// Bytes currently charged against `max_size_in_bytes`.
    current_size_in_bytes: usize,
    /// Largest timestamp among the packets evicted so far, or `None` if none were evicted.
    last_popped_packet_timestamp: Option<Duration>,
}

/// A captured packet: a copy of its bytes and the time associated with it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PacketInfo {
    /// Raw packet payload.
    pub buf: Vec<u8>,
    /// Timestamp supplied by the caller when the packet was added.
    pub timestamp: Duration,
}
29 
/// Errors reported by the packet ring buffer.
#[derive(Eq, PartialEq, Debug)]
pub enum Error {
    /// The packet (payload plus packet header) is larger than the ring buffer's size budget.
    PacketTooBigError {
        /// Size budget of the ring buffer, in bytes.
        rb_max_size_in_bytes: usize,
        /// Space the rejected packet would need, in bytes.
        packet_size_in_bytes: usize,
    },
}

/// Result type whose error is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

impl Display for Error {
    /// Writes a human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Error::PacketTooBigError {
                rb_max_size_in_bytes,
                packet_size_in_bytes,
            } => write!(
                f,
                "Packet of size {} bytes can't fit into Ring buffer of size {}",
                // The packet size fills the first placeholder, the buffer size the second.
                packet_size_in_bytes,
                rb_max_size_in_bytes
            ),
        }
    }
}

impl std::error::Error for Error {}
55 
56 impl PacketRingBuffer {
new(max_size_in_bytes: usize) -> PacketRingBuffer57     pub fn new(max_size_in_bytes: usize) -> PacketRingBuffer {
58         PacketRingBuffer {
59             ring_buffer: VecDeque::new(),
60             max_size_in_bytes,
61             current_size_in_bytes: 0,
62             last_popped_packet_timestamp: None,
63         }
64     }
65 
add_packet(&mut self, buf: &[u8], packet_timestamp: Duration) -> Result<()>66     pub fn add_packet(&mut self, buf: &[u8], packet_timestamp: Duration) -> Result<()> {
67         self.prune_from_ring_buffer_if_oversized(buf)?;
68 
69         self.ring_buffer.push_front(PacketInfo {
70             buf: buf.to_vec(),
71             timestamp: packet_timestamp,
72         });
73         self.current_size_in_bytes += buf.len() + PACKET_HEADER_SIZE_IN_BYTES;
74 
75         Ok(())
76     }
77 
78     // While the size of the rb with the new packet is less than the max size of the rb.
prune_from_ring_buffer_if_oversized(&mut self, buf: &[u8]) -> Result<()>79     fn prune_from_ring_buffer_if_oversized(&mut self, buf: &[u8]) -> Result<()> {
80         let new_packet_size_in_bytes = buf.len() + PACKET_HEADER_SIZE_IN_BYTES;
81 
82         while self.current_size_in_bytes + new_packet_size_in_bytes > self.max_size_in_bytes {
83             match self.ring_buffer.pop_back() {
84                 Some(val) => {
85                     self.current_size_in_bytes -= val.buf.len() + PACKET_HEADER_SIZE_IN_BYTES;
86                     self.last_popped_packet_timestamp = self
87                         .last_popped_packet_timestamp
88                         .map(|t| std::cmp::max(t, val.timestamp))
89                         .or(Some(val.timestamp))
90                 }
91                 None => {
92                     return Err(Error::PacketTooBigError {
93                         rb_max_size_in_bytes: self.max_size_in_bytes,
94                         packet_size_in_bytes: new_packet_size_in_bytes,
95                     })
96                 }
97             }
98         }
99         Ok(())
100     }
101 
102     /// Aggregates two ring buffers of packets by removing packets prior to the max oldest
103     /// removed packet and sorting them by time.
pop_ring_buffers_and_aggregate<'a>( packet_rb1: &'a mut PacketRingBuffer, packet_rb2: &'a mut PacketRingBuffer, ) -> Vec<&'a PacketInfo>104     pub fn pop_ring_buffers_and_aggregate<'a>(
105         packet_rb1: &'a mut PacketRingBuffer,
106         packet_rb2: &'a mut PacketRingBuffer,
107     ) -> Vec<&'a PacketInfo> {
108         let mut result: Vec<&PacketInfo> = Vec::new();
109         result.extend(packet_rb1.ring_buffer.iter().collect::<Vec<&PacketInfo>>());
110         result.extend(packet_rb2.ring_buffer.iter().collect::<Vec<&PacketInfo>>());
111 
112         // The oldest time we want to keep in the aggregated result.
113         let start_time = std::cmp::max(
114             packet_rb1.last_popped_packet_timestamp,
115             packet_rb2.last_popped_packet_timestamp,
116         );
117 
118         let mut result = if let Some(start_time) = start_time {
119             result
120                 .into_iter()
121                 .filter(|packet| packet.timestamp > start_time)
122                 .collect()
123         } else {
124             result
125         };
126 
127         result.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
128         result
129     }
130 }
131 
132 #[cfg(test)]
133 mod tests {
134     use super::*;
135 
136     #[test]
test_add()137     fn test_add() {
138         let mut packet_rb = PacketRingBuffer::new(
139             /* max_size_in_bytes= */ 4 + PACKET_HEADER_SIZE_IN_BYTES,
140         );
141         let buf: &[u8] = &[1, 2, 3, 4];
142         let start_time = Duration::from_nanos(45);
143 
144         assert_eq!(packet_rb.ring_buffer.len(), 0);
145 
146         packet_rb
147             .add_packet(buf, start_time)
148             .expect("Failed to add packet.");
149 
150         let packet = packet_rb.ring_buffer.pop_back().unwrap();
151         assert_eq!(packet.buf, &[1, 2, 3, 4]);
152         assert_eq!(packet.timestamp.as_nanos(), 45);
153         assert_eq!(packet_rb.last_popped_packet_timestamp, None);
154         // Each packet has 16 bytes in it's PacketHeader
155         assert_eq!(
156             packet_rb.current_size_in_bytes,
157             4 + PACKET_HEADER_SIZE_IN_BYTES
158         );
159     }
160 
161     #[test]
test_add_no_space()162     fn test_add_no_space() {
163         // Max size is 3 bytes of buffer data + PACKET_HEADER_SIZE_IN_BYTES
164         let mut packet_rb = PacketRingBuffer::new(
165             /* max_size_in_bytes= */ 3 + PACKET_HEADER_SIZE_IN_BYTES,
166         );
167         let buf: &[u8] = &[1, 2, 3, 4];
168         let start_time = Duration::from_nanos(45);
169 
170         let res = packet_rb.add_packet(buf, start_time);
171 
172         // Should error because rb size is 19 bytes, but packet will take 20 bytes (4 bytes in buffer + 16 bytes from Packet header)
173         assert!(res.is_err());
174 
175         assert_eq!(
176             res.unwrap_err(),
177             Error::PacketTooBigError {
178                 rb_max_size_in_bytes: packet_rb.max_size_in_bytes,
179                 packet_size_in_bytes: 4 + PACKET_HEADER_SIZE_IN_BYTES
180             }
181         );
182     }
183 
184     #[test]
test_add_exceeds_size_pop_one()185     fn test_add_exceeds_size_pop_one() {
186         let mut packet_rb = PacketRingBuffer::new(
187             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
188         );
189 
190         packet_rb
191             .add_packet(&[1], Duration::from_nanos(1))
192             .expect("Failed to add packet.");
193         packet_rb
194             .add_packet(&[2], Duration::from_nanos(2))
195             .expect("Failed to add packet.");
196         packet_rb
197             .add_packet(&[3], Duration::from_nanos(3))
198             .expect("Failed to add packet.");
199         packet_rb
200             .add_packet(&[4], Duration::from_nanos(4))
201             .expect("Failed to add packet.");
202 
203         assert_eq!(packet_rb.ring_buffer.len(), 3);
204 
205         let packet1 = packet_rb.ring_buffer.pop_back().unwrap();
206         let packet2 = packet_rb.ring_buffer.pop_back().unwrap();
207         let packet3 = packet_rb.ring_buffer.pop_back().unwrap();
208 
209         assert_eq!(packet1.buf, &[2]);
210         assert_eq!(packet1.timestamp.as_nanos(), 2);
211         assert_eq!(packet2.buf, &[3]);
212         assert_eq!(packet2.timestamp.as_nanos(), 3);
213         assert_eq!(packet3.buf, &[4]);
214         assert_eq!(packet3.timestamp.as_nanos(), 4);
215         assert_eq!(
216             packet_rb.last_popped_packet_timestamp.unwrap().as_nanos(),
217             1
218         );
219         assert_eq!(packet_rb.current_size_in_bytes, 3 + 3 * 16);
220     }
221 
222     #[test]
test_add_exceeds_size_pop_multiple()223     fn test_add_exceeds_size_pop_multiple() {
224         let mut packet_rb = PacketRingBuffer::new(
225             /* max_size_in_bytes= */ 2 + PACKET_HEADER_SIZE_IN_BYTES,
226         );
227 
228         packet_rb
229             .add_packet(&[1], Duration::from_nanos(1))
230             .expect("Failed to add packet.");
231         packet_rb
232             .add_packet(&[2], Duration::from_nanos(2))
233             .expect("Failed to add packet.");
234         packet_rb
235             .add_packet(&[3, 4], Duration::from_nanos(3))
236             .expect("Failed to add packet.");
237         packet_rb
238             .add_packet(&[5, 6], Duration::from_nanos(4))
239             .expect("Failed to add packet.");
240 
241         // The first 3 packets should've been popped
242         assert_eq!(packet_rb.ring_buffer.len(), 1);
243 
244         let packet1 = packet_rb.ring_buffer.pop_back().unwrap();
245 
246         assert_eq!(packet1.buf, &[5, 6]);
247         assert_eq!(packet1.timestamp.as_nanos(), 4);
248         assert_eq!(
249             packet_rb.last_popped_packet_timestamp.unwrap().as_nanos(),
250             3
251         );
252         assert_eq!(
253             packet_rb.current_size_in_bytes,
254             2 + PACKET_HEADER_SIZE_IN_BYTES
255         );
256     }
257 
258     #[test]
test_aggregate_one_empty()259     fn test_aggregate_one_empty() {
260         let mut tx_packet_rb = PacketRingBuffer::new(
261             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
262         );
263 
264         let mut rx_packet_rb = PacketRingBuffer::new(
265             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
266         );
267         rx_packet_rb
268             .add_packet(&[4], Duration::from_nanos(6))
269             .expect("Failed to add packet.");
270         rx_packet_rb
271             .add_packet(&[5], Duration::from_nanos(10))
272             .expect("Failed to add packet.");
273 
274         let res =
275             PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb);
276         let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect();
277 
278         assert_eq!(packet_data_list, [&[4], &[5]]);
279     }
280 
281     #[test]
test_aggregate_both_empty()282     fn test_aggregate_both_empty() {
283         let mut tx_packet_rb = PacketRingBuffer::new(
284             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
285         );
286 
287         let mut rx_packet_rb = PacketRingBuffer::new(
288             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
289         );
290 
291         let res =
292             PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb);
293         let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect();
294 
295         assert!(packet_data_list.is_empty());
296     }
297 
298     #[test]
test_aggregate_none_popped()299     fn test_aggregate_none_popped() {
300         let mut tx_packet_rb = PacketRingBuffer::new(
301             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
302         );
303         tx_packet_rb
304             .add_packet(&[1], Duration::from_nanos(2))
305             .expect("Failed to add packet.");
306         tx_packet_rb
307             .add_packet(&[2], Duration::from_nanos(8))
308             .expect("Failed to add packet.");
309         tx_packet_rb
310             .add_packet(&[3], Duration::from_nanos(9))
311             .expect("Failed to add packet.");
312 
313         let mut rx_packet_rb = PacketRingBuffer::new(
314             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
315         );
316         rx_packet_rb
317             .add_packet(&[4], Duration::from_nanos(6))
318             .expect("Failed to add packet.");
319         rx_packet_rb
320             .add_packet(&[5], Duration::from_nanos(10))
321             .expect("Failed to add packet.");
322 
323         let res =
324             PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb);
325 
326         let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect();
327         assert_eq!(packet_data_list, [&[1], &[4], &[2], &[3], &[5]]);
328     }
329 
330     #[test]
test_aggregate_with_one_ring_buffer_popped()331     fn test_aggregate_with_one_ring_buffer_popped() {
332         let mut tx_packet_rb = PacketRingBuffer::new(
333             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
334         );
335         tx_packet_rb
336             .add_packet(&[1], Duration::from_nanos(2))
337             .expect("Failed to add packet.");
338         tx_packet_rb
339             .add_packet(&[2], Duration::from_nanos(8))
340             .expect("Failed to add packet.");
341         tx_packet_rb
342             .add_packet(&[3, 4], Duration::from_nanos(9))
343             .expect("Failed to add packet.");
344         tx_packet_rb
345             .add_packet(&[5], Duration::from_nanos(14))
346             .expect("Failed to add packet.");
347 
348         let mut rx_packet_rb = PacketRingBuffer::new(
349             /* max_size_in_bytes= */ 2 + 2 * PACKET_HEADER_SIZE_IN_BYTES,
350         );
351         rx_packet_rb
352             .add_packet(&[6], Duration::from_nanos(6))
353             .expect("Failed to add packet.");
354         rx_packet_rb
355             .add_packet(&[7], Duration::from_nanos(10))
356             .expect("Failed to add packet.");
357 
358         let res =
359             PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb);
360 
361         let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect();
362         assert_eq!(packet_data_list.len(), 3);
363         assert_eq!(packet_data_list[0], &[3, 4]);
364         assert_eq!(packet_data_list[1], &[7]);
365         assert_eq!(packet_data_list[2], &[5]);
366     }
367 
368     #[test]
test_aggregate_with_both_ring_buffers_popped()369     fn test_aggregate_with_both_ring_buffers_popped() {
370         let mut tx_packet_rb = PacketRingBuffer::new(
371             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
372         );
373         tx_packet_rb
374             .add_packet(&[1], Duration::from_nanos(2))
375             .expect("Failed to add packet.");
376         tx_packet_rb
377             .add_packet(&[2], Duration::from_nanos(8))
378             .expect("Failed to add packet.");
379         tx_packet_rb
380             .add_packet(&[3, 4], Duration::from_nanos(9))
381             .expect("Failed to add packet.");
382         tx_packet_rb
383             .add_packet(&[5], Duration::from_nanos(15))
384             .expect("Failed to add packet.");
385 
386         let mut rx_packet_rb = PacketRingBuffer::new(
387             /* max_size_in_bytes= */ 5 + 2 * PACKET_HEADER_SIZE_IN_BYTES,
388         );
389         rx_packet_rb
390             .add_packet(&[6], Duration::from_nanos(6))
391             .expect("Failed to add packet.");
392         rx_packet_rb
393             .add_packet(&[7, 8], Duration::from_nanos(10))
394             .expect("Failed to add packet.");
395         rx_packet_rb
396             .add_packet(&[9], Duration::from_nanos(12))
397             .expect("Failed to add packet.");
398         rx_packet_rb
399             .add_packet(&[10, 11, 12], Duration::from_nanos(13))
400             .expect("Failed to add packet.");
401         rx_packet_rb
402             .add_packet(&[13, 14], Duration::from_nanos(16))
403             .expect("Failed to add packet.");
404 
405         let res =
406             PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb);
407 
408         let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect();
409         assert_eq!(packet_data_list.len(), 3);
410         assert_eq!(packet_data_list[0], &[10, 11, 12]);
411         assert_eq!(packet_data_list[1], &[5]);
412         assert_eq!(packet_data_list[2], &[13, 14]);
413     }
414 }
415