1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 pub(crate) trait Dispatcher { 15 type Handle; 16 dispatch(&self) -> Option<Self::Handle>17 fn dispatch(&self) -> Option<Self::Handle>; 18 is_shutdown(&self) -> bool19 fn is_shutdown(&self) -> bool; 20 } 21 22 pub(crate) enum ConnDispatcher<S> { 23 #[cfg(feature = "http1_1")] 24 Http1(http1::Http1Dispatcher<S>), 25 26 #[cfg(feature = "http2")] 27 Http2(http2::Http2Dispatcher<S>), 28 } 29 30 impl<S> Dispatcher for ConnDispatcher<S> { 31 type Handle = Conn<S>; 32 dispatch(&self) -> Option<Self::Handle>33 fn dispatch(&self) -> Option<Self::Handle> { 34 match self { 35 #[cfg(feature = "http1_1")] 36 Self::Http1(h1) => h1.dispatch().map(Conn::Http1), 37 38 #[cfg(feature = "http2")] 39 Self::Http2(h2) => h2.dispatch().map(Conn::Http2), 40 } 41 } 42 is_shutdown(&self) -> bool43 fn is_shutdown(&self) -> bool { 44 match self { 45 #[cfg(feature = "http1_1")] 46 Self::Http1(h1) => h1.is_shutdown(), 47 48 #[cfg(feature = "http2")] 49 Self::Http2(h2) => h2.is_shutdown(), 50 } 51 } 52 } 53 54 pub(crate) enum Conn<S> { 55 #[cfg(feature = "http1_1")] 56 Http1(http1::Http1Conn<S>), 57 58 #[cfg(feature = "http2")] 59 Http2(http2::Http2Conn<S>), 60 } 61 62 #[cfg(feature = "http1_1")] 63 pub(crate) mod http1 { 64 use std::sync::atomic::{AtomicBool, Ordering}; 65 use std::sync::Arc; 66 67 use super::{ConnDispatcher, Dispatcher}; 68 69 impl<S> ConnDispatcher<S> { 
http1(io: S) -> Self70 pub(crate) fn http1(io: S) -> Self { 71 Self::Http1(Http1Dispatcher::new(io)) 72 } 73 } 74 75 /// HTTP1-based connection manager, which can dispatch connections to other 76 /// threads according to HTTP1 syntax. 77 pub(crate) struct Http1Dispatcher<S> { 78 inner: Arc<Inner<S>>, 79 } 80 81 pub(crate) struct Inner<S> { 82 pub(crate) io: S, 83 // `occupied` indicates that the connection is occupied. Only one coroutine 84 // can get the handle at the same time. Once the handle is fetched, the flag 85 // position is true. 86 pub(crate) occupied: AtomicBool, 87 // `shutdown` indicates that the connection need to be shut down. 88 pub(crate) shutdown: AtomicBool, 89 } 90 91 impl<S> Http1Dispatcher<S> { new(io: S) -> Self92 pub(crate) fn new(io: S) -> Self { 93 Self { 94 inner: Arc::new(Inner { 95 io, 96 occupied: AtomicBool::new(false), 97 shutdown: AtomicBool::new(false), 98 }), 99 } 100 } 101 } 102 103 impl<S> Dispatcher for Http1Dispatcher<S> { 104 type Handle = Http1Conn<S>; 105 dispatch(&self) -> Option<Self::Handle>106 fn dispatch(&self) -> Option<Self::Handle> { 107 self.inner 108 .occupied 109 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) 110 .ok() 111 .map(|_| Http1Conn { 112 inner: self.inner.clone(), 113 }) 114 } 115 is_shutdown(&self) -> bool116 fn is_shutdown(&self) -> bool { 117 self.inner.shutdown.load(Ordering::Relaxed) 118 } 119 } 120 121 /// Handle returned to other threads for I/O operations. 122 pub(crate) struct Http1Conn<S> { 123 pub(crate) inner: Arc<Inner<S>>, 124 } 125 126 impl<S> Http1Conn<S> { 127 // TODO: Use `UnsafeCell` instead when `Arc::get_mut_unchecked` become stable. raw_mut(&mut self) -> &mut S128 pub(crate) fn raw_mut(&mut self) -> &mut S { 129 // SAFETY: In the case of `HTTP1`, only one coroutine gets the handle 130 // at the same time. 
131 &mut unsafe { &mut *(Arc::as_ptr(&self.inner) as *mut Inner<S>) }.io 132 } 133 shutdown(&self)134 pub(crate) fn shutdown(&self) { 135 self.inner.shutdown.store(true, Ordering::Release); 136 } 137 } 138 139 impl<S> Drop for Http1Conn<S> { drop(&mut self)140 fn drop(&mut self) { 141 self.inner.occupied.store(false, Ordering::Release) 142 } 143 } 144 } 145 146 #[cfg(feature = "http2")] 147 pub(crate) mod http2 { 148 use std::collections::{HashMap, VecDeque}; 149 use std::future::Future; 150 use std::mem::take; 151 use std::pin::Pin; 152 use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; 153 use std::sync::{Arc, Mutex}; 154 use std::task::{Context, Poll, Waker}; 155 156 use ylong_http::error::HttpError; 157 use ylong_http::h2; 158 use ylong_http::h2::Payload::Settings; 159 use ylong_http::h2::{ 160 ErrorCode, Frame, FrameDecoder, FrameEncoder, FrameFlags, FrameKind, FramesIntoIter, 161 Goaway, H2Error, Payload, RstStream, Setting, SettingsBuilder, 162 }; 163 164 use super::{ConnDispatcher, Dispatcher}; 165 use crate::dispatcher::http2::StreamState::Closed; 166 use crate::error::HttpClientError; 167 use crate::util::H2Config; 168 use crate::{ 169 unbounded_channel, AsyncMutex, AsyncRead, AsyncWrite, ErrorKind, MutexGuard, ReadBuf, 170 TryRecvError, UnboundedReceiver, UnboundedSender, 171 }; 172 173 impl<S> ConnDispatcher<S> { http2(config: H2Config, io: S) -> Self174 pub(crate) fn http2(config: H2Config, io: S) -> Self { 175 Self::Http2(Http2Dispatcher::new(config, io)) 176 } 177 } 178 179 // The data type of the first Frame sent to the `StreamController`. 180 type Send2Ctrl = (Option<(u32, UnboundedSender<Frame>)>, Frame); 181 182 const DEFAULT_MAX_STREAM_ID: u32 = u32::MAX >> 1; 183 const DEFAULT_MAX_FRAME_SIZE: usize = 2 << 13; 184 const DEFAULT_MAX_HEADER_LIST_SIZE: usize = 16 << 20; 185 186 // HTTP2-based connection manager, which can dispatch connections to other 187 // threads according to HTTP2 syntax. 
188 pub(crate) struct Http2Dispatcher<S> { 189 pub(crate) controller: Arc<StreamController<S>>, 190 pub(crate) next_stream_id: Arc<StreamId>, 191 pub(crate) sender: UnboundedSender<Send2Ctrl>, 192 } 193 194 pub(crate) struct Http2Conn<S> { 195 // Handle id 196 pub(crate) id: u32, 197 // Sends frame to StreamController 198 pub(crate) sender: UnboundedSender<Send2Ctrl>, 199 pub(crate) stream_info: StreamInfo<S>, 200 } 201 202 pub(crate) struct StreamInfo<S> { 203 // Stream id 204 pub(crate) id: u32, 205 pub(crate) next_stream_id: Arc<StreamId>, 206 // Receive the Response frame transmitted from the StreamController 207 pub(crate) receiver: FrameReceiver, 208 // Used to handle TCP Stream 209 pub(crate) controller: Arc<StreamController<S>>, 210 } 211 212 pub(crate) struct StreamController<S> { 213 // I/O unavailability flag, which prevents the upper layer from using this I/O to create 214 // new streams. 215 pub(crate) io_shutdown: AtomicBool, 216 // Indicates that the dispatcher is occupied. At this time, a user coroutine is already 217 // acting as the dispatcher. 
218 pub(crate) occupied: AtomicU32, 219 pub(crate) dispatcher_invalid: AtomicBool, 220 pub(crate) manager: AsyncMutex<IoManager<S>>, 221 pub(crate) stream_waker: Mutex<StreamWaker>, 222 } 223 224 pub(crate) struct StreamWaker { 225 waker: HashMap<u32, Waker>, 226 } 227 228 pub(crate) struct IoManager<S> { 229 inner: Inner<S>, 230 senders: HashMap<u32, UnboundedSender<Frame>>, 231 frame_receiver: UnboundedReceiver<Send2Ctrl>, 232 streams: Streams, 233 frame_iter: FrameIter, 234 connection_frame: ConnectionFrames, 235 } 236 237 #[derive(Default)] 238 pub(crate) struct FrameIter { 239 iter: Option<FramesIntoIter>, 240 } 241 242 pub(crate) struct Streams { 243 stream_to_send: VecDeque<u32>, 244 buffer: HashMap<u32, StreamBuffer>, 245 } 246 247 pub(crate) struct StreamBuffer { 248 state: StreamState, 249 frames: VecDeque<Frame>, 250 } 251 252 pub(crate) struct Inner<S> { 253 pub(crate) io: S, 254 pub(crate) encoder: FrameEncoder, 255 pub(crate) decoder: FrameDecoder, 256 } 257 258 pub(crate) enum ReadState { 259 EmptyIo, 260 CurrentStream, 261 } 262 263 enum DispatchState { 264 Partial, 265 Finish, 266 } 267 268 #[derive(Clone)] 269 pub(crate) enum ResetReason { 270 Local, 271 Remote, 272 Goaway(u32), 273 } 274 275 #[derive(Clone)] 276 pub(crate) enum SettingsSync { 277 Send(h2::Settings), 278 Acknowledging(h2::Settings), 279 Synced, 280 } 281 282 pub(crate) struct StreamId { 283 // TODO Determine the maximum value of id. 
284 next_id: AtomicU32, 285 } 286 287 // TODO Add "open", "half-closed", "reserved" state 288 #[derive(Clone)] 289 pub(crate) enum StreamState { 290 Idle, 291 Closed(ResetReason), 292 } 293 294 #[derive(Default)] 295 pub(crate) struct FrameReceiver { 296 receiver: Option<UnboundedReceiver<Frame>>, 297 } 298 299 impl<S> StreamController<S> { new( inner: Inner<S>, frame_receiver: UnboundedReceiver<Send2Ctrl>, connection_frame: ConnectionFrames, ) -> Self300 pub(crate) fn new( 301 inner: Inner<S>, 302 frame_receiver: UnboundedReceiver<Send2Ctrl>, 303 connection_frame: ConnectionFrames, 304 ) -> Self { 305 let manager = IoManager::new(inner, frame_receiver, connection_frame); 306 Self { 307 io_shutdown: AtomicBool::new(false), 308 // 0 means io is not occupied 309 occupied: AtomicU32::new(0), 310 dispatcher_invalid: AtomicBool::new(false), 311 manager: AsyncMutex::new(manager), 312 stream_waker: Mutex::new(StreamWaker::new()), 313 } 314 } 315 shutdown(&self)316 pub(crate) fn shutdown(&self) { 317 self.io_shutdown.store(true, Ordering::Release); 318 } 319 invalid(&self)320 pub(crate) fn invalid(&self) { 321 self.dispatcher_invalid.store(true, Ordering::Release); 322 } 323 } 324 325 impl Streams { new() -> Self326 pub(crate) fn new() -> Self { 327 Self { 328 stream_to_send: VecDeque::new(), 329 buffer: HashMap::new(), 330 } 331 } 332 size(&self) -> usize333 pub(crate) fn size(&self) -> usize { 334 self.stream_to_send.len() 335 } 336 insert(&mut self, frame: Frame)337 pub(crate) fn insert(&mut self, frame: Frame) { 338 let id = frame.stream_id() as u32; 339 self.stream_to_send.push_back(id); 340 match self.buffer.get_mut(&id) { 341 Some(sender) => { 342 sender.push_back(frame); 343 } 344 None => { 345 let mut sender = StreamBuffer::new(); 346 sender.push_back(frame); 347 self.buffer.insert(id, sender); 348 } 349 } 350 } 351 get_goaway_streams( &mut self, last_stream_id: u32, ) -> Result<Vec<u32>, H2Error>352 pub(crate) fn get_goaway_streams( 353 &mut self, 354 
last_stream_id: u32, 355 ) -> Result<Vec<u32>, H2Error> { 356 let mut ids = vec![]; 357 for (id, sender) in self.buffer.iter_mut() { 358 if *id >= last_stream_id { 359 ids.push(*id); 360 sender.go_away(*id)?; 361 } 362 } 363 Ok(ids) 364 } 365 recv_local_reset(&mut self, id: u32) -> Result<(), H2Error>366 pub(crate) fn recv_local_reset(&mut self, id: u32) -> Result<(), H2Error> { 367 match self.buffer.get_mut(&id) { 368 None => Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 369 Some(sender) => { 370 match sender.state { 371 Closed(ResetReason::Remote | ResetReason::Local) => {} 372 _ => { 373 sender.state = Closed(ResetReason::Local); 374 } 375 } 376 Ok(()) 377 } 378 } 379 } 380 recv_remote_reset(&mut self, id: u32) -> Result<(), H2Error>381 pub(crate) fn recv_remote_reset(&mut self, id: u32) -> Result<(), H2Error> { 382 match self.buffer.get_mut(&id) { 383 None => Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 384 Some(sender) => { 385 match sender.state { 386 Closed(ResetReason::Remote) => {} 387 _ => { 388 sender.state = Closed(ResetReason::Remote); 389 } 390 } 391 Ok(()) 392 } 393 } 394 } 395 396 // TODO At present, only the state is changed to closed, and other states are 397 // not involved, and it needs to be added later recv_headers(&mut self, id: u32) -> Result<StreamState, H2Error>398 pub(crate) fn recv_headers(&mut self, id: u32) -> Result<StreamState, H2Error> { 399 match self.buffer.get_mut(&id) { 400 None => Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 401 Some(sender) => match sender.state { 402 Closed(ResetReason::Goaway(last_id)) => { 403 if id > last_id { 404 return Err(H2Error::ConnectionError(ErrorCode::StreamClosed)); 405 } 406 Ok(sender.state.clone()) 407 } 408 Closed(ResetReason::Remote) => { 409 Err(H2Error::ConnectionError(ErrorCode::StreamClosed)) 410 } 411 _ => Ok(sender.state.clone()), 412 }, 413 } 414 } 415 recv_data(&mut self, id: u32) -> Result<StreamState, H2Error>416 pub(crate) fn recv_data(&mut self, 
id: u32) -> Result<StreamState, H2Error> { 417 self.recv_headers(id) 418 } 419 pop_front(&mut self) -> Result<Option<Frame>, H2Error>420 pub(crate) fn pop_front(&mut self) -> Result<Option<Frame>, H2Error> { 421 match self.stream_to_send.pop_front() { 422 None => Ok(None), 423 Some(id) => { 424 // TODO Subsequent consideration is to delete the corresponding elements in the 425 // map after the status becomes Closed 426 match self.buffer.get_mut(&id) { 427 None => Err(H2Error::ConnectionError(ErrorCode::IntervalError)), 428 Some(sender) => { 429 // TODO For the time being, match state is used here, and the complete 430 // logic should be judged based on the frame type and state 431 match sender.state { 432 Closed(ResetReason::Remote | ResetReason::Local) => Ok(None), 433 _ => Ok(sender.pop_front()), 434 } 435 } 436 } 437 } 438 } 439 } 440 } 441 442 impl StreamBuffer { push_back(&mut self, frame: Frame)443 pub(crate) fn push_back(&mut self, frame: Frame) { 444 self.frames.push_back(frame); 445 } 446 pop_front(&mut self) -> Option<Frame>447 pub(crate) fn pop_front(&mut self) -> Option<Frame> { 448 self.frames.pop_front() 449 } 450 new() -> Self451 pub(crate) fn new() -> Self { 452 Self { 453 state: StreamState::Idle, 454 frames: VecDeque::new(), 455 } 456 } 457 go_away(&mut self, last_stream_id: u32) -> Result<(), H2Error>458 pub(crate) fn go_away(&mut self, last_stream_id: u32) -> Result<(), H2Error> { 459 match self.state { 460 Closed(ResetReason::Local | ResetReason::Remote) => {} 461 Closed(ResetReason::Goaway(id)) => { 462 if last_stream_id > id { 463 return Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); 464 } 465 self.state = Closed(ResetReason::Goaway(last_stream_id)); 466 } 467 _ => { 468 self.state = Closed(ResetReason::Goaway(last_stream_id)); 469 } 470 } 471 Ok(()) 472 } 473 } 474 475 impl SettingsSync { ack_settings() -> Frame476 pub(crate) fn ack_settings() -> Frame { 477 Frame::new(0, FrameFlags::new(0x1), Settings(h2::Settings::new(vec![]))) 
478 } 479 } 480 481 pub(crate) struct ConnectionFrames { 482 preface: bool, 483 settings: SettingsSync, 484 } 485 486 impl ConnectionFrames { new(settings: h2::Settings) -> Self487 pub(crate) fn new(settings: h2::Settings) -> Self { 488 Self { 489 preface: true, 490 settings: SettingsSync::Send(settings), 491 } 492 } 493 } 494 495 impl StreamWaker { new() -> Self496 pub(crate) fn new() -> Self { 497 Self { 498 waker: HashMap::new(), 499 } 500 } 501 } 502 503 impl<S> IoManager<S> { new( inner: Inner<S>, frame_receiver: UnboundedReceiver<Send2Ctrl>, connection_frame: ConnectionFrames, ) -> Self504 pub(crate) fn new( 505 inner: Inner<S>, 506 frame_receiver: UnboundedReceiver<Send2Ctrl>, 507 connection_frame: ConnectionFrames, 508 ) -> Self { 509 Self { 510 inner, 511 senders: HashMap::new(), 512 frame_receiver, 513 streams: Streams::new(), 514 frame_iter: FrameIter::default(), 515 connection_frame, 516 } 517 } 518 close_frame_receiver(&mut self)519 fn close_frame_receiver(&mut self) { 520 self.frame_receiver.close() 521 } 522 } 523 524 impl FrameIter { is_empty(&self) -> bool525 pub(crate) fn is_empty(&self) -> bool { 526 self.iter.is_none() 527 } 528 } 529 530 impl StreamId { stream_id_generate(&self) -> u32531 fn stream_id_generate(&self) -> u32 { 532 self.next_id.fetch_add(2, Ordering::Relaxed) 533 } 534 get_next_id(&self) -> u32535 fn get_next_id(&self) -> u32 { 536 self.next_id.load(Ordering::Relaxed) 537 } 538 } 539 540 impl<S> Http2Dispatcher<S> { new(config: H2Config, io: S) -> Self541 pub(crate) fn new(config: H2Config, io: S) -> Self { 542 // send_preface(&mut io).await?; 543 544 let connection_frames = build_connection_frames(config); 545 let inner = Inner { 546 io, 547 encoder: FrameEncoder::new(DEFAULT_MAX_FRAME_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE), 548 decoder: FrameDecoder::new(), 549 }; 550 551 // For each stream to send the frame to the controller 552 let (tx, rx) = unbounded_channel::<Send2Ctrl>(); 553 554 let stream_controller = 
Arc::new(StreamController::new(inner, rx, connection_frames)); 555 556 // The id of the client stream, starting from 1 557 let next_stream_id = StreamId { 558 next_id: AtomicU32::new(1), 559 }; 560 Self { 561 controller: stream_controller, 562 sender: tx, 563 next_stream_id: Arc::new(next_stream_id), 564 } 565 } 566 } 567 568 impl<S> Dispatcher for Http2Dispatcher<S> { 569 type Handle = Http2Conn<S>; 570 571 // Call this method to get a stream dispatch(&self) -> Option<Self::Handle>572 fn dispatch(&self) -> Option<Self::Handle> { 573 let id = self.next_stream_id.stream_id_generate(); 574 // TODO Consider how to create a new connection and transfer state 575 if id > DEFAULT_MAX_STREAM_ID { 576 return None; 577 } 578 let controller = self.controller.clone(); 579 let sender = self.sender.clone(); 580 let handle = Http2Conn::new(id, self.next_stream_id.clone(), sender, controller); 581 Some(handle) 582 } 583 584 // TODO When the stream id reaches the maximum value, shutdown the current 585 // connection is_shutdown(&self) -> bool586 fn is_shutdown(&self) -> bool { 587 self.controller.io_shutdown.load(Ordering::Relaxed) 588 } 589 } 590 591 impl<S> Http2Conn<S> { new( id: u32, next_stream_id: Arc<StreamId>, sender: UnboundedSender<Send2Ctrl>, controller: Arc<StreamController<S>>, ) -> Self592 pub(crate) fn new( 593 id: u32, 594 next_stream_id: Arc<StreamId>, 595 sender: UnboundedSender<Send2Ctrl>, 596 controller: Arc<StreamController<S>>, 597 ) -> Self { 598 let stream_info = StreamInfo { 599 id, 600 next_stream_id, 601 receiver: FrameReceiver::default(), 602 controller, 603 }; 604 Self { 605 id, 606 sender, 607 stream_info, 608 } 609 } 610 send_frame_to_controller( &mut self, frame: Frame, ) -> Result<(), HttpClientError>611 pub(crate) fn send_frame_to_controller( 612 &mut self, 613 frame: Frame, 614 ) -> Result<(), HttpClientError> { 615 if self.stream_info.receiver.is_none() { 616 let (tx, rx) = unbounded_channel::<Frame>(); 617 
self.stream_info.receiver.set_receiver(rx); 618 self.sender.send((Some((self.id, tx)), frame)).map_err(|_| { 619 HttpClientError::new_with_cause( 620 ErrorKind::Request, 621 Some(String::from("resend")), 622 ) 623 }) 624 } else { 625 self.sender.send((None, frame)).map_err(|_| { 626 HttpClientError::new_with_cause( 627 ErrorKind::Request, 628 Some(String::from("resend")), 629 ) 630 }) 631 } 632 } 633 } 634 635 impl<S: AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static> Future for StreamInfo<S> { 636 type Output = Result<Frame, HttpError>; 637 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>638 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 639 let stream_info = self.get_mut(); 640 641 // First, check whether the frame of the current stream is in the Channel. 642 // The error cannot occur. Therefore, the error is thrown directly without 643 // connection-level processing. 644 if let Some(frame) = stream_info.receiver.recv_frame(stream_info.id)? { 645 { 646 let mut stream_waker = stream_info 647 .controller 648 .stream_waker 649 .lock() 650 .expect("Blocking get waker lock failed! "); 651 // 652 wakeup_next_stream(&mut stream_waker.waker); 653 } 654 return Poll::Ready(Ok(frame)); 655 } 656 657 // If the dispatcher sends a goaway frame, all streams on the current connection 658 // are unavailable. 659 if stream_info 660 .controller 661 .dispatcher_invalid 662 .load(Ordering::Relaxed) 663 { 664 return Poll::Ready(Err(H2Error::ConnectionError(ErrorCode::ConnectError).into())); 665 } 666 667 // The error cannot occur. Therefore, the error is thrown directly without 668 // connection-level processing. 669 if is_io_available(&stream_info.controller.occupied, stream_info.id)? { 670 { 671 // Second, try to get io and read the frame of the current stream from io. 672 if let Ok(mut io_manager) = stream_info.controller.manager.try_lock() { 673 if stream_info 674 .poll_match_result(cx, &mut io_manager)? 
675 .is_pending() 676 { 677 return Poll::Pending; 678 } 679 } 680 } 681 { 682 let mut stream_waker = stream_info 683 .controller 684 .stream_waker 685 .lock() 686 .expect("Blocking get waker lock failed! "); 687 wakeup_next_stream(&mut stream_waker.waker); 688 } 689 // The error cannot occur. Therefore, the error is thrown directly without 690 // connection-level processing. 691 let frame_opt = get_frame(stream_info.receiver.recv_frame(stream_info.id)?); 692 return Poll::Ready(frame_opt); 693 } 694 695 { 696 let mut io_manager = { 697 // Third, wait to acquire the lock of waker, which is used to insert the current 698 // waker, and wait to be awakened by the io stream. 699 let mut stream_waker = stream_info 700 .controller 701 .stream_waker 702 .lock() 703 .expect("Blocking get waker lock failed! "); 704 705 // Fourth, after obtaining the waker lock, 706 // you need to check the Receiver again to prevent the Receiver from receiving a 707 // frame while waiting for the waker. The error cannot 708 // occur. Therefore, the error is thrown directly without connection-level 709 // processing. 710 if let Some(frame) = stream_info.receiver.recv_frame(stream_info.id)? { 711 wakeup_next_stream(&mut stream_waker.waker); 712 return Poll::Ready(Ok(frame)); 713 } 714 715 // The error cannot occur. Therefore, the error is thrown directly without 716 // connection-level processing. 717 if is_io_available(&stream_info.controller.occupied, stream_info.id)? { 718 // Fifth, get io again to prevent no other streams from controlling io while 719 // waiting for the waker, leaving only the current 720 // stream. 
721 match stream_info.controller.manager.try_lock() { 722 Ok(guard) => guard, 723 _ => { 724 stream_waker 725 .waker 726 .insert(stream_info.id, cx.waker().clone()); 727 return Poll::Pending; 728 } 729 } 730 } else { 731 stream_waker 732 .waker 733 .insert(stream_info.id, cx.waker().clone()); 734 return Poll::Pending; 735 } 736 }; 737 if stream_info 738 .poll_match_result(cx, &mut io_manager)? 739 .is_pending() 740 { 741 return Poll::Pending; 742 } 743 } 744 { 745 { 746 let mut stream_waker = stream_info 747 .controller 748 .stream_waker 749 .lock() 750 .expect("Blocking get waker lock failed! "); 751 wakeup_next_stream(&mut stream_waker.waker); 752 } 753 // The error cannot occur. Therefore, the error is thrown directly without 754 // connection-level processing. 755 let frame_opt = get_frame(stream_info.receiver.recv_frame(stream_info.id)?); 756 Poll::Ready(frame_opt) 757 } 758 } 759 } 760 761 impl<S: AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static> StreamInfo<S> { poll_match_result( &self, cx: &mut Context<'_>, io_manager: &mut MutexGuard<IoManager<S>>, ) -> Poll<Result<(), HttpError>>762 fn poll_match_result( 763 &self, 764 cx: &mut Context<'_>, 765 io_manager: &mut MutexGuard<IoManager<S>>, 766 ) -> Poll<Result<(), HttpError>> { 767 loop { 768 match self.poll_io(cx, io_manager) { 769 Poll::Ready(Ok(_)) => { 770 return Poll::Ready(Ok(())); 771 } 772 Poll::Ready(Err(h2_error)) => { 773 match h2_error { 774 H2Error::StreamError(id, code) => { 775 let rest_payload = RstStream::new(code.clone().into_code()); 776 let frame = Frame::new( 777 id as usize, 778 FrameFlags::empty(), 779 Payload::RstStream(rest_payload), 780 ); 781 io_manager.streams.recv_local_reset(id)?; 782 if self 783 .poll_send_reset(cx, frame.clone(), io_manager)? 
784 .is_pending() 785 { 786 compare_exchange_occupation( 787 &self.controller.occupied, 788 0, 789 self.id, 790 )?; 791 return Poll::Pending; 792 } 793 if self.id == id { 794 return Poll::Ready(Err(H2Error::StreamError(id, code).into())); 795 } else { 796 self.controller_send_frame_to_stream(id, frame, io_manager); 797 { 798 let mut stream_waker = self 799 .controller 800 .stream_waker 801 .lock() 802 .expect("Blocking get waker lock failed! "); 803 // TODO Is there a situation where the result has been 804 // returned, but the waker has not been inserted into the 805 // map? how to deal with. 806 if let Some(waker) = stream_waker.waker.remove(&id) { 807 waker.wake(); 808 } 809 } 810 } 811 } 812 H2Error::ConnectionError(code) => { 813 io_manager.close_frame_receiver(); 814 self.controller.shutdown(); 815 // Since ConnectError may be caused by an io error, so when the 816 // client actively sends a goaway 817 // frame, all streams are shut down and no streams are allowed to 818 // complete. TODO Then consider 819 // separating io errors from frame errors to allow streams whose 820 // stream id is less than last_stream_id to continue 821 self.controller.invalid(); 822 // last_stream_id is set to 0 to ensure that all streams are 823 // shutdown. 824 let goaway_payload = 825 Goaway::new(code.clone().into_code(), 0, vec![]); 826 let frame = Frame::new( 827 0, 828 FrameFlags::empty(), 829 Payload::Goaway(goaway_payload), 830 ); 831 // io_manager.connection_frame.going_away(frame); 832 if self 833 .poll_send_go_away(cx, frame.clone(), io_manager)? 
834 .is_pending() 835 { 836 compare_exchange_occupation( 837 &self.controller.occupied, 838 0, 839 self.id, 840 )?; 841 return Poll::Pending; 842 } 843 844 self.goaway_unsent_stream(io_manager, 0, frame)?; 845 self.goaway_and_shutdown(); 846 return Poll::Ready(Err(H2Error::ConnectionError(code).into())); 847 } 848 } 849 } 850 Poll::Pending => { 851 compare_exchange_occupation(&self.controller.occupied, 0, self.id)?; 852 return Poll::Pending; 853 } 854 } 855 } 856 } 857 poll_io( &self, cx: &mut Context<'_>, io_manager: &mut MutexGuard<IoManager<S>>, ) -> Poll<Result<(), H2Error>>858 fn poll_io( 859 &self, 860 cx: &mut Context<'_>, 861 io_manager: &mut MutexGuard<IoManager<S>>, 862 ) -> Poll<Result<(), H2Error>> { 863 if self.poll_send_preface(cx, io_manager)?.is_pending() { 864 return Poll::Pending; 865 } 866 if self.poll_send_settings(cx, io_manager)?.is_pending() { 867 return Poll::Pending; 868 } 869 match self.poll_dispatch_frame(cx, io_manager)? { 870 Poll::Ready(state) => { 871 if let DispatchState::Partial = state { 872 return Poll::Ready(Ok(())); 873 } 874 } 875 Poll::Pending => { 876 return Poll::Pending; 877 } 878 } 879 // Write and read frames to io in a loop until the frame of the current stream 880 // is read and exit the loop. 881 loop { 882 if self.poll_write_frame(cx, io_manager)?.is_pending() { 883 return Poll::Pending; 884 } 885 match self.poll_read_frame(cx, io_manager)? 
{ 886 Poll::Ready(ReadState::EmptyIo) => {} 887 Poll::Ready(ReadState::CurrentStream) => { 888 return Poll::Ready(Ok(())); 889 } 890 Poll::Pending => { 891 return Poll::Pending; 892 } 893 } 894 } 895 } 896 poll_dispatch_frame( &self, cx: &mut Context<'_>, io_manager: &mut MutexGuard<IoManager<S>>, ) -> Poll<Result<DispatchState, H2Error>>897 fn poll_dispatch_frame( 898 &self, 899 cx: &mut Context<'_>, 900 io_manager: &mut MutexGuard<IoManager<S>>, 901 ) -> Poll<Result<DispatchState, H2Error>> { 902 if io_manager.frame_iter.is_empty() { 903 return Poll::Ready(Ok(DispatchState::Finish)); 904 } 905 let iter_option = take(&mut io_manager.frame_iter.iter); 906 match iter_option { 907 None => Poll::Ready(Err(H2Error::ConnectionError(ErrorCode::IntervalError))), 908 Some(iter) => self.dispatch_read_frames(cx, io_manager, iter), 909 } 910 } 911 poll_send_preface( &self, cx: &mut Context<'_>, io_manager: &mut MutexGuard<IoManager<S>>, ) -> Poll<Result<(), H2Error>>912 fn poll_send_preface( 913 &self, 914 cx: &mut Context<'_>, 915 io_manager: &mut MutexGuard<IoManager<S>>, 916 ) -> Poll<Result<(), H2Error>> { 917 const PREFACE_MSG: &str = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; 918 if io_manager.connection_frame.preface { 919 let mut buf = [0u8; PREFACE_MSG.len()]; 920 buf.copy_from_slice(PREFACE_MSG.as_bytes()); 921 922 let mut start_index = 0; 923 loop { 924 if start_index == PREFACE_MSG.len() { 925 io_manager.connection_frame.preface = false; 926 break; 927 } 928 match Pin::new(&mut io_manager.inner.io) 929 .poll_write(cx, &buf[start_index..]) 930 .map_err(|_| H2Error::ConnectionError(ErrorCode::IntervalError))? 
931 { 932 Poll::Ready(written) => { 933 start_index += written; 934 } 935 Poll::Pending => { 936 return Poll::Pending; 937 } 938 } 939 } 940 return poll_flush_io(cx, &mut io_manager.inner); 941 } 942 Poll::Ready(Ok(())) 943 } 944 poll_send_go_away( &self, cx: &mut Context<'_>, goaway: Frame, io_manager: &mut MutexGuard<IoManager<S>>, ) -> Poll<Result<(), H2Error>>945 fn poll_send_go_away( 946 &self, 947 cx: &mut Context<'_>, 948 goaway: Frame, 949 io_manager: &mut MutexGuard<IoManager<S>>, 950 ) -> Poll<Result<(), H2Error>> { 951 let mut buf = [0u8; 1024]; 952 if write_frame_to_io(cx, &mut buf, goaway, &mut io_manager.inner)?.is_pending() { 953 Poll::Pending 954 } else { 955 poll_flush_io(cx, &mut io_manager.inner) 956 } 957 } 958 poll_send_reset( &self, cx: &mut Context<'_>, reset: Frame, io_manager: &mut MutexGuard<IoManager<S>>, ) -> Poll<Result<(), H2Error>>959 fn poll_send_reset( 960 &self, 961 cx: &mut Context<'_>, 962 reset: Frame, 963 io_manager: &mut MutexGuard<IoManager<S>>, 964 ) -> Poll<Result<(), H2Error>> { 965 let mut buf = [0u8; 1024]; 966 if write_frame_to_io(cx, &mut buf, reset, &mut io_manager.inner)?.is_pending() { 967 Poll::Pending 968 } else { 969 poll_flush_io(cx, &mut io_manager.inner) 970 } 971 } 972 poll_send_settings( &self, cx: &mut Context<'_>, io_manager: &mut MutexGuard<IoManager<S>>, ) -> Poll<Result<(), H2Error>>973 fn poll_send_settings( 974 &self, 975 cx: &mut Context<'_>, 976 io_manager: &mut MutexGuard<IoManager<S>>, 977 ) -> Poll<Result<(), H2Error>> { 978 if let SettingsSync::Send(settings) = io_manager.connection_frame.settings.clone() { 979 let mut buf = [0u8; 1024]; 980 let frame = Frame::new(0, FrameFlags::empty(), Settings(settings.clone())); 981 if write_frame_to_io(cx, &mut buf, frame, &mut io_manager.inner)?.is_pending() { 982 Poll::Pending 983 } else { 984 io_manager.connection_frame.settings = SettingsSync::Acknowledging(settings); 985 poll_flush_io(cx, &mut io_manager.inner) 986 } 987 } else { 988 
Poll::Ready(Ok(()))
            }
        }

        /// Drains queued frames to the underlying io and flushes it.
        ///
        /// First tops up `io_manager.streams` from `io_manager.frame_receiver`
        /// until `FRAME_WRITE_NUM` frames are queued or the channel is empty,
        /// then writes at most `FRAME_WRITE_NUM` frames and flushes.
        ///
        /// Returns a connection error if a stream id registers a second
        /// sender, or if the frame channel is disconnected.
        fn poll_write_frame(
            &self,
            cx: &mut Context<'_>,
            io_manager: &mut MutexGuard<IoManager<S>>,
        ) -> Poll<Result<(), H2Error>> {
            const FRAME_WRITE_NUM: usize = 10;

            // Send 10 frames each time, if there is not enough in the queue, read enough
            // from mpsc::Receiver
            while io_manager.streams.size() < FRAME_WRITE_NUM {
                match io_manager.frame_receiver.try_recv() {
                    // The Frame sent by the Handle for the first time will carry a Sender at the
                    // same time, which is used to send the Response Frame back
                    // to the Handle
                    Ok((Some((id, sender)), frame)) => {
                        // A stream id must register its sender exactly once.
                        if io_manager.senders.insert(id, sender).is_some() {
                            return Poll::Ready(Err(H2Error::ConnectionError(
                                ErrorCode::IntervalError,
                            )));
                        }
                        io_manager.streams.insert(frame);
                    }
                    Ok((None, frame)) => {
                        io_manager.streams.insert(frame);
                    }
                    Err(TryRecvError::Empty) => {
                        break;
                    }
                    Err(TryRecvError::Disconnected) => {
                        return Poll::Ready(Err(H2Error::ConnectionError(ErrorCode::ConnectError)))
                    }
                }
            }

            let mut buf = [0u8; 1024];
            for _i in 0..FRAME_WRITE_NUM {
                match io_manager.streams.pop_front()? {
                    Some(frame) => {
                        // NOTE(review): the frame has already been popped and `buf` is a
                        // local, so on `Pending` the frame does not appear to be re-queued
                        // here - confirm whether the encoder keeps enough state to resume.
                        if write_frame_to_io(cx, &mut buf, frame, &mut io_manager.inner)?
                            .is_pending()
                        {
                            return Poll::Pending;
                        }
                    }
                    None => {
                        break;
                    }
                }
            }
            poll_flush_io(cx, &mut io_manager.inner)
        }

        /// Reads from the io and dispatches decoded frames until a frame for
        /// the current stream has been delivered or a read yields no bytes.
        ///
        /// Returns `ReadState::CurrentStream` when `dispatch_read_frames`
        /// reports `DispatchState::Partial`, and `ReadState::EmptyIo` when a
        /// read fills zero bytes.
        fn poll_read_frame(
            &self,
            cx: &mut Context<'_>,
            io_manager: &mut MutexGuard<IoManager<S>>,
        ) -> Poll<Result<ReadState, H2Error>> {
            // Read all the frames in io until the frame of the current stream is read and
            // stop.
            let mut buf = [0u8; 1024];
            loop {
                let mut read_buf = ReadBuf::new(&mut buf);
                match Pin::new(&mut io_manager.inner.io).poll_read(cx, &mut read_buf) {
                    Poll::Ready(Err(_)) => {
                        return Poll::Ready(Err(H2Error::ConnectionError(ErrorCode::ConnectError)))
                    }
                    Poll::Pending => {
                        return Poll::Pending;
                    }
                    Poll::Ready(Ok(_)) => {}
                }
                let read = read_buf.filled().len();
                if read == 0 {
                    break;
                }
                let frames = io_manager.inner.decoder.decode(&buf[..read])?;
                let frame_iterator = frames.into_iter();

                match self.dispatch_read_frames(cx, io_manager, frame_iterator)? {
                    Poll::Ready(state) => {
                        if let DispatchState::Partial = state {
                            return Poll::Ready(Ok(ReadState::CurrentStream));
                        }
                    }
                    Poll::Pending => {
                        return Poll::Pending;
                    }
                }
            }
            Poll::Ready(Ok(ReadState::EmptyIo))
        }

        /// Routes every complete frame yielded by `frame_iterator`.
        ///
        /// Settings and Ping frames are answered here; Goaway shuts the
        /// controller down and notifies the affected streams; PushPromise is
        /// rejected with a protocol error; WindowUpdate and Priority are
        /// skipped. The remaining frames are forwarded to the sender
        /// registered for their stream id.
        ///
        /// Stops at the first frame forwarded to the current stream
        /// (`self.id`): the rest of the iterator is stored in
        /// `io_manager.frame_iter.iter` and `DispatchState::Partial` is
        /// returned. Otherwise returns `DispatchState::Finish` once the
        /// iterator is exhausted.
        fn dispatch_read_frames(
            &self,
            cx: &mut Context<'_>,
            io_manager: &mut MutexGuard<IoManager<S>>,
            mut frame_iterator: FramesIntoIter,
        ) -> Poll<Result<DispatchState, H2Error>> {
            let mut meet_this = false;
            while let Some(frame_kind) = frame_iterator.next() {
                // Anything other than a complete frame is skipped.
                if let FrameKind::Complete(frame) = frame_kind {
                    match frame.payload() {
                        Settings(settings) => {
                            if self
                                .recv_settings_frame(
                                    cx,
                                    io_manager,
                                    frame.flags().is_ack(),
                                    settings,
                                )?
                                .is_pending()
                            {
                                return Poll::Pending;
                            }
                            continue;
                        }
                        Payload::Ping(ping) => {
                            if self
                                .recv_ping_frame(cx, io_manager, frame.flags().is_ack(), ping)?
                                .is_pending()
                            {
                                return Poll::Pending;
                            }
                            continue;
                        }
                        Payload::PushPromise(_) => {
                            // TODO The current settings_enable_push is fixed to false
                            return Poll::Ready(Err(H2Error::ConnectionError(
                                ErrorCode::ProtocolError,
                            )));
                        }
                        Payload::Goaway(goaway) => {
                            // shutdown io,prevent the creation of new stream
                            self.controller.shutdown();
                            io_manager.close_frame_receiver();
                            let last_stream_id = goaway.get_last_stream_id();
                            // Reject a last-stream-id that is not below our next stream id.
                            if self.next_stream_id.get_next_id() as usize <= last_stream_id {
                                return Poll::Ready(Err(H2Error::ConnectionError(
                                    ErrorCode::ProtocolError,
                                )));
                            }
                            self.goaway_unsent_stream(
                                io_manager,
                                last_stream_id as u32,
                                frame.clone(),
                            )?;
                            continue;
                        }
                        Payload::RstStream(_reset) => {
                            io_manager
                                .streams
                                .recv_remote_reset(frame.stream_id() as u32)?;
                        }
                        Payload::Headers(_headers) => {
                            // Frames for a locally reset stream are not forwarded.
                            if let Closed(ResetReason::Local) =
                                io_manager.streams.recv_headers(frame.stream_id() as u32)?
                            {
                                continue;
                            }
                        }
                        Payload::Data(_data) => {
                            // Frames for a locally reset stream are not forwarded.
                            if let Closed(ResetReason::Local) =
                                io_manager.streams.recv_data(frame.stream_id() as u32)?
                            {
                                continue;
                            }
                        }
                        // TODO Windows that processes streams and connections separately.
                        Payload::WindowUpdate(_windows) => {
                            continue;
                        }
                        Payload::Priority(_priority) => continue,
                    }

                    let stream_id = frame.stream_id() as u32;
                    self.controller_send_frame_to_stream(stream_id, frame, io_manager);
                    if stream_id == self.id {
                        meet_this = true;
                        break;
                    }

                    // TODO After adding frames such as Reset/Priority, there may be
                    // problems with the following logic, because the lack of waker
                    // cannot wake up
                    let mut stream_waker = self
                        .controller
                        .stream_waker
                        .lock()
                        .expect("Blocking get waker lock failed! ");
                    // TODO Is there a situation where the result has been returned, but
                    // the waker has not been inserted into the map? how to deal with.
                    if let Some(waker) = stream_waker.waker.remove(&stream_id) {
                        waker.wake();
                    }
                }
            }

            if meet_this {
                // Keep the undelivered frames for the next read of this connection.
                io_manager.frame_iter.iter = Some(frame_iterator);
                Poll::Ready(Ok(DispatchState::Partial))
            } else {
                Poll::Ready(Ok(DispatchState::Finish))
            }
        }

        /// Forwards a copy of the `goaway` frame to every stream returned by
        /// `streams.get_goaway_streams(last_stream_id)` and wakes each of
        /// those streams that has a registered waker.
        fn goaway_unsent_stream(
            &self,
            io_manager: &mut MutexGuard<IoManager<S>>,
            last_stream_id: u32,
            goaway: Frame,
        ) -> Result<(), H2Error> {
            let goaway_streams = io_manager.streams.get_goaway_streams(last_stream_id)?;
            {
                let mut stream_waker = self
                    .controller
                    .stream_waker
                    .lock()
                    .expect("Blocking get waker lock failed! ");
                for goaway_stream in goaway_streams {
                    self.controller_send_frame_to_stream(goaway_stream, goaway.clone(), io_manager);
                    if let Some(waker) = stream_waker.waker.remove(&goaway_stream) {
                        waker.wake();
                    }
                }
            }
            Ok(())
        }

        /// Takes every registered stream waker out of the controller and
        /// wakes it, leaving the waker map empty.
        fn goaway_and_shutdown(&self) {
            let mut waker_guard = self
                .controller
                .stream_waker
                .lock()
                .expect("Blocking get waker lock failed! ");
            let waker_map = take(&mut waker_guard.waker);
            for (_id, waker) in waker_map {
                waker.wake()
            }
        }

        /// Handles a received SETTINGS frame.
        ///
        /// * With `is_ack` set: the local settings being acknowledged are
        ///   applied to the decoder and the sync state becomes `Synced`; an
        ///   ack received in any other sync state is a protocol error.
        /// * Otherwise: the peer's header table size and max frame size are
        ///   applied to the encoder, then a SETTINGS ack is written and
        ///   flushed.
        fn recv_settings_frame(
            &self,
            cx: &mut Context<'_>,
            guard: &mut MutexGuard<IoManager<S>>,
            is_ack: bool,
            settings: &h2::Settings,
        ) -> Poll<Result<(), H2Error>> {
            if is_ack {
                match guard.connection_frame.settings.clone() {
                    SettingsSync::Acknowledging(local_settings) => {
                        for setting in local_settings.get_settings() {
                            match setting {
                                Setting::MaxHeaderListSize(size) => {
                                    guard.inner.decoder.set_max_header_list_size(*size as usize);
                                }
                                Setting::MaxFrameSize(size) => {
                                    guard.inner.decoder.set_max_frame_size(*size)?;
                                }
                                _ => {}
                            }
                        }
                        guard.connection_frame.settings = SettingsSync::Synced;
                        Poll::Ready(Ok(()))
                    }
                    _ => Poll::Ready(Err(H2Error::ConnectionError(ErrorCode::ProtocolError))),
                }
            } else {
                for setting in settings.get_settings() {
                    match setting {
                        Setting::HeaderTableSize(size) => {
                            guard.inner.encoder.update_header_table_size(*size as usize);
                        }
                        Setting::MaxFrameSize(size) => {
                            guard.inner.encoder.update_max_frame_size(*size as usize);
                        }
                        _ => {}
                    }
                }
                // reply ack Settings
                let mut buf = [0u8; 1024];
                if write_frame_to_io(cx, &mut buf, SettingsSync::ack_settings(), &mut guard.inner)?
                    .is_pending()
                {
                    Poll::Pending
                } else {
                    poll_flush_io(cx, &mut guard.inner)
                }
            }
        }

        /// Handles a received PING frame.
        ///
        /// A PING ack is ignored. Any other PING is answered with a frame on
        /// stream 0 carrying the same payload and flags `0x1`, which is
        /// written and then flushed.
        fn recv_ping_frame(
            &self,
            cx: &mut Context<'_>,
            guard: &mut MutexGuard<IoManager<S>>,
            is_ack: bool,
            ping: &h2::Ping,
        ) -> Poll<Result<(), H2Error>> {
            if is_ack {
                // TODO The sending logic of ping has not been implemented yet, so there is no
                // processing for ack
                Poll::Ready(Ok(()))
            } else {
                // reply ack Ping
                let ack = Frame::new(0, FrameFlags::new(0x1), Payload::Ping(ping.clone()));
                let mut buf = [0u8; 1024];
                if write_frame_to_io(cx, &mut buf, ack, &mut guard.inner)?.is_pending() {
                    Poll::Pending
                } else {
                    poll_flush_io(cx, &mut guard.inner)
                }
            }
        }

        /// Sends `frame` to the sender registered for `stream_id`, if any.
        ///
        /// A missing sender and a failed send are both ignored.
        fn controller_send_frame_to_stream(
            &self,
            stream_id: u32,
            frame: Frame,
            guard: &mut MutexGuard<IoManager<S>>,
        ) {
            // TODO Need to consider when to delete useless Sender after support reset
            // stream
            if let Some(sender) = guard.senders.get(&stream_id) {
                // If the client coroutine has exited, this frame is skipped.
                let _ = sender.send(frame);
            }
        }
    }

    impl FrameReceiver {
        /// Installs the channel on which this stream receives its frames.
        fn set_receiver(&mut self, receiver: UnboundedReceiver<Frame>) {
            self.receiver = Some(receiver);
        }

        /// Tries to take one frame from the channel without blocking.
        ///
        /// Returns `Ok(None)` when the channel is currently empty, a
        /// `StreamClosed` stream error when it is disconnected, and an
        /// `IntervalError` stream error when no receiver has been set.
        fn recv_frame(&mut self, id: u32) -> Result<Option<Frame>, HttpError> {
            if let Some(ref mut receiver) = self.receiver {
                match receiver.try_recv() {
                    Ok(frame) => Ok(Some(frame)),
                    Err(TryRecvError::Disconnected) => {
                        Err(H2Error::StreamError(id, ErrorCode::StreamClosed).into())
                    }
                    Err(TryRecvError::Empty) => Ok(None),
                }
            } else {
                Err(H2Error::StreamError(id, ErrorCode::IntervalError).into())
            }
        }

        /// Returns `true` if no receiver has been set yet.
        fn is_none(&self) -> bool {
            self.receiver.is_none()
        }
    }

    // TODO Temporarily only deal with the Settings frame
    /// Builds the connection-level frames from `config`: a settings value
    /// carrying the configured max header list size, max frame size and
    /// header table size, with push disabled.
    pub(crate) fn build_connection_frames(config: H2Config) -> ConnectionFrames {
        const DEFAULT_ENABLE_PUSH: bool = false;
        let settings = SettingsBuilder::new()
            .max_header_list_size(config.max_header_list_size())
            .max_frame_size(config.max_frame_size())
            .header_table_size(config.header_table_size())
            .enable_push(DEFAULT_ENABLE_PUSH)
            .build();

        ConnectionFrames::new(settings)
    }

    // io write interface
    /// Encodes `frame` through `inner.encoder` and writes it to `inner.io`,
    /// using `buf` as scratch space.
    ///
    /// Each round encodes into the free tail of `buf` and writes the buffered
    /// bytes; bytes the io did not accept are moved to the front of `buf` and
    /// retried together with the next encoded chunk. The loop ends when the
    /// encoder yields nothing and nothing is left over.
    ///
    /// # Errors
    ///
    /// Returns a connection error if encoding or writing fails, or if the io
    /// accepts zero bytes while data is still buffered (which would otherwise
    /// spin forever).
    fn write_frame_to_io<S>(
        cx: &mut Context<'_>,
        buf: &mut [u8],
        frame: Frame,
        inner: &mut Inner<S>,
    ) -> Poll<Result<(), H2Error>>
    where
        S: AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static,
    {
        let mut remain_size = 0;
        inner.encoder.set_frame(frame);
        loop {
            let size = inner
                .encoder
                .encode(&mut buf[remain_size..])
                .map_err(|_| H2Error::ConnectionError(ErrorCode::IntervalError))?;

            let total = size + remain_size;

            // All the bytes of the frame are written
            if total == 0 {
                break;
            }
            match Pin::new(&mut inner.io)
                .poll_write(cx, &buf[..total])
                .map_err(|_| H2Error::ConnectionError(ErrorCode::IntervalError))?
            {
                Poll::Ready(written) => {
                    // No progress with bytes still pending: fail instead of looping
                    // forever inside a poll function.
                    if written == 0 {
                        return Poll::Ready(Err(H2Error::ConnectionError(
                            ErrorCode::ConnectError,
                        )));
                    }
                    remain_size = total - written;
                    // written is not necessarily equal to total: keep the unwritten
                    // tail at the front of the buffer for the next round.
                    buf.copy_within(written..total, 0);
                }
                Poll::Pending => {
                    return Poll::Pending;
                }
            }
        }
        Poll::Ready(Ok(()))
    }

    /// Flushes `inner.io`, mapping any io error to a `ConnectError`
    /// connection error.
    fn poll_flush_io<S>(cx: &mut Context<'_>, inner: &mut Inner<S>) -> Poll<Result<(), H2Error>>
    where
        S: AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static,
    {
        Pin::new(&mut inner.io)
            .poll_flush(cx)
            .map_err(|_| H2Error::ConnectionError(ErrorCode::ConnectError))
    }

    /// Unwraps `frame`, turning `None` into an `IntervalError` connection
    /// error.
    fn get_frame(frame: Option<Frame>) -> Result<Frame, HttpError> {
        frame.ok_or_else(|| H2Error::ConnectionError(ErrorCode::IntervalError).into())
    }

    /// Removes one waker from `waker_map` and wakes it; does nothing when the
    /// map is empty. Which entry is picked follows the map's iteration order.
    fn wakeup_next_stream(waker_map: &mut HashMap<u32, Waker>) {
        let next_id = waker_map.keys().next().copied();
        if let Some(id) = next_id {
            if let Some(waker) = waker_map.remove(&id) {
                waker.wake();
            }
        }
    }

    /// Reports whether stream `id` may use the io.
    ///
    /// Returns `Ok(true)` when `occupied` holds 0, or when it holds `id`, in
    /// which case it is reset to 0 first. Returns `Ok(false)` when it holds
    /// any other value.
    fn is_io_available(occupied: &AtomicU32, id: u32) -> Result<bool, HttpError> {
        let is_occupied = occupied.load(Ordering::Relaxed);
        if is_occupied == 0 {
            return Ok(true);
        }
        if is_occupied == id {
            compare_exchange_occupation(occupied, id, 0)?;
            return Ok(true);
        }
        Ok(false)
    }

    /// Swaps `occupied` from `current` to `new`, returning an `IntervalError`
    /// connection error if it did not hold `current`.
    fn compare_exchange_occupation(
        occupied: &AtomicU32,
        current: u32,
        new: u32,
    ) -> Result<(), HttpError> {
        occupied
            .compare_exchange(current, new, Ordering::Acquire, Ordering::Relaxed)
            .map_err(|_| H2Error::ConnectionError(ErrorCode::IntervalError))?;
        Ok(())
    }
}