• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright (c) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13 
//! The coroutine that manages the streams of a connection.
15 
16 use std::future::Future;
17 use std::pin::Pin;
18 use std::sync::{Arc, Mutex};
19 use std::task::{Context, Poll};
20 
21 use ylong_http::h2::{
22     ErrorCode, Frame, FrameFlags, Goaway, H2Error, Payload, Ping, RstStream, Setting, StreamId,
23 };
24 
25 use crate::runtime::{BoundedReceiver, UnboundedReceiver, UnboundedSender};
26 use crate::util::dispatcher::http2::{
27     DispatchErrorKind, OutputMessage, ReqMessage, RespMessage, SettingsState, SettingsSync,
28     StreamController,
29 };
30 use crate::util::h2::streams::{DataReadState, FrameRecvState, StreamEndState};
31 
32 #[derive(Copy, Clone)]
33 enum ManagerState {
34     Send,
35     Receive,
36     Exit(DispatchErrorKind),
37 }
38 
39 pub(crate) struct ConnManager {
40     state: ManagerState,
41     next_state: ManagerState,
42     // Synchronize SETTINGS frames sent by the client.
43     settings: Arc<Mutex<SettingsSync>>,
44     // channel transmitter between manager and io input.
45     input_tx: UnboundedSender<Frame>,
46     // channel receiver between manager and io output.
47     resp_rx: BoundedReceiver<OutputMessage>,
48     // channel receiver between manager and stream coroutine.
49     req_rx: UnboundedReceiver<ReqMessage>,
50     controller: StreamController,
51 }
52 
53 impl Future for ConnManager {
54     type Output = Result<(), DispatchErrorKind>;
55 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>56     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
57         let manager = self.get_mut();
58         loop {
59             match manager.state {
60                 ManagerState::Send => {
61                     if manager.poll_blocked_frames(cx).is_pending() {
62                         return Poll::Pending;
63                     }
64                 }
65                 ManagerState::Receive => {
66                     // Receives a response frame from io output.
67                     match manager.resp_rx.poll_recv(cx) {
68                         #[cfg(feature = "tokio_base")]
69                         Poll::Ready(Some(message)) => match message {
70                             OutputMessage::Output(frame) => {
71                                 if manager.poll_recv_message(cx, frame)?.is_pending() {
72                                     return Poll::Pending;
73                                 }
74                             }
75                             // io output occurs error.
76                             OutputMessage::OutputExit(e) => {
77                                 // Ever received a goaway frame
78                                 if manager.controller.go_away_error_code.is_some() {
79                                     continue;
80                                 }
81                                 // Note error returned immediately.
82                                 if manager.manage_resp_error(cx, e)?.is_pending() {
83                                     return Poll::Pending;
84                                 }
85                             }
86                         },
87                         #[cfg(feature = "ylong_base")]
88                         Poll::Ready(Ok(message)) => match message {
89                             OutputMessage::Output(frame) => {
90                                 if manager.poll_recv_message(cx, frame)?.is_pending() {
91                                     return Poll::Pending;
92                                 }
93                             }
94                             // io output occurs error.
95                             OutputMessage::OutputExit(e) => {
96                                 // Ever received a goaway frame
97                                 if manager.controller.go_away_error_code.is_some() {
98                                     continue;
99                                 }
100                                 if manager.manage_resp_error(cx, e)?.is_pending() {
101                                     return Poll::Pending;
102                                 }
103                             }
104                         },
105                         #[cfg(feature = "tokio_base")]
106                         Poll::Ready(None) => {
107                             return manager.poll_channel_closed_exit(cx);
108                         }
109                         #[cfg(feature = "ylong_base")]
110                         Poll::Ready(Err(_e)) => {
111                             return manager.poll_channel_closed_exit(cx);
112                         }
113 
114                         Poll::Pending => {
115                             // TODO manage error state.
116                             return manager.manage_pending_state(cx);
117                         }
118                     }
119                 }
120                 ManagerState::Exit(e) => return Poll::Ready(Err(e)),
121             }
122         }
123     }
124 }
125 
126 impl ConnManager {
new( settings: Arc<Mutex<SettingsSync>>, input_tx: UnboundedSender<Frame>, resp_rx: BoundedReceiver<OutputMessage>, req_rx: UnboundedReceiver<ReqMessage>, controller: StreamController, ) -> Self127     pub(crate) fn new(
128         settings: Arc<Mutex<SettingsSync>>,
129         input_tx: UnboundedSender<Frame>,
130         resp_rx: BoundedReceiver<OutputMessage>,
131         req_rx: UnboundedReceiver<ReqMessage>,
132         controller: StreamController,
133     ) -> Self {
134         Self {
135             state: ManagerState::Receive,
136             next_state: ManagerState::Receive,
137             settings,
138             input_tx,
139             resp_rx,
140             req_rx,
141             controller,
142         }
143     }
144 
manage_pending_state( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), DispatchErrorKind>>145     fn manage_pending_state(
146         &mut self,
147         cx: &mut Context<'_>,
148     ) -> Poll<Result<(), DispatchErrorKind>> {
149         // The manager previously accepted a GOAWAY Frame.
150         if let Some(error_code) = self.controller.go_away_error_code {
151             self.poll_deal_with_go_away(error_code)?;
152             return Poll::Pending;
153         }
154         self.poll_recv_request(cx)?;
155         self.poll_input_request(cx)?;
156         Poll::Pending
157     }
158 
poll_recv_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind>159     fn poll_recv_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> {
160         loop {
161             #[cfg(feature = "tokio_base")]
162             let message = match self.req_rx.poll_recv(cx) {
163                 Poll::Ready(Some(message)) => message,
164                 Poll::Ready(None) => return Err(DispatchErrorKind::ChannelClosed),
165                 Poll::Pending => break,
166             };
167             #[cfg(feature = "ylong_base")]
168             let message = match self.req_rx.poll_recv(cx) {
169                 Poll::Ready(Ok(message)) => message,
170                 Poll::Ready(Err(_e)) => return Err(DispatchErrorKind::ChannelClosed),
171                 Poll::Pending => break,
172             };
173             let id = match self.controller.streams.generate_id() {
174                 Ok(id) => id,
175                 Err(e) => {
176                     let _ = message.sender.try_send(RespMessage::OutputExit(e));
177                     break;
178                 }
179             };
180             let headers = Frame::new(id, message.request.flag, message.request.payload);
181             if self.controller.streams.reach_max_concurrency()
182                 || !self.controller.streams.is_pending_concurrency_empty()
183             {
184                 self.controller.streams.push_pending_concurrency(id)
185             } else {
186                 self.controller.streams.increase_current_concurrency();
187                 self.controller.streams.push_back_pending_send(id)
188             }
189             self.controller.senders.insert(id, message.sender);
190             self.controller
191                 .streams
192                 .insert(id, headers, message.request.data);
193         }
194         Ok(())
195     }
196 
poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind>197     fn poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> {
198         self.controller.streams.try_consume_pending_concurrency();
199         let size = self.controller.streams.pending_stream_num();
200         let mut index = 0;
201         while index < size {
202             match self.controller.streams.next_pending_stream() {
203                 None => {
204                     break;
205                 }
206                 Some(id) => {
207                     self.input_stream_frame(cx, id)?;
208                 }
209             }
210             index += 1;
211         }
212         Ok(())
213     }
214 
input_stream_frame( &mut self, cx: &mut Context<'_>, id: StreamId, ) -> Result<(), DispatchErrorKind>215     fn input_stream_frame(
216         &mut self,
217         cx: &mut Context<'_>,
218         id: StreamId,
219     ) -> Result<(), DispatchErrorKind> {
220         match self.controller.streams.headers(id)? {
221             None => {}
222             Some(header) => {
223                 let is_end_stream = header.flags().is_end_stream();
224                 self.poll_send_frame(header)?;
225                 // Prevent sending empty data frames
226                 if is_end_stream {
227                     return Ok(());
228                 }
229             }
230         }
231 
232         loop {
233             match self.controller.streams.poll_read_body(cx, id) {
234                 Ok(state) => match state {
235                     DataReadState::Closed => break,
236                     DataReadState::Pending => break,
237                     DataReadState::Ready(data) => self.poll_send_frame(data)?,
238                     DataReadState::Finish(frame) => {
239                         self.poll_send_frame(frame)?;
240                         break;
241                     }
242                 },
243                 Err(e) => return self.deal_poll_body_error(cx, e),
244             }
245         }
246         Ok(())
247     }
248 
deal_poll_body_error( &mut self, cx: &mut Context<'_>, e: H2Error, ) -> Result<(), DispatchErrorKind>249     fn deal_poll_body_error(
250         &mut self,
251         cx: &mut Context<'_>,
252         e: H2Error,
253     ) -> Result<(), DispatchErrorKind> {
254         match e {
255             H2Error::StreamError(id, code) => match self.manage_stream_error(cx, id, code) {
256                 Poll::Ready(res) => res,
257                 Poll::Pending => Ok(()),
258             },
259             H2Error::ConnectionError(e) => Err(H2Error::ConnectionError(e).into()),
260         }
261     }
262 
poll_send_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>263     fn poll_send_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
264         match frame.payload() {
265             Payload::Headers(_) => {
266                 if let FrameRecvState::Err(e) = self
267                     .controller
268                     .streams
269                     .send_headers_frame(frame.stream_id(), frame.flags().is_end_stream())
270                 {
271                     // Never return FrameRecvState::Ignore case.
272                     return Err(e.into());
273                 }
274             }
275             Payload::Data(_) => {
276                 if let FrameRecvState::Err(e) = self
277                     .controller
278                     .streams
279                     .send_data_frame(frame.stream_id(), frame.flags().is_end_stream())
280                 {
281                     // Never return FrameRecvState::Ignore case.
282                     return Err(e.into());
283                 }
284             }
285             _ => {}
286         }
287         // TODO Replace with a bounded channel to avoid excessive local memory overhead
288         // when I/O is blocked in the process of uploading large files.
289         self.input_tx
290             .send(frame)
291             .map_err(|_e| DispatchErrorKind::ChannelClosed)
292     }
293 
poll_recv_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), DispatchErrorKind>>294     fn poll_recv_frame(
295         &mut self,
296         cx: &mut Context<'_>,
297         frame: Frame,
298     ) -> Poll<Result<(), DispatchErrorKind>> {
299         match frame.payload() {
300             Payload::Settings(_settings) => {
301                 self.recv_settings_frame(frame)?;
302             }
303             Payload::Ping(_ping) => {
304                 self.recv_ping_frame(frame)?;
305             }
306             Payload::PushPromise(_) => {
307                 // TODO The current settings_enable_push setting is fixed to false.
308                 return Poll::Ready(Err(
309                     H2Error::ConnectionError(ErrorCode::ProtocolError).into()
310                 ));
311             }
312             Payload::Goaway(_go_away) => {
313                 return self.recv_go_away_frame(cx, frame).map_err(Into::into);
314             }
315             Payload::RstStream(_reset) => {
316                 return self.recv_reset_frame(cx, frame).map_err(Into::into);
317             }
318             Payload::Headers(_headers) => {
319                 return self.recv_header_frame(cx, frame).map_err(Into::into);
320             }
321             Payload::Data(_data) => {
322                 return self.recv_data_frame(cx, frame);
323             }
324             Payload::WindowUpdate(_windows) => {
325                 self.recv_window_frame(frame)?;
326             }
327             // Priority is no longer recommended, so keep it compatible but not processed.
328             Payload::Priority(_priority) => {}
329         }
330         Poll::Ready(Ok(()))
331     }
332 
recv_settings_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>333     fn recv_settings_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
334         let settings = if let Payload::Settings(settings) = frame.payload() {
335             settings
336         } else {
337             // this will not happen forever.
338             return Ok(());
339         };
340 
341         if frame.flags().is_ack() {
342             let mut connection = self.settings.lock().unwrap();
343 
344             if let SettingsState::Acknowledging(ref acknowledged) = connection.settings {
345                 for setting in acknowledged.get_settings() {
346                     if let Setting::InitialWindowSize(size) = setting {
347                         self.controller
348                             .streams
349                             .apply_recv_initial_window_size(*size);
350                     }
351                 }
352             }
353             connection.settings = SettingsState::Synced;
354             Ok(())
355         } else {
356             for setting in settings.get_settings() {
357                 if let Setting::MaxConcurrentStreams(num) = setting {
358                     self.controller.streams.apply_max_concurrent_streams(*num);
359                 }
360                 if let Setting::InitialWindowSize(size) = setting {
361                     self.controller
362                         .streams
363                         .apply_send_initial_window_size(*size)?;
364                 }
365             }
366 
367             // The reason for copying the payload is to pass information to the io input to
368             // set the frame encoder, and the input will empty the
369             // payload when it is sent
370             let ack_settings = Frame::new(
371                 frame.stream_id(),
372                 FrameFlags::new(0x1),
373                 frame.payload().clone(),
374             );
375 
376             self.input_tx
377                 .send(ack_settings)
378                 .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
379             Ok(())
380         }
381     }
382 
recv_ping_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>383     fn recv_ping_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
384         let ping = if let Payload::Ping(ping) = frame.payload() {
385             ping
386         } else {
387             // this will not happen forever.
388             return Ok(());
389         };
390         if frame.flags().is_ack() {
391             // TODO The client does not have the logic to send ping frames. Therefore, the
392             // ack ping is not processed.
393             Ok(())
394         } else {
395             self.input_tx
396                 .send(Ping::ack(ping.clone()))
397                 .map_err(|_e| DispatchErrorKind::ChannelClosed)
398         }
399     }
400 
recv_go_away_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>401     fn recv_go_away_frame(
402         &mut self,
403         cx: &mut Context<'_>,
404         frame: Frame,
405     ) -> Poll<Result<(), H2Error>> {
406         let go_away = if let Payload::Goaway(goaway) = frame.payload() {
407             goaway
408         } else {
409             // this will not happen forever.
410             return Poll::Ready(Ok(()));
411         };
412         // Prevents the current connection from generating a new stream.
413         self.controller.goaway();
414         self.req_rx.close();
415         let last_stream_id = go_away.get_last_stream_id();
416         let streams = self.controller.get_unsent_streams(last_stream_id)?;
417 
418         let error = H2Error::ConnectionError(ErrorCode::try_from(go_away.get_error_code())?);
419 
420         let mut blocked = false;
421         for stream_id in streams {
422             match self.controller.send_message_to_stream(
423                 cx,
424                 stream_id,
425                 RespMessage::OutputExit(error.into()),
426             ) {
427                 // ignore error when going away.
428                 Poll::Ready(_) => {}
429                 Poll::Pending => {
430                     blocked = true;
431                 }
432             }
433         }
434         // Exit after the allowed stream is complete.
435         self.controller.go_away_error_code = Some(go_away.get_error_code());
436         if blocked {
437             Poll::Pending
438         } else {
439             Poll::Ready(Ok(()))
440         }
441     }
442 
recv_reset_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>443     fn recv_reset_frame(
444         &mut self,
445         cx: &mut Context<'_>,
446         frame: Frame,
447     ) -> Poll<Result<(), H2Error>> {
448         match self.controller.streams.recv_remote_reset(frame.stream_id()) {
449             StreamEndState::OK => self.controller.send_message_to_stream(
450                 cx,
451                 frame.stream_id(),
452                 RespMessage::Output(frame),
453             ),
454             StreamEndState::Err(e) => Poll::Ready(Err(e)),
455             StreamEndState::Ignore => Poll::Ready(Ok(())),
456         }
457     }
458 
recv_header_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>459     fn recv_header_frame(
460         &mut self,
461         cx: &mut Context<'_>,
462         frame: Frame,
463     ) -> Poll<Result<(), H2Error>> {
464         match self
465             .controller
466             .streams
467             .recv_headers(frame.stream_id(), frame.flags().is_end_stream())
468         {
469             FrameRecvState::OK => self.controller.send_message_to_stream(
470                 cx,
471                 frame.stream_id(),
472                 RespMessage::Output(frame),
473             ),
474             FrameRecvState::Err(e) => Poll::Ready(Err(e)),
475             FrameRecvState::Ignore => Poll::Ready(Ok(())),
476         }
477     }
478 
recv_data_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), DispatchErrorKind>>479     fn recv_data_frame(
480         &mut self,
481         cx: &mut Context<'_>,
482         frame: Frame,
483     ) -> Poll<Result<(), DispatchErrorKind>> {
484         let data = if let Payload::Data(data) = frame.payload() {
485             data
486         } else {
487             // this will not happen forever.
488             return Poll::Ready(Ok(()));
489         };
490         let id = frame.stream_id();
491         let len = data.size() as u32;
492 
493         self.update_window(id, len)?;
494 
495         match self
496             .controller
497             .streams
498             .recv_data(id, frame.flags().is_end_stream())
499         {
500             FrameRecvState::OK => self
501                 .controller
502                 .send_message_to_stream(cx, frame.stream_id(), RespMessage::Output(frame))
503                 .map_err(Into::into),
504             FrameRecvState::Ignore => Poll::Ready(Ok(())),
505             FrameRecvState::Err(e) => Poll::Ready(Err(e.into())),
506         }
507     }
508 
recv_window_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>509     fn recv_window_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
510         let windows = if let Payload::WindowUpdate(windows) = frame.payload() {
511             windows
512         } else {
513             // this will not happen forever.
514             return Ok(());
515         };
516         let id = frame.stream_id();
517         let increment = windows.get_increment();
518         if id == 0 {
519             self.controller
520                 .streams
521                 .increase_conn_send_window(increment)?;
522             self.controller.streams.reassign_conn_send_window();
523         } else {
524             self.controller
525                 .streams
526                 .reassign_stream_send_window(id, increment)?;
527         }
528         Ok(())
529     }
530 
manage_resp_error( &mut self, cx: &mut Context<'_>, kind: DispatchErrorKind, ) -> Poll<Result<(), DispatchErrorKind>>531     fn manage_resp_error(
532         &mut self,
533         cx: &mut Context<'_>,
534         kind: DispatchErrorKind,
535     ) -> Poll<Result<(), DispatchErrorKind>> {
536         match kind {
537             DispatchErrorKind::H2(h2) => match h2 {
538                 H2Error::StreamError(id, code) => self.manage_stream_error(cx, id, code),
539                 H2Error::ConnectionError(code) => self.manage_conn_error(cx, code),
540             },
541             other => {
542                 let blocked = self.exit_with_error(cx, other);
543                 if blocked {
544                     self.state = ManagerState::Send;
545                     self.next_state = ManagerState::Exit(other);
546                     Poll::Pending
547                 } else {
548                     Poll::Ready(Err(other))
549                 }
550             }
551         }
552     }
553 
manage_stream_error( &mut self, cx: &mut Context<'_>, id: StreamId, code: ErrorCode, ) -> Poll<Result<(), DispatchErrorKind>>554     fn manage_stream_error(
555         &mut self,
556         cx: &mut Context<'_>,
557         id: StreamId,
558         code: ErrorCode,
559     ) -> Poll<Result<(), DispatchErrorKind>> {
560         let rest_payload = RstStream::new(code.into_code());
561         let frame = Frame::new(id, FrameFlags::empty(), Payload::RstStream(rest_payload));
562         match self.controller.streams.send_local_reset(id) {
563             StreamEndState::OK => {
564                 self.input_tx
565                     .send(frame)
566                     .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
567 
568                 match self.controller.send_message_to_stream(
569                     cx,
570                     id,
571                     RespMessage::OutputExit(DispatchErrorKind::H2(H2Error::StreamError(id, code))),
572                 ) {
573                     Poll::Ready(_) => {
574                         // error at the stream level due to early exit of the coroutine in which the
575                         // request is located, ignored to avoid manager coroutine exit.
576                         Poll::Ready(Ok(()))
577                     }
578                     Poll::Pending => {
579                         self.state = ManagerState::Send;
580                         // stream error will not cause manager exit with error(exit state). Takes
581                         // effect only if blocked.
582                         self.next_state = ManagerState::Receive;
583                         Poll::Pending
584                     }
585                 }
586             }
587             StreamEndState::Ignore => Poll::Ready(Ok(())),
588             StreamEndState::Err(e) => {
589                 // This error will never happen.
590                 Poll::Ready(Err(e.into()))
591             }
592         }
593     }
594 
manage_conn_error( &mut self, cx: &mut Context<'_>, code: ErrorCode, ) -> Poll<Result<(), DispatchErrorKind>>595     fn manage_conn_error(
596         &mut self,
597         cx: &mut Context<'_>,
598         code: ErrorCode,
599     ) -> Poll<Result<(), DispatchErrorKind>> {
600         // last_stream_id is set to 0 to ensure that all pushed streams are
601         // shutdown.
602         let go_away_payload = Goaway::new(
603             code.into_code(),
604             self.controller.streams.latest_remote_id,
605             vec![],
606         );
607         let frame = Frame::new(
608             0,
609             FrameFlags::empty(),
610             Payload::Goaway(go_away_payload.clone()),
611         );
612         // Avoid sending the same GO_AWAY frame multiple times.
613         if let Some(ref go_away) = self.controller.go_away_sync.going_away {
614             if go_away.get_error_code() == go_away_payload.get_error_code()
615                 && go_away.get_last_stream_id() == go_away_payload.get_last_stream_id()
616             {
617                 return Poll::Ready(Ok(()));
618             }
619         }
620         self.controller.go_away_sync.going_away = Some(go_away_payload);
621         self.input_tx
622             .send(frame)
623             .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
624 
625         let blocked =
626             self.exit_with_error(cx, DispatchErrorKind::H2(H2Error::ConnectionError(code)));
627 
628         if blocked {
629             self.state = ManagerState::Send;
630             self.next_state = ManagerState::Exit(H2Error::ConnectionError(code).into());
631             Poll::Pending
632         } else {
633             // TODO When current client has an error,
634             // it always sends the GO_AWAY frame at the first time and exits directly.
635             // Should we consider letting part of the unfinished stream complete?
636             Poll::Ready(Err(H2Error::ConnectionError(code).into()))
637         }
638     }
639 
poll_deal_with_go_away(&mut self, error_code: u32) -> Result<(), DispatchErrorKind>640     fn poll_deal_with_go_away(&mut self, error_code: u32) -> Result<(), DispatchErrorKind> {
641         // The client that receives GO_AWAY needs to return a GO_AWAY to the server
642         // before closed. The preceding operations before receiving the frame
643         // ensure that the connection is in the closing state.
644         if self.controller.streams.is_closed() {
645             let last_stream_id = self.controller.streams.latest_remote_id;
646             let go_away_payload = Goaway::new(error_code, last_stream_id, vec![]);
647             let frame = Frame::new(
648                 0,
649                 FrameFlags::empty(),
650                 Payload::Goaway(go_away_payload.clone()),
651             );
652 
653             self.send_peer_goaway(frame, go_away_payload, error_code)?;
654             // close connection
655             self.controller.shutdown();
656             return Err(H2Error::ConnectionError(ErrorCode::try_from(error_code)?).into());
657         }
658         Ok(())
659     }
660 
send_peer_goaway( &mut self, frame: Frame, payload: Goaway, err_code: u32, ) -> Result<(), DispatchErrorKind>661     fn send_peer_goaway(
662         &mut self,
663         frame: Frame,
664         payload: Goaway,
665         err_code: u32,
666     ) -> Result<(), DispatchErrorKind> {
667         match self.controller.go_away_sync.going_away {
668             None => {
669                 self.controller.go_away_sync.going_away = Some(payload);
670                 self.input_tx
671                     .send(frame)
672                     .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
673             }
674             Some(ref go_away) => {
675                 // Whether the same GOAWAY Frame has been sent before.
676                 if !(go_away.get_error_code() == err_code
677                     && go_away.get_last_stream_id() == self.controller.streams.latest_remote_id)
678                 {
679                     self.controller.go_away_sync.going_away = Some(payload);
680                     self.input_tx
681                         .send(frame)
682                         .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
683                 }
684             }
685         }
686         Ok(())
687     }
688 
poll_recv_message( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), DispatchErrorKind>>689     fn poll_recv_message(
690         &mut self,
691         cx: &mut Context<'_>,
692         frame: Frame,
693     ) -> Poll<Result<(), DispatchErrorKind>> {
694         match self.poll_recv_frame(cx, frame) {
695             Poll::Ready(Err(kind)) => self.manage_resp_error(cx, kind),
696             Poll::Pending => {
697                 self.state = ManagerState::Send;
698                 self.next_state = ManagerState::Receive;
699                 Poll::Pending
700             }
701             Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
702         }
703     }
704 
poll_channel_closed_exit( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), DispatchErrorKind>>705     fn poll_channel_closed_exit(
706         &mut self,
707         cx: &mut Context<'_>,
708     ) -> Poll<Result<(), DispatchErrorKind>> {
709         if self.exit_with_error(cx, DispatchErrorKind::ChannelClosed) {
710             self.state = ManagerState::Send;
711             self.next_state = ManagerState::Exit(DispatchErrorKind::ChannelClosed);
712             Poll::Pending
713         } else {
714             Poll::Ready(Err(DispatchErrorKind::ChannelClosed))
715         }
716     }
717 
poll_blocked_frames(&mut self, cx: &mut Context<'_>) -> Poll<()>718     fn poll_blocked_frames(&mut self, cx: &mut Context<'_>) -> Poll<()> {
719         match self.controller.poll_blocked_message(cx, &self.input_tx) {
720             Poll::Ready(_) => {
721                 self.state = self.next_state;
722                 // Reset state.
723                 self.next_state = ManagerState::Receive;
724                 Poll::Ready(())
725             }
726             Poll::Pending => Poll::Pending,
727         }
728     }
729 
exit_with_error( &mut self, cx: &mut Context<'_>, error: DispatchErrorKind, ) -> bool730     pub(crate) fn exit_with_error(
731         &mut self,
732         cx: &mut Context<'_>,
733         error: DispatchErrorKind,
734     ) -> bool {
735         self.controller.shutdown();
736         self.req_rx.close();
737         self.controller.streams.clear_streams_states();
738 
739         let ids = self.controller.streams.get_all_unclosed_streams();
740         let mut blocked = false;
741         for stream_id in ids {
742             match self.controller.send_message_to_stream(
743                 cx,
744                 stream_id,
745                 RespMessage::OutputExit(error),
746             ) {
747                 // ignore error when going away.
748                 Poll::Ready(_) => {}
749                 Poll::Pending => {
750                     blocked = true;
751                 }
752             }
753         }
754         blocked
755     }
756 
update_window( &mut self, id: StreamId, len: u32, ) -> Result<(), DispatchErrorKind>757     pub(crate) fn update_window(
758         &mut self,
759         id: StreamId,
760         len: u32,
761     ) -> Result<(), DispatchErrorKind> {
762         self.controller
763             .streams
764             .release_conn_recv_window(len, &self.input_tx)?;
765         self.controller
766             .streams
767             .release_stream_recv_window(id, len, &self.input_tx)?;
768         Ok(())
769     }
770 }
771