1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::collections::{HashMap, HashSet, VecDeque}; 15 16 use crate::h3::parts::Parts; 17 use crate::h3::qpack::error::ErrorCode::DecoderStreamError; 18 use crate::h3::qpack::error::{ErrorCode, QpackError}; 19 use crate::h3::qpack::format::encoder::{ 20 DecInstDecoder, InstDecodeState, PartsIter, ReprEncodeState, SetCap, 21 }; 22 use crate::h3::qpack::format::ReprEncoder; 23 use crate::h3::qpack::integer::{Integer, IntegerEncoder}; 24 use crate::h3::qpack::table::{DynamicTable, NameField}; 25 use crate::h3::qpack::{DecoderInstruction, PrefixMask}; 26 27 pub struct QpackEncoder { 28 max_blocked_streams: usize, 29 blocked_stream_nums: usize, 30 capacity_to_update: Option<usize>, 31 tracked_stream: HashMap<u64, UnackFields>, 32 table: DynamicTable, 33 is_huffman: bool, 34 // Headers to be encode. 35 field_iter: Option<PartsIter>, 36 // save the state of encoding field. 37 field_state: Option<ReprEncodeState>, 38 // save the state of decoding instructions. 39 inst_state: Option<InstDecodeState>, 40 insert_length: usize, 41 // `RFC`: the number of insertions that the decoder needs to receive before it can decode the 42 // field section. 43 required_insert_count: usize, 44 } 45 46 #[derive(Default)] 47 pub(crate) struct UnackFields { 48 unacked_section: VecDeque<HashSet<usize>>, 49 } 50 51 impl UnackFields { new(unacked: VecDeque<HashSet<usize>>) -> Self52 pub(crate) fn new(unacked: VecDeque<HashSet<usize>>) -> Self { 53 Self { 54 unacked_section: unacked, 55 } 56 } max_unacked_index(&self) -> Option<usize>57 pub(crate) fn max_unacked_index(&self) -> Option<usize> { 58 self.unacked_section.iter().flatten().max().cloned() 59 } 60 unacked_section_mut(&mut self) -> &mut VecDeque<HashSet<usize>>61 pub(crate) fn unacked_section_mut(&mut self) -> &mut VecDeque<HashSet<usize>> { 62 &mut self.unacked_section 63 } 64 update(&mut self, unacked: HashSet<usize>)65 pub(crate) fn update(&mut self, unacked: HashSet<usize>) { 66 self.unacked_section.push_back(unacked); 67 } 68 } 69 70 pub struct EncodeMessage { 71 fields: Vec<u8>, 72 inst: Vec<u8>, 73 } 74 75 impl EncodeMessage { new(fields: Vec<u8>, inst: Vec<u8>) -> Self76 pub fn new(fields: Vec<u8>, inst: Vec<u8>) -> Self { 77 Self { fields, inst } 78 } fields(&self) -> &Vec<u8>79 pub fn fields(&self) -> &Vec<u8> { 80 &self.fields 81 } 82 inst(&self) -> &Vec<u8>83 pub fn inst(&self) -> &Vec<u8> { 84 &self.inst 85 } 86 } 87 88 impl QpackEncoder { finish_stream(&self, id: u64) -> Result<(), QpackError>89 pub(crate) fn finish_stream(&self, id: u64) -> Result<(), QpackError> { 90 if self.tracked_stream.contains_key(&id) { 91 Err(QpackError::ConnectionError(ErrorCode::EncoderStreamError)) 92 } else { 93 Ok(()) 94 } 95 } 96 set_max_table_capacity(&mut self, max_cap: usize) -> Result<(), QpackError>97 pub(crate) fn set_max_table_capacity(&mut self, max_cap: usize) -> Result<(), QpackError> { 98 const MAX_TABLE_CAPACITY: usize = (1 << 30) - 1; 99 if max_cap > MAX_TABLE_CAPACITY { 100 return Err(QpackError::ConnectionError(ErrorCode::H3SettingsError)); 101 } 102 self.capacity_to_update = Some(max_cap); 103 Ok(()) 104 } 105 set_max_blocked_stream_size(&mut self, max_blocked: usize)106 pub(crate) fn set_max_blocked_stream_size(&mut self, max_blocked: usize) { 107 self.max_blocked_streams = max_blocked; 108 } 109 update_max_dynamic_table_cap(&mut self, encoder_buf: &mut Vec<u8>)110 fn update_max_dynamic_table_cap(&mut self, encoder_buf: &mut Vec<u8>) { 111 if let Some(new_cap) = self.capacity_to_update { 112 if self.table.update_capacity(new_cap).is_some() { 113 SetCap::new(new_cap).encode(encoder_buf); 114 self.capacity_to_update = None; 115 } 116 } 117 } 118 set_parts(&mut self, parts: Parts)119 pub fn set_parts(&mut self, parts: Parts) { 120 self.field_iter = Some(PartsIter::new(parts)); 121 } 122 ack(&mut self, stream_id: usize) -> Result<(), QpackError>123 fn ack(&mut self, stream_id: usize) -> Result<(), QpackError> { 124 let mut known_received = self.table.known_recved_count(); 125 if let Some(unacked) = self.tracked_stream.get_mut(&(stream_id as u64)) { 126 if let Some(unacked_index) = unacked.unacked_section_mut().pop_front() { 127 for index in unacked_index { 128 if (index as u64) > known_received { 129 known_received += 1; 130 } 131 self.table.untracked_field(index); 132 } 133 } 134 if unacked.unacked_section_mut().is_empty() { 135 self.tracked_stream.remove(&(stream_id as u64)); 136 } 137 } 138 let increment = known_received - self.table.known_recved_count(); 139 140 if increment > 0 { 141 self.increase_insert_count(increment as usize) 142 } 143 Ok(()) 144 } 145 increase_insert_count(&mut self, increment: usize)146 fn increase_insert_count(&mut self, increment: usize) { 147 self.table.increase_known_receive_count(increment); 148 self.update_blocked_stream(); 149 } 150 cancel_stream(&mut self, stream_id: u64)151 fn cancel_stream(&mut self, stream_id: u64) { 152 let mut stream_blocked = false; 153 if let Some(mut fields) = self.tracked_stream.remove(&stream_id) { 154 fields 155 .unacked_section_mut() 156 .iter() 157 .flatten() 158 .for_each(|index| { 159 self.table.untracked_field(*index); 160 if *index > (self.table.known_recved_count() as usize) { 161 stream_blocked = true; 162 } 163 }) 164 } 165 if stream_blocked { 166 self.blocked_stream_nums -= 1; 167 } 168 } 169 update_blocked_stream(&mut self)170 fn update_blocked_stream(&mut self) { 171 let known_receive_cnt = self.table.known_recved_count() as usize; 172 let mut blocked = 0; 173 self.tracked_stream.iter_mut().for_each(|(_, fields)| { 174 if fields 175 .unacked_section_mut() 176 .iter() 177 .flatten() 178 .any(|index| *index > known_receive_cnt) 179 { 180 blocked += 1; 181 } 182 }); 183 self.blocked_stream_nums = blocked; 184 } 185 decode_ins(&mut self, buf: &[u8]) -> Result<(), QpackError>186 pub fn decode_ins(&mut self, buf: &[u8]) -> Result<(), QpackError> { 187 let mut decoder = DecInstDecoder::new(buf); 188 loop { 189 match decoder.decode(&mut self.inst_state)? { 190 Some(DecoderInstruction::Ack { stream_id }) => self.ack(stream_id)?, 191 Some(DecoderInstruction::StreamCancel { stream_id }) => { 192 self.cancel_stream(stream_id as u64); 193 } 194 Some(DecoderInstruction::InsertCountIncrement { increment }) => { 195 self.increase_insert_count(increment); 196 } 197 None => return Ok(()), 198 } 199 } 200 } 201 encode(&mut self, stream_id: u64) -> EncodeMessage202 pub fn encode(&mut self, stream_id: u64) -> EncodeMessage { 203 let mut fields = Vec::new(); 204 let mut inst = Vec::new(); 205 self.update_max_dynamic_table_cap(&mut inst); 206 207 let stream_blocked = self 208 .tracked_stream 209 .get(&stream_id) 210 .map_or(false, |unacked| { 211 unacked 212 .max_unacked_index() 213 .map_or(false, |idx| (idx as u64) > self.table.known_recved_count()) 214 }); 215 216 let reach_max_block = self.reach_max_blocked(); 217 let mut encoder = ReprEncoder::new( 218 stream_id, 219 self.table.insert_count() as u64, 220 self.is_huffman, 221 &mut self.table, 222 ); 223 encoder.iterate_encode_fields( 224 &mut self.field_iter, 225 &mut self.tracked_stream, 226 &mut self.blocked_stream_nums, 227 stream_blocked || !reach_max_block, 228 &mut fields, 229 &mut inst, 230 ); 231 EncodeMessage::new(fields, inst) 232 } 233 reach_max_blocked(&self) -> bool234 pub(crate) fn reach_max_blocked(&self) -> bool { 235 self.blocked_stream_nums >= self.max_blocked_streams 236 } 237 } 238 239 impl Default for QpackEncoder { default() -> Self240 fn default() -> Self { 241 Self { 242 max_blocked_streams: 0, 243 blocked_stream_nums: 0, 244 table: DynamicTable::with_empty(), 245 tracked_stream: HashMap::new(), 246 field_iter: None, 247 field_state: None, 248 inst_state: None, 249 insert_length: 0, 250 required_insert_count: 0, 251 capacity_to_update: None, 252 is_huffman: true, 253 } 254 } 255 } 256 257 pub(crate) enum DecoderInst { 258 Ack, 259 StreamCancel, 260 InsertCountIncrement, 261 } 262