• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! Streams manage coroutine.
15 
16 use std::future::Future;
17 use std::pin::Pin;
18 use std::sync::{Arc, Mutex};
19 use std::task::{Context, Poll};
20 
21 use ylong_http::h2::{
22     ErrorCode, Frame, FrameFlags, Goaway, H2Error, Payload, Ping, RstStream, Setting, StreamId,
23 };
24 
25 use crate::runtime::{BoundedReceiver, UnboundedReceiver, UnboundedSender};
26 use crate::util::dispatcher::http2::{
27     DispatchErrorKind, OutputMessage, ReqMessage, RespMessage, SettingsState, SettingsSync,
28     StreamController,
29 };
30 use crate::util::h2::streams::{DataReadState, FrameRecvState, StreamEndState};
31 
/// State machine that drives the `poll` loop of `ConnManager`.
#[derive(Copy, Clone)]
enum ManagerState {
    /// Flush previously blocked messages; `poll` stays in this state until
    /// `poll_blocked_frames` is ready, then switches to the saved `next_state`.
    Send,
    /// Poll `resp_rx` for the next message coming from the io output.
    Receive,
    /// Terminal state: `poll` resolves with the contained error.
    Exit(DispatchErrorKind),
}
38 
/// Connection-level manager future that moves frames between the request
/// senders, the io input and the io output.
pub(crate) struct ConnManager {
    // State handled by the current iteration of the `poll` loop.
    state: ManagerState,
    // State adopted once `poll_blocked_frames` completes while in `ManagerState::Send`.
    next_state: ManagerState,
    // Synchronize SETTINGS frames sent by the client.
    settings: Arc<Mutex<SettingsSync>>,
    // channel transmitter between manager and io input.
    input_tx: UnboundedSender<Frame>,
    // channel receiver between manager and io output.
    resp_rx: BoundedReceiver<OutputMessage>,
    // channel receiver between manager and stream coroutine.
    req_rx: UnboundedReceiver<ReqMessage>,
    // Stream table, per-stream response senders and GOAWAY bookkeeping.
    controller: StreamController,
}
52 
53 impl Future for ConnManager {
54     type Output = Result<(), DispatchErrorKind>;
55 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>56     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
57         let manager = self.get_mut();
58         loop {
59             match manager.state {
60                 ManagerState::Send => {
61                     if manager.poll_blocked_frames(cx).is_pending() {
62                         return Poll::Pending;
63                     }
64                 }
65                 ManagerState::Receive => {
66                     // Receives a response frame from io output.
67                     match manager.resp_rx.poll_recv(cx) {
68                         #[cfg(feature = "tokio_base")]
69                         Poll::Ready(Some(message)) => match message {
70                             OutputMessage::Output(frame) => {
71                                 if manager.poll_recv_message(cx, frame)?.is_pending() {
72                                     return Poll::Pending;
73                                 }
74                             }
75                             // io output occurs error.
76                             OutputMessage::OutputExit(e) => {
77                                 // Ever received a goaway frame
78                                 if manager.controller.go_away_error_code.is_some() {
79                                     continue;
80                                 }
81                                 // Note error returned immediately.
82                                 if manager.manage_resp_error(cx, e)?.is_pending() {
83                                     return Poll::Pending;
84                                 }
85                             }
86                         },
87                         #[cfg(feature = "ylong_base")]
88                         Poll::Ready(Ok(message)) => match message {
89                             OutputMessage::Output(frame) => {
90                                 if manager.poll_recv_message(cx, frame)?.is_pending() {
91                                     return Poll::Pending;
92                                 }
93                             }
94                             // io output occurs error.
95                             OutputMessage::OutputExit(e) => {
96                                 // Ever received a goaway frame
97                                 if manager.controller.go_away_error_code.is_some() {
98                                     continue;
99                                 }
100                                 if manager.manage_resp_error(cx, e)?.is_pending() {
101                                     return Poll::Pending;
102                                 }
103                             }
104                         },
105                         #[cfg(feature = "tokio_base")]
106                         Poll::Ready(None) => {
107                             return manager.poll_channel_closed_exit(cx);
108                         }
109                         #[cfg(feature = "ylong_base")]
110                         Poll::Ready(Err(_e)) => {
111                             return manager.poll_channel_closed_exit(cx);
112                         }
113 
114                         Poll::Pending => {
115                             // TODO manage error state.
116                             return manager.manage_pending_state(cx);
117                         }
118                     }
119                 }
120                 ManagerState::Exit(e) => return Poll::Ready(Err(e)),
121             }
122         }
123     }
124 }
125 
126 impl ConnManager {
new( settings: Arc<Mutex<SettingsSync>>, input_tx: UnboundedSender<Frame>, resp_rx: BoundedReceiver<OutputMessage>, req_rx: UnboundedReceiver<ReqMessage>, controller: StreamController, ) -> Self127     pub(crate) fn new(
128         settings: Arc<Mutex<SettingsSync>>,
129         input_tx: UnboundedSender<Frame>,
130         resp_rx: BoundedReceiver<OutputMessage>,
131         req_rx: UnboundedReceiver<ReqMessage>,
132         controller: StreamController,
133     ) -> Self {
134         Self {
135             state: ManagerState::Receive,
136             next_state: ManagerState::Receive,
137             settings,
138             input_tx,
139             resp_rx,
140             req_rx,
141             controller,
142         }
143     }
144 
manage_pending_state( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), DispatchErrorKind>>145     fn manage_pending_state(
146         &mut self,
147         cx: &mut Context<'_>,
148     ) -> Poll<Result<(), DispatchErrorKind>> {
149         // The manager previously accepted a GOAWAY Frame.
150         if let Some(error_code) = self.controller.go_away_error_code {
151             self.poll_deal_with_go_away(error_code)?;
152             return Poll::Pending;
153         }
154         self.poll_recv_request(cx)?;
155         self.poll_input_request(cx)?;
156         Poll::Pending
157     }
158 
poll_recv_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind>159     fn poll_recv_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> {
160         loop {
161             #[cfg(feature = "tokio_base")]
162             let message = match self.req_rx.poll_recv(cx) {
163                 Poll::Ready(Some(message)) => message,
164                 Poll::Ready(None) => return Err(DispatchErrorKind::ChannelClosed),
165                 Poll::Pending => break,
166             };
167             #[cfg(feature = "ylong_base")]
168             let message = match self.req_rx.poll_recv(cx) {
169                 Poll::Ready(Ok(message)) => message,
170                 Poll::Ready(Err(_e)) => return Err(DispatchErrorKind::ChannelClosed),
171                 Poll::Pending => break,
172             };
173             let id = match self.controller.streams.generate_id() {
174                 Ok(id) => id,
175                 Err(e) => {
176                     let _ = message.sender.try_send(RespMessage::OutputExit(e));
177                     break;
178                 }
179             };
180             let headers = Frame::new(id, message.request.flag, message.request.payload);
181             if self.controller.streams.reach_max_concurrency()
182                 || !self.controller.streams.is_pending_concurrency_empty()
183             {
184                 self.controller.streams.push_pending_concurrency(id)
185             } else {
186                 self.controller.streams.increase_current_concurrency();
187                 self.controller.streams.push_back_pending_send(id)
188             }
189             self.controller.senders.insert(id, message.sender);
190             self.controller
191                 .streams
192                 .insert(id, headers, message.request.data);
193         }
194         Ok(())
195     }
196 
poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind>197     fn poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> {
198         self.controller.streams.try_consume_pending_concurrency();
199         let size = self.controller.streams.pending_stream_num();
200         let mut index = 0;
201         while index < size {
202             match self.controller.streams.next_pending_stream() {
203                 None => {
204                     break;
205                 }
206                 Some(id) => {
207                     self.input_stream_frame(cx, id)?;
208                 }
209             }
210             index += 1;
211         }
212         Ok(())
213     }
214 
input_stream_frame( &mut self, cx: &mut Context<'_>, id: StreamId, ) -> Result<(), DispatchErrorKind>215     fn input_stream_frame(
216         &mut self,
217         cx: &mut Context<'_>,
218         id: StreamId,
219     ) -> Result<(), DispatchErrorKind> {
220         match self.controller.streams.headers(id)? {
221             None => {}
222             Some(header) => {
223                 let is_end_stream = header.flags().is_end_stream();
224                 self.poll_send_frame(header)?;
225                 // Prevent sending empty data frames
226                 if is_end_stream {
227                     return Ok(());
228                 }
229             }
230         }
231 
232         loop {
233             match self.controller.streams.poll_read_body(cx, id)? {
234                 DataReadState::Closed => {
235                     break;
236                 }
237                 DataReadState::Pending => {
238                     break;
239                 }
240                 DataReadState::Ready(data) => {
241                     self.poll_send_frame(data)?;
242                 }
243                 DataReadState::Finish(frame) => {
244                     self.poll_send_frame(frame)?;
245                     break;
246                 }
247             }
248         }
249         Ok(())
250     }
251 
poll_send_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>252     fn poll_send_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
253         match frame.payload() {
254             Payload::Headers(_) => {
255                 if let FrameRecvState::Err(e) = self
256                     .controller
257                     .streams
258                     .send_headers_frame(frame.stream_id(), frame.flags().is_end_stream())
259                 {
260                     // Never return FrameRecvState::Ignore case.
261                     return Err(e.into());
262                 }
263             }
264             Payload::Data(_) => {
265                 if let FrameRecvState::Err(e) = self
266                     .controller
267                     .streams
268                     .send_data_frame(frame.stream_id(), frame.flags().is_end_stream())
269                 {
270                     // Never return FrameRecvState::Ignore case.
271                     return Err(e.into());
272                 }
273             }
274             _ => {}
275         }
276 
277         self.input_tx
278             .send(frame)
279             .map_err(|_e| DispatchErrorKind::ChannelClosed)
280     }
281 
poll_recv_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), DispatchErrorKind>>282     fn poll_recv_frame(
283         &mut self,
284         cx: &mut Context<'_>,
285         frame: Frame,
286     ) -> Poll<Result<(), DispatchErrorKind>> {
287         match frame.payload() {
288             Payload::Settings(_settings) => {
289                 self.recv_settings_frame(frame)?;
290             }
291             Payload::Ping(_ping) => {
292                 self.recv_ping_frame(frame)?;
293             }
294             Payload::PushPromise(_) => {
295                 // TODO The current settings_enable_push setting is fixed to false.
296                 return Poll::Ready(Err(
297                     H2Error::ConnectionError(ErrorCode::ProtocolError).into()
298                 ));
299             }
300             Payload::Goaway(_go_away) => {
301                 return self.recv_go_away_frame(cx, frame).map_err(Into::into);
302             }
303             Payload::RstStream(_reset) => {
304                 return self.recv_reset_frame(cx, frame).map_err(Into::into);
305             }
306             Payload::Headers(_headers) => {
307                 return self.recv_header_frame(cx, frame).map_err(Into::into);
308             }
309             Payload::Data(_data) => {
310                 return self.recv_data_frame(cx, frame);
311             }
312             Payload::WindowUpdate(_windows) => {
313                 self.recv_window_frame(frame)?;
314             }
315             // Priority is no longer recommended, so keep it compatible but not processed.
316             Payload::Priority(_priority) => {}
317         }
318         Poll::Ready(Ok(()))
319     }
320 
recv_settings_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>321     fn recv_settings_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
322         let settings = if let Payload::Settings(settings) = frame.payload() {
323             settings
324         } else {
325             // this will not happen forever.
326             return Ok(());
327         };
328 
329         if frame.flags().is_ack() {
330             let mut connection = self.settings.lock().unwrap();
331 
332             if let SettingsState::Acknowledging(ref acknowledged) = connection.settings {
333                 for setting in acknowledged.get_settings() {
334                     if let Setting::InitialWindowSize(size) = setting {
335                         self.controller
336                             .streams
337                             .apply_recv_initial_window_size(*size);
338                     }
339                 }
340             }
341             connection.settings = SettingsState::Synced;
342             Ok(())
343         } else {
344             for setting in settings.get_settings() {
345                 if let Setting::MaxConcurrentStreams(num) = setting {
346                     self.controller.streams.apply_max_concurrent_streams(*num);
347                 }
348                 if let Setting::InitialWindowSize(size) = setting {
349                     self.controller
350                         .streams
351                         .apply_send_initial_window_size(*size)?;
352                 }
353             }
354 
355             // The reason for copying the payload is to pass information to the io input to
356             // set the frame encoder, and the input will empty the
357             // payload when it is sent
358             let ack_settings = Frame::new(
359                 frame.stream_id(),
360                 FrameFlags::new(0x1),
361                 frame.payload().clone(),
362             );
363 
364             self.input_tx
365                 .send(ack_settings)
366                 .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
367             Ok(())
368         }
369     }
370 
recv_ping_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>371     fn recv_ping_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
372         let ping = if let Payload::Ping(ping) = frame.payload() {
373             ping
374         } else {
375             // this will not happen forever.
376             return Ok(());
377         };
378         if frame.flags().is_ack() {
379             // TODO The client does not have the logic to send ping frames. Therefore, the
380             // ack ping is not processed.
381             Ok(())
382         } else {
383             self.input_tx
384                 .send(Ping::ack(ping.clone()))
385                 .map_err(|_e| DispatchErrorKind::ChannelClosed)
386         }
387     }
388 
recv_go_away_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>389     fn recv_go_away_frame(
390         &mut self,
391         cx: &mut Context<'_>,
392         frame: Frame,
393     ) -> Poll<Result<(), H2Error>> {
394         let go_away = if let Payload::Goaway(goaway) = frame.payload() {
395             goaway
396         } else {
397             // this will not happen forever.
398             return Poll::Ready(Ok(()));
399         };
400         // Prevents the current connection from generating a new stream.
401         self.controller.goaway();
402         self.req_rx.close();
403         let last_stream_id = go_away.get_last_stream_id();
404         let streams = self.controller.get_unsent_streams(last_stream_id)?;
405 
406         let error = H2Error::ConnectionError(ErrorCode::try_from(go_away.get_error_code())?);
407 
408         let mut blocked = false;
409         for stream_id in streams {
410             match self.controller.send_message_to_stream(
411                 cx,
412                 stream_id,
413                 RespMessage::OutputExit(error.into()),
414             ) {
415                 // ignore error when going away.
416                 Poll::Ready(_) => {}
417                 Poll::Pending => {
418                     blocked = true;
419                 }
420             }
421         }
422         // Exit after the allowed stream is complete.
423         self.controller.go_away_error_code = Some(go_away.get_error_code());
424         if blocked {
425             Poll::Pending
426         } else {
427             Poll::Ready(Ok(()))
428         }
429     }
430 
recv_reset_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>431     fn recv_reset_frame(
432         &mut self,
433         cx: &mut Context<'_>,
434         frame: Frame,
435     ) -> Poll<Result<(), H2Error>> {
436         match self.controller.streams.recv_remote_reset(frame.stream_id()) {
437             StreamEndState::OK => self.controller.send_message_to_stream(
438                 cx,
439                 frame.stream_id(),
440                 RespMessage::Output(frame),
441             ),
442             StreamEndState::Err(e) => Poll::Ready(Err(e)),
443             StreamEndState::Ignore => Poll::Ready(Ok(())),
444         }
445     }
446 
recv_header_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>447     fn recv_header_frame(
448         &mut self,
449         cx: &mut Context<'_>,
450         frame: Frame,
451     ) -> Poll<Result<(), H2Error>> {
452         match self
453             .controller
454             .streams
455             .recv_headers(frame.stream_id(), frame.flags().is_end_stream())
456         {
457             FrameRecvState::OK => self.controller.send_message_to_stream(
458                 cx,
459                 frame.stream_id(),
460                 RespMessage::Output(frame),
461             ),
462             FrameRecvState::Err(e) => Poll::Ready(Err(e)),
463             FrameRecvState::Ignore => Poll::Ready(Ok(())),
464         }
465     }
466 
recv_data_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), DispatchErrorKind>>467     fn recv_data_frame(
468         &mut self,
469         cx: &mut Context<'_>,
470         frame: Frame,
471     ) -> Poll<Result<(), DispatchErrorKind>> {
472         let data = if let Payload::Data(data) = frame.payload() {
473             data
474         } else {
475             // this will not happen forever.
476             return Poll::Ready(Ok(()));
477         };
478         let id = frame.stream_id();
479         let len = data.size() as u32;
480 
481         self.update_window(id, len)?;
482 
483         match self
484             .controller
485             .streams
486             .recv_data(id, frame.flags().is_end_stream())
487         {
488             FrameRecvState::OK => self
489                 .controller
490                 .send_message_to_stream(cx, frame.stream_id(), RespMessage::Output(frame))
491                 .map_err(Into::into),
492             FrameRecvState::Ignore => Poll::Ready(Ok(())),
493             FrameRecvState::Err(e) => Poll::Ready(Err(e.into())),
494         }
495     }
496 
recv_window_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>497     fn recv_window_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
498         let windows = if let Payload::WindowUpdate(windows) = frame.payload() {
499             windows
500         } else {
501             // this will not happen forever.
502             return Ok(());
503         };
504         let id = frame.stream_id();
505         let increment = windows.get_increment();
506         if id == 0 {
507             self.controller
508                 .streams
509                 .increase_conn_send_window(increment)?;
510             self.controller.streams.reassign_conn_send_window();
511         } else {
512             self.controller
513                 .streams
514                 .reassign_stream_send_window(id, increment)?;
515         }
516         Ok(())
517     }
518 
manage_resp_error( &mut self, cx: &mut Context<'_>, kind: DispatchErrorKind, ) -> Poll<Result<(), DispatchErrorKind>>519     fn manage_resp_error(
520         &mut self,
521         cx: &mut Context<'_>,
522         kind: DispatchErrorKind,
523     ) -> Poll<Result<(), DispatchErrorKind>> {
524         match kind {
525             DispatchErrorKind::H2(h2) => match h2 {
526                 H2Error::StreamError(id, code) => self.manage_stream_error(cx, id, code),
527                 H2Error::ConnectionError(code) => self.manage_conn_error(cx, code),
528             },
529             other => {
530                 let blocked = self.exit_with_error(cx, other);
531                 if blocked {
532                     self.state = ManagerState::Send;
533                     self.next_state = ManagerState::Exit(other);
534                     Poll::Pending
535                 } else {
536                     Poll::Ready(Err(other))
537                 }
538             }
539         }
540     }
541 
manage_stream_error( &mut self, cx: &mut Context<'_>, id: StreamId, code: ErrorCode, ) -> Poll<Result<(), DispatchErrorKind>>542     fn manage_stream_error(
543         &mut self,
544         cx: &mut Context<'_>,
545         id: StreamId,
546         code: ErrorCode,
547     ) -> Poll<Result<(), DispatchErrorKind>> {
548         let rest_payload = RstStream::new(code.into_code());
549         let frame = Frame::new(id, FrameFlags::empty(), Payload::RstStream(rest_payload));
550         match self.controller.streams.send_local_reset(id) {
551             StreamEndState::OK => {
552                 self.input_tx
553                     .send(frame)
554                     .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
555 
556                 match self.controller.send_message_to_stream(
557                     cx,
558                     id,
559                     RespMessage::OutputExit(DispatchErrorKind::ChannelClosed),
560                 ) {
561                     Poll::Ready(_) => {
562                         // error at the stream level due to early exit of the coroutine in which the
563                         // request is located, ignored to avoid manager coroutine exit.
564                         Poll::Ready(Ok(()))
565                     }
566                     Poll::Pending => {
567                         self.state = ManagerState::Send;
568                         // stream error will not cause manager exit with error(exit state). Takes
569                         // effect only if blocked.
570                         self.next_state = ManagerState::Receive;
571                         Poll::Pending
572                     }
573                 }
574             }
575             StreamEndState::Ignore => Poll::Ready(Ok(())),
576             StreamEndState::Err(e) => {
577                 // This error will never happen.
578                 Poll::Ready(Err(e.into()))
579             }
580         }
581     }
582 
manage_conn_error( &mut self, cx: &mut Context<'_>, code: ErrorCode, ) -> Poll<Result<(), DispatchErrorKind>>583     fn manage_conn_error(
584         &mut self,
585         cx: &mut Context<'_>,
586         code: ErrorCode,
587     ) -> Poll<Result<(), DispatchErrorKind>> {
588         // last_stream_id is set to 0 to ensure that all pushed streams are
589         // shutdown.
590         let go_away_payload = Goaway::new(
591             code.into_code(),
592             self.controller.streams.latest_remote_id,
593             vec![],
594         );
595         let frame = Frame::new(
596             0,
597             FrameFlags::empty(),
598             Payload::Goaway(go_away_payload.clone()),
599         );
600         // Avoid sending the same GO_AWAY frame multiple times.
601         if let Some(ref go_away) = self.controller.go_away_sync.going_away {
602             if go_away.get_error_code() == go_away_payload.get_error_code()
603                 && go_away.get_last_stream_id() == go_away_payload.get_last_stream_id()
604             {
605                 return Poll::Ready(Ok(()));
606             }
607         }
608         self.controller.go_away_sync.going_away = Some(go_away_payload);
609         self.input_tx
610             .send(frame)
611             .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
612 
613         let blocked =
614             self.exit_with_error(cx, DispatchErrorKind::H2(H2Error::ConnectionError(code)));
615 
616         if blocked {
617             self.state = ManagerState::Send;
618             self.next_state = ManagerState::Exit(H2Error::ConnectionError(code).into());
619             Poll::Pending
620         } else {
621             // TODO When current client has an error,
622             // it always sends the GO_AWAY frame at the first time and exits directly.
623             // Should we consider letting part of the unfinished stream complete?
624             Poll::Ready(Err(H2Error::ConnectionError(code).into()))
625         }
626     }
627 
poll_deal_with_go_away(&mut self, error_code: u32) -> Result<(), DispatchErrorKind>628     fn poll_deal_with_go_away(&mut self, error_code: u32) -> Result<(), DispatchErrorKind> {
629         // The client that receives GO_AWAY needs to return a GO_AWAY to the server
630         // before closed. The preceding operations before receiving the frame
631         // ensure that the connection is in the closing state.
632         if self.controller.streams.is_closed() {
633             let last_stream_id = self.controller.streams.latest_remote_id;
634             let go_away_payload = Goaway::new(error_code, last_stream_id, vec![]);
635             let frame = Frame::new(
636                 0,
637                 FrameFlags::empty(),
638                 Payload::Goaway(go_away_payload.clone()),
639             );
640 
641             self.send_peer_goaway(frame, go_away_payload, error_code)?;
642             // close connection
643             self.controller.shutdown();
644             return Err(H2Error::ConnectionError(ErrorCode::try_from(error_code)?).into());
645         }
646         Ok(())
647     }
648 
send_peer_goaway( &mut self, frame: Frame, payload: Goaway, err_code: u32, ) -> Result<(), DispatchErrorKind>649     fn send_peer_goaway(
650         &mut self,
651         frame: Frame,
652         payload: Goaway,
653         err_code: u32,
654     ) -> Result<(), DispatchErrorKind> {
655         match self.controller.go_away_sync.going_away {
656             None => {
657                 self.controller.go_away_sync.going_away = Some(payload);
658                 self.input_tx
659                     .send(frame)
660                     .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
661             }
662             Some(ref go_away) => {
663                 // Whether the same GOAWAY Frame has been sent before.
664                 if !(go_away.get_error_code() == err_code
665                     && go_away.get_last_stream_id() == self.controller.streams.latest_remote_id)
666                 {
667                     self.controller.go_away_sync.going_away = Some(payload);
668                     self.input_tx
669                         .send(frame)
670                         .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
671                 }
672             }
673         }
674         Ok(())
675     }
676 
poll_recv_message( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), DispatchErrorKind>>677     fn poll_recv_message(
678         &mut self,
679         cx: &mut Context<'_>,
680         frame: Frame,
681     ) -> Poll<Result<(), DispatchErrorKind>> {
682         match self.poll_recv_frame(cx, frame) {
683             Poll::Ready(Err(kind)) => self.manage_resp_error(cx, kind),
684             Poll::Pending => {
685                 self.state = ManagerState::Send;
686                 self.next_state = ManagerState::Receive;
687                 Poll::Pending
688             }
689             Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
690         }
691     }
692 
poll_channel_closed_exit( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), DispatchErrorKind>>693     fn poll_channel_closed_exit(
694         &mut self,
695         cx: &mut Context<'_>,
696     ) -> Poll<Result<(), DispatchErrorKind>> {
697         if self.exit_with_error(cx, DispatchErrorKind::ChannelClosed) {
698             self.state = ManagerState::Send;
699             self.next_state = ManagerState::Exit(DispatchErrorKind::ChannelClosed);
700             Poll::Pending
701         } else {
702             Poll::Ready(Err(DispatchErrorKind::ChannelClosed))
703         }
704     }
705 
poll_blocked_frames(&mut self, cx: &mut Context<'_>) -> Poll<()>706     fn poll_blocked_frames(&mut self, cx: &mut Context<'_>) -> Poll<()> {
707         match self.controller.poll_blocked_message(cx, &self.input_tx) {
708             Poll::Ready(_) => {
709                 self.state = self.next_state;
710                 // Reset state.
711                 self.next_state = ManagerState::Receive;
712                 Poll::Ready(())
713             }
714             Poll::Pending => Poll::Pending,
715         }
716     }
717 
exit_with_error( &mut self, cx: &mut Context<'_>, error: DispatchErrorKind, ) -> bool718     pub(crate) fn exit_with_error(
719         &mut self,
720         cx: &mut Context<'_>,
721         error: DispatchErrorKind,
722     ) -> bool {
723         self.controller.shutdown();
724         self.req_rx.close();
725         self.controller.streams.clear_streams_states();
726 
727         let ids = self.controller.streams.get_all_unclosed_streams();
728         let mut blocked = false;
729         for stream_id in ids {
730             match self.controller.send_message_to_stream(
731                 cx,
732                 stream_id,
733                 RespMessage::OutputExit(error),
734             ) {
735                 // ignore error when going away.
736                 Poll::Ready(_) => {}
737                 Poll::Pending => {
738                     blocked = true;
739                 }
740             }
741         }
742         blocked
743     }
744 
update_window( &mut self, id: StreamId, len: u32, ) -> Result<(), DispatchErrorKind>745     pub(crate) fn update_window(
746         &mut self,
747         id: StreamId,
748         len: u32,
749     ) -> Result<(), DispatchErrorKind> {
750         self.controller
751             .streams
752             .release_conn_recv_window(len, &self.input_tx)?;
753         self.controller
754             .streams
755             .release_stream_recv_window(id, len, &self.input_tx)?;
756         Ok(())
757     }
758 }
759