• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::collections::{HashMap, HashSet};
15 use std::mem::take;
16 
17 use ylong_runtime::iter::parallel::ParSplit;
18 
19 use crate::h3::parts::Parts;
20 use crate::h3::qpack::decoder::FieldDecodeState::{Blocked, Decoded};
21 use crate::h3::qpack::error::ErrorCode::{DecompressionFailed, EncoderStreamError};
22 use crate::h3::qpack::error::{ErrorCode, NotClassified, QpackError};
23 use crate::h3::qpack::format::decoder::{
24     EncInstDecoder, InstDecodeState, Name, ReprDecodeState, ReprDecoder,
25 };
26 use crate::h3::qpack::integer::Integer;
27 use crate::h3::qpack::table::NameField::Path;
28 use crate::h3::qpack::table::{DynamicTable, NameField, TableSearcher};
29 use crate::h3::qpack::{
30     DeltaBase, EncoderInstPrefixBit, EncoderInstruction, MidBit, ReprPrefixBit, Representation,
31     RequireInsertCount,
32 };
33 
34 pub(crate) enum FieldDecodeState {
35     Blocked,
36     Decoded,
37 }
38 
39 pub(crate) struct FiledLines {
40     parts: Parts,
41     header_size: usize,
42 }
43 
44 pub(crate) struct ReprMessage {
45     require_insert_count: usize,
46     base: usize,
47     // dynamic table
48     repr_state: Option<ReprDecodeState>,
49     remaining: Option<Vec<u8>>,
50     // instruction decode state
51     lines: FiledLines,
52 }
53 
54 impl ReprMessage {
new() -> Self55     pub(crate) fn new() -> Self {
56         Self {
57             require_insert_count: 0,
58             base: 0,
59             repr_state: None,
60             remaining: None,
61             lines: FiledLines {
62                 parts: Parts::new(),
63                 header_size: 0,
64             },
65         }
66     }
67 }
68 
69 pub struct QpackDecoder {
70     // max header list size
71     table: DynamicTable,
72     // field decode state
73     inst_state: Option<InstDecodeState>,
74     streams: HashMap<u64, ReprMessage>,
75     blocked: HashMap<u64, usize>,
76     max_blocked_streams: usize,
77     max_table_capacity: usize,
78     max_field_section_size: usize,
79 }
80 
81 impl QpackDecoder {
new(max_blocked_streams: usize, max_table_capacity: usize) -> Self82     pub(crate) fn new(max_blocked_streams: usize, max_table_capacity: usize) -> Self {
83         Self {
84             table: DynamicTable::with_empty(),
85             inst_state: None,
86             streams: HashMap::new(),
87             blocked: HashMap::new(),
88             max_blocked_streams,
89             max_table_capacity,
90             max_field_section_size: (1 << 62) - 1,
91         }
92     }
93 
finish_stream(&mut self, id: u64) -> Result<(), QpackError>94     pub(crate) fn finish_stream(&mut self, id: u64) -> Result<(), QpackError> {
95         if self.blocked.contains_key(&id) {
96             Err(QpackError::ConnectionError(ErrorCode::DecoderStreamError))
97         } else {
98             self.streams.remove(&id);
99             Ok(())
100         }
101     }
102 
set_max_field_section_size(&mut self, size: usize)103     pub(crate) fn set_max_field_section_size(&mut self, size: usize) {
104         self.max_field_section_size = size;
105     }
106 
decode_ins(&mut self, buf: &[u8]) -> Result<Vec<u64>, QpackError>107     pub(crate) fn decode_ins(&mut self, buf: &[u8]) -> Result<Vec<u64>, QpackError> {
108         let mut decoder = EncInstDecoder::new();
109         let mut updater = Updater::new(&mut self.table);
110         let mut cnt = 0;
111         while cnt < buf.len() {
112             match decoder.decode(&buf[cnt..], &mut self.inst_state)? {
113                 Some(inst) => match inst {
114                     (offset, EncoderInstruction::SetCap { capacity }) => {
115                         cnt += offset;
116                         if capacity > self.max_table_capacity {
117                             return Err(QpackError::ConnectionError(DecompressionFailed));
118                         }
119                         updater.update_capacity(capacity)?;
120                     }
121                     (
122                         offset,
123                         EncoderInstruction::InsertWithIndex {
124                             mid_bit,
125                             name,
126                             value,
127                         },
128                     ) => {
129                         cnt += offset;
130                         updater.update_table(mid_bit, name, value)?;
131                     }
132                     (
133                         offset,
134                         EncoderInstruction::InsertWithLiteral {
135                             mid_bit,
136                             name,
137                             value,
138                         },
139                     ) => {
140                         cnt += offset;
141                         updater.update_table(mid_bit, name, value)?;
142                     }
143                     (offset, EncoderInstruction::Duplicate { index }) => {
144                         cnt += offset;
145                         updater.duplicate(index)?;
146                     }
147                 },
148                 None => break,
149             }
150         }
151 
152         let insert_count = self.table.insert_count();
153 
154         let unblocked = self
155             .blocked
156             .iter()
157             .filter_map(|(id, required)| {
158                 if *required <= insert_count {
159                     Some(*id)
160                 } else {
161                     None
162                 }
163             })
164             .collect::<Vec<_>>();
165 
166         self.blocked.retain(|_, required| *required > insert_count);
167 
168         Ok(unblocked)
169     }
170 
171     /// User call `decoder_repr` once for decoding a complete field section,
172     /// which start with the `field section prefix`:  0   1   2   3   4   5
173     /// 6   7 +---+---+---+---+---+---+---+---+
174     /// |   Required Insert Count (8+)  |
175     /// +---+---------------------------+
176     /// | S |      Delta Base (7+)      |
177     /// +---+---------------------------+
178     /// |      Encoded Field Lines    ...
179     /// +-------------------------------+
180 
decode_repr( &mut self, buf: &[u8], stream_id: u64, ) -> Result<FieldDecodeState, QpackError>181     pub(crate) fn decode_repr(
182         &mut self,
183         buf: &[u8],
184         stream_id: u64,
185     ) -> Result<FieldDecodeState, QpackError> {
186         if self.blocked.contains_key(&stream_id) {
187             return Err(QpackError::InternalError(NotClassified::StreamBlocked));
188         }
189         let mut message = match self.streams.remove(&stream_id) {
190             None => ReprMessage::new(),
191             Some(mut message) => {
192                 if let Some(vec) = message.remaining.take() {
193                     // A block cannot occur here because this is the stream expelled from the block.
194                     self.decode_buffered_repr(vec.as_slice(), &mut message, stream_id)?;
195                 }
196                 message
197             }
198         };
199 
200         self.decode_buffered_repr(buf, &mut message, stream_id)
201             .map(|state| {
202                 self.streams.insert(stream_id, message);
203                 state
204             })
205     }
206 
decode_buffered_repr( &mut self, buf: &[u8], message: &mut ReprMessage, stream_id: u64, ) -> Result<FieldDecodeState, QpackError>207     fn decode_buffered_repr(
208         &mut self,
209         buf: &[u8],
210         message: &mut ReprMessage,
211         stream_id: u64,
212     ) -> Result<FieldDecodeState, QpackError> {
213         if buf.is_empty() {
214             return Ok(Decoded);
215         }
216         let mut decoder = ReprDecoder::new();
217         let mut searcher =
218             Searcher::new(self.max_field_section_size, &self.table, &mut message.lines);
219         let mut cnt = 0;
220         loop {
221             match decoder.decode(&buf[cnt..], &mut message.repr_state)? {
222                 Some((offset, repr)) => match repr {
223                     Representation::FieldSectionPrefix {
224                         require_insert_count,
225                         signal,
226                         delta_base,
227                     } => {
228                         cnt += offset;
229                         if require_insert_count.0 == 0 {
230                             message.require_insert_count = 0;
231                         } else {
232                             let max_entries = searcher.table.max_entries();
233                             let full_range = 2 * max_entries;
234                             if require_insert_count.0 > full_range {
235                                 return Err(QpackError::ConnectionError(DecompressionFailed));
236                             }
237                             let max_value = searcher.table.insert_count() + max_entries;
238                             let max_wrapped = (max_value / full_range) * full_range;
239                             message.require_insert_count = max_wrapped + require_insert_count.0 - 1;
240 
241                             if message.require_insert_count > max_value {
242                                 if message.require_insert_count <= full_range {
243                                     return Err(QpackError::ConnectionError(DecompressionFailed));
244                                 }
245                                 message.require_insert_count -= full_range;
246                             }
247                             if message.require_insert_count == 0 {
248                                 return Err(QpackError::ConnectionError(DecompressionFailed));
249                             }
250                         }
251                         if signal {
252                             message.base = message.require_insert_count - delta_base.0 - 1;
253                         } else {
254                             message.base = message.require_insert_count + delta_base.0;
255                         }
256                         searcher.base = message.base;
257                         if message.require_insert_count > searcher.table.insert_count() {
258                             if self.blocked.len() > self.max_blocked_streams {
259                                 return Err(QpackError::ConnectionError(DecompressionFailed));
260                             }
261                             self.blocked.insert(stream_id, message.require_insert_count);
262                             message.remaining = Some(Vec::from(&buf[cnt..]));
263                             return Ok(Blocked);
264                         }
265                     }
266                     Representation::Indexed { mid_bit, index } => {
267                         cnt += offset;
268                         searcher.search(Representation::Indexed { mid_bit, index })?;
269                     }
270                     Representation::IndexedWithPostIndex { index } => {
271                         cnt += offset;
272                         searcher.search(Representation::IndexedWithPostIndex { index })?;
273                     }
274                     Representation::LiteralWithIndexing {
275                         mid_bit,
276                         name,
277                         value,
278                     } => {
279                         cnt += offset;
280                         searcher.search_literal_with_indexing(mid_bit, name, value)?;
281                     }
282 
283                     Representation::LiteralWithPostIndexing {
284                         mid_bit,
285                         name,
286                         value,
287                     } => {
288                         cnt += offset;
289                         searcher.search_literal_with_post_indexing(mid_bit, name, value)?;
290                     }
291                     Representation::LiteralWithLiteralName {
292                         mid_bit,
293                         name,
294                         value,
295                     } => {
296                         cnt += offset;
297                         searcher.search_listeral_with_literal(mid_bit, name, value)?;
298                     }
299                 },
300                 None => {
301                     return Ok(Decoded);
302                 }
303             }
304         }
305     }
306 
307     /// Users call `finish` to stop decoding a field section. And send an
308     /// `Section Acknowledgment` to encoder: After processing an encoded
309     /// field section whose declared Required Insert Count is not zero,
310     /// the decoder emits a Section Acknowledgment instruction. The instruction
311     /// starts with the '1' 1-bit pattern, followed by the field section's
312     /// associated stream ID encoded as a 7-bit prefix integer
313     ///  0   1   2   3   4   5   6   7
314     /// +---+---+---+---+---+---+---+---+
315     /// | 1 |      Stream ID (7+)       |
316     /// +---+---------------------------+
317     /// # Examples(not run)
finish( &mut self, stream_id: u64, buf: &mut Vec<u8>, ) -> Result<(Parts, Option<usize>), QpackError>318     pub fn finish(
319         &mut self,
320         stream_id: u64,
321         buf: &mut Vec<u8>,
322     ) -> Result<(Parts, Option<usize>), QpackError> {
323         match self.streams.remove(&stream_id) {
324             None => Err(QpackError::ConnectionError(DecompressionFailed)),
325             Some(mut message) => {
326                 if message.repr_state.is_some() {
327                     return Err(QpackError::ConnectionError(DecompressionFailed));
328                 }
329                 message.lines.header_size = 0;
330                 if message.require_insert_count > 0 {
331                     let ack = Integer::index(0x80, stream_id as usize, 0x7f);
332 
333                     let mut res = Vec::new();
334                     ack.encode(&mut res);
335                     buf.extend_from_slice(res.as_slice());
336                     return Ok((take(&mut message.lines.parts), Some(res.len())));
337                 }
338                 Ok((take(&mut message.lines.parts), None))
339             }
340         }
341     }
342 
343     /// Users call `stream_cancel` to stop cancel a stream. And send an `Stream
344     /// Cancellation` to encoder: When a stream is reset or reading is
345     /// abandoned, the decoder emits a Stream Cancellation instruction. The
346     /// instruction starts with the '01' 2-bit pattern, followed by the
347     /// stream ID of the affected stream encoded as a 6-bit prefix integer.
348     ///  0   1   2   3   4   5   6   7
349     /// +---+---+---+---+---+---+---+---+
350     /// | 0 | 1 |     Stream ID (6+)    |
351     /// +---+---+-----------------------+
stream_cancel(&mut self, stream_id: u64, buf: &mut [u8]) -> Result<usize, QpackError>352     pub fn stream_cancel(&mut self, stream_id: u64, buf: &mut [u8]) -> Result<usize, QpackError> {
353         if self.table.capacity() > 0 {
354             self.blocked.remove(&stream_id);
355             self.streams.remove(&stream_id);
356             let ack = Integer::index(0x40, stream_id as usize, 0x3f);
357             let mut res = Vec::new();
358             ack.encode(&mut res);
359             if res.len() > buf.len() {
360                 Err(QpackError::ConnectionError(DecompressionFailed))
361             } else {
362                 buf[..res.len()].copy_from_slice(res.as_slice());
363                 Ok(res.len())
364             }
365         } else {
366             Ok(0)
367         }
368     }
369 }
370 
371 struct Updater<'a> {
372     table: &'a mut DynamicTable,
373 }
374 
375 impl<'a> Updater<'a> {
new(table: &'a mut DynamicTable) -> Self376     fn new(table: &'a mut DynamicTable) -> Self {
377         Self { table }
378     }
379 
update_capacity(&mut self, capacity: usize) -> Result<(), QpackError>380     fn update_capacity(&mut self, capacity: usize) -> Result<(), QpackError> {
381         self.table.update_size(capacity);
382         Ok(())
383     }
384 
update_table( &mut self, mid_bit: MidBit, name: Name, value: Vec<u8>, ) -> Result<(), QpackError>385     fn update_table(
386         &mut self,
387         mid_bit: MidBit,
388         name: Name,
389         value: Vec<u8>,
390     ) -> Result<(), QpackError> {
391         let (f, v) =
392             self.get_field_by_name_and_value(mid_bit, name, value, self.table.insert_count())?;
393         self.table.update(f, v);
394         Ok(())
395     }
396 
duplicate(&mut self, index: usize) -> Result<(), QpackError>397     fn duplicate(&mut self, index: usize) -> Result<(), QpackError> {
398         let table_searcher = TableSearcher::new(self.table);
399         let (f, v) = table_searcher
400             .find_field_dynamic(self.table.insert_count() - index - 1)
401             .ok_or(QpackError::ConnectionError(EncoderStreamError))?;
402         self.table.update(f, v);
403         Ok(())
404     }
405 
get_field_by_name_and_value( &self, mid_bit: MidBit, name: Name, value: Vec<u8>, insert_count: usize, ) -> Result<(NameField, String), QpackError>406     fn get_field_by_name_and_value(
407         &self,
408         mid_bit: MidBit,
409         name: Name,
410         value: Vec<u8>,
411         insert_count: usize,
412     ) -> Result<(NameField, String), QpackError> {
413         let h = match name {
414             Name::Index(index) => {
415                 let searcher = TableSearcher::new(self.table);
416                 if let Some(true) = mid_bit.t {
417                     searcher
418                         .find_field_name_static(index)
419                         .ok_or(QpackError::ConnectionError(EncoderStreamError))?
420                 } else {
421                     searcher
422                         .find_field_name_dynamic(insert_count - index - 1)
423                         .ok_or(QpackError::ConnectionError(EncoderStreamError))?
424                 }
425             }
426             Name::Literal(octets) => NameField::Other(
427                 String::from_utf8(octets)
428                     .map_err(|_| QpackError::ConnectionError(EncoderStreamError))?,
429             ),
430         };
431         let v = String::from_utf8(value)
432             .map_err(|_| QpackError::ConnectionError(EncoderStreamError))?;
433         Ok((h, v))
434     }
435 }
436 
437 struct Searcher<'a> {
438     max_field_section_size: usize,
439     table: &'a DynamicTable,
440     lines: &'a mut FiledLines,
441     base: usize,
442 }
443 
444 impl<'a> Searcher<'a> {
new( max_field_section_size: usize, table: &'a DynamicTable, lines: &'a mut FiledLines, ) -> Self445     fn new(
446         max_field_section_size: usize,
447         table: &'a DynamicTable,
448         lines: &'a mut FiledLines,
449     ) -> Self {
450         Self {
451             max_field_section_size,
452             table,
453             lines,
454             base: 0,
455         }
456     }
457 
search(&mut self, repr: Representation) -> Result<(), QpackError>458     fn search(&mut self, repr: Representation) -> Result<(), QpackError> {
459         match repr {
460             Representation::Indexed { mid_bit, index } => self.search_indexed(mid_bit, index),
461             Representation::IndexedWithPostIndex { index } => self.search_post_indexed(index),
462             _ => Ok(()),
463         }
464     }
465 
search_indexed(&mut self, mid_bit: MidBit, index: usize) -> Result<(), QpackError>466     fn search_indexed(&mut self, mid_bit: MidBit, index: usize) -> Result<(), QpackError> {
467         let table_searcher = TableSearcher::new(self.table);
468         if let Some(true) = mid_bit.t {
469             let (f, v) = table_searcher
470                 .find_field_static(index)
471                 .ok_or(QpackError::ConnectionError(DecompressionFailed))?;
472 
473             self.lines.parts.update(f, v);
474             Ok(())
475         } else {
476             let (f, v) = table_searcher
477                 .find_field_dynamic(self.base - index - 1)
478                 .ok_or(QpackError::ConnectionError(DecompressionFailed))?;
479 
480             self.lines.parts.update(f, v);
481             Ok(())
482         }
483     }
484 
search_post_indexed(&mut self, index: usize) -> Result<(), QpackError>485     fn search_post_indexed(&mut self, index: usize) -> Result<(), QpackError> {
486         let table_searcher = TableSearcher::new(self.table);
487         let (f, v) = table_searcher
488             .find_field_dynamic(self.base + index)
489             .ok_or(QpackError::ConnectionError(DecompressionFailed))?;
490         self.check_field_list_size(&f, &v)?;
491         self.lines.parts.update(f, v);
492         Ok(())
493     }
494 
search_literal_with_indexing( &mut self, mid_bit: MidBit, name: Name, value: Vec<u8>, ) -> Result<(), QpackError>495     fn search_literal_with_indexing(
496         &mut self,
497         mid_bit: MidBit,
498         name: Name,
499         value: Vec<u8>,
500     ) -> Result<(), QpackError> {
501         let (f, v) = self.get_field_by_name_and_value(
502             mid_bit,
503             name,
504             value,
505             ReprPrefixBit::LITERALWITHINDEXING,
506         )?;
507         self.check_field_list_size(&f, &v)?;
508         self.lines.parts.update(f, v);
509         Ok(())
510     }
511 
search_literal_with_post_indexing( &mut self, mid_bit: MidBit, name: Name, value: Vec<u8>, ) -> Result<(), QpackError>512     fn search_literal_with_post_indexing(
513         &mut self,
514         mid_bit: MidBit,
515         name: Name,
516         value: Vec<u8>,
517     ) -> Result<(), QpackError> {
518         let (f, v) = self.get_field_by_name_and_value(
519             mid_bit,
520             name,
521             value,
522             ReprPrefixBit::LITERALWITHPOSTINDEXING,
523         )?;
524         self.check_field_list_size(&f, &v)?;
525         self.lines.parts.update(f, v);
526         Ok(())
527     }
528 
search_listeral_with_literal( &mut self, mid_bit: MidBit, name: Name, value: Vec<u8>, ) -> Result<(), QpackError>529     fn search_listeral_with_literal(
530         &mut self,
531         mid_bit: MidBit,
532         name: Name,
533         value: Vec<u8>,
534     ) -> Result<(), QpackError> {
535         let (h, v) = self.get_field_by_name_and_value(
536             mid_bit,
537             name,
538             value,
539             ReprPrefixBit::LITERALWITHLITERALNAME,
540         )?;
541         self.check_field_list_size(&h, &v)?;
542         self.lines.parts.update(h, v);
543         Ok(())
544     }
545 
get_field_by_name_and_value( &self, mid_bit: MidBit, name: Name, value: Vec<u8>, repr: ReprPrefixBit, ) -> Result<(NameField, String), QpackError>546     fn get_field_by_name_and_value(
547         &self,
548         mid_bit: MidBit,
549         name: Name,
550         value: Vec<u8>,
551         repr: ReprPrefixBit,
552     ) -> Result<(NameField, String), QpackError> {
553         let h = match name {
554             Name::Index(index) => {
555                 if repr == ReprPrefixBit::LITERALWITHINDEXING {
556                     let searcher = TableSearcher::new(self.table);
557                     if let Some(true) = mid_bit.t {
558                         searcher
559                             .find_field_name_static(index)
560                             .ok_or(QpackError::ConnectionError(DecompressionFailed))?
561                     } else {
562                         searcher
563                             .find_field_name_dynamic(self.base - index - 1)
564                             .ok_or(QpackError::ConnectionError(DecompressionFailed))?
565                     }
566                 } else {
567                     let searcher = TableSearcher::new(self.table);
568                     searcher
569                         .find_field_name_dynamic(self.base + index)
570                         .ok_or(QpackError::ConnectionError(DecompressionFailed))?
571                 }
572             }
573             Name::Literal(octets) => NameField::Other(
574                 String::from_utf8(octets)
575                     .map_err(|_| QpackError::ConnectionError(DecompressionFailed))?,
576             ),
577         };
578         let v = String::from_utf8(value)
579             .map_err(|_| QpackError::ConnectionError(DecompressionFailed))?;
580         Ok((h, v))
581     }
update_size(&mut self, addition: usize)582     pub(crate) fn update_size(&mut self, addition: usize) {
583         self.lines.header_size += addition;
584     }
585 
check_field_list_size(&mut self, key: &NameField, value: &str) -> Result<(), QpackError>586     fn check_field_list_size(&mut self, key: &NameField, value: &str) -> Result<(), QpackError> {
587         let line_size = field_line_length(key.len(), value.len());
588         self.update_size(line_size);
589         if self.lines.header_size > self.max_field_section_size {
590             Err(QpackError::ConnectionError(DecompressionFailed))
591         } else {
592             Ok(())
593         }
594     }
595 }
596 
field_line_length(key_size: usize, value_size: usize) -> usize597 fn field_line_length(key_size: usize, value_size: usize) -> usize {
598     key_size + value_size + 32
599 }
600