• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::collections::{HashMap, HashSet, VecDeque};
15 
16 use crate::h3::parts::Parts;
17 use crate::h3::qpack::error::ErrorCode::DecoderStreamError;
18 use crate::h3::qpack::error::{ErrorCode, QpackError};
19 use crate::h3::qpack::format::encoder::{
20     DecInstDecoder, InstDecodeState, PartsIter, ReprEncodeState, SetCap,
21 };
22 use crate::h3::qpack::format::ReprEncoder;
23 use crate::h3::qpack::integer::{Integer, IntegerEncoder};
24 use crate::h3::qpack::table::{DynamicTable, NameField};
25 use crate::h3::qpack::{DecoderInstruction, PrefixMask};
26 
27 pub struct QpackEncoder {
28     max_blocked_streams: usize,
29     blocked_stream_nums: usize,
30     capacity_to_update: Option<usize>,
31     tracked_stream: HashMap<u64, UnackFields>,
32     table: DynamicTable,
33     is_huffman: bool,
34     // Headers to be encode.
35     field_iter: Option<PartsIter>,
36     // save the state of encoding field.
37     field_state: Option<ReprEncodeState>,
38     // save the state of decoding instructions.
39     inst_state: Option<InstDecodeState>,
40     insert_length: usize,
41     // `RFC`: the number of insertions that the decoder needs to receive before it can decode the
42     // field section.
43     required_insert_count: usize,
44 }
45 
/// Dynamic-table indices referenced by the field sections of one stream that
/// have not been acknowledged yet, oldest section first.
#[derive(Default)]
pub(crate) struct UnackFields {
    unacked_section: VecDeque<HashSet<usize>>,
}

impl UnackFields {
    /// Wraps an already-built queue of per-section index sets.
    pub(crate) fn new(unacked: VecDeque<HashSet<usize>>) -> Self {
        UnackFields {
            unacked_section: unacked,
        }
    }

    /// Returns the largest index found in any pending section, or `None` when
    /// no section holds an index.
    pub(crate) fn max_unacked_index(&self) -> Option<usize> {
        self.unacked_section
            .iter()
            .flat_map(|section| section.iter().copied())
            .max()
    }

    /// Gives mutable access to the queue of pending sections.
    pub(crate) fn unacked_section_mut(&mut self) -> &mut VecDeque<HashSet<usize>> {
        &mut self.unacked_section
    }

    /// Appends the index set of a newly encoded section to the back of the queue.
    pub(crate) fn update(&mut self, unacked: HashSet<usize>) {
        self.unacked_section.push_back(unacked);
    }
}
69 
/// Output of one `QpackEncoder::encode` call: the encoded field bytes and the
/// instruction bytes produced alongside them.
pub struct EncodeMessage {
    fields: Vec<u8>,
    inst: Vec<u8>,
}

impl EncodeMessage {
    /// Bundles the two byte buffers into a message.
    pub fn new(fields: Vec<u8>, inst: Vec<u8>) -> Self {
        EncodeMessage { fields, inst }
    }

    /// Borrows the encoded field bytes.
    pub fn fields(&self) -> &Vec<u8> {
        &self.fields
    }

    /// Borrows the instruction bytes.
    pub fn inst(&self) -> &Vec<u8> {
        &self.inst
    }
}
87 
88 impl QpackEncoder {
finish_stream(&self, id: u64) -> Result<(), QpackError>89     pub(crate) fn finish_stream(&self, id: u64) -> Result<(), QpackError> {
90         if self.tracked_stream.contains_key(&id) {
91             Err(QpackError::ConnectionError(ErrorCode::EncoderStreamError))
92         } else {
93             Ok(())
94         }
95     }
96 
set_max_table_capacity(&mut self, max_cap: usize) -> Result<(), QpackError>97     pub(crate) fn set_max_table_capacity(&mut self, max_cap: usize) -> Result<(), QpackError> {
98         const MAX_TABLE_CAPACITY: usize = (1 << 30) - 1;
99         if max_cap > MAX_TABLE_CAPACITY {
100             return Err(QpackError::ConnectionError(ErrorCode::H3SettingsError));
101         }
102         self.capacity_to_update = Some(max_cap);
103         Ok(())
104     }
105 
set_max_blocked_stream_size(&mut self, max_blocked: usize)106     pub(crate) fn set_max_blocked_stream_size(&mut self, max_blocked: usize) {
107         self.max_blocked_streams = max_blocked;
108     }
109 
update_max_dynamic_table_cap(&mut self, encoder_buf: &mut Vec<u8>)110     fn update_max_dynamic_table_cap(&mut self, encoder_buf: &mut Vec<u8>) {
111         if let Some(new_cap) = self.capacity_to_update {
112             if self.table.update_capacity(new_cap).is_some() {
113                 SetCap::new(new_cap).encode(encoder_buf);
114                 self.capacity_to_update = None;
115             }
116         }
117     }
118 
set_parts(&mut self, parts: Parts)119     pub fn set_parts(&mut self, parts: Parts) {
120         self.field_iter = Some(PartsIter::new(parts));
121     }
122 
ack(&mut self, stream_id: usize) -> Result<(), QpackError>123     fn ack(&mut self, stream_id: usize) -> Result<(), QpackError> {
124         let mut known_received = self.table.known_recved_count();
125         if let Some(unacked) = self.tracked_stream.get_mut(&(stream_id as u64)) {
126             if let Some(unacked_index) = unacked.unacked_section_mut().pop_front() {
127                 for index in unacked_index {
128                     if (index as u64) > known_received {
129                         known_received += 1;
130                     }
131                     self.table.untracked_field(index);
132                 }
133             }
134             if unacked.unacked_section_mut().is_empty() {
135                 self.tracked_stream.remove(&(stream_id as u64));
136             }
137         }
138         let increment = known_received - self.table.known_recved_count();
139 
140         if increment > 0 {
141             self.increase_insert_count(increment as usize)
142         }
143         Ok(())
144     }
145 
increase_insert_count(&mut self, increment: usize)146     fn increase_insert_count(&mut self, increment: usize) {
147         self.table.increase_known_receive_count(increment);
148         self.update_blocked_stream();
149     }
150 
cancel_stream(&mut self, stream_id: u64)151     fn cancel_stream(&mut self, stream_id: u64) {
152         let mut stream_blocked = false;
153         if let Some(mut fields) = self.tracked_stream.remove(&stream_id) {
154             fields
155                 .unacked_section_mut()
156                 .iter()
157                 .flatten()
158                 .for_each(|index| {
159                     self.table.untracked_field(*index);
160                     if *index > (self.table.known_recved_count() as usize) {
161                         stream_blocked = true;
162                     }
163                 })
164         }
165         if stream_blocked {
166             self.blocked_stream_nums -= 1;
167         }
168     }
169 
update_blocked_stream(&mut self)170     fn update_blocked_stream(&mut self) {
171         let known_receive_cnt = self.table.known_recved_count() as usize;
172         let mut blocked = 0;
173         self.tracked_stream.iter_mut().for_each(|(_, fields)| {
174             if fields
175                 .unacked_section_mut()
176                 .iter()
177                 .flatten()
178                 .any(|index| *index > known_receive_cnt)
179             {
180                 blocked += 1;
181             }
182         });
183         self.blocked_stream_nums = blocked;
184     }
185 
decode_ins(&mut self, buf: &[u8]) -> Result<(), QpackError>186     pub fn decode_ins(&mut self, buf: &[u8]) -> Result<(), QpackError> {
187         let mut decoder = DecInstDecoder::new(buf);
188         loop {
189             match decoder.decode(&mut self.inst_state)? {
190                 Some(DecoderInstruction::Ack { stream_id }) => self.ack(stream_id)?,
191                 Some(DecoderInstruction::StreamCancel { stream_id }) => {
192                     self.cancel_stream(stream_id as u64);
193                 }
194                 Some(DecoderInstruction::InsertCountIncrement { increment }) => {
195                     self.increase_insert_count(increment);
196                 }
197                 None => return Ok(()),
198             }
199         }
200     }
201 
encode(&mut self, stream_id: u64) -> EncodeMessage202     pub fn encode(&mut self, stream_id: u64) -> EncodeMessage {
203         let mut fields = Vec::new();
204         let mut inst = Vec::new();
205         self.update_max_dynamic_table_cap(&mut inst);
206 
207         let stream_blocked = self
208             .tracked_stream
209             .get(&stream_id)
210             .map_or(false, |unacked| {
211                 unacked
212                     .max_unacked_index()
213                     .map_or(false, |idx| (idx as u64) > self.table.known_recved_count())
214             });
215 
216         let reach_max_block = self.reach_max_blocked();
217         let mut encoder = ReprEncoder::new(
218             stream_id,
219             self.table.insert_count() as u64,
220             self.is_huffman,
221             &mut self.table,
222         );
223         encoder.iterate_encode_fields(
224             &mut self.field_iter,
225             &mut self.tracked_stream,
226             &mut self.blocked_stream_nums,
227             stream_blocked || !reach_max_block,
228             &mut fields,
229             &mut inst,
230         );
231         EncodeMessage::new(fields, inst)
232     }
233 
reach_max_blocked(&self) -> bool234     pub(crate) fn reach_max_blocked(&self) -> bool {
235         self.blocked_stream_nums >= self.max_blocked_streams
236     }
237 }
238 
239 impl Default for QpackEncoder {
default() -> Self240     fn default() -> Self {
241         Self {
242             max_blocked_streams: 0,
243             blocked_stream_nums: 0,
244             table: DynamicTable::with_empty(),
245             tracked_stream: HashMap::new(),
246             field_iter: None,
247             field_state: None,
248             inst_state: None,
249             insert_length: 0,
250             required_insert_count: 0,
251             capacity_to_update: None,
252             is_huffman: true,
253         }
254     }
255 }
256 
/// Kinds of instruction handled by `decode_ins`, without their payloads.
pub(crate) enum DecoderInst {
    /// Section acknowledgment.
    Ack,
    /// Stream cancellation.
    StreamCancel,
    /// Insert count increment.
    InsertCountIncrement,
}
262