1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::collections::{HashMap, HashSet};
15 use std::mem::take;
16
17 use ylong_runtime::iter::parallel::ParSplit;
18
19 use crate::h3::parts::Parts;
20 use crate::h3::qpack::decoder::FieldDecodeState::{Blocked, Decoded};
21 use crate::h3::qpack::error::ErrorCode::{DecompressionFailed, EncoderStreamError};
22 use crate::h3::qpack::error::{ErrorCode, NotClassified, QpackError};
23 use crate::h3::qpack::format::decoder::{
24 EncInstDecoder, InstDecodeState, Name, ReprDecodeState, ReprDecoder,
25 };
26 use crate::h3::qpack::integer::Integer;
27 use crate::h3::qpack::table::NameField::Path;
28 use crate::h3::qpack::table::{DynamicTable, NameField, TableSearcher};
29 use crate::h3::qpack::{
30 DeltaBase, EncoderInstPrefixBit, EncoderInstruction, MidBit, ReprPrefixBit, Representation,
31 RequireInsertCount,
32 };
33
/// Outcome reported by the decoder after it has been fed part of an encoded
/// field section.
pub(crate) enum FieldDecodeState {
    /// The section requires more dynamic-table insertions than the table has
    /// received so far; the unread bytes have been buffered for later.
    Blocked,
    /// All supplied bytes that form complete representations were processed.
    Decoded,
}
38
39 pub(crate) struct FiledLines {
40 parts: Parts,
41 header_size: usize,
42 }
43
44 pub(crate) struct ReprMessage {
45 require_insert_count: usize,
46 base: usize,
47 // dynamic table
48 repr_state: Option<ReprDecodeState>,
49 remaining: Option<Vec<u8>>,
50 // instruction decode state
51 lines: FiledLines,
52 }
53
54 impl ReprMessage {
new() -> Self55 pub(crate) fn new() -> Self {
56 Self {
57 require_insert_count: 0,
58 base: 0,
59 repr_state: None,
60 remaining: None,
61 lines: FiledLines {
62 parts: Parts::new(),
63 header_size: 0,
64 },
65 }
66 }
67 }
68
69 pub struct QpackDecoder {
70 // max header list size
71 table: DynamicTable,
72 // field decode state
73 inst_state: Option<InstDecodeState>,
74 streams: HashMap<u64, ReprMessage>,
75 blocked: HashMap<u64, usize>,
76 max_blocked_streams: usize,
77 max_table_capacity: usize,
78 max_field_section_size: usize,
79 }
80
81 impl QpackDecoder {
new(max_blocked_streams: usize, max_table_capacity: usize) -> Self82 pub(crate) fn new(max_blocked_streams: usize, max_table_capacity: usize) -> Self {
83 Self {
84 table: DynamicTable::with_empty(),
85 inst_state: None,
86 streams: HashMap::new(),
87 blocked: HashMap::new(),
88 max_blocked_streams,
89 max_table_capacity,
90 max_field_section_size: (1 << 62) - 1,
91 }
92 }
93
finish_stream(&mut self, id: u64) -> Result<(), QpackError>94 pub(crate) fn finish_stream(&mut self, id: u64) -> Result<(), QpackError> {
95 if self.blocked.contains_key(&id) {
96 Err(QpackError::ConnectionError(ErrorCode::DecoderStreamError))
97 } else {
98 self.streams.remove(&id);
99 Ok(())
100 }
101 }
102
set_max_field_section_size(&mut self, size: usize)103 pub(crate) fn set_max_field_section_size(&mut self, size: usize) {
104 self.max_field_section_size = size;
105 }
106
decode_ins(&mut self, buf: &[u8]) -> Result<Vec<u64>, QpackError>107 pub(crate) fn decode_ins(&mut self, buf: &[u8]) -> Result<Vec<u64>, QpackError> {
108 let mut decoder = EncInstDecoder::new();
109 let mut updater = Updater::new(&mut self.table);
110 let mut cnt = 0;
111 while cnt < buf.len() {
112 match decoder.decode(&buf[cnt..], &mut self.inst_state)? {
113 Some(inst) => match inst {
114 (offset, EncoderInstruction::SetCap { capacity }) => {
115 cnt += offset;
116 if capacity > self.max_table_capacity {
117 return Err(QpackError::ConnectionError(DecompressionFailed));
118 }
119 updater.update_capacity(capacity)?;
120 }
121 (
122 offset,
123 EncoderInstruction::InsertWithIndex {
124 mid_bit,
125 name,
126 value,
127 },
128 ) => {
129 cnt += offset;
130 updater.update_table(mid_bit, name, value)?;
131 }
132 (
133 offset,
134 EncoderInstruction::InsertWithLiteral {
135 mid_bit,
136 name,
137 value,
138 },
139 ) => {
140 cnt += offset;
141 updater.update_table(mid_bit, name, value)?;
142 }
143 (offset, EncoderInstruction::Duplicate { index }) => {
144 cnt += offset;
145 updater.duplicate(index)?;
146 }
147 },
148 None => break,
149 }
150 }
151
152 let insert_count = self.table.insert_count();
153
154 let unblocked = self
155 .blocked
156 .iter()
157 .filter_map(|(id, required)| {
158 if *required <= insert_count {
159 Some(*id)
160 } else {
161 None
162 }
163 })
164 .collect::<Vec<_>>();
165
166 self.blocked.retain(|_, required| *required > insert_count);
167
168 Ok(unblocked)
169 }
170
171 /// User call `decoder_repr` once for decoding a complete field section,
172 /// which start with the `field section prefix`: 0 1 2 3 4 5
173 /// 6 7 +---+---+---+---+---+---+---+---+
174 /// | Required Insert Count (8+) |
175 /// +---+---------------------------+
176 /// | S | Delta Base (7+) |
177 /// +---+---------------------------+
178 /// | Encoded Field Lines ...
179 /// +-------------------------------+
180
decode_repr( &mut self, buf: &[u8], stream_id: u64, ) -> Result<FieldDecodeState, QpackError>181 pub(crate) fn decode_repr(
182 &mut self,
183 buf: &[u8],
184 stream_id: u64,
185 ) -> Result<FieldDecodeState, QpackError> {
186 if self.blocked.contains_key(&stream_id) {
187 return Err(QpackError::InternalError(NotClassified::StreamBlocked));
188 }
189 let mut message = match self.streams.remove(&stream_id) {
190 None => ReprMessage::new(),
191 Some(mut message) => {
192 if let Some(vec) = message.remaining.take() {
193 // A block cannot occur here because this is the stream expelled from the block.
194 self.decode_buffered_repr(vec.as_slice(), &mut message, stream_id)?;
195 }
196 message
197 }
198 };
199
200 self.decode_buffered_repr(buf, &mut message, stream_id)
201 .map(|state| {
202 self.streams.insert(stream_id, message);
203 state
204 })
205 }
206
decode_buffered_repr( &mut self, buf: &[u8], message: &mut ReprMessage, stream_id: u64, ) -> Result<FieldDecodeState, QpackError>207 fn decode_buffered_repr(
208 &mut self,
209 buf: &[u8],
210 message: &mut ReprMessage,
211 stream_id: u64,
212 ) -> Result<FieldDecodeState, QpackError> {
213 if buf.is_empty() {
214 return Ok(Decoded);
215 }
216 let mut decoder = ReprDecoder::new();
217 let mut searcher =
218 Searcher::new(self.max_field_section_size, &self.table, &mut message.lines);
219 let mut cnt = 0;
220 loop {
221 match decoder.decode(&buf[cnt..], &mut message.repr_state)? {
222 Some((offset, repr)) => match repr {
223 Representation::FieldSectionPrefix {
224 require_insert_count,
225 signal,
226 delta_base,
227 } => {
228 cnt += offset;
229 if require_insert_count.0 == 0 {
230 message.require_insert_count = 0;
231 } else {
232 let max_entries = searcher.table.max_entries();
233 let full_range = 2 * max_entries;
234 if require_insert_count.0 > full_range {
235 return Err(QpackError::ConnectionError(DecompressionFailed));
236 }
237 let max_value = searcher.table.insert_count() + max_entries;
238 let max_wrapped = (max_value / full_range) * full_range;
239 message.require_insert_count = max_wrapped + require_insert_count.0 - 1;
240
241 if message.require_insert_count > max_value {
242 if message.require_insert_count <= full_range {
243 return Err(QpackError::ConnectionError(DecompressionFailed));
244 }
245 message.require_insert_count -= full_range;
246 }
247 if message.require_insert_count == 0 {
248 return Err(QpackError::ConnectionError(DecompressionFailed));
249 }
250 }
251 if signal {
252 message.base = message.require_insert_count - delta_base.0 - 1;
253 } else {
254 message.base = message.require_insert_count + delta_base.0;
255 }
256 searcher.base = message.base;
257 if message.require_insert_count > searcher.table.insert_count() {
258 if self.blocked.len() > self.max_blocked_streams {
259 return Err(QpackError::ConnectionError(DecompressionFailed));
260 }
261 self.blocked.insert(stream_id, message.require_insert_count);
262 message.remaining = Some(Vec::from(&buf[cnt..]));
263 return Ok(Blocked);
264 }
265 }
266 Representation::Indexed { mid_bit, index } => {
267 cnt += offset;
268 searcher.search(Representation::Indexed { mid_bit, index })?;
269 }
270 Representation::IndexedWithPostIndex { index } => {
271 cnt += offset;
272 searcher.search(Representation::IndexedWithPostIndex { index })?;
273 }
274 Representation::LiteralWithIndexing {
275 mid_bit,
276 name,
277 value,
278 } => {
279 cnt += offset;
280 searcher.search_literal_with_indexing(mid_bit, name, value)?;
281 }
282
283 Representation::LiteralWithPostIndexing {
284 mid_bit,
285 name,
286 value,
287 } => {
288 cnt += offset;
289 searcher.search_literal_with_post_indexing(mid_bit, name, value)?;
290 }
291 Representation::LiteralWithLiteralName {
292 mid_bit,
293 name,
294 value,
295 } => {
296 cnt += offset;
297 searcher.search_listeral_with_literal(mid_bit, name, value)?;
298 }
299 },
300 None => {
301 return Ok(Decoded);
302 }
303 }
304 }
305 }
306
307 /// Users call `finish` to stop decoding a field section. And send an
308 /// `Section Acknowledgment` to encoder: After processing an encoded
309 /// field section whose declared Required Insert Count is not zero,
310 /// the decoder emits a Section Acknowledgment instruction. The instruction
311 /// starts with the '1' 1-bit pattern, followed by the field section's
312 /// associated stream ID encoded as a 7-bit prefix integer
313 /// 0 1 2 3 4 5 6 7
314 /// +---+---+---+---+---+---+---+---+
315 /// | 1 | Stream ID (7+) |
316 /// +---+---------------------------+
317 /// # Examples(not run)
finish( &mut self, stream_id: u64, buf: &mut Vec<u8>, ) -> Result<(Parts, Option<usize>), QpackError>318 pub fn finish(
319 &mut self,
320 stream_id: u64,
321 buf: &mut Vec<u8>,
322 ) -> Result<(Parts, Option<usize>), QpackError> {
323 match self.streams.remove(&stream_id) {
324 None => Err(QpackError::ConnectionError(DecompressionFailed)),
325 Some(mut message) => {
326 if message.repr_state.is_some() {
327 return Err(QpackError::ConnectionError(DecompressionFailed));
328 }
329 message.lines.header_size = 0;
330 if message.require_insert_count > 0 {
331 let ack = Integer::index(0x80, stream_id as usize, 0x7f);
332
333 let mut res = Vec::new();
334 ack.encode(&mut res);
335 buf.extend_from_slice(res.as_slice());
336 return Ok((take(&mut message.lines.parts), Some(res.len())));
337 }
338 Ok((take(&mut message.lines.parts), None))
339 }
340 }
341 }
342
343 /// Users call `stream_cancel` to stop cancel a stream. And send an `Stream
344 /// Cancellation` to encoder: When a stream is reset or reading is
345 /// abandoned, the decoder emits a Stream Cancellation instruction. The
346 /// instruction starts with the '01' 2-bit pattern, followed by the
347 /// stream ID of the affected stream encoded as a 6-bit prefix integer.
348 /// 0 1 2 3 4 5 6 7
349 /// +---+---+---+---+---+---+---+---+
350 /// | 0 | 1 | Stream ID (6+) |
351 /// +---+---+-----------------------+
stream_cancel(&mut self, stream_id: u64, buf: &mut [u8]) -> Result<usize, QpackError>352 pub fn stream_cancel(&mut self, stream_id: u64, buf: &mut [u8]) -> Result<usize, QpackError> {
353 if self.table.capacity() > 0 {
354 self.blocked.remove(&stream_id);
355 self.streams.remove(&stream_id);
356 let ack = Integer::index(0x40, stream_id as usize, 0x3f);
357 let mut res = Vec::new();
358 ack.encode(&mut res);
359 if res.len() > buf.len() {
360 Err(QpackError::ConnectionError(DecompressionFailed))
361 } else {
362 buf[..res.len()].copy_from_slice(res.as_slice());
363 Ok(res.len())
364 }
365 } else {
366 Ok(0)
367 }
368 }
369 }
370
371 struct Updater<'a> {
372 table: &'a mut DynamicTable,
373 }
374
375 impl<'a> Updater<'a> {
new(table: &'a mut DynamicTable) -> Self376 fn new(table: &'a mut DynamicTable) -> Self {
377 Self { table }
378 }
379
update_capacity(&mut self, capacity: usize) -> Result<(), QpackError>380 fn update_capacity(&mut self, capacity: usize) -> Result<(), QpackError> {
381 self.table.update_size(capacity);
382 Ok(())
383 }
384
update_table( &mut self, mid_bit: MidBit, name: Name, value: Vec<u8>, ) -> Result<(), QpackError>385 fn update_table(
386 &mut self,
387 mid_bit: MidBit,
388 name: Name,
389 value: Vec<u8>,
390 ) -> Result<(), QpackError> {
391 let (f, v) =
392 self.get_field_by_name_and_value(mid_bit, name, value, self.table.insert_count())?;
393 self.table.update(f, v);
394 Ok(())
395 }
396
duplicate(&mut self, index: usize) -> Result<(), QpackError>397 fn duplicate(&mut self, index: usize) -> Result<(), QpackError> {
398 let table_searcher = TableSearcher::new(self.table);
399 let (f, v) = table_searcher
400 .find_field_dynamic(self.table.insert_count() - index - 1)
401 .ok_or(QpackError::ConnectionError(EncoderStreamError))?;
402 self.table.update(f, v);
403 Ok(())
404 }
405
get_field_by_name_and_value( &self, mid_bit: MidBit, name: Name, value: Vec<u8>, insert_count: usize, ) -> Result<(NameField, String), QpackError>406 fn get_field_by_name_and_value(
407 &self,
408 mid_bit: MidBit,
409 name: Name,
410 value: Vec<u8>,
411 insert_count: usize,
412 ) -> Result<(NameField, String), QpackError> {
413 let h = match name {
414 Name::Index(index) => {
415 let searcher = TableSearcher::new(self.table);
416 if let Some(true) = mid_bit.t {
417 searcher
418 .find_field_name_static(index)
419 .ok_or(QpackError::ConnectionError(EncoderStreamError))?
420 } else {
421 searcher
422 .find_field_name_dynamic(insert_count - index - 1)
423 .ok_or(QpackError::ConnectionError(EncoderStreamError))?
424 }
425 }
426 Name::Literal(octets) => NameField::Other(
427 String::from_utf8(octets)
428 .map_err(|_| QpackError::ConnectionError(EncoderStreamError))?,
429 ),
430 };
431 let v = String::from_utf8(value)
432 .map_err(|_| QpackError::ConnectionError(EncoderStreamError))?;
433 Ok((h, v))
434 }
435 }
436
437 struct Searcher<'a> {
438 max_field_section_size: usize,
439 table: &'a DynamicTable,
440 lines: &'a mut FiledLines,
441 base: usize,
442 }
443
444 impl<'a> Searcher<'a> {
new( max_field_section_size: usize, table: &'a DynamicTable, lines: &'a mut FiledLines, ) -> Self445 fn new(
446 max_field_section_size: usize,
447 table: &'a DynamicTable,
448 lines: &'a mut FiledLines,
449 ) -> Self {
450 Self {
451 max_field_section_size,
452 table,
453 lines,
454 base: 0,
455 }
456 }
457
search(&mut self, repr: Representation) -> Result<(), QpackError>458 fn search(&mut self, repr: Representation) -> Result<(), QpackError> {
459 match repr {
460 Representation::Indexed { mid_bit, index } => self.search_indexed(mid_bit, index),
461 Representation::IndexedWithPostIndex { index } => self.search_post_indexed(index),
462 _ => Ok(()),
463 }
464 }
465
search_indexed(&mut self, mid_bit: MidBit, index: usize) -> Result<(), QpackError>466 fn search_indexed(&mut self, mid_bit: MidBit, index: usize) -> Result<(), QpackError> {
467 let table_searcher = TableSearcher::new(self.table);
468 if let Some(true) = mid_bit.t {
469 let (f, v) = table_searcher
470 .find_field_static(index)
471 .ok_or(QpackError::ConnectionError(DecompressionFailed))?;
472
473 self.lines.parts.update(f, v);
474 Ok(())
475 } else {
476 let (f, v) = table_searcher
477 .find_field_dynamic(self.base - index - 1)
478 .ok_or(QpackError::ConnectionError(DecompressionFailed))?;
479
480 self.lines.parts.update(f, v);
481 Ok(())
482 }
483 }
484
search_post_indexed(&mut self, index: usize) -> Result<(), QpackError>485 fn search_post_indexed(&mut self, index: usize) -> Result<(), QpackError> {
486 let table_searcher = TableSearcher::new(self.table);
487 let (f, v) = table_searcher
488 .find_field_dynamic(self.base + index)
489 .ok_or(QpackError::ConnectionError(DecompressionFailed))?;
490 self.check_field_list_size(&f, &v)?;
491 self.lines.parts.update(f, v);
492 Ok(())
493 }
494
search_literal_with_indexing( &mut self, mid_bit: MidBit, name: Name, value: Vec<u8>, ) -> Result<(), QpackError>495 fn search_literal_with_indexing(
496 &mut self,
497 mid_bit: MidBit,
498 name: Name,
499 value: Vec<u8>,
500 ) -> Result<(), QpackError> {
501 let (f, v) = self.get_field_by_name_and_value(
502 mid_bit,
503 name,
504 value,
505 ReprPrefixBit::LITERALWITHINDEXING,
506 )?;
507 self.check_field_list_size(&f, &v)?;
508 self.lines.parts.update(f, v);
509 Ok(())
510 }
511
search_literal_with_post_indexing( &mut self, mid_bit: MidBit, name: Name, value: Vec<u8>, ) -> Result<(), QpackError>512 fn search_literal_with_post_indexing(
513 &mut self,
514 mid_bit: MidBit,
515 name: Name,
516 value: Vec<u8>,
517 ) -> Result<(), QpackError> {
518 let (f, v) = self.get_field_by_name_and_value(
519 mid_bit,
520 name,
521 value,
522 ReprPrefixBit::LITERALWITHPOSTINDEXING,
523 )?;
524 self.check_field_list_size(&f, &v)?;
525 self.lines.parts.update(f, v);
526 Ok(())
527 }
528
search_listeral_with_literal( &mut self, mid_bit: MidBit, name: Name, value: Vec<u8>, ) -> Result<(), QpackError>529 fn search_listeral_with_literal(
530 &mut self,
531 mid_bit: MidBit,
532 name: Name,
533 value: Vec<u8>,
534 ) -> Result<(), QpackError> {
535 let (h, v) = self.get_field_by_name_and_value(
536 mid_bit,
537 name,
538 value,
539 ReprPrefixBit::LITERALWITHLITERALNAME,
540 )?;
541 self.check_field_list_size(&h, &v)?;
542 self.lines.parts.update(h, v);
543 Ok(())
544 }
545
get_field_by_name_and_value( &self, mid_bit: MidBit, name: Name, value: Vec<u8>, repr: ReprPrefixBit, ) -> Result<(NameField, String), QpackError>546 fn get_field_by_name_and_value(
547 &self,
548 mid_bit: MidBit,
549 name: Name,
550 value: Vec<u8>,
551 repr: ReprPrefixBit,
552 ) -> Result<(NameField, String), QpackError> {
553 let h = match name {
554 Name::Index(index) => {
555 if repr == ReprPrefixBit::LITERALWITHINDEXING {
556 let searcher = TableSearcher::new(self.table);
557 if let Some(true) = mid_bit.t {
558 searcher
559 .find_field_name_static(index)
560 .ok_or(QpackError::ConnectionError(DecompressionFailed))?
561 } else {
562 searcher
563 .find_field_name_dynamic(self.base - index - 1)
564 .ok_or(QpackError::ConnectionError(DecompressionFailed))?
565 }
566 } else {
567 let searcher = TableSearcher::new(self.table);
568 searcher
569 .find_field_name_dynamic(self.base + index)
570 .ok_or(QpackError::ConnectionError(DecompressionFailed))?
571 }
572 }
573 Name::Literal(octets) => NameField::Other(
574 String::from_utf8(octets)
575 .map_err(|_| QpackError::ConnectionError(DecompressionFailed))?,
576 ),
577 };
578 let v = String::from_utf8(value)
579 .map_err(|_| QpackError::ConnectionError(DecompressionFailed))?;
580 Ok((h, v))
581 }
update_size(&mut self, addition: usize)582 pub(crate) fn update_size(&mut self, addition: usize) {
583 self.lines.header_size += addition;
584 }
585
check_field_list_size(&mut self, key: &NameField, value: &str) -> Result<(), QpackError>586 fn check_field_list_size(&mut self, key: &NameField, value: &str) -> Result<(), QpackError> {
587 let line_size = field_line_length(key.len(), value.len());
588 self.update_size(line_size);
589 if self.lines.header_size > self.max_field_section_size {
590 Err(QpackError::ConnectionError(DecompressionFailed))
591 } else {
592 Ok(())
593 }
594 }
595 }
596
/// Returns the size accounted for one field line: the name length plus the
/// value length plus a fixed overhead of 32.
fn field_line_length(key_size: usize, value_size: usize) -> usize {
    const PER_LINE_OVERHEAD: usize = 32;
    PER_LINE_OVERHEAD + key_size + value_size
}
600