1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use crate::util::ConnInfo; 15 use crate::{ConnDetail, TimeGroup}; 16 17 pub(crate) trait Dispatcher { 18 type Handle; 19 dispatch(&self) -> Option<Self::Handle>20 fn dispatch(&self) -> Option<Self::Handle>; 21 is_shutdown(&self) -> bool22 fn is_shutdown(&self) -> bool; 23 24 #[allow(dead_code)] is_goaway(&self) -> bool25 fn is_goaway(&self) -> bool; 26 } 27 28 pub(crate) enum ConnDispatcher<S> { 29 #[cfg(feature = "http1_1")] 30 Http1(http1::Http1Dispatcher<S>), 31 32 #[cfg(feature = "http2")] 33 Http2(http2::Http2Dispatcher<S>), 34 35 #[cfg(feature = "http3")] 36 Http3(http3::Http3Dispatcher<S>), 37 } 38 39 impl<S> Dispatcher for ConnDispatcher<S> { 40 type Handle = Conn<S>; 41 dispatch(&self) -> Option<Self::Handle>42 fn dispatch(&self) -> Option<Self::Handle> { 43 match self { 44 #[cfg(feature = "http1_1")] 45 Self::Http1(h1) => h1.dispatch().map(Conn::Http1), 46 47 #[cfg(feature = "http2")] 48 Self::Http2(h2) => h2.dispatch().map(Conn::Http2), 49 50 #[cfg(feature = "http3")] 51 Self::Http3(h3) => h3.dispatch().map(Conn::Http3), 52 } 53 } 54 is_shutdown(&self) -> bool55 fn is_shutdown(&self) -> bool { 56 match self { 57 #[cfg(feature = "http1_1")] 58 Self::Http1(h1) => h1.is_shutdown(), 59 60 #[cfg(feature = "http2")] 61 Self::Http2(h2) => h2.is_shutdown(), 62 63 #[cfg(feature = "http3")] 64 Self::Http3(h3) => h3.is_shutdown(), 65 } 66 } 67 is_goaway(&self) -> bool68 fn is_goaway(&self) -> bool { 69 match self { 70 #[cfg(feature = "http1_1")] 71 Self::Http1(h1) => h1.is_goaway(), 72 73 #[cfg(feature = "http2")] 74 Self::Http2(h2) => h2.is_goaway(), 75 76 #[cfg(feature = "http3")] 77 Self::Http3(h3) => h3.is_goaway(), 78 } 79 } 80 } 81 82 pub(crate) enum Conn<S> { 83 #[cfg(feature = "http1_1")] 84 Http1(http1::Http1Conn<S>), 85 86 #[cfg(feature = "http2")] 87 Http2(http2::Http2Conn<S>), 88 89 #[cfg(feature = "http3")] 90 Http3(http3::Http3Conn<S>), 91 } 92 93 impl<S: ConnInfo> Conn<S> { get_detail(&mut self) -> ConnDetail94 pub(crate) fn get_detail(&mut self) -> ConnDetail { 95 match self { 96 #[cfg(feature = "http1_1")] 97 Conn::Http1(io) => io.raw_mut().conn_data().detail(), 98 #[cfg(feature = "http2")] 99 Conn::Http2(io) => io.detail.clone(), 100 #[cfg(feature = "http3")] 101 Conn::Http3(io) => io.detail.clone(), 102 } 103 } 104 } 105 106 pub(crate) struct TimeInfoConn<S> { 107 conn: Conn<S>, 108 time_group: TimeGroup, 109 } 110 111 impl<S> TimeInfoConn<S> { new(conn: Conn<S>, time_group: TimeGroup) -> Self112 pub(crate) fn new(conn: Conn<S>, time_group: TimeGroup) -> Self { 113 Self { conn, time_group } 114 } 115 time_group_mut(&mut self) -> &mut TimeGroup116 pub(crate) fn time_group_mut(&mut self) -> &mut TimeGroup { 117 &mut self.time_group 118 } 119 time_group(&mut self) -> &TimeGroup120 pub(crate) fn time_group(&mut self) -> &TimeGroup { 121 &self.time_group 122 } 123 connection(self) -> Conn<S>124 pub(crate) fn connection(self) -> Conn<S> { 125 self.conn 126 } 127 } 128 129 #[cfg(feature = "http1_1")] 130 pub(crate) mod http1 { 131 use std::cell::UnsafeCell; 132 use std::sync::atomic::{AtomicBool, Ordering}; 133 use std::sync::Arc; 134 135 use super::{ConnDispatcher, Dispatcher}; 136 use crate::runtime::Semaphore; 137 #[cfg(feature = "tokio_base")] 138 use crate::runtime::SemaphorePermit; 139 140 impl<S> ConnDispatcher<S> { http1(io: S) -> Self141 pub(crate) fn http1(io: S) -> Self { 142 Self::Http1(Http1Dispatcher::new(io)) 143 } 144 } 145 146 /// HTTP1-based connection manager, which can dispatch connections to other 147 /// threads according to HTTP1 syntax. 148 pub(crate) struct Http1Dispatcher<S> { 149 inner: Arc<Inner<S>>, 150 } 151 152 pub(crate) struct Inner<S> { 153 pub(crate) io: UnsafeCell<S>, 154 // `occupied` indicates that the connection is occupied. Only one coroutine 155 // can get the handle at the same time. Once the handle is fetched, the flag 156 // position is true. 157 pub(crate) occupied: AtomicBool, 158 // `shutdown` indicates that the connection need to be shut down. 159 pub(crate) shutdown: AtomicBool, 160 } 161 162 unsafe impl<S> Sync for Inner<S> {} 163 164 impl<S> Http1Dispatcher<S> { new(io: S) -> Self165 pub(crate) fn new(io: S) -> Self { 166 Self { 167 inner: Arc::new(Inner { 168 io: UnsafeCell::new(io), 169 occupied: AtomicBool::new(false), 170 shutdown: AtomicBool::new(false), 171 }), 172 } 173 } 174 } 175 176 impl<S> Dispatcher for Http1Dispatcher<S> { 177 type Handle = Http1Conn<S>; 178 dispatch(&self) -> Option<Self::Handle>179 fn dispatch(&self) -> Option<Self::Handle> { 180 self.inner 181 .occupied 182 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) 183 .ok() 184 .map(|_| Http1Conn::from_inner(self.inner.clone())) 185 } 186 is_shutdown(&self) -> bool187 fn is_shutdown(&self) -> bool { 188 self.inner.shutdown.load(Ordering::Relaxed) 189 } 190 is_goaway(&self) -> bool191 fn is_goaway(&self) -> bool { 192 false 193 } 194 } 195 196 /// Handle returned to other threads for I/O operations. 197 pub(crate) struct Http1Conn<S> { 198 pub(crate) sem: Option<WrappedSemPermit>, 199 pub(crate) inner: Arc<Inner<S>>, 200 } 201 202 impl<S> Http1Conn<S> { from_inner(inner: Arc<Inner<S>>) -> Self203 pub(crate) fn from_inner(inner: Arc<Inner<S>>) -> Self { 204 Self { sem: None, inner } 205 } 206 occupy_sem(&mut self, sem: WrappedSemPermit)207 pub(crate) fn occupy_sem(&mut self, sem: WrappedSemPermit) { 208 self.sem = Some(sem); 209 } 210 raw_mut(&mut self) -> &mut S211 pub(crate) fn raw_mut(&mut self) -> &mut S { 212 // SAFETY: In the case of `HTTP1`, only one coroutine gets the handle 213 // at the same time. 214 unsafe { &mut *self.inner.io.get() } 215 } 216 shutdown(&self)217 pub(crate) fn shutdown(&self) { 218 self.inner.shutdown.store(true, Ordering::Release); 219 } 220 } 221 222 impl<S> Drop for Http1Conn<S> { drop(&mut self)223 fn drop(&mut self) { 224 self.inner.occupied.store(false, Ordering::Release) 225 } 226 } 227 228 pub(crate) struct WrappedSemaphore { 229 sem: Arc<Semaphore>, 230 } 231 232 impl WrappedSemaphore { new(permits: usize) -> Self233 pub(crate) fn new(permits: usize) -> Self { 234 Self { 235 #[cfg(feature = "tokio_base")] 236 sem: Arc::new(tokio::sync::Semaphore::new(permits)), 237 #[cfg(feature = "ylong_base")] 238 sem: Arc::new(ylong_runtime::sync::Semaphore::new(permits).unwrap()), 239 } 240 } 241 acquire(&self) -> WrappedSemPermit242 pub(crate) async fn acquire(&self) -> WrappedSemPermit { 243 #[cfg(feature = "ylong_base")] 244 { 245 let semaphore = self.sem.clone(); 246 let _permit = semaphore.acquire().await.unwrap(); 247 WrappedSemPermit { sem: semaphore } 248 } 249 250 #[cfg(feature = "tokio_base")] 251 { 252 let permit = self.sem.clone().acquire_owned().await.unwrap(); 253 WrappedSemPermit { permit } 254 } 255 } 256 } 257 258 impl Clone for WrappedSemaphore { clone(&self) -> Self259 fn clone(&self) -> Self { 260 Self { 261 sem: self.sem.clone(), 262 } 263 } 264 } 265 266 pub(crate) struct WrappedSemPermit { 267 #[cfg(feature = "ylong_base")] 268 pub(crate) sem: Arc<Semaphore>, 269 #[cfg(feature = "tokio_base")] 270 #[allow(dead_code)] 271 pub(crate) permit: SemaphorePermit, 272 } 273 274 #[cfg(feature = "ylong_base")] 275 impl Drop for WrappedSemPermit { drop(&mut self)276 fn drop(&mut self) { 277 self.sem.release(); 278 } 279 } 280 } 281 282 #[cfg(feature = "http2")] 283 pub(crate) mod http2 { 284 use std::collections::HashMap; 285 use std::future::Future; 286 use std::marker::PhantomData; 287 use std::pin::Pin; 288 use std::sync::atomic::{AtomicBool, Ordering}; 289 use std::sync::{Arc, Mutex}; 290 use std::task::{Context, Poll}; 291 292 use ylong_http::error::HttpError; 293 use ylong_http::h2::{ 294 ErrorCode, Frame, FrameDecoder, FrameEncoder, FrameFlags, Goaway, H2Error, Payload, 295 RstStream, Settings, SettingsBuilder, StreamId, 296 }; 297 298 use crate::runtime::{ 299 bounded_channel, unbounded_channel, AsyncRead, AsyncWrite, AsyncWriteExt, BoundedReceiver, 300 BoundedSender, SendError, UnboundedReceiver, UnboundedSender, WriteHalf, 301 }; 302 use crate::util::config::H2Config; 303 use crate::util::dispatcher::{ConnDispatcher, Dispatcher}; 304 use crate::util::h2::{ 305 ConnManager, FlowControl, H2StreamState, RecvData, RequestWrapper, SendData, 306 StreamEndState, Streams, 307 }; 308 use crate::ErrorKind::Request; 309 use crate::{ConnDetail, ErrorKind, HttpClientError}; 310 const DEFAULT_MAX_FRAME_SIZE: usize = 2 << 13; 311 const DEFAULT_WINDOW_SIZE: u32 = 65535; 312 313 pub(crate) type ManagerSendFut = 314 Pin<Box<dyn Future<Output = Result<(), SendError<RespMessage>>> + Send + Sync>>; 315 316 pub(crate) enum RespMessage { 317 Output(Frame), 318 OutputExit(DispatchErrorKind), 319 } 320 321 pub(crate) enum OutputMessage { 322 Output(Frame), 323 OutputExit(DispatchErrorKind), 324 } 325 326 pub(crate) struct ReqMessage { 327 pub(crate) sender: BoundedSender<RespMessage>, 328 pub(crate) request: RequestWrapper, 329 } 330 331 #[derive(Debug, Eq, PartialEq, Copy, Clone)] 332 pub(crate) enum DispatchErrorKind { 333 H2(H2Error), 334 Io(std::io::ErrorKind), 335 ChannelClosed, 336 Disconnect, 337 } 338 339 // HTTP2-based connection manager, which can dispatch connections to other 340 // threads according to HTTP2 syntax. 341 pub(crate) struct Http2Dispatcher<S> { 342 pub(crate) detail: ConnDetail, 343 pub(crate) allowed_cache: usize, 344 pub(crate) sender: UnboundedSender<ReqMessage>, 345 pub(crate) io_shutdown: Arc<AtomicBool>, 346 pub(crate) io_goaway: Arc<AtomicBool>, 347 pub(crate) handles: Vec<crate::runtime::JoinHandle<()>>, 348 pub(crate) _mark: PhantomData<S>, 349 } 350 351 pub(crate) struct Http2Conn<S> { 352 pub(crate) allow_cached_frames: usize, 353 // Sends frame to StreamController 354 pub(crate) sender: UnboundedSender<ReqMessage>, 355 pub(crate) receiver: RespReceiver, 356 pub(crate) io_shutdown: Arc<AtomicBool>, 357 pub(crate) detail: ConnDetail, 358 pub(crate) _mark: PhantomData<S>, 359 } 360 361 pub(crate) struct StreamController { 362 // The connection close flag organizes new stream commits to the current connection when 363 // closed. 364 pub(crate) io_shutdown: Arc<AtomicBool>, 365 pub(crate) io_goaway: Arc<AtomicBool>, 366 // The senders of all connected stream channels of response. 367 pub(crate) senders: HashMap<StreamId, BoundedSender<RespMessage>>, 368 pub(crate) curr_message: HashMap<StreamId, ManagerSendFut>, 369 // Stream information on the connection. 370 pub(crate) streams: Streams, 371 // Received GO_AWAY frame. 372 pub(crate) go_away_error_code: Option<u32>, 373 // The last GO_AWAY frame sent by the client. 374 pub(crate) go_away_sync: GoAwaySync, 375 } 376 377 #[derive(Default)] 378 pub(crate) struct GoAwaySync { 379 pub(crate) going_away: Option<Goaway>, 380 } 381 382 #[derive(Default)] 383 pub(crate) struct SettingsSync { 384 pub(crate) settings: SettingsState, 385 } 386 387 #[derive(Default, Clone)] 388 pub(crate) enum SettingsState { 389 Acknowledging(Settings), 390 #[default] 391 Synced, 392 } 393 394 #[derive(Default)] 395 pub(crate) struct RespReceiver { 396 receiver: Option<BoundedReceiver<RespMessage>>, 397 } 398 399 impl<S> ConnDispatcher<S> 400 where 401 S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static, 402 { http2(detail: ConnDetail, config: H2Config, io: S) -> Self403 pub(crate) fn http2(detail: ConnDetail, config: H2Config, io: S) -> Self { 404 Self::Http2(Http2Dispatcher::new(detail, config, io)) 405 } 406 } 407 408 impl<S> Http2Dispatcher<S> 409 where 410 S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static, 411 { new(detail: ConnDetail, config: H2Config, io: S) -> Self412 pub(crate) fn new(detail: ConnDetail, config: H2Config, io: S) -> Self { 413 let mut flow = FlowControl::new(DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE); 414 flow.setup_recv_window(config.conn_window_size()); 415 416 let streams = Streams::new(config.stream_window_size(), DEFAULT_WINDOW_SIZE, flow); 417 let shutdown_flag = Arc::new(AtomicBool::new(false)); 418 let goaway_flag = Arc::new(AtomicBool::new(false)); 419 let mut controller = 420 StreamController::new(streams, shutdown_flag.clone(), goaway_flag.clone()); 421 422 let (input_tx, input_rx) = unbounded_channel(); 423 let (req_tx, req_rx) = unbounded_channel(); 424 425 let settings = create_initial_settings(&config); 426 427 // Error is not possible, so it is not handled for the time 428 // being. 429 let mut handles = Vec::with_capacity(3); 430 // send initial settings and update conn recv window 431 if input_tx.send(settings).is_ok() 432 && controller 433 .streams 434 .release_conn_recv_window(0, &input_tx) 435 .is_ok() 436 { 437 Self::launch( 438 config.allowed_cache_frame_size(), 439 config.use_huffman_coding(), 440 controller, 441 (input_tx, input_rx), 442 req_rx, 443 &mut handles, 444 io, 445 ); 446 } 447 Self { 448 detail, 449 allowed_cache: config.allowed_cache_frame_size(), 450 sender: req_tx, 451 io_shutdown: shutdown_flag, 452 io_goaway: goaway_flag, 453 handles, 454 _mark: PhantomData, 455 } 456 } 457 launch( allow_num: usize, use_huffman: bool, controller: StreamController, input_channel: (UnboundedSender<Frame>, UnboundedReceiver<Frame>), req_rx: UnboundedReceiver<ReqMessage>, handles: &mut Vec<crate::runtime::JoinHandle<()>>, io: S, )458 fn launch( 459 allow_num: usize, 460 use_huffman: bool, 461 controller: StreamController, 462 input_channel: (UnboundedSender<Frame>, UnboundedReceiver<Frame>), 463 req_rx: UnboundedReceiver<ReqMessage>, 464 handles: &mut Vec<crate::runtime::JoinHandle<()>>, 465 io: S, 466 ) { 467 let (resp_tx, resp_rx) = bounded_channel(allow_num); 468 let (read, write) = crate::runtime::split(io); 469 let settings_sync = Arc::new(Mutex::new(SettingsSync::default())); 470 let send_settings_sync = settings_sync.clone(); 471 let send = crate::runtime::spawn(async move { 472 let mut writer = write; 473 if async_send_preface(&mut writer).await.is_ok() { 474 let encoder = FrameEncoder::new(DEFAULT_MAX_FRAME_SIZE, use_huffman); 475 let mut send = 476 SendData::new(encoder, send_settings_sync, writer, input_channel.1); 477 let _ = Pin::new(&mut send).await; 478 } 479 }); 480 handles.push(send); 481 482 let recv_settings_sync = settings_sync.clone(); 483 let recv = crate::runtime::spawn(async move { 484 let decoder = FrameDecoder::new(); 485 let mut recv = RecvData::new(decoder, recv_settings_sync, read, resp_tx); 486 let _ = Pin::new(&mut recv).await; 487 }); 488 handles.push(recv); 489 490 let manager = crate::runtime::spawn(async move { 491 let mut conn_manager = 492 ConnManager::new(settings_sync, input_channel.0, resp_rx, req_rx, controller); 493 let _ = Pin::new(&mut conn_manager).await; 494 }); 495 handles.push(manager); 496 } 497 } 498 499 impl<S> Dispatcher for Http2Dispatcher<S> { 500 type Handle = Http2Conn<S>; 501 dispatch(&self) -> Option<Self::Handle>502 fn dispatch(&self) -> Option<Self::Handle> { 503 let sender = self.sender.clone(); 504 let handle = Http2Conn::new( 505 self.allowed_cache, 506 self.io_shutdown.clone(), 507 sender, 508 self.detail.clone(), 509 ); 510 Some(handle) 511 } 512 is_shutdown(&self) -> bool513 fn is_shutdown(&self) -> bool { 514 self.io_shutdown.load(Ordering::Relaxed) 515 } 516 is_goaway(&self) -> bool517 fn is_goaway(&self) -> bool { 518 self.io_goaway.load(Ordering::Relaxed) 519 } 520 } 521 522 impl<S> Drop for Http2Dispatcher<S> { drop(&mut self)523 fn drop(&mut self) { 524 for handle in &self.handles { 525 #[cfg(feature = "ylong_base")] 526 handle.cancel(); 527 #[cfg(feature = "tokio_base")] 528 handle.abort(); 529 } 530 } 531 } 532 533 impl<S> Http2Conn<S> { new( allow_cached_num: usize, io_shutdown: Arc<AtomicBool>, sender: UnboundedSender<ReqMessage>, detail: ConnDetail, ) -> Self534 pub(crate) fn new( 535 allow_cached_num: usize, 536 io_shutdown: Arc<AtomicBool>, 537 sender: UnboundedSender<ReqMessage>, 538 detail: ConnDetail, 539 ) -> Self { 540 Self { 541 allow_cached_frames: allow_cached_num, 542 sender, 543 receiver: RespReceiver::default(), 544 io_shutdown, 545 detail, 546 _mark: PhantomData, 547 } 548 } 549 send_frame_to_controller( &mut self, request: RequestWrapper, ) -> Result<(), HttpClientError>550 pub(crate) fn send_frame_to_controller( 551 &mut self, 552 request: RequestWrapper, 553 ) -> Result<(), HttpClientError> { 554 let (tx, rx) = bounded_channel::<RespMessage>(self.allow_cached_frames); 555 self.receiver.set_receiver(rx); 556 self.sender 557 .send(ReqMessage { 558 sender: tx, 559 request, 560 }) 561 .map_err(|_| { 562 HttpClientError::from_str(ErrorKind::Request, "Request Sender Closed !") 563 }) 564 } 565 } 566 567 impl StreamController { new( streams: Streams, shutdown: Arc<AtomicBool>, goaway: Arc<AtomicBool>, ) -> Self568 pub(crate) fn new( 569 streams: Streams, 570 shutdown: Arc<AtomicBool>, 571 goaway: Arc<AtomicBool>, 572 ) -> Self { 573 Self { 574 io_shutdown: shutdown, 575 io_goaway: goaway, 576 senders: HashMap::new(), 577 curr_message: HashMap::new(), 578 streams, 579 go_away_error_code: None, 580 go_away_sync: GoAwaySync::default(), 581 } 582 } 583 shutdown(&self)584 pub(crate) fn shutdown(&self) { 585 self.io_shutdown.store(true, Ordering::Release); 586 } 587 goaway(&self)588 pub(crate) fn goaway(&self) { 589 self.io_goaway.store(true, Ordering::Release); 590 } 591 get_unsent_streams( &mut self, last_stream_id: StreamId, ) -> Result<Vec<StreamId>, H2Error>592 pub(crate) fn get_unsent_streams( 593 &mut self, 594 last_stream_id: StreamId, 595 ) -> Result<Vec<StreamId>, H2Error> { 596 // The last-stream-id in the subsequent GO_AWAY frame 597 // cannot be greater than the last-stream-id in the previous GO_AWAY frame. 598 if self.streams.max_send_id < last_stream_id { 599 return Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); 600 } 601 self.streams.max_send_id = last_stream_id; 602 Ok(self.streams.get_unset_streams(last_stream_id)) 603 } 604 send_message_to_stream( &mut self, cx: &mut Context<'_>, stream_id: StreamId, message: RespMessage, ) -> Poll<Result<(), H2Error>>605 pub(crate) fn send_message_to_stream( 606 &mut self, 607 cx: &mut Context<'_>, 608 stream_id: StreamId, 609 message: RespMessage, 610 ) -> Poll<Result<(), H2Error>> { 611 if let Some(sender) = self.senders.get(&stream_id) { 612 // If the client coroutine has exited, this frame is skipped. 613 let mut tx = { 614 let sender = sender.clone(); 615 let ft = async move { sender.send(message).await }; 616 Box::pin(ft) 617 }; 618 619 match tx.as_mut().poll(cx) { 620 Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), 621 // The current coroutine sending the request exited prematurely. 622 Poll::Ready(Err(_)) => { 623 self.senders.remove(&stream_id); 624 Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError))) 625 } 626 Poll::Pending => { 627 self.curr_message.insert(stream_id, tx); 628 Poll::Pending 629 } 630 } 631 } else { 632 Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError))) 633 } 634 } 635 poll_blocked_message( &mut self, cx: &mut Context<'_>, input_tx: &UnboundedSender<Frame>, ) -> Poll<()>636 pub(crate) fn poll_blocked_message( 637 &mut self, 638 cx: &mut Context<'_>, 639 input_tx: &UnboundedSender<Frame>, 640 ) -> Poll<()> { 641 let keys: Vec<StreamId> = self.curr_message.keys().cloned().collect(); 642 let mut blocked = false; 643 644 for key in keys { 645 if let Some(mut task) = self.curr_message.remove(&key) { 646 match task.as_mut().poll(cx) { 647 Poll::Ready(Ok(_)) => {} 648 // The current coroutine sending the request exited prematurely. 649 Poll::Ready(Err(_)) => { 650 self.senders.remove(&key); 651 if let Some(state) = self.streams.stream_state(key) { 652 if !matches!(state, H2StreamState::Closed(_)) { 653 if let StreamEndState::OK = self.streams.send_local_reset(key) { 654 let rest_payload = 655 RstStream::new(ErrorCode::NoError.into_code()); 656 let frame = Frame::new( 657 key, 658 FrameFlags::empty(), 659 Payload::RstStream(rest_payload), 660 ); 661 // ignore the send error occurs here in order to finish all 662 // tasks. 663 let _ = input_tx.send(frame); 664 } 665 } 666 } 667 } 668 Poll::Pending => { 669 self.curr_message.insert(key, task); 670 blocked = true; 671 } 672 } 673 } 674 } 675 if blocked { 676 Poll::Pending 677 } else { 678 Poll::Ready(()) 679 } 680 } 681 } 682 683 impl RespReceiver { set_receiver(&mut self, receiver: BoundedReceiver<RespMessage>)684 pub(crate) fn set_receiver(&mut self, receiver: BoundedReceiver<RespMessage>) { 685 self.receiver = Some(receiver); 686 } 687 recv(&mut self) -> Result<Frame, HttpClientError>688 pub(crate) async fn recv(&mut self) -> Result<Frame, HttpClientError> { 689 match self.receiver { 690 Some(ref mut receiver) => { 691 #[cfg(feature = "tokio_base")] 692 match receiver.recv().await { 693 None => err_from_msg!(Request, "Response Receiver Closed !"), 694 Some(message) => match message { 695 RespMessage::Output(frame) => Ok(frame), 696 RespMessage::OutputExit(e) => Err(dispatch_client_error(e)), 697 }, 698 } 699 700 #[cfg(feature = "ylong_base")] 701 match receiver.recv().await { 702 Err(err) => Err(HttpClientError::from_error(ErrorKind::Request, err)), 703 Ok(message) => match message { 704 RespMessage::Output(frame) => Ok(frame), 705 RespMessage::OutputExit(e) => Err(dispatch_client_error(e)), 706 }, 707 } 708 } 709 // this will not happen. 710 None => Err(HttpClientError::from_str( 711 ErrorKind::Request, 712 "Invalid Frame Receiver !", 713 )), 714 } 715 } 716 poll_recv( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<Frame, HttpClientError>>717 pub(crate) fn poll_recv( 718 &mut self, 719 cx: &mut Context<'_>, 720 ) -> Poll<Result<Frame, HttpClientError>> { 721 if let Some(ref mut receiver) = self.receiver { 722 #[cfg(feature = "tokio_base")] 723 match receiver.poll_recv(cx) { 724 Poll::Ready(None) => { 725 Poll::Ready(err_from_msg!(Request, "Error receive response !")) 726 } 727 Poll::Ready(Some(message)) => match message { 728 RespMessage::Output(frame) => Poll::Ready(Ok(frame)), 729 RespMessage::OutputExit(e) => Poll::Ready(Err(dispatch_client_error(e))), 730 }, 731 Poll::Pending => Poll::Pending, 732 } 733 734 #[cfg(feature = "ylong_base")] 735 match receiver.poll_recv(cx) { 736 Poll::Ready(Err(e)) => { 737 Poll::Ready(Err(HttpClientError::from_error(ErrorKind::Request, e))) 738 } 739 Poll::Ready(Ok(message)) => match message { 740 RespMessage::Output(frame) => Poll::Ready(Ok(frame)), 741 RespMessage::OutputExit(e) => Poll::Ready(Err(dispatch_client_error(e))), 742 }, 743 Poll::Pending => Poll::Pending, 744 } 745 } else { 746 Poll::Ready(err_from_msg!(Request, "Invalid Frame Receiver !")) 747 } 748 } 749 } 750 async_send_preface<S>(writer: &mut WriteHalf<S>) -> Result<(), DispatchErrorKind> where S: AsyncWrite + Unpin,751 async fn async_send_preface<S>(writer: &mut WriteHalf<S>) -> Result<(), DispatchErrorKind> 752 where 753 S: AsyncWrite + Unpin, 754 { 755 const PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; 756 writer 757 .write_all(PREFACE) 758 .await 759 .map_err(|e| DispatchErrorKind::Io(e.kind())) 760 } 761 create_initial_settings(config: &H2Config) -> Frame762 pub(crate) fn create_initial_settings(config: &H2Config) -> Frame { 763 let settings = SettingsBuilder::new() 764 .max_header_list_size(config.max_header_list_size()) 765 .max_frame_size(config.max_frame_size()) 766 .header_table_size(config.header_table_size()) 767 .enable_push(config.enable_push()) 768 .initial_window_size(config.stream_window_size()) 769 .build(); 770 771 Frame::new(0, FrameFlags::new(0), Payload::Settings(settings)) 772 } 773 774 impl From<std::io::Error> for DispatchErrorKind { from(value: std::io::Error) -> Self775 fn from(value: std::io::Error) -> Self { 776 DispatchErrorKind::Io(value.kind()) 777 } 778 } 779 780 impl From<H2Error> for DispatchErrorKind { from(err: H2Error) -> Self781 fn from(err: H2Error) -> Self { 782 DispatchErrorKind::H2(err) 783 } 784 } 785 dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError786 pub(crate) fn dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError { 787 match dispatch_error { 788 DispatchErrorKind::H2(e) => HttpClientError::from_error(Request, HttpError::from(e)), 789 DispatchErrorKind::Io(e) => { 790 HttpClientError::from_io_error(Request, std::io::Error::from(e)) 791 } 792 DispatchErrorKind::ChannelClosed => { 793 HttpClientError::from_str(Request, "Coroutine channel closed.") 794 } 795 DispatchErrorKind::Disconnect => { 796 HttpClientError::from_str(Request, "remote peer closed.") 797 } 798 } 799 } 800 } 801 802 #[cfg(feature = "http3")] 803 pub(crate) mod http3 { 804 use std::marker::PhantomData; 805 use std::pin::Pin; 806 use std::sync::atomic::{AtomicBool, Ordering}; 807 use std::sync::{Arc, Mutex}; 808 809 use ylong_http::error::HttpError; 810 use ylong_http::h3::{Frame, FrameDecoder, H3Error}; 811 812 use crate::async_impl::QuicConn; 813 use crate::runtime::{ 814 bounded_channel, unbounded_channel, AsyncRead, AsyncWrite, BoundedReceiver, BoundedSender, 815 UnboundedSender, 816 }; 817 use crate::util::config::H3Config; 818 use crate::util::data_ref::BodyDataRef; 819 use crate::util::dispatcher::{ConnDispatcher, Dispatcher}; 820 use crate::util::h3::io_manager::IOManager; 821 use crate::util::h3::stream_manager::StreamManager; 822 use crate::ErrorKind::Request; 823 use crate::{ConnDetail, ConnInfo, ErrorKind, HttpClientError}; 824 825 pub(crate) struct Http3Dispatcher<S> { 826 pub(crate) detail: ConnDetail, 827 pub(crate) req_tx: UnboundedSender<ReqMessage>, 828 pub(crate) handles: Vec<crate::runtime::JoinHandle<()>>, 829 pub(crate) _mark: PhantomData<S>, 830 pub(crate) io_shutdown: Arc<AtomicBool>, 831 pub(crate) io_goaway: Arc<AtomicBool>, 832 } 833 834 pub(crate) struct Http3Conn<S> { 835 pub(crate) sender: UnboundedSender<ReqMessage>, 836 pub(crate) resp_receiver: BoundedReceiver<RespMessage>, 837 pub(crate) resp_sender: BoundedSender<RespMessage>, 838 pub(crate) io_shutdown: Arc<AtomicBool>, 839 pub(crate) detail: ConnDetail, 840 pub(crate) _mark: PhantomData<S>, 841 } 842 843 pub(crate) struct RequestWrapper { 844 pub(crate) header: Frame, 845 pub(crate) data: BodyDataRef, 846 } 847 848 #[derive(Debug, Clone)] 849 pub(crate) enum DispatchErrorKind { 850 H3(H3Error), 851 Io(std::io::ErrorKind), 852 Quic(quiche::Error), 853 ChannelClosed, 854 StreamFinished, 855 // todo: retry? 856 GoawayReceived, 857 Disconnect, 858 } 859 860 pub(crate) enum RespMessage { 861 Output(Frame), 862 OutputExit(DispatchErrorKind), 863 } 864 865 pub(crate) struct ReqMessage { 866 pub(crate) request: RequestWrapper, 867 pub(crate) frame_tx: BoundedSender<RespMessage>, 868 } 869 870 impl<S> Http3Dispatcher<S> 871 where 872 S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static, 873 { new( detail: ConnDetail, config: H3Config, io: S, quic_connection: QuicConn, ) -> Self874 pub(crate) fn new( 875 detail: ConnDetail, 876 config: H3Config, 877 io: S, 878 quic_connection: QuicConn, 879 ) -> Self { 880 let (req_tx, req_rx) = unbounded_channel(); 881 let (io_manager_tx, io_manager_rx) = unbounded_channel(); 882 let (stream_manager_tx, stream_manager_rx) = unbounded_channel(); 883 let mut handles = Vec::with_capacity(2); 884 let conn = Arc::new(Mutex::new(quic_connection)); 885 let io_shutdown = Arc::new(AtomicBool::new(false)); 886 let io_goaway = Arc::new(AtomicBool::new(false)); 887 let mut stream_manager = StreamManager::new( 888 conn.clone(), 889 io_manager_tx, 890 stream_manager_rx, 891 req_rx, 892 FrameDecoder::new( 893 config.qpack_blocked_streams() as usize, 894 config.qpack_max_table_capacity() as usize, 895 ), 896 io_shutdown.clone(), 897 io_goaway.clone(), 898 ); 899 let stream_handle = crate::runtime::spawn(async move { 900 if stream_manager.init(config).is_err() { 901 return; 902 } 903 let _ = Pin::new(&mut stream_manager).await; 904 }); 905 handles.push(stream_handle); 906 907 let io_handle = crate::runtime::spawn(async move { 908 let mut io_manager = IOManager::new(io, conn, io_manager_rx, stream_manager_tx); 909 let _ = Pin::new(&mut io_manager).await; 910 }); 911 handles.push(io_handle); 912 // read_rx gets readable stream ids and writable client channels, then read 913 // stream and send to the corresponding channel 914 Self { 915 detail, 916 req_tx, 917 handles, 918 _mark: PhantomData, 919 io_shutdown, 920 io_goaway, 921 } 922 } 923 } 924 925 impl<S> Http3Conn<S> { new( detail: ConnDetail, sender: UnboundedSender<ReqMessage>, io_shutdown: Arc<AtomicBool>, ) -> Self926 pub(crate) fn new( 927 detail: ConnDetail, 928 sender: UnboundedSender<ReqMessage>, 929 io_shutdown: Arc<AtomicBool>, 930 ) -> Self { 931 const CHANNEL_SIZE: usize = 3; 932 let (resp_sender, resp_receiver) = bounded_channel(CHANNEL_SIZE); 933 Self { 934 sender, 935 resp_sender, 936 resp_receiver, 937 _mark: PhantomData, 938 io_shutdown, 939 detail, 940 } 941 } 942 send_frame_to_reader( &mut self, request: RequestWrapper, ) -> Result<(), HttpClientError>943 pub(crate) fn send_frame_to_reader( 944 &mut self, 945 request: RequestWrapper, 946 ) -> Result<(), HttpClientError> { 947 self.sender 948 .send(ReqMessage { 949 request, 950 frame_tx: self.resp_sender.clone(), 951 }) 952 .map_err(|_| { 953 HttpClientError::from_str(ErrorKind::Request, "Request Sender Closed !") 954 }) 955 } 956 recv_resp(&mut self) -> Result<Frame, HttpClientError>957 pub(crate) async fn recv_resp(&mut self) -> Result<Frame, HttpClientError> { 958 #[cfg(feature = "tokio_base")] 959 match self.resp_receiver.recv().await { 960 None => err_from_msg!(Request, "Response Receiver Closed !"), 961 Some(message) => match message { 962 RespMessage::Output(frame) => Ok(frame), 963 RespMessage::OutputExit(e) => Err(dispatch_client_error(e)), 964 }, 965 } 966 967 #[cfg(feature = "ylong_base")] 968 match self.resp_receiver.recv().await { 969 Err(err) => Err(HttpClientError::from_error(ErrorKind::Request, err)), 970 Ok(message) => match message { 971 RespMessage::Output(frame) => Ok(frame), 972 RespMessage::OutputExit(e) => Err(dispatch_client_error(e)), 973 }, 974 } 975 } 976 } 977 978 impl<S> ConnDispatcher<S> 979 where 980 S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static, 981 { http3( detail: ConnDetail, config: H3Config, io: S, quic_connection: QuicConn, ) -> Self982 pub(crate) fn http3( 983 detail: ConnDetail, 984 config: H3Config, 985 io: S, 986 quic_connection: QuicConn, 987 ) -> Self { 988 Self::Http3(Http3Dispatcher::new(detail, config, io, quic_connection)) 989 } 990 } 991 992 impl<S> Dispatcher for Http3Dispatcher<S> { 993 type Handle = Http3Conn<S>; 994 dispatch(&self) -> Option<Self::Handle>995 fn dispatch(&self) -> Option<Self::Handle> { 996 let sender = self.req_tx.clone(); 997 Some(Http3Conn::new( 998 self.detail.clone(), 999 sender, 1000 self.io_shutdown.clone(), 1001 )) 1002 } 1003 is_shutdown(&self) -> bool1004 fn is_shutdown(&self) -> bool { 1005 self.io_shutdown.load(Ordering::Relaxed) 1006 } 1007 is_goaway(&self) -> bool1008 fn is_goaway(&self) -> bool { 1009 self.io_goaway.load(Ordering::Relaxed) 1010 } 1011 } 1012 1013 impl<S> Drop for Http3Dispatcher<S> { drop(&mut self)1014 fn drop(&mut self) { 1015 for handle in &self.handles { 1016 #[cfg(feature = "tokio_base")] 1017 handle.abort(); 1018 #[cfg(feature = "ylong_base")] 1019 handle.cancel(); 1020 } 1021 } 1022 } 1023 1024 impl From<std::io::Error> for DispatchErrorKind { from(value: std::io::Error) -> Self1025 fn from(value: std::io::Error) -> Self { 1026 DispatchErrorKind::Io(value.kind()) 1027 } 1028 } 1029 1030 impl From<H3Error> for DispatchErrorKind { from(err: H3Error) -> Self1031 fn from(err: H3Error) -> Self { 1032 DispatchErrorKind::H3(err) 1033 } 1034 } 1035 1036 impl From<quiche::Error> for DispatchErrorKind { from(value: quiche::Error) -> Self1037 fn from(value: quiche::Error) -> Self { 1038 DispatchErrorKind::Quic(value) 1039 } 1040 } 1041 dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError1042 pub(crate) fn dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError { 1043 match dispatch_error { 1044 DispatchErrorKind::H3(e) => HttpClientError::from_error(Request, HttpError::from(e)), 1045 DispatchErrorKind::Io(e) => { 1046 HttpClientError::from_io_error(Request, std::io::Error::from(e)) 1047 } 1048 DispatchErrorKind::ChannelClosed => { 1049 HttpClientError::from_str(Request, "Coroutine channel closed.") 1050 } 1051 DispatchErrorKind::Quic(e) => HttpClientError::from_error(Request, e), 1052 DispatchErrorKind::GoawayReceived => { 1053 HttpClientError::from_str(Request, "received remote goaway.") 1054 } 1055 DispatchErrorKind::StreamFinished => { 1056 HttpClientError::from_str(Request, "stream finished.") 1057 } 1058 DispatchErrorKind::Disconnect => { 1059 HttpClientError::from_str(Request, "remote peer closed.") 1060 } 1061 } 1062 } 1063 } 1064 1065 #[cfg(test)] 1066 mod ut_dispatch { 1067 use crate::dispatcher::{ConnDispatcher, Dispatcher}; 1068 1069 /// UT test cases for `ConnDispatcher::is_shutdown`. 1070 /// 1071 /// # Brief 1072 /// 1. Creates a `ConnDispatcher`. 1073 /// 2. Calls `ConnDispatcher::is_shutdown` to get the result. 1074 /// 3. Calls `ConnDispatcher::dispatch` to get the result. 1075 /// 4. Checks if the result is false. 1076 #[test] ut_is_shutdown()1077 fn ut_is_shutdown() { 1078 let conn = ConnDispatcher::http1(b"Data"); 1079 let res = conn.is_shutdown(); 1080 assert!(!res); 1081 let res = conn.dispatch(); 1082 assert!(res.is_some()); 1083 } 1084 } 1085