• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::future::Future;
15 use std::pin::Pin;
16 use std::sync::{Arc, Mutex};
17 use std::task::{Context, Poll};
18 
19 use ylong_runtime::time::{sleep, Sleep};
20 
21 use crate::async_impl::QuicConn;
22 use crate::runtime::{AsyncRead, AsyncWrite, ReadBuf, UnboundedReceiver, UnboundedSender};
23 use crate::util::dispatcher::http3::DispatchErrorKind;
24 use crate::util::h3::stream_manager::UPD_RECV_BUF_SIZE;
25 use crate::util::ConnInfo;
26 
27 const UDP_SEND_BUF_SIZE: usize = 1350;
28 
29 enum IOManagerState {
30     IORecving,
31     Timeout,
32     IOSending,
33     ChannelRecving,
34 }
35 
36 pub(crate) struct IOManager<S> {
37     io: S,
38     conn: Arc<Mutex<QuicConn>>,
39     io_manager_rx: UnboundedReceiver<Result<(), DispatchErrorKind>>,
40     stream_manager_tx: UnboundedSender<Result<(), DispatchErrorKind>>,
41     recv_timeout: Option<Pin<Box<Sleep>>>,
42     state: IOManagerState,
43     recv_buf: [u8; UPD_RECV_BUF_SIZE],
44     send_data: SendData,
45 }
46 
47 impl<S: AsyncRead + AsyncWrite + ConnInfo + Unpin + Sync + Send + 'static> IOManager<S> {
new( io: S, conn: Arc<Mutex<QuicConn>>, io_manager_rx: UnboundedReceiver<Result<(), DispatchErrorKind>>, stream_manager_tx: UnboundedSender<Result<(), DispatchErrorKind>>, ) -> Self48     pub(crate) fn new(
49         io: S,
50         conn: Arc<Mutex<QuicConn>>,
51         io_manager_rx: UnboundedReceiver<Result<(), DispatchErrorKind>>,
52         stream_manager_tx: UnboundedSender<Result<(), DispatchErrorKind>>,
53     ) -> Self {
54         Self {
55             io,
56             conn,
57             io_manager_rx,
58             stream_manager_tx,
59             recv_timeout: None,
60             state: IOManagerState::IORecving,
61             recv_buf: [0u8; UPD_RECV_BUF_SIZE],
62             send_data: SendData::new(),
63         }
64     }
poll_recv_signal( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<Result<(), DispatchErrorKind>, DispatchErrorKind>>65     fn poll_recv_signal(
66         &mut self,
67         cx: &mut Context<'_>,
68     ) -> Poll<Result<Result<(), DispatchErrorKind>, DispatchErrorKind>> {
69         #[cfg(feature = "tokio_base")]
70         match self.io_manager_rx.poll_recv(cx) {
71             Poll::Ready(None) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)),
72             Poll::Ready(Some(data)) => Poll::Ready(Ok(data)),
73             Poll::Pending => Poll::Pending,
74         }
75         #[cfg(feature = "ylong_base")]
76         match self.io_manager_rx.poll_recv(cx) {
77             Poll::Ready(Err(_e)) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)),
78             Poll::Ready(Ok(data)) => Poll::Ready(Ok(data)),
79             Poll::Pending => Poll::Pending,
80         }
81     }
82 
poll_io_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), DispatchErrorKind>>83     fn poll_io_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), DispatchErrorKind>> {
84         let mut buf = ReadBuf::new(&mut self.recv_buf);
85         if self.recv_timeout.is_none() {
86             if let Some(time) = self.conn.lock().unwrap().timeout() {
87                 self.recv_timeout = Some(Box::pin(sleep(time)));
88             };
89         }
90 
91         if let Some(delay) = self.recv_timeout.as_mut() {
92             if let Poll::Ready(()) = delay.as_mut().poll(cx) {
93                 self.recv_timeout = None;
94                 self.conn.lock().unwrap().on_timeout();
95                 self.state = IOManagerState::Timeout;
96                 return Poll::Ready(Ok(()));
97             }
98         }
99         match Pin::new(&mut self.io).poll_read(cx, &mut buf) {
100             Poll::Ready(Ok(())) => {
101                 let info = self.io.conn_data().detail();
102                 self.recv_timeout = None;
103                 let recv_info = quiche::RecvInfo {
104                     to: info.local,
105                     from: info.peer,
106                 };
107                 return match self.conn.lock().unwrap().recv(buf.filled_mut(), recv_info) {
108                     Ok(_) => {
109                         let _ = self.stream_manager_tx.send(Ok(()));
110                         // io recv once again
111                         Poll::Ready(Ok(()))
112                     }
113                     Err(e) => Poll::Ready(Err(DispatchErrorKind::Quic(e))),
114                 };
115             }
116             Poll::Ready(Err(e)) => Poll::Ready(Err(DispatchErrorKind::Io(e.kind()))),
117             Poll::Pending => {
118                 self.state = IOManagerState::IOSending;
119                 Poll::Pending
120             }
121         }
122     }
123 
poll_io_send(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), DispatchErrorKind>>124     fn poll_io_send(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), DispatchErrorKind>> {
125         loop {
126             // UDP buf has not been sent to the peer, send rest UDP buf first
127             if self.send_data.buf_size == self.send_data.offset {
128                 // Retrieve the data to be sent via UDP from the connection
129                 let size = match self.conn.lock().unwrap().send(&mut self.send_data.buf) {
130                     Ok((size, _)) => size,
131                     Err(quiche::Error::Done) => {
132                         self.state = IOManagerState::ChannelRecving;
133                         return Poll::Ready(Ok(()));
134                     }
135                     Err(e) => {
136                         return Poll::Ready(Err(DispatchErrorKind::Quic(e)));
137                     }
138                 };
139                 self.send_data.buf_size = size;
140                 self.send_data.offset = 0;
141             }
142 
143             match Pin::new(&mut self.io).poll_write(
144                 cx,
145                 &self.send_data.buf[self.send_data.offset..self.send_data.buf_size],
146             ) {
147                 Poll::Ready(Ok(size)) => {
148                     self.send_data.offset += size;
149                     if self.send_data.offset != self.send_data.buf_size {
150                         // loop to send UDP buf
151                         continue;
152                     } else {
153                         self.send_data.offset = 0;
154                         self.send_data.buf_size = 0;
155                     }
156                 }
157                 Poll::Ready(Err(e)) => {
158                     return Poll::Ready(Err(DispatchErrorKind::Io(e.kind())));
159                 }
160                 Poll::Pending => {
161                     self.state = IOManagerState::ChannelRecving;
162                     return Poll::Pending;
163                 }
164             }
165         }
166     }
167 }
168 
169 impl<S: AsyncRead + AsyncWrite + ConnInfo + Unpin + Sync + Send + 'static> Future for IOManager<S> {
170     type Output = Result<(), DispatchErrorKind>;
171 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>172     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
173         let this = self.get_mut();
174         this.state = IOManagerState::IORecving;
175         loop {
176             match this.state {
177                 IOManagerState::IORecving => {
178                     if let Poll::Ready(Err(e)) = this.poll_io_recv(cx) {
179                         return Poll::Ready(Err(e));
180                     }
181                 }
182                 IOManagerState::IOSending => {
183                     if let Poll::Ready(Err(e)) = this.poll_io_send(cx) {
184                         return Poll::Ready(Err(e));
185                     }
186                 }
187                 IOManagerState::Timeout => {
188                     if let Poll::Ready(Err(e)) = this.poll_io_send(cx) {
189                         return Poll::Ready(Err(e));
190                     }
191                     // ensure pending at io recv
192                     this.state = IOManagerState::IORecving;
193                 }
194                 IOManagerState::ChannelRecving => match this.poll_recv_signal(cx) {
195                     // won't recv Err now
196                     Poll::Ready(Ok(_)) => {
197                         continue;
198                     }
199                     Poll::Ready(Err(e)) => {
200                         return Poll::Ready(Err(e));
201                     }
202                     Poll::Pending => {
203                         this.state = IOManagerState::IORecving;
204                         return Poll::Pending;
205                     }
206                 },
207             }
208         }
209     }
210 }
211 
212 pub(crate) struct SendData {
213     pub(crate) buf: [u8; UDP_SEND_BUF_SIZE],
214     pub(crate) buf_size: usize,
215     pub(crate) offset: usize,
216 }
217 
218 impl SendData {
new() -> Self219     pub(crate) fn new() -> Self {
220         Self {
221             buf: [0u8; UDP_SEND_BUF_SIZE],
222             buf_size: 0,
223             offset: 0,
224         }
225     }
226 }
227