• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! Streams operations utils.
15 
16 use std::cmp::{min, Ordering};
17 use std::collections::{HashMap, HashSet, VecDeque};
18 use std::task::{Context, Poll};
19 
20 use ylong_http::h2::{Data, ErrorCode, Frame, FrameFlags, H2Error, Payload, StreamId};
21 
22 use crate::runtime::UnboundedSender;
23 use crate::util::data_ref::BodyDataRef;
24 use crate::util::dispatcher::http2::DispatchErrorKind;
25 use crate::util::h2::buffer::{FlowControl, RecvWindow, SendWindow};
26 
27 pub(crate) const INITIAL_MAX_SEND_STREAM_ID: StreamId = u32::MAX >> 1;
28 pub(crate) const INITIAL_MAX_RECV_STREAM_ID: StreamId = u32::MAX >> 1;
29 
30 const DEFAULT_MAX_STREAM_ID: StreamId = u32::MAX >> 1;
31 const INITIAL_LATEST_REMOTE_ID: StreamId = 0;
32 const DEFAULT_MAX_CONCURRENT_STREAMS: u32 = 100;
33 
34 #[cfg_attr(test, derive(Debug, PartialEq))]
35 pub(crate) enum FrameRecvState {
36     OK,
37     Ignore,
38     Err(H2Error),
39 }
40 
41 pub(crate) enum DataReadState {
42     Closed,
43     // Wait for poll_read or wait for window.
44     Pending,
45     Ready(Frame),
46     Finish(Frame),
47 }
48 #[cfg_attr(test, derive(Debug, PartialEq))]
49 pub(crate) enum StreamEndState {
50     OK,
51     Ignore,
52     Err(H2Error),
53 }
54 
55 //                              +--------+
56 //                      send PP |        | recv PP
57 //                     ,--------|  idle  |--------.
58 //                    /         |        |         \
59 //                   v          +--------+          v
60 //            +----------+          |           +----------+
61 //            |          |          | send H /  |          |
62 //     ,------| reserved |          | recv H    | reserved |------.
63 //     |      | (local)  |          |           | (remote) |      |
64 //     |      +----------+          v           +----------+      |
65 //     |          |             +--------+             |          |
66 //     |          |     recv ES |        | send ES     |          |
67 //     |   send H |     ,-------|  open  |-------.     | recv H   |
68 //     |          |    /        |        |        \    |          |
69 //     |          v   v         +--------+         v   v          |
70 //     |      +----------+          |           +----------+      |
71 //     |      |   half   |          |           |   half   |      |
72 //     |      |  closed  |          | send R /  |  closed  |      |
73 //     |      | (remote) |          | recv R    | (local)  |      |
74 //     |      +----------+          |           +----------+      |
75 //     |           |                |                 |           |
76 //     |           | send ES /      |       recv ES / |           |
77 //     |           | send R /       v        send R / |           |
78 //     |           | recv R     +--------+   recv R   |           |
79 //     | send R /  `----------->|        |<-----------'  send R / |
80 //     | recv R                 | closed |               recv R   |
81 //     `----------------------->|        |<----------------------'
82 //                              +--------+
83 #[derive(Copy, Clone, Debug)]
84 #[cfg_attr(test, derive(PartialEq))]
85 pub(crate) enum H2StreamState {
86     Idle,
87     // When response does not depend on request,
88     // the server can send response directly without waiting for the request to finish receiving.
89     // Therefore, the sending and receiving states of the client have their own states
90     Open {
91         send: ActiveState,
92         recv: ActiveState,
93     },
94     #[allow(dead_code)]
95     ReservedRemote,
96     // After the request is sent, the state is waiting for the response to be received.
97     LocalHalfClosed(ActiveState),
98     // When the response is received but the request is not fully sent,
99     // this indicates the status of the request being sent
100     RemoteHalfClosed(ActiveState),
101     Closed(CloseReason),
102 }
103 
104 #[derive(Copy, Clone, Debug)]
105 #[cfg_attr(test, derive(PartialEq))]
106 pub(crate) enum CloseReason {
107     LocalRst,
108     RemoteRst,
109     RemoteGoAway,
110     LocalGoAway,
111     EndStream,
112 }
113 
114 #[derive(Copy, Clone, Debug)]
115 #[cfg_attr(test, derive(PartialEq))]
116 pub(crate) enum ActiveState {
117     WaitHeaders,
118     WaitData,
119 }
120 
121 pub(crate) struct Stream {
122     pub(crate) recv_window: RecvWindow,
123     pub(crate) send_window: SendWindow,
124     pub(crate) state: H2StreamState,
125     pub(crate) header: Option<Frame>,
126     pub(crate) data: BodyDataRef,
127 }
128 
129 pub(crate) struct RequestWrapper {
130     pub(crate) flag: FrameFlags,
131     pub(crate) payload: Payload,
132     pub(crate) data: BodyDataRef,
133 }
134 
135 pub(crate) struct Streams {
136     // Records the received goaway last_stream_id.
137     pub(crate) max_send_id: StreamId,
138     // Records the send goaway last_stream_id.
139     pub(crate) max_recv_id: StreamId,
140     // Currently the client doesn't support push promise, so this value is always 0.
141     pub(crate) latest_remote_id: StreamId,
142     pub(crate) stream_recv_window_size: u32,
143     pub(crate) stream_send_window_size: u32,
144     max_concurrent_streams: u32,
145     current_concurrent_streams: u32,
146     flow_control: FlowControl,
147     pending_concurrency: VecDeque<StreamId>,
148     pending_stream_window: HashSet<u32>,
149     pending_conn_window: VecDeque<u32>,
150     pending_send: VecDeque<StreamId>,
151     window_updating_streams: VecDeque<StreamId>,
152     pub(crate) stream_map: HashMap<StreamId, Stream>,
153     pub(crate) next_stream_id: StreamId,
154 }
155 
156 macro_rules! change_stream_state {
157     (Idle: $eos: expr, $state: expr) => {
158         $state = if $eos {
159             H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders)
160         } else {
161             H2StreamState::Open {
162                 send: ActiveState::WaitHeaders,
163                 recv: ActiveState::WaitData,
164             }
165         };
166     };
167     (Open: $eos: expr, $state: expr, $send: expr) => {
168         $state = if $eos {
169             H2StreamState::RemoteHalfClosed($send.clone())
170         } else {
171             H2StreamState::Open {
172                 send: $send.clone(),
173                 recv: ActiveState::WaitData,
174             }
175         };
176     };
177     (HalfClosed: $eos: expr, $state: expr) => {
178         $state = if $eos {
179             H2StreamState::Closed(CloseReason::EndStream)
180         } else {
181             H2StreamState::LocalHalfClosed(ActiveState::WaitData)
182         };
183     };
184 }
185 
186 impl Streams {
new( recv_window_size: u32, send_window_size: u32, flow_control: FlowControl, ) -> Self187     pub(crate) fn new(
188         recv_window_size: u32,
189         send_window_size: u32,
190         flow_control: FlowControl,
191     ) -> Self {
192         Self {
193             max_send_id: INITIAL_MAX_SEND_STREAM_ID,
194             max_recv_id: INITIAL_MAX_RECV_STREAM_ID,
195             latest_remote_id: INITIAL_LATEST_REMOTE_ID,
196             max_concurrent_streams: DEFAULT_MAX_CONCURRENT_STREAMS,
197             current_concurrent_streams: 0,
198             stream_recv_window_size: recv_window_size,
199             stream_send_window_size: send_window_size,
200             flow_control,
201             pending_concurrency: VecDeque::new(),
202             pending_stream_window: HashSet::new(),
203             pending_conn_window: VecDeque::new(),
204             pending_send: VecDeque::new(),
205             window_updating_streams: VecDeque::new(),
206             stream_map: HashMap::new(),
207             next_stream_id: 1,
208         }
209     }
210 
decrease_current_concurrency(&mut self)211     pub(crate) fn decrease_current_concurrency(&mut self) {
212         self.current_concurrent_streams -= 1;
213     }
214 
increase_current_concurrency(&mut self)215     pub(crate) fn increase_current_concurrency(&mut self) {
216         self.current_concurrent_streams += 1;
217     }
218 
reach_max_concurrency(&mut self) -> bool219     pub(crate) fn reach_max_concurrency(&mut self) -> bool {
220         self.current_concurrent_streams >= self.max_concurrent_streams
221     }
222 
apply_max_concurrent_streams(&mut self, num: u32)223     pub(crate) fn apply_max_concurrent_streams(&mut self, num: u32) {
224         self.max_concurrent_streams = num;
225     }
226 
apply_send_initial_window_size(&mut self, size: u32) -> Result<(), H2Error>227     pub(crate) fn apply_send_initial_window_size(&mut self, size: u32) -> Result<(), H2Error> {
228         let current = self.stream_send_window_size;
229         self.stream_send_window_size = size;
230 
231         match current.cmp(&size) {
232             Ordering::Less => {
233                 let excess = size - current;
234                 for (_id, stream) in self.stream_map.iter_mut() {
235                     stream.send_window.increase_size(excess)?;
236                 }
237                 for id in self.pending_stream_window.iter() {
238                     self.pending_send.push_back(*id);
239                 }
240                 self.pending_stream_window.clear();
241             }
242             Ordering::Greater => {
243                 let excess = current - size;
244                 for (_id, stream) in self.stream_map.iter_mut() {
245                     stream.send_window.reduce_size(excess);
246                 }
247             }
248             Ordering::Equal => {}
249         }
250         Ok(())
251     }
252 
apply_recv_initial_window_size(&mut self, size: u32)253     pub(crate) fn apply_recv_initial_window_size(&mut self, size: u32) {
254         let current = self.stream_recv_window_size;
255         self.stream_recv_window_size = size;
256         match current.cmp(&size) {
257             Ordering::Less => {
258                 for (_id, stream) in self.stream_map.iter_mut() {
259                     let extra = size - current;
260                     stream.recv_window.increase_notification(extra);
261                     stream.recv_window.increase_actual(extra);
262                 }
263             }
264             Ordering::Greater => {
265                 for (_id, stream) in self.stream_map.iter_mut() {
266                     stream.recv_window.reduce_notification(current - size);
267                 }
268             }
269             Ordering::Equal => {}
270         }
271     }
272 
release_stream_recv_window( &mut self, id: StreamId, size: u32, sender: &UnboundedSender<Frame>, ) -> Result<(), DispatchErrorKind>273     pub(crate) fn release_stream_recv_window(
274         &mut self,
275         id: StreamId,
276         size: u32,
277         sender: &UnboundedSender<Frame>,
278     ) -> Result<(), DispatchErrorKind> {
279         if let Some(stream) = self.stream_map.get_mut(&id) {
280             if stream.recv_window.notification_available() < size {
281                 return Err(H2Error::StreamError(id, ErrorCode::FlowControlError).into());
282             }
283             stream.recv_window.recv_data(size);
284             // determine whether it is necessary to update the stream window
285             if stream.recv_window.unreleased_size().is_some() {
286                 if !stream.is_init_or_active_flow_control() {
287                     return Ok(());
288                 }
289                 if let Some(window_update) = stream.recv_window.check_window_update(id) {
290                     sender
291                         .send(window_update)
292                         .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
293                 }
294             }
295         }
296         Ok(())
297     }
298 
release_conn_recv_window( &mut self, size: u32, sender: &UnboundedSender<Frame>, ) -> Result<(), DispatchErrorKind>299     pub(crate) fn release_conn_recv_window(
300         &mut self,
301         size: u32,
302         sender: &UnboundedSender<Frame>,
303     ) -> Result<(), DispatchErrorKind> {
304         if self.flow_control.recv_notification_size_available() < size {
305             return Err(H2Error::ConnectionError(ErrorCode::FlowControlError).into());
306         }
307         self.flow_control.recv_data(size);
308         // determine whether it is necessary to update the connection window
309         if let Some(window_update) = self.flow_control.check_conn_recv_window_update() {
310             sender
311                 .send(window_update)
312                 .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
313         }
314         Ok(())
315     }
316 
is_closed(&self) -> bool317     pub(crate) fn is_closed(&self) -> bool {
318         for (_id, stream) in self.stream_map.iter() {
319             match stream.state {
320                 H2StreamState::Closed(_) => {}
321                 _ => {
322                     return false;
323                 }
324             }
325         }
326         true
327     }
328 
stream_state(&self, id: StreamId) -> Option<H2StreamState>329     pub(crate) fn stream_state(&self, id: StreamId) -> Option<H2StreamState> {
330         self.stream_map.get(&id).map(|stream| stream.state)
331     }
332 
insert(&mut self, id: StreamId, headers: Frame, data: BodyDataRef)333     pub(crate) fn insert(&mut self, id: StreamId, headers: Frame, data: BodyDataRef) {
334         let send_window = SendWindow::new(self.stream_send_window_size as i32);
335         let recv_window = RecvWindow::new(self.stream_recv_window_size as i32);
336         let stream = Stream::new(recv_window, send_window, headers, data);
337         self.stream_map.insert(id, stream);
338     }
339 
push_back_pending_send(&mut self, id: StreamId)340     pub(crate) fn push_back_pending_send(&mut self, id: StreamId) {
341         self.pending_send.push_back(id);
342     }
343 
push_pending_concurrency(&mut self, id: StreamId)344     pub(crate) fn push_pending_concurrency(&mut self, id: StreamId) {
345         self.pending_concurrency.push_back(id);
346     }
347 
is_pending_concurrency_empty(&self) -> bool348     pub(crate) fn is_pending_concurrency_empty(&self) -> bool {
349         self.pending_concurrency.is_empty()
350     }
351 
next_pending_stream(&mut self) -> Option<StreamId>352     pub(crate) fn next_pending_stream(&mut self) -> Option<StreamId> {
353         self.pending_send.pop_front()
354     }
355 
pending_stream_num(&self) -> usize356     pub(crate) fn pending_stream_num(&self) -> usize {
357         self.pending_send.len()
358     }
359 
try_consume_pending_concurrency(&mut self)360     pub(crate) fn try_consume_pending_concurrency(&mut self) {
361         while !self.reach_max_concurrency() {
362             match self.pending_concurrency.pop_front() {
363                 None => {
364                     return;
365                 }
366                 Some(id) => {
367                     self.increase_current_concurrency();
368                     self.push_back_pending_send(id);
369                 }
370             }
371         }
372     }
373 
increase_conn_send_window(&mut self, size: u32) -> Result<(), H2Error>374     pub(crate) fn increase_conn_send_window(&mut self, size: u32) -> Result<(), H2Error> {
375         self.flow_control.increase_send_size(size)
376     }
377 
reassign_conn_send_window(&mut self)378     pub(crate) fn reassign_conn_send_window(&mut self) {
379         // Since the data structure of the body is a stream,
380         // the size of a body cannot be obtained,
381         // so all streams in pending_conn_window are added to the pending_send queue
382         // again.
383         loop {
384             match self.pending_conn_window.pop_front() {
385                 None => break,
386                 Some(id) => {
387                     self.push_back_pending_send(id);
388                 }
389             }
390         }
391     }
392 
reassign_stream_send_window( &mut self, id: StreamId, size: u32, ) -> Result<(), H2Error>393     pub(crate) fn reassign_stream_send_window(
394         &mut self,
395         id: StreamId,
396         size: u32,
397     ) -> Result<(), H2Error> {
398         if let Some(stream) = self.stream_map.get_mut(&id) {
399             stream.send_window.increase_size(size)?;
400         }
401         if self.pending_stream_window.take(&id).is_some() {
402             self.pending_send.push_back(id);
403         }
404         Ok(())
405     }
406 
headers(&mut self, id: StreamId) -> Result<Option<Frame>, H2Error>407     pub(crate) fn headers(&mut self, id: StreamId) -> Result<Option<Frame>, H2Error> {
408         match self.stream_map.get_mut(&id) {
409             None => Err(H2Error::ConnectionError(ErrorCode::IntervalError)),
410             Some(stream) => match stream.state {
411                 H2StreamState::Closed(_) => Ok(None),
412                 _ => Ok(stream.header.take()),
413             },
414         }
415     }
416 
poll_read_body( &mut self, cx: &mut Context<'_>, id: StreamId, ) -> Result<DataReadState, H2Error>417     pub(crate) fn poll_read_body(
418         &mut self,
419         cx: &mut Context<'_>,
420         id: StreamId,
421     ) -> Result<DataReadState, H2Error> {
422         // TODO Since the Array length needs to be a constant,
423         // the minimum value is used here, which can be optimized to the MAX_FRAME_SIZE
424         // updated in SETTINGS
425         const DEFAULT_MAX_FRAME_SIZE: usize = 16 * 1024;
426 
427         match self.stream_map.get_mut(&id) {
428             None => Err(H2Error::ConnectionError(ErrorCode::IntervalError)),
429             Some(stream) => match stream.state {
430                 H2StreamState::Closed(_) => Ok(DataReadState::Closed),
431                 _ => {
432                     let stream_send_vacant = stream.send_window.size_available() as usize;
433                     if stream_send_vacant == 0 {
434                         self.pending_stream_window.insert(id);
435                         return Ok(DataReadState::Pending);
436                     }
437                     let conn_send_vacant = self.flow_control.send_size_available();
438                     if conn_send_vacant == 0 {
439                         self.pending_conn_window.push_back(id);
440                         return Ok(DataReadState::Pending);
441                     }
442 
443                     let available = min(stream_send_vacant, conn_send_vacant);
444                     let len = min(available, DEFAULT_MAX_FRAME_SIZE);
445 
446                     let mut buf = [0u8; DEFAULT_MAX_FRAME_SIZE];
447                     self.poll_sized_data(cx, id, &mut buf[..len])
448                 }
449             },
450         }
451     }
452 
poll_sized_data( &mut self, cx: &mut Context<'_>, id: StreamId, buf: &mut [u8], ) -> Result<DataReadState, H2Error>453     fn poll_sized_data(
454         &mut self,
455         cx: &mut Context<'_>,
456         id: StreamId,
457         buf: &mut [u8],
458     ) -> Result<DataReadState, H2Error> {
459         let stream = if let Some(stream) = self.stream_map.get_mut(&id) {
460             stream
461         } else {
462             return Err(H2Error::ConnectionError(ErrorCode::IntervalError));
463         };
464         match stream.data.poll_read(cx, buf) {
465             Poll::Ready(Some(size)) => {
466                 if size > 0 {
467                     stream.send_window.send_data(size as u32);
468                     self.flow_control.send_data(size as u32);
469                     let data_vec = Vec::from(&buf[..size]);
470                     let flag = FrameFlags::new(0);
471 
472                     Ok(DataReadState::Ready(Frame::new(
473                         id,
474                         flag,
475                         Payload::Data(Data::new(data_vec)),
476                     )))
477                 } else {
478                     let data_vec = vec![];
479                     let mut flag = FrameFlags::new(1);
480                     flag.set_end_stream(true);
481                     Ok(DataReadState::Finish(Frame::new(
482                         id,
483                         flag,
484                         Payload::Data(Data::new(data_vec)),
485                     )))
486                 }
487             }
488             Poll::Ready(None) => Err(H2Error::ConnectionError(ErrorCode::IntervalError)),
489             Poll::Pending => {
490                 self.push_back_pending_send(id);
491                 Ok(DataReadState::Pending)
492             }
493         }
494     }
495 
496     // Get unset streams less than or equal to last_stream_id and change the state
497     // of streams greater than last_stream_id to RemoteAaway
get_unset_streams(&mut self, last_stream_id: StreamId) -> Vec<StreamId>498     pub(crate) fn get_unset_streams(&mut self, last_stream_id: StreamId) -> Vec<StreamId> {
499         let mut ids = vec![];
500         for (id, unsent_stream) in self.stream_map.iter_mut() {
501             if *id > last_stream_id {
502                 match unsent_stream.state {
503                     // TODO Whether the close state needs to be selected.
504                     H2StreamState::Closed(_) => {}
505                     H2StreamState::Idle => {
506                         unsent_stream.state = H2StreamState::Closed(CloseReason::RemoteGoAway);
507                         unsent_stream.header = None;
508                         unsent_stream.data.clear();
509                     }
510                     _ => {
511                         self.current_concurrent_streams -= 1;
512                         unsent_stream.state = H2StreamState::Closed(CloseReason::RemoteGoAway);
513                         unsent_stream.header = None;
514                         unsent_stream.data.clear();
515                     }
516                 };
517                 ids.push(*id);
518             }
519         }
520         ids
521     }
522 
get_all_unclosed_streams(&mut self) -> Vec<StreamId>523     pub(crate) fn get_all_unclosed_streams(&mut self) -> Vec<StreamId> {
524         let mut ids = vec![];
525         for (id, stream) in self.stream_map.iter_mut() {
526             match stream.state {
527                 H2StreamState::Closed(_) => {}
528                 _ => {
529                     stream.header = None;
530                     stream.data.clear();
531                     stream.state = H2StreamState::Closed(CloseReason::LocalGoAway);
532                     ids.push(*id);
533                 }
534             }
535         }
536         ids
537     }
538 
clear_streams_states(&mut self)539     pub(crate) fn clear_streams_states(&mut self) {
540         self.window_updating_streams.clear();
541         self.pending_stream_window.clear();
542         self.pending_send.clear();
543         self.pending_conn_window.clear();
544         self.pending_concurrency.clear();
545     }
546 
send_local_reset(&mut self, id: StreamId) -> StreamEndState547     pub(crate) fn send_local_reset(&mut self, id: StreamId) -> StreamEndState {
548         return match self.stream_map.get_mut(&id) {
549             None => StreamEndState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
550             Some(stream) => match stream.state {
551                 H2StreamState::Closed(
552                     CloseReason::LocalRst
553                     | CloseReason::LocalGoAway
554                     | CloseReason::RemoteRst
555                     | CloseReason::RemoteGoAway,
556                 ) => StreamEndState::Ignore,
557                 H2StreamState::Closed(CloseReason::EndStream) => {
558                     stream.state = H2StreamState::Closed(CloseReason::LocalRst);
559                     StreamEndState::Ignore
560                 }
561                 _ => {
562                     stream.state = H2StreamState::Closed(CloseReason::LocalRst);
563                     stream.header = None;
564                     stream.data.clear();
565                     self.decrease_current_concurrency();
566                     StreamEndState::OK
567                 }
568             },
569         };
570     }
571 
send_headers_frame(&mut self, id: StreamId, eos: bool) -> FrameRecvState572     pub(crate) fn send_headers_frame(&mut self, id: StreamId, eos: bool) -> FrameRecvState {
573         match self.stream_map.get_mut(&id) {
574             None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
575             Some(stream) => match &stream.state {
576                 H2StreamState::Idle => {
577                     stream.state = if eos {
578                         H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders)
579                     } else {
580                         H2StreamState::Open {
581                             send: ActiveState::WaitData,
582                             recv: ActiveState::WaitHeaders,
583                         }
584                     };
585                 }
586                 H2StreamState::Open {
587                     send: ActiveState::WaitHeaders,
588                     recv,
589                 } => {
590                     stream.state = if eos {
591                         H2StreamState::LocalHalfClosed(*recv)
592                     } else {
593                         H2StreamState::Open {
594                             send: ActiveState::WaitData,
595                             recv: *recv,
596                         }
597                     };
598                 }
599                 H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders) => {
600                     stream.state = if eos {
601                         self.current_concurrent_streams -= 1;
602                         H2StreamState::Closed(CloseReason::EndStream)
603                     } else {
604                         H2StreamState::RemoteHalfClosed(ActiveState::WaitData)
605                     };
606                 }
607                 _ => {
608                     return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
609                 }
610             },
611         }
612         FrameRecvState::OK
613     }
614 
send_data_frame(&mut self, id: StreamId, eos: bool) -> FrameRecvState615     pub(crate) fn send_data_frame(&mut self, id: StreamId, eos: bool) -> FrameRecvState {
616         match self.stream_map.get_mut(&id) {
617             None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
618             Some(stream) => match &stream.state {
619                 H2StreamState::Open {
620                     send: ActiveState::WaitData,
621                     recv,
622                 } => {
623                     if eos {
624                         stream.state = H2StreamState::LocalHalfClosed(*recv);
625                     }
626                 }
627                 H2StreamState::RemoteHalfClosed(ActiveState::WaitData) => {
628                     if eos {
629                         self.current_concurrent_streams -= 1;
630                         stream.state = H2StreamState::Closed(CloseReason::EndStream);
631                     }
632                 }
633                 _ => {
634                     return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
635                 }
636             },
637         }
638         FrameRecvState::OK
639     }
640 
recv_remote_reset(&mut self, id: StreamId) -> StreamEndState641     pub(crate) fn recv_remote_reset(&mut self, id: StreamId) -> StreamEndState {
642         if id > self.max_recv_id {
643             return StreamEndState::Ignore;
644         }
645         return match self.stream_map.get_mut(&id) {
646             None => StreamEndState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
647             Some(stream) => match stream.state {
648                 H2StreamState::Closed(..) => StreamEndState::Ignore,
649                 _ => {
650                     stream.state = H2StreamState::Closed(CloseReason::RemoteRst);
651                     stream.header = None;
652                     stream.data.clear();
653                     self.decrease_current_concurrency();
654                     StreamEndState::OK
655                 }
656             },
657         };
658     }
659 
recv_headers(&mut self, id: StreamId, eos: bool) -> FrameRecvState660     pub(crate) fn recv_headers(&mut self, id: StreamId, eos: bool) -> FrameRecvState {
661         if id > self.max_recv_id {
662             return FrameRecvState::Ignore;
663         }
664 
665         match self.stream_map.get_mut(&id) {
666             None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
667             Some(stream) => match &stream.state {
668                 H2StreamState::Idle => {
669                     change_stream_state!(Idle: eos, stream.state);
670                 }
671                 H2StreamState::ReservedRemote => {
672                     change_stream_state!(HalfClosed: eos, stream.state);
673                     if eos {
674                         self.decrease_current_concurrency();
675                     }
676                 }
677                 H2StreamState::Open {
678                     send,
679                     recv: ActiveState::WaitHeaders,
680                 } => {
681                     change_stream_state!(Open: eos, stream.state, send);
682                 }
683                 H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders) => {
684                     change_stream_state!(HalfClosed: eos, stream.state);
685                     if eos {
686                         self.decrease_current_concurrency();
687                     }
688                 }
689                 H2StreamState::Closed(CloseReason::LocalGoAway | CloseReason::LocalRst) => {
690                     return FrameRecvState::Ignore;
691                 }
692                 _ => {
693                     return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
694                 }
695             },
696         }
697         FrameRecvState::OK
698     }
699 
recv_data(&mut self, id: StreamId, eos: bool) -> FrameRecvState700     pub(crate) fn recv_data(&mut self, id: StreamId, eos: bool) -> FrameRecvState {
701         if id > self.max_recv_id {
702             return FrameRecvState::Ignore;
703         }
704         match self.stream_map.get_mut(&id) {
705             None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
706             Some(stream) => match &stream.state {
707                 H2StreamState::Open {
708                     send,
709                     recv: ActiveState::WaitData,
710                 } => {
711                     if eos {
712                         stream.state = H2StreamState::RemoteHalfClosed(*send);
713                     }
714                 }
715                 H2StreamState::LocalHalfClosed(ActiveState::WaitData) => {
716                     if eos {
717                         stream.state = H2StreamState::Closed(CloseReason::EndStream);
718                         self.decrease_current_concurrency();
719                     }
720                 }
721                 H2StreamState::Closed(CloseReason::LocalGoAway | CloseReason::LocalRst) => {
722                     return FrameRecvState::Ignore;
723                 }
724                 _ => {
725                     return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
726                 }
727             },
728         }
729         FrameRecvState::OK
730     }
731 
generate_id(&mut self) -> Result<StreamId, DispatchErrorKind>732     pub(crate) fn generate_id(&mut self) -> Result<StreamId, DispatchErrorKind> {
733         let id = self.next_stream_id;
734         if self.next_stream_id < DEFAULT_MAX_STREAM_ID {
735             self.next_stream_id += 2;
736             Ok(id)
737         } else {
738             Err(DispatchErrorKind::H2(H2Error::ConnectionError(
739                 ErrorCode::ProtocolError,
740             )))
741         }
742     }
743 }
744 
745 impl Stream {
new( recv_window: RecvWindow, send_window: SendWindow, headers: Frame, data: BodyDataRef, ) -> Self746     pub(crate) fn new(
747         recv_window: RecvWindow,
748         send_window: SendWindow,
749         headers: Frame,
750         data: BodyDataRef,
751     ) -> Self {
752         Self {
753             recv_window,
754             send_window,
755             state: H2StreamState::Idle,
756             header: Some(headers),
757             data,
758         }
759     }
760 
is_init_or_active_flow_control(&self) -> bool761     pub(crate) fn is_init_or_active_flow_control(&self) -> bool {
762         matches!(
763             self.state,
764             H2StreamState::Idle
765                 | H2StreamState::Open {
766                     recv: ActiveState::WaitData,
767                     ..
768                 }
769                 | H2StreamState::LocalHalfClosed(ActiveState::WaitData)
770         )
771     }
772 }
773 
774 #[cfg(test)]
775 mod ut_h2streamstate {
776     use super::*;
777 
778     /// UT test case for `H2StreamState` with some states.
779     ///
780     /// # Brief
781     /// 1. Creates an H2StreamState with open, LocalHalfClosed, Closed state.
782     /// 2. Asserts that the send and recv field are as expected.
783     #[test]
ut_hss()784     fn ut_hss() {
785         let state = H2StreamState::Open {
786             send: ActiveState::WaitHeaders,
787             recv: ActiveState::WaitData,
788         };
789         if let H2StreamState::Open { send, recv } = state {
790             assert_eq!(send, ActiveState::WaitHeaders);
791             assert_eq!(recv, ActiveState::WaitData);
792         };
793 
794         let state = H2StreamState::LocalHalfClosed(ActiveState::WaitData);
795         if let H2StreamState::LocalHalfClosed(recv) = state {
796             assert_eq!(recv, ActiveState::WaitData);
797         };
798 
799         let state = H2StreamState::Closed(CloseReason::EndStream);
800         if let H2StreamState::Closed(reason) = state {
801             assert_eq!(reason, CloseReason::EndStream);
802         }
803     }
804 }
805 
806 #[cfg(test)]
807 mod ut_streams {
808     use super::*;
809     use crate::async_impl::{Body, Request};
810     use crate::request::RequestArc;
811 
stream_new(state: H2StreamState) -> Stream812     fn stream_new(state: H2StreamState) -> Stream {
813         Stream {
814             send_window: SendWindow::new(100),
815             recv_window: RecvWindow::new(100),
816             state,
817             header: None,
818             data: BodyDataRef::new(RequestArc::new(
819                 Request::builder().body(Body::empty()).unwrap(),
820             )),
821         }
822     }
823 
824     /// UT test case for `Streams::apply_max_concurrent_streams`.
825     ///
826     /// # Brief
827     /// 1. Sets the max concurrent streams to 2.
828     /// 2. Increases current concurrency twice and checks if it reaches max
829     ///    concurrency.
830     #[test]
ut_streams_apply_max_concurrent_streams()831     fn ut_streams_apply_max_concurrent_streams() {
832         let mut streams = Streams::new(100, 200, FlowControl::new(300, 400));
833         streams.apply_max_concurrent_streams(2);
834         streams.increase_current_concurrency();
835         assert!(!streams.reach_max_concurrency());
836         streams.increase_current_concurrency();
837         assert!(streams.reach_max_concurrency());
838     }
839 
840     /// UT test case for `Streams::apply_send_initial_window_size` and
841     /// `Streams::apply_recv_initial_window_size`.
842     ///
843     /// # Brief
844     /// 1. Adjusts the initial send and recv window size and checks for correct
845     ///    application.
846     /// 2. Asserts correct window sizes and that `pending_send` queue is empty
847     ///    and correct notification window sizes.
848     #[test]
ut_streams_apply_send_initial_window_size()849     fn ut_streams_apply_send_initial_window_size() {
850         let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
851         streams
852             .stream_map
853             .insert(1, stream_new(H2StreamState::Idle));
854 
855         assert!(streams.apply_send_initial_window_size(200).is_ok());
856         let stream = streams.stream_map.get(&1).unwrap();
857         assert_eq!(stream.send_window.size_available(), 200);
858         assert!(streams.pending_send.is_empty());
859 
860         assert!(streams.apply_send_initial_window_size(50).is_ok());
861         let stream = streams.stream_map.get(&1).unwrap();
862         assert_eq!(stream.send_window.size_available(), 50);
863         assert!(streams.pending_send.is_empty());
864 
865         assert!(streams.apply_send_initial_window_size(100).is_ok());
866         let stream = streams.stream_map.get(&1).unwrap();
867         assert_eq!(stream.send_window.size_available(), 100);
868         assert!(streams.pending_send.is_empty());
869 
870         streams.apply_recv_initial_window_size(200);
871         let stream = streams.stream_map.get(&1).unwrap();
872         assert_eq!(stream.recv_window.notification_available(), 200);
873 
874         streams.apply_recv_initial_window_size(50);
875         let stream = streams.stream_map.get(&1).unwrap();
876         assert_eq!(stream.recv_window.notification_available(), 50);
877 
878         streams.apply_recv_initial_window_size(100);
879         let stream = streams.stream_map.get(&1).unwrap();
880         assert_eq!(stream.recv_window.notification_available(), 100);
881     }
882 
883     /// UT test case for `Streams::get_unset_streams`.
884     ///
885     /// # Brief
886     /// 1. Insert streams with different states and sends go_away with a stream
887     ///    id.
888     /// 2. Asserts that only streams with IDs greater than to the go_away ID are closed.
889     #[test]
ut_streams_get_unset_streams()890     fn ut_streams_get_unset_streams() {
891         let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
892         streams.apply_max_concurrent_streams(4);
893         streams
894             .stream_map
895             .insert(1, stream_new(H2StreamState::Idle));
896         streams.increase_current_concurrency();
897         streams
898             .stream_map
899             .insert(2, stream_new(H2StreamState::Idle));
900         streams.increase_current_concurrency();
901         streams.stream_map.insert(
902             3,
903             stream_new(H2StreamState::Open {
904                 send: ActiveState::WaitHeaders,
905                 recv: ActiveState::WaitData,
906             }),
907         );
908         streams.increase_current_concurrency();
909         streams
910             .stream_map
911             .insert(4, stream_new(H2StreamState::Closed(CloseReason::EndStream)));
912         streams.increase_current_concurrency();
913 
914         let go_away_streams = streams.get_unset_streams(2);
915         assert!([3, 4].iter().all(|&e| go_away_streams.contains(&e)));
916 
917         let state = streams.stream_state(1).unwrap();
918         assert_eq!(state, H2StreamState::Idle);
919         let state = streams.stream_state(2).unwrap();
920         assert_eq!(state, H2StreamState::Idle);
921         let state = streams.stream_state(3).unwrap();
922         assert_eq!(state, H2StreamState::Closed(CloseReason::RemoteGoAway));
923         let state = streams.stream_state(4).unwrap();
924         assert_eq!(state, H2StreamState::Closed(CloseReason::EndStream));
925     }
926 
927     /// UT test case for `Streams::get_all_unclosed_streams`.
928     ///
929     /// # Brief
930     /// 1. Inserts streams with different states.
931     /// 2. Asserts that only unclosed streams are returned.
932     #[test]
ut_streams_get_all_unclosed_streams()933     fn ut_streams_get_all_unclosed_streams() {
934         let mut streams = Streams::new(1000, 1000, FlowControl::new(1000, 1000));
935         streams.apply_max_concurrent_streams(2);
936         streams
937             .stream_map
938             .insert(1, stream_new(H2StreamState::Idle));
939         streams.increase_current_concurrency();
940         streams
941             .stream_map
942             .insert(2, stream_new(H2StreamState::Closed(CloseReason::EndStream)));
943         streams.increase_current_concurrency();
944         assert_eq!(streams.get_all_unclosed_streams(), [1]);
945     }
946 
947     /// UT test case for `Streams::clear_streams_states`.
948     ///
949     /// # Brief
950     /// 1. Clears all the pending and window updating stream states.
951     /// 2. Asserts that all relevant collections are empty after clearing.
952     #[test]
ut_streams_clear_streams_states()953     fn ut_streams_clear_streams_states() {
954         let mut streams = Streams::new(1000, 1000, FlowControl::new(1000, 1000));
955         streams.clear_streams_states();
956         assert!(streams.window_updating_streams.is_empty());
957         assert!(streams.pending_stream_window.is_empty());
958         assert!(streams.pending_send.is_empty());
959         assert!(streams.pending_conn_window.is_empty());
960         assert!(streams.pending_concurrency.is_empty());
961     }
962 
963     /// UT test case for `Streams::send_local_reset`.
964     ///
965     /// # Brief
966     /// 1. Sends local reset on streams with different states.
967     /// 2. Asserts correct handing o each state.
968     #[test]
ut_streams_send_local_reset()969     fn ut_streams_send_local_reset() {
970         let mut streams = Streams::new(1000, 1000, FlowControl::new(1000, 1000));
971         streams.apply_max_concurrent_streams(3);
972         streams
973             .stream_map
974             .insert(1, stream_new(H2StreamState::Idle));
975         streams.increase_current_concurrency();
976         streams.stream_map.insert(
977             2,
978             stream_new(H2StreamState::Closed(CloseReason::RemoteGoAway)),
979         );
980         streams.increase_current_concurrency();
981         streams
982             .stream_map
983             .insert(3, stream_new(H2StreamState::Closed(CloseReason::EndStream)));
984         streams.increase_current_concurrency();
985         assert_eq!(
986             streams.send_local_reset(4),
987             StreamEndState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError))
988         );
989         assert_eq!(streams.send_local_reset(3), StreamEndState::Ignore);
990         assert_eq!(streams.send_local_reset(2), StreamEndState::Ignore);
991         assert_eq!(streams.send_local_reset(1), StreamEndState::OK);
992     }
993 
994     /// UT test case for `Streams::send_headers_frame`.
995     ///
996     /// # Brief
997     /// 1. Send headers frame on a stream.
998     /// 2. Asserts correct handling of frame and stream state changes.
999     #[test]
ut_streams_send_headers_frame()1000     fn ut_streams_send_headers_frame() {
1001         let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
1002         streams.apply_max_concurrent_streams(1);
1003         streams
1004             .stream_map
1005             .insert(1, stream_new(H2StreamState::Idle));
1006         streams.increase_current_concurrency();
1007         let res = streams.send_headers_frame(1, true);
1008         assert_eq!(res, FrameRecvState::OK);
1009         assert_eq!(
1010             streams.stream_state(1).unwrap(),
1011             H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders)
1012         );
1013         let res = streams.send_headers_frame(1, true);
1014         assert_eq!(
1015             res,
1016             FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError))
1017         );
1018     }
1019 
1020     /// UT test case for `Streams::send_data_frame`.
1021     ///
1022     /// # Brief
1023     /// 1. Sends data frame on a stream.
1024     /// 2. Asserts correct handling of frame and stream state changes.
1025     #[test]
ut_streams_send_data_frame()1026     fn ut_streams_send_data_frame() {
1027         let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
1028         streams.stream_map.insert(
1029             1,
1030             stream_new(H2StreamState::Open {
1031                 send: ActiveState::WaitData,
1032                 recv: ActiveState::WaitHeaders,
1033             }),
1034         );
1035         streams.increase_current_concurrency();
1036         let res = streams.send_data_frame(1, true);
1037         assert_eq!(res, FrameRecvState::OK);
1038         assert_eq!(
1039             streams.stream_state(1).unwrap(),
1040             H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders)
1041         );
1042         let res = streams.send_data_frame(1, true);
1043         assert_eq!(
1044             res,
1045             FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError))
1046         );
1047     }
1048 
1049     /// UT test for `Streams::recv_remote_reset`.
1050     ///
1051     /// # Brief
1052     /// 1. Receives remote reset on streams with different states.
1053     /// 2. Asserts correct handling of each state.
1054     #[test]
ut_streams_recv_remote_reset()1055     fn ut_streams_recv_remote_reset() {
1056         let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
1057         streams.apply_max_concurrent_streams(1);
1058         streams.stream_map.insert(
1059             1,
1060             stream_new(H2StreamState::Open {
1061                 send: ActiveState::WaitData,
1062                 recv: ActiveState::WaitHeaders,
1063             }),
1064         );
1065         streams.increase_current_concurrency();
1066         let res = streams.recv_remote_reset(1);
1067         assert_eq!(res, StreamEndState::OK);
1068         assert_eq!(
1069             streams.stream_state(1).unwrap(),
1070             H2StreamState::Closed(CloseReason::RemoteRst)
1071         );
1072         let res = streams.recv_remote_reset(1);
1073         assert_eq!(res, StreamEndState::Ignore);
1074     }
1075 
1076     /// UT test case for `Streams::recv_headers`.
1077     ///
1078     /// # Brief
1079     /// 1. Receives headers on a stream and checks for state changes.
1080     /// 2. Asserts error handling when headers are received again.
1081     #[test]
ut_streams_recv_headers()1082     fn ut_streams_recv_headers() {
1083         let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
1084         streams.apply_max_concurrent_streams(1);
1085         streams
1086             .stream_map
1087             .insert(1, stream_new(H2StreamState::Idle));
1088         let res = streams.recv_headers(1, false);
1089         assert_eq!(res, FrameRecvState::OK);
1090         assert_eq!(
1091             streams.stream_state(1).unwrap(),
1092             H2StreamState::Open {
1093                 send: ActiveState::WaitHeaders,
1094                 recv: ActiveState::WaitData,
1095             }
1096         );
1097         let res = streams.recv_headers(1, false);
1098         assert_eq!(
1099             res,
1100             FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError))
1101         );
1102     }
1103 
1104     /// UT test case for `Streams::recv_data`.
1105     ///
1106     /// # Brief
1107     /// 1. Receives data on a stream and checks for state changes.
1108     /// 2. Assert correct state when data is received with eos flag.
1109     #[test]
ut_streams_recv_data()1110     fn ut_streams_recv_data() {
1111         let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
1112         streams.stream_map.insert(
1113             1,
1114             stream_new(H2StreamState::Open {
1115                 send: ActiveState::WaitHeaders,
1116                 recv: ActiveState::WaitData,
1117             }),
1118         );
1119         let res = streams.recv_data(1, false);
1120         assert_eq!(res, FrameRecvState::OK);
1121         assert_eq!(
1122             streams.stream_state(1).unwrap(),
1123             H2StreamState::Open {
1124                 send: ActiveState::WaitHeaders,
1125                 recv: ActiveState::WaitData,
1126             }
1127         );
1128         let res = streams.recv_data(1, true);
1129         assert_eq!(res, FrameRecvState::OK);
1130         assert_eq!(
1131             streams.stream_state(1).unwrap(),
1132             H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders)
1133         );
1134     }
1135 }
1136