• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
/// Common interface of the connection dispatchers defined in this file.
///
/// A dispatcher hands out handles through which a connection is used and
/// reports whether that connection has been marked as shut down.
pub(crate) trait Dispatcher {
    /// Type of the handle produced by `dispatch`.
    type Handle;

    /// Tries to obtain a handle; `None` means no handle can be handed out.
    fn dispatch(&self) -> Option<Self::Handle>;

    /// Returns `true` once the connection has been marked as shut down.
    fn is_shutdown(&self) -> bool;
}
21 
/// Protocol-specific dispatcher for one connection over the I/O type `S`.
///
/// Each variant only exists when the corresponding cargo feature is enabled.
pub(crate) enum ConnDispatcher<S> {
    /// Dispatcher for an HTTP/1.1 connection.
    #[cfg(feature = "http1_1")]
    Http1(http1::Http1Dispatcher<S>),

    /// Dispatcher for an HTTP/2 connection.
    #[cfg(feature = "http2")]
    Http2(http2::Http2Dispatcher<S>),
}
29 
30 impl<S> Dispatcher for ConnDispatcher<S> {
31     type Handle = Conn<S>;
32 
dispatch(&self) -> Option<Self::Handle>33     fn dispatch(&self) -> Option<Self::Handle> {
34         match self {
35             #[cfg(feature = "http1_1")]
36             Self::Http1(h1) => h1.dispatch().map(Conn::Http1),
37 
38             #[cfg(feature = "http2")]
39             Self::Http2(h2) => h2.dispatch().map(Conn::Http2),
40         }
41     }
42 
is_shutdown(&self) -> bool43     fn is_shutdown(&self) -> bool {
44         match self {
45             #[cfg(feature = "http1_1")]
46             Self::Http1(h1) => h1.is_shutdown(),
47 
48             #[cfg(feature = "http2")]
49             Self::Http2(h2) => h2.is_shutdown(),
50         }
51     }
52 }
53 
/// Handle returned by `ConnDispatcher::dispatch`, one variant per protocol.
///
/// Each variant only exists when the corresponding cargo feature is enabled.
pub(crate) enum Conn<S> {
    /// Handle of an HTTP/1.1 connection.
    #[cfg(feature = "http1_1")]
    Http1(http1::Http1Conn<S>),

    /// Handle of an HTTP/2 stream.
    #[cfg(feature = "http2")]
    Http2(http2::Http2Conn<S>),
}
61 
62 #[cfg(feature = "http1_1")]
63 pub(crate) mod http1 {
64     use std::sync::atomic::{AtomicBool, Ordering};
65     use std::sync::Arc;
66 
67     use super::{ConnDispatcher, Dispatcher};
68 
69     impl<S> ConnDispatcher<S> {
http1(io: S) -> Self70         pub(crate) fn http1(io: S) -> Self {
71             Self::Http1(Http1Dispatcher::new(io))
72         }
73     }
74 
    /// HTTP1-based connection manager, which can dispatch connections to other
    /// threads according to HTTP1 syntax.
    pub(crate) struct Http1Dispatcher<S> {
        // State shared with the `Http1Conn` handle created by `dispatch`.
        inner: Arc<Inner<S>>,
    }
80 
    /// I/O object and status flags shared between `Http1Dispatcher` and
    /// `Http1Conn`.
    pub(crate) struct Inner<S> {
        // The underlying I/O object.
        pub(crate) io: S,
        // `occupied` indicates that the connection is occupied. Only one coroutine
        // can hold the handle at a time. The flag is set to `true` when the
        // handle is fetched and reset to `false` when the handle is dropped.
        pub(crate) occupied: AtomicBool,
        // `shutdown` indicates that the connection needs to be shut down.
        pub(crate) shutdown: AtomicBool,
    }
90 
91     impl<S> Http1Dispatcher<S> {
new(io: S) -> Self92         pub(crate) fn new(io: S) -> Self {
93             Self {
94                 inner: Arc::new(Inner {
95                     io,
96                     occupied: AtomicBool::new(false),
97                     shutdown: AtomicBool::new(false),
98                 }),
99             }
100         }
101     }
102 
103     impl<S> Dispatcher for Http1Dispatcher<S> {
104         type Handle = Http1Conn<S>;
105 
dispatch(&self) -> Option<Self::Handle>106         fn dispatch(&self) -> Option<Self::Handle> {
107             self.inner
108                 .occupied
109                 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
110                 .ok()
111                 .map(|_| Http1Conn {
112                     inner: self.inner.clone(),
113                 })
114         }
115 
is_shutdown(&self) -> bool116         fn is_shutdown(&self) -> bool {
117             self.inner.shutdown.load(Ordering::Relaxed)
118         }
119     }
120 
    /// Handle returned to other threads for I/O operations.
    pub(crate) struct Http1Conn<S> {
        // State shared with the `Http1Dispatcher` that created this handle.
        pub(crate) inner: Arc<Inner<S>>,
    }
125 
    impl<S> Http1Conn<S> {
        // TODO: Use `UnsafeCell` instead when `Arc::get_mut_unchecked` becomes stable.
        /// Returns a mutable reference to the underlying I/O object.
        pub(crate) fn raw_mut(&mut self) -> &mut S {
            // SAFETY: In the case of `HTTP1`, only one coroutine gets the handle
            // at the same time.
            // NOTE(review): exclusivity relies on the `occupied` flag; the cast
            // turns the shared `Arc` pointer into `*mut Inner<S>` without an
            // `UnsafeCell` - confirm this is acceptable until the TODO is done.
            &mut unsafe { &mut *(Arc::as_ptr(&self.inner) as *mut Inner<S>) }.io
        }

        /// Sets the `shutdown` flag of the shared connection state.
        pub(crate) fn shutdown(&self) {
            self.inner.shutdown.store(true, Ordering::Release);
        }
    }
138 
139     impl<S> Drop for Http1Conn<S> {
drop(&mut self)140         fn drop(&mut self) {
141             self.inner.occupied.store(false, Ordering::Release)
142         }
143     }
144 }
145 
146 #[cfg(feature = "http2")]
147 pub(crate) mod http2 {
148     use std::collections::{HashMap, VecDeque};
149     use std::future::Future;
150     use std::mem::take;
151     use std::pin::Pin;
152     use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
153     use std::sync::{Arc, Mutex};
154     use std::task::{Context, Poll, Waker};
155 
156     use ylong_http::error::HttpError;
157     use ylong_http::h2;
158     use ylong_http::h2::Payload::Settings;
159     use ylong_http::h2::{
160         ErrorCode, Frame, FrameDecoder, FrameEncoder, FrameFlags, FrameKind, FramesIntoIter,
161         Goaway, H2Error, Payload, RstStream, Setting, SettingsBuilder,
162     };
163 
164     use super::{ConnDispatcher, Dispatcher};
165     use crate::dispatcher::http2::StreamState::Closed;
166     use crate::error::HttpClientError;
167     use crate::util::H2Config;
168     use crate::{
169         unbounded_channel, AsyncMutex, AsyncRead, AsyncWrite, ErrorKind, MutexGuard, ReadBuf,
170         TryRecvError, UnboundedReceiver, UnboundedSender,
171     };
172 
173     impl<S> ConnDispatcher<S> {
http2(config: H2Config, io: S) -> Self174         pub(crate) fn http2(config: H2Config, io: S) -> Self {
175             Self::Http2(Http2Dispatcher::new(config, io))
176         }
177     }
178 
    // The data type of the messages sent to the `StreamController`. The first
    // message of a stream carries `Some((stream id, sender))` so that frames can
    // be sent back to that stream; later messages carry `None`. The second
    // element is the frame itself.
    type Send2Ctrl = (Option<(u32, UnboundedSender<Frame>)>, Frame);

    // Largest stream id the dispatcher hands out: 2^31 - 1.
    const DEFAULT_MAX_STREAM_ID: u32 = u32::MAX >> 1;
    // Frame size passed to `FrameEncoder::new`: 2 << 13 == 16384 (2^14) bytes.
    const DEFAULT_MAX_FRAME_SIZE: usize = 2 << 13;
    // Header list size passed to `FrameEncoder::new`: 16 << 20 bytes (16 MiB).
    const DEFAULT_MAX_HEADER_LIST_SIZE: usize = 16 << 20;
185 
    /// HTTP2-based connection manager, which can dispatch connections to other
    /// threads according to HTTP2 syntax.
    pub(crate) struct Http2Dispatcher<S> {
        // Controller shared by every stream handle created from this dispatcher.
        pub(crate) controller: Arc<StreamController<S>>,
        // Generator of the id used for the next stream.
        pub(crate) next_stream_id: Arc<StreamId>,
        // Sending half of the channel whose receiver is given to the controller.
        pub(crate) sender: UnboundedSender<Send2Ctrl>,
    }
193 
    /// Handle of a single HTTP/2 stream, returned by `Http2Dispatcher::dispatch`.
    pub(crate) struct Http2Conn<S> {
        // Handle id
        pub(crate) id: u32,
        // Sends frame to StreamController
        pub(crate) sender: UnboundedSender<Send2Ctrl>,
        // Receiving side of the stream; built with the same id as `id`.
        pub(crate) stream_info: StreamInfo<S>,
    }
201 
    /// Receiving side of a single stream. It implements `Future`, resolving to
    /// a frame of this stream or an error.
    pub(crate) struct StreamInfo<S> {
        // Stream id
        pub(crate) id: u32,
        // Id generator shared with the dispatcher.
        pub(crate) next_stream_id: Arc<StreamId>,
        // Receive the Response frame transmitted from the StreamController
        pub(crate) receiver: FrameReceiver,
        // Used to handle TCP Stream
        pub(crate) controller: Arc<StreamController<S>>,
    }
211 
    /// State shared by all streams of one HTTP/2 connection.
    pub(crate) struct StreamController<S> {
        // I/O unavailability flag, which prevents the upper layer from using this I/O to create
        // new streams.
        pub(crate) io_shutdown: AtomicBool,
        // Indicates that the dispatcher is occupied. At this time, a user coroutine is already
        // acting as the dispatcher. The initial value 0 means the I/O is not occupied.
        pub(crate) occupied: AtomicU32,
        // Set by `invalid()`; once set, polling a stream returns a connection error.
        pub(crate) dispatcher_invalid: AtomicBool,
        // I/O manager guarded by an async mutex.
        pub(crate) manager: AsyncMutex<IoManager<S>>,
        // Wakers of streams waiting for the I/O, guarded by a blocking mutex.
        pub(crate) stream_waker: Mutex<StreamWaker>,
    }
223 
    /// Wakers of the streams that are waiting to be woken up.
    pub(crate) struct StreamWaker {
        // Waker of each waiting stream, keyed by stream id.
        waker: HashMap<u32, Waker>,
    }
227 
    /// State needed to drive the connection's I/O; stored inside
    /// `StreamController::manager`.
    pub(crate) struct IoManager<S> {
        // I/O object together with the frame encoder and decoder.
        inner: Inner<S>,
        // presumably the per-stream senders keyed by stream id - TODO confirm
        senders: HashMap<u32, UnboundedSender<Frame>>,
        // Receiving half of the channel fed by the stream handles.
        frame_receiver: UnboundedReceiver<Send2Ctrl>,
        // Frames buffered per stream together with the stream states.
        streams: Streams,
        // NOTE(review): usage is outside this view; starts out empty.
        frame_iter: FrameIter,
        // Connection-level frames (preface flag and settings state).
        connection_frame: ConnectionFrames,
    }
236 
    /// Optional iterator over frames; `None` by default.
    #[derive(Default)]
    pub(crate) struct FrameIter {
        // `None` means there is currently no iterator.
        iter: Option<FramesIntoIter>,
    }
241 
    /// Frames queued per stream, plus the order in which they were queued.
    pub(crate) struct Streams {
        // One stream id per inserted frame, in insertion order.
        stream_to_send: VecDeque<u32>,
        // Buffered frames and state of every known stream, keyed by stream id.
        buffer: HashMap<u32, StreamBuffer>,
    }
246 
    /// State and queued frames of a single stream.
    pub(crate) struct StreamBuffer {
        // Current state of the stream; starts as `StreamState::Idle`.
        state: StreamState,
        // Frames of this stream in FIFO order.
        frames: VecDeque<Frame>,
    }
251 
    /// I/O object of the connection together with its frame codec.
    pub(crate) struct Inner<S> {
        // The underlying I/O object.
        pub(crate) io: S,
        // Encoder for frames; created with the default frame and header list sizes.
        pub(crate) encoder: FrameEncoder,
        // Decoder for frames.
        pub(crate) decoder: FrameDecoder,
    }
257 
    // NOTE(review): the code using this enum is outside this view; the variant
    // names suggest the outcome of a read attempt - TODO confirm.
    pub(crate) enum ReadState {
        EmptyIo,
        CurrentStream,
    }
262 
    // NOTE(review): the code using this enum is outside this view; the variant
    // names suggest whether a dispatch step completed - TODO confirm.
    enum DispatchState {
        Partial,
        Finish,
    }
267 
    /// Why a stream entered the `Closed` state.
    #[derive(Clone)]
    pub(crate) enum ResetReason {
        // Set by `Streams::recv_local_reset`.
        Local,
        // Set by `Streams::recv_remote_reset`.
        Remote,
        // Set by `StreamBuffer::go_away`; carries the id passed to it.
        Goaway(u32),
    }
274 
    /// Synchronization state of the connection settings. A new connection
    /// starts in `Send`.
    // NOTE(review): the transitions between the variants happen outside this
    // view - TODO confirm their exact meaning.
    #[derive(Clone)]
    pub(crate) enum SettingsSync {
        Send(h2::Settings),
        Acknowledging(h2::Settings),
        Synced,
    }
281 
    /// Generator of stream ids shared between the dispatcher and its streams.
    pub(crate) struct StreamId {
        // TODO Determine the maximum value of id.
        // Id that the next call to `stream_id_generate` returns.
        next_id: AtomicU32,
    }
286 
    // TODO Add "open", "half-closed", "reserved" state
    /// State of a stream as tracked by `StreamBuffer`.
    #[derive(Clone)]
    pub(crate) enum StreamState {
        // Initial state of a newly created `StreamBuffer`.
        Idle,
        // The stream has been closed for the given reason.
        Closed(ResetReason),
    }
293 
    /// Receiving end through which a stream gets its frames.
    #[derive(Default)]
    pub(crate) struct FrameReceiver {
        // `None` by default; a receiver is installed when the stream sends its
        // first frame to the controller.
        receiver: Option<UnboundedReceiver<Frame>>,
    }
298 
299     impl<S> StreamController<S> {
new( inner: Inner<S>, frame_receiver: UnboundedReceiver<Send2Ctrl>, connection_frame: ConnectionFrames, ) -> Self300         pub(crate) fn new(
301             inner: Inner<S>,
302             frame_receiver: UnboundedReceiver<Send2Ctrl>,
303             connection_frame: ConnectionFrames,
304         ) -> Self {
305             let manager = IoManager::new(inner, frame_receiver, connection_frame);
306             Self {
307                 io_shutdown: AtomicBool::new(false),
308                 // 0 means io is not occupied
309                 occupied: AtomicU32::new(0),
310                 dispatcher_invalid: AtomicBool::new(false),
311                 manager: AsyncMutex::new(manager),
312                 stream_waker: Mutex::new(StreamWaker::new()),
313             }
314         }
315 
shutdown(&self)316         pub(crate) fn shutdown(&self) {
317             self.io_shutdown.store(true, Ordering::Release);
318         }
319 
invalid(&self)320         pub(crate) fn invalid(&self) {
321             self.dispatcher_invalid.store(true, Ordering::Release);
322         }
323     }
324 
325     impl Streams {
new() -> Self326         pub(crate) fn new() -> Self {
327             Self {
328                 stream_to_send: VecDeque::new(),
329                 buffer: HashMap::new(),
330             }
331         }
332 
size(&self) -> usize333         pub(crate) fn size(&self) -> usize {
334             self.stream_to_send.len()
335         }
336 
insert(&mut self, frame: Frame)337         pub(crate) fn insert(&mut self, frame: Frame) {
338             let id = frame.stream_id() as u32;
339             self.stream_to_send.push_back(id);
340             match self.buffer.get_mut(&id) {
341                 Some(sender) => {
342                     sender.push_back(frame);
343                 }
344                 None => {
345                     let mut sender = StreamBuffer::new();
346                     sender.push_back(frame);
347                     self.buffer.insert(id, sender);
348                 }
349             }
350         }
351 
get_goaway_streams( &mut self, last_stream_id: u32, ) -> Result<Vec<u32>, H2Error>352         pub(crate) fn get_goaway_streams(
353             &mut self,
354             last_stream_id: u32,
355         ) -> Result<Vec<u32>, H2Error> {
356             let mut ids = vec![];
357             for (id, sender) in self.buffer.iter_mut() {
358                 if *id >= last_stream_id {
359                     ids.push(*id);
360                     sender.go_away(*id)?;
361                 }
362             }
363             Ok(ids)
364         }
365 
recv_local_reset(&mut self, id: u32) -> Result<(), H2Error>366         pub(crate) fn recv_local_reset(&mut self, id: u32) -> Result<(), H2Error> {
367             match self.buffer.get_mut(&id) {
368                 None => Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
369                 Some(sender) => {
370                     match sender.state {
371                         Closed(ResetReason::Remote | ResetReason::Local) => {}
372                         _ => {
373                             sender.state = Closed(ResetReason::Local);
374                         }
375                     }
376                     Ok(())
377                 }
378             }
379         }
380 
recv_remote_reset(&mut self, id: u32) -> Result<(), H2Error>381         pub(crate) fn recv_remote_reset(&mut self, id: u32) -> Result<(), H2Error> {
382             match self.buffer.get_mut(&id) {
383                 None => Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
384                 Some(sender) => {
385                     match sender.state {
386                         Closed(ResetReason::Remote) => {}
387                         _ => {
388                             sender.state = Closed(ResetReason::Remote);
389                         }
390                     }
391                     Ok(())
392                 }
393             }
394         }
395 
396         // TODO At present, only the state is changed to closed, and other states are
397         // not involved, and it needs to be added later
recv_headers(&mut self, id: u32) -> Result<StreamState, H2Error>398         pub(crate) fn recv_headers(&mut self, id: u32) -> Result<StreamState, H2Error> {
399             match self.buffer.get_mut(&id) {
400                 None => Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
401                 Some(sender) => match sender.state {
402                     Closed(ResetReason::Goaway(last_id)) => {
403                         if id > last_id {
404                             return Err(H2Error::ConnectionError(ErrorCode::StreamClosed));
405                         }
406                         Ok(sender.state.clone())
407                     }
408                     Closed(ResetReason::Remote) => {
409                         Err(H2Error::ConnectionError(ErrorCode::StreamClosed))
410                     }
411                     _ => Ok(sender.state.clone()),
412                 },
413             }
414         }
415 
recv_data(&mut self, id: u32) -> Result<StreamState, H2Error>416         pub(crate) fn recv_data(&mut self, id: u32) -> Result<StreamState, H2Error> {
417             self.recv_headers(id)
418         }
419 
pop_front(&mut self) -> Result<Option<Frame>, H2Error>420         pub(crate) fn pop_front(&mut self) -> Result<Option<Frame>, H2Error> {
421             match self.stream_to_send.pop_front() {
422                 None => Ok(None),
423                 Some(id) => {
424                     // TODO Subsequent consideration is to delete the corresponding elements in the
425                     // map after the status becomes Closed
426                     match self.buffer.get_mut(&id) {
427                         None => Err(H2Error::ConnectionError(ErrorCode::IntervalError)),
428                         Some(sender) => {
429                             // TODO For the time being, match state is used here, and the complete
430                             // logic should be judged based on the frame type and state
431                             match sender.state {
432                                 Closed(ResetReason::Remote | ResetReason::Local) => Ok(None),
433                                 _ => Ok(sender.pop_front()),
434                             }
435                         }
436                     }
437                 }
438             }
439         }
440     }
441 
442     impl StreamBuffer {
push_back(&mut self, frame: Frame)443         pub(crate) fn push_back(&mut self, frame: Frame) {
444             self.frames.push_back(frame);
445         }
446 
pop_front(&mut self) -> Option<Frame>447         pub(crate) fn pop_front(&mut self) -> Option<Frame> {
448             self.frames.pop_front()
449         }
450 
new() -> Self451         pub(crate) fn new() -> Self {
452             Self {
453                 state: StreamState::Idle,
454                 frames: VecDeque::new(),
455             }
456         }
457 
go_away(&mut self, last_stream_id: u32) -> Result<(), H2Error>458         pub(crate) fn go_away(&mut self, last_stream_id: u32) -> Result<(), H2Error> {
459             match self.state {
460                 Closed(ResetReason::Local | ResetReason::Remote) => {}
461                 Closed(ResetReason::Goaway(id)) => {
462                     if last_stream_id > id {
463                         return Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
464                     }
465                     self.state = Closed(ResetReason::Goaway(last_stream_id));
466                 }
467                 _ => {
468                     self.state = Closed(ResetReason::Goaway(last_stream_id));
469                 }
470             }
471             Ok(())
472         }
473     }
474 
475     impl SettingsSync {
ack_settings() -> Frame476         pub(crate) fn ack_settings() -> Frame {
477             Frame::new(0, FrameFlags::new(0x1), Settings(h2::Settings::new(vec![])))
478         }
479     }
480 
    /// Connection-level frame state kept by the I/O manager.
    pub(crate) struct ConnectionFrames {
        // Starts as `true`; presumably "the connection preface still has to be
        // sent" - TODO confirm against the code that consumes it.
        preface: bool,
        // Synchronization state of the settings; starts as `SettingsSync::Send`.
        settings: SettingsSync,
    }
485 
486     impl ConnectionFrames {
new(settings: h2::Settings) -> Self487         pub(crate) fn new(settings: h2::Settings) -> Self {
488             Self {
489                 preface: true,
490                 settings: SettingsSync::Send(settings),
491             }
492         }
493     }
494 
495     impl StreamWaker {
new() -> Self496         pub(crate) fn new() -> Self {
497             Self {
498                 waker: HashMap::new(),
499             }
500         }
501     }
502 
503     impl<S> IoManager<S> {
new( inner: Inner<S>, frame_receiver: UnboundedReceiver<Send2Ctrl>, connection_frame: ConnectionFrames, ) -> Self504         pub(crate) fn new(
505             inner: Inner<S>,
506             frame_receiver: UnboundedReceiver<Send2Ctrl>,
507             connection_frame: ConnectionFrames,
508         ) -> Self {
509             Self {
510                 inner,
511                 senders: HashMap::new(),
512                 frame_receiver,
513                 streams: Streams::new(),
514                 frame_iter: FrameIter::default(),
515                 connection_frame,
516             }
517         }
518 
close_frame_receiver(&mut self)519         fn close_frame_receiver(&mut self) {
520             self.frame_receiver.close()
521         }
522     }
523 
524     impl FrameIter {
is_empty(&self) -> bool525         pub(crate) fn is_empty(&self) -> bool {
526             self.iter.is_none()
527         }
528     }
529 
530     impl StreamId {
stream_id_generate(&self) -> u32531         fn stream_id_generate(&self) -> u32 {
532             self.next_id.fetch_add(2, Ordering::Relaxed)
533         }
534 
get_next_id(&self) -> u32535         fn get_next_id(&self) -> u32 {
536             self.next_id.load(Ordering::Relaxed)
537         }
538     }
539 
540     impl<S> Http2Dispatcher<S> {
new(config: H2Config, io: S) -> Self541         pub(crate) fn new(config: H2Config, io: S) -> Self {
542             // send_preface(&mut io).await?;
543 
544             let connection_frames = build_connection_frames(config);
545             let inner = Inner {
546                 io,
547                 encoder: FrameEncoder::new(DEFAULT_MAX_FRAME_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE),
548                 decoder: FrameDecoder::new(),
549             };
550 
551             // For each stream to send the frame to the controller
552             let (tx, rx) = unbounded_channel::<Send2Ctrl>();
553 
554             let stream_controller = Arc::new(StreamController::new(inner, rx, connection_frames));
555 
556             // The id of the client stream, starting from 1
557             let next_stream_id = StreamId {
558                 next_id: AtomicU32::new(1),
559             };
560             Self {
561                 controller: stream_controller,
562                 sender: tx,
563                 next_stream_id: Arc::new(next_stream_id),
564             }
565         }
566     }
567 
568     impl<S> Dispatcher for Http2Dispatcher<S> {
569         type Handle = Http2Conn<S>;
570 
571         // Call this method to get a stream
dispatch(&self) -> Option<Self::Handle>572         fn dispatch(&self) -> Option<Self::Handle> {
573             let id = self.next_stream_id.stream_id_generate();
574             // TODO Consider how to create a new connection and transfer state
575             if id > DEFAULT_MAX_STREAM_ID {
576                 return None;
577             }
578             let controller = self.controller.clone();
579             let sender = self.sender.clone();
580             let handle = Http2Conn::new(id, self.next_stream_id.clone(), sender, controller);
581             Some(handle)
582         }
583 
584         // TODO When the stream id reaches the maximum value, shutdown the current
585         // connection
is_shutdown(&self) -> bool586         fn is_shutdown(&self) -> bool {
587             self.controller.io_shutdown.load(Ordering::Relaxed)
588         }
589     }
590 
591     impl<S> Http2Conn<S> {
new( id: u32, next_stream_id: Arc<StreamId>, sender: UnboundedSender<Send2Ctrl>, controller: Arc<StreamController<S>>, ) -> Self592         pub(crate) fn new(
593             id: u32,
594             next_stream_id: Arc<StreamId>,
595             sender: UnboundedSender<Send2Ctrl>,
596             controller: Arc<StreamController<S>>,
597         ) -> Self {
598             let stream_info = StreamInfo {
599                 id,
600                 next_stream_id,
601                 receiver: FrameReceiver::default(),
602                 controller,
603             };
604             Self {
605                 id,
606                 sender,
607                 stream_info,
608             }
609         }
610 
send_frame_to_controller( &mut self, frame: Frame, ) -> Result<(), HttpClientError>611         pub(crate) fn send_frame_to_controller(
612             &mut self,
613             frame: Frame,
614         ) -> Result<(), HttpClientError> {
615             if self.stream_info.receiver.is_none() {
616                 let (tx, rx) = unbounded_channel::<Frame>();
617                 self.stream_info.receiver.set_receiver(rx);
618                 self.sender.send((Some((self.id, tx)), frame)).map_err(|_| {
619                     HttpClientError::new_with_cause(
620                         ErrorKind::Request,
621                         Some(String::from("resend")),
622                     )
623                 })
624             } else {
625                 self.sender.send((None, frame)).map_err(|_| {
626                     HttpClientError::new_with_cause(
627                         ErrorKind::Request,
628                         Some(String::from("resend")),
629                     )
630                 })
631             }
632         }
633     }
634 
    /// Polling a `StreamInfo` yields a frame of this stream, or an error.
    ///
    /// NOTE(review): `is_io_available`, `wakeup_next_stream`, `get_frame` and
    /// `poll_match_result` are defined outside this view; the comments below
    /// describe only how they are called here.
    impl<S: AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static> Future for StreamInfo<S> {
        type Output = Result<Frame, HttpError>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let stream_info = self.get_mut();

            // First, check whether the frame of the current stream is in the Channel.
            // The error cannot occur. Therefore, the error is thrown directly without
            // connection-level processing.
            if let Some(frame) = stream_info.receiver.recv_frame(stream_info.id)? {
                {
                    let mut stream_waker = stream_info
                        .controller
                        .stream_waker
                        .lock()
                        .expect("Blocking get waker lock failed! ");
                    // This stream is done for now; wake up another waiting stream.
                    wakeup_next_stream(&mut stream_waker.waker);
                }
                return Poll::Ready(Ok(frame));
            }

            // If the dispatcher sends a goaway frame, all streams on the current connection
            // are unavailable.
            if stream_info
                .controller
                .dispatcher_invalid
                .load(Ordering::Relaxed)
            {
                return Poll::Ready(Err(H2Error::ConnectionError(ErrorCode::ConnectError).into()));
            }

            // The error cannot occur. Therefore, the error is thrown directly without
            // connection-level processing.
            if is_io_available(&stream_info.controller.occupied, stream_info.id)? {
                {
                    // Second, try to get io and read the frame of the current stream from io.
                    // NOTE(review): when `try_lock` fails, execution falls through
                    // to the wake-up and `get_frame` below without driving the
                    // I/O - confirm this is intended.
                    if let Ok(mut io_manager) = stream_info.controller.manager.try_lock() {
                        if stream_info
                            .poll_match_result(cx, &mut io_manager)?
                            .is_pending()
                        {
                            return Poll::Pending;
                        }
                    }
                }
                {
                    let mut stream_waker = stream_info
                        .controller
                        .stream_waker
                        .lock()
                        .expect("Blocking get waker lock failed! ");
                    wakeup_next_stream(&mut stream_waker.waker);
                }
                // The error cannot occur. Therefore, the error is thrown directly without
                // connection-level processing.
                let frame_opt = get_frame(stream_info.receiver.recv_frame(stream_info.id)?);
                return Poll::Ready(frame_opt);
            }

            {
                let mut io_manager = {
                    // Third, wait to acquire the lock of waker, which is used to insert the current
                    // waker, and wait to be awakened by the io stream.
                    let mut stream_waker = stream_info
                        .controller
                        .stream_waker
                        .lock()
                        .expect("Blocking get waker lock failed! ");

                    // Fourth, after obtaining the waker lock,
                    // you need to check the Receiver again to prevent the Receiver from receiving a
                    // frame while waiting for the waker. The error cannot
                    // occur. Therefore, the error is thrown directly without connection-level
                    // processing.
                    if let Some(frame) = stream_info.receiver.recv_frame(stream_info.id)? {
                        wakeup_next_stream(&mut stream_waker.waker);
                        return Poll::Ready(Ok(frame));
                    }

                    // The error cannot occur. Therefore, the error is thrown directly without
                    // connection-level processing.
                    if is_io_available(&stream_info.controller.occupied, stream_info.id)? {
                        // Fifth, get io again to prevent no other streams from controlling io while
                        // waiting for the waker, leaving only the current
                        // stream.
                        match stream_info.controller.manager.try_lock() {
                            Ok(guard) => guard,
                            _ => {
                                // The I/O is held by someone else: register this
                                // stream's waker and wait to be woken up.
                                stream_waker
                                    .waker
                                    .insert(stream_info.id, cx.waker().clone());
                                return Poll::Pending;
                            }
                        }
                    } else {
                        // The I/O is not available to this stream: register its
                        // waker and wait to be woken up.
                        stream_waker
                            .waker
                            .insert(stream_info.id, cx.waker().clone());
                        return Poll::Pending;
                    }
                };
                // The waker lock has been released here; only the I/O manager
                // guard is still held.
                if stream_info
                    .poll_match_result(cx, &mut io_manager)?
                    .is_pending()
                {
                    return Poll::Pending;
                }
            }
            {
                {
                    let mut stream_waker = stream_info
                        .controller
                        .stream_waker
                        .lock()
                        .expect("Blocking get waker lock failed! ");
                    wakeup_next_stream(&mut stream_waker.waker);
                }
                // The error cannot occur. Therefore, the error is thrown directly without
                // connection-level processing.
                let frame_opt = get_frame(stream_info.receiver.recv_frame(stream_info.id)?);
                Poll::Ready(frame_opt)
            }
        }
    }
760 
761     impl<S: AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static> StreamInfo<S> {
poll_match_result( &self, cx: &mut Context<'_>, io_manager: &mut MutexGuard<IoManager<S>>, ) -> Poll<Result<(), HttpError>>762         fn poll_match_result(
763             &self,
764             cx: &mut Context<'_>,
765             io_manager: &mut MutexGuard<IoManager<S>>,
766         ) -> Poll<Result<(), HttpError>> {
767             loop {
768                 match self.poll_io(cx, io_manager) {
769                     Poll::Ready(Ok(_)) => {
770                         return Poll::Ready(Ok(()));
771                     }
772                     Poll::Ready(Err(h2_error)) => {
773                         match h2_error {
774                             H2Error::StreamError(id, code) => {
775                                 let rest_payload = RstStream::new(code.clone().into_code());
776                                 let frame = Frame::new(
777                                     id as usize,
778                                     FrameFlags::empty(),
779                                     Payload::RstStream(rest_payload),
780                                 );
781                                 io_manager.streams.recv_local_reset(id)?;
782                                 if self
783                                     .poll_send_reset(cx, frame.clone(), io_manager)?
784                                     .is_pending()
785                                 {
786                                     compare_exchange_occupation(
787                                         &self.controller.occupied,
788                                         0,
789                                         self.id,
790                                     )?;
791                                     return Poll::Pending;
792                                 }
793                                 if self.id == id {
794                                     return Poll::Ready(Err(H2Error::StreamError(id, code).into()));
795                                 } else {
796                                     self.controller_send_frame_to_stream(id, frame, io_manager);
797                                     {
798                                         let mut stream_waker = self
799                                             .controller
800                                             .stream_waker
801                                             .lock()
802                                             .expect("Blocking get waker lock failed! ");
803                                         // TODO Is there a situation where the result has been
804                                         // returned, but the waker has not been inserted into the
805                                         // map? how to deal with.
806                                         if let Some(waker) = stream_waker.waker.remove(&id) {
807                                             waker.wake();
808                                         }
809                                     }
810                                 }
811                             }
812                             H2Error::ConnectionError(code) => {
813                                 io_manager.close_frame_receiver();
814                                 self.controller.shutdown();
815                                 // Since ConnectError may be caused by an io error, so when the
816                                 // client actively sends a goaway
817                                 // frame, all streams are shut down and no streams are allowed to
818                                 // complete. TODO Then consider
819                                 // separating io errors from frame errors to allow streams whose
820                                 // stream id is less than last_stream_id to continue
821                                 self.controller.invalid();
822                                 // last_stream_id is set to 0 to ensure that all streams are
823                                 // shutdown.
824                                 let goaway_payload =
825                                     Goaway::new(code.clone().into_code(), 0, vec![]);
826                                 let frame = Frame::new(
827                                     0,
828                                     FrameFlags::empty(),
829                                     Payload::Goaway(goaway_payload),
830                                 );
831                                 // io_manager.connection_frame.going_away(frame);
832                                 if self
833                                     .poll_send_go_away(cx, frame.clone(), io_manager)?
834                                     .is_pending()
835                                 {
836                                     compare_exchange_occupation(
837                                         &self.controller.occupied,
838                                         0,
839                                         self.id,
840                                     )?;
841                                     return Poll::Pending;
842                                 }
843 
844                                 self.goaway_unsent_stream(io_manager, 0, frame)?;
845                                 self.goaway_and_shutdown();
846                                 return Poll::Ready(Err(H2Error::ConnectionError(code).into()));
847                             }
848                         }
849                     }
850                     Poll::Pending => {
851                         compare_exchange_occupation(&self.controller.occupied, 0, self.id)?;
852                         return Poll::Pending;
853                     }
854                 }
855             }
856         }
857 
poll_io( &self, cx: &mut Context<'_>, io_manager: &mut MutexGuard<IoManager<S>>, ) -> Poll<Result<(), H2Error>>858         fn poll_io(
859             &self,
860             cx: &mut Context<'_>,
861             io_manager: &mut MutexGuard<IoManager<S>>,
862         ) -> Poll<Result<(), H2Error>> {
863             if self.poll_send_preface(cx, io_manager)?.is_pending() {
864                 return Poll::Pending;
865             }
866             if self.poll_send_settings(cx, io_manager)?.is_pending() {
867                 return Poll::Pending;
868             }
869             match self.poll_dispatch_frame(cx, io_manager)? {
870                 Poll::Ready(state) => {
871                     if let DispatchState::Partial = state {
872                         return Poll::Ready(Ok(()));
873                     }
874                 }
875                 Poll::Pending => {
876                     return Poll::Pending;
877                 }
878             }
879             // Write and read frames to io in a loop until the frame of the current stream
880             // is read and exit the loop.
881             loop {
882                 if self.poll_write_frame(cx, io_manager)?.is_pending() {
883                     return Poll::Pending;
884                 }
885                 match self.poll_read_frame(cx, io_manager)? {
886                     Poll::Ready(ReadState::EmptyIo) => {}
887                     Poll::Ready(ReadState::CurrentStream) => {
888                         return Poll::Ready(Ok(()));
889                     }
890                     Poll::Pending => {
891                         return Poll::Pending;
892                     }
893                 }
894             }
895         }
896 
poll_dispatch_frame( &self, cx: &mut Context<'_>, io_manager: &mut MutexGuard<IoManager<S>>, ) -> Poll<Result<DispatchState, H2Error>>897         fn poll_dispatch_frame(
898             &self,
899             cx: &mut Context<'_>,
900             io_manager: &mut MutexGuard<IoManager<S>>,
901         ) -> Poll<Result<DispatchState, H2Error>> {
902             if io_manager.frame_iter.is_empty() {
903                 return Poll::Ready(Ok(DispatchState::Finish));
904             }
905             let iter_option = take(&mut io_manager.frame_iter.iter);
906             match iter_option {
907                 None => Poll::Ready(Err(H2Error::ConnectionError(ErrorCode::IntervalError))),
908                 Some(iter) => self.dispatch_read_frames(cx, io_manager, iter),
909             }
910         }
911 
poll_send_preface( &self, cx: &mut Context<'_>, io_manager: &mut MutexGuard<IoManager<S>>, ) -> Poll<Result<(), H2Error>>912         fn poll_send_preface(
913             &self,
914             cx: &mut Context<'_>,
915             io_manager: &mut MutexGuard<IoManager<S>>,
916         ) -> Poll<Result<(), H2Error>> {
917             const PREFACE_MSG: &str = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
918             if io_manager.connection_frame.preface {
919                 let mut buf = [0u8; PREFACE_MSG.len()];
920                 buf.copy_from_slice(PREFACE_MSG.as_bytes());
921 
922                 let mut start_index = 0;
923                 loop {
924                     if start_index == PREFACE_MSG.len() {
925                         io_manager.connection_frame.preface = false;
926                         break;
927                     }
928                     match Pin::new(&mut io_manager.inner.io)
929                         .poll_write(cx, &buf[start_index..])
930                         .map_err(|_| H2Error::ConnectionError(ErrorCode::IntervalError))?
931                     {
932                         Poll::Ready(written) => {
933                             start_index += written;
934                         }
935                         Poll::Pending => {
936                             return Poll::Pending;
937                         }
938                     }
939                 }
940                 return poll_flush_io(cx, &mut io_manager.inner);
941             }
942             Poll::Ready(Ok(()))
943         }
944 
poll_send_go_away( &self, cx: &mut Context<'_>, goaway: Frame, io_manager: &mut MutexGuard<IoManager<S>>, ) -> Poll<Result<(), H2Error>>945         fn poll_send_go_away(
946             &self,
947             cx: &mut Context<'_>,
948             goaway: Frame,
949             io_manager: &mut MutexGuard<IoManager<S>>,
950         ) -> Poll<Result<(), H2Error>> {
951             let mut buf = [0u8; 1024];
952             if write_frame_to_io(cx, &mut buf, goaway, &mut io_manager.inner)?.is_pending() {
953                 Poll::Pending
954             } else {
955                 poll_flush_io(cx, &mut io_manager.inner)
956             }
957         }
958 
poll_send_reset( &self, cx: &mut Context<'_>, reset: Frame, io_manager: &mut MutexGuard<IoManager<S>>, ) -> Poll<Result<(), H2Error>>959         fn poll_send_reset(
960             &self,
961             cx: &mut Context<'_>,
962             reset: Frame,
963             io_manager: &mut MutexGuard<IoManager<S>>,
964         ) -> Poll<Result<(), H2Error>> {
965             let mut buf = [0u8; 1024];
966             if write_frame_to_io(cx, &mut buf, reset, &mut io_manager.inner)?.is_pending() {
967                 Poll::Pending
968             } else {
969                 poll_flush_io(cx, &mut io_manager.inner)
970             }
971         }
972 
poll_send_settings( &self, cx: &mut Context<'_>, io_manager: &mut MutexGuard<IoManager<S>>, ) -> Poll<Result<(), H2Error>>973         fn poll_send_settings(
974             &self,
975             cx: &mut Context<'_>,
976             io_manager: &mut MutexGuard<IoManager<S>>,
977         ) -> Poll<Result<(), H2Error>> {
978             if let SettingsSync::Send(settings) = io_manager.connection_frame.settings.clone() {
979                 let mut buf = [0u8; 1024];
980                 let frame = Frame::new(0, FrameFlags::empty(), Settings(settings.clone()));
981                 if write_frame_to_io(cx, &mut buf, frame, &mut io_manager.inner)?.is_pending() {
982                     Poll::Pending
983                 } else {
984                     io_manager.connection_frame.settings = SettingsSync::Acknowledging(settings);
985                     poll_flush_io(cx, &mut io_manager.inner)
986                 }
987             } else {
988                 Poll::Ready(Ok(()))
989             }
990         }
991 
poll_write_frame( &self, cx: &mut Context<'_>, io_manager: &mut MutexGuard<IoManager<S>>, ) -> Poll<Result<(), H2Error>>992         fn poll_write_frame(
993             &self,
994             cx: &mut Context<'_>,
995             io_manager: &mut MutexGuard<IoManager<S>>,
996         ) -> Poll<Result<(), H2Error>> {
997             const FRAME_WRITE_NUM: usize = 10;
998 
999             // Send 10 frames each time, if there is not enough in the queue, read enough
1000             // from mpsc::Receiver
1001             while io_manager.streams.size() < FRAME_WRITE_NUM {
1002                 match io_manager.frame_receiver.try_recv() {
1003                     // The Frame sent by the Handle for the first time will carry a Sender at the
1004                     // same time, which is used to send the Response Frame back
1005                     // to the Handle
1006                     Ok((Some((id, sender)), frame)) => {
1007                         if io_manager.senders.insert(id, sender).is_some() {
1008                             return Poll::Ready(Err(H2Error::ConnectionError(
1009                                 ErrorCode::IntervalError,
1010                             )));
1011                         }
1012                         io_manager.streams.insert(frame);
1013                     }
1014                     Ok((None, frame)) => {
1015                         io_manager.streams.insert(frame);
1016                     }
1017                     Err(TryRecvError::Empty) => {
1018                         break;
1019                     }
1020                     Err(TryRecvError::Disconnected) => {
1021                         return Poll::Ready(Err(H2Error::ConnectionError(ErrorCode::ConnectError)))
1022                     }
1023                 }
1024             }
1025             let mut buf = [0u8; 1024];
1026             for _i in 0..FRAME_WRITE_NUM {
1027                 match io_manager.streams.pop_front()? {
1028                     Some(frame) => {
1029                         if write_frame_to_io(cx, &mut buf, frame, &mut io_manager.inner)?
1030                             .is_pending()
1031                         {
1032                             return Poll::Pending;
1033                         }
1034                     }
1035                     None => {
1036                         break;
1037                     }
1038                 }
1039             }
1040             poll_flush_io(cx, &mut io_manager.inner)
1041         }
1042 
poll_read_frame( &self, cx: &mut Context<'_>, io_manager: &mut MutexGuard<IoManager<S>>, ) -> Poll<Result<ReadState, H2Error>>1043         fn poll_read_frame(
1044             &self,
1045             cx: &mut Context<'_>,
1046             io_manager: &mut MutexGuard<IoManager<S>>,
1047         ) -> Poll<Result<ReadState, H2Error>> {
1048             // Read all the frames in io until the frame of the current stream is read and
1049             // stop.
1050             let mut buf = [0u8; 1024];
1051             loop {
1052                 let mut read_buf = ReadBuf::new(&mut buf);
1053                 match Pin::new(&mut io_manager.inner.io).poll_read(cx, &mut read_buf) {
1054                     Poll::Ready(Err(_)) => {
1055                         return Poll::Ready(Err(H2Error::ConnectionError(ErrorCode::ConnectError)))
1056                     }
1057                     Poll::Pending => {
1058                         return Poll::Pending;
1059                     }
1060                     _ => {}
1061                 }
1062                 let read = read_buf.filled().len();
1063                 if read == 0 {
1064                     break;
1065                 }
1066                 let frames = io_manager.inner.decoder.decode(&buf[..read])?;
1067                 let frame_iterator = frames.into_iter();
1068 
1069                 match self.dispatch_read_frames(cx, io_manager, frame_iterator)? {
1070                     Poll::Ready(state) => {
1071                         if let DispatchState::Partial = state {
1072                             return Poll::Ready(Ok(ReadState::CurrentStream));
1073                         }
1074                     }
1075                     Poll::Pending => {
1076                         return Poll::Pending;
1077                     }
1078                 }
1079             }
1080             Poll::Ready(Ok(ReadState::EmptyIo))
1081         }
1082 
dispatch_read_frames( &self, cx: &mut Context<'_>, io_manager: &mut MutexGuard<IoManager<S>>, mut frame_iterator: FramesIntoIter, ) -> Poll<Result<DispatchState, H2Error>>1083         fn dispatch_read_frames(
1084             &self,
1085             cx: &mut Context<'_>,
1086             io_manager: &mut MutexGuard<IoManager<S>>,
1087             mut frame_iterator: FramesIntoIter,
1088         ) -> Poll<Result<DispatchState, H2Error>> {
1089             let mut meet_this = false;
1090             loop {
1091                 match frame_iterator.next() {
1092                     None => break,
1093                     Some(frame_kind) => {
1094                         if let FrameKind::Complete(frame) = frame_kind {
1095                             match frame.payload() {
1096                                 Settings(settings) => {
1097                                     if self
1098                                         .recv_settings_frame(
1099                                             cx,
1100                                             io_manager,
1101                                             frame.flags().is_ack(),
1102                                             settings,
1103                                         )?
1104                                         .is_pending()
1105                                     {
1106                                         return Poll::Pending;
1107                                     }
1108                                     continue;
1109                                 }
1110                                 Payload::Ping(ping) => {
1111                                     if self
1112                                         .recv_ping_frame(
1113                                             cx,
1114                                             io_manager,
1115                                             frame.flags().is_ack(),
1116                                             ping,
1117                                         )?
1118                                         .is_pending()
1119                                     {
1120                                         return Poll::Pending;
1121                                     }
1122                                     continue;
1123                                 }
1124                                 Payload::PushPromise(_) => {
1125                                     // TODO The current settings_enable_push is fixed to false
1126                                     return Poll::Ready(Err(H2Error::ConnectionError(
1127                                         ErrorCode::ProtocolError,
1128                                     )));
1129                                 }
1130                                 Payload::Goaway(goaway) => {
1131                                     // shutdown io,prevent the creation of new stream
1132                                     self.controller.shutdown();
1133                                     io_manager.close_frame_receiver();
1134                                     let last_stream_id = goaway.get_last_stream_id();
1135                                     if self.next_stream_id.get_next_id() as usize <= last_stream_id
1136                                     {
1137                                         return Poll::Ready(Err(H2Error::ConnectionError(
1138                                             ErrorCode::ProtocolError,
1139                                         )));
1140                                     }
1141                                     self.goaway_unsent_stream(
1142                                         io_manager,
1143                                         last_stream_id as u32,
1144                                         frame.clone(),
1145                                     )?;
1146                                     continue;
1147                                 }
1148                                 Payload::RstStream(_reset) => {
1149                                     io_manager
1150                                         .streams
1151                                         .recv_remote_reset(frame.stream_id() as u32)?;
1152                                 }
1153                                 Payload::Headers(_headers) => {
1154                                     if let Closed(ResetReason::Local) =
1155                                         io_manager.streams.recv_headers(frame.stream_id() as u32)?
1156                                     {
1157                                         continue;
1158                                     }
1159                                 }
1160                                 Payload::Data(_data) => {
1161                                     if let Closed(ResetReason::Local) =
1162                                         io_manager.streams.recv_data(frame.stream_id() as u32)?
1163                                     {
1164                                         continue;
1165                                     }
1166                                 }
1167                                 // TODO Windows that processes streams and connections separately.
1168                                 Payload::WindowUpdate(_windows) => {
1169                                     continue;
1170                                 }
1171                                 Payload::Priority(_priority) => continue,
1172                             }
1173 
1174                             let stream_id = frame.stream_id() as u32;
1175                             if stream_id == self.id {
1176                                 meet_this = true;
1177                                 self.controller_send_frame_to_stream(stream_id, frame, io_manager);
1178                                 break;
1179                             } else {
1180                                 self.controller_send_frame_to_stream(stream_id, frame, io_manager);
1181                                 // TODO After adding frames such as Reset/Priority, there may be
1182                                 // problems with the following logic, because the lack of waker
1183                                 // cannot wake up
1184                                 let mut stream_waker = self
1185                                     .controller
1186                                     .stream_waker
1187                                     .lock()
1188                                     .expect("Blocking get waker lock failed! ");
1189                                 // TODO Is there a situation where the result has been returned, but
1190                                 // the waker has not been inserted into the map? how to deal with.
1191                                 if let Some(waker) = stream_waker.waker.remove(&stream_id) {
1192                                     waker.wake();
1193                                 }
1194                             }
1195                         }
1196                     }
1197                 }
1198             }
1199 
1200             if meet_this {
1201                 io_manager.frame_iter.iter = Some(frame_iterator);
1202                 Poll::Ready(Ok(DispatchState::Partial))
1203             } else {
1204                 Poll::Ready(Ok(DispatchState::Finish))
1205             }
1206         }
1207 
goaway_unsent_stream( &self, io_manager: &mut MutexGuard<IoManager<S>>, last_stream_id: u32, goaway: Frame, ) -> Result<(), H2Error>1208         fn goaway_unsent_stream(
1209             &self,
1210             io_manager: &mut MutexGuard<IoManager<S>>,
1211             last_stream_id: u32,
1212             goaway: Frame,
1213         ) -> Result<(), H2Error> {
1214             let goaway_streams = io_manager.streams.get_goaway_streams(last_stream_id)?;
1215             {
1216                 let mut stream_waker = self
1217                     .controller
1218                     .stream_waker
1219                     .lock()
1220                     .expect("Blocking get waker lock failed! ");
1221                 for goaway_stream in goaway_streams {
1222                     self.controller_send_frame_to_stream(goaway_stream, goaway.clone(), io_manager);
1223                     if let Some(waker) = stream_waker.waker.remove(&goaway_stream) {
1224                         waker.wake();
1225                     }
1226                 }
1227             }
1228             Ok(())
1229         }
1230 
goaway_and_shutdown(&self)1231         fn goaway_and_shutdown(&self) {
1232             {
1233                 let mut waker_guard = self
1234                     .controller
1235                     .stream_waker
1236                     .lock()
1237                     .expect("Blocking get waker lock failed! ");
1238                 let waker_map = take(&mut waker_guard.waker);
1239                 for (_id, waker) in waker_map.into_iter() {
1240                     waker.wake()
1241                 }
1242             }
1243         }
1244 
recv_settings_frame( &self, cx: &mut Context<'_>, guard: &mut MutexGuard<IoManager<S>>, is_ack: bool, settings: &h2::Settings, ) -> Poll<Result<(), H2Error>>1245         fn recv_settings_frame(
1246             &self,
1247             cx: &mut Context<'_>,
1248             guard: &mut MutexGuard<IoManager<S>>,
1249             is_ack: bool,
1250             settings: &h2::Settings,
1251         ) -> Poll<Result<(), H2Error>> {
1252             if is_ack {
1253                 match guard.connection_frame.settings.clone() {
1254                     SettingsSync::Acknowledging(local_settings) => {
1255                         for setting in local_settings.get_settings() {
1256                             if let Setting::MaxHeaderListSize(size) = setting {
1257                                 guard.inner.decoder.set_max_header_list_size(*size as usize);
1258                             }
1259                             if let Setting::MaxFrameSize(size) = setting {
1260                                 guard.inner.decoder.set_max_frame_size(*size)?;
1261                             }
1262                         }
1263                         guard.connection_frame.settings = SettingsSync::Synced;
1264                         Poll::Ready(Ok(()))
1265                     }
1266                     _ => Poll::Ready(Err(H2Error::ConnectionError(ErrorCode::ProtocolError))),
1267                 }
1268             } else {
1269                 for setting in settings.get_settings() {
1270                     if let Setting::HeaderTableSize(size) = setting {
1271                         guard.inner.encoder.update_header_table_size(*size as usize);
1272                     }
1273                     if let Setting::MaxFrameSize(size) = setting {
1274                         guard.inner.encoder.update_max_frame_size(*size as usize);
1275                     }
1276                 }
1277                 // reply ack Settings
1278                 let mut buf = [0u8; 1024];
1279                 if write_frame_to_io(cx, &mut buf, SettingsSync::ack_settings(), &mut guard.inner)?
1280                     .is_pending()
1281                 {
1282                     Poll::Pending
1283                 } else {
1284                     poll_flush_io(cx, &mut guard.inner)
1285                 }
1286             }
1287         }
1288 
recv_ping_frame( &self, cx: &mut Context<'_>, guard: &mut MutexGuard<IoManager<S>>, is_ack: bool, ping: &h2::Ping, ) -> Poll<Result<(), H2Error>>1289         fn recv_ping_frame(
1290             &self,
1291             cx: &mut Context<'_>,
1292             guard: &mut MutexGuard<IoManager<S>>,
1293             is_ack: bool,
1294             ping: &h2::Ping,
1295         ) -> Poll<Result<(), H2Error>> {
1296             if is_ack {
1297                 // TODO The sending logic of ping has not been implemented yet, so there is no
1298                 // processing for ack
1299                 Poll::Ready(Ok(()))
1300             } else {
1301                 // reply ack Settings
1302                 let ack = Frame::new(0, FrameFlags::new(0x1), Payload::Ping(ping.clone()));
1303                 let mut buf = [0u8; 1024];
1304                 if write_frame_to_io(cx, &mut buf, ack, &mut guard.inner)?.is_pending() {
1305                     Poll::Pending
1306                 } else {
1307                     poll_flush_io(cx, &mut guard.inner)
1308                 }
1309             }
1310         }
1311 
controller_send_frame_to_stream( &self, stream_id: u32, frame: Frame, guard: &mut MutexGuard<IoManager<S>>, )1312         fn controller_send_frame_to_stream(
1313             &self,
1314             stream_id: u32,
1315             frame: Frame,
1316             guard: &mut MutexGuard<IoManager<S>>,
1317         ) {
1318             // TODO Need to consider when to delete useless Sender after support reset
1319             // stream
1320             if let Some(sender) = guard.senders.get(&stream_id) {
1321                 // If the client coroutine has exited, this frame is skipped.
1322                 let _ = sender.send(frame);
1323             }
1324         }
1325     }
1326 
1327     impl FrameReceiver {
set_receiver(&mut self, receiver: UnboundedReceiver<Frame>)1328         fn set_receiver(&mut self, receiver: UnboundedReceiver<Frame>) {
1329             self.receiver = Some(receiver);
1330         }
1331 
recv_frame(&mut self, id: u32) -> Result<Option<Frame>, HttpError>1332         fn recv_frame(&mut self, id: u32) -> Result<Option<Frame>, HttpError> {
1333             if let Some(ref mut receiver) = self.receiver {
1334                 match receiver.try_recv() {
1335                     Ok(frame) => Ok(Some(frame)),
1336                     Err(TryRecvError::Disconnected) => {
1337                         Err(H2Error::StreamError(id, ErrorCode::StreamClosed).into())
1338                     }
1339                     Err(TryRecvError::Empty) => Ok(None),
1340                 }
1341             } else {
1342                 Err(H2Error::StreamError(id, ErrorCode::IntervalError).into())
1343             }
1344         }
1345 
is_none(&self) -> bool1346         fn is_none(&self) -> bool {
1347             self.receiver.is_none()
1348         }
1349     }
1350 
1351     // TODO Temporarily only deal with the Settings frame
build_connection_frames(config: H2Config) -> ConnectionFrames1352     pub(crate) fn build_connection_frames(config: H2Config) -> ConnectionFrames {
1353         const DEFAULT_ENABLE_PUSH: bool = false;
1354         let settings = SettingsBuilder::new()
1355             .max_header_list_size(config.max_header_list_size())
1356             .max_frame_size(config.max_frame_size())
1357             .header_table_size(config.header_table_size())
1358             .enable_push(DEFAULT_ENABLE_PUSH)
1359             .build();
1360 
1361         ConnectionFrames::new(settings)
1362     }
1363 
1364     // io write interface
write_frame_to_io<S>( cx: &mut Context<'_>, buf: &mut [u8], frame: Frame, inner: &mut Inner<S>, ) -> Poll<Result<(), H2Error>> where S: AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static,1365     fn write_frame_to_io<S>(
1366         cx: &mut Context<'_>,
1367         buf: &mut [u8],
1368         frame: Frame,
1369         inner: &mut Inner<S>,
1370     ) -> Poll<Result<(), H2Error>>
1371     where
1372         S: AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static,
1373     {
1374         let mut remain_size = 0;
1375         inner.encoder.set_frame(frame);
1376         loop {
1377             let size = inner
1378                 .encoder
1379                 .encode(&mut buf[remain_size..])
1380                 .map_err(|_| H2Error::ConnectionError(ErrorCode::IntervalError))?;
1381 
1382             let total = size + remain_size;
1383 
1384             // All the bytes of the frame are written
1385             if total == 0 {
1386                 break;
1387             }
1388             match Pin::new(&mut inner.io)
1389                 .poll_write(cx, &buf[..total])
1390                 .map_err(|_| H2Error::ConnectionError(ErrorCode::IntervalError))?
1391             {
1392                 Poll::Ready(written) => {
1393                     remain_size = total - written;
1394                     // written is not necessarily equal to total
1395                     if remain_size > 0 {
1396                         for i in 0..remain_size {
1397                             buf[i] = buf[written + i];
1398                         }
1399                     }
1400                 }
1401                 Poll::Pending => {
1402                     return Poll::Pending;
1403                 }
1404             }
1405         }
1406         Poll::Ready(Ok(()))
1407     }
1408 
poll_flush_io<S>(cx: &mut Context<'_>, inner: &mut Inner<S>) -> Poll<Result<(), H2Error>> where S: AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static,1409     fn poll_flush_io<S>(cx: &mut Context<'_>, inner: &mut Inner<S>) -> Poll<Result<(), H2Error>>
1410     where
1411         S: AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static,
1412     {
1413         Pin::new(&mut inner.io)
1414             .poll_flush(cx)
1415             .map_err(|_| H2Error::ConnectionError(ErrorCode::ConnectError))
1416     }
1417 
get_frame(frame: Option<Frame>) -> Result<Frame, HttpError>1418     fn get_frame(frame: Option<Frame>) -> Result<Frame, HttpError> {
1419         frame.ok_or(H2Error::ConnectionError(ErrorCode::IntervalError).into())
1420     }
1421 
wakeup_next_stream(waker_map: &mut HashMap<u32, Waker>)1422     fn wakeup_next_stream(waker_map: &mut HashMap<u32, Waker>) {
1423         {
1424             if !waker_map.is_empty() {
1425                 let mut id = 0;
1426                 if let Some((index, _)) = waker_map.iter().next() {
1427                     id = *index;
1428                 }
1429                 if let Some(waker) = waker_map.remove(&id) {
1430                     waker.wake();
1431                 }
1432             }
1433         }
1434     }
1435 
is_io_available(occupied: &AtomicU32, id: u32) -> Result<bool, HttpError>1436     fn is_io_available(occupied: &AtomicU32, id: u32) -> Result<bool, HttpError> {
1437         let is_occupied = occupied.load(Ordering::Relaxed);
1438         if is_occupied == 0 {
1439             return Ok(true);
1440         }
1441         if is_occupied == id {
1442             compare_exchange_occupation(occupied, id, 0)?;
1443             return Ok(true);
1444         }
1445         Ok(false)
1446     }
1447 
compare_exchange_occupation( occupied: &AtomicU32, current: u32, new: u32, ) -> Result<(), HttpError>1448     fn compare_exchange_occupation(
1449         occupied: &AtomicU32,
1450         current: u32,
1451         new: u32,
1452     ) -> Result<(), HttpError> {
1453         occupied
1454             .compare_exchange(current, new, Ordering::Acquire, Ordering::Relaxed)
1455             .map_err(|_| H2Error::ConnectionError(ErrorCode::IntervalError))?;
1456         Ok(())
1457     }
1458 }
1459