1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::future::Future; 15 use std::pin::Pin; 16 use std::sync::{Arc, Mutex}; 17 use std::task::{Context, Poll}; 18 19 use ylong_runtime::time::{sleep, Sleep}; 20 21 use crate::async_impl::QuicConn; 22 use crate::runtime::{AsyncRead, AsyncWrite, ReadBuf, UnboundedReceiver, UnboundedSender}; 23 use crate::util::dispatcher::http3::DispatchErrorKind; 24 use crate::util::h3::stream_manager::UPD_RECV_BUF_SIZE; 25 use crate::util::ConnInfo; 26 27 const UDP_SEND_BUF_SIZE: usize = 1350; 28 29 enum IOManagerState { 30 IORecving, 31 Timeout, 32 IOSending, 33 ChannelRecving, 34 } 35 36 pub(crate) struct IOManager<S> { 37 io: S, 38 conn: Arc<Mutex<QuicConn>>, 39 io_manager_rx: UnboundedReceiver<Result<(), DispatchErrorKind>>, 40 stream_manager_tx: UnboundedSender<Result<(), DispatchErrorKind>>, 41 recv_timeout: Option<Pin<Box<Sleep>>>, 42 state: IOManagerState, 43 recv_buf: [u8; UPD_RECV_BUF_SIZE], 44 send_data: SendData, 45 } 46 47 impl<S: AsyncRead + AsyncWrite + ConnInfo + Unpin + Sync + Send + 'static> IOManager<S> { new( io: S, conn: Arc<Mutex<QuicConn>>, io_manager_rx: UnboundedReceiver<Result<(), DispatchErrorKind>>, stream_manager_tx: UnboundedSender<Result<(), DispatchErrorKind>>, ) -> Self48 pub(crate) fn new( 49 io: S, 50 conn: Arc<Mutex<QuicConn>>, 51 io_manager_rx: UnboundedReceiver<Result<(), DispatchErrorKind>>, 52 stream_manager_tx: UnboundedSender<Result<(), DispatchErrorKind>>, 53 ) -> Self { 54 Self { 55 io, 56 conn, 57 io_manager_rx, 58 stream_manager_tx, 59 recv_timeout: None, 60 state: IOManagerState::IORecving, 61 recv_buf: [0u8; UPD_RECV_BUF_SIZE], 62 send_data: SendData::new(), 63 } 64 } poll_recv_signal( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<Result<(), DispatchErrorKind>, DispatchErrorKind>>65 fn poll_recv_signal( 66 &mut self, 67 cx: &mut Context<'_>, 68 ) -> Poll<Result<Result<(), DispatchErrorKind>, DispatchErrorKind>> { 69 #[cfg(feature = "tokio_base")] 70 match self.io_manager_rx.poll_recv(cx) { 71 Poll::Ready(None) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)), 72 Poll::Ready(Some(data)) => Poll::Ready(Ok(data)), 73 Poll::Pending => Poll::Pending, 74 } 75 #[cfg(feature = "ylong_base")] 76 match self.io_manager_rx.poll_recv(cx) { 77 Poll::Ready(Err(_e)) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)), 78 Poll::Ready(Ok(data)) => Poll::Ready(Ok(data)), 79 Poll::Pending => Poll::Pending, 80 } 81 } 82 poll_io_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), DispatchErrorKind>>83 fn poll_io_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), DispatchErrorKind>> { 84 let mut buf = ReadBuf::new(&mut self.recv_buf); 85 if self.recv_timeout.is_none() { 86 if let Some(time) = self.conn.lock().unwrap().timeout() { 87 self.recv_timeout = Some(Box::pin(sleep(time))); 88 }; 89 } 90 91 if let Some(delay) = self.recv_timeout.as_mut() { 92 if let Poll::Ready(()) = delay.as_mut().poll(cx) { 93 self.recv_timeout = None; 94 self.conn.lock().unwrap().on_timeout(); 95 self.state = IOManagerState::Timeout; 96 return Poll::Ready(Ok(())); 97 } 98 } 99 match Pin::new(&mut self.io).poll_read(cx, &mut buf) { 100 Poll::Ready(Ok(())) => { 101 let info = self.io.conn_data().detail(); 102 self.recv_timeout = None; 103 let recv_info = quiche::RecvInfo { 104 to: info.local, 105 from: info.peer, 106 }; 107 return match self.conn.lock().unwrap().recv(buf.filled_mut(), recv_info) { 108 Ok(_) => { 109 let _ = self.stream_manager_tx.send(Ok(())); 110 // io recv once again 111 Poll::Ready(Ok(())) 112 } 113 Err(e) => Poll::Ready(Err(DispatchErrorKind::Quic(e))), 114 }; 115 } 116 Poll::Ready(Err(e)) => Poll::Ready(Err(DispatchErrorKind::Io(e.kind()))), 117 Poll::Pending => { 118 self.state = IOManagerState::IOSending; 119 Poll::Pending 120 } 121 } 122 } 123 poll_io_send(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), DispatchErrorKind>>124 fn poll_io_send(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), DispatchErrorKind>> { 125 loop { 126 // UDP buf has not been sent to the peer, send rest UDP buf first 127 if self.send_data.buf_size == self.send_data.offset { 128 // Retrieve the data to be sent via UDP from the connection 129 let size = match self.conn.lock().unwrap().send(&mut self.send_data.buf) { 130 Ok((size, _)) => size, 131 Err(quiche::Error::Done) => { 132 self.state = IOManagerState::ChannelRecving; 133 return Poll::Ready(Ok(())); 134 } 135 Err(e) => { 136 return Poll::Ready(Err(DispatchErrorKind::Quic(e))); 137 } 138 }; 139 self.send_data.buf_size = size; 140 self.send_data.offset = 0; 141 } 142 143 match Pin::new(&mut self.io).poll_write( 144 cx, 145 &self.send_data.buf[self.send_data.offset..self.send_data.buf_size], 146 ) { 147 Poll::Ready(Ok(size)) => { 148 self.send_data.offset += size; 149 if self.send_data.offset != self.send_data.buf_size { 150 // loop to send UDP buf 151 continue; 152 } else { 153 self.send_data.offset = 0; 154 self.send_data.buf_size = 0; 155 } 156 } 157 Poll::Ready(Err(e)) => { 158 return Poll::Ready(Err(DispatchErrorKind::Io(e.kind()))); 159 } 160 Poll::Pending => { 161 self.state = IOManagerState::ChannelRecving; 162 return Poll::Pending; 163 } 164 } 165 } 166 } 167 } 168 169 impl<S: AsyncRead + AsyncWrite + ConnInfo + Unpin + Sync + Send + 'static> Future for IOManager<S> { 170 type Output = Result<(), DispatchErrorKind>; 171 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>172 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 173 let this = self.get_mut(); 174 this.state = IOManagerState::IORecving; 175 loop { 176 match this.state { 177 IOManagerState::IORecving => { 178 if let Poll::Ready(Err(e)) = this.poll_io_recv(cx) { 179 return Poll::Ready(Err(e)); 180 } 181 } 182 IOManagerState::IOSending => { 183 if let Poll::Ready(Err(e)) = this.poll_io_send(cx) { 184 return Poll::Ready(Err(e)); 185 } 186 } 187 IOManagerState::Timeout => { 188 if let Poll::Ready(Err(e)) = this.poll_io_send(cx) { 189 return Poll::Ready(Err(e)); 190 } 191 // ensure pending at io recv 192 this.state = IOManagerState::IORecving; 193 } 194 IOManagerState::ChannelRecving => match this.poll_recv_signal(cx) { 195 // won't recv Err now 196 Poll::Ready(Ok(_)) => { 197 continue; 198 } 199 Poll::Ready(Err(e)) => { 200 return Poll::Ready(Err(e)); 201 } 202 Poll::Pending => { 203 this.state = IOManagerState::IORecving; 204 return Poll::Pending; 205 } 206 }, 207 } 208 } 209 } 210 } 211 212 pub(crate) struct SendData { 213 pub(crate) buf: [u8; UDP_SEND_BUF_SIZE], 214 pub(crate) buf_size: usize, 215 pub(crate) offset: usize, 216 } 217 218 impl SendData { new() -> Self219 pub(crate) fn new() -> Self { 220 Self { 221 buf: [0u8; UDP_SEND_BUF_SIZE], 222 buf_size: 0, 223 offset: 0, 224 } 225 } 226 } 227