1 // Copyright (C) 2020, Cloudflare, Inc. 2 // All rights reserved. 3 // 4 // Redistribution and use in source and binary forms, with or without 5 // modification, are permitted provided that the following conditions are 6 // met: 7 // 8 // * Redistributions of source code must retain the above copyright notice, 9 // this list of conditions and the following disclaimer. 10 // 11 // * Redistributions in binary form must reproduce the above copyright 12 // notice, this list of conditions and the following disclaimer in the 13 // documentation and/or other materials provided with the distribution. 14 // 15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 27 use crate::Error; 28 use crate::Result; 29 30 use std::collections::VecDeque; 31 32 /// Keeps track of DATAGRAM frames. 33 #[derive(Default)] 34 pub struct DatagramQueue { 35 queue: Option<VecDeque<Vec<u8>>>, 36 queue_max_len: usize, 37 queue_bytes_size: usize, 38 } 39 40 impl DatagramQueue { new(queue_max_len: usize) -> Self41 pub fn new(queue_max_len: usize) -> Self { 42 DatagramQueue { 43 queue: None, 44 queue_bytes_size: 0, 45 queue_max_len, 46 } 47 } 48 push(&mut self, data: Vec<u8>) -> Result<()>49 pub fn push(&mut self, data: Vec<u8>) -> Result<()> { 50 if self.is_full() { 51 return Err(Error::Done); 52 } 53 54 self.queue_bytes_size += data.len(); 55 self.queue 56 .get_or_insert_with(Default::default) 57 .push_back(data); 58 59 Ok(()) 60 } 61 peek_front_len(&self) -> Option<usize>62 pub fn peek_front_len(&self) -> Option<usize> { 63 self.queue.as_ref().and_then(|q| q.front().map(|d| d.len())) 64 } 65 peek_front_bytes(&self, buf: &mut [u8], len: usize) -> Result<usize>66 pub fn peek_front_bytes(&self, buf: &mut [u8], len: usize) -> Result<usize> { 67 match self.queue.as_ref().and_then(|q| q.front()) { 68 Some(d) => { 69 let len = std::cmp::min(len, d.len()); 70 if buf.len() < len { 71 return Err(Error::BufferTooShort); 72 } 73 74 buf[..len].copy_from_slice(&d[..len]); 75 Ok(len) 76 }, 77 78 None => Err(Error::Done), 79 } 80 } 81 pop(&mut self) -> Option<Vec<u8>>82 pub fn pop(&mut self) -> Option<Vec<u8>> { 83 if let Some(d) = self.queue.as_mut().and_then(|q| q.pop_front()) { 84 self.queue_bytes_size = self.queue_bytes_size.saturating_sub(d.len()); 85 return Some(d); 86 } 87 88 None 89 } 90 has_pending(&self) -> bool91 pub fn has_pending(&self) -> bool { 92 !self.queue.as_ref().map(|q| q.is_empty()).unwrap_or(true) 93 } 94 purge<F: Fn(&[u8]) -> bool>(&mut self, f: F)95 pub fn purge<F: Fn(&[u8]) -> bool>(&mut self, f: F) { 96 if let Some(q) = self.queue.as_mut() { 97 q.retain(|d| !f(d)); 98 self.queue_bytes_size = q.iter().fold(0, |total, d| total + d.len()); 99 } 100 } 101 is_full(&self) -> bool102 pub fn is_full(&self) -> bool { 103 self.len() == self.queue_max_len 104 } 105 len(&self) -> usize106 pub fn len(&self) -> usize { 107 self.queue.as_ref().map(|q| q.len()).unwrap_or(0) 108 } 109 byte_size(&self) -> usize110 pub fn byte_size(&self) -> usize { 111 self.queue_bytes_size 112 } 113 } 114