1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 //! Streams manage coroutine. 15 16 use std::future::Future; 17 use std::pin::Pin; 18 use std::sync::{Arc, Mutex}; 19 use std::task::{Context, Poll}; 20 21 use ylong_http::h2::{ 22 ErrorCode, Frame, FrameFlags, Goaway, H2Error, Payload, Ping, RstStream, Setting, StreamId, 23 }; 24 25 use crate::runtime::{BoundedReceiver, UnboundedReceiver, UnboundedSender}; 26 use crate::util::dispatcher::http2::{ 27 DispatchErrorKind, OutputMessage, ReqMessage, RespMessage, SettingsState, SettingsSync, 28 StreamController, 29 }; 30 use crate::util::h2::streams::{DataReadState, FrameRecvState, StreamEndState}; 31 32 #[derive(Copy, Clone)] 33 enum ManagerState { 34 Send, 35 Receive, 36 Exit(DispatchErrorKind), 37 } 38 39 pub(crate) struct ConnManager { 40 state: ManagerState, 41 next_state: ManagerState, 42 // Synchronize SETTINGS frames sent by the client. 43 settings: Arc<Mutex<SettingsSync>>, 44 // channel transmitter between manager and io input. 45 input_tx: UnboundedSender<Frame>, 46 // channel receiver between manager and io output. 47 resp_rx: BoundedReceiver<OutputMessage>, 48 // channel receiver between manager and stream coroutine. 49 req_rx: UnboundedReceiver<ReqMessage>, 50 controller: StreamController, 51 } 52 53 impl Future for ConnManager { 54 type Output = Result<(), DispatchErrorKind>; 55 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>56 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 57 let manager = self.get_mut(); 58 loop { 59 match manager.state { 60 ManagerState::Send => { 61 if manager.poll_blocked_frames(cx).is_pending() { 62 return Poll::Pending; 63 } 64 } 65 ManagerState::Receive => { 66 // Receives a response frame from io output. 67 match manager.resp_rx.poll_recv(cx) { 68 #[cfg(feature = "tokio_base")] 69 Poll::Ready(Some(message)) => match message { 70 OutputMessage::Output(frame) => { 71 if manager.poll_recv_message(cx, frame)?.is_pending() { 72 return Poll::Pending; 73 } 74 } 75 // io output occurs error. 76 OutputMessage::OutputExit(e) => { 77 // Ever received a goaway frame 78 if manager.controller.go_away_error_code.is_some() { 79 continue; 80 } 81 // Note error returned immediately. 82 if manager.manage_resp_error(cx, e)?.is_pending() { 83 return Poll::Pending; 84 } 85 } 86 }, 87 #[cfg(feature = "ylong_base")] 88 Poll::Ready(Ok(message)) => match message { 89 OutputMessage::Output(frame) => { 90 if manager.poll_recv_message(cx, frame)?.is_pending() { 91 return Poll::Pending; 92 } 93 } 94 // io output occurs error. 95 OutputMessage::OutputExit(e) => { 96 // Ever received a goaway frame 97 if manager.controller.go_away_error_code.is_some() { 98 continue; 99 } 100 if manager.manage_resp_error(cx, e)?.is_pending() { 101 return Poll::Pending; 102 } 103 } 104 }, 105 #[cfg(feature = "tokio_base")] 106 Poll::Ready(None) => { 107 return manager.poll_channel_closed_exit(cx); 108 } 109 #[cfg(feature = "ylong_base")] 110 Poll::Ready(Err(_e)) => { 111 return manager.poll_channel_closed_exit(cx); 112 } 113 114 Poll::Pending => { 115 // TODO manage error state. 116 return manager.manage_pending_state(cx); 117 } 118 } 119 } 120 ManagerState::Exit(e) => return Poll::Ready(Err(e)), 121 } 122 } 123 } 124 } 125 126 impl ConnManager { new( settings: Arc<Mutex<SettingsSync>>, input_tx: UnboundedSender<Frame>, resp_rx: BoundedReceiver<OutputMessage>, req_rx: UnboundedReceiver<ReqMessage>, controller: StreamController, ) -> Self127 pub(crate) fn new( 128 settings: Arc<Mutex<SettingsSync>>, 129 input_tx: UnboundedSender<Frame>, 130 resp_rx: BoundedReceiver<OutputMessage>, 131 req_rx: UnboundedReceiver<ReqMessage>, 132 controller: StreamController, 133 ) -> Self { 134 Self { 135 state: ManagerState::Receive, 136 next_state: ManagerState::Receive, 137 settings, 138 input_tx, 139 resp_rx, 140 req_rx, 141 controller, 142 } 143 } 144 manage_pending_state( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), DispatchErrorKind>>145 fn manage_pending_state( 146 &mut self, 147 cx: &mut Context<'_>, 148 ) -> Poll<Result<(), DispatchErrorKind>> { 149 // The manager previously accepted a GOAWAY Frame. 150 if let Some(error_code) = self.controller.go_away_error_code { 151 self.poll_deal_with_go_away(error_code)?; 152 return Poll::Pending; 153 } 154 self.poll_recv_request(cx)?; 155 self.poll_input_request(cx)?; 156 Poll::Pending 157 } 158 poll_recv_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind>159 fn poll_recv_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> { 160 loop { 161 #[cfg(feature = "tokio_base")] 162 let message = match self.req_rx.poll_recv(cx) { 163 Poll::Ready(Some(message)) => message, 164 Poll::Ready(None) => return Err(DispatchErrorKind::ChannelClosed), 165 Poll::Pending => break, 166 }; 167 #[cfg(feature = "ylong_base")] 168 let message = match self.req_rx.poll_recv(cx) { 169 Poll::Ready(Ok(message)) => message, 170 Poll::Ready(Err(_e)) => return Err(DispatchErrorKind::ChannelClosed), 171 Poll::Pending => break, 172 }; 173 let id = match self.controller.streams.generate_id() { 174 Ok(id) => id, 175 Err(e) => { 176 let _ = message.sender.try_send(RespMessage::OutputExit(e)); 177 break; 178 } 179 }; 180 let headers = Frame::new(id, message.request.flag, message.request.payload); 181 if self.controller.streams.reach_max_concurrency() 182 || !self.controller.streams.is_pending_concurrency_empty() 183 { 184 self.controller.streams.push_pending_concurrency(id) 185 } else { 186 self.controller.streams.increase_current_concurrency(); 187 self.controller.streams.push_back_pending_send(id) 188 } 189 self.controller.senders.insert(id, message.sender); 190 self.controller 191 .streams 192 .insert(id, headers, message.request.data); 193 } 194 Ok(()) 195 } 196 poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind>197 fn poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> { 198 self.controller.streams.try_consume_pending_concurrency(); 199 let size = self.controller.streams.pending_stream_num(); 200 let mut index = 0; 201 while index < size { 202 match self.controller.streams.next_pending_stream() { 203 None => { 204 break; 205 } 206 Some(id) => { 207 self.input_stream_frame(cx, id)?; 208 } 209 } 210 index += 1; 211 } 212 Ok(()) 213 } 214 input_stream_frame( &mut self, cx: &mut Context<'_>, id: StreamId, ) -> Result<(), DispatchErrorKind>215 fn input_stream_frame( 216 &mut self, 217 cx: &mut Context<'_>, 218 id: StreamId, 219 ) -> Result<(), DispatchErrorKind> { 220 match self.controller.streams.headers(id)? { 221 None => {} 222 Some(header) => { 223 let is_end_stream = header.flags().is_end_stream(); 224 self.poll_send_frame(header)?; 225 // Prevent sending empty data frames 226 if is_end_stream { 227 return Ok(()); 228 } 229 } 230 } 231 232 loop { 233 match self.controller.streams.poll_read_body(cx, id) { 234 Ok(state) => match state { 235 DataReadState::Closed => break, 236 DataReadState::Pending => break, 237 DataReadState::Ready(data) => self.poll_send_frame(data)?, 238 DataReadState::Finish(frame) => { 239 self.poll_send_frame(frame)?; 240 break; 241 } 242 }, 243 Err(e) => return self.deal_poll_body_error(cx, e), 244 } 245 } 246 Ok(()) 247 } 248 deal_poll_body_error( &mut self, cx: &mut Context<'_>, e: H2Error, ) -> Result<(), DispatchErrorKind>249 fn deal_poll_body_error( 250 &mut self, 251 cx: &mut Context<'_>, 252 e: H2Error, 253 ) -> Result<(), DispatchErrorKind> { 254 match e { 255 H2Error::StreamError(id, code) => match self.manage_stream_error(cx, id, code) { 256 Poll::Ready(res) => res, 257 Poll::Pending => Ok(()), 258 }, 259 H2Error::ConnectionError(e) => Err(H2Error::ConnectionError(e).into()), 260 } 261 } 262 poll_send_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>263 fn poll_send_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { 264 match frame.payload() { 265 Payload::Headers(_) => { 266 if let FrameRecvState::Err(e) = self 267 .controller 268 .streams 269 .send_headers_frame(frame.stream_id(), frame.flags().is_end_stream()) 270 { 271 // Never return FrameRecvState::Ignore case. 272 return Err(e.into()); 273 } 274 } 275 Payload::Data(_) => { 276 if let FrameRecvState::Err(e) = self 277 .controller 278 .streams 279 .send_data_frame(frame.stream_id(), frame.flags().is_end_stream()) 280 { 281 // Never return FrameRecvState::Ignore case. 282 return Err(e.into()); 283 } 284 } 285 _ => {} 286 } 287 // TODO Replace with a bounded channel to avoid excessive local memory overhead 288 // when I/O is blocked in the process of uploading large files. 289 self.input_tx 290 .send(frame) 291 .map_err(|_e| DispatchErrorKind::ChannelClosed) 292 } 293 poll_recv_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), DispatchErrorKind>>294 fn poll_recv_frame( 295 &mut self, 296 cx: &mut Context<'_>, 297 frame: Frame, 298 ) -> Poll<Result<(), DispatchErrorKind>> { 299 match frame.payload() { 300 Payload::Settings(_settings) => { 301 self.recv_settings_frame(frame)?; 302 } 303 Payload::Ping(_ping) => { 304 self.recv_ping_frame(frame)?; 305 } 306 Payload::PushPromise(_) => { 307 // TODO The current settings_enable_push setting is fixed to false. 308 return Poll::Ready(Err( 309 H2Error::ConnectionError(ErrorCode::ProtocolError).into() 310 )); 311 } 312 Payload::Goaway(_go_away) => { 313 return self.recv_go_away_frame(cx, frame).map_err(Into::into); 314 } 315 Payload::RstStream(_reset) => { 316 return self.recv_reset_frame(cx, frame).map_err(Into::into); 317 } 318 Payload::Headers(_headers) => { 319 return self.recv_header_frame(cx, frame).map_err(Into::into); 320 } 321 Payload::Data(_data) => { 322 return self.recv_data_frame(cx, frame); 323 } 324 Payload::WindowUpdate(_windows) => { 325 self.recv_window_frame(frame)?; 326 } 327 // Priority is no longer recommended, so keep it compatible but not processed. 328 Payload::Priority(_priority) => {} 329 } 330 Poll::Ready(Ok(())) 331 } 332 recv_settings_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>333 fn recv_settings_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { 334 let settings = if let Payload::Settings(settings) = frame.payload() { 335 settings 336 } else { 337 // this will not happen forever. 338 return Ok(()); 339 }; 340 341 if frame.flags().is_ack() { 342 let mut connection = self.settings.lock().unwrap(); 343 344 if let SettingsState::Acknowledging(ref acknowledged) = connection.settings { 345 for setting in acknowledged.get_settings() { 346 if let Setting::InitialWindowSize(size) = setting { 347 self.controller 348 .streams 349 .apply_recv_initial_window_size(*size); 350 } 351 } 352 } 353 connection.settings = SettingsState::Synced; 354 Ok(()) 355 } else { 356 for setting in settings.get_settings() { 357 if let Setting::MaxConcurrentStreams(num) = setting { 358 self.controller.streams.apply_max_concurrent_streams(*num); 359 } 360 if let Setting::InitialWindowSize(size) = setting { 361 self.controller 362 .streams 363 .apply_send_initial_window_size(*size)?; 364 } 365 } 366 367 // The reason for copying the payload is to pass information to the io input to 368 // set the frame encoder, and the input will empty the 369 // payload when it is sent 370 let ack_settings = Frame::new( 371 frame.stream_id(), 372 FrameFlags::new(0x1), 373 frame.payload().clone(), 374 ); 375 376 self.input_tx 377 .send(ack_settings) 378 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 379 Ok(()) 380 } 381 } 382 recv_ping_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>383 fn recv_ping_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { 384 let ping = if let Payload::Ping(ping) = frame.payload() { 385 ping 386 } else { 387 // this will not happen forever. 388 return Ok(()); 389 }; 390 if frame.flags().is_ack() { 391 // TODO The client does not have the logic to send ping frames. Therefore, the 392 // ack ping is not processed. 393 Ok(()) 394 } else { 395 self.input_tx 396 .send(Ping::ack(ping.clone())) 397 .map_err(|_e| DispatchErrorKind::ChannelClosed) 398 } 399 } 400 recv_go_away_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>401 fn recv_go_away_frame( 402 &mut self, 403 cx: &mut Context<'_>, 404 frame: Frame, 405 ) -> Poll<Result<(), H2Error>> { 406 let go_away = if let Payload::Goaway(goaway) = frame.payload() { 407 goaway 408 } else { 409 // this will not happen forever. 410 return Poll::Ready(Ok(())); 411 }; 412 // Prevents the current connection from generating a new stream. 413 self.controller.goaway(); 414 self.req_rx.close(); 415 let last_stream_id = go_away.get_last_stream_id(); 416 let streams = self.controller.get_unsent_streams(last_stream_id)?; 417 418 let error = H2Error::ConnectionError(ErrorCode::try_from(go_away.get_error_code())?); 419 420 let mut blocked = false; 421 for stream_id in streams { 422 match self.controller.send_message_to_stream( 423 cx, 424 stream_id, 425 RespMessage::OutputExit(error.into()), 426 ) { 427 // ignore error when going away. 428 Poll::Ready(_) => {} 429 Poll::Pending => { 430 blocked = true; 431 } 432 } 433 } 434 // Exit after the allowed stream is complete. 435 self.controller.go_away_error_code = Some(go_away.get_error_code()); 436 if blocked { 437 Poll::Pending 438 } else { 439 Poll::Ready(Ok(())) 440 } 441 } 442 recv_reset_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>443 fn recv_reset_frame( 444 &mut self, 445 cx: &mut Context<'_>, 446 frame: Frame, 447 ) -> Poll<Result<(), H2Error>> { 448 match self.controller.streams.recv_remote_reset(frame.stream_id()) { 449 StreamEndState::OK => self.controller.send_message_to_stream( 450 cx, 451 frame.stream_id(), 452 RespMessage::Output(frame), 453 ), 454 StreamEndState::Err(e) => Poll::Ready(Err(e)), 455 StreamEndState::Ignore => Poll::Ready(Ok(())), 456 } 457 } 458 recv_header_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>459 fn recv_header_frame( 460 &mut self, 461 cx: &mut Context<'_>, 462 frame: Frame, 463 ) -> Poll<Result<(), H2Error>> { 464 match self 465 .controller 466 .streams 467 .recv_headers(frame.stream_id(), frame.flags().is_end_stream()) 468 { 469 FrameRecvState::OK => self.controller.send_message_to_stream( 470 cx, 471 frame.stream_id(), 472 RespMessage::Output(frame), 473 ), 474 FrameRecvState::Err(e) => Poll::Ready(Err(e)), 475 FrameRecvState::Ignore => Poll::Ready(Ok(())), 476 } 477 } 478 recv_data_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), DispatchErrorKind>>479 fn recv_data_frame( 480 &mut self, 481 cx: &mut Context<'_>, 482 frame: Frame, 483 ) -> Poll<Result<(), DispatchErrorKind>> { 484 let data = if let Payload::Data(data) = frame.payload() { 485 data 486 } else { 487 // this will not happen forever. 488 return Poll::Ready(Ok(())); 489 }; 490 let id = frame.stream_id(); 491 let len = data.size() as u32; 492 493 self.update_window(id, len)?; 494 495 match self 496 .controller 497 .streams 498 .recv_data(id, frame.flags().is_end_stream()) 499 { 500 FrameRecvState::OK => self 501 .controller 502 .send_message_to_stream(cx, frame.stream_id(), RespMessage::Output(frame)) 503 .map_err(Into::into), 504 FrameRecvState::Ignore => Poll::Ready(Ok(())), 505 FrameRecvState::Err(e) => Poll::Ready(Err(e.into())), 506 } 507 } 508 recv_window_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>509 fn recv_window_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { 510 let windows = if let Payload::WindowUpdate(windows) = frame.payload() { 511 windows 512 } else { 513 // this will not happen forever. 514 return Ok(()); 515 }; 516 let id = frame.stream_id(); 517 let increment = windows.get_increment(); 518 if id == 0 { 519 self.controller 520 .streams 521 .increase_conn_send_window(increment)?; 522 self.controller.streams.reassign_conn_send_window(); 523 } else { 524 self.controller 525 .streams 526 .reassign_stream_send_window(id, increment)?; 527 } 528 Ok(()) 529 } 530 manage_resp_error( &mut self, cx: &mut Context<'_>, kind: DispatchErrorKind, ) -> Poll<Result<(), DispatchErrorKind>>531 fn manage_resp_error( 532 &mut self, 533 cx: &mut Context<'_>, 534 kind: DispatchErrorKind, 535 ) -> Poll<Result<(), DispatchErrorKind>> { 536 match kind { 537 DispatchErrorKind::H2(h2) => match h2 { 538 H2Error::StreamError(id, code) => self.manage_stream_error(cx, id, code), 539 H2Error::ConnectionError(code) => self.manage_conn_error(cx, code), 540 }, 541 other => { 542 let blocked = self.exit_with_error(cx, other); 543 if blocked { 544 self.state = ManagerState::Send; 545 self.next_state = ManagerState::Exit(other); 546 Poll::Pending 547 } else { 548 Poll::Ready(Err(other)) 549 } 550 } 551 } 552 } 553 manage_stream_error( &mut self, cx: &mut Context<'_>, id: StreamId, code: ErrorCode, ) -> Poll<Result<(), DispatchErrorKind>>554 fn manage_stream_error( 555 &mut self, 556 cx: &mut Context<'_>, 557 id: StreamId, 558 code: ErrorCode, 559 ) -> Poll<Result<(), DispatchErrorKind>> { 560 let rest_payload = RstStream::new(code.into_code()); 561 let frame = Frame::new(id, FrameFlags::empty(), Payload::RstStream(rest_payload)); 562 match self.controller.streams.send_local_reset(id) { 563 StreamEndState::OK => { 564 self.input_tx 565 .send(frame) 566 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 567 568 match self.controller.send_message_to_stream( 569 cx, 570 id, 571 RespMessage::OutputExit(DispatchErrorKind::H2(H2Error::StreamError(id, code))), 572 ) { 573 Poll::Ready(_) => { 574 // error at the stream level due to early exit of the coroutine in which the 575 // request is located, ignored to avoid manager coroutine exit. 576 Poll::Ready(Ok(())) 577 } 578 Poll::Pending => { 579 self.state = ManagerState::Send; 580 // stream error will not cause manager exit with error(exit state). Takes 581 // effect only if blocked. 582 self.next_state = ManagerState::Receive; 583 Poll::Pending 584 } 585 } 586 } 587 StreamEndState::Ignore => Poll::Ready(Ok(())), 588 StreamEndState::Err(e) => { 589 // This error will never happen. 590 Poll::Ready(Err(e.into())) 591 } 592 } 593 } 594 manage_conn_error( &mut self, cx: &mut Context<'_>, code: ErrorCode, ) -> Poll<Result<(), DispatchErrorKind>>595 fn manage_conn_error( 596 &mut self, 597 cx: &mut Context<'_>, 598 code: ErrorCode, 599 ) -> Poll<Result<(), DispatchErrorKind>> { 600 // last_stream_id is set to 0 to ensure that all pushed streams are 601 // shutdown. 602 let go_away_payload = Goaway::new( 603 code.into_code(), 604 self.controller.streams.latest_remote_id, 605 vec![], 606 ); 607 let frame = Frame::new( 608 0, 609 FrameFlags::empty(), 610 Payload::Goaway(go_away_payload.clone()), 611 ); 612 // Avoid sending the same GO_AWAY frame multiple times. 613 if let Some(ref go_away) = self.controller.go_away_sync.going_away { 614 if go_away.get_error_code() == go_away_payload.get_error_code() 615 && go_away.get_last_stream_id() == go_away_payload.get_last_stream_id() 616 { 617 return Poll::Ready(Ok(())); 618 } 619 } 620 self.controller.go_away_sync.going_away = Some(go_away_payload); 621 self.input_tx 622 .send(frame) 623 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 624 625 let blocked = 626 self.exit_with_error(cx, DispatchErrorKind::H2(H2Error::ConnectionError(code))); 627 628 if blocked { 629 self.state = ManagerState::Send; 630 self.next_state = ManagerState::Exit(H2Error::ConnectionError(code).into()); 631 Poll::Pending 632 } else { 633 // TODO When current client has an error, 634 // it always sends the GO_AWAY frame at the first time and exits directly. 635 // Should we consider letting part of the unfinished stream complete? 636 Poll::Ready(Err(H2Error::ConnectionError(code).into())) 637 } 638 } 639 poll_deal_with_go_away(&mut self, error_code: u32) -> Result<(), DispatchErrorKind>640 fn poll_deal_with_go_away(&mut self, error_code: u32) -> Result<(), DispatchErrorKind> { 641 // The client that receives GO_AWAY needs to return a GO_AWAY to the server 642 // before closed. The preceding operations before receiving the frame 643 // ensure that the connection is in the closing state. 644 if self.controller.streams.is_closed() { 645 let last_stream_id = self.controller.streams.latest_remote_id; 646 let go_away_payload = Goaway::new(error_code, last_stream_id, vec![]); 647 let frame = Frame::new( 648 0, 649 FrameFlags::empty(), 650 Payload::Goaway(go_away_payload.clone()), 651 ); 652 653 self.send_peer_goaway(frame, go_away_payload, error_code)?; 654 // close connection 655 self.controller.shutdown(); 656 return Err(H2Error::ConnectionError(ErrorCode::try_from(error_code)?).into()); 657 } 658 Ok(()) 659 } 660 send_peer_goaway( &mut self, frame: Frame, payload: Goaway, err_code: u32, ) -> Result<(), DispatchErrorKind>661 fn send_peer_goaway( 662 &mut self, 663 frame: Frame, 664 payload: Goaway, 665 err_code: u32, 666 ) -> Result<(), DispatchErrorKind> { 667 match self.controller.go_away_sync.going_away { 668 None => { 669 self.controller.go_away_sync.going_away = Some(payload); 670 self.input_tx 671 .send(frame) 672 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 673 } 674 Some(ref go_away) => { 675 // Whether the same GOAWAY Frame has been sent before. 676 if !(go_away.get_error_code() == err_code 677 && go_away.get_last_stream_id() == self.controller.streams.latest_remote_id) 678 { 679 self.controller.go_away_sync.going_away = Some(payload); 680 self.input_tx 681 .send(frame) 682 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 683 } 684 } 685 } 686 Ok(()) 687 } 688 poll_recv_message( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), DispatchErrorKind>>689 fn poll_recv_message( 690 &mut self, 691 cx: &mut Context<'_>, 692 frame: Frame, 693 ) -> Poll<Result<(), DispatchErrorKind>> { 694 match self.poll_recv_frame(cx, frame) { 695 Poll::Ready(Err(kind)) => self.manage_resp_error(cx, kind), 696 Poll::Pending => { 697 self.state = ManagerState::Send; 698 self.next_state = ManagerState::Receive; 699 Poll::Pending 700 } 701 Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), 702 } 703 } 704 poll_channel_closed_exit( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), DispatchErrorKind>>705 fn poll_channel_closed_exit( 706 &mut self, 707 cx: &mut Context<'_>, 708 ) -> Poll<Result<(), DispatchErrorKind>> { 709 if self.exit_with_error(cx, DispatchErrorKind::ChannelClosed) { 710 self.state = ManagerState::Send; 711 self.next_state = ManagerState::Exit(DispatchErrorKind::ChannelClosed); 712 Poll::Pending 713 } else { 714 Poll::Ready(Err(DispatchErrorKind::ChannelClosed)) 715 } 716 } 717 poll_blocked_frames(&mut self, cx: &mut Context<'_>) -> Poll<()>718 fn poll_blocked_frames(&mut self, cx: &mut Context<'_>) -> Poll<()> { 719 match self.controller.poll_blocked_message(cx, &self.input_tx) { 720 Poll::Ready(_) => { 721 self.state = self.next_state; 722 // Reset state. 723 self.next_state = ManagerState::Receive; 724 Poll::Ready(()) 725 } 726 Poll::Pending => Poll::Pending, 727 } 728 } 729 exit_with_error( &mut self, cx: &mut Context<'_>, error: DispatchErrorKind, ) -> bool730 pub(crate) fn exit_with_error( 731 &mut self, 732 cx: &mut Context<'_>, 733 error: DispatchErrorKind, 734 ) -> bool { 735 self.controller.shutdown(); 736 self.req_rx.close(); 737 self.controller.streams.clear_streams_states(); 738 739 let ids = self.controller.streams.get_all_unclosed_streams(); 740 let mut blocked = false; 741 for stream_id in ids { 742 match self.controller.send_message_to_stream( 743 cx, 744 stream_id, 745 RespMessage::OutputExit(error), 746 ) { 747 // ignore error when going away. 748 Poll::Ready(_) => {} 749 Poll::Pending => { 750 blocked = true; 751 } 752 } 753 } 754 blocked 755 } 756 update_window( &mut self, id: StreamId, len: u32, ) -> Result<(), DispatchErrorKind>757 pub(crate) fn update_window( 758 &mut self, 759 id: StreamId, 760 len: u32, 761 ) -> Result<(), DispatchErrorKind> { 762 self.controller 763 .streams 764 .release_conn_recv_window(len, &self.input_tx)?; 765 self.controller 766 .streams 767 .release_stream_recv_window(id, len, &self.input_tx)?; 768 Ok(()) 769 } 770 } 771