• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! Streams operations utils.
15 
16 use std::cmp::{min, Ordering};
17 use std::collections::{HashMap, HashSet, VecDeque};
18 use std::task::{Context, Poll};
19 
20 use ylong_http::h2::{Data, ErrorCode, Frame, FrameFlags, H2Error, Payload, StreamId};
21 
22 use crate::runtime::UnboundedSender;
23 use crate::util::data_ref::BodyDataRef;
24 use crate::util::dispatcher::http2::DispatchErrorKind;
25 use crate::util::h2::buffer::{FlowControl, RecvWindow, SendWindow};
26 
/// Initial value of `Streams::max_send_id`: `u32::MAX >> 1`, the largest 31-bit value.
pub(crate) const INITIAL_MAX_SEND_STREAM_ID: StreamId = u32::MAX >> 1;
/// Initial value of `Streams::max_recv_id`: `u32::MAX >> 1`, the largest 31-bit value.
pub(crate) const INITIAL_MAX_RECV_STREAM_ID: StreamId = u32::MAX >> 1;

/// Bound that `Streams::generate_id` compares `next_stream_id` against before
/// handing out a new id.
const DEFAULT_MAX_STREAM_ID: StreamId = u32::MAX >> 1;
/// Initial value of `Streams::latest_remote_id`.
const INITIAL_LATEST_REMOTE_ID: StreamId = 0;
/// Concurrency limit a fresh `Streams` starts with, until
/// `Streams::apply_max_concurrent_streams` replaces it.
const DEFAULT_MAX_CONCURRENT_STREAMS: u32 = 100;
33 
/// Outcome of applying a HEADERS or DATA frame to a stream's state machine
/// (returned by `send_headers_frame`, `send_data_frame`, `recv_headers` and
/// `recv_data`).
#[cfg_attr(test, derive(Debug, PartialEq))]
pub(crate) enum FrameRecvState {
    /// The frame is legal in the stream's current state; any state transition
    /// has been applied.
    OK,
    /// The frame must be dropped without raising an error (the id is above
    /// `max_recv_id`, or the stream was closed locally by reset or go-away).
    Ignore,
    /// The frame is not allowed in the current state; carries the error to report.
    Err(H2Error),
}
40 
/// Result of trying to read the next chunk of a request body for sending.
pub(crate) enum DataReadState {
    /// The stream is already closed; there is nothing to send.
    Closed,
    // Wait for poll_read or wait for window.
    Pending,
    /// A DATA frame carrying body bytes; more data may follow.
    Ready(Frame),
    /// The body is exhausted: an empty DATA frame with the end-of-stream flag set.
    Finish(Frame),
}
/// Outcome of resetting a stream (returned by `send_local_reset` and
/// `recv_remote_reset`).
#[cfg_attr(test, derive(Debug, PartialEq))]
pub(crate) enum StreamEndState {
    /// The stream was still active and has now been moved to a closed state.
    OK,
    /// Nothing to do: the stream was already closed, or (for a remote reset)
    /// its id is above `max_recv_id`.
    Ignore,
    /// The reset could not be applied; carries the error to report.
    Err(H2Error),
}
54 
55 //                              +--------+
56 //                      send PP |        | recv PP
57 //                     ,--------|  idle  |--------.
58 //                    /         |        |         \
59 //                   v          +--------+          v
60 //            +----------+          |           +----------+
61 //            |          |          | send H /  |          |
62 //     ,------| reserved |          | recv H    | reserved |------.
63 //     |      | (local)  |          |           | (remote) |      |
64 //     |      +----------+          v           +----------+      |
65 //     |          |             +--------+             |          |
66 //     |          |     recv ES |        | send ES     |          |
67 //     |   send H |     ,-------|  open  |-------.     | recv H   |
68 //     |          |    /        |        |        \    |          |
69 //     |          v   v         +--------+         v   v          |
70 //     |      +----------+          |           +----------+      |
71 //     |      |   half   |          |           |   half   |      |
72 //     |      |  closed  |          | send R /  |  closed  |      |
73 //     |      | (remote) |          | recv R    | (local)  |      |
74 //     |      +----------+          |           +----------+      |
75 //     |           |                |                 |           |
76 //     |           | send ES /      |       recv ES / |           |
77 //     |           | send R /       v        send R / |           |
78 //     |           | recv R     +--------+   recv R   |           |
79 //     | send R /  `----------->|        |<-----------'  send R / |
80 //     | recv R                 | closed |               recv R   |
81 //     `----------------------->|        |<----------------------'
82 //                              +--------+
/// Client-side lifecycle state of a single HTTP/2 stream (see the state
/// diagram above).
#[derive(Copy, Clone, Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub(crate) enum H2StreamState {
    /// Freshly inserted stream; no HEADERS have been sent or received yet.
    Idle,
    // When response does not depend on request,
    // the server can send response directly without waiting for the request to finish receiving.
    // Therefore, the sending and receiving states of the client have their own states
    Open {
        /// Progress of the request (sending) direction.
        send: ActiveState,
        /// Progress of the response (receiving) direction.
        recv: ActiveState,
    },
    /// Reserved by the remote peer; never constructed in the code shown here,
    /// hence the `dead_code` allowance.
    #[allow(dead_code)]
    ReservedRemote,
    // After the request is sent, the state is waiting for the response to be received.
    LocalHalfClosed(ActiveState),
    // When the response is received but the request is not fully sent,
    // this indicates the status of the request being sent
    RemoteHalfClosed(ActiveState),
    /// The stream is finished; the payload records why it was closed.
    Closed(CloseReason),
}
103 
/// Why a stream ended up in `H2StreamState::Closed`.
#[derive(Copy, Clone, Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub(crate) enum CloseReason {
    /// Closed by a reset issued locally (`Streams::send_local_reset`).
    LocalRst,
    /// Closed by a reset received from the peer (`Streams::recv_remote_reset`).
    RemoteRst,
    /// Closed because its id is above the `last_stream_id` passed to
    /// `Streams::get_unset_streams`.
    RemoteGoAway,
    /// Closed by `Streams::get_all_unclosed_streams`.
    LocalGoAway,
    /// Closed normally: both directions have seen end-of-stream.
    EndStream,
}
113 
/// Progress of one direction (send or receive) of a stream that is still active.
#[derive(Copy, Clone, Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub(crate) enum ActiveState {
    /// HEADERS have not yet been sent/received in this direction.
    WaitHeaders,
    /// HEADERS are done; DATA frames are expected in this direction.
    WaitData,
}
120 
/// Per-stream bookkeeping kept in `Streams::stream_map`.
pub(crate) struct Stream {
    /// Flow-control window for data received on this stream.
    pub(crate) recv_window: RecvWindow,
    /// Flow-control window for data sent on this stream.
    pub(crate) send_window: SendWindow,
    /// Current lifecycle state of the stream.
    pub(crate) state: H2StreamState,
    /// Pending request HEADERS frame; taken by `Streams::headers` and dropped
    /// when the stream is closed early.
    pub(crate) header: Option<Frame>,
    /// Request body source polled when building DATA frames.
    pub(crate) data: BodyDataRef,
}
128 
/// Parts of a request handed over for sending.
///
/// NOTE(review): not used in the code visible here; presumably `flag` and
/// `payload` describe the HEADERS frame and `data` is the request body —
/// confirm against the call sites.
pub(crate) struct RequestWrapper {
    pub(crate) flag: FrameFlags,
    pub(crate) payload: Payload,
    pub(crate) data: BodyDataRef,
}
134 
/// Table of all streams of one connection plus the queues used to schedule
/// their sending under concurrency and flow-control limits.
pub(crate) struct Streams {
    // Records the received goaway last_stream_id.
    pub(crate) max_send_id: StreamId,
    // Records the send goaway last_stream_id.
    pub(crate) max_recv_id: StreamId,
    // Currently the client doesn't support push promise, so this value is always 0.
    pub(crate) latest_remote_id: StreamId,
    /// Initial receive-window size given to newly inserted streams.
    pub(crate) stream_recv_window_size: u32,
    /// Initial send-window size given to newly inserted streams.
    pub(crate) stream_send_window_size: u32,
    /// Upper bound for `current_concurrent_streams`.
    max_concurrent_streams: u32,
    /// Number of streams currently counted against the concurrency limit.
    current_concurrent_streams: u32,
    /// Connection-level send and receive windows.
    flow_control: FlowControl,
    /// Streams waiting for a free concurrency slot.
    pending_concurrency: VecDeque<StreamId>,
    /// Streams whose own send window was exhausted when last polled.
    pending_stream_window: HashSet<u32>,
    /// Streams that found the connection send window exhausted when last polled.
    pending_conn_window: VecDeque<u32>,
    /// Streams queued to be polled for sending.
    pending_send: VecDeque<StreamId>,
    // NOTE(review): only cleared in the code visible here; confirm where it is filled.
    window_updating_streams: VecDeque<StreamId>,
    /// All known streams keyed by stream id.
    pub(crate) stream_map: HashMap<StreamId, Stream>,
    /// Id that `generate_id` will hand out next; starts at 1 and advances by 2.
    pub(crate) next_stream_id: StreamId,
}
155 
// Assigns to `$state` the stream state that follows the reception of HEADERS.
// `$eos` tells whether the frame carried the end-of-stream flag; the leading
// keyword selects the state the stream was in before the frame arrived.
macro_rules! change_stream_state {
    // From `Idle`: the remote side is done if `$eos`, otherwise the stream opens
    // with the receive direction expecting data.
    (Idle: $eos: expr, $state: expr) => {
        $state = if $eos {
            H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders)
        } else {
            H2StreamState::Open {
                send: ActiveState::WaitHeaders,
                recv: ActiveState::WaitData,
            }
        };
    };
    // From `Open`: keeps the current send progress (`$send`) and only advances
    // the receive direction.
    (Open: $eos: expr, $state: expr, $send: expr) => {
        $state = if $eos {
            H2StreamState::RemoteHalfClosed($send.clone())
        } else {
            H2StreamState::Open {
                send: $send.clone(),
                recv: ActiveState::WaitData,
            }
        };
    };
    // From a state where the local side has nothing left to send: end-of-stream
    // closes the stream, otherwise response data is awaited.
    (HalfClosed: $eos: expr, $state: expr) => {
        $state = if $eos {
            H2StreamState::Closed(CloseReason::EndStream)
        } else {
            H2StreamState::LocalHalfClosed(ActiveState::WaitData)
        };
    };
}
185 
186 impl Streams {
new( recv_window_size: u32, send_window_size: u32, flow_control: FlowControl, ) -> Self187     pub(crate) fn new(
188         recv_window_size: u32,
189         send_window_size: u32,
190         flow_control: FlowControl,
191     ) -> Self {
192         Self {
193             max_send_id: INITIAL_MAX_SEND_STREAM_ID,
194             max_recv_id: INITIAL_MAX_RECV_STREAM_ID,
195             latest_remote_id: INITIAL_LATEST_REMOTE_ID,
196             max_concurrent_streams: DEFAULT_MAX_CONCURRENT_STREAMS,
197             current_concurrent_streams: 0,
198             stream_recv_window_size: recv_window_size,
199             stream_send_window_size: send_window_size,
200             flow_control,
201             pending_concurrency: VecDeque::new(),
202             pending_stream_window: HashSet::new(),
203             pending_conn_window: VecDeque::new(),
204             pending_send: VecDeque::new(),
205             window_updating_streams: VecDeque::new(),
206             stream_map: HashMap::new(),
207             next_stream_id: 1,
208         }
209     }
210 
decrease_current_concurrency(&mut self)211     pub(crate) fn decrease_current_concurrency(&mut self) {
212         self.current_concurrent_streams -= 1;
213     }
214 
increase_current_concurrency(&mut self)215     pub(crate) fn increase_current_concurrency(&mut self) {
216         self.current_concurrent_streams += 1;
217     }
218 
reach_max_concurrency(&mut self) -> bool219     pub(crate) fn reach_max_concurrency(&mut self) -> bool {
220         self.current_concurrent_streams >= self.max_concurrent_streams
221     }
222 
apply_max_concurrent_streams(&mut self, num: u32)223     pub(crate) fn apply_max_concurrent_streams(&mut self, num: u32) {
224         self.max_concurrent_streams = num;
225     }
226 
apply_send_initial_window_size(&mut self, size: u32) -> Result<(), H2Error>227     pub(crate) fn apply_send_initial_window_size(&mut self, size: u32) -> Result<(), H2Error> {
228         let current = self.stream_send_window_size;
229         self.stream_send_window_size = size;
230 
231         match current.cmp(&size) {
232             Ordering::Less => {
233                 let excess = size - current;
234                 for (_id, stream) in self.stream_map.iter_mut() {
235                     stream.send_window.increase_size(excess)?;
236                 }
237                 for id in self.pending_stream_window.iter() {
238                     self.pending_send.push_back(*id);
239                 }
240                 self.pending_stream_window.clear();
241             }
242             Ordering::Greater => {
243                 let excess = current - size;
244                 for (_id, stream) in self.stream_map.iter_mut() {
245                     stream.send_window.reduce_size(excess);
246                 }
247             }
248             Ordering::Equal => {}
249         }
250         Ok(())
251     }
252 
apply_recv_initial_window_size(&mut self, size: u32)253     pub(crate) fn apply_recv_initial_window_size(&mut self, size: u32) {
254         let current = self.stream_recv_window_size;
255         self.stream_recv_window_size = size;
256         match current.cmp(&size) {
257             Ordering::Less => {
258                 for (_id, stream) in self.stream_map.iter_mut() {
259                     let extra = size - current;
260                     stream.recv_window.increase_notification(extra);
261                     stream.recv_window.increase_actual(extra);
262                 }
263             }
264             Ordering::Greater => {
265                 for (_id, stream) in self.stream_map.iter_mut() {
266                     stream.recv_window.reduce_notification(current - size);
267                 }
268             }
269             Ordering::Equal => {}
270         }
271     }
272 
release_stream_recv_window( &mut self, id: StreamId, size: u32, sender: &UnboundedSender<Frame>, ) -> Result<(), DispatchErrorKind>273     pub(crate) fn release_stream_recv_window(
274         &mut self,
275         id: StreamId,
276         size: u32,
277         sender: &UnboundedSender<Frame>,
278     ) -> Result<(), DispatchErrorKind> {
279         if let Some(stream) = self.stream_map.get_mut(&id) {
280             if stream.recv_window.notification_available() < size {
281                 return Err(H2Error::StreamError(id, ErrorCode::FlowControlError).into());
282             }
283             stream.recv_window.recv_data(size);
284             // determine whether it is necessary to update the stream window
285             if stream.recv_window.unreleased_size().is_some() {
286                 if !stream.is_init_or_active_flow_control() {
287                     return Ok(());
288                 }
289                 if let Some(window_update) = stream.recv_window.check_window_update(id) {
290                     sender
291                         .send(window_update)
292                         .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
293                 }
294             }
295         }
296         Ok(())
297     }
298 
release_conn_recv_window( &mut self, size: u32, sender: &UnboundedSender<Frame>, ) -> Result<(), DispatchErrorKind>299     pub(crate) fn release_conn_recv_window(
300         &mut self,
301         size: u32,
302         sender: &UnboundedSender<Frame>,
303     ) -> Result<(), DispatchErrorKind> {
304         if self.flow_control.recv_notification_size_available() < size {
305             return Err(H2Error::ConnectionError(ErrorCode::FlowControlError).into());
306         }
307         self.flow_control.recv_data(size);
308         // determine whether it is necessary to update the connection window
309         if let Some(window_update) = self.flow_control.check_conn_recv_window_update() {
310             sender
311                 .send(window_update)
312                 .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
313         }
314         Ok(())
315     }
316 
is_closed(&self) -> bool317     pub(crate) fn is_closed(&self) -> bool {
318         for (_id, stream) in self.stream_map.iter() {
319             match stream.state {
320                 H2StreamState::Closed(_) => {}
321                 _ => {
322                     return false;
323                 }
324             }
325         }
326         true
327     }
328 
stream_state(&self, id: StreamId) -> Option<H2StreamState>329     pub(crate) fn stream_state(&self, id: StreamId) -> Option<H2StreamState> {
330         self.stream_map.get(&id).map(|stream| stream.state)
331     }
332 
insert(&mut self, id: StreamId, headers: Frame, data: BodyDataRef)333     pub(crate) fn insert(&mut self, id: StreamId, headers: Frame, data: BodyDataRef) {
334         let send_window = SendWindow::new(self.stream_send_window_size as i32);
335         let recv_window = RecvWindow::new(self.stream_recv_window_size as i32);
336         let stream = Stream::new(recv_window, send_window, headers, data);
337         self.stream_map.insert(id, stream);
338     }
339 
push_back_pending_send(&mut self, id: StreamId)340     pub(crate) fn push_back_pending_send(&mut self, id: StreamId) {
341         self.pending_send.push_back(id);
342     }
343 
push_pending_concurrency(&mut self, id: StreamId)344     pub(crate) fn push_pending_concurrency(&mut self, id: StreamId) {
345         self.pending_concurrency.push_back(id);
346     }
347 
is_pending_concurrency_empty(&self) -> bool348     pub(crate) fn is_pending_concurrency_empty(&self) -> bool {
349         self.pending_concurrency.is_empty()
350     }
351 
next_pending_stream(&mut self) -> Option<StreamId>352     pub(crate) fn next_pending_stream(&mut self) -> Option<StreamId> {
353         self.pending_send.pop_front()
354     }
355 
pending_stream_num(&self) -> usize356     pub(crate) fn pending_stream_num(&self) -> usize {
357         self.pending_send.len()
358     }
359 
try_consume_pending_concurrency(&mut self)360     pub(crate) fn try_consume_pending_concurrency(&mut self) {
361         while !self.reach_max_concurrency() {
362             match self.pending_concurrency.pop_front() {
363                 None => {
364                     return;
365                 }
366                 Some(id) => {
367                     self.increase_current_concurrency();
368                     self.push_back_pending_send(id);
369                 }
370             }
371         }
372     }
373 
increase_conn_send_window(&mut self, size: u32) -> Result<(), H2Error>374     pub(crate) fn increase_conn_send_window(&mut self, size: u32) -> Result<(), H2Error> {
375         self.flow_control.increase_send_size(size)
376     }
377 
reassign_conn_send_window(&mut self)378     pub(crate) fn reassign_conn_send_window(&mut self) {
379         // Since the data structure of the body is a stream,
380         // the size of a body cannot be obtained,
381         // so all streams in pending_conn_window are added to the pending_send queue
382         // again.
383         loop {
384             match self.pending_conn_window.pop_front() {
385                 None => break,
386                 Some(id) => {
387                     self.push_back_pending_send(id);
388                 }
389             }
390         }
391     }
392 
reassign_stream_send_window( &mut self, id: StreamId, size: u32, ) -> Result<(), H2Error>393     pub(crate) fn reassign_stream_send_window(
394         &mut self,
395         id: StreamId,
396         size: u32,
397     ) -> Result<(), H2Error> {
398         if let Some(stream) = self.stream_map.get_mut(&id) {
399             stream.send_window.increase_size(size)?;
400         }
401         if self.pending_stream_window.take(&id).is_some() {
402             self.pending_send.push_back(id);
403         }
404         Ok(())
405     }
406 
headers(&mut self, id: StreamId) -> Result<Option<Frame>, H2Error>407     pub(crate) fn headers(&mut self, id: StreamId) -> Result<Option<Frame>, H2Error> {
408         match self.stream_map.get_mut(&id) {
409             None => Err(H2Error::ConnectionError(ErrorCode::IntervalError)),
410             Some(stream) => match stream.state {
411                 H2StreamState::Closed(_) => Ok(None),
412                 _ => Ok(stream.header.take()),
413             },
414         }
415     }
416 
poll_read_body( &mut self, cx: &mut Context<'_>, id: StreamId, ) -> Result<DataReadState, H2Error>417     pub(crate) fn poll_read_body(
418         &mut self,
419         cx: &mut Context<'_>,
420         id: StreamId,
421     ) -> Result<DataReadState, H2Error> {
422         // TODO Since the Array length needs to be a constant,
423         // the minimum value is used here, which can be optimized to the MAX_FRAME_SIZE
424         // updated in SETTINGS
425         const DEFAULT_MAX_FRAME_SIZE: usize = 16 * 1024;
426 
427         match self.stream_map.get_mut(&id) {
428             None => Err(H2Error::ConnectionError(ErrorCode::IntervalError)),
429             Some(stream) => match stream.state {
430                 H2StreamState::Closed(_) => Ok(DataReadState::Closed),
431                 _ => {
432                     let stream_send_vacant = stream.send_window.size_available() as usize;
433                     if stream_send_vacant == 0 {
434                         self.pending_stream_window.insert(id);
435                         return Ok(DataReadState::Pending);
436                     }
437                     let conn_send_vacant = self.flow_control.send_size_available();
438                     if conn_send_vacant == 0 {
439                         self.pending_conn_window.push_back(id);
440                         return Ok(DataReadState::Pending);
441                     }
442 
443                     let available = min(stream_send_vacant, conn_send_vacant);
444                     let len = min(available, DEFAULT_MAX_FRAME_SIZE);
445 
446                     let mut buf = [0u8; DEFAULT_MAX_FRAME_SIZE];
447                     self.poll_sized_data(cx, id, &mut buf[..len])
448                 }
449             },
450         }
451     }
452 
poll_sized_data( &mut self, cx: &mut Context<'_>, id: StreamId, buf: &mut [u8], ) -> Result<DataReadState, H2Error>453     fn poll_sized_data(
454         &mut self,
455         cx: &mut Context<'_>,
456         id: StreamId,
457         buf: &mut [u8],
458     ) -> Result<DataReadState, H2Error> {
459         let stream = if let Some(stream) = self.stream_map.get_mut(&id) {
460             stream
461         } else {
462             return Err(H2Error::ConnectionError(ErrorCode::IntervalError));
463         };
464         match stream.data.poll_read(cx, buf) {
465             Poll::Ready(Ok(size)) => {
466                 if size > 0 {
467                     stream.send_window.send_data(size as u32);
468                     self.flow_control.send_data(size as u32);
469                     let data_vec = Vec::from(&buf[..size]);
470                     let flag = FrameFlags::new(0);
471 
472                     Ok(DataReadState::Ready(Frame::new(
473                         id,
474                         flag,
475                         Payload::Data(Data::new(data_vec)),
476                     )))
477                 } else {
478                     let data_vec = vec![];
479                     let mut flag = FrameFlags::new(1);
480                     flag.set_end_stream(true);
481                     Ok(DataReadState::Finish(Frame::new(
482                         id,
483                         flag,
484                         Payload::Data(Data::new(data_vec)),
485                     )))
486                 }
487             }
488             Poll::Ready(Err(_)) => Err(H2Error::StreamError(id, ErrorCode::IntervalError)),
489             Poll::Pending => {
490                 self.push_back_pending_send(id);
491                 Ok(DataReadState::Pending)
492             }
493         }
494     }
495 
496     // Get unset streams less than or equal to last_stream_id and change the state
497     // of streams greater than last_stream_id to RemoteAaway
get_unset_streams(&mut self, last_stream_id: StreamId) -> Vec<StreamId>498     pub(crate) fn get_unset_streams(&mut self, last_stream_id: StreamId) -> Vec<StreamId> {
499         let mut ids = vec![];
500         for (id, unsent_stream) in self.stream_map.iter_mut() {
501             if *id > last_stream_id {
502                 match unsent_stream.state {
503                     // TODO Whether the close state needs to be selected.
504                     H2StreamState::Closed(_) => {}
505                     H2StreamState::Idle => {
506                         unsent_stream.state = H2StreamState::Closed(CloseReason::RemoteGoAway);
507                         unsent_stream.header = None;
508                         unsent_stream.data.clear();
509                     }
510                     _ => {
511                         self.current_concurrent_streams -= 1;
512                         unsent_stream.state = H2StreamState::Closed(CloseReason::RemoteGoAway);
513                         unsent_stream.header = None;
514                         unsent_stream.data.clear();
515                     }
516                 };
517                 ids.push(*id);
518             }
519         }
520         ids
521     }
522 
get_all_unclosed_streams(&mut self) -> Vec<StreamId>523     pub(crate) fn get_all_unclosed_streams(&mut self) -> Vec<StreamId> {
524         let mut ids = vec![];
525         for (id, stream) in self.stream_map.iter_mut() {
526             match stream.state {
527                 H2StreamState::Closed(_) => {}
528                 _ => {
529                     stream.header = None;
530                     stream.data.clear();
531                     stream.state = H2StreamState::Closed(CloseReason::LocalGoAway);
532                     ids.push(*id);
533                 }
534             }
535         }
536         ids
537     }
538 
clear_streams_states(&mut self)539     pub(crate) fn clear_streams_states(&mut self) {
540         self.window_updating_streams.clear();
541         self.pending_stream_window.clear();
542         self.pending_send.clear();
543         self.pending_conn_window.clear();
544         self.pending_concurrency.clear();
545     }
546 
send_local_reset(&mut self, id: StreamId) -> StreamEndState547     pub(crate) fn send_local_reset(&mut self, id: StreamId) -> StreamEndState {
548         return match self.stream_map.get_mut(&id) {
549             None => StreamEndState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
550             Some(stream) => match stream.state {
551                 H2StreamState::Closed(
552                     CloseReason::LocalRst
553                     | CloseReason::LocalGoAway
554                     | CloseReason::RemoteRst
555                     | CloseReason::RemoteGoAway,
556                 ) => StreamEndState::Ignore,
557                 H2StreamState::Closed(CloseReason::EndStream) => {
558                     stream.state = H2StreamState::Closed(CloseReason::LocalRst);
559                     StreamEndState::Ignore
560                 }
561                 _ => {
562                     stream.state = H2StreamState::Closed(CloseReason::LocalRst);
563                     stream.header = None;
564                     stream.data.clear();
565                     self.decrease_current_concurrency();
566                     StreamEndState::OK
567                 }
568             },
569         };
570     }
571 
send_headers_frame(&mut self, id: StreamId, eos: bool) -> FrameRecvState572     pub(crate) fn send_headers_frame(&mut self, id: StreamId, eos: bool) -> FrameRecvState {
573         match self.stream_map.get_mut(&id) {
574             None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
575             Some(stream) => match &stream.state {
576                 H2StreamState::Idle => {
577                     stream.state = if eos {
578                         H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders)
579                     } else {
580                         H2StreamState::Open {
581                             send: ActiveState::WaitData,
582                             recv: ActiveState::WaitHeaders,
583                         }
584                     };
585                 }
586                 H2StreamState::Open {
587                     send: ActiveState::WaitHeaders,
588                     recv,
589                 } => {
590                     stream.state = if eos {
591                         H2StreamState::LocalHalfClosed(*recv)
592                     } else {
593                         H2StreamState::Open {
594                             send: ActiveState::WaitData,
595                             recv: *recv,
596                         }
597                     };
598                 }
599                 H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders) => {
600                     stream.state = if eos {
601                         self.current_concurrent_streams -= 1;
602                         H2StreamState::Closed(CloseReason::EndStream)
603                     } else {
604                         H2StreamState::RemoteHalfClosed(ActiveState::WaitData)
605                     };
606                 }
607                 _ => {
608                     return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
609                 }
610             },
611         }
612         FrameRecvState::OK
613     }
614 
send_data_frame(&mut self, id: StreamId, eos: bool) -> FrameRecvState615     pub(crate) fn send_data_frame(&mut self, id: StreamId, eos: bool) -> FrameRecvState {
616         match self.stream_map.get_mut(&id) {
617             None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
618             Some(stream) => match &stream.state {
619                 H2StreamState::Open {
620                     send: ActiveState::WaitData,
621                     recv,
622                 } => {
623                     if eos {
624                         stream.state = H2StreamState::LocalHalfClosed(*recv);
625                     }
626                 }
627                 H2StreamState::RemoteHalfClosed(ActiveState::WaitData) => {
628                     if eos {
629                         self.current_concurrent_streams -= 1;
630                         stream.state = H2StreamState::Closed(CloseReason::EndStream);
631                     }
632                 }
633                 _ => {
634                     return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
635                 }
636             },
637         }
638         FrameRecvState::OK
639     }
640 
recv_remote_reset(&mut self, id: StreamId) -> StreamEndState641     pub(crate) fn recv_remote_reset(&mut self, id: StreamId) -> StreamEndState {
642         if id > self.max_recv_id {
643             return StreamEndState::Ignore;
644         }
645         return match self.stream_map.get_mut(&id) {
646             None => StreamEndState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
647             Some(stream) => match stream.state {
648                 H2StreamState::Closed(..) => StreamEndState::Ignore,
649                 _ => {
650                     stream.state = H2StreamState::Closed(CloseReason::RemoteRst);
651                     stream.header = None;
652                     stream.data.clear();
653                     self.decrease_current_concurrency();
654                     StreamEndState::OK
655                 }
656             },
657         };
658     }
659 
recv_headers(&mut self, id: StreamId, eos: bool) -> FrameRecvState660     pub(crate) fn recv_headers(&mut self, id: StreamId, eos: bool) -> FrameRecvState {
661         if id > self.max_recv_id {
662             return FrameRecvState::Ignore;
663         }
664 
665         match self.stream_map.get_mut(&id) {
666             None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
667             Some(stream) => match &stream.state {
668                 H2StreamState::Idle => {
669                     change_stream_state!(Idle: eos, stream.state);
670                 }
671                 H2StreamState::ReservedRemote => {
672                     change_stream_state!(HalfClosed: eos, stream.state);
673                     if eos {
674                         self.decrease_current_concurrency();
675                     }
676                 }
677                 H2StreamState::Open {
678                     send,
679                     recv: ActiveState::WaitHeaders,
680                 } => {
681                     change_stream_state!(Open: eos, stream.state, send);
682                 }
683                 H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders) => {
684                     change_stream_state!(HalfClosed: eos, stream.state);
685                     if eos {
686                         self.decrease_current_concurrency();
687                     }
688                 }
689                 H2StreamState::Closed(CloseReason::LocalGoAway | CloseReason::LocalRst) => {
690                     return FrameRecvState::Ignore;
691                 }
692                 _ => {
693                     return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
694                 }
695             },
696         }
697         FrameRecvState::OK
698     }
699 
recv_data(&mut self, id: StreamId, eos: bool) -> FrameRecvState700     pub(crate) fn recv_data(&mut self, id: StreamId, eos: bool) -> FrameRecvState {
701         if id > self.max_recv_id {
702             return FrameRecvState::Ignore;
703         }
704         match self.stream_map.get_mut(&id) {
705             None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
706             Some(stream) => match &stream.state {
707                 H2StreamState::Open {
708                     send,
709                     recv: ActiveState::WaitData,
710                 } => {
711                     if eos {
712                         stream.state = H2StreamState::RemoteHalfClosed(*send);
713                     }
714                 }
715                 H2StreamState::LocalHalfClosed(ActiveState::WaitData) => {
716                     if eos {
717                         stream.state = H2StreamState::Closed(CloseReason::EndStream);
718                         self.decrease_current_concurrency();
719                     }
720                 }
721                 H2StreamState::Closed(CloseReason::LocalGoAway | CloseReason::LocalRst) => {
722                     return FrameRecvState::Ignore;
723                 }
724                 _ => {
725                     return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
726                 }
727             },
728         }
729         FrameRecvState::OK
730     }
731 
generate_id(&mut self) -> Result<StreamId, DispatchErrorKind>732     pub(crate) fn generate_id(&mut self) -> Result<StreamId, DispatchErrorKind> {
733         let id = self.next_stream_id;
734         if self.next_stream_id < DEFAULT_MAX_STREAM_ID {
735             self.next_stream_id += 2;
736             Ok(id)
737         } else {
738             Err(DispatchErrorKind::H2(H2Error::ConnectionError(
739                 ErrorCode::ProtocolError,
740             )))
741         }
742     }
743 }
744 
745 impl Stream {
new( recv_window: RecvWindow, send_window: SendWindow, headers: Frame, data: BodyDataRef, ) -> Self746     pub(crate) fn new(
747         recv_window: RecvWindow,
748         send_window: SendWindow,
749         headers: Frame,
750         data: BodyDataRef,
751     ) -> Self {
752         Self {
753             recv_window,
754             send_window,
755             state: H2StreamState::Idle,
756             header: Some(headers),
757             data,
758         }
759     }
760 
is_init_or_active_flow_control(&self) -> bool761     pub(crate) fn is_init_or_active_flow_control(&self) -> bool {
762         matches!(
763             self.state,
764             H2StreamState::Idle
765                 | H2StreamState::Open {
766                     recv: ActiveState::WaitData,
767                     ..
768                 }
769                 | H2StreamState::LocalHalfClosed(ActiveState::WaitData)
770         )
771     }
772 }
773 
#[cfg(test)]
mod ut_h2streamstate {
    use super::*;

    /// UT test case for `H2StreamState` with some states.
    ///
    /// # Brief
    /// 1. Creates an `H2StreamState` in the `Open`, `LocalHalfClosed` and
    ///    `Closed` states.
    /// 2. Destructures each state and asserts that the carried fields are as
    ///    expected. A state that does not destructure into the expected
    ///    variant fails the test instead of silently skipping the assertions.
    #[test]
    fn ut_hss() {
        let state = H2StreamState::Open {
            send: ActiveState::WaitHeaders,
            recv: ActiveState::WaitData,
        };
        match state {
            H2StreamState::Open { send, recv } => {
                assert_eq!(send, ActiveState::WaitHeaders);
                assert_eq!(recv, ActiveState::WaitData);
            }
            other => panic!("expected `Open`, got {:?}", other),
        }

        let state = H2StreamState::LocalHalfClosed(ActiveState::WaitData);
        match state {
            H2StreamState::LocalHalfClosed(recv) => {
                assert_eq!(recv, ActiveState::WaitData);
            }
            other => panic!("expected `LocalHalfClosed`, got {:?}", other),
        }

        let state = H2StreamState::Closed(CloseReason::EndStream);
        match state {
            H2StreamState::Closed(reason) => {
                assert_eq!(reason, CloseReason::EndStream);
            }
            other => panic!("expected `Closed`, got {:?}", other),
        }
    }
}
805 
#[cfg(test)]
mod ut_streams {
    use super::*;
    use crate::async_impl::{Body, Request};
    use crate::request::RequestArc;
    use crate::util::progress::SpeedController;

    /// Builds a `Stream` fixture in `state` with send/recv windows created
    /// from 100, no header frame, and a body reference built from a request
    /// with an empty body.
    fn stream_new(state: H2StreamState) -> Stream {
        let request = RequestArc::new(Request::builder().body(Body::empty()).unwrap());
        let data = BodyDataRef::new(request, SpeedController::none());
        Stream {
            send_window: SendWindow::new(100),
            recv_window: RecvWindow::new(100),
            state,
            header: None,
            data,
        }
    }

    /// Stores a `stream_new` fixture in `state` under `id` in `stream_map`.
    fn insert_stream(streams: &mut Streams, id: StreamId, state: H2StreamState) {
        streams.stream_map.insert(id, stream_new(state));
    }

    /// Shorthand for the `Open` state with the given send/recv sub-states.
    fn open_state(send: ActiveState, recv: ActiveState) -> H2StreamState {
        H2StreamState::Open { send, recv }
    }

    /// The connection-level protocol error the tests below expect.
    fn protocol_error() -> H2Error {
        H2Error::ConnectionError(ErrorCode::ProtocolError)
    }

    /// UT test case for `Streams::apply_max_concurrent_streams`.
    ///
    /// # Brief
    /// 1. Limits the number of concurrent streams to 2.
    /// 2. Increases the current concurrency twice and asserts that the limit
    ///    is reported as reached only after the second increase.
    #[test]
    fn ut_streams_apply_max_concurrent_streams() {
        let mut streams = Streams::new(100, 200, FlowControl::new(300, 400));
        streams.apply_max_concurrent_streams(2);
        for limit_reached in [false, true] {
            streams.increase_current_concurrency();
            assert_eq!(streams.reach_max_concurrency(), limit_reached);
        }
    }

    /// UT test case for `Streams::apply_send_initial_window_size` and
    /// `Streams::apply_recv_initial_window_size`.
    ///
    /// # Brief
    /// 1. Applies the initial send window sizes 200, 50 and 100 in turn and
    ///    asserts the available send window of stream 1 follows each value
    ///    while `pending_send` stays empty.
    /// 2. Applies the initial recv window sizes 200, 50 and 100 in turn and
    ///    asserts the notification window of stream 1 follows each value.
    #[test]
    fn ut_streams_apply_send_initial_window_size() {
        let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
        insert_stream(&mut streams, 1, H2StreamState::Idle);

        for (new_size, expected) in [(200, 200), (50, 50), (100, 100)] {
            assert!(streams.apply_send_initial_window_size(new_size).is_ok());
            let stream = streams.stream_map.get(&1).unwrap();
            assert_eq!(stream.send_window.size_available(), expected);
            assert!(streams.pending_send.is_empty());
        }

        for (new_size, expected) in [(200, 200), (50, 50), (100, 100)] {
            streams.apply_recv_initial_window_size(new_size);
            let stream = streams.stream_map.get(&1).unwrap();
            assert_eq!(stream.recv_window.notification_available(), expected);
        }
    }

    /// UT test case for `Streams::get_unset_streams`.
    ///
    /// # Brief
    /// 1. Inserts streams 1 to 4 in different states and calls
    ///    `get_unset_streams` with the id 2.
    /// 2. Asserts that ids 3 and 4 are returned, that streams 1 and 2 stay
    ///    `Idle`, that stream 3 is closed with `RemoteGoAway`, and that the
    ///    already closed stream 4 keeps its `EndStream` reason.
    #[test]
    fn ut_streams_get_unset_streams() {
        let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
        streams.apply_max_concurrent_streams(4);
        let fixtures = [
            (1, H2StreamState::Idle),
            (2, H2StreamState::Idle),
            (
                3,
                open_state(ActiveState::WaitHeaders, ActiveState::WaitData),
            ),
            (4, H2StreamState::Closed(CloseReason::EndStream)),
        ];
        for (id, state) in fixtures {
            insert_stream(&mut streams, id, state);
            streams.increase_current_concurrency();
        }

        let go_away_streams = streams.get_unset_streams(2);
        for id in [3, 4] {
            assert!(go_away_streams.contains(&id));
        }

        let expected_states = [
            (1, H2StreamState::Idle),
            (2, H2StreamState::Idle),
            (3, H2StreamState::Closed(CloseReason::RemoteGoAway)),
            (4, H2StreamState::Closed(CloseReason::EndStream)),
        ];
        for (id, expected) in expected_states {
            assert_eq!(streams.stream_state(id).unwrap(), expected);
        }
    }

    /// UT test case for `Streams::get_all_unclosed_streams`.
    ///
    /// # Brief
    /// 1. Inserts an `Idle` stream and a `Closed` stream.
    /// 2. Asserts that only the id of the `Idle` stream is returned.
    #[test]
    fn ut_streams_get_all_unclosed_streams() {
        let mut streams = Streams::new(1000, 1000, FlowControl::new(1000, 1000));
        streams.apply_max_concurrent_streams(2);
        let fixtures = [
            (1, H2StreamState::Idle),
            (2, H2StreamState::Closed(CloseReason::EndStream)),
        ];
        for (id, state) in fixtures {
            insert_stream(&mut streams, id, state);
            streams.increase_current_concurrency();
        }
        assert_eq!(streams.get_all_unclosed_streams(), [1]);
    }

    /// UT test case for `Streams::clear_streams_states`.
    ///
    /// # Brief
    /// 1. Calls `clear_streams_states` on a freshly created `Streams`.
    /// 2. Asserts that the window-updating set and every pending queue are
    ///    empty afterwards.
    #[test]
    fn ut_streams_clear_streams_states() {
        let mut streams = Streams::new(1000, 1000, FlowControl::new(1000, 1000));
        streams.clear_streams_states();
        assert!(streams.window_updating_streams.is_empty());
        assert!(streams.pending_stream_window.is_empty());
        assert!(streams.pending_send.is_empty());
        assert!(streams.pending_conn_window.is_empty());
        assert!(streams.pending_concurrency.is_empty());
    }

    /// UT test case for `Streams::send_local_reset`.
    ///
    /// # Brief
    /// 1. Sends a local reset on an unknown stream, on two closed streams and
    ///    on an `Idle` stream.
    /// 2. Asserts correct handling of each state: a protocol error for the
    ///    unknown id, `Ignore` for the closed streams and `OK` for the `Idle`
    ///    stream.
    #[test]
    fn ut_streams_send_local_reset() {
        let mut streams = Streams::new(1000, 1000, FlowControl::new(1000, 1000));
        streams.apply_max_concurrent_streams(3);
        let fixtures = [
            (1, H2StreamState::Idle),
            (2, H2StreamState::Closed(CloseReason::RemoteGoAway)),
            (3, H2StreamState::Closed(CloseReason::EndStream)),
        ];
        for (id, state) in fixtures {
            insert_stream(&mut streams, id, state);
            streams.increase_current_concurrency();
        }

        let expectations = [
            (4, StreamEndState::Err(protocol_error())),
            (3, StreamEndState::Ignore),
            (2, StreamEndState::Ignore),
            (1, StreamEndState::OK),
        ];
        for (id, expected) in expectations {
            assert_eq!(streams.send_local_reset(id), expected);
        }
    }

    /// UT test case for `Streams::send_headers_frame`.
    ///
    /// # Brief
    /// 1. Sends a headers frame with end-of-stream on an `Idle` stream and
    ///    asserts the stream becomes `LocalHalfClosed(WaitHeaders)`.
    /// 2. Sends the same frame again and asserts a protocol error.
    #[test]
    fn ut_streams_send_headers_frame() {
        let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
        streams.apply_max_concurrent_streams(1);
        insert_stream(&mut streams, 1, H2StreamState::Idle);
        streams.increase_current_concurrency();

        assert_eq!(streams.send_headers_frame(1, true), FrameRecvState::OK);
        assert_eq!(
            streams.stream_state(1).unwrap(),
            H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders)
        );
        assert_eq!(
            streams.send_headers_frame(1, true),
            FrameRecvState::Err(protocol_error())
        );
    }

    /// UT test case for `Streams::send_data_frame`.
    ///
    /// # Brief
    /// 1. Sends a data frame with end-of-stream on an `Open` stream whose
    ///    sending side waits for data and asserts the stream becomes
    ///    `LocalHalfClosed(WaitHeaders)`.
    /// 2. Sends the same frame again and asserts a protocol error.
    #[test]
    fn ut_streams_send_data_frame() {
        let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
        insert_stream(
            &mut streams,
            1,
            open_state(ActiveState::WaitData, ActiveState::WaitHeaders),
        );
        streams.increase_current_concurrency();

        assert_eq!(streams.send_data_frame(1, true), FrameRecvState::OK);
        assert_eq!(
            streams.stream_state(1).unwrap(),
            H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders)
        );
        assert_eq!(
            streams.send_data_frame(1, true),
            FrameRecvState::Err(protocol_error())
        );
    }

    /// UT test for `Streams::recv_remote_reset`.
    ///
    /// # Brief
    /// 1. Receives a remote reset on an `Open` stream and asserts the stream
    ///    is closed with `RemoteRst`.
    /// 2. Receives a second remote reset and asserts it is ignored.
    #[test]
    fn ut_streams_recv_remote_reset() {
        let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
        streams.apply_max_concurrent_streams(1);
        insert_stream(
            &mut streams,
            1,
            open_state(ActiveState::WaitData, ActiveState::WaitHeaders),
        );
        streams.increase_current_concurrency();

        assert_eq!(streams.recv_remote_reset(1), StreamEndState::OK);
        assert_eq!(
            streams.stream_state(1).unwrap(),
            H2StreamState::Closed(CloseReason::RemoteRst)
        );
        assert_eq!(streams.recv_remote_reset(1), StreamEndState::Ignore);
    }

    /// UT test case for `Streams::recv_headers`.
    ///
    /// # Brief
    /// 1. Receives headers without end-of-stream on an `Idle` stream and
    ///    asserts the stream becomes `Open` with the receiving side waiting
    ///    for data.
    /// 2. Receives headers again and asserts a protocol error.
    #[test]
    fn ut_streams_recv_headers() {
        let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
        streams.apply_max_concurrent_streams(1);
        insert_stream(&mut streams, 1, H2StreamState::Idle);

        assert_eq!(streams.recv_headers(1, false), FrameRecvState::OK);
        assert_eq!(
            streams.stream_state(1).unwrap(),
            open_state(ActiveState::WaitHeaders, ActiveState::WaitData)
        );
        assert_eq!(
            streams.recv_headers(1, false),
            FrameRecvState::Err(protocol_error())
        );
    }

    /// UT test case for `Streams::recv_data`.
    ///
    /// # Brief
    /// 1. Receives data without end-of-stream on an `Open` stream and asserts
    ///    the state is unchanged.
    /// 2. Receives data with end-of-stream and asserts the stream becomes
    ///    `RemoteHalfClosed(WaitHeaders)`.
    #[test]
    fn ut_streams_recv_data() {
        let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
        insert_stream(
            &mut streams,
            1,
            open_state(ActiveState::WaitHeaders, ActiveState::WaitData),
        );

        assert_eq!(streams.recv_data(1, false), FrameRecvState::OK);
        assert_eq!(
            streams.stream_state(1).unwrap(),
            open_state(ActiveState::WaitHeaders, ActiveState::WaitData)
        );

        assert_eq!(streams.recv_data(1, true), FrameRecvState::OK);
        assert_eq!(
            streams.stream_state(1).unwrap(),
            H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders)
        );
    }
}
1139