• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use crate::util::ConnInfo;
15 use crate::{ConnDetail, TimeGroup};
16 
17 pub(crate) trait Dispatcher {
18     type Handle;
19 
dispatch(&self) -> Option<Self::Handle>20     fn dispatch(&self) -> Option<Self::Handle>;
21 
is_shutdown(&self) -> bool22     fn is_shutdown(&self) -> bool;
23 
24     #[allow(dead_code)]
is_goaway(&self) -> bool25     fn is_goaway(&self) -> bool;
26 }
27 
28 pub(crate) enum ConnDispatcher<S> {
29     #[cfg(feature = "http1_1")]
30     Http1(http1::Http1Dispatcher<S>),
31 
32     #[cfg(feature = "http2")]
33     Http2(http2::Http2Dispatcher<S>),
34 
35     #[cfg(feature = "http3")]
36     Http3(http3::Http3Dispatcher<S>),
37 }
38 
39 impl<S> Dispatcher for ConnDispatcher<S> {
40     type Handle = Conn<S>;
41 
dispatch(&self) -> Option<Self::Handle>42     fn dispatch(&self) -> Option<Self::Handle> {
43         match self {
44             #[cfg(feature = "http1_1")]
45             Self::Http1(h1) => h1.dispatch().map(Conn::Http1),
46 
47             #[cfg(feature = "http2")]
48             Self::Http2(h2) => h2.dispatch().map(Conn::Http2),
49 
50             #[cfg(feature = "http3")]
51             Self::Http3(h3) => h3.dispatch().map(Conn::Http3),
52         }
53     }
54 
is_shutdown(&self) -> bool55     fn is_shutdown(&self) -> bool {
56         match self {
57             #[cfg(feature = "http1_1")]
58             Self::Http1(h1) => h1.is_shutdown(),
59 
60             #[cfg(feature = "http2")]
61             Self::Http2(h2) => h2.is_shutdown(),
62 
63             #[cfg(feature = "http3")]
64             Self::Http3(h3) => h3.is_shutdown(),
65         }
66     }
67 
is_goaway(&self) -> bool68     fn is_goaway(&self) -> bool {
69         match self {
70             #[cfg(feature = "http1_1")]
71             Self::Http1(h1) => h1.is_goaway(),
72 
73             #[cfg(feature = "http2")]
74             Self::Http2(h2) => h2.is_goaway(),
75 
76             #[cfg(feature = "http3")]
77             Self::Http3(h3) => h3.is_goaway(),
78         }
79     }
80 }
81 
82 pub(crate) enum Conn<S> {
83     #[cfg(feature = "http1_1")]
84     Http1(http1::Http1Conn<S>),
85 
86     #[cfg(feature = "http2")]
87     Http2(http2::Http2Conn<S>),
88 
89     #[cfg(feature = "http3")]
90     Http3(http3::Http3Conn<S>),
91 }
92 
93 impl<S: ConnInfo> Conn<S> {
get_detail(&mut self) -> ConnDetail94     pub(crate) fn get_detail(&mut self) -> ConnDetail {
95         match self {
96             #[cfg(feature = "http1_1")]
97             Conn::Http1(io) => io.raw_mut().conn_data().detail(),
98             #[cfg(feature = "http2")]
99             Conn::Http2(io) => io.detail.clone(),
100             #[cfg(feature = "http3")]
101             Conn::Http3(io) => io.detail.clone(),
102         }
103     }
104 }
105 
106 pub(crate) struct TimeInfoConn<S> {
107     conn: Conn<S>,
108     time_group: TimeGroup,
109 }
110 
111 impl<S> TimeInfoConn<S> {
new(conn: Conn<S>, time_group: TimeGroup) -> Self112     pub(crate) fn new(conn: Conn<S>, time_group: TimeGroup) -> Self {
113         Self { conn, time_group }
114     }
115 
time_group_mut(&mut self) -> &mut TimeGroup116     pub(crate) fn time_group_mut(&mut self) -> &mut TimeGroup {
117         &mut self.time_group
118     }
119 
time_group(&mut self) -> &TimeGroup120     pub(crate) fn time_group(&mut self) -> &TimeGroup {
121         &self.time_group
122     }
123 
connection(self) -> Conn<S>124     pub(crate) fn connection(self) -> Conn<S> {
125         self.conn
126     }
127 }
128 
129 #[cfg(feature = "http1_1")]
130 pub(crate) mod http1 {
131     use std::cell::UnsafeCell;
132     use std::sync::atomic::{AtomicBool, Ordering};
133     use std::sync::Arc;
134 
135     use super::{ConnDispatcher, Dispatcher};
136     use crate::runtime::Semaphore;
137     #[cfg(feature = "tokio_base")]
138     use crate::runtime::SemaphorePermit;
139 
140     impl<S> ConnDispatcher<S> {
http1(io: S) -> Self141         pub(crate) fn http1(io: S) -> Self {
142             Self::Http1(Http1Dispatcher::new(io))
143         }
144     }
145 
146     /// HTTP1-based connection manager, which can dispatch connections to other
147     /// threads according to HTTP1 syntax.
148     pub(crate) struct Http1Dispatcher<S> {
149         inner: Arc<Inner<S>>,
150     }
151 
152     pub(crate) struct Inner<S> {
153         pub(crate) io: UnsafeCell<S>,
154         // `occupied` indicates that the connection is occupied. Only one coroutine
155         // can get the handle at the same time. Once the handle is fetched, the flag
156         // position is true.
157         pub(crate) occupied: AtomicBool,
158         // `shutdown` indicates that the connection need to be shut down.
159         pub(crate) shutdown: AtomicBool,
160     }
161 
162     unsafe impl<S> Sync for Inner<S> {}
163 
164     impl<S> Http1Dispatcher<S> {
new(io: S) -> Self165         pub(crate) fn new(io: S) -> Self {
166             Self {
167                 inner: Arc::new(Inner {
168                     io: UnsafeCell::new(io),
169                     occupied: AtomicBool::new(false),
170                     shutdown: AtomicBool::new(false),
171                 }),
172             }
173         }
174     }
175 
176     impl<S> Dispatcher for Http1Dispatcher<S> {
177         type Handle = Http1Conn<S>;
178 
dispatch(&self) -> Option<Self::Handle>179         fn dispatch(&self) -> Option<Self::Handle> {
180             self.inner
181                 .occupied
182                 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
183                 .ok()
184                 .map(|_| Http1Conn::from_inner(self.inner.clone()))
185         }
186 
is_shutdown(&self) -> bool187         fn is_shutdown(&self) -> bool {
188             self.inner.shutdown.load(Ordering::Relaxed)
189         }
190 
is_goaway(&self) -> bool191         fn is_goaway(&self) -> bool {
192             false
193         }
194     }
195 
196     /// Handle returned to other threads for I/O operations.
197     pub(crate) struct Http1Conn<S> {
198         pub(crate) sem: Option<WrappedSemPermit>,
199         pub(crate) inner: Arc<Inner<S>>,
200     }
201 
202     impl<S> Http1Conn<S> {
from_inner(inner: Arc<Inner<S>>) -> Self203         pub(crate) fn from_inner(inner: Arc<Inner<S>>) -> Self {
204             Self { sem: None, inner }
205         }
206 
occupy_sem(&mut self, sem: WrappedSemPermit)207         pub(crate) fn occupy_sem(&mut self, sem: WrappedSemPermit) {
208             self.sem = Some(sem);
209         }
210 
raw_mut(&mut self) -> &mut S211         pub(crate) fn raw_mut(&mut self) -> &mut S {
212             // SAFETY: In the case of `HTTP1`, only one coroutine gets the handle
213             // at the same time.
214             unsafe { &mut *self.inner.io.get() }
215         }
216 
shutdown(&self)217         pub(crate) fn shutdown(&self) {
218             self.inner.shutdown.store(true, Ordering::Release);
219         }
220     }
221 
222     impl<S> Drop for Http1Conn<S> {
drop(&mut self)223         fn drop(&mut self) {
224             self.inner.occupied.store(false, Ordering::Release)
225         }
226     }
227 
228     pub(crate) struct WrappedSemaphore {
229         sem: Arc<Semaphore>,
230     }
231 
232     impl WrappedSemaphore {
new(permits: usize) -> Self233         pub(crate) fn new(permits: usize) -> Self {
234             Self {
235                 #[cfg(feature = "tokio_base")]
236                 sem: Arc::new(tokio::sync::Semaphore::new(permits)),
237                 #[cfg(feature = "ylong_base")]
238                 sem: Arc::new(ylong_runtime::sync::Semaphore::new(permits).unwrap()),
239             }
240         }
241 
acquire(&self) -> WrappedSemPermit242         pub(crate) async fn acquire(&self) -> WrappedSemPermit {
243             #[cfg(feature = "ylong_base")]
244             {
245                 let semaphore = self.sem.clone();
246                 let _permit = semaphore.acquire().await.unwrap();
247                 WrappedSemPermit { sem: semaphore }
248             }
249 
250             #[cfg(feature = "tokio_base")]
251             {
252                 let permit = self.sem.clone().acquire_owned().await.unwrap();
253                 WrappedSemPermit { permit }
254             }
255         }
256     }
257 
258     impl Clone for WrappedSemaphore {
clone(&self) -> Self259         fn clone(&self) -> Self {
260             Self {
261                 sem: self.sem.clone(),
262             }
263         }
264     }
265 
266     pub(crate) struct WrappedSemPermit {
267         #[cfg(feature = "ylong_base")]
268         pub(crate) sem: Arc<Semaphore>,
269         #[cfg(feature = "tokio_base")]
270         #[allow(dead_code)]
271         pub(crate) permit: SemaphorePermit,
272     }
273 
274     #[cfg(feature = "ylong_base")]
275     impl Drop for WrappedSemPermit {
drop(&mut self)276         fn drop(&mut self) {
277             self.sem.release();
278         }
279     }
280 }
281 
282 #[cfg(feature = "http2")]
283 pub(crate) mod http2 {
284     use std::collections::HashMap;
285     use std::future::Future;
286     use std::marker::PhantomData;
287     use std::pin::Pin;
288     use std::sync::atomic::{AtomicBool, Ordering};
289     use std::sync::{Arc, Mutex};
290     use std::task::{Context, Poll};
291 
292     use ylong_http::error::HttpError;
293     use ylong_http::h2::{
294         ErrorCode, Frame, FrameDecoder, FrameEncoder, FrameFlags, Goaway, H2Error, Payload,
295         RstStream, Settings, SettingsBuilder, StreamId,
296     };
297 
298     use crate::runtime::{
299         bounded_channel, unbounded_channel, AsyncRead, AsyncWrite, AsyncWriteExt, BoundedReceiver,
300         BoundedSender, SendError, UnboundedReceiver, UnboundedSender, WriteHalf,
301     };
302     use crate::util::config::H2Config;
303     use crate::util::dispatcher::{ConnDispatcher, Dispatcher};
304     use crate::util::h2::{
305         ConnManager, FlowControl, H2StreamState, RecvData, RequestWrapper, SendData,
306         StreamEndState, Streams,
307     };
308     use crate::ErrorKind::Request;
309     use crate::{ConnDetail, ErrorKind, HttpClientError};
310     const DEFAULT_MAX_FRAME_SIZE: usize = 2 << 13;
311     const DEFAULT_WINDOW_SIZE: u32 = 65535;
312 
313     pub(crate) type ManagerSendFut =
314         Pin<Box<dyn Future<Output = Result<(), SendError<RespMessage>>> + Send + Sync>>;
315 
316     pub(crate) enum RespMessage {
317         Output(Frame),
318         OutputExit(DispatchErrorKind),
319     }
320 
321     pub(crate) enum OutputMessage {
322         Output(Frame),
323         OutputExit(DispatchErrorKind),
324     }
325 
326     pub(crate) struct ReqMessage {
327         pub(crate) sender: BoundedSender<RespMessage>,
328         pub(crate) request: RequestWrapper,
329     }
330 
331     #[derive(Debug, Eq, PartialEq, Copy, Clone)]
332     pub(crate) enum DispatchErrorKind {
333         H2(H2Error),
334         Io(std::io::ErrorKind),
335         ChannelClosed,
336         Disconnect,
337     }
338 
339     // HTTP2-based connection manager, which can dispatch connections to other
340     // threads according to HTTP2 syntax.
341     pub(crate) struct Http2Dispatcher<S> {
342         pub(crate) detail: ConnDetail,
343         pub(crate) allowed_cache: usize,
344         pub(crate) sender: UnboundedSender<ReqMessage>,
345         pub(crate) io_shutdown: Arc<AtomicBool>,
346         pub(crate) io_goaway: Arc<AtomicBool>,
347         pub(crate) handles: Vec<crate::runtime::JoinHandle<()>>,
348         pub(crate) _mark: PhantomData<S>,
349     }
350 
351     pub(crate) struct Http2Conn<S> {
352         pub(crate) allow_cached_frames: usize,
353         // Sends frame to StreamController
354         pub(crate) sender: UnboundedSender<ReqMessage>,
355         pub(crate) receiver: RespReceiver,
356         pub(crate) io_shutdown: Arc<AtomicBool>,
357         pub(crate) detail: ConnDetail,
358         pub(crate) _mark: PhantomData<S>,
359     }
360 
361     pub(crate) struct StreamController {
362         // The connection close flag organizes new stream commits to the current connection when
363         // closed.
364         pub(crate) io_shutdown: Arc<AtomicBool>,
365         pub(crate) io_goaway: Arc<AtomicBool>,
366         // The senders of all connected stream channels of response.
367         pub(crate) senders: HashMap<StreamId, BoundedSender<RespMessage>>,
368         pub(crate) curr_message: HashMap<StreamId, ManagerSendFut>,
369         // Stream information on the connection.
370         pub(crate) streams: Streams,
371         // Received GO_AWAY frame.
372         pub(crate) go_away_error_code: Option<u32>,
373         // The last GO_AWAY frame sent by the client.
374         pub(crate) go_away_sync: GoAwaySync,
375     }
376 
377     #[derive(Default)]
378     pub(crate) struct GoAwaySync {
379         pub(crate) going_away: Option<Goaway>,
380     }
381 
382     #[derive(Default)]
383     pub(crate) struct SettingsSync {
384         pub(crate) settings: SettingsState,
385     }
386 
387     #[derive(Default, Clone)]
388     pub(crate) enum SettingsState {
389         Acknowledging(Settings),
390         #[default]
391         Synced,
392     }
393 
394     #[derive(Default)]
395     pub(crate) struct RespReceiver {
396         receiver: Option<BoundedReceiver<RespMessage>>,
397     }
398 
399     impl<S> ConnDispatcher<S>
400     where
401         S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
402     {
http2(detail: ConnDetail, config: H2Config, io: S) -> Self403         pub(crate) fn http2(detail: ConnDetail, config: H2Config, io: S) -> Self {
404             Self::Http2(Http2Dispatcher::new(detail, config, io))
405         }
406     }
407 
408     impl<S> Http2Dispatcher<S>
409     where
410         S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
411     {
new(detail: ConnDetail, config: H2Config, io: S) -> Self412         pub(crate) fn new(detail: ConnDetail, config: H2Config, io: S) -> Self {
413             let mut flow = FlowControl::new(DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE);
414             flow.setup_recv_window(config.conn_window_size());
415 
416             let streams = Streams::new(config.stream_window_size(), DEFAULT_WINDOW_SIZE, flow);
417             let shutdown_flag = Arc::new(AtomicBool::new(false));
418             let goaway_flag = Arc::new(AtomicBool::new(false));
419             let mut controller =
420                 StreamController::new(streams, shutdown_flag.clone(), goaway_flag.clone());
421 
422             let (input_tx, input_rx) = unbounded_channel();
423             let (req_tx, req_rx) = unbounded_channel();
424 
425             let settings = create_initial_settings(&config);
426 
427             // Error is not possible, so it is not handled for the time
428             // being.
429             let mut handles = Vec::with_capacity(3);
430             // send initial settings and update conn recv window
431             if input_tx.send(settings).is_ok()
432                 && controller
433                     .streams
434                     .release_conn_recv_window(0, &input_tx)
435                     .is_ok()
436             {
437                 Self::launch(
438                     config.allowed_cache_frame_size(),
439                     config.use_huffman_coding(),
440                     controller,
441                     (input_tx, input_rx),
442                     req_rx,
443                     &mut handles,
444                     io,
445                 );
446             }
447             Self {
448                 detail,
449                 allowed_cache: config.allowed_cache_frame_size(),
450                 sender: req_tx,
451                 io_shutdown: shutdown_flag,
452                 io_goaway: goaway_flag,
453                 handles,
454                 _mark: PhantomData,
455             }
456         }
457 
launch( allow_num: usize, use_huffman: bool, controller: StreamController, input_channel: (UnboundedSender<Frame>, UnboundedReceiver<Frame>), req_rx: UnboundedReceiver<ReqMessage>, handles: &mut Vec<crate::runtime::JoinHandle<()>>, io: S, )458         fn launch(
459             allow_num: usize,
460             use_huffman: bool,
461             controller: StreamController,
462             input_channel: (UnboundedSender<Frame>, UnboundedReceiver<Frame>),
463             req_rx: UnboundedReceiver<ReqMessage>,
464             handles: &mut Vec<crate::runtime::JoinHandle<()>>,
465             io: S,
466         ) {
467             let (resp_tx, resp_rx) = bounded_channel(allow_num);
468             let (read, write) = crate::runtime::split(io);
469             let settings_sync = Arc::new(Mutex::new(SettingsSync::default()));
470             let send_settings_sync = settings_sync.clone();
471             let send = crate::runtime::spawn(async move {
472                 let mut writer = write;
473                 if async_send_preface(&mut writer).await.is_ok() {
474                     let encoder = FrameEncoder::new(DEFAULT_MAX_FRAME_SIZE, use_huffman);
475                     let mut send =
476                         SendData::new(encoder, send_settings_sync, writer, input_channel.1);
477                     let _ = Pin::new(&mut send).await;
478                 }
479             });
480             handles.push(send);
481 
482             let recv_settings_sync = settings_sync.clone();
483             let recv = crate::runtime::spawn(async move {
484                 let decoder = FrameDecoder::new();
485                 let mut recv = RecvData::new(decoder, recv_settings_sync, read, resp_tx);
486                 let _ = Pin::new(&mut recv).await;
487             });
488             handles.push(recv);
489 
490             let manager = crate::runtime::spawn(async move {
491                 let mut conn_manager =
492                     ConnManager::new(settings_sync, input_channel.0, resp_rx, req_rx, controller);
493                 let _ = Pin::new(&mut conn_manager).await;
494             });
495             handles.push(manager);
496         }
497     }
498 
499     impl<S> Dispatcher for Http2Dispatcher<S> {
500         type Handle = Http2Conn<S>;
501 
dispatch(&self) -> Option<Self::Handle>502         fn dispatch(&self) -> Option<Self::Handle> {
503             let sender = self.sender.clone();
504             let handle = Http2Conn::new(
505                 self.allowed_cache,
506                 self.io_shutdown.clone(),
507                 sender,
508                 self.detail.clone(),
509             );
510             Some(handle)
511         }
512 
is_shutdown(&self) -> bool513         fn is_shutdown(&self) -> bool {
514             self.io_shutdown.load(Ordering::Relaxed)
515         }
516 
is_goaway(&self) -> bool517         fn is_goaway(&self) -> bool {
518             self.io_goaway.load(Ordering::Relaxed)
519         }
520     }
521 
522     impl<S> Drop for Http2Dispatcher<S> {
drop(&mut self)523         fn drop(&mut self) {
524             for handle in &self.handles {
525                 #[cfg(feature = "ylong_base")]
526                 handle.cancel();
527                 #[cfg(feature = "tokio_base")]
528                 handle.abort();
529             }
530         }
531     }
532 
533     impl<S> Http2Conn<S> {
new( allow_cached_num: usize, io_shutdown: Arc<AtomicBool>, sender: UnboundedSender<ReqMessage>, detail: ConnDetail, ) -> Self534         pub(crate) fn new(
535             allow_cached_num: usize,
536             io_shutdown: Arc<AtomicBool>,
537             sender: UnboundedSender<ReqMessage>,
538             detail: ConnDetail,
539         ) -> Self {
540             Self {
541                 allow_cached_frames: allow_cached_num,
542                 sender,
543                 receiver: RespReceiver::default(),
544                 io_shutdown,
545                 detail,
546                 _mark: PhantomData,
547             }
548         }
549 
send_frame_to_controller( &mut self, request: RequestWrapper, ) -> Result<(), HttpClientError>550         pub(crate) fn send_frame_to_controller(
551             &mut self,
552             request: RequestWrapper,
553         ) -> Result<(), HttpClientError> {
554             let (tx, rx) = bounded_channel::<RespMessage>(self.allow_cached_frames);
555             self.receiver.set_receiver(rx);
556             self.sender
557                 .send(ReqMessage {
558                     sender: tx,
559                     request,
560                 })
561                 .map_err(|_| {
562                     HttpClientError::from_str(ErrorKind::Request, "Request Sender Closed !")
563                 })
564         }
565     }
566 
567     impl StreamController {
new( streams: Streams, shutdown: Arc<AtomicBool>, goaway: Arc<AtomicBool>, ) -> Self568         pub(crate) fn new(
569             streams: Streams,
570             shutdown: Arc<AtomicBool>,
571             goaway: Arc<AtomicBool>,
572         ) -> Self {
573             Self {
574                 io_shutdown: shutdown,
575                 io_goaway: goaway,
576                 senders: HashMap::new(),
577                 curr_message: HashMap::new(),
578                 streams,
579                 go_away_error_code: None,
580                 go_away_sync: GoAwaySync::default(),
581             }
582         }
583 
shutdown(&self)584         pub(crate) fn shutdown(&self) {
585             self.io_shutdown.store(true, Ordering::Release);
586         }
587 
goaway(&self)588         pub(crate) fn goaway(&self) {
589             self.io_goaway.store(true, Ordering::Release);
590         }
591 
get_unsent_streams( &mut self, last_stream_id: StreamId, ) -> Result<Vec<StreamId>, H2Error>592         pub(crate) fn get_unsent_streams(
593             &mut self,
594             last_stream_id: StreamId,
595         ) -> Result<Vec<StreamId>, H2Error> {
596             // The last-stream-id in the subsequent GO_AWAY frame
597             // cannot be greater than the last-stream-id in the previous GO_AWAY frame.
598             if self.streams.max_send_id < last_stream_id {
599                 return Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
600             }
601             self.streams.max_send_id = last_stream_id;
602             Ok(self.streams.get_unset_streams(last_stream_id))
603         }
604 
send_message_to_stream( &mut self, cx: &mut Context<'_>, stream_id: StreamId, message: RespMessage, ) -> Poll<Result<(), H2Error>>605         pub(crate) fn send_message_to_stream(
606             &mut self,
607             cx: &mut Context<'_>,
608             stream_id: StreamId,
609             message: RespMessage,
610         ) -> Poll<Result<(), H2Error>> {
611             if let Some(sender) = self.senders.get(&stream_id) {
612                 // If the client coroutine has exited, this frame is skipped.
613                 let mut tx = {
614                     let sender = sender.clone();
615                     let ft = async move { sender.send(message).await };
616                     Box::pin(ft)
617                 };
618 
619                 match tx.as_mut().poll(cx) {
620                     Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
621                     // The current coroutine sending the request exited prematurely.
622                     Poll::Ready(Err(_)) => {
623                         self.senders.remove(&stream_id);
624                         Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError)))
625                     }
626                     Poll::Pending => {
627                         self.curr_message.insert(stream_id, tx);
628                         Poll::Pending
629                     }
630                 }
631             } else {
632                 Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError)))
633             }
634         }
635 
poll_blocked_message( &mut self, cx: &mut Context<'_>, input_tx: &UnboundedSender<Frame>, ) -> Poll<()>636         pub(crate) fn poll_blocked_message(
637             &mut self,
638             cx: &mut Context<'_>,
639             input_tx: &UnboundedSender<Frame>,
640         ) -> Poll<()> {
641             let keys: Vec<StreamId> = self.curr_message.keys().cloned().collect();
642             let mut blocked = false;
643 
644             for key in keys {
645                 if let Some(mut task) = self.curr_message.remove(&key) {
646                     match task.as_mut().poll(cx) {
647                         Poll::Ready(Ok(_)) => {}
648                         // The current coroutine sending the request exited prematurely.
649                         Poll::Ready(Err(_)) => {
650                             self.senders.remove(&key);
651                             if let Some(state) = self.streams.stream_state(key) {
652                                 if !matches!(state, H2StreamState::Closed(_)) {
653                                     if let StreamEndState::OK = self.streams.send_local_reset(key) {
654                                         let rest_payload =
655                                             RstStream::new(ErrorCode::NoError.into_code());
656                                         let frame = Frame::new(
657                                             key,
658                                             FrameFlags::empty(),
659                                             Payload::RstStream(rest_payload),
660                                         );
661                                         // ignore the send error occurs here in order to finish all
662                                         // tasks.
663                                         let _ = input_tx.send(frame);
664                                     }
665                                 }
666                             }
667                         }
668                         Poll::Pending => {
669                             self.curr_message.insert(key, task);
670                             blocked = true;
671                         }
672                     }
673                 }
674             }
675             if blocked {
676                 Poll::Pending
677             } else {
678                 Poll::Ready(())
679             }
680         }
681     }
682 
683     impl RespReceiver {
set_receiver(&mut self, receiver: BoundedReceiver<RespMessage>)684         pub(crate) fn set_receiver(&mut self, receiver: BoundedReceiver<RespMessage>) {
685             self.receiver = Some(receiver);
686         }
687 
recv(&mut self) -> Result<Frame, HttpClientError>688         pub(crate) async fn recv(&mut self) -> Result<Frame, HttpClientError> {
689             match self.receiver {
690                 Some(ref mut receiver) => {
691                     #[cfg(feature = "tokio_base")]
692                     match receiver.recv().await {
693                         None => err_from_msg!(Request, "Response Receiver Closed !"),
694                         Some(message) => match message {
695                             RespMessage::Output(frame) => Ok(frame),
696                             RespMessage::OutputExit(e) => Err(dispatch_client_error(e)),
697                         },
698                     }
699 
700                     #[cfg(feature = "ylong_base")]
701                     match receiver.recv().await {
702                         Err(err) => Err(HttpClientError::from_error(ErrorKind::Request, err)),
703                         Ok(message) => match message {
704                             RespMessage::Output(frame) => Ok(frame),
705                             RespMessage::OutputExit(e) => Err(dispatch_client_error(e)),
706                         },
707                     }
708                 }
709                 // this will not happen.
710                 None => Err(HttpClientError::from_str(
711                     ErrorKind::Request,
712                     "Invalid Frame Receiver !",
713                 )),
714             }
715         }
716 
poll_recv( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<Frame, HttpClientError>>717         pub(crate) fn poll_recv(
718             &mut self,
719             cx: &mut Context<'_>,
720         ) -> Poll<Result<Frame, HttpClientError>> {
721             if let Some(ref mut receiver) = self.receiver {
722                 #[cfg(feature = "tokio_base")]
723                 match receiver.poll_recv(cx) {
724                     Poll::Ready(None) => {
725                         Poll::Ready(err_from_msg!(Request, "Error receive response !"))
726                     }
727                     Poll::Ready(Some(message)) => match message {
728                         RespMessage::Output(frame) => Poll::Ready(Ok(frame)),
729                         RespMessage::OutputExit(e) => Poll::Ready(Err(dispatch_client_error(e))),
730                     },
731                     Poll::Pending => Poll::Pending,
732                 }
733 
734                 #[cfg(feature = "ylong_base")]
735                 match receiver.poll_recv(cx) {
736                     Poll::Ready(Err(e)) => {
737                         Poll::Ready(Err(HttpClientError::from_error(ErrorKind::Request, e)))
738                     }
739                     Poll::Ready(Ok(message)) => match message {
740                         RespMessage::Output(frame) => Poll::Ready(Ok(frame)),
741                         RespMessage::OutputExit(e) => Poll::Ready(Err(dispatch_client_error(e))),
742                     },
743                     Poll::Pending => Poll::Pending,
744                 }
745             } else {
746                 Poll::Ready(err_from_msg!(Request, "Invalid Frame Receiver !"))
747             }
748         }
749     }
750 
async_send_preface<S>(writer: &mut WriteHalf<S>) -> Result<(), DispatchErrorKind> where S: AsyncWrite + Unpin,751     async fn async_send_preface<S>(writer: &mut WriteHalf<S>) -> Result<(), DispatchErrorKind>
752     where
753         S: AsyncWrite + Unpin,
754     {
755         const PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
756         writer
757             .write_all(PREFACE)
758             .await
759             .map_err(|e| DispatchErrorKind::Io(e.kind()))
760     }
761 
create_initial_settings(config: &H2Config) -> Frame762     pub(crate) fn create_initial_settings(config: &H2Config) -> Frame {
763         let settings = SettingsBuilder::new()
764             .max_header_list_size(config.max_header_list_size())
765             .max_frame_size(config.max_frame_size())
766             .header_table_size(config.header_table_size())
767             .enable_push(config.enable_push())
768             .initial_window_size(config.stream_window_size())
769             .build();
770 
771         Frame::new(0, FrameFlags::new(0), Payload::Settings(settings))
772     }
773 
774     impl From<std::io::Error> for DispatchErrorKind {
from(value: std::io::Error) -> Self775         fn from(value: std::io::Error) -> Self {
776             DispatchErrorKind::Io(value.kind())
777         }
778     }
779 
780     impl From<H2Error> for DispatchErrorKind {
from(err: H2Error) -> Self781         fn from(err: H2Error) -> Self {
782             DispatchErrorKind::H2(err)
783         }
784     }
785 
dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError786     pub(crate) fn dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError {
787         match dispatch_error {
788             DispatchErrorKind::H2(e) => HttpClientError::from_error(Request, HttpError::from(e)),
789             DispatchErrorKind::Io(e) => {
790                 HttpClientError::from_io_error(Request, std::io::Error::from(e))
791             }
792             DispatchErrorKind::ChannelClosed => {
793                 HttpClientError::from_str(Request, "Coroutine channel closed.")
794             }
795             DispatchErrorKind::Disconnect => {
796                 HttpClientError::from_str(Request, "remote peer closed.")
797             }
798         }
799     }
800 }
801 
802 #[cfg(feature = "http3")]
803 pub(crate) mod http3 {
804     use std::marker::PhantomData;
805     use std::pin::Pin;
806     use std::sync::atomic::{AtomicBool, Ordering};
807     use std::sync::{Arc, Mutex};
808 
809     use ylong_http::error::HttpError;
810     use ylong_http::h3::{Frame, FrameDecoder, H3Error};
811 
812     use crate::async_impl::QuicConn;
813     use crate::runtime::{
814         bounded_channel, unbounded_channel, AsyncRead, AsyncWrite, BoundedReceiver, BoundedSender,
815         UnboundedSender,
816     };
817     use crate::util::config::H3Config;
818     use crate::util::data_ref::BodyDataRef;
819     use crate::util::dispatcher::{ConnDispatcher, Dispatcher};
820     use crate::util::h3::io_manager::IOManager;
821     use crate::util::h3::stream_manager::StreamManager;
822     use crate::ErrorKind::Request;
823     use crate::{ConnDetail, ConnInfo, ErrorKind, HttpClientError};
824 
825     pub(crate) struct Http3Dispatcher<S> {
826         pub(crate) detail: ConnDetail,
827         pub(crate) req_tx: UnboundedSender<ReqMessage>,
828         pub(crate) handles: Vec<crate::runtime::JoinHandle<()>>,
829         pub(crate) _mark: PhantomData<S>,
830         pub(crate) io_shutdown: Arc<AtomicBool>,
831         pub(crate) io_goaway: Arc<AtomicBool>,
832     }
833 
834     pub(crate) struct Http3Conn<S> {
835         pub(crate) sender: UnboundedSender<ReqMessage>,
836         pub(crate) resp_receiver: BoundedReceiver<RespMessage>,
837         pub(crate) resp_sender: BoundedSender<RespMessage>,
838         pub(crate) io_shutdown: Arc<AtomicBool>,
839         pub(crate) detail: ConnDetail,
840         pub(crate) _mark: PhantomData<S>,
841     }
842 
843     pub(crate) struct RequestWrapper {
844         pub(crate) header: Frame,
845         pub(crate) data: BodyDataRef,
846     }
847 
848     #[derive(Debug, Clone)]
849     pub(crate) enum DispatchErrorKind {
850         H3(H3Error),
851         Io(std::io::ErrorKind),
852         Quic(quiche::Error),
853         ChannelClosed,
854         StreamFinished,
855         // todo: retry?
856         GoawayReceived,
857         Disconnect,
858     }
859 
860     pub(crate) enum RespMessage {
861         Output(Frame),
862         OutputExit(DispatchErrorKind),
863     }
864 
865     pub(crate) struct ReqMessage {
866         pub(crate) request: RequestWrapper,
867         pub(crate) frame_tx: BoundedSender<RespMessage>,
868     }
869 
870     impl<S> Http3Dispatcher<S>
871     where
872         S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
873     {
new( detail: ConnDetail, config: H3Config, io: S, quic_connection: QuicConn, ) -> Self874         pub(crate) fn new(
875             detail: ConnDetail,
876             config: H3Config,
877             io: S,
878             quic_connection: QuicConn,
879         ) -> Self {
880             let (req_tx, req_rx) = unbounded_channel();
881             let (io_manager_tx, io_manager_rx) = unbounded_channel();
882             let (stream_manager_tx, stream_manager_rx) = unbounded_channel();
883             let mut handles = Vec::with_capacity(2);
884             let conn = Arc::new(Mutex::new(quic_connection));
885             let io_shutdown = Arc::new(AtomicBool::new(false));
886             let io_goaway = Arc::new(AtomicBool::new(false));
887             let mut stream_manager = StreamManager::new(
888                 conn.clone(),
889                 io_manager_tx,
890                 stream_manager_rx,
891                 req_rx,
892                 FrameDecoder::new(
893                     config.qpack_blocked_streams() as usize,
894                     config.qpack_max_table_capacity() as usize,
895                 ),
896                 io_shutdown.clone(),
897                 io_goaway.clone(),
898             );
899             let stream_handle = crate::runtime::spawn(async move {
900                 if stream_manager.init(config).is_err() {
901                     return;
902                 }
903                 let _ = Pin::new(&mut stream_manager).await;
904             });
905             handles.push(stream_handle);
906 
907             let io_handle = crate::runtime::spawn(async move {
908                 let mut io_manager = IOManager::new(io, conn, io_manager_rx, stream_manager_tx);
909                 let _ = Pin::new(&mut io_manager).await;
910             });
911             handles.push(io_handle);
912             // read_rx gets readable stream ids and writable client channels, then read
913             // stream and send to the corresponding channel
914             Self {
915                 detail,
916                 req_tx,
917                 handles,
918                 _mark: PhantomData,
919                 io_shutdown,
920                 io_goaway,
921             }
922         }
923     }
924 
925     impl<S> Http3Conn<S> {
new( detail: ConnDetail, sender: UnboundedSender<ReqMessage>, io_shutdown: Arc<AtomicBool>, ) -> Self926         pub(crate) fn new(
927             detail: ConnDetail,
928             sender: UnboundedSender<ReqMessage>,
929             io_shutdown: Arc<AtomicBool>,
930         ) -> Self {
931             const CHANNEL_SIZE: usize = 3;
932             let (resp_sender, resp_receiver) = bounded_channel(CHANNEL_SIZE);
933             Self {
934                 sender,
935                 resp_sender,
936                 resp_receiver,
937                 _mark: PhantomData,
938                 io_shutdown,
939                 detail,
940             }
941         }
942 
send_frame_to_reader( &mut self, request: RequestWrapper, ) -> Result<(), HttpClientError>943         pub(crate) fn send_frame_to_reader(
944             &mut self,
945             request: RequestWrapper,
946         ) -> Result<(), HttpClientError> {
947             self.sender
948                 .send(ReqMessage {
949                     request,
950                     frame_tx: self.resp_sender.clone(),
951                 })
952                 .map_err(|_| {
953                     HttpClientError::from_str(ErrorKind::Request, "Request Sender Closed !")
954                 })
955         }
956 
recv_resp(&mut self) -> Result<Frame, HttpClientError>957         pub(crate) async fn recv_resp(&mut self) -> Result<Frame, HttpClientError> {
958             #[cfg(feature = "tokio_base")]
959             match self.resp_receiver.recv().await {
960                 None => err_from_msg!(Request, "Response Receiver Closed !"),
961                 Some(message) => match message {
962                     RespMessage::Output(frame) => Ok(frame),
963                     RespMessage::OutputExit(e) => Err(dispatch_client_error(e)),
964                 },
965             }
966 
967             #[cfg(feature = "ylong_base")]
968             match self.resp_receiver.recv().await {
969                 Err(err) => Err(HttpClientError::from_error(ErrorKind::Request, err)),
970                 Ok(message) => match message {
971                     RespMessage::Output(frame) => Ok(frame),
972                     RespMessage::OutputExit(e) => Err(dispatch_client_error(e)),
973                 },
974             }
975         }
976     }
977 
978     impl<S> ConnDispatcher<S>
979     where
980         S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
981     {
http3( detail: ConnDetail, config: H3Config, io: S, quic_connection: QuicConn, ) -> Self982         pub(crate) fn http3(
983             detail: ConnDetail,
984             config: H3Config,
985             io: S,
986             quic_connection: QuicConn,
987         ) -> Self {
988             Self::Http3(Http3Dispatcher::new(detail, config, io, quic_connection))
989         }
990     }
991 
992     impl<S> Dispatcher for Http3Dispatcher<S> {
993         type Handle = Http3Conn<S>;
994 
dispatch(&self) -> Option<Self::Handle>995         fn dispatch(&self) -> Option<Self::Handle> {
996             let sender = self.req_tx.clone();
997             Some(Http3Conn::new(
998                 self.detail.clone(),
999                 sender,
1000                 self.io_shutdown.clone(),
1001             ))
1002         }
1003 
is_shutdown(&self) -> bool1004         fn is_shutdown(&self) -> bool {
1005             self.io_shutdown.load(Ordering::Relaxed)
1006         }
1007 
is_goaway(&self) -> bool1008         fn is_goaway(&self) -> bool {
1009             self.io_goaway.load(Ordering::Relaxed)
1010         }
1011     }
1012 
1013     impl<S> Drop for Http3Dispatcher<S> {
drop(&mut self)1014         fn drop(&mut self) {
1015             for handle in &self.handles {
1016                 #[cfg(feature = "tokio_base")]
1017                 handle.abort();
1018                 #[cfg(feature = "ylong_base")]
1019                 handle.cancel();
1020             }
1021         }
1022     }
1023 
1024     impl From<std::io::Error> for DispatchErrorKind {
from(value: std::io::Error) -> Self1025         fn from(value: std::io::Error) -> Self {
1026             DispatchErrorKind::Io(value.kind())
1027         }
1028     }
1029 
1030     impl From<H3Error> for DispatchErrorKind {
from(err: H3Error) -> Self1031         fn from(err: H3Error) -> Self {
1032             DispatchErrorKind::H3(err)
1033         }
1034     }
1035 
1036     impl From<quiche::Error> for DispatchErrorKind {
from(value: quiche::Error) -> Self1037         fn from(value: quiche::Error) -> Self {
1038             DispatchErrorKind::Quic(value)
1039         }
1040     }
1041 
dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError1042     pub(crate) fn dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError {
1043         match dispatch_error {
1044             DispatchErrorKind::H3(e) => HttpClientError::from_error(Request, HttpError::from(e)),
1045             DispatchErrorKind::Io(e) => {
1046                 HttpClientError::from_io_error(Request, std::io::Error::from(e))
1047             }
1048             DispatchErrorKind::ChannelClosed => {
1049                 HttpClientError::from_str(Request, "Coroutine channel closed.")
1050             }
1051             DispatchErrorKind::Quic(e) => HttpClientError::from_error(Request, e),
1052             DispatchErrorKind::GoawayReceived => {
1053                 HttpClientError::from_str(Request, "received remote goaway.")
1054             }
1055             DispatchErrorKind::StreamFinished => {
1056                 HttpClientError::from_str(Request, "stream finished.")
1057             }
1058             DispatchErrorKind::Disconnect => {
1059                 HttpClientError::from_str(Request, "remote peer closed.")
1060             }
1061         }
1062     }
1063 }
1064 
1065 #[cfg(test)]
1066 mod ut_dispatch {
1067     use crate::dispatcher::{ConnDispatcher, Dispatcher};
1068 
1069     /// UT test cases for `ConnDispatcher::is_shutdown`.
1070     ///
1071     /// # Brief
1072     /// 1. Creates a `ConnDispatcher`.
1073     /// 2. Calls `ConnDispatcher::is_shutdown` to get the result.
1074     /// 3. Calls `ConnDispatcher::dispatch` to get the result.
1075     /// 4. Checks if the result is false.
1076     #[test]
ut_is_shutdown()1077     fn ut_is_shutdown() {
1078         let conn = ConnDispatcher::http1(b"Data");
1079         let res = conn.is_shutdown();
1080         assert!(!res);
1081         let res = conn.dispatch();
1082         assert!(res.is_some());
1083     }
1084 }
1085