// Copyright (c) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Streams operations utils.

use std::cmp::{min, Ordering};
use std::collections::{HashMap, HashSet, VecDeque};
use std::task::{Context, Poll};

use ylong_http::h2::{Data, ErrorCode, Frame, FrameFlags, H2Error, Payload, StreamId};

use crate::runtime::UnboundedSender;
use crate::util::data_ref::BodyDataRef;
use crate::util::dispatcher::http2::DispatchErrorKind;
use crate::util::h2::buffer::{FlowControl, RecvWindow, SendWindow};

/// Initial value of `Streams::max_send_id`: `u32::MAX >> 1`, i.e. 2^31 - 1.
pub(crate) const INITIAL_MAX_SEND_STREAM_ID: StreamId = u32::MAX >> 1;
/// Initial value of `Streams::max_recv_id`: `u32::MAX >> 1`, i.e. 2^31 - 1.
pub(crate) const INITIAL_MAX_RECV_STREAM_ID: StreamId = u32::MAX >> 1;

/// Upper bound that `Streams::generate_id` compares `next_stream_id` against.
const DEFAULT_MAX_STREAM_ID: StreamId = u32::MAX >> 1;
/// Initial value of `Streams::latest_remote_id`.
const INITIAL_LATEST_REMOTE_ID: StreamId = 0;
/// Concurrency limit used until `Streams::apply_max_concurrent_streams`
/// replaces it.
const DEFAULT_MAX_CONCURRENT_STREAMS: u32 = 100;

/// Result of applying a HEADERS or DATA frame to a stream's state machine.
///
/// Returned by `Streams::send_headers_frame`, `Streams::send_data_frame`,
/// `Streams::recv_headers` and `Streams::recv_data`.
#[cfg_attr(test, derive(Debug, PartialEq))]
pub(crate) enum FrameRecvState {
    /// The frame was legal in the current state; the state has been advanced.
    OK,
    /// The frame must be dropped without touching the stream.
    Ignore,
    /// The frame is illegal in the current state.
    Err(H2Error),
}

/// Result of polling a request body for the next DATA frame.
pub(crate) enum DataReadState {
    /// The stream is already closed; nothing is read.
    Closed,
    // Wait for poll_read or wait for window.
    Pending,
    /// A DATA frame carrying body bytes.
    Ready(Frame),
    /// The final, empty DATA frame with the end-stream flag set.
    Finish(Frame),
}

/// Result of resetting a stream.
///
/// Returned by `Streams::send_local_reset` and `Streams::recv_remote_reset`.
#[cfg_attr(test, derive(Debug, PartialEq))]
pub(crate) enum StreamEndState {
    /// The stream was open and has now been closed.
    OK,
    /// The stream was already closed (or is out of range); nothing to report.
    Ignore,
    /// The stream id is unknown.
    Err(H2Error),
}

//                              +--------+
//                      send PP |        | recv PP
//                     ,--------|  idle  |--------.
58 // / | | \ 59 // v +--------+ v 60 // +----------+ | +----------+ 61 // | | | send H / | | 62 // ,------| reserved | | recv H | reserved |------. 63 // | | (local) | | | (remote) | | 64 // | +----------+ v +----------+ | 65 // | | +--------+ | | 66 // | | recv ES | | send ES | | 67 // | send H | ,-------| open |-------. | recv H | 68 // | | / | | \ | | 69 // | v v +--------+ v v | 70 // | +----------+ | +----------+ | 71 // | | half | | | half | | 72 // | | closed | | send R / | closed | | 73 // | | (remote) | | recv R | (local) | | 74 // | +----------+ | +----------+ | 75 // | | | | | 76 // | | send ES / | recv ES / | | 77 // | | send R / v send R / | | 78 // | | recv R +--------+ recv R | | 79 // | send R / `----------->| |<-----------' send R / | 80 // | recv R | closed | recv R | 81 // `----------------------->| |<----------------------' 82 // +--------+ 83 #[derive(Copy, Clone, Debug)] 84 #[cfg_attr(test, derive(PartialEq))] 85 pub(crate) enum H2StreamState { 86 Idle, 87 // When response does not depend on request, 88 // the server can send response directly without waiting for the request to finish receiving. 89 // Therefore, the sending and receiving states of the client have their own states 90 Open { 91 send: ActiveState, 92 recv: ActiveState, 93 }, 94 #[allow(dead_code)] 95 ReservedRemote, 96 // After the request is sent, the state is waiting for the response to be received. 
97 LocalHalfClosed(ActiveState), 98 // When the response is received but the request is not fully sent, 99 // this indicates the status of the request being sent 100 RemoteHalfClosed(ActiveState), 101 Closed(CloseReason), 102 } 103 104 #[derive(Copy, Clone, Debug)] 105 #[cfg_attr(test, derive(PartialEq))] 106 pub(crate) enum CloseReason { 107 LocalRst, 108 RemoteRst, 109 RemoteGoAway, 110 LocalGoAway, 111 EndStream, 112 } 113 114 #[derive(Copy, Clone, Debug)] 115 #[cfg_attr(test, derive(PartialEq))] 116 pub(crate) enum ActiveState { 117 WaitHeaders, 118 WaitData, 119 } 120 121 pub(crate) struct Stream { 122 pub(crate) recv_window: RecvWindow, 123 pub(crate) send_window: SendWindow, 124 pub(crate) state: H2StreamState, 125 pub(crate) header: Option<Frame>, 126 pub(crate) data: BodyDataRef, 127 } 128 129 pub(crate) struct RequestWrapper { 130 pub(crate) flag: FrameFlags, 131 pub(crate) payload: Payload, 132 pub(crate) data: BodyDataRef, 133 } 134 135 pub(crate) struct Streams { 136 // Records the received goaway last_stream_id. 137 pub(crate) max_send_id: StreamId, 138 // Records the send goaway last_stream_id. 139 pub(crate) max_recv_id: StreamId, 140 // Currently the client doesn't support push promise, so this value is always 0. 141 pub(crate) latest_remote_id: StreamId, 142 pub(crate) stream_recv_window_size: u32, 143 pub(crate) stream_send_window_size: u32, 144 max_concurrent_streams: u32, 145 current_concurrent_streams: u32, 146 flow_control: FlowControl, 147 pending_concurrency: VecDeque<StreamId>, 148 pending_stream_window: HashSet<u32>, 149 pending_conn_window: VecDeque<u32>, 150 pending_send: VecDeque<StreamId>, 151 window_updating_streams: VecDeque<StreamId>, 152 pub(crate) stream_map: HashMap<StreamId, Stream>, 153 pub(crate) next_stream_id: StreamId, 154 } 155 156 macro_rules! 
change_stream_state { 157 (Idle: $eos: expr, $state: expr) => { 158 $state = if $eos { 159 H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders) 160 } else { 161 H2StreamState::Open { 162 send: ActiveState::WaitHeaders, 163 recv: ActiveState::WaitData, 164 } 165 }; 166 }; 167 (Open: $eos: expr, $state: expr, $send: expr) => { 168 $state = if $eos { 169 H2StreamState::RemoteHalfClosed($send.clone()) 170 } else { 171 H2StreamState::Open { 172 send: $send.clone(), 173 recv: ActiveState::WaitData, 174 } 175 }; 176 }; 177 (HalfClosed: $eos: expr, $state: expr) => { 178 $state = if $eos { 179 H2StreamState::Closed(CloseReason::EndStream) 180 } else { 181 H2StreamState::LocalHalfClosed(ActiveState::WaitData) 182 }; 183 }; 184 } 185 186 impl Streams { new( recv_window_size: u32, send_window_size: u32, flow_control: FlowControl, ) -> Self187 pub(crate) fn new( 188 recv_window_size: u32, 189 send_window_size: u32, 190 flow_control: FlowControl, 191 ) -> Self { 192 Self { 193 max_send_id: INITIAL_MAX_SEND_STREAM_ID, 194 max_recv_id: INITIAL_MAX_RECV_STREAM_ID, 195 latest_remote_id: INITIAL_LATEST_REMOTE_ID, 196 max_concurrent_streams: DEFAULT_MAX_CONCURRENT_STREAMS, 197 current_concurrent_streams: 0, 198 stream_recv_window_size: recv_window_size, 199 stream_send_window_size: send_window_size, 200 flow_control, 201 pending_concurrency: VecDeque::new(), 202 pending_stream_window: HashSet::new(), 203 pending_conn_window: VecDeque::new(), 204 pending_send: VecDeque::new(), 205 window_updating_streams: VecDeque::new(), 206 stream_map: HashMap::new(), 207 next_stream_id: 1, 208 } 209 } 210 decrease_current_concurrency(&mut self)211 pub(crate) fn decrease_current_concurrency(&mut self) { 212 self.current_concurrent_streams -= 1; 213 } 214 increase_current_concurrency(&mut self)215 pub(crate) fn increase_current_concurrency(&mut self) { 216 self.current_concurrent_streams += 1; 217 } 218 reach_max_concurrency(&mut self) -> bool219 pub(crate) fn reach_max_concurrency(&mut self) -> 
bool { 220 self.current_concurrent_streams >= self.max_concurrent_streams 221 } 222 apply_max_concurrent_streams(&mut self, num: u32)223 pub(crate) fn apply_max_concurrent_streams(&mut self, num: u32) { 224 self.max_concurrent_streams = num; 225 } 226 apply_send_initial_window_size(&mut self, size: u32) -> Result<(), H2Error>227 pub(crate) fn apply_send_initial_window_size(&mut self, size: u32) -> Result<(), H2Error> { 228 let current = self.stream_send_window_size; 229 self.stream_send_window_size = size; 230 231 match current.cmp(&size) { 232 Ordering::Less => { 233 let excess = size - current; 234 for (_id, stream) in self.stream_map.iter_mut() { 235 stream.send_window.increase_size(excess)?; 236 } 237 for id in self.pending_stream_window.iter() { 238 self.pending_send.push_back(*id); 239 } 240 self.pending_stream_window.clear(); 241 } 242 Ordering::Greater => { 243 let excess = current - size; 244 for (_id, stream) in self.stream_map.iter_mut() { 245 stream.send_window.reduce_size(excess); 246 } 247 } 248 Ordering::Equal => {} 249 } 250 Ok(()) 251 } 252 apply_recv_initial_window_size(&mut self, size: u32)253 pub(crate) fn apply_recv_initial_window_size(&mut self, size: u32) { 254 let current = self.stream_recv_window_size; 255 self.stream_recv_window_size = size; 256 match current.cmp(&size) { 257 Ordering::Less => { 258 for (_id, stream) in self.stream_map.iter_mut() { 259 let extra = size - current; 260 stream.recv_window.increase_notification(extra); 261 stream.recv_window.increase_actual(extra); 262 } 263 } 264 Ordering::Greater => { 265 for (_id, stream) in self.stream_map.iter_mut() { 266 stream.recv_window.reduce_notification(current - size); 267 } 268 } 269 Ordering::Equal => {} 270 } 271 } 272 release_stream_recv_window( &mut self, id: StreamId, size: u32, sender: &UnboundedSender<Frame>, ) -> Result<(), DispatchErrorKind>273 pub(crate) fn release_stream_recv_window( 274 &mut self, 275 id: StreamId, 276 size: u32, 277 sender: &UnboundedSender<Frame>, 
278 ) -> Result<(), DispatchErrorKind> { 279 if let Some(stream) = self.stream_map.get_mut(&id) { 280 if stream.recv_window.notification_available() < size { 281 return Err(H2Error::StreamError(id, ErrorCode::FlowControlError).into()); 282 } 283 stream.recv_window.recv_data(size); 284 // determine whether it is necessary to update the stream window 285 if stream.recv_window.unreleased_size().is_some() { 286 if !stream.is_init_or_active_flow_control() { 287 return Ok(()); 288 } 289 if let Some(window_update) = stream.recv_window.check_window_update(id) { 290 sender 291 .send(window_update) 292 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 293 } 294 } 295 } 296 Ok(()) 297 } 298 release_conn_recv_window( &mut self, size: u32, sender: &UnboundedSender<Frame>, ) -> Result<(), DispatchErrorKind>299 pub(crate) fn release_conn_recv_window( 300 &mut self, 301 size: u32, 302 sender: &UnboundedSender<Frame>, 303 ) -> Result<(), DispatchErrorKind> { 304 if self.flow_control.recv_notification_size_available() < size { 305 return Err(H2Error::ConnectionError(ErrorCode::FlowControlError).into()); 306 } 307 self.flow_control.recv_data(size); 308 // determine whether it is necessary to update the connection window 309 if let Some(window_update) = self.flow_control.check_conn_recv_window_update() { 310 sender 311 .send(window_update) 312 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 313 } 314 Ok(()) 315 } 316 is_closed(&self) -> bool317 pub(crate) fn is_closed(&self) -> bool { 318 for (_id, stream) in self.stream_map.iter() { 319 match stream.state { 320 H2StreamState::Closed(_) => {} 321 _ => { 322 return false; 323 } 324 } 325 } 326 true 327 } 328 stream_state(&self, id: StreamId) -> Option<H2StreamState>329 pub(crate) fn stream_state(&self, id: StreamId) -> Option<H2StreamState> { 330 self.stream_map.get(&id).map(|stream| stream.state) 331 } 332 insert(&mut self, id: StreamId, headers: Frame, data: BodyDataRef)333 pub(crate) fn insert(&mut self, id: StreamId, headers: 
Frame, data: BodyDataRef) { 334 let send_window = SendWindow::new(self.stream_send_window_size as i32); 335 let recv_window = RecvWindow::new(self.stream_recv_window_size as i32); 336 let stream = Stream::new(recv_window, send_window, headers, data); 337 self.stream_map.insert(id, stream); 338 } 339 push_back_pending_send(&mut self, id: StreamId)340 pub(crate) fn push_back_pending_send(&mut self, id: StreamId) { 341 self.pending_send.push_back(id); 342 } 343 push_pending_concurrency(&mut self, id: StreamId)344 pub(crate) fn push_pending_concurrency(&mut self, id: StreamId) { 345 self.pending_concurrency.push_back(id); 346 } 347 is_pending_concurrency_empty(&self) -> bool348 pub(crate) fn is_pending_concurrency_empty(&self) -> bool { 349 self.pending_concurrency.is_empty() 350 } 351 next_pending_stream(&mut self) -> Option<StreamId>352 pub(crate) fn next_pending_stream(&mut self) -> Option<StreamId> { 353 self.pending_send.pop_front() 354 } 355 pending_stream_num(&self) -> usize356 pub(crate) fn pending_stream_num(&self) -> usize { 357 self.pending_send.len() 358 } 359 try_consume_pending_concurrency(&mut self)360 pub(crate) fn try_consume_pending_concurrency(&mut self) { 361 while !self.reach_max_concurrency() { 362 match self.pending_concurrency.pop_front() { 363 None => { 364 return; 365 } 366 Some(id) => { 367 self.increase_current_concurrency(); 368 self.push_back_pending_send(id); 369 } 370 } 371 } 372 } 373 increase_conn_send_window(&mut self, size: u32) -> Result<(), H2Error>374 pub(crate) fn increase_conn_send_window(&mut self, size: u32) -> Result<(), H2Error> { 375 self.flow_control.increase_send_size(size) 376 } 377 reassign_conn_send_window(&mut self)378 pub(crate) fn reassign_conn_send_window(&mut self) { 379 // Since the data structure of the body is a stream, 380 // the size of a body cannot be obtained, 381 // so all streams in pending_conn_window are added to the pending_send queue 382 // again. 
383 loop { 384 match self.pending_conn_window.pop_front() { 385 None => break, 386 Some(id) => { 387 self.push_back_pending_send(id); 388 } 389 } 390 } 391 } 392 reassign_stream_send_window( &mut self, id: StreamId, size: u32, ) -> Result<(), H2Error>393 pub(crate) fn reassign_stream_send_window( 394 &mut self, 395 id: StreamId, 396 size: u32, 397 ) -> Result<(), H2Error> { 398 if let Some(stream) = self.stream_map.get_mut(&id) { 399 stream.send_window.increase_size(size)?; 400 } 401 if self.pending_stream_window.take(&id).is_some() { 402 self.pending_send.push_back(id); 403 } 404 Ok(()) 405 } 406 headers(&mut self, id: StreamId) -> Result<Option<Frame>, H2Error>407 pub(crate) fn headers(&mut self, id: StreamId) -> Result<Option<Frame>, H2Error> { 408 match self.stream_map.get_mut(&id) { 409 None => Err(H2Error::ConnectionError(ErrorCode::IntervalError)), 410 Some(stream) => match stream.state { 411 H2StreamState::Closed(_) => Ok(None), 412 _ => Ok(stream.header.take()), 413 }, 414 } 415 } 416 poll_read_body( &mut self, cx: &mut Context<'_>, id: StreamId, ) -> Result<DataReadState, H2Error>417 pub(crate) fn poll_read_body( 418 &mut self, 419 cx: &mut Context<'_>, 420 id: StreamId, 421 ) -> Result<DataReadState, H2Error> { 422 // TODO Since the Array length needs to be a constant, 423 // the minimum value is used here, which can be optimized to the MAX_FRAME_SIZE 424 // updated in SETTINGS 425 const DEFAULT_MAX_FRAME_SIZE: usize = 16 * 1024; 426 427 match self.stream_map.get_mut(&id) { 428 None => Err(H2Error::ConnectionError(ErrorCode::IntervalError)), 429 Some(stream) => match stream.state { 430 H2StreamState::Closed(_) => Ok(DataReadState::Closed), 431 _ => { 432 let stream_send_vacant = stream.send_window.size_available() as usize; 433 if stream_send_vacant == 0 { 434 self.pending_stream_window.insert(id); 435 return Ok(DataReadState::Pending); 436 } 437 let conn_send_vacant = self.flow_control.send_size_available(); 438 if conn_send_vacant == 0 { 439 
self.pending_conn_window.push_back(id); 440 return Ok(DataReadState::Pending); 441 } 442 443 let available = min(stream_send_vacant, conn_send_vacant); 444 let len = min(available, DEFAULT_MAX_FRAME_SIZE); 445 446 let mut buf = [0u8; DEFAULT_MAX_FRAME_SIZE]; 447 self.poll_sized_data(cx, id, &mut buf[..len]) 448 } 449 }, 450 } 451 } 452 poll_sized_data( &mut self, cx: &mut Context<'_>, id: StreamId, buf: &mut [u8], ) -> Result<DataReadState, H2Error>453 fn poll_sized_data( 454 &mut self, 455 cx: &mut Context<'_>, 456 id: StreamId, 457 buf: &mut [u8], 458 ) -> Result<DataReadState, H2Error> { 459 let stream = if let Some(stream) = self.stream_map.get_mut(&id) { 460 stream 461 } else { 462 return Err(H2Error::ConnectionError(ErrorCode::IntervalError)); 463 }; 464 match stream.data.poll_read(cx, buf) { 465 Poll::Ready(Some(size)) => { 466 if size > 0 { 467 stream.send_window.send_data(size as u32); 468 self.flow_control.send_data(size as u32); 469 let data_vec = Vec::from(&buf[..size]); 470 let flag = FrameFlags::new(0); 471 472 Ok(DataReadState::Ready(Frame::new( 473 id, 474 flag, 475 Payload::Data(Data::new(data_vec)), 476 ))) 477 } else { 478 let data_vec = vec![]; 479 let mut flag = FrameFlags::new(1); 480 flag.set_end_stream(true); 481 Ok(DataReadState::Finish(Frame::new( 482 id, 483 flag, 484 Payload::Data(Data::new(data_vec)), 485 ))) 486 } 487 } 488 Poll::Ready(None) => Err(H2Error::ConnectionError(ErrorCode::IntervalError)), 489 Poll::Pending => { 490 self.push_back_pending_send(id); 491 Ok(DataReadState::Pending) 492 } 493 } 494 } 495 496 // Get unset streams less than or equal to last_stream_id and change the state 497 // of streams greater than last_stream_id to RemoteAaway get_unset_streams(&mut self, last_stream_id: StreamId) -> Vec<StreamId>498 pub(crate) fn get_unset_streams(&mut self, last_stream_id: StreamId) -> Vec<StreamId> { 499 let mut ids = vec![]; 500 for (id, unsent_stream) in self.stream_map.iter_mut() { 501 if *id > last_stream_id { 502 match 
unsent_stream.state { 503 // TODO Whether the close state needs to be selected. 504 H2StreamState::Closed(_) => {} 505 H2StreamState::Idle => { 506 unsent_stream.state = H2StreamState::Closed(CloseReason::RemoteGoAway); 507 unsent_stream.header = None; 508 unsent_stream.data.clear(); 509 } 510 _ => { 511 self.current_concurrent_streams -= 1; 512 unsent_stream.state = H2StreamState::Closed(CloseReason::RemoteGoAway); 513 unsent_stream.header = None; 514 unsent_stream.data.clear(); 515 } 516 }; 517 ids.push(*id); 518 } 519 } 520 ids 521 } 522 get_all_unclosed_streams(&mut self) -> Vec<StreamId>523 pub(crate) fn get_all_unclosed_streams(&mut self) -> Vec<StreamId> { 524 let mut ids = vec![]; 525 for (id, stream) in self.stream_map.iter_mut() { 526 match stream.state { 527 H2StreamState::Closed(_) => {} 528 _ => { 529 stream.header = None; 530 stream.data.clear(); 531 stream.state = H2StreamState::Closed(CloseReason::LocalGoAway); 532 ids.push(*id); 533 } 534 } 535 } 536 ids 537 } 538 clear_streams_states(&mut self)539 pub(crate) fn clear_streams_states(&mut self) { 540 self.window_updating_streams.clear(); 541 self.pending_stream_window.clear(); 542 self.pending_send.clear(); 543 self.pending_conn_window.clear(); 544 self.pending_concurrency.clear(); 545 } 546 send_local_reset(&mut self, id: StreamId) -> StreamEndState547 pub(crate) fn send_local_reset(&mut self, id: StreamId) -> StreamEndState { 548 return match self.stream_map.get_mut(&id) { 549 None => StreamEndState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 550 Some(stream) => match stream.state { 551 H2StreamState::Closed( 552 CloseReason::LocalRst 553 | CloseReason::LocalGoAway 554 | CloseReason::RemoteRst 555 | CloseReason::RemoteGoAway, 556 ) => StreamEndState::Ignore, 557 H2StreamState::Closed(CloseReason::EndStream) => { 558 stream.state = H2StreamState::Closed(CloseReason::LocalRst); 559 StreamEndState::Ignore 560 } 561 _ => { 562 stream.state = H2StreamState::Closed(CloseReason::LocalRst); 
563 stream.header = None; 564 stream.data.clear(); 565 self.decrease_current_concurrency(); 566 StreamEndState::OK 567 } 568 }, 569 }; 570 } 571 send_headers_frame(&mut self, id: StreamId, eos: bool) -> FrameRecvState572 pub(crate) fn send_headers_frame(&mut self, id: StreamId, eos: bool) -> FrameRecvState { 573 match self.stream_map.get_mut(&id) { 574 None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 575 Some(stream) => match &stream.state { 576 H2StreamState::Idle => { 577 stream.state = if eos { 578 H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders) 579 } else { 580 H2StreamState::Open { 581 send: ActiveState::WaitData, 582 recv: ActiveState::WaitHeaders, 583 } 584 }; 585 } 586 H2StreamState::Open { 587 send: ActiveState::WaitHeaders, 588 recv, 589 } => { 590 stream.state = if eos { 591 H2StreamState::LocalHalfClosed(*recv) 592 } else { 593 H2StreamState::Open { 594 send: ActiveState::WaitData, 595 recv: *recv, 596 } 597 }; 598 } 599 H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders) => { 600 stream.state = if eos { 601 self.current_concurrent_streams -= 1; 602 H2StreamState::Closed(CloseReason::EndStream) 603 } else { 604 H2StreamState::RemoteHalfClosed(ActiveState::WaitData) 605 }; 606 } 607 _ => { 608 return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); 609 } 610 }, 611 } 612 FrameRecvState::OK 613 } 614 send_data_frame(&mut self, id: StreamId, eos: bool) -> FrameRecvState615 pub(crate) fn send_data_frame(&mut self, id: StreamId, eos: bool) -> FrameRecvState { 616 match self.stream_map.get_mut(&id) { 617 None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 618 Some(stream) => match &stream.state { 619 H2StreamState::Open { 620 send: ActiveState::WaitData, 621 recv, 622 } => { 623 if eos { 624 stream.state = H2StreamState::LocalHalfClosed(*recv); 625 } 626 } 627 H2StreamState::RemoteHalfClosed(ActiveState::WaitData) => { 628 if eos { 629 
self.current_concurrent_streams -= 1; 630 stream.state = H2StreamState::Closed(CloseReason::EndStream); 631 } 632 } 633 _ => { 634 return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); 635 } 636 }, 637 } 638 FrameRecvState::OK 639 } 640 recv_remote_reset(&mut self, id: StreamId) -> StreamEndState641 pub(crate) fn recv_remote_reset(&mut self, id: StreamId) -> StreamEndState { 642 if id > self.max_recv_id { 643 return StreamEndState::Ignore; 644 } 645 return match self.stream_map.get_mut(&id) { 646 None => StreamEndState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 647 Some(stream) => match stream.state { 648 H2StreamState::Closed(..) => StreamEndState::Ignore, 649 _ => { 650 stream.state = H2StreamState::Closed(CloseReason::RemoteRst); 651 stream.header = None; 652 stream.data.clear(); 653 self.decrease_current_concurrency(); 654 StreamEndState::OK 655 } 656 }, 657 }; 658 } 659 recv_headers(&mut self, id: StreamId, eos: bool) -> FrameRecvState660 pub(crate) fn recv_headers(&mut self, id: StreamId, eos: bool) -> FrameRecvState { 661 if id > self.max_recv_id { 662 return FrameRecvState::Ignore; 663 } 664 665 match self.stream_map.get_mut(&id) { 666 None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 667 Some(stream) => match &stream.state { 668 H2StreamState::Idle => { 669 change_stream_state!(Idle: eos, stream.state); 670 } 671 H2StreamState::ReservedRemote => { 672 change_stream_state!(HalfClosed: eos, stream.state); 673 if eos { 674 self.decrease_current_concurrency(); 675 } 676 } 677 H2StreamState::Open { 678 send, 679 recv: ActiveState::WaitHeaders, 680 } => { 681 change_stream_state!(Open: eos, stream.state, send); 682 } 683 H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders) => { 684 change_stream_state!(HalfClosed: eos, stream.state); 685 if eos { 686 self.decrease_current_concurrency(); 687 } 688 } 689 H2StreamState::Closed(CloseReason::LocalGoAway | CloseReason::LocalRst) => { 
690 return FrameRecvState::Ignore; 691 } 692 _ => { 693 return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); 694 } 695 }, 696 } 697 FrameRecvState::OK 698 } 699 recv_data(&mut self, id: StreamId, eos: bool) -> FrameRecvState700 pub(crate) fn recv_data(&mut self, id: StreamId, eos: bool) -> FrameRecvState { 701 if id > self.max_recv_id { 702 return FrameRecvState::Ignore; 703 } 704 match self.stream_map.get_mut(&id) { 705 None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 706 Some(stream) => match &stream.state { 707 H2StreamState::Open { 708 send, 709 recv: ActiveState::WaitData, 710 } => { 711 if eos { 712 stream.state = H2StreamState::RemoteHalfClosed(*send); 713 } 714 } 715 H2StreamState::LocalHalfClosed(ActiveState::WaitData) => { 716 if eos { 717 stream.state = H2StreamState::Closed(CloseReason::EndStream); 718 self.decrease_current_concurrency(); 719 } 720 } 721 H2StreamState::Closed(CloseReason::LocalGoAway | CloseReason::LocalRst) => { 722 return FrameRecvState::Ignore; 723 } 724 _ => { 725 return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); 726 } 727 }, 728 } 729 FrameRecvState::OK 730 } 731 generate_id(&mut self) -> Result<StreamId, DispatchErrorKind>732 pub(crate) fn generate_id(&mut self) -> Result<StreamId, DispatchErrorKind> { 733 let id = self.next_stream_id; 734 if self.next_stream_id < DEFAULT_MAX_STREAM_ID { 735 self.next_stream_id += 2; 736 Ok(id) 737 } else { 738 Err(DispatchErrorKind::H2(H2Error::ConnectionError( 739 ErrorCode::ProtocolError, 740 ))) 741 } 742 } 743 } 744 745 impl Stream { new( recv_window: RecvWindow, send_window: SendWindow, headers: Frame, data: BodyDataRef, ) -> Self746 pub(crate) fn new( 747 recv_window: RecvWindow, 748 send_window: SendWindow, 749 headers: Frame, 750 data: BodyDataRef, 751 ) -> Self { 752 Self { 753 recv_window, 754 send_window, 755 state: H2StreamState::Idle, 756 header: Some(headers), 757 data, 758 } 759 } 760 
    /// Returns `true` while the stream can still receive DATA frames, i.e. it
    /// is idle, open with the receive side waiting for data, or locally
    /// half-closed and waiting for data.
    pub(crate) fn is_init_or_active_flow_control(&self) -> bool {
        matches!(
            self.state,
            H2StreamState::Idle
                | H2StreamState::Open {
                    recv: ActiveState::WaitData,
                    ..
                }
                | H2StreamState::LocalHalfClosed(ActiveState::WaitData)
        )
    }
}

#[cfg(test)]
mod ut_h2streamstate {
    use super::*;

    /// UT test case for `H2StreamState` with some states.
    ///
    /// # Brief
    /// 1. Creates an H2StreamState with open, LocalHalfClosed, Closed state.
    /// 2. Asserts that the send and recv field are as expected.
    #[test]
    fn ut_hss() {
        let state = H2StreamState::Open {
            send: ActiveState::WaitHeaders,
            recv: ActiveState::WaitData,
        };
        if let H2StreamState::Open { send, recv } = state {
            assert_eq!(send, ActiveState::WaitHeaders);
            assert_eq!(recv, ActiveState::WaitData);
        };

        let state = H2StreamState::LocalHalfClosed(ActiveState::WaitData);
        if let H2StreamState::LocalHalfClosed(recv) = state {
            assert_eq!(recv, ActiveState::WaitData);
        };

        let state = H2StreamState::Closed(CloseReason::EndStream);
        if let H2StreamState::Closed(reason) = state {
            assert_eq!(reason, CloseReason::EndStream);
        }
    }
}

#[cfg(test)]
mod ut_streams {
    use super::*;
    use crate::async_impl::{Body, Request};
    use crate::request::RequestArc;

    // Builds a stream in the given state with 100-byte windows, no pending
    // headers frame and an empty request body.
    fn stream_new(state: H2StreamState) -> Stream {
        Stream {
            send_window: SendWindow::new(100),
            recv_window: RecvWindow::new(100),
            state,
            header: None,
            data: BodyDataRef::new(RequestArc::new(
                Request::builder().body(Body::empty()).unwrap(),
            )),
        }
    }

    /// UT test case for `Streams::apply_max_concurrent_streams`.
    ///
    /// # Brief
    /// 1. Sets the max concurrent streams to 2.
    /// 2. Increases current concurrency twice and checks if it reaches max
    ///    concurrency.
    #[test]
    fn ut_streams_apply_max_concurrent_streams() {
        let mut streams = Streams::new(100, 200, FlowControl::new(300, 400));
        streams.apply_max_concurrent_streams(2);
        streams.increase_current_concurrency();
        assert!(!streams.reach_max_concurrency());
        streams.increase_current_concurrency();
        assert!(streams.reach_max_concurrency());
    }

    /// UT test case for `Streams::apply_send_initial_window_size` and
    /// `Streams::apply_recv_initial_window_size`.
    ///
    /// # Brief
    /// 1. Adjusts the initial send and recv window size and checks for correct
    ///    application.
    /// 2. Asserts correct window sizes and that `pending_send` queue is empty
    ///    and correct notification window sizes.
    #[test]
    fn ut_streams_apply_send_initial_window_size() {
        let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
        streams
            .stream_map
            .insert(1, stream_new(H2StreamState::Idle));

        assert!(streams.apply_send_initial_window_size(200).is_ok());
        let stream = streams.stream_map.get(&1).unwrap();
        assert_eq!(stream.send_window.size_available(), 200);
        assert!(streams.pending_send.is_empty());

        assert!(streams.apply_send_initial_window_size(50).is_ok());
        let stream = streams.stream_map.get(&1).unwrap();
        assert_eq!(stream.send_window.size_available(), 50);
        assert!(streams.pending_send.is_empty());

        assert!(streams.apply_send_initial_window_size(100).is_ok());
        let stream = streams.stream_map.get(&1).unwrap();
        assert_eq!(stream.send_window.size_available(), 100);
        assert!(streams.pending_send.is_empty());

        streams.apply_recv_initial_window_size(200);
        let stream = streams.stream_map.get(&1).unwrap();
        assert_eq!(stream.recv_window.notification_available(), 200);

        streams.apply_recv_initial_window_size(50);
        let stream = streams.stream_map.get(&1).unwrap();
        assert_eq!(stream.recv_window.notification_available(), 50);

        streams.apply_recv_initial_window_size(100);
        let stream = streams.stream_map.get(&1).unwrap();
        assert_eq!(stream.recv_window.notification_available(), 100);
    }

    /// UT test case for `Streams::get_unset_streams`.
    ///
    /// # Brief
    /// 1. Insert streams with different states and sends go_away with a stream
    ///    id.
    /// 2. Asserts that only streams with IDs greater than the go_away ID are
    ///    closed.
    #[test]
    fn ut_streams_get_unset_streams() {
        let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
        streams.apply_max_concurrent_streams(4);
        streams
            .stream_map
            .insert(1, stream_new(H2StreamState::Idle));
        streams.increase_current_concurrency();
        streams
            .stream_map
            .insert(2, stream_new(H2StreamState::Idle));
        streams.increase_current_concurrency();
        streams.stream_map.insert(
            3,
            stream_new(H2StreamState::Open {
                send: ActiveState::WaitHeaders,
                recv: ActiveState::WaitData,
            }),
        );
        streams.increase_current_concurrency();
        streams
            .stream_map
            .insert(4, stream_new(H2StreamState::Closed(CloseReason::EndStream)));
        streams.increase_current_concurrency();

        let go_away_streams = streams.get_unset_streams(2);
        assert!([3, 4].iter().all(|&e| go_away_streams.contains(&e)));

        let state = streams.stream_state(1).unwrap();
        assert_eq!(state, H2StreamState::Idle);
        let state = streams.stream_state(2).unwrap();
        assert_eq!(state, H2StreamState::Idle);
        let state = streams.stream_state(3).unwrap();
        assert_eq!(state, H2StreamState::Closed(CloseReason::RemoteGoAway));
        // An already-closed stream keeps its original close reason.
        let state = streams.stream_state(4).unwrap();
        assert_eq!(state, H2StreamState::Closed(CloseReason::EndStream));
    }

    /// UT test case for `Streams::get_all_unclosed_streams`.
    ///
    /// # Brief
    /// 1. Inserts streams with different states.
    /// 2. Asserts that only unclosed streams are returned.
    #[test]
    fn ut_streams_get_all_unclosed_streams() {
        let mut streams = Streams::new(1000, 1000, FlowControl::new(1000, 1000));
        streams.apply_max_concurrent_streams(2);
        streams
            .stream_map
            .insert(1, stream_new(H2StreamState::Idle));
        streams.increase_current_concurrency();
        streams
            .stream_map
            .insert(2, stream_new(H2StreamState::Closed(CloseReason::EndStream)));
        streams.increase_current_concurrency();
        assert_eq!(streams.get_all_unclosed_streams(), [1]);
    }

    /// UT test case for `Streams::clear_streams_states`.
    ///
    /// # Brief
    /// 1. Clears all the pending and window updating stream states.
    /// 2. Asserts that all relevant collections are empty after clearing.
    #[test]
    fn ut_streams_clear_streams_states() {
        let mut streams = Streams::new(1000, 1000, FlowControl::new(1000, 1000));
        streams.clear_streams_states();
        assert!(streams.window_updating_streams.is_empty());
        assert!(streams.pending_stream_window.is_empty());
        assert!(streams.pending_send.is_empty());
        assert!(streams.pending_conn_window.is_empty());
        assert!(streams.pending_concurrency.is_empty());
    }

    /// UT test case for `Streams::send_local_reset`.
    ///
    /// # Brief
    /// 1. Sends local reset on streams with different states.
    /// 2. Asserts correct handling of each state.
    #[test]
    fn ut_streams_send_local_reset() {
        let mut streams = Streams::new(1000, 1000, FlowControl::new(1000, 1000));
        streams.apply_max_concurrent_streams(3);
        streams
            .stream_map
            .insert(1, stream_new(H2StreamState::Idle));
        streams.increase_current_concurrency();
        streams.stream_map.insert(
            2,
            stream_new(H2StreamState::Closed(CloseReason::RemoteGoAway)),
        );
        streams.increase_current_concurrency();
        streams
            .stream_map
            .insert(3, stream_new(H2StreamState::Closed(CloseReason::EndStream)));
        streams.increase_current_concurrency();
        assert_eq!(
            streams.send_local_reset(4),
            StreamEndState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError))
        );
        assert_eq!(streams.send_local_reset(3), StreamEndState::Ignore);
        assert_eq!(streams.send_local_reset(2), StreamEndState::Ignore);
        assert_eq!(streams.send_local_reset(1), StreamEndState::OK);
    }

    /// UT test case for `Streams::send_headers_frame`.
    ///
    /// # Brief
    /// 1. Send headers frame on a stream.
    /// 2. Asserts correct handling of frame and stream state changes.
    #[test]
    fn ut_streams_send_headers_frame() {
        let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
        streams.apply_max_concurrent_streams(1);
        streams
            .stream_map
            .insert(1, stream_new(H2StreamState::Idle));
        streams.increase_current_concurrency();
        let res = streams.send_headers_frame(1, true);
        assert_eq!(res, FrameRecvState::OK);
        assert_eq!(
            streams.stream_state(1).unwrap(),
            H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders)
        );
        let res = streams.send_headers_frame(1, true);
        assert_eq!(
            res,
            FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError))
        );
    }

    /// UT test case for `Streams::send_data_frame`.
    ///
    /// # Brief
    /// 1. Sends data frame on a stream.
    /// 2. Asserts correct handling of frame and stream state changes.
    #[test]
    fn ut_streams_send_data_frame() {
        let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
        streams.stream_map.insert(
            1,
            stream_new(H2StreamState::Open {
                send: ActiveState::WaitData,
                recv: ActiveState::WaitHeaders,
            }),
        );
        streams.increase_current_concurrency();
        let res = streams.send_data_frame(1, true);
        assert_eq!(res, FrameRecvState::OK);
        assert_eq!(
            streams.stream_state(1).unwrap(),
            H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders)
        );
        let res = streams.send_data_frame(1, true);
        assert_eq!(
            res,
            FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError))
        );
    }

    /// UT test for `Streams::recv_remote_reset`.
    ///
    /// # Brief
    /// 1. Receives remote reset on streams with different states.
    /// 2. Asserts correct handling of each state.
    #[test]
    fn ut_streams_recv_remote_reset() {
        let mut streams = Streams::new(100, 100, FlowControl::new(100, 100));
        streams.apply_max_concurrent_streams(1);
        streams.stream_map.insert(
            1,
            stream_new(H2StreamState::Open {
                send: ActiveState::WaitData,
                recv: ActiveState::WaitHeaders,
            }),
        );
        streams.increase_current_concurrency();
        let res = streams.recv_remote_reset(1);
        assert_eq!(res, StreamEndState::OK);
        assert_eq!(
            streams.stream_state(1).unwrap(),
            H2StreamState::Closed(CloseReason::RemoteRst)
        );
        let res = streams.recv_remote_reset(1);
        assert_eq!(res, StreamEndState::Ignore);
    }

    /// UT test case for `Streams::recv_headers`.
    ///
    /// # Brief
    /// 1. Receives headers on a stream and checks for state changes.
    /// 2. Asserts error handling when headers are received again.
1081 #[test] ut_streams_recv_headers()1082 fn ut_streams_recv_headers() { 1083 let mut streams = Streams::new(100, 100, FlowControl::new(100, 100)); 1084 streams.apply_max_concurrent_streams(1); 1085 streams 1086 .stream_map 1087 .insert(1, stream_new(H2StreamState::Idle)); 1088 let res = streams.recv_headers(1, false); 1089 assert_eq!(res, FrameRecvState::OK); 1090 assert_eq!( 1091 streams.stream_state(1).unwrap(), 1092 H2StreamState::Open { 1093 send: ActiveState::WaitHeaders, 1094 recv: ActiveState::WaitData, 1095 } 1096 ); 1097 let res = streams.recv_headers(1, false); 1098 assert_eq!( 1099 res, 1100 FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)) 1101 ); 1102 } 1103 1104 /// UT test case for `Streams::recv_data`. 1105 /// 1106 /// # Brief 1107 /// 1. Receives data on a stream and checks for state changes. 1108 /// 2. Assert correct state when data is received with eos flag. 1109 #[test] ut_streams_recv_data()1110 fn ut_streams_recv_data() { 1111 let mut streams = Streams::new(100, 100, FlowControl::new(100, 100)); 1112 streams.stream_map.insert( 1113 1, 1114 stream_new(H2StreamState::Open { 1115 send: ActiveState::WaitHeaders, 1116 recv: ActiveState::WaitData, 1117 }), 1118 ); 1119 let res = streams.recv_data(1, false); 1120 assert_eq!(res, FrameRecvState::OK); 1121 assert_eq!( 1122 streams.stream_state(1).unwrap(), 1123 H2StreamState::Open { 1124 send: ActiveState::WaitHeaders, 1125 recv: ActiveState::WaitData, 1126 } 1127 ); 1128 let res = streams.recv_data(1, true); 1129 assert_eq!(res, FrameRecvState::OK); 1130 assert_eq!( 1131 streams.stream_state(1).unwrap(), 1132 H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders) 1133 ); 1134 } 1135 } 1136