1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::collections::HashMap;
15 use std::mem::take;
16
17 use crate::h3::error::CommonError::FieldMissing;
18 use crate::h3::error::DecodeError::{FrameSizeError, UnexpectedFrame, UnsupportedSetting};
19 use crate::h3::error::{CommonError, DecodeError, H3Error};
20 use crate::h3::frame::{
21 CancelPush, Data, GoAway, Headers, MaxPushId, Payload, PushPromise, Settings, DATA_FRAME_TYPE,
22 HEADERS_FRAME_TYPE, PUSH_PROMISE_FRAME_TYPE, SETTINGS_FRAME_TYPE,
23 };
24 use crate::h3::octets::{ReadableBytes, WritableBytes};
25 use crate::h3::parts::Parts;
26 use crate::h3::qpack::error::QpackError;
27 use crate::h3::qpack::table::DynamicTable;
28 use crate::h3::qpack::{FieldDecodeState, FiledLines, QpackDecoder};
29 use crate::h3::stream::StreamMessage::Request;
30 use crate::h3::stream::{FrameKind, Frames, StreamMessage};
31 use crate::h3::{frame, is_bidirectional, stream, Frame, H3ErrorCode};
32
33 /// HTTP3 stream bytes sequence decoder.
34 /// The http3 stream decoder deserializes stream data into readable structured
35 /// data, including stream type, Frame, etc.
36 ///
37 /// # Examples
38 ///
39 /// ```
40 /// use ylong_http::h3::FrameDecoder;
41 ///
42 /// let mut decoder = FrameDecoder::new(100, 10240);
43 /// let data_frame_bytes = &[0, 5, b'h', b'e', b'l', b'l', b'o'];
44 /// let message = decoder.decode(0, data_frame_bytes).unwrap();
45 /// ```
46 pub struct FrameDecoder {
47 qpack_decoder: QpackDecoder,
48 streams: HashMap<u64, DecodedH3Stream>,
49 }
50
/// Position of a stream inside the decoding state machine, i.e. what the next
/// bytes of the stream are expected to be.
#[derive(Clone, Copy)]
enum DecodeState {
    /// The stream type varint of a unidirectional stream.
    StreamType,
    /// The push id that follows the stream type of a push stream.
    PushId,
    /// The type varint of the next frame.
    FrameType,
    /// The payload length varint of the current frame.
    PayloadLen,
    /// The encoded field section of a HEADERS or PUSH_PROMISE frame.
    HeadersPayload,
    /// The body bytes of a DATA frame.
    DataPayload,
    /// The identifier/value pairs of a SETTINGS frame.
    SettingsPayload,
    /// The single varint carried by GOAWAY, MAX_PUSH_ID and CANCEL_PUSH.
    VariablePayload,
    /// The push id at the start of a PUSH_PROMISE payload.
    PushPromisePayload,
    /// The payload of a frame whose type is not handled; it is skipped.
    UnknownPayload,
    /// Bytes of a QPACK decoder stream, returned to the caller undecoded.
    QpackDecoderInst,
    /// Bytes of a QPACK encoder stream, fed to the QPACK decoder.
    QpackEncoderInst,
    /// Bytes of a stream with an unknown stream type; they are discarded.
    DropUnknown,
}
67
/// Kind of an HTTP/3 stream as seen by the decoder.
#[derive(Clone, Copy, PartialEq, Eq)]
enum StreamType {
    /// Unidirectional stream whose stream type has not been read yet.
    Init,
    /// Bidirectional (request) stream.
    Request,
    /// Control stream (stream type `0x0`).
    Control,
    /// Push stream (stream type `0x1`).
    Push,
    /// QPACK encoder stream (stream type `0x2`).
    QpackEncoder,
    /// QPACK decoder stream (stream type `0x3`).
    QpackDecoder,
    /// Unidirectional stream with any other stream type.
    Unknown,
}
78
79 struct DecodedH3Stream {
80 ty: StreamType,
81 state: DecodeState,
82 buffer: Vec<u8>,
83 offset: usize,
84 push_id: Option<u64>,
85 frame_type: Option<u64>,
86 push_frame_id: Option<u64>,
87 payload_len: Option<u64>,
88 stream_set: bool,
89 }
90
/// Outcome of a single decoding step.
enum DecodePartRes {
    /// Stop decoding and hand the collected result back to the caller.
    ReturnOuter,
    /// Keep decoding with the state the stream is in now.
    Continue,
}
95
96 impl FrameDecoder {
97 /// `FrameDecoder` constructor. max_blocked_streams is the maximum number of
98 /// stream blocks allowed by qpack, and max_table_capacity is the
99 /// maximum dynamic table capacity allowed by the encoder.
new(max_blocked_streams: usize, max_table_capacity: usize) -> Self100 pub fn new(max_blocked_streams: usize, max_table_capacity: usize) -> Self {
101 Self {
102 qpack_decoder: QpackDecoder::new(max_blocked_streams, max_table_capacity),
103 streams: HashMap::new(),
104 }
105 }
106
107 /// Sets allowed_max_field_section_size Setting. Only one call is allowed,
108 /// and the max_field_section_size needs to be sent to the peer through the
109 /// Settings frame
local_allowed_max_field_section_size(&mut self, size: usize)110 pub fn local_allowed_max_field_section_size(&mut self, size: usize) {
111 self.qpack_decoder.set_max_field_section_size(size)
112 }
113
114 /// The Decoder sends the Stream Cancellation instruction to actively cancel
115 /// the stream.
cancel_stream(&mut self, stream_id: u64, buf: &mut [u8]) -> Result<usize, H3Error>116 pub fn cancel_stream(&mut self, stream_id: u64, buf: &mut [u8]) -> Result<usize, H3Error> {
117 self.streams.remove(&stream_id);
118 self.qpack_decoder
119 .stream_cancel(stream_id, buf)
120 .map_err(|e| DecodeError::QpackError(e).into())
121 }
122
123 /// Cleans the stream information when the stream normally ends.
finish_stream(&mut self, id: u64) -> Result<(), H3Error>124 pub fn finish_stream(&mut self, id: u64) -> Result<(), H3Error> {
125 if is_bidirectional(id) {
126 self.qpack_decoder
127 .finish_stream(id)
128 .map_err(|e| H3Error::Decode(e.into()))?;
129 }
130 self.streams.remove(&id);
131 Ok(())
132 }
133
134 /// Deserializes stream data into readable structured data,
135 /// including stream type, Frame, etc.
136 ///
137 /// # Examples
138 ///
139 /// ```
140 /// use ylong_http::h3::FrameDecoder;
141 ///
142 /// let mut decoder = FrameDecoder::new(100, 10240);
143 /// let data_frame_bytes = &[0, 5, b'h', b'e', b'l', b'l', b'o'];
144 /// let message = decoder.decode(0, data_frame_bytes).unwrap();
145 /// ```
decode(&mut self, id: u64, src: &[u8]) -> Result<StreamMessage, H3Error>146 pub fn decode(&mut self, id: u64, src: &[u8]) -> Result<StreamMessage, H3Error> {
147 let mut stream = if let Some(stream) = self.streams.remove(&id) {
148 stream
149 } else {
150 DecodedH3Stream::new(id)
151 };
152 stream.buffer.extend_from_slice(src);
153 let mut frames = Frames::new();
154 loop {
155 match stream.decode_state() {
156 DecodeState::StreamType => {
157 if let DecodePartRes::ReturnOuter = stream.decode_stream_type()? {
158 self.streams.insert(id, stream);
159 return Ok(StreamMessage::WaitingMore);
160 }
161 }
162 DecodeState::PushId => {
163 if let DecodePartRes::ReturnOuter = stream.decode_push_id()? {
164 self.streams.insert(id, stream);
165 return Ok(StreamMessage::WaitingMore);
166 }
167 }
168 // The StreamType branch ensures that only Request/Control/Push can go to the
169 // FrameType branch.
170 DecodeState::FrameType => {
171 if let DecodePartRes::ReturnOuter = stream.decode_frame_type(&mut frames)? {
172 let message = stream.return_by_type(frames)?;
173 self.streams.insert(id, stream);
174 return Ok(message);
175 }
176 }
177 DecodeState::PayloadLen => {
178 if let DecodePartRes::ReturnOuter = stream.decode_payload_len(&mut frames)? {
179 let message = stream.return_by_type(frames)?;
180 self.streams.insert(id, stream);
181 return Ok(message);
182 }
183 }
184 DecodeState::DataPayload => {
185 if let DecodePartRes::ReturnOuter = stream.decode_data_payload(&mut frames)? {
186 let message = stream.return_by_type(frames)?;
187 self.streams.insert(id, stream);
188 return Ok(message);
189 }
190 }
191 DecodeState::HeadersPayload => {
192 if let DecodePartRes::ReturnOuter =
193 stream.decode_headers_payload(&mut frames, &mut self.qpack_decoder, id)?
194 {
195 let message = stream.return_by_type(frames)?;
196 self.streams.insert(id, stream);
197 return Ok(message);
198 }
199 }
200 DecodeState::VariablePayload => {
201 if let DecodePartRes::ReturnOuter =
202 stream.decode_variable_payload(&mut frames)?
203 {
204 let message = stream.return_by_type(frames)?;
205 self.streams.insert(id, stream);
206 return Ok(message);
207 }
208 }
209 DecodeState::SettingsPayload => {
210 if let DecodePartRes::ReturnOuter =
211 stream.decode_settings_payload(&mut frames)?
212 {
213 let message = stream.return_by_type(frames)?;
214 self.streams.insert(id, stream);
215 return Ok(message);
216 }
217 }
218 DecodeState::PushPromisePayload => {
219 if let DecodePartRes::ReturnOuter = stream.decode_push_payload(&mut frames)? {
220 let message = stream.return_by_type(frames)?;
221 self.streams.insert(id, stream);
222 return Ok(message);
223 }
224 }
225 DecodeState::UnknownPayload => {
226 if let DecodePartRes::ReturnOuter =
227 stream.decode_unknown_payload(&mut frames)?
228 {
229 let message = stream.return_by_type(frames)?;
230 self.streams.insert(id, stream);
231 return Ok(message);
232 }
233 }
234 DecodeState::QpackDecoderInst => {
235 let reader = ReadableBytes::from(&stream.buffer.as_slice()[stream.offset..]);
236 let inst = Vec::from(reader.remaining());
237 stream.clear_buffer();
238 self.streams.insert(id, stream);
239 return Ok(StreamMessage::QpackDecoder(inst));
240 }
241 DecodeState::QpackEncoderInst => {
242 let reader = ReadableBytes::from(&stream.buffer.as_slice()[stream.offset..]);
243 let unblocked = self
244 .qpack_decoder
245 .decode_ins(reader.remaining())
246 .map_err(DecodeError::QpackError)?;
247 stream.clear_buffer();
248 self.streams.insert(id, stream);
249 return Ok(StreamMessage::QpackEncoder(unblocked));
250 }
251 DecodeState::DropUnknown => {
252 stream.clear_buffer();
253 return Ok(StreamMessage::Unknown);
254 }
255 }
256 }
257 }
258 }
259
260 impl StreamType {
is_request(&self) -> bool261 pub(crate) fn is_request(&self) -> bool {
262 *self == StreamType::Request
263 }
264
is_control(&self) -> bool265 pub(crate) fn is_control(&self) -> bool {
266 *self == StreamType::Control
267 }
268
is_push(&self) -> bool269 pub(crate) fn is_push(&self) -> bool {
270 *self == StreamType::Push
271 }
272 }
273
274 impl DecodedH3Stream {
new(id: u64) -> Self275 pub(crate) fn new(id: u64) -> Self {
276 const DECODED_BUFFER_SIZE: usize = 1024;
277 let (ty, state) = if is_bidirectional(id) {
278 (StreamType::Request, DecodeState::FrameType)
279 } else {
280 (StreamType::Init, DecodeState::StreamType)
281 };
282 Self {
283 ty,
284 state,
285 // TODO a property size.
286 buffer: Vec::with_capacity(DECODED_BUFFER_SIZE),
287 offset: 0,
288 push_id: None,
289 frame_type: None,
290 push_frame_id: None,
291 payload_len: None,
292 stream_set: false,
293 }
294 }
295
decode_state(&self) -> DecodeState296 pub(crate) fn decode_state(&self) -> DecodeState {
297 self.state
298 }
299
set_decode_state(&mut self, state: DecodeState)300 pub(crate) fn set_decode_state(&mut self, state: DecodeState) {
301 self.state = state
302 }
303
init_state_by_type(&mut self)304 fn init_state_by_type(&mut self) {
305 let next_state = match self.ty {
306 StreamType::Control => DecodeState::FrameType,
307 StreamType::Push => DecodeState::PushId,
308 StreamType::QpackEncoder => DecodeState::QpackEncoderInst,
309 StreamType::QpackDecoder => DecodeState::QpackDecoderInst,
310 StreamType::Unknown => DecodeState::DropUnknown,
311 _ => unreachable!(),
312 };
313 self.set_decode_state(next_state);
314 }
315
stream_type(&self) -> StreamType316 fn stream_type(&self) -> StreamType {
317 self.ty
318 }
319
curr_frame_type(&self) -> Option<u64>320 fn curr_frame_type(&self) -> Option<u64> {
321 self.frame_type
322 }
323
set_stream_type(&mut self, ty: StreamType)324 fn set_stream_type(&mut self, ty: StreamType) {
325 self.ty = ty
326 }
327
is_set(&self) -> bool328 fn is_set(&self) -> bool {
329 self.stream_set
330 }
331
remain_payload_len(&self) -> u64332 fn remain_payload_len(&self) -> u64 {
333 self.payload_len.unwrap_or(0)
334 }
335
subtract_payload_len(&mut self, off: usize) -> Result<(), H3Error>336 fn subtract_payload_len(&mut self, off: usize) -> Result<(), H3Error> {
337 match self.payload_len {
338 None => Err(CommonError::FieldMissing.into()),
339 Some(curr) => {
340 let (remain, overflow) = curr.overflowing_sub(off as u64);
341 if overflow {
342 Err(CommonError::CalculateOverflow.into())
343 } else {
344 self.payload_len = Some(remain);
345 Ok(())
346 }
347 }
348 }
349 }
350
push_id(&self) -> Result<u64, H3Error>351 fn push_id(&self) -> Result<u64, H3Error> {
352 self.push_id.ok_or(CommonError::FieldMissing.into())
353 }
354
clear_frame(&mut self)355 fn clear_frame(&mut self) {
356 self.frame_type = None;
357 self.payload_len = None;
358 }
359
clear_buffer(&mut self)360 fn clear_buffer(&mut self) {
361 self.buffer.clear();
362 self.offset = 0;
363 }
364
return_by_type(&self, frames: Frames) -> Result<StreamMessage, H3Error>365 fn return_by_type(&self, frames: Frames) -> Result<StreamMessage, H3Error> {
366 match self.stream_type() {
367 StreamType::Request => Ok(StreamMessage::Request(frames)),
368 StreamType::Push => {
369 let push_id = self.push_id()?;
370 Ok(StreamMessage::Push(push_id, frames))
371 }
372 StreamType::Control => Ok(StreamMessage::Control(frames)),
373 _ => {
374 // Note: unreachable
375 Err(UnexpectedFrame(self.frame_type.unwrap()).into())
376 }
377 }
378 }
379
decode_stream_type(&mut self) -> Result<DecodePartRes, H3Error>380 fn decode_stream_type(&mut self) -> Result<DecodePartRes, H3Error> {
381 let mut reader = ReadableBytes::from(&self.buffer.as_slice()[self.offset..]);
382 match reader.get_varint() {
383 Ok(integer) => {
384 self.offset += reader.index();
385 let ty = decode_type(integer);
386 self.set_stream_type(ty);
387 self.init_state_by_type();
388 Ok(DecodePartRes::Continue)
389 }
390 // Byte shortage
391 Err(_) => {
392 self.set_decode_state(DecodeState::StreamType);
393 Ok(DecodePartRes::ReturnOuter)
394 }
395 }
396 }
397
decode_push_id(&mut self) -> Result<DecodePartRes, H3Error>398 fn decode_push_id(&mut self) -> Result<DecodePartRes, H3Error> {
399 let mut reader = ReadableBytes::from(&self.buffer.as_slice()[self.offset..]);
400 match reader.get_varint() {
401 // TODO enable push
402 Ok(integer) => {
403 self.push_id = Some(integer);
404 self.offset += reader.index();
405 self.set_decode_state(DecodeState::FrameType);
406 Ok(DecodePartRes::Continue)
407 }
408 // Byte shortage
409 Err(_) => {
410 self.set_decode_state(DecodeState::PushId);
411 Ok(DecodePartRes::ReturnOuter)
412 }
413 }
414 }
415
decode_frame_type(&mut self, frames: &mut Frames) -> Result<DecodePartRes, H3Error>416 fn decode_frame_type(&mut self, frames: &mut Frames) -> Result<DecodePartRes, H3Error> {
417 let mut reader = ReadableBytes::from(&self.buffer.as_slice()[self.offset..]);
418 match reader.get_varint() {
419 Ok(integer) => {
420 match integer {
421 frame::DATA_FRAME_TYPE | frame::HEADERS_FRAME_TYPE => {
422 if !(self.stream_type().is_request() || self.stream_type().is_push()) {
423 return Err(DecodeError::UnexpectedFrame(integer).into());
424 }
425 }
426 frame::PUSH_PROMISE_FRAME_TYPE => {
427 if !self.stream_type().is_request() {
428 return Err(DecodeError::UnexpectedFrame(integer).into());
429 }
430 }
431 frame::SETTINGS_FRAME_TYPE => {
432 if !self.stream_type().is_control() || self.is_set() {
433 return Err(DecodeError::UnexpectedFrame(integer).into());
434 }
435 self.stream_set = true;
436 }
437 frame::CANCEL_PUSH_FRAME_TYPE => {
438 if !self.stream_type().is_control() {
439 return Err(DecodeError::UnexpectedFrame(integer).into());
440 }
441 }
442 frame::MAX_PUSH_ID_FRAME_TYPE => {
443 return Err(DecodeError::UnexpectedFrame(integer).into())
444 }
445 _ => {}
446 }
447 self.frame_type = Some(integer);
448 self.offset += reader.index();
449 self.set_decode_state(DecodeState::PayloadLen);
450 Ok(DecodePartRes::Continue)
451 }
452 // Byte shortage
453 Err(_) => {
454 self.set_decode_state(DecodeState::FrameType);
455 frames.push(FrameKind::Partial);
456 Ok(DecodePartRes::ReturnOuter)
457 }
458 }
459 }
460
decode_payload_len(&mut self, frames: &mut Frames) -> Result<DecodePartRes, H3Error>461 fn decode_payload_len(&mut self, frames: &mut Frames) -> Result<DecodePartRes, H3Error> {
462 let mut reader = ReadableBytes::from(&self.buffer.as_slice()[self.offset..]);
463 match reader.get_varint() {
464 Ok(integer) => {
465 self.payload_len = Some(integer);
466 self.offset += reader.index();
467 match self.curr_frame_type() {
468 None => {
469 unreachable!()
470 }
471 Some(DATA_FRAME_TYPE) => {
472 self.set_decode_state(DecodeState::DataPayload);
473 }
474 Some(HEADERS_FRAME_TYPE) => {
475 self.set_decode_state(DecodeState::HeadersPayload);
476 }
477 Some(SETTINGS_FRAME_TYPE) => {
478 self.set_decode_state(DecodeState::SettingsPayload);
479 }
480 Some(PUSH_PROMISE_FRAME_TYPE) => {
481 self.set_decode_state(DecodeState::PushPromisePayload);
482 }
483 Some(frame::GOAWAY_FRAME_TYPE) => {
484 self.set_decode_state(DecodeState::VariablePayload);
485 }
486 Some(frame::MAX_PUSH_ID_FRAME_TYPE) => {
487 self.set_decode_state(DecodeState::VariablePayload);
488 }
489 Some(frame::CANCEL_PUSH_FRAME_TYPE) => {
490 self.set_decode_state(DecodeState::VariablePayload);
491 }
492 _ => {
493 self.set_decode_state(DecodeState::UnknownPayload);
494 }
495 }
496 }
497 // Byte shortage
498 Err(_) => {
499 frames.push(FrameKind::Partial);
500 return Ok(DecodePartRes::ReturnOuter);
501 }
502 }
503 Ok(DecodePartRes::Continue)
504 }
505
decode_data_payload(&mut self, frames: &mut Frames) -> Result<DecodePartRes, H3Error>506 fn decode_data_payload(&mut self, frames: &mut Frames) -> Result<DecodePartRes, H3Error> {
507 let mut reader = ReadableBytes::from(&self.buffer.as_slice()[self.offset..]);
508
509 // Note: None is impossible for `stream.payload_len`.
510 let payload_len = self.remain_payload_len() as usize;
511 if reader.cap() < payload_len {
512 let frame = Frame::new(
513 DATA_FRAME_TYPE,
514 Payload::Data(Data::new(Vec::from(reader.remaining()))),
515 );
516 self.subtract_payload_len(reader.cap())?;
517 self.clear_buffer();
518 frames.push(FrameKind::Complete(Box::from(frame)));
519 frames.push(FrameKind::Partial);
520 Ok(DecodePartRes::ReturnOuter)
521 } else {
522 let frame = Frame::new(
523 DATA_FRAME_TYPE,
524 Payload::Data(Data::new(Vec::from(reader.slice(payload_len)?))),
525 );
526 frames.push(FrameKind::Complete(Box::from(frame)));
527 self.offset += payload_len;
528 let remaining = reader.cap();
529 self.clear_frame();
530 self.set_decode_state(DecodeState::FrameType);
531 if remaining == 0 {
532 self.clear_buffer();
533 Ok(DecodePartRes::ReturnOuter)
534 } else {
535 Ok(DecodePartRes::Continue)
536 }
537 }
538 }
539
get_qpack_decoded_header( &mut self, frames: &mut Frames, qpack_decoder: &mut QpackDecoder, id: u64, remaining: usize, ) -> Result<DecodePartRes, H3Error>540 fn get_qpack_decoded_header(
541 &mut self,
542 frames: &mut Frames,
543 qpack_decoder: &mut QpackDecoder,
544 id: u64,
545 remaining: usize,
546 ) -> Result<DecodePartRes, H3Error> {
547 let mut ins_buf = Vec::new();
548 // TODO id can be u64.
549 let (part, len) = qpack_decoder
550 .finish(id, &mut ins_buf)
551 .map_err(DecodeError::QpackError)?;
552 let frame = match self.curr_frame_type() {
553 Some(HEADERS_FRAME_TYPE) => {
554 let mut headers_payload = Headers::new(part);
555 if len.is_some() {
556 headers_payload.set_instruction(ins_buf);
557 };
558 Frame::new(HEADERS_FRAME_TYPE, Payload::Headers(headers_payload))
559 }
560 Some(PUSH_PROMISE_FRAME_TYPE) => {
561 let mut push_promise =
562 PushPromise::new(self.push_frame_id.ok_or(FieldMissing)?, part);
563 if len.is_some() {
564 push_promise.set_instruction(ins_buf)
565 };
566 Frame::new(PUSH_PROMISE_FRAME_TYPE, Payload::PushPromise(push_promise))
567 }
568 _ => unreachable!(),
569 };
570 frames.push(FrameKind::Complete(Box::from(frame)));
571 self.clear_frame();
572 self.set_decode_state(DecodeState::FrameType);
573 if remaining == 0 {
574 self.clear_buffer();
575 Ok(DecodePartRes::ReturnOuter)
576 } else {
577 Ok(DecodePartRes::Continue)
578 }
579 }
580
decode_headers_payload( &mut self, frames: &mut Frames, qpack_decoder: &mut QpackDecoder, id: u64, ) -> Result<DecodePartRes, H3Error>581 fn decode_headers_payload(
582 &mut self,
583 frames: &mut Frames,
584 qpack_decoder: &mut QpackDecoder,
585 id: u64,
586 ) -> Result<DecodePartRes, H3Error> {
587 let mut reader = ReadableBytes::from(&self.buffer.as_slice()[self.offset..]);
588 let payload_len = self.remain_payload_len() as usize;
589 if reader.cap() < payload_len {
590 if let FieldDecodeState::Blocked = qpack_decoder
591 .decode_repr(reader.remaining(), id)
592 .map_err(DecodeError::QpackError)?
593 {
594 frames.push(FrameKind::Blocked);
595 } else {
596 frames.push(FrameKind::Partial);
597 }
598 self.subtract_payload_len(reader.cap())?;
599 self.clear_buffer();
600 Ok(DecodePartRes::ReturnOuter)
601 } else {
602 match qpack_decoder
603 .decode_repr(reader.slice(payload_len)?, id)
604 .map_err(DecodeError::QpackError)?
605 {
606 FieldDecodeState::Blocked => {
607 frames.push(FrameKind::Blocked);
608 self.subtract_payload_len(payload_len)?;
609 self.offset += payload_len;
610 Ok(DecodePartRes::ReturnOuter)
611 }
612 FieldDecodeState::Decoded => {
613 self.offset += payload_len;
614 self.get_qpack_decoded_header(frames, qpack_decoder, id, reader.cap())
615 }
616 }
617 }
618 }
619
decode_variable_payload(&mut self, frames: &mut Frames) -> Result<DecodePartRes, H3Error>620 fn decode_variable_payload(&mut self, frames: &mut Frames) -> Result<DecodePartRes, H3Error> {
621 let mut reader = ReadableBytes::from(&self.buffer.as_slice()[self.offset..]);
622 match reader.get_varint() {
623 Ok(id) => {
624 match self.frame_type {
625 Some(frame::GOAWAY_FRAME_TYPE) => {
626 let frame =
627 Frame::new(frame::GOAWAY_FRAME_TYPE, Payload::Goaway(GoAway::new(id)));
628 frames.push(FrameKind::Complete(Box::from(frame)));
629 }
630 Some(frame::MAX_PUSH_ID_FRAME_TYPE) => {
631 let frame = Frame::new(
632 frame::MAX_PUSH_ID_FRAME_TYPE,
633 Payload::MaxPushId(MaxPushId::new(id)),
634 );
635 frames.push(FrameKind::Complete(Box::from(frame)));
636 }
637 Some(frame::CANCEL_PUSH_FRAME_TYPE) => {
638 let frame = Frame::new(
639 frame::CANCEL_PUSH_FRAME_TYPE,
640 Payload::CancelPush(CancelPush::new(id)),
641 );
642 frames.push(FrameKind::Complete(Box::from(frame)));
643 }
644 _ => return Err(DecodeError::UnexpectedFrame(self.frame_type.unwrap()).into()),
645 }
646 self.offset += reader.index();
647 let remaining = reader.cap();
648 self.clear_frame();
649 self.set_decode_state(DecodeState::FrameType);
650 if remaining == 0 {
651 self.clear_buffer();
652 Ok(DecodePartRes::ReturnOuter)
653 } else {
654 Ok(DecodePartRes::Continue)
655 }
656 }
657 Err(_) => {
658 frames.push(FrameKind::Partial);
659 Ok(DecodePartRes::ReturnOuter)
660 }
661 }
662 }
663
decode_settings_payload(&mut self, frames: &mut Frames) -> Result<DecodePartRes, H3Error>664 fn decode_settings_payload(&mut self, frames: &mut Frames) -> Result<DecodePartRes, H3Error> {
665 let mut reader = ReadableBytes::from(&self.buffer.as_slice()[self.offset..]);
666 let payload_len = self.remain_payload_len();
667 if (reader.cap() as u64) < payload_len {
668 frames.push(FrameKind::Partial);
669 return Ok(DecodePartRes::ReturnOuter);
670 }
671 let mut settings = Settings::default();
672
673 let mut addition = Vec::new();
674
675 while (reader.index() as u64) < payload_len {
676 let key = match reader.get_varint() {
677 Ok(id) => id,
678 Err(_) => return Err(FrameSizeError(payload_len).into()),
679 };
680 let value = match reader.get_varint() {
681 Ok(val) => val,
682 Err(_) => return Err(FrameSizeError(payload_len).into()),
683 };
684
685 match key {
686 frame::SETTING_QPACK_MAX_TABLE_CAPACITY => {
687 settings.set_qpack_max_table_capacity(value);
688 }
689 frame::SETTING_ENABLE_CONNECT_PROTOCOL => {
690 settings.set_connect_protocol_enabled(value)
691 }
692 frame::SETTING_H3_DATAGRAM => settings.set_h3_datagram(value),
693 frame::SETTING_MAX_FIELD_SECTION_SIZE => settings.set_max_field_section_size(value),
694 frame::SETTING_QPACK_BLOCKED_STREAMS => settings.set_qpack_block_stream(value),
695 0x0 | 0x2 | 0x3 | 0x4 | 0x5 => return Err(UnsupportedSetting(key).into()),
696 _ => addition.push((key, value)),
697 }
698 }
699
700 if !addition.is_empty() {
701 settings.set_additional(addition);
702 }
703 let frame = Frame::new(SETTINGS_FRAME_TYPE, Payload::Settings(settings));
704 frames.push(FrameKind::Complete(Box::from(frame)));
705 let remaining = reader.cap();
706 self.clear_frame();
707 self.set_decode_state(DecodeState::FrameType);
708 if remaining == 0 {
709 self.clear_buffer();
710 Ok(DecodePartRes::ReturnOuter)
711 } else {
712 Ok(DecodePartRes::Continue)
713 }
714 }
715
decode_push_payload(&mut self, frames: &mut Frames) -> Result<DecodePartRes, H3Error>716 fn decode_push_payload(&mut self, frames: &mut Frames) -> Result<DecodePartRes, H3Error> {
717 let mut reader = ReadableBytes::from(&self.buffer.as_slice()[self.offset..]);
718 match reader.get_varint() {
719 Ok(id) => {
720 self.push_frame_id = Some(id);
721 self.subtract_payload_len(reader.index())?;
722 self.set_decode_state(DecodeState::HeadersPayload);
723 Ok(DecodePartRes::Continue)
724 }
725 Err(_) => {
726 frames.push(FrameKind::Partial);
727 Ok(DecodePartRes::ReturnOuter)
728 }
729 }
730 }
731
decode_unknown_payload(&mut self, frames: &mut Frames) -> Result<DecodePartRes, H3Error>732 fn decode_unknown_payload(&mut self, frames: &mut Frames) -> Result<DecodePartRes, H3Error> {
733 let mut reader = ReadableBytes::from(&self.buffer.as_slice()[self.offset..]);
734 // Note: None is impossible for `stream.payload_len`.
735 let payload_len = self.remain_payload_len() as usize;
736 if reader.cap() < payload_len {
737 let remaining = reader.cap();
738 self.clear_buffer();
739 self.subtract_payload_len(remaining)?;
740 frames.push(FrameKind::Partial);
741 Ok(DecodePartRes::ReturnOuter)
742 } else {
743 reader.slice(payload_len)?;
744 // Reader will renew by stream.offset, so don't need to reset.
745 self.offset += payload_len;
746 let remaining = reader.cap();
747 self.clear_frame();
748 self.set_decode_state(DecodeState::FrameType);
749 if remaining == 0 {
750 self.clear_buffer();
751 Ok(DecodePartRes::ReturnOuter)
752 } else {
753 Ok(DecodePartRes::Continue)
754 }
755 }
756 }
757 }
758
decode_type(integer: u64) -> StreamType759 fn decode_type(integer: u64) -> StreamType {
760 match integer {
761 0x0 => StreamType::Control,
762 0x1 => StreamType::Push,
763 0x2 => StreamType::QpackEncoder,
764 0x3 => StreamType::QpackDecoder,
765 _ => StreamType::Unknown,
766 }
767 }
768
#[cfg(test)]
mod h3_decoder {
    use crate::h3::{FrameDecoder, FrameKind, Payload, StreamMessage};

    /// UT test cases for `FrameDecoder` decoding `Settings` frame.
    ///
    /// # Brief
    /// 1. Creates a `FrameDecoder`.
    /// 2. Creates a buf with encoded Settings frame bytes.
    /// 3. Decode the bytes with unidirectional stream id.
    /// 4. Checkout if the stream type is correct.
    /// 5. Checkout if the frame type is correct.
    #[test]
    fn ut_decode_stream_control_with_setting() {
        let mut decoder = FrameDecoder::new(100, 500);
        let buf: [u8; 24] = [
            0x0, 0x4, 0x15, 0x6, 0x80, 0x1, 0x0, 0x0, 0xF5, 0xEF, 0x9, 0x12, 0x8C, 0xC, 0x8E, 0x72,
            0xDD, 0xB5, 0xB5, 0x15, 0xCD, 0x85, 0x44, 0xF8,
        ];
        let frames = match decoder.decode(3, &buf).unwrap() {
            StreamMessage::Control(frames) => frames,
            _ => panic!("expected a control stream message"),
        };
        assert_eq!(frames.len(), 1);
        let frame = match frames.iter().next() {
            Some(FrameKind::Complete(frame)) => frame,
            _ => panic!("expected a complete frame"),
        };
        assert_eq!(*frame.frame_type(), 0x4);
        match frame.payload() {
            Payload::Settings(_setting) => {}
            _ => panic!("expected a settings payload"),
        }
    }

    /// UT test cases for `FrameDecoder` decoding `Headers` frame.
    ///
    /// # Brief
    /// 1. Creates a `FrameDecoder`.
    /// 2. Creates a buf with encoded Headers frame bytes.
    /// 3. Decode the bytes with bidirectional stream id.
    /// 4. Checkout if the stream type is correct.
    /// 5. Checkout if the frame type is correct.
    #[test]
    fn ut_decode_stream_request_with_header() {
        let mut decoder = FrameDecoder::new(100, 500);
        let buf: [u8; 117] = [
            0x1, 0x40, 0x72, 0x0, 0x0, 0xD9, 0x56, 0x96, 0xC3, 0x61, 0xBE, 0x94, 0xB, 0xCA, 0x6A,
            0x22, 0x54, 0x10, 0x4, 0xD2, 0x80, 0x15, 0xC6, 0x99, 0xB8, 0x27, 0x54, 0xC5, 0xA3,
            0x7F, 0x5F, 0x1D, 0x87, 0x49, 0x7C, 0xA5, 0x89, 0xD3, 0x4D, 0x1F, 0x54, 0x85, 0x8,
            0x9B, 0x7D, 0xB7, 0xFF, 0x5F, 0x4D, 0x87, 0x25, 0x7, 0xB6, 0x49, 0x68, 0x1D, 0x85,
            0x2D, 0x24, 0xAB, 0x58, 0x3F, 0x5F, 0x8F, 0x7A, 0x46, 0x9B, 0x11, 0x5B, 0x64, 0x92,
            0x46, 0xF1, 0x23, 0x7C, 0x8B, 0x67, 0x87, 0xF3, 0x5F, 0x44, 0x90, 0x9D, 0x98, 0x3F,
            0x9B, 0x8D, 0x34, 0xCF, 0xF3, 0xF6, 0xA5, 0x23, 0x81, 0xE7, 0x1A, 0x0, 0x3F, 0x2F, 0x3,
            0x41, 0x6C, 0xEE, 0x5B, 0x16, 0x49, 0xA9, 0x35, 0x53, 0x7F, 0x86, 0x24, 0xB8, 0x3C,
            0xA7, 0x5D, 0x86,
        ];
        let frames = match decoder.decode(0, &buf).unwrap() {
            StreamMessage::Request(frames) => frames,
            _ => panic!("expected a request stream message"),
        };
        assert_eq!(frames.len(), 1);
        let frame = match frames.iter().next() {
            Some(FrameKind::Complete(frame)) => frame,
            _ => panic!("expected a complete frame"),
        };
        assert_eq!(*frame.frame_type(), 0x1);
        let header = match frame.payload() {
            Payload::Headers(header) => header,
            _ => panic!("expected a headers payload"),
        };
        assert!(header.get_instruction().is_none());
        let part = header.get_part();
        let (pseudo, _headers) = part.parts();
        assert_eq!(pseudo.status(), Some("200"));
    }

    /// UT test cases for `FrameDecoder` decoding `Unknown` frame.
    ///
    /// # Brief
    /// 1. Creates a `FrameDecoder`.
    /// 2. Creates a buf with encoded unknown type frame bytes.
    /// 3. Decode the bytes with bidirectional stream id.
    /// 4. Checkout if the stream type is correct.
    /// 5. Checkout that no frame is produced for the unknown frame.
    #[test]
    fn ut_decode_stream_request_with_unknown() {
        let mut decoder = FrameDecoder::new(100, 500);
        let buf: [u8; 9] = [0xDF, 0x6B, 0xED, 0xB9, 0x11, 0x75, 0x93, 0x91, 0x0];
        match decoder.decode(0, &buf).unwrap() {
            StreamMessage::Request(frames) => assert_eq!(frames.len(), 0),
            _ => panic!("expected a request stream message"),
        }
    }
}
866