1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 //! Streams manage coroutine. 15 16 use std::future::Future; 17 use std::pin::Pin; 18 use std::sync::{Arc, Mutex}; 19 use std::task::{Context, Poll}; 20 21 use ylong_http::h2::{ 22 ErrorCode, Frame, FrameFlags, Goaway, H2Error, Payload, Ping, RstStream, Setting, StreamId, 23 }; 24 25 use crate::runtime::{BoundedReceiver, UnboundedReceiver, UnboundedSender}; 26 use crate::util::dispatcher::http2::{ 27 DispatchErrorKind, OutputMessage, ReqMessage, RespMessage, SettingsState, SettingsSync, 28 StreamController, 29 }; 30 use crate::util::h2::streams::{DataReadState, FrameRecvState, StreamEndState}; 31 32 #[derive(Copy, Clone)] 33 enum ManagerState { 34 Send, 35 Receive, 36 Exit(DispatchErrorKind), 37 } 38 39 pub(crate) struct ConnManager { 40 state: ManagerState, 41 next_state: ManagerState, 42 // Synchronize SETTINGS frames sent by the client. 43 settings: Arc<Mutex<SettingsSync>>, 44 // channel transmitter between manager and io input. 45 input_tx: UnboundedSender<Frame>, 46 // channel receiver between manager and io output. 47 resp_rx: BoundedReceiver<OutputMessage>, 48 // channel receiver between manager and stream coroutine. 49 req_rx: UnboundedReceiver<ReqMessage>, 50 controller: StreamController, 51 } 52 53 impl Future for ConnManager { 54 type Output = Result<(), DispatchErrorKind>; 55 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>56 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 57 let manager = self.get_mut(); 58 loop { 59 match manager.state { 60 ManagerState::Send => { 61 if manager.poll_blocked_frames(cx).is_pending() { 62 return Poll::Pending; 63 } 64 } 65 ManagerState::Receive => { 66 // Receives a response frame from io output. 67 match manager.resp_rx.poll_recv(cx) { 68 #[cfg(feature = "tokio_base")] 69 Poll::Ready(Some(message)) => match message { 70 OutputMessage::Output(frame) => { 71 if manager.poll_recv_message(cx, frame)?.is_pending() { 72 return Poll::Pending; 73 } 74 } 75 // io output occurs error. 76 OutputMessage::OutputExit(e) => { 77 // Ever received a goaway frame 78 if manager.controller.go_away_error_code.is_some() { 79 continue; 80 } 81 // Note error returned immediately. 82 if manager.manage_resp_error(cx, e)?.is_pending() { 83 return Poll::Pending; 84 } 85 } 86 }, 87 #[cfg(feature = "ylong_base")] 88 Poll::Ready(Ok(message)) => match message { 89 OutputMessage::Output(frame) => { 90 if manager.poll_recv_message(cx, frame)?.is_pending() { 91 return Poll::Pending; 92 } 93 } 94 // io output occurs error. 95 OutputMessage::OutputExit(e) => { 96 // Ever received a goaway frame 97 if manager.controller.go_away_error_code.is_some() { 98 continue; 99 } 100 if manager.manage_resp_error(cx, e)?.is_pending() { 101 return Poll::Pending; 102 } 103 } 104 }, 105 #[cfg(feature = "tokio_base")] 106 Poll::Ready(None) => { 107 return manager.poll_channel_closed_exit(cx); 108 } 109 #[cfg(feature = "ylong_base")] 110 Poll::Ready(Err(_e)) => { 111 return manager.poll_channel_closed_exit(cx); 112 } 113 114 Poll::Pending => { 115 // TODO manage error state. 116 return manager.manage_pending_state(cx); 117 } 118 } 119 } 120 ManagerState::Exit(e) => return Poll::Ready(Err(e)), 121 } 122 } 123 } 124 } 125 126 impl ConnManager { new( settings: Arc<Mutex<SettingsSync>>, input_tx: UnboundedSender<Frame>, resp_rx: BoundedReceiver<OutputMessage>, req_rx: UnboundedReceiver<ReqMessage>, controller: StreamController, ) -> Self127 pub(crate) fn new( 128 settings: Arc<Mutex<SettingsSync>>, 129 input_tx: UnboundedSender<Frame>, 130 resp_rx: BoundedReceiver<OutputMessage>, 131 req_rx: UnboundedReceiver<ReqMessage>, 132 controller: StreamController, 133 ) -> Self { 134 Self { 135 state: ManagerState::Receive, 136 next_state: ManagerState::Receive, 137 settings, 138 input_tx, 139 resp_rx, 140 req_rx, 141 controller, 142 } 143 } 144 manage_pending_state( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), DispatchErrorKind>>145 fn manage_pending_state( 146 &mut self, 147 cx: &mut Context<'_>, 148 ) -> Poll<Result<(), DispatchErrorKind>> { 149 // The manager previously accepted a GOAWAY Frame. 150 if let Some(error_code) = self.controller.go_away_error_code { 151 self.poll_deal_with_go_away(error_code)?; 152 return Poll::Pending; 153 } 154 self.poll_recv_request(cx)?; 155 self.poll_input_request(cx)?; 156 Poll::Pending 157 } 158 poll_recv_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind>159 fn poll_recv_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> { 160 loop { 161 #[cfg(feature = "tokio_base")] 162 let message = match self.req_rx.poll_recv(cx) { 163 Poll::Ready(Some(message)) => message, 164 Poll::Ready(None) => return Err(DispatchErrorKind::ChannelClosed), 165 Poll::Pending => break, 166 }; 167 #[cfg(feature = "ylong_base")] 168 let message = match self.req_rx.poll_recv(cx) { 169 Poll::Ready(Ok(message)) => message, 170 Poll::Ready(Err(_e)) => return Err(DispatchErrorKind::ChannelClosed), 171 Poll::Pending => break, 172 }; 173 let id = match self.controller.streams.generate_id() { 174 Ok(id) => id, 175 Err(e) => { 176 let _ = message.sender.try_send(RespMessage::OutputExit(e)); 177 break; 178 } 179 }; 180 let headers = Frame::new(id, message.request.flag, message.request.payload); 181 if self.controller.streams.reach_max_concurrency() 182 || !self.controller.streams.is_pending_concurrency_empty() 183 { 184 self.controller.streams.push_pending_concurrency(id) 185 } else { 186 self.controller.streams.increase_current_concurrency(); 187 self.controller.streams.push_back_pending_send(id) 188 } 189 self.controller.senders.insert(id, message.sender); 190 self.controller 191 .streams 192 .insert(id, headers, message.request.data); 193 } 194 Ok(()) 195 } 196 poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind>197 fn poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> { 198 self.controller.streams.try_consume_pending_concurrency(); 199 let size = self.controller.streams.pending_stream_num(); 200 let mut index = 0; 201 while index < size { 202 match self.controller.streams.next_pending_stream() { 203 None => { 204 break; 205 } 206 Some(id) => { 207 self.input_stream_frame(cx, id)?; 208 } 209 } 210 index += 1; 211 } 212 Ok(()) 213 } 214 input_stream_frame( &mut self, cx: &mut Context<'_>, id: StreamId, ) -> Result<(), DispatchErrorKind>215 fn input_stream_frame( 216 &mut self, 217 cx: &mut Context<'_>, 218 id: StreamId, 219 ) -> Result<(), DispatchErrorKind> { 220 match self.controller.streams.headers(id)? { 221 None => {} 222 Some(header) => { 223 let is_end_stream = header.flags().is_end_stream(); 224 self.poll_send_frame(header)?; 225 // Prevent sending empty data frames 226 if is_end_stream { 227 return Ok(()); 228 } 229 } 230 } 231 232 loop { 233 match self.controller.streams.poll_read_body(cx, id)? { 234 DataReadState::Closed => { 235 break; 236 } 237 DataReadState::Pending => { 238 break; 239 } 240 DataReadState::Ready(data) => { 241 self.poll_send_frame(data)?; 242 } 243 DataReadState::Finish(frame) => { 244 self.poll_send_frame(frame)?; 245 break; 246 } 247 } 248 } 249 Ok(()) 250 } 251 poll_send_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>252 fn poll_send_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { 253 match frame.payload() { 254 Payload::Headers(_) => { 255 if let FrameRecvState::Err(e) = self 256 .controller 257 .streams 258 .send_headers_frame(frame.stream_id(), frame.flags().is_end_stream()) 259 { 260 // Never return FrameRecvState::Ignore case. 261 return Err(e.into()); 262 } 263 } 264 Payload::Data(_) => { 265 if let FrameRecvState::Err(e) = self 266 .controller 267 .streams 268 .send_data_frame(frame.stream_id(), frame.flags().is_end_stream()) 269 { 270 // Never return FrameRecvState::Ignore case. 271 return Err(e.into()); 272 } 273 } 274 _ => {} 275 } 276 277 self.input_tx 278 .send(frame) 279 .map_err(|_e| DispatchErrorKind::ChannelClosed) 280 } 281 poll_recv_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), DispatchErrorKind>>282 fn poll_recv_frame( 283 &mut self, 284 cx: &mut Context<'_>, 285 frame: Frame, 286 ) -> Poll<Result<(), DispatchErrorKind>> { 287 match frame.payload() { 288 Payload::Settings(_settings) => { 289 self.recv_settings_frame(frame)?; 290 } 291 Payload::Ping(_ping) => { 292 self.recv_ping_frame(frame)?; 293 } 294 Payload::PushPromise(_) => { 295 // TODO The current settings_enable_push setting is fixed to false. 296 return Poll::Ready(Err( 297 H2Error::ConnectionError(ErrorCode::ProtocolError).into() 298 )); 299 } 300 Payload::Goaway(_go_away) => { 301 return self.recv_go_away_frame(cx, frame).map_err(Into::into); 302 } 303 Payload::RstStream(_reset) => { 304 return self.recv_reset_frame(cx, frame).map_err(Into::into); 305 } 306 Payload::Headers(_headers) => { 307 return self.recv_header_frame(cx, frame).map_err(Into::into); 308 } 309 Payload::Data(_data) => { 310 return self.recv_data_frame(cx, frame); 311 } 312 Payload::WindowUpdate(_windows) => { 313 self.recv_window_frame(frame)?; 314 } 315 // Priority is no longer recommended, so keep it compatible but not processed. 316 Payload::Priority(_priority) => {} 317 } 318 Poll::Ready(Ok(())) 319 } 320 recv_settings_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>321 fn recv_settings_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { 322 let settings = if let Payload::Settings(settings) = frame.payload() { 323 settings 324 } else { 325 // this will not happen forever. 326 return Ok(()); 327 }; 328 329 if frame.flags().is_ack() { 330 let mut connection = self.settings.lock().unwrap(); 331 332 if let SettingsState::Acknowledging(ref acknowledged) = connection.settings { 333 for setting in acknowledged.get_settings() { 334 if let Setting::InitialWindowSize(size) = setting { 335 self.controller 336 .streams 337 .apply_recv_initial_window_size(*size); 338 } 339 } 340 } 341 connection.settings = SettingsState::Synced; 342 Ok(()) 343 } else { 344 for setting in settings.get_settings() { 345 if let Setting::MaxConcurrentStreams(num) = setting { 346 self.controller.streams.apply_max_concurrent_streams(*num); 347 } 348 if let Setting::InitialWindowSize(size) = setting { 349 self.controller 350 .streams 351 .apply_send_initial_window_size(*size)?; 352 } 353 } 354 355 // The reason for copying the payload is to pass information to the io input to 356 // set the frame encoder, and the input will empty the 357 // payload when it is sent 358 let ack_settings = Frame::new( 359 frame.stream_id(), 360 FrameFlags::new(0x1), 361 frame.payload().clone(), 362 ); 363 364 self.input_tx 365 .send(ack_settings) 366 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 367 Ok(()) 368 } 369 } 370 recv_ping_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>371 fn recv_ping_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { 372 let ping = if let Payload::Ping(ping) = frame.payload() { 373 ping 374 } else { 375 // this will not happen forever. 376 return Ok(()); 377 }; 378 if frame.flags().is_ack() { 379 // TODO The client does not have the logic to send ping frames. Therefore, the 380 // ack ping is not processed. 381 Ok(()) 382 } else { 383 self.input_tx 384 .send(Ping::ack(ping.clone())) 385 .map_err(|_e| DispatchErrorKind::ChannelClosed) 386 } 387 } 388 recv_go_away_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>389 fn recv_go_away_frame( 390 &mut self, 391 cx: &mut Context<'_>, 392 frame: Frame, 393 ) -> Poll<Result<(), H2Error>> { 394 let go_away = if let Payload::Goaway(goaway) = frame.payload() { 395 goaway 396 } else { 397 // this will not happen forever. 398 return Poll::Ready(Ok(())); 399 }; 400 // Prevents the current connection from generating a new stream. 401 self.controller.goaway(); 402 self.req_rx.close(); 403 let last_stream_id = go_away.get_last_stream_id(); 404 let streams = self.controller.get_unsent_streams(last_stream_id)?; 405 406 let error = H2Error::ConnectionError(ErrorCode::try_from(go_away.get_error_code())?); 407 408 let mut blocked = false; 409 for stream_id in streams { 410 match self.controller.send_message_to_stream( 411 cx, 412 stream_id, 413 RespMessage::OutputExit(error.into()), 414 ) { 415 // ignore error when going away. 416 Poll::Ready(_) => {} 417 Poll::Pending => { 418 blocked = true; 419 } 420 } 421 } 422 // Exit after the allowed stream is complete. 423 self.controller.go_away_error_code = Some(go_away.get_error_code()); 424 if blocked { 425 Poll::Pending 426 } else { 427 Poll::Ready(Ok(())) 428 } 429 } 430 recv_reset_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>431 fn recv_reset_frame( 432 &mut self, 433 cx: &mut Context<'_>, 434 frame: Frame, 435 ) -> Poll<Result<(), H2Error>> { 436 match self.controller.streams.recv_remote_reset(frame.stream_id()) { 437 StreamEndState::OK => self.controller.send_message_to_stream( 438 cx, 439 frame.stream_id(), 440 RespMessage::Output(frame), 441 ), 442 StreamEndState::Err(e) => Poll::Ready(Err(e)), 443 StreamEndState::Ignore => Poll::Ready(Ok(())), 444 } 445 } 446 recv_header_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>447 fn recv_header_frame( 448 &mut self, 449 cx: &mut Context<'_>, 450 frame: Frame, 451 ) -> Poll<Result<(), H2Error>> { 452 match self 453 .controller 454 .streams 455 .recv_headers(frame.stream_id(), frame.flags().is_end_stream()) 456 { 457 FrameRecvState::OK => self.controller.send_message_to_stream( 458 cx, 459 frame.stream_id(), 460 RespMessage::Output(frame), 461 ), 462 FrameRecvState::Err(e) => Poll::Ready(Err(e)), 463 FrameRecvState::Ignore => Poll::Ready(Ok(())), 464 } 465 } 466 recv_data_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), DispatchErrorKind>>467 fn recv_data_frame( 468 &mut self, 469 cx: &mut Context<'_>, 470 frame: Frame, 471 ) -> Poll<Result<(), DispatchErrorKind>> { 472 let data = if let Payload::Data(data) = frame.payload() { 473 data 474 } else { 475 // this will not happen forever. 476 return Poll::Ready(Ok(())); 477 }; 478 let id = frame.stream_id(); 479 let len = data.size() as u32; 480 481 self.update_window(id, len)?; 482 483 match self 484 .controller 485 .streams 486 .recv_data(id, frame.flags().is_end_stream()) 487 { 488 FrameRecvState::OK => self 489 .controller 490 .send_message_to_stream(cx, frame.stream_id(), RespMessage::Output(frame)) 491 .map_err(Into::into), 492 FrameRecvState::Ignore => Poll::Ready(Ok(())), 493 FrameRecvState::Err(e) => Poll::Ready(Err(e.into())), 494 } 495 } 496 recv_window_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>497 fn recv_window_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { 498 let windows = if let Payload::WindowUpdate(windows) = frame.payload() { 499 windows 500 } else { 501 // this will not happen forever. 502 return Ok(()); 503 }; 504 let id = frame.stream_id(); 505 let increment = windows.get_increment(); 506 if id == 0 { 507 self.controller 508 .streams 509 .increase_conn_send_window(increment)?; 510 self.controller.streams.reassign_conn_send_window(); 511 } else { 512 self.controller 513 .streams 514 .reassign_stream_send_window(id, increment)?; 515 } 516 Ok(()) 517 } 518 manage_resp_error( &mut self, cx: &mut Context<'_>, kind: DispatchErrorKind, ) -> Poll<Result<(), DispatchErrorKind>>519 fn manage_resp_error( 520 &mut self, 521 cx: &mut Context<'_>, 522 kind: DispatchErrorKind, 523 ) -> Poll<Result<(), DispatchErrorKind>> { 524 match kind { 525 DispatchErrorKind::H2(h2) => match h2 { 526 H2Error::StreamError(id, code) => self.manage_stream_error(cx, id, code), 527 H2Error::ConnectionError(code) => self.manage_conn_error(cx, code), 528 }, 529 other => { 530 let blocked = self.exit_with_error(cx, other); 531 if blocked { 532 self.state = ManagerState::Send; 533 self.next_state = ManagerState::Exit(other); 534 Poll::Pending 535 } else { 536 Poll::Ready(Err(other)) 537 } 538 } 539 } 540 } 541 manage_stream_error( &mut self, cx: &mut Context<'_>, id: StreamId, code: ErrorCode, ) -> Poll<Result<(), DispatchErrorKind>>542 fn manage_stream_error( 543 &mut self, 544 cx: &mut Context<'_>, 545 id: StreamId, 546 code: ErrorCode, 547 ) -> Poll<Result<(), DispatchErrorKind>> { 548 let rest_payload = RstStream::new(code.into_code()); 549 let frame = Frame::new(id, FrameFlags::empty(), Payload::RstStream(rest_payload)); 550 match self.controller.streams.send_local_reset(id) { 551 StreamEndState::OK => { 552 self.input_tx 553 .send(frame) 554 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 555 556 match self.controller.send_message_to_stream( 557 cx, 558 id, 559 RespMessage::OutputExit(DispatchErrorKind::ChannelClosed), 560 ) { 561 Poll::Ready(_) => { 562 // error at the stream level due to early exit of the coroutine in which the 563 // request is located, ignored to avoid manager coroutine exit. 564 Poll::Ready(Ok(())) 565 } 566 Poll::Pending => { 567 self.state = ManagerState::Send; 568 // stream error will not cause manager exit with error(exit state). Takes 569 // effect only if blocked. 570 self.next_state = ManagerState::Receive; 571 Poll::Pending 572 } 573 } 574 } 575 StreamEndState::Ignore => Poll::Ready(Ok(())), 576 StreamEndState::Err(e) => { 577 // This error will never happen. 578 Poll::Ready(Err(e.into())) 579 } 580 } 581 } 582 manage_conn_error( &mut self, cx: &mut Context<'_>, code: ErrorCode, ) -> Poll<Result<(), DispatchErrorKind>>583 fn manage_conn_error( 584 &mut self, 585 cx: &mut Context<'_>, 586 code: ErrorCode, 587 ) -> Poll<Result<(), DispatchErrorKind>> { 588 // last_stream_id is set to 0 to ensure that all pushed streams are 589 // shutdown. 590 let go_away_payload = Goaway::new( 591 code.into_code(), 592 self.controller.streams.latest_remote_id, 593 vec![], 594 ); 595 let frame = Frame::new( 596 0, 597 FrameFlags::empty(), 598 Payload::Goaway(go_away_payload.clone()), 599 ); 600 // Avoid sending the same GO_AWAY frame multiple times. 601 if let Some(ref go_away) = self.controller.go_away_sync.going_away { 602 if go_away.get_error_code() == go_away_payload.get_error_code() 603 && go_away.get_last_stream_id() == go_away_payload.get_last_stream_id() 604 { 605 return Poll::Ready(Ok(())); 606 } 607 } 608 self.controller.go_away_sync.going_away = Some(go_away_payload); 609 self.input_tx 610 .send(frame) 611 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 612 613 let blocked = 614 self.exit_with_error(cx, DispatchErrorKind::H2(H2Error::ConnectionError(code))); 615 616 if blocked { 617 self.state = ManagerState::Send; 618 self.next_state = ManagerState::Exit(H2Error::ConnectionError(code).into()); 619 Poll::Pending 620 } else { 621 // TODO When current client has an error, 622 // it always sends the GO_AWAY frame at the first time and exits directly. 623 // Should we consider letting part of the unfinished stream complete? 624 Poll::Ready(Err(H2Error::ConnectionError(code).into())) 625 } 626 } 627 poll_deal_with_go_away(&mut self, error_code: u32) -> Result<(), DispatchErrorKind>628 fn poll_deal_with_go_away(&mut self, error_code: u32) -> Result<(), DispatchErrorKind> { 629 // The client that receives GO_AWAY needs to return a GO_AWAY to the server 630 // before closed. The preceding operations before receiving the frame 631 // ensure that the connection is in the closing state. 632 if self.controller.streams.is_closed() { 633 let last_stream_id = self.controller.streams.latest_remote_id; 634 let go_away_payload = Goaway::new(error_code, last_stream_id, vec![]); 635 let frame = Frame::new( 636 0, 637 FrameFlags::empty(), 638 Payload::Goaway(go_away_payload.clone()), 639 ); 640 641 self.send_peer_goaway(frame, go_away_payload, error_code)?; 642 // close connection 643 self.controller.shutdown(); 644 return Err(H2Error::ConnectionError(ErrorCode::try_from(error_code)?).into()); 645 } 646 Ok(()) 647 } 648 send_peer_goaway( &mut self, frame: Frame, payload: Goaway, err_code: u32, ) -> Result<(), DispatchErrorKind>649 fn send_peer_goaway( 650 &mut self, 651 frame: Frame, 652 payload: Goaway, 653 err_code: u32, 654 ) -> Result<(), DispatchErrorKind> { 655 match self.controller.go_away_sync.going_away { 656 None => { 657 self.controller.go_away_sync.going_away = Some(payload); 658 self.input_tx 659 .send(frame) 660 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 661 } 662 Some(ref go_away) => { 663 // Whether the same GOAWAY Frame has been sent before. 664 if !(go_away.get_error_code() == err_code 665 && go_away.get_last_stream_id() == self.controller.streams.latest_remote_id) 666 { 667 self.controller.go_away_sync.going_away = Some(payload); 668 self.input_tx 669 .send(frame) 670 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 671 } 672 } 673 } 674 Ok(()) 675 } 676 poll_recv_message( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), DispatchErrorKind>>677 fn poll_recv_message( 678 &mut self, 679 cx: &mut Context<'_>, 680 frame: Frame, 681 ) -> Poll<Result<(), DispatchErrorKind>> { 682 match self.poll_recv_frame(cx, frame) { 683 Poll::Ready(Err(kind)) => self.manage_resp_error(cx, kind), 684 Poll::Pending => { 685 self.state = ManagerState::Send; 686 self.next_state = ManagerState::Receive; 687 Poll::Pending 688 } 689 Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), 690 } 691 } 692 poll_channel_closed_exit( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), DispatchErrorKind>>693 fn poll_channel_closed_exit( 694 &mut self, 695 cx: &mut Context<'_>, 696 ) -> Poll<Result<(), DispatchErrorKind>> { 697 if self.exit_with_error(cx, DispatchErrorKind::ChannelClosed) { 698 self.state = ManagerState::Send; 699 self.next_state = ManagerState::Exit(DispatchErrorKind::ChannelClosed); 700 Poll::Pending 701 } else { 702 Poll::Ready(Err(DispatchErrorKind::ChannelClosed)) 703 } 704 } 705 poll_blocked_frames(&mut self, cx: &mut Context<'_>) -> Poll<()>706 fn poll_blocked_frames(&mut self, cx: &mut Context<'_>) -> Poll<()> { 707 match self.controller.poll_blocked_message(cx, &self.input_tx) { 708 Poll::Ready(_) => { 709 self.state = self.next_state; 710 // Reset state. 711 self.next_state = ManagerState::Receive; 712 Poll::Ready(()) 713 } 714 Poll::Pending => Poll::Pending, 715 } 716 } 717 exit_with_error( &mut self, cx: &mut Context<'_>, error: DispatchErrorKind, ) -> bool718 pub(crate) fn exit_with_error( 719 &mut self, 720 cx: &mut Context<'_>, 721 error: DispatchErrorKind, 722 ) -> bool { 723 self.controller.shutdown(); 724 self.req_rx.close(); 725 self.controller.streams.clear_streams_states(); 726 727 let ids = self.controller.streams.get_all_unclosed_streams(); 728 let mut blocked = false; 729 for stream_id in ids { 730 match self.controller.send_message_to_stream( 731 cx, 732 stream_id, 733 RespMessage::OutputExit(error), 734 ) { 735 // ignore error when going away. 736 Poll::Ready(_) => {} 737 Poll::Pending => { 738 blocked = true; 739 } 740 } 741 } 742 blocked 743 } 744 update_window( &mut self, id: StreamId, len: u32, ) -> Result<(), DispatchErrorKind>745 pub(crate) fn update_window( 746 &mut self, 747 id: StreamId, 748 len: u32, 749 ) -> Result<(), DispatchErrorKind> { 750 self.controller 751 .streams 752 .release_conn_recv_window(len, &self.input_tx)?; 753 self.controller 754 .streams 755 .release_stream_recv_window(id, len, &self.input_tx)?; 756 Ok(()) 757 } 758 } 759